
Dynamic Task Mapping not working with op_kwargs in PythonOperator #23833

Closed
1 of 2 tasks
erdos2n opened this issue May 20, 2022 · 12 comments · Fixed by #23860

Comments

erdos2n (Contributor) commented May 20, 2022

Apache Airflow version

2.3.0 (latest released)

What happened

The following DAG was written and expected to generate three mapped task instances (one for each string in the list).

DAG code:

import logging
from airflow.decorators import dag, task
from airflow.operators.python import PythonOperator

from airflow.utils.dates import datetime


def log_strings_operator(string, *args, **kwargs):
    logging.info("we've made it into the method")
    logging.info(f"operator log - {string}")


@dag(
    dag_id='dynamic_dag_test',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example', 'dynamic_tasks']
)
def tutorial_taskflow_api_etl():
    op2 = (PythonOperator
           .partial(task_id="logging_with_operator_task",
                    python_callable=log_strings_operator)
           .expand(op_kwargs=[{"string": "a"}, {"string": "b"}, {"string": "c"}]))

    return op2


tutorial_etl_dag = tutorial_taskflow_api_etl()

Error message:

Broken DAG: [/usr/local/airflow/dags/dynamic_dag_test.py] Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 343, in _serialize
    return SerializedBaseOperator.serialize_mapped_operator(var)
  File "/usr/local/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 608, in serialize_mapped_operator
    assert op_kwargs[Encoding.TYPE] == DAT.DICT
TypeError: list indices must be integers or slices, not Encoding

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 1105, in to_dict
    json_dict = {"__version": cls.SERIALIZER_VERSION, "dag": cls.serialize_dag(var)}
  File "/usr/local/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 1013, in serialize_dag
    raise SerializationError(f'Failed to serialize DAG {dag.dag_id!r}: {e}')
airflow.exceptions.SerializationError: Failed to serialize DAG 'dynamic_dag_test': list indices must be integers or slices, not Encoding
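The immediate cause is visible in the failing assertion: `serialize_mapped_operator` assumes the serialized `op_kwargs` is a dict tagged with an `Encoding.TYPE` key, but a list of dicts serializes to a list, and indexing a list with the enum raises the `TypeError` above. A minimal sketch of that mechanism (the `Encoding` enum below is a hypothetical stand-in for Airflow's internal one, not the real class):

```python
from enum import Enum


class Encoding(str, Enum):
    # hypothetical stand-in for Airflow's internal Encoding enum
    TYPE = "__type"


# a list of kwargs dicts serializes to a list, not a tagged dict
serialized_op_kwargs = [{"string": "a"}, {"string": "b"}, {"string": "c"}]

try:
    # the serializer indexed the value as if it were a tagged dict
    serialized_op_kwargs[Encoding.TYPE]
except TypeError as exc:
    print(exc)  # list indices must be integers or slices, not Encoding
```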

What you think should happen instead

The DAG should contain one task, logging_with_operator_task, with three map indices.
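In plain Python terms, the intended fan-out is one mapped task instance per element of the expanded list, with each call receiving one kwargs dict. A sketch of those semantics (illustrative only, not the Airflow API):

```python
def log_strings_operator(string, *args, **kwargs):
    # stand-in for the python_callable above
    return f"operator log - {string}"


# one map index per element of the expanded op_kwargs list
op_kwargs_list = [{"string": "a"}, {"string": "b"}, {"string": "c"}]
results = [log_strings_operator(**kw) for kw in op_kwargs_list]
print(results)  # → ['operator log - a', 'operator log - b', 'operator log - c']
```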

How to reproduce

Copy/paste the DAG code into a DAG file and run it on Airflow 2.3.0. The Airflow UI will flag the error.

Operating System

Debian

Versions of Apache Airflow Providers

No response

Deployment

Astronomer

Deployment details

No response

Anything else

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

erdos2n added the area:core and kind:bug labels on May 20, 2022
boring-cyborg (bot) commented May 20, 2022

Thanks for opening your first issue here! Be sure to follow the issue template!

ashb added this to the Airflow 2.3.1 milestone on May 20, 2022
snjypl (Contributor) commented May 22, 2022

@erdos2n I guess you need to pass op_kwargs as a dict:

  .expand(op_kwargs={"string":[ "a", "b","c"]}))

erdos2n (Contributor, Author) commented May 22, 2022

@snjypl I tried this and it still didn't work. Were you able to get it to work?

snjypl (Contributor) commented May 22, 2022

Yes @erdos2n, I was able to get that to work. Can you please share the error you are getting?

erdos2n (Contributor, Author) commented May 22, 2022

@snjypl here are the code and the error message when it runs in Airflow. The UI error does disappear, but it still isn't working.

import logging
from airflow.decorators import dag, task
from airflow.operators.python import PythonOperator

from airflow.utils.dates import datetime

def log_strings(string):
    logging.info(f"here is the string = {string}")

@dag(
    dag_id='dynamic_dag_test',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example', 'dynamic_tasks']
)
def tutorial_taskflow_api_etl():
    op2 = (PythonOperator
           .partial(task_id="logging_with_operator_task",
                    python_callable=log_strings)
           .expand(op_kwargs={"string":["a", "b", "c"]}))

    return op2


tutorial_etl_dag = tutorial_taskflow_api_etl()

Error message:

[2022-05-22, 17:06:49 UTC] {taskinstance.py:1376} INFO - Executing <Mapped(PythonOperator): logging_with_operator_task> on 2022-05-22 17:06:48.858540+00:00
[2022-05-22, 17:06:49 UTC] {standard_task_runner.py:52} INFO - Started process 290 to run task
[2022-05-22, 17:06:49 UTC] {standard_task_runner.py:79} INFO - Running: ['airflow', 'tasks', 'run', 'dynamic_dag_test', 'logging_with_operator_task', 'manual__2022-05-22T17:06:48.858540+00:00', '--job-id', '41', '--raw', '--subdir', 'DAGS_FOLDER/dynamic_dag_test.py', '--cfg-path', '/tmp/tmp1yeh1bff', '--map-index', '0', '--error-file', '/tmp/tmp2mzffl7i']
[2022-05-22, 17:06:49 UTC] {standard_task_runner.py:80} INFO - Job 41: Subtask logging_with_operator_task
[2022-05-22, 17:06:49 UTC] {task_command.py:369} INFO - Running <TaskInstance: dynamic_dag_test.logging_with_operator_task manual__2022-05-22T17:06:48.858540+00:00 map_index=0 [running]> on host 5b49114612fc
[2022-05-22, 17:06:49 UTC] {taskinstance.py:1568} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=dynamic_dag_test
AIRFLOW_CTX_TASK_ID=logging_with_operator_task
AIRFLOW_CTX_EXECUTION_DATE=2022-05-22T17:06:48.858540+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-05-22T17:06:48.858540+00:00
[2022-05-22, 17:06:49 UTC] {taskinstance.py:1888} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/operators/python.py", line 168, in execute
    context_merge(context, self.op_kwargs, templates_dict=self.templates_dict)
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/context.py", line 256, in context_merge
    context.update(*args, **kwargs)
  File "/usr/local/lib/python3.9/_collections_abc.py", line 946, in update
    for key, value in other:
ValueError: too many values to unpack (expected 2)
[2022-05-22, 17:06:50 UTC] {taskinstance.py:1394} INFO - Marking task as FAILED. dag_id=dynamic_dag_test, task_id=logging_with_operator_task, map_index=0, execution_date=20220522T170648, start_date=20220522T170649, end_date=20220522T170650
[2022-05-22, 17:06:50 UTC] {standard_task_runner.py:92} ERROR - Failed to execute job 41 for task logging_with_operator_task (too many values to unpack (expected 2); 290)
[2022-05-22, 17:06:50 UTC] {local_task_job.py:156} INFO - Task exited with return code 1
[2022-05-22, 17:06:50 UTC] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check
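The ValueError comes from `context_merge` handing the mapped `op_kwargs` straight to the context's `update` method. Airflow's `Context` is a `MutableMapping`, and `MutableMapping.update` iterates a non-mapping argument as key/value pairs, so it ends up trying to unpack the 6-character string "string" into `(key, value)`. A minimal reproduction of that mechanism, assuming the dict-form expansion delivers a bare `("string", "a")` pair to each mapped instance (an inference from the traceback, not confirmed; the `Context` class below is a hypothetical stand-in):

```python
from collections import UserDict


class Context(UserDict):
    # hypothetical stand-in for airflow.utils.context.Context,
    # which is also a MutableMapping; update() is inherited
    pass


context = Context()

# a real dict merges fine
context.update({"string": "a"})

# but a bare ("string", "a") tuple is iterated as key/value pairs,
# and unpacking the 6-character string "string" into (key, value) fails
try:
    context.update(("string", "a"))
except ValueError as exc:
    print(exc)  # too many values to unpack (expected 2)
```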

uranusjr (Member) commented:

Your original syntax is correct. (#23833 (comment) is wrong.) This looks like a bug in the serialisation code.

erdos2n (Contributor, Author) commented May 22, 2022

@uranusjr I agree, the first syntax best aligns with the current implementation of the dynamic mapping methods.

wb-08 commented Jun 7, 2022

Please fix this bug as soon as possible :)

potiuk (Member) commented Jun 7, 2022

@wb-08 - maybe you would like to attempt to fix it? That would make it quite a bit faster. At the very least, please watch this issue, and when a PR is raised, can we count on you to test it?

zoid-w commented Jul 7, 2022

Tested on 2.3.3rc2-python3.10 using the original syntax:
.expand(op_kwargs=[{"string": "a"}, {"string": "b"}, {"string": "c"}])
which works as intended. Also verified supplying this syntax via XComArg:
.expand(op_kwargs=XComArg(some_previous_task, key='return_value'))
which also works as intended.

uranusjr (Member) commented Jul 7, 2022

@zoid-w Can you add feedback in #24863? Thanks.
