From 464fa92c6885bac5c93d45fe430b6ce085fb2146 Mon Sep 17 00:00:00 2001 From: Alexandr Nikitin Date: Wed, 15 Jun 2016 15:52:46 +0300 Subject: [PATCH] Provide access to the task instance to executors Motivation: Working on a custom executor I want to utilize the existing ecosystem: all operators e.g. DockerOperator and access its fields like image, command, volumes. I want to transform/convert them to be used by another execution system like Mesos. So that I add ability to have access to task instances from an executor. Resolves: AIRFLOW-245 --- airflow/contrib/executors/mesos_executor.py | 2 +- airflow/executors/base_executor.py | 4 ++-- airflow/executors/celery_executor.py | 2 +- airflow/executors/local_executor.py | 2 +- airflow/executors/sequential_executor.py | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/airflow/contrib/executors/mesos_executor.py b/airflow/contrib/executors/mesos_executor.py index 45a474dc3a1a3..2973b59bad3ac 100644 --- a/airflow/contrib/executors/mesos_executor.py +++ b/airflow/contrib/executors/mesos_executor.py @@ -258,7 +258,7 @@ def start(self): self.mesos_driver = driver self.mesos_driver.start() - def execute_async(self, key, command, queue=None): + def execute_async(self, key, command, queue=None, task=None): self.task_queue.put((key, command)) def sync(self): diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index 2e88fa9425305..4ecb44004d379 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -96,7 +96,7 @@ def heartbeat(self): ti.refresh_from_db() if ti.state != State.RUNNING: self.running[key] = command - self.execute_async(key, command=command, queue=queue) + self.execute_async(key, command=command, queue=queue, task=ti) else: self.logger.debug( 'Task is already running, not sending to ' @@ -124,7 +124,7 @@ def get_event_buffer(self): self.event_buffer = {} return d - def execute_async(self, key, command, queue=None): # pragma: no cover + def execute_async(self, key, command, queue=None, task=None): # pragma: no cover """ This method will execute the command asynchronously. """ diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index de56baf11ad11..8b9fd8689ed8e 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -59,7 +59,7 @@ def start(self): self.tasks = {} self.last_state = {} - def execute_async(self, key, command, queue=DEFAULT_QUEUE): + def execute_async(self, key, command, queue=DEFAULT_QUEUE, task=None): self.logger.info( "[celery] queuing {key} through celery, " "queue={queue}".format(**locals())) self.tasks[key] = execute_command.apply_async( diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py index 24ef6c69cac96..32a03c736567d 100644 --- a/airflow/executors/local_executor.py +++ b/airflow/executors/local_executor.py @@ -60,7 +60,7 @@ def start(self): for w in self.workers: w.start() - def execute_async(self, key, command, queue=None): + def execute_async(self, key, command, queue=None, task=None): self.queue.put((key, command)) def sync(self): diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py index 53d9f0a626ea5..4365571b7f22b 100644 --- a/airflow/executors/sequential_executor.py +++ b/airflow/executors/sequential_executor.py @@ -18,7 +18,7 @@ def __init__(self): super(SequentialExecutor, self).__init__() self.commands_to_run = [] - def execute_async(self, key, command, queue=None): + def execute_async(self, key, command, queue=None, task=None): self.commands_to_run.append((key, command,)) def sync(self):