Sync All Tables
Process the latest files from cloud storage to maintain tables in your cloud data platform.
If you have configured a streaming pipeline with Amazon S3 or Azure Blob Storage as the destination, you can use these Data Productivity Cloud pre-built pipelines to load the Avro files into Snowflake or Databricks.
Pre-built pipelines are available at Matillion Exchange.
Compatibility
| Cloud data platform | Full SaaS | Hybrid SaaS |
|---|---|---|
| Snowflake | ✅ | ✅ |
| Databricks | ❌ | ✅ |
| Amazon Redshift | ❌ | ❌ |
Installation
- Download the latest ZIP file for your target data platform.
- Open a branch on your Data Productivity Cloud project.
- If you already have a folder named "Matillion CDC Pipelines" in the root of your project, delete it.
- Hover over the root folder in your project, click the three-dot menu and select Import.
- Browse to and select the ZIP file.
- You will now have a folder named "Matillion CDC Pipelines" containing the latest version of the pipelines.
Usage
- Open the "Load CDC Data" pipeline.
- Copy the "Sync All Tables - Template" Run Orchestration component and paste it into your own orchestration pipeline.
- Click into the "Sync All Tables - Template" component and edit the `Set Scalar Variables` and `Set Grid Variables` parameters accordingly.
- You can now run or schedule your orchestration pipeline to keep your cloud data platform up to date with your streaming files.
Set Scalar Variables
The following variables can be set in the `Set Scalar Variables` parameter.
Snowflake
`cloud_storage_url` = string
The URL of the cloud storage location that the streaming pipeline is writing to. This should take one of the following formats:
- `s3://<bucket>/<prefix>`
- `azure://<storage_account>.blob.core.windows.net/<container>/<prefix>`
- `gs://<bucket>/<prefix>`
`warehouse` = string
The Snowflake virtual warehouse used to execute the SQL statements. Read Overview of Warehouses to learn more.
`target_database` = string
The Snowflake database where the external table and target tables will be created. Read Databases, Tables and Views - Overview to learn more.
`target_schema` = string
The Snowflake schema where the external table will be created. By default, the target tables will also be created in this schema. See the `use_source_schemas` parameter. Read Database, Schema, and Share DDL to learn more.
`external_stage` = string
The name of an existing external stage that contains the files output by the streaming pipeline. The URL of the external stage must contain the `cloud_storage_url`.
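For reference, an external stage of this kind can be created in Snowflake as shown below. This is a minimal sketch only: the stage name, URL, and storage integration are placeholder values, and the stage is a prerequisite you create yourself rather than something the pre-built pipeline creates.

```sql
-- Minimal sketch (not part of the pre-built pipeline): an external stage whose
-- URL contains the cloud_storage_url that the streaming pipeline writes to.
-- "cdc_external_stage", the bucket, and "my_s3_integration" are placeholders.
CREATE STAGE IF NOT EXISTS cdc_external_stage
  URL = 's3://my-cdc-bucket/streaming/'
  STORAGE_INTEGRATION = my_s3_integration;
```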
`external_table` = string
The external table that will be created to read the files output by the streaming pipeline.
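The pre-built pipeline issues its own DDL for this table, so you don't create it yourself. As a rough, illustrative sketch of what an external table over Avro files on the stage could look like (names and options are placeholders, not the exact statement the pipeline runs):

```sql
-- Illustrative only: an external table reading Avro files from the external stage.
-- The pre-built pipeline creates and manages the real table.
CREATE EXTERNAL TABLE IF NOT EXISTS cdc_external_table
  WITH LOCATION = @cdc_external_stage
  AUTO_REFRESH = FALSE
  FILE_FORMAT = (TYPE = AVRO);
```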
`concurrency` = string
Controls whether the files from the source tables are processed table-by-table, or all tables at once. Options are `Concurrent` or `Sequential`.
`use_source_schemas` = string
Optionally create the target tables in a schema with the same name as the schema containing the source table. If the schema doesn't already exist, the pipeline will try to create it. Options are `Y` or `N`.
`target_prefix` = string
A prefix to add to the source table name to generate the target table name. If no prefix is specified, the target table will have the same name as the source table.
`fully_qualify_target_table` = string
Optionally include the source database and schema in the target table name. If `use_source_schemas` = `N`, it is recommended to set this to `Y`, unless you are confident that your source table names will always be unique. Options are `Y` or `N`.
`transformation_type` = string
The type of transformation used when applying the change events to the target table. Available options:
- Copy Table: The target table will be maintained as a copy of the source table.
- Copy Table with Soft Deletes: Same as Copy Table, but records deleted in the source table will be retained in the target table.
- Change Log: All change events will be extracted and appended to the target table.
Note
A primary key is required on the source table for `Copy Table` and `Copy Table with Soft Deletes` transformations. The primary key is used by the pre-built pipeline to merge updates into the target table. If the source table doesn't have a primary key, the transformation type will be updated to `Change Log` for that table.
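Conceptually, the merge that the primary key enables looks something like the sketch below. The table and column names are placeholders, and this is not the pipeline's actual statement, only an illustration of why a key is needed to match change events to existing rows.

```sql
-- Conceptual sketch only: how a primary key lets change events be merged into
-- the target table. All names are placeholders.
MERGE INTO my_target_table t
USING my_staged_changes s
  ON t.order_id = s.order_id
WHEN MATCHED THEN
  UPDATE SET t.status = s.status
WHEN NOT MATCHED THEN
  INSERT (order_id, status) VALUES (s.order_id, s.status);
```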
`append_metadata` = string
Whether to add all metadata columns to the target table, or just the minimum required for the selected `transformation_type`. Options are `Y` or `N`.
`bytes_to_decimal_function` = string
The name of a user-defined function (UDF) that will be created to convert VariableScaleDecimals back to a decimal representation. If no function name is specified, any columns of type VariableScaleDecimal in the Avro files will be created as Variants in Snowflake.
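If you don't supply a function name and these columns are loaded as Variants, the underlying fields can still be inspected directly. A minimal sketch, assuming the Variant keeps the scale and value fields of the Avro VariableScaleDecimal record (the table and column names below are hypothetical):

```sql
-- Hypothetical example: inspecting a VariableScaleDecimal loaded as a Variant.
-- "my_target_table" and "amount" are placeholder names.
SELECT
  amount:scale::INT AS amount_scale,  -- decimal scale recorded in the Avro data
  amount:value      AS amount_value   -- unscaled value, still in its raw form
FROM my_target_table;
```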
`schema_drift_action` = string
If the pipeline detects schema changes in the source table that are not compatible with the current target table, the target table can be altered to accept the new data. Options are `Update Target` or `Fail Job`.
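For context, `Update Target` implies DDL along the lines of the statement below when a new column appears in the source. This is only a sketch of the kind of change involved, with placeholder names; it is not necessarily the exact statement the pipeline issues.

```sql
-- Illustrative only: accepting a new source column in the target table.
ALTER TABLE my_target_table ADD COLUMN new_source_column VARCHAR;
```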
Databricks
`cloud_storage_url` = string
The URL of the cloud storage location that the streaming pipeline is writing to. This should take one of the following formats:
- `s3://<bucket>/<prefix>`
- `azure://<storage_account>.blob.core.windows.net/<container>/<prefix>`
- `gs://<bucket>/<prefix>`
`catalog` = string
The name of the Databricks Unity Catalog containing the schemas. For workspaces that do not have Unity Catalog enabled, this should be set to `hive_metastore`.
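If you are unsure which catalog name to use, you can check from a Databricks SQL editor. This is only a verification step, not part of the pre-built pipeline; for workspaces without Unity Catalog the parameter should simply be `hive_metastore`, as noted above.

```sql
-- List the catalogs visible to your user and pick the one containing your schemas.
SHOW CATALOGS;
```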
`target_schema` = string
The Databricks schema where the working tables will be created. By default, the target tables will also be created in this schema. See the `use_source_schemas` parameter.
`use_source_schemas` = string
Optionally create the target tables in a schema with the same name as the schema containing the source table. If the schema doesn't already exist, the pipeline will try to create it. Options are `Y` or `N`.
`concurrency` = string
Controls whether the files from the source tables are processed table-by-table, or all tables at once. Options are `Concurrent` or `Sequential`.
`stage_prefix` = string
A prefix to add to the names of the working tables.
`target_prefix` = string
A prefix to add to the source table name to generate the target table name. If no prefix is specified, the target table will have the same name as the source table.
`fully_qualify_target_table` = string
Optionally include the source database and schema in the target table name. If `use_source_schemas` = `N`, it is recommended to set this to `Y`, unless you are confident that your source table names will always be unique. Options are `Y` or `N`.
`transformation_type` = string
The type of transformation used when applying the change events to the target table. Available options:
- Copy Table: The target table will be maintained as a copy of the source table.
- Copy Table with Soft Deletes: Same as Copy Table, but records deleted in the source table will be retained in the target table.
- Change Log: All change events will be extracted and appended to the target table.
Note
A primary key is required on the source table for `Copy Table` and `Copy Table with Soft Deletes` transformations. The primary key is used by the pre-built pipeline to merge updates into the target table. If the source table doesn't have a primary key, the transformation type will be updated to `Change Log` for that table.
`append_metadata` = string
Whether to add all metadata columns to the target table, or just the minimum required for the selected `transformation_type`. Options are `Y` or `N`.
`bytes_to_decimal_function` = string
Reserved for future use.
`schema_drift_action` = string
If the pipeline detects schema changes in the source table that are not compatible with the current target table, the target table can be altered to accept the new data. Options are `Update Target`, `Fail Job`, or `Skip`. For non-Unity Catalog workspaces, this must be set to `Skip`.
Set Grid Variables
The following variables can be set in the `Set Grid Variables` parameter.
`primary_key_override` = drop-down
Optionally provide a list of primary key columns for the source tables. By default, the job will read the primary key columns from the change data capture Avro files. However, if the source table does not have a primary key defined in its DDL, a list of unique columns can be specified here to enable `Copy Table` transformations.
Note
The values for `source_database`, `source_schema`, `source_table`, and `source_column` are case sensitive and must match the source database.
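As a hedged illustration, assuming the grid columns match the names in the note above and that each key column is entered on its own row, a two-column key for a single hypothetical source table might look like this (all values are placeholders):

| source_database | source_schema | source_table | source_column |
|---|---|---|---|
| sales_db | public | orders | order_id |
| sales_db | public | orders | order_line_number |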