This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

AP-1011 archive_load_files feature #178

Merged
merged 3 commits on Jun 14, 2021
2 changes: 1 addition & 1 deletion .circleci/config.yml
@@ -25,7 +25,7 @@ jobs:
command: |
. venv/bin/activate
export LOGGING_CONF_FILE=$(pwd)/sample_logging.conf
- pytest tests/unit -vv --cov target_snowflake --cov-fail-under=55
+ pytest tests/unit -vv --cov target_snowflake --cov-fail-under=60

- run:
name: 'Integration Tests'
1 change: 1 addition & 0 deletions README.md
@@ -176,6 +176,7 @@ Full list of options in `config.json`:
| temp_dir | String | | (Default: platform-dependent) Directory of temporary files with RECORD messages. |
| no_compression | Boolean | | (Default: False) Generate uncompressed files when loading to Snowflake. Normally, by default GZIP compressed files are generated. |
| query_tag | String | | (Default: None) Optional string to tag executed queries in Snowflake. Replaces tokens `{{database}}`, `{{schema}}` and `{{table}}` with the appropriate values. The tags are displayed in the output of the Snowflake `QUERY_HISTORY`, `QUERY_HISTORY_BY_*` functions. |
| archive_load_files | Boolean | | (Default: False) When enabled, the files loaded to Snowflake will also be stored in `{{s3_bucket}}` under the key `/archive/{schema_name}/{table_name}/`. All archived files will have `tap`, `schema`, `table` and `archived-by` as S3 metadata keys. When incremental replication is used, the archived files will also have the following S3 metadata keys: `incremental-key`, `incremental-key-min` and `incremental-key-max`.

### To run tests:

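For context, a minimal sketch of a configuration that exercises the new option, written as the Python dict that `persist_lines()` and `DbSync` receive. All concrete values below are placeholder assumptions, and the usual connection settings required by the target are omitted:

```python
# Hypothetical config fragment -- only the keys relevant to this feature are shown.
config = {
    # ... standard Snowflake connection settings (account, user, password, dbname, etc.) ...
    "s3_bucket": "my-pipelinewise-bucket",   # required: archiving only works with an external S3 stage
    "stage": "my_schema.my_s3_stage",
    "tap_id": "my_tap",                      # stored on archived files as the 'tap' S3 metadata key
    "archive_load_files": True,              # copy every loaded file into the archive prefix of s3_bucket
}
```
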
105 changes: 94 additions & 11 deletions target_snowflake/__init__.py
@@ -74,7 +74,7 @@ def get_snowflake_statics(config):
if not ('disable_table_cache' in config and config['disable_table_cache']):
LOGGER.info('Getting catalog objects from table cache...')

- db = DbSync(config) # pylint: disable=invalid-name
+ db = DbSync(config)  # pylint: disable=invalid-name
table_cache = db.get_table_columns(
table_schemas=stream_utils.get_schema_names_from_config(config))

@@ -83,8 +83,9 @@ def get_snowflake_statics(config):

return table_cache, file_format_type


# pylint: disable=too-many-locals,too-many-branches,too-many-statements,invalid-name
- def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatTypes=None) -> None:
+ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatTypes = None) -> None:
"""Main loop to read and consume singer messages from stdin

Params:
@@ -113,6 +114,8 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT
batch_size_rows = config.get('batch_size_rows', DEFAULT_BATCH_SIZE_ROWS)
batch_wait_limit_seconds = config.get('batch_wait_limit_seconds', None)
flush_timestamp = datetime.utcnow()
archive_load_files = config.get('archive_load_files', False)
archive_load_files_data = {}

# Loop over lines from stdin
for line in lines:
@@ -170,6 +173,21 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT
else:
records_to_load[stream][primary_key_string] = o['record']

if archive_load_files and stream in archive_load_files_data:
# Keep track of min and max of the designated column
stream_archive_load_files_values = archive_load_files_data[stream]
if 'column' in stream_archive_load_files_values:
incremental_key_column_name = stream_archive_load_files_values['column']
incremental_key_value = o['record'][incremental_key_column_name]
min_value = stream_archive_load_files_values['min']
max_value = stream_archive_load_files_values['max']

if min_value is None or min_value > incremental_key_value:
stream_archive_load_files_values['min'] = incremental_key_value

if max_value is None or max_value < incremental_key_value:
stream_archive_load_files_values['max'] = incremental_key_value

flush = False
if row_count[stream] >= batch_size_rows:
flush = True
@@ -196,6 +214,7 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT
config,
state,
flushed_state,
archive_load_files_data,
filter_streams=filter_streams)

flush_timestamp = datetime.utcnow()
@@ -226,7 +245,8 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT
stream_to_sync,
config,
state,
- flushed_state)
+ flushed_state,
+ archive_load_files_data)

# emit latest encountered state
emit_state(flushed_state)
@@ -257,6 +277,27 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT
else:
stream_to_sync[stream] = DbSync(config, o, table_cache, file_format_type)

if archive_load_files:
archive_load_files_data[stream] = {
'tap': config.get('tap_id'),
}

# In case of incremental replication, track min/max of the replication key.
# Incremental replication is assumed if o['bookmark_properties'][0] is one of the columns.
incremental_key_column_name = stream_utils.get_incremental_key(o)
if incremental_key_column_name:
LOGGER.info("Using %s as incremental_key_column_name", incremental_key_column_name)
archive_load_files_data[stream].update(
column=incremental_key_column_name,
min=None,
max=None
)
else:
LOGGER.warning(
"archive_load_files is enabled, but no incremental_key_column_name was found. "
"Min/max values will not be added to metadata for stream %s.", stream
)

stream_to_sync[stream].create_schema_if_not_exists()
stream_to_sync[stream].sync_table()

@@ -281,7 +322,8 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT
# then flush all buckets.
if sum(row_count.values()) > 0:
# flush all streams one last time, delete records if needed, reset counts and then emit current state
- flushed_state = flush_streams(records_to_load, row_count, stream_to_sync, config, state, flushed_state)
+ flushed_state = flush_streams(records_to_load, row_count, stream_to_sync, config, state, flushed_state,
+                               archive_load_files_data)

# emit latest state
emit_state(copy.deepcopy(flushed_state))
@@ -295,6 +337,7 @@ def flush_streams(
config,
state,
flushed_state,
archive_load_files_data,
filter_streams=None):
"""
Flushes all buckets and resets records count to 0 as well as empties records to load list
@@ -305,6 +348,7 @@ def flush_streams(
:param state: dictionary containing the original state from tap
:param flushed_state: dictionary containing updated states only when streams got flushed
:param filter_streams: Keys of streams to flush from the streams dict. Default is every stream
:param archive_load_files_data: dictionary of dictionaries containing archive load files data
:return: State dict with flushed positions
"""
parallelism = config.get("parallelism", DEFAULT_PARALLELISM)
@@ -337,7 +381,8 @@ def flush_streams(
db_sync=stream_to_sync[stream],
no_compression=config.get('no_compression'),
delete_rows=config.get('hard_delete'),
- temp_dir=config.get('temp_dir')
+ temp_dir=config.get('temp_dir'),
+ archive_load_files=copy.copy(archive_load_files_data.get(stream, None))
) for stream in streams_to_flush)

# reset flushed stream records to empty to avoid flushing same records
@@ -358,16 +403,20 @@ def flush_streams(
else:
flushed_state = copy.deepcopy(state)

if stream in archive_load_files_data:
archive_load_files_data[stream]['min'] = None
archive_load_files_data[stream]['max'] = None

# Return with state message with flushed positions
return flushed_state


def load_stream_batch(stream, records, row_count, db_sync, no_compression=False, delete_rows=False,
- temp_dir=None):
+ temp_dir=None, archive_load_files=None):
"""Load one batch of the stream into target table"""
# Load into snowflake
if row_count[stream] > 0:
- flush_records(stream, records, db_sync, temp_dir, no_compression)
+ flush_records(stream, records, db_sync, temp_dir, no_compression, archive_load_files)

# Delete soft-deleted, flagged rows - where _sdc_deleted at is not null
if delete_rows:
@@ -381,7 +430,8 @@ def flush_records(stream: str,
records: List[Dict],
db_sync: DbSync,
temp_dir: str = None,
- no_compression: bool = False) -> None:
+ no_compression: bool = False,
+ archive_load_files: Dict = None) -> None:
"""
Takes a list of record messages and loads it into the snowflake target table

@@ -391,8 +441,9 @@ def flush_records(stream: str,
column value
row_count:
db_sync: A DbSync object
- temp_dir: Directory where intermediate temporary files will be created. (Default: OS specificy temp directory)
+ temp_dir: Directory where intermediate temporary files will be created. (Default: OS specific temp directory)
no_compression: Disable to use compressed files. (Default: False)
archive_load_files: Data needed for archive load files. (Default: None)

Returns:
None
@@ -403,7 +454,7 @@ def flush_records(stream: str,
compression=not no_compression,
dest_dir=temp_dir,
data_flattening_max_level=
- db_sync.data_flattening_max_level)
+ db_sync.data_flattening_max_level)

# Get file stats
row_count = len(records)
@@ -413,8 +464,40 @@ def flush_records(stream: str,
s3_key = db_sync.put_to_stage(filepath, stream, row_count, temp_dir=temp_dir)
db_sync.load_file(s3_key, row_count, size_bytes)

- # Delete file from local disk and from s3
+ # Delete file from local disk
os.remove(filepath)

if archive_load_files:
stream_name_parts = stream_utils.stream_name_to_dict(stream)
if 'schema_name' not in stream_name_parts or 'table_name' not in stream_name_parts:
raise Exception("Failed to extract schema and table names from stream '{}'".format(stream))

archive_schema = stream_name_parts['schema_name']
archive_table = stream_name_parts['table_name']
archive_tap = archive_load_files['tap']

archive_metadata = {
'tap': archive_tap,
'schema': archive_schema,
'table': archive_table,
'archived-by': 'pipelinewise_target_snowflake'
}

if 'column' in archive_load_files:
archive_metadata.update({
'incremental-key': archive_load_files['column'],
'incremental-key-min': str(archive_load_files['min']),
'incremental-key-max': str(archive_load_files['max'])
})

# Use same file name as in import
archive_file = s3_key.split('/')[-1]
archive_folder = "archive/{}/{}".format(archive_tap, archive_table)
archive_key = "{}/{}".format(archive_folder, archive_file)

db_sync.copy_to_archive(s3_key, archive_key, archive_metadata)

# Delete file from S3
db_sync.delete_from_stage(stream, s3_key)


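As a reading aid for the new block at the end of `flush_records()`, here is a self-contained sketch of the key and metadata derivation it performs; every concrete value (stream name, staged file name, min/max) is a made-up example rather than real target output:

```python
# Hypothetical inputs: stream "public-orders" was staged under s3_key, and
# archive_load_files tracked the 'updated_at' replication key for this batch.
s3_key = "pipelinewise_public_orders_20210614.csv.gz"
archive_load_files = {"tap": "my_tap", "column": "updated_at",
                      "min": "2021-06-01T00:00:00", "max": "2021-06-14T00:00:00"}
schema_name, table_name = "public", "orders"   # from stream_utils.stream_name_to_dict("public-orders")

archive_metadata = {
    "tap": archive_load_files["tap"],
    "schema": schema_name,
    "table": table_name,
    "archived-by": "pipelinewise_target_snowflake",
    "incremental-key": archive_load_files["column"],
    "incremental-key-min": str(archive_load_files["min"]),
    "incremental-key-max": str(archive_load_files["max"]),
}

# The archived object keeps the staged file name and lands under archive/<tap>/<table>/.
archive_key = "archive/{}/{}/{}".format(archive_load_files["tap"], table_name, s3_key.split("/")[-1])
# db_sync.copy_to_archive(s3_key, archive_key, archive_metadata) then copies the staged
# file to this location before delete_from_stage() removes it from the stage.
```
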
10 changes: 10 additions & 0 deletions target_snowflake/db_sync.py
@@ -63,6 +63,11 @@ def validate_config(config):
if not config_default_target_schema and not config_schema_mapping:
errors.append("Neither 'default_target_schema' (string) nor 'schema_mapping' (object) keys set in config.")

# Check if archive load files option is using external stages
archive_load_files = config.get('archive_load_files', False)
if archive_load_files and not config.get('s3_bucket', None):
errors.append('Archive load files option can be used only with external s3 stages. Please define s3_bucket.')

return errors


@@ -397,6 +402,11 @@ def delete_from_stage(self, stream, s3_key):
self.logger.info('Deleting %s from stage', format(s3_key))
self.upload_client.delete_object(stream, s3_key)

def copy_to_archive(self, s3_source_key, s3_archive_key, s3_archive_metadata):
"""Copy file from snowflake stage to archive"""
self.logger.info('Copying %s to archive location %s', s3_source_key, s3_archive_key)
self.upload_client.copy_object(s3_source_key, s3_archive_key, s3_archive_metadata)

def get_stage_name(self, stream):
"""Generate snowflake stage name"""
stage = self.connection_config.get('stage', None)
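
A quick sketch of how the new validation check behaves. The config below is deliberately incomplete, so `validate_config()` may also report the pre-existing required-key errors; only the new message matters here:

```python
# Hypothetical: archive_load_files is enabled but no external S3 stage is configured,
# so the new check appends its error message to the list returned by validate_config().
errors = validate_config({
    "default_target_schema": "my_schema",
    "archive_load_files": True,
})

assert ('Archive load files option can be used only with external s3 stages. '
        'Please define s3_bucket.') in errors
```
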
4 changes: 4 additions & 0 deletions target_snowflake/exceptions.py
@@ -23,3 +23,7 @@ class FileFormatNotFoundException(Exception):

class InvalidFileFormatException(Exception):
"""Exception to raise when name file format is not compatible"""


class UnexpectedMessageTypeException(Exception):
"""Exception to raise when provided message doesn't match the expected type"""
14 changes: 14 additions & 0 deletions target_snowflake/stream_utils.py
@@ -8,6 +8,7 @@
from singer import get_logger

from target_snowflake.exceptions import UnexpectedValueTypeException
from target_snowflake.exceptions import UnexpectedMessageTypeException

LOGGER = get_logger('target_snowflake')

@@ -114,3 +115,16 @@ def stream_name_to_dict(stream_name, separator='-'):
'schema_name': schema_name,
'table_name': table_name
}


def get_incremental_key(singer_msg: Dict):
"""Derive incremental key from a Singer message dictionary"""
if singer_msg['type'] != "SCHEMA":
raise UnexpectedMessageTypeException("Expecting type SCHEMA, got {}".format(singer_msg['type']))

if 'bookmark_properties' in singer_msg and len(singer_msg['bookmark_properties']) > 0:
col = singer_msg['bookmark_properties'][0]
if col in singer_msg['schema']['properties']:
return col

return None
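
A small usage sketch of the new helper, using a made-up SCHEMA message; the replication key is only returned when the first bookmark property is also a column of the schema:

```python
# Hypothetical Singer SCHEMA message: 'updated_at' is both a bookmark property
# and a schema column, so it becomes the incremental key.
schema_message = {
    "type": "SCHEMA",
    "stream": "public-orders",
    "schema": {"properties": {"id": {"type": "integer"},
                              "updated_at": {"type": "string", "format": "date-time"}}},
    "key_properties": ["id"],
    "bookmark_properties": ["updated_at"],
}

assert get_incremental_key(schema_message) == "updated_at"

# Passing a non-SCHEMA message (e.g. a RECORD) raises UnexpectedMessageTypeException,
# and a SCHEMA message without a usable bookmark property returns None.
```
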
6 changes: 6 additions & 0 deletions target_snowflake/upload_clients/base_upload_client.py
@@ -24,3 +24,9 @@ def delete_object(self, stream: str, key: str) -> None:
"""
Delete object
"""

@abstractmethod
def copy_object(self, source_key: str, target_key: str, target_metadata: dict) -> None:
"""
Copy object
"""
11 changes: 11 additions & 0 deletions target_snowflake/upload_clients/s3_upload_client.py
@@ -97,3 +97,14 @@ def delete_object(self, stream: str, key: str) -> None:
self.logger.info('Deleting %s from external snowflake stage on S3', key)
bucket = self.connection_config['s3_bucket']
self.s3_client.delete_object(Bucket=bucket, Key=key)

def copy_object(self, source_key: str, target_key: str, target_metadata: dict) -> None:
"""Copy object to another location on S3"""
self.logger.info('Copying %s to %s', source_key, target_key)
bucket = self.connection_config['s3_bucket']

copy_source = "{}/{}".format(bucket, source_key)

# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.copy_object
self.s3_client.copy_object(CopySource=copy_source, Bucket=bucket, Key=target_key, Metadata=target_metadata,
MetadataDirective="REPLACE")
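
For reference, the boto3 call made by the new method is roughly equivalent to the following sketch (bucket and keys are placeholder values, not taken from the PR); `MetadataDirective="REPLACE"` is what makes S3 attach the supplied metadata instead of carrying over the source object's metadata:

```python
import boto3

# Hypothetical bucket/keys; the real values come from connection_config and flush_records().
s3 = boto3.client("s3")
s3.copy_object(
    Bucket="my-pipelinewise-bucket",
    Key="archive/my_tap/orders/pipelinewise_public_orders_20210614.csv.gz",
    CopySource="my-pipelinewise-bucket/pipelinewise_public_orders_20210614.csv.gz",
    Metadata={"tap": "my_tap", "schema": "public", "table": "orders",
              "archived-by": "pipelinewise_target_snowflake"},
    MetadataDirective="REPLACE",
)
```
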
4 changes: 4 additions & 0 deletions target_snowflake/upload_clients/snowflake_upload_client.py
@@ -38,3 +38,7 @@ def delete_object(self, stream: str, key: str) -> None:

with self.dblink.open_connection() as connection:
connection.cursor().execute(f"REMOVE '@{stage}/{key}'")

def copy_object(self, source_key: str, target_key: str, target_metadata: dict) -> None:
raise NotImplementedError(
"Copying objects is not supported with a Snowflake upload client.")