Triggering ETL from an S3 event via AWS Lambda
This example shows how the arrival of a new object in Amazon S3 can automatically trigger a Matillion ETL job that loads the file, transforms it, and appends the transformed data to a fact table. If you need to track these files to ensure one or more have successfully loaded before continuing to a transformation, read Tracking Loaded Files.
Note
The following instructions are for illustrative purposes, so any screenshots, code, or steps may change. Refer to the third party's own documentation for the most up-to-date instructions.
S3 file lands
A file could be uploaded to an S3 bucket by a third-party service such as Amazon Kinesis or AWS Data Pipeline, or directly by an application using the API.
Trigger an AWS Lambda function
Lambda functions can be triggered whenever a new object lands in S3. The function is passed some metadata, including the object path.
Matillion ETL can run a job whenever a message arrives on an SQS queue. The message tells the tool which job to run, and any variables it needs (in this case, the filename that just arrived in S3 is the only variable we need).
The diagram below outlines the basic architecture.
While this diagram may appear complex, the Lambda function that glues the processes together is rather simple.
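The Lambda function shown later assumes the SQS queue that Matillion ETL listens on already exists. If you still need to create it, a minimal boto3 sketch might look like this (the queue name ETL_Launchpad simply matches the example function that follows):

```python
import boto3

# One-off setup: create the queue the Lambda function will write to and
# that Matillion ETL will listen on. The name matches the example code below.
sqs = boto3.resource('sqs')
queue = sqs.create_queue(QueueName='ETL_Launchpad')
print(queue.url)
```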
The Lambda function
To get started:
- In the AWS Management Console, navigate to Services, then click Lambda.
- Select Create a Lambda Function.
- Choose s3-get-object-python.
- Configure the correct S3 source for your bucket.
- Click Next.
- Enter a name for the function.
AWS Lambda supports Python and includes the Python API for AWS, so no compilation or third-party libraries are required for this function; it can even be written directly in the AWS console. You can add the code below as a starting point for your function and edit it later. Some elements require modification and are explained beneath.
```python
import boto3
import json

# Assumes an SQS queue named ETL_Launchpad already exists in this region
sqs = boto3.resource('sqs')
queue = sqs.get_queue_by_name(QueueName='ETL_Launchpad')


def lambda_handler(event, context):
    # An S3 event can contain several records; send one message per object
    for record in event['Records']:
        filename_from_event = record['s3']['object']['key']
        sqs_msg = {
            "group": "AWS",
            "project": "Lambda",
            "version": "default",
            "environment": "live",
            "job": "RunETL",
            "variables": {"file_to_load": filename_from_event}
        }
        queue.send_message(MessageBody=json.dumps(sqs_msg))
    return event
```
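Before wiring the function to S3, you can sanity-check it by calling the handler directly with a hand-built event. In this sketch the bucket and key are placeholders, and the function above is assumed to be defined in the same module (it will still send a real message to the queue):

```python
# Hand-built S3-style event; bucket name and object key are placeholders.
test_event = {
    "Records": [
        {"s3": {"bucket": {"name": "my-landing-bucket"},
                "object": {"key": "incoming/flights.csv"}}}
    ]
}

# Invoke the handler defined above; context is not used, so None is fine here.
lambda_handler(test_event, None)
```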
In the above example, the message fields are used as follows. All values must exactly match the names of the corresponding Matillion ETL resources.
- QueueName: The name of the Amazon SQS queue that will be used to store and pass the messages.
- Group: The project group name in Matillion ETL.
- Project: The project name in Matillion ETL.
- Version: The name of the version of the project to execute in Matillion ETL.
- Environment: The name of the environment to execute against in Matillion ETL.
- Job: The name of the orchestration job in Matillion ETL.
- Variables: The variables that are passed to the job in Matillion ETL. In this case, we pass just one variable, file_to_load, but it is possible to pass as many as required. Later we will show how this variable is used.
It might be sensible to add some more checking, commenting, and so on. Writing these functions effectively means understanding the format of the event and context objects passed in, which depends to some extent on the services that trigger the function.
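As one sketch of what that extra checking might look like: object keys arrive URL-encoded in S3 event notifications, so decoding them is a sensible first step, and you may want to skip objects you don't recognise. The .csv filter here is purely illustrative:

```python
import json
import urllib.parse

import boto3

sqs = boto3.resource('sqs')
queue = sqs.get_queue_by_name(QueueName='ETL_Launchpad')


def lambda_handler(event, context):
    for record in event.get('Records', []):
        # Object keys arrive URL-encoded in S3 event notifications
        key = urllib.parse.unquote_plus(record['s3']['object']['key'])

        # Hypothetical guard: only queue .csv files for loading
        if not key.lower().endswith('.csv'):
            continue

        sqs_msg = {
            "group": "AWS",
            "project": "Lambda",
            "version": "default",
            "environment": "live",
            "job": "RunETL",
            "variables": {"file_to_load": key},
        }
        queue.send_message(MessageBody=json.dumps(sqs_msg))
    return event
```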
The function needs a role that can monitor the S3 bucket and send the SQS message. This can all be done with simple managed policies from the IAM console.
- In the Role drop-down, create a new S3 Execution Role. Once this is created we will also need to modify this role to allow SQS access:
- In Services → IAM → Roles, select the role you created.
- Click Attach Policy and add AmazonSQSFullAccess (a narrower, scripted alternative is sketched after these steps).
- Click Next → Review → Create Function.
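If you would rather not grant the broad AmazonSQSFullAccess managed policy, you can scope the role down to the one queue with an inline policy instead. A boto3 sketch, where the role name, policy name, and queue ARN are placeholders:

```python
import json

import boto3

iam = boto3.client('iam')

# Placeholders: substitute your Lambda execution role name and queue ARN
role_name = 'lambda_s3_exec_role'
queue_arn = 'arn:aws:sqs:us-east-1:123456789012:ETL_Launchpad'

# Allow only what the example function needs: resolving the queue URL
# and sending messages to that one queue.
policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": ["sqs:GetQueueUrl", "sqs:SendMessage"],
        "Resource": queue_arn
    }]
}

iam.put_role_policy(
    RoleName=role_name,
    PolicyName='send-to-etl-launchpad',
    PolicyDocument=json.dumps(policy),
)
```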
Trigger an ETL job to extract, load, and transform
Here is the Matillion ETL job that will load the data each time a file lands. It maintains the target table, and on each run truncates the table and loads the latest file into it. It then runs a data transformation on the loaded data, which adds some calculated fields, looks up some details of the airline and airport, and finally appends the results to the final fact table.
Each time this runs, we want to load a different file, so we define a variable for it. We already saw in the Lambda function that we intend to pass a variable named file_to_load, so we should define that within Matillion ETL and give it a default value we can use to test the job. To do this:
- Select Project, then click Edit Environment Variables.
- Create an environment variable as below.
This is referenced in the component Load Latest File (an S3 Load Component) as the S3 Object Prefix parameter. In this case, the entire path to the file is provided by the Lambda function.
Since all variables must have a default value, the job can be tested in isolation. Then, to run it whenever a new SQS message arrives (via our Lambda function), configure SQS within Matillion ETL: click Project, then click SQS.
The SQS Configuration has a Listen section:
The Listen Queue is the queue that our Lambda function writes to.
Note
Matillion ETL uses the instance credentials attached to the EC2 instance (it could instead use a manual access key and secret key). Either way, the credentials need permission to read from SQS to pick up the message and to read the data files from S3.
Now you're ready to add some files to the S3 bucket and trigger the job. You can see the job executing in your task panel or via Project → Task History.
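One quick way to drop a test file into the bucket is with boto3; the local filename, bucket, and key below are placeholders:

```python
import boto3

s3 = boto3.client('s3')

# Placeholders: a local sample file, your landing bucket, and a target key
s3.upload_file('flights.csv', 'my-landing-bucket', 'incoming/flights.csv')
```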
Further considerations
In a production environment you should also consider:
- What if the ETL job can't keep up with the arrival rate?
- There is some overhead loading a file, so each run will take a certain minimum amount of time. If jobs are stacking up behind one another, you probably need a smaller number of larger files to make that overhead worthwhile.
- Take a clone of the EC2 instance running Matillion ETL and launch another instance from it. The ETL job, the connection details, and the SQS listen configuration will all be cloned, and you will now have two instances that can listen for and process arrival messages.
- What if it fails?
- The SQS configuration also posts return messages on success and failure. These could be handled by Lambda functions that move the S3 files into "success" and "failed" folders (a rough sketch follows this list).
- You can also post an SNS message notifying that a load succeeded or failed.
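A rough sketch of that idea follows. It assumes an SQS event source mapping on the success (or failure) queue, and that the return message echoes the job variables, including file_to_load; check the payload your SQS configuration actually posts before relying on these field names. The bucket and prefix are placeholders:

```python
import json

import boto3

s3 = boto3.client('s3')

BUCKET = 'my-landing-bucket'   # placeholder landing bucket
DEST_PREFIX = 'success/'       # use 'failed/' in the failure-queue variant


def lambda_handler(event, context):
    # Triggered by the success (or failure) queue via an SQS event source
    # mapping; each record body is the message Matillion ETL posted.
    for record in event['Records']:
        body = json.loads(record['body'])

        # Hypothetical: assumes the message echoes back the job variables,
        # including the file that was loaded. Verify against your payload.
        key = body.get('variables', {}).get('file_to_load')
        if not key:
            continue

        # "Move" = copy to the new prefix, then delete the original object
        s3.copy_object(
            Bucket=BUCKET,
            CopySource={'Bucket': BUCKET, 'Key': key},
            Key=DEST_PREFIX + key.split('/')[-1],
        )
        s3.delete_object(Bucket=BUCKET, Key=key)
    return event
```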