Connect to Snowflake
Matillion CDC can load data from your pipelines into Snowflake as a storage destination.
Note
Matillion CDC for Snowflake provides schema drift support. For more information, read Schema drift support.
Prerequisites
- Your Streaming agent must be version 2.81.12 or higher.
- You need a Snowflake account.
- You need the following Snowflake resources:
- A Snowflake user with the required permissions (see below).
- A Snowflake schema into which the tables will be replicated.
- A Snowflake internal stage configured for the Avro file format (see below).
- A Snowflake warehouse that will be used to merge changes into the replicated tables.
- Add an IPv6 CIDR block to your virtual private cloud (VPC). Depending on your Streaming agent's platform, read the corresponding guide.
- Add your IP address or block to the allowed IP addresses in your Snowflake network policy. You'll need to create a network policy if you don't have one already.
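As a minimal sketch, the network policy can be created and applied with SQL like the following; the policy name and IP range shown here are placeholders for your own values.
-- Create a network policy allowing the Streaming agent's IP address or block (placeholder values)
CREATE NETWORK POLICY cdc_network_policy ALLOWED_IP_LIST = ('192.0.2.0/24');
-- Apply the policy to the account (requires the appropriate privileges)
ALTER ACCOUNT SET NETWORK_POLICY = cdc_network_policy;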
Snowflake stage
The CDC pipeline replicates changes into Snowflake by first writing the change records in Avro format to an internal stage location. The internal stage can be created with the following SQL statement:
CREATE STAGE <database>.<stage_schema>.<stage> FILE_FORMAT = ( TYPE = AVRO );
Snowflake role
The Streaming agent will manage the replicated tables and apply the change data within your Snowflake warehouse. To do this, the Snowflake role requires the following privileges:
- Usage on the warehouse.
- Create Table on the target schema.
- Usage on the target database, stage schema, and target schema (the same schema may be used as both stage and target).
- Read and Write on the stage.
The correct ROLE should be granted to the provided USER as part of this setup.
Details of the individual permissions are described in Snowflake Access Control Privileges.
Example Snowflake configuration
The CDC pipeline will work with any role that has the required privileges; however, we recommend creating a dedicated user and role for CDC where possible. The following SQL provides an example of setting up Snowflake for CDC by creating a dedicated user, database, schema, stage, and warehouse. Existing Snowflake objects can be used, and multiple pipelines can share the same objects.
This example creates one schema that contains both the stage and the target tables.
-- Create a dedicated user and role for CDC
CREATE USER cdc_user PASSWORD = 'password-goes-here';
CREATE ROLE cdc_role;
GRANT ROLE cdc_role TO USER cdc_user;
-- Create a database and grant the CDC user access
CREATE DATABASE cdc_db;
CREATE SCHEMA cdc_db.cdc_schema;
GRANT USAGE ON DATABASE cdc_db TO ROLE cdc_role;
GRANT USAGE, CREATE TABLE ON SCHEMA cdc_db.cdc_schema TO ROLE cdc_role;
-- Create a stage and grant the CDC user read/write
CREATE STAGE cdc_db.cdc_schema.cdc_stage FILE_FORMAT = ( TYPE = AVRO );
GRANT READ, WRITE ON STAGE cdc_db.cdc_schema.cdc_stage TO ROLE cdc_role;
-- Create a warehouse for this example
CREATE WAREHOUSE cdc_warehouse;
GRANT USAGE ON WAREHOUSE cdc_warehouse TO ROLE cdc_role;
Note
Creating the database or the schema as TRANSIENT will cause all lower objects (schemas, tables) to also be created as TRANSIENT. Read the Snowflake documentation for further information on this behavior.
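As an illustration of this behavior (reusing the hypothetical names from the example above), creating the database as TRANSIENT makes the objects beneath it transient as well:
-- Schemas and tables created inside a transient database are transient by default
CREATE TRANSIENT DATABASE cdc_db;
CREATE SCHEMA cdc_db.cdc_schema;                        -- implicitly transient
CREATE TABLE cdc_db.cdc_schema.example_table (id INT);  -- implicitly transient, so no Fail-safe period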
Replicated tables
The selected source tables will be replicated into tables created in the configured Snowflake schema. The CDC pipeline will create replica tables if they do not already exist in the target schema. To avoid ambiguous or overlapping table names, the replica tables follow the naming pattern <source_database>_<source_schema>_<source_table>.
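For example, using hypothetical names, a source table orders in the schema public of the database sales would be replicated into a target table named sales_public_orders.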
Choose destination
After you have chosen your agent, configured your database connection, and selected your tables, you must connect to a destination for storage.
Choose Snowflake.
Connect to Snowflake
Complete the destination setup using the table below as reference.
Property | Description |
---|---|
Account | String denoting your Snowflake account identifier. Include everything that comes after https:// but before .snowflakecomputing.com . Read Account Identifiers for more information. For example, if the full URL is https://myorg-account123.snowflakecomputing.com , use myorg-account123 . |
Username | Your Snowflake account username. |
Secret Manager | Select your secret manager service. Current options include AWS Secrets Manager, Azure Key Vault, and Google Secret Manager. |
Secret Name | The identifying name of your secret that holds your Snowflake password. |
Advanced settings | Input any optional advanced JDBC connection settings as parameter=value pairs. Discard a setting with the trashcan button, or add an additional row with the + Add parameter button. |
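For example, a standard Snowflake JDBC parameter such as queryTimeout=300 (shown here purely for illustration) could be added to limit how long a statement is allowed to run; check the Snowflake JDBC documentation for the parameters relevant to your environment.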
Click Test connection to confirm your connection, or click Test and continue to move forward once the connection succeeds.
Assign your Snowflake settings using the table below as reference.
Property | Description |
---|---|
Role | An entity with privileges. Read Overview of Access Control to learn more. |
Warehouse | A Snowflake warehouse. Read Overview of Warehouses to learn more. |
Database | A Snowflake database. Read Databases, Tables and Views - Overview to learn more. |
Stage Schema | A Snowflake schema for staging. Read Database, Schema, and Share DDL to learn more. |
Stage | An internal stage to load data from files into Snowflake tables. Read CREATE STAGE to learn more. |
Staging Prefix | A unique prefix for the CDC pipeline. The prefix is the name of the 'folder' within the storage bucket or Snowflake stage that all CDC data for this pipeline should be saved to. You can have multiple pipelines using the same storage with different prefixes. This acts as a namespace for storing the pipeline state in the stage. |
Table Schema | The schema to contain the replicated tables. |
Table name prefix | Determines the naming strategy for newly created tables in Snowflake. The source database and schema names can be used (e.g. 'MYDB_MYSCHEMA_MYTABLE'), or the specified staging prefix can be used (e.g. 'MYSTAGINGPREFIX_MYTABLE'), or the prefix can be disabled entirely. |
Click Test connection to validate this configuration, or click Test and continue to move forward once the connection succeeds.
Transformation type
The Snowflake destination has support for three transformation types that determine how the change data is replicated into Snowflake.
Note
A primary key is required on the source table for Copy Table and Copy Table with Soft Deletes transformations. The primary key is used to merge updates into the target table. If the source table doesn't have a primary key, the transformation type will be updated to Change Log for that table.
Copy Table
The Copy Table transformation type produces a replica in Snowflake of each captured source table. Each column in the target table is directly mapped from the corresponding source table and the content of each table matches the source table.
Copy Table with Soft Deletes
The Copy Table with Soft Deletes transformation type is similar to the Copy Table transformation type, but rows deleted in the source are not deleted in the target table. As well as each column mapped from the corresponding source table, an additional boolean column called MTLN_CDC_DELETED is included.
A value of true in this column indicates that the row has been deleted and is no longer present in the source table; the data values in that row show the values of the row at the point it was deleted. A value of false or NULL indicates that the row has not been deleted and is still present in the source table.
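If downstream queries should only see rows that still exist in the source, the soft-deleted rows can be filtered out. A minimal sketch, assuming a hypothetical replica table cdc_db.cdc_schema.my_replica:
-- Return only rows that have not been soft-deleted (NULL is treated as not deleted)
SELECT *
FROM cdc_db.cdc_schema.my_replica
WHERE COALESCE(MTLN_CDC_DELETED, FALSE) = FALSE;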
Change Log
The Change Log transformation type produces Snowflake tables that append entries for each change event that occurs on the source table. The table contains columns for the corresponding source table with the values for each row being the values after the change has occurred, except for delete changes where the values are from the row that was deleted.
Along with the source data columns, there are three metadata columns describing the change event.
- MTLN_CDC_LAST_CHANGE_TYPE is a VARCHAR column containing the type of change operation: one of r for a snapshot read, c for a create (insert), u for an update, or d for a delete operation.
- MTLN_CDC_LAST_COMMIT_TIMESTAMP is a TIMESTAMP column containing the timestamp of the event provided by the source database. In Streaming agent versions prior to 2.95.1 this was a NUMBER(38,0) column where the value was the number of milliseconds since the epoch.
- MTLN_CDC_SEQUENCE_NUMBER is a column containing values that provide an ordering key matching the order of the change events. The type and format of these values depend on the source system being captured.
Warning
If a snapshot with the Change Log transformation type fails and is restarted, a second record will be created for each row that was processed before the point of failure, resulting in duplicate rows in the destination table. This is expected behavior for a Change Log transformation type, but you need to be aware of the behavior and have a strategy to deal with the duplicate rows.
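One possible deduplication strategy is to keep only the latest event per key when querying the change log. A minimal sketch, assuming a hypothetical replica table cdc_db.cdc_schema.my_changelog whose source key column is id:
-- Keep the most recent change event per key; this also collapses duplicates from a restarted snapshot
SELECT *
FROM cdc_db.cdc_schema.my_changelog
QUALIFY ROW_NUMBER() OVER (
    PARTITION BY id
    ORDER BY MTLN_CDC_SEQUENCE_NUMBER DESC
) = 1;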
Example
Given a table in a source system with two columns—a primary key ID, and a value field—if a pipeline is started that captures the changes to that table and the following series of changes are applied to the source table:
- Insert a new row (1, inserted)
- Insert a new row (2, inserted)
- Update value in row 2 to updated
- Insert a new row (3, inserted)
- Delete row 2
- Update value in row 1 to updated
then the resulting source table after these changes would be:
id | value |
---|---|
1 | updated |
3 | inserted |
The resulting Snowflake table depends on the transformation type configured in the pipeline.
Copy Table
A replica of the values in the source table.
id | value |
---|---|
1 | updated |
3 | inserted |
Copy Table With Soft Deletes
The rows with id values 1 and 3 match the source. The row with id 2 has the values at the time the row was deleted and has been flagged as deleted in the MTLN_CDC_DELETED field.
id | value | MTLN_CDC_DELETED |
---|---|---|
1 | updated | null |
2 | updated | true |
3 | inserted | null |
Change Log
Each change to the source table has a corresponding entry.
id | value | MTLN_CDC_LAST_CHANGE_TYPE | MTLN_CDC_LAST_COMMIT_TIMESTAMP | MTLN_CDC_SEQUENCE_NUMBER |
---|---|---|---|---|
1 | inserted | c | <change_timestamp> | <sequence_value> |
2 | inserted | c | <change_timestamp> | <sequence_value> |
2 | updated | u | <change_timestamp> | <sequence_value> |
3 | inserted | c | <change_timestamp> | <sequence_value> |
2 | updated | d | <change_timestamp> | <sequence_value> |
1 | updated | u | <change_timestamp> | <sequence_value> |
Dates and times strategy
By default, date/time values are loaded as epoch values. Although this gives an extremely accurate value, it will require conversion to standard date/time values in most data use cases. The Dates and Times Strategy property lets you choose whether to keep epoch values or let the pipeline convert to native Snowflake time formats. Select which method you want to use:
- Integers from Epoch (default): This will load all date/time values as integers in Snowflake. You can convert these integers back to dates and times by adding them to the Unix epoch, and this applies to every date/time field in the target table. Although this requires a conversion step, it has the benefit of maintaining a very accurate date/time value without any errors that may be introduced by conversion in the pipeline.
- Cast to Snowflake Types (e.g. DATETIME): This will convert the source date/time value to an appropriate Snowflake type. If it's not possible to convert, the pipeline will output the value as a string.
Note
For date values, the epoch integer is the number of days since the Unix epoch for all data sources except Oracle, where it is the number of milliseconds since the Unix epoch.
The following table shows how different source time types will be loaded into the destination table, depending on the setting of the Dates and Times Strategy property.
Source type | Integers from epoch | Cast to Snowflake types |
---|---|---|
date | Integer, number of days since epoch | Snowflake DATE |
timestamp (up to 3 decimal places) | Integer, number of milliseconds since epoch | Snowflake TIMESTAMP_NTZ(9) |
timestamp (4-6 decimal places) | Integer, number of microseconds since epoch | Snowflake TIMESTAMP_NTZ(9) |
timestamp (7-9 decimal places) | Integer, number of nanoseconds since epoch | Snowflake TIMESTAMP_NTZ(9) |
timestamp with time zone | String, ISO formatted, often forced into GMT | Snowflake TIMESTAMP_TZ(9), often forced into GMT |
time (up to 3 decimal places) | Integer, number of milliseconds past midnight | Snowflake TIME(9) |
time (4-6 decimal places) | Integer, number of microseconds past midnight | Snowflake TIME(9) |
time (7-9 decimal places) | Integer, number of nanoseconds past midnight | Snowflake TIME(9) |
time with time zone | String, ISO formatted, often forced into GMT | String, ISO formatted, often forced into GMT |
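If you keep the default Integers from Epoch strategy, the values in the table above can be converted back to native Snowflake types downstream. A minimal sketch, assuming hypothetical columns date_col (days since epoch) and ts_col (a timestamp captured with millisecond precision) in a hypothetical table my_replica_table:
-- Convert a day count to DATE and a millisecond count to TIMESTAMP_NTZ
-- Note: per the note above, Oracle date values arrive as milliseconds rather than days, so adjust accordingly
SELECT
    DATEADD(day, date_col, '1970-01-01'::DATE) AS date_value,
    TO_TIMESTAMP_NTZ(ts_col, 3)                AS timestamp_value
FROM my_replica_table;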