
Triggerer: too many open files #61916

@ecodina

Description


Apache Airflow version

3.1.7

If "Other Airflow 3 version" selected, which one?

No response

What happened?

The triggerer fails every few hours with OSError "Too many open files". On it I run ExternalTaskSensor as well as a custom trigger (shown below).

I thought it could be related to #56366, but my triggerer does not use the cleanup method. I've also seen similar issues for the worker (#51624) and the DAG processor (#49887).

I have been investigating and see that the number of entries in /proc/7/fd always increases. In contrast, /proc/24/fd does close files/sockets correctly. From what I've seen, my trigger code runs as PID 24 (I used os.getpid() to verify it), so PID 7 is probably the parent process:

root@99ee11a02435:/opt/airflow# ps aux
USER         PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
default        1  0.0  0.0   2336  1024 ?        Ss   16:59   0:00 /usr/bin/dumb-init -- /entrypoint triggerer --skip-serve-logs
default        7  2.3  2.7 386360 221004 ?       Ssl  16:59   0:16 /usr/python/bin/python3.12 /home/airflow/.local/bin/airflow triggerer --skip-serve-logs
default       24  0.2  1.8 359924 153816 ?       Sl   17:01   0:01 /usr/python/bin/python3.12 /home/airflow/.local/bin/airflow triggerer --skip-serve-logs
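For reference, this is roughly how I sampled the fd counts from inside the container (a minimal, Linux-only sketch; the `count_fds` helper is my own name, and PIDs 7 and 24 are the ones from the `ps aux` output above):

```python
import os

def count_fds(pid: int) -> int:
    """Count the open file descriptors of *pid* by listing /proc/<pid>/fd."""
    return len(os.listdir(f"/proc/{pid}/fd"))

# Inside the triggerer container:
# count_fds(7)   # keeps growing over time
# count_fds(24)  # grows and shrinks as triggers start and finish
```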
What my custom trigger does

My trigger basically connects to a Redis database and waits for a certain key to change to a certain status.

class MyTrigger(BaseTrigger):
    
    ...
   
    async def check_slurm_state(self, redis_conn: Redis):
        """
        Checks the Slurm job's state in the Redis database every *self.polling_interval* seconds.
        """
        finished_ok = False
        final_message = ""

        while True:
            state = await self.get_sacct_output(redis_conn)

            # We also check that state is in SACCT_RUNNING in case it is stuck in a completed state
            if (
                self.last_known_state == state["state"]
                and state["state"] in SACCT_RUNNING
            ) or state["state"] == "UNKNOWN":
                await asyncio.sleep(self.polling_interval)
                continue

            # The state has changed!
            await self.log.ainfo(f"Job has changed to status {state['state']}")  # I've also tried self.log.info
            await self.store_state(redis_conn, state["state"])
            is_finished, finished_ok, final_message = await self.parse_state_change(
                state["state"], state["reason"]
            )

            self.last_known_state = state["state"]

            if not is_finished:
                await asyncio.sleep(self.polling_interval)
            else:
                break

        return finished_ok, final_message

    async def run(self):
        redis_hook = RedisHook(redis_conn_id="my_redis_conn")
        conn = await redis_hook.aget_connection(redis_hook.redis_conn_id)

        redis_client = Redis(
            host=conn.host,
            port=conn.port,
            username=conn.login,
            password=None
            if str(conn.password).lower() in ["none", "false", ""]
            else conn.password,
            db=conn.extra_dejson.get("db"),
            max_connections=5,
            decode_responses=True,
        )

        async with redis_client:
            ...
            finished_ok, final_message = await self.check_slurm_state(redis_client)

        ...

        yield TriggerEvent({"finished_ok": finished_ok, "final_message": final_message})

When I saw this problem I thought it might be due to logging. We use a custom FileTaskHandler, but have configured the triggerer not to use it by setting the environment variable AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS to "":

default@99ee11a02435:/opt/airflow$ airflow info
Apache Airflow
version                | 3.1.7                                                         
executor               | LocalExecutor                                                 
task_logging_handler   | airflow.utils.log.file_task_handler.FileTaskHandler           
sql_alchemy_conn       | postgresql+psycopg2://db_editor:****@db:5432/airflow
dags_folder            | /opt/airflow/dags                                             
plugins_folder         | /opt/airflow/plugins                                          
base_log_folder        | /opt/airflow/logs                                             
remote_base_log_folder |                          

This started happening when we upgraded to Airflow 3. From Airflow 2.2 through 2.11 we used the same trigger and never had this problem.

What you think should happen instead?

Files/sockets should be closed by PID 7 when they are no longer needed.

How to reproduce

Run Airflow in Docker and create two DAGs: a "parent" DAG, and a "child" DAG with an ExternalTaskSensor in deferrable mode.

Access the container and check how many fds each PID has open:

ls /proc/7/fd | wc -l
ls /proc/24/fd | wc -l

You'll see that the number of fds for PID 24 increases when the ExternalTaskSensor starts and decreases when it finishes.
You'll also see that the number of fds for PID 7 increases when the ExternalTaskSensor starts but never decreases.
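The two DAGs can be as small as this (a sketch, not what I ran verbatim; import paths assume Airflow 3 with the standard provider installed, adjust them to your setup):

```python
import datetime

from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.sensors.external_task import ExternalTaskSensor
from airflow.sdk import DAG

with DAG(
    dag_id="parent",
    schedule=datetime.timedelta(minutes=5),
    start_date=datetime.datetime(2026, 1, 1),
):
    EmptyOperator(task_id="do_nothing")

with DAG(
    dag_id="child",
    schedule=datetime.timedelta(minutes=5),
    start_date=datetime.datetime(2026, 1, 1),
):
    ExternalTaskSensor(
        task_id="wait_for_parent",
        external_dag_id="parent",
        external_task_id="do_nothing",
        deferrable=True,  # runs on the triggerer, where the fds accumulate
        poke_interval=30,
    )
```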

Operating System

Debian GNU/Linux 12 (bookworm)

Versions of Apache Airflow Providers

apache-airflow-providers-common-compat==1.13.0
apache-airflow-providers-common-io==1.7.1
apache-airflow-providers-common-sql==1.30.4
apache-airflow-providers-ftp==3.14.1
apache-airflow-providers-git==0.2.2
apache-airflow-providers-http==5.6.4
apache-airflow-providers-imap==3.10.3
apache-airflow-providers-keycloak==0.5.1
apache-airflow-providers-postgres==6.5.3
apache-airflow-providers-redis==4.4.2
apache-airflow-providers-sftp==5.7.0
apache-airflow-providers-smtp==2.4.2
apache-airflow-providers-ssh==4.3.1
apache-airflow-providers-standard==1.11.0

Deployment

Docker-Compose

Deployment details

  • We use Docker Swarm.
  • We use Airflow's Docker image where we install our own provider that has our trigger, as well as some other providers.
  • We use Python 3.12
  • The logs volume is mounted as an NFS volume with options nfsvers=4.2,rw,noatime,nocto,actimeo=5,nolock

Anything else?

Log

2026-02-15T08:09:13.823619Z [info     ] 33 triggers currently running  [airflow.jobs.triggerer_job_runner] loc=triggerer_job_runner.py:738
2026-02-15T08:09:13.823792Z [info     ] 0 watchers currently running   [airflow.jobs.triggerer_job_runner] loc=triggerer_job_runner.py:738
2026-02-15T08:10:06.424254Z [warning  ] OSError while changing ownership of the log file. [Errno 24] Too many open files: '/opt/airflow/logs/dag_id=mydag/run_id=scheduled__2026-02-15T08:05:00+00:00/task_id=mytask/attempt=1.log.trigger.25760445.log' [airflow.sdk._shared.logging.structlog] loc=log.py:156
2026-02-15T08:10:06.424545Z [error    ] Exception when executing TriggerRunnerSupervisor.run [airflow.jobs.triggerer_job_runner.TriggererJobRunner] loc=triggerer_job_runner.py:176
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 173, in _execute
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 532, in run
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py", line 773, in _service_subprocess
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py", line 1723, in cb
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 729, in _process_log_messages_from_subprocess
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 713, in get_logger
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 304, in call
  File "/usr/python/lib/python3.12/pathlib.py", line 1013, in open
OSError: [Errno 24] Too many open files: '/opt/airflow/logs/dag_id=mydag/run_id=scheduled__2026-02-15T08:05:00+00:00/task_id=mytask/attempt=1.log.trigger.25760445.log'
2026-02-15T08:10:06.425481Z [info     ] Waiting for triggers to clean up [airflow.jobs.triggerer_job_runner.TriggererJobRunner] loc=triggerer_job_runner.py:179
Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 6, in <module>
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/__main__.py", line 55, in main
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 49, in command
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/cli.py", line 114, in wrapper
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/triggerer_command.py", line 69, in triggerer
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/triggerer_command.py", line 72, in
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/triggerer_command.py", line 55, in triggerer_run
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 100, in wrapper
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 368, in run_job
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 397, in execute_job
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 183, in _execute
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py", line 696, in kill
  File "/home/airflow/.local/lib/python3.12/site-packages/psutil/__init__.py", line 1291, in send_signal
  File "/home/airflow/.local/lib/python3.12/site-packages/psutil/__init__.py", line 1261, in _send_signal
  File "/home/airflow/.local/lib/python3.12/site-packages/psutil/__init__.py", line 461, in _raise_if_pid_reused
  File "/home/airflow/.local/lib/python3.12/site-packages/psutil/__init__.py", line 639, in is_running
  File "/home/airflow/.local/lib/python3.12/site-packages/psutil/__init__.py", line 314, in __init__
  File "/home/airflow/.local/lib/python3.12/site-packages/psutil/__init__.py", line 347, in _init
  File "/home/airflow/.local/lib/python3.12/site-packages/psutil/__init__.py", line 394, in _get_ident
  File "/home/airflow/.local/lib/python3.12/site-packages/psutil/_pslinux.py", line 1593, in wrapper
  File "/home/airflow/.local/lib/python3.12/site-packages/psutil/_pslinux.py", line 1857, in create_time
  File "/home/airflow/.local/lib/python3.12/site-packages/psutil/_pslinux.py", line 1593, in wrapper
  File "/home/airflow/.local/lib/python3.12/site-packages/psutil/_common.py", line 377, in wrapper
  File "/home/airflow/.local/lib/python3.12/site-packages/psutil/_common.py", line 375, in wrapper
  File "/home/airflow/.local/lib/python3.12/site-packages/psutil/_pslinux.py", line 1683, in _parse_stat_file
  File "/home/airflow/.local/lib/python3.12/site-packages/psutil/_common.py", line 730, in bcat
  File "/home/airflow/.local/lib/python3.12/site-packages/psutil/_common.py", line 718, in cat
  File "/home/airflow/.local/lib/python3.12/site-packages/psutil/_common.py", line 682, in open_binary
OSError: [Errno 24] Too many open files: '/proc/31/stat'
--- Supervised process Last chance exception handler ---
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 1152, in run_trigger
    async for event in trigger.run():
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/standard/triggers/external_task.py", line 125, in run
    allowed_count = await get_count_func(self.allowed_states)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/standard/triggers/external_task.py", line 138, in _get_count_af_3
    count = await sync_to_async(RuntimeTaskInstance.get_ti_count)(
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/asgiref/sync.py", line 504, in __call__
    ret = await asyncio.shield(exec_coro)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/greenback/_impl.py", line 217, in _greenback_shim
    next_send = outcome.Value((yield next_yield))
                               ^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.12/concurrent/futures/thread.py", line 59, in run
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/asgiref/sync.py", line 559, in thread_handler
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 487, in get_ti_count
    response = SUPERVISOR_COMMS.send(
               ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 772, in send
    return async_to_sync(self.asend)(msg)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/asgiref/sync.py", line 325, in __call__
    return call_result.result()
           ^^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.12/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/home/airflow/.local/lib/python3.12/site-packages/asgiref/sync.py", line 365, in main_wrap
    result = await awaitable
             ^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 801, in asend
    return await self._aget_response(frame.id)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 787, in _aget_response
    frame = await self._aread_frame()
            ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 779, in _aread_frame
    length = int.from_bytes(len_bytes, byteorder="big")
                            ^^^^^^^^^
UnboundLocalError: cannot access local variable 'len_bytes' where it is not associated with a value
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 1006, in cleanup_finished_triggers
    result = details["task"].result()
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/greenback/_impl.py", line 119, in greenback_shim
    return await _greenback_shim(orig_coro, next_send)  # type: ignore
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/greenback/_impl.py", line 208, in _greenback_shim
    next_yield, resume_greenlet = resume_greenlet.switch(next_send)
                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/greenback/_impl.py", line 84, in trampoline
    next_yield: Any = next_send.send(orig_coro)  # type: ignore
                      ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/outcome/_impl.py", line 231, in send
    return gen.throw(self.error)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 1174, in run_trigger
    await self.log.ainfo("trigger completed", name=name)
  File "/home/airflow/.local/lib/python3.12/site-packages/structlog/_native.py", line 178, in ameth
    await asyncio.get_running_loop().run_in_executor(
  File "/home/airflow/.local/lib/python3.12/site-packages/greenback/_impl.py", line 217, in _greenback_shim
    next_send = outcome.Value((yield next_yield))
                               ^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.12/concurrent/futures/thread.py", line 59, in run
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/structlog/_native.py", line 180, in <lambda>
    lambda: ctx.run(
            ^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/structlog/_native.py", line 181, in <lambda>
    lambda: self._proxy_to_logger(name, event, **kw)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/structlog/_base.py", line 223, in _proxy_to_logger
    return getattr(self._logger, method_name)(*args, **kw)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/structlog/_output.py", line 321, in msg
    self._write(message + b"\n")
BrokenPipeError: [Errno 32] Broken pipe
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py", line 388, in _fork_main
    target()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 742, in run_in_process
    TriggerRunner().run()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 856, in run
    asyncio.run(self.arun())
  File "/usr/python/lib/python3.12/asyncio/runners.py", line 195, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.12/asyncio/base_events.py", line 691, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 878, in arun
    finished_ids = await self.cleanup_finished_triggers()
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 1015, in cleanup_finished_triggers
    self.log.exception(
  File "/home/airflow/.local/lib/python3.12/site-packages/structlog/_native.py", line 47, in exception
    return self.error(event, *args, **kw)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/_shared/logging/structlog.py", line 100, in meth
    return self._proxy_to_logger(name, event % args, **kw)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/structlog/_base.py", line 223, in _proxy_to_logger
    return getattr(self._logger, method_name)(*args, **kw)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/structlog/_output.py", line 321, in msg
    self._write(message + b"\n")
BrokenPipeError: [Errno 32] Broken pipe

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
