diff --git a/CHANGELOG.md b/CHANGELOG.md index f00df19..390be1e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## 1.3.1 + +- Merge in upstream changes, below: +- Increase the limit on the width of a field in the CSV files read by the tap [#47](https://github.com/singer-io/singer-encodings/pull/47) + ## 1.3.0 - Reintroduce ability to assume role for external AWS account diff --git a/setup.py b/setup.py index 40d94a6..3958f0b 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from setuptools import setup setup(name='tap-s3-csv', - version='1.3.0', + version='1.3.1', description='Singer.io tap for extracting CSV files from S3', author='Stitch', url='https://singer.io', @@ -11,7 +11,7 @@ py_modules=['tap_s3_csv'], install_requires=[ 'backoff==1.8.0', - 'boto3==1.17.0', + 'boto3==1.9.57', 'singer-encodings==0.1.2', 'singer-python==5.12.1', 'voluptuous==0.10.5' diff --git a/tap_s3_csv/__init__.py b/tap_s3_csv/__init__.py index 9736e11..7030d29 100644 --- a/tap_s3_csv/__init__.py +++ b/tap_s3_csv/__init__.py @@ -11,7 +11,8 @@ LOGGER = singer.get_logger() REQUIRED_CONFIG_KEYS = ["bucket"] -REQUIRED_CONFIG_KEYS_EXTERNAL_SOURCE = ["bucket", "account_id", "external_id", "role_name"] +REQUIRED_CONFIG_KEYS_EXTERNAL_SOURCE = [ + "bucket", "account_id", "external_id", "role_name"] def do_discover(config): @@ -34,7 +35,8 @@ def do_sync(config, catalog, state): for stream in catalog['streams']: stream_name = stream['tap_stream_id'] mdata = metadata.to_map(stream['metadata']) - table_spec = next(s for s in config['tables'] if s['table_name'] == stream_name) + table_spec = next( + s for s in config['tables'] if s['table_name'] == stream_name) if not stream_is_selected(mdata): LOGGER.info("%s: Skipping - not selected", stream_name) continue @@ -50,6 +52,7 @@ def do_sync(config, catalog, state): LOGGER.info('Done syncing.') + def validate_table_config(config): # Parse the incoming tables config as JSON tables_config = config['tables'] @@ -59,17 +62,19 @@ def validate_table_config(config): table_config.pop('search_prefix') if table_config.get('key_properties') == "" or table_config.get('key_properties') is None: table_config['key_properties'] = [] - elif table_config.get('key_properties'): - table_config['key_properties'] = [s.strip() for s in table_config['key_properties']] - + elif table_config.get('key_properties') and isinstance(table_config['key_properties'], str): + table_config['key_properties'] = [s.strip() + for s in table_config['key_properties'].split(',')] if table_config.get('date_overrides') == "" or table_config.get('date_overrides') is None: table_config['date_overrides'] = [] elif table_config.get('date_overrides') and isinstance(table_config['date_overrides'], str): - table_config['date_overrides'] = [s.strip() for s in table_config['date_overrides'].split(',')] + table_config['date_overrides'] = [s.strip() + for s in table_config['date_overrides'].split(',')] # Reassign the config tables to the validated object return CONFIG_CONTRACT(tables_config) + @singer.utils.handle_top_exception(LOGGER) def main(): args = singer.utils.parse_args(REQUIRED_CONFIG_KEYS) diff --git a/tap_s3_csv/conversion.py b/tap_s3_csv/conversion.py index 213612c..52031c7 100644 --- a/tap_s3_csv/conversion.py +++ b/tap_s3_csv/conversion.py @@ -2,7 +2,9 @@ LOGGER = singer.get_logger() -#pylint: disable=too-many-return-statements +# pylint: disable=too-many-return-statements + + def infer(key, datum, date_overrides, check_second_call=False): """ Returns the inferred data type @@ -50,7 +52,7 @@ 
def process_sample(sample, counts, lengths, table_spec): for key, value in sample.items(): if key not in counts: counts[key] = {} - + length = len(value) if key not in lengths or length > lengths[key]: lengths[key] = length @@ -95,6 +97,7 @@ def pick_datatype(counts): return to_return + def generate_schema(samples, table_spec, string_max_length: bool): counts, lengths = {}, {} for sample in samples: @@ -125,7 +128,8 @@ def generate_schema(samples, table_spec, string_max_length: bool): if string_max_length: schema[key]['anyOf'][1]['maxLength'] = lengths[key] else: - schema[key] = datatype_schema(datatype, lengths[key], string_max_length) + schema[key] = datatype_schema( + datatype, lengths[key], string_max_length) return schema @@ -139,7 +143,7 @@ def datatype_schema(datatype, length, string_max_length: bool): ] } if string_max_length: - schema['anyOf'][1]['maxLength'] = length + schema['anyOf'][1]['maxLength'] = length elif datatype == 'dict': schema = { 'anyOf': [ @@ -148,7 +152,7 @@ def datatype_schema(datatype, length, string_max_length: bool): ] } if string_max_length: - schema['anyOf'][1]['maxLength'] = length + schema['anyOf'][1]['maxLength'] = length else: types = ['null', datatype] if datatype != 'string': @@ -157,5 +161,5 @@ def datatype_schema(datatype, length, string_max_length: bool): 'type': types, } if string_max_length: - schema['maxLength'] = length + schema['maxLength'] = length return schema diff --git a/tap_s3_csv/s3.py b/tap_s3_csv/s3.py index a08a02a..2239e71 100644 --- a/tap_s3_csv/s3.py +++ b/tap_s3_csv/s3.py @@ -3,6 +3,7 @@ import io import json import gzip +import sys import backoff import boto3 import singer @@ -27,6 +28,7 @@ skipped_files_count = 0 + def retry_pattern(): return backoff.on_exception(backoff.expo, ClientError, @@ -36,7 +38,8 @@ def retry_pattern(): def log_backoff_attempt(details): - LOGGER.info("Error detected communicating with Amazon, triggering backoff: %d try", details.get("tries")) + LOGGER.info( + "Error detected communicating with Amazon, triggering backoff: %d try", details.get("tries")) class AssumeRoleProvider(): @@ -84,25 +87,29 @@ def get_sampled_schema_for_table(config, table_spec): s3_files_gen = get_input_files_for_table(config, table_spec) - samples = [sample for sample in sample_files(config, table_spec, s3_files_gen)] + samples = [sample for sample in sample_files( + config, table_spec, s3_files_gen)] if skipped_files_count: - LOGGER.warning("%s files got skipped during the last sampling.",skipped_files_count) + LOGGER.warning( + "%s files got skipped during the last sampling.", skipped_files_count) if not samples: - #Return empty properties for accept everything from data if no samples found + # Return empty properties for accept everything from data if no samples found return { 'type': 'object', 'properties': {} } - data_schema = conversion.generate_schema(samples, table_spec, config.get('string_max_length', False)) + data_schema = conversion.generate_schema( + samples, table_spec, config.get('string_max_length', False)) return { 'type': 'object', 'properties': data_schema } + def merge_dicts(first, second): to_return = first.copy() @@ -119,11 +126,25 @@ def merge_dicts(first, second): return to_return +def maximize_csv_field_width(): + + current_field_size_limit = csv_iterator.csv.field_size_limit() + field_size_limit = sys.maxsize + + if current_field_size_limit != field_size_limit: + csv_iterator.csv.field_size_limit(field_size_limit) + LOGGER.info("Changed the CSV field size limit from %s to %s", + current_field_size_limit, 
+ field_size_limit) + + def get_records_for_csv(s3_path, sample_rate, iterator): current_row = 0 sampled_row_count = 0 + maximize_csv_field_width() + for row in iterator: # Skipping the empty line of CSV. @@ -191,11 +212,14 @@ def check_key_properties_and_date_overrides_for_jsonl_file(table_spec, jsonl_sam raise Exception('JSONL file "{}" is missing date_overrides key: {}' .format(s3_path, date_overrides - all_keys)) -#pylint: disable=global-statement +# pylint: disable=global-statement + + def sampling_gz_file(table_spec, s3_path, file_handle, sample_rate): global skipped_files_count if s3_path.endswith(".tar.gz"): - LOGGER.warning('Skipping "%s" file as .tar.gz extension is not supported',s3_path) + LOGGER.warning( + 'Skipping "%s" file as .tar.gz extension is not supported', s3_path) skipped_files_count = skipped_files_count + 1 return [] @@ -203,18 +227,21 @@ def sampling_gz_file(table_spec, s3_path, file_handle, sample_rate): gz_file_obj = gzip.GzipFile(fileobj=io.BytesIO(file_bytes)) try: - gz_file_name = utils.get_file_name_from_gzfile(fileobj=io.BytesIO(file_bytes)) + gz_file_name = utils.get_file_name_from_gzfile( + fileobj=io.BytesIO(file_bytes)) except AttributeError as err: # If a file is compressed using gzip command with --no-name attribute, # It will not return the file name and timestamp. Hence we will skip such files. # We also seen this issue occur when tar is used to compress the file - LOGGER.warning('Skipping "%s" file as we did not get the original file name',s3_path) + LOGGER.warning( + 'Skipping "%s" file as we did not get the original file name', s3_path) skipped_files_count = skipped_files_count + 1 return [] if gz_file_name: if gz_file_name.endswith(".gz"): - LOGGER.warning('Skipping "%s" file as it contains nested compression.',s3_path) + LOGGER.warning( + 'Skipping "%s" file as it contains nested compression.', s3_path) skipped_files_count = skipped_files_count + 1 return [] @@ -223,31 +250,35 @@ def sampling_gz_file(table_spec, s3_path, file_handle, sample_rate): raise Exception('"{}" file has some error(s)'.format(s3_path)) -#pylint: disable=global-statement +# pylint: disable=global-statement + + def sample_file(table_spec, s3_path, file_handle, sample_rate, extension): global skipped_files_count # Check whether file is without extension or not if not extension or s3_path.lower() == extension: - LOGGER.warning('"%s" without extension will not be sampled.',s3_path) + LOGGER.warning('"%s" without extension will not be sampled.', s3_path) skipped_files_count = skipped_files_count + 1 return [] if extension in ["csv", "txt"]: # If file object read from s3 bucket file else use extracted file object from zip or gz - file_handle = file_handle._raw_stream if hasattr(file_handle, "_raw_stream") else file_handle #pylint:disable=protected-access + file_handle = file_handle._raw_stream if hasattr( + file_handle, "_raw_stream") else file_handle # pylint:disable=protected-access iterator = csv_iterator.get_row_iterator(file_handle, table_spec) csv_records = [] if iterator: csv_records = get_records_for_csv(s3_path, sample_rate, iterator) else: - LOGGER.warning('Skipping "%s" file as it is empty',s3_path) + LOGGER.warning('Skipping "%s" file as it is empty', s3_path) skipped_files_count = skipped_files_count + 1 return csv_records if extension == "gz": return sampling_gz_file(table_spec, s3_path, file_handle, sample_rate) if extension == "jsonl": # If file object read from s3 bucket file else use extracted file object from zip or gz - file_handle = file_handle._raw_stream 
if hasattr(file_handle, "_raw_stream") else file_handle + file_handle = file_handle._raw_stream if hasattr( + file_handle, "_raw_stream") else file_handle records = get_records_for_jsonl( s3_path, sample_rate, file_handle) check_jsonl_sample_records, records = itertools.tee( @@ -261,14 +292,18 @@ def sample_file(table_spec, s3_path, file_handle, sample_rate, extension): return records if extension == "zip": - LOGGER.warning('Skipping "%s" file as it contains nested compression.',s3_path) + LOGGER.warning( + 'Skipping "%s" file as it contains nested compression.', s3_path) skipped_files_count = skipped_files_count + 1 return [] - LOGGER.warning('"%s" having the ".%s" extension will not be sampled.',s3_path,extension) + LOGGER.warning( + '"%s" having the ".%s" extension will not be sampled.', s3_path, extension) skipped_files_count = skipped_files_count + 1 return [] -#pylint: disable=global-statement +# pylint: disable=global-statement + + def get_files_to_sample(config, s3_files, max_files): """ Returns the list of files for sampling, it checks the s3_files whether any zip or gz file exists or not @@ -287,7 +322,7 @@ def get_files_to_sample(config, s3_files, max_files): global skipped_files_count sampled_files = [] - OTHER_FILES = ["csv","gz","jsonl","txt"] + OTHER_FILES = ["csv", "gz", "jsonl", "txt"] for s3_file in s3_files: file_key = s3_file.get('key') @@ -302,22 +337,28 @@ def get_files_to_sample(config, s3_files, max_files): # Check whether file is without extension or not if not extension or file_name.lower() == extension: - LOGGER.warning('"%s" without extension will not be sampled.',file_key) + LOGGER.warning( + '"%s" without extension will not be sampled.', file_key) skipped_files_count = skipped_files_count + 1 elif file_key.endswith(".tar.gz"): - LOGGER.warning('Skipping "%s" file as .tar.gz extension is not supported', file_key) + LOGGER.warning( + 'Skipping "%s" file as .tar.gz extension is not supported', file_key) skipped_files_count = skipped_files_count + 1 elif extension == "zip": - files = compression.infer(io.BytesIO(file_handle.read()), file_name) + files = compression.infer( + io.BytesIO(file_handle.read()), file_name) # Add only those extracted files which are supported by tap # Prepare dictionary contains the zip file name, type i.e. 
unzipped and file object of extracted file - sampled_files.extend([{ "type" : "unzipped", "s3_path" : file_key, "file_handle" : de_file } for de_file in files if de_file.name.split(".")[-1].lower() in OTHER_FILES and not de_file.name.endswith(".tar.gz") ]) + sampled_files.extend([{"type": "unzipped", "s3_path": file_key, "file_handle": de_file} for de_file in files if de_file.name.split( + ".")[-1].lower() in OTHER_FILES and not de_file.name.endswith(".tar.gz")]) elif extension in OTHER_FILES: # Prepare dictionary contains the s3 file path, extension of file and file object - sampled_files.append({ "s3_path" : file_key , "file_handle" : file_handle, "extension" : extension }) + sampled_files.append( + {"s3_path": file_key, "file_handle": file_handle, "extension": extension}) else: - LOGGER.warning('"%s" having the ".%s" extension will not be sampled.',file_key,extension) + LOGGER.warning( + '"%s" having the ".%s" extension will not be sampled.', file_key, extension) skipped_files_count = skipped_files_count + 1 return sampled_files @@ -331,8 +372,7 @@ def sample_files(config, table_spec, s3_files, for s3_file in itertools.islice(get_files_to_sample(config, s3_files, max_files), max_files): - - s3_path = s3_file.get("s3_path","") + s3_path = s3_file.get("s3_path", "") file_handle = s3_file.get("file_handle") file_type = s3_file.get("type") extension = s3_file.get("extension") @@ -349,14 +389,17 @@ def sample_files(config, table_spec, s3_files, sample_rate) try: yield from itertools.islice(sample_file(table_spec, s3_path, file_handle, sample_rate, extension), max_records) - except (UnicodeDecodeError,json.decoder.JSONDecodeError): + except (UnicodeDecodeError, json.decoder.JSONDecodeError): # UnicodeDecodeError will be raised if non csv file parsed to csv parser # JSONDecodeError will be reaised if non JSONL file parsed to JSON parser # Handled both error and skipping file with wrong extension. - LOGGER.warn("Skipping %s file as parsing failed. Verify an extension of the file.",s3_path) + LOGGER.warn( + "Skipping %s file as parsing failed. 
Verify an extension of the file.", s3_path) skipped_files_count = skipped_files_count + 1 -#pylint: disable=global-statement +# pylint: disable=global-statement + + def get_input_files_for_table(config, table_spec, modified_since=None): global skipped_files_count bucket = config['bucket'] @@ -429,7 +472,8 @@ def list_files_in_bucket(bucket, search_prefix=None, recursive_search=True): if not recursive_search: if search_prefix is not None and search_prefix != "" and not search_prefix.endswith('/'): search_prefix += '/' - args['Delimiter'] = '/' # This will limit results to the exact folder specified by the prefix, without going into subfolders + # This will limit results to the exact folder specified by the prefix, without going into subfolders + args['Delimiter'] = '/' if search_prefix is not None: args['Prefix'] = search_prefix @@ -445,7 +489,8 @@ def list_files_in_bucket(bucket, search_prefix=None, recursive_search=True): if s3_object_count > 0: LOGGER.info("Found %s files.", s3_object_count) else: - LOGGER.warning('Found no files for bucket "%s" that match prefix "%s"', bucket, search_prefix) + LOGGER.warning( + 'Found no files for bucket "%s" that match prefix "%s"', bucket, search_prefix) @retry_pattern() diff --git a/tap_s3_csv/sync.py b/tap_s3_csv/sync.py index cdfc4d5..3b795c1 100644 --- a/tap_s3_csv/sync.py +++ b/tap_s3_csv/sync.py @@ -19,10 +19,12 @@ LOGGER = singer.get_logger() + def sync_stream(config, state, table_spec, stream): table_name = table_spec['table_name'] bookmark = singer.get_bookmark(state, table_name, 'modified_since') - modified_since = singer_utils.strptime_with_tz(bookmark or '1990-01-01T00:00:00Z') + modified_since = singer_utils.strptime_with_tz( + bookmark or '1990-01-01T00:00:00Z') LOGGER.info('Syncing table "%s".', table_name) LOGGER.info('Getting files modified since %s.', modified_since) @@ -42,13 +44,16 @@ def sync_stream(config, state, table_spec, stream): records_streamed += sync_table_file( config, s3_file['key'], table_spec, stream) - state = singer.write_bookmark(state, table_name, 'modified_since', s3_file['last_modified'].isoformat()) + state = singer.write_bookmark( + state, table_name, 'modified_since', s3_file['last_modified'].isoformat()) singer.write_state(state) if s3.skipped_files_count: - LOGGER.warn("%s files got skipped during the last sync.",s3.skipped_files_count) + LOGGER.warn("%s files got skipped during the last sync.", + s3.skipped_files_count) - LOGGER.info('Wrote %s records for table "%s".', records_streamed, table_name) + LOGGER.info('Wrote %s records for table "%s".', + records_streamed, table_name) return records_streamed @@ -59,7 +64,7 @@ def sync_table_file(config, s3_path, table_spec, stream): # Check whether file is without extension or not if not extension or s3_path.lower() == extension: - LOGGER.warning('"%s" without extension will not be synced.',s3_path) + LOGGER.warning('"%s" without extension will not be synced.', s3_path) s3.skipped_files_count = s3.skipped_files_count + 1 return 0 try: @@ -67,25 +72,27 @@ def sync_table_file(config, s3_path, table_spec, stream): return sync_compressed_file(config, s3_path, table_spec, stream) if extension in ["csv", "gz", "jsonl", "txt"]: return handle_file(config, s3_path, table_spec, stream, extension) - LOGGER.warning('"%s" having the ".%s" extension will not be synced.',s3_path,extension) - except (UnicodeDecodeError,json.decoder.JSONDecodeError): + LOGGER.warning( + '"%s" having the ".%s" extension will not be synced.', s3_path, extension) + except (UnicodeDecodeError, 
json.decoder.JSONDecodeError): # UnicodeDecodeError will be raised if non csv file passed to csv parser # JSONDecodeError will be raised if non JSONL file passed to JSON parser # Handled both error and skipping file with wrong extension. - LOGGER.warning("Skipping %s file as parsing failed. Verify an extension of the file.",s3_path) + LOGGER.warning( + "Skipping %s file as parsing failed. Verify an extension of the file.", s3_path) s3.skipped_files_count = s3.skipped_files_count + 1 return 0 # pylint: disable=too-many-arguments -def handle_file(config, s3_path, table_spec, stream, extension, file_handler = None): +def handle_file(config, s3_path, table_spec, stream, extension, file_handler=None): """ Used to sync normal supported files """ # Check whether file is without extension or not if not extension or s3_path.lower() == extension: - LOGGER.warning('"%s" without extension will not be synced.',s3_path) + LOGGER.warning('"%s" without extension will not be synced.', s3_path) s3.skipped_files_count = s3.skipped_files_count + 1 return 0 if extension == "gz": @@ -94,14 +101,17 @@ def handle_file(config, s3_path, table_spec, stream, extension, file_handler = N if extension in ["csv", "txt"]: # If file is extracted from zip or gz use file object else get file object from s3 bucket - file_handle = file_handler if file_handler else s3.get_file_handle(config, s3_path)._raw_stream #pylint:disable=protected-access + file_handle = file_handler if file_handler else s3.get_file_handle( + config, s3_path)._raw_stream # pylint:disable=protected-access return sync_csv_file(config, file_handle, s3_path, table_spec, stream) if extension == "jsonl": # If file is extracted from zip or gz use file object else get file object from s3 bucket - file_handle = file_handler if file_handler else s3.get_file_handle(config, s3_path)._raw_stream - records = sync_jsonl_file(config, file_handle, s3_path, table_spec, stream) + file_handle = file_handler if file_handler else s3.get_file_handle( + config, s3_path)._raw_stream + records = sync_jsonl_file( + config, file_handle, s3_path, table_spec, stream) if records == 0: # Only space isn't the valid JSON but it is a valid CSV header hence skipping the jsonl file with only space. 
s3.skipped_files_count = s3.skipped_files_count + 1 @@ -109,42 +119,49 @@ def handle_file(config, s3_path, table_spec, stream, extension, file_handler = N return records if extension == "zip": - LOGGER.warning('Skipping "%s" file as it contains nested compression.',s3_path) + LOGGER.warning( + 'Skipping "%s" file as it contains nested compression.', s3_path) s3.skipped_files_count = s3.skipped_files_count + 1 return 0 - LOGGER.warning('"%s" having the ".%s" extension will not be synced.',s3_path,extension) + LOGGER.warning( + '"%s" having the ".%s" extension will not be synced.', s3_path, extension) s3.skipped_files_count = s3.skipped_files_count + 1 return 0 def sync_gz_file(config, s3_path, table_spec, stream, file_handler): if s3_path.endswith(".tar.gz"): - LOGGER.warning('Skipping "%s" file as .tar.gz extension is not supported',s3_path) + LOGGER.warning( + 'Skipping "%s" file as .tar.gz extension is not supported', s3_path) s3.skipped_files_count = s3.skipped_files_count + 1 return 0 # If file is extracted from zip use file object else get file object from s3 bucket - file_object = file_handler if file_handler else s3.get_file_handle(config, s3_path) + file_object = file_handler if file_handler else s3.get_file_handle( + config, s3_path) file_bytes = file_object.read() gz_file_obj = gzip.GzipFile(fileobj=io.BytesIO(file_bytes)) # pylint: disable=duplicate-code try: - gz_file_name = utils.get_file_name_from_gzfile(fileobj=io.BytesIO(file_bytes)) + gz_file_name = utils.get_file_name_from_gzfile( + fileobj=io.BytesIO(file_bytes)) except AttributeError as err: # If a file is compressed using gzip command with --no-name attribute, # It will not return the file name and timestamp. Hence we will skip such files. # We also seen this issue occur when tar is used to compress the file - LOGGER.warning('Skipping "%s" file as we did not get the original file name',s3_path) + LOGGER.warning( + 'Skipping "%s" file as we did not get the original file name', s3_path) s3.skipped_files_count = s3.skipped_files_count + 1 return 0 if gz_file_name: if gz_file_name.endswith(".gz"): - LOGGER.warning('Skipping "%s" file as it contains nested compression.',s3_path) + LOGGER.warning( + 'Skipping "%s" file as it contains nested compression.', s3_path) s3.skipped_files_count = s3.skipped_files_count + 1 return 0 @@ -160,7 +177,8 @@ def sync_compressed_file(config, s3_path, table_spec, stream): records_streamed = 0 s3_file_handle = s3.get_file_handle(config, s3_path) - decompressed_files = compression.infer(io.BytesIO(s3_file_handle.read()), s3_path) + decompressed_files = compression.infer( + io.BytesIO(s3_file_handle.read()), s3_path) for decompressed_file in decompressed_files: extension = decompressed_file.name.split(".")[-1].lower() @@ -169,7 +187,8 @@ def sync_compressed_file(config, s3_path, table_spec, stream): # Append the extracted file name with zip file. 
s3_file_path = s3_path + "/" + decompressed_file.name - records_streamed += handle_file(config, s3_file_path, table_spec, stream, extension, file_handler=decompressed_file) + records_streamed += handle_file(config, s3_file_path, table_spec, + stream, extension, file_handler=decompressed_file) return records_streamed @@ -196,17 +215,18 @@ def sync_csv_file(config, file_handle, s3_path, table_spec, stream): if iterator: for row in iterator: - #Skipping the empty line of CSV + # Skipping the empty line of CSV if len(row) == 0: continue with Transformer() as transformer: - to_write = transformer.transform(row, stream['schema'], metadata.to_map(stream['metadata'])) + to_write = transformer.transform( + row, stream['schema'], metadata.to_map(stream['metadata'])) singer.write_record(table_name, to_write) records_synced += 1 else: - LOGGER.warning('Skipping "%s" file as it is empty',s3_path) + LOGGER.warning('Skipping "%s" file as it is empty', s3_path) s3.skipped_files_count = s3.skipped_files_count + 1 return records_synced @@ -231,7 +251,8 @@ def sync_jsonl_file(config, iterator, s3_path, table_spec, stream): continue with Transformer() as transformer: - to_write = transformer.transform(row, stream['schema'], metadata.to_map(stream['metadata'])) + to_write = transformer.transform( + row, stream['schema'], metadata.to_map(stream['metadata'])) singer.write_record(table_name, to_write) records_synced += 1
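
A minimal sketch of the headline 1.3.1 change (PR #47): before sampling or syncing CSV rows, the tap now raises the `csv` module's per-field size limit via the new `maximize_csv_field_width()` helper, so fields wider than the default 131072-character limit no longer abort parsing. The snippet below reproduces the idea with only the standard library; the function name `raise_csv_field_size_limit` is illustrative and not part of the tap, which reaches the same module through `csv_iterator.csv`.

```python
import csv
import sys


def raise_csv_field_size_limit():
    """Lift the csv module's default ~128 KB per-field limit."""
    current_limit = csv.field_size_limit()  # called with no argument, just reads the current value
    if current_limit != sys.maxsize:
        # Mirrors maximize_csv_field_width(); assumes a 64-bit build where
        # sys.maxsize fits the C long that csv.field_size_limit() expects.
        csv.field_size_limit(sys.maxsize)
        print("Raised CSV field size limit from %s to %s" % (current_limit, sys.maxsize))


if __name__ == "__main__":
    raise_csv_field_size_limit()
```

The limit is process-wide state in the `csv` module, so calling it once before the row iterator starts (as `get_records_for_csv()` does above) covers the rest of the run.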