diff --git a/.gitignore b/.gitignore
index 36d6c9f3..783134e1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,6 +3,7 @@
 .idea/*
 
 # Python
+.mypy_cache/
 __pycache__/
 *.py[cod]
 *$py.class
diff --git a/README.md b/README.md
index f3186966..31edf95d 100644
--- a/README.md
+++ b/README.md
@@ -140,6 +140,8 @@ Full list of options in `config.json`:
 | s3_endpoint_url | String | No | The complete URL to use for the constructed client. This is allowing to use non-native s3 account. |
 | s3_region_name | String | No | Default region when creating new connections |
 | s3_acl | String | No | S3 ACL name to set on the uploaded files |
+| s3_file_naming_scheme | String | No | (Default: `pipelinewise_{stream}_{timecode}.{ext}`) Template that controls how each uploaded file is named in S3. Supported placeholders: `{stream}`, `{timecode}` and `{ext}`. |
+| retain_s3_files | Boolean | No | (Default: False) Set to `true` to keep the staged files in S3 after they have been loaded into Snowflake. By default every file is deleted from S3 once loading completes. |
 | stage | String | Yes | Named external stage name created at pre-requirements section. Has to be a fully qualified name including the schema name |
 | file_format | String | Yes | Named file format name created at pre-requirements section. Has to be a fully qualified name including the schema name. |
 | batch_size_rows | Integer | | (Default: 100000) Maximum number of rows in each batch. At the end of each batch, the rows in the batch are loaded into Snowflake. |
diff --git a/requirements.txt b/requirements.txt
index caa51451..d54010a4 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,4 +3,4 @@ singer-python==5.1.1
 snowflake-connector-python==2.0.3
 boto3==1.9.33
 inflection==0.3.1
-joblib==0.13.2
+joblib==0.16.0
diff --git a/setup.py b/setup.py
index 9cf73e55..fd13d093 100644
--- a/setup.py
+++ b/setup.py
@@ -25,7 +25,7 @@
           'botocore==1.13.8',
           'urllib3==1.24.3',
           'inflection==0.3.1',
-          'joblib==0.13.2',
+          'joblib==0.16.0',
           'python-dateutil==2.8.1'
       ],
       extras_require={
diff --git a/target_snowflake/__init__.py b/target_snowflake/__init__.py
index 430af88e..ebaba378 100644
--- a/target_snowflake/__init__.py
+++ b/target_snowflake/__init__.py
@@ -364,6 +364,7 @@ def flush_streams(
             row_count=row_count,
             db_sync=stream_to_sync[stream],
             no_compression=config.get('no_compression'),
+            retain_s3_files=config.get('retain_s3_files'),
             delete_rows=config.get('hard_delete'),
             temp_dir=config.get('temp_dir')
         ) for stream in streams_to_flush)
@@ -390,11 +391,11 @@ def flush_streams(
     return flushed_state
 
 
-def load_stream_batch(stream, records_to_load, row_count, db_sync, no_compression=False, delete_rows=False,
-                      temp_dir=None):
+def load_stream_batch(stream, records_to_load, row_count, db_sync, no_compression=False,
+                      retain_s3_files=False, delete_rows=False, temp_dir=None):
     # Load into snowflake
     if row_count[stream] > 0:
-        flush_records(stream, records_to_load, row_count[stream], db_sync, temp_dir, no_compression)
+        flush_records(stream, records_to_load, row_count[stream], db_sync, temp_dir, no_compression, retain_s3_files)
 
     # Delete soft-deleted, flagged rows - where _sdc_deleted_at is not null
     if delete_rows:
@@ -410,17 +411,19 @@ def write_record_to_file(outfile, records_to_load, record_to_csv_line_transforme
         outfile.write(bytes(csv_line + '\n', 'UTF-8'))
 
 
-def flush_records(stream, records_to_load, row_count, db_sync, temp_dir=None, no_compression=False):
+def flush_records(stream, records_to_load, row_count, db_sync, temp_dir=None,
+                  no_compression=False, retain_s3_files=False):
     if temp_dir:
         os.makedirs(temp_dir, exist_ok=True)
 
-    csv_fd, csv_file = mkstemp(suffix='.csv', prefix='records_', dir=temp_dir)
     record_to_csv_line_transformer = db_sync.record_to_csv_line
 
-    # Using gzip or plain file object
     if no_compression:
+        csv_fd, csv_file = mkstemp(suffix='.csv', prefix='records_', dir=temp_dir)
         with open(csv_fd, 'wb') as outfile:
             write_record_to_file(outfile, records_to_load, record_to_csv_line_transformer)
     else:
+        csv_fd, csv_file = mkstemp(suffix='.csv.gz', prefix='records_', dir=temp_dir)
+        os.close(csv_fd)  # gzip.open() reopens by path; open(csv_fd) above closes the fd itself
         with gzip.open(csv_file, 'wb') as outfile:
             write_record_to_file(outfile, records_to_load, record_to_csv_line_transformer)
@@ -428,8 +431,9 @@ def flush_records(stream, records_to_load, row_count, db_sync, temp_dir=None, no
     s3_key = db_sync.put_to_stage(csv_file, stream, row_count, temp_dir=temp_dir)
     db_sync.load_csv(s3_key, row_count, size_bytes)
 
     os.remove(csv_file)
-    db_sync.delete_from_stage(s3_key)
+    if not retain_s3_files:
+        db_sync.delete_from_stage(s3_key)
 
 
 def main():
diff --git a/target_snowflake/db_sync.py b/target_snowflake/db_sync.py
index b844b610..9fa300d7 100644
--- a/target_snowflake/db_sync.py
+++ b/target_snowflake/db_sync.py
@@ -381,6 +381,9 @@ def record_primary_key_string(self, record):
                 raise exc
         return ','.join(key_props)
 
+    def get_csv_header_line(self):
+        return ','.join(self.flatten_schema)
+
     def record_to_csv_line(self, record):
         flatten = flatten_record(record, self.flatten_schema, max_level=self.data_flattening_max_level)
 
@@ -398,7 +401,17 @@ def put_to_stage(self, file, stream, count, temp_dir=None):
         bucket = self.connection_config['s3_bucket']
         s3_acl = self.connection_config.get('s3_acl')
         s3_key_prefix = self.connection_config.get('s3_key_prefix', '')
-        s3_key = "{}pipelinewise_{}_{}.csv".format(s3_key_prefix, stream, datetime.datetime.now().strftime("%Y%m%d-%H%M%S-%f"))
+        s3_file_naming_scheme = self.connection_config.get(
+            's3_file_naming_scheme', "pipelinewise_{stream}_{timecode}.{ext}"
+        )
+        s3_file_name = s3_file_naming_scheme
+        for placeholder, value in {
+            "{stream}": stream,
+            "{timecode}": datetime.datetime.now().strftime("%Y%m%d-%H%M%S-%f"),
+            "{ext}": ".".join(file.replace("\\", "/").split("/")[-1].split(".")[1:])
+        }.items():
+            s3_file_name = s3_file_name.replace(placeholder, value)
+        s3_key = "{}{}".format(s3_key_prefix, s3_file_name)
 
         self.logger.info("Target S3 bucket: {}, local file: {}, S3 key: {}".format(bucket, file, s3_key))
 
@@ -467,7 +480,8 @@ def load_csv(self, s3_key, count, size_bytes):
         USING (
           SELECT {}
              FROM '@{}/{}'
-             (FILE_FORMAT => '{}')) s
+             (FILE_FORMAT => '{}')
+        ) s
         ON {}
         WHEN MATCHED THEN
             UPDATE SET {}
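
For context, here is a minimal standalone sketch of the placeholder substitution that `put_to_stage` performs with the new `s3_file_naming_scheme` option. The config values, the `resolve_s3_key` helper, and the stream/file names below are hypothetical illustrations, not part of the diff; only the substitution logic mirrors the code above:

```python
import datetime

# Hypothetical config values; the keys mirror the options documented in README.md
connection_config = {
    's3_key_prefix': 'snowflake-imports/',
    's3_file_naming_scheme': '{stream}/{timecode}.{ext}',
    'retain_s3_files': True,  # staged files would survive in S3 after load_csv()
}

def resolve_s3_key(config, stream, local_file):
    """Illustrative re-implementation of the substitution loop in DbSync.put_to_stage."""
    scheme = config.get('s3_file_naming_scheme', 'pipelinewise_{stream}_{timecode}.{ext}')
    # Everything after the first dot of the basename, e.g. 'csv' or 'csv.gz'
    ext = '.'.join(local_file.replace('\\', '/').split('/')[-1].split('.')[1:])
    replacements = {
        '{stream}': stream,
        '{timecode}': datetime.datetime.now().strftime('%Y%m%d-%H%M%S-%f'),
        '{ext}': ext,
    }
    for placeholder, value in replacements.items():
        scheme = scheme.replace(placeholder, value)
    return '{}{}'.format(config.get('s3_key_prefix', ''), scheme)

# e.g. 'snowflake-imports/public-orders/20200801-120000-000000.csv.gz'
print(resolve_s3_key(connection_config, 'public-orders', '/tmp/records_ab12cd.csv.gz'))
```

With the default scheme, keys keep the old `pipelinewise_{stream}_{timecode}` shape, so deployments that do not set `s3_file_naming_scheme` retain their current S3 layout, except that `{ext}` now reflects the temp file suffix (`csv.gz` when compression is enabled, `csv` otherwise).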