From 3552f0716548d268b54220b14d21fe394b30ba2d Mon Sep 17 00:00:00 2001 From: Lauri Lehtinen Date: Thu, 27 May 2021 11:14:56 +0300 Subject: [PATCH 1/3] Add archive_load_files feature --- target_snowflake/__init__.py | 105 ++++++++++++++++-- target_snowflake/db_sync.py | 10 ++ target_snowflake/exceptions.py | 4 + target_snowflake/stream_utils.py | 14 +++ .../upload_clients/base_upload_client.py | 6 + .../upload_clients/s3_upload_client.py | 11 ++ .../upload_clients/snowflake_upload_client.py | 4 + tests/integration/test_target_snowflake.py | 73 ++++++++++++ .../unit/resources/messages-simple-table.json | 8 ++ tests/unit/test_db_sync.py | 5 + tests/unit/test_stream_utils.py | 30 +++++ tests/unit/test_target_snowflake.py | 59 +++++++++- 12 files changed, 317 insertions(+), 12 deletions(-) create mode 100644 tests/unit/resources/messages-simple-table.json diff --git a/target_snowflake/__init__.py b/target_snowflake/__init__.py index c59681a0..e05488fc 100644 --- a/target_snowflake/__init__.py +++ b/target_snowflake/__init__.py @@ -74,7 +74,7 @@ def get_snowflake_statics(config): if not ('disable_table_cache' in config and config['disable_table_cache']): LOGGER.info('Getting catalog objects from table cache...') - db = DbSync(config) # pylint: disable=invalid-name + db = DbSync(config) # pylint: disable=invalid-name table_cache = db.get_table_columns( table_schemas=stream_utils.get_schema_names_from_config(config)) @@ -83,8 +83,9 @@ def get_snowflake_statics(config): return table_cache, file_format_type + # pylint: disable=too-many-locals,too-many-branches,too-many-statements,invalid-name -def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatTypes=None) -> None: +def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatTypes = None) -> None: """Main loop to read and consume singer messages from stdin Params: @@ -113,6 +114,8 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT batch_size_rows 
= config.get('batch_size_rows', DEFAULT_BATCH_SIZE_ROWS) batch_wait_limit_seconds = config.get('batch_wait_limit_seconds', None) flush_timestamp = datetime.utcnow() + archive_load_files_enabled = config.get('archive_load_files', {}).get('enabled', None) + archive_load_files_data = {} # Loop over lines from stdin for line in lines: @@ -170,6 +173,21 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT else: records_to_load[stream][primary_key_string] = o['record'] + if archive_load_files_enabled and stream in archive_load_files_data: + # Keep track of min and max of the designated column + stream_archive_load_files_values = archive_load_files_data[stream] + if 'column' in stream_archive_load_files_values: + archive_primary_column_name = stream_archive_load_files_values['column'] + archive_primary_column_value = o['record'][archive_primary_column_name] + min_value = stream_archive_load_files_values['min'] + max_value = stream_archive_load_files_values['max'] + + if min_value is None or min_value > archive_primary_column_value: + stream_archive_load_files_values['min'] = archive_primary_column_value + + if max_value is None or max_value < archive_primary_column_value: + stream_archive_load_files_values['max'] = archive_primary_column_value + flush = False if row_count[stream] >= batch_size_rows: flush = True @@ -196,6 +214,7 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT config, state, flushed_state, + archive_load_files_data, filter_streams=filter_streams) flush_timestamp = datetime.utcnow() @@ -226,7 +245,8 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT stream_to_sync, config, state, - flushed_state) + flushed_state, + archive_load_files_data) # emit latest encountered state emit_state(flushed_state) @@ -257,6 +277,27 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT else: stream_to_sync[stream] = DbSync(config, o, table_cache, 
file_format_type) + if archive_load_files_enabled: + archive_load_files_data[stream] = { + 'tap': config.get('tap_id'), + } + + # In case of incremental replication, track min/max of the replication key. + # Incremental replication is assumed if o['bookmark_properties'][0] is one of the columns. + archive_load_files_primary_column = stream_utils.get_archive_load_files_primary_column(o) + if archive_load_files_primary_column: + LOGGER.info("Using %s as archive_load_files_primary_column", archive_load_files_primary_column) + archive_load_files_data[stream].update( + column=archive_load_files_primary_column, + min=None, + max=None + ) + else: + LOGGER.warning( + "archive_load_files is enabled, but no archive_load_files_primary_column was found. " + "Min/max values will not be added to metadata for stream %s.", stream + ) + stream_to_sync[stream].create_schema_if_not_exists() stream_to_sync[stream].sync_table() @@ -281,7 +322,8 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT # then flush all buckets. if sum(row_count.values()) > 0: # flush all streams one last time, delete records if needed, reset counts and then emit current state - flushed_state = flush_streams(records_to_load, row_count, stream_to_sync, config, state, flushed_state) + flushed_state = flush_streams(records_to_load, row_count, stream_to_sync, config, state, flushed_state, + archive_load_files_data) # emit latest state emit_state(copy.deepcopy(flushed_state)) @@ -295,6 +337,7 @@ def flush_streams( config, state, flushed_state, + archive_load_files_data, filter_streams=None): """ Flushes all buckets and resets records count to 0 as well as empties records to load list @@ -305,6 +348,7 @@ def flush_streams( :param state: dictionary containing the original state from tap :param flushed_state: dictionary containing updated states only when streams got flushed :param filter_streams: Keys of streams to flush from the streams dict. 
Default is every stream + :param archive_load_files_data: dictionary of dictionaries containing archive load files data :return: State dict with flushed positions """ parallelism = config.get("parallelism", DEFAULT_PARALLELISM) @@ -337,7 +381,8 @@ def flush_streams( db_sync=stream_to_sync[stream], no_compression=config.get('no_compression'), delete_rows=config.get('hard_delete'), - temp_dir=config.get('temp_dir') + temp_dir=config.get('temp_dir'), + archive_load_files=copy.copy(archive_load_files_data.get(stream, None)) ) for stream in streams_to_flush) # reset flushed stream records to empty to avoid flushing same records @@ -358,16 +403,20 @@ def flush_streams( else: flushed_state = copy.deepcopy(state) + if stream in archive_load_files_data: + archive_load_files_data[stream]['min'] = None + archive_load_files_data[stream]['max'] = None + # Return with state message with flushed positions return flushed_state def load_stream_batch(stream, records, row_count, db_sync, no_compression=False, delete_rows=False, - temp_dir=None): + temp_dir=None, archive_load_files=None): """Load one batch of the stream into target table""" # Load into snowflake if row_count[stream] > 0: - flush_records(stream, records, db_sync, temp_dir, no_compression) + flush_records(stream, records, db_sync, temp_dir, no_compression, archive_load_files) # Delete soft-deleted, flagged rows - where _sdc_deleted at is not null if delete_rows: @@ -381,7 +430,8 @@ def flush_records(stream: str, records: List[Dict], db_sync: DbSync, temp_dir: str = None, - no_compression: bool = False) -> None: + no_compression: bool = False, + archive_load_files: Dict = None) -> None: """ Takes a list of record messages and loads it into the snowflake target table @@ -391,8 +441,9 @@ def flush_records(stream: str, column value row_count: db_sync: A DbSync object - temp_dir: Directory where intermediate temporary files will be created. 
(Default: OS specificy temp directory) + temp_dir: Directory where intermediate temporary files will be created. (Default: OS specific temp directory) no_compression: Disable to use compressed files. (Default: False) + archive_load_files: Data needed for archive load files. (Default: None) Returns: None @@ -403,7 +454,7 @@ def flush_records(stream: str, compression=not no_compression, dest_dir=temp_dir, data_flattening_max_level= - db_sync.data_flattening_max_level) + db_sync.data_flattening_max_level) # Get file stats row_count = len(records) @@ -413,8 +464,40 @@ def flush_records(stream: str, s3_key = db_sync.put_to_stage(filepath, stream, row_count, temp_dir=temp_dir) db_sync.load_file(s3_key, row_count, size_bytes) - # Delete file from local disk and from s3 + # Delete file from local disk os.remove(filepath) + + if archive_load_files: + stream_name_parts = stream_utils.stream_name_to_dict(stream) + if 'schema_name' not in stream_name_parts or 'table_name' not in stream_name_parts: + raise Exception("Failed to extract schema and table names from stream '{}'".format(stream)) + + archive_schema = stream_name_parts['schema_name'] + archive_table = stream_name_parts['table_name'] + archive_tap = archive_load_files['tap'] + + archive_metadata = { + 'tap': archive_tap, + 'schema': archive_schema, + 'table': archive_table, + 'archived-by': 'pipelinewise_target_snowflake' + } + + if 'column' in archive_load_files: + archive_metadata.update({ + 'archive-load-files-primary-column': archive_load_files['column'], + 'archive-load-files-primary-column-min': str(archive_load_files['min']), + 'archive-load-files-primary-column-max': str(archive_load_files['max']) + }) + + # Use same file name as in import + archive_file = s3_key.split('/')[-1] + archive_folder = "archive/{}/{}".format(archive_tap, archive_table) + archive_key = "{}/{}".format(archive_folder, archive_file) + + db_sync.copy_to_archive(s3_key, archive_key, archive_metadata) + + # Delete file from S3 
db_sync.delete_from_stage(stream, s3_key) diff --git a/target_snowflake/db_sync.py b/target_snowflake/db_sync.py index 4689785d..bb0f2285 100644 --- a/target_snowflake/db_sync.py +++ b/target_snowflake/db_sync.py @@ -63,6 +63,11 @@ def validate_config(config): if not config_default_target_schema and not config_schema_mapping: errors.append("Neither 'default_target_schema' (string) nor 'schema_mapping' (object) keys set in config.") + # Check if archive load files option is using external stages + archive_load_files = config.get('archive_load_files', {}) + if archive_load_files.get('enabled') and not config.get('s3_bucket', None): + errors.append('Archive load files option can be used only with external s3 stages. Please define s3_bucket.') + return errors @@ -397,6 +402,11 @@ def delete_from_stage(self, stream, s3_key): self.logger.info('Deleting %s from stage', format(s3_key)) self.upload_client.delete_object(stream, s3_key) + def copy_to_archive(self, s3_source_key, s3_archive_key, s3_archive_metadata): + """Copy file from snowflake stage to archive""" + self.logger.info('Copying %s to archive location %s', s3_source_key, s3_archive_key) + self.upload_client.copy_object(s3_source_key, s3_archive_key, s3_archive_metadata) + def get_stage_name(self, stream): """Generate snowflake stage name""" stage = self.connection_config.get('stage', None) diff --git a/target_snowflake/exceptions.py b/target_snowflake/exceptions.py index d8058b8b..91f080a4 100644 --- a/target_snowflake/exceptions.py +++ b/target_snowflake/exceptions.py @@ -23,3 +23,7 @@ class FileFormatNotFoundException(Exception): class InvalidFileFormatException(Exception): """Exception to raise when name file format is not compatible""" + + +class UnexpectedMessageTypeException(Exception): + """Exception to raise when provided message doesn't match the expected type""" diff --git a/target_snowflake/stream_utils.py b/target_snowflake/stream_utils.py index 552ebb4c..6a68ceb0 100644 --- 
a/target_snowflake/stream_utils.py +++ b/target_snowflake/stream_utils.py @@ -8,6 +8,7 @@ from singer import get_logger from target_snowflake.exceptions import UnexpectedValueTypeException +from target_snowflake.exceptions import UnexpectedMessageTypeException LOGGER = get_logger('target_snowflake') @@ -114,3 +115,16 @@ def stream_name_to_dict(stream_name, separator='-'): 'schema_name': schema_name, 'table_name': table_name } + + +def get_archive_load_files_primary_column(singer_msg: Dict): + """Derive archive load files primary column from a Singer message dictionary""" + if singer_msg['type'] != "SCHEMA": + raise UnexpectedMessageTypeException("Expecting type SCHEMA, got {}".format(singer_msg['type'])) + + if 'bookmark_properties' in singer_msg and len(singer_msg['bookmark_properties']) > 0: + col = singer_msg['bookmark_properties'][0] + if col in singer_msg['schema']['properties']: + return col + + return None diff --git a/target_snowflake/upload_clients/base_upload_client.py b/target_snowflake/upload_clients/base_upload_client.py index 11717450..ea7851ce 100644 --- a/target_snowflake/upload_clients/base_upload_client.py +++ b/target_snowflake/upload_clients/base_upload_client.py @@ -24,3 +24,9 @@ def delete_object(self, stream: str, key: str) -> None: """ Delete object """ + + @abstractmethod + def copy_object(self, source_key: str, target_key: str, target_metadata: dict) -> None: + """ + Copy object + """ diff --git a/target_snowflake/upload_clients/s3_upload_client.py b/target_snowflake/upload_clients/s3_upload_client.py index 8802107c..31cc4062 100644 --- a/target_snowflake/upload_clients/s3_upload_client.py +++ b/target_snowflake/upload_clients/s3_upload_client.py @@ -97,3 +97,14 @@ def delete_object(self, stream: str, key: str) -> None: self.logger.info('Deleting %s from external snowflake stage on S3', key) bucket = self.connection_config['s3_bucket'] self.s3_client.delete_object(Bucket=bucket, Key=key) + + def copy_object(self, source_key: str, 
target_key: str, target_metadata: dict) -> None: + """Copy object to another location on S3""" + self.logger.info('Copying %s to %s', source_key, target_key) + bucket = self.connection_config['s3_bucket'] + + copy_source = "{}/{}".format(bucket, source_key) + + # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.copy_object + self.s3_client.copy_object(CopySource=copy_source, Bucket=bucket, Key=target_key, Metadata=target_metadata, + MetadataDirective="REPLACE") diff --git a/target_snowflake/upload_clients/snowflake_upload_client.py b/target_snowflake/upload_clients/snowflake_upload_client.py index 5366ae82..d2b55dcb 100644 --- a/target_snowflake/upload_clients/snowflake_upload_client.py +++ b/target_snowflake/upload_clients/snowflake_upload_client.py @@ -38,3 +38,7 @@ def delete_object(self, stream: str, key: str) -> None: with self.dblink.open_connection() as connection: connection.cursor().execute(f"REMOVE '@{stage}/{key}'") + + def copy_object(self, source_key: str, target_key: str, target_metadata: dict) -> None: + raise NotImplementedError( + "Copying objects is not supported with a Snowflake upload client.") diff --git a/tests/integration/test_target_snowflake.py b/tests/integration/test_target_snowflake.py index e229a243..7003546a 100644 --- a/tests/integration/test_target_snowflake.py +++ b/tests/integration/test_target_snowflake.py @@ -1,9 +1,12 @@ import datetime +import gzip import json +import tempfile import unittest import mock import os import botocore +import boto3 import itertools import target_snowflake @@ -41,6 +44,20 @@ def setUp(self): if self.config['default_target_schema']: snowflake.query("DROP SCHEMA IF EXISTS {}".format(self.config['default_target_schema'])) + # Set up S3 client + aws_access_key_id = self.config.get('aws_access_key_id') + aws_secret_access_key = self.config.get('aws_secret_access_key') + aws_session_token = self.config.get('aws_session_token') + aws_session = 
boto3.session.Session( + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + aws_session_token=aws_session_token + ) + + self.s3_client = aws_session.client('s3', + region_name=self.config.get('s3_region_name'), + endpoint_url=self.config.get('s3_endpoint_url')) + def persist_lines(self, lines): """Loads singer messages into snowflake without table caching option""" target_snowflake.persist_lines(self.config, lines) @@ -1159,3 +1176,59 @@ def test_parquet(self): # Check if data loaded correctly and metadata columns exist self.assert_three_streams_are_into_snowflake() + + def test_archive_load_files(self): + """Test if load file is copied to archive folder""" + self.config['archive_load_files'] = {'enabled': True} + self.config['tap_id'] = 'test_tap_id' + self.config['client_side_encryption_master_key'] = '' + + s3_bucket = self.config['s3_bucket'] + + # Delete any dangling files from archive + files_in_s3_archive = self.s3_client.list_objects( + Bucket=s3_bucket, Prefix="archive/test_tap_id/").get('Contents', []) + for file_in_archive in files_in_s3_archive: + key = file_in_archive["Key"] + self.s3_client.delete_object(Bucket=s3_bucket, Key=key) + + tap_lines = test_utils.get_test_tap_lines('messages-simple-table.json') + self.persist_lines_with_cache(tap_lines) + + # Verify expected file metadata in S3 + files_in_s3_archive = self.s3_client.list_objects(Bucket=s3_bucket, Prefix="archive/test_tap_id/").get( + 'Contents') + self.assertIsNotNone(files_in_s3_archive) + self.assertEqual(1, len(files_in_s3_archive)) + + archived_file_key = files_in_s3_archive[0]['Key'] + archive_metadata = self.s3_client.head_object(Bucket=s3_bucket, Key=archived_file_key)['Metadata'] + self.assertEqual({ + 'tap': 'test_tap_id', + 'schema': 'tap_mysql_test', + 'table': 'test_simple_table', + 'archived-by': 'pipelinewise_target_snowflake', + 'archive-load-files-primary-column': 'id', + 'archive-load-files-primary-column-min': '1', + 
'archive-load-files-primary-column-max': '5' + }, archive_metadata) + + # Verify expected file contents + tmpfile = tempfile.NamedTemporaryFile() + with open(tmpfile.name, 'wb') as f: + self.s3_client.download_fileobj(s3_bucket, archived_file_key, f) + + lines = [] + with gzip.open(tmpfile, "rt") as gzipfile: + for line in gzipfile.readlines(): + lines.append(line) + + self.assertEqual(''.join(lines), '''1,"xyz1","not-formatted-time-1" +2,"xyz2","not-formatted-time-2" +3,"xyz3","not-formatted-time-3" +4,"xyz4","not-formatted-time-4" +5,"xyz5","not-formatted-time-5" +''') + + # Clean up + self.s3_client.delete_object(Bucket=s3_bucket, Key=archived_file_key) diff --git a/tests/unit/resources/messages-simple-table.json b/tests/unit/resources/messages-simple-table.json new file mode 100644 index 00000000..5b270ded --- /dev/null +++ b/tests/unit/resources/messages-simple-table.json @@ -0,0 +1,8 @@ +{"type": "SCHEMA", "stream": "tap_mysql_test-test_simple_table", "schema": {"definitions": {"sdc_recursive_boolean_array": {"items": {"$ref": "#/definitions/sdc_recursive_boolean_array"}, "type": ["null", "boolean", "array"]}, "sdc_recursive_integer_array": {"items": {"$ref": "#/definitions/sdc_recursive_integer_array"}, "type": ["null", "integer", "array"]}, "sdc_recursive_number_array": {"items": {"$ref": "#/definitions/sdc_recursive_number_array"}, "type": ["null", "number", "array"]}, "sdc_recursive_object_array": {"items": {"$ref": "#/definitions/sdc_recursive_object_array"}, "type": ["null", "object", "array"]}, "sdc_recursive_string_array": {"items": {"$ref": "#/definitions/sdc_recursive_string_array"}, "type": ["null", "string", "array"]}, "sdc_recursive_timestamp_array": {"format": "date-time", "items": {"$ref": "#/definitions/sdc_recursive_timestamp_array"}, "type": ["null", "string", "array"]}}, "properties": {"id": {"maximum": 9223372036854775807, "minimum": -9223372036854775808, "type": ["integer"]}, "results": {"type": ["null", "string"]}, "time_created": 
{"type": ["null", "string"]}}, "type": "object"}, "key_properties": ["id"], "bookmark_properties": ["id"]} +{"type": "STATE", "value": {"bookmarks": {"tap_mysql_test-test_simple_table": {"replication_key": "id", "replication_key_value": 52799009, "version": 1, "last_replication_method": "INCREMENTAL"}}, "currently_syncing": "tap_mysql_test-test_simple_table"}} +{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_simple_table", "version": 1} +{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 1, "results": "xyz1", "time_created": "not-formatted-time-1"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"} +{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 2, "results": "xyz2", "time_created": "not-formatted-time-2"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"} +{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 3, "results": "xyz3", "time_created": "not-formatted-time-3"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"} +{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 4, "results": "xyz4", "time_created": "not-formatted-time-4"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"} +{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 5, "results": "xyz5", "time_created": "not-formatted-time-5"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"} diff --git a/tests/unit/test_db_sync.py b/tests/unit/test_db_sync.py index 15c112ac..09d2fa3c 100644 --- a/tests/unit/test_db_sync.py +++ b/tests/unit/test_db_sync.py @@ -86,6 +86,11 @@ def test_config_validation(self): config_with_external_stage['stage'] = 'dummy-value' self.assertGreater(len(validator(config_with_external_stage)), 0) + # Configuration with archive_load_files but no s3_bucket + config_with_archive_load_files = minimal_config.copy() + 
config_with_archive_load_files['archive_load_files'] = {'enabled': True} + self.assertGreater(len(validator(config_with_external_stage)), 0) + def test_column_type_mapping(self): """Test JSON type to Snowflake column type mappings""" mapper = db_sync.column_type diff --git a/tests/unit/test_stream_utils.py b/tests/unit/test_stream_utils.py index ba0a3d81..13fdfb33 100644 --- a/tests/unit/test_stream_utils.py +++ b/tests/unit/test_stream_utils.py @@ -4,6 +4,7 @@ import target_snowflake.stream_utils as stream_utils from target_snowflake.exceptions import UnexpectedValueTypeException +from target_snowflake.exceptions import UnexpectedMessageTypeException class TestSchemaUtils(unittest.TestCase): @@ -207,3 +208,32 @@ def test_stream_name_to_dict(self): # Snowflake table format (Custom '.' separator) self.assertEqual(stream_utils.stream_name_to_dict('my_catalog.my_schema.my_table', separator='.'), {"catalog_name": "my_catalog", "schema_name": "my_schema", "table_name": "my_table"}) + + def test_get_archive_load_files_primary_column(self): + """Test selecting archive load files primary column from schema message""" + + # Bookmark properties contains column which is also in schema properties + self.assertEqual(stream_utils.get_archive_load_files_primary_column( + { + "type": "SCHEMA", + "schema": {"properties": {"id": {}, "some_col": {}}}, + "key_properties": ["id"], + "bookmark_properties": ["some_col"] + }), "some_col") + + # Bookmark properties contains column which is not in schema properties + self.assertEqual(stream_utils.get_archive_load_files_primary_column( + { + "type": "SCHEMA", + "schema": {"properties": {"id": {}, "some_col": {}}}, + "key_properties": ["id"], + "bookmark_properties": ["lsn"] + }), None) + + with self.assertRaises(UnexpectedMessageTypeException): + stream_utils.get_archive_load_files_primary_column( + { + "type": "RECORD", + "stream": "some-stream", + "record": {} + }) diff --git a/tests/unit/test_target_snowflake.py 
b/tests/unit/test_target_snowflake.py index 882d28bc..e7ffe8c9 100644 --- a/tests/unit/test_target_snowflake.py +++ b/tests/unit/test_target_snowflake.py @@ -85,4 +85,61 @@ def test_persist_40_records_with_batch_wait_limit(self, dbSync_mock, flush_strea target_snowflake.persist_lines(self.config, lines) # Expecting flush after every records + 1 at the end - assert flush_streams_mock.call_count == 41 \ No newline at end of file + assert flush_streams_mock.call_count == 41 + + @patch('target_snowflake.DbSync') + @patch('target_snowflake.os.remove') + def test_archive_load_files_incremental_replication(self, os_remove_mock, dbSync_mock): + self.config['tap_id'] = 'test_tap_id' + self.config['archive_load_files'] = {'enabled': True} + self.config['s3_bucket'] = 'dummy_bucket' + + with open(f'{os.path.dirname(__file__)}/resources/messages-simple-table.json', 'r') as f: + lines = f.readlines() + + instance = dbSync_mock.return_value + instance.create_schema_if_not_exists.return_value = None + instance.sync_table.return_value = None + instance.put_to_stage.return_value = 'some-s3-bucket/some-s3-folder/some-name_date_batch_hash.csg.gz' + + target_snowflake.persist_lines(self.config, lines) + + copy_to_archive_args = instance.copy_to_archive.call_args[0] + assert copy_to_archive_args[0] == 'some-s3-bucket/some-s3-folder/some-name_date_batch_hash.csg.gz' + assert copy_to_archive_args[1] == 'archive/test_tap_id/test_simple_table/some-name_date_batch_hash.csg.gz' + assert copy_to_archive_args[2] == { + 'tap': 'test_tap_id', + 'schema': 'tap_mysql_test', + 'table': 'test_simple_table', + 'archived-by': 'pipelinewise_target_snowflake', + 'archive-load-files-primary-column': 'id', + 'archive-load-files-primary-column-min': '1', + 'archive-load-files-primary-column-max': '5' + } + + @patch('target_snowflake.DbSync') + @patch('target_snowflake.os.remove') + def test_archive_load_files_log_based_replication(self, os_remove_mock, dbSync_mock): + self.config['tap_id'] = 'test_tap_id' 
+ self.config['archive_load_files'] = {'enabled': True} + self.config['s3_bucket'] = 'dummy_bucket' + + with open(f'{os.path.dirname(__file__)}/resources/logical-streams.json', 'r') as f: + lines = f.readlines() + + instance = dbSync_mock.return_value + instance.create_schema_if_not_exists.return_value = None + instance.sync_table.return_value = None + instance.put_to_stage.return_value = 'some-s3-bucket/some-s3-folder/some-name_date_batch_hash.csg.gz' + + target_snowflake.persist_lines(self.config, lines) + + copy_to_archive_args = instance.copy_to_archive.call_args[0] + assert copy_to_archive_args[0] == 'some-s3-bucket/some-s3-folder/some-name_date_batch_hash.csg.gz' + assert copy_to_archive_args[1] == 'archive/test_tap_id/logical1_table2/some-name_date_batch_hash.csg.gz' + assert copy_to_archive_args[2] == { + 'tap': 'test_tap_id', + 'schema': 'logical1', + 'table': 'logical1_table2', + 'archived-by': 'pipelinewise_target_snowflake' + } From c8da2d3551f805ecd9530ce09955e919861d2baa Mon Sep 17 00:00:00 2001 From: Lauri Lehtinen Date: Fri, 11 Jun 2021 16:51:41 +0300 Subject: [PATCH 2/3] Update required unit test coverage to 60 --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 911df022..0d30bf33 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -25,7 +25,7 @@ jobs: command: | . 
venv/bin/activate export LOGGING_CONF_FILE=$(pwd)/sample_logging.conf - pytest tests/unit -vv --cov target_snowflake --cov-fail-under=55 + pytest tests/unit -vv --cov target_snowflake --cov-fail-under=60 - run: name: 'Integration Tests' From b77b392236366482c3bb1e4c4b6a83b7a7932286 Mon Sep 17 00:00:00 2001 From: Lauri Lehtinen Date: Mon, 14 Jun 2021 16:40:14 +0300 Subject: [PATCH 3/3] Simplify config and naming, add archive_load_files to README --- README.md | 1 + target_snowflake/__init__.py | 34 +++++++++++----------- target_snowflake/db_sync.py | 4 +-- target_snowflake/stream_utils.py | 4 +-- tests/integration/test_target_snowflake.py | 8 ++--- tests/unit/test_db_sync.py | 2 +- tests/unit/test_stream_utils.py | 10 +++---- tests/unit/test_target_snowflake.py | 10 +++---- 8 files changed, 37 insertions(+), 36 deletions(-) diff --git a/README.md b/README.md index 7b665a6e..19e0dfbb 100644 --- a/README.md +++ b/README.md @@ -176,6 +176,7 @@ Full list of options in `config.json`: | temp_dir | String | | (Default: platform-dependent) Directory of temporary files with RECORD messages. | | no_compression | Boolean | | (Default: False) Generate uncompressed files when loading to Snowflake. Normally, by default GZIP compressed files are generated. | | query_tag | String | | (Default: None) Optional string to tag executed queries in Snowflake. Replaces tokens `{{database}}`, `{{schema}}` and `{{table}}` with the appropriate values. The tags are displayed in the output of the Snowflake `QUERY_HISTORY`, `QUERY_HISTORY_BY_*` functions. | +| archive_load_files | Boolean | | (Default: False) When enabled, the files loaded to Snowflake will also be stored in `{{s3_bucket}}` under the key `/archive/{tap_id}/{table_name}/`. All archived files will have `tap`, `schema`, `table` and `archived-by` as S3 metadata keys. 
When incremental replication is used, the archived files will also have the following S3 metadata keys: `incremental-key`, `incremental-key-min` and `incremental-key-max`. ### To run tests: diff --git a/target_snowflake/__init__.py b/target_snowflake/__init__.py index e05488fc..b77814f5 100644 --- a/target_snowflake/__init__.py +++ b/target_snowflake/__init__.py @@ -114,7 +114,7 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT batch_size_rows = config.get('batch_size_rows', DEFAULT_BATCH_SIZE_ROWS) batch_wait_limit_seconds = config.get('batch_wait_limit_seconds', None) flush_timestamp = datetime.utcnow() - archive_load_files_enabled = config.get('archive_load_files', {}).get('enabled', None) + archive_load_files = config.get('archive_load_files', False) archive_load_files_data = {} # Loop over lines from stdin @@ -173,20 +173,20 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT else: records_to_load[stream][primary_key_string] = o['record'] - if archive_load_files_enabled and stream in archive_load_files_data: + if archive_load_files and stream in archive_load_files_data: # Keep track of min and max of the designated column stream_archive_load_files_values = archive_load_files_data[stream] if 'column' in stream_archive_load_files_values: - archive_primary_column_name = stream_archive_load_files_values['column'] - archive_primary_column_value = o['record'][archive_primary_column_name] + incremental_key_column_name = stream_archive_load_files_values['column'] + incremental_key_value = o['record'][incremental_key_column_name] min_value = stream_archive_load_files_values['min'] max_value = stream_archive_load_files_values['max'] - if min_value is None or min_value > archive_primary_column_value: - stream_archive_load_files_values['min'] = archive_primary_column_value + if min_value is None or min_value > incremental_key_value: + stream_archive_load_files_values['min'] = incremental_key_value - if 
max_value is None or max_value < archive_primary_column_value: - stream_archive_load_files_values['max'] = archive_primary_column_value + if max_value is None or max_value < incremental_key_value: + stream_archive_load_files_values['max'] = incremental_key_value flush = False if row_count[stream] >= batch_size_rows: @@ -277,24 +277,24 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT else: stream_to_sync[stream] = DbSync(config, o, table_cache, file_format_type) - if archive_load_files_enabled: + if archive_load_files: archive_load_files_data[stream] = { 'tap': config.get('tap_id'), } # In case of incremental replication, track min/max of the replication key. # Incremental replication is assumed if o['bookmark_properties'][0] is one of the columns. - archive_load_files_primary_column = stream_utils.get_archive_load_files_primary_column(o) - if archive_load_files_primary_column: - LOGGER.info("Using %s as archive_load_files_primary_column", archive_load_files_primary_column) + incremental_key_column_name = stream_utils.get_incremental_key(o) + if incremental_key_column_name: + LOGGER.info("Using %s as incremental_key_column_name", incremental_key_column_name) archive_load_files_data[stream].update( - column=archive_load_files_primary_column, + column=incremental_key_column_name, min=None, max=None ) else: LOGGER.warning( - "archive_load_files is enabled, but no archive_load_files_primary_column was found. " + "archive_load_files is enabled, but no incremental_key_column_name was found. 
" "Min/max values will not be added to metadata for stream %s.", stream ) @@ -485,9 +485,9 @@ def flush_records(stream: str, if 'column' in archive_load_files: archive_metadata.update({ - 'archive-load-files-primary-column': archive_load_files['column'], - 'archive-load-files-primary-column-min': str(archive_load_files['min']), - 'archive-load-files-primary-column-max': str(archive_load_files['max']) + 'incremental-key': archive_load_files['column'], + 'incremental-key-min': str(archive_load_files['min']), + 'incremental-key-max': str(archive_load_files['max']) }) # Use same file name as in import diff --git a/target_snowflake/db_sync.py b/target_snowflake/db_sync.py index bb0f2285..ea2e1c12 100644 --- a/target_snowflake/db_sync.py +++ b/target_snowflake/db_sync.py @@ -64,8 +64,8 @@ def validate_config(config): errors.append("Neither 'default_target_schema' (string) nor 'schema_mapping' (object) keys set in config.") # Check if archive load files option is using external stages - archive_load_files = config.get('archive_load_files', {}) - if archive_load_files.get('enabled') and not config.get('s3_bucket', None): + archive_load_files = config.get('archive_load_files', False) + if archive_load_files and not config.get('s3_bucket', None): errors.append('Archive load files option can be used only with external s3 stages. 
Please define s3_bucket.') return errors diff --git a/target_snowflake/stream_utils.py b/target_snowflake/stream_utils.py index 6a68ceb0..c100cfdd 100644 --- a/target_snowflake/stream_utils.py +++ b/target_snowflake/stream_utils.py @@ -117,8 +117,8 @@ def stream_name_to_dict(stream_name, separator='-'): } -def get_archive_load_files_primary_column(singer_msg: Dict): - """Derive archive load files primary column from a Singer message dictionary""" +def get_incremental_key(singer_msg: Dict): + """Derive incremental key from a Singer message dictionary""" if singer_msg['type'] != "SCHEMA": raise UnexpectedMessageTypeException("Expecting type SCHEMA, got {}".format(singer_msg['type'])) diff --git a/tests/integration/test_target_snowflake.py b/tests/integration/test_target_snowflake.py index 7003546a..959b8bf0 100644 --- a/tests/integration/test_target_snowflake.py +++ b/tests/integration/test_target_snowflake.py @@ -1179,7 +1179,7 @@ def test_parquet(self): def test_archive_load_files(self): """Test if load file is copied to archive folder""" - self.config['archive_load_files'] = {'enabled': True} + self.config['archive_load_files'] = True self.config['tap_id'] = 'test_tap_id' self.config['client_side_encryption_master_key'] = '' @@ -1208,9 +1208,9 @@ def test_archive_load_files(self): 'schema': 'tap_mysql_test', 'table': 'test_simple_table', 'archived-by': 'pipelinewise_target_snowflake', - 'archive-load-files-primary-column': 'id', - 'archive-load-files-primary-column-min': '1', - 'archive-load-files-primary-column-max': '5' + 'incremental-key': 'id', + 'incremental-key-min': '1', + 'incremental-key-max': '5' }, archive_metadata) # Verify expected file contents diff --git a/tests/unit/test_db_sync.py b/tests/unit/test_db_sync.py index 09d2fa3c..7ba7c296 100644 --- a/tests/unit/test_db_sync.py +++ b/tests/unit/test_db_sync.py @@ -88,7 +88,7 @@ def test_config_validation(self): # Configuration with archive_load_files but no s3_bucket config_with_archive_load_files = 
minimal_config.copy() - config_with_archive_load_files['archive_load_files'] = {'enabled': True} + config_with_archive_load_files['archive_load_files'] = True self.assertGreater(len(validator(config_with_external_stage)), 0) def test_column_type_mapping(self): diff --git a/tests/unit/test_stream_utils.py b/tests/unit/test_stream_utils.py index 13fdfb33..1bccd29a 100644 --- a/tests/unit/test_stream_utils.py +++ b/tests/unit/test_stream_utils.py @@ -209,11 +209,11 @@ def test_stream_name_to_dict(self): self.assertEqual(stream_utils.stream_name_to_dict('my_catalog.my_schema.my_table', separator='.'), {"catalog_name": "my_catalog", "schema_name": "my_schema", "table_name": "my_table"}) - def test_get_archive_load_files_primary_column(self): - """Test selecting archive load files primary column from schema message""" + def test_get_incremental_key(self): + """Test selecting incremental key column from schema message""" # Bookmark properties contains column which is also in schema properties - self.assertEqual(stream_utils.get_archive_load_files_primary_column( + self.assertEqual(stream_utils.get_incremental_key( { "type": "SCHEMA", "schema": {"properties": {"id": {}, "some_col": {}}}, @@ -222,7 +222,7 @@ def test_get_archive_load_files_primary_column(self): }), "some_col") # Bookmark properties contains column which is not in schema properties - self.assertEqual(stream_utils.get_archive_load_files_primary_column( + self.assertEqual(stream_utils.get_incremental_key( { "type": "SCHEMA", "schema": {"properties": {"id": {}, "some_col": {}}}, @@ -231,7 +231,7 @@ def test_get_archive_load_files_primary_column(self): }), None) with self.assertRaises(UnexpectedMessageTypeException): - stream_utils.get_archive_load_files_primary_column( + stream_utils.get_incremental_key( { "type": "RECORD", "stream": "some-stream", diff --git a/tests/unit/test_target_snowflake.py b/tests/unit/test_target_snowflake.py index e7ffe8c9..d629540e 100644 --- a/tests/unit/test_target_snowflake.py +++ 
b/tests/unit/test_target_snowflake.py @@ -91,7 +91,7 @@ def test_persist_40_records_with_batch_wait_limit(self, dbSync_mock, flush_strea @patch('target_snowflake.os.remove') def test_archive_load_files_incremental_replication(self, os_remove_mock, dbSync_mock): self.config['tap_id'] = 'test_tap_id' - self.config['archive_load_files'] = {'enabled': True} + self.config['archive_load_files'] = True self.config['s3_bucket'] = 'dummy_bucket' with open(f'{os.path.dirname(__file__)}/resources/messages-simple-table.json', 'r') as f: @@ -112,16 +112,16 @@ def test_archive_load_files_incremental_replication(self, os_remove_mock, dbSync 'schema': 'tap_mysql_test', 'table': 'test_simple_table', 'archived-by': 'pipelinewise_target_snowflake', - 'archive-load-files-primary-column': 'id', - 'archive-load-files-primary-column-min': '1', - 'archive-load-files-primary-column-max': '5' + 'incremental-key': 'id', + 'incremental-key-min': '1', + 'incremental-key-max': '5' } @patch('target_snowflake.DbSync') @patch('target_snowflake.os.remove') def test_archive_load_files_log_based_replication(self, os_remove_mock, dbSync_mock): self.config['tap_id'] = 'test_tap_id' - self.config['archive_load_files'] = {'enabled': True} + self.config['archive_load_files'] = True self.config['s3_bucket'] = 'dummy_bucket' with open(f'{os.path.dirname(__file__)}/resources/logical-streams.json', 'r') as f: