From ea87473bbb53f088c237d3cc59991edee722791e Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sun, 19 May 2024 21:41:16 -0400 Subject: [PATCH 1/3] Disable soft delete policy when creating new default bucket. --- sdks/python/apache_beam/io/gcp/gcsio.py | 12 +++++- .../io/gcp/gcsio_integration_test.py | 38 +++++++++++++++++++ sdks/python/apache_beam/io/gcp/gcsio_test.py | 32 ++++++++++++++++ sdks/python/setup.py | 2 +- 4 files changed, 81 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index 73ff697127d3..e56dc5af5f7e 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -162,12 +162,20 @@ def get_bucket(self, bucket_name): except NotFound: return None - def create_bucket(self, bucket_name, project, kms_key=None, location=None): + def create_bucket( + self, + bucket_name, + project, + kms_key=None, + location=None, + soft_delete_retention_duration_seconds=0): """Create and return a GCS bucket in a specific project.""" try: + bucket = self.client.bucket(bucket_name) + bucket.soft_delete_policy.retention_duration_seconds = soft_delete_retention_duration_seconds bucket = self.client.create_bucket( - bucket_or_name=bucket_name, + bucket_or_name=bucket, project=project, location=location, ) diff --git a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py index a50145d84cae..8b5393899f0e 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py @@ -32,10 +32,12 @@ import unittest import uuid +import mock import pytest from apache_beam.io.filesystems import FileSystems from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.options.pipeline_options import GoogleCloudOptions try: from apache_beam.io.gcp import gcsio @@ -141,6 +143,42 @@ def test_batch_copy_and_delete(self): self.assertFalse( result[1], 're-delete should not throw error: %s' % result[1]) + @pytest.mark.it_postcommit + @mock.patch('apache_beam.io.gcp.gcsio.default_gcs_bucket_name') + def test_create_default_bucket(self, mock_default_gcs_bucket_name): + google_cloud_options = self.test_pipeline.options.view_as( + GoogleCloudOptions) + # overwrite kms option here, because get_or_create_default_gcs_bucket() + # requires this option unset. + google_cloud_options.dataflow_kms_key = None + + import random + from hashlib import md5 + # Add a random number to avoid collision if multiple test instances + # are run at the same time. To avoid too many dangling buckets if bucket removal fails, + # we limit the max number of possible bucket names in this test to 1000. + overridden_bucket_name = 'gcsio-it-%d-%s-%s' % ( + random.randint(0, 999), + google_cloud_options.region, + md5(google_cloud_options.project.encode('utf8')).hexdigest()) + + mock_default_gcs_bucket_name.return_value = overridden_bucket_name + + # remove the existing bucket with the same name as the default bucket + existing_bucket = self.gcsio.get_bucket(overridden_bucket_name) + if existing_bucket: + existing_bucket.delete() + + bucket = gcsio.get_or_create_default_gcs_bucket(google_cloud_options) + self.assertIsNotNone(bucket) + + # verify soft delete policy is disabled by default in the default bucket after + # creation + self.assertEqual(bucket.soft_delete_policy.retention_duration_seconds, 0) + bucket.delete() + + self.assertIsNone(self.gcsio.get_bucket(overridden_bucket_name)) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py index 3572f2ffc021..eafc5ed227a3 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py @@ -567,6 +567,38 @@ def test_headers(self, mock_get_service_credentials, mock_do_request): self.assertIn(beam_user_agent, actual_headers['User-Agent']) self.assertEqual(actual_headers['x-goog-custom-audit-job'], 'test-job-name') + @mock.patch('google.cloud._http.JSONConnection._do_request') + @mock.patch('apache_beam.internal.gcp.auth.get_service_credentials') + def test_create_default_bucket( + self, mock_get_service_credentials, mock_do_request): + from apache_beam.internal.gcp.auth import _ApitoolsCredentialsAdapter + mock_get_service_credentials.return_value = _ApitoolsCredentialsAdapter( + _make_credentials("test-project")) + + gcs = gcsio.GcsIO(pipeline_options={"job_name": "test-job-name"}) + # no HTTP request when initializing GcsIO + mock_do_request.assert_not_called() + + import requests + response = requests.Response() + response.status_code = 200 + mock_do_request.return_value = response + + # The function of create_bucket() is supposed to send only one HTTP request + gcs.create_bucket("test-bucket", "test-project") + mock_do_request.assert_called_once() + call_args = mock_do_request.call_args[0] + + # Request data is specified as the fourth argument of + # google.cloud._http.JSONConnection._do_request + actual_request_data = call_args[3] + + import json + request_data_json = json.loads(actual_request_data) + # verify soft delete policy is disabled by default in the bucket creation request + self.assertEqual( + request_data_json['softDeletePolicy']['retentionDurationSeconds'], 0) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 852f14117d8a..4a0738c92bb3 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -437,7 +437,7 @@ def get_portability_package_data(): 'google-cloud-datastore>=2.0.0,<3', 'google-cloud-pubsub>=2.1.0,<3', 'google-cloud-pubsublite>=1.2.0,<2', - 'google-cloud-storage>=2.14.0,<3', + 'google-cloud-storage>=2.16.0,<3', # GCP packages required by tests 'google-cloud-bigquery>=2.0.0,<4', 'google-cloud-bigquery-storage>=2.6.3,<3', From ddfdbc8bd11a430881306d668f01e8e38a86a851 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sun, 19 May 2024 22:17:42 -0400 Subject: [PATCH 2/3] Fix lints --- sdks/python/apache_beam/io/gcp/gcsio.py | 3 ++- .../apache_beam/io/gcp/gcsio_integration_test.py | 11 ++++++----- sdks/python/apache_beam/io/gcp/gcsio_test.py | 3 ++- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index e56dc5af5f7e..e6af866c349b 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -173,7 +173,8 @@ def create_bucket( try: bucket = self.client.bucket(bucket_name) - bucket.soft_delete_policy.retention_duration_seconds = soft_delete_retention_duration_seconds + bucket.soft_delete_policy.retention_duration_seconds = ( + soft_delete_retention_duration_seconds) bucket = self.client.create_bucket( bucket_or_name=bucket, project=project, diff --git a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py index 8b5393899f0e..fa565e1c431a 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py @@ -36,8 +36,8 @@ import pytest from apache_beam.io.filesystems import FileSystems -from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.options.pipeline_options import GoogleCloudOptions +from apache_beam.testing.test_pipeline import TestPipeline try: from apache_beam.io.gcp import gcsio @@ -155,8 +155,9 @@ def test_create_default_bucket(self, mock_default_gcs_bucket_name): import random from hashlib import md5 # Add a random number to avoid collision if multiple test instances - # are run at the same time. To avoid too many dangling buckets if bucket removal fails, - # we limit the max number of possible bucket names in this test to 1000. + # are run at the same time. To avoid too many dangling buckets if bucket + # removal fails, we limit the max number of possible bucket names in this + # test to 1000. overridden_bucket_name = 'gcsio-it-%d-%s-%s' % ( random.randint(0, 999), google_cloud_options.region, @@ -172,8 +173,8 @@ def test_create_default_bucket(self, mock_default_gcs_bucket_name): bucket = gcsio.get_or_create_default_gcs_bucket(google_cloud_options) self.assertIsNotNone(bucket) - # verify soft delete policy is disabled by default in the default bucket after - # creation + # verify soft delete policy is disabled by default in the default bucket + # after creation self.assertEqual(bucket.soft_delete_policy.retention_duration_seconds, 0) bucket.delete() diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py index eafc5ed227a3..b17e0638d6b5 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py @@ -595,7 +595,8 @@ def test_create_default_bucket( import json request_data_json = json.loads(actual_request_data) - # verify soft delete policy is disabled by default in the bucket creation request + # verify soft delete policy is disabled by default in the bucket creation + # request self.assertEqual( request_data_json['softDeletePolicy']['retentionDurationSeconds'], 0) From 7d1e62171b50a83bfecd229f416e7e7f7c828162 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 21 May 2024 09:44:22 -0400 Subject: [PATCH 3/3] Add a check on returned bucket name. --- sdks/python/apache_beam/io/gcp/gcsio_integration_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py index fa565e1c431a..ed4dd7e401ec 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py @@ -172,6 +172,7 @@ def test_create_default_bucket(self, mock_default_gcs_bucket_name): bucket = gcsio.get_or_create_default_gcs_bucket(google_cloud_options) self.assertIsNotNone(bucket) + self.assertEqual(bucket.name, overridden_bucket_name) # verify soft delete policy is disabled by default in the default bucket # after creation