diff --git a/airflow/providers/google/cloud/example_dags/example_gcs.py b/airflow/providers/google/cloud/example_dags/example_gcs.py deleted file mode 100644 index 1834ddccc0686..0000000000000 --- a/airflow/providers/google/cloud/example_dags/example_gcs.py +++ /dev/null @@ -1,249 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -""" -Example Airflow DAG for Google Cloud Storage operators. -""" - -import os -from datetime import datetime -from tempfile import gettempdir - -from airflow import models -from airflow.operators.bash import BashOperator -from airflow.providers.google.cloud.operators.gcs import ( - GCSBucketCreateAclEntryOperator, - GCSCreateBucketOperator, - GCSDeleteBucketOperator, - GCSDeleteObjectsOperator, - GCSFileTransformOperator, - GCSListObjectsOperator, - GCSObjectCreateAclEntryOperator, -) -from airflow.providers.google.cloud.sensors.gcs import ( - GCSObjectExistenceSensor, - GCSObjectsWithPrefixExistenceSensor, - GCSObjectUpdateSensor, - GCSUploadSessionCompleteSensor, -) -from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator -from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator -from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator - -START_DATE = datetime(2021, 1, 1) - -PROJECT_ID = os.getenv("GCP_PROJECT_ID", "example-id") -BUCKET_1 = os.getenv("GCP_GCS_BUCKET_1", "test-gcs-example-bucket") -GCS_ACL_ENTITY = os.getenv("GCS_ACL_ENTITY", "allUsers") -GCS_ACL_BUCKET_ROLE = "OWNER" -GCS_ACL_OBJECT_ROLE = "OWNER" - -BUCKET_2 = os.getenv("GCP_GCS_BUCKET_2", "test-gcs-example-bucket-2") - -temp_dir_path = gettempdir() -PATH_TO_TRANSFORM_SCRIPT = os.getenv( - "GCP_GCS_PATH_TO_TRANSFORM_SCRIPT", os.path.join(temp_dir_path, "transform_script.py") -) -PATH_TO_UPLOAD_FILE = os.getenv( - "GCP_GCS_PATH_TO_UPLOAD_FILE", os.path.join(temp_dir_path, "test-gcs-example-upload.txt") -) -PATH_TO_UPLOAD_FILE_PREFIX = os.getenv("GCP_GCS_PATH_TO_UPLOAD_FILE_PREFIX", "test-gcs-") -PATH_TO_SAVED_FILE = os.getenv( - "GCP_GCS_PATH_TO_SAVED_FILE", os.path.join(temp_dir_path, "test-gcs-example-download.txt") -) - -BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1] - -# Upload 'test-gcs-manual-example-upload.txt' manually in the after triggering the DAG. 
-PATH_TO_MANUAL_UPLOAD_FILE = os.getenv( - "GCP_GCS_PATH_TO_MANUAL_UPLOAD_FILE", os.path.join(temp_dir_path, "test-gcs-manual-example-upload.txt") -) -BUCKET_MANUAL_UPLOAD_FILE_LOCATION = PATH_TO_MANUAL_UPLOAD_FILE.rpartition("/")[-1] -PATH_TO_MANUAL_UPLOAD_FILE_PREFIX = os.getenv("GCP_GCS_PATH_TO_MANUAL_UPLOAD_FILE_PREFIX", "test-gcs-manual-") - -with models.DAG( - "example_gcs", - start_date=START_DATE, - catchup=False, - schedule_interval='@once', - tags=['example'], -) as dag: - create_bucket1 = GCSCreateBucketOperator( - task_id="create_bucket1", - bucket_name=BUCKET_1, - project_id=PROJECT_ID, - resource={ - "iamConfiguration": { - "uniformBucketLevelAccess": { - "enabled": False, - }, - }, - }, - ) - - create_bucket2 = GCSCreateBucketOperator( - task_id="create_bucket2", bucket_name=BUCKET_2, project_id=PROJECT_ID - ) - - list_buckets = GCSListObjectsOperator(task_id="list_buckets", bucket=BUCKET_1) - - list_buckets_result = BashOperator( - task_id="list_buckets_result", - bash_command=f"echo {list_buckets.output}", - ) - - upload_file = LocalFilesystemToGCSOperator( - task_id="upload_file", - src=PATH_TO_UPLOAD_FILE, - dst=BUCKET_FILE_LOCATION, - bucket=BUCKET_1, - ) - - transform_file = GCSFileTransformOperator( - task_id="transform_file", - source_bucket=BUCKET_1, - source_object=BUCKET_FILE_LOCATION, - transform_script=["python", PATH_TO_TRANSFORM_SCRIPT], - ) - # [START howto_operator_gcs_bucket_create_acl_entry_task] - gcs_bucket_create_acl_entry_task = GCSBucketCreateAclEntryOperator( - bucket=BUCKET_1, - entity=GCS_ACL_ENTITY, - role=GCS_ACL_BUCKET_ROLE, - task_id="gcs_bucket_create_acl_entry_task", - ) - # [END howto_operator_gcs_bucket_create_acl_entry_task] - - # [START howto_operator_gcs_object_create_acl_entry_task] - gcs_object_create_acl_entry_task = GCSObjectCreateAclEntryOperator( - bucket=BUCKET_1, - object_name=BUCKET_FILE_LOCATION, - entity=GCS_ACL_ENTITY, - role=GCS_ACL_OBJECT_ROLE, - task_id="gcs_object_create_acl_entry_task", - ) - # [END howto_operator_gcs_object_create_acl_entry_task] - - # [START howto_operator_gcs_download_file_task] - download_file = GCSToLocalFilesystemOperator( - task_id="download_file", - object_name=BUCKET_FILE_LOCATION, - bucket=BUCKET_1, - filename=PATH_TO_SAVED_FILE, - ) - # [END howto_operator_gcs_download_file_task] - - copy_file = GCSToGCSOperator( - task_id="copy_file", - source_bucket=BUCKET_1, - source_object=BUCKET_FILE_LOCATION, - destination_bucket=BUCKET_2, - destination_object=BUCKET_FILE_LOCATION, - ) - - delete_files = GCSDeleteObjectsOperator( - task_id="delete_files", bucket_name=BUCKET_1, objects=[BUCKET_FILE_LOCATION] - ) - - # [START howto_operator_gcs_delete_bucket] - delete_bucket_1 = GCSDeleteBucketOperator(task_id="delete_bucket_1", bucket_name=BUCKET_1) - delete_bucket_2 = GCSDeleteBucketOperator(task_id="delete_bucket_2", bucket_name=BUCKET_2) - # [END howto_operator_gcs_delete_bucket] - - [create_bucket1, create_bucket2] >> list_buckets >> list_buckets_result - [create_bucket1, create_bucket2] >> upload_file - upload_file >> [download_file, copy_file] - upload_file >> gcs_bucket_create_acl_entry_task >> gcs_object_create_acl_entry_task >> delete_files - - create_bucket1 >> delete_bucket_1 - create_bucket2 >> delete_bucket_2 - create_bucket2 >> copy_file - create_bucket1 >> copy_file - list_buckets >> delete_bucket_1 - upload_file >> delete_bucket_1 - create_bucket1 >> upload_file >> delete_bucket_1 - upload_file >> transform_file >> delete_bucket_1 - gcs_bucket_create_acl_entry_task >> delete_bucket_1 - 
gcs_object_create_acl_entry_task >> delete_bucket_1 - download_file >> delete_bucket_1 - copy_file >> delete_bucket_1 - copy_file >> delete_bucket_2 - delete_files >> delete_bucket_1 - - -with models.DAG( - "example_gcs_sensors", - start_date=START_DATE, - catchup=False, - schedule_interval='@once', - tags=['example'], -) as dag2: - create_bucket = GCSCreateBucketOperator( - task_id="create_bucket", bucket_name=BUCKET_1, project_id=PROJECT_ID - ) - upload_file = LocalFilesystemToGCSOperator( - task_id="upload_file", - src=PATH_TO_UPLOAD_FILE, - dst=BUCKET_FILE_LOCATION, - bucket=BUCKET_1, - ) - # [START howto_sensor_object_exists_task] - gcs_object_exists = GCSObjectExistenceSensor( - bucket=BUCKET_1, - object=BUCKET_FILE_LOCATION, - mode='poke', - task_id="gcs_object_exists_task", - ) - # [END howto_sensor_object_exists_task] - # [START howto_sensor_object_with_prefix_exists_task] - gcs_object_with_prefix_exists = GCSObjectsWithPrefixExistenceSensor( - bucket=BUCKET_1, - prefix=PATH_TO_UPLOAD_FILE_PREFIX, - mode='poke', - task_id="gcs_object_with_prefix_exists_task", - ) - # [END howto_sensor_object_with_prefix_exists_task] - - # [START howto_sensor_gcs_upload_session_complete_task] - gcs_upload_session_complete = GCSUploadSessionCompleteSensor( - bucket=BUCKET_1, - prefix=PATH_TO_MANUAL_UPLOAD_FILE_PREFIX, - inactivity_period=60, - min_objects=1, - allow_delete=True, - previous_objects=set(), - task_id="gcs_upload_session_complete_task", - ) - # [END howto_sensor_gcs_upload_session_complete_task] - - # [START howto_sensor_object_update_exists_task] - gcs_update_object_exists = GCSObjectUpdateSensor( - bucket=BUCKET_1, - object=BUCKET_MANUAL_UPLOAD_FILE_LOCATION, - task_id="gcs_object_update_sensor_task", - ) - # [END howto_sensor_object_update_exists_task] - - delete_bucket = GCSDeleteBucketOperator(task_id="delete_bucket", bucket_name=BUCKET_1) - - create_bucket >> upload_file >> [gcs_object_exists, gcs_object_with_prefix_exists] >> delete_bucket - create_bucket >> gcs_upload_session_complete >> gcs_update_object_exists >> delete_bucket - - -if __name__ == '__main__': - dag.clear() - dag.run() diff --git a/airflow/providers/google/cloud/example_dags/example_gcs_timespan_file_transform.py b/airflow/providers/google/cloud/example_dags/example_gcs_timespan_file_transform.py deleted file mode 100644 index 6a36f3ea3add4..0000000000000 --- a/airflow/providers/google/cloud/example_dags/example_gcs_timespan_file_transform.py +++ /dev/null @@ -1,65 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -""" -Example Airflow DAG for Google Cloud Storage time-span file transform operator. 
-""" - -import os -from datetime import datetime - -from airflow import models -from airflow.providers.google.cloud.operators.gcs import GCSTimeSpanFileTransformOperator -from airflow.utils.state import State - -SOURCE_BUCKET = os.environ.get("GCP_GCS_BUCKET_1", "test-gcs-example-bucket") -SOURCE_PREFIX = "gcs_timespan_file_transform_source" -SOURCE_GCP_CONN_ID = "google_cloud_default" -DESTINATION_BUCKET = SOURCE_BUCKET -DESTINATION_PREFIX = "gcs_timespan_file_transform_destination" -DESTINATION_GCP_CONN_ID = "google_cloud_default" - -PATH_TO_TRANSFORM_SCRIPT = os.environ.get( - 'GCP_GCS_PATH_TO_TRANSFORM_SCRIPT', 'test_gcs_timespan_transform_script.py' -) - - -with models.DAG( - "example_gcs_timespan_file_transform", - start_date=datetime(2021, 1, 1), - catchup=False, - schedule_interval='@once', - tags=['example'], -) as dag: - - # [START howto_operator_gcs_timespan_file_transform_operator_Task] - gcs_timespan_transform_files_task = GCSTimeSpanFileTransformOperator( - task_id="gcs_timespan_transform_files", - source_bucket=SOURCE_BUCKET, - source_prefix=SOURCE_PREFIX, - source_gcp_conn_id=SOURCE_GCP_CONN_ID, - destination_bucket=DESTINATION_BUCKET, - destination_prefix=DESTINATION_PREFIX, - destination_gcp_conn_id=DESTINATION_GCP_CONN_ID, - transform_script=["python", PATH_TO_TRANSFORM_SCRIPT], - ) - # [END howto_operator_gcs_timespan_file_transform_operator_Task] - - -if __name__ == '__main__': - dag.clear(dag_run_state=State.NONE) - dag.run() diff --git a/airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py b/airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py deleted file mode 100644 index f2d4b6e77a52d..0000000000000 --- a/airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py +++ /dev/null @@ -1,152 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -""" -Example Airflow DAG for Google Cloud Storage to Google Cloud Storage transfer operators. 
-""" - -import os -from datetime import datetime - -from airflow import models -from airflow.providers.google.cloud.operators.gcs import GCSSynchronizeBucketsOperator -from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator - -BUCKET_1_SRC = os.environ.get("GCP_GCS_BUCKET_1_SRC", "test-gcs-sync-1-src") -BUCKET_1_DST = os.environ.get("GCP_GCS_BUCKET_1_DST", "test-gcs-sync-1-dst") - -BUCKET_2_SRC = os.environ.get("GCP_GCS_BUCKET_2_SRC", "test-gcs-sync-2-src") -BUCKET_2_DST = os.environ.get("GCP_GCS_BUCKET_2_DST", "test-gcs-sync-2-dst") - -BUCKET_3_SRC = os.environ.get("GCP_GCS_BUCKET_3_SRC", "test-gcs-sync-3-src") -BUCKET_3_DST = os.environ.get("GCP_GCS_BUCKET_3_DST", "test-gcs-sync-3-dst") - -OBJECT_1 = os.environ.get("GCP_GCS_OBJECT_1", "test-gcs-to-gcs-1") -OBJECT_2 = os.environ.get("GCP_GCS_OBJECT_2", "test-gcs-to-gcs-2") - -with models.DAG( - "example_gcs_to_gcs", - schedule_interval='@once', - start_date=datetime(2021, 1, 1), - catchup=False, - tags=['example'], -) as dag: - # [START howto_synch_bucket] - sync_bucket = GCSSynchronizeBucketsOperator( - task_id="sync_bucket", source_bucket=BUCKET_1_SRC, destination_bucket=BUCKET_1_DST - ) - # [END howto_synch_bucket] - - # [START howto_synch_full_bucket] - sync_full_bucket = GCSSynchronizeBucketsOperator( - task_id="sync_full_bucket", - source_bucket=BUCKET_1_SRC, - destination_bucket=BUCKET_1_DST, - delete_extra_files=True, - allow_overwrite=True, - ) - # [END howto_synch_full_bucket] - - # [START howto_synch_to_subdir] - sync_to_subdirectory = GCSSynchronizeBucketsOperator( - task_id="sync_to_subdirectory", - source_bucket=BUCKET_1_SRC, - destination_bucket=BUCKET_1_DST, - destination_object="subdir/", - ) - # [END howto_synch_to_subdir] - - # [START howto_sync_from_subdir] - sync_from_subdirectory = GCSSynchronizeBucketsOperator( - task_id="sync_from_subdirectory", - source_bucket=BUCKET_1_SRC, - source_object="subdir/", - destination_bucket=BUCKET_1_DST, - ) - # [END howto_sync_from_subdir] - - # [START howto_operator_gcs_to_gcs_single_file] - copy_single_file = GCSToGCSOperator( - task_id="copy_single_gcs_file", - source_bucket=BUCKET_1_SRC, - source_object=OBJECT_1, - destination_bucket=BUCKET_1_DST, # If not supplied the source_bucket value will be used - destination_object="backup_" + OBJECT_1, # If not supplied the source_object value will be used - ) - # [END howto_operator_gcs_to_gcs_single_file] - - # [START howto_operator_gcs_to_gcs_wildcard] - copy_files_with_wildcard = GCSToGCSOperator( - task_id="copy_files_with_wildcard", - source_bucket=BUCKET_1_SRC, - source_object="data/*.txt", - destination_bucket=BUCKET_1_DST, - destination_object="backup/", - ) - # [END howto_operator_gcs_to_gcs_wildcard] - - # [START howto_operator_gcs_to_gcs_without_wildcard] - copy_files_without_wildcard = GCSToGCSOperator( - task_id="copy_files_without_wildcard", - source_bucket=BUCKET_1_SRC, - source_object="subdir/", - destination_bucket=BUCKET_1_DST, - destination_object="backup/", - ) - # [END howto_operator_gcs_to_gcs_without_wildcard] - - # [START howto_operator_gcs_to_gcs_delimiter] - copy_files_with_delimiter = GCSToGCSOperator( - task_id="copy_files_with_delimiter", - source_bucket=BUCKET_1_SRC, - source_object="data/", - destination_bucket=BUCKET_1_DST, - destination_object="backup/", - delimiter='.txt', - ) - # [END howto_operator_gcs_to_gcs_delimiter] - - # [START howto_operator_gcs_to_gcs_list] - copy_files_with_list = GCSToGCSOperator( - task_id="copy_files_with_list", - source_bucket=BUCKET_1_SRC, - 
source_objects=[OBJECT_1, OBJECT_2], # Instead of files each element could be a wildcard expression - destination_bucket=BUCKET_1_DST, - destination_object="backup/", - ) - # [END howto_operator_gcs_to_gcs_list] - - # [START howto_operator_gcs_to_gcs_single_file_move] - move_single_file = GCSToGCSOperator( - task_id="move_single_file", - source_bucket=BUCKET_1_SRC, - source_object=OBJECT_1, - destination_bucket=BUCKET_1_DST, - destination_object="backup_" + OBJECT_1, - move_object=True, - ) - # [END howto_operator_gcs_to_gcs_single_file_move] - - # [START howto_operator_gcs_to_gcs_list_move] - move_files_with_list = GCSToGCSOperator( - task_id="move_files_with_list", - source_bucket=BUCKET_1_SRC, - source_objects=[OBJECT_1, OBJECT_2], - destination_bucket=BUCKET_1_DST, - destination_object="backup/", - ) - # [END howto_operator_gcs_to_gcs_list_move] diff --git a/airflow/providers/google/cloud/example_dags/example_gcs_to_local.py b/airflow/providers/google/cloud/example_dags/example_gcs_to_local.py deleted file mode 100644 index 50f62d23e00e0..0000000000000 --- a/airflow/providers/google/cloud/example_dags/example_gcs_to_local.py +++ /dev/null @@ -1,43 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -import os -from datetime import datetime - -from airflow import models -from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator - -PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-id") -BUCKET = os.environ.get("GCP_GCS_BUCKET", "test-gcs-example-bucket") - -PATH_TO_REMOTE_FILE = os.environ.get("GCP_GCS_PATH_TO_UPLOAD_FILE", "test-gcs-example-remote.txt") -PATH_TO_LOCAL_FILE = os.environ.get("GCP_GCS_PATH_TO_SAVED_FILE", "test-gcs-example-local.txt") - -with models.DAG( - "example_gcs_to_local", - schedule_interval='@once', - start_date=datetime(2021, 1, 1), - catchup=False, - tags=['example'], -) as dag: - # [START howto_operator_gcs_download_file_task] - download_file = GCSToLocalFilesystemOperator( - task_id="download_file", - object_name=PATH_TO_REMOTE_FILE, - bucket=BUCKET, - filename=PATH_TO_LOCAL_FILE, - ) - # [END howto_operator_gcs_download_file_task] diff --git a/airflow/providers/google/cloud/example_dags/example_local_to_gcs.py b/airflow/providers/google/cloud/example_dags/example_local_to_gcs.py deleted file mode 100644 index 68740befbcd03..0000000000000 --- a/airflow/providers/google/cloud/example_dags/example_local_to_gcs.py +++ /dev/null @@ -1,45 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. 
The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -import os -from datetime import datetime - -from airflow import models -from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator - -# [START howto_gcs_environment_variables] -BUCKET_NAME = os.environ.get('GCP_GCS_BUCKET', 'example-bucket-name') -PATH_TO_UPLOAD_FILE = os.environ.get('GCP_GCS_PATH_TO_UPLOAD_FILE', 'example-text.txt') -DESTINATION_FILE_LOCATION = os.environ.get('GCP_GCS_DESTINATION_FILE_LOCATION', 'example-text.txt') -# [END howto_gcs_environment_variables] - -with models.DAG( - 'example_local_to_gcs', - schedule_interval='@once', - start_date=datetime(2021, 1, 1), - catchup=False, - tags=['example'], -) as dag: - # [START howto_operator_local_filesystem_to_gcs] - upload_file = LocalFilesystemToGCSOperator( - task_id="upload_file", - src=PATH_TO_UPLOAD_FILE, - dst=DESTINATION_FILE_LOCATION, - bucket=BUCKET_NAME, - ) - # [END howto_operator_local_filesystem_to_gcs] diff --git a/docs/apache-airflow-providers-google/operators/cloud/gcs.rst b/docs/apache-airflow-providers-google/operators/cloud/gcs.rst index 54a7d4ba787b3..487d786aa2ed3 100644 --- a/docs/apache-airflow-providers-google/operators/cloud/gcs.rst +++ b/docs/apache-airflow-providers-google/operators/cloud/gcs.rst @@ -60,7 +60,7 @@ The time span is defined by the time span's start and end timestamps. If a DAG does not have a *next* DAG instance scheduled, the time span end infinite, meaning the operator processes all files older than ``data_interval_start``. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs_timespan_file_transform.py +.. exampleinclude:: /../../tests/system/providers/google/gcs/example_gcs_transform_timespan.py :language: python :dedent: 4 :start-after: [START howto_operator_gcs_timespan_file_transform_operator_Task] @@ -80,7 +80,7 @@ For parameter definition, take a look at Using the operator """""""""""""""""" -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs.py +.. exampleinclude:: /../../tests/system/providers/google/gcs/example_gcs_acl.py :language: python :dedent: 4 :start-after: [START howto_operator_gcs_bucket_create_acl_entry_task] @@ -114,7 +114,7 @@ For parameter definition, take a look at Using the operator """""""""""""""""" -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs.py +.. exampleinclude:: /../../tests/system/providers/google/gcs/example_gcs_acl.py :language: python :dedent: 4 :start-after: [START howto_operator_gcs_object_create_acl_entry_task] @@ -145,7 +145,7 @@ Deleting Bucket allows you to remove bucket object from the Google Cloud Storage It is performed through the :class:`~airflow.providers.google.cloud.operators.gcs.GCSDeleteBucketOperator` operator. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs.py +.. 
exampleinclude:: /../../tests/system/providers/google/gcs/example_gcs_upload_download.py :language: python :dedent: 4 :start-after: [START howto_operator_gcs_delete_bucket] @@ -174,7 +174,7 @@ GCSObjectExistenceSensor Use the :class:`~airflow.providers.google.cloud.sensors.gcs.GCSObjectExistenceSensor` to wait (poll) for the existence of a file in Google Cloud Storage. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs.py +.. exampleinclude:: /../../tests/system/providers/google/gcs/example_gcs_sensor.py :language: python :dedent: 4 :start-after: [START howto_sensor_object_exists_task] @@ -187,7 +187,7 @@ GCSObjectsWithPrefixExistenceSensor Use the :class:`~airflow.providers.google.cloud.sensors.gcs.GCSObjectsWithPrefixExistenceSensor` to wait (poll) for the existence of a file with a specified prefix in Google Cloud Storage. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs.py +.. exampleinclude:: /../../tests/system/providers/google/gcs/example_gcs_sensor.py :language: python :dedent: 4 :start-after: [START howto_sensor_object_with_prefix_exists_task] @@ -200,7 +200,7 @@ GCSUploadSessionCompleteSensor Use the :class:`~airflow.providers.google.cloud.sensors.gcs.GCSUploadSessionCompleteSensor` to check for a change in the number of files with a specified prefix in Google Cloud Storage. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs.py +.. exampleinclude:: /../../tests/system/providers/google/gcs/example_gcs_sensor.py :language: python :dedent: 4 :start-after: [START howto_sensor_gcs_upload_session_complete_task] @@ -213,7 +213,7 @@ GCSObjectUpdateSensor Use the :class:`~airflow.providers.google.cloud.sensors.gcs.GCSObjectUpdateSensor` to check if an object is updated in Google Cloud Storage. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs.py +.. exampleinclude:: /../../tests/system/providers/google/gcs/example_gcs_sensor.py :language: python :dedent: 4 :start-after: [START howto_sensor_object_update_exists_task] diff --git a/docs/apache-airflow-providers-google/operators/transfer/gcs_to_gcs.rst b/docs/apache-airflow-providers-google/operators/transfer/gcs_to_gcs.rst index f190786b68477..12ba4ce7607a9 100644 --- a/docs/apache-airflow-providers-google/operators/transfer/gcs_to_gcs.rst +++ b/docs/apache-airflow-providers-google/operators/transfer/gcs_to_gcs.rst @@ -94,7 +94,7 @@ Copy single file The following example would copy a single file, ``OBJECT_1`` from the ``BUCKET_1_SRC`` GCS bucket to the ``BUCKET_1_DST`` bucket. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py +.. exampleinclude:: /../../tests/system/providers/google/gcs/example_gcs_to_gcs.py :language: python :dedent: 4 :start-after: [START howto_operator_gcs_to_gcs_single_file] @@ -105,7 +105,7 @@ Copy multiple files There are several ways to copy multiple files, various examples of which are presented following. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py +.. exampleinclude:: /../../tests/system/providers/google/gcs/example_gcs_to_gcs.py :language: python :dedent: 4 :start-after: [START howto_operator_gcs_to_gcs_wildcard] @@ -115,7 +115,7 @@ The ``source_object`` value may contain one wild card, denoted as "*". All files be copied. In this example, all root level files ending with ``.txt`` in ``BUCKET_1_SRC`` will be copied to the ``data`` folder in ``BUCKET_1_DST``, with file names unchanged. -.. 
exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py +.. exampleinclude:: /../../tests/system/providers/google/gcs/example_gcs_to_gcs.py :language: python :dedent: 4 :start-after: [START howto_operator_gcs_to_gcs_delimiter] @@ -127,7 +127,7 @@ Then copy files from source_objects to destination_object and rename each source The following example would copy all the files in ``subdir/`` folder (i.e subdir/a.csv, subdir/b.csv, subdir/c.csv) from the ``BUCKET_1_SRC`` GCS bucket to the ``backup/`` folder in ``BUCKET_1_DST`` bucket. (i.e backup/a.csv, backup/b.csv, backup/c.csv) -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py +.. exampleinclude:: /../../tests/system/providers/google/gcs/example_gcs_to_gcs.py :language: python :dedent: 4 :start-after: [START howto_operator_gcs_to_gcs_without_wildcard] @@ -137,7 +137,7 @@ The delimiter filed may be specified to select any source files starting with `` value supplied to ``delimiter``. This example uses the ``delimiter`` value to implement the same functionality as the prior example. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py +.. exampleinclude:: /../../tests/system/providers/google/gcs/example_gcs_to_gcs.py :language: python :dedent: 4 :start-after: [START howto_operator_gcs_to_gcs_list] @@ -153,7 +153,7 @@ Move single file Supplying ``True`` to the ``move`` argument causes the operator to delete ``source_object`` once the copy is complete. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py +.. exampleinclude:: /../../tests/system/providers/google/gcs/example_gcs_to_gcs.py :language: python :dedent: 4 :start-after: [START howto_operator_gcs_to_gcs_single_file_move] @@ -165,7 +165,7 @@ Move multiple files Multiple files may be moved by supplying ``True`` to the ``move`` argument. The same rules concerning wild cards and the ``delimiter`` argument apply to moves as well as copies. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py +.. exampleinclude:: /../../tests/system/providers/google/gcs/example_gcs_to_gcs.py :language: python :dedent: 4 :start-after: [START howto_operator_gcs_to_gcs_list_move] @@ -198,7 +198,7 @@ The following example will ensure all files in ``BUCKET_1_SRC``, including any i ``BUCKET_1_DST``. It will not overwrite identically named files in ``BUCKET_1_DST`` if they already exist. It will not delete any files in ``BUCKET_1_DST`` not in ``BUCKET_1_SRC``. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py +.. exampleinclude:: /../../tests/system/providers/google/gcs/example_gcs_to_gcs.py :language: python :dedent: 4 :start-after: [START howto_synch_bucket] @@ -211,7 +211,7 @@ This example will ensure all files in ``BUCKET_1_SRC``, including any in subdire ``BUCKET_1_DST``. It will overwrite identically named files in ``BUCKET_1_DST`` if they already exist. It will delete any files in ``BUCKET_1_DST`` not in ``BUCKET_1_SRC``. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py +.. exampleinclude:: /../../tests/system/providers/google/gcs/example_gcs_to_gcs.py :language: python :dedent: 4 :start-after: [START howto_synch_full_bucket] @@ -224,7 +224,7 @@ The following example will ensure all files in ``BUCKET_1_SRC``, including any i ``subdir`` folder in ``BUCKET_1_DST``. 
It will not overwrite identically named files in ``BUCKET_1_DST/subdir`` if they already exist and it will not delete any files in ``BUCKET_1_DST/subdir`` not in ``BUCKET_1_SRC``. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py +.. exampleinclude:: /../../tests/system/providers/google/gcs/example_gcs_to_gcs.py :language: python :dedent: 4 :start-after: [START howto_synch_to_subdir] @@ -237,7 +237,7 @@ This example will ensure all files in ``BUCKET_1_SRC/subdir``, including any in in ``BUCKET_1_DST``. It will not overwrite identically named files in ``BUCKET_1_DST`` if they already exist and it will not delete any files in ``BUCKET_1_DST`` not in ``BUCKET_1_SRC/subdir``. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py +.. exampleinclude:: /../../tests/system/providers/google/gcs/example_gcs_to_gcs.py :language: python :dedent: 4 :start-after: [START howto_sync_from_subdir] diff --git a/docs/apache-airflow-providers-google/operators/transfer/gcs_to_local.rst b/docs/apache-airflow-providers-google/operators/transfer/gcs_to_local.rst index 20ce446f3b0ac..a64a5dfa8ca1a 100644 --- a/docs/apache-airflow-providers-google/operators/transfer/gcs_to_local.rst +++ b/docs/apache-airflow-providers-google/operators/transfer/gcs_to_local.rst @@ -37,7 +37,7 @@ data from GCS to local filesystem. Below is an example of using this operator to download a file from GCS. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs_to_local.py +.. exampleinclude:: /../../tests/system/providers/google/gcs/example_gcs_upload_download.py :language: python :dedent: 0 :start-after: [START howto_operator_gcs_download_file_task] diff --git a/docs/apache-airflow-providers-google/operators/transfer/local_to_gcs.rst b/docs/apache-airflow-providers-google/operators/transfer/local_to_gcs.rst index 0e2dcc1a2815a..429e7d93506b9 100644 --- a/docs/apache-airflow-providers-google/operators/transfer/local_to_gcs.rst +++ b/docs/apache-airflow-providers-google/operators/transfer/local_to_gcs.rst @@ -38,7 +38,7 @@ When you use this operator, you can optionally compress the data being uploaded. Below is an example of using this operator to upload a file to GCS. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_local_to_gcs.py +.. 
exampleinclude:: /../../tests/system/providers/google/gcs/example_gcs_upload_download.py :language: python :dedent: 0 :start-after: [START howto_operator_local_filesystem_to_gcs] diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py index bd26944233297..546c4920cea4e 100644 --- a/tests/always/test_project_structure.py +++ b/tests/always/test_project_structure.py @@ -213,8 +213,6 @@ class TestGoogleProviderProjectStructure(unittest.TestCase): 'airflow.providers.google.cloud.operators.dlp.CloudDLPDeleteDeidentifyTemplateOperator', 'airflow.providers.google.cloud.operators.dlp.CloudDLPListDLPJobsOperator', 'airflow.providers.google.cloud.operators.dlp.CloudDLPRedactImageOperator', - 'airflow.providers.google.cloud.sensors.gcs.GCSObjectUpdateSensor', - 'airflow.providers.google.cloud.sensors.gcs.GCSUploadSessionCompleteSensor', } def test_missing_example_for_operator(self): diff --git a/tests/providers/google/cloud/operators/test_gcs_system.py b/tests/providers/google/cloud/operators/test_gcs_system.py deleted file mode 100644 index ff22e8fe3c5a2..0000000000000 --- a/tests/providers/google/cloud/operators/test_gcs_system.py +++ /dev/null @@ -1,70 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
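The local_to_gcs how-to updated above notes that uploads can optionally be compressed on the way to GCS. As a minimal sketch of that option (bucket and file names here are placeholders, and gzip/mime_type are the operator's standard parameters rather than anything introduced by this change):

import os
from datetime import datetime

from airflow import models
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator

BUCKET_NAME = os.environ.get("GCP_GCS_BUCKET", "example-bucket-name")  # placeholder bucket
LOCAL_FILE = "/tmp/example_upload.txt"  # placeholder local path

with models.DAG(
    "example_local_to_gcs_gzip",
    schedule_interval="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    upload_compressed = LocalFilesystemToGCSOperator(
        task_id="upload_compressed",
        src=LOCAL_FILE,
        dst="compressed/example_upload.txt",
        bucket=BUCKET_NAME,
        gzip=True,  # compress the file before it is uploaded
        mime_type="text/plain",
    )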
-import pytest - -from tests.providers.google.cloud.operators.test_gcs_system_helper import GcsSystemTestHelper -from tests.providers.google.cloud.utils.gcp_authenticator import GCP_GCS_KEY -from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context - - -@pytest.fixture(scope="module") -def helper(): - return GcsSystemTestHelper() - - -@pytest.fixture -def file_to_upload(helper): - helper.create_file_to_upload() - yield - helper.remove_file_to_upload() - - -@pytest.fixture -def script_to_transform(helper): - helper.create_script_to_transform() - yield - helper.remove_script_to_transform() - - -@pytest.fixture -def saved_file(helper): - # file is created by operator inside DAG - yield - helper.remove_saved_file() - - -@pytest.mark.backend("mysql", "postgres") -@pytest.mark.credential_file(GCP_GCS_KEY) -class GoogleCloudStorageExampleDagsTest(GoogleSystemTest): - @provide_gcp_context(GCP_GCS_KEY) - def setUp(self): - super().setUp() - - @provide_gcp_context(GCP_GCS_KEY) - def tearDown(self): - super().tearDown() - - @provide_gcp_context(GCP_GCS_KEY) - @pytest.mark.usefixtures("file_to_upload", "script_to_transform", "saved_file") - def test_run_example_dag(self): - self.run_dag('example_gcs', CLOUD_DAG_FOLDER) - - @provide_gcp_context(GCP_GCS_KEY) - @pytest.mark.usefixtures("file_to_upload") - def test_run_example_gcs_sensor_dag(self): - self.run_dag('example_gcs_sensors', CLOUD_DAG_FOLDER) diff --git a/tests/providers/google/cloud/operators/test_gcs_system_helper.py b/tests/providers/google/cloud/operators/test_gcs_system_helper.py deleted file mode 100644 index 1807c1e5a8a36..0000000000000 --- a/tests/providers/google/cloud/operators/test_gcs_system_helper.py +++ /dev/null @@ -1,67 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
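The example_gcs DAG removed above wires GCSFileTransformOperator to an external transform_script, and the deleted test helper below writes that script on the fly. The contract is that the operator downloads the source object to a temporary file, invokes the script with the local source path and the local destination path as its two command-line arguments, then uploads the destination file back to GCS. A minimal standalone version of such a script (file names are illustrative):

# transform_script.py - invoked by GCSFileTransformOperator roughly as:
#   python transform_script.py <local source file> <local destination file>
import sys

source, destination = sys.argv[1], sys.argv[2]

with open(source) as src, open(destination, "w") as dest:
    # Upper-case every line, mirroring the script the deleted test helper generated.
    dest.writelines(line.upper() for line in src.readlines())

# In the removed example DAG the operator referenced it as:
#   GCSFileTransformOperator(task_id="transform_file", source_bucket=..., source_object=...,
#                            transform_script=["python", "transform_script.py"])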
-import os - -from airflow.providers.google.cloud.example_dags.example_gcs import ( - BUCKET_1, - BUCKET_2, - PATH_TO_SAVED_FILE, - PATH_TO_TRANSFORM_SCRIPT, - PATH_TO_UPLOAD_FILE, -) -from tests.test_utils.logging_command_executor import CommandExecutor - - -class GcsSystemTestHelper(CommandExecutor): - @staticmethod - def create_file_to_upload(): - with open(PATH_TO_UPLOAD_FILE, "w+") as file: - file.writelines(["This is a test file"]) - - @staticmethod - def create_script_to_transform(): - with open(PATH_TO_TRANSFORM_SCRIPT, "w+") as file: - file.write( - """import sys -source = sys.argv[1] -destination = sys.argv[2] - -print('running script') -with open(source, "r") as src, open(destination, "w+") as dest: - lines = [l.upper() for l in src.readlines()] - print(lines) - dest.writelines(lines) - """ - ) - - @staticmethod - def remove_file_to_upload(): - os.remove(PATH_TO_UPLOAD_FILE) - - @staticmethod - def remove_script_to_transform(): - os.remove(PATH_TO_TRANSFORM_SCRIPT) - - @staticmethod - def remove_saved_file(): - os.remove(PATH_TO_SAVED_FILE) - - def remove_bucket(self): - self.execute_cmd(["gsutil", "rm", "-r", f"gs://{BUCKET_1}"]) - self.execute_cmd(["gsutil", "rm", "-r", f"gs://{BUCKET_2}"]) diff --git a/tests/providers/google/cloud/operators/test_gcs_timespan_file_transform_system.py b/tests/providers/google/cloud/operators/test_gcs_timespan_file_transform_system.py deleted file mode 100644 index f52dd1a729883..0000000000000 --- a/tests/providers/google/cloud/operators/test_gcs_timespan_file_transform_system.py +++ /dev/null @@ -1,113 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -import os -from tempfile import NamedTemporaryFile - -import pytest - -from airflow.providers.google.cloud.example_dags.example_gcs_timespan_file_transform import ( - DESTINATION_BUCKET, - DESTINATION_PREFIX, - PATH_TO_TRANSFORM_SCRIPT, - SOURCE_BUCKET, - SOURCE_PREFIX, -) -from tests.providers.google.cloud.operators.test_gcs_system_helper import GcsSystemTestHelper -from tests.providers.google.cloud.utils.gcp_authenticator import GCP_GCS_KEY -from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context - - -@pytest.mark.credential_file(GCP_GCS_KEY) -class GCSTimespanFileTransformExampleDagsTest(GoogleSystemTest): - helper = GcsSystemTestHelper() - testfile_content = ["This is a test file"] - - @provide_gcp_context(GCP_GCS_KEY) - def setUp(self): - super().setUp() - - # 1. Create a bucket - self.execute_cmd(["gsutil", "mb", f"gs://{SOURCE_BUCKET}"]) - - # 2. 
Create a file to be processed and upload to source bucket using with source prefix - with NamedTemporaryFile() as source_file: - with open(source_file.name, "w+") as file: - file.writelines(self.testfile_content) - self.helper.execute_cmd( - [ - "gsutil", - "cp", - source_file.name, - f"gs://{SOURCE_BUCKET}/{SOURCE_PREFIX}/test.txt", - ] - ) - - # 3. Create test.py file that processes the file - with open(PATH_TO_TRANSFORM_SCRIPT, "w+") as file: - file.write( - """import sys -from pathlib import Path -source = sys.argv[1] -destination = sys.argv[2] -timespan_start = sys.argv[3] -timespan_end = sys.argv[4] - -print(sys.argv) -print(f'running script, called with source: {source}, destination: {destination}') -print(f'timespan_start: {timespan_start}, timespan_end: {timespan_end}') - -with open(Path(destination) / "output.txt", "w+") as dest: - for f in Path(source).glob("**/*"): - if f.is_dir(): - continue - with open(f) as src: - lines = [line.upper() for line in src.readlines()] - print(lines) - dest.writelines(lines) - """ - ) - - @provide_gcp_context(GCP_GCS_KEY) - def tearDown(self): - # 1. Delete test.py file - os.remove(PATH_TO_TRANSFORM_SCRIPT) - - # 2. Delete bucket - self.execute_cmd(["gsutil", "rm", "-r", f"gs://{SOURCE_BUCKET}"]) - - super().tearDown() - - @provide_gcp_context(GCP_GCS_KEY) - def test_run_example_dag(self): - # Run DAG - self.run_dag('example_gcs_timespan_file_transform', CLOUD_DAG_FOLDER) - - # Download file and verify content. - with NamedTemporaryFile() as dest_file: - - self.helper.execute_cmd( - [ - "gsutil", - "cp", - f"gs://{DESTINATION_BUCKET}/{DESTINATION_PREFIX}/output.txt", - dest_file.name, - ] - ) - with open(dest_file.name) as file: - dest_file_content = file.readlines() - assert dest_file_content == [line.upper() for line in self.testfile_content] diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_gcs_system.py b/tests/providers/google/cloud/transfers/test_gcs_to_gcs_system.py deleted file mode 100644 index 8878fb77ed64b..0000000000000 --- a/tests/providers/google/cloud/transfers/test_gcs_to_gcs_system.py +++ /dev/null @@ -1,90 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
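The deleted system tests above seed and clean their buckets by shelling out to gsutil. If the same seeding were done from Python instead, Airflow's GCSHook could cover it; this is only a sketch under the assumption of the hook's usual upload/delete API, with placeholder bucket and object names, not part of the change itself:

from airflow.providers.google.cloud.hooks.gcs import GCSHook

hook = GCSHook()  # uses the google_cloud_default connection

# Upload an in-memory string as an object, similar to `gsutil cp - gs://...`.
hook.upload(
    bucket_name="test-gcs-sync-1-src",
    object_name="subdir/file.bin",
    data="some seed content",
)

# Remove the object again during teardown, similar to `gsutil rm`.
hook.delete(bucket_name="test-gcs-sync-1-src", object_name="subdir/file.bin")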
-"""System tests for Google Cloud Build operators""" -import pytest - -from airflow.providers.google.cloud.example_dags.example_gcs_to_gcs import ( - BUCKET_1_DST, - BUCKET_1_SRC, - BUCKET_2_DST, - BUCKET_2_SRC, - BUCKET_3_DST, - BUCKET_3_SRC, -) -from tests.providers.google.cloud.utils.gcp_authenticator import GCP_GCS_KEY -from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context - - -@pytest.mark.backend("mysql", "postgres") -@pytest.mark.credential_file(GCP_GCS_KEY) -class GcsToGcsExampleDagsSystemTest(GoogleSystemTest): - def create_buckets(self): - """Create a buckets in Google Cloud Storage service with sample content.""" - - # 1. Create bucket - for name in [BUCKET_1_SRC, BUCKET_1_DST, BUCKET_2_SRC, BUCKET_2_DST, BUCKET_3_SRC, BUCKET_3_DST]: - self.create_gcs_bucket(name) - - # 2. Prepare parents - first_parent = f"gs://{BUCKET_1_SRC}/parent-1.bin" - second_parent = f"gs://{BUCKET_1_SRC}/parent-2.bin" - - self.execute_with_ctx( - [ - "bash", - "-c", - f"cat /dev/urandom | head -c $((1 * 1024 * 1024)) | gsutil cp - {first_parent}", - ], - key=GCP_GCS_KEY, - ) - - self.execute_with_ctx( - [ - "bash", - "-c", - f"cat /dev/urandom | head -c $((1 * 1024 * 1024)) | gsutil cp - {second_parent}", - ], - key=GCP_GCS_KEY, - ) - - self.upload_to_gcs(first_parent, f"gs://{BUCKET_1_SRC}/file.bin") - self.upload_to_gcs(first_parent, f"gs://{BUCKET_1_SRC}/subdir/file.bin") - self.upload_to_gcs(first_parent, f"gs://{BUCKET_2_SRC}/file.bin") - self.upload_to_gcs(first_parent, f"gs://{BUCKET_2_SRC}/subdir/file.bin") - self.upload_to_gcs(second_parent, f"gs://{BUCKET_2_DST}/file.bin") - self.upload_to_gcs(second_parent, f"gs://{BUCKET_2_DST}/subdir/file.bin") - self.upload_to_gcs(second_parent, f"gs://{BUCKET_3_DST}/file.bin") - self.upload_to_gcs(second_parent, f"gs://{BUCKET_3_DST}/subdir/file.bin") - - self.delete_gcs_bucket(first_parent) - self.delete_gcs_bucket(second_parent) - - @provide_gcp_context(GCP_GCS_KEY) - def setUp(self): - super().setUp() - self.create_buckets() - - @provide_gcp_context(GCP_GCS_KEY) - def test_run_example_dag(self): - self.run_dag('example_gcs_to_gcs', CLOUD_DAG_FOLDER) - - @provide_gcp_context(GCP_GCS_KEY) - def tearDown(self): - for name in [BUCKET_1_SRC, BUCKET_1_DST, BUCKET_2_SRC, BUCKET_2_DST, BUCKET_3_SRC, BUCKET_3_DST]: - self.delete_gcs_bucket(name) - super().tearDown() diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_local_system.py b/tests/providers/google/cloud/transfers/test_gcs_to_local_system.py deleted file mode 100644 index b8365b86d09da..0000000000000 --- a/tests/providers/google/cloud/transfers/test_gcs_to_local_system.py +++ /dev/null @@ -1,50 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-import os - -import pytest - -from airflow.providers.google.cloud.example_dags.example_gcs_to_local import ( - BUCKET, - PATH_TO_LOCAL_FILE, - PATH_TO_REMOTE_FILE, -) -from tests.providers.google.cloud.utils.gcp_authenticator import GCP_GCS_KEY -from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context - - -@pytest.mark.backend("mysql", "postgres") -@pytest.mark.credential_file(GCP_GCS_KEY) -class GoogleCloudStorageToLocalExampleDagsTest(GoogleSystemTest): - @provide_gcp_context(GCP_GCS_KEY) - def setUp(self): - super().setUp() - self.create_gcs_bucket(BUCKET) - self.upload_content_to_gcs( - lines=f"{os.urandom(1 * 1024 * 1024)}", bucket=BUCKET, filename=PATH_TO_REMOTE_FILE - ) - - @provide_gcp_context(GCP_GCS_KEY) - def tearDown(self): - self.delete_gcs_bucket(BUCKET) - os.remove(PATH_TO_LOCAL_FILE) - super().tearDown() - - @provide_gcp_context(GCP_GCS_KEY) - def test_run_example_dag(self): - self.run_dag('example_gcs_to_local', CLOUD_DAG_FOLDER) diff --git a/tests/providers/google/cloud/transfers/test_local_to_gcs_system.py b/tests/providers/google/cloud/transfers/test_local_to_gcs_system.py deleted file mode 100644 index edabb76908afd..0000000000000 --- a/tests/providers/google/cloud/transfers/test_local_to_gcs_system.py +++ /dev/null @@ -1,45 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -import os - -import pytest - -from airflow.providers.google.cloud.example_dags.example_local_to_gcs import BUCKET_NAME, PATH_TO_UPLOAD_FILE -from tests.providers.google.cloud.utils.gcp_authenticator import GCP_GCS_KEY -from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context - - -@pytest.mark.backend("mysql", "postgres") -@pytest.mark.credential_file(GCP_GCS_KEY) -class LocalFilesystemToGCSOperatorExampleDagsTest(GoogleSystemTest): - @provide_gcp_context(GCP_GCS_KEY) - def setUp(self): - super().setUp() - self.create_gcs_bucket(BUCKET_NAME) - with open(PATH_TO_UPLOAD_FILE, 'w+') as file: - file.writelines(['example test files']) - - @provide_gcp_context(GCP_GCS_KEY) - def tearDown(self): - self.delete_gcs_bucket(BUCKET_NAME) - os.remove(PATH_TO_UPLOAD_FILE) - super().tearDown() - - @provide_gcp_context(GCP_GCS_KEY) - def test_run_example_dag(self): - self.run_dag('example_local_to_gcs', CLOUD_DAG_FOLDER) diff --git a/tests/system/providers/google/gcs/example_gcs_acl.py b/tests/system/providers/google/gcs/example_gcs_acl.py new file mode 100644 index 0000000000000..83783abbfd497 --- /dev/null +++ b/tests/system/providers/google/gcs/example_gcs_acl.py @@ -0,0 +1,121 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Example Airflow DAG for Google Cloud Storage ACL (Access Control List) operators. +""" + +import os +from datetime import datetime +from pathlib import Path + +from airflow import models +from airflow.providers.google.cloud.operators.gcs import ( + GCSBucketCreateAclEntryOperator, + GCSCreateBucketOperator, + GCSDeleteBucketOperator, + GCSObjectCreateAclEntryOperator, +) +from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator +from airflow.utils.trigger_rule import TriggerRule + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") + +DAG_ID = "gcs_acl" + +BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}" +FILE_NAME = "example_upload.txt" +UPLOAD_FILE_PATH = str(Path(__file__).parent / "resources" / FILE_NAME) + +GCS_ACL_ENTITY = "allUsers" +GCS_ACL_BUCKET_ROLE = "OWNER" +GCS_ACL_OBJECT_ROLE = "OWNER" + + +with models.DAG( + DAG_ID, + schedule_interval='@once', + start_date=datetime(2021, 1, 1), + catchup=False, + tags=["gcs", "acl", "example"], +) as dag: + create_bucket = GCSCreateBucketOperator( + task_id="create_bucket", + bucket_name=BUCKET_NAME, + project_id=PROJECT_ID, + resource={ + "iamConfiguration": { + "uniformBucketLevelAccess": { + "enabled": False, + }, + }, + }, + ) + + upload_file = LocalFilesystemToGCSOperator( + task_id="upload_file", + src=UPLOAD_FILE_PATH, + dst=FILE_NAME, + bucket=BUCKET_NAME, + ) + + # [START howto_operator_gcs_bucket_create_acl_entry_task] + gcs_bucket_create_acl_entry_task = GCSBucketCreateAclEntryOperator( + bucket=BUCKET_NAME, + entity=GCS_ACL_ENTITY, + role=GCS_ACL_BUCKET_ROLE, + task_id="gcs_bucket_create_acl_entry_task", + ) + # [END howto_operator_gcs_bucket_create_acl_entry_task] + + # [START howto_operator_gcs_object_create_acl_entry_task] + gcs_object_create_acl_entry_task = GCSObjectCreateAclEntryOperator( + bucket=BUCKET_NAME, + object_name=FILE_NAME, + entity=GCS_ACL_ENTITY, + role=GCS_ACL_OBJECT_ROLE, + task_id="gcs_object_create_acl_entry_task", + ) + # [END howto_operator_gcs_object_create_acl_entry_task] + + delete_bucket = GCSDeleteBucketOperator( + task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE + ) + + ( + # TEST SETUP + create_bucket + >> upload_file + # TEST BODY + >> gcs_bucket_create_acl_entry_task + >> gcs_object_create_acl_entry_task + # TEST TEARDOWN, + >> delete_bucket + ) + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git 
a/tests/system/providers/google/gcs/example_gcs_copy_delete.py b/tests/system/providers/google/gcs/example_gcs_copy_delete.py new file mode 100644 index 0000000000000..b00c947def291 --- /dev/null +++ b/tests/system/providers/google/gcs/example_gcs_copy_delete.py @@ -0,0 +1,130 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Example Airflow DAG for Google Cloud Storage operators for listing, copying and deleting +bucket content. +""" + +import os +from datetime import datetime +from pathlib import Path + +from airflow import models +from airflow.models.baseoperator import chain +from airflow.operators.bash import BashOperator +from airflow.providers.google.cloud.operators.gcs import ( + GCSCreateBucketOperator, + GCSDeleteBucketOperator, + GCSDeleteObjectsOperator, + GCSListObjectsOperator, +) +from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator +from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator +from airflow.utils.trigger_rule import TriggerRule + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") + +DAG_ID = "gcs_copy_delete" + +BUCKET_NAME_SRC = f"bucket_{DAG_ID}_{ENV_ID}" +BUCKET_NAME_DST = f"bucket_dst_{DAG_ID}_{ENV_ID}" +FILE_NAME = "example_upload.txt" +UPLOAD_FILE_PATH = str(Path(__file__).parent / "resources" / FILE_NAME) + + +with models.DAG( + DAG_ID, + schedule_interval='@once', + start_date=datetime(2021, 1, 1), + catchup=False, + tags=["gcs", "example"], +) as dag: + create_bucket_src = GCSCreateBucketOperator( + task_id="create_bucket_src", + bucket_name=BUCKET_NAME_SRC, + project_id=PROJECT_ID, + ) + + create_bucket_dst = GCSCreateBucketOperator( + task_id="create_bucket_dst", + bucket_name=BUCKET_NAME_DST, + project_id=PROJECT_ID, + ) + + upload_file = LocalFilesystemToGCSOperator( + task_id="upload_file", + src=UPLOAD_FILE_PATH, + dst=FILE_NAME, + bucket=BUCKET_NAME_SRC, + ) + + # [START howto_operator_gcs_list_bucket] + list_buckets = GCSListObjectsOperator(task_id="list_buckets", bucket=BUCKET_NAME_SRC) + # [END howto_operator_gcs_list_bucket] + + list_buckets_result = BashOperator( + task_id="list_buckets_result", + bash_command=f"echo {list_buckets.output}", + ) + + copy_file = GCSToGCSOperator( + task_id="copy_file", + source_bucket=BUCKET_NAME_SRC, + source_object=FILE_NAME, + destination_bucket=BUCKET_NAME_DST, + destination_object=FILE_NAME, + ) + + # [START howto_operator_gcs_delete_object] + delete_files = GCSDeleteObjectsOperator( + task_id="delete_files", bucket_name=BUCKET_NAME_SRC, objects=[FILE_NAME] + ) + # [END howto_operator_gcs_delete_object] + + delete_bucket_src = GCSDeleteBucketOperator( + task_id="delete_bucket_src", bucket_name=BUCKET_NAME_SRC, trigger_rule=TriggerRule.ALL_DONE + 
) + delete_bucket_dst = GCSDeleteBucketOperator( + task_id="delete_bucket_dst", bucket_name=BUCKET_NAME_DST, trigger_rule=TriggerRule.ALL_DONE + ) + + chain( + # TEST SETUP + [create_bucket_src, create_bucket_dst], + upload_file, + # TEST BODY + list_buckets, + list_buckets_result, + copy_file, + delete_files, + # TEST TEARDOWN + [delete_bucket_src, delete_bucket_dst], + ) + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/tests/system/providers/google/gcs/example_gcs_sensor.py b/tests/system/providers/google/gcs/example_gcs_sensor.py new file mode 100644 index 0000000000000..90c6016db9b71 --- /dev/null +++ b/tests/system/providers/google/gcs/example_gcs_sensor.py @@ -0,0 +1,139 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Example Airflow DAG for Google Cloud Storage sensors. 
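+The DAG creates a bucket, uploads a local file and exercises the object existence, prefix
+existence, upload session completion and object update sensors before the bucket is removed.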
+""" + +import os +from datetime import datetime +from pathlib import Path + +from airflow import models +from airflow.models.baseoperator import chain +from airflow.operators.bash import BashOperator +from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator +from airflow.providers.google.cloud.sensors.gcs import ( + GCSObjectExistenceSensor, + GCSObjectsWithPrefixExistenceSensor, + GCSObjectUpdateSensor, + GCSUploadSessionCompleteSensor, +) +from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator +from airflow.utils.trigger_rule import TriggerRule + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") + +DAG_ID = "gcs_sensor" + +BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}" +FILE_NAME = "example_upload.txt" +UPLOAD_FILE_PATH = str(Path(__file__).parent / "resources" / FILE_NAME) + + +with models.DAG( + DAG_ID, + schedule_interval='@once', + start_date=datetime(2021, 1, 1), + catchup=False, + tags=["gcs", "example"], +) as dag: + create_bucket = GCSCreateBucketOperator( + task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID + ) + + # [START howto_sensor_gcs_upload_session_complete_task] + gcs_upload_session_complete = GCSUploadSessionCompleteSensor( + bucket=BUCKET_NAME, + prefix=FILE_NAME, + inactivity_period=15, + min_objects=1, + allow_delete=True, + previous_objects=set(), + task_id="gcs_upload_session_complete_task", + ) + # [END howto_sensor_gcs_upload_session_complete_task] + + # [START howto_sensor_object_update_exists_task] + gcs_update_object_exists = GCSObjectUpdateSensor( + bucket=BUCKET_NAME, + object=FILE_NAME, + task_id="gcs_object_update_sensor_task", + ) + # [END howto_sensor_object_update_exists_task] + + upload_file = LocalFilesystemToGCSOperator( + task_id="upload_file", + src=UPLOAD_FILE_PATH, + dst=FILE_NAME, + bucket=BUCKET_NAME, + ) + + # [START howto_sensor_object_exists_task] + gcs_object_exists = GCSObjectExistenceSensor( + bucket=BUCKET_NAME, + object=FILE_NAME, + mode='poke', + task_id="gcs_object_exists_task", + ) + # [END howto_sensor_object_exists_task] + + # [START howto_sensor_object_with_prefix_exists_task] + gcs_object_with_prefix_exists = GCSObjectsWithPrefixExistenceSensor( + bucket=BUCKET_NAME, + prefix=FILE_NAME[:5], + mode='poke', + task_id="gcs_object_with_prefix_exists_task", + ) + # [END howto_sensor_object_with_prefix_exists_task] + + delete_bucket = GCSDeleteBucketOperator( + task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE + ) + + sleep = BashOperator(task_id='sleep', bash_command='sleep 5') + + chain( + # TEST SETUP + create_bucket, + sleep, + upload_file, + # TEST BODY + [gcs_object_exists, gcs_object_with_prefix_exists], + # TEST TEARDOWN + delete_bucket, + ) + chain( + create_bucket, + # TEST BODY + gcs_upload_session_complete, + gcs_update_object_exists, + delete_bucket, + ) + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/tests/system/providers/google/gcs/example_gcs_to_gcs.py b/tests/system/providers/google/gcs/example_gcs_to_gcs.py new file mode 100644 index 0000000000000..f067c29ef74d9 --- 
/dev/null +++ b/tests/system/providers/google/gcs/example_gcs_to_gcs.py @@ -0,0 +1,244 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Example Airflow DAG for Google Cloud Storage GCSSynchronizeBucketsOperator and +GCSToGCSOperator operators. +""" + +import os +from datetime import datetime + +from airflow import models +from airflow.models.baseoperator import chain +from airflow.operators.bash import BashOperator +from airflow.providers.google.cloud.operators.gcs import ( + GCSCreateBucketOperator, + GCSDeleteBucketOperator, + GCSSynchronizeBucketsOperator, +) +from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator +from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator +from airflow.utils.trigger_rule import TriggerRule + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") + +DAG_ID = "gcs_to_gcs" + +BUCKET_NAME_SRC = f"bucket_{DAG_ID}_{ENV_ID}" +BUCKET_NAME_DST = f"bucket_dst_{DAG_ID}_{ENV_ID}" +RANDOM_FILE_NAME = OBJECT_1 = OBJECT_2 = "random.bin" + + +with models.DAG( + DAG_ID, + schedule_interval='@once', + start_date=datetime(2021, 1, 1), + catchup=False, + tags=["gcs", "example"], +) as dag: + generate_random_file = BashOperator( + task_id="generate_random_file", + bash_command=f"cat /dev/urandom | head -c $((1 * 1024 * 1024)) > {RANDOM_FILE_NAME}", + ) + + create_bucket_src = GCSCreateBucketOperator( + task_id="create_bucket_src", + bucket_name=BUCKET_NAME_SRC, + project_id=PROJECT_ID, + ) + + create_bucket_dst = GCSCreateBucketOperator( + task_id="create_bucket_dst", + bucket_name=BUCKET_NAME_DST, + project_id=PROJECT_ID, + ) + + upload_file_src = LocalFilesystemToGCSOperator( + task_id="upload_file_src", + src=RANDOM_FILE_NAME, + dst=RANDOM_FILE_NAME, + bucket=BUCKET_NAME_SRC, + ) + + upload_file_src_sub = LocalFilesystemToGCSOperator( + task_id="upload_file_src_sub", + src=RANDOM_FILE_NAME, + dst=f"subdir/{RANDOM_FILE_NAME}", + bucket=BUCKET_NAME_SRC, + ) + + upload_file_dst = LocalFilesystemToGCSOperator( + task_id="upload_file_dst", + src=RANDOM_FILE_NAME, + dst=RANDOM_FILE_NAME, + bucket=BUCKET_NAME_DST, + ) + + upload_file_dst_sub = LocalFilesystemToGCSOperator( + task_id="upload_file_dst_sub", + src=RANDOM_FILE_NAME, + dst=f"subdir/{RANDOM_FILE_NAME}", + bucket=BUCKET_NAME_DST, + ) + + # [START howto_synch_bucket] + sync_bucket = GCSSynchronizeBucketsOperator( + task_id="sync_bucket", source_bucket=BUCKET_NAME_SRC, destination_bucket=BUCKET_NAME_DST + ) + # [END howto_synch_bucket] + + # [START howto_synch_full_bucket] + sync_full_bucket = GCSSynchronizeBucketsOperator( + task_id="sync_full_bucket", + source_bucket=BUCKET_NAME_SRC, + destination_bucket=BUCKET_NAME_DST, + delete_extra_files=True, + 
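+        # together with allow_overwrite below, delete_extra_files makes the destination an exact mirror of the source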
allow_overwrite=True, + ) + # [END howto_synch_full_bucket] + + # [START howto_synch_to_subdir] + sync_to_subdirectory = GCSSynchronizeBucketsOperator( + task_id="sync_to_subdirectory", + source_bucket=BUCKET_NAME_SRC, + destination_bucket=BUCKET_NAME_DST, + destination_object="subdir/", + ) + # [END howto_synch_to_subdir] + + # [START howto_sync_from_subdir] + sync_from_subdirectory = GCSSynchronizeBucketsOperator( + task_id="sync_from_subdirectory", + source_bucket=BUCKET_NAME_SRC, + source_object="subdir/", + destination_bucket=BUCKET_NAME_DST, + ) + # [END howto_sync_from_subdir] + + # [START howto_operator_gcs_to_gcs_single_file] + copy_single_file = GCSToGCSOperator( + task_id="copy_single_gcs_file", + source_bucket=BUCKET_NAME_SRC, + source_object=OBJECT_1, + destination_bucket=BUCKET_NAME_DST, # If not supplied the source_bucket value will be used + destination_object="backup_" + OBJECT_1, # If not supplied the source_object value will be used + ) + # [END howto_operator_gcs_to_gcs_single_file] + + # [START howto_operator_gcs_to_gcs_wildcard] + copy_files_with_wildcard = GCSToGCSOperator( + task_id="copy_files_with_wildcard", + source_bucket=BUCKET_NAME_SRC, + source_object="data/*.txt", + destination_bucket=BUCKET_NAME_DST, + destination_object="backup/", + ) + # [END howto_operator_gcs_to_gcs_wildcard] + + # [START howto_operator_gcs_to_gcs_without_wildcard] + copy_files_without_wildcard = GCSToGCSOperator( + task_id="copy_files_without_wildcard", + source_bucket=BUCKET_NAME_SRC, + source_object="subdir/", + destination_bucket=BUCKET_NAME_DST, + destination_object="backup/", + ) + # [END howto_operator_gcs_to_gcs_without_wildcard] + + # [START howto_operator_gcs_to_gcs_delimiter] + copy_files_with_delimiter = GCSToGCSOperator( + task_id="copy_files_with_delimiter", + source_bucket=BUCKET_NAME_SRC, + source_object="data/", + destination_bucket=BUCKET_NAME_DST, + destination_object="backup/", + delimiter='.txt', + ) + # [END howto_operator_gcs_to_gcs_delimiter] + + # [START howto_operator_gcs_to_gcs_list] + copy_files_with_list = GCSToGCSOperator( + task_id="copy_files_with_list", + source_bucket=BUCKET_NAME_SRC, + source_objects=[OBJECT_1, OBJECT_2], # Instead of files each element could be a wildcard expression + destination_bucket=BUCKET_NAME_DST, + destination_object="backup/", + ) + # [END howto_operator_gcs_to_gcs_list] + + # [START howto_operator_gcs_to_gcs_single_file_move] + move_single_file = GCSToGCSOperator( + task_id="move_single_file", + source_bucket=BUCKET_NAME_SRC, + source_object=OBJECT_1, + destination_bucket=BUCKET_NAME_DST, + destination_object="backup_" + OBJECT_1, + move_object=True, + ) + # [END howto_operator_gcs_to_gcs_single_file_move] + + # [START howto_operator_gcs_to_gcs_list_move] + move_files_with_list = GCSToGCSOperator( + task_id="move_files_with_list", + source_bucket=BUCKET_NAME_SRC, + source_objects=[OBJECT_1, OBJECT_2], + destination_bucket=BUCKET_NAME_DST, + destination_object="backup/", + ) + # [END howto_operator_gcs_to_gcs_list_move] + + delete_bucket_src = GCSDeleteBucketOperator( + task_id="delete_bucket_src", bucket_name=BUCKET_NAME_SRC, trigger_rule=TriggerRule.ALL_DONE + ) + delete_bucket_dst = GCSDeleteBucketOperator( + task_id="delete_bucket_dst", bucket_name=BUCKET_NAME_DST, trigger_rule=TriggerRule.ALL_DONE + ) + + chain( + # TEST SETUP + [create_bucket_src, create_bucket_dst], + [upload_file_src, upload_file_src_sub], + [upload_file_dst, upload_file_dst_sub], + # TEST BODY + sync_bucket, + sync_full_bucket, + sync_to_subdirectory, 
+ sync_from_subdirectory, + copy_single_file, + copy_files_with_wildcard, + copy_files_without_wildcard, + copy_files_with_delimiter, + copy_files_with_list, + move_single_file, + move_files_with_list, + # TEST TEARDOWN + [delete_bucket_src, delete_bucket_dst], + ) + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/tests/system/providers/google/gcs/example_gcs_transform.py b/tests/system/providers/google/gcs/example_gcs_transform.py new file mode 100644 index 0000000000000..75bef5113aaf3 --- /dev/null +++ b/tests/system/providers/google/gcs/example_gcs_transform.py @@ -0,0 +1,101 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Example Airflow DAG for Google Cloud Storage GCSFileTransformOperator operator. 
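+The DAG uploads a text file and runs a local script (resources/transform_script.py)
+that upper-cases its content.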
+""" + +import os +from datetime import datetime +from pathlib import Path + +from airflow import models +from airflow.providers.google.cloud.operators.gcs import ( + GCSCreateBucketOperator, + GCSDeleteBucketOperator, + GCSFileTransformOperator, +) +from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator +from airflow.utils.trigger_rule import TriggerRule + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") + +DAG_ID = "gcs_transform" + +BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}" + +FILE_NAME = "example_upload.txt" +UPLOAD_FILE_PATH = str(Path(__file__).parent / "resources" / FILE_NAME) + +TRANSFORM_SCRIPT_PATH = str(Path(__file__).parent / "resources" / "transform_script.py") + + +with models.DAG( + DAG_ID, + schedule_interval='@once', + start_date=datetime(2021, 1, 1), + catchup=False, + tags=["gcs", "example"], +) as dag: + create_bucket = GCSCreateBucketOperator( + task_id="create_bucket", + bucket_name=BUCKET_NAME, + project_id=PROJECT_ID, + ) + + upload_file = LocalFilesystemToGCSOperator( + task_id="upload_file", + src=UPLOAD_FILE_PATH, + dst=FILE_NAME, + bucket=BUCKET_NAME, + ) + + # [START howto_operator_gcs_transform] + transform_file = GCSFileTransformOperator( + task_id="transform_file", + source_bucket=BUCKET_NAME, + source_object=FILE_NAME, + transform_script=["python", TRANSFORM_SCRIPT_PATH], + ) + # [END howto_operator_gcs_transform] + + delete_bucket = GCSDeleteBucketOperator( + task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE + ) + + ( + # TEST SETUP + create_bucket + >> upload_file + # TEST BODY + >> transform_file + # TEST TEARDOWN + >> delete_bucket + ) + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/tests/system/providers/google/gcs/example_gcs_transform_timespan.py b/tests/system/providers/google/gcs/example_gcs_transform_timespan.py new file mode 100644 index 0000000000000..40f2eeafa059c --- /dev/null +++ b/tests/system/providers/google/gcs/example_gcs_transform_timespan.py @@ -0,0 +1,120 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Example Airflow DAG for Google Cloud Storage GCSTimeSpanFileTransformOperator operator. 
+""" + +import os +from datetime import datetime +from pathlib import Path + +from airflow import models +from airflow.models.baseoperator import chain +from airflow.providers.google.cloud.operators.gcs import ( + GCSCreateBucketOperator, + GCSDeleteBucketOperator, + GCSTimeSpanFileTransformOperator, +) +from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator +from airflow.utils.trigger_rule import TriggerRule + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") + +DAG_ID = "gcs_transform_timespan" + +BUCKET_NAME_SRC = f"bucket_{DAG_ID}_{ENV_ID}" +BUCKET_NAME_DST = f"bucket_dst_{DAG_ID}_{ENV_ID}" + +SOURCE_GCP_CONN_ID = DESTINATION_GCP_CONN_ID = "google_cloud_default" + +FILE_NAME = "example_upload.txt" +SOURCE_PREFIX = "timespan_source" +DESTINATION_PREFIX = "timespan_destination" +UPLOAD_FILE_PATH = str(Path(__file__).parent / "resources" / FILE_NAME) + +TRANSFORM_SCRIPT_PATH = str(Path(__file__).parent / "resources" / "transform_timespan.py") + + +with models.DAG( + DAG_ID, + schedule_interval='@once', + start_date=datetime(2021, 1, 1), + catchup=False, + tags=["gcs", "example"], +) as dag: + create_bucket_src = GCSCreateBucketOperator( + task_id="create_bucket_src", + bucket_name=BUCKET_NAME_SRC, + project_id=PROJECT_ID, + ) + + create_bucket_dst = GCSCreateBucketOperator( + task_id="create_bucket_dst", + bucket_name=BUCKET_NAME_DST, + project_id=PROJECT_ID, + ) + + upload_file = LocalFilesystemToGCSOperator( + task_id="upload_file", + src=UPLOAD_FILE_PATH, + dst=f"{SOURCE_PREFIX}/{FILE_NAME}", + bucket=BUCKET_NAME_SRC, + ) + + # [START howto_operator_gcs_timespan_file_transform_operator_Task] + gcs_timespan_transform_files_task = GCSTimeSpanFileTransformOperator( + task_id="gcs_timespan_transform_files", + source_bucket=BUCKET_NAME_SRC, + source_prefix=SOURCE_PREFIX, + source_gcp_conn_id=SOURCE_GCP_CONN_ID, + destination_bucket=BUCKET_NAME_DST, + destination_prefix=DESTINATION_PREFIX, + destination_gcp_conn_id=DESTINATION_GCP_CONN_ID, + transform_script=["python", TRANSFORM_SCRIPT_PATH], + ) + # [END howto_operator_gcs_timespan_file_transform_operator_Task] + + delete_bucket_src = GCSDeleteBucketOperator( + task_id="delete_bucket_src", bucket_name=BUCKET_NAME_SRC, trigger_rule=TriggerRule.ALL_DONE + ) + delete_bucket_dst = GCSDeleteBucketOperator( + task_id="delete_bucket_dst", bucket_name=BUCKET_NAME_DST, trigger_rule=TriggerRule.ALL_DONE + ) + + chain( + # TEST SETUP + [create_bucket_src, create_bucket_dst], + upload_file, + # TEST BODY + gcs_timespan_transform_files_task, + # TEST TEARDOWN + [delete_bucket_src, delete_bucket_dst], + ) + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/tests/system/providers/google/gcs/example_gcs_upload_download.py b/tests/system/providers/google/gcs/example_gcs_upload_download.py new file mode 100644 index 0000000000000..98934aaea43e4 --- /dev/null +++ b/tests/system/providers/google/gcs/example_gcs_upload_download.py @@ -0,0 +1,101 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Example Airflow DAG for testing interaction between Google Cloud Storage and local file system. +""" + +import os +from datetime import datetime +from pathlib import Path + +from airflow import models +from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator +from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator +from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator +from airflow.utils.trigger_rule import TriggerRule + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") + +DAG_ID = "gcs_upload_download" + +BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}" +FILE_NAME = "example_upload.txt" +PATH_TO_SAVED_FILE = "example_upload_download.txt" +UPLOAD_FILE_PATH = str(Path(__file__).parent / "resources" / FILE_NAME) + + +with models.DAG( + DAG_ID, + schedule_interval='@once', + start_date=datetime(2021, 1, 1), + catchup=False, + tags=["gcs", "example"], +) as dag: + # [START howto_operator_gcs_create_bucket] + create_bucket = GCSCreateBucketOperator( + task_id="create_bucket", + bucket_name=BUCKET_NAME, + project_id=PROJECT_ID, + ) + # [END howto_operator_gcs_create_bucket] + + # [START howto_operator_local_filesystem_to_gcs] + upload_file = LocalFilesystemToGCSOperator( + task_id="upload_file", + src=UPLOAD_FILE_PATH, + dst=FILE_NAME, + bucket=BUCKET_NAME, + ) + # [END howto_operator_local_filesystem_to_gcs] + + # [START howto_operator_gcs_download_file_task] + download_file = GCSToLocalFilesystemOperator( + task_id="download_file", + object_name=FILE_NAME, + bucket=BUCKET_NAME, + filename=PATH_TO_SAVED_FILE, + ) + # [END howto_operator_gcs_download_file_task] + + # [START howto_operator_gcs_delete_bucket] + delete_bucket = GCSDeleteBucketOperator(task_id="delete_bucket", bucket_name=BUCKET_NAME) + # [END howto_operator_gcs_delete_bucket] + delete_bucket.trigger_rule = TriggerRule.ALL_DONE + + ( + # TEST SETUP + create_bucket + >> upload_file + # TEST BODY + >> download_file + # TEST TEARDOWN + >> delete_bucket + ) + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/tests/system/providers/google/gcs/resources/example_upload.txt b/tests/system/providers/google/gcs/resources/example_upload.txt new file mode 100644 index 0000000000000..8dba6ebeaedac --- /dev/null +++ b/tests/system/providers/google/gcs/resources/example_upload.txt @@ -0,0 +1 @@ +Example file diff --git 
a/tests/system/providers/google/gcs/resources/transform_script.py b/tests/system/providers/google/gcs/resources/transform_script.py new file mode 100644 index 0000000000000..59813b7f77c70 --- /dev/null +++ b/tests/system/providers/google/gcs/resources/transform_script.py @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import sys + +source = sys.argv[1] +destination = sys.argv[2] + +print('Running script') +with open(source) as src, open(destination, "w+") as dest: + lines = [line.upper() for line in src.readlines()] + print(lines) + dest.writelines(lines) diff --git a/tests/system/providers/google/gcs/resources/transform_timespan.py b/tests/system/providers/google/gcs/resources/transform_timespan.py new file mode 100644 index 0000000000000..3f4fbe2ba8f20 --- /dev/null +++ b/tests/system/providers/google/gcs/resources/transform_timespan.py @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import sys +from pathlib import Path + +source = sys.argv[1] +destination = sys.argv[2] +timespan_start = sys.argv[3] +timespan_end = sys.argv[4] + +print(sys.argv) +print(f'Running script, called with source: {source}, destination: {destination}') +print(f'timespan_start: {timespan_start}, timespan_end: {timespan_end}') + +with open(Path(destination) / "output.txt", "w+") as dest: + for f in Path(source).glob("**/*"): + if f.is_dir(): + continue + with open(f) as src: + lines = [line.upper() for line in src.readlines()] + print(lines) + dest.writelines(lines)
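Note: both transform scripts are plain command-line programs; the operators invoke them with positional arguments. transform_script.py receives the path of the downloaded source file and the path to write the transformed output, while transform_timespan.py receives a source directory, a destination directory, and the time span start and end. A hypothetical standalone invocation for local testing (paths and timestamp format are illustrative, not taken from the DAGs) could look like:

    python transform_script.py /tmp/source.txt /tmp/destination.txt
    python transform_timespan.py /tmp/source_dir /tmp/output_dir 2021-01-01T00:00:00+00:00 2021-01-01T01:00:00+00:00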