subdag doesn't work with LocalExecutor/SequentialExecutor #1168
@xiaoliangsc can you please provide some minimal code that reproduces the problem?

@xiaoliangsc Please provide the following:
If we don't hear back in a day or so, we will close this issue.
(JLowin: edited to add code blocks)

main_dag.py

```python
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.subdag_operator import SubDagOperator
from subdag import sub_dag

PARENT_DAG_NAME = 'parent_dag'
CHILD_DAG_NAME = 'child_dag'

default_args = {
    'owner': 'shawn',
    'start_date': datetime(2016, 1, 1),
}

main_dag = DAG(dag_id=PARENT_DAG_NAME, default_args=default_args)

sub_dag = SubDagOperator(
    subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, default_args),
    task_id=CHILD_DAG_NAME,
    dag=main_dag,
)
```

subdag.py

```python
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

def sub_dag(parent_dag_name, child_dag_name, default_args):
    dag = DAG(
        '%s.%s' % (parent_dag_name, child_dag_name),
        default_args=default_args
    )
    dummy_operator = DummyOperator(
        task_id='dummy_task',
        dag=dag,
    )
    return dag
```

execute:

```
airflow test parent_dag parent_dag.child_dag 2016-01-01
```

stack trace:

```
Traceback (most recent call last):
  File "/Users/shawn.liang/.virtualenvs/common/bin/airflow", line 15, in <module>
    args.func(args)
  File "/Users/shawn.liang/.virtualenvs/common/lib/python2.7/site-packages/airflow/bin/cli.py", line 272, in test
    task = dag.get_task(task_id=args.task_id)
  File "/Users/shawn.liang/.virtualenvs/common/lib/python2.7/site-packages/airflow/models.py", line 2375, in get_task
    raise AirflowException("Task {task_id} not found".format(**locals()))
airflow.utils.AirflowException: Task parent_dag.child_dag not found
```

installed airflow env: `pip install airflow==1.6.2` (on mac).
@xiaoliangsc the first problem is that it looks like you are trying to execute the subdag ( However, that still creates an error. The reason is that Airflow needs to be able to load all DAGs, including subdags, from the file in which they're defined. In your code, you define the subdag as an argument, which means it isn't available to be loaded. If instead you do this, it will work:

main_dag.py

```python
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.subdag_operator import SubDagOperator
from subdag import sub_dag

PARENT_DAG_NAME = 'parent_dag'
CHILD_DAG_NAME = 'child_dag'

default_args = {
    'owner': 'shawn',
    'start_date': datetime(2016, 1, 1),
}

main_dag = DAG(dag_id=PARENT_DAG_NAME, default_args=default_args)

# define sub dag outside the operator
subdag_object = sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, default_args)

sub_dag = SubDagOperator(
    subdag=subdag_object,
    task_id=CHILD_DAG_NAME,
    dag=main_dag,
)
```

I will think about whether we can "look inside"
If you create subdag_object in main_dag.py, isn't it scheduled separately by the scheduler? I thought the whole point of using a subdag is that it gets scheduled only as part of the main DAG. Maybe I'm missing some pieces here.
@xiaoliangsc I take it back. It should work the way you've got it. I'm able to run `example_subdag_operator`, which is set up very similarly to your example, so I suspect there is something odd going on here. I'm taking a closer look...
@xiaoliangsc this is very strange, but could you please try making this change in

```python
# old
from airflow.operators.subdag_operator import SubDagOperator
# new
from airflow.operators import SubDagOperator
```
It works for me now that I changed the task_id from the subdag id to the SubDagOperator id. I guess I was misusing it before. Feel free to close the issue, and thanks for your help.
I'm sorry for the red-herring reply earlier! This is a really weird case. You SHOULD be able to do what you did. This happens because Airflow handles its own imports in an unusual way, so those two SubDagOperators are not the same class (even though they look like they should be). I am going to submit a fix. Thanks for your patience!
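Why two SubDagOperators can look identical and still not be "the same": in Python, every execution of a `class` statement creates a new class object. The thread doesn't spell out exactly how Airflow's import machinery ended up executing the operator module twice, so the sketch below simply assumes that it did. It loads one hypothetical module file under two different module names with `importlib` (Python 3) and compares the resulting classes.

```python
import importlib.util
import os
import tempfile

# Hypothetical stand-in for the operator module's source.
SOURCE = (
    "class SubDagOperator(object):\n"
    "    def __init__(self, subdag):\n"
    "        self.subdag = subdag\n"
)


def load_from_path(module_name, path):
    """Execute the file at `path` as a fresh module named `module_name`."""
    spec = importlib.util.spec_from_file_location(module_name, path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "subdag_operator.py")
    with open(path, "w") as f:
        f.write(SOURCE)
    # One file, executed twice under two names: two distinct class objects.
    first = load_from_path("subdag_operator_a", path)
    second = load_from_path("subdag_operator_b", path)

task = second.SubDagOperator(subdag="parent_dag.child_dag")
print(first.SubDagOperator is second.SubDagOperator)  # False
print(isinstance(task, first.SubDagOperator))         # False
```

Both classes share the name `SubDagOperator`, so they look the same in code and in reprs, but an `isinstance()` check against one of them rejects instances of the other.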
Hmm... @xiaoliangsc Sorry, can you paste your "fixed" code for completeness and for posterity? I think others will also run into this problem.
@r39132 Sure. Basically I need to change `from airflow.operators.subdag_operator import SubDagOperator` to `from airflow.operators import SubDagOperator` in the previous code sample and run `airflow test parent_dag child_dag 2016-03-21`; then it works. I guess @jlowin has already worked out a fix for the problem.
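The distinction that resolved the original error is between the subdag's dag_id (`parent_dag.child_dag`) and the SubDagOperator's task_id (`child_dag`): `airflow test` is pointed at a task inside the parent DAG. A small helper can make that explicit. `subdag_operator_task_id` and `airflow_test_args` are hypothetical names, and the sketch assumes the `'<parent_dag_id>.<task_id>'` naming used in `subdag.py` and the `airflow test <dag_id> <task_id> <execution_date>` argument order used in this thread.

```python
def subdag_operator_task_id(parent_dag_id, subdag_dag_id):
    """Return the SubDagOperator task_id implied by a subdag dag_id of the
    form '<parent_dag_id>.<task_id>' (hypothetical helper)."""
    prefix = parent_dag_id + "."
    if not subdag_dag_id.startswith(prefix):
        raise ValueError(
            "subdag dag_id %r does not start with %r" % (subdag_dag_id, prefix))
    return subdag_dag_id[len(prefix):]


def airflow_test_args(parent_dag_id, subdag_dag_id, execution_date):
    """Build argv for `airflow test`, targeting the SubDagOperator task in
    the parent DAG rather than the subdag's own dag_id."""
    task_id = subdag_operator_task_id(parent_dag_id, subdag_dag_id)
    return ["airflow", "test", parent_dag_id, task_id, execution_date]


print(" ".join(airflow_test_args("parent_dag", "parent_dag.child_dag", "2016-03-21")))
# prints: airflow test parent_dag child_dag 2016-03-21
```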
`airflow.operators.SubDagOperator` and `airflow.operators.subdag_operator.SubDagOperator` are NOT the same. Airflow needs to check against both classes to determine if a task is in fact a SubDagOperator. This is because of Airflow's import machinery. It is *probably* ok to check both classes with `isinstance()`, but the behavior is surprising, so to cover our bases we check for `__class__.__name__` and a `subdag` attr.
closes apache#1168
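A minimal sketch of the kind of check the commit message describes, assuming that matching on `__class__.__name__` plus a `subdag` attribute is sufficient. `is_subdag_operator` and `make_operator_class` are hypothetical names, not Airflow's API; two separately created classes stand in for the two import paths.

```python
def is_subdag_operator(task):
    """Identify a SubDagOperator without isinstance().

    Matching on the class name plus a `subdag` attribute still succeeds
    when the same class has been loaded twice and the two class objects
    therefore differ.
    """
    return task.__class__.__name__ == "SubDagOperator" and hasattr(task, "subdag")


def make_operator_class():
    """Return a brand-new class named SubDagOperator on every call,
    standing in for one class reached through two import paths."""
    class SubDagOperator(object):
        def __init__(self, task_id, subdag):
            self.task_id = task_id
            self.subdag = subdag
    return SubDagOperator


ViaPackage = make_operator_class()  # like airflow.operators.SubDagOperator
ViaModule = make_operator_class()   # like ...subdag_operator.SubDagOperator

task = ViaPackage("child_dag", subdag="parent_dag.child_dag")
print(isinstance(task, ViaModule))  # False
print(is_subdag_operator(task))     # True
```

The trade-off is that a name-based check also accepts any unrelated class that happens to be called `SubDagOperator` and has a `subdag` attribute, which is presumably why the commit pairs the name test with the attribute test.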
I'm working with Airflow 1.6.2 on OSX.
When running a SubDagOperator using LocalExecutor/SequentialExecutor, it always hits the exception "DAG {sub_dag_name} could not be found in {dag_folder}", from https://github.com/airbnb/airflow/blob/master/airflow/bin/cli.py#L159
It looks to me like, with LocalExecutor/SequentialExecutor, DAGs are not pickled. So when the SubDagOperator uses backfill_job to execute the subdag as a DAG, it always tries to load the subdag from the DAG folder, which fails because we hide the subdag behind a factory method (following http://pythonhosted.org/airflow/concepts.html#subdags).
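The lookup problem can be modeled in plain Python. This is a toy, not Airflow's code: `ToyDag`, `ToySubDagOperator` and `build_registry` are hypothetical names. It assumes a worker that receives only a dag_id resolves it from an index of DAGs built by executing the DAG file, and it shows that a factory-built subdag only lands in that index if the loader also descends into the parent's SubDagOperator tasks.

```python
class ToyDag(object):
    """Hypothetical stand-in for airflow.models.DAG."""
    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.tasks = []


class ToySubDagOperator(object):
    """Hypothetical stand-in for SubDagOperator: a task wrapping a DAG."""
    def __init__(self, task_id, subdag, dag):
        self.task_id = task_id
        self.subdag = subdag
        dag.tasks.append(self)


def sub_dag(parent_dag_name, child_dag_name):
    # Factory as in subdag.py: the child DAG never becomes a module global.
    return ToyDag('%s.%s' % (parent_dag_name, child_dag_name))


def build_registry(module_globals, descend_into_subdags):
    """Index DAGs by dag_id, optionally following SubDagOperator tasks."""
    registry = {}

    def add(dag):
        registry[dag.dag_id] = dag
        if descend_into_subdags:
            for task in dag.tasks:
                child = getattr(task, 'subdag', None)
                if child is not None:
                    add(child)

    for obj in module_globals.values():
        if isinstance(obj, ToyDag):
            add(obj)
    return registry


# Simulated namespace of main_dag.py after it has been executed.
main_dag = ToyDag('parent_dag')
op = ToySubDagOperator('child_dag', sub_dag('parent_dag', 'child_dag'), main_dag)
namespace = {'main_dag': main_dag, 'sub_dag': op}

flat = build_registry(namespace, descend_into_subdags=False)
deep = build_registry(namespace, descend_into_subdags=True)
print('parent_dag.child_dag' in flat)  # False: only module-level DAGs
print('parent_dag.child_dag' in deep)  # True: reached through the operator
```

In this model, the "could not be found" error corresponds to looking up `parent_dag.child_dag` in `flat`.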