From c6e8dad74ce62abff97d6cf19d4707c4feb860a9 Mon Sep 17 00:00:00 2001 From: simal00 <142039176+simal00@users.noreply.github.com> Date: Tue, 30 Jul 2024 12:51:29 +0000 Subject: [PATCH 01/21] Added fileWatch for GCP --- README.md | 1 + .../schemas/transfer/bucket_source.json | 6 ++++ .../transfer/bucket_source/fileWatch.json | 20 ++++++++++++ tests/test_schema.py | 31 +++++++++++++++++++ 4 files changed, 58 insertions(+) create mode 100644 src/opentaskpy/addons/gcp/remotehandlers/schemas/transfer/bucket_source/fileWatch.json diff --git a/README.md b/README.md index b3ac3ef..d4b9610 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,7 @@ Transfers require a few additional arguments to work. These are: - File transfer: ingress/egress from/to Cloud Storage - Renaming functionality - PostCopy functionality + - FileWatch functionality # Configuration diff --git a/src/opentaskpy/addons/gcp/remotehandlers/schemas/transfer/bucket_source.json b/src/opentaskpy/addons/gcp/remotehandlers/schemas/transfer/bucket_source.json index 26510f1..11802c0 100644 --- a/src/opentaskpy/addons/gcp/remotehandlers/schemas/transfer/bucket_source.json +++ b/src/opentaskpy/addons/gcp/remotehandlers/schemas/transfer/bucket_source.json @@ -14,6 +14,12 @@ "type": "string", "default": "" }, + "error": { + "type": "boolean" + }, + "fileWatch": { + "$ref": "bucket_source/fileWatch.json" + }, "encryption": { "$ref": "http://localhost/transfer/encryption.json" }, diff --git a/src/opentaskpy/addons/gcp/remotehandlers/schemas/transfer/bucket_source/fileWatch.json b/src/opentaskpy/addons/gcp/remotehandlers/schemas/transfer/bucket_source/fileWatch.json new file mode 100644 index 0000000..9e4ab5c --- /dev/null +++ b/src/opentaskpy/addons/gcp/remotehandlers/schemas/transfer/bucket_source/fileWatch.json @@ -0,0 +1,20 @@ +{ + "$id": "http://localhost/transfer/bucket_source/fileWatch.json", + "$schema": "https://json-schema.org/draft/2020-12/schema", + "type": "object", + "properties": { + "timeout": { + "type": "integer" + }, + "directory": { + "type": "string" + }, + "fileRegex": { + "type": "string" + }, + "watchOnly": { + "type": "boolean" + } + }, + "additionalProperties": false +} diff --git a/tests/test_schema.py b/tests/test_schema.py index 2f08894..0b962be 100644 --- a/tests/test_schema.py +++ b/tests/test_schema.py @@ -140,3 +140,34 @@ def test_gcp_to_gcp( "destination": valid_bucket_destination_definition, } assert validate_transfer_json(json_data) + + +def test_gcp_source_file_watch(valid_bucket_source_definition): + json_data = { + "type": "transfer", + "source": valid_bucket_source_definition, + } + + json_data["source"]["fileWatch"] = { + "timeout": 10, + "directory": "src", + "fileRegex": ".*\\.txt", + } + + assert validate_transfer_json(json_data) + + # Remove fileRegex + del json_data["source"]["fileWatch"]["fileRegex"] + assert validate_transfer_json(json_data) + + # Remove directory + del json_data["source"]["fileWatch"]["directory"] + assert validate_transfer_json(json_data) + + # Add watchOnly + json_data["source"]["fileWatch"]["watchOnly"] = True + assert validate_transfer_json(json_data) + + # Add error + json_data["source"]["error"] = True + assert validate_transfer_json(json_data) From 88c03143db8d909f99cd17684cf45ecbbace05c2 Mon Sep 17 00:00:00 2001 From: simal00 <142039176+simal00@users.noreply.github.com> Date: Tue, 30 Jul 2024 14:05:08 +0000 Subject: [PATCH 02/21] added test for filewatch --- tests/test_download.py | 33 ++++++++++++++++++++++++++++++--- 1 file changed, 30 insertions(+), 3 deletions(-) diff 
--git a/tests/test_download.py b/tests/test_download.py index 2ae8958..a18f9e1 100644 --- a/tests/test_download.py +++ b/tests/test_download.py @@ -68,7 +68,6 @@ def gcp_creds(): def test_gcp_root_to_local_transfer(gcp_creds): - task_definition = { "type": "transfer", "source": deepcopy(bucket_source_root_definition), @@ -82,7 +81,6 @@ def test_gcp_root_to_local_transfer(gcp_creds): def test_gcp_nested_to_local_transfer(gcp_creds): - task_definition = { "type": "transfer", "source": deepcopy(bucket_source_nested_definition), @@ -96,7 +94,6 @@ def test_gcp_nested_to_local_transfer(gcp_creds): def test_gcp_nested_regex_to_local_transfer(gcp_creds): - task_definition = { "type": "transfer", "source": deepcopy(bucket_source_nested_regex_definition), @@ -107,3 +104,33 @@ def test_gcp_nested_regex_to_local_transfer(gcp_creds): transfer_obj = transfer.Transfer(None, "gcp-to-local", task_definition) assert transfer_obj.run() + + +def test_gcp_root_to_local_transfer(gcp_creds): + task_definition = { + "type": "transfer", + "source": deepcopy(bucket_source_root_definition), + "destination": [deepcopy(bucket_local_definition)], + } + task_definition["source"]["protocol"]["credentials"] = gcp_creds + + transfer_obj = transfer.Transfer(None, "gcp-to-local", task_definition) + + assert transfer_obj.run() + + +def test_gcp_file_watch(gcp_creds): + task_definition = { + "type": "transfer", + "source": deepcopy(bucket_source_root_definition), + "destination": [deepcopy(bucket_local_definition)], + } + task_definition["source"]["fileWatch"] = { + "timeout": 300, + "directory": "localnested", + "fileRegex": ".*\\.txt", + } + task_definition["source"]["protocol"]["credentials"] = gcp_creds + transfer_obj = transfer.Transfer(None, "gcp-to-local", task_definition) + + assert transfer_obj.run() From e48cf4c303c49e3ce5c3293aabe500c638a829fa Mon Sep 17 00:00:00 2001 From: simal00 <142039176+simal00@users.noreply.github.com> Date: Tue, 30 Jul 2024 14:18:46 +0000 Subject: [PATCH 03/21] Updated README --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index d4b9610..925299f 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,7 @@ Transfers require a few additional arguments to work. 
These are: - File transfer: ingress/egress from/to Cloud Storage - Renaming functionality - PostCopy functionality - - FileWatch functionality + - fileWatch functionality # Configuration From 5d13d819c430fb21a6d521e865227dba6a48fc3b Mon Sep 17 00:00:00 2001 From: simal00 <142039176+simal00@users.noreply.github.com> Date: Tue, 30 Jul 2024 15:02:32 +0000 Subject: [PATCH 04/21] Updated CHANGELOG --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6a710d9..489d518 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## v24.28.2 + +- Added support for fileWatch property + ## v24.28.1 - Fix for FileExpression prop being used instead of FileRegex (GCP source) From 3f1434acdfe00fcd6508e594ddbaa1435574b21a Mon Sep 17 00:00:00 2001 From: simal00 <142039176+simal00@users.noreply.github.com> Date: Tue, 30 Jul 2024 15:05:55 +0000 Subject: [PATCH 05/21] update version number --- pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index bbe7eeb..3121323 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "otf-addons-gcp" -version = "v24.28.1" +version = "v24.28.2" authors = [{ name = "Aldo Troiano", email = "a.troiano@reply.com" }] license = { text = "GPLv3" } classifiers = [ @@ -51,7 +51,7 @@ dev = [ profile = 'black' [tool.bumpver] -current_version = "v24.28.1" +current_version = "v24.28.2" version_pattern = "YY.WW.PATCH[-TAG]" commit_message = "bump version {old_version} -> {new_version}" commit = true From edb69e21abef81c14a01d7eea3e27755e4fb7658 Mon Sep 17 00:00:00 2001 From: simal00 <142039176+simal00@users.noreply.github.com> Date: Wed, 31 Jul 2024 09:46:41 +0000 Subject: [PATCH 06/21] retracting version --- pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 3121323..bbe7eeb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "otf-addons-gcp" -version = "v24.28.2" +version = "v24.28.1" authors = [{ name = "Aldo Troiano", email = "a.troiano@reply.com" }] license = { text = "GPLv3" } classifiers = [ @@ -51,7 +51,7 @@ dev = [ profile = 'black' [tool.bumpver] -current_version = "v24.28.2" +current_version = "v24.28.1" version_pattern = "YY.WW.PATCH[-TAG]" commit_message = "bump version {old_version} -> {new_version}" commit = true From 834ac558200ce81c00a48385203f57a100d2263d Mon Sep 17 00:00:00 2001 From: simal00 <142039176+simal00@users.noreply.github.com> Date: Wed, 31 Jul 2024 10:14:50 +0000 Subject: [PATCH 07/21] modify version pattern --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index bbe7eeb..441ac0c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,7 +52,7 @@ profile = 'black' [tool.bumpver] current_version = "v24.28.1" -version_pattern = "YY.WW.PATCH[-TAG]" +version_pattern = "vYY.WW.PATCH[-TAG]" commit_message = "bump version {old_version} -> {new_version}" commit = true tag = true From 0b183025099cdea4bb149d632def75a7fe8fd8b3 Mon Sep 17 00:00:00 2001 From: simal00 <142039176+simal00@users.noreply.github.com> Date: Wed, 31 Jul 2024 10:15:07 +0000 Subject: [PATCH 08/21] bump version v24.28.1 -> v24.31.0 --- pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 441ac0c..5855023 100644 --- a/pyproject.toml 
+++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "otf-addons-gcp" -version = "v24.28.1" +version = "v24.31.0" authors = [{ name = "Aldo Troiano", email = "a.troiano@reply.com" }] license = { text = "GPLv3" } classifiers = [ @@ -51,7 +51,7 @@ dev = [ profile = 'black' [tool.bumpver] -current_version = "v24.28.1" +current_version = "v24.31.0" version_pattern = "vYY.WW.PATCH[-TAG]" commit_message = "bump version {old_version} -> {new_version}" commit = true From 39771f7388299005662c5fd7fe9bc1125e42660c Mon Sep 17 00:00:00 2001 From: simal00 <142039176+simal00@users.noreply.github.com> Date: Wed, 31 Jul 2024 10:17:37 +0000 Subject: [PATCH 09/21] update CHANGELOG to match the new version --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 489d518..340501e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # Changelog -## v24.28.2 +## v24.31.0 - Added support for fileWatch property From b97c278ba7d994cd2b9c1074a1ffcec303036dbb Mon Sep 17 00:00:00 2001 From: simal00 <142039176+simal00@users.noreply.github.com> Date: Mon, 5 Aug 2024 12:58:42 +0000 Subject: [PATCH 10/21] fix file regex --- pyproject.toml | 1 + .../addons/gcp/remotehandlers/bucket.py | 83 +++++++++++-------- 2 files changed, 48 insertions(+), 36 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 5855023..1868104 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,7 @@ requires-python = ">=3.11" [project.optional-dependencies] dev = [ + "google-cloud-storage", "localstack", "localstack-client", "pytest-shell", diff --git a/src/opentaskpy/addons/gcp/remotehandlers/bucket.py b/src/opentaskpy/addons/gcp/remotehandlers/bucket.py index ed14019..cadc96b 100644 --- a/src/opentaskpy/addons/gcp/remotehandlers/bucket.py +++ b/src/opentaskpy/addons/gcp/remotehandlers/bucket.py @@ -1,15 +1,18 @@ """GCP Cloud Bucket remote handler.""" import glob +import os import re import opentaskpy.otflogging import requests -from opentaskpy.exceptions import RemoteTransferError +from google.cloud import storage from opentaskpy.remotehandlers.remotehandler import RemoteTransferHandler from .creds import get_access_token +MAX_OBJECTS_PER_QUERY = 100 + class BucketTransfer(RemoteTransferHandler): """GCP CloudBucket remote transfer handler.""" @@ -30,6 +33,7 @@ def __init__(self, spec: dict): # Generating Access Token for Transfer self.credentials = get_access_token(self.spec["protocol"]) + self.storage_client = storage.Client(credentials=self.credentials) def supports_direct_transfer(self) -> bool: """Return False, as all files should go via the worker.""" @@ -264,50 +268,57 @@ def create_flag_files(self) -> int: def list_files( self, directory: str | None = None, file_pattern: str | None = None - ) -> list: - """List Files in GCP. - - List Files available in the given directory with the specified file pattern (glob expression). + ) -> dict: + """Return list of files that match the source definition. Args: - directory (str): A directory to list on the bucket. - file_pattern (str): The pattern to match the file on (e.g. **.txt) + directory (str, optional): The directory to search in. Defaults to None. + file_pattern (str, optional): The file pattern to search for. Defaults to None. Returns: - [obj] if successful, [] if not. + dict: A dict of files that match the source definition. 
""" - self.logger.info("Listing Files in Bucket.") + bucket_name = self.spec["bucket"] + bucket = self.storage_client.bucket(bucket_name) + prefix = directory or self.spec.get("directory", "") + remote_files = {} + + self.logger.info( + f"Listing files in {bucket_name} matching" + f" {file_pattern}{' in ' + (directory or '')}" + ) + try: - file_pattern = self.spec["fileRegex"] - if "directory" in self.spec and self.spec["directory"] != "": - file_pattern = f"{self.spec['directory']}/{file_pattern}" - - response = requests.get( - f"https://storage.googleapis.com/storage/v1/b/{self.spec['bucket']}/o", - headers={"Authorization": f"Bearer {self.credentials}"}, - timeout=1800, - params={"matchGlob": file_pattern}, - ) - items = [] - if response.status_code == 200: - data = response.json() - if "items" in data: - items = data["items"] - names = [item["name"] for item in items if "name" in item] - return names - self.logger.info( - f"No items which matches {file_pattern} found in directory." - ) - return [] - self.logger.error(f"List files returned {response.status_code} ") - raise RemoteTransferError(response) + blobs = bucket.list_blobs(prefix=prefix) + for blob in blobs: + key = blob.name + filename = os.path.basename(key) + + if file_pattern and not re.match(file_pattern, filename): + continue + + # Ensure the file is directly in the specified directory, not a subdirectory + if directory: + file_directory = os.path.dirname(key) + if file_directory != directory: + continue + + if key.startswith("/"): + continue + + self.logger.info(f"Found file: {filename}") + + remote_files[key] = { + "size": blob.size, + "modified_time": blob.updated.timestamp(), + } except Exception as e: - self.logger.error( - f"Error listing files in directory {self.spec['bucket']}/{self.spec['directory']}/{self.spec['fileRegex']}" - ) + self.logger.error(f"Error listing files in {bucket_name}") self.logger.exception(e) - return [] + raise e + + return remote_files def tidy(self) -> None: """Nothing to tidy.""" From af2ed2c7b07ad6467d5dfe6bb4ff69437b7169a8 Mon Sep 17 00:00:00 2001 From: simal00 <142039176+simal00@users.noreply.github.com> Date: Mon, 5 Aug 2024 13:18:54 +0000 Subject: [PATCH 11/21] Update CHANGELOG.md --- CHANGELOG.md | 4 ++ .../addons/gcp/remotehandlers/bucket.py | 54 ++++++++++++------- 2 files changed, 38 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 340501e..fda232c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## v24.32.0 + +- Added google cloud storage dependency to list blobs from GCP Bucket matching a regex pattern. 
+ ## v24.31.0 - Added support for fileWatch property diff --git a/src/opentaskpy/addons/gcp/remotehandlers/bucket.py b/src/opentaskpy/addons/gcp/remotehandlers/bucket.py index cadc96b..2502aad 100644 --- a/src/opentaskpy/addons/gcp/remotehandlers/bucket.py +++ b/src/opentaskpy/addons/gcp/remotehandlers/bucket.py @@ -280,7 +280,9 @@ def list_files( """ bucket_name = self.spec["bucket"] bucket = self.storage_client.bucket(bucket_name) - prefix = directory or self.spec.get("directory", "") + prefix = directory or self.spec.get( + "directory", "" + ) # files are stored in root dir, defaults to empty string unless directory specified remote_files = {} self.logger.info( @@ -289,29 +291,41 @@ def list_files( ) try: - blobs = bucket.list_blobs(prefix=prefix) - for blob in blobs: - key = blob.name - filename = os.path.basename(key) - - if file_pattern and not re.match(file_pattern, filename): - continue - - # Ensure the file is directly in the specified directory, not a subdirectory - if directory: - file_directory = os.path.dirname(key) - if file_directory != directory: + blobs = bucket.list_blobs(prefix=prefix, max_results=MAX_OBJECTS_PER_QUERY) + while blobs: + for blob in blobs: + key = blob.name + filename = os.path.basename(key) + + # Skip files that do not match the file pattern + if file_pattern and not re.match(file_pattern, filename): continue - if key.startswith("/"): - continue + # Ensure the file is directly in the specified directory, not a subdirectory + if directory: + file_directory = os.path.dirname(key) + if file_directory != directory: + continue - self.logger.info(f"Found file: {filename}") + if key.startswith("/"): + continue + + self.logger.info(f"Found file: {filename}") + + remote_files[key] = { + "size": blob.size, + "modified_time": blob.updated.timestamp(), + } - remote_files[key] = { - "size": blob.size, - "modified_time": blob.updated.timestamp(), - } + # Retrieve the next page token, if it exists. 
+ if blobs.next_page_token: + blobs = bucket.list_blobs( + prefix=prefix, + max_results=MAX_OBJECTS_PER_QUERY, + page_token=blobs.next_page_token, + ) + else: + break except Exception as e: self.logger.error(f"Error listing files in {bucket_name}") From 9617b0587fbed796cdfc3c929db830c649ecaf00 Mon Sep 17 00:00:00 2001 From: simal00 <142039176+simal00@users.noreply.github.com> Date: Mon, 5 Aug 2024 13:19:24 +0000 Subject: [PATCH 12/21] bump version v24.31.0 -> v24.32.0 --- pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 1868104..7d5086e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "otf-addons-gcp" -version = "v24.31.0" +version = "v24.32.0" authors = [{ name = "Aldo Troiano", email = "a.troiano@reply.com" }] license = { text = "GPLv3" } classifiers = [ @@ -52,7 +52,7 @@ dev = [ profile = 'black' [tool.bumpver] -current_version = "v24.31.0" +current_version = "v24.32.0" version_pattern = "vYY.WW.PATCH[-TAG]" commit_message = "bump version {old_version} -> {new_version}" commit = true From 64c5ee28a13578ad211d15eb7c4eb6e5792ae1e1 Mon Sep 17 00:00:00 2001 From: simal00 <142039176+simal00@users.noreply.github.com> Date: Tue, 6 Aug 2024 10:31:16 +0000 Subject: [PATCH 13/21] Removed google cloud import and handling regex matching locally after fetching all files from gcp --- CHANGELOG.md | 2 +- pyproject.toml | 1 - .../addons/gcp/remotehandlers/bucket.py | 104 ++++++++---------- 3 files changed, 45 insertions(+), 62 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fda232c..748b785 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ ## v24.32.0 -- Added google cloud storage dependency to list blobs from GCP Bucket matching a regex pattern. +- List all files on GCP bucket and perform regex pattern matching to return files. ## v24.31.0 diff --git a/pyproject.toml b/pyproject.toml index 7d5086e..1798cf1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,6 @@ requires-python = ">=3.11" [project.optional-dependencies] dev = [ - "google-cloud-storage", "localstack", "localstack-client", "pytest-shell", diff --git a/src/opentaskpy/addons/gcp/remotehandlers/bucket.py b/src/opentaskpy/addons/gcp/remotehandlers/bucket.py index 2502aad..1c8c117 100644 --- a/src/opentaskpy/addons/gcp/remotehandlers/bucket.py +++ b/src/opentaskpy/addons/gcp/remotehandlers/bucket.py @@ -1,12 +1,11 @@ """GCP Cloud Bucket remote handler.""" import glob -import os import re import opentaskpy.otflogging import requests -from google.cloud import storage +from opentaskpy.exceptions import RemoteTransferError from opentaskpy.remotehandlers.remotehandler import RemoteTransferHandler from .creds import get_access_token @@ -33,7 +32,6 @@ def __init__(self, spec: dict): # Generating Access Token for Transfer self.credentials = get_access_token(self.spec["protocol"]) - self.storage_client = storage.Client(credentials=self.credentials) def supports_direct_transfer(self) -> bool: """Return False, as all files should go via the worker.""" @@ -266,73 +264,59 @@ def create_flag_files(self) -> int: """Not implemented for this transfer type.""" raise NotImplementedError - def list_files( - self, directory: str | None = None, file_pattern: str | None = None - ) -> dict: - """Return list of files that match the source definition. + def list_files(self, directory: str | None = None) -> list: + """List Files in GCP with pagination and local regex matching. 
Args: - directory (str, optional): The directory to search in. Defaults to None. - file_pattern (str, optional): The file pattern to search for. Defaults to None. + directory (str): A directory to list on the bucket. Returns: - dict: A dict of files that match the source definition. + list: A list of filenames if successful, an empty list if not. """ - bucket_name = self.spec["bucket"] - bucket = self.storage_client.bucket(bucket_name) - prefix = directory or self.spec.get( - "directory", "" - ) # files are stored in root dir, defaults to empty string unless directory specified - remote_files = {} - - self.logger.info( - f"Listing files in {bucket_name} matching" - f" {file_pattern}{' in ' + (directory or '')}" - ) - + self.logger.info("Listing Files in Bucket.") try: - blobs = bucket.list_blobs(prefix=prefix, max_results=MAX_OBJECTS_PER_QUERY) - while blobs: - for blob in blobs: - key = blob.name - filename = os.path.basename(key) - - # Skip files that do not match the file pattern - if file_pattern and not re.match(file_pattern, filename): - continue - - # Ensure the file is directly in the specified directory, not a subdirectory - if directory: - file_directory = os.path.dirname(key) - if file_directory != directory: - continue - - if key.startswith("/"): - continue - - self.logger.info(f"Found file: {filename}") - - remote_files[key] = { - "size": blob.size, - "modified_time": blob.updated.timestamp(), - } - - # Retrieve the next page token, if it exists. - if blobs.next_page_token: - blobs = bucket.list_blobs( - prefix=prefix, - max_results=MAX_OBJECTS_PER_QUERY, - page_token=blobs.next_page_token, - ) + file_pattern = self.spec["fileRegex"] + directory = directory or self.spec.get("directory", "") + + base_url = ( + f"https://storage.googleapis.com/storage/v1/b/{self.spec['bucket']}/o" + ) + headers = {"Authorization": f"Bearer {self.credentials}"} + params = {"prefix": directory} if directory else {} + items = [] + + while True: + response = requests.get( + base_url, headers=headers, params=params, timeout=1800 + ) + if response.status_code == 200: + data = response.json() + if "items" in data: + items.extend(data["items"]) + + if "nextPageToken" in data: + # Set the nextPageToken for the next request + params["pageToken"] = data["nextPageToken"] + else: + break else: - break + self.logger.error(f"List files returned {response.status_code}") + raise RemoteTransferError(response) + + filenames = [ + item["name"] + for item in items + if "name" in item + and (not file_pattern or re.match(file_pattern, item["name"])) + ] + return filenames except Exception as e: - self.logger.error(f"Error listing files in {bucket_name}") + self.logger.error( + f"Error listing files in directory {self.spec['bucket']}/{directory}" + ) self.logger.exception(e) - raise e - - return remote_files + return [] def tidy(self) -> None: """Nothing to tidy.""" From c925e0c7e8326fde54504c86641a43219366b870 Mon Sep 17 00:00:00 2001 From: simal00 <142039176+simal00@users.noreply.github.com> Date: Tue, 6 Aug 2024 10:35:36 +0000 Subject: [PATCH 14/21] limiting query results to 100 per page --- src/opentaskpy/addons/gcp/remotehandlers/bucket.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/opentaskpy/addons/gcp/remotehandlers/bucket.py b/src/opentaskpy/addons/gcp/remotehandlers/bucket.py index 1c8c117..0118e65 100644 --- a/src/opentaskpy/addons/gcp/remotehandlers/bucket.py +++ b/src/opentaskpy/addons/gcp/remotehandlers/bucket.py @@ -282,7 +282,11 @@ def list_files(self, directory: str | 
None = None) -> list: f"https://storage.googleapis.com/storage/v1/b/{self.spec['bucket']}/o" ) headers = {"Authorization": f"Bearer {self.credentials}"} - params = {"prefix": directory} if directory else {} + params = ( + {"prefix": directory, "maxResults": MAX_OBJECTS_PER_QUERY} + if directory + else {} + ) items = [] while True: From fe68e7e3848258c8f5290efd1617ff9f649b29e1 Mon Sep 17 00:00:00 2001 From: simal00 <142039176+simal00@users.noreply.github.com> Date: Tue, 6 Aug 2024 14:18:20 +0000 Subject: [PATCH 15/21] modified list_files to accept file_pattern arg --- src/opentaskpy/addons/gcp/remotehandlers/bucket.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/opentaskpy/addons/gcp/remotehandlers/bucket.py b/src/opentaskpy/addons/gcp/remotehandlers/bucket.py index 0118e65..8fcbfaf 100644 --- a/src/opentaskpy/addons/gcp/remotehandlers/bucket.py +++ b/src/opentaskpy/addons/gcp/remotehandlers/bucket.py @@ -264,18 +264,20 @@ def create_flag_files(self) -> int: """Not implemented for this transfer type.""" raise NotImplementedError - def list_files(self, directory: str | None = None) -> list: + def list_files( + self, directory: str | None = None, file_pattern: str | None = None + ) -> list: """List Files in GCP with pagination and local regex matching. Args: directory (str): A directory to list on the bucket. + file_pattern (str): File pattern to match files. Returns: list: A list of filenames if successful, an empty list if not. """ self.logger.info("Listing Files in Bucket.") try: - file_pattern = self.spec["fileRegex"] directory = directory or self.spec.get("directory", "") base_url = ( From 8963c6efd775085f0025c266fd71b83662d38c57 Mon Sep 17 00:00:00 2001 From: simal00 <142039176+simal00@users.noreply.github.com> Date: Tue, 6 Aug 2024 14:20:10 +0000 Subject: [PATCH 16/21] bump version v24.32.0 -> v24.32.1 --- pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 1798cf1..0a5a4eb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "otf-addons-gcp" -version = "v24.32.0" +version = "v24.32.1" authors = [{ name = "Aldo Troiano", email = "a.troiano@reply.com" }] license = { text = "GPLv3" } classifiers = [ @@ -51,7 +51,7 @@ dev = [ profile = 'black' [tool.bumpver] -current_version = "v24.32.0" +current_version = "v24.32.1" version_pattern = "vYY.WW.PATCH[-TAG]" commit_message = "bump version {old_version} -> {new_version}" commit = true From 2b7d361f93a552676f0be97f85cb8f3af46686b2 Mon Sep 17 00:00:00 2001 From: simal00 <142039176+simal00@users.noreply.github.com> Date: Tue, 6 Aug 2024 14:21:36 +0000 Subject: [PATCH 17/21] Updated Changelog --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 748b785..d699a17 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## v24.32.1 + +- Minor fix to accept file_pattern arg in list_files method. +- + ## v24.32.0 - List all files on GCP bucket and perform regex pattern matching to return files. 
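For reference, a minimal task definition exercising the fileWatch block introduced in PATCH 01 could look like the sketch below, written in the same dict style as the tests. The bucket name, directories, destination path and timeout value are illustrative placeholders, and the comments on fileWatch semantics are assumptions beyond what the schema itself states (it only defines the property types).

# Hypothetical example only: "example-bucket", "inbound" and "/tmp/downloaded" are
# placeholders; "credentials" would be filled with service-account JSON as in the tests.
task_definition = {
    "type": "transfer",
    "source": {
        "bucket": "example-bucket",
        "directory": "inbound",
        "fileRegex": ".*\\.txt",
        "fileWatch": {
            "timeout": 300,           # integer per fileWatch.json (assumed to be seconds)
            "directory": "inbound",   # optional per the schema
            "fileRegex": ".*\\.txt",  # optional per the schema
            "watchOnly": False,       # boolean per the schema
        },
        "protocol": {
            "name": "opentaskpy.addons.gcp.remotehandlers.bucket.BucketTransfer",
            "credentials": {},
        },
    },
    "destination": [
        {"directory": "/tmp/downloaded", "protocol": {"name": "local"}},
    ],
}

A source of this shape passes validate_transfer_json in tests/test_schema.py, and tests/test_download.py drives a near-identical definition through transfer.Transfer.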
From d29d1f81c349d31d84f09f37ae93cd61c279c695 Mon Sep 17 00:00:00 2001 From: simal00 <142039176+simal00@users.noreply.github.com> Date: Fri, 6 Sep 2024 15:52:39 +0000 Subject: [PATCH 18/21] handle token refresh, handle fileRegex correctly, updated tests --- README.md | 2 +- .../addons/gcp/remotehandlers/bucket.py | 17 ++++++---- .../addons/gcp/remotehandlers/creds.py | 13 ++++++-- tests/test_download.py | 33 ++++++++++--------- tests/test_local_to_root.py | 6 ++-- tests/test_upload_nested.py | 6 ++-- 6 files changed, 48 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index 925299f..f4739af 100644 --- a/README.md +++ b/README.md @@ -85,6 +85,6 @@ JSON configs for transfers can be defined as follows: "pattern" : "(? None: + """Ensure the credentials are valid, refresh if necessary.""" + self.credentials = get_access_token(self.spec["protocol"]) + def supports_direct_transfer(self) -> bool: """Return False, as all files should go via the worker.""" return False @@ -51,6 +54,7 @@ def handle_post_copy_action(self, files: list[str]) -> int: or self.spec["postCopyAction"]["action"] == "rename" ): try: + self.validate_or_refresh_creds() # refresh creds # Append a directory if one is defined for file in files: @@ -138,6 +142,7 @@ def push_files_from_worker( int: 0 if successful, 1 if not. """ try: + self.validate_or_refresh_creds() # refresh creds if file_list: files = list(file_list.keys()) else: @@ -212,6 +217,7 @@ def pull_files_to_worker( result = 0 self.logger.info("Downloading file from GCP.") try: + self.validate_or_refresh_creds() # refresh creds for file in files: self.logger.info(file) file = file.replace( @@ -278,17 +284,14 @@ def list_files( """ self.logger.info("Listing Files in Bucket.") try: + self.validate_or_refresh_creds() # refresh creds directory = directory or self.spec.get("directory", "") base_url = ( f"https://storage.googleapis.com/storage/v1/b/{self.spec['bucket']}/o" ) headers = {"Authorization": f"Bearer {self.credentials}"} - params = ( - {"prefix": directory, "maxResults": MAX_OBJECTS_PER_QUERY} - if directory - else {} - ) + params = {"prefix": directory, "maxResults": MAX_OBJECTS_PER_QUERY} items = [] while True: @@ -307,7 +310,7 @@ def list_files( break else: self.logger.error(f"List files returned {response.status_code}") - raise RemoteTransferError(response) + self.logger.error(f"Remote files not found: {response}") filenames = [ item["name"] diff --git a/src/opentaskpy/addons/gcp/remotehandlers/creds.py b/src/opentaskpy/addons/gcp/remotehandlers/creds.py index 35d58f0..682b873 100644 --- a/src/opentaskpy/addons/gcp/remotehandlers/creds.py +++ b/src/opentaskpy/addons/gcp/remotehandlers/creds.py @@ -1,5 +1,7 @@ """GCP helper functions.""" +from datetime import datetime, timedelta + import opentaskpy.otflogging from google.auth.transport.requests import Request from google.oauth2 import credentials, service_account @@ -15,14 +17,21 @@ def get_access_token(credentials_: dict) -> credentials: logger = opentaskpy.otflogging.init_logging(__name__, None, None) try: # Get an Access Token - auth_creds = None # Initialising authCreds + # auth_creds = None # Initialising authCreds logger.info("Retrieving credentials") auth_creds = service_account.Credentials.from_service_account_info( credentials_["credentials"], scopes=["https://www.googleapis.com/auth/cloud-platform"], ) - auth_creds.refresh(Request()) # Refreshing access token + # Check if the token needs to be refreshed + if ( + auth_creds.expiry is None + or auth_creds.expired + or auth_creds.expiry > 
datetime.now() - timedelta(minutes=5) + ): + logger.info("Refreshing access token") + auth_creds.refresh(Request()) # Refreshing access token if not auth_creds.token: # Handle exception logger.error("Error Retrieving credentials.") diff --git a/tests/test_download.py b/tests/test_download.py index a18f9e1..b05f032 100644 --- a/tests/test_download.py +++ b/tests/test_download.py @@ -11,17 +11,13 @@ # Set the log level to maximum os.environ["OTF_LOG_LEVEL"] = "DEBUG" +current_dir = os.path.dirname(os.path.realpath(__file__)) +load_dotenv(dotenv_path=f"{current_dir}/../.env") bucket_source_root_definition = { "bucket": "bucket-test-gcpupload", - "directory": "postcopy", - "postCopyAction": { - "action": "rename", - "sub": "LOCAL", - "pattern": "LOCALh", - "destination": "postcopy1", - }, - "fileExpression": "**.txt", + "directory": "localnested", + "fileRegex": ".*\\.txt", "protocol": { "name": "opentaskpy.addons.gcp.remotehandlers.bucket.BucketTransfer", "credentials": {}, @@ -30,7 +26,7 @@ bucket_source_nested_definition = { "bucket": "bucket-test-gcpupload", "directory": "localnested", - "fileExpression": "**.txt", + "fileRegex": ".*\\.txt", "protocol": { "name": "opentaskpy.addons.gcp.remotehandlers.bucket.BucketTransfer", "credentials": {}, @@ -39,14 +35,14 @@ bucket_source_nested_regex_definition = { "bucket": "bucket-test-gcpupload", "directory": "localnested", - "fileExpression": "root1dir.txt", + "fileRegex": ".*\\.txt", "protocol": { "name": "opentaskpy.addons.gcp.remotehandlers.bucket.BucketTransfer", "credentials": {}, }, } bucket_local_definition = { - "directory": "src/tmp/downloaded", + "directory": current_dir, "protocol": {"name": "local"}, } @@ -61,7 +57,7 @@ def gcp_creds(): current_dir = os.path.dirname(os.path.realpath(__file__)) load_dotenv(dotenv_path=f"{current_dir}/../.env") - with open(f"{current_dir}/testFiles/testprojectglue-2fb4b71447c4.json", "r") as f: + with open(f"{current_dir}/creds.json", "r") as f: keyR = f.read() return json.loads(keyR) @@ -87,6 +83,11 @@ def test_gcp_nested_to_local_transfer(gcp_creds): "destination": [deepcopy(bucket_local_definition)], } task_definition["source"]["protocol"]["credentials"] = gcp_creds + task_definition["source"]["fileWatch"] = { + "timeout": 20, + "directory": "localnested", + "fileRegex": ".*\\.txt", + } transfer_obj = transfer.Transfer(None, "gcp-to-local", task_definition) @@ -100,6 +101,9 @@ def test_gcp_nested_regex_to_local_transfer(gcp_creds): "destination": [deepcopy(bucket_local_definition)], } task_definition["source"]["protocol"]["credentials"] = gcp_creds + task_definition["source"]["fileWatch"] = { + "timeout": 20, + } transfer_obj = transfer.Transfer(None, "gcp-to-local", task_definition) @@ -126,9 +130,8 @@ def test_gcp_file_watch(gcp_creds): "destination": [deepcopy(bucket_local_definition)], } task_definition["source"]["fileWatch"] = { - "timeout": 300, - "directory": "localnested", - "fileRegex": ".*\\.txt", + "timeout": 20, + "fileRegex": ".*\\.tx$", } task_definition["source"]["protocol"]["credentials"] = gcp_creds transfer_obj = transfer.Transfer(None, "gcp-to-local", task_definition) diff --git a/tests/test_local_to_root.py b/tests/test_local_to_root.py index f180ab4..0dde6e3 100644 --- a/tests/test_local_to_root.py +++ b/tests/test_local_to_root.py @@ -11,6 +11,8 @@ # Set the log level to maximum os.environ["OTF_LOG_LEVEL"] = "DEBUG" +current_dir = os.path.dirname(os.path.realpath(__file__)) +load_dotenv(dotenv_path=f"{current_dir}/../.env") bucket_destination_definition = { @@ -33,7 +35,7 @@ def 
gcp_creds(): current_dir = os.path.dirname(os.path.realpath(__file__)) load_dotenv(dotenv_path=f"{current_dir}/../.env") - with open(f"{current_dir}/testFiles/testprojectglue-2fb4b71447c4.json", "r") as f: + with open(f"{current_dir}/creds.json", "r") as f: keyR = f.read() return json.loads(keyR) @@ -44,7 +46,7 @@ def test_local_to_gcp_transfer(gcp_creds): task_definition = { "type": "transfer", "source": { - "directory": "src/tmp", + "directory": f"{current_dir}", "fileRegex": ".*\\.txt", "protocol": {"name": "local"}, }, diff --git a/tests/test_upload_nested.py b/tests/test_upload_nested.py index 5bf3fb1..d702627 100644 --- a/tests/test_upload_nested.py +++ b/tests/test_upload_nested.py @@ -11,6 +11,8 @@ # Set the log level to maximum os.environ["OTF_LOG_LEVEL"] = "DEBUG" +current_dir = os.path.dirname(os.path.realpath(__file__)) +load_dotenv(dotenv_path=f"{current_dir}/../.env") bucket_nested_destination_definition = { "bucket": "bucket-test-gcpupload", @@ -32,7 +34,7 @@ def gcp_creds(): current_dir = os.path.dirname(os.path.realpath(__file__)) load_dotenv(dotenv_path=f"{current_dir}/../.env") - with open(f"{current_dir}/testFiles/testprojectglue-2fb4b71447c4.json", "r") as f: + with open(f"{current_dir}/creds.json", "r") as f: keyR = f.read() return json.loads(keyR) @@ -42,7 +44,7 @@ def test_local_nested_to_gcp_transfer(gcp_creds): task_definition = { "type": "transfer", "source": { - "directory": "src/tmp/nested", + "directory": f"{current_dir}", "fileRegex": ".*\\.txt", "protocol": {"name": "local"}, }, From 5ef8dcd08d1f5c34f52e8f40e40957a939e7534f Mon Sep 17 00:00:00 2001 From: simal00 <142039176+simal00@users.noreply.github.com> Date: Fri, 6 Sep 2024 15:56:09 +0000 Subject: [PATCH 19/21] bump version v24.32.1 -> v24.36.0 --- pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 0a5a4eb..cac57c3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "otf-addons-gcp" -version = "v24.32.1" +version = "v24.36.0" authors = [{ name = "Aldo Troiano", email = "a.troiano@reply.com" }] license = { text = "GPLv3" } classifiers = [ @@ -51,7 +51,7 @@ dev = [ profile = 'black' [tool.bumpver] -current_version = "v24.32.1" +current_version = "v24.36.0" version_pattern = "vYY.WW.PATCH[-TAG]" commit_message = "bump version {old_version} -> {new_version}" commit = true From 6312cdf1239bea1b7b99054f5d54544bc40377d8 Mon Sep 17 00:00:00 2001 From: simal00 <142039176+simal00@users.noreply.github.com> Date: Fri, 6 Sep 2024 15:58:23 +0000 Subject: [PATCH 20/21] update CHANGELOG --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d8ef1de..912efef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## v24.36.0 + +- Handle token refresh +- Check fileRegex +- Handle fileWatch + ## v24.32.1 - Minor fix to accept file_pattern arg in list_files method. From 8841d15b6807db3bf6f2e429f968619d3f6a0941 Mon Sep 17 00:00:00 2001 From: simal00 <142039176+simal00@users.noreply.github.com> Date: Fri, 6 Sep 2024 16:23:00 +0000 Subject: [PATCH 21/21] remove unnecessary comment --- README.md | 2 +- src/opentaskpy/addons/gcp/remotehandlers/creds.py | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/README.md b/README.md index f4739af..f8b2ffd 100644 --- a/README.md +++ b/README.md @@ -85,6 +85,6 @@ JSON configs for transfers can be defined as follows: "pattern" : "(? 
credentials: logger = opentaskpy.otflogging.init_logging(__name__, None, None) try: # Get an Access Token - # auth_creds = None # Initialising authCreds - logger.info("Retrieving credentials") auth_creds = service_account.Credentials.from_service_account_info( credentials_["credentials"],