Skip to content

Streaming to a Snowflake destination

Streaming pipelines can use Snowflake as a direct storage destination. This page describes the prerequisites, process, and other considerations of using Snowflake as a streaming pipeline destination.

You should have arrived here by first reading about the Create streaming pipeline wizard.


Prerequisites

  • You need a Snowflake account.
  • You need the following Snowflake resources:
    • A Snowflake user with the required permissions (see below).
    • A Snowflake warehouse that will be used to merge changes into the replicated tables.
    • A Snowflake database and schema into which the tables will be replicated.
    • A Snowflake internal stage configured for the Avro file format (see below).
  • Add an IPv6 to your virtual private cloud (VPC). Depending on the streaming agent, read the following guides:
  • Add your IP address or block to the allowed IP addresses in your Snowflake network policy. You'll need to create a network policy if you don't have one already.

Replicated tables

The selected source tables will be replicated into tables created in the configured Snowflake schema. The streaming pipeline will create replica tables if they don't already exist in the target schema. To avoid ambiguous or overlapping table names, the replica tables follow the naming pattern <source_database>_<source_schema>_<source_table>.


Destination connection

Refer to this section to complete the Destination connection section of the Create streaming pipeline wizard.

Snowflake account = string

Your Snowflake account is the string of text between https:// and .snowflakecomputing.com.

To find your Snowflake account URL:

  1. Visit app.snowflake.com and sign in.
  2. Once you have signed in, click your account menu in the bottom-left of the UI and how over the account you wish to use.
  3. Click the copy button to copy your account URL and then paste it into the Snowflake account field.

Read Finding the organization and account name for an account for additional help.


Username = string

Your Snowflake account username.


Secrets Manager = drop-down

Choose the service you use to manage secrets.


Secret name = string

The name of the secret in your secret manager service that references your Snowflake password.


JDBC parameters and specific connection settings = column editor

This parameter is optional.

Specify any parameter:value pairs as part of an advanced connection. Click Save to finish adding parameters.

Click Connect.


Destination configuration

Refer to this section to complete the Destination configuration section of the Create streaming pipeline wizard.

Role = drop-down

Select your Snowflake role. A role is an entity with privileges. Read Overview of Access Control to learn more.

The streaming agent will manage the replicated tables and apply the change data within your Snowflake warehouse. To do this, the Snowflake role requires the following privileges:

  • USAGE on the warehouse.
  • CREATE TABLE on the target schema.
  • USAGE on the target database, stage schema, and target schema (the same schema may be used as both stage and target).
  • READ and WRITE on the stage.

The correct Snowflake ROLE should be granted to the provided Snowflake USER as part of this setup.

Details of the individual permissions are described in Snowflake Access Control Privileges.


Warehouse = drop-down

Select a Snowflake warehouse. Read Overview of Warehouses to learn more.


Database = drop-down

Select a Snowflake database. Read Databases, Tables and Views - Overview to learn more.


Stage schema = drop-down

Select a Snowflake schema for staging. Read Database, Schema, and Share DDL to learn more.


Stage = drop-down

Select an internal stage to load data from files into Snowflake tables. Read CREATE STAGE to learn more.

The streaming pipeline replicates changes into Snowflake by first writing the change records in the Avro file format to an internal stage location. The internal stage can be created with the following SQL statement:

CREATE STAGE <database>.<stage_schema>.<stage> FILE_FORMAT = ( TYPE = AVRO );

Unique staging prefix = string

At the staging area, this property determines the root folder name. Assigning a unique prefix to your pipeline prevents naming conflicts and enables the reuse of a single staging area for your pipelines.


Table schema = drop-down

The Snowflake schema that will contain the replicated tables.


Table name prefix = drop-down

  • Source Database & Schema (e.g. MYDB_MYSCHEMA_MYTABLE): Use your source database and schema (separated by underscores) as the table name prefix.
  • Re-Use Staging Prefix (e.g. STAGEROOTFOLDER_MYTABLE): Re-use your staging prefix as your table name prefix.
  • No Prefix (e.g. MYTABLE): Do not use a table name prefix.

Click Continue to advance to setting up your source database connection.


Replication types

The Snowflake destination has support for three transformation types that determine how the change data is replicated into Snowflake.

Note

A primary key is required on the source table for Copy Table and Copy Table with Soft Deletes replication types. The primary key is used to merge updates into the target table. If the source table doesn't have a primary key, the replication type will be updated to Change Log for that table.

Copy Table

The Copy Table transformation type produces a replica in Snowflake of each captured source table. Each column in the target table is directly mapped from the corresponding source table and the content of each table matches the source table.

Copy Table with Soft Deletes

The Copy Table with Soft Deletes transformation type is similar to the Copy Table transformation type, but rows deleted in the source are not deleted in the target table. As well as each column mapped from the corresponding source table, an additional boolean column called MTLN_CDC_DELETED is included.

A value of true in this column indicates that a row has been deleted and is no longer present in the source table, and the data values in that row show the values of the row at the point it was deleted. A value of false in this column indicates that a hard delete has taken place instead of a soft delete, and permanently removed the record from the target table.

Change Log

The Change Log transformation type produces Snowflake tables that append entries for each change event that occurs on the source table. The table contains columns for the corresponding source table with the values for each row being the values after the change has occurred, except for delete changes where the values are from the row that was deleted.

Along with the source data columns, there are three metadata columns describing the change event.

Metadata column Description
MTLN_CDC_LAST_CHANGE_TYPE VARCHAR column containing the type of change operation and one of r for a snapshot read, c for create (insert), u for an update, or d for a delete operation.
MTLN_CDC_LAST_COMMIT_TIMESTAMP TIMESTAMP column containing the timestamp of the event provided by the source database.
MTLN_CDC_SEQUENCE_NUMBER Contains values that provide an ordering key that matches the order of the change events. The type and format of these values depends on the source system being captured.

Warning

If a snapshot with the Change Log transformation type fails and is restarted, a second record will be created for each row that was processed before the point of failure, resulting in duplicate rows in the destination table. This is expected behavior for a Change Log transformation type, but you need to be aware of the behavior and have a strategy to deal with the duplicate rows.

Example

Here we have a table in a source system with two columns—a primary key ID, and a value field. If a pipeline is started that captures the changes to that table and the following series of changes are applied to the source table:

  1. Insert a new row (1, inserted).
  2. Insert a new row (2, inserted).
  3. Update value in row 2 to updated.
  4. Insert a new row (3, inserted).
  5. Delete row 2.
  6. Update value in row 1 to updated.

Then the resulting source table after these changes would be:

id value
1 updated
3 inserted

The resulting Snowflake table depends on the replication type configured in the pipeline, as follows.

Copy Table

A replica of the values in the source table.

id value
1 updated
3 inserted

Copy Table With Soft Deletes

The rows with id values 1 and 3 match the source. The row with the id of 2 has the value at the time the row was deleted and has been flagged as deleted in the MTLN_CDC_DELETED field.

id value MTLN_CDC_DELETED
1 updated null
2 updated true
3 inserted null

Changelog

Each change to the source table has a corresponding entry.

id value MTLN_CDC_LAST_CHANGE_TYPE MTLN_CDC_LAST_COMMIT_TIMESTAMP MTLN_CDC_SEQUENCE_NUMBER
1 inserted c <change_timestamp> <sequence_value>
2 inserted c <change_timestamp> <sequence_value>
2 updated u <change_timestamp> <sequence_value>
3 inserted c <change_timestamp> <sequence_value>
2 updated d <change_timestamp> <sequence_value>
1 updated u <change_timestamp> <sequence_value>

Dates and times strategy

  • Snowflake Native Types (e.g. DATETIME): This action will convert the source date/time value to an appropriate Snowflake type. If it's not possible to convert, the pipeline will output the value as a string. This is the default setting.
  • Integers from Epoch: This action will load all date/time values as integers in Snowflake. You can use these integers to calculate the date and time by adding the value to the epoch value, applying this value to all date/time fields in the target table. Although this may be complicated, it has the benefit of maintaining a very accurate date/time value without any errors that may be caused by conversion in the pipeline.

Note

The epoch integer value is the number of days since the Unix epoch for all data sources except Oracle, where it is the number of milliseconds since the Unix epoch.

The following table shows how different source time types will be loaded into the destination table, depending on the setting of the Dates and times strategy property.

Source type Integers from epoch Cast to Snowflake types
date Integer, number of days since epoch Snowflake DATE
timestamp (up to 3 decimal places) Integer, number of milliseconds since epoch Snowflake TIMESTAMP_NTZ(9)
timestamp (4-6 decimal places) Integer, number of microseconds since epoch Snowflake TIMESTAMP_NTZ(9)
timestamp (7-9 decimal places) Integer, number of nanoseconds since epoch Snowflake TIMESTAMP_NTZ(9)
timestamp with time zone String, ISO formatted, often forced into GMT Snowflake TIMESTAMP_TZ(9), often forced into GMT
time (up to 3 decimal places) Integer, number of milliseconds past midnight Snowflake TIME(9)
time (4-6 decimal places) Integer, number of microseconds past midnight Snowflake TIME(9)
time (7-9 decimal places) Integer, number of nanoseconds past midnight Snowflake TIME(9)
time with time zone String, ISO formatted, often forced into GMT String, ISO formatted, often forced into GMT

Schema drift

Schema drift is supported for this destination. Read Schema drift to learn more.


Example Snowflake configuration

The streaming pipeline will work with any Snowflake ROLE that has the required privileges; however, we recommend creating a dedicated Snowflake USER and ROLE for the Data Productivity Cloud where possible.

The following SQL provides an example of setting up Snowflake for the Data Productivity Cloud by creating a dedicated USER, DATABASE, SCHEMA, STAGE, and WAREHOUSE. Existing Snowflake objects can be used, and multiple pipelines can share the same objects.

This example creates one schema that contains both the stage and the target tables.

-- Create a dedicated user and role for streaming
CREATE USER streaming_user PASSWORD = 'password-goes-here';
CREATE ROLE streaming_role;
GRANT ROLE streaming_role TO USER streaming_user;

-- Create a database and grant the streaming user access
CREATE DATABASE streaming_db;
CREATE SCHEMA streaming_db.streaming_schema;
GRANT USAGE ON DATABASE streaming_db TO ROLE streaming_role;
GRANT USAGE, CREATE TABLE ON SCHEMA streaming_db.streaming_schema TO ROLE streaming_role;

-- Create a stage and grant the streaming user read/write
CREATE STAGE streaming_db.streaming_schema.streaming_stage FILE_FORMAT = ( TYPE = AVRO );
GRANT READ, WRITE ON STAGE streaming_db.streaming_schema.streaming_stage TO ROLE streaming_role;

-- Create a warehouse for this example
CREATE WAREHOUSE streaming_warehouse;
GRANT USAGE ON WAREHOUSE streaming_warehouse TO ROLE streaming_role;

Note

Creating the database or the schema as TRANSIENT will cause all lower objects (schemas, tables) to also be created as TRANSIENT. Read the Snowflake documentation to learn more.