
Streaming pipelines


In this guide, you will learn how to use Matillion's API to create and manage a Streaming pipeline. Streaming pipelines provide a complete end-to-end solution for near-real-time data ingestion, allowing you to capture data from a source database and write it to either cloud storage or a cloud data warehouse. For more details, read Streaming pipelines overview.


Prerequisites

Before you begin, make sure you have the following:

  • A Data Productivity Cloud account.
  • Valid API credentials for Matillion API access.
  • An access token with the required privileges. For more information, read Obtaining an API access token.
  • A Data Productivity Cloud project. The examples in this article use the project ID 3dda0acf-e646-4f4b-b4f6-e00449b69427; substitute your own project ID. To retrieve the project ID through the API, read Get the project ID, below.
  • A Streaming agent that has been deployed to your cloud infrastructure. The examples in this article use the agent ID 2734aa6f-0c64-4d2b-9406-f44dc8f1f3b6; substitute your own agent ID. To retrieve the agent ID through the API, read Get the agent ID, below.
  • Any secrets used by the Streaming pipeline must be stored in a secrets manager that the agent can access. The examples in this article use the secrets sqlserver-password, snowflake-private-key, and snowflake-private-key-passphrase, stored in the Azure Key Vault streaming-key-vault, which the agent has permission to read; substitute your own secrets.

Get the project ID

Retrieve the list of projects using the Project API.

Base URL: GET /v1/projects

Example response:

{
  "page": 0,
  "results": [
    {
      "description": "project-1",
      "id": "3dda0acf-e646-4f4b-b4f6-e00449b69427",
      "name": "Test-project"
    }
  ],
  "size": 0,
  "total": 0
}

This response includes the project ID we need in our examples, 3dda0acf-e646-4f4b-b4f6-e00449b69427.
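If you script this step, a minimal Python sketch using only the standard library might look like the following. It assumes the API host https://us1.api.matillion.com/dpc from this article's cURL examples (your region's host may differ); the function names are hypothetical, and the lookup only searches the single page of results returned, without handling pagination.

```python
import json
import urllib.request

# Assumption: host taken from this article's cURL examples; use your region's host.
API_BASE = "https://us1.api.matillion.com/dpc"


def list_projects(token: str) -> dict:
    """Call GET /v1/projects and return the decoded JSON response body."""
    request = urllib.request.Request(
        f"{API_BASE}/v1/projects",
        headers={"authorization": f"Bearer {token}"},
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


def find_project_id(projects_response: dict, project_name: str) -> str:
    """Return the ID of the project with the given name (this page of results only)."""
    for project in projects_response.get("results", []):
        if project.get("name") == project_name:
            return project["id"]
    raise LookupError(f"no project named {project_name!r} in this page of results")
```

For example, find_project_id(list_projects(os.environ["DPC_TOKEN"]), "Test-project") would return 3dda0acf-e646-4f4b-b4f6-e00449b69427 for the example response above.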

Get the agent ID

Once you have the project ID, use the Environment API to retrieve details of the project's environments. Each environment's details include the ID of its default agent, in the defaultAgentId field.

Base URL: GET /v1/projects/{projectId}/environments

Example response:

{
  "page": 0,
  "results": [
    {
      "defaultAgentId": "2734aa6f-0c64-4d2b-9406-f44dc8f1f3b6",
      "defaultAgentName": "Test-agent",
      "name": "Environment-1"
    }
  ],
  "size": 0,
  "total": 0
}

This response includes the agent ID we need in our examples, 2734aa6f-0c64-4d2b-9406-f44dc8f1f3b6.
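A minimal Python sketch of this lookup, assuming the same us1 host as the cURL examples and hypothetical function names, could map each environment name to its default agent ID:

```python
import json
import urllib.request

# Assumption: host taken from this article's cURL examples; use your region's host.
API_BASE = "https://us1.api.matillion.com/dpc"


def list_environments(token: str, project_id: str) -> dict:
    """Call GET /v1/projects/{projectId}/environments and return the decoded body."""
    request = urllib.request.Request(
        f"{API_BASE}/v1/projects/{project_id}/environments",
        headers={"authorization": f"Bearer {token}"},
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


def default_agent_ids(environments_response: dict) -> dict:
    """Map each environment name to its defaultAgentId, skipping environments without one."""
    return {
        env["name"]: env["defaultAgentId"]
        for env in environments_response.get("results", [])
        if env.get("defaultAgentId")
    }
```

For the example response above, default_agent_ids returns {"Environment-1": "2734aa6f-0c64-4d2b-9406-f44dc8f1f3b6"}.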


Create a Streaming pipeline

Send a POST request to the Streaming pipelines endpoint to create a new Streaming pipeline in a specified project. Define the pipeline configuration as a JSON object in the request body.

The POST request used to create the pipeline is:

Base URL: POST /v1/projects/{projectId}/streaming-pipelines

This request requires the following headers:

  • content-type: application/json
  • authorization: Bearer <access token>

The following example shows the JSON body needed to create a pipeline that streams data from a SQL Server source to Snowflake. Replace the example values with those needed for your own pipeline:

{
  "name": "my-streaming-pipeline",
  "agentId": "2734aa6f-0c64-4d2b-9406-f44dc8f1f3b6",
  "streamingSource": {
    "type": "sqlserver",
    "connection": {
      "host": "sqlserver.my-org.com",
      "port": 1433,
      "database": "my_database",
      "username": "sa",
      "password": {
        "secretType": "AZURE_KEY_VAULT",
        "secretLocation": "streaming-key-vault",
        "secretName": "sqlserver-password"
      },
      "jdbcProperties" : {
        "lockTimeout": 60
      }
    },
    "tables": [
      {
        "schema": "my_schema",
        "table": "my_first_table"
      },
      {
        "schema": "my_schema",
        "table": "my_second_table"
      }
    ]
  },
  "streamingTarget": {
    "type": "snowflake",
    "connection": {
      "accountName": "my-snowflake-account.eu-central-1",
      "username": "STREAMING_USER",
      "authentication": {
        "type": "key-pair",
        "privateKey": {
          "secretType": "AZURE_KEY_VAULT",
          "secretLocation": "streaming-key-vault",
          "secretName": "snowflake-private-key"
        },
        "passphrase": {
          "secretType": "AZURE_KEY_VAULT",
          "secretLocation": "streaming-key-vault",
          "secretName": "snowflake-private-key-passphrase"
        }
      },
      "jdbcProperties": {
        "networkTimeout": 60
      }
    },
    "role": "STREAMING_ROLE",
    "warehouse": "STREAMING_WH",
    "database": "STREAMING_DB",
    "stageSchema": "STREAMING_SCHEMA",
    "stageName": "STREAMING_STAGE",
    "stagePrefix": "MY_STREAMING_PIPELINE",
    "tableSchema": "STREAMING_SCHEMA",
    "tablePrefixType": "PREFIX",
    "transformationType": "CHANGE_LOG",
    "temporalMapping": "NATIVE"
  },
  "advancedProperties": {
    "max.batch.size": "40000"
  }
}

An example of this request in cURL is:

curl --request POST \
  --url https://us1.api.matillion.com/dpc/v1/projects/3dda0acf-e646-4f4b-b4f6-e00449b69427/streaming-pipelines \
  --header "authorization: Bearer $DPC_TOKEN" \
  --header 'content-type: application/json' \
  --data @streaming-pipeline.json

In this example, the request body is stored in the file streaming-pipeline.json and the access token is set in the environment variable DPC_TOKEN.
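The same request can be sent from Python with only the standard library. This is a minimal sketch under stated assumptions: the us1 host comes from the cURL example above, and the function names are hypothetical. Building the request separately from sending it makes it easy to inspect the URL, headers, and body before anything reaches the API.

```python
import json
import urllib.request

# Assumption: host taken from this article's cURL examples; use your region's host.
API_BASE = "https://us1.api.matillion.com/dpc"


def build_create_request(token: str, project_id: str, definition: dict) -> urllib.request.Request:
    """Build the POST request that creates a Streaming pipeline from a definition dict."""
    return urllib.request.Request(
        f"{API_BASE}/v1/projects/{project_id}/streaming-pipelines",
        data=json.dumps(definition).encode("utf-8"),
        method="POST",
        headers={
            "content-type": "application/json",
            "authorization": f"Bearer {token}",
        },
    )


def create_pipeline(token: str, project_id: str, definition_path: str) -> dict:
    """Send the JSON definition stored at definition_path and return the response body."""
    with open(definition_path, encoding="utf-8") as definition_file:
        definition = json.load(definition_file)
    request = build_create_request(token, project_id, definition)
    # urlopen raises urllib.error.HTTPError for error responses such as 400.
    with urllib.request.urlopen(request) as response:
        return json.load(response)
```

For example: create_pipeline(os.environ["DPC_TOKEN"], "3dda0acf-e646-4f4b-b4f6-e00449b69427", "streaming-pipeline.json").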

A successful request returns a 201 status code and a response body containing the newly created Streaming pipeline definition. For example:

{
  "streamingPipelineId": "50ead6b2-5d1a-40c7-bb50-0d17c8fb23c3",
  "name": "my-streaming-pipeline",
  "agentId": "2734aa6f-0c64-4d2b-9406-f44dc8f1f3b6",
  "streamingSource": {
    "type": "sqlserver",
    "connection": {
      "host": "sqlserver.my-org.com",
      "port": 1433,
      "database": "my_database",
      "username": "sa",
      "password": {
        "secretType": "AZURE_KEY_VAULT",
        "secretLocation": "streaming-key-vault",
        "secretName": "sqlserver-password"
      },
      "jdbcProperties" : {
        "lockTimeout": 60
      }
    },
    "tables": [
      {
        "schema": "my_schema",
        "table": "my_first_table"
      },
      {
        "schema": "my_schema",
        "table": "my_second_table"
      }
    ]
  },
  "streamingTarget": {
    "type": "snowflake",
    "connection": {
      "accountName": "my-snowflake-account.eu-central-1",
      "username": "STREAMING_USER",
      "authentication": {
        "type": "key-pair",
        "privateKey": {
          "secretType": "AZURE_KEY_VAULT",
          "secretLocation": "streaming-key-vault",
          "secretName": "snowflake-private-key"
        },
        "passphrase": {
          "secretType": "AZURE_KEY_VAULT",
          "secretLocation": "streaming-key-vault",
          "secretName": "snowflake-private-key-passphrase"
        }
      },
      "jdbcProperties": {
        "networkTimeout": 60
      }
    },
    "role": "STREAMING_ROLE",
    "warehouse": "STREAMING_WH",
    "database": "STREAMING_DB",
    "stageSchema": "STREAMING_SCHEMA",
    "stageName": "STREAMING_STAGE",
    "stagePrefix": "MY_STREAMING_PIPELINE",
    "tableSchema": "STREAMING_SCHEMA",
    "tablePrefixType": "PREFIX",
    "transformationType": "CHANGE_LOG",
    "temporalMapping": "NATIVE"
  },
  "advancedProperties": {
    "max.batch.size": "40000"
  }
}

The streamingPipelineId field is a UUID generated when the pipeline is created, and it uniquely identifies the pipeline. You need this ID to manage the pipeline through the API, for example to view it, start or stop it, or check its status.
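Because every management call is keyed on this ID, a small helper can pull it out of the 201 response and derive the endpoints used in the rest of this article. This is a minimal sketch: the helper names are hypothetical, the us1 host is assumed from the cURL examples, and the UUID check only confirms the value is well formed.

```python
import uuid

# Assumption: host taken from this article's cURL examples; use your region's host.
API_BASE = "https://us1.api.matillion.com/dpc"


def extract_pipeline_id(created_pipeline: dict) -> str:
    """Return streamingPipelineId from the 201 response body, checking it is a UUID."""
    pipeline_id = created_pipeline["streamingPipelineId"]
    uuid.UUID(pipeline_id)  # raises ValueError if the value is not a well-formed UUID
    return pipeline_id


def pipeline_endpoints(project_id: str, pipeline_id: str) -> dict:
    """Build the view, commands, and status URLs for one Streaming pipeline."""
    base = f"{API_BASE}/v1/projects/{project_id}/streaming-pipelines/{pipeline_id}"
    return {"view": base, "commands": f"{base}/commands", "status": f"{base}/status"}
```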

If the Streaming pipeline definition in the POST body is incomplete or invalid, the API returns a 400 status code, and the response body lists the issues along with the path to each relevant field in the request.

For example, if you send the same request body as above but remove the streamingSource.connection.username and streamingTarget.connection.authentication fields, the response body is:

{
  "title": "Bad Request",
  "status": 400,
  "detail": "There are validation errors with the streaming pipeline definition",
  "instance": "/v1/projects/3dda0acf-e646-4f4b-b4f6-e00449b69427/streaming-pipelines",
  "validation-errors": [
    {
      "field": "streamingTarget.connection.authentication",
      "message": "must not be null"
    },
    {
      "field": "streamingSource.connection.username",
      "message": "must not be empty"
    }
  ]
}
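When scripting pipeline creation, it can help to surface these validation errors directly. The following minimal sketch (hypothetical function names) relies on the standard library raising urllib.error.HTTPError for a 400 response; it reads the error body and turns each entry in validation-errors into a "field: message" line.

```python
import json
import urllib.error
import urllib.request


def format_validation_errors(problem: dict) -> list:
    """Turn a 400 response body into 'field: message' lines."""
    return [
        f"{error['field']}: {error['message']}"
        for error in problem.get("validation-errors", [])
    ]


def send_with_validation_report(request: urllib.request.Request) -> dict:
    """Send a request; on a 400 response, raise ValueError listing each invalid field."""
    try:
        with urllib.request.urlopen(request) as response:
            return json.load(response)
    except urllib.error.HTTPError as err:
        if err.code != 400:
            raise
        problem = json.load(err)  # HTTPError is file-like, so the error body can be read
        raise ValueError("\n".join(format_validation_errors(problem))) from err
```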

View a Streaming pipeline

Use the Streaming pipeline ID to view the pipeline definition with the following GET request:

Base URL: GET /v1/projects/{projectId}/streaming-pipelines/{streamingPipelineId}

An example of this request in cURL is:

curl --request GET \
  --url https://us1.api.matillion.com/dpc/v1/projects/3dda0acf-e646-4f4b-b4f6-e00449b69427/streaming-pipelines/50ead6b2-5d1a-40c7-bb50-0d17c8fb23c3 \
  --header "authorization: Bearer $DPC_TOKEN"
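A Python equivalent of this GET call, as a minimal standard-library sketch with hypothetical function names and the us1 host assumed from the cURL example:

```python
import json
import urllib.request

# Assumption: host taken from this article's cURL examples; use your region's host.
API_BASE = "https://us1.api.matillion.com/dpc"


def build_view_request(token: str, project_id: str, pipeline_id: str) -> urllib.request.Request:
    """Build the GET request that returns one Streaming pipeline definition."""
    return urllib.request.Request(
        f"{API_BASE}/v1/projects/{project_id}/streaming-pipelines/{pipeline_id}",
        headers={"authorization": f"Bearer {token}"},
    )


def get_pipeline(token: str, project_id: str, pipeline_id: str) -> dict:
    """Send the GET request and return the decoded pipeline definition."""
    with urllib.request.urlopen(build_view_request(token, project_id, pipeline_id)) as response:
        return json.load(response)
```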

Start or stop a Streaming pipeline

Use the Streaming pipeline ID to start or stop the pipeline with the following POST request:

Base URL: POST /v1/projects/{projectId}/streaming-pipelines/{streamingPipelineId}/commands

The request body must contain either the start or the stop command. To start the pipeline:

{
  "command": "start"
}

To stop the pipeline:

{
  "command": "stop"
}

A successful request returns a 200 status code, indicating that the agent has received the command and initiated the pipeline start or stop process. Because this process is not instantaneous, check the pipeline status to follow its progress.

An example of this request in cURL is:

curl --request POST \
  --url https://us1.api.matillion.com/dpc/v1/projects/3dda0acf-e646-4f4b-b4f6-e00449b69427/streaming-pipelines/50ead6b2-5d1a-40c7-bb50-0d17c8fb23c3/commands \
  --header "authorization: Bearer $DPC_TOKEN" \
  --header 'content-type: application/json' \
  --data '{"command":"start"}'
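A minimal Python sketch of the same call follows. It assumes the us1 host from the cURL example and uses hypothetical function names; it rejects anything other than the two commands documented here before sending, so a typo fails locally rather than at the API.

```python
import json
import urllib.request

# Assumption: host taken from this article's cURL examples; use your region's host.
API_BASE = "https://us1.api.matillion.com/dpc"
# The two commands documented in this article.
VALID_COMMANDS = ("start", "stop")


def build_command_request(token: str, project_id: str, pipeline_id: str,
                          command: str) -> urllib.request.Request:
    """Build the POST request that sends a start or stop command to a pipeline."""
    if command not in VALID_COMMANDS:
        raise ValueError(f"command must be one of {VALID_COMMANDS}, got {command!r}")
    return urllib.request.Request(
        f"{API_BASE}/v1/projects/{project_id}/streaming-pipelines/{pipeline_id}/commands",
        data=json.dumps({"command": command}).encode("utf-8"),
        method="POST",
        headers={
            "content-type": "application/json",
            "authorization": f"Bearer {token}",
        },
    )


def send_command(token: str, project_id: str, pipeline_id: str, command: str) -> int:
    """Send the command and return the HTTP status code (200 on success)."""
    request = build_command_request(token, project_id, pipeline_id, command)
    with urllib.request.urlopen(request) as response:
        # 200 means the command was accepted; the pipeline may still be starting or stopping.
        return response.status
```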

Check the Streaming pipeline status

Use the Streaming pipeline ID to check the status of the pipeline with the following GET request:

Base URL: GET /v1/projects/{projectId}/streaming-pipelines/{streamingPipelineId}/status

The response body contains a status field. Shortly after the pipeline has been started, its value may be starting or streaming.

An example of this request in cURL is:

curl --request GET \
  --url https://us1.api.matillion.com/dpc/v1/projects/3dda0acf-e646-4f4b-b4f6-e00449b69427/streaming-pipelines/50ead6b2-5d1a-40c7-bb50-0d17c8fb23c3/status \
  --header "authorization: Bearer $DPC_TOKEN"
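Since starting a pipeline is asynchronous, a script may want to poll this endpoint until the pipeline reports streaming. The following is a minimal sketch under several assumptions: the us1 host is taken from the cURL example, the function names and the timeout and interval defaults are hypothetical, and it only waits for a target value because this article documents starting and streaming but no failure states. The sleep and clock functions are parameters so the polling loop can be exercised without real waiting.

```python
import json
import time
import urllib.request

# Assumption: host taken from this article's cURL examples; use your region's host.
API_BASE = "https://us1.api.matillion.com/dpc"


def fetch_status(token: str, project_id: str, pipeline_id: str) -> str:
    """Call the status endpoint and return the value of its status field."""
    request = urllib.request.Request(
        f"{API_BASE}/v1/projects/{project_id}/streaming-pipelines/{pipeline_id}/status",
        headers={"authorization": f"Bearer {token}"},
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)["status"]


def wait_for_status(get_status, target="streaming", timeout=300.0, interval=10.0,
                    sleep=time.sleep, clock=time.monotonic) -> str:
    """Poll get_status() until it returns target; raise TimeoutError after timeout seconds."""
    deadline = clock() + timeout
    while True:
        status = get_status()
        if status == target:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"pipeline status is still {status!r}")
        sleep(interval)
```

For example, after sending the start command: wait_for_status(lambda: fetch_status(os.environ["DPC_TOKEN"], project_id, pipeline_id)).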