Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sdks/python: enable recursive deletion for GCSFileSystem Paths #33611

Merged
merged 4 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/io/gcp/gcsfilesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,8 @@ def delete(self, paths):

for path in paths:
if path.endswith('/'):
path_to_use = path + '*'
self._gcsIO().delete(path, recursive=True)
continue
else:
path_to_use = path
match_result = self.match([path_to_use])[0]
Expand Down
25 changes: 23 additions & 2 deletions sdks/python/apache_beam/io/gcp/gcsio.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,16 +244,37 @@ def open(
else:
raise ValueError('Invalid file open mode: %s.' % mode)

def delete(self, path):
def delete(self, path, recursive=False):
  """Removes a GCS object, or every object listed under a prefix.

  With recursive=False only the object named by ``path`` is removed. With
  recursive=True the object name parsed from ``path`` is used as a listing
  prefix and each listed blob is removed one at a time.

  Args:
    path: GCS path in the form gs://<bucket>/<name>.
    recursive (bool, optional): When True, treat the object name as a prefix
      and delete every blob listed under it (default: False).
  """
  bucket_name, blob_name = parse_gcs_path(path)
  bucket = self.client.bucket(bucket_name)

  if not recursive:
    # Single-object case: hand the exact name to the helper.
    self._delete_blob(bucket, blob_name)
    return

  # NOTE(review): the listing is keyed on a name prefix, so a path without a
  # trailing '/' presumably also matches sibling objects whose names merely
  # start with the same characters -- confirm callers pass '/'-terminated
  # paths when they mean "directory".
  for listed in bucket.list_blobs(prefix=blob_name):
    self._delete_blob(bucket, listed.name)

def _delete_blob(self, bucket, blob_name):
"""Helper method to delete a single blob from GCS.

Args:
bucket: The GCS bucket object.
blob_name: The name of the blob to delete under the bucket.
"""
if self._use_blob_generation:
# blob can be None if not found
# Fetch blob generation if required.
blob = bucket.get_blob(blob_name, retry=self._storage_client_retry)
generation = getattr(blob, "generation", None)
else:
Expand Down
29 changes: 28 additions & 1 deletion sdks/python/apache_beam/io/gcp/gcsio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ def delete_blob(self, name, **kwargs):
if name in bucket.blobs:
del bucket.blobs[name]

def list_blobs(self, prefix=None, **kwargs):
  """Lists blobs for this bucket by delegating to the owning client.

  Args:
    prefix: Optional name prefix forwarded to the client's list_blobs.
    **kwargs: Extra keyword arguments forwarded unchanged.

  Returns:
    Whatever the client's list_blobs returns for the canonical bucket.
  """
  # Resolve the canonical bucket first, then let the client do the listing.
  canonical = self._get_canonical_bucket()
  listing = self.client.list_blobs(canonical, prefix, **kwargs)
  return listing


class FakeBlob(object):
def __init__(
Expand Down Expand Up @@ -445,20 +449,43 @@ def test_bad_file_modes(self):
self.gcs.open(file_name, 'r+b')

def test_delete(self):
  """Covers single-object deletion and recursive deletion under a prefix."""
  file_size = 1024
  single_path = 'gs://gcsio-test/delete_me'
  bucket_name, single_blob = gcsio.parse_gcs_path(single_path)
  bucket = self.client.get_bucket(bucket_name)

  # Deleting an object that was never created is expected to succeed.
  self.gcs.delete(single_path)

  # Create the object, confirm it is present, delete it, confirm it is gone.
  self._insert_random_file(self.client, single_path, file_size)
  self.assertTrue(single_blob in bucket.blobs)
  self.gcs.delete(single_path)
  self.assertFalse(single_blob in bucket.blobs)

  # Recursive case: several objects sharing one '/'-terminated prefix.
  prefix = 'gs://gcsio-test/directory_to_delete/'
  nested_paths = [f"{prefix}file1", f"{prefix}file2", f"{prefix}file3"]
  nested_blobs = [gcsio.parse_gcs_path(p)[1] for p in nested_paths]

  for p in nested_paths:
    self._insert_random_file(self.client, p, file_size)

  # Every object must be present before the recursive delete runs.
  for name in nested_blobs:
    self.assertTrue(name in bucket.blobs)

  self.gcs.delete(prefix, recursive=True)

  # Nothing under the prefix may survive.
  for name in nested_blobs:
    self.assertFalse(name in bucket.blobs)

def test_copy(self):
src_file_name = 'gs://gcsio-test/source'
dest_file_name = 'gs://gcsio-test/dest'
Expand Down
Loading