diff --git a/README.md b/README.md
index 19e0dfbb..b72d40fe 100644
--- a/README.md
+++ b/README.md
@@ -176,7 +176,9 @@ Full list of options in `config.json`:
 | temp_dir | String | | (Default: platform-dependent) Directory of temporary files with RECORD messages. |
 | no_compression | Boolean | | (Default: False) Generate uncompressed files when loading to Snowflake. Normally, by default GZIP compressed files are generated. |
 | query_tag | String | | (Default: None) Optional string to tag executed queries in Snowflake. Replaces tokens `{{database}}`, `{{schema}}` and `{{table}}` with the appropriate values. The tags are displayed in the output of the Snowflake `QUERY_HISTORY`, `QUERY_HISTORY_BY_*` functions. |
-| archive_load_files | Boolean | | (Default: False) When enabled, the files loaded to Snowflake will also be stored in `{{s3_bucket}}` under the key `/archive/{schema_name}/{table_name}/`. All archived files will have `tap`, `schema`, `table` and `archived-by` as S3 metadata keys. When incremental replication is used, the archived files will also have the following S3 metadata keys: `incremental-key`, `incremental-key-min` and `incremental-key-max`.
+| archive_load_files | Boolean | | (Default: False) When enabled, the files loaded to Snowflake will also be stored in `archive_load_files_s3_bucket` under the key `/{archive_load_files_s3_prefix}/{schema_name}/{table_name}/`. All archived files will have `tap`, `schema`, `table` and `archived-by` as S3 metadata keys. When incremental replication is used, the archived files will also have the following S3 metadata keys: `incremental-key`, `incremental-key-min` and `incremental-key-max`.
+| archive_load_files_s3_prefix | String | | (Default: "archive") When `archive_load_files` is enabled, the archived files will be placed in the archive S3 bucket under this prefix.
+| archive_load_files_s3_bucket | String | | (Default: Value of `s3_bucket`) When `archive_load_files` is enabled, the archived files will be placed in this bucket.
 
 ### To run tests:
 
diff --git a/target_snowflake/__init__.py b/target_snowflake/__init__.py
index b77814f5..1ecf7e21 100644
--- a/target_snowflake/__init__.py
+++ b/target_snowflake/__init__.py
@@ -491,9 +491,8 @@ def flush_records(stream: str,
                 })
 
             # Use same file name as in import
-            archive_file = s3_key.split('/')[-1]
-            archive_folder = "archive/{}/{}".format(archive_tap, archive_table)
-            archive_key = "{}/{}".format(archive_folder, archive_file)
+            archive_file = os.path.basename(s3_key)
+            archive_key = "{}/{}/{}".format(archive_tap, archive_table, archive_file)
 
             db_sync.copy_to_archive(s3_key, archive_key, archive_metadata)
 
diff --git a/target_snowflake/db_sync.py b/target_snowflake/db_sync.py
index ea2e1c12..7f116b8a 100644
--- a/target_snowflake/db_sync.py
+++ b/target_snowflake/db_sync.py
@@ -403,9 +403,35 @@ def delete_from_stage(self, stream, s3_key):
         self.upload_client.delete_object(stream, s3_key)
 
     def copy_to_archive(self, s3_source_key, s3_archive_key, s3_archive_metadata):
-        """Copy file from snowflake stage to archive"""
-        self.logger.info('Copying %s to archive location %s', s3_source_key, s3_archive_key)
-        self.upload_client.copy_object(s3_source_key, s3_archive_key, s3_archive_metadata)
+        """
+        Copy file from snowflake stage to archive.
+
+        s3_source_key: The s3 key to copy, assumed to exist in the bucket configured as 's3_bucket'
+
+        s3_archive_key: The key to use in archive destination. This will be prefixed with the config value
+        'archive_load_files_s3_prefix'. If none is specified, 'archive' will be used as the prefix.
+
+        As destination bucket, the config value 'archive_load_files_s3_bucket' will be used. If none is
+        specified, the bucket configured as 's3_bucket' will be used.
+
+        s3_archive_metadata: This dict will be used as the S3 metadata in the file in archive destination. Metadata in
+        the source file will be replaced.
+
+        """
+        source_bucket = self.connection_config.get('s3_bucket')
+
+        # Get archive s3_bucket from config, or use same bucket if not specified
+        archive_bucket = self.connection_config.get('archive_load_files_s3_bucket', source_bucket)
+
+        # Determine prefix to use in archive s3 bucket
+        default_archive_prefix = 'archive'
+        archive_prefix = self.connection_config.get('archive_load_files_s3_prefix', default_archive_prefix)
+        prefixed_archive_key = '{}/{}'.format(archive_prefix, s3_archive_key)
+
+        copy_source = '{}/{}'.format(source_bucket, s3_source_key)
+
+        self.logger.info('Copying %s to archive location %s', copy_source, prefixed_archive_key)
+        self.upload_client.copy_object(copy_source, archive_bucket, prefixed_archive_key, s3_archive_metadata)
 
     def get_stage_name(self, stream):
         """Generate snowflake stage name"""
diff --git a/target_snowflake/upload_clients/base_upload_client.py b/target_snowflake/upload_clients/base_upload_client.py
index ea7851ce..74dc0d4b 100644
--- a/target_snowflake/upload_clients/base_upload_client.py
+++ b/target_snowflake/upload_clients/base_upload_client.py
@@ -26,7 +26,7 @@ def delete_object(self, stream: str, key: str) -> None:
         """
 
     @abstractmethod
-    def copy_object(self, source_key: str, target_key: str, target_metadata: dict) -> None:
+    def copy_object(self, copy_source: str, target_bucket: str, target_key: str, target_metadata: dict) -> None:
         """
         Copy object
         """
diff --git a/target_snowflake/upload_clients/s3_upload_client.py b/target_snowflake/upload_clients/s3_upload_client.py
index 31cc4062..44fbfb5a 100644
--- a/target_snowflake/upload_clients/s3_upload_client.py
+++ b/target_snowflake/upload_clients/s3_upload_client.py
@@ -98,13 +98,9 @@ def delete_object(self, stream: str, key: str) -> None:
         bucket = self.connection_config['s3_bucket']
         self.s3_client.delete_object(Bucket=bucket, Key=key)
 
-    def copy_object(self, source_key: str, target_key: str, target_metadata: dict) -> None:
+    def copy_object(self, copy_source: str, target_bucket: str, target_key: str, target_metadata: dict) -> None:
         """Copy object to another location on S3"""
-        self.logger.info('Copying %s to %s', source_key, target_key)
-        bucket = self.connection_config['s3_bucket']
-
-        copy_source = "{}/{}".format(bucket, source_key)
-
+        self.logger.info('Copying %s to %s/%s', copy_source, target_bucket, target_key)
         # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.copy_object
-        self.s3_client.copy_object(CopySource=copy_source, Bucket=bucket, Key=target_key, Metadata=target_metadata,
-                                   MetadataDirective="REPLACE")
+        self.s3_client.copy_object(CopySource=copy_source, Bucket=target_bucket, Key=target_key,
+                                   Metadata=target_metadata, MetadataDirective="REPLACE")
diff --git a/target_snowflake/upload_clients/snowflake_upload_client.py b/target_snowflake/upload_clients/snowflake_upload_client.py
index d2b55dcb..67d4a9b8 100644
--- a/target_snowflake/upload_clients/snowflake_upload_client.py
+++ b/target_snowflake/upload_clients/snowflake_upload_client.py
@@ -39,6 +39,6 @@ def delete_object(self, stream: str, key: str) -> None:
         with self.dblink.open_connection() as connection:
             connection.cursor().execute(f"REMOVE '@{stage}/{key}'")
 
-    def copy_object(self, source_key: str, target_key: str, target_metadata: dict) -> None:
+    def copy_object(self, copy_source: str, target_bucket: str, target_key: str, target_metadata: dict) -> None:
         raise NotImplementedError(
             "Copying objects is not supported with a Snowflake upload client.")
diff --git a/tests/integration/test_target_snowflake.py b/tests/integration/test_target_snowflake.py
index 959b8bf0..d9f7584f 100644
--- a/tests/integration/test_target_snowflake.py
+++ b/tests/integration/test_target_snowflake.py
@@ -1180,6 +1180,7 @@ def test_parquet(self):
     def test_archive_load_files(self):
         """Test if load file is copied to archive folder"""
         self.config['archive_load_files'] = True
+        self.config['archive_load_files_s3_prefix'] = 'archive_folder'
         self.config['tap_id'] = 'test_tap_id'
         self.config['client_side_encryption_master_key'] = ''
 
@@ -1187,7 +1188,7 @@
 
         # Delete any dangling files from archive
         files_in_s3_archive = self.s3_client.list_objects(
-            Bucket=s3_bucket, Prefix="archive/test_tap_id/").get('Contents', [])
+            Bucket=s3_bucket, Prefix="archive_folder/test_tap_id/").get('Contents', [])
         for file_in_archive in files_in_s3_archive:
             key = file_in_archive["Key"]
             self.s3_client.delete_object(Bucket=s3_bucket, Key=key)
@@ -1196,7 +1197,7 @@
         self.persist_lines_with_cache(tap_lines)
 
         # Verify expected file metadata in S3
-        files_in_s3_archive = self.s3_client.list_objects(Bucket=s3_bucket, Prefix="archive/test_tap_id/").get(
+        files_in_s3_archive = self.s3_client.list_objects(Bucket=s3_bucket, Prefix="archive_folder/test_tap_id/").get(
             'Contents')
         self.assertIsNotNone(files_in_s3_archive)
         self.assertEqual(1, len(files_in_s3_archive))
@@ -1229,6 +1230,3 @@
 4,"xyz4","not-formatted-time-4"
 5,"xyz5","not-formatted-time-5"
 ''')
-
-        # Clean up
-        self.s3_client.delete_object(Bucket=s3_bucket, Key=archived_file_key)
diff --git a/tests/unit/test_db_sync.py b/tests/unit/test_db_sync.py
index 7ba7c296..c1833ab4 100644
--- a/tests/unit/test_db_sync.py
+++ b/tests/unit/test_db_sync.py
@@ -224,6 +224,43 @@ def test_parallelism(self, query_patch):
         self.assertEqual(db_sync.DbSync({**minimal_config, **table_stage_with_parallel}).connection_config['parallelism'], 1)
 
+    @patch('target_snowflake.upload_clients.s3_upload_client.S3UploadClient.copy_object')
+    @patch('target_snowflake.db_sync.DbSync.query')
+    def test_copy_to_archive(self, query_patch, copy_object_patch):
+        query_patch.return_value = [{'type': 'CSV'}]
+        minimal_config = {
+            'account': "dummy-value",
+            'dbname': "dummy-value",
+            'user': "dummy-value",
+            'password': "dummy-value",
+            'warehouse': "dummy-value",
+            'default_target_schema': "dummy-value",
+            'file_format': "dummy-value",
+            's3_bucket': 'dummy-bucket',
+            'stage': 'dummy_schema.dummy_stage'
+        }
+
+        # Assert default values (same bucket, 'archive' as the archive prefix)
+        s3_config = {}
+        dbsync = db_sync.DbSync({**minimal_config, **s3_config})
+        dbsync.copy_to_archive('source/file', 'tap/schema/file', {'meta': "data"})
+
+        self.assertEqual(copy_object_patch.call_args[0][0], 'dummy-bucket/source/file')
+        self.assertEqual(copy_object_patch.call_args[0][1], 'dummy-bucket')
+        self.assertEqual(copy_object_patch.call_args[0][2], 'archive/tap/schema/file')
+
+        # Assert custom archive bucket and prefix
+        s3_config = {
+            'archive_load_files_s3_bucket': "custom-bucket",
+            'archive_load_files_s3_prefix': "custom-prefix"
+        }
+        dbsync = db_sync.DbSync({**minimal_config, **s3_config})
+        dbsync.copy_to_archive('source/file', 'tap/schema/file', {'meta': "data"})
+
+        self.assertEqual(copy_object_patch.call_args[0][0], 'dummy-bucket/source/file')
+        self.assertEqual(copy_object_patch.call_args[0][1], 'custom-bucket')
+        self.assertEqual(copy_object_patch.call_args[0][2], 'custom-prefix/tap/schema/file')
+
     def test_safe_column_name(self):
         self.assertEqual(db_sync.safe_column_name("columnname"), '"COLUMNNAME"')
         self.assertEqual(db_sync.safe_column_name("columnName"), '"COLUMNNAME"')
diff --git a/tests/unit/test_target_snowflake.py b/tests/unit/test_target_snowflake.py
index d629540e..24504699 100644
--- a/tests/unit/test_target_snowflake.py
+++ b/tests/unit/test_target_snowflake.py
@@ -100,13 +100,13 @@ def test_archive_load_files_incremental_replication(self, os_remove_mock, dbSync
         instance = dbSync_mock.return_value
         instance.create_schema_if_not_exists.return_value = None
         instance.sync_table.return_value = None
-        instance.put_to_stage.return_value = 'some-s3-bucket/some-s3-folder/some-name_date_batch_hash.csg.gz'
+        instance.put_to_stage.return_value = 'some-s3-folder/some-name_date_batch_hash.csg.gz'
 
         target_snowflake.persist_lines(self.config, lines)
 
         copy_to_archive_args = instance.copy_to_archive.call_args[0]
-        assert copy_to_archive_args[0] == 'some-s3-bucket/some-s3-folder/some-name_date_batch_hash.csg.gz'
-        assert copy_to_archive_args[1] == 'archive/test_tap_id/test_simple_table/some-name_date_batch_hash.csg.gz'
+        assert copy_to_archive_args[0] == 'some-s3-folder/some-name_date_batch_hash.csg.gz'
+        assert copy_to_archive_args[1] == 'test_tap_id/test_simple_table/some-name_date_batch_hash.csg.gz'
         assert copy_to_archive_args[2] == {
             'tap': 'test_tap_id',
             'schema': 'tap_mysql_test',
@@ -122,7 +122,6 @@ def test_archive_load_files_log_based_replication(self, os_remove_mock, dbSync_m
         self.config['tap_id'] = 'test_tap_id'
         self.config['archive_load_files'] = True
-        self.config['s3_bucket'] = 'dummy_bucket'
 
         with open(f'{os.path.dirname(__file__)}/resources/logical-streams.json', 'r') as f:
             lines = f.readlines()
@@ -130,13 +129,13 @@ def test_archive_load_files_log_based_replication(self, os_remove_mock, dbSync_m
         instance = dbSync_mock.return_value
         instance.create_schema_if_not_exists.return_value = None
         instance.sync_table.return_value = None
-        instance.put_to_stage.return_value = 'some-s3-bucket/some-s3-folder/some-name_date_batch_hash.csg.gz'
+        instance.put_to_stage.return_value = 'some-s3-folder/some-name_date_batch_hash.csg.gz'
 
         target_snowflake.persist_lines(self.config, lines)
 
         copy_to_archive_args = instance.copy_to_archive.call_args[0]
-        assert copy_to_archive_args[0] == 'some-s3-bucket/some-s3-folder/some-name_date_batch_hash.csg.gz'
-        assert copy_to_archive_args[1] == 'archive/test_tap_id/logical1_table2/some-name_date_batch_hash.csg.gz'
+        assert copy_to_archive_args[0] == 'some-s3-folder/some-name_date_batch_hash.csg.gz'
+        assert copy_to_archive_args[1] == 'test_tap_id/logical1_table2/some-name_date_batch_hash.csg.gz'
         assert copy_to_archive_args[2] == {
             'tap': 'test_tap_id',
             'schema': 'logical1',
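
For reference, here is a minimal sketch (not part of the patch) of how the new settings interact. The bucket names, prefix and file name below are made-up placeholders; only the config keys, the `'archive'` default and the fallback to `s3_bucket` come from the change, and the fallback logic mirrors `DbSync.copy_to_archive` above.

```python
# Hypothetical config values -- bucket names, prefix and file name are placeholders.
connection_config = {
    's3_bucket': 'pipelinewise-loads',                       # bucket the load files are staged in
    'archive_load_files': True,
    'archive_load_files_s3_bucket': 'pipelinewise-archive',  # optional; falls back to s3_bucket
    'archive_load_files_s3_prefix': 'raw-archive',           # optional; falls back to 'archive'
}

# Same fallback logic as copy_to_archive in db_sync.py
source_bucket = connection_config.get('s3_bucket')
archive_bucket = connection_config.get('archive_load_files_s3_bucket', source_bucket)
archive_prefix = connection_config.get('archive_load_files_s3_prefix', 'archive')

# flush_records builds the archive key as {tap}/{table}/{file}; the file name is illustrative
s3_archive_key = 'test_tap_id/test_simple_table/load-file.csv.gz'
print('{}/{}/{}'.format(archive_bucket, archive_prefix, s3_archive_key))
# -> pipelinewise-archive/raw-archive/test_tap_id/test_simple_table/load-file.csv.gz
```

With both new keys left out of the config, the archived files simply land in `s3_bucket` under the `archive/` prefix, which matches the behaviour before this change.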