Skip to content

Commit

Permalink
Add copy_object functionality for wasbhook (#43037)
Browse files Browse the repository at this point in the history
* Add copy_object functionality for wasbhook

* Add test for copy_blobs

* Static check fixes

* Pytest fix

* Update test

* Static check fixes
  • Loading branch information
kunaljubce authored Nov 1, 2024
1 parent 32d4b63 commit 1f7a58a
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 0 deletions.
27 changes: 27 additions & 0 deletions providers/src/airflow/providers/microsoft/azure/hooks/wasb.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,33 @@ def delete_blobs(self, container_name: str, *blobs, **kwargs) -> None:
self._get_container_client(container_name).delete_blobs(*blobs, **kwargs)
self.log.info("Deleted blobs: %s", blobs)

def copy_blobs(
self,
source_container_name: str,
source_blob_name: str,
destination_container_name: str,
destination_blob_name: str,
) -> None:
"""
Copy the specified blobs from one blob prefix to another.
:param source_container_name: The name of the source container containing the blobs.
:param source_blob_name: The full source blob path without the container name.
:param destination_container_name: The name of the destination container where the blobs
will be copied to.
:param destination_blob_name: The full destination blob path without the container name.
"""
source_blob_client = self._get_blob_client(
container_name=source_container_name, blob_name=source_blob_name
)
source_blob_url = source_blob_client.url

destination_blob_client = self._get_blob_client(
container_name=destination_container_name, blob_name=destination_blob_name
)

destination_blob_client.start_copy_from_url(source_blob_url)

def delete_file(
self,
container_name: str,
Expand Down
27 changes: 27 additions & 0 deletions providers/tests/microsoft/azure/hooks/test_wasb.py
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,33 @@ def test_delete_more_than_256_blobs(self, mock_check, mock_get_blobslist, mock_d
# `ContainerClient.delete_blobs()` in this test.
assert mock_delete_blobs.call_count == 2

@mock.patch.object(WasbHook, "_get_blob_client")
def test_copy_blobs(self, mock_get_blob_client):
# Arrange
hook = WasbHook(wasb_conn_id=self.azure_shared_key_test)
source_container_name = "source-container"
source_blob_name = "source-blob"
destination_container_name = "destination-container"
destination_blob_name = "destination-blob"

# Mock the blob clients
mock_source_blob_client = mock.MagicMock()
mock_destination_blob_client = mock.MagicMock()
mock_get_blob_client.side_effect = [mock_source_blob_client, mock_destination_blob_client]

# Mock the URL of the source blob
mock_source_blob_client.url = "https://source-url"

hook.copy_blobs(
source_container_name, source_blob_name, destination_container_name, destination_blob_name
)

mock_get_blob_client.assert_any_call(container_name=source_container_name, blob_name=source_blob_name)
mock_get_blob_client.assert_any_call(
container_name=destination_container_name, blob_name=destination_blob_name
)
mock_destination_blob_client.start_copy_from_url.assert_called_once_with("https://source-url")

@mock.patch.object(WasbHook, "get_blobs_list")
@mock.patch.object(WasbHook, "check_for_blob")
def test_delete_nonexisting_blob_fails(self, mock_check, mock_getblobs, mocked_blob_service_client):
Expand Down

0 comments on commit 1f7a58a

Please sign in to comment.