Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 38 additions & 10 deletions airflow/providers/google/cloud/transfers/gcs_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,12 @@ class GCSToGCSOperator(BaseOperator):
source_objects=['sales/sales-2017/january.avro'],
destination_bucket='data_backup',
destination_object='copied_sales/2017/january-backup.avro',
exact_match=True,
gcp_conn_id=google_cloud_conn_id
)

The following Operator would copy all the Avro files from ``sales/sales-2017``
folder (i.e. with names starting with that prefix) in ``data`` bucket to the
folder (i.e. all files with names starting with that prefix) in ``data`` bucket to the
``copied_sales/2017`` folder in the ``data_backup`` bucket. ::

copy_files = GCSToGCSOperator(
Expand All @@ -135,7 +136,7 @@ class GCSToGCSOperator(BaseOperator):
)

The following Operator would move all the Avro files from ``sales/sales-2017``
folder (i.e. with names starting with that prefix) in ``data`` bucket to the
folder (i.e. all files with names starting with that prefix) in ``data`` bucket to the
same folder in the ``data_backup`` bucket, deleting the original files in the
process. ::

Expand Down Expand Up @@ -314,9 +315,11 @@ def _copy_source_without_wildcard(self, hook, prefix):
"""
List all files in source_objects, copy files to destination_object, and rename each source file.

For source_objects with no wildcard, this operator would first list all
files in source_objects, using provided delimiter if any. Then copy files
from source_objects to destination_object and rename each source file.
For source_objects with no wildcard, this operator would first list
all files in source_objects, using provided delimiter if any. Then copy
files from source_objects to destination_object and rename each source
file. Note that if the flag exact_match=False, then each item in the source_objects
(or source_object itself) will be considered as a prefix for the source objects search.

Example 1:

Expand Down Expand Up @@ -366,6 +369,22 @@ def _copy_source_without_wildcard(self, hook, prefix):
destination_object='b/',
gcp_conn_id=google_cloud_conn_id
)

Example 4:

The following Operator would copy files corresponding to the prefix 'a/foo.txt'
(a/foo.txt, a/foo.txt.abc, a/foo.txt/subfolder/file.txt) in ``data`` bucket to
the ``b/`` folder in the ``data_backup`` bucket
(b/foo.txt, b/foo.txt.abc, b/foo.txt/subfolder/file.txt) ::

copy_files = GCSToGCSOperator(
task_id='copy_files_without_wildcard',
source_bucket='data',
source_object='a/foo.txt',
destination_bucket='data_backup',
destination_object='b/',
gcp_conn_id=google_cloud_conn_id
)
"""
objects = hook.list(
self.source_bucket, prefix=prefix, delimiter=self.delimiter, match_glob=self.match_glob
Expand All @@ -390,11 +409,10 @@ def _copy_source_without_wildcard(self, hook, prefix):
msg = f"{prefix} does not exist in bucket {self.source_bucket}"
self.log.warning(msg)
raise AirflowException(msg)

if len(objects) == 1 and objects[0][-1] != "/":
self._copy_file(hook=hook, source_object=objects[0])
elif len(objects):
self._copy_directory(hook=hook, source_objects=objects, prefix=prefix)
self._copy_multiple_objects(hook=hook, source_objects=objects, prefix=prefix)

def _copy_file(self, hook, source_object):
destination_object = self.destination_object or source_object
Expand All @@ -405,15 +423,25 @@ def _copy_file(self, hook, source_object):
hook=hook, source_object=source_object, destination_object=destination_object
)

def _copy_directory(self, hook, source_objects, prefix):
_prefix = prefix.rstrip("/") + "/"
def _copy_multiple_objects(self, hook, source_objects, prefix):
# Check whether the prefix is a root directory for all the rest of objects.
_pref = prefix.rstrip("/")
is_directory = prefix.endswith("/") or all(
[obj.replace(_pref, "", 1).startswith("/") for obj in source_objects]
)

if is_directory:
base_path = prefix.rstrip("/") + "/"
else:
base_path = prefix[0 : prefix.rfind("/") + 1] if "/" in prefix else ""

for source_obj in source_objects:
if not self._check_exact_match(source_obj, prefix):
continue
if self.destination_object is None:
destination_object = source_obj
else:
file_name_postfix = source_obj.replace(_prefix, "", 1)
file_name_postfix = source_obj.replace(base_path, "", 1)
destination_object = self.destination_object.rstrip("/") + "/" + file_name_postfix

self._copy_single_object(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ Copy single file
----------------

The following example would copy a single file, ``OBJECT_1`` from the ``BUCKET_1_SRC`` GCS bucket to the ``BUCKET_1_DST`` bucket.
Note that if the flag ``exact_match=False`` then the ``source_object`` will be treated as a prefix when searching for objects
in the ``BUCKET_1_SRC`` GCS bucket, so any other objects whose names start with that prefix will be copied as well. To prevent this from
happening, please use ``exact_match=True``.

.. exampleinclude:: /../../tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py
:language: python
Expand Down Expand Up @@ -165,6 +168,9 @@ Move single file
----------------

Supplying ``True`` to the ``move`` argument causes the operator to delete ``source_object`` once the copy is complete.
Note that if the flag ``exact_match=False`` then the ``source_object`` will be treated as a prefix when searching for objects
in the ``BUCKET_1_SRC`` GCS bucket, so any other objects whose names start with that prefix will be copied as well. To prevent this from
happening, please use ``exact_match=True``.

.. exampleinclude:: /../../tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py
:language: python
Expand Down
171 changes: 171 additions & 0 deletions tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -656,3 +656,174 @@ def test_execute_source_object_required_flag_true(self, mock_hook):
AirflowException, match=f"{SOURCE_OBJECTS_SINGLE_FILE} does not exist in bucket {TEST_BUCKET}"
):
operator.execute(None)

# Parametrized check of which source objects GCSToGCSOperator rewrites, and under which
# destination names, when ``destination_object`` is a folder (``DESTINATION_OBJECT_PREFIX + "/"``).
#
# Each case is a tuple of:
#   existing_objects             - what the mocked ``GCSHook.list`` returns
#   source_object                - value passed to the operator (plain name, prefix, or wildcard)
#   match_glob                   - value passed to the operator, or None
#   exact_match                  - value passed to the operator
#   expected_source_objects      - objects expected to be passed to ``rewrite`` as sources
#   expected_destination_objects - matching destination names; "{prefix}" is filled in with
#                                  DESTINATION_OBJECT_PREFIX by the test body
@pytest.mark.parametrize(
"existing_objects, source_object, match_glob, exact_match, expected_source_objects, "
"expected_destination_objects",
[
# --- Mocked listing returns a single object: source/foo.txt ---
(["source/foo.txt"], "source/foo.txt", None, True, ["source/foo.txt"], ["{prefix}/foo.txt"]),
(["source/foo.txt"], "source/foo.txt", None, False, ["source/foo.txt"], ["{prefix}/foo.txt"]),
(["source/foo.txt"], "source", None, False, ["source/foo.txt"], ["{prefix}/foo.txt"]),
(["source/foo.txt"], "source/", None, False, ["source/foo.txt"], ["{prefix}/foo.txt"]),
(["source/foo.txt"], "source/*", None, False, ["source/foo.txt"], ["{prefix}/foo.txt"]),
# With the wildcard "source/foo.*" the expected destination keeps only the part of the
# name that follows "source/foo." (here "txt").
(["source/foo.txt"], "source/foo.*", None, False, ["source/foo.txt"], ["{prefix}/txt"]),
(["source/foo.txt"], "source/", "**/foo*", False, ["source/foo.txt"], ["{prefix}/foo.txt"]),
(["source/foo.txt"], "source/", "**/foo.txt", False, ["source/foo.txt"], ["{prefix}/foo.txt"]),
# --- Mocked listing returns two objects sharing the "source/foo.txt" name prefix ---
# exact_match=True: only the exactly named object is expected.
(
["source/foo.txt", "source/foo.txt.abc"],
"source/foo.txt",
None,
True,
["source/foo.txt"],
["{prefix}/foo.txt"],
),
# exact_match=False: every listed object is expected.
(
["source/foo.txt", "source/foo.txt.abc"],
"source/foo.txt",
None,
False,
["source/foo.txt", "source/foo.txt.abc"],
["{prefix}/foo.txt", "{prefix}/foo.txt.abc"],
),
(
["source/foo.txt", "source/foo.txt.abc"],
"source",
None,
False,
["source/foo.txt", "source/foo.txt.abc"],
["{prefix}/foo.txt", "{prefix}/foo.txt.abc"],
),
(
["source/foo.txt", "source/foo.txt.abc"],
"source/",
None,
False,
["source/foo.txt", "source/foo.txt.abc"],
["{prefix}/foo.txt", "{prefix}/foo.txt.abc"],
),
(
["source/foo.txt", "source/foo.txt.abc"],
"source/*",
None,
False,
["source/foo.txt", "source/foo.txt.abc"],
["{prefix}/foo.txt", "{prefix}/foo.txt.abc"],
),
(
["source/foo.txt", "source/foo.txt.abc"],
"source/foo.*",
None,
False,
["source/foo.txt", "source/foo.txt.abc"],
["{prefix}/txt", "{prefix}/txt.abc"],
),
(
["source/foo.txt", "source/foo.txt.abc"],
"source/",
"**/foo*",
False,
["source/foo.txt", "source/foo.txt.abc"],
["{prefix}/foo.txt", "{prefix}/foo.txt.abc"],
),
# match_glob "**/foo.txt": only source/foo.txt is expected.
(
["source/foo.txt", "source/foo.txt.abc"],
"source/",
"**/foo.txt",
False,
["source/foo.txt"],
["{prefix}/foo.txt"],
),
# --- Mocked listing additionally returns an object nested under "source/foo.txt/" ---
(
["source/foo.txt", "source/foo.txt.abc", "source/foo.txt/subfolder/file.txt"],
"source/foo.txt",
None,
True,
["source/foo.txt"],
["{prefix}/foo.txt"],
),
(
["source/foo.txt", "source/foo.txt.abc", "source/foo.txt/subfolder/file.txt"],
"source/foo.txt",
None,
False,
["source/foo.txt", "source/foo.txt.abc", "source/foo.txt/subfolder/file.txt"],
["{prefix}/foo.txt", "{prefix}/foo.txt.abc", "{prefix}/foo.txt/subfolder/file.txt"],
),
(
["source/foo.txt", "source/foo.txt.abc", "source/foo.txt/subfolder/file.txt"],
"source",
None,
False,
["source/foo.txt", "source/foo.txt.abc", "source/foo.txt/subfolder/file.txt"],
["{prefix}/foo.txt", "{prefix}/foo.txt.abc", "{prefix}/foo.txt/subfolder/file.txt"],
),
(
["source/foo.txt", "source/foo.txt.abc", "source/foo.txt/subfolder/file.txt"],
"source/",
None,
False,
["source/foo.txt", "source/foo.txt.abc", "source/foo.txt/subfolder/file.txt"],
["{prefix}/foo.txt", "{prefix}/foo.txt.abc", "{prefix}/foo.txt/subfolder/file.txt"],
),
(
["source/foo.txt", "source/foo.txt.abc", "source/foo.txt/subfolder/file.txt"],
"source/*",
None,
False,
["source/foo.txt", "source/foo.txt.abc", "source/foo.txt/subfolder/file.txt"],
["{prefix}/foo.txt", "{prefix}/foo.txt.abc", "{prefix}/foo.txt/subfolder/file.txt"],
),
(
["source/foo.txt", "source/foo.txt.abc", "source/foo.txt/subfolder/file.txt"],
"source/foo.*",
None,
False,
["source/foo.txt", "source/foo.txt.abc", "source/foo.txt/subfolder/file.txt"],
["{prefix}/txt", "{prefix}/txt.abc", "{prefix}/txt/subfolder/file.txt"],
),
(
["source/foo.txt", "source/foo.txt.abc", "source/foo.txt/subfolder/file.txt"],
"source/",
"**/foo*",
False,
["source/foo.txt", "source/foo.txt.abc", "source/foo.txt/subfolder/file.txt"],
["{prefix}/foo.txt", "{prefix}/foo.txt.abc", "{prefix}/foo.txt/subfolder/file.txt"],
),
(
["source/foo.txt", "source/foo.txt.abc", "source/foo.txt/subfolder/file.txt"],
"source/",
"**/foo.txt",
False,
["source/foo.txt"],
["{prefix}/foo.txt"],
),
],
)
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
def test_copy_files_into_a_folder(
self,
mock_hook,
existing_objects,
source_object,
match_glob,
exact_match,
expected_source_objects,
expected_destination_objects,
):
# The mocked hook's list() returns existing_objects on every call, regardless of the
# prefix / delimiter / match_glob arguments it is called with.
mock_hook.return_value.list.return_value = existing_objects
operator = GCSToGCSOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
source_object=source_object,
destination_bucket=DESTINATION_BUCKET,
# Trailing slash: the destination is a folder-like prefix, not a single object name.
destination_object=DESTINATION_OBJECT_PREFIX + "/",
exact_match=exact_match,
match_glob=match_glob,
)
operator.execute(None)

# Build the expected rewrite(source_bucket, source, destination_bucket, destination) calls,
# pairing each expected source with its expected destination name.
mock_calls = [
mock.call(TEST_BUCKET, src, DESTINATION_BUCKET, dst.format(prefix=DESTINATION_OBJECT_PREFIX))
for src, dst in zip(expected_source_objects, expected_destination_objects)
]
# NOTE(review): assert_has_calls only checks that these calls appear (in order) among the
# recorded calls; it does not fail on additional rewrite calls. Cases that expect only a
# subset of existing_objects therefore do not, by themselves, prove the other objects were
# skipped -- consider also asserting the call count.
mock_hook.return_value.rewrite.assert_has_calls(mock_calls)
2 changes: 2 additions & 0 deletions tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@
source_object=OBJECT_1,
destination_bucket=BUCKET_NAME_DST, # If not supplied the source_bucket value will be used
destination_object="backup_" + OBJECT_1, # If not supplied the source_object value will be used
exact_match=True,
)
# [END howto_operator_gcs_to_gcs_single_file]

Expand Down Expand Up @@ -201,6 +202,7 @@
source_object=OBJECT_1,
destination_bucket=BUCKET_NAME_DST,
destination_object="backup_" + OBJECT_1,
exact_match=True,
move_object=True,
)
# [END howto_operator_gcs_to_gcs_single_file_move]
Expand Down