Skip to content

Scheduler crashing when running mapped task #54627

@vatsrahul1001

Description

@vatsrahul1001

Apache Airflow version

main (development)

If "Other Airflow 2 version" selected, which one?

No response

What happened?

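The scheduler crashes when it schedules mapped tasks. In `DagRun.schedule_tis`, building the template context that is passed to `expand_start_from_trigger` reads `self.task.map_index_template`. The scheduler-side `MappedOperator` has no such attribute, so an `AttributeError` propagates out of the scheduler loop and the scheduler process exits.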

Logs

scheduler   File "/usr/local/lib/python3.12/site-packages/airflow/__main__.py", line 55, in main
scheduler     args.func(args)
scheduler   File "/usr/local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 48, in command
scheduler     return func(*args, **kwargs)
scheduler            ^^^^^^^^^^^^^^^^^^^^^
scheduler   File "/usr/local/lib/python3.12/site-packages/airflow/utils/cli.py", line 114, in wrapper
scheduler     return f(*args, **kwargs)
scheduler            ^^^^^^^^^^^^^^^^^^
scheduler   File "/usr/local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
scheduler     return func(*args, **kwargs)
scheduler            ^^^^^^^^^^^^^^^^^^^^^
scheduler   File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 52, in scheduler
scheduler     run_command_with_daemon_option(
scheduler   File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
scheduler     callback()
scheduler   File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 55, in <lambda>
scheduler     callback=lambda: _run_scheduler_job(args),
scheduler                      ^^^^^^^^^^^^^^^^^^^^^^^^
scheduler   File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 43, in _run_scheduler_job
scheduler     run_job(job=job_runner.job, execute_callable=job_runner._execute)
scheduler   File "/usr/local/lib/python3.12/site-packages/airflow/utils/session.py", line 100, in wrapper
scheduler     return func(*args, session=session, **kwargs)
scheduler            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
scheduler   File "/usr/local/lib/python3.12/site-packages/airflow/jobs/job.py", line 355, in run_job
scheduler     return execute_job(job, execute_callable=execute_callable)
scheduler            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
scheduler   File "/usr/local/lib/python3.12/site-packages/airflow/jobs/job.py", line 384, in execute_job
scheduler     ret = execute_callable()
scheduler           ^^^^^^^^^^^^^^^^^^
scheduler   File "/usr/local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 979, in _execute
scheduler     self._run_scheduler_loop()
scheduler   File "/usr/local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 1265, in _run_scheduler_loop
scheduler     num_queued_tis = self._do_scheduling(session)
scheduler                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
scheduler   File "/usr/local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 1375, in _do_scheduling
scheduler     callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
scheduler                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
scheduler   File "/usr/local/lib/python3.12/site-packages/airflow/utils/retries.py", line 94, in wrapped_function
scheduler     for attempt in run_with_db_retries(max_retries=retries, logger=logger, **retry_kwargs):
scheduler                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
scheduler   File "/usr/local/lib/python3.12/site-packages/tenacity/__init__.py", line 445, in __iter__
scheduler     do = self.iter(retry_state=retry_state)
scheduler          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
scheduler   File "/usr/local/lib/python3.12/site-packages/tenacity/__init__.py", line 378, in iter
scheduler     result = action(retry_state)
scheduler              ^^^^^^^^^^^^^^^^^^^
scheduler   File "/usr/local/lib/python3.12/site-packages/tenacity/__init__.py", line 400, in <lambda>
scheduler     self._add_action_func(lambda rs: rs.outcome.result())
scheduler                                      ^^^^^^^^^^^^^^^^^^^
scheduler   File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 449, in result
scheduler     return self.__get_result()
scheduler            ^^^^^^^^^^^^^^^^^^^
scheduler   File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
scheduler     raise self._exception
scheduler   File "/usr/local/lib/python3.12/site-packages/airflow/utils/retries.py", line 103, in wrapped_function
scheduler     return func(*args, **kwargs)
scheduler            ^^^^^^^^^^^^^^^^^^^^^
scheduler   File "/usr/local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 1783, in _schedule_all_dag_runs
scheduler     callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
scheduler                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
scheduler   File "/usr/local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 1914, in _schedule_dag_run
scheduler     dag_run.schedule_tis(schedulable_tis, session, max_tis_per_query=self.job.max_tis_per_query)
scheduler   File "/usr/local/lib/python3.12/site-packages/airflow/utils/session.py", line 98, in wrapper
scheduler     return func(*args, **kwargs)
scheduler            ^^^^^^^^^^^^^^^^^^^^^
scheduler   File "/usr/local/lib/python3.12/site-packages/airflow/models/dagrun.py", line 2012, in schedule_tis
scheduler     if task.expand_start_from_trigger(context=ti.get_template_context()):
scheduler                                               ^^^^^^^^^^^^^^^^^^^^^^^^^
scheduler   File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 1791, in get_template_context
scheduler     context: Context = runtime_ti.get_template_context()
scheduler                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
scheduler   File "/usr/local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 180, in get_template_context
scheduler     "map_index_template": self.task.map_index_template,
scheduler                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
scheduler   File "<attrs generated getattr airflow.models.mappedoperator.MappedOperator>", line 11, in __getattr__
scheduler AttributeError: 'MappedOperator' object has no attribute 'map_index_template'
stream closed EOF for nebular-electron-5470/nebular-electron-5470-scheduler-59dfb9f67c-6v6sk (scheduler)

What you think should happen instead?

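The scheduler should schedule the mapped task instances (including those whose mapped operator may start from a trigger) without crashing.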

How to reproduce

  1. Trigger the mapped DAG below (`datetime_mapped`).
  2. Check the scheduler logs: the scheduler exits with the traceback shown above.

DAG CODE

from datetime import datetime, timedelta
from time import sleep

from airflow.sdk import DAG, task
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.standard.sensors.date_time import DateTimeSensor, DateTimeSensorAsync
from airflow.providers.standard.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync

# Sleep durations in seconds (30, 60, 90) that the mapped tasks below fan out over.
delays = list(range(30, 91, 30))


@task
def get_delays():
    """Publish the module-level ``delays`` list (seconds) for downstream mapping.

    The returned value is consumed by ``get_wakes.expand(delay=...)``, so each
    element becomes one mapped task instance.
    """
    mapped_values = delays
    return mapped_values


@task
def get_wakes(delay, **context):
    """Return an ISO-8601 timestamp ``delay`` seconds after this task instance started.

    :param delay: Offset in seconds, supplied per mapped instance.
    :param context: Airflow task context; only ``context["ti"]`` is used.
    :return: ``(ti.start_date + delay seconds).isoformat()``.
    """
    started_at = context["ti"].start_date
    offset = timedelta(seconds=delay)
    wake_at = started_at + offset
    return wake_at.isoformat()


with DAG(
    dag_id="datetime_mapped",
    start_date=datetime(1970, 1, 1),
    schedule=None,
    tags=["taskmap"],
) as dag:
    # Runtime mapping: the sensor targets come from upstream task output (XCom),
    # so the number of mapped instances is only known once get_delays has run.
    wake_times = get_wakes.expand(delay=get_delays())

    DateTimeSensor.partial(task_id="expanded_datetime").expand(target_time=wake_times)
    # Parse-time mapping: the deltas are literal lists. They are derived from
    # ``delays`` (rather than a second hard-coded [30, 60, 90]) so both mapping
    # flavours always cover the same durations. A fresh list is built per call
    # so the two operators do not share one mutable object.
    TimeDeltaSensor.partial(task_id="expanded_timedelta").expand(
        delta=[timedelta(seconds=seconds) for seconds in delays]
    )

    DateTimeSensorAsync.partial(task_id="expanded_datetime_async").expand(
        target_time=wake_times
    )
    TimeDeltaSensorAsync.partial(task_id="expanded_timedelta_async").expand(
        delta=[timedelta(seconds=seconds) for seconds in delays]
    )

    # Unmapped counterparts, kept for side-by-side comparison with the mapped tasks.
    TimeDeltaSensor(task_id="static_timedelta", delta=timedelta(seconds=90))
    DateTimeSensor(
        task_id="static_datetime",
        target_time="{{macros.datetime.now() + macros.timedelta(seconds=90)}}",
    )

    PythonOperator(task_id="op_sleep_90", python_callable=lambda: sleep(90))

Operating System

linux

Versions of Apache Airflow Providers

No response

Deployment

Astronomer

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes, I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Type

Projects

Status

Done

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions