Token refresh, fileWatch and fileRegex corrections. #9

Merged · 24 commits · Sep 6, 2024
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,11 @@
# Changelog

## v24.36.0

- Handle token refresh
- Check fileRegex
- Handle fileWatch

## v24.32.1

- Minor fix to accept file_pattern arg in list_files method.
2 changes: 1 addition & 1 deletion README.md
@@ -85,6 +85,6 @@ JSON configs for transfers can be defined as follows:
"pattern" : "(?<![^ ])(?=[^ ])(?!ab)", ## Regex for prefixing
"sub" : "Archived_"
},
"fileRegex": "**.txt" ## Global Expression syntax (only download .txt)
"fileRegex": ".*//.txt$" ## accepts re module matching
}
```
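Since `fileRegex` is now matched with Python's `re` module rather than a glob, a JSON value such as `".*\\.txt$"` arrives as the regex `.*\.txt$` and selects only `.txt` objects. A minimal sketch of that matching, assuming the handler applies `re.match` to each object name (the exact call site is not shown in this diff):

```python
import re

# The JSON config escapes the backslash, so ".*\\.txt$" becomes the regex .*\.txt$
file_regex = re.compile(r".*\.txt$")

object_names = ["localnested/root1dir.txt", "localnested/notes.md", "archive/data.txt"]

# Keep only the object names that match the configured pattern
matching = [name for name in object_names if file_regex.match(name)]
print(matching)  # ['localnested/root1dir.txt', 'archive/data.txt']
```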
4 changes: 2 additions & 2 deletions pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "otf-addons-gcp"
version = "v24.32.1"
version = "v24.36.0"
authors = [{ name = "Aldo Troiano", email = "a.troiano@reply.com" }]
license = { text = "GPLv3" }
classifiers = [
@@ -51,7 +51,7 @@ dev = [
profile = 'black'

[tool.bumpver]
current_version = "v24.32.1"
current_version = "v24.36.0"
version_pattern = "vYY.WW.PATCH[-TAG]"
commit_message = "bump version {old_version} -> {new_version}"
commit = true
17 changes: 10 additions & 7 deletions src/opentaskpy/addons/gcp/remotehandlers/bucket.py
@@ -5,7 +5,6 @@

import opentaskpy.otflogging
import requests
from opentaskpy.exceptions import RemoteTransferError
from opentaskpy.remotehandlers.remotehandler import RemoteTransferHandler

from .creds import get_access_token
@@ -33,6 +32,10 @@ def __init__(self, spec: dict):
# Generating Access Token for Transfer
self.credentials = get_access_token(self.spec["protocol"])

def validate_or_refresh_creds(self) -> None:
"""Ensure the credentials are valid, refresh if necessary."""
self.credentials = get_access_token(self.spec["protocol"])

def supports_direct_transfer(self) -> bool:
"""Return False, as all files should go via the worker."""
return False
@@ -51,6 +54,7 @@ def handle_post_copy_action(self, files: list[str]) -> int:
or self.spec["postCopyAction"]["action"] == "rename"
):
try:
self.validate_or_refresh_creds() # refresh creds
# Append a directory if one is defined

for file in files:
@@ -138,6 +142,7 @@ def push_files_from_worker(
int: 0 if successful, 1 if not.
"""
try:
self.validate_or_refresh_creds() # refresh creds
if file_list:
files = list(file_list.keys())
else:
@@ -212,6 +217,7 @@ def pull_files_to_worker(
result = 0
self.logger.info("Downloading file from GCP.")
try:
self.validate_or_refresh_creds() # refresh creds
for file in files:
self.logger.info(file)
file = file.replace(
@@ -278,17 +284,14 @@ def list_files(
"""
self.logger.info("Listing Files in Bucket.")
try:
self.validate_or_refresh_creds() # refresh creds
directory = directory or self.spec.get("directory", "")

base_url = (
f"https://storage.googleapis.com/storage/v1/b/{self.spec['bucket']}/o"
)
headers = {"Authorization": f"Bearer {self.credentials}"}
params = (
{"prefix": directory, "maxResults": MAX_OBJECTS_PER_QUERY}
if directory
else {}
)
params = {"prefix": directory, "maxResults": MAX_OBJECTS_PER_QUERY}
items = []

while True:
@@ -307,7 +310,7 @@
break
else:
self.logger.error(f"List files returned {response.status_code}")
raise RemoteTransferError(response)
self.logger.error(f"Remote files not found: {response}")

filenames = [
item["name"]
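For context, `list_files` pages through the bucket with the GCS JSON objects API: it sends `prefix` and `maxResults`, then presumably follows `nextPageToken` until no further token is returned (the pagination tail is truncated in this diff). A standalone sketch of that loop using plain `requests`; the helper name, page size, and print-based error handling are illustrative, not the addon's own:

```python
import requests


def list_bucket_objects(bucket: str, token: str, prefix: str = "", page_size: int = 100) -> list[str]:
    """Return object names under `prefix`, following nextPageToken pagination."""
    url = f"https://storage.googleapis.com/storage/v1/b/{bucket}/o"
    headers = {"Authorization": f"Bearer {token}"}
    params = {"prefix": prefix, "maxResults": page_size}
    names: list[str] = []

    while True:
        response = requests.get(url, headers=headers, params=params, timeout=30)
        if response.status_code != 200:
            # Mirror the handler's behaviour of logging rather than raising
            print(f"List files returned {response.status_code}")
            break
        payload = response.json()
        names.extend(item["name"] for item in payload.get("items", []))
        next_token = payload.get("nextPageToken")
        if not next_token:
            break
        params["pageToken"] = next_token

    return names
```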
13 changes: 10 additions & 3 deletions src/opentaskpy/addons/gcp/remotehandlers/creds.py
@@ -1,5 +1,7 @@
"""GCP helper functions."""

from datetime import datetime, timedelta

import opentaskpy.otflogging
from google.auth.transport.requests import Request
from google.oauth2 import credentials, service_account
@@ -15,14 +17,19 @@ def get_access_token(credentials_: dict) -> credentials:
logger = opentaskpy.otflogging.init_logging(__name__, None, None)
try:
# Get an Access Token
auth_creds = None # Initialise auth_creds

logger.info("Retrieving credentials")
auth_creds = service_account.Credentials.from_service_account_info(
credentials_["credentials"],
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
auth_creds.refresh(Request()) # Refreshing access token
# Refresh if the expiry is unknown, the token has already expired,
# or it expires within the next 5 minutes
if (
auth_creds.expiry is None
or auth_creds.expired
or auth_creds.expiry < datetime.now() + timedelta(minutes=5)
):
logger.info("Refreshing access token")
auth_creds.refresh(Request()) # Refreshing access token

if not auth_creds.token: # Handle exception
logger.error("Error Retrieving credentials.")
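The refresh check above follows a common pattern with google-auth service-account credentials: refresh when no expiry is known, the token has already expired, or it expires within a short safety window. A minimal standalone sketch of that pattern (the 5-minute window mirrors the value above; the `GCP_SERVICE_ACCOUNT_JSON` environment variable is a placeholder source for the key dict, not something this addon defines):

```python
import json
import os
from datetime import datetime, timedelta

from google.auth.transport.requests import Request
from google.oauth2 import service_account

# Placeholder: load a service-account key dict from an env var of your choosing
service_account_info = json.loads(os.environ["GCP_SERVICE_ACCOUNT_JSON"])

auth_creds = service_account.Credentials.from_service_account_info(
    service_account_info,
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

# Refresh when there is no expiry yet, the token has expired,
# or it expires within a 5-minute safety window.
# google-auth reports expiry as a naive UTC datetime, so compare against utcnow().
refresh_window = timedelta(minutes=5)
if (
    auth_creds.expiry is None
    or auth_creds.expired
    or auth_creds.expiry < datetime.utcnow() + refresh_window
):
    auth_creds.refresh(Request())  # obtains or renews the access token

access_token = auth_creds.token
```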
33 changes: 18 additions & 15 deletions tests/test_download.py
@@ -11,17 +11,13 @@

# Set the log level to maximum
os.environ["OTF_LOG_LEVEL"] = "DEBUG"
current_dir = os.path.dirname(os.path.realpath(__file__))
load_dotenv(dotenv_path=f"{current_dir}/../.env")

bucket_source_root_definition = {
"bucket": "bucket-test-gcpupload",
"directory": "postcopy",
"postCopyAction": {
"action": "rename",
"sub": "LOCAL",
"pattern": "LOCALh",
"destination": "postcopy1",
},
"fileExpression": "**.txt",
"directory": "localnested",
"fileRegex": ".*\\.txt",
"protocol": {
"name": "opentaskpy.addons.gcp.remotehandlers.bucket.BucketTransfer",
"credentials": {},
@@ -30,7 +26,7 @@
bucket_source_nested_definition = {
"bucket": "bucket-test-gcpupload",
"directory": "localnested",
"fileExpression": "**.txt",
"fileRegex": ".*\\.txt",
"protocol": {
"name": "opentaskpy.addons.gcp.remotehandlers.bucket.BucketTransfer",
"credentials": {},
@@ -39,14 +35,14 @@
bucket_source_nested_regex_definition = {
"bucket": "bucket-test-gcpupload",
"directory": "localnested",
"fileExpression": "root1dir.txt",
"fileRegex": ".*\\.txt",
"protocol": {
"name": "opentaskpy.addons.gcp.remotehandlers.bucket.BucketTransfer",
"credentials": {},
},
}
bucket_local_definition = {
"directory": "src/tmp/downloaded",
"directory": current_dir,
"protocol": {"name": "local"},
}

@@ -61,7 +57,7 @@ def gcp_creds():
current_dir = os.path.dirname(os.path.realpath(__file__))
load_dotenv(dotenv_path=f"{current_dir}/../.env")

with open(f"{current_dir}/testFiles/testprojectglue-2fb4b71447c4.json", "r") as f:
with open(f"{current_dir}/creds.json", "r") as f:
keyR = f.read()

return json.loads(keyR)
@@ -87,6 +83,11 @@ def test_gcp_nested_to_local_transfer(gcp_creds):
"destination": [deepcopy(bucket_local_definition)],
}
task_definition["source"]["protocol"]["credentials"] = gcp_creds
task_definition["source"]["fileWatch"] = {
"timeout": 20,
"directory": "localnested",
"fileRegex": ".*\\.txt",
}

transfer_obj = transfer.Transfer(None, "gcp-to-local", task_definition)

@@ -100,6 +101,9 @@ def test_gcp_nested_regex_to_local_transfer(gcp_creds):
"destination": [deepcopy(bucket_local_definition)],
}
task_definition["source"]["protocol"]["credentials"] = gcp_creds
task_definition["source"]["fileWatch"] = {
"timeout": 20,
}

transfer_obj = transfer.Transfer(None, "gcp-to-local", task_definition)

@@ -126,9 +130,8 @@ def test_gcp_file_watch(gcp_creds):
"destination": [deepcopy(bucket_local_definition)],
}
task_definition["source"]["fileWatch"] = {
"timeout": 300,
"directory": "localnested",
"fileRegex": ".*\\.txt",
"timeout": 20,
"fileRegex": ".*\\.tx$",
}
task_definition["source"]["protocol"]["credentials"] = gcp_creds
transfer_obj = transfer.Transfer(None, "gcp-to-local", task_definition)
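The fileWatch tests above give the watch a short `timeout` and a `fileRegex`, so the transfer is expected to poll the bucket listing until a matching object appears or the timeout elapses. A generic sketch of that poll-until-timeout loop, assuming a caller-supplied `list_names()` callable rather than the addon's own handler API:

```python
import re
import time
from typing import Callable


def wait_for_match(
    list_names: Callable[[], list[str]],
    pattern: str,
    timeout: float,
    poll_interval: float = 5.0,
) -> str | None:
    """Poll `list_names()` until a name matches `pattern` or `timeout` seconds pass."""
    regex = re.compile(pattern)
    deadline = time.monotonic() + timeout

    while time.monotonic() < deadline:
        for name in list_names():
            if regex.match(name):
                return name  # a matching file arrived
        time.sleep(poll_interval)

    return None  # watch timed out without a match


# Example: a fake listing that satisfies a ".*\.txt$" watch immediately
print(wait_for_match(lambda: ["localnested/root1dir.txt"], r".*\.txt$", timeout=20))
```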
6 changes: 4 additions & 2 deletions tests/test_local_to_root.py
@@ -11,6 +11,8 @@

# Set the log level to maximum
os.environ["OTF_LOG_LEVEL"] = "DEBUG"
current_dir = os.path.dirname(os.path.realpath(__file__))
load_dotenv(dotenv_path=f"{current_dir}/../.env")


bucket_destination_definition = {
@@ -33,7 +35,7 @@ def gcp_creds():
current_dir = os.path.dirname(os.path.realpath(__file__))
load_dotenv(dotenv_path=f"{current_dir}/../.env")

with open(f"{current_dir}/testFiles/testprojectglue-2fb4b71447c4.json", "r") as f:
with open(f"{current_dir}/creds.json", "r") as f:
keyR = f.read()

return json.loads(keyR)
@@ -44,7 +46,7 @@ def test_local_to_gcp_transfer(gcp_creds):
task_definition = {
"type": "transfer",
"source": {
"directory": "src/tmp",
"directory": f"{current_dir}",
"fileRegex": ".*\\.txt",
"protocol": {"name": "local"},
},
6 changes: 4 additions & 2 deletions tests/test_upload_nested.py
@@ -11,6 +11,8 @@

# Set the log level to maximum
os.environ["OTF_LOG_LEVEL"] = "DEBUG"
current_dir = os.path.dirname(os.path.realpath(__file__))
load_dotenv(dotenv_path=f"{current_dir}/../.env")

bucket_nested_destination_definition = {
"bucket": "bucket-test-gcpupload",
@@ -32,7 +34,7 @@
current_dir = os.path.dirname(os.path.realpath(__file__))
load_dotenv(dotenv_path=f"{current_dir}/../.env")

with open(f"{current_dir}/testFiles/testprojectglue-2fb4b71447c4.json", "r") as f:
with open(f"{current_dir}/creds.json", "r") as f:
keyR = f.read()

return json.loads(keyR)
@@ -42,7 +44,7 @@ def test_local_nested_to_gcp_transfer(gcp_creds):
task_definition = {
"type": "transfer",
"source": {
"directory": "src/tmp/nested",
"directory": f"{current_dir}",
"fileRegex": ".*\\.txt",
"protocol": {"name": "local"},
},