Skip to content

Commit

Permalink
Simplify database migration. Migration is automatic and does not rely…
Browse files Browse the repository at this point in the history
… on a version number.

Ensure working HTTPretty package
  • Loading branch information
Stephen Pascoe authored and Stephen Pascoe committed Dec 16, 2015
1 parent 8b3988c commit 66ff3c5
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 137 deletions.
22 changes: 19 additions & 3 deletions luigi/db_task_history.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import sqlalchemy.ext.declarative
import sqlalchemy.orm
import sqlalchemy.orm.collections
from sqlalchemy.engine import reflection
Base = sqlalchemy.ext.declarative.declarative_base()

logger = logging.getLogger('luigi-interface')
Expand All @@ -59,6 +60,8 @@ class DbTaskHistory(task_history.TaskHistory):
Task History that writes to a database using sqlalchemy.
Also has methods for useful db queries.
"""
CURRENT_SOURCE_VERSION = 1

@contextmanager
def _session(self, session=None):
if session:
Expand All @@ -81,6 +84,8 @@ def __init__(self):
Base.metadata.create_all(self.engine)
self.tasks = {} # task_id -> TaskRecord

_upgrade_schema(self.engine)

def task_scheduled(self, task):
htask = self._get_task(task, status=PENDING)
self._add_task_event(htask, TaskEvent(event_name=PENDING, ts=datetime.datetime.now()))
Expand Down Expand Up @@ -235,6 +240,17 @@ def __repr__(self):
return "TaskRecord(name=%s, host=%s)" % (self.name, self.host)


version_table = sqlalchemy.Table('version', Base.metadata,
sqlalchemy.Column('version', sqlalchemy.Integer)
)
def _upgrade_schema(engine):
"""
Ensure the database schema is up to date with the codebase.
:param engine: SQLAlchemy engine of the underlying database.
"""
inspector = reflection.Inspector.from_engine(engine)
conn = engine.connect()

# Upgrade 1. Add task_id column and index to tasks
if 'task_id' not in [x['name'] for x in inspector.get_columns('tasks')]:
logger.warn('Upgrading DbTaskHistory schema: Adding tasks.task_id')
conn.execute('ALTER TABLE tasks ADD COLUMN task_id VARCHAR(200)')
conn.execute('CREATE INDEX ix_task_id ON tasks (task_id)')
133 changes: 0 additions & 133 deletions luigi/tools/migrate.py

This file was deleted.

2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ install_command = pip install {opts} --allow-external mysql-connector-python {pa
deps=
mock<2.0
moto<1.0
HTTPretty>=0.8.8
HTTPretty==0.8.10
nose<2.0
unittest2<2.0
boto<3.0
Expand Down

0 comments on commit 66ff3c5

Please sign in to comment.