
In S3ToGcsOperator return the list of copied files even in deferrable mode#49768

Open
AlexisBRENON wants to merge 2 commits into apache:main from
AlexisBRENON:s3_to_gcs_deferrable_return_value

Conversation

@AlexisBRENON
Contributor

This operator behaved differently in deferrable and non-deferrable mode:
in non-deferrable mode it returned the list of copied files, while in deferrable mode it just returned None, making it barely usable for subsequent tasks.



@boring-cyborg boring-cyborg bot added area:providers provider:google Google (including GCP) related issues labels Apr 25, 2025
Contributor

@kiran2706 kiran2706 left a comment


fix: make execute_complete files parameter optional to maintain compatibility

The execute_complete method signature was changed to require a 'files' argument,
which caused TypeError in deferrable mode and broke provider compatibility tests.

This commit updates execute_complete to accept 'files' as an optional parameter
(defaulting to None) to ensure backward compatibility and fix the test failures.

Also updates relevant tests to handle the optional parameter properly.

@AlexisBRENON AlexisBRENON force-pushed the s3_to_gcs_deferrable_return_value branch from c9f8fa5 to 30c74c7 Compare July 11, 2025 10:08
@dejii
Contributor

dejii commented Aug 8, 2025

I think that it's fine to just use the S3Hook in subsequent tasks instead. Something like:

hook = S3Hook(aws_conn_id='aws_default')
s3_objects = hook.list_keys(
    bucket_name='s3-bucket-name',
    prefix='some-prefix/',
)

I'm suggesting that because the behaviour won't be consistent if the triggerer encounters an interruption that causes a restart, i.e. files will be copied but the return statement ends up being None:

return None if files is None else list(files)
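For illustration, the guard above can be exercised in isolation. Here is a minimal sketch of the return logic proposed in the PR (the function name is invented for this example):

```python
from typing import Iterable, Optional


def finalize_copied_files(files: Optional[Iterable[str]]) -> Optional[list]:
    # Mirrors the guarded return quoted above: None propagates (the case where
    # the file list was lost, e.g. across a triggerer restart), while any other
    # iterable is materialized into a plain list so it can be pushed to XCom.
    return None if files is None else list(files)
```

For example, `finalize_copied_files(iter(["a", "b"]))` materializes the iterator into `["a", "b"]`, while `finalize_copied_files(None)` stays `None`.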

@AlexisBRENON
Contributor Author

Your suggested solution does not cover my use case, where files are always copied to the same prefix.
This implies that I would also need to check the creation time of the files.
Actually, I don't know how deferred operators behave if the triggerer is restarted during deferral.

@dejii
Contributor

dejii commented Aug 8, 2025

Your suggested solution does not cover my use case, where files are always copied to the same prefix. This implies that I would also need to check the creation time of the files.

I don’t think checking the creation time is necessary. The list of objects to be copied is determined by S3ListOperator.execute, so as long as you use the same prefix used in the S3ToGCSOperator, you should get the same result across subsequent tasks

# use the super method to list all the files in an S3 bucket/key
s3_objects = super().execute(context)

Actually I don't know how deferred operators behave if the triggerer is restarted during deferral.

It's not restarted during deferral, but it’s designed to be stateless and resilient to restarts. To preserve that statelessness with your proposed solution, you'd need to serialize the list of objects—which might not be ideal, as it could consume significant space in the metadata database.

@AlexisBRENON
Contributor Author

I don’t think checking the creation time is necessary. The list of objects to be copied is determined by S3ListOperator.execute, so as long as you use the same prefix used in the S3ToGCSOperator, you should get the same result across subsequent tasks

I'm sorry, I don't get your point.
I have a source bucket, where a client puts files every day (all files at the root of the bucket).
I need to copy these files to a GCS bucket, following the same (non-)hierarchy.

So on day 1, there is file1 in the source bucket, I copy it to the destination bucket, and so I want the operator to return [file1] (this is the new file to process today).

On day 2, the client pushed file2, and so the source bucket contains both file1 and file2. I use this operator to sync the buckets, hence copying file2 into the destination bucket. The only new file to process today is file2, and I want to be able to know that. That's why I expect this operator to return the list of copied files.

If I use any of the (S3|Gcs)ListOperators, it will return the list of all the files in the bucket. Filtering on the creation time may allow me to get the files copied by a specific DAG run.

It's not restarted during deferral, but it’s designed to be stateless and resilient to restarts. To preserve that statelessness with your proposed solution, you'd need to serialize the list of objects—which might not be ideal, as it could consume significant space in the metadata database.

I understand that storing a long list of copied files may not be ideal (however, in the non-deferred setup, the list of copied files is stored anyway as the XCom value).
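To make the day-2 scenario concrete, here is a hedged plain-Python sketch (function and file names invented) of why the return value matters: a list operator sees every key under the prefix, while the copied-files return value pinpoints only what this run synced.

```python
def select_new_files(copied_files):
    # Hypothetical downstream step: it relies on the operator returning the
    # copied keys via XCom. If the operator returned None (today's deferrable
    # behavior), there is no way to tell new files from old ones.
    if copied_files is None:
        raise ValueError("operator returned None; cannot tell new files from old")
    return sorted(copied_files)


# Day 2 of the scenario: the bucket holds both files, but only file2 was copied.
all_keys_in_bucket = ["file1", "file2"]  # what an (S3|Gcs)ListOperator would see
copied_today = ["file2"]                 # what S3ToGCSOperator copied this run
assert select_new_files(copied_today) == ["file2"]
```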

@github-actions

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Oct 10, 2025
@AlexisBRENON
Contributor Author

ping @dejii

@github-actions github-actions bot removed the stale Stale PRs per the .github/workflows/stale.yml policy file label Oct 17, 2025
@shahar1 shahar1 self-requested a review January 24, 2026 13:36
Contributor

@shahar1 shahar1 left a comment


Sorry for the late response, this PR went under the radar (I'm now going over all open GCP-related PRs).
Overall LGTM, but could you please add a unit test? Also, it would be great if you could test it on a real GCP instance - please let me know if you can't do it for any reason.

CC: @VladaZakharova @MaksYermak

@shahar1 shahar1 changed the title fix: return the list of copied files even in deferrable mode In S3ToGcsOperator return the list of copied files even in deferrable mode Jan 24, 2026
poll_interval=self.poll_interval,
),
method_name="execute_complete",
kwargs=dict(files=files),
Contributor


@AlexisBRENON could you please clarify what behavior you want to achieve by this line?

If you want to pass files to the trigger or return it from execute_complete, then it is better to add it as an attribute to CloudStorageTransferServiceCreateJobsTrigger.

Comment on lines +339 to +347
@overload
def execute_complete(self, context: Context, event: dict[str, Any], files: None) -> None: ...
@overload
def execute_complete(
    self, context: Context, event: dict[str, Any], files: Iterable[str]
) -> list[str]: ...
def execute_complete(
    self, context: Context, event: dict[str, Any], files: Iterable[str] | None = None
) -> list[str] | None:
Contributor


@AlexisBRENON as I understand it, you want to overload execute_complete so that the files variable passed through self.defer can be used in it; please correct me if I am wrong.

I think it is better to put files as an attribute of CloudStorageTransferServiceCreateJobsTrigger and then return it with the success event here. In that case the operator returns the file names only when the transfer operation is successful. What do you think about this idea?

Contributor Author


You got it right. I tried to make the files list available in execute_complete so that it can be returned and thus made available to subsequent tasks.

Passing it through the trigger and then the event dict may be a solution that would avoid changing the signature of execute_complete. But can you explain why you think that is a better solution (which is not typed) than using regular kwargs?

Contributor


@AlexisBRENON yes, sure

self.defer() is a method from BaseOperator, and its main purpose is to tell the operator that execution will continue on the triggerer process, using the trigger's code. When execution on the triggerer is finished, the code goes back to the method specified in the method_name attribute and continues execution on the worker process. All attributes of the defer method are needed to start and configure deferrable execution; business-related attributes are better passed to the trigger. Also, as far as I can see, it is not a common practice in Airflow to use kwargs in defer for business-related attributes; I found only one operator which did it.

Because it is not a common practice, and the attributes of defer are not intended for business logic, I think the solution of passing this attribute to the trigger is better.

P.S. We could look for one more solution to this issue: make the s3_objects variable a private attribute of the S3ToGCSOperator class and return it in the execute_complete method. But it should be checked that this attribute will not be None after execution returns to the worker.
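The caveat in the P.S. can be illustrated with a plain-Python sketch (class name invented): an instance attribute set before deferral does not survive, because after the trigger fires the resume runs on a freshly re-created operator, possibly on a different worker.

```python
class SketchOperator:
    # Hypothetical minimal model of the private-attribute approach.
    def __init__(self):
        self._s3_objects = None

    def execute(self):
        # Set on the worker that defers; in real Airflow, self.defer() would
        # hand execution off to the triggerer right after this.
        self._s3_objects = ["file1", "file2"]

    def execute_complete(self):
        # After the trigger fires, a new operator instance handles the resume,
        # so anything stashed on 'self' during execute() is gone.
        return self._s3_objects


first = SketchOperator()
first.execute()
assert first.execute_complete() == ["file1", "file2"]  # same instance: still set

resumed = SketchOperator()  # simulates the operator re-created after deferral
assert resumed.execute_complete() is None
```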

