This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

adds retain_s3_files and s3_file_naming_scheme #77

Closed
1 change: 1 addition & 0 deletions .gitignore
@@ -3,6 +3,7 @@
.idea/*

# Python
.mypy_cache/
__pycache__/
*.py[cod]
*$py.class
2 changes: 2 additions & 0 deletions README.md
@@ -140,6 +140,8 @@ Full list of options in `config.json`:
| s3_endpoint_url | String | No | The complete URL to use for the constructed client. This is allowing to use non-native s3 account. |
| s3_region_name | String | No | Default region when creating new connections |
| s3_acl | String | No | S3 ACL name to set on the uploaded files |
| s3_file_naming_scheme | String | No | (Default: `pipelinewise_{stream}_{timecode}.{ext}`) A parameterized string that controls how each file is named in S3. Allowed variables: `{stream}`, `{timecode}`, and `{ext}` (see the sketch below the table). |
| retain_s3_files | Boolean | No | (Default: False) Set to `true` to keep the uploaded files in S3 after they have been loaded into Snowflake. By default the files are deleted from S3 once loading completes. |
| stage | String | Yes | Name of the named external stage created in the pre-requirements section. Must be a fully qualified name including the schema name. |
| file_format | String | Yes | Name of the named file format created in the pre-requirements section. Must be a fully qualified name including the schema name. |
| batch_size_rows | Integer | | (Default: 100000) Maximum number of rows in each batch. At the end of each batch, the rows in the batch are loaded into Snowflake. |
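For illustration, here is a minimal sketch of how the `s3_file_naming_scheme` template expands into an S3 key. The `render_s3_key` helper and the sample stream/prefix values are hypothetical, but the placeholder substitution mirrors what `put_to_stage()` does in this PR:

```python
import datetime

def render_s3_key(scheme, stream, ext, prefix=""):
    # Hypothetical helper: substitute the three supported placeholders,
    # then prepend the configured s3_key_prefix.
    timecode = datetime.datetime.now().strftime("%Y%m%d-%H%M%S-%f")
    for placeholder, value in {"{stream}": stream, "{timecode}": timecode, "{ext}": ext}.items():
        scheme = scheme.replace(placeholder, value)
    return prefix + scheme

# e.g. "snowflake-imports/pipelinewise_orders_20240101-120000-000000.csv.gz"
print(render_s3_key("pipelinewise_{stream}_{timecode}.{ext}", "orders", "csv.gz",
                    prefix="snowflake-imports/"))
```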
2 changes: 1 addition & 1 deletion requirements.txt
@@ -3,4 +3,4 @@ singer-python==5.1.1
snowflake-connector-python==2.0.3
boto3==1.9.33
inflection==0.3.1
joblib==0.13.2
joblib==0.16.0
2 changes: 1 addition & 1 deletion setup.py
@@ -25,7 +25,7 @@
'botocore==1.13.8',
'urllib3==1.24.3',
'inflection==0.3.1',
'joblib==0.13.2',
'joblib==0.16.0',
'python-dateutil==2.8.1'
],
extras_require={
18 changes: 11 additions & 7 deletions target_snowflake/__init__.py
@@ -364,6 +364,7 @@ def flush_streams(
row_count=row_count,
db_sync=stream_to_sync[stream],
no_compression=config.get('no_compression'),
retain_s3_files=config.get('retain_s3_files'),
delete_rows=config.get('hard_delete'),
temp_dir=config.get('temp_dir')
) for stream in streams_to_flush)
@@ -390,11 +391,11 @@ def flush_streams(
return flushed_state


def load_stream_batch(stream, records_to_load, row_count, db_sync, no_compression=False, delete_rows=False,
temp_dir=None):
def load_stream_batch(stream, records_to_load, row_count, db_sync, no_compression=False,
retain_s3_files=False, delete_rows=False, temp_dir=None):
# Load into snowflake
if row_count[stream] > 0:
flush_records(stream, records_to_load, row_count[stream], db_sync, temp_dir, no_compression)
flush_records(stream, records_to_load, row_count[stream], db_sync, temp_dir, no_compression, retain_s3_files)

# Delete soft-deleted, flagged rows - where _sdc_deleted at is not null
if delete_rows:
@@ -410,26 +411,29 @@ def write_record_to_file(outfile, records_to_load, record_to_csv_line_transformer):
outfile.write(bytes(csv_line + '\n', 'UTF-8'))


def flush_records(stream, records_to_load, row_count, db_sync, temp_dir=None, no_compression=False):
def flush_records(stream, records_to_load, row_count, db_sync, temp_dir=None,
no_compression=False, retain_s3_files=False):
if temp_dir:
os.makedirs(temp_dir, exist_ok=True)
csv_fd, csv_file = mkstemp(suffix='.csv', prefix='records_', dir=temp_dir)
record_to_csv_line_transformer = db_sync.record_to_csv_line

# Using gzip or plain file object
if no_compression:
csv_fd, csv_file = mkstemp(suffix='.csv', prefix='records_', dir=temp_dir)
with open(csv_fd, 'wb') as outfile:
write_record_to_file(outfile, records_to_load, record_to_csv_line_transformer)
else:
csv_fd, csv_file = mkstemp(suffix='.csv.gz', prefix='records_', dir=temp_dir)
with gzip.open(csv_file, 'wb') as outfile:
write_record_to_file(outfile, records_to_load, record_to_csv_line_transformer)

size_bytes = os.path.getsize(csv_file)
s3_key = db_sync.put_to_stage(csv_file, stream, row_count, temp_dir=temp_dir)
db_sync.load_csv(s3_key, row_count, size_bytes)

os.close(csv_fd)
os.remove(csv_file)
db_sync.delete_from_stage(s3_key)
if not retain_s3_files:
db_sync.delete_from_stage(s3_key)


def main():
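As a usage note, the new flag is read straight from `config.json` and passed through `flush_streams()` to `flush_records()`. A minimal sketch of a config excerpt (the bucket and prefix values are invented for illustration):

```python
import json

# Hypothetical excerpt of config.json enabling the new options.
config = json.loads("""
{
  "s3_bucket": "my-bucket",
  "s3_key_prefix": "snowflake-imports/",
  "s3_file_naming_scheme": "pipelinewise_{stream}_{timecode}.{ext}",
  "retain_s3_files": true
}
""")

# flush_streams() forwards this value unchanged; when it is truthy the
# staged file is left in S3 after load_csv(), otherwise it is deleted.
retain_s3_files = config.get('retain_s3_files')  # -> True
```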
19 changes: 17 additions & 2 deletions target_snowflake/db_sync.py
@@ -381,6 +381,9 @@ def record_primary_key_string(self, record):
raise exc
return ','.join(key_props)

def get_csv_header_line(self):
return ','.join([name for name in self.flatten_schema])

def record_to_csv_line(self, record):
flatten = flatten_record(record, self.flatten_schema, max_level=self.data_flattening_max_level)

@@ -398,7 +401,18 @@ def put_to_stage(self, file, stream, count, temp_dir=None):
bucket = self.connection_config['s3_bucket']
s3_acl = self.connection_config.get('s3_acl')
s3_key_prefix = self.connection_config.get('s3_key_prefix', '')
s3_key = "{}pipelinewise_{}_{}.csv".format(s3_key_prefix, stream, datetime.datetime.now().strftime("%Y%m%d-%H%M%S-%f"))
s3_file_naming_scheme = self.connection_config.get(
's3_file_naming_scheme', "pipelinewise_{stream}_{timecode}.{ext}"
)
s3_file_name = s3_file_naming_scheme
for k, v in {
"{stream}": stream,
"{timecode}": datetime.datetime.now().strftime("%Y%m%d-%H%M%S-%f"),
"{ext}": ".".join(file.replace("\\", "/").split("/")[-1].split(".")[1:])
}.items():
if k in s3_file_name:
s3_file_name = s3_file_name.replace(k, v)
s3_key = "{}{}".format(s3_key_prefix, s3_file_name)

self.logger.info("Target S3 bucket: {}, local file: {}, S3 key: {}".format(bucket, file, s3_key))
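One detail worth calling out: `{ext}` is taken from the local file name by joining everything after the first dot of the basename, so gzipped batches keep their compound extension. A small sketch of that expression in isolation (the `file_ext` helper is made up for illustration):

```python
def file_ext(path):
    # Same expression as in put_to_stage: basename, then everything after the first dot.
    return ".".join(path.replace("\\", "/").split("/")[-1].split(".")[1:])

assert file_ext("/tmp/records_abc123.csv") == "csv"
assert file_ext("C:\\tmp\\records_abc123.csv.gz") == "csv.gz"
```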

@@ -467,7 +481,8 @@ def load_csv(self, s3_key, count, size_bytes):
USING (
SELECT {}
FROM '@{}/{}'
(FILE_FORMAT => '{}')) s
(FILE_FORMAT => '{}')
) s
ON {}
WHEN MATCHED THEN
UPDATE SET {}
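For context on the reformatted statement, once the placeholders are filled in, the MERGE assembled by `load_csv()` looks broadly like the sketch below. The table, columns, stage, and file format names are invented, and the real column projection is generated from the stream's schema:

```python
# Rough illustration only -- all identifiers are hypothetical.
merge_sql = """
MERGE INTO my_schema.orders t
USING (
    SELECT $1 id, $2 status
    FROM '@my_schema.pipelinewise_stage/snowflake-imports/pipelinewise_orders_20240101-120000-000000.csv.gz'
    (FILE_FORMAT => 'my_schema.pipelinewise_file_format')
) s
ON t.id = s.id
WHEN MATCHED THEN
    UPDATE SET status = s.status
"""
```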