A serverless data pipeline is a data integration architecture that leverages serverless computing services to manage, process, and move data between different stages in a workflow without the need for provisioning or managing servers.
We’ll create a serverless data pipeline using a Lambda function and an AWS Glue job. First, let’s get a feel for these services; you may skip this part if you are already familiar with them.
Let’s briefly discuss Lambda functions and AWS Glue jobs.
Lambda: AWS Lambda is a serverless compute service that lets us run code without provisioning or managing servers. It follows a pay-as-you-go model and charges only for execution time.
AWS Glue: AWS Glue is a serverless data integration service that allows us to extract, transform, and load data as per our use case. It offers features such as data crawling, data mapping, and data classification. In short, it is a single serverless service that covers the core data integration requirements.
There are multiple ways to design a serverless data pipeline. In this Answer, we’ll discuss a simple approach. The following is a high-level architecture diagram of the infrastructure we’ll create:
We’ll create two S3 buckets as our data source and target. Follow the steps below to create a source S3 bucket:
On the AWS Management Console, search “S3” and click “S3” from the search results.
Click the “Create bucket” button.
Enter a globally unique name for the S3 bucket.
Select “US East (N. Virginia) us-east-1” as the AWS region.
In the “Block Public Access settings for this bucket” section, uncheck the “Block all public access” option and check the acknowledgment box to accept the public access warning.
Scroll down to the end of the page and click the “Create bucket” button.
Repeat the steps to create the target S3 bucket. We can also create the source and target buckets using the AWS CLI. Before using the AWS CLI, make sure the user credentials are configured with the command given below. Ensure you replace <ACCESS_KEY> and <SECRET_ACCESS_KEY> with your credentials.
aws configure set default.region us-east-1
aws configure set aws_access_key_id <ACCESS_KEY>
aws configure set aws_secret_access_key <SECRET_ACCESS_KEY>
aws configure set default.output json
Run the commands given below to create the source and target buckets, respectively. These commands assume that the $source_bucket, $destination_bucket, and $region shell variables have already been set to your bucket names and AWS Region.
# Create source bucket
aws s3api create-bucket --bucket "$source_bucket" --region $region
aws s3api put-public-access-block --bucket $source_bucket --public-access-block-configuration "BlockPublicAcls=false,IgnorePublicAcls=false,BlockPublicPolicy=false,RestrictPublicBuckets=false"

# Create destination bucket
aws s3api create-bucket --bucket "$destination_bucket" --region $region
aws s3api put-public-access-block --bucket $destination_bucket --public-access-block-configuration "BlockPublicAcls=false,IgnorePublicAcls=false,BlockPublicPolicy=false,RestrictPublicBuckets=false"

echo "Buckets created successfully!"
We need an IAM role that gives the Glue job access to the S3 buckets. Follow the steps below to create the IAM role:
On the AWS Management Console, search for “IAM” and select “IAM” from the search results.
Click “Roles” under the “Access management” heading in the left menu bar and click the “Create role” button.
For “Trusted entity type,” select “Custom trust policy" and enter the following trust policy in the editor:
{"Version": "2012-10-17","Statement": [{"Effect": "Allow","Principal": {"Service": "glue.amazonaws.com"},"Action": "sts:AssumeRole"}]}
Click the “Next” button.
On the “Add permissions” page, select the AmazonS3FullAccess and AWSGlueServiceRole policies.
Enter a name for the role and click the “Create role” button.
We have created a role that grants the Glue job the required access. We can also create the IAM role using the commands given below:
aws iam create-role --role-name GlueJobRole --assume-role-policy-document '{"Version": "2012-10-17","Statement": [{"Effect": "Allow","Principal": {"Service": "glue.amazonaws.com"},"Action": "sts:AssumeRole"}]}'

aws iam attach-role-policy --role-name GlueJobRole --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess

aws iam attach-role-policy --role-name GlueJobRole --policy-arn arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole

echo "Glue Role created successfully!"
We’ll use the visual ETL editor to create the ETL job. The job defines the source, target, and transformation. Our source and target will be S3 buckets, and we'll use a simple SQL query to define the transformation. Follow the steps below to create an ETL job:
On the AWS Management Console, search for "Glue" and select "AWS Glue". From the "Data Integration and ETL" section, select "Visual ETL".
On the editor, add S3 as the source. Enter the location of the source S3 bucket and select CSV as the data format.
Next, add a SQL query as the transform and change it to the query given below. Make sure to replace <SQL_ALIASES> with the SQL alias assigned to your input source.
select gameID, leagueID, season from <SQL_ALIASES>
Finally, add S3 as the target. Configure the location URI of the target bucket and select the desired format and compression type for the transformed data.
After the configuration, name the job and click “Save.” Do not run it yet, as we'll run it through our Lambda function. We can also create the Glue job with the AWS CLI using the command given below in the main.sh file. However, the CLI command requires the S3 path of the ETL job's script.py file. Upload that script to a bucket and replace <SCRIPT_BUCKET> in the ScriptLocation with the name of that bucket. The command also assumes the $etl_job_name and $role_arn variables are set to the job name and the ARN of the Glue role created earlier.
aws glue create-job \
  --name $etl_job_name \
  --role $role_arn \
  --command '{"Name": "glueetl","ScriptLocation": "s3://<SCRIPT_BUCKET>/script.py"}'

echo "Glue job created successfully!"
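A minimal sketch of what this script.py might look like is shown below, using the AWS Glue PySpark API. The <SOURCE_BUCKET> and <TARGET_BUCKET> placeholders are illustrative and should be replaced with your bucket names; a script exported from the visual editor will differ in detail but performs the same read, select, and write steps.

import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from pyspark.context import SparkContext

# Initialize the Glue job
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read the CSV data from the source bucket (placeholder path)
source = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://<SOURCE_BUCKET>/"]},
    format="csv",
    format_options={"withHeader": True, "separator": ","},
)

# Apply the SQL transform: keep only gameID, leagueID, and season
source.toDF().createOrReplaceTempView("games")
transformed = spark.sql("select gameID, leagueID, season from games")

# Write the transformed data to the target bucket (placeholder path) as JSON
output = DynamicFrame.fromDF(transformed, glueContext, "output")
glueContext.write_dynamic_frame.from_options(
    frame=output,
    connection_type="s3",
    connection_options={"path": "s3://<TARGET_BUCKET>/"},
    format="json",
)

job.commit()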
Before creating the IAM role for the Lambda function, we must create a policy for it. Follow the steps below to create the policy:
On the AWS Management Console, search for “IAM” and select “IAM” from the search results.
Click “Policies” under the “Access management” heading in the left menu bar and click the “Create policy” button.
Select JSON as the policy editor and enter the following policy. Replace the account ID and Lambda function name in the log ARNs with your own:
{"Version": "2012-10-17","Statement": [{"Effect": "Allow","Action": "logs:CreateLogGroup","Resource": "arn:aws:logs:us-east-1:564384594376:*"},{"Effect": "Allow","Action": ["logs:CreateLogStream","logs:PutLogEvents"],"Resource": ["arn:aws:logs:us-east-1:564384594376:log-group:/aws/lambda/my-answer-function:*"]},{"Effect": "Allow","Action": ["glue:StartJobRun","glue:GetJobRuns"],"Resource": "*"}]}
Enter the policy name and click “Create policy” to create the policy.
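We can also create this policy programmatically. The following is a minimal boto3 sketch, assuming the policy document above has been saved locally as policy.json; the LambdaGlueJobPolicy name is a placeholder of ours.

import boto3

iam_client = boto3.client('iam')

# Load the policy document shown above (saved locally as policy.json)
with open('policy.json') as policy_file:
    policy_document = policy_file.read()

# Create the customer-managed policy for the Lambda function
response = iam_client.create_policy(
    PolicyName='LambdaGlueJobPolicy',
    PolicyDocument=policy_document,
)
print('Created policy:', response['Policy']['Arn'])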
Follow the steps below to create a role for the Lambda function:
On the IAM Console, click “Roles” under the “Access management” heading in the left menu bar and click the “Create role” button.
Select “Lambda” as the “Service or use case”.
Click the “Next” button.
On the “Add permissions” page, select the policy we created for the Lambda function.
Enter a name for the role and click the “Create role” button.
We've successfully created a role for our Lambda function. We can also create the IAM role for the Lambda function using the commands given below:
aws iam create-role --role-name LambdaRole --assume-role-policy-document '{"Version": "2012-10-17","Statement": [{"Effect": "Allow","Principal": {"Service": "lambda.amazonaws.com"},"Action": "sts:AssumeRole"}]}'

aws iam attach-role-policy --role-name LambdaRole --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

aws iam attach-role-policy --role-name LambdaRole --policy-arn arn:aws:iam::aws:policy/AWSGlueConsoleFullAccess

echo "Lambda role created successfully!"
The Lambda function will be triggered as soon as data is added to the source S3 bucket. Follow the steps below to create the Lambda function:
On the AWS Management Console, search for “Lambda” and select “Lambda” from the search results. Click the “Create function” button.
On the “Create function” page, select “Author from scratch".
Enter the basic information, such as the name, runtime, and architecture. The code given in the widget below is for Python 3.12.
Select the existing role we created in the previous step for our Lambda function.
Leave the remaining settings as they are and click the “Create function” button at the end of the page.
To invoke the function when an object is added to the bucket, we’ll add the trigger to the function. Follow the steps below to add a trigger:
On the main page of the Lambda function, click “Add Trigger”.
Select “S3” as the trigger source and select the source bucket as the bucket.
Select "All object create events" as the event type and add the trigger.
Add the following function code.
import boto3
import json
glue_client = boto3.client('glue')
job_name = '<ETL_JOB_NAME>'
def is_job_running(glue_client, job_name):
    # Get the job runs for the specified job
    response = glue_client.get_job_runs(JobName=job_name)

    # Check if there are any running job runs
    running_job_runs = [run for run in response['JobRuns'] if run['JobRunState'] == 'RUNNING']

    return len(running_job_runs) > 0

def lambda_handler(event, context):
    if is_job_running(glue_client, job_name):
        print(f"The Glue job '{job_name}' is already running.")

        return {
            'statusCode': 200,
            'body': json.dumps('The Glue job is already running.')
        }
    else:
        # Start the Glue job run
        response = glue_client.start_job_run(JobName=job_name)

        print(f"Started the Glue job run with run ID: {response['JobRunId']}")

        return response
Here is a brief explanation of the code:
Line 3: Defines a Glue client using the boto3 SDK.
Lines 5–12: This function checks whether the job is already running, to avoid exceeding the concurrent run limit.
Lines 14–28: Defines the Lambda function handler, which checks whether the job is already running. If not, it kicks off the Glue job.
We have added the function code to run the Glue job. However, our Lambda function needs permission to start the job, so edit the role of the Lambda function and attach the AWSGlueServiceRole policy to it if the Glue permissions aren't already covered by the custom policy. We can also use the AWS CLI to create the Lambda function and its trigger using the commands given below. These commands assume that the $lambda_function, $source_bucket, and $region variables are set and that the function code has been packaged as function.zip.
role_arn=$(aws iam get-role --role-name "LambdaRole" --query 'Role.Arn' --output text)

# Create the Lambda function
aws lambda create-function \
  --function-name $lambda_function \
  --runtime python3.12 \
  --region $region \
  --handler lambda_function.lambda_handler \
  --zip-file "fileb://function.zip" \
  --role $role_arn

# Add resource-based permissions to the Lambda function to allow S3 to invoke it
aws lambda add-permission \
  --function-name $lambda_function \
  --statement-id "AllowS3InvokeLambda" \
  --action "lambda:InvokeFunction" \
  --principal s3.amazonaws.com \
  --source-arn arn:aws:s3:::$source_bucket \
  --region $region

# Get the ARN of the Lambda function
lambda_function_arn=$(aws lambda list-functions --query "Functions[?FunctionName=='$lambda_function'].FunctionArn" --output text)

NOTIFICATION_CONFIGURATION='{"LambdaFunctionConfigurations": [{"Id": "1","LambdaFunctionArn": "'"$lambda_function_arn"'","Events": ["s3:ObjectCreated:*"]}]}'

# Create the trigger on the source bucket
aws s3api put-bucket-notification-configuration \
  --bucket $source_bucket \
  --notification-configuration "$NOTIFICATION_CONFIGURATION"

echo "Lambda function created successfully!"
To test the pipeline, download the .csv file given in the widget below and upload it to the source S3 bucket.
"gameID","leagueID","season","date""81","4","2015",2015-08-08 15:45:00"82","1","2015",2015-08-08 18:00:00"83","3","2015",2015-08-08 18:00:00"84","5","2015",2015-08-08 18:00:00"85","1","2015",2015-08-08 18:00:00"86","4","2015",2015-08-08 20:30:00"87","2","2015",2015-08-09 16:30:00"88","3","2015",2015-08-09 16:30:00"89","5","2015",2015-08-09 19:00:00"90","5","2015",2015-08-10 23:00:00"91","1","2015",2015-08-14 22:45:00"92","1","2015",2015-08-15 15:45:00"93","2","2015",2015-08-15 18:00:00"94","5","2015",2015-08-15 18:00:00"95","4","2015",2015-08-15 18:00:00"96","3","2015",2015-08-15 18:00:00"97","1","2015",2015-08-15 18:00:00"98","3","2015",2015-08-16 16:30:00"99","2","2015",2015-08-16 19:00:00
Once you add the file to the source bucket, wait for the Glue job to finish and then open the target bucket. You'll find the transformed data there. The snippet below shows the transformed output.
{"gameID":"81","leagueID":"4","season":"2015"}{"gameID":"82","leagueID":"1","season":"2015"}{"gameID":"83","leagueID":"3","season":"2015"}{"gameID":"84","leagueID":"5","season":"2015"}{"gameID":"85","leagueID":"1","season":"2015"}{"gameID":"86","leagueID":"4","season":"2015"}{"gameID":"87","leagueID":"2","season":"2015"}{"gameID":"88","leagueID":"3","season":"2015"}{"gameID":"89","leagueID":"5","season":"2015"}{"gameID":"90","leagueID":"5","season":"2015"}{"gameID":"91","leagueID":"1","season":"2015"}{"gameID":"92","leagueID":"1","season":"2015"}{"gameID":"93","leagueID":"2","season":"2015"}{"gameID":"94","leagueID":"5","season":"2015"}{"gameID":"95","leagueID":"4","season":"2015"}{"gameID":"96","leagueID":"3","season":"2015"}{"gameID":"97","leagueID":"1","season":"2015"}{"gameID":"98","leagueID":"3","season":"2015"}{"gameID":"99","leagueID":"2","season":"2015"}
We can also upload the file to the S3 bucket using the commands given below:
aws s3 cp games.csv s3://$source_bucket
echo "Uploaded data to source s3 bucket!"
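To confirm the result without opening the console, we can also list and read the transformed objects with boto3. The snippet below is a minimal sketch in which <TARGET_BUCKET> is a placeholder for your target bucket's name.

import boto3

s3_client = boto3.client('s3')
target_bucket = '<TARGET_BUCKET>'

# List the objects written by the Glue job
objects = s3_client.list_objects_v2(Bucket=target_bucket).get('Contents', [])
for obj in objects:
    print(obj['Key'], obj['Size'])

# Print the contents of the first transformed object, if any
if objects:
    body = s3_client.get_object(Bucket=target_bucket, Key=objects[0]['Key'])['Body'].read()
    print(body.decode('utf-8'))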
We have successfully created a serverless data pipeline using AWS Glue and Lambda functions with S3 buckets as the source and target.
We can use CloudWatch to monitor the execution of our Lambda function. Follow the steps below to view the logs generated by the Lambda function:
On the AWS Management Console, search for “Lambda” and select “Lambda” from the search results to navigate to the Lambda console.
Click the name of your function to open the function dashboard.
Switch to the “Monitor” tab and click “View CloudWatch logs” to open the log group of the Lambda function.
Open the log stream to view the logs generated by the Lambda function.
We can use these logs to debug issues or to monitor the overall execution of our Lambda function.
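The logs can also be fetched programmatically. The snippet below is a minimal boto3 sketch that prints the most recent log events; replace <FUNCTION_NAME> with the name of your Lambda function.

import boto3

logs_client = boto3.client('logs')
log_group = '/aws/lambda/<FUNCTION_NAME>'

# Find the most recent log stream of the function
streams = logs_client.describe_log_streams(
    logGroupName=log_group,
    orderBy='LastEventTime',
    descending=True,
    limit=1,
)['logStreams']

# Print the latest events from that stream
if streams:
    events = logs_client.get_log_events(
        logGroupName=log_group,
        logStreamName=streams[0]['logStreamName'],
        limit=20,
    )['events']
    for event in events:
        print(event['message'], end='')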
Serverless data pipelines offer several benefits for how organizations handle data processing and integration. The key advantages are scalability and cost optimization: serverless architectures scale automatically with demand and charge only for execution time.