Help using TriggerDagRunOperator #1001

Closed
pedrorjbr opened this issue Feb 13, 2016 · 10 comments

@pedrorjbr

I am running into problems using the TriggerDagRunOperator:

def foo(context, dag_run_obj):
    #What goes here? 
    #create a DagRun? 
    pass

dag = DAG(dag_id='test_short_circuit_novo', default_args=args, schedule_interval=None)

task = BashOperator(
    task_id='runme',
    bash_command='echo run_me',
    dag=dag)

trigger = TriggerDagRunOperator(task_id='trigger_other_dag', dag_id="test_trigger", python_callable=foo, dag=dag)
task.set_downstream(trigger)

    trigger = TriggerDagRunOperator(task_id='trigger_other_dag', dag_id="test_trigger", python_callable=foo)
  File "/home/pedro/workspace/mdm-source/env/local/lib/python2.7/site-packages/airflow/utils.py", line 472, in wrapper
    result = func(*args, **kwargs)
TypeError: __init__() takes at least 3 arguments (2 given)

Can anyone write sample code for the def foo(context, dag_run_obj): function?

@pedrorjbr
Author

Something is wrong in the documentation:

From documentation:
class airflow.operators.TriggerDagRunOperator(dag_id, python_callable, *args, **kwargs)
dag_id (str) – the dag_id to trigger

From test case:

        t = operators.TriggerDagRunOperator(
            task_id='test_trigger_dagrun',
            trigger_dag_id='example_bash_operator',
            python_callable=trigga,
            dag=self.dag)
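
Judging by the test case, I guess my call should presumably use trigger_dag_id instead of dag_id, something like:

trigger = TriggerDagRunOperator(
    task_id='trigger_other_dag',
    trigger_dag_id='test_trigger',
    python_callable=foo,
    dag=dag)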

@r39132
Contributor

r39132 commented Feb 15, 2016

Hi @pedrorjbr,
Great question and it's very confusing, I agree. This is screaming for better documentation.

I put together an example to help you. There are some gotchas, however, that tripped me up:

  1. Make sure you run everything in UTC -- Airflow does not handle non-UTC dates in a clear way at all, and it had me scratching my head when I saw an 8-hour delay before my triggered dag_runs actually executed.
  2. When you use the TriggerDagRunOperator, there are 2 DAGs being executed: the Controller and the Target. Both DAGs must be enabled. The Target DAG should have schedule_interval=None.

Illustrative Example:
Step 1: Define your Target DAG with schedule_interval=None. DAG_RUNs of this target will be created and executed every time your trigger fires.

from airflow.operators import *
from airflow.models import DAG
from datetime import date, datetime, time, timedelta

args = {
    'start_date': datetime.now(),
    'owner': 'airflow',
}

# Target DAG: with schedule_interval=None, DAG_RUNs are only created when
# the Controller (or the CLI) triggers it.
dag = DAG(
    dag_id='simple_dummy_dag_v1',
    default_args=args,
    schedule_interval=None)


run_this = DummyOperator(
    task_id='run_this',
    dag=dag)

Step 2: Define your Controller DAG with any interval that makes sense for you! It will check a condition on that interval and trigger the Target DAG whenever the condition is met. In the example below, I am running it every minute and always executing the trigger. The idea of the foo function is to check a condition and then return dag_run_obj if you want to trigger the Target DAG. If your condition is not met, return None.

from airflow import DAG, utils
from airflow.operators import *
from datetime import date, datetime, time, timedelta


def foo(context, dag_run_obj):
    # Check your condition here; return dag_run_obj to fire the trigger,
    # or None to skip it. This example always triggers.
    if True:
        return dag_run_obj

dag = DAG(dag_id='test_trigger_dag_run_for_Sid',
          default_args={"owner": "me",
                        "start_date": datetime.now()},
          schedule_interval='*/1 * * * *')

trigger = TriggerDagRunOperator(task_id='test_trigger_dagrun',
                                trigger_dag_id="simple_dummy_dag_v1",
                                python_callable=foo,
                                dag=dag)

Optional Step 3: You can also manually trigger your Target DAG from the CLI, perhaps when testing or if you are using some custom automation, via airflow trigger_dag simple_dummy_dag_v1.

Your DAG_Run table will look like:

[screenshot of the DAG_Run list, showing manually triggered, Controller-triggered, and scheduled runs]

You can see 3 types of entries (from top to bottom):

  • Manually triggered (via CLI) DAG_RUNs of the Target DAG (e.g. run_id : manual__2016-02-15T22:57:49.996114)
  • Controller triggered DAG_RUNs of the Target DAG (e.g. run_id : trig__2016-02-15T22:26:37.425566)
  • Controller DAG_RUNs (e.g. run_id : scheduled__2016-02-15T11:15:00)

The documentation sucks. Please do your part and update it as and when questions are answered. I can do this one. Closing for now, but ping me here or on gitter if you need it reopened.

@pedrorjbr
Author

@r39132: Thanks again for the response. I will update the documentation. Do you have any example of how I can pass information from a DAG to a triggered DAG? How do I get that information in the triggered DAG?

@r39132 r39132 reopened this Feb 19, 2016
@r39132
Contributor

r39132 commented Feb 19, 2016

Good question. Reopening this, and I now understand the other issue that you opened. Post the question on gitter - I know @nicktrav is doing something with triggering subdags. That might hold a solution. I'll investigate as well later tonight.

@mistercrunch
Member

http://pythonhosted.org/airflow/code.html#airflow.operators.TriggerDagRunOperator
If your python_callable returns the dag_run_obj, a dagrun is created; you can pass anything picklable (should be small-ish) via its payload attribute:

def foo(context, dagrun):
    if condition_ismet():
        # payload must be picklable and small-ish
        dagrun.payload = {'execution_date': context['execution_date'], 'foo': 'bar'}
        return dagrun

You can refer to the code

@pedrorjbr
Author

Thanks! @mistercrunch

But how can I get the picklable object in the triggered DAG?

Is it a property of the DAG or the Task? Can you post a code sample? Sorry, maybe it is a noob question.

From DOC:
"... and the payload has to be a picklable object that will be made available to your tasks while executing that DAG run."

@r39132
Contributor

r39132 commented Feb 19, 2016

Here you go:
#1043

If this meets your needs, close the issue. Otherwise, please ping me on gitter; email tends to get lost. If I don't hear back in a day, I'll assume the issue is closed.
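
In case it helps, here is a minimal sketch of the Target-DAG side (illustrative, not copied from #1043; the read_payload name is mine): with a PythonOperator and provide_context=True, the payload set by the Controller should be available on dag_run.conf in the context.

from airflow.models import DAG
from airflow.operators import PythonOperator
from datetime import datetime

args = {
    'start_date': datetime.now(),
    'owner': 'airflow',
}

dag = DAG(
    dag_id='simple_dummy_dag_v1',
    default_args=args,
    schedule_interval=None)


def read_payload(**kwargs):
    # For runs created by the Controller's TriggerDagRunOperator,
    # kwargs['dag_run'].conf holds the payload set in the python_callable.
    dag_run = kwargs['dag_run']
    if dag_run is not None and dag_run.conf:
        print(dag_run.conf.get('foo'))


run_this = PythonOperator(
    task_id='run_this',
    python_callable=read_payload,
    provide_context=True,
    dag=dag)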

@r39132
Contributor

r39132 commented Feb 20, 2016

Closing. If you have any questions, add them here and ping me on gitter and I can reopen.

@r39132 r39132 closed this as completed Feb 20, 2016
@pedrorjbr
Author

@r39132

This is what I am getting in kwargs, and I cannot get the conf value...

{u'dag_run': None, u'run_id': None, u'dag': <DAG: test_trigger>, u'conf': <module 'airflow.configuration' from '/usr/local/lib/python2.7/dist-packages/airflow/configuration.pyc'>, u'tables': None, u'task_instance_key_str': u'test_trigger__run_this__20160220', u'END_DATE': '2016-02-20', u'execution_date': datetime.datetime(2016, 2, 20, 12, 45, 36), u'ts': '2016-02-20T12:45:36', u'macros': <module 'airflow.macros' from '/usr/local/lib/python2.7/dist-packages/airflow/macros/__init__.pyc'>, u'params': {}, u'ti': <TaskInstance: test_trigger.run_this 2016-02-20 12:45:36 [running]>, u'ds_nodash': u'20160220', u'test_mode': False, u'end_date': '2016-02-20', 'templates_dict': None, u'task': <Task(PythonOperator): run_this>, u'task_instance': <TaskInstance: test_trigger.run_this 2016-02-20 12:45:36 [running]>, u'latest_date': '2016-02-20', u'yesterday_ds': '2016-02-19', u'ts_nodash': u'20160220T124536', u'tomorrow_ds': '2016-02-21'}
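
Looking at it again, I guess the conf key in that dump is just the airflow.configuration module, not the trigger payload, and the payload would presumably be on dag_run.conf - but dag_run is None here, maybe because this run was not actually created by the TriggerDagRunOperator. Something like this is what I would expect for a triggered run (run_this_func is just an illustrative name):

def run_this_func(**kwargs):
    # dag_run should be set for runs created by the TriggerDagRunOperator;
    # its conf attribute would carry the payload from the Controller.
    dag_run = kwargs['dag_run']
    payload = dag_run.conf if dag_run is not None else None
    print(payload)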

@r39132 r39132 reopened this Feb 20, 2016
@r39132
Contributor

r39132 commented Feb 20, 2016

Let's keep the conversation in a single issue - #1029. I'll change that title to be the same as this one, 'Help using TriggerDagRunOperator'.

@r39132 r39132 closed this as completed Feb 20, 2016