This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

AP1011 - Custom archive bucket/folder #180

Merged 2 commits on Jun 23, 2021
README.md (4 changes: 3 additions & 1 deletion)
@@ -176,7 +176,9 @@ Full list of options in `config.json`:
| temp_dir | String | | (Default: platform-dependent) Directory of temporary files with RECORD messages. |
| no_compression | Boolean | | (Default: False) Generate uncompressed files when loading to Snowflake. Normally, by default GZIP compressed files are generated. |
| query_tag | String | | (Default: None) Optional string to tag executed queries in Snowflake. Replaces tokens `{{database}}`, `{{schema}}` and `{{table}}` with the appropriate values. The tags are displayed in the output of the Snowflake `QUERY_HISTORY`, `QUERY_HISTORY_BY_*` functions. |
| archive_load_files | Boolean | | (Default: False) When enabled, the files loaded to Snowflake will also be stored in `{{s3_bucket}}` under the key `/archive/{schema_name}/{table_name}/`. All archived files will have `tap`, `schema`, `table` and `archived-by` as S3 metadata keys. When incremental replication is used, the archived files will also have the following S3 metadata keys: `incremental-key`, `incremental-key-min` and `incremental-key-max`. |
| archive_load_files | Boolean | | (Default: False) When enabled, the files loaded to Snowflake will also be stored in `archive_load_files_s3_bucket` under the key `/{archive_load_files_s3_prefix}/{schema_name}/{table_name}/`. All archived files will have `tap`, `schema`, `table` and `archived-by` as S3 metadata keys. When incremental replication is used, the archived files will also have the following S3 metadata keys: `incremental-key`, `incremental-key-min` and `incremental-key-max`. |
| archive_load_files_s3_prefix | String | | (Default: "archive") When `archive_load_files` is enabled, the archived files will be placed in the archive S3 bucket under this prefix. |
| archive_load_files_s3_bucket | String | | (Default: Value of `s3_bucket`) When `archive_load_files` is enabled, the archived files will be placed in this bucket. |
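
For illustration, a hypothetical `config.json` fragment that enables archiving to a custom bucket and prefix could look like this (the bucket names are placeholders, not values from this PR):

{
  "s3_bucket": "my-load-bucket",
  "archive_load_files": true,
  "archive_load_files_s3_bucket": "my-archive-bucket",
  "archive_load_files_s3_prefix": "archive"
}

With these (hypothetical) values, archived copies of the load files land in `my-archive-bucket` under the `archive/` prefix instead of the default load bucket.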

### To run tests:

target_snowflake/__init__.py (5 changes: 2 additions & 3 deletions)
@@ -491,9 +491,8 @@ def flush_records(stream: str,
})

# Use same file name as in import
archive_file = s3_key.split('/')[-1]
archive_folder = "archive/{}/{}".format(archive_tap, archive_table)
archive_key = "{}/{}".format(archive_folder, archive_file)
archive_file = os.path.basename(s3_key)
archive_key = "{}/{}/{}".format(archive_tap, archive_table, archive_file)

db_sync.copy_to_archive(s3_key, archive_key, archive_metadata)
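
To make the new key layout concrete, a small sketch with hypothetical values (note that the archive prefix is no longer applied here; it is prepended later in `copy_to_archive`):

import os

s3_key = 'some-s3-folder/some-name_date_batch_hash.csv.gz'   # hypothetical staged file
archive_tap, archive_table = 'test_tap_id', 'my_table'       # hypothetical tap id and table

archive_file = os.path.basename(s3_key)                      # 'some-name_date_batch_hash.csv.gz'
archive_key = "{}/{}/{}".format(archive_tap, archive_table, archive_file)
# -> 'test_tap_id/my_table/some-name_date_batch_hash.csv.gz'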

target_snowflake/db_sync.py (32 changes: 29 additions & 3 deletions)
@@ -403,9 +403,35 @@ def delete_from_stage(self, stream, s3_key):
self.upload_client.delete_object(stream, s3_key)

def copy_to_archive(self, s3_source_key, s3_archive_key, s3_archive_metadata):
"""Copy file from snowflake stage to archive"""
self.logger.info('Copying %s to archive location %s', s3_source_key, s3_archive_key)
self.upload_client.copy_object(s3_source_key, s3_archive_key, s3_archive_metadata)
"""
Copy file from snowflake stage to archive.

s3_source_key: The s3 key to copy, assumed to exist in the bucket configured as 's3_bucket'

s3_archive_key: The key to use in archive destination. This will be prefixed with the config value
'archive_load_files_s3_prefix'. If none is specified, 'archive' will be used as the prefix.

As destination bucket, the config value 'archive_load_files_s3_bucket' will be used. If none is
specified, the bucket configured as 's3_bucket' will be used.

s3_archive_metadata: This dict will be used as the S3 metadata in the file in archive destination. Metadata in
the source file will be replaced.

"""
source_bucket = self.connection_config.get('s3_bucket')

# Get archive s3_bucket from config, or use same bucket if not specified
archive_bucket = self.connection_config.get('archive_load_files_s3_bucket', source_bucket)

# Determine prefix to use in archive s3 bucket
default_archive_prefix = 'archive'
archive_prefix = self.connection_config.get('archive_load_files_s3_prefix', default_archive_prefix)
prefixed_archive_key = '{}/{}'.format(archive_prefix, s3_archive_key)

copy_source = '{}/{}'.format(source_bucket, s3_source_key)

self.logger.info('Copying %s to archive location %s', copy_source, prefixed_archive_key)
self.upload_client.copy_object(copy_source, archive_bucket, prefixed_archive_key, s3_archive_metadata)
@koszti (Contributor) commented on Jun 21, 2021:
How does the authentication work if we copy between two buckets? Can we use the same s3 upload_client for both buckets that is initialised at

The aws_profile, aws_access_key_id, aws_secret_access_key, aws_session_token, s3_region_name, and s3_endpoint_url optional parameters should be valid for both buckets. Is that fine?

The PR author (Contributor) replied:
From Boto3 docs:

All copy requests must be authenticated. Additionally, you must have read access to the source object and write access to the destination bucket. For more information, see REST Authentication. Both the Region that you want to copy the object from and the Region that you want to copy the object to must be enabled for your account.

So, if we want to use the copy_object method (rather than uploading the object to two different buckets with two sets of credentials), the credentials used for Snowflake imports must also be valid for writing to the archive bucket. I tested this in staging and it works. Not sure how a different region or endpoint URL would play out.
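
For reference, a minimal boto3 sketch of such a cross-bucket copy with a single client and one set of credentials (bucket and key names are hypothetical); the caller needs read access to the source object and write access to the destination bucket:

import boto3

# One client, one credential set, used for both buckets (credentials are
# resolved from the usual AWS environment variables, profile or instance role).
s3 = boto3.client('s3')

s3.copy_object(
    CopySource='load-bucket/pipelinewise/some-file.csv.gz',    # source object as 'bucket/key'
    Bucket='archive-bucket',                                   # destination bucket
    Key='archive/test_tap_id/my_table/some-file.csv.gz',       # destination key
    Metadata={'tap': 'test_tap_id', 'archived-by': 'pipelinewise_target_snowflake'},
    MetadataDirective='REPLACE',  # write the given metadata instead of copying the source's
)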


def get_stage_name(self, stream):
"""Generate snowflake stage name"""
target_snowflake/upload_clients/base_upload_client.py (2 changes: 1 addition & 1 deletion)
@@ -26,7 +26,7 @@ def delete_object(self, stream: str, key: str) -> None:
"""

@abstractmethod
def copy_object(self, source_key: str, target_key: str, target_metadata: dict) -> None:
def copy_object(self, copy_source: str, target_bucket: str, target_key: str, target_metadata: dict) -> None:
"""
Copy object
"""
target_snowflake/upload_clients/s3_upload_client.py (12 changes: 4 additions & 8 deletions)
@@ -98,13 +98,9 @@ def delete_object(self, stream: str, key: str) -> None:
bucket = self.connection_config['s3_bucket']
self.s3_client.delete_object(Bucket=bucket, Key=key)

def copy_object(self, source_key: str, target_key: str, target_metadata: dict) -> None:
def copy_object(self, copy_source: str, target_bucket: str, target_key: str, target_metadata: dict) -> None:
"""Copy object to another location on S3"""
self.logger.info('Copying %s to %s', source_key, target_key)
bucket = self.connection_config['s3_bucket']

copy_source = "{}/{}".format(bucket, source_key)

self.logger.info('Copying %s to %s/%s', copy_source, target_bucket, target_key)
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.copy_object
self.s3_client.copy_object(CopySource=copy_source, Bucket=bucket, Key=target_key, Metadata=target_metadata,
MetadataDirective="REPLACE")
self.s3_client.copy_object(CopySource=copy_source, Bucket=target_bucket, Key=target_key,
Metadata=target_metadata, MetadataDirective="REPLACE")
target_snowflake/upload_clients/snowflake_upload_client.py
@@ -39,6 +39,6 @@ def delete_object(self, stream: str, key: str) -> None:
with self.dblink.open_connection() as connection:
connection.cursor().execute(f"REMOVE '@{stage}/{key}'")

def copy_object(self, source_key: str, target_key: str, target_metadata: dict) -> None:
def copy_object(self, copy_source: str, target_bucket: str, target_key: str, target_metadata: dict) -> None:
raise NotImplementedError(
"Copying objects is not supported with a Snowflake upload client.")
tests/integration/test_target_snowflake.py (8 changes: 3 additions & 5 deletions)
@@ -1180,14 +1180,15 @@ def test_parquet(self):
def test_archive_load_files(self):
"""Test if load file is copied to archive folder"""
self.config['archive_load_files'] = True
self.config['archive_load_files_s3_prefix'] = 'archive_folder'
self.config['tap_id'] = 'test_tap_id'
self.config['client_side_encryption_master_key'] = ''

s3_bucket = self.config['s3_bucket']

# Delete any dangling files from archive
files_in_s3_archive = self.s3_client.list_objects(
Bucket=s3_bucket, Prefix="archive/test_tap_id/").get('Contents', [])
Bucket=s3_bucket, Prefix="archive_folder/test_tap_id/").get('Contents', [])
for file_in_archive in files_in_s3_archive:
key = file_in_archive["Key"]
self.s3_client.delete_object(Bucket=s3_bucket, Key=key)
Expand All @@ -1196,7 +1197,7 @@ def test_archive_load_files(self):
self.persist_lines_with_cache(tap_lines)

# Verify expected file metadata in S3
files_in_s3_archive = self.s3_client.list_objects(Bucket=s3_bucket, Prefix="archive/test_tap_id/").get(
files_in_s3_archive = self.s3_client.list_objects(Bucket=s3_bucket, Prefix="archive_folder/test_tap_id/").get(
'Contents')
self.assertIsNotNone(files_in_s3_archive)
self.assertEqual(1, len(files_in_s3_archive))
@@ -1229,6 +1230,3 @@ def test_archive_load_files(self):
4,"xyz4","not-formatted-time-4"
5,"xyz5","not-formatted-time-5"
''')

# Clean up
self.s3_client.delete_object(Bucket=s3_bucket, Key=archived_file_key)
tests/unit/test_db_sync.py (37 changes: 37 additions & 0 deletions)
@@ -224,6 +224,43 @@ def test_parallelism(self, query_patch):
self.assertEqual(db_sync.DbSync({**minimal_config,
**table_stage_with_parallel}).connection_config['parallelism'], 1)

@patch('target_snowflake.upload_clients.s3_upload_client.S3UploadClient.copy_object')
@patch('target_snowflake.db_sync.DbSync.query')
def test_copy_to_archive(self, query_patch, copy_object_patch):
query_patch.return_value = [{'type': 'CSV'}]
minimal_config = {
'account': "dummy-value",
'dbname': "dummy-value",
'user': "dummy-value",
'password': "dummy-value",
'warehouse': "dummy-value",
'default_target_schema': "dummy-value",
'file_format': "dummy-value",
's3_bucket': 'dummy-bucket',
'stage': 'dummy_schema.dummy_stage'
}

# Assert default values (same bucket, 'archive' as the archive prefix)
s3_config = {}
dbsync = db_sync.DbSync({**minimal_config, **s3_config})
dbsync.copy_to_archive('source/file', 'tap/schema/file', {'meta': "data"})

self.assertEqual(copy_object_patch.call_args[0][0], 'dummy-bucket/source/file')
self.assertEqual(copy_object_patch.call_args[0][1], 'dummy-bucket')
self.assertEqual(copy_object_patch.call_args[0][2], 'archive/tap/schema/file')

# Assert custom archive bucket and prefix
s3_config = {
'archive_load_files_s3_bucket': "custom-bucket",
'archive_load_files_s3_prefix': "custom-prefix"
}
dbsync = db_sync.DbSync({**minimal_config, **s3_config})
dbsync.copy_to_archive('source/file', 'tap/schema/file', {'meta': "data"})

self.assertEqual(copy_object_patch.call_args[0][0], 'dummy-bucket/source/file')
self.assertEqual(copy_object_patch.call_args[0][1], 'custom-bucket')
self.assertEqual(copy_object_patch.call_args[0][2], 'custom-prefix/tap/schema/file')

def test_safe_column_name(self):
self.assertEqual(db_sync.safe_column_name("columnname"), '"COLUMNNAME"')
self.assertEqual(db_sync.safe_column_name("columnName"), '"COLUMNNAME"')
tests/unit/test_target_snowflake.py (13 changes: 6 additions & 7 deletions)
@@ -100,13 +100,13 @@ def test_archive_load_files_incremental_replication(self, os_remove_mock, dbSync
instance = dbSync_mock.return_value
instance.create_schema_if_not_exists.return_value = None
instance.sync_table.return_value = None
instance.put_to_stage.return_value = 'some-s3-bucket/some-s3-folder/some-name_date_batch_hash.csg.gz'
instance.put_to_stage.return_value = 'some-s3-folder/some-name_date_batch_hash.csg.gz'

target_snowflake.persist_lines(self.config, lines)

copy_to_archive_args = instance.copy_to_archive.call_args[0]
assert copy_to_archive_args[0] == 'some-s3-bucket/some-s3-folder/some-name_date_batch_hash.csg.gz'
assert copy_to_archive_args[1] == 'archive/test_tap_id/test_simple_table/some-name_date_batch_hash.csg.gz'
assert copy_to_archive_args[0] == 'some-s3-folder/some-name_date_batch_hash.csg.gz'
assert copy_to_archive_args[1] == 'test_tap_id/test_simple_table/some-name_date_batch_hash.csg.gz'
assert copy_to_archive_args[2] == {
'tap': 'test_tap_id',
'schema': 'tap_mysql_test',
@@ -122,21 +122,20 @@ def test_archive_load_files_log_based_replication(self, os_remove_mock, dbSync_mock):
def test_archive_load_files_log_based_replication(self, os_remove_mock, dbSync_mock):
self.config['tap_id'] = 'test_tap_id'
self.config['archive_load_files'] = True
self.config['s3_bucket'] = 'dummy_bucket'

with open(f'{os.path.dirname(__file__)}/resources/logical-streams.json', 'r') as f:
lines = f.readlines()

instance = dbSync_mock.return_value
instance.create_schema_if_not_exists.return_value = None
instance.sync_table.return_value = None
instance.put_to_stage.return_value = 'some-s3-bucket/some-s3-folder/some-name_date_batch_hash.csg.gz'
instance.put_to_stage.return_value = 'some-s3-folder/some-name_date_batch_hash.csg.gz'

target_snowflake.persist_lines(self.config, lines)

copy_to_archive_args = instance.copy_to_archive.call_args[0]
assert copy_to_archive_args[0] == 'some-s3-bucket/some-s3-folder/some-name_date_batch_hash.csg.gz'
assert copy_to_archive_args[1] == 'archive/test_tap_id/logical1_table2/some-name_date_batch_hash.csg.gz'
assert copy_to_archive_args[0] == 'some-s3-folder/some-name_date_batch_hash.csg.gz'
assert copy_to_archive_args[1] == 'test_tap_id/logical1_table2/some-name_date_batch_hash.csg.gz'
assert copy_to_archive_args[2] == {
'tap': 'test_tap_id',
'schema': 'logical1',