Skip to content

PostgreSQL streaming connector

The PostgreSQL connector can monitor and capture row-level changes within PostgreSQL schemas in a non-intrusive and performant manner, and achieves this by using the raw output stream from the pgoutput decoding plugin. The connector produces a change event record for every insert, update, and delete event occurring in any tables being monitored by the streaming agent.

Note

We are using the Debezium PostgreSQL connector to connect to this source. For more information, read Debezium connector for PostgreSQL.


How the connector works

PostgreSQL normally purges write-ahead log segments (WAL) after a period of time. This means a complete history of changes is not available for the connector. To overcome this scenario, the connector will initially perform a consistent snapshot for all schemas and tables that are being monitored. This will allow the connector to establish a base state, after which, streaming can begin.

Note

Depending on the number of rows within your schema, this may take a while to complete.

Once the snapshot stage has completed, the connector will move to the streaming stage. All changes that have occurred since the snapshot started will be captured during the streaming process. No changes will be missed.

The streaming stage will continue to monitor and consume changes as and when they're occurring within the database and any produced change events will be exported to your selected destination in a consistent and predictable pattern. The connector will maintain an acceptable time lag behind the source database—this lag can be monitored through the pipeline dashboard.

The connector is tolerant of failures. As the connector reads changes and produces events, it records the WAL position for each event. If the connector stops for any reason (including communication failures, network problems, or crashes), upon restart, the connector continues reading the WAL where it last stopped.

If the connector stopped at any point during the snapshot stage, a new snapshot will be taken on restart of the pipeline.


Prerequisites

Observe the following information about supported versions, limitations, and database rules.

Versions

The Data Productivity Cloud supports PostgreSQL versions 12, 13, 14, and 15 (including minor versions). PostgreSQL versions 12 and above contain the pgoutput plugin by default to capture logs natively.

Limitations

  • The numeric values NaN, Infinity, and -Infinity are not supported.
  • Amazon Aurora Serverless database clusters do not support streaming. Only provisioned Amazon Aurora database clusters support streaming.

Database rules

  • Your PostgreSQL database must be configured for streaming. Read Configuring PostgreSQL database to learn more.
  • Your PostgreSQL database must be hosted on the primary server, and this server must be active.

Source setup

Refer to this section when you create a streaming pipeline.

Server address = string

The server address of your PostgreSQL database.


Port = integer

The port number used to access your PostgreSQL database. The default is 5432.


Database name = string

The name of your PostgreSQL container database installation.


Username = string

The username used to log in to the specified database.


Secrets Manager = drop-down

Choose the service you use to manage secrets.


Secret name = string

The name of the secret in your secret manager service that references your PostgreSQL password.


JDBC parameters and specific connection settings = column editor

This parameter is optional.

Specify any parameter:value pairs as part of an advanced connection. Click Save to finish adding parameters.

Click Connect to establish the connection to your source database.


Unavailable field values

When a field (column) is not part of the replica identity for a table and the value is large, Postgres may use TOAST to store the value. For change records where a TOASTED value hasn't been modified as part of the change, that field within the change record will contain the following value: __value_not_modified__.

TOAST values aren't treated any differently from the actual values when replicating changes into the target tables. This means that it's possible for the replica table to inadvertently have rows where the value is updated to this placeholder value, and the actual value of the row is lost.

To avoid this case, set the replica identity of the table to FULL, which ensures all values are always included in the change records. The replica identity setting for a table can be changed with an alter table statement:

ALTER TABLE <the_table> REPLICA IDENTITY FULL;

Create a signal table

Follow this section to create a PostgreSQL signal table.

  1. Create a PostgreSQL signal table with the following SQL statement:

    CREATE TABLE IF NOT EXISTS <schema>.<table> (
    id VARCHAR(42) PRIMARY KEY,
    type VARCHAR(32) NOT NULL,
    data VARCHAR(2048) NULL
    );
    

    Where <schema> and <table> are suitable names for the schema and signal table.

    The signal table should have three columns:

    • id: An identifier for each signal execution of type VARCHAR(42).
    • type: Indicates the type of signal being sent of type VARCHAR(32).
    • data: Stores additional data related to the signal of type VARCHAR(2048).
  2. If the signal table is not already included in the streaming publication, you should add it using the following SQL:

    ALTER PUBLICATION matillion_cdc_publication ADD TABLE <schema>.<table>;
    

    Where <schema> and <table> are the names you chose when creating your signal table.

  3. Grant the necessary privileges to the database role configured for running the pipeline:

    -- Grant the streaming role the ability to access the schema
    GRANT USAGE ON SCHEMA <schema> TO <streaming_role>;
    
    -- Grant the  streaming role the ability to insert records to the signal table
    GRANT INSERT ON <schema>.<table> TO <streaming_role>;
    

    Where <schema> and <table> are the names you chose when creating your signal table, and <streaming_role> is the user configured for streaming.

Once you set up the signal table, on-demand snapshots can be enabled, and new tables added to the pipeline after the initial startup will be snapshotted. If on-demand snapshots are disabled, any new tables added to the pipeline will capture changes from that point onwards but won't sync their historical data with the target, and the pipeline will immediately have a STREAMING status.


Advanced settings

Advanced settings are optional. Advanced setting are entered as parameter:value pairs in the Add advanced settings dialog. Some parameters you might want to add are discussed below.

slot.name

slot.name: Is the name of the PostgreSQL logical decoding slot that's created for streaming changes from a particular plug-in for a particular database/schema. A valid replication slot name must contain only digits, lowercase characters, and underscores with a length of <= 63.

A unique identifier (slot name) associated with a replication slot will be generated, and the default name will be matillion_cdc. A replication slot is a feature provided by the PostgreSQL database management system, to facilitate streaming replication. The agent can connect to the replication slot by referencing the slot name, and consume the captured changes in real-time. When the agent connects, it will look for the replication slot name, matillion_cdc, and if it doesn't find it, then the agent will create one.

Note

This setting must be used if you want to have more than one pipeline using the same source PostgreSQL database.

time.precision.mode

time.precision.mode: A simplified explanation of the mappings for temporal types in PostgreSQL based on the time.precision.mode configuration property:

  • When the time.precision.mode property is set to adaptive, which is the default setting, the connector determines the literal type and semantic type for temporal columns based on their data type definition. This approach guarantees that the events accurately reflect the values stored in the database.
  • When the time.precision.mode property is set to connect, the connector uses connector-logical types to represent temporal types. This can be beneficial when consumers are limited to handling the standard logical types and cannot handle variable-precision time values. However, it's important to note that choosing this mode may lead to a loss of precision for PostgreSQL columns that have a fractional second precision greater than 3.

Configuring new tables

When adding tables to a pipeline, they must be correctly configured to allow the on-demand snapshot to take place and to ensure future changes on those tables will be streamed. Read Configure your PostgreSQL database for a more comprehensive outline of these requirements.

Add tables to the streaming publication

To enable on-demand snapshots and streaming for new tables, you must add the tables to the database publication. Use the following SQL statements:

For adding tables individually:

ALTER PUBLICATION matillion_cdc_publication ADD TABLE <schema>.<table>;

For adding all tables in a schema:

ALTER PUBLICATION matillion_cdc_publication ADD TABLES IN SCHEMA <schema>;

Replace <schema> and <table> with the names of each new table.

Grant USAGE privileges for new schemas

If any of the new tables are in schemas that previously did not have tables captured in the pipeline, grant USAGE privileges to the database role configured for streaming. Use the following SQL statement:

GRANT USAGE ON SCHEMA <schema> TO <streaming_role>;

Replace <schema> with the schema of the new tables and <streaming_role> with the user configured for streaming. Repeat this step for every new schema.

Grant SELECT privileges on new tables

To run on-demand snapshots, the user configured for streaming must have SELECT privileges on the new tables. Use the following SQL statements:

For granting SELECT permissions to tables individually:

GRANT SELECT ON <schema>.<table> TO <streaming_role>;

For granting SELECT permissions on all tables in a schema:

GRANT SELECT ON ALL TABLES IN SCHEMA public TO <streaming_role>;

Replace <schema> and <table> with the names of each new table and <streaming_role> with the user configured for streaming.

By following these steps, you ensure that new tables added to the pipeline will be properly configured for on-demand snapshots and streaming, allowing historical data to be captured without losing synchronization with the target. If on-demand snapshots are disabled, new tables will only capture changes from the point of addition and the pipeline will have a STREAMING status immediately.