@@ -17,10 +17,10 @@
# under the License.
from __future__ import annotations

from collections.abc import Sequence
from collections.abc import Iterable, Sequence
from datetime import datetime, timezone
from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, overload

from airflow.configuration import conf
from airflow.exceptions import AirflowException
@@ -281,6 +281,7 @@ def transfer_files_async(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook):
poll_interval=self.poll_interval,
),
method_name="execute_complete",
kwargs=dict(files=files),
Contributor:
@AlexisBRENON could you please clarify what behavior you want to achieve with this line?

If you want to pass `files` to the trigger or return it from `execute_complete`, it would be better to add it as an attribute of `CloudStorageTransferServiceCreateJobsTrigger`.

)

def submit_transfer_jobs(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook) -> list[str]:
@@ -335,7 +336,15 @@ def submit_transfer_jobs(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook) -> list[str]:

return job_names

def execute_complete(self, context: Context, event: dict[str, Any]) -> None:
@overload
def execute_complete(self, context: Context, event: dict[str, Any], files: None) -> None: ...
@overload
def execute_complete(
self, context: Context, event: dict[str, Any], files: Iterable[str]
) -> list[str]: ...
def execute_complete(
self, context: Context, event: dict[str, Any], files: Iterable[str] | None = None
) -> list[str] | None:
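The `@overload` pair above lets a type checker narrow the return type from the `files` argument: passing `None` yields `None`, passing an iterable yields `list[str]`. A minimal standalone sketch of the same pattern (function and event names are illustrative, not the operator's real code):

```python
from __future__ import annotations

from collections.abc import Iterable
from typing import Any, overload


@overload
def complete(event: dict[str, Any], files: None) -> None: ...
@overload
def complete(event: dict[str, Any], files: Iterable[str]) -> list[str]: ...
def complete(event: dict[str, Any], files: Iterable[str] | None = None) -> list[str] | None:
    # Mirrors the diff: fail on an error event, and return the file
    # list only when one was supplied by the caller.
    if event["status"] == "error":
        raise RuntimeError(event["message"])
    return None if files is None else list(files)
```

A caller that passes a concrete list gets `list[str]` back without any cast; `complete(event, None)` is typed as returning `None`.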
Comment on lines +339 to +347
Contributor:
@AlexisBRENON as I understand it, you want to overload `execute_complete` so that the `files` variable passed via `self.defer` can be used; please correct me if I am wrong?

I think it is better to make `files` an attribute of `CloudStorageTransferServiceCreateJobsTrigger` and then return it with the success event here. That way the operator returns the file names only when the transfer operation succeeds. What do you think about this idea?

Contributor (Author):

You got it right. I tried to make the `files` list available in `execute_complete` so it can be returned and thus made available to subsequent tasks.

Passing it through the trigger and then the event dict could be a solution that avoids changing the signature of `execute_complete`. But can you explain why you think that (untyped) approach is better than using regular kwargs?

Contributor:

@AlexisBRENON yes, sure

`self.defer()` is a method on `BaseOperator`, and its main purpose is to tell the operator that execution will continue on the triggerer process using the trigger's code. When execution on the triggerer finishes, control returns to the method named in the `method_name` attribute and continues on the worker process. All arguments to `defer` exist to start and configure deferrable execution; business-related attributes are better passed to the trigger. Also, as far as I can see, using `kwargs` in `defer` for business-related attributes is not common practice in Airflow; I found only one operator that does it.

Because it is not common practice, and the arguments to `defer` are not intended for business logic, I think passing this attribute to the trigger is the better solution.
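The defer-and-resume control flow described above can be simulated with stand-in classes (illustrative sketches, not Airflow's real `BaseOperator` or `BaseTrigger`): `execute` defers, the trigger produces an event, and execution resumes at the method named by `method_name`:

```python
from __future__ import annotations

import asyncio
from typing import Any


class TaskDeferred(Exception):
    """Stand-in for airflow.exceptions.TaskDeferred."""

    def __init__(self, trigger: "DemoTrigger", method_name: str) -> None:
        self.trigger = trigger
        self.method_name = method_name


class DemoTrigger:
    async def run(self):
        # A real trigger would poll here until the work completes.
        yield {"status": "success", "message": "all jobs done"}


class DemoOperator:
    def execute(self) -> None:
        # Worker side: hand off to the triggerer. Only deferral
        # plumbing (trigger, method_name) goes through defer.
        raise TaskDeferred(DemoTrigger(), "execute_complete")

    def execute_complete(self, event: dict[str, Any]) -> str:
        if event["status"] == "error":
            raise RuntimeError(event["message"])
        return event["message"]


def run_deferrable(op: DemoOperator) -> Any:
    # Crude stand-in for the worker/triggerer handshake.
    try:
        op.execute()
    except TaskDeferred as deferral:
        async def _first_event():
            async for ev in deferral.trigger.run():
                return ev
        event = asyncio.run(_first_event())
        # Resume on the worker at the method named in method_name.
        return getattr(op, deferral.method_name)(event)
```

The point of the sketch is that everything the resumed method needs must arrive through the event (or be re-derivable), which is why business data belongs on the trigger.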

P.S. There is one more possible solution for this issue: make the `s3_objects` variable a private attribute of the `S3ToGCSOperator` class and return it in the `execute_complete` method. But it should be verified that this attribute will not be `None` after execution resumes on the worker.
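A sketch of that P.S. idea, with hypothetical class and attribute names; the comment's caveat is noted inline, since an instance attribute set before deferral is not guaranteed to exist once the operator is re-created on resume:

```python
from __future__ import annotations


class S3ToGCSDemo:
    """Sketch of the private-attribute approach: stash the object list
    on the operator and return it from execute_complete."""

    def __init__(self) -> None:
        self._s3_objects: list[str] | None = None

    def execute(self) -> None:
        # Hypothetical listing step; the real operator lists S3 keys.
        self._s3_objects = ["a.csv", "b.csv"]

    def execute_complete(self, event: dict[str, str]) -> list[str] | None:
        if event["status"] == "error":
            raise RuntimeError(event["message"])
        # Caveat from the thread: after a deferral the operator may be
        # rebuilt in a new worker process, so this can come back None
        # and would need to be checked or re-derived.
        return self._s3_objects
```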

"""
Return immediately and relies on trigger to throw a success event. Callback for the trigger.

@@ -345,6 +354,7 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> None:
if event["status"] == "error":
raise AirflowException(event["message"])
self.log.info("%s completed with response %s ", self.task_id, event["message"])
return None if files is None else list(files)

def get_transfer_hook(self):
return CloudDataTransferServiceHook(