
RuntimeError when retrying DAG run with zero-length mapped tasks #43214

@szeswee

Description


Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

  • 2.9.2
  • 2.10.4

What happened?

The Airflow docs state the following behaviour for zero-length maps when using Dynamic Task Mapping:

If the input is empty (zero length), no new tasks will be created and the mapped task will be marked as SKIPPED.

The above behaviour is indeed correctly observed when a mapped task is first executed as part of a new DAG run (i.e. try_number = 1).
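
For illustration, here is a minimal sketch of that documented behaviour using the TaskFlow API (the DAG and task names here are illustrative, not from this report):

    from airflow.decorators import dag, task


    @dag(schedule=None)
    def zero_length_map_demo():
        @task
        def make_items() -> list[int]:
            return []  # zero-length input

        @task
        def consume(item: int) -> None:
            print(item)

        # With a zero-length input, no mapped task instances are created
        # and consume is marked SKIPPED (on the first try).
        consume.expand(item=make_items())


    zero_length_map_demo()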

However, on subsequent tries (i.e. try_number > 1), the mapped task will instead raise the following exception:

[2024-10-21, 10:54:51 UTC] {taskinstance.py:2306} INFO - Starting attempt 2 of 2
[2024-10-21, 10:54:51 UTC] {taskinstance.py:2330} INFO - Executing <Mapped(PythonOperator): print_args> on 2024-10-21 10:54:27.291897+00:00
[2024-10-21, 10:54:51 UTC] {standard_task_runner.py:63} INFO - Started process 341 to run task
[2024-10-21, 10:54:51 UTC] {standard_task_runner.py:90} INFO - Running: ['airflow', 'tasks', 'run', 'dtm_failure', 'print_args', 'manual__2024-10-21T10:54:27.291897+00:00', '--job-id', '13517', '--raw', '--subdir', 'DAGS_FOLDER/dtm_failure.py', '--cfg-path', '/tmp/tmpahwek3h8', '--map-index', '0']
[2024-10-21, 10:54:51 UTC] {standard_task_runner.py:91} INFO - Job 13517: Subtask print_args
[2024-10-21, 10:54:51 UTC] {task_command.py:426} INFO - Running <TaskInstance: dtm_failure.print_args manual__2024-10-21T10:54:27.291897+00:00 map_index=0 [running]> on host 172.21.174.13
[2024-10-21, 10:54:52 UTC] {taskinstance.py:2905} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2479, in _run_raw_task
    self._execute_task_with_callbacks(context, test_mode, session=session)
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2633, in _execute_task_with_callbacks
    task_orig = self.render_templates(context=context, jinja_env=jinja_env)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 3094, in render_templates
    original_task.render_template_fields(context, jinja_env)
  File "/usr/local/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 829, in render_template_fields
    mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context, session)
                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 688, in _expand_mapped_kwargs
    return self._get_specified_expand_input().resolve(context, session)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/expandinput.py", line 202, in resolve
    data = {k: self._expand_mapped_field(k, v, context, session=session) for k, v in self.value.items()}
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/expandinput.py", line 202, in <dictcomp>
    data = {k: self._expand_mapped_field(k, v, context, session=session) for k, v in self.value.items()}
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/expandinput.py", line 182, in _expand_mapped_field
    found_index = _find_index_for_this_field(map_index)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/expandinput.py", line 176, in _find_index_for_this_field
    raise RuntimeError(f"cannot expand field mapped to length {mapped_length!r}")
RuntimeError: cannot expand field mapped to length 0
[2024-10-21, 10:54:52 UTC] {taskinstance.py:2953} ERROR - Unable to unmap task to determine if we need to send an alert email
[2024-10-21, 10:54:52 UTC] {taskinstance.py:1206} INFO - Marking task as FAILED. dag_id=dtm_failure, task_id=print_args, run_id=manual__2024-10-21T10:54:27.291897+00:00, map_index=0, execution_date=20241021T105427, start_date=20241021T105451, end_date=20241021T105452
[2024-10-21, 10:54:52 UTC] {standard_task_runner.py:110} ERROR - Failed to execute job 13517 for task print_args (cannot expand field mapped to length 0; 341)

To emphasise, this issue only occurs when a zero-length map is encountered on try_number > 1; when try_number = 1, a zero-length map is handled correctly and the mapped task is skipped. Judging by the logs, the retry re-runs the task instance with map_index=0 that was created on the earlier try, and that instance then fails to resolve its input against the now zero-length upstream result.
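
For context, the check that raises here (airflow/models/expandinput.py, per the traceback) behaves roughly like the simplified sketch below. This is an illustration of the failure mode only, not the actual Airflow source:

    # Illustration only, not the actual Airflow source: a task instance
    # with map_index=0, left over from the previous try, tries to pick
    # its element out of an upstream result that now has length 0.
    def find_index_for_field(map_index: int, mapped_length: int) -> int:
        if mapped_length < 1:
            raise RuntimeError(f"cannot expand field mapped to length {mapped_length!r}")
        return map_index % mapped_length


    try:
        find_index_for_field(map_index=0, mapped_length=0)
    except RuntimeError as exc:
        print(exc)  # cannot expand field mapped to length 0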

What you think should happen instead?

No exception should be raised; the mapped task should again be marked as SKIPPED, consistent with the documented behaviour. Looking at the traceback above, the RuntimeError is raised while rendering the mapped task's template fields (render_templates -> render_template_fields -> _expand_mapped_kwargs); the later "Unable to unmap task to determine if we need to send an alert email" error is a downstream symptom of the same failure, not its cause.

How to reproduce

  1. Create the following DAG:

     from airflow import DAG
     from airflow.operators.python import PythonOperator


     def generate_args() -> list[dict]:
         from airflow.operators.python import get_current_context

         # Return five kwarg dicts on the first try, and a zero-length
         # list on every subsequent try.
         context = get_current_context()
         is_first_try = context["ti"].try_number == 1

         return [{"foo": f"bar_{idx}"} for idx in range(5)] if is_first_try else []


     with DAG(
         "dtm_failure",
         description="Demonstrate DAG failure with zero-length mapped tasks on subsequent tries",
         schedule=None,
     ):
         task_generate_args = PythonOperator(
             task_id="generate_args",
             python_callable=generate_args,
         )

         # Dynamically map print_args over the list returned by generate_args.
         task_print_args = PythonOperator.partial(
             task_id="print_args",
             python_callable=lambda foo: print(f"Arg: {foo}"),
         ).expand(op_kwargs=task_generate_args.output)
  2. On your Airflow deployment, manually trigger a run of the dtm_failure DAG.
  3. The first try will succeed: generate_args returns five items and all five mapped print_args instances complete. (Screenshot of the successful run omitted.)
  4. Select Clear > Clear existing tasks to retry the entire DAG run.
  5. The second try will fail: generate_args now returns a zero-length list, and the print_args mapped task raises the RuntimeError above instead of being skipped. (Screenshot of the failed run omitted.)
  6. Select the failed mapped task to view its logs.
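
Steps 2 and 4 can also be approximated from the CLI instead of the UI (a sketch; -y simply skips the confirmation prompt):

    # Trigger a new run of the DAG (step 2).
    airflow dags trigger dtm_failure

    # Clear the run's task instances to force a retry (step 4).
    airflow tasks clear dtm_failure -y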

Operating System

Debian 11

Versions of Apache Airflow Providers

No response

Deployment

Other Docker-based deployment

Deployment details

  • Python version: 3.11.4

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
