Manage Pub/Sub Configuration
When Matillion ETL is part of a larger process, the recommended way to start an orchestration job is through the Google Cloud Pub/Sub service. To set this up, first create a topic and a subscription to that topic in the GCP console.
Note
You must enable schedules at the project level before the Listen service can receive messages from the subscription. Open the Project menu, select Manage Schedules, and tick the Schedules are enabled for project checkbox.
Next, open the Manage Pub/Sub Configuration menu via the Project menu in Matillion ETL.
- Enable Pub/Sub: Turns the sub-system that listens to Pub/Sub on or off. Note: This setting is global and applies to every project, not just the current one.
- Credentials: Choose the credentials used to communicate with Pub/Sub. The service account behind these credentials needs permission to read from the subscription and, if success or failure messages are enabled, to publish to the corresponding topics.
- Project: The GCP Project where the subscription exists.
- Subscription: The name of the subscription to listen to for messages. Messages must use the following format:
  {
    "group": "<Exactly matches the project group where your job resides>",
    "project": "<Exactly matches the project name where your job resides>",
    "version": "<Exactly matches the version name>",
    "environment": "<Exactly matches the environment name>",
    "job": "<Exactly matches the orchestration job name>",
    "variables": {
      "<Name 1>": "<Value 1>",
      "<Name 2>": "<Value 2>"
    }
  }
The variables are passed to the orchestration job. Matching variable names must be declared in the project with default values set for each environment. If a variable is passed that is not defined in the project, an error is logged in Project → Task History.
- Enable Success: Enable this to place a message on a Pub/Sub topic when your orchestration job completes successfully.
- Success Topic: The name of the success topic on which to place success messages.
- Compress: When ticked, the message body is gzipped. Use this to stay within the Pub/Sub message size limit (10 MB).
- Enable Failure: Enable this to place a message on a Pub/Sub topic when your orchestration job fails.
- Failure Topic: The name of the failure topic on which to place failure messages.
- Compress: When ticked, the message body is gzipped. Use this to stay within the Pub/Sub message size limit (10 MB).
Click the Test button to check that the topic exists and can be read.
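Because an undeclared variable is only reported as an error in Project → Task History after the message is received, it can help to check a message against the Subscription format before publishing it. The following is a minimal sketch using only the Python standard library. `check_trigger_message` and `ROUTING_FIELDS` are hypothetical names, and the set of declared variables is assumed to be maintained by you; the sketch does not query Matillion ETL for it.

```python
import json

# These routing fields must exactly match names in Matillion ETL.
ROUTING_FIELDS = ("group", "project", "version", "environment", "job")


def check_trigger_message(body: bytes, declared_variables: set) -> list:
    """Return a list of problems found in a trigger message body (empty if none).

    declared_variables is a hypothetical set you maintain yourself; it is
    not read from Matillion ETL.
    """
    message = json.loads(body.decode("utf-8"))
    problems = []
    for field in ROUTING_FIELDS:
        value = message.get(field)
        if not isinstance(value, str) or not value:
            problems.append(f"missing or empty field: {field}")
    for name in message.get("variables") or {}:
        if name not in declared_variables:
            # Matillion ETL would log an error for this in Project -> Task History.
            problems.append(f"undeclared variable: {name}")
    return problems


# Hypothetical names; replace them with your own group, project, and so on.
body = json.dumps({
    "group": "MyGroup",
    "project": "MyProject",
    "version": "default",
    "environment": "Production",
    "job": "SimplePubSubJob",
    "variables": {"load_date": "2024-01-31", "region": "eu"},
}).encode("utf-8")
print(check_trigger_message(body, declared_variables={"load_date"}))
```

Here `region` is reported because it is not in the declared set; an empty list means the message passed both checks.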
Returned messages
The messages that are returned on the success or fail topics are in the following format:
{
  "type": "PUBSUB_ORCHESTRATION",
  "groupName": "<the group name that was executed>",
  "projectName": "<the project name that was executed>",
  "versionName": "<the version name that was executed>",
  "environmentName": "<the environment name that was executed>",
  "state": "SUCCESS|FAILED",
  "enqueuedTime": <time the message was enqueued, in Unix epoch milliseconds>,
  "startTime": <time the orchestration job began, in Unix epoch milliseconds>,
  "endTime": <time the orchestration job ended, in Unix epoch milliseconds>,
  "message": <contains error messages where applicable>,
  "originator ID": "q_Topic",
  "tasks": [ <the list of tasks executed in the orchestration>
    {
      "type": "VALIDATE_ORCHESTRATION",
      "jobName": "SimplePubSubJob",
      "componentName": "Start 0",
      "orchestrationJobName": "SimplePubSubJob",
      "orchestrationComponentName": "Start 0",
      "state": "SUCCESS",
      "rowCount": 0,
      "startTime": 1443526622364,
      "endTime": 1443526622364,
      "message": ""
    },
    {
      "type": "VALIDATE_ORCHESTRATION",
      "jobName": "SimplePubSubJob",
      "componentName": "End Success 0",
      "orchestrationJobName": "SimplePubSubJob",
      "orchestrationComponentName": "End Success 0",
      "state": "SUCCESS",
      "rowCount": 0,
      "startTime": 1443526622365,
      "endTime": 1443526622369,
      "message": ""
    },
    {
      "type": "EXECUTE_ORCHESTRATION",
      "jobName": "SimplePubSubJob",
      "componentName": "End Success 0",
      "orchestrationJobName": "SimplePubSubJob",
      "orchestrationComponentName": "End Success 0",
      "state": "SUCCESS",
      "rowCount": 0,
      "startTime": 1443526622369,
      "endTime": 1443526622369,
      "message": ""
    }
  ],
  "rowCount": 0
}
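A consumer of the success or failure topic needs to undo the optional compression, parse the JSON, and interpret the timestamps. The following is a minimal standard-library sketch of that. It assumes that a message sent with Compress ticked is a plain gzip stream of the UTF-8 JSON shown above (detected here by the gzip magic bytes), and that timestamps are Unix epoch milliseconds, as the 13-digit example task times suggest. `decode_result`, `epoch_ms_to_utc`, and `summarize_result` are hypothetical helper names.

```python
import gzip
import json
from datetime import datetime, timezone

GZIP_MAGIC = b"\x1f\x8b"  # Every gzip stream starts with these two bytes.


def decode_result(data: bytes) -> dict:
    """Decode a success/failure message body, gunzipping it first if compressed."""
    if data[:2] == GZIP_MAGIC:
        data = gzip.decompress(data)
    return json.loads(data.decode("utf-8"))


def epoch_ms_to_utc(ms: int) -> datetime:
    """Convert a Unix epoch timestamp in milliseconds to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)


def summarize_result(result: dict) -> list:
    """Return a header line plus one line per task with its state and duration in ms."""
    lines = [f"{result['projectName']}/{result['environmentName']}: {result['state']}"]
    for task in result.get("tasks", []):
        duration_ms = task["endTime"] - task["startTime"]
        lines.append(
            f"  {task['type']} {task['componentName']}: {task['state']} ({duration_ms} ms)"
        )
    return lines


# Hypothetical sample shaped like the message above, as if sent with Compress ticked.
sample = {
    "type": "PUBSUB_ORCHESTRATION",
    "projectName": "MyProject",
    "environmentName": "Production",
    "state": "SUCCESS",
    "tasks": [
        {"type": "VALIDATE_ORCHESTRATION", "componentName": "End Success 0",
         "state": "SUCCESS", "startTime": 1443526622365, "endTime": 1443526622369},
    ],
}
body = gzip.compress(json.dumps(sample).encode("utf-8"))
for line in summarize_result(decode_result(body)):
    print(line)
```

Checking the magic bytes lets the same consumer handle topics with and without Compress ticked.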
To publish messages onto a Pub/Sub topic, you can adapt this google-cloud-python snippet.
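import os
from google.api_core.exceptions import AlreadyExists
from google.cloud import pubsub

publisher = pubsub.PublisherClient()
topic = publisher.topic_path(
    os.getenv('GOOGLE_CLOUD_PROJECT'),
    'MY_TOPIC_NAME',  # Set this to something appropriate.
)
try:
    publisher.create_topic(name=topic)
except AlreadyExists:
    pass  # The topic already exists, e.g. it was created in the GCP console.

# Keyword arguments become message attributes; result() waits for the message ID.
future = publisher.publish(topic, b'My first message!', spam='eggs')
print(future.result())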
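To actually start a job, the published body must follow the format described under Subscription rather than a placeholder payload. The following is one possible sketch. `publish_job_trigger` is a hypothetical helper. It accepts any publisher object whose `publish(topic, data)` returns a future with a `result(timeout=...)` method, which is how the google-cloud-pubsub `PublisherClient` behaves, so the function can also be exercised without GCP credentials.

```python
import json


def publish_job_trigger(publisher, topic_path, group, project, version,
                        environment, job, variables=None, timeout=60):
    """Hypothetical helper: publish a Matillion ETL job trigger and return its message ID.

    `publisher` is expected to behave like google.cloud.pubsub.PublisherClient:
    publish(topic, data) returns a future whose result() yields the message ID.
    """
    payload = {
        "group": group,
        "project": project,
        "version": version,
        "environment": environment,
        "job": job,
        # Names must be declared in the project, as described above.
        "variables": dict(variables or {}),
    }
    data = json.dumps(payload).encode("utf-8")
    future = publisher.publish(topic_path, data)
    # Block until Pub/Sub acknowledges the publish, or raise after `timeout` seconds.
    return future.result(timeout=timeout)
```

With the real client, you would pass `pubsub.PublisherClient()` and a path from its `topic_path()` method. The topic must be the one that the subscription configured in Matillion ETL is attached to.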