
Server migration tool (deprecated)

Warning

The server migration tool and this page are deprecated and exist only to support customers on legacy versions of Matillion ETL. If your version is higher than 1.34, read Migration instead.

Note

Your network administrator must make sure that the target Matillion ETL instance allows incoming TCP/IP connections from the source Matillion ETL instance.
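A quick way to verify this connectivity before running the tool is a plain TCP check from the source instance. The sketch below uses only the Python standard library; the hostname and port in the example are placeholders for your target instance.

```python
import socket

def can_connect(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Example (placeholder hostname/port for the target instance):
# can_connect("target-server-ip", 8080)
```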

The server migration tool lets users push all assets from the current (source) Matillion ETL instance to a new (target) instance. The tool runs as a Python script placed on the server you wish to migrate from. Users can filter what is migrated, although everything is migrated by default.

This can be useful in a number of cases:

  • Setting up a new live environment by pushing all assets from a staging environment.
  • Diagnosing an issue by pushing a single project from the live environment to a test environment.
  • Migrating a single-node Matillion ETL setup to a cluster by provisioning a new cluster using CloudFormation and then pushing everything to it from the existing environment.

To use this tool, copy the script below to your server (the source Matillion ETL instance), SSH onto the server, and run the Python script with the target hostname (including port if required) as an argument. It may be necessary to update your Python installation before running the tool.

You will also need API permissions granted to the Matillion ETL user on both the source and target instances.

The server migration tool comes with help that can be accessed through the --help argument. A list of optional arguments is given below.

Argument                          Description
--source-server SOURCE_SERVER     Source Server Hostname (default: http://127.0.0.1:8080)
--skip-drivers                    Skip JDBC Driver migration
--skip-profiles                   Skip API Profile migration
--skip-projects                   Skip Matillion Project migration
--skip-schedules                  Skip Schedule migration (otherwise schedules are migrated but disabled)
--skip-sqssetup                   Skip SQS Listener setup (when migrated, listeners are imported disabled)
--skip-oauth                      Skip OAuth Token migration
--allow-deletes                   Allow deleting same-named resources. Projects are deleted and recreated.
--project-filter PROJECT_FILTER   Regular expression matched against Group-name/Project-name/Version-name; default .* (migrate everything)
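To illustrate how --project-filter is applied: the script matches the regular expression against each Group-name/Project-name/Version-name string using re.match. This minimal sketch mirrors that check; the group, project, and version names in the example are made up for illustration.

```python
import re

def version_matches(project_filter: str, group: str, project: str, version: str) -> bool:
    """Mirror the script's filter: re.match against 'Group/Project/Version'."""
    return re.match(project_filter, '{}/{}/{}'.format(group, project, version)) is not None

# Hypothetical examples: migrate only 'default' versions in the 'Live' group
# version_matches(r'Live/.*/default', 'Live', 'Sales', 'default')  -> matches
# version_matches(r'Live/.*/default', 'Test', 'Sales', 'default')  -> does not match
```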

Usage:

migrate_server.py [-h] [--source-server SOURCE_SERVER] [--skip-drivers] [--skip-profiles] [--skip-projects] [--skip-schedules] [--skip-sqssetup] [--skip-oauth] [--allow-deletes] [--project-filter PROJECT_FILTER] target_server

Example

Below is a generic example of running the server migration tool from an SSH session, including the updates that may be required to run the script.

Note

If you're running the server migration tool from the source or the target server, identify that server using its private IP or 127.0.0.1:8080 rather than its public IP address. Servers may not recognize their own public IP and may throw an error.

  1. Copy the migrate_server.py script.
  2. Save the script to /tmp and run the following three commands, replacing the source and target server IPs with your own:

    sudo yum install python34-pip.noarch
    
    sudo pip-3.4 install requests
    
    sudo /tmp/migrate_server.py --allow-deletes --source-server http://source-server-ip http://target-server-ip
    
  3. You'll need to provide source credentials (ec2-user/instance-id) and target credentials (ec2-user/instance-id). The migrate_server.py script will then proceed to migrate all available assets.

    Note

    Schedules on the new server will be disabled by default.

  4. Disable SQS integration on the old server and shut down the server.

  5. On the new server, enable SQS integration and enable all schedules.
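Schedules arrive disabled because the script flips the enabled flag on every object in the exported schedule JSON before importing it to the target. A small sketch of that transformation (the payload shape follows the script's usage, not a documented schema):

```python
def disable_schedules(export_payload: dict) -> dict:
    """Set enabled=False on every exported schedule object, as the
    migration script does before POSTing to the target server."""
    for obj in export_payload.get('objects', []):
        obj['enabled'] = False
    return export_payload

# Example with a minimal payload
payload = {'objects': [{'name': 'nightly', 'enabled': True}]}
# disable_schedules(payload) leaves 'nightly' present but disabled
```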

migrate_server.py script

#! /usr/bin/env python3
import argparse
import requests
from urllib.parse import urlparse
import getpass
import logging
import re
import json

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

REST_ENDPOINT = '/rest/v1'
LEGACY_ENDPOINT = '/rest/v0'

source_session = requests.Session()
target_session = requests.Session()


def main(args):
    # Setup auth for source_session, if prompted
    response = source_session.get(_rest('/apiprofile', server=args.source_server))
    if response.status_code == requests.status_codes.codes.UNAUTHORIZED:
        user = input('Username for {}: '.format(args.source_server))
        pwd = getpass.getpass('Password: ')
        source_session.auth = (user, pwd)

    # Setup auth for target_session
    response = target_session.get(_rest('/apiprofile', server=args.target_server))
    if response.status_code == requests.status_codes.codes.UNAUTHORIZED:
        user = input('Username for {}: '.format(args.target_server))
        pwd = getpass.getpass('Password: ')
        target_session.auth = (user, pwd)

    # /v1/driver/
    if not args.skip_drivers:
        migrate_drivers(args.source_server, args.target_server, args.allow_deletes)

    # /v1/oauth
    if not args.skip_oauth:
        migrate_oauths(args.source_server, args.target_server, args.allow_deletes)

    # /v1/apiprofile/
    if not args.skip_profiles:
        migrate_apiprofiles(args.source_server, args.target_server)

    # /v0/project API V0 Project Migration
    # /v1/schedule for schedules
    if not args.skip_projects:
        migrate_projects(args.source_server, args.target_server, args.skip_schedules, args.project_filter, args.allow_deletes)

    # /v1/sqslistener
    # Deliberately left till last, since no point listening until jobs are imported
    if not args.skip_sqssetup:
        migrate_sqssetup(args.source_server, args.target_server)


def _rest(end, server, target=REST_ENDPOINT):
    result = urlparse('{}{}{}'.format(server, target, end))
    return result.geturl()


def migrate_schedules(source_server, server, group, project):
    scheds = source_session.get(_rest('/group/name/{}/project/name/{}/schedule'.format(group, project), server=source_server))
    scheds.raise_for_status()
    # each schedule in turn
    for schedule in scheds.json():
        s = source_session.get(_rest('/group/name/{}/project/name/{}/schedule/name/{}/export'.format(group, project, schedule), server=source_server))
        s.raise_for_status()

        # Disable the schedules! We don't want duplicate triggers on old/new servers
        disabled = s.json()
        for o in disabled.get('objects'):
            o['enabled'] = False

        r = target_session.post(_rest('/group/name/{}/project/name/{}/schedule/import'.format(group, project), server=server), json=disabled)
        try:
            r.raise_for_status()
            logging.info('    Migrated Schedule {}/{}/{} [DISABLED]'.format(group, project, schedule))
        except Exception:
            logging.info('    Migrating Schedule {}/{}/{} FAILED'.format(group, project, schedule))


def migrate_projects(source_server, server, skip_schedules, filter, allow_deletes):
    logging.info("Beginning Project Migration")
    # get list of all groups/projects
    projects = source_session.get(_rest('/projects', target=LEGACY_ENDPOINT, server=source_server))
    projects.raise_for_status()

    for project_group in projects.json().get('groups'):
        for project in project_group.get('projects'):
            versions = [version.get('name') for version in project.get('versions').values()]
            for version in versions[:]:
                if not re.match(filter, '{}/{}/{}'.format(project_group.get('projectGroup'), project.get('name'), version)):
                    versions.remove(version)

            if len(versions) == 0:
                # we filtered out all versions in this project; skip it
                continue

            # delete existing project; if exists
            if allow_deletes:
                target = _rest('/projects', target=LEGACY_ENDPOINT, server=server)
                delete = target_session.delete(target, params={'groupName': project_group.get('projectGroup'), 'projectName': project.get('name')})
                if delete.status_code==200:
                    logging.info("  DELETED {}/{} from target server".format(project_group.get('projectGroup'), project.get('name')))
                else:
                    logging.info("  Project {}/{} not found on target server".format(project_group.get('projectGroup'), project.get('name')))

            logging.info('  Migrating project [{}/{}]'.format(project_group.get('projectGroup'), project.get('name')))
            logging.info('    (versions: {})'.format(versions))

            proj_params={
                'groupName'  : project_group.get('projectGroup'),
                'projectName': project.get('name'),
                'versionName': versions,
                'export'     : 'true',
            }

            proj = source_session.get(_rest('/projects', target=LEGACY_ENDPOINT, server=source_server), params=proj_params)
            proj.raise_for_status()

            target = _rest('/projects', target=LEGACY_ENDPOINT, server=server)
            try:
                response = target_session.post(target, json=proj.json())
                response.raise_for_status()
                logging.debug("    Transferred {} bytes".format(len(proj.content)))
                if not skip_schedules:
                     migrate_schedules(source_server, server, project_group.get('projectGroup'), project.get('name'))
            except Exception:
                logging.warning("    Error migrating project. {}".format(response.json().get('msg')))

    logging.info("Completed Projects migration")


def migrate_apiprofiles(source_server, server):
    logging.info("Beginning APIProfile Migration...")
    profiles = source_session.get(_rest('/apiprofile', server=source_server)).json()

    # Built-in profiles already exist on every instance, and the Google
    # Adwords profile is currently broken (EMD-5314), so skip them all
    builtin_profiles = [
        "Google Adwords",  # EMD-5314
        "Matillion API", "Facebook", "Twitter", "Google Analytics",
        "Jira", "Zuora", "Zendesk", "Mixpanel", "Sample",
    ]
    profiles = [p for p in profiles if p not in builtin_profiles]

    # migrate each in turn
    for profile_name in profiles:
        logging.info("    Migrating api profile [{}]".format(profile_name))
        profile = source_session.get(_rest('/apiprofile/name/{}/export'.format(profile_name), server=source_server))
        profile.raise_for_status()

        try:
            response = target_session.post(_rest('/apiprofile/import', server=server), json=profile.json())
            response.raise_for_status()
            logging.debug("      Transferred {} bytes".format(len(profile.content)))
        except Exception:
            logging.warning("      Failed to transfer API Profile: {}".format(response.json().get('msg')))
    logging.info("Completed APIProfile migration")


def migrate_sqssetup(source_server, server):
    logging.info("Beginning SQS Listener Setup Migration...")
    queues = source_session.get(_rest('/queue/export', server=source_server)).json()

    # DISABLE the listeners; don't want old/new servers listening at the same time
    queues['sqsEnabled'] = False
    queues['successEnabled'] = False
    queues['failureEnabled'] = False

    listenQueue = queues.get('listenQueue', None)
    if listenQueue is not None and len(listenQueue)>0:
        target_session.post(_rest('/queue/import', server=server), json=queues)
        logging.debug("  Transferred {} bytes".format(len(json.dumps(queues))))
        logging.info("Completed SQS Listener Setup")
    else:
        logging.info("Skipping SQS Import as no listen queue configured anyway.")


def migrate_oauths(source_server, server, allow_deletes):
    logging.info("Beginning OAuth Token Migration")
    oauths = source_session.get(_rest('/oauth', server=source_server)).json()
    for auth in oauths:
        logging.info("  Migrating OAuth Token(s) [{}]".format(auth))
        source_endpoint = _rest('/oauth/name/{}/export'.format(auth), server=source_server)
        content = source_session.get(source_endpoint).json()

        # delete existing (if exists)
        if allow_deletes:
            d = _rest('/oauth/name/{}'.format(auth), server=server)
            r=target_session.delete(d)

        # upload new
        target_endpoint = _rest('/oauth/import', server=server)
        r = target_session.post(target_endpoint, json=content)
        try:
            r.raise_for_status()
            if r.json().get('success')==False:
                raise Exception('    Migrating OAuth [{}] failed; try --allow-deletes'.format(auth))
        except Exception:
            logging.warning("     Migrating OAuth [{}] failed; try --allow-deletes".format(auth))
        logging.debug("    Transferred {} bytes".format(len(r.content)))
    logging.info("Completed OAuth Token Migration")


def migrate_drivers(source_server, server, allow_deletes):
    logging.info("Beginning JDBC Driver Migration")

    for driver_group in source_session.get(_rest('/driver', server=source_server)).json():
        logging.info("  Migrating Driver [{}]".format(driver_group.get('driverName')))
        # migrate the files
        for file in driver_group.get('files'):
            source_endpoint = _rest('/driver/group/name/{}/file/name/{}/download'.format(driver_group.get('driverName'), file.get('fileName')), server=source_server)
            file_content = source_session.get(source_endpoint)
            file_content.raise_for_status()

            # delete on target, if exists
            if allow_deletes:
                target_delete = _rest('/driver/group/name/{}/file/name/{}'.format(driver_group.get('driverName'), file.get('fileName')), server=server)
                try:
                    r=target_session.delete(target_delete)
                except Exception:
                    pass  # file may not exist on the target; ignore

            # upload new
            target_endpoint = _rest('/driver/group/name/{}/upload'.format(driver_group.get('driverName')), server=server)
            try:
                r = target_session.post(target_endpoint, files={'file': (file.get('fileName'), file_content.content)})
                r.raise_for_status()
                logging.info("    /{}".format(file.get('fileName')))
                logging.debug("     Transferred {} bytes".format(len(file_content.content)))
            except Exception:
                logging.warning("    /{} {}".format(file.get('fileName'), r.json().get('msg')))

    logging.info("Completed JDBC Driver Migration")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Matillion Server Migration Tool")
    parser.add_argument('target_server', help="Target Server Hostname, including protocol (and port if required) e.g. https://mtln-prod.mycompany.com")
    parser.add_argument('--source-server', default="http://127.0.0.1:8080", help="Source Server Hostname (Default: http://127.0.0.1:8080)")
    parser.add_argument('--skip-drivers', action="store_true", help="Skip JDBC Driver migration")
    parser.add_argument('--skip-profiles', action="store_true", help="Skip API Profile migration")
    parser.add_argument('--skip-projects', action="store_true", help="Skip Matillion Project migration")
    parser.add_argument('--skip-schedules', action="store_true", help="Skip Schedule migration (otherwise schedules are migrated but disabled anyway)")
    parser.add_argument('--skip-sqssetup', action="store_true", help="Skip SQS Listener setup (even when included queues are initially disabled)")
    parser.add_argument('--skip-oauth', action="store_true", help="Skip OAuth Token migration")
    parser.add_argument('--allow-deletes', action="store_true", help="Allow deleting same-named resources. Projects are deleted and recreated.")
    parser.add_argument('--project-filter', default=".*", help="Regular Expression to match against Group-name/Project-name/Version-name; default .* (migrate everything)")

    args = parser.parse_args()

    #sanitize user input a bit
    if args.source_server.endswith('/'):
        args.source_server=args.source_server[:-1]
    if args.target_server.endswith('/'):
        args.target_server=args.target_server[:-1]

    main(args)

migrate_server.py --help output

$ ./migrate_server.py --help
usage: migrate_server.py [-h] [--source-server SOURCE_SERVER] [--skip-drivers]
                         [--skip-profiles] [--skip-projects]
                         [--skip-schedules] [--skip-sqssetup] [--skip-oauth]
                         [--allow-deletes] [--project-filter PROJECT_FILTER]
                         target_server

Matillion Server Migration Tool

positional arguments:
  target_server         Target Server Hostname, including protocol (and port
                        if required) e.g. https://mtln-prod.mycompany.com

optional arguments:
  -h, --help            show this help message and exit
  --source-server SOURCE_SERVER
                        Source Server Hostname (Default: http://127.0.0.1:8080)
  --skip-drivers        Skip JDBC Driver migration
  --skip-profiles       Skip API Profile migration
  --skip-projects       Skip Matillion Project migration
  --skip-schedules      Skip Schedule migration (otherwise schedules are
                        migrated but disabled anyway)
  --skip-sqssetup       Skip SQS Listener setup (even when included queues are
                        initially disabled)
  --skip-oauth          Skip OAuth Token migration
  --allow-deletes       Allow deleting same-named resources. Projects are
                        deleted and recreated.
  --project-filter PROJECT_FILTER
                        Regular Expression to match against Group-
                        name/Project-name/Version-name; default .* (migrate
                        everything)