diff --git a/CHANGELOG.md b/CHANGELOG.md index 76e0b5e..c9401c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## v24.19.0 + +- Removed a stray `break` in S3 file listing that was preventing fetching more than the first 1000 records +- Added new feature to allow moving a file from the source S3 bucket into another destination as a post copy action by referencing the full `s3://` path. + ## v24.16.0 - Added a wider window for refreshing token. If it's within 60 seconds of expiry when checking we will refresh it, so handle instances where there's a delay renewing. diff --git a/README.md b/README.md index 2ec32ef..3232542 100644 --- a/README.md +++ b/README.md @@ -43,7 +43,7 @@ As part of the upload, the `bucket-owner-full-control` ACL flag is applied to al - Plain file watch - File watch/transfer with file size and age constraints -- `move` & `delete` post copy actions +- `move`, `rename` & `delete` post copy actions - Touching empty files after transfer. e.g. `.fin` files used as completion flags - Touching empty files as an execution diff --git a/pyproject.toml b/pyproject.toml index 988d215..6c8fee2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "otf-addons-aws" -version = "v24.16.0" +version = "v24.19.0" authors = [{ name = "Adam McDonagh", email = "adam@elitemonkey.net" }] license = { text = "GPLv3" } classifiers = [ @@ -53,7 +53,7 @@ dev = [ profile = 'black' [tool.bumpver] -current_version = "v24.16.0" +current_version = "v24.19.0" version_pattern = "vYY.WW.PATCH[-TAG]" commit_message = "bump version {old_version} -> {new_version}" commit = true diff --git a/src/opentaskpy/addons/aws/remotehandlers/s3.py b/src/opentaskpy/addons/aws/remotehandlers/s3.py index 6375a41..8103385 100644 --- a/src/opentaskpy/addons/aws/remotehandlers/s3.py +++ b/src/opentaskpy/addons/aws/remotehandlers/s3.py @@ -136,10 +136,13 @@ def handle_post_copy_action(self, files: list[str]) -> int: or self.spec["postCopyAction"]["action"] == "rename" ): for file in files: + source_bucket = self.spec["bucket"] + dest_bucket = self.spec["bucket"] new_file = ( f"{self.spec['postCopyAction']['destination']}{file.split('/')[-1]}" ) if self.spec["postCopyAction"]["action"] == "rename": + # Use the pattern and sub values to rename the file correctly new_file = re.sub( self.spec["postCopyAction"]["pattern"], @@ -147,12 +150,25 @@ def handle_post_copy_action(self, files: list[str]) -> int: new_file, ) - self.logger.info(f'"Moving" file from {file} to {new_file}') + else: + # Check if the destination starts with s3://, if so, then we are also moving bucket + if self.spec["postCopyAction"]["destination"].startswith("s3://"): + dest_bucket = self.spec["postCopyAction"]["destination"].split( + "/" + )[2] + new_file = ( + self.spec["postCopyAction"]["destination"].split("/", 3)[3] + + file.split("/")[-1] + ) + + self.logger.info( + f'"Moving" file from s3://{source_bucket}/{file} to s3://{dest_bucket}/{new_file}' + ) self.s3_client.copy_object( - Bucket=self.spec["bucket"], + Bucket=dest_bucket, CopySource={ - "Bucket": self.spec["bucket"], + "Bucket": source_bucket, "Key": file, }, Key=new_file, @@ -160,7 +176,7 @@ def handle_post_copy_action(self, files: list[str]) -> int: # Check that the copy worked try: - self.s3_client.head_object(Bucket=self.spec["bucket"], Key=new_file) + self.s3_client.head_object(Bucket=dest_bucket, Key=new_file) except Exception as e: # Print the exception message self.logger.error(e) @@ -168,7 +184,7 @@ def handle_post_copy_action(self, files: list[str]) -> int: return 1 response = self.s3_client.delete_objects( - Bucket=self.spec["bucket"], + Bucket=source_bucket, Delete={ "Objects": [{"Key": file}], "Quiet": True, @@ -182,7 +198,7 @@ def handle_post_copy_action(self, files: list[str]) -> int: # Check that the delete worked try: response = self.s3_client.head_object( - Bucket=self.spec["bucket"], Key=file + Bucket=source_bucket, Key=file ) self.logger.error(response) self.logger.error(f"Failed to delete file: {file}") @@ -272,7 +288,8 @@ def list_files( kwargs["ContinuationToken"] = response["NextContinuationToken"] except KeyError: break - break + else: + break except Exception as e: # pylint: disable=broad-exception-caught self.logger.error(f"Error listing files: {self.spec['bucket']}") self.logger.exception(e) diff --git a/src/opentaskpy/addons/aws/remotehandlers/schemas/transfer/s3_source/postCopyAction.json b/src/opentaskpy/addons/aws/remotehandlers/schemas/transfer/s3_source/postCopyAction.json index b5ab498..d0432a0 100644 --- a/src/opentaskpy/addons/aws/remotehandlers/schemas/transfer/s3_source/postCopyAction.json +++ b/src/opentaskpy/addons/aws/remotehandlers/schemas/transfer/s3_source/postCopyAction.json @@ -63,6 +63,49 @@ "required": ["sub", "pattern"] } } + }, + { + "if": { + "properties": { + "action": { + "const": "move" + }, + "destination": { + "pattern": "^s3://" + } + } + }, + "then": { + "allOf": [ + { + "properties": { + "destination": { + "pattern": "^s3://.+/$" + } + } + } + ] + } + }, + { + "if": { + "properties": { + "action": { + "const": "move" + } + } + }, + "then": { + "allOf": [ + { + "properties": { + "destination": { + "pattern": "/$" + } + } + } + ] + } } ], "additionalProperties": false diff --git a/tests/test_remotehandler_s3_transfer.py b/tests/test_remotehandler_s3_transfer.py index 167660f..7b9694b 100644 --- a/tests/test_remotehandler_s3_transfer.py +++ b/tests/test_remotehandler_s3_transfer.py @@ -46,6 +46,21 @@ }, } +s3_file_watch_pagination_task_definition = { + "type": "transfer", + "source": { + "bucket": BUCKET_NAME, + "directory": "src", + "fileRegex": ".*-xxxx\\.txt", + "protocol": { + "name": "opentaskpy.addons.aws.remotehandlers.s3.S3Transfer", + }, + "fileWatch": { + "timeout": 10, + }, + }, +} + s3_age_conditions_task_definition = { "type": "transfer", @@ -259,6 +274,31 @@ ], } +s3_to_s3_pca_move_new_bucket_task_definition = { + "type": "transfer", + "source": { + "bucket": BUCKET_NAME, + "directory": "src", + "fileRegex": "pca-move\\.txt", + "postCopyAction": { + "action": "move", + "destination": f"s3://{BUCKET_NAME_2}/PCA/", + }, + "protocol": { + "name": "opentaskpy.addons.aws.remotehandlers.s3.S3Transfer", + }, + }, + "destination": [ + { + "bucket": BUCKET_NAME_2, + "directory": "dest", + "protocol": { + "name": "opentaskpy.addons.aws.remotehandlers.s3.S3Transfer", + }, + }, + ], +} + s3_to_s3_assume_role_task_definition = { "type": "transfer", "source": { @@ -900,6 +940,55 @@ def test_s3_to_s3_copy_pca_move(setup_bucket, tmp_path, s3_client): assert objects["Contents"][0]["Key"] == "src/archive/pca-move.txt" +def test_s3_to_s3_copy_pca_move_new_bucket(setup_bucket, tmp_path, s3_client): + transfer_obj = transfer.Transfer( + None, + "s3-to-s3-pca-move-new-bucket", + s3_to_s3_pca_move_new_bucket_task_definition, + ) + + # Write a test file locally + + fs.create_files([{f"{tmp_path}/pca-move.txt": {"content": "test1234"}}]) + create_s3_file(s3_client, f"{tmp_path}/pca-move.txt", "src/pca-move.txt") + + assert transfer_obj.run() + + # Check that the file is in the destination bucket (as well as the moved file) + objects = s3_client.list_objects(Bucket=BUCKET_NAME_2) + assert len(objects["Contents"]) == 2 + assert any(obj["Key"] == "dest/pca-move.txt" for obj in objects["Contents"]) + + # Check that the file has been moved to the new location in the new bucket + objects = s3_client.list_objects(Bucket=BUCKET_NAME_2) + assert len(objects["Contents"]) == 2 + # Check there's a key named "PCA/pca-move.txt" + assert any(obj["Key"] == "PCA/pca-move.txt" for obj in objects["Contents"]) + + # Try again but change the post copy move destination to the root of the bucket instead + s3_to_s3_pca_move_new_bucket_task_definition_copy = deepcopy( + s3_to_s3_pca_move_new_bucket_task_definition + ) + s3_to_s3_pca_move_new_bucket_task_definition_copy["source"]["postCopyAction"][ + "destination" + ] = f"s3://{BUCKET_NAME_2}/" + + fs.create_files([{f"{tmp_path}/pca-move.txt": {"content": "test1234"}}]) + create_s3_file(s3_client, f"{tmp_path}/pca-move.txt", "src/pca-move.txt") + + transfer_obj = transfer.Transfer( + None, + "s3-to-s3-pca-move-new-bucket-2", + s3_to_s3_pca_move_new_bucket_task_definition_copy, + ) + + assert transfer_obj.run() + + # Check the file exists in the root of the bucket + objects = s3_client.list_objects(Bucket=BUCKET_NAME_2) + assert any(obj["Key"] == "pca-move.txt" for obj in objects["Contents"]) + + def test_s3_to_s3_copy_pca_rename(setup_bucket, tmp_path, s3_client): transfer_obj = transfer.Transfer( None, "s3-to-s3-pca-rename", s3_to_s3_pca_rename_task_definition @@ -991,6 +1080,31 @@ def test_s3_file_watch_custom_creds( assert transfer_obj.run() +def test_s3_file_watch_pagination(s3_client, setup_bucket, tmp_path): + transfer_obj = transfer.Transfer( + None, "s3-file-watch-pagination", s3_file_watch_pagination_task_definition + ) + + # Create a file to watch for with the current date + datestamp = datetime.datetime.now().strftime("%Y%m%d") + + # Write 1010 files locally + for i in range(1010): + fs.create_files([{f"{tmp_path}/{datestamp}-{i}.txt": {"content": "test1234"}}]) + create_s3_file( + s3_client, f"{tmp_path}/{datestamp}-{i}.txt", f"src/{datestamp}-{i}.txt" + ) + + # Now write another + fs.create_files([{f"{tmp_path}/{datestamp}-xxxx.txt": {"content": "test1234"}}]) + create_s3_file( + s3_client, f"{tmp_path}/{datestamp}-xxxx.txt", f"src/{datestamp}-xxxx.txt" + ) + + # File should be found + assert transfer_obj.run() + + def create_s3_file(s3_client, local_file, object_key): s3_client.put_object( Bucket=BUCKET_NAME, diff --git a/tests/test_s3_source_schema_validate.py b/tests/test_s3_source_schema_validate.py index 1453af8..ae937ae 100644 --- a/tests/test_s3_source_schema_validate.py +++ b/tests/test_s3_source_schema_validate.py @@ -135,6 +135,33 @@ def test_s3_source_file_watch(valid_transfer): assert validate_transfer_json(json_data) +def test_s3_post_copy_action(valid_transfer): + + json_data = { + "type": "transfer", + "source": valid_transfer, + } + + json_data["source"]["postCopyAction"] = { + "action": "move", + "destination": "s3://test-bucket/dest", + } + + assert not validate_transfer_json(json_data) + + json_data["source"]["postCopyAction"]["destination"] = "s3://test-bucket/dest/" + assert validate_transfer_json(json_data) + + json_data["source"]["postCopyAction"]["destination"] = "s3://" + assert not validate_transfer_json(json_data) + + json_data["source"]["postCopyAction"]["destination"] = "archive/" + assert validate_transfer_json(json_data) + + json_data["source"]["postCopyAction"]["destination"] = "/" + assert validate_transfer_json(json_data) + + def test_s3_destination(valid_transfer, valid_destination): json_data = { "type": "transfer",