Incremental or high water mark data loading
A common use case is to retrieve regular batches of data from a data source where the source table has a last-updated timestamp or other incrementing key. This makes it easy to retrieve only the records that have been added or updated since the previous load.
In this example, we'll examine one way to perform incremental loads easily using data staging connectors inside the Data Productivity Cloud. The method consists of four pipelines, as follows.
Run these pipelines once to set up your incremental load:
- Initial load: An orchestration pipeline performing an initial load of the entire data set into a table.
- Recent update view: A transformation pipeline to create a view that always contains the "datetime" of the most recent record update.
Run these pipelines to perform the incremental load itself whenever needed (a schedule can be used for regular updates):
- Update table: A transformation pipeline, called by the "incremental load" pipeline, that is responsible for updating the target table.
- Incremental load: The orchestration pipeline that will pull in newly updated data only. This will call the "update table" pipeline. We create this last, as the update table pipeline needs to exist first.
Each of these pipelines is described in detail in the following sections. Note that we are using Salesforce Query for our incremental load as an example, but the advice in this article is equally relevant to any of the Data Productivity Cloud's query components. Similarly, we will be using a DateTime value to judge the recency of a record and will refer to DateTime throughout the example, but a plain numeric incrementing key will work just as well.
Initial load
To set up for our incremental load, we're going to first need to do a full load of the data into a table.
In this example, we'll be using the Salesforce Query component to pull the Knowledge__kav data source into a table called sf_articles.
Run this pipeline to create your table and load the initial data. It's also wise to run the pipeline a second time, targeting a second table that we'll use to stage incremental data loads; this ensures the staging table's metadata matches your target table's. In this example we use sf_articles as our main table and sf_articles_stg as the staging table.
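If it helps to picture the result on the warehouse side, the two tables simply share identical column definitions. The sketch below is illustrative only: the query component creates and populates the tables for you, and the column names shown are assumptions (a real Knowledge__kav extract contains many more fields).

-- Illustrative sketch only: both tables are created by the initial load pipeline.
CREATE TABLE sf_articles (
    Id VARCHAR,
    Title VARCHAR,
    SystemModstamp TIMESTAMP
);
CREATE TABLE sf_articles_stg (
    Id VARCHAR,
    Title VARCHAR,
    SystemModstamp TIMESTAMP   -- column definitions must match sf_articles
);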
Recent update view
Next we need to find the most recent record from the sf_articles table and create a view to track that record's datetime value even if the table data changes.
In a transformation pipeline, use a Table Input component to bring in your table (sf_articles in this example), but select only a datetime column that indicates the record's updated time. In this case, we've chosen the SystemModstamp column.
Now link the Table Input component to an Aggregate component. Add a single entry to the Aggregations property for the chosen column, with the Aggregation Type as "Max". This will result in a single value remaining, which is the most recent update value. You can check this is the case with the Sample tab on the Aggregate component.
Finally, we want to connect this to a Create View component. Name the view something memorable and then run the pipeline to create this view.
We now have a table full of data and a view that has a single record and value: the datetime of that table's most recent update. We have named our view sf_article_recent for this example, and we'll be using it in the Incremental load section.
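For reference, the view this pipeline produces is roughly equivalent to the SQL below. This is a sketch only: the Create View component generates the actual DDL, and the name of the aggregated output column will depend on how your Aggregate component labels it.

-- Sketch of the view created by this transformation pipeline.
CREATE OR REPLACE VIEW sf_article_recent AS
SELECT MAX(SystemModstamp) AS max_systemmodstamp   -- assumed output column name
FROM sf_articles;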
Update table
We'll now create a small transformation pipeline that will update the table. This is simply a Table Input component connected to a Table Update component.
The Table Input component should be pointed to the staging table defined in the Initial load section. Remember that this should match your target table's metadata. In this example, it's pointing at the sf_articles_stg table.
The Table Update component should be pointed to your main table. In this example, it's updating sf_articles.
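Conceptually, this pipeline performs an upsert from the staging table into the main table, keyed on the record's unique identifier. A rough SQL equivalent is sketched below; the key and column names are assumptions, and the exact behaviour depends on how you configure the Table Update component.

-- Sketch of the upsert performed by the update table pipeline (column names assumed).
MERGE INTO sf_articles AS t
USING sf_articles_stg AS s
    ON t.Id = s.Id
WHEN MATCHED THEN
    UPDATE SET Title = s.Title, SystemModstamp = s.SystemModstamp
WHEN NOT MATCHED THEN
    INSERT (Id, Title, SystemModstamp) VALUES (s.Id, s.Title, s.SystemModstamp);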
Incremental load
Now we can design our incremental load pipeline, an orchestration pipeline that will populate our staging table with recently updated rows.
This pipeline uses the same query component we used for our initial load (in this example, Salesforce Query) but with some differences in how we configure it:
- Point the Target Table property at the staging table. In this example it is pointed at sf_articles_stg.
- Connect a Table Iterator component to the Salesforce Query component.
- Set the Basic/Advanced Mode property of the query component to Advanced, so we can use some custom SQL.
- End the pipeline by linking it to a Run Transformation component that points to the update table pipeline previously described.
Pipeline variables
We need to declare a pipeline variable to hold the datetime of our most recently updated record. With the incremental load pipeline open, follow the instructions in Creating variables and create a variable with type "Text" and a suitable name. Provide a default value such as 2024-01-01 10:48:59.0. The actual default value is unimportant, as it won't be used by the pipeline.
Table iterator component
The Table Iterator component holds the key to this setup. The component's Table Name property is set to point at the view we created earlier. Our view has only one record, so we'll only be doing a single iteration. We use the Column Mapping property to map our single column (with single record) to our pipeline variable, so that we can pass its value as a variable to the query component.
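If you query the view directly, you can see exactly what the iterator will hand to the variable. The column name and timestamp below are illustrative values only:

SELECT * FROM sf_article_recent;
-- max_systemmodstamp
-- 2024-05-14 09:12:31.0    <- this single value is mapped to the pipeline variable via Column Mapping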
Query component
Set the Basic/Advanced Mode property of the query component to Advanced. Set the Target Table property to your chosen staging table.
In the SQL Query property, we will write some SQL that loads only those records in the data source whose DateTime value is greater than the value held in our variable. This works for DateTime columns and for other incrementing keys alike. The general form of this SQL statement is:
SELECT * FROM "<data source>" WHERE <DateTime column> > '${<variable>}'
In our example:
- The data source is "Knowledge__kav"
- The DateTime column is "SystemModstamp"
- The pipeline variable is "recent"
So our SQL query is as follows:
SELECT * FROM "Knowledge__kav" WHERE SystemModstamp > '${recent}'
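At runtime, the Data Productivity Cloud substitutes the variable before the query is issued, so the executed query will look something like the following (the timestamp shown is purely illustrative):

SELECT * FROM "Knowledge__kav" WHERE SystemModstamp > '2024-05-14 09:12:31.0'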
Scheduling
After running the above pipelines, we have a full table of initial data and a view that always contains that data's most recent DateTime value. We've harnessed a Table Iterator component to read that DateTime and pass it to a query component that uses it to filter future data loads, so we only get records newer than that. These records are placed into a staging table, which is then used to update our complete data table.
You can of course run this pipeline manually as and when you need to, but typically we'd want to schedule it to run regularly and keep our table up to date automatically. To do this, create a schedule to run the incremental load pipeline.