
Conversation

@vatsrahul1001 (Contributor)

closes: #54603

Remote logging only worked when connections were created via environment variables. When connections were created via the Airflow UI, remote logging failed with the error below:

{"event":"Unable to find AWS Connection ID 'worker_remote_log_s3_conn', switching to empty.","level":"warning","logger":"airflow.task.hooks.airflow.providers.amazon.aws.hooks.s3.S3Hook","timestamp":"2025-08-18T08:15:49.647380Z"}

This PR addresses the timing issue and also adds caching of the connection lookup.
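
For context, here is a minimal sketch of the idea, assuming hypothetical helper names (the actual Task SDK supervisor code differs): resolve the connection lazily, cache the result, and expose it so hooks created later can find it, instead of assuming it was already in scope when the logger was built.

```python
from __future__ import annotations

import os
from functools import cache  # memoize: look the connection up once per worker process


@cache
def _fetch_remote_logging_conn_uri(conn_id: str) -> str | None:
    """Hypothetical helper: look up the connection and return it in URI form."""
    from airflow.hooks.base import BaseHook  # real Airflow API

    try:
        return BaseHook.get_connection(conn_id).get_uri()
    except Exception:  # connection not found (or backend not reachable yet)
        return None


def _ensure_remote_logging_conn_in_scope(conn_id: str) -> None:
    """Hypothetical helper: export the connection via the standard
    AIRFLOW_CONN_<ID> env var so hooks created later (e.g. the S3Hook at
    upload time) can resolve it."""
    uri = _fetch_remote_logging_conn_uri(conn_id)
    if uri:
        os.environ[f"AIRFLOW_CONN_{conn_id.upper()}"] = uri
```
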
**Testing**


Logs:

```
base [2025-08-19T16:54:20.014+0000] {plugin.py:125} WARNING - Astro managed secrets backend is disabled
base {"timestamp":"2025-08-19T16:54:20.073378Z","level":"info","event":"Executing workload","workload":"ExecuteTask(token='eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4YzM0MC04MDNjLTdiMWEtOWIzMC0wMjU4ZDUxNTg4N2IiLCJhdWQiOiJ1cm46YWlyZmxvdy5hcGFjaGUub3JnOnRhc2siLCJuYmYiOjE3NTU2MjI0NDEsImV4cCI6MTc1NTYyMzA0MSwiaWF0IjoxNzU1NjIyNDQxfQ.HGrMjg7D9iktHmKlgigxi1jiv_aoRY4sUPjWy_BmseXFp_9ZbnJHZyaM5qACQjoJl_abH5fuJaF2rsOhLkQ12w', ti=TaskInstance(id=UUID('0198c340-803c-7b1a-9b30-0258d515887b'), task_id='runme_0', dag_id='example_bash_operator', run_id='scheduled__2025-08-19T00:00:00+00:00', try_number=1, map_index=-1, pool_slots=1, queue='default', priority_weight=3, executor_config=None, parent_context_carrier={}, context_carrier={}, queued_dttm=None), dag_rel_path=PurePosixPath('standard/example_bash_operator.py'), bundle_info=BundleInfo(name='example_dags', version=None), log_path='dag_id=example_bash_operator/run_id=scheduled__2025-08-19T00:00:00+00:00/task_id=runme_0/attempt=1.log', type='ExecuteTask')","logger":"__main__"}
base {"timestamp":"2025-08-19T16:54:20.450830Z","level":"info","event":"Connecting to server:","server":"http://airflow-test-api-server:8080/execution/","logger":"__main__"}
base {"timestamp":"2025-08-19T16:54:20.486691Z","level":"info","event":"Secrets backends loaded for worker","count":1,"backend_classes":["EnvironmentVariablesBackend"],"logger":"supervisor"}
base {"timestamp":"2025-08-19T16:54:20.546678","level":"info","event":"DAG bundles loaded: dags-folder, example_dags","logger":"airflow.dag_processing.bundles.manager.DagBundlesManager"}
base {"timestamp":"2025-08-19T16:54:20.548847","level":"info","event":"Filling up the DagBag from /usr/local/lib/python3.12/site-packages/airflow/example_dags/standard/example_bash_operator.py","logger":"airflow.models.dagbag.DagBag"}
base {"timestamp":"2025-08-19T16:54:20.631434","level":"info","event":"TaskInstance Details: ","dag_id":"example_bash_operator","task_id":"runme_0","dagrun_id":"scheduled__2025-08-19T00:00:00+00:00","map_index":-1,"run_start_date":"2025-08-19T16:54:20.497016Z","try_number":1,"op_classpath":["airflow.providers.standard.operators.bash.BashOperator","airflow.models.baseoperator.BaseOperator"],"logger":"supervisor"}
base {"timestamp":"2025-08-19T16:54:20.633587Z","level":"info","event":"Task instance is in running state","chan":"stdout","logger":"supervisor"}
base {"timestamp":"2025-08-19T16:54:20.633712Z","level":"info","event":" Previous state of the Task instance: TaskInstanceState.QUEUED","chan":"stdout","logger":"supervisor"}
base {"timestamp":"2025-08-19T16:54:20.633868Z","level":"info","event":"Current task name:runme_0","chan":"stdout","logger":"supervisor"}
base {"timestamp":"2025-08-19T16:54:20.633964Z","level":"info","event":"Dag name:example_bash_operator","chan":"stdout","logger":"supervisor"}
base {"timestamp":"2025-08-19T16:54:20.633490","level":"info","event":"Tmp dir root location: /tmp","logger":"airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"}
base {"timestamp":"2025-08-19T16:54:20.633906","level":"info","event":"Running command: ['/usr/bin/bash', '-c', 'echo \"example_bash_operator__runme_0__20250819\" && sleep 1']","logger":"airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"}
base {"timestamp":"2025-08-19T16:54:20.640640","level":"info","event":"Output:","logger":"airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"}
base {"timestamp":"2025-08-19T16:54:20.668459","level":"info","event":"example_bash_operator__runme_0__20250819","logger":"airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"}
stream closed EOF for airflow-test/example-bash-operator-runme-0-qlfpxism (init-container)
base {"timestamp":"2025-08-19T16:54:21.684068","level":"info","event":"Command exited with return code 0","logger":"airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"}
base {"timestamp":"2025-08-19T16:54:21.685320","level":"info","event":"Pushing xcom","ti":"RuntimeTaskInstance(id=UUID('0198c340-803c-7b1a-9b30-0258d515887b'), task_id='runme_0', dag_id='example_bash_operator', run_id='scheduled__2025-08-19T00:00:00+00:00', try_number=1, map_index=-1, hostname='example-bash-operator-runme-0-qlfpxism', context_carrier={}, task=<Task(BashOperator): runme_0>, bundle_instance=LocalDagBundle(name=example_dags), max_tries=0, start_date=datetime.datetime(2025, 8, 19, 16, 54, 20, 497016, tzinfo=datetime.timezone.utc), end_date=None, state=<TaskInstanceState.RUNNING: 'running'>, is_mapped=False, rendered_map_index=None, log_url='http://localhost:8080/dags/example_bash_operator/runs/scheduled__2025-08-19T00%3A00%3A00%2B00%3A00/tasks/runme_0?try_number=1')","logger":"task"}
base {"timestamp":"2025-08-19T16:54:21.776959Z","level":"info","event":"Task instance in success state","chan":"stdout","logger":"supervisor"}
base {"timestamp":"2025-08-19T16:54:21.777258Z","level":"info","event":" Previous state of the Task instance: TaskInstanceState.RUNNING","chan":"stdout","logger":"supervisor"}
base {"timestamp":"2025-08-19T16:54:21.777432Z","level":"info","event":"Task operator:<Task(BashOperator): runme_0>","chan":"stdout","logger":"supervisor"}
base {"event":"Connection Retrieved 'aws_default'","level":"info","logger":"airflow.hooks.base","timestamp":"2025-08-19T16:54:22.016278Z"}
base {"event":"AWS Connection (conn_id='aws_default', conn_type='aws') credentials retrieved from extra.","level":"info","logger":"airflow.providers.amazon.aws.utils.connection_wrapper.AwsConnectionWrapper","timestamp":"2025-08-19T16:54:22.017052Z"}
base {"event":"The hook_class 'airflow.providers.standard.hooks.filesystem.FSHook' is not fully initialized (UI widgets will be missing), because the 'flask_appbuilder' package is not installed, however it is not required for Airflow components to work","level":"info","logger":"airflow.providers_manager","timestamp":"2025-08-19T16:54:22.078753Z"}
base {"event":"The hook_class 'airflow.providers.standard.hooks.package_index.PackageIndexHook' is not fully initialized (UI widgets will be missing), because the 'flask_appbuilder' package is not installed, however it is not required for Airflow components to work","level":"info","logger":"airflow.providers_manager","timestamp":"2025-08-19T16:54:22.079122Z"}
base {"event":"The hook_class 'airflow.providers.amazon.aws.hooks.base_aws.AwsGenericHook' is not fully initialized (UI widgets will be missing), because the 'flask_appbuilder' package is not installed, however it is not required for Airflow components to work","level":"info","logger":"airflow.providers_manager","timestamp":"2025-08-19T16:54:22.111117Z"}
base {"timestamp":"2025-08-19T16:54:28.070909Z","level":"info","event":"Task finished","exit_code":0,"duration":7.585227897012373,"final_state":"success","logger":"supervisor"}
stream closed EOF for airflow-test/example-bash-operator-runme-0-qlfpxism (base)
```

@vatsrahul1001 marked this pull request as draft August 19, 2025 19:19
@vatsrahul1001 marked this pull request as ready for review August 20, 2025 00:05
@vatsrahul1001 requested a review from ashb August 20, 2025 00:05
@ashb added this to the Airflow 3.0.6 milestone Aug 20, 2025
@vatsrahul1001 added the backport-to-v3-1-test label Aug 20, 2025
@ashb merged commit b340e8c into apache:main Aug 20, 2025
77 checks passed
@ashb deleted the fix-remote-logging branch August 20, 2025 09:59
github-actions bot pushed a commit that referenced this pull request Aug 20, 2025
…pervisor (#54679)

Using connections stored in the Airflow metadata DB for remote logging was partially fixed
in 3.0.4, but the connection was only set/put in scope when the logger was created, before
task startup. However, for blob stores (e.g. S3 or WASB) that is a no-op, and we need the
connection when we upload too.
(cherry picked from commit b340e8c)

Co-authored-by: Rahul Vats <43964496+vatsrahul1001@users.noreply.github.com>
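
To make the timing concrete, here is a hedged sketch of the supervisor lifecycle the commit message describes (all function names are invented for illustration and do not match the actual Task SDK code):

```python
# Hypothetical sketch of the supervisor lifecycle; stubs stand in for the real steps.

def ensure_remote_logging_conn_in_scope() -> None: ...  # see the caching sketch above
def create_task_logger(log_path): ...  # sets up handlers; for S3/WASB this uploads nothing
def execute_task(workload): ...
def upload_logs_to_remote(log_path): ...


def run_task(workload) -> None:
    # 3.0.4 behaviour: the connection was put in scope only here, when the
    # logger was created before task startup.
    ensure_remote_logging_conn_in_scope()
    create_task_logger(workload.log_path)
    try:
        execute_task(workload)
    finally:
        # This PR: blob-store handlers do the real work (the upload) only after
        # the task finishes, so the connection must be in scope here as well.
        ensure_remote_logging_conn_in_scope()
        upload_logs_to_remote(workload.log_path)
```
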
@github-actions

Backport successfully created: v3-0-test

| Status | Branch | Result |
| --- | --- | --- |
| | v3-0-test | PR Link |

github-actions bot pushed a commit to aws-mwaa/upstream-to-airflow that referenced this pull request Aug 20, 2025
…pervisor (apache#54679)

Using connections stored in the Airflow metadata DB for remote logging was partially fixed
in 3.0.4, but the connection was only set/put in scope when the logger was created, before
task startup. However, for blob stores (e.g. S3 or WASB) that is a no-op, and we need the
connection when we upload too.
(cherry picked from commit b340e8c)

Co-authored-by: Rahul Vats <43964496+vatsrahul1001@users.noreply.github.com>
ashb pushed a commit that referenced this pull request Aug 20, 2025
…pervisor (#54679) (#54720)

Using connections stored in the Airflow metadata DB for remote logging was partially fixed
in 3.0.4, but the connection was only set/put in scope when the logger was created, before
task startup. However, for blob stores (e.g. S3 or WASB) that is a no-op, and we need the
connection when we upload too.
(cherry picked from commit b340e8c)

Co-authored-by: Rahul Vats <43964496+vatsrahul1001@users.noreply.github.com>
mangal-vairalkar pushed a commit to mangal-vairalkar/airflow that referenced this pull request Aug 30, 2025
…ache#54679)

Using connections stored in the Airflow metadata DB for remote logging was partially fixed
in 3.0.4, but the connection was only set/put in scope when the logger was created, before
task startup. However, for blob stores (e.g. S3 or WASB) that is a no-op, and we need the
connection when we upload too.

Labels

area:task-sdk, backport-to-v3-1-test


Development

Successfully merging this pull request may close these issues.

[3.0.4][Remote Logging] "Unable to find AWS Connection ID" under KubernetesExecutor configuration
