Microbatch replication
Overview
This document describes how to set up a near-real-time data replication pipeline using a microbatching job in Matillion ETL.
In this scenario, the data source is an OLTP system which constantly records new transactions in a relational database. The goal is to load these transactions into a table with minimum latency, and to integrate them into an accumulating fact table.
The solution builds upon incremental loading techniques described here, and demonstrates how to constantly keep the data within a few seconds of real time.
Data Architecture
New transactions are constantly being added to the OLTP database, and are created with a unique identifier and a timestamp.
The transaction timestamp always ascends. So, if the highest currently extracted timestamp is known, it's easy to select only the newer transactions. This is the fundamental technique underpinning the SQL query which extracts new records from the source database.
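As a rough sketch only, the extraction query might take the following shape. The table name "transactions" and column name "updated_ts" are placeholders, and ${max_updated_ts} is assumed to be a job variable holding the highest timestamp already replicated (taken from the high water mark view described below).

-- Sketch: select only transactions newer than the current high water mark.
-- "transactions" and "updated_ts" are placeholder names for the source OLTP
-- table and its timestamp column; ${max_updated_ts} is an assumed job variable.
SELECT *
FROM   transactions
WHERE  updated_ts > '${max_updated_ts}'
ORDER  BY updated_ts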
At any point in time the replicated data will be slightly out of date compared to the OLTP database, due to the new transactions which have just been added. Every small batch adds a small increment of transactions to the replicated table.
Over time, doing this repeatedly will accumulate a large amount of data. However, the individual data loads only need to deal with a small number of rows. This makes the incremental data extraction very fast, and enables the microbatching to keep very close to real time.
A characteristic sawtooth pattern develops, with the lag behind real time increasing for a number of seconds before the next batch brings it back down close to zero again.
Matillion ETL maintains a high water mark, simply in the form of an aggregate view over the replicated table. This makes the architecture resilient to failure: if a batch ever fails, the high water mark won't advance, and the same data will simply be requested again.
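As a minimal sketch of that view, assuming the table and view names used in the example implementation later in this document (with "updated_ts" as a placeholder for the timestamp column):

-- Sketch of the high water mark view: a single row holding the highest
-- timestamp replicated so far. "updated_ts" is a placeholder column name.
CREATE VIEW v_stg_tx_replica_max AS
SELECT MAX(updated_ts) AS max_updated_ts
FROM   tx_replica;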
Process Architecture
The microbatch is implemented as an ordinary Matillion ETL orchestration job with one key difference: the final step is a messaging component, specific to your platform, which requests the next run of the job:
- For AWS-hosted instances, use the SQS Message component.
- For GCP-hosted instances, use the Cloud Pub-Sub component.
- For Azure-hosted instances, use the Azure Queue Message component.
Note that Message Integration must be switched on for this mechanism to work. To enable it, see the following articles:
- For AWS-hosted instances, see Manage SQS.
- For GCP-hosted instances, see Manage Pub-Sub.
- For Azure-hosted instances, see Manage Azure Queue Message.
Matillion ETL's internal architecture ensures that only a single instance of a job can be running at any one time, which removes the risk of having overlapping batch windows. The Group, Project, Version and Environment are all available as runtime Environment Variables, so the messaging component just needs to:
- Specify the queue name
- Name the Job (i.e. itself)
- Format the message as described in the article for your platform (an illustrative sketch follows below)
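For illustration only, an SQS message requesting the next run might look roughly like the example below. The key names and values here are assumptions; the authoritative format for each platform is given in the articles referenced above.

{
  "group": "MyGroup",
  "project": "MyProject",
  "version": "default",
  "environment": "Live",
  "job": "Micro Batch CDC"
}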
Having a job re-start itself upon successful termination creates an endless loop. Mechanisms are required to ensure that the microbatches can be stopped upon operator request, and to make sure they don't overlap with the source system's maintenance windows.
Manual Stop Mechanism
This is implemented using a global Environment Variable which can be set to an "on" or "off" value. In the Matillion ETL microbatch, an "If" component checks that the variable is still "on" before requesting the next iteration.
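As a minimal sketch, assuming a hypothetical environment variable named microbatch_enabled, the "If" component's Advanced-mode expression could be as simple as:

// "microbatch_enabled" is a hypothetical variable name; job and environment
// variables can be referenced by name within the expression.
microbatch_enabled == 'on'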
Avoiding Maintenance Windows
Host platforms may reserve a small amount of time as a maintenance window for every cluster. This is a 30-minute time slot which is initially allocated at random. You are free to change when it occurs, although the window must remain at least half an hour long.
The cluster may not be available during the maintenance window, so microbatches should be switched off during this time.
Another Matillion ETL "If" component is used to perform this check. In the component's Advanced mode, a JavaScript expression involving Date.getUTCHours() and Date.getUTCMinutes() is evaluated to make sure the maintenance window is not approaching.
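As a sketch only, assuming a hypothetical maintenance window of 03:00-03:30 UTC, a single expression of the following form could be used; the hour and minute values are assumptions and should be adjusted to your own window (and widened if you also want to stop batches starting shortly before it).

// Evaluates to false while inside a hypothetical 03:00-03:30 UTC window;
// the hour and minute values are assumptions to adjust for your own window.
!(new Date().getUTCHours() == 3 && new Date().getUTCMinutes() < 30)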
Similarly, it's likely that source systems will have time windows which Amazon (for RDS) or the IT department (in-house databases) reserve for maintenance activities. A third Matillion ETL "If" component should be used to perform this check.
Please Note
JavaScript expressions are only available for use as parameter values within Matillion ETL components. Any single valid JavaScript expression may be used; however, it is recommended to keep expressions simple. All variables defined in the job and/or environment are also available to reference.
Error
When entering values into a component's parameter editor, everything enclosed within ${ } in the literal string is evaluated immediately. This validation currently does not take variables into account, and may therefore flag the value as incorrect.
Scheduling
Having provided mechanisms for switching off the replication pipeline, a mechanism is also needed to ensure that it's normally running.
This is in the form of a Matillion ETL Orchestration job, scheduled to run once per day at a fixed time, which places the first microbatch request onto the message queue.
The schedule time must be coordinated with the checks described above around maintenance windows.
Initial Load
When the very first microbatch is launched there is no data at all in the target table, so the aggregate view must provide a default start point to get the replication going, using a SQL expression such as the one given below:
COALESCE("max_updated_ts", '2016-12-01')
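One way to apply this (a sketch only, reusing the placeholder column name from the earlier view sketch) is to fold the default directly into the high water mark view:

-- Sketch: an empty target table now yields the default start date '2016-12-01'
-- instead of NULL. "updated_ts" remains a placeholder column name.
CREATE OR REPLACE VIEW v_stg_tx_replica_max AS
SELECT COALESCE(MAX(updated_ts), '2016-12-01') AS max_updated_ts
FROM   tx_replica;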
The very first microbatch is likely to run for much longer than normal while it loads a large amount of data, but apart from that the mechanism is just the same as normal.
For very large tables, the initial load may actually be too large to be practical. In such cases it may be worth considering an alternative Initial Load mechanism analogous to Amazon Snowball.
Matillion ETL implementation
The example implementation is made up of a number of different Orchestration and Transformation jobs.
Message Configuration
A dedicated queue is needed for the job requests, and must be configured in the Manage Configuration window. The example below shows SQS Configuration but the principle is identical for other message types.
Setup
This job, Micro Batching setup, only needs to be run once, during initial setup.
It simply calls two other jobs:
- Create Database Structures - creates the staging table stg_tx_replica and the real, replicated table tx_replica
- Create HWM View - creates the view v_stg_tx_replica_max which contains one row showing the highest currently extracted timestamp (or high water mark).
The main job, Micro Batch CDC, contains the logic and data flows which repeat in every microbatch.
Part of the microbatch is to append the newly staged records into the real, replicated table. This is done by calling a transformation job: CDC Append
A final piece of the configuration is the job which needs to be scheduled to start the replication pipeline every day: Start CDC
Ensure that messaging integration (SQS, Pub-Sub or Azure Queue) has been switched on and that the environment variable is set to "on", then run this job once to start the microbatch replication!
Example of Data Output
Once the microbatch replication has been started, you should see a fairly continuous series of tasks running.
In the database, you'll see that data is being continuously appended to the target table, a small number of rows at a time.
Your users can now begin to query this data while the microbatch replication is running. This allows them to take advantage of massively parallel processing power, while at the same time always remaining within a few seconds of real time with respect to the original source system.