diff --git a/CHANGELOG.md b/CHANGELOG.md
index d8ef1de..912efef 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,11 @@
 # Changelog
 
+## v24.36.0
+
+- Handle token refresh
+- Check fileRegex
+- Handle fileWatch
+
 ## v24.32.1
 
 - Minor fix to accept file_pattern arg in list_files method.
diff --git a/README.md b/README.md
index 925299f..f8b2ffd 100644
--- a/README.md
+++ b/README.md
@@ -85,6 +85,6 @@ JSON configs for transfers can be defined as follows:
 "pattern" : "(? {new_version}"
 commit = true
diff --git a/src/opentaskpy/addons/gcp/remotehandlers/bucket.py b/src/opentaskpy/addons/gcp/remotehandlers/bucket.py
index 8fcbfaf..01b7e3c 100644
--- a/src/opentaskpy/addons/gcp/remotehandlers/bucket.py
+++ b/src/opentaskpy/addons/gcp/remotehandlers/bucket.py
@@ -5,7 +5,6 @@
 import opentaskpy.otflogging
 import requests
-from opentaskpy.exceptions import RemoteTransferError
 from opentaskpy.remotehandlers.remotehandler import RemoteTransferHandler
 
 from .creds import get_access_token
 
@@ -33,6 +32,10 @@ def __init__(self, spec: dict):
         # Generating Access Token for Transfer
         self.credentials = get_access_token(self.spec["protocol"])
 
+    def validate_or_refresh_creds(self) -> None:
+        """Ensure the credentials are valid, refresh if necessary."""
+        self.credentials = get_access_token(self.spec["protocol"])
+
     def supports_direct_transfer(self) -> bool:
         """Return False, as all files should go via the worker."""
         return False
@@ -51,6 +54,7 @@ def handle_post_copy_action(self, files: list[str]) -> int:
             or self.spec["postCopyAction"]["action"] == "rename"
         ):
             try:
+                self.validate_or_refresh_creds()  # refresh creds
 
                 # Append a directory if one is defined
                 for file in files:
@@ -138,6 +142,7 @@ def push_files_from_worker(
             int: 0 if successful, 1 if not.
         """
         try:
+            self.validate_or_refresh_creds()  # refresh creds
             if file_list:
                 files = list(file_list.keys())
             else:
@@ -212,6 +217,7 @@ def pull_files_to_worker(
         result = 0
         self.logger.info("Downloading file from GCP.")
         try:
+            self.validate_or_refresh_creds()  # refresh creds
             for file in files:
                 self.logger.info(file)
                 file = file.replace(
@@ -278,17 +284,14 @@ def list_files(
         """
         self.logger.info("Listing Files in Bucket.")
         try:
+            self.validate_or_refresh_creds()  # refresh creds
             directory = directory or self.spec.get("directory", "")
             base_url = (
                 f"https://storage.googleapis.com/storage/v1/b/{self.spec['bucket']}/o"
             )
             headers = {"Authorization": f"Bearer {self.credentials}"}
 
-            params = (
-                {"prefix": directory, "maxResults": MAX_OBJECTS_PER_QUERY}
-                if directory
-                else {}
-            )
+            params = {"prefix": directory, "maxResults": MAX_OBJECTS_PER_QUERY}
 
             items = []
             while True:
@@ -307,7 +310,7 @@ def list_files(
                     break
                 else:
                     self.logger.error(f"List files returned {response.status_code}")
-                    raise RemoteTransferError(response)
+                    self.logger.error(f"Remote files not found: {response}")
 
             filenames = [
                 item["name"]
diff --git a/src/opentaskpy/addons/gcp/remotehandlers/creds.py b/src/opentaskpy/addons/gcp/remotehandlers/creds.py
index 35d58f0..2de3375 100644
--- a/src/opentaskpy/addons/gcp/remotehandlers/creds.py
+++ b/src/opentaskpy/addons/gcp/remotehandlers/creds.py
@@ -1,5 +1,7 @@
 """GCP helper functions."""
 
+from datetime import datetime, timedelta
+
 import opentaskpy.otflogging
 from google.auth.transport.requests import Request
 from google.oauth2 import credentials, service_account
@@ -15,14 +17,19 @@ def get_access_token(credentials_: dict) -> credentials:
     logger = opentaskpy.otflogging.init_logging(__name__, None, None)
     try:
         # Get an Access Token
-        auth_creds = None  # Initialising authCreds
-        logger.info("Retrieving credentials")
         auth_creds = service_account.Credentials.from_service_account_info(
             credentials_["credentials"],
             scopes=["https://www.googleapis.com/auth/cloud-platform"],
         )
-        auth_creds.refresh(Request())  # Refreshing access token
+        # Check if the token needs to be refreshed
+        if (
+            auth_creds.expiry is None
+            or auth_creds.expired
+            or auth_creds.expiry > datetime.now() - timedelta(minutes=5)
+        ):
+            logger.info("Refreshing access token")
+            auth_creds.refresh(Request())  # Refreshing access token
 
         if not auth_creds.token:
             # Handle exception
             logger.error("Error Retrieving credentials.")
diff --git a/tests/test_download.py b/tests/test_download.py
index a18f9e1..b05f032 100644
--- a/tests/test_download.py
+++ b/tests/test_download.py
@@ -11,17 +11,13 @@
 # Set the log level to maximum
 os.environ["OTF_LOG_LEVEL"] = "DEBUG"
+current_dir = os.path.dirname(os.path.realpath(__file__))
+load_dotenv(dotenv_path=f"{current_dir}/../.env")
 
 
 bucket_source_root_definition = {
     "bucket": "bucket-test-gcpupload",
-    "directory": "postcopy",
-    "postCopyAction": {
-        "action": "rename",
-        "sub": "LOCAL",
-        "pattern": "LOCALh",
-        "destination": "postcopy1",
-    },
-    "fileExpression": "**.txt",
+    "directory": "localnested",
+    "fileRegex": ".*\\.txt",
     "protocol": {
         "name": "opentaskpy.addons.gcp.remotehandlers.bucket.BucketTransfer",
         "credentials": {},
@@ -30,7 +26,7 @@
 bucket_source_nested_definition = {
     "bucket": "bucket-test-gcpupload",
     "directory": "localnested",
-    "fileExpression": "**.txt",
+    "fileRegex": ".*\\.txt",
     "protocol": {
         "name": "opentaskpy.addons.gcp.remotehandlers.bucket.BucketTransfer",
         "credentials": {},
@@ -39,14 +35,14 @@
 bucket_source_nested_regex_definition = {
     "bucket": "bucket-test-gcpupload",
     "directory": "localnested",
-    "fileExpression": "root1dir.txt",
+    "fileRegex": ".*\\.txt",
     "protocol": {
         "name": "opentaskpy.addons.gcp.remotehandlers.bucket.BucketTransfer",
         "credentials": {},
     },
 }
 
 bucket_local_definition = {
-    "directory": "src/tmp/downloaded",
+    "directory": current_dir,
     "protocol": {"name": "local"},
 }
@@ -61,7 +57,7 @@ def gcp_creds():
     current_dir = os.path.dirname(os.path.realpath(__file__))
     load_dotenv(dotenv_path=f"{current_dir}/../.env")
 
-    with open(f"{current_dir}/testFiles/testprojectglue-2fb4b71447c4.json", "r") as f:
+    with open(f"{current_dir}/creds.json", "r") as f:
         keyR = f.read()
     return json.loads(keyR)
 
@@ -87,6 +83,11 @@ def test_gcp_nested_to_local_transfer(gcp_creds):
         "destination": [deepcopy(bucket_local_definition)],
     }
     task_definition["source"]["protocol"]["credentials"] = gcp_creds
+    task_definition["source"]["fileWatch"] = {
+        "timeout": 20,
+        "directory": "localnested",
+        "fileRegex": ".*\\.txt",
+    }
 
     transfer_obj = transfer.Transfer(None, "gcp-to-local", task_definition)
@@ -100,6 +101,9 @@ def test_gcp_nested_regex_to_local_transfer(gcp_creds):
         "destination": [deepcopy(bucket_local_definition)],
     }
     task_definition["source"]["protocol"]["credentials"] = gcp_creds
+    task_definition["source"]["fileWatch"] = {
+        "timeout": 20,
+    }
 
     transfer_obj = transfer.Transfer(None, "gcp-to-local", task_definition)
@@ -126,9 +130,8 @@ def test_gcp_file_watch(gcp_creds):
         "destination": [deepcopy(bucket_local_definition)],
     }
     task_definition["source"]["fileWatch"] = {
-        "timeout": 300,
-        "directory": "localnested",
-        "fileRegex": ".*\\.txt",
+        "timeout": 20,
+        "fileRegex": ".*\\.tx$",
     }
     task_definition["source"]["protocol"]["credentials"] = gcp_creds
     transfer_obj = transfer.Transfer(None, "gcp-to-local", task_definition)
diff --git a/tests/test_local_to_root.py b/tests/test_local_to_root.py
index f180ab4..0dde6e3 100644
--- a/tests/test_local_to_root.py
+++ b/tests/test_local_to_root.py
@@ -11,6 +11,8 @@
 # Set the log level to maximum
 os.environ["OTF_LOG_LEVEL"] = "DEBUG"
+current_dir = os.path.dirname(os.path.realpath(__file__))
+load_dotenv(dotenv_path=f"{current_dir}/../.env")
 
 
 bucket_destination_definition = {
@@ -33,7 +35,7 @@ def gcp_creds():
     current_dir = os.path.dirname(os.path.realpath(__file__))
     load_dotenv(dotenv_path=f"{current_dir}/../.env")
 
-    with open(f"{current_dir}/testFiles/testprojectglue-2fb4b71447c4.json", "r") as f:
+    with open(f"{current_dir}/creds.json", "r") as f:
         keyR = f.read()
     return json.loads(keyR)
@@ -44,7 +46,7 @@ def test_local_to_gcp_transfer(gcp_creds):
     task_definition = {
         "type": "transfer",
         "source": {
-            "directory": "src/tmp",
+            "directory": f"{current_dir}",
             "fileRegex": ".*\\.txt",
             "protocol": {"name": "local"},
         },
diff --git a/tests/test_upload_nested.py b/tests/test_upload_nested.py
index 5bf3fb1..d702627 100644
--- a/tests/test_upload_nested.py
+++ b/tests/test_upload_nested.py
@@ -11,6 +11,8 @@
 # Set the log level to maximum
 os.environ["OTF_LOG_LEVEL"] = "DEBUG"
+current_dir = os.path.dirname(os.path.realpath(__file__))
+load_dotenv(dotenv_path=f"{current_dir}/../.env")
 
 bucket_nested_destination_definition = {
     "bucket": "bucket-test-gcpupload",
@@ -32,7 +34,7 @@ def gcp_creds():
     current_dir = os.path.dirname(os.path.realpath(__file__))
     load_dotenv(dotenv_path=f"{current_dir}/../.env")
 
-    with open(f"{current_dir}/testFiles/testprojectglue-2fb4b71447c4.json", "r") as f:
+    with open(f"{current_dir}/creds.json", "r") as f:
         keyR = f.read()
     return json.loads(keyR)
@@ -42,7 +44,7 @@ def test_local_nested_to_gcp_transfer(gcp_creds):
     task_definition = {
         "type": "transfer",
         "source": {
-            "directory": "src/tmp/nested",
+            "directory": f"{current_dir}",
            "fileRegex": ".*\\.txt",
             "protocol": {"name": "local"},
         },
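
The "Handle token refresh" change in `creds.py` gates `auth_creds.refresh()` behind an expiry check. The sketch below illustrates that idea only; it is not the code in `creds.py`. The helper name `get_fresh_token` and the five-minute window are illustrative assumptions, and the guard here refreshes when the token is missing, already expired, or due to expire within five minutes, which is the usual convention for this kind of check.

```python
"""Sketch only: refresh a GCP service-account token when it is close to expiry."""

from datetime import datetime, timedelta

from google.auth.transport.requests import Request
from google.oauth2 import service_account


def get_fresh_token(service_account_info: dict) -> str:
    """Return a bearer token, refreshing it only when necessary (illustrative helper)."""
    auth_creds = service_account.Credentials.from_service_account_info(
        service_account_info,
        scopes=["https://www.googleapis.com/auth/cloud-platform"],
    )

    # google-auth stores `expiry` as a naive UTC datetime. Refresh when there is
    # no token yet, the token has expired, or it expires within five minutes.
    needs_refresh = (
        auth_creds.token is None
        or auth_creds.expiry is None
        or auth_creds.expired
        or auth_creds.expiry <= datetime.utcnow() + timedelta(minutes=5)
    )
    if needs_refresh:
        auth_creds.refresh(Request())

    return auth_creds.token
```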
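
For the "Check fileRegex" and "Handle fileWatch" entries, the updated tests exercise a bucket source with an optional `fileWatch` block. Below is a minimal source definition in the same shape as the test fixtures; the bucket name and paths are test data, and the `timeout` unit (seconds) and the fallback of `fileWatch.directory`/`fileWatch.fileRegex` to the source-level values are inferred from the tests rather than documented behaviour.

```python
# Sketch of a GCP bucket source with fileWatch, modelled on tests/test_download.py.
gcp_source = {
    "bucket": "bucket-test-gcpupload",
    "directory": "localnested",
    "fileRegex": ".*\\.txt",
    "fileWatch": {
        "timeout": 20,               # assumed to be seconds, as in the tests
        "directory": "localnested",  # optional; appears to fall back to the source directory
        "fileRegex": ".*\\.txt",     # optional; appears to fall back to the source fileRegex
    },
    "protocol": {
        "name": "opentaskpy.addons.gcp.remotehandlers.bucket.BucketTransfer",
        "credentials": {},           # service-account JSON is injected here by the tests
    },
}
```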