Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added extension point to manage database migrations #292

Merged
merged 8 commits into from
Mar 26, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@
"build:web": "webpack --env prod",
"predist:python": "npm run build:python && npm run docs:python",
"prebuild:python": "node -e \"process.exit(process.env.PHOVEA_SKIP_TESTS === undefined?1:0)\" || npm run test:python",
"prebuild:web": "node -e \"process.exit(process.env.PHOVEA_SKIP_TESTS === undefined?1:0)\" || npm run test:web"
"prebuild:web": "node -e \"process.exit(process.env.PHOVEA_SKIP_TESTS === undefined?1:0)\" || npm run test:web",
"db-migration:base": "docker-compose run api python phovea_server/__main__.py --env dev db-migration",
"db-migration": "npm run db-migration:base -- exec",
"db-migration:list": "npm run db-migration:base -- list"
},
"dependencies": {
"@types/d3": "~3.5.36",
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ urllib3==1.25.3
flask-swagger-ui==3.20.9
yamlreader==3.0.4
openpyxl~=2.6.3
alembic==1.3.3
10 changes: 10 additions & 0 deletions tdp_core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ def phovea(registry):
registry.append('mapping_provider', 'tdp_core', 'tdp_core.mapping_table')
registry.append('greenifier', 'psycopg2', 'tdp_core.sql_use_gevent', {})
registry.append('json-encoder', 'bytes-to-string-encoder', 'tdp_core.bytes_to_string_encoder', {})

# DB migration plugins
registry.append('manager', 'db-migration-manager', 'tdp_core.dbmigration', {'singleton': True, 'factory': 'create_migration_manager'})
registry.append('command', 'db-migration', 'tdp_core.dbmigration', {'factory': 'create_migration_command'})
registry.append('json-encoder', 'db-migration-encoder', 'tdp_core.dbmigration_api', {'factory': 'create_migration_encoder'})
registry.append('namespace', 'db-migration-api', 'tdp_core.dbmigration_api',
{
'factory': 'create_migration_api',
'namespace': '/api/tdp/db-migration'
})
# generator-phovea:end
pass

Expand Down
40 changes: 40 additions & 0 deletions tdp_core/dbmigration.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# A generic, single database configuration.

[alembic]
# path to migration scripts
# script_location =

# template used to generate migration files
file_template = %%(year)d%%(month).2d%%(day).2d_%%(hour).2d%%(minute).2d_%%(rev)s_%%(slug)s

# timezone to use when rendering the date
# within the migration file as well as the filename.
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =

# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40

# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false

# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false

# version location specification; this defaults
# to alembic/versions. When using multiple version
# directories, initial revisions must be specified with --version-path
# version_locations = %(here)s/bar %(here)s/bat alembic/versions

# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8

# The url is injected at runtime by DBMigration.execute (see dbmigration.py)
# sqlalchemy.url =

238 changes: 238 additions & 0 deletions tdp_core/dbmigration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
import logging
import re
from argparse import REMAINDER
from os import path
from typing import Dict, List, Optional

import alembic.command
import alembic.config
from phovea_server.config import view as configview
from phovea_server.plugin import list as list_plugins, lookup, AExtensionDesc

from .db import configs as engines


__author__ = 'Datavisyn'
# Module-level logger used by the classes and helpers in this module
_log = logging.getLogger(__name__)

# Single Alembic configuration object, loaded from the dbmigration.ini next to this file.
# It is shared by every DBMigration: DBMigration.execute overwrites its cmd_opts as well as the
# script_location, sqlalchemy.url and migration_id main options before running a command.
alembic_cfg = alembic.config.Config(path.join(path.abspath(path.dirname(__file__)), 'dbmigration.ini'))


class DBMigration(object):
    """
    DBMigration object stores the required arguments to execute commands using Alembic.
    """

    def __init__(self, id: str, db_key: str, script_location: str, auto_upgrade: bool = False):
        """
        Initializes a new migration object and optionally carries out an upgrade.
        :param str id: ID of the migration object
        :param str db_key: Key of the engine (coming from tdp_core#db)
        :param str script_location: Location of the base directory (containing env.py and the versions directory)
        :param bool auto_upgrade: True if the migration should automatically upgrade the database to head
        :raises ValueError: If id, script_location or db_key is empty
        """
        # NOTE: attribute order is kept stable because __dict__ is exposed by the JSON encoder
        self.id: str = id
        self.db_key: str = db_key
        self.script_location: str = script_location
        self.auto_upgrade: bool = auto_upgrade
        self.custom_commands: Dict[str, str] = dict()

        # Collect the configuration names (not the attribute names) of all empty fields
        required = (('id', self.id), ('scriptLocation', self.script_location), ('dbKey', self.db_key))
        missing_fields = [name for name, value in required if not value]
        if missing_fields:
            raise ValueError('No {} defined for DBMigration {} - is your configuration up to date?'.format(', '.join(missing_fields), self.id or '<UNKNOWN>'))

        # Because we can't easily pass "-1" as npm argument, we add a custom command for that without the space
        self.add_custom_command(r'downgrade-(\d+)', 'downgrade -{}')

        # Automatically upgrade to head (if enabled)
        if self.auto_upgrade:
            _log.info('Upgrading database %s', self.id)
            self.execute(['upgrade', 'head'])

    def __repr__(self) -> str:
        return f'DBMigration({self.id})'

    def __str__(self) -> str:
        return self.id

    def add_custom_command(self, pattern: str, target: str):
        r"""
        Adds a custom command to the migration.

        :param str pattern: Regex pattern of the command. It has to match the whole (space-joined) command.
                            Can include capture groups which will be used to format the target string.
        :param str target: Target pattern for the command. Can include .format placeholders such as {} or {0} which will be replaced by the captured group.

        Example usage: Rewriting the command 'downgrade-<number>' to 'downgrade -<number>'
        can be done with the pattern r'downgrade-(\d+)' and the target 'downgrade -{}'.
        """
        self.custom_commands[pattern] = target

    def remove_custom_command(self, origin: str):
        """
        Removes a custom command; unknown patterns are ignored.
        :param str origin: Regex pattern the command was registered with.
        """
        self.custom_commands.pop(origin, None)

    def get_custom_command(self, arguments: List[str]) -> Optional[List[str]]:
        """
        Returns the rewritten command if it matches the pattern of a custom command, otherwise None.
        :param List[str] arguments: Arguments to rewrite.
        """
        # Patterns are matched against the space-joined command line
        joined = ' '.join(arguments)
        for pattern, target in self.custom_commands.items():
            # fullmatch anchors the complete pattern, so alternations like 'a|b' cannot match a mere prefix
            matched = re.fullmatch(pattern, joined)
            if matched:
                # Fill the captured groups into the target and split it back into an argument list
                return target.format(*matched.groups()).split(' ')
        return None

    def execute(self, arguments: Optional[List[str]] = None) -> bool:
        """
        Executes a command on the migration object.
        :param List[str] arguments: Arguments for the underlying Alembic instance. See https://alembic.sqlalchemy.org/en/latest/api/ for details.
                                    None is treated like an empty list.
        :raises ValueError: If no engine is registered for the db_key of this migration.
        :return: True once the command was run.

        Example usage: migration.execute(['upgrade', 'head']) upgrades the database to head.
        """
        # Check if engine exists
        if self.db_key not in engines:
            raise ValueError('No engine called {} found for DBMigration {} - aborting migration'.format(self.db_key, self.id))

        # Normalize to a fresh list so None is accepted and the caller's list is never aliased
        arguments = list(arguments) if arguments else []

        # Rewrite command if possible
        rewritten_arguments = self.get_custom_command(arguments)
        if rewritten_arguments:
            _log.info('Command %s was rewritten to %s', ' '.join(arguments), ' '.join(rewritten_arguments))
            arguments = rewritten_arguments

        # Setup an alembic command line parser to parse the arguments
        cmd_parser = alembic.config.CommandLine()

        # Parse the options (incl. validation)
        options = cmd_parser.parser.parse_args(arguments)

        # Without a sub-command argparse sets no 'cmd' attribute and run_cmd would fail on it;
        # report the usage error instead (mirrors the check in alembic's CommandLine.main).
        if not hasattr(options, 'cmd'):
            cmd_parser.parser.error('too few arguments')

        # Retrieve engine
        engine = engines.engine(self.db_key)

        # Inject options in the (module-wide) configuration object
        alembic_cfg.cmd_opts = options
        alembic_cfg.set_main_option('script_location', self.script_location)
        alembic_cfg.set_main_option('sqlalchemy.url', str(engine.url))
        alembic_cfg.set_main_option('migration_id', self.id)

        # Run the command
        cmd_parser.run_cmd(alembic_cfg, options)

        return True


class DBMigrationManager(object):
    """
    DBMigrationManager creates one DBMigration object per plugin it is given
    (see create_migration_manager, which passes the 'tdp-sql-database-migration' plugins).
    The possible configuration keys for this extension point are:
    - configKey: Key of the configuration entry (i.e. <app_name>.migration)
    - id: ID of the migration for logging purposes (passed to DBMigration)
    - dbKey: Key of the engine used for the migration (passed to DBMigration)
    - scriptLocation: Location of the alembic root folder (passed to DBMigration)
    - autoUpgrade: Flag which auto-upgrades to the latest revision (passed to DBMigration)

    The keys are retrieved from the following sources (in order):
    - File configuration at configKey
    - Plugin configuration
    """

    def __init__(self, plugins: Optional[List[AExtensionDesc]] = None):
        """
        :param plugins: Plugin descriptions to create migrations for; None is treated as no plugins.
        :raises ValueError: Propagated from DBMigration if a required key is missing.
        """
        self._migrations: Dict[str, DBMigration] = dict()

        _log.info('Initializing DBMigrationManager')

        for p in plugins or []:
            _log.info('DBMigration found: %s', p.id)

            # Check if configKey is set, otherwise use the plugin configuration
            config = configview(p.configKey) if hasattr(p, 'configKey') else {}

            # Priority of assignments: Configuration File -> Plugin Definition
            migration_id = config.get('id') or getattr(p, 'id', None)
            db_key = config.get('dbKey') or getattr(p, 'dbKey', None)
            script_location = config.get('scriptLocation') or getattr(p, 'scriptLocation', None)

            # autoUpgrade is only honoured if it is a real boolean; anything else falls through
            auto_upgrade = config.get('autoUpgrade')
            if not isinstance(auto_upgrade, bool):
                auto_upgrade = getattr(p, 'autoUpgrade', False)
            if not isinstance(auto_upgrade, bool):
                auto_upgrade = False

            # Create new migration
            migration = DBMigration(migration_id, db_key, script_location, auto_upgrade)

            # Store migration (a later plugin with the same id replaces an earlier one)
            self._migrations[migration.id] = migration

    def __contains__(self, item):
        return item in self._migrations

    def __getitem__(self, item):
        # NotImplementedError is kept for backward compatibility with existing callers
        if item not in self:
            raise NotImplementedError('missing db migration: {}'.format(item))
        return self._migrations[item]

    def __len__(self):
        return len(self._migrations)

    @property
    def ids(self) -> List[str]:
        """IDs of all registered migrations."""
        return list(self._migrations.keys())

    @property
    def migrations(self) -> List[DBMigration]:
        """All registered DBMigration objects."""
        return list(self._migrations.values())


def get_db_migration_manager():
    """Return whatever the plugin lookup yields for the id 'db-migration-manager'."""
    manager = lookup('db-migration-manager')
    return manager


def create_migration_manager():
    """Build a DBMigrationManager from the plugins listed for 'tdp-sql-database-migration'."""
    migration_plugins = list_plugins('tdp-sql-database-migration')
    return DBMigrationManager(migration_plugins)


def create_migration_command(parser):
    """
    Creates a migration command used by the 'command' extension point.

    Registers the sub-commands 'list' and 'exec <id> <command...>' on the given parser.
    :param parser: argparse parser the sub-commands are added to.
    :return: Function that takes the parsed args and returns a zero-argument callable running the command.
    """
    db_migration_manager = get_db_migration_manager()

    subparsers = parser.add_subparsers(dest='action', required=True)

    subparsers.add_parser('list', help='List all available migrations')

    command_parser = subparsers.add_parser('exec', help='Execute command on migration(s)')

    # Either require individual ids or all flag
    command_parser.add_argument('id',
                                choices=db_migration_manager.ids + ['all'],
                                help='ID of the migration, or all of them')

    command_parser.add_argument('command',
                                nargs=REMAINDER,
                                help='Command executed by the migration')

    def execute(args):
        if args.action == 'list':
            if len(db_migration_manager) == 0:
                print('No migrations found')
            else:
                print('Available migrations: {}'.format(', '.join(str(migration) for migration in db_migration_manager.migrations)))
            return

        if args.action != 'exec':
            return

        if args.id == 'all':
            # TODO: For some reason, the migrations can only be executed for a single id.
            # When using multiple ids, alembic doesn't do anything in the 2nd, 3rd, ... migration.
            print('Currently, only single migrations are supported. Please execute the command for each migration individually as we are working on a fix.')
            return

        # Using REMAINDER as nargs causes the argument to be optional, but '+' does not work because it also
        # parses additional --attr with the parser which should actually be ignored.
        # Therefore, args.command might be empty: report that as a regular usage error instead of
        # handing None to the migration (which fails with a TypeError when the arguments are joined).
        if not args.command:
            command_parser.error('the following arguments are required: command')

        db_migration_manager[args.id].execute(args.command)

    return lambda args: lambda: execute(args)
56 changes: 56 additions & 0 deletions tdp_core/dbmigration_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import logging

from phovea_server.ns import Namespace, abort
from .security import tdp_login_required
from phovea_server.util import jsonify
from .dbmigration import get_db_migration_manager, DBMigration, DBMigrationManager


__author__ = 'Datavisyn'
_log = logging.getLogger(__name__)

# Namespace the routes in this module are registered on; returned by create_migration_api()
app = Namespace(__name__)


class DBMigrationEncoder(object):
    """
    Encoder turning DBMigrationManager and DBMigration instances into JSON-friendly values.
    """

    def __contains__(self, obj):
        # The encoder is responsible for exactly these two types
        return isinstance(obj, (DBMigrationManager, DBMigration))

    def __call__(self, obj, base_encoder):
        # A manager is represented by the list of its migrations
        if isinstance(obj, DBMigrationManager):
            return obj.migrations
        # A single migration is represented by its attribute dictionary
        if isinstance(obj, DBMigration):
            return vars(obj)
        # Anything else is not handled here
        return None


# Module-wide encoder instance shared by all callers of the factory below
db_migration_encoder = DBMigrationEncoder()


def create_migration_encoder():
    """Return the module-wide DBMigrationEncoder instance."""
    return db_migration_encoder


def _get_migration_by_id(id: str) -> DBMigration:
    """Return the migration registered under ``id``; calls abort(404, ...) if there is none."""
    is_known = id in get_db_migration_manager()
    if not is_known:
        abort(404, 'No migration with id {} found'.format(id))
    migration = get_db_migration_manager()[id]
    return migration


@app.route('/')
@tdp_login_required
def list_migrations():
    """Respond with the JSON form of the migration manager and status 200."""
    manager = get_db_migration_manager()
    return jsonify(manager), 200


@app.route('/<string:id>')
@tdp_login_required
def list_migration(id):
    """Respond with the JSON form of the migration identified by ``id`` and status 200."""
    migration = _get_migration_by_id(id)
    return jsonify(migration), 200


def create_migration_api():
    """Return the module-level namespace ``app`` carrying the migration routes."""
    return app
Loading