Celery executor does not log to stdout for Elasticsearch #49863

@xaerocom

Apache Airflow version

3.0.0

If "Other Airflow 2 version" selected, which one?

No response

What happened?

After switching to Airflow 3.0.0, the Celery executor logs task output only to files on the local filesystem instead of to standard out. Since we run Airflow on Kubernetes, we need to scrape task logs from stdout and ship them to Elasticsearch with Fluent Bit.

Worker container details:

Apache Airflow
version | 3.0.0
executor | CeleryExecutor
task_logging_handler | airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler
sql_alchemy_conn | postgresql+psycopg2://airflow:airflow@postgres/airflow
dags_folder | /opt/airflow/dags
plugins_folder | /opt/airflow/plugins
base_log_folder | /opt/airflow/logs
remote_base_log_folder |

System info
OS | Linux
architecture | arm
uname | uname_result(system='Linux', node='70b4cb20510b', release='6.12.13-200.fc41.aarch64', version='#1 SMP PREEMPT_DYNAMIC Sat Feb 8 20:30:50 UTC 2025', machine='aarch64')
locale | ('C', 'UTF-8')
python_version | 3.12.10 (main, Apr 9 2025, 03:37:37) [GCC 12.2.0]
python_location | /home/airflow/.local/bin/python

Tools info
git | NOT AVAILABLE
ssh | OpenSSH_9.2p1 Debian-2+deb12u5, OpenSSL 3.0.15 3 Sep 2024
kubectl | NOT AVAILABLE
gcloud | NOT AVAILABLE
cloud_sql_proxy | NOT AVAILABLE
mysql | mysql Ver 15.1 Distrib 10.11.11-MariaDB, for debian-linux-gnu (aarch64) using EditLine wrapper
sqlite3 | 3.40.1 2022-12-28 14:03:47 df5c253c0b3dd24916e4ec7cf77d3db5294cc9fd45ae7b9c5e82ad8197f3alt1
psql | psql (PostgreSQL) 17.4 (Debian 17.4-1.pgdg120+2)

Paths info
airflow_home | /opt/airflow
system_path | /root/bin:/home/airflow/.local/bin:/usr/local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
python_path | /home/airflow/.local/bin:/usr/local/lib/python312.zip:/usr/local/lib/python3.12:/usr/local/lib/python3.12/lib-dynload:/home/airflow/.local/lib/python3.12/site-packages:/opt/airflow/config:/opt/airflow/plugins:/opt/airflow/dags
airflow_on_path | True

Providers info
apache-airflow-providers-amazon | 9.6.1
apache-airflow-providers-celery | 3.10.6
apache-airflow-providers-cncf-kubernetes | 10.4.3
apache-airflow-providers-common-compat | 1.6.0
apache-airflow-providers-common-io | 1.5.4
apache-airflow-providers-common-messaging | 1.0.0
apache-airflow-providers-common-sql | 1.26.0
apache-airflow-providers-docker | 4.3.1
apache-airflow-providers-elasticsearch | 6.2.2
apache-airflow-providers-fab | 2.0.1
apache-airflow-providers-ftp | 3.12.3
apache-airflow-providers-git | 0.0.2
apache-airflow-providers-google | 15.1.0
apache-airflow-providers-grpc | 3.7.3
apache-airflow-providers-hashicorp | 4.1.1
apache-airflow-providers-http | 5.2.2
apache-airflow-providers-microsoft-azure | 12.3.1
apache-airflow-providers-mysql | 6.2.2
apache-airflow-providers-odbc | 4.9.2
apache-airflow-providers-openlineage | 2.2.0
apache-airflow-providers-postgres | 6.1.3
apache-airflow-providers-redis | 4.0.2
apache-airflow-providers-sendgrid | 4.0.1
apache-airflow-providers-sftp | 5.2.1
apache-airflow-providers-slack | 9.0.5
apache-airflow-providers-smtp | 2.0.3
apache-airflow-providers-snowflake | 6.2.2
apache-airflow-providers-ssh | 4.0.1
apache-airflow-providers-standard | 1.0.0

A custom logging configuration is not working either (still no task logs in stdout):

import os
from copy import deepcopy

from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
from airflow.configuration import conf

BASE_LOG_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("logging", "BASE_LOG_FOLDER"))

LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)

REMOTE_TASK_LOG = None

ELASTICSEARCH_HOST: str | None = conf.get("elasticsearch", "HOST")
ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("elasticsearch", "END_OF_LOG_MARK")
ELASTICSEARCH_FRONTEND: str = conf.get_mandatory_value("elasticsearch", "frontend")
ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean("elasticsearch", "WRITE_STDOUT")
ELASTICSEARCH_WRITE_TO_ES: bool = conf.getboolean("elasticsearch", "WRITE_TO_ES")
ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean("elasticsearch", "JSON_FORMAT")
ELASTICSEARCH_JSON_FIELDS: str = conf.get_mandatory_value("elasticsearch", "JSON_FIELDS")
ELASTICSEARCH_TARGET_INDEX: str = conf.get_mandatory_value("elasticsearch", "TARGET_INDEX")
ELASTICSEARCH_HOST_FIELD: str = conf.get_mandatory_value("elasticsearch", "HOST_FIELD")
ELASTICSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value("elasticsearch", "OFFSET_FIELD")

ELASTIC_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
    "task": {
        "class": "airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler",
        "formatter": "airflow",
        "base_log_folder": BASE_LOG_FOLDER,
        "end_of_log_mark": ELASTICSEARCH_END_OF_LOG_MARK,
        "host": ELASTICSEARCH_HOST,
        "frontend": ELASTICSEARCH_FRONTEND,
        "write_stdout": ELASTICSEARCH_WRITE_STDOUT,
        "write_to_es": ELASTICSEARCH_WRITE_TO_ES,
        "target_index": ELASTICSEARCH_TARGET_INDEX,
        "json_format": ELASTICSEARCH_JSON_FORMAT,
        "json_fields": ELASTICSEARCH_JSON_FIELDS,
        "host_field": ELASTICSEARCH_HOST_FIELD,
        "offset_field": ELASTICSEARCH_OFFSET_FIELD,
    },
}

LOGGING_CONFIG["handlers"].update(ELASTIC_REMOTE_HANDLERS)
LOGGING_CONFIG["loggers"]["airflow.task"]["handlers"] = ["task","console"]

What you think should happen instead?

When remote logging is enabled, an Elasticsearch host is configured, and AIRFLOW__ELASTICSEARCH__WRITE_STDOUT is set to 'true', the default logging config should redirect task logs to stdout, as it did in Airflow 2.x.
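
In Airflow 2.x the observable behavior was roughly the following (a simplified sketch of what we rely on, not the provider's actual code; the class name is made up):

import logging
import sys

class StdoutTaskHandler(logging.Handler):
    """Hypothetical stand-in for ElasticsearchTaskHandler with write_stdout=True:
    each task log record is emitted as a line on the real stdout so a log
    collector (Fluent Bit in our case) can pick it up from the container."""

    def __init__(self, write_stdout: bool = True):
        super().__init__()
        # Write to the original stdout even if something redirected sys.stdout.
        self.stream = sys.__stdout__ if write_stdout else None

    def emit(self, record: logging.LogRecord) -> None:
        if self.stream is not None:
            self.stream.write(self.format(record) + "\n")
            self.stream.flush()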

How to reproduce

A default Docker Compose setup running Airflow 3.0.0 with a fixed JWT secret for the API, plus:

AIRFLOW__LOGGING__REMOTE_LOGGING: 'true'
AIRFLOW__ELASTICSEARCH__HOST: 'http://elastic:9200'
AIRFLOW__ELASTICSEARCH__JSON_FORMAT: 'true'
AIRFLOW__ELASTICSEARCH__WRITE_STDOUT: 'true'
AIRFLOW__ELASTICSEARCH__WRITE_TO_ES: 'false'

The Celery worker does not print task logs to stdout.
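
To see which handlers actually end up on the task logger inside the worker, this debugging sketch can be called from any PythonOperator callable (the helper name is ours; write_stdout matches the handler kwarg above):

import logging

def inspect_task_logging():
    """Print the handler classes attached to the 'airflow.task' logger and
    their write_stdout flag where present."""
    task_logger = logging.getLogger("airflow.task")
    for handler in task_logger.handlers:
        print(type(handler).__name__, getattr(handler, "write_stdout", "n/a"))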

Operating System

macOS, Podman + Docker Compose

Versions of Apache Airflow Providers

(Identical to the Providers info section above.)

Deployment

Docker-Compose

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
