Streaming pipelines

Streaming pipelines are an end-to-end solution for near-real-time data ingestion, allowing you to capture changes from source databases and synchronize them to a target data warehouse.

Streaming pipelines are an Enterprise edition feature. Read Editions to learn more.


Sources and destinations

Supported data sources are:

Supported data targets are:


Core concepts

There are some important concepts to understand before you begin using streaming pipelines. These are detailed below.

  • Streaming agent: A component within the Data Productivity Cloud that serves as a bridge between the source database and the target cloud data lake, enabling the execution and scheduling of streaming pipelines. The agent is hosted in your own infrastructure as a Hybrid SaaS solution.
  • Streaming pipeline: A collection of configuration details, including the source configuration, target configuration, and any advanced properties, that allows the agent to begin monitoring and consuming database changes and delivering them to the appropriate data lake.
  • Source: A collection of connection properties for the source database you wish to configure a streaming pipeline for. The configuration of the source database will vary depending on the specific source.
  • Target: Also called the destination. A collection of connection properties for where the agent will deliver change event data captured from the source database.

Key features of the streaming pipeline architecture

Streaming pipelines offer several benefits and capabilities, providing a consistent and up-to-date view of the connected source database while minimizing resource usage and avoiding disruption to its regular operations.

  • Data stays within your private cloud: This means that the change data that's being streamed from the source and written to the destination by the streaming pipeline remains within your infrastructure.
  • Configuration and monitoring: The streaming pipeline tab within your Data Productivity Cloud projects allows you to configure and manage the streaming pipeline. You can define the specific settings for the source database, schemas, and data snapshotting. Additionally, the interface allows you to monitor the status of the streaming pipeline, ensuring visibility of the data integration process.
  • Cloud secrets: The streaming agent requires access to a cloud secrets service, which is a secure storage system for storing authentication and connection secrets. These secrets are used by the agent to authenticate itself with the source database and establish a secure connection for capturing the data changes. For details of how the Data Productivity Cloud uses secrets, read Secrets overview.
  • Log-based transactions:
    • Low-level logs for consistent view: Streaming pipelines leverage low-level logs from the source database to provide a consistent and up-to-date picture of the connected source database and the tables it is monitoring. This ensures that the captured data reflects the latest changes and provides an accurate representation of the source database.
    • Minimal resource usage: The log consumption approach used by streaming pipelines keeps the resource usage on the source database to a minimum.
  • Continuous operation: Once the streaming pipeline agent is configured and started, it operates autonomously without requiring much intervention. The agent continuously monitors all changes occurring in the source database, consumes those changes from the low-level logs, and delivers them to the designated target data lake or storage. This ensures a continuous and reliable change data capture process.

High-level architecture


Batching process

When you create a streaming pipeline, the following happens:

  1. Connecting to the source database: The streaming agent establishes a connection with the source database, allowing it to capture data changes and track them in real time.
  2. Processing change events: Once connected, the agent processes the change events occurring in the source database. Data changes are captured, transformed into manageable batches, and written out in near real time.
  3. Writing to storage: Once processed, the change batches are sent to the designated destination.

When running a streaming pipeline within the Data Productivity Cloud, the changes captured from the source database are buffered within the streaming agent before being written out to files in the specified storage destination. This uses the following principles:

  • Change buffering: When changes are emitted from the source database, they are buffered within the streaming agent. This buffering allows for efficient handling and processing of the changes before writing them to files.
  • Writing changes: The files containing the changes are written when one of the following conditions is met:
    • Time threshold: If the oldest change in a partition reaches a specified time threshold, the changes within that partition are written to a file. This ensures that changes are not held in the buffer for an extended period.
    • File size limit: If the size of changes within a partition exceeds a specified limit, the changes are written to a file. This helps maintain manageable file sizes.

Buffering the changes within the agent and writing them to the destination based on time thresholds or file size limits ensures efficient and timely handling of data changes. The buffer time setting helps ensure that changes are processed and written within a specified time frame, maintaining near-real-time capture and minimizing any delay in data availability.
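
To illustrate the two flush conditions, the following is a minimal sketch in Python of a per-partition buffer that writes a batch when the oldest change exceeds a time threshold or the accumulated size exceeds a limit. The class name, threshold values, and the write step are assumptions for the example only, not the agent's actual implementation.

import time

class ChangeBuffer:
    """Illustrative per-partition buffer: flush when the oldest change is too
    old (time threshold) or the batch is too large (file size limit)."""

    def __init__(self, max_age_seconds=60, max_batch_bytes=5 * 1024 * 1024):
        self.max_age_seconds = max_age_seconds   # assumed time threshold
        self.max_batch_bytes = max_batch_bytes   # assumed file size limit
        self.partitions = {}                     # partition -> {"changes", "oldest", "bytes"}

    def add(self, partition, change_bytes):
        entry = self.partitions.setdefault(
            partition, {"changes": [], "oldest": time.time(), "bytes": 0}
        )
        entry["changes"].append(change_bytes)
        entry["bytes"] += len(change_bytes)
        self._maybe_flush(partition, entry)

    def _maybe_flush(self, partition, entry):
        too_old = time.time() - entry["oldest"] >= self.max_age_seconds
        too_big = entry["bytes"] >= self.max_batch_bytes
        if too_old or too_big:
            self._write_file(partition, entry["changes"])
            del self.partitions[partition]

    def _write_file(self, partition, changes):
        # In the real pipeline, this step writes an Avro file to the storage destination.
        print(f"writing {len(changes)} changes for partition {partition}")

A real agent would also flush partitions on a timer even when no new changes arrive; this sketch only checks the conditions when a change is added.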


Files

Each file contains a series of change records in the Avro file format, which is well suited to optimized ingestion into a cloud data warehouse. The record structure is common to all sources, although individual keys may vary. Each row in the file is a change record with the following structure (here using PostgreSQL as an example):

{
    "before": null,
    "after": {
      "actor_id": "70ac0033-c25b-7687-5a86-6861c08cabdd",
      "first_name": "john",
      "last_name": "smith",
      "last_update": 1635638400123456,
      "version": 0
    },
    "metadata": {
      "connector": "postgresql",
      "db": "postgres_db",
      "key": ["actor_id"],
      "lsn": 37094192,
      "name": "matillion",
      "op": "r",
      "schema": "public",
      "table": "actor",
      "ts_ms": 1635638400234,
      "txId": 543,
      "version": "1.7.1.Final"
    }
}
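
If you want to inspect a file written by a streaming pipeline, records with the structure above can be read with any standard Avro library. The following sketch uses the fastavro Python package and a hypothetical local file name; the op field and other metadata keys are described below.

from fastavro import reader

# "public.actor.avro" is a hypothetical local copy of a file written by the pipeline.
with open("public.actor.avro", "rb") as fh:
    for record in reader(fh):
        meta = record["metadata"]
        print(meta["op"], meta["schema"], meta["table"], record["after"])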

The op field contains the type of the change for this record:

  • r: Read (during snapshot)
  • c: Create
  • d: Delete
  • u: Update

The before and after fields contain the values in that row as they were before and after the change was applied, and as such the fields will differ by table. In cases where a record is created, the value for the before field will be empty (null); in cases where a record is deleted, the value for the after field will be empty (null).
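
Taken together, the op, before, and after fields are enough to replay changes into a target table. The function below is an illustrative sketch of that mapping only, not how the Data Productivity Cloud merges changes; the upsert and delete callbacks are hypothetical placeholders for whatever load mechanism you use.

def apply_change(record, upsert, delete):
    """Route a single change record to an upsert or delete callback."""
    op = record["metadata"]["op"]
    key_columns = record["metadata"]["key"]

    if op in ("r", "c", "u"):
        # Snapshot reads, creates, and updates carry the new row image in "after".
        upsert(record["after"], key_columns)
    elif op == "d":
        # Deletes carry the old row image in "before"; use its key values.
        delete({column: record["before"][column] for column in key_columns})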


Get started

Read Create a streaming pipeline to learn how to set up your first streaming pipeline.