diff --git a/lib/galaxy/jobs/handler.py b/lib/galaxy/jobs/handler.py index 1d309bbb458c..6b410f8a03cc 100644 --- a/lib/galaxy/jobs/handler.py +++ b/lib/galaxy/jobs/handler.py @@ -62,6 +62,71 @@ def shutdown(self): self.job_stop_queue.shutdown() +class ItemGrabber: + + def __init__(self, app, grab_type='Job', handler_assignment_method=None, max_grab=None, self_handler_tags=None, handler_tags=None): + self.app = app + self.sa_session = app.model.context + self.grab_this = getattr(model, grab_type) + self.grab_type = grab_type + self._grab_conn_opts = {'autocommit': False} + subq = select([self.grab_this.id]) \ + .where(and_( + self.grab_this.table.c.handler.in_(self_handler_tags), + self.grab_this.table.c.state == self.grab_this.states.NEW)) \ + .order_by(self.grab_this.table.c.id) + if max_grab: + subq = subq.limit(max_grab) + if handler_assignment_method == HANDLER_ASSIGNMENT_METHODS.DB_SKIP_LOCKED: + subq = subq.with_for_update(skip_locked=True) + self._grab_query = self.grab_this.table.update() \ + .returning(self.grab_this.table.c.id) \ + .where(self.grab_this.table.c.id.in_(subq)) \ + .values(handler=self.app.config.server_name) + if handler_assignment_method == HANDLER_ASSIGNMENT_METHODS.DB_TRANSACTION_ISOLATION: + self._grab_conn_opts['isolation_level'] = 'SERIALIZABLE' + log.info( + "Handler job grabber initialized with '%s' assignment method for handler '%s', tag(s): %s", handler_assignment_method, + self.app.config.server_name, ', '.join(str(x) for x in handler_tags) + ) + + @staticmethod + def get_grabbable_handler_assignment_method(handler_assignment_methods): + grabbable_methods = { + HANDLER_ASSIGNMENT_METHODS.DB_TRANSACTION_ISOLATION, + HANDLER_ASSIGNMENT_METHODS.DB_SKIP_LOCKED, + } + try: + return [m for m in handler_assignment_methods if m in grabbable_methods][0] + except IndexError: + return + + def grab_unhandled_items(self): + """ + Attempts to assign unassigned jobs or invocations to itself using DB serialization methods, if enabled. 
This + simply sets `Job.handler` or `WorkflowInvocation.handler` to the current server name, which causes the item to be picked up by + the appropriate handler. + """ + # an excellent discussion on PostgreSQL concurrency safety: + # https://blog.2ndquadrant.com/what-is-select-skip-locked-for-in-postgresql-9-5/ + self.sa_session.expunge_all() + conn = self.sa_session.connection(execution_options=self._grab_conn_opts) + with conn.begin() as trans: + try: + rows = conn.execute(self._grab_query).fetchall() + if rows: + log.debug('Grabbed %s(s): %s', self.grab_type, ', '.join(str(row[0]) for row in rows)) + trans.commit() + else: + trans.rollback() + except OperationalError as e: + # If this is a serialization failure on PostgreSQL, then e.orig is a psycopg2 TransactionRollbackError + # and should have attribute `pgcode`. Other engines should just report the message and move on. + if int(getattr(e.orig, 'pgcode', -1)) != 40001: + log.debug('Grabbing %s failed (serialization failures are ok): %s', self.grab_type, unicodify(e)) + trans.rollback() + + class JobHandlerQueue(Monitors): """ Job Handler's Internal Queue, this is what actually implements waiting for @@ -91,38 +156,18 @@ def __init__(self, app, dispatcher): self.job_wrappers = {} name = "JobHandlerQueue.monitor_thread" self._init_monitor_thread(name, target=self.__monitor, config=app.config) - self.__grab_query = None - self.__grab_conn_opts = {'autocommit': False} - self.__initialize_job_grabbing() - - def __initialize_job_grabbing(self): - grabbable_methods = { - HANDLER_ASSIGNMENT_METHODS.DB_TRANSACTION_ISOLATION, - HANDLER_ASSIGNMENT_METHODS.DB_SKIP_LOCKED, - } - try: - method = [m for m in self.app.job_config.handler_assignment_methods if m in grabbable_methods][0] - except IndexError: - return - subq = select([model.Job.id]) \ - .where(and_( - model.Job.table.c.handler.in_(self.app.job_config.self_handler_tags), - model.Job.table.c.state == model.Job.states.NEW)) \ - .order_by(model.Job.table.c.id) - if 
self.app.job_config.handler_max_grab: - subq = subq.limit(self.app.job_config.handler_max_grab) - if method == HANDLER_ASSIGNMENT_METHODS.DB_SKIP_LOCKED: - subq = subq.with_for_update(skip_locked=True) - self.__grab_query = model.Job.table.update() \ - .returning(model.Job.table.c.id) \ - .where(model.Job.table.c.id.in_(subq)) \ - .values(handler=self.app.config.server_name) - if method == HANDLER_ASSIGNMENT_METHODS.DB_TRANSACTION_ISOLATION: - self.__grab_conn_opts['isolation_level'] = 'SERIALIZABLE' - log.info( - "Handler job grabber initialized with '%s' assignment method for handler '%s', tag(s): %s", method, - self.app.config.server_name, ', '.join(str(x) for x in self.app.job_config.handler_tags) - ) + handler_assignment_method = ItemGrabber.get_grabbable_handler_assignment_method(self.app.job_config.handler_assignment_methods) + if handler_assignment_method: + self.job_grabber = ItemGrabber( + app=app, + grab_type='Job', + handler_assignment_method=handler_assignment_method, + max_grab=self.app.job_config.handler_max_grab, + self_handler_tags=self.app.job_config.self_handler_tags, + handler_tags=self.app.job_config.handler_tags, + ) + else: + self.job_grabber = None def start(self): """ @@ -248,36 +293,11 @@ def __monitor_step(self): 'internal.galaxy.jobs.handlers.monitor_step', 'Job handler monitor step complete.' ) - if self.__grab_query is not None: - self.__grab_unhandled_jobs() + if self.job_grabber is not None: + self.job_grabber.grab_unhandled_items() self.__handle_waiting_jobs() log.trace(monitor_step_timer.to_str()) - def __grab_unhandled_jobs(self): - """ - Attempts to "grab" jobs (assign unassigned jobs to itself) using DB serialization methods, if enabled. This - simply sets `Job.handler` to the current server name, which causes the job to be picked up by - `__handle_waiting_jobs()`. 
- """ - # an excellent discussion on PostgreSQL concurrency safety: - # https://blog.2ndquadrant.com/what-is-select-skip-locked-for-in-postgresql-9-5/ - self.sa_session.expunge_all() - conn = self.sa_session.connection(execution_options=self.__grab_conn_opts) - with conn.begin() as trans: - try: - rows = conn.execute(self.__grab_query).fetchall() - if rows: - log.debug('Grabbed job(s): %s', ', '.join(str(row[0]) for row in rows)) - trans.commit() - else: - trans.rollback() - except OperationalError as e: - # If this is a serialization failure on PostgreSQL, then e.orig is a psycopg2 TransactionRollbackError - # and should have attribute `code`. Other engines should just report the message and move on. - if int(getattr(e.orig, 'pgcode', -1)) != 40001: - log.debug('Grabbing job failed (serialization failures are ok): %s', unicodify(e)) - trans.rollback() - def __handle_waiting_jobs(self): """ Gets any new jobs (either from the database or from its own queue), then iterates over all new and waiting jobs diff --git a/lib/galaxy/workflow/scheduling_manager.py b/lib/galaxy/workflow/scheduling_manager.py index 6e3a45be4168..188f645f18dc 100644 --- a/lib/galaxy/workflow/scheduling_manager.py +++ b/lib/galaxy/workflow/scheduling_manager.py @@ -4,6 +4,7 @@ import galaxy.workflow.schedulers from galaxy import model from galaxy.exceptions import HandlerAssignmentError +from galaxy.jobs.handler import ItemGrabber from galaxy.util import ( parse_xml, plugin_config, @@ -31,11 +32,13 @@ class WorkflowSchedulingManager(ConfiguresHandlers): processes. 
""" DEFAULT_BASE_HANDLER_POOLS = ('workflow-schedulers', 'job-handlers') - UNSUPPORTED_HANDLER_ASSIGNMENT_METHODS = ( - HANDLER_ASSIGNMENT_METHODS.DB_TRANSACTION_ISOLATION, - HANDLER_ASSIGNMENT_METHODS.DB_SKIP_LOCKED, + UNSUPPORTED_HANDLER_ASSIGNMENT_METHODS = { HANDLER_ASSIGNMENT_METHODS.UWSGI_MULE_MESSAGE, - ) + } + GRAB_SELF_METHODS = { + HANDLER_ASSIGNMENT_METHODS.DB_TRANSACTION_ISOLATION, + HANDLER_ASSIGNMENT_METHODS.UWSGI_MULE_MESSAGE + } def __init__(self, app): self.app = app @@ -267,10 +270,27 @@ def __init__(self, app, workflow_scheduling_manager): self.app = app self.workflow_scheduling_manager = workflow_scheduling_manager self._init_monitor_thread(name="WorkflowRequestMonitor.monitor_thread", target=self.__monitor, config=app.config) + self.invocation_grabber = None + if self.workflow_scheduling_manager.handler_assignment_methods_configured: + self_handler_tags = set(self.app.job_config.self_handler_tags) + self_handler_tags.add(self.workflow_scheduling_manager.default_handler_id) + handler_assignment_method = ItemGrabber.get_grabbable_handler_assignment_method(self.workflow_scheduling_manager.handler_assignment_methods) + if handler_assignment_method: + self.invocation_grabber = ItemGrabber( + app=app, + grab_type='WorkflowInvocation', + handler_assignment_method=handler_assignment_method, + max_grab=self.workflow_scheduling_manager.handler_max_grab, + self_handler_tags=self_handler_tags, + handler_tags=self_handler_tags, + ) def __monitor(self): to_monitor = self.workflow_scheduling_manager.active_workflow_schedulers while self.monitor_running: + if self.invocation_grabber: + self.invocation_grabber.grab_unhandled_items() + monitor_step_timer = self.app.execution_timer_factory.get_timer( 'internal.galaxy.workflows.scheduling_manager.monitor_step', 'Workflow scheduling manager monitor step complete.' 
@@ -280,7 +300,6 @@ def __monitor(self): return self.__schedule(workflow_scheduler_id, workflow_scheduler) - log.trace(monitor_step_timer.to_str()) self._monitor_sleep(1)