Remote logging to CloudWatch fails after the first task is logged. #50802

@emiliadecaudin

Description

Apache Airflow Provider(s)

amazon

Versions of Apache Airflow Providers

apache-airflow==3.0.1
apache-airflow-core==3.0.1
apache-airflow-providers-amazon==9.7.0
apache-airflow-providers-celery==3.10.6
apache-airflow-providers-cncf-kubernetes==10.4.3
apache-airflow-providers-common-compat==1.6.1
apache-airflow-providers-common-io==1.5.4
apache-airflow-providers-common-messaging==1.0.1
apache-airflow-providers-common-sql==1.27.0
apache-airflow-providers-docker==4.3.1
apache-airflow-providers-elasticsearch==6.2.2
apache-airflow-providers-fab==2.0.2
apache-airflow-providers-ftp==3.12.3
apache-airflow-providers-git==0.0.2
apache-airflow-providers-google==15.1.0
apache-airflow-providers-grpc==3.7.3
apache-airflow-providers-hashicorp==4.1.1
apache-airflow-providers-http==5.2.2
apache-airflow-providers-microsoft-azure==12.3.1
apache-airflow-providers-mysql==6.2.2
apache-airflow-providers-odbc==4.9.2
apache-airflow-providers-openlineage==2.2.0
apache-airflow-providers-postgres==6.1.3
apache-airflow-providers-redis==4.0.2
apache-airflow-providers-sendgrid==4.0.1
apache-airflow-providers-sftp==5.2.1
apache-airflow-providers-slack==9.0.5
apache-airflow-providers-smtp==2.0.3
apache-airflow-providers-snowflake==6.3.0
apache-airflow-providers-ssh==4.0.1
apache-airflow-providers-standard==1.1.0
apache-airflow-task-sdk==1.0.1

Apache Airflow version

3.0.1

Operating System

Debian GNU/Linux 12 (bookworm)

Deployment

Docker-Compose

Deployment details

Docker deployment based on the docker-compose.yaml from the Airflow documentation, with a Dockerfile that installs a custom provider into the image. I use the following environment variables (defined in docker-compose.yaml) to control logging:

    AIRFLOW__LOGGING__LOGGING_LEVEL: ${AIRFLOW__LOGGING__LOGGING_LEVEL:-INFO}
    AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: "cloudwatch://arn:aws:logs:us-east-1:XXX:log-group:XXX"
    AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: "aws_default"
    AIRFLOW__LOGGING__REMOTE_LOGGING: true

What happened

I switched my remote logging destination from an S3 bucket to a CloudWatch log group. When running a single-task DAG to test that logs are written correctly, the first invocation works, but the second and subsequent invocations fail to write the logs to CloudWatch. The first sign is a header in the log panel of the web interface reporting that the specified log stream doesn't exist (I confirmed in CloudWatch that it does not). If I run a DAG with multiple tasks, the first task writes its log successfully, while the subsequent tasks fail to do so. Further invocations of the DAG result in no tasks successfully uploading their logs to CloudWatch. Lastly, dag_processor log streams are created and updated only sporadically.
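For reference, this is roughly how I check for the missing stream outside the UI. It is a minimal boto3 sketch; the log group and stream prefix are placeholders, not my real values:

# Minimal boto3 sketch to check whether the stream Airflow should have
# written to actually exists. Group/prefix values below are placeholders.
import boto3

LOG_GROUP = "XXX"  # the log group from REMOTE_BASE_LOG_FOLDER
STREAM_PREFIX = "dag_id=my_test_dag"  # placeholder; use the stream name shown in the log panel header

client = boto3.client("logs", region_name="us-east-1")
paginator = client.get_paginator("describe_log_streams")

matches = []
for page in paginator.paginate(logGroupName=LOG_GROUP, logStreamNamePrefix=STREAM_PREFIX):
    matches.extend(stream["logStreamName"] for stream in page["logStreams"])

print(matches or "no matching log stream found")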

What you think should happen instead

Logs should be written to CloudWatch after the completion of every task and on every heartbeat of the dag processor.

How to reproduce

I was able to reproduce this error in a separate, minimal environment with the following docker-compose.yml file:

x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:3.0.1}
  environment: &airflow-common-env
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CORE__AUTH_MANAGER: airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
    AIRFLOW__CORE__DEFAULT_TIMEZONE: "America/New_York"
    AIRFLOW__CORE__EXECUTION_API_SERVER_URL: "http://airflow-apiserver:8080/execution/"
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__LOAD_EXAMPLES: "true"
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__LOGGING__LOGGING_LEVEL: ${AIRFLOW__LOGGING__LOGGING_LEVEL:-INFO}
    AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: "cloudwatch://arn:aws:logs:us-east-1:XXX:log-group:XXX"
    AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: "aws_default"
    AIRFLOW__LOGGING__REMOTE_LOGGING: "true"
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: "true"
    AIRFLOW_CONFIG: /opt/airflow/config/airflow.cfg
  volumes:
    - ./config:/opt/airflow/config
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on: &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: [ "CMD", "pg_isready", "-U", "airflow" ]
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always

  redis:
    # Redis is limited to 7.2-bookworm due to licensing change
    # https://redis.io/blog/redis-adopts-dual-source-available-licensing/
    image: redis:7.2-bookworm
    expose:
      - 6379
    healthcheck:
      test: [ "CMD", "redis-cli", "ping" ]
      interval: 10s
      timeout: 30s
      retries: 50
      start_period: 30s
    restart: always

  airflow-apiserver:
    <<: *airflow-common
    command: api-server
    ports:
      - "8080:8080"
    healthcheck:
      test: [ "CMD", "curl", "--fail", "http://localhost:8080/api/v2/version" ]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: [ "CMD", "curl", "--fail", "http://localhost:8974/health" ]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-dag-processor:
    <<: *airflow-common
    command: dag-processor
    healthcheck:
      test: [ "CMD-SHELL", 'airflow jobs check --job-type DagProcessorJob --hostname "$${HOSTNAME}"' ]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      # yamllint disable rule:line-length
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    environment:
      <<: *airflow-common-env
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-apiserver:
        condition: service_healthy
      airflow-init:
        condition: service_completed_successfully

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: [ "CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"' ]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    # yamllint disable rule:line-length
    command:
      - -c
      - |
        /entrypoint airflow version

    # yamllint enable rule:line-length
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_MIGRATE: "true"
      _AIRFLOW_WWW_USER_CREATE: "true"
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
    user: "0:0"

volumes:
  postgres-db-volume:
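
The DAG itself doesn't seem to matter; any trivial single-task DAG shows the behaviour. For illustration, a sketch of the kind of test DAG I'm running (names and schedule are placeholders, not my exact DAG):

# Illustrative single-task test DAG (hypothetical names), enough to
# reproduce the behaviour described above.
from __future__ import annotations

import logging
from datetime import datetime

from airflow.sdk import dag, task

log = logging.getLogger(__name__)


@dag(schedule=None, start_date=datetime(2025, 1, 1), catchup=False)
def cloudwatch_logging_test():
    @task
    def emit_some_logs():
        # A handful of lines so there is something for the handler to ship.
        for i in range(5):
            log.info("cloudwatch logging test, line %d", i)

    emit_some_logs()


cloudwatch_logging_test()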

Anything else

I am happy to share DEBUG-level logs from the worker process if requested.

This is the IAM policy of the AWS user:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:DescribeLogStreams",
                "logs:GetLogEvents",
                "logs:CreateLogGroup",
                "logs:PutLogEvents",
                "logs:GetLogRecord",
                "logs:GetLogGroupFields",
                "logs:GetQueryResults"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}
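
If it helps with triage, a direct boto3 check like the sketch below (placeholder group and stream names) can be used to confirm that these credentials can create a stream and put events outside of Airflow:

# Sketch of a credentials/permissions sanity check, independent of Airflow.
# LOG_GROUP is the group from REMOTE_BASE_LOG_FOLDER; the stream name is arbitrary.
import time

import boto3

LOG_GROUP = "XXX"
STREAM = "manual-permissions-check"

client = boto3.client("logs", region_name="us-east-1")
client.create_log_stream(logGroupName=LOG_GROUP, logStreamName=STREAM)
client.put_log_events(
    logGroupName=LOG_GROUP,
    logStreamName=STREAM,
    logEvents=[{"timestamp": int(time.time() * 1000), "message": "manual test event"}],
)
print(client.describe_log_streams(logGroupName=LOG_GROUP, logStreamNamePrefix=STREAM)["logStreams"])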

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

I'm not sure what the fix is, but if folks can help figure out what the problem is, I'm happy to learn how to contribute the fix.

Code of Conduct

  • I agree to follow this project's Code of Conduct

Labels

area:UI, area:providers, kind:bug, needs-triage
