Streaming pipelines
In this guide, you will learn how to use Matillion's API to create and manage a Streaming pipeline. Streaming pipelines provide a complete end-to-end solution for near-real-time data ingestion, allowing you to capture data from a source database and write it to either cloud storage or a cloud data warehouse. For more details, read Streaming pipelines overview.
Prerequisites
Before you begin, make sure you have the following:
- A Data Productivity Cloud account.
- Valid API credentials for Matillion API access.
- An access token with the required privileges. For more information, read Obtaining an API access token.
- A Data Productivity Cloud project. The examples in this article use the project ID 3dda0acf-e646-4f4b-b4f6-e00449b69427; you should substitute your own project ID. To retrieve the project ID through the API, see Get the project ID below.
- A Streaming agent that has been deployed to your cloud infrastructure. The examples in this article use the agent ID 2734aa6f-0c64-4d2b-9406-f44dc8f1f3b6; you should substitute your own agent ID. To retrieve the agent ID through the API, see Get the agent ID below.
- Any secrets used in the Streaming pipeline, added to the secrets manager accessible to the agent. The examples in this article use the secrets sqlserver-password, snowflake-private-key, and snowflake-private-key-passphrase, added to the Azure Key Vault streaming-key-vault, which the agent has permission to read; you should substitute your own secrets.
Get the project ID
Retrieve the list of projects using the Project API.
Base URL: GET /v1/projects
Example response:
{
"page": 0,
"results": [
{
"description": "project-1",
"id": "3dda0acf-e646-4f4b-b4f6-e00449b69427",
"name": "Test-project"
}
],
"size": 0,
"total": 0
}
This response includes the project ID used in the examples, 3dda0acf-e646-4f4b-b4f6-e00449b69427.
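The article gives no cURL example for this call, so the following is a minimal sketch rather than an official snippet. It assumes the same regional host as the other cURL examples in this article (https://us1.api.matillion.com/dpc) and an access token in the environment variable DPC_TOKEN. The function name dpc_list_projects and the variable DPC_BASE_URL are hypothetical names chosen for illustration.

```shell
# Assumption: the same regional host as the other cURL examples in this article.
DPC_BASE_URL="${DPC_BASE_URL:-https://us1.api.matillion.com/dpc}"

# Hypothetical helper: lists the projects visible to the token in DPC_TOKEN
# and prints the JSON response body to stdout.
dpc_list_projects() {
  curl --silent --show-error --fail \
    --request GET \
    --url "$DPC_BASE_URL/v1/projects" \
    --header "authorization: Bearer $DPC_TOKEN"
}
```

Calling dpc_list_projects prints a response like the one above; the id field of the matching entry in results is the project ID.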
Get the agent ID
Once you have the project ID, use the Environment API to obtain details about the environment in which the project is running. These details include the agent ID.
Base URL: GET /v1/projects/{projectId}/environments
Example response:
{
"page": 0,
"results": [
{
"defaultAgentId": "2734aa6f-0c64-4d2b-9406-f44dc8f1f3b6",
"defaultAgentName": "Test-agent",
"name": "Environment-1"
}
],
"size": 0,
"total": 0
}
This response includes the agent ID used in the examples, 2734aa6f-0c64-4d2b-9406-f44dc8f1f3b6.
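As a rough illustration of this step, the sketch below requests the environments of a project and prints each environment name next to its default agent ID. It assumes the host https://us1.api.matillion.com/dpc used in the other cURL examples, a token in DPC_TOKEN, and that the jq command-line tool is installed. The function names and DPC_BASE_URL are hypothetical.

```shell
# Assumption: the same regional host as the other cURL examples in this article.
DPC_BASE_URL="${DPC_BASE_URL:-https://us1.api.matillion.com/dpc}"

# Hypothetical helper: lists the environments of the project given as $1.
dpc_list_environments() {
  project_id="$1"
  curl --silent --show-error --fail \
    --request GET \
    --url "$DPC_BASE_URL/v1/projects/$project_id/environments" \
    --header "authorization: Bearer $DPC_TOKEN"
}

# Hypothetical helper: reads an environments response on stdin and prints one
# tab-separated "<environment name> <default agent ID>" line per result.
# Requires jq (an assumption; it is not mentioned in this article).
dpc_default_agents() {
  jq -r '.results[] | [.name, .defaultAgentId] | @tsv'
}
```

A possible invocation is dpc_list_environments 3dda0acf-e646-4f4b-b4f6-e00449b69427 | dpc_default_agents.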
Create a Streaming pipeline
Use an API endpoint to create a new Streaming pipeline in a specified project. The pipeline configuration must be defined in a JSON object, which you will send in the body of a POST request.
The POST request used to create the pipeline is:
Base URL: POST /v1/projects/{projectId}/streaming-pipelines
This requires the following headers to be specified:
Header | Value |
---|---|
content-type | application/json |
authorization | Bearer <access token> |
The following example shows the JSON body needed to create a pipeline that streams from an SQL Server source to a Snowflake table. You should modify this example appropriately with the values needed to create your pipeline:
{
"name": "my-streaming-pipeline",
"agentId": "2734aa6f-0c64-4d2b-9406-f44dc8f1f3b6",
"streamingSource": {
"type": "sqlserver",
"connection": {
"host": "sqlserver.my-org.com",
"port": 1433,
"database": "my_database",
"username": "sa",
"password": {
"secretType": "AZURE_KEY_VAULT",
"secretLocation": "streaming-key-vault",
"secretName": "sqlserver-password"
},
"jdbcProperties" : {
"lockTimeout": 60
}
},
"tables": [
{
"schema": "my_schema",
"table": "my_first_table"
},
{
"schema": "my_schema",
"table": "my_second_table"
}
]
},
"streamingTarget": {
"type": "snowflake",
"connection": {
"accountName": "my-snowflake-account.eu-central-1",
"username": "STREAMING_USER",
"authentication": {
"type": "key-pair",
"privateKey": {
"secretType": "AZURE_KEY_VAULT",
"secretLocation": "streaming-key-vault",
"secretName": "snowflake-private-key"
},
"passphrase": {
"secretType": "AZURE_KEY_VAULT",
"secretLocation": "streaming-key-vault",
"secretName": "snowflake-private-key-passphrase"
}
},
"jdbcProperties": {
"networkTimeout": 60
}
},
"role": "STREAMING_ROLE",
"warehouse": "STREAMING_WH",
"database": "STREAMING_DB",
"stageSchema": "STREAMING_SCHEMA",
"stageName": "STREAMING_STAGE",
"stagePrefix": "MY_STREAMING_PIPELINE",
"tableSchema": "STREAMING_SCHEMA",
"tablePrefixType": "PREFIX",
"transformationType": "CHANGE_LOG",
"temporalMapping": "NATIVE"
},
"advancedProperties": {
"max.batch.size": "40000"
}
}
An example of this request in cURL is:
curl --request POST \
--url https://us1.api.matillion.com/dpc/v1/projects/3dda0acf-e646-4f4b-b4f6-e00449b69427/streaming-pipelines \
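--header "authorization: Bearer $DPC_TOKEN" \
--header 'content-type: application/json' \
--data @streaming-pipeline.json
Here, the request body is stored in the file streaming-pipeline.json and the API token is set in the environment variable DPC_TOKEN. The authorization header is double-quoted so that the shell expands $DPC_TOKEN; inside single quotes, the literal text $DPC_TOKEN would be sent instead of the token.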
A successful request should return a 201 status code and a response body containing the newly created Streaming pipeline definition. For example:
{
"streamingPipelineId": "50ead6b2-5d1a-40c7-bb50-0d17c8fb23c3",
"name": "my-streaming-pipeline",
"agentId": "2734aa6f-0c64-4d2b-9406-f44dc8f1f3b6",
"streamingSource": {
"type": "sqlserver",
"connection": {
"host": "sqlserver.my-org.com",
"port": 1433,
"database": "my_database",
"username": "sa",
"password": {
"secretType": "AZURE_KEY_VAULT",
"secretLocation": "streaming-key-vault",
"secretName": "sqlserver-password"
},
"jdbcProperties" : {
"lockTimeout": 60
}
},
"tables": [
{
"schema": "my_schema",
"table": "my_first_table"
},
{
"schema": "my_schema",
"table": "my_second_table"
}
]
},
"streamingTarget": {
"type": "snowflake",
"connection": {
"accountName": "my-snowflake-account.eu-central-1",
"username": "STREAMING_USER",
"authentication": {
"type": "key-pair",
"privateKey": {
"secretType": "AZURE_KEY_VAULT",
"secretLocation": "streaming-key-vault",
"secretName": "snowflake-private-key"
},
"passphrase": {
"secretType": "AZURE_KEY_VAULT",
"secretLocation": "streaming-key-vault",
"secretName": "snowflake-private-key-passphrase"
}
},
"jdbcProperties": {
"networkTimeout": 60
}
},
"role": "STREAMING_ROLE",
"warehouse": "STREAMING_WH",
"database": "STREAMING_DB",
"stageSchema": "STREAMING_SCHEMA",
"stageName": "STREAMING_STAGE",
"stagePrefix": "MY_STREAMING_PIPELINE",
"tableSchema": "STREAMING_SCHEMA",
"tablePrefixType": "PREFIX",
"transformationType": "CHANGE_LOG",
"temporalMapping": "NATIVE"
},
"advancedProperties": {
"max.batch.size": "40000"
}
}
The streamingPipelineId field is a UUID that is generated when the pipeline is created and uniquely identifies it. You will need this identifier to manage the pipeline through the API, for example to view it, start or stop it, or check its status.
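Because later calls depend on both the status code and the returned streamingPipelineId, it can help to capture them in a script. The sketch below is one way to do that under stated assumptions: the host https://us1.api.matillion.com/dpc from the cURL examples, a token in DPC_TOKEN, and jq installed for reading the response. The function names, DPC_BASE_URL, and the response file argument are hypothetical.

```shell
# Assumption: the same regional host as the other cURL examples in this article.
DPC_BASE_URL="${DPC_BASE_URL:-https://us1.api.matillion.com/dpc}"

# Hypothetical helper: POSTs the pipeline definition file $2 to project $1,
# saves the response body to file $3, and prints only the HTTP status code.
dpc_create_pipeline() {
  project_id="$1"
  body_file="$2"
  response_file="$3"
  curl --silent --show-error \
    --request POST \
    --url "$DPC_BASE_URL/v1/projects/$project_id/streaming-pipelines" \
    --header "authorization: Bearer $DPC_TOKEN" \
    --header "content-type: application/json" \
    --data "@$body_file" \
    --output "$response_file" \
    --write-out '%{http_code}'
}

# Hypothetical helper: creates the pipeline and prints streamingPipelineId on
# a 201 response; otherwise prints the response body to stderr and fails.
# Requires jq (an assumption; it is not mentioned in this article).
dpc_create_and_print_id() {
  http_status="$(dpc_create_pipeline "$1" "$2" "$3")" || return 1
  if [ "$http_status" = "201" ]; then
    jq -r '.streamingPipelineId' "$3"
  else
    cat "$3" >&2
    return 1
  fi
}
```

For example, PIPELINE_ID="$(dpc_create_and_print_id 3dda0acf-e646-4f4b-b4f6-e00449b69427 streaming-pipeline.json response.json)" would store the new ID for use in the management calls.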
If the Streaming pipeline definition sent in the POST body is incomplete or invalid, the response code will be 400 and the response body will list the issues, each with the path to the relevant field in the request.
For example, using the same request body as above but removing the streamingSource.connection.username and streamingTarget.connection.authentication fields to make the request invalid, the response body would be:
{
"title": "Bad Request",
"status": 400,
"detail": "There are validation errors with the streaming pipeline definition",
"instance": "/v1/projects/3dda0acf-e646-4f4b-b4f6-e00449b69427/streaming-pipelines",
"validation-errors": [
{
"field": "streamingTarget.connection.authentication",
"message": "must not be null"
},
{
"field": "streamingSource.connection.username",
"message": "must not be empty"
}
]
}
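When scripting against the API, a 400 response like this one can be condensed to one line per problem. The following is a small sketch that assumes the jq command-line tool is installed and that the error body has the shape shown above; the function name is hypothetical.

```shell
# Hypothetical helper: reads a 400 response body on stdin and prints one
# "<field>: <message>" line per entry in "validation-errors".
# Requires jq (an assumption; it is not mentioned in this article).
dpc_print_validation_errors() {
  jq -r '.["validation-errors"][] | "\(.field): \(.message)"'
}
```

If the response body were saved to a file named response.json (a hypothetical file name), dpc_print_validation_errors < response.json would list the two fields reported above with their messages.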
View a Streaming pipeline
The Streaming pipeline ID can be used to view the Streaming pipeline definition, using the following GET call:
Base URL: GET /v1/projects/{projectId}/streaming-pipelines/{streamingPipelineId}
An example of this request in cURL is:
curl --request GET \
--url https://us1.api.matillion.com/dpc/v1/projects/3dda0acf-e646-4f4b-b4f6-e00449b69427/streaming-pipelines/50ead6b2-5d1a-40c7-bb50-0d17c8fb23c3 \
--header "authorization: Bearer $DPC_TOKEN"
Start or stop a Streaming pipeline
The Streaming pipeline ID can be used to start or stop the pipeline, using the following POST call:
Base URL: POST /v1/projects/{projectId}/streaming-pipelines/{streamingPipelineId}/commands
The request body must contain either the start command or the stop command. To start the pipeline:
{
"command": "start"
}
To stop the pipeline:
{
"command": "stop"
}
A successful call will respond with a 200 status code, indicating that the agent has received the request and initiated the pipeline start or stop process.
An example of this request in cURL is:
curl --request POST \
--url https://us1.api.matillion.com/dpc/v1/projects/3dda0acf-e646-4f4b-b4f6-e00449b69427/streaming-pipelines/50ead6b2-5d1a-40c7-bb50-0d17c8fb23c3/commands \
--header "authorization: Bearer $DPC_TOKEN" \
--header 'content-type: application/json' \
--data '{"command":"start"}'
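Since start and stop differ only in the command value, the call can be wrapped in one function. This is a minimal sketch, assuming the host https://us1.api.matillion.com/dpc from the cURL examples and a token in DPC_TOKEN; the function name and DPC_BASE_URL are hypothetical. It rejects any value other than start or stop locally, because those are the only two commands this article describes.

```shell
# Assumption: the same regional host as the other cURL examples in this article.
DPC_BASE_URL="${DPC_BASE_URL:-https://us1.api.matillion.com/dpc}"

# Hypothetical helper: sends "start" or "stop" to a Streaming pipeline.
# Usage: dpc_pipeline_command <projectId> <streamingPipelineId> <start|stop>
dpc_pipeline_command() {
  project_id="$1"
  pipeline_id="$2"
  command_name="$3"
  # Reject anything other than the two commands before making a request.
  case "$command_name" in
    start|stop) ;;
    *)
      echo "command must be 'start' or 'stop', got '$command_name'" >&2
      return 2
      ;;
  esac
  curl --silent --show-error --fail \
    --request POST \
    --url "$DPC_BASE_URL/v1/projects/$project_id/streaming-pipelines/$pipeline_id/commands" \
    --header "authorization: Bearer $DPC_TOKEN" \
    --header "content-type: application/json" \
    --data "{\"command\":\"$command_name\"}"
}
```

For example, dpc_pipeline_command 3dda0acf-e646-4f4b-b4f6-e00449b69427 50ead6b2-5d1a-40c7-bb50-0d17c8fb23c3 stop would request that the example pipeline be stopped.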
Check the Streaming pipeline status
The Streaming pipeline ID can be used to check the status of the pipeline, using the following GET call:
Base URL: GET /v1/projects/{projectId}/streaming-pipelines/{streamingPipelineId}/status
This returns a response body with a status field. If the pipeline has just been started, the value may be starting or streaming.
An example of this request in cURL is:
curl --request GET \
--url https://us1.api.matillion.com/dpc/v1/projects/3dda0acf-e646-4f4b-b4f6-e00449b69427/streaming-pipelines/50ead6b2-5d1a-40c7-bb50-0d17c8fb23c3/status \
--header "authorization: Bearer $DPC_TOKEN"
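Because a newly started pipeline may report starting before it reports streaming, a script may want to poll the status endpoint. The sketch below is one possible approach, assuming the host https://us1.api.matillion.com/dpc from the cURL examples, a token in DPC_TOKEN, and jq installed. The function names, DPC_BASE_URL, and the default of 30 attempts at 10-second intervals are hypothetical choices. This article names only the starting and streaming values, so the loop treats any value other than streaming as not ready yet.

```shell
# Assumption: the same regional host as the other cURL examples in this article.
DPC_BASE_URL="${DPC_BASE_URL:-https://us1.api.matillion.com/dpc}"

# Hypothetical helper: prints the value of the "status" field for pipeline $2
# in project $1. Requires jq (an assumption; not mentioned in this article).
dpc_pipeline_status() {
  curl --silent --show-error --fail \
    --request GET \
    --url "$DPC_BASE_URL/v1/projects/$1/streaming-pipelines/$2/status" \
    --header "authorization: Bearer $DPC_TOKEN" |
    jq -r '.status'
}

# Hypothetical helper: polls until the status is "streaming".
# Usage: dpc_wait_for_streaming <projectId> <pipelineId> [attempts] [delaySeconds]
# Returns 0 once "streaming" is seen, 1 if the attempts run out first.
dpc_wait_for_streaming() {
  attempts="${3:-30}"
  delay="${4:-10}"
  while [ "$attempts" -gt 0 ]; do
    current="$(dpc_pipeline_status "$1" "$2")" || return 1
    echo "status: $current" >&2
    [ "$current" = "streaming" ] && return 0
    attempts=$((attempts - 1))
    sleep "$delay"
  done
  return 1
}
```

A typical sequence would be to send the start command and then call dpc_wait_for_streaming with the same project and pipeline IDs before relying on data arriving in the target.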