Skip to content

Commit

Permalink
Add workflow invocation grabbing with db-skip-locked
Browse files Browse the repository at this point in the history
and db-transaction-isolation.
Closes galaxyproject#8209.
  • Loading branch information
mvdbeek committed Sep 18, 2020
1 parent aace792 commit 62dbf67
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 64 deletions.
137 changes: 78 additions & 59 deletions lib/galaxy/jobs/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,71 @@ def shutdown(self):
self.job_stop_queue.shutdown()


class ItemGrabber:
    """
    Claims unassigned items of one model class (e.g. ``Job`` or ``WorkflowInvocation``) for this server.

    An ``UPDATE ... RETURNING`` statement is built once at construction time. Each call to
    :meth:`grab_unhandled_items` executes it, setting the ``handler`` column of matching items in the ``NEW``
    state to this server's name.
    """

    # SQLSTATE that PostgreSQL reports for a serialization failure. SQLSTATEs are five-character *strings*
    # that may contain letters (e.g. '40P01'), so they must never be converted with int().
    _SERIALIZATION_FAILURE_PGCODE = '40001'

    def __init__(self, app, grab_type='Job', handler_assignment_method=None, max_grab=None, self_handler_tags=None, handler_tags=None):
        """
        Build the grab statement and the connection options used to execute it.

        :param app: application object; ``app.model.context`` and ``app.config.server_name`` are read.
        :param grab_type: name of the class on ``model`` whose rows are grabbed.
        :param handler_assignment_method: ``DB_SKIP_LOCKED`` adds ``FOR UPDATE SKIP LOCKED`` to the candidate
            subquery; ``DB_TRANSACTION_ISOLATION`` runs the statement at ``SERIALIZABLE`` isolation.
        :param max_grab: if truthy, the maximum number of items claimed per execution.
        :param self_handler_tags: ``handler`` values that mark an item as claimable by this server.
        :param handler_tags: tags reported in the initialization log message only.
        """
        self.app = app
        self.sa_session = app.model.context
        self.grab_this = getattr(model, grab_type)
        self.grab_type = grab_type
        self._grab_conn_opts = {'autocommit': False}
        # Candidate rows: still NEW and tagged for this handler, oldest id first.
        subq = select([self.grab_this.id]) \
            .where(and_(
                self.grab_this.table.c.handler.in_(self_handler_tags),
                self.grab_this.table.c.state == self.grab_this.states.NEW)) \
            .order_by(self.grab_this.table.c.id)
        if max_grab:
            subq = subq.limit(max_grab)
        if handler_assignment_method == HANDLER_ASSIGNMENT_METHODS.DB_SKIP_LOCKED:
            subq = subq.with_for_update(skip_locked=True)
        self._grab_query = self.grab_this.table.update() \
            .returning(self.grab_this.table.c.id) \
            .where(self.grab_this.table.c.id.in_(subq)) \
            .values(handler=self.app.config.server_name)
        if handler_assignment_method == HANDLER_ASSIGNMENT_METHODS.DB_TRANSACTION_ISOLATION:
            self._grab_conn_opts['isolation_level'] = 'SERIALIZABLE'
        # `handler_tags` defaults to None; guard so that logging cannot raise a TypeError.
        log.info(
            "Handler job grabber initialized with '%s' assignment method for handler '%s', tag(s): %s", handler_assignment_method,
            self.app.config.server_name, ', '.join(str(x) for x in (handler_tags or ()))
        )

    @staticmethod
    def get_grabbable_handler_assignment_method(handler_assignment_methods):
        """
        Return the first entry of ``handler_assignment_methods`` that is a DB-based ("grabbable") assignment
        method, or ``None`` if there is none.
        """
        grabbable_methods = {
            HANDLER_ASSIGNMENT_METHODS.DB_TRANSACTION_ISOLATION,
            HANDLER_ASSIGNMENT_METHODS.DB_SKIP_LOCKED,
        }
        return next((m for m in handler_assignment_methods if m in grabbable_methods), None)

    @staticmethod
    def _is_serialization_failure(exc):
        """
        Return ``True`` if ``exc.orig.pgcode`` is the SQLSTATE for a serialization failure.

        A missing ``orig``/``pgcode`` attribute or a ``pgcode`` of ``None`` yields ``False``. The comparison is
        done on the string form so that alphanumeric SQLSTATEs do not raise.
        """
        pgcode = getattr(getattr(exc, 'orig', None), 'pgcode', None)
        return pgcode is not None and str(pgcode) == ItemGrabber._SERIALIZATION_FAILURE_PGCODE

    def grab_unhandled_items(self):
        """
        Attempts to assign unassigned jobs or invocations to itself using DB serialization methods, if enabled. This
        simply sets `Job.handler` or `WorkflowInvocation.handler` to the current server name, which causes the job to be picked up by
        the appropriate handler.
        """
        # an excellent discussion on PostgreSQL concurrency safety:
        # https://blog.2ndquadrant.com/what-is-select-skip-locked-for-in-postgresql-9-5/
        self.sa_session.expunge_all()
        conn = self.sa_session.connection(execution_options=self._grab_conn_opts)
        with conn.begin() as trans:
            try:
                rows = conn.execute(self._grab_query).fetchall()
                if rows:
                    log.debug('Grabbed %s(s): %s', self.grab_type, ', '.join(str(row[0]) for row in rows))
                    trans.commit()
                else:
                    # Nothing was claimed, so there is nothing to commit.
                    trans.rollback()
            except OperationalError as e:
                # If this is a serialization failure on PostgreSQL, then e.orig is a psycopg2 TransactionRollbackError
                # and should have attribute `pgcode`. Other engines should just report the message and move on.
                if not self._is_serialization_failure(e):
                    log.debug('Grabbing %s failed (serialization failures are ok): %s', self.grab_type, unicodify(e))
                trans.rollback()


class JobHandlerQueue(Monitors):
"""
Job Handler's Internal Queue, this is what actually implements waiting for
Expand Down Expand Up @@ -91,38 +156,17 @@ def __init__(self, app, dispatcher):
self.job_wrappers = {}
name = "JobHandlerQueue.monitor_thread"
self._init_monitor_thread(name, target=self.__monitor, config=app.config)
self.__grab_query = None
self.__grab_conn_opts = {'autocommit': False}
self.__initialize_job_grabbing()

def __initialize_job_grabbing(self):
    """
    Build the job-grabbing UPDATE statement and its connection options.

    Returns without building anything when none of the configured handler assignment methods is DB-based.
    """
    job_config = self.app.job_config
    supported = (
        HANDLER_ASSIGNMENT_METHODS.DB_TRANSACTION_ISOLATION,
        HANDLER_ASSIGNMENT_METHODS.DB_SKIP_LOCKED,
    )
    # The first configured method that is DB-based wins.
    method = next((m for m in job_config.handler_assignment_methods if m in supported), None)
    if method is None:
        return

    job_table = model.Job.table
    # Candidate jobs: still NEW and tagged for this handler, oldest id first.
    is_grabbable = and_(
        job_table.c.handler.in_(job_config.self_handler_tags),
        job_table.c.state == model.Job.states.NEW)
    candidates = select([model.Job.id]).where(is_grabbable).order_by(job_table.c.id)
    if job_config.handler_max_grab:
        candidates = candidates.limit(job_config.handler_max_grab)
    if method == HANDLER_ASSIGNMENT_METHODS.DB_SKIP_LOCKED:
        candidates = candidates.with_for_update(skip_locked=True)

    server_name = self.app.config.server_name
    self.__grab_query = job_table.update() \
        .returning(job_table.c.id) \
        .where(job_table.c.id.in_(candidates)) \
        .values(handler=server_name)
    if method == HANDLER_ASSIGNMENT_METHODS.DB_TRANSACTION_ISOLATION:
        self.__grab_conn_opts['isolation_level'] = 'SERIALIZABLE'

    tags = ', '.join(str(x) for x in job_config.handler_tags)
    log.info(
        "Handler job grabber initialized with '%s' assignment method for handler '%s', tag(s): %s", method,
        self.app.config.server_name, tags
    )
self.job_grabber = None
handler_assignment_method = ItemGrabber.get_grabbable_handler_assignment_method(self.app.job_config.handler_assignment_methods)
if handler_assignment_method:
self.job_grabber = ItemGrabber(
app=app,
grab_type='Job',
handler_assignment_method=handler_assignment_method,
max_grab=self.app.job_config.handler_max_grab,
self_handler_tags=self.app.job_config.self_handler_tags,
handler_tags=self.app.job_config.handler_tags,
)

def start(self):
"""
Expand Down Expand Up @@ -248,36 +292,11 @@ def __monitor_step(self):
'internal.galaxy.jobs.handlers.monitor_step',
'Job handler monitor step complete.'
)
if self.__grab_query is not None:
self.__grab_unhandled_jobs()
if self.job_grabber is not None:
self.job_grabber.grab_unhandled_items()
self.__handle_waiting_jobs()
log.trace(monitor_step_timer.to_str())

def __grab_unhandled_jobs(self):
    """
    Attempts to "grab" jobs (assign unassigned jobs to itself) using DB serialization methods, if enabled. This
    simply sets `Job.handler` to the current server name, which causes the job to be picked up by
    `__handle_waiting_jobs()`.
    """
    # an excellent discussion on PostgreSQL concurrency safety:
    # https://blog.2ndquadrant.com/what-is-select-skip-locked-for-in-postgresql-9-5/
    self.sa_session.expunge_all()
    conn = self.sa_session.connection(execution_options=self.__grab_conn_opts)
    with conn.begin() as trans:
        try:
            rows = conn.execute(self.__grab_query).fetchall()
            if rows:
                log.debug('Grabbed job(s): %s', ', '.join(str(row[0]) for row in rows))
                trans.commit()
            else:
                # Nothing was claimed, so there is nothing to commit.
                trans.rollback()
        except OperationalError as e:
            # If this is a serialization failure on PostgreSQL, then e.orig is a psycopg2 TransactionRollbackError
            # and should have attribute `pgcode`. Other engines should just report the message and move on.
            # SQLSTATEs are five-character strings that may contain letters (e.g. '40P01') and `pgcode` may be
            # None, so compare as text instead of converting with int().
            if str(getattr(e.orig, 'pgcode', None)) != '40001':
                log.debug('Grabbing job failed (serialization failures are ok): %s', unicodify(e))
            trans.rollback()

def __handle_waiting_jobs(self):
"""
Gets any new jobs (either from the database or from its own queue), then iterates over all new and waiting jobs
Expand Down
25 changes: 20 additions & 5 deletions lib/galaxy/workflow/scheduling_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import galaxy.workflow.schedulers
from galaxy import model
from galaxy.exceptions import HandlerAssignmentError
from galaxy.jobs.handler import ItemGrabber
from galaxy.util import (
parse_xml,
plugin_config,
Expand Down Expand Up @@ -31,11 +32,9 @@ class WorkflowSchedulingManager(ConfiguresHandlers):
processes.
"""
DEFAULT_BASE_HANDLER_POOLS = ('workflow-schedulers', 'job-handlers')
UNSUPPORTED_HANDLER_ASSIGNMENT_METHODS = (
HANDLER_ASSIGNMENT_METHODS.DB_TRANSACTION_ISOLATION,
HANDLER_ASSIGNMENT_METHODS.DB_SKIP_LOCKED,
UNSUPPORTED_HANDLER_ASSIGNMENT_METHODS = {
HANDLER_ASSIGNMENT_METHODS.UWSGI_MULE_MESSAGE,
)
}

def __init__(self, app):
self.app = app
Expand Down Expand Up @@ -267,10 +266,27 @@ def __init__(self, app, workflow_scheduling_manager):
self.app = app
self.workflow_scheduling_manager = workflow_scheduling_manager
self._init_monitor_thread(name="WorkflowRequestMonitor.monitor_thread", target=self.__monitor, config=app.config)
self.invocation_grabber = None
if self.workflow_scheduling_manager.handler_assignment_methods_configured:
self_handler_tags = set(self.app.job_config.self_handler_tags)
self_handler_tags.add(self.workflow_scheduling_manager.default_handler_id)
handler_assignment_method = ItemGrabber.get_grabbable_handler_assignment_method(self.workflow_scheduling_manager.handler_assignment_methods)
if handler_assignment_method:
self.invocation_grabber = ItemGrabber(
app=app,
grab_type='WorkflowInvocation',
handler_assignment_method=handler_assignment_method,
max_grab=self.workflow_scheduling_manager.handler_max_grab,
self_handler_tags=self_handler_tags,
handler_tags=self_handler_tags,
)

def __monitor(self):
to_monitor = self.workflow_scheduling_manager.active_workflow_schedulers
while self.monitor_running:
if self.invocation_grabber:
self.invocation_grabber.grab_unhandled_items()

monitor_step_timer = self.app.execution_timer_factory.get_timer(
'internal.galaxy.workflows.scheduling_manager.monitor_step',
'Workflow scheduling manager monitor step complete.'
Expand All @@ -280,7 +296,6 @@ def __monitor(self):
return

self.__schedule(workflow_scheduler_id, workflow_scheduler)

log.trace(monitor_step_timer.to_str())
self._monitor_sleep(1)

Expand Down

0 comments on commit 62dbf67

Please sign in to comment.