Fix ListObjectsV2 pagination & add new PCA option (#39)
* Fixes #38

* Add new PCA feature

* Fix logic for file naming

* bump version v24.16.0 -> v24.19.0
adammcdonagh authored May 13, 2024
1 parent 0c0c044 commit e12f886
Showing 7 changed files with 216 additions and 10 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,10 @@
# Changelog

## v24.19.0

- Removed a stray `break` in the S3 file listing that prevented fetching more than the first 1000 records.
- Added a new feature that allows moving a file from the source S3 bucket into another bucket as a post copy action, by referencing the full `s3://` path as the destination.

## v24.16.0

- Added a wider window for refreshing the token: if it's within 60 seconds of expiry when checked, we refresh it, to handle instances where there's a delay renewing.
2 changes: 1 addition & 1 deletion README.md
@@ -43,7 +43,7 @@ As part of the upload, the `bucket-owner-full-control` ACL flag is applied to al

- Plain file watch
- File watch/transfer with file size and age constraints
-- `move` & `delete` post copy actions
+- `move`, `rename` & `delete` post copy actions
- Touching empty files after transfer. e.g. `.fin` files used as completion flags
- Touching empty files as an execution

4 changes: 2 additions & 2 deletions pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "otf-addons-aws"
-version = "v24.16.0"
+version = "v24.19.0"
authors = [{ name = "Adam McDonagh", email = "adam@elitemonkey.net" }]
license = { text = "GPLv3" }
classifiers = [
@@ -53,7 +53,7 @@ dev = [
profile = 'black'

[tool.bumpver]
-current_version = "v24.16.0"
+current_version = "v24.19.0"
version_pattern = "vYY.WW.PATCH[-TAG]"
commit_message = "bump version {old_version} -> {new_version}"
commit = true
31 changes: 24 additions & 7 deletions src/opentaskpy/addons/aws/remotehandlers/s3.py
@@ -136,39 +136,55 @@ def handle_post_copy_action(self, files: list[str]) -> int:
or self.spec["postCopyAction"]["action"] == "rename"
):
for file in files:
source_bucket = self.spec["bucket"]
dest_bucket = self.spec["bucket"]
new_file = (
f"{self.spec['postCopyAction']['destination']}{file.split('/')[-1]}"
)
if self.spec["postCopyAction"]["action"] == "rename":

# Use the pattern and sub values to rename the file correctly
new_file = re.sub(
self.spec["postCopyAction"]["pattern"],
self.spec["postCopyAction"]["sub"],
new_file,
)

self.logger.info(f'"Moving" file from {file} to {new_file}')
else:
# Check if the destination starts with s3://, if so, then we are also moving bucket
if self.spec["postCopyAction"]["destination"].startswith("s3://"):
dest_bucket = self.spec["postCopyAction"]["destination"].split(
"/"
)[2]
new_file = (
self.spec["postCopyAction"]["destination"].split("/", 3)[3]
+ file.split("/")[-1]
)

self.logger.info(
f'"Moving" file from s3://{source_bucket}/{file} to s3://{dest_bucket}/{new_file}'
)

self.s3_client.copy_object(
-Bucket=self.spec["bucket"],
+Bucket=dest_bucket,
CopySource={
"Bucket": self.spec["bucket"],
"Bucket": source_bucket,
"Key": file,
},
Key=new_file,
)

# Check that the copy worked
try:
-self.s3_client.head_object(Bucket=self.spec["bucket"], Key=new_file)
+self.s3_client.head_object(Bucket=dest_bucket, Key=new_file)
except Exception as e:
# Print the exception message
self.logger.error(e)
self.logger.error(f"Failed to copy file: {file}")
return 1

response = self.s3_client.delete_objects(
-Bucket=self.spec["bucket"],
+Bucket=source_bucket,
Delete={
"Objects": [{"Key": file}],
"Quiet": True,
@@ -182,7 +198,7 @@ def handle_post_copy_action(self, files: list[str]) -> int:
# Check that the delete worked
try:
response = self.s3_client.head_object(
-Bucket=self.spec["bucket"], Key=file
+Bucket=source_bucket, Key=file
)
self.logger.error(response)
self.logger.error(f"Failed to delete file: {file}")
@@ -272,7 +288,8 @@ def list_files(
kwargs["ContinuationToken"] = response["NextContinuationToken"]
except KeyError:
break
-break
+else:
+    break
except Exception as e: # pylint: disable=broad-exception-caught
self.logger.error(f"Error listing files: {self.spec['bucket']}")
self.logger.exception(e)
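
The pagination fix follows the standard `list_objects_v2` continuation-token loop: each call returns at most 1000 keys, and the loop must continue until the response carries no `NextContinuationToken`. A minimal sketch of the corrected pattern (the `list_all_keys` helper name is illustrative) — the stray `break` removed by this commit exited the loop unconditionally after the first page:

```python
import boto3


def list_all_keys(s3_client, bucket: str, prefix: str = "") -> list[str]:
    """Collect every key under a prefix, following NextContinuationToken."""
    keys: list[str] = []
    kwargs = {"Bucket": bucket, "Prefix": prefix}
    while True:
        response = s3_client.list_objects_v2(**kwargs)
        keys.extend(obj["Key"] for obj in response.get("Contents", []))
        try:
            kwargs["ContinuationToken"] = response["NextContinuationToken"]
        except KeyError:
            break  # last page: no continuation token in the response
    return keys


# Usage sketch:
# keys = list_all_keys(boto3.client("s3"), "my-bucket", "src/")
```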
@@ -63,6 +63,49 @@
"required": ["sub", "pattern"]
}
}
},
{
"if": {
"properties": {
"action": {
"const": "move"
},
"destination": {
"pattern": "^s3://"
}
}
},
"then": {
"allOf": [
{
"properties": {
"destination": {
"pattern": "^s3://.+/$"
}
}
}
]
}
},
{
"if": {
"properties": {
"action": {
"const": "move"
}
}
},
"then": {
"allOf": [
{
"properties": {
"destination": {
"pattern": "/$"
}
}
}
]
}
}
],
"additionalProperties": false
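
The two `if`/`then` blocks added above enforce that a `move` destination always ends in `/`, and that an `s3://` destination also names a bucket plus at least one character of path. A trimmed-down sketch with the `jsonschema` package — the schema below is a simplified stand-in for illustration, not the full transfer schema — shows the intended accept/reject behaviour:

```python
from jsonschema import ValidationError, validate

# Simplified stand-in for the conditional rules in the diff above
schema = {
    "type": "object",
    "properties": {
        "action": {"type": "string"},
        "destination": {"type": "string"},
    },
    "allOf": [
        {   # any "move" destination must end with a slash
            "if": {"properties": {"action": {"const": "move"}}},
            "then": {"properties": {"destination": {"pattern": "/$"}}},
        },
        {   # an s3:// destination must also include a bucket and prefix
            "if": {
                "properties": {
                    "action": {"const": "move"},
                    "destination": {"pattern": "^s3://"},
                }
            },
            "then": {"properties": {"destination": {"pattern": "^s3://.+/$"}}},
        },
    ],
}

validate({"action": "move", "destination": "s3://bucket/PCA/"}, schema)  # accepted
validate({"action": "move", "destination": "archive/"}, schema)          # accepted
try:
    validate({"action": "move", "destination": "s3://bucket/PCA"}, schema)
except ValidationError:
    print("rejected: move destination must end with '/'")
```

These are the same accept/reject cases exercised in `tests/test_s3_source_schema_validate.py` below.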
114 changes: 114 additions & 0 deletions tests/test_remotehandler_s3_transfer.py
@@ -46,6 +46,21 @@
},
}

s3_file_watch_pagination_task_definition = {
"type": "transfer",
"source": {
"bucket": BUCKET_NAME,
"directory": "src",
"fileRegex": ".*-xxxx\\.txt",
"protocol": {
"name": "opentaskpy.addons.aws.remotehandlers.s3.S3Transfer",
},
"fileWatch": {
"timeout": 10,
},
},
}


s3_age_conditions_task_definition = {
"type": "transfer",
@@ -259,6 +274,31 @@
],
}

s3_to_s3_pca_move_new_bucket_task_definition = {
"type": "transfer",
"source": {
"bucket": BUCKET_NAME,
"directory": "src",
"fileRegex": "pca-move\\.txt",
"postCopyAction": {
"action": "move",
"destination": f"s3://{BUCKET_NAME_2}/PCA/",
},
"protocol": {
"name": "opentaskpy.addons.aws.remotehandlers.s3.S3Transfer",
},
},
"destination": [
{
"bucket": BUCKET_NAME_2,
"directory": "dest",
"protocol": {
"name": "opentaskpy.addons.aws.remotehandlers.s3.S3Transfer",
},
},
],
}

s3_to_s3_assume_role_task_definition = {
"type": "transfer",
"source": {
@@ -900,6 +940,55 @@ def test_s3_to_s3_copy_pca_move(setup_bucket, tmp_path, s3_client):
assert objects["Contents"][0]["Key"] == "src/archive/pca-move.txt"


def test_s3_to_s3_copy_pca_move_new_bucket(setup_bucket, tmp_path, s3_client):
transfer_obj = transfer.Transfer(
None,
"s3-to-s3-pca-move-new-bucket",
s3_to_s3_pca_move_new_bucket_task_definition,
)

# Write a test file locally

fs.create_files([{f"{tmp_path}/pca-move.txt": {"content": "test1234"}}])
create_s3_file(s3_client, f"{tmp_path}/pca-move.txt", "src/pca-move.txt")

assert transfer_obj.run()

# Check that the file is in the destination bucket (as well as the moved file)
objects = s3_client.list_objects(Bucket=BUCKET_NAME_2)
assert len(objects["Contents"]) == 2
assert any(obj["Key"] == "dest/pca-move.txt" for obj in objects["Contents"])

# Check that the file has been moved to the new location in the new bucket
objects = s3_client.list_objects(Bucket=BUCKET_NAME_2)
assert len(objects["Contents"]) == 2
# Check there's a key named "PCA/pca-move.txt"
assert any(obj["Key"] == "PCA/pca-move.txt" for obj in objects["Contents"])

# Try again but change the post copy move destination to the root of the bucket instead
s3_to_s3_pca_move_new_bucket_task_definition_copy = deepcopy(
s3_to_s3_pca_move_new_bucket_task_definition
)
s3_to_s3_pca_move_new_bucket_task_definition_copy["source"]["postCopyAction"][
"destination"
] = f"s3://{BUCKET_NAME_2}/"

fs.create_files([{f"{tmp_path}/pca-move.txt": {"content": "test1234"}}])
create_s3_file(s3_client, f"{tmp_path}/pca-move.txt", "src/pca-move.txt")

transfer_obj = transfer.Transfer(
None,
"s3-to-s3-pca-move-new-bucket-2",
s3_to_s3_pca_move_new_bucket_task_definition_copy,
)

assert transfer_obj.run()

# Check the file exists in the root of the bucket
objects = s3_client.list_objects(Bucket=BUCKET_NAME_2)
assert any(obj["Key"] == "pca-move.txt" for obj in objects["Contents"])


def test_s3_to_s3_copy_pca_rename(setup_bucket, tmp_path, s3_client):
transfer_obj = transfer.Transfer(
None, "s3-to-s3-pca-rename", s3_to_s3_pca_rename_task_definition
@@ -991,6 +1080,31 @@ def test_s3_file_watch_custom_creds(
assert transfer_obj.run()


def test_s3_file_watch_pagination(s3_client, setup_bucket, tmp_path):
transfer_obj = transfer.Transfer(
None, "s3-file-watch-pagination", s3_file_watch_pagination_task_definition
)

# Create a file to watch for with the current date
datestamp = datetime.datetime.now().strftime("%Y%m%d")

# Write 1010 files locally
for i in range(1010):
fs.create_files([{f"{tmp_path}/{datestamp}-{i}.txt": {"content": "test1234"}}])
create_s3_file(
s3_client, f"{tmp_path}/{datestamp}-{i}.txt", f"src/{datestamp}-{i}.txt"
)

# Now write another
fs.create_files([{f"{tmp_path}/{datestamp}-xxxx.txt": {"content": "test1234"}}])
create_s3_file(
s3_client, f"{tmp_path}/{datestamp}-xxxx.txt", f"src/{datestamp}-xxxx.txt"
)

# File should be found
assert transfer_obj.run()


def create_s3_file(s3_client, local_file, object_key):
s3_client.put_object(
Bucket=BUCKET_NAME,
27 changes: 27 additions & 0 deletions tests/test_s3_source_schema_validate.py
@@ -135,6 +135,33 @@ def test_s3_source_file_watch(valid_transfer):
assert validate_transfer_json(json_data)


def test_s3_post_copy_action(valid_transfer):

json_data = {
"type": "transfer",
"source": valid_transfer,
}

json_data["source"]["postCopyAction"] = {
"action": "move",
"destination": "s3://test-bucket/dest",
}

assert not validate_transfer_json(json_data)

json_data["source"]["postCopyAction"]["destination"] = "s3://test-bucket/dest/"
assert validate_transfer_json(json_data)

json_data["source"]["postCopyAction"]["destination"] = "s3://"
assert not validate_transfer_json(json_data)

json_data["source"]["postCopyAction"]["destination"] = "archive/"
assert validate_transfer_json(json_data)

json_data["source"]["postCopyAction"]["destination"] = "/"
assert validate_transfer_json(json_data)


def test_s3_destination(valid_transfer, valid_destination):
json_data = {
"type": "transfer",
