Create a Streaming pipeline
A Streaming pipeline is a set of source and destination configuration details that enables your Streaming agent to monitor and consume changes from the source database, then stream those changes to the destination.
Read Streaming pipelines overview for a deeper look at the concepts and architecture of Streaming pipelines in the Data Productivity Cloud.
Prerequisites
To run a Streaming pipeline, you need:
- A Streaming agent. Agents execute your Streaming pipelines and act as a bridge between the Data Productivity Cloud and your data network while preserving your data sovereignty. Read Streaming agent installation to get started.
Note
Each agent will run a single pipeline, so you need to create a separate agent for each new pipeline.
- A source database configured to support streaming. The required configuration for each source is described in the documentation for that source.
- An account with a destination service.
- Access to an AWS Secrets Manager or Azure Key Vault service for managing secrets, such as your source database password (see the sketch after this list). Read Secrets overview for more information about using managed secrets in the Data Productivity Cloud. Your secret manager and agent must be hosted on the same cloud platform; you can't use an AWS agent with Azure Key Vault, for example.
- If you are using MySQL as a streaming source, you must provide a MySQL driver for the Streaming agent to use.
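For example, if your agent runs on AWS, you could store the source database credentials in AWS Secrets Manager before configuring the pipeline. The following is a minimal sketch using the boto3 SDK; the secret name, region, and payload are purely illustrative and are not prescribed by Matillion. Azure Key Vault users would use the equivalent azure-keyvault-secrets client instead.

```python
import json
import boto3  # AWS SDK for Python

# Illustrative only: secret name, region, and payload are example values.
client = boto3.client("secretsmanager", region_name="eu-west-1")

client.create_secret(
    Name="streaming/source-db-credentials",
    SecretString=json.dumps({"username": "streaming_user", "password": "replace-me"}),
)
```

You can then reference this secret when configuring the source connection, as described in Secrets overview.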
Get started
- Log in to the Matillion Hub.
- Click ☰ then Designer.
- Select your project.
- Click the Streaming tab. If you have previously created Streaming pipelines, they will be listed on this tab. Read Manage Streaming pipelines for details of how you can interact with this list.
- Click Add Streaming pipeline to create a new Streaming pipeline.
The rest of this guide will explain how to set up your Streaming pipeline.
Pipeline details
To start creating your Streaming pipeline, first complete these fields:
Name
= string
A unique, descriptive name for your Streaming pipeline.
Agent
= drop-down
A Streaming agent you have previously created as described in Streaming agent installation.
The drop-down lists all agents that are currently valid to use, meaning:
- The agent must have a status of Running.
- The agent must not already be assigned to another Streaming pipeline (there is a 1:1 relationship between agents and pipelines).
Source
= drop-down
Your source database. Currently supported sources are:
- Db2 for IBM i
- Microsoft SQL Server
- MySQL
- Oracle
- Postgres
Destination
= drop-down
Your destination service. Currently supported destinations are:
- Snowflake
- Amazon S3
- Azure Blob Storage
Destination connection
This section of the screen is only visible if your destination is Snowflake.
You need to configure a Snowflake connection as described in Streaming to a Snowflake destination.
Destination configuration
Complete the properties in this section to configure the destination. The exact properties will depend on the destination you selected, so consult the appropriate documentation for details:
- Streaming to a Snowflake destination
- Streaming to an Azure Blob Storage destination
- Streaming to an Amazon S3 destination
Source setup
Complete the properties in this section to configure the source. The exact properties will depend on the source you selected, so consult the documentation for that source for details.
Click Connect to establish the connection to your source database.
Pipeline configuration
In this section, choose which schemas and tables from the source will be included in your Streaming pipeline, and complete additional configuration settings.
Select tables
= dual listbox
Tables available in your source database will be displayed here. Choose the schemas and tables to include in the Streaming pipeline. The pipeline requires a minimum of one table. Selecting more tables will affect performance and resource consumption, so consider carefully which tables you actually need, and select only those.
Snapshot options
= boolean (optional)
Choose the snapshot operations you want the source connector to use: initial snapshot, on-demand snapshot, both, or neither if you don't require snapshotting.
- Initial snapshot: Allows you to take an initial snapshot of the specified tables when the pipeline is started. The initial snapshot captures the current state of the data in those tables at the beginning of the pipeline execution. By enabling this option, you ensure that the pipeline starts with the most recent data available.
- On-demand snapshot: Allows you to trigger snapshots for specific tables in the pipeline manually. This means that new tables can be added to the capture list after the pipeline's initial startup, and snapshots for those tables can be triggered as needed. On-demand snapshots are useful when you want to capture specific points in time or when you need to dynamically expand the set of tables being captured without losing historical data.
Read Snapshots for more details of each snapshot type.
Signal table
= dialog
Choose a signal table. This is required only if the On-demand snapshot checkbox is selected.
Signal tables should be pipeline-specific to avoid accidental overlap of snapshot executions.
Read Signal tables to learn more.
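As a hypothetical illustration, change data capture agents commonly follow the Debezium signalling convention, where the signal table has id, type, and data columns and an on-demand snapshot is requested by inserting a row into it. The column layout, the execute-snapshot signal type, and the payload format shown below are assumptions, so confirm the exact requirements in the Signal tables documentation for your source. The sketch assumes a PostgreSQL source and the psycopg2 driver.

```python
import json
import psycopg2  # example uses a PostgreSQL source; other DB-API drivers work similarly

conn = psycopg2.connect(host="source-db.example.com", dbname="appdb",
                        user="streaming_user", password="replace-me")
with conn, conn.cursor() as cur:
    # One signal table per pipeline, to avoid overlapping snapshot executions.
    cur.execute("""
        CREATE TABLE IF NOT EXISTS streaming.signals_orders_pipeline (
            id   VARCHAR(42) PRIMARY KEY,
            type VARCHAR(32) NOT NULL,
            data VARCHAR(2048)
        )
    """)
    # Inserting a signal row asks the connector to snapshot specific tables on demand.
    cur.execute(
        "INSERT INTO streaming.signals_orders_pipeline (id, type, data) VALUES (%s, %s, %s)",
        ("adhoc-1", "execute-snapshot",
         json.dumps({"data-collections": ["public.orders"]})),
    )
```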
Loading to Redshift
= checkbox (optional)
In the Data preparation section, select this checkbox to ensure that decimal data types are reflected as expected in Redshift. This checkbox is only available if you have selected Azure Blob Storage or Amazon S3 as your destination.
Note
You only need to select this checkbox if you plan to load your data into Amazon Redshift. Selecting this checkbox overcomes a known Redshift limitation that affects decimal data types.
Replication type
= drop-down
If using Snowflake as a destination, select the Streaming pipeline's replication type. This determines how the change data is replicated into Snowflake tables. Read Replication types to learn more.
Dates and times strategy
= drop-down
If using Snowflake as a destination, choose the date, time, and timestamp processing strategy of your Streaming pipeline.
- Snowflake Native Types (e.g. DATETIME): This action will convert the source date/time value to an appropriate Snowflake type. If it's not possible to convert, the pipeline will output the value as a string. This is the default setting.
- Integers from Epoch: This action will load all date/time values as integers in Snowflake. You can reconstruct the date and time by adding the stored integer to the epoch. Although this requires an extra conversion step downstream, it preserves a very accurate date/time value without any errors that conversion in the pipeline might introduce. See the sketch below for an example.
Read Dates and times strategy to learn more.
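As a minimal sketch of the Integers from Epoch strategy, the snippet below converts stored integers back into timestamps, assuming the conventional Unix epoch (1 January 1970, UTC). The units (days for dates, microseconds for timestamps) are assumptions that depend on the source column's precision, so verify them against the Dates and times strategy documentation.

```python
from datetime import datetime, timedelta, timezone

# Assumed epoch: 1 January 1970, 00:00:00 UTC.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def epoch_days_to_date(days: int) -> datetime:
    """DATE-style values are commonly stored as whole days since the epoch."""
    return EPOCH + timedelta(days=days)

def epoch_micros_to_timestamp(micros: int) -> datetime:
    """TIMESTAMP-style values are commonly stored as microseconds since the epoch."""
    return EPOCH + timedelta(microseconds=micros)

print(epoch_days_to_date(19723))                          # 2024-01-01 00:00:00+00:00
print(epoch_micros_to_timestamp(1_704_067_200_000_000))   # 2024-01-01 00:00:00+00:00
```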
Advanced settings
= dialog (optional)
Set up any additional parameters for your Streaming pipeline as key:value pairs. Refer to the documentation of your source connector to determine what parameters are available.
Note
If you experience out of memory (OOM) errors on the Streaming agent, a common cause is the history.dat file growing very large over time. To solve this problem, edit the Streaming pipeline and add the following parameter to the Advanced settings property:
- Parameter: matillion.compact-history
- Value: true
This setting will cause the history.dat file to compact automatically when it grows too large.
Finish setting up
Click Save pipeline to finish creating your Streaming pipeline. You will be returned to the Streaming tab, where you'll see your new Streaming pipeline with basic metadata listed, including:
- The name of the pipeline.
- The status of the pipeline.
- The source database.
- The destination.
Read Manage Streaming pipelines to discover further actions you can take.