Triggering Matillion ETL from a storage queue via an Azure function
This tutorial explains how the arrival of a new object in an Azure Storage account container can automatically trigger a Matillion ETL job that loads the file to a staging table, transforms it, and appends the transformed data to a persistent table. If you need to track these files to ensure one or more have successfully loaded before continuing to a transformation, please consult the Tracking Loaded Files documentation.
A file can be uploaded to a container from an Azure service or a third-party streaming service.
Note
The following instructions are for illustrative purposes. Any screenshots, code, or steps may change. For the most up-to-date instructions, refer to the third party's own documentation.
Trigger an Azure Function
An Azure function can be triggered whenever a new object lands in a storage container. The function is passed some metadata, including the blob file name.
Matillion ETL can run a job whenever a message arrives on an Azure Storage queue. The message tells Matillion ETL which job to run and supplies any variables the job needs (in this case, the name of the file that just arrived is the only variable required).
The diagram below outlines the basic architecture. Although it may look complex, the glue between the processes is just one very simple Azure function.
The Azure Storage Account Queue
The queue is specific to the storage account. After selecting a storage account, click Queues.
The Matillion ETL virtual machine (VM) will need to have the Storage Account Contributor role assigned for the storage account. Follow the directions in Roles & Permissions (Azure) to assign an identity to the VM and then grant that identity access to the storage account.
Click + Queue and specify a name for the queue.
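If you prefer to script this step rather than use the portal, the queue can also be created with the Azure.Storage.Queues SDK. The sketch below is illustrative only: the AZURE_STORAGE_CONNECTION_STRING environment variable and the queue name matillion-trigger-queue are assumptions, so substitute your own storage account connection and the queue name you intend Matillion ETL to listen on.

using System;
using Azure.Storage.Queues;

class CreateQueue
{
    static void Main()
    {
        // Connection string for the storage account (assumed to be supplied via an
        // environment variable rather than hard-coded).
        string connectionString = Environment.GetEnvironmentVariable("AZURE_STORAGE_CONNECTION_STRING");

        // Name of the queue Matillion ETL will listen on (placeholder name).
        var queueClient = new QueueClient(connectionString, "matillion-trigger-queue");

        // Creates the queue only if it does not already exist.
        queueClient.CreateIfNotExists();
        Console.WriteLine($"Queue '{queueClient.Name}' is ready.");
    }
}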
The Azure Function App
From the Create Resource pane on the left side of the portal, choose Function App.
Click + Create and choose the same resource group and region as the storage account. Type a name for the function app. Choose to Publish as Code using the .NET Runtime Stack.
Click "Next:Hosting". Choose the storage account and the Consumption (Serverless) plan type.
Click Review + Create to accept the remaining defaults. Select Create after reviewing the selections.
The Azure Function
When the deployment of the Function App is complete, click Go to resource.
From the function app, click Functions and then click + Create. Select the Azure Blob Storage trigger template. In the Path, replace samples-workitems with the path of the container within the storage account that will be monitored. To configure the storage account connection, choose New and select the storage account that holds the container from the drop-down. The connection name will be the name of the storage account with _STORAGE automatically added as a suffix.
Once the function is created, click Code + Test and add the code below as a starting point for your function. Click Save. If required, this code can be edited later.
#r "Newtonsoft.Json"
using System.Text;
using Newtonsoft.Json;
public static void Run(Stream myBlob, string name, ICollector<string> outputQueueItem, ILogger log)
{
log.LogInformation($"C# Blob trigger function Processed blob. Name: {name}. Size: {myBlob.Length}");
string jsonBody = JsonConvert.SerializeObject(
new {
group = Environment.GetEnvironmentVariable("METL_PROJECT_GROUP_NAME"),
project = Environment.GetEnvironmentVariable("METL_PROJECT_NAME"),
version = Environment.GetEnvironmentVariable("METL_VERSION_NAME"),
environment = Environment.GetEnvironmentVariable("METL_ENVIRONMENT_NAME"),
job = Environment.GetEnvironmentVariable("METL_JOB_NAME"),
variables = new {
file_to_load = name
}
}
);
outputQueueItem.Add(jsonBody);
}
In the above example, application settings are used to define the Matillion ETL objects and the job variables:

- METL_PROJECT_GROUP_NAME - The project group name in Matillion ETL.
- METL_PROJECT_NAME - The project name in Matillion ETL.
- METL_VERSION_NAME - The name of the version of the project to execute in Matillion ETL.
- METL_ENVIRONMENT_NAME - The name of the environment to execute against in Matillion ETL.
- METL_JOB_NAME - The name of the orchestration job in Matillion ETL.
- variables - The variables that are passed to the job in Matillion ETL. In this case we pass just one variable, file_to_load. However, it is possible to pass as many as required.
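For example, if the job also needed the load date, the variables object in the function above could be extended with a second entry. The load_date name below is hypothetical; any variable passed this way must also be defined in the Matillion ETL job for the value to be applied.

// Hypothetical example: passing an extra job variable alongside file_to_load.
variables = new {
    file_to_load = name,
    load_date = DateTime.UtcNow.ToString("yyyy-MM-dd")
}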
It is good practice to add more checking and commenting. Key to writing these functions effectively is understanding the format of the event and context objects passed in, which can depend on the services that trigger the function.
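As a sketch of that kind of checking, the function body could guard against missing application settings and skip files it is not expected to load. The .csv filter below is an assumption for illustration; adjust it to whatever file types your pipeline handles.

// Illustrative guards that could be added near the top of Run():

// Only react to the file types this pipeline is designed to load (assumed .csv here).
if (!name.EndsWith(".csv", StringComparison.OrdinalIgnoreCase))
{
    log.LogInformation($"Skipping {name}: not a .csv file.");
    return;
}

// Fail fast if a required application setting is missing or empty.
string jobName = Environment.GetEnvironmentVariable("METL_JOB_NAME");
if (string.IsNullOrEmpty(jobName))
{
    log.LogError("Application setting METL_JOB_NAME is not set; no message will be queued.");
    return;
}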
Application Settings
The variable names and values are added to the function app as application settings. Navigate to the function app and select Configuration.
Click + New application setting to create five application settings.
In this example, the following five settings are defined. The values for each of the objects can be copied from the Matillion ETL UI.
| Matillion ETL Object | Where to find |
|---|---|
| METL_PROJECT_GROUP_NAME | Project → Rename Project Group |
| METL_PROJECT_NAME | Project → Manage Project |
| METL_VERSION_NAME | Project → Manage Versions |
| METL_ENVIRONMENT_NAME | Environments → right-click on an environment name → Edit Environment |
| METL_JOB_NAME | Right-click on job → Manage Job |
Click Save and Continue once the five settings have been created and the values assigned based on the object names in Matillion ETL.
Configure Output
Select the function that was created, then select Integration in the menu. The Integration view shows the bindings that have been created; there is no Output defined yet. Click + Add output.
Set the Binding Type as Azure Queue Storage. Select the storage account connection and enter the name of the storage account queue that was created earlier. Click OK.
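Before wiring up Matillion ETL, it can be useful to confirm that messages are actually reaching the queue, for example by uploading a test file to the monitored container and then peeking at the queue with Azure Storage Explorer or the Azure.Storage.Queues SDK. The sketch below assumes the same placeholder connection string and queue name as earlier.

using System;
using Azure.Storage.Queues;
using Azure.Storage.Queues.Models;

class PeekQueue
{
    static void Main()
    {
        string connectionString = Environment.GetEnvironmentVariable("AZURE_STORAGE_CONNECTION_STRING");
        var queueClient = new QueueClient(connectionString, "matillion-trigger-queue");

        // Peek (without dequeuing) up to 5 messages so Matillion ETL can still consume them.
        // Depending on the function's queue message encoding setting, the body may appear base64-encoded.
        PeekedMessage[] messages = queueClient.PeekMessages(maxMessages: 5).Value;
        foreach (PeekedMessage message in messages)
        {
            Console.WriteLine(message.Body.ToString());
        }
    }
}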
Trigger an ETL job to extract, load, and transform
Here is the Matillion ETL job that will load the data each time a file lands. It maintains the target table, and on each run truncates the table and loads the latest file into it. The job then runs a data transformation on the loaded data, which appends the results to a persistent table.
The exported job and data files are available in the Attachments pane. Be sure to download the JSON that applies to your cloud data platform (prefixed SF_ for Snowflake, SY_ for Synapse, or DLDB_ for Delta Lake on Databricks).
Each time this runs we want to load a different file, so the filename is defined as a variable. We can define the file_to_load variable within Matillion ETL and specify a default value to test the job:
- Click Project → Edit Environment Variables.
- Create an environment variable as below.
This variable is referenced in the component we have named Load Latest File (an Azure Blob Storage Load component) as the Azure Storage Location parameter. In this case, the file is provided by the Azure function.
Since the variable has a default value, the job can be tested in isolation. Then, to actually run this whenever a new queue message arrives (via our function) we can configure the queue within Matillion ETL. To do this, click Project → Manage Queue Configuration.
Manage Queue Configuration has a Listen section:
Note that the Listen Queue is the same queue that was defined in the Output of the function integration. Note also that Matillion ETL is using the instance credentials attached to the Matillion ETL virtual machine. It could instead use a user-defined credential, but either way the credentials must be able to read from the storage queue to pick up the message, and to read the data files from Blob Storage.
Now you are ready to add some files into the container and trigger the job. Click Project → Task History or view the Tasks panel to watch the job as it executes.
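To exercise the whole flow from code rather than the portal, you could upload a test file to the monitored container with the Azure.Storage.Blobs SDK. The container name, local file path, and connection string below are placeholders; use the container you configured in the blob trigger.

using System;
using Azure.Storage.Blobs;

class UploadTestFile
{
    static void Main()
    {
        string connectionString = Environment.GetEnvironmentVariable("AZURE_STORAGE_CONNECTION_STRING");

        // The container the blob trigger is watching (placeholder name).
        var containerClient = new BlobContainerClient(connectionString, "samples-workitems");

        // Uploading a blob should fire the Azure function, which queues the message
        // that causes Matillion ETL to run the job with file_to_load = "test_file.csv".
        containerClient.GetBlobClient("test_file.csv").Upload("test_file.csv", overwrite: true);
    }
}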
Further Considerations
In a production environment you might also consider:
What if the ETL job can't keep up with the arrival rate?
There is some overhead loading a file, so each run will take a certain minimum amount of time. If jobs are stacking up behind one another, you might need a smaller number of larger files to make that overhead worthwhile.
Take a clone of the virtual machine running Matillion ETL and launch more instances from it. The ETL job, the connection details, and the queue listen configuration will all be cloned, and you will then have two instances that can listen for and process arrival messages.
What if it fails?
Manage Queue Configuration also allows Matillion ETL to post messages on success and failure. These could be handled by additional Azure functions that move the blob files into "success" and "failed" folders, send notifications, and so on.
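As a sketch of the "move on success" idea, a separate function (or a small console utility) could copy a processed blob under a success/ prefix and remove the original. The container name, file name, and connection string are placeholders, and a real deployment would add error handling around the copy.

using System;
using Azure.Storage.Blobs;

class ArchiveProcessedBlob
{
    static void Main()
    {
        string connectionString = Environment.GetEnvironmentVariable("AZURE_STORAGE_CONNECTION_STRING");
        var container = new BlobContainerClient(connectionString, "samples-workitems");

        // The blob named in the success message (placeholder).
        string fileName = "test_file.csv";
        var source = container.GetBlobClient(fileName);
        var destination = container.GetBlobClient($"success/{fileName}");

        // Blob containers are flat; the "success/" prefix acts as a folder.
        // Copy within the same account, then delete the original once the copy has completed.
        destination.StartCopyFromUri(source.Uri).WaitForCompletion();
        source.DeleteIfExists();
    }
}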