This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

WIP archive_load_files #174

Closed
wants to merge 11 commits into from

Conversation

llehtinen
Contributor

Problem

https://transferwise.atlassian.net/browse/AP-1011

Proposed changes

https://docs.google.com/document/d/11UTlmWVJS9aGickmyxpXOUhrv4RTPnm8zip_IqeR2J0/edit?ts=609e4106

Types of changes

What types of changes does your code introduce to PipelineWise?
Put an x in the boxes that apply

  • Bugfix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation Update (if none of the other choices apply)

Checklist

  • Description above provides context of the change
  • I have added tests that prove my fix is effective or that my feature works
  • Unit tests for changes (not needed for documentation changes)
  • CI checks pass with my changes
  • Bumping version in setup.py is an individual PR and not mixed with feature or bugfix PRs
  • Commit message/PR title starts with [AP-NNNN] (if applicable. AP-NNNN = JIRA ID)
  • Branch name starts with AP-NNN (if applicable. AP-NNN = JIRA ID)
  • Commits follow "How to write a good git commit message"
  • Relevant documentation is updated including usage instructions

@llehtinen added the Draft label (PR or issue still in draft mode) on May 27, 2021
@@ -24,3 +24,9 @@ def delete_object(self, stream: str, key: str) -> None:
        """
        Delete object
        """

    @abstractmethod
    def copy_object(self, source_key: str, target_key: str, target_metadata: dict) -> None:
Contributor

The SnowflakeUploadClient doesn't implement this.

copy_object is applicable only to the s3_upload_client, but I think an implementation is still required in the SnowflakeUploadClient as well. Maybe we can raise NotImplementedError with a human-readable error message.
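
A minimal sketch of that, assuming the method would live in SnowflakeUploadClient and mirror the abstract signature above (the error message text is illustrative):

    def copy_object(self, source_key: str, target_key: str, target_metadata: dict) -> None:
        """Archiving load files is not possible with Snowflake-managed table stages."""
        raise NotImplementedError(
            'copy_object is only supported with external S3 stages; '
            'the Snowflake upload client uses table stages and cannot archive load files.'
        )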

if config.get('archive_load_files.enabled'):
    # keep track of min and max
    archive_load_files_primary_column =\
        config.get('archive_load_files.primary_column') or primary_key_string
Contributor
@koszti May 27, 2021

this should be done only once, when processing the SCHEMA message, and not in every RECORD message.

The primary key column(s) are available in the SCHEMA message as o['key_properties'] and the incremental key(s) as o['bookmark_properties'].
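
A rough sketch of what that could look like in the SCHEMA branch of persist_lines (the names t, o and archive_load_files_data follow this PR; the exact surrounding structure is assumed):

        elif t == 'SCHEMA':
            stream = o['stream']
            # Resolve the archive column once per stream here, instead of on every RECORD:
            # prefer the incremental (bookmark) key, fall back to the first primary key.
            bookmark_keys = o.get('bookmark_properties') or []
            primary_keys = o.get('key_properties') or []
            archive_load_files_data[stream] = {
                'column': (bookmark_keys or primary_keys or [None])[0],
            }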

's3_key': 'tbd', # TODO: Turn archive_load_files.naming_convention into actual key
'database': 'tbd', # TODO: Where to get this
'schema': 'tbd', # TODO: Where to get this
'table': stream,
Contributor
@koszti May 27, 2021

these values, especially the s3_key, should be defined in another place, ideally only when loading the batch to S3.

The stream is a concatenated string of db-schema and db-table, for example public-table_one, and that's already sent to flush_streams as part of the records_to_load list.

The database is not applicable in target-snowflake in general. One ppw stream can read from only one database, and the DB-schema-table hierarchy is not applicable to every data source type, hence target-snowflake does not use the concept of a database (pg vs mariadb, for example).

Contributor Author

@koszti One issue with s3_key is that it requires access to the config. From here, config is currently not passed further than flush_streams. Would you prefer to propagate the entire config further from there, add the archive_load_files config as an init param to the S3UploadClient, or something else?
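
For reference, the second option might look roughly like this; the constructor signature shown here is an assumption, not the actual S3UploadClient code:

    class S3UploadClient:
        def __init__(self, connection_config: dict, archive_load_files_config: dict = None):
            self.connection_config = connection_config
            # Archive settings (e.g. naming convention) injected at construction time,
            # so persist_lines/flush_streams don't have to forward the whole config.
            self.archive_load_files_config = archive_load_files_config or {}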

@@ -170,6 +171,31 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT
            else:
                records_to_load[stream][primary_key_string] = o['record']

            if config.get('archive_load_files.enabled'):
Contributor
@koszti May 27, 2021

we also need to check that we're using the S3UploadClient and not the SnowflakeUploadClient. Archiving load files is possible only if we use the S3UploadClient.

The SnowflakeUploadClient uses Snowflake-managed table stages and we can't archive anything onto those.

There is a check for that around here.
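
A possible sketch of that guard, assuming (as elsewhere in this thread) that an external stage is indicated by s3_bucket being present in the config:

    # Only archive load files when an external S3 stage (S3UploadClient) is in use;
    # with Snowflake-managed table stages there is nowhere to archive to.
    archive_load_files_enabled = (
        config.get('archive_load_files', {}).get('enabled', False)
        and bool(config.get('s3_bucket'))
    )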

# Determine archive_load_files_primary_column
# 1) Use incremental replication key if defined
# 2) Otherwise use primary key
if False and 'bookmark_properties' in o and len(o['bookmark_properties']) > 0:
Contributor Author
@llehtinen Jun 3, 2021

TBD: What's the proper way to identify the incremental replication key? The unit test data had lsn here.

Contributor
@koszti Jun 4, 2021

Check messages-simple-table.json, maybe that's a better example. The incremental key is defined in detail only in the STATE message, but taps don't necessarily send it after the SCHEMA and before the RECORD messages; the order is not defined by the singer spec.

Taps copy the replication key into bookmark_properties. If it's logical replication then tap-postgres uses the lsn keyword. The best we can do is to get o['bookmark_properties'], and if the result is one of the columns in the schema then use it as archive_load_files_primary_column. If it doesn't exist then we use the PK: o['key_properties'][0].

This could all get complicated, so it's better to introduce a new function in stream_utils.py. We can implement and unit test the new function separately, and here it would look like this:

archive_load_files_primary_column = stream_utils.get_archive_load_files_primary_columns(o)

The new function will use three properties of the input SCHEMA message (here it's called o); a rough sketch follows the list below:

  • o['key_properties']: list of primary keys in the stream. What should we return if it's a composite key? And how should we track it?
  • o['bookmark_properties']: list of incremental keys. Only one value is supported, so taking the first value is OK.
  • o['schema']['properties']: list of columns in the schema
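
A possible sketch of that helper; the function name comes from the suggestion above, and returning a single column name (leaving composite keys open) is an assumption:

    def get_archive_load_files_primary_columns(schema_message: dict):
        """Pick the column to min/max-track when archiving load files.

        Prefer the incremental (bookmark) key if it is an actual column in the schema,
        otherwise fall back to the primary key. Composite PKs are left as an open question.
        """
        schema_columns = schema_message.get('schema', {}).get('properties', {})
        bookmark_properties = schema_message.get('bookmark_properties') or []
        key_properties = schema_message.get('key_properties') or []

        # Only one incremental key is supported, so the first value is enough
        if bookmark_properties and bookmark_properties[0] in schema_columns:
            return bookmark_properties[0]

        # TODO: composite keys - returning only the first PK column for now
        return key_properties[0] if key_properties else None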

@@ -113,6 +113,9 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT
    batch_size_rows = config.get('batch_size_rows', DEFAULT_BATCH_SIZE_ROWS)
    batch_wait_limit_seconds = config.get('batch_wait_limit_seconds', None)
    flush_timestamp = datetime.utcnow()
    archive_load_files_enabled =\
Contributor Author

Think of a better way to check the config: if archive_load_files.enabled is set but the S3 bucket isn't defined, raise an exception.

Contributor
@koszti Jun 4, 2021

Config validation is at https://github.com/transferwise/pipelinewise-target-snowflake/blob/master/target_snowflake/db_sync.py#L65, where we can do something like this:

    # Check if archive load files option is using external stages
    archive_load_files = config.get('archive_load_files', {})
    if archive_load_files.get('enabled') and not config.get('s3_bucket', None):
        errors.append('Archive load files option can be used only with external s3 stages. Please define s3_bucket')

Once the validation is done we can get the values here by:

archive_load_files_enabled = config.get('archive_load_files', {}).get('enabled', None)

Please note that separating config variables with dots (db_conn.s3_bucket and archive_load_files.enabled) does not work. We need to access them as nested dictionaries.

Also, the db_conn dictionary keys are accessible directly in the config dict, so we don't need the prefix.
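
To illustrate with made-up values, the generated config is a flat dict where the db_conn keys sit at the top level and archive_load_files is a nested dict:

    config = {
        's3_bucket': 'my-bucket',                # from db_conn, no 'db_conn.' prefix
        'archive_load_files': {'enabled': True},
    }
    config.get('archive_load_files.enabled')                     # None - dotted keys don't exist
    config.get('archive_load_files', {}).get('enabled', False)   # True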

archive_load_files_primary_column = o['key_properties'][0] # Behavior when key multi-col?

archive_load_files_data[stream] = {
'tap': config.get('id'),
Contributor

Suggested change
- 'tap': config.get('id'),
+ 'tap': config.get('tap_id'),

target-snowflake can't access the original YAML file. The config is a generated JSON of the items under db_conn and some other properties listed here.

# Keep track of min and max of the designated column
values = archive_load_files_data[stream]
archive_primary_column_name = values['column']
archive_primary_column_value = o['record'][archive_primary_column_name]
Contributor

What should we return if archive_primary_column_name is a composite primary key? How should we track it?

self.config['id'] = 'test-tap-id'
self.config['archive_load_files.enabled'] = True
self.config['db_conn.s3_bucket'] = 'dummy_bucket'
Contributor

Suggested change
- self.config['db_conn.s3_bucket'] = 'dummy_bucket'
+ self.config['s3_bucket'] = 'dummy_bucket'

Separating by dots is not valid, and here we don't use the original YAML file. The config is a generated JSON of the items under db_conn and some other properties listed here.

self.config['id'] = 'test-tap-id'
self.config['archive_load_files.enabled'] = True
self.config['db_conn.s3_bucket'] = 'dummy_bucket'
Contributor

same as above

self.config['id'] = 'test-tap-id'
self.config['archive_load_files.enabled'] = True
self.config['db_conn.s3_bucket'] = 'dummy_bucket'
Contributor

same as above

@@ -32,7 +32,7 @@ jobs:
command: |
. venv/bin/activate
export LOGGING_CONF_FILE=$(pwd)/sample_logging.conf
-pytest tests/integration/ -vv --cov target_snowflake --cov-fail-under=86
+pytest tests/integration/ -k test_archive_load_files -vv
Contributor Author

To be reverted; this was used for faster turnaround when testing.

@koszti
Contributor

koszti commented Jun 23, 2021

this PR is obsolete and covered by #178

@koszti closed this on Jun 23, 2021