Description
Apache Airflow Provider(s)
amazon
Versions of Apache Airflow Providers
No response
Apache Airflow version
3.0.2
Operating System
linux
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
What happened
I was trying to replicate the behavior in #52869 and noticed something interesting.
When the sensor runs with deferrable=True, no files are passed to the check_fn, even though they exist in S3. With deferrable=False, the correct files in the bucket are passed. I used the following function:
def check_key_not_present(files: list, **kwargs) -> bool:
    print(f"Files received by check_fn: {files}")
    return len(files) == 0
These are the two tasks:
deferrable_sensor = S3KeySensor(
    task_id='deferrable_sensor',
    bucket_name='#########################',
    bucket_key='scripts/extract-raw-data-job.py',
    poke_interval=10,
    timeout=60,
    check_fn=check_key_not_present,
    deferrable=True,
    mode="poke",
    aws_conn_id='aws_default',
    dag=dag,
    trigger_rule='none_skipped'
)

non_deferrable_sensor = S3KeySensor(
    task_id='non_deferrable_sensor',
    bucket_name='######################',
    bucket_key='scripts/extract-raw-data-job.py',
    poke_interval=10,
    timeout=60,
    check_fn=check_key_not_present,
    deferrable=False,
    mode="poke",
    aws_conn_id='aws_default',
    dag=dag,
    trigger_rule='none_skipped'
)
Logs of non_deferrable_sensor:
[2025-07-04, 20:24:09] INFO - Files received by check_fn: [{'Size': 363, 'Key': None}]: chan="stdout": source="task"
Logs of the deferrable_sensor:
[2025-07-04, 20:23:53] INFO - Files received by check_fn: []: chan="stdout": source="task"
After my adjustments:
INFO - Files received by check_fn: [{'Key': 'xxxxxxxxxxx', 'LastModified': DateTime(2025, 7, 5, 8, 59, 14, tzinfo=Timezone('UTC')), 'ETag': '"9fe1b02003ad4a0a062ff7df14b36d54"', 'ChecksumAlgorithm': ['CRC64NVME'], 'ChecksumType': 'FULL_OBJECT', 'Size': 209, 'StorageClass': 'STANDARD'}]
The problem lies here:
async def get_files_async(
    self,
    client: AioBaseClient,
    bucket: str,
    bucket_keys: str | list[str],
    wildcard_match: bool,
    delimiter: str | None = "/",
) -> list[Any]:
    """Get a list of files in the bucket."""
    keys: list[Any] = []
    for key in bucket_keys:
        prefix = key
        if wildcard_match:
            prefix = re.split(r"[\[*?]", key, 1)[0]

        paginator = client.get_paginator("list_objects_v2")
        params = {
            "Bucket": bucket,
            "Prefix": prefix,
            "Delimiter": delimiter,
        }
Why don't we check whether bucket_keys is a single string or a list? Is this intentional? If bucket_keys is a single string, for key in bucket_keys iterates over its individual characters rather than over the key itself.
I am opening this issue instead of a PR to confirm that this is not intended and that I am not missing something, since this test makes me question whether the behavior is deliberate.
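To make the failure mode concrete, here is a minimal standalone sketch (not the provider code itself) of the string-vs-list pitfall, together with the isinstance normalization I would expect before the loop; this is a hedged suggestion, not the current implementation:

# Standalone illustration of iterating over a bare string vs. a list of keys.
bucket_keys = "scripts/extract-raw-data-job.py"

for key in bucket_keys:
    # Iterates over characters: 's', 'c', 'r', ...
    print(key)

# Normalization I would expect before the loop (assumption, not existing code):
if isinstance(bucket_keys, str):
    bucket_keys = [bucket_keys]

for key in bucket_keys:
    # Now iterates exactly once over the full key string.
    print(key)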
What you think should happen instead
No response
How to reproduce
Create a DAG with two S3KeySensors, one deferrable and one not, and pass a custom check_fn to both. A minimal example is sketched below.
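A minimal reproduction sketch, assuming a bucket and key of your own (the bucket name below is a hypothetical placeholder):

# Hypothetical reproduction DAG; bucket_name is a placeholder.
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor


def check_key_not_present(files: list, **kwargs) -> bool:
    # Log what the sensor hands to the check_fn so both modes can be compared.
    print(f"Files received by check_fn: {files}")
    return len(files) == 0


with DAG(
    dag_id="s3_key_sensor_deferrable_repro",
    start_date=datetime(2025, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    S3KeySensor(
        task_id="deferrable_sensor",
        bucket_name="my-bucket",  # placeholder
        bucket_key="scripts/extract-raw-data-job.py",
        check_fn=check_key_not_present,
        deferrable=True,
    )
    S3KeySensor(
        task_id="non_deferrable_sensor",
        bucket_name="my-bucket",  # placeholder
        bucket_key="scripts/extract-raw-data-job.py",
        check_fn=check_key_not_present,
        deferrable=False,
    )

Comparing the task logs of the two sensors should show the deferrable one handing an empty list to check_fn while the non-deferrable one receives the matching object.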
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct