Conversation

@carlinix
Contributor

This PR fixes two inconsistencies between deferrable and non-deferrable execution modes of the S3KeySensor:

  1. Task context (context) is lost when resuming from the trigger.
  2. metadata_keys is ignored — the deferrable trigger (S3KeyTrigger) only returns object names (list[str]) instead of metadata dictionaries.

Both behaviors cause user-defined check_fn functions to fail or behave inconsistently between execution modes.

Reproduction

Use the following DAG:

from __future__ import annotations
from datetime import datetime, UTC
from typing import Any
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.sdk.execution_time.context import get_current_context

def ensure_datetime(value, fallback):
    # Coerce an ISO-8601 string (or None) to a datetime; use the fallback when empty.
    if not value:
        return fallback
    if isinstance(value, datetime):
        return value
    return datetime.fromisoformat(value)

def check_new_objects(files: list[dict[str, Any]], **context: Any) -> bool:
    # "ti" should come from the forwarded task context; the get_current_context()
    # fallback exists because deferrable mode currently drops **context.
    ti = context.get("ti") or get_current_context().get("ti")
    extractor = ti.xcom_pull(task_ids="start_task", key="extractor") or {}

    print(type(files), files)
    print(type(context), context)
    return True

check_s3 = S3KeySensor(
    task_id="check_new_s3_objects",
    bucket_name="example-bucket",
    bucket_key="example-prefix/*",
    aws_conn_id="aws_default",
    wildcard_match=True,
    metadata_keys=["Key", "LastModified", "Size", "ETag"],
    poke_interval=30,
    timeout=300,
    deferrable=True,
    check_fn=check_new_objects,
)

Observed logs:

<class 'list'> ['XYZ YTD Export.csv', 'XYZ_Weekly_Export.csv']
<class 'dict'> {}
  • files is a list of strings, and context is empty.
  • With deferrable=False, both contain the full metadata dictionaries and the task context, as expected.

Root Cause

  1. Context loss:
    S3KeySensor.execute_complete() currently calls:

    found_keys = self.check_fn(event["files"])

    instead of passing context the way _check_key() does:

    if any(param.kind == inspect.Parameter.VAR_KEYWORD for param in signature.parameters.values()):
        return self.check_fn(files, **context)

  2. Metadata loss:
    S3KeyTrigger emits only
    TriggerEvent({"status": "running", "files": keys}),
    where keys is a list[str] from get_files_async(), ignoring metadata_keys.

Fixes Introduced

  • Fix 1: Pass the task context to check_fn in execute_complete() by replicating the signature-inspection logic from _check_key() (sketched after this list).

  • Fix 2: Add metadata_keys support to S3KeyTrigger, retrieving object metadata (via list_objects_v2 or head_object) in deferrable mode, as sketched above.
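
A simplified sketch of the replicated logic inside execute_complete() (condensed for illustration; the merged code may differ in detail):

import inspect

def execute_complete(self, context, event=None):
    if event["status"] == "running" and self.check_fn is not None:
        signature = inspect.signature(self.check_fn)
        # Forward the task context only when check_fn accepts **kwargs,
        # mirroring _check_key() in the non-deferrable path.
        if any(param.kind == inspect.Parameter.VAR_KEYWORD
               for param in signature.parameters.values()):
            found_keys = self.check_fn(event["files"], **context)
        else:
            found_keys = self.check_fn(event["files"])
        ...  # then defer again or finish, exactly as before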

Expected Behavior

Both modes (deferrable=True and deferrable=False) now behave consistently:

  • check_fn always receives files as a list of metadata dictionaries.
  • context always includes task instance and DAG context values (ti, dag_run, etc.).

metadata_keys=["*"] returns full head_object() data.

@boring-cyborg boring-cyborg bot added the area:providers and provider:amazon (AWS/Amazon - related issues) labels on Oct 20, 2025
@boring-cyborg

boring-cyborg bot commented Oct 20, 2025

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst).
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our prek-hooks will help you with that.
  • In case of a new feature, add useful documentation (in docstrings or in the docs/ directory). Adding a new operator? Check this short guide and consider adding an example DAG that shows how users should use it.
  • Consider using the Breeze environment for testing locally; it's a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow the ASF Code of Conduct for all communication, including (but not limited to) comments on Pull Requests, the mailing list, and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.

Apache Airflow is a community-driven project and together we are making it better 🚀.
In case of doubts contact the developers at:
Mailing List: dev@airflow.apache.org
Slack: https://s.apache.org/airflow-slack

@carlinix carlinix marked this pull request as draft October 20, 2025 22:55
@eladkal eladkal requested a review from vincbeck October 21, 2025 07:48
@vincbeck
Contributor

I agree, though some tests to cover that would be nice.

@carlinix carlinix marked this pull request as ready for review October 21, 2025 17:07
@carlinix carlinix marked this pull request as draft October 21, 2025 17:12
@carlinix carlinix marked this pull request as ready for review October 21, 2025 17:22
@carlinix
Contributor Author

@vincbeck, anything else needed from my side, or should I just wait? Thanks!

@vincbeck
Contributor

Can you please rebase? CI is failing due to bugs in main, I think.

Fix bug losing context when deferrable is True
Refactor S3 trigger to handle metadata extraction for files.
Added metadata_keys parameter to S3KeyTrigger to specify attributes to gather from head_object.
Added metadata handling to S3KeyTrigger tests.
Refactor S3 trigger tests for clarity and functionality.
Refactor test_run_with_all_metadata to include check_key_async mock and improve async handling.
@vincbeck
Contributor

Static checks are failing. Can you please fix them? You can find more information in the documentation.

@vincbeck
Contributor

vincbeck commented Nov 3, 2025

Static checks are still failing. Have you run prek to fix them? Please read the documentation I linked above.

@carlinix
Contributor Author

carlinix commented Nov 3, 2025

@vincbeck

Static checks are still failing. Have you run prek to fix them? Please read the documentation I linked above.

prek --hook-stage manual mypy-providers --all-files
Run mypy for providers (manual)..........................................Passed

@vincbeck
Contributor

vincbeck commented Nov 3, 2025

Static checks are now succeeding but one test is failing:

FAILED providers/amazon/tests/unit/amazon/aws/triggers/test_s3.py::TestS3KeyTrigger::test_run_success - AssertionError: assert equals failed

@carlinix
Contributor Author

carlinix commented Nov 3, 2025

@vincbeck

Static checks are now succeeding but one test is failing:

FAILED providers/amazon/tests/unit/amazon/aws/triggers/test_s3.py::TestS3KeyTrigger::test_run_success - AssertionError: assert equals failed

Fixed.

@carlinix
Contributor Author

carlinix commented Nov 4, 2025

@vincbeck , from what I saw in the logs, the errors now don’t seem to be related to my changes.

@vincbeck vincbeck merged commit 3bb724f into apache:main Nov 4, 2025
82 checks passed
@boring-cyborg

boring-cyborg bot commented Nov 4, 2025

Awesome work, congrats on your first merged pull request! You are invited to check our Issue Tracker for additional contributions.

@vincbeck
Contributor

vincbeck commented Nov 4, 2025

Merged! Nice job :)

Copilot AI pushed a commit to jason810496/airflow that referenced this pull request Dec 5, 2025
