Server migration tool (deprecated)
Warning
The server migration tool and this page are deprecated and exist only to support customers on legacy versions of Matillion ETL. If your version is higher than 1.34, read Migration instead.
Note
Your network administrator must make sure that the target Matillion ETL instance allows incoming TCP/IP connections from the source Matillion ETL instance.
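Before migrating, you can confirm from the source instance that the target accepts TCP connections. This is an illustrative, stdlib-only sketch; the `can_connect` helper and the example host/port are hypothetical and not part of the migration tool:

```python
import socket

def can_connect(host, port, timeout=5):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Example (hypothetical target): run this from the source instance.
# can_connect('target-server-ip', 8080)
```

If this returns False, ask your network administrator to open the relevant port before attempting the migration.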
The server migration tool lets users push all assets from a current (source) Matillion ETL instance to a new (target) instance. The tool runs as a shell script placed on the server you wish to migrate from. Users can filter what is migrated, although everything is migrated by default.
This can be useful in a number of cases:
- Setting up a new live environment by pushing all assets from a staging environment.
- Setting up a test environment by pushing a single project from a live environment to a test environment for diagnosis.
- Migrating a single-node Matillion ETL setup to a cluster by provisioning a new cluster using CloudFormation and then pushing everything to it from the existing environment.
To use this tool, copy the script below to your server (the Matillion ETL instance), SSH onto the server, and run the Python script with the target hostname (including the port, if required) as an argument. You may need to update your Python installation before running the tool.
You will also need API permissions granted to the Matillion ETL user for both source and target instances.
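The tool authenticates against the Matillion REST API with HTTP Basic authentication (the script below prompts for credentials when a request returns 401). As an illustration of what that adds to each request, here is a stdlib-only sketch that builds the `Authorization` header a `requests` session with `session.auth = (user, password)` would send; the credentials are placeholders:

```python
import base64

def basic_auth_header(user, password):
    """Build the HTTP Basic Authorization header value for user/password."""
    token = base64.b64encode('{}:{}'.format(user, password).encode('utf-8'))
    return 'Basic ' + token.decode('ascii')

# Placeholder credentials, following the ec2-user/instance-id pattern used below
header = basic_auth_header('ec2-user', 'i-0123456789abcdef0')
```

Note that Basic authentication only encodes (not encrypts) the credentials, so use HTTPS where possible.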
The server migration tool includes help text, accessible through the `--help` argument. The optional arguments are listed below.
Argument | Description |
---|---|
`--source-server SOURCE_SERVER` | Source server hostname (default: `http://127.0.0.1:8080`) |
`--skip-drivers` | Skip JDBC driver migration |
`--skip-profiles` | Skip API profile migration |
`--skip-projects` | Skip Matillion project migration |
`--skip-schedules` | Skip schedule migration (otherwise schedules are migrated but disabled) |
`--skip-sqssetup` | Skip SQS listener setup (even though included queues are initially disabled) |
`--skip-oauth` | Skip OAuth token migration |
`--allow-deletes` | Allow deleting same-named resources; projects are deleted and recreated |
`--project-filter PROJECT_FILTER` | Regular expression matched against `Group-name/Project-name/Version-name`; default `.*` (migrate everything) |
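The `--project-filter` expression is applied with Python's `re.match` (as in the script below) to strings of the form `Group-name/Project-name/Version-name`. A quick sketch, using hypothetical group, project, and version names, shows which versions a filter would select for migration:

```python
import re

# Hypothetical Group-name/Project-name/Version-name strings
candidates = [
    'Analytics/Sales ETL/live',
    'Analytics/Sales ETL/test',
    'Marketing/Campaigns/live',
]

# Migrate only 'live' versions in the Analytics group (hypothetical filter)
project_filter = r'Analytics/.*/live'
selected = [c for c in candidates if re.match(project_filter, c)]
```

Because `re.match` anchors only at the start of the string, a filter such as `Analytics/` matches everything in that group.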
Usage:
migrate_server.py [-h] [--source-server SOURCE_SERVER] [--skip-drivers] [--skip-profiles] [--skip-projects] [--skip-schedules] [--skip-sqssetup] [--skip-oauth] [--allow-deletes] [--project-filter PROJECT_FILTER] target_server
Example
Below is a generic example of running the server migration tool from an SSH session, including the updates that may be required before running the script.
Note
If you're running the server migration tool from the source or the target server, identify that server as localhost using its private IP or `127.0.0.1:8080` rather than its public IP address. Servers may not recognize their own public IP and may throw an error.
- Copy the migrate.py script.
- Save the script to /tmp and run the following three commands, replacing the source and target server IPs with your own:
sudo yum install python34-pip.noarch
sudo pip-3.4 install requests
sudo /tmp/migrate_server.py --allow-deletes --source-server http://source-server-ip http://target-server-ip
- Provide the source credentials (ec2-user/instance-id) and target credentials (ec2-user/instance-id) when prompted. The migrate.py script will then migrate all available assets.
Note
Schedules on the new server will be disabled by default.
- Disable SQS integration on the old server, then shut down the server.
- On the new server, enable SQS integration and enable all schedules.
migrate.py script
#! /usr/bin/env python3
import argparse
import requests
from urllib.parse import urlparse
import getpass
import logging
import re
import json

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

REST_ENDPOINT = '/rest/v1'
LEGACY_ENDPOINT = '/rest/v0'

source_session = requests.Session()
target_session = requests.Session()


def main(args):
    # Set up auth for source_session, if prompted
    response = source_session.get(_rest('/apiprofile', server=args.source_server))
    if response.status_code == requests.status_codes.codes.UNAUTHORIZED:
        user = input('Username for {}: '.format(args.source_server))
        pwd = getpass.getpass('Password: ')
        source_session.auth = (user, pwd)
    # Set up auth for target_session
    response = target_session.get(_rest('/apiprofile', server=args.target_server))
    if response.status_code == requests.status_codes.codes.UNAUTHORIZED:
        user = input('Username for {}: '.format(args.target_server))
        pwd = getpass.getpass('Password: ')
        target_session.auth = (user, pwd)
    # /v1/driver/
    if not args.skip_drivers:
        migrate_drivers(args.source_server, args.target_server, args.allow_deletes)
    # /v1/oauth
    if not args.skip_oauth:
        migrate_oauths(args.source_server, args.target_server, args.allow_deletes)
    # /v1/apiprofile/
    if not args.skip_profiles:
        migrate_apiprofiles(args.source_server, args.target_server)
    # /v0/project API V0 Project Migration
    # /v1/schedule for schedules
    if not args.skip_projects:
        migrate_projects(args.source_server, args.target_server, args.skip_schedules, args.project_filter, args.allow_deletes)
    # /v1/sqslistener
    # Deliberately left till last, since there is no point listening until jobs are imported
    if not args.skip_sqssetup:
        migrate_sqssetup(args.source_server, args.target_server)


def _rest(end, server, target=REST_ENDPOINT):
    result = urlparse('{}{}{}'.format(server, target, end))
    return result.geturl()


def migrate_schedules(source_server, server, group, project):
    scheds = source_session.get(_rest('/group/name/{}/project/name/{}/schedule'.format(group, project), server=source_server))
    scheds.raise_for_status()
    # Migrate each schedule in turn
    for schedule in scheds.json():
        s = source_session.get(_rest('/group/name/{}/project/name/{}/schedule/name/{}/export'.format(group, project, schedule), server=source_server))
        s.raise_for_status()
        # Disable the schedules! We don't want duplicate triggers on old/new servers
        disabled = s.json()
        for o in disabled.get('objects'):
            o['enabled'] = False
        r = target_session.post(_rest('/group/name/{}/project/name/{}/schedule/import'.format(group, project), server=server), json=disabled)
        try:
            r.raise_for_status()
            logging.info(' Migrated Schedule {}/{}/{} [DISABLED]'.format(group, project, schedule))
        except:
            logging.info(' Migrating Schedule {}/{}/{} FAILED'.format(group, project, schedule))


def migrate_projects(source_server, server, skip_schedules, filter, allow_deletes):
    logging.info("Beginning Project Migration")
    # Get the list of all groups/projects
    projects = source_session.get(_rest('/projects', target=LEGACY_ENDPOINT, server=source_server))
    projects.raise_for_status()
    for project_group in projects.json().get('groups'):
        for project in project_group.get('projects'):
            versions = [version.get('name') for version in project.get('versions').values()]
            for version in versions[:]:
                if not re.match(filter, '{}/{}/{}'.format(project_group.get('projectGroup'), project.get('name'), version)):
                    versions.remove(version)
            if len(versions) == 0:
                # We filtered out all versions in this project; skip it entirely
                break
            # Delete the existing project, if it exists
            if allow_deletes:
                target = _rest('/projects', target=LEGACY_ENDPOINT, server=server)
                delete = target_session.delete(target, params={'groupName': project_group.get('projectGroup'), 'projectName': project.get('name')})
                if delete.status_code == 200:
                    logging.info("  DELETED {}/{} from target server".format(project_group.get('projectGroup'), project.get('name')))
                else:
                    logging.info("  Project {}/{} not found on target server".format(project_group.get('projectGroup'), project.get('name')))
            logging.info(' Migrating project [{}/{}]'.format(project_group.get('projectGroup'), project.get('name')))
            logging.info('  (versions: {})'.format(versions))
            proj_params = {
                'groupName': project_group.get('projectGroup'),
                'projectName': project.get('name'),
                'versionName': versions,
                'export': 'true',
            }
            proj = source_session.get(_rest('/projects', target=LEGACY_ENDPOINT, server=source_server), params=proj_params)
            proj.raise_for_status()
            target = _rest('/projects', target=LEGACY_ENDPOINT, server=server)
            try:
                response = target_session.post(target, json=proj.json())
                response.raise_for_status()
                logging.debug("  Transferred {} bytes".format(len(proj.content)))
                if not skip_schedules:
                    migrate_schedules(source_server, server, project_group.get('projectGroup'), project.get('name'))
            except:
                logging.warning("  Error migrating project. {}".format(response.json().get('msg')))
    logging.info("Completed Projects migration")


def migrate_apiprofiles(source_server, server):
    logging.info("Beginning APIProfile Migration...")
    profiles = source_session.get(_rest('/apiprofile', server=source_server)).json()
    # Skip the built-in profiles (currently broken, EMD-5314)
    profiles.remove("Google Adwords")
    profiles.remove("Matillion API")
    profiles.remove("Facebook")
    profiles.remove("Twitter")
    profiles.remove("Google Analytics")
    profiles.remove("Jira")
    profiles.remove("Zuora")
    profiles.remove("Zendesk")
    profiles.remove("Mixpanel")
    profiles.remove("Sample")
    # Migrate each profile in turn
    for profile_name in profiles:
        logging.info(" Migrating api profile [{}]".format(profile_name))
        profile = source_session.get(_rest('/apiprofile/name/{}/export'.format(profile_name), server=source_server))
        profile.raise_for_status()
        try:
            response = target_session.post(_rest('/apiprofile/import', server=server), json=profile.json())
            response.raise_for_status()
            logging.debug("  Transferred {} bytes".format(len(profile.content)))
        except:
            logging.warning("  Failed to transfer API Profile: {}".format(response.json().get('msg')))
    logging.info("Completed APIProfile migration")


def migrate_sqssetup(source_server, server):
    logging.info("Beginning SQS Listener Setup Migration...")
    queues = source_session.get(_rest('/queue/export', server=source_server)).json()
    # DISABLE the listeners; we don't want old/new servers listening at the same time
    queues['sqsEnabled'] = False
    queues['successEnabled'] = False
    queues['failureEnabled'] = False
    listenQueue = queues.get('listenQueue', None)
    if listenQueue is not None and len(listenQueue) > 0:
        target_session.post(_rest('/queue/import', server=server), json=queues)
        logging.debug("  Transferred {} bytes".format(len(json.dumps(queues))))
        logging.info("Completed SQS Listener Setup")
    else:
        logging.info("Skipping SQS Import as no listen queue configured anyway.")


def migrate_oauths(source_server, server, allow_deletes):
    logging.info("Beginning OAuth Token Migration")
    oauths = source_session.get(_rest('/oauth', server=source_server)).json()
    for auth in oauths:
        logging.info(" Migrating OAuth Token(s) [{}]".format(auth))
        source_endpoint = _rest('/oauth/name/{}/export'.format(auth), server=source_server)
        content = source_session.get(source_endpoint).json()
        # Delete the existing token, if it exists
        if allow_deletes:
            d = _rest('/oauth/name/{}'.format(auth), server=server)
            r = target_session.delete(d)
        # Upload the new token
        target_endpoint = _rest('/oauth/import', server=server)
        r = target_session.post(target_endpoint, json=content)
        try:
            r.raise_for_status()
            if r.json().get('success') == False:
                raise Exception(' Migrating OAuth [{}] failed; try --allow-deletes'.format(auth))
        except:
            logging.warning(" Migrating OAuth [{}] failed; try --allow-deletes".format(auth))
        logging.debug("  Transferred {} bytes".format(len(r.content)))
    logging.info("Completed OAuth Token Migration")


def migrate_drivers(source_server, server, allow_deletes):
    logging.info("Beginning JDBC Driver Migration")
    for driver_group in source_session.get(_rest('/driver', server=source_server)).json():
        logging.info(" Migrating Driver [{}]".format(driver_group.get('driverName')))
        # Migrate the files
        for file in driver_group.get('files'):
            source_endpoint = _rest('/driver/group/name/{}/file/name/{}/download'.format(driver_group.get('driverName'), file.get('fileName')), server=source_server)
            file_content = source_session.get(source_endpoint)
            file_content.raise_for_status()
            # Delete on the target, if it exists
            if allow_deletes:
                target_delete = _rest('/driver/group/name/{}/file/name/{}'.format(driver_group.get('driverName'), file.get('fileName')), server=server)
                try:
                    r = target_session.delete(target_delete)
                except:
                    pass
            # Upload the new file
            target_endpoint = _rest('/driver/group/name/{}/upload'.format(driver_group.get('driverName')), server=server)
            try:
                r = target_session.post(target_endpoint, files={'file': (file.get('fileName'), file_content.content)})
                r.raise_for_status()
                logging.info("  /{}".format(file.get('fileName')))
                logging.debug("  Transferred {} bytes".format(len(file_content.content)))
            except:
                logging.warning("  /{} {}".format(file.get('fileName'), r.json().get('msg')))
    logging.info("Completed JDBC Driver Migration")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Matillion Server Migration Tool")
    parser.add_argument('target_server', help="Target Server Hostname, including protocol (and port if required) e.g. https://mtln-prod.mycompany.com")
    parser.add_argument('--source-server', default="http://127.0.0.1:8080", help="Source Server Hostname (Default: http://127.0.0.1:8080)")
    parser.add_argument('--skip-drivers', action="store_true", help="Skip JDBC Driver migration")
    parser.add_argument('--skip-profiles', action="store_true", help="Skip API Profile migration")
    parser.add_argument('--skip-projects', action="store_true", help="Skip Matillion Project migration")
    parser.add_argument('--skip-schedules', action="store_true", help="Skip Schedule migration (otherwise schedules are migrated but disabled anyway)")
    parser.add_argument('--skip-sqssetup', action="store_true", help="Skip SQS Listener setup (even when included queues are initially disabled)")
    parser.add_argument('--skip-oauth', action="store_true", help="Skip OAuth Token migration")
    parser.add_argument('--allow-deletes', action="store_true", help="Allow deleting same-named resources. Projects are deleted and recreated.")
    parser.add_argument('--project-filter', default=".*", help="Regular Expression to match against Group-name/Project-name/Version-name; default .* (migrate everything)")
    args = parser.parse_args()
    # Sanitize user input a bit: strip any trailing slash from the hostnames
    if args.source_server.endswith('/'):
        args.source_server = args.source_server[:-1]
    if args.target_server.endswith('/'):
        args.target_server = args.target_server[:-1]
    main(args)
migrate_server.py help output
$ ./migrate_server.py --help
usage: migrate_server.py [-h] [--source-server SOURCE_SERVER] [--skip-drivers]
                         [--skip-profiles] [--skip-projects]
                         [--skip-schedules] [--skip-sqssetup] [--skip-oauth]
                         [--allow-deletes] [--project-filter PROJECT_FILTER]
                         target_server

Matillion Server Migration Tool

positional arguments:
  target_server         Target Server Hostname, including protocol (and port
                        if required) e.g. https://mtln-prod.mycompany.com

optional arguments:
  -h, --help            show this help message and exit
  --source-server SOURCE_SERVER
                        Source Server Hostname (Default: http://127.0.0.1:8080)
  --skip-drivers        Skip JDBC Driver migration
  --skip-profiles       Skip API Profile migration
  --skip-projects       Skip Matillion Project migration
  --skip-schedules      Skip Schedule migration (otherwise schedules are
                        migrated but disabled anyway)
  --skip-sqssetup       Skip SQS Listener setup (even when included queues are
                        initially disabled)
  --skip-oauth          Skip OAuth Token migration
  --allow-deletes       Allow deleting same-named resources. Projects are
                        deleted and recreated.
  --project-filter PROJECT_FILTER
                        Regular Expression to match against Group-
                        name/Project-name/Version-name; default .* (migrate
                        everything)