From df8c432c541349f005a2c9c0705aabf53e73a082 Mon Sep 17 00:00:00 2001 From: Thomas Briggs Date: Thu, 10 Feb 2022 16:24:25 -0500 Subject: [PATCH 1/2] Distinguish between nulls and empty strings --- target_postgres/db_sync.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/target_postgres/db_sync.py b/target_postgres/db_sync.py index 3b8443b..62d87dd 100644 --- a/target_postgres/db_sync.py +++ b/target_postgres/db_sync.py @@ -349,7 +349,7 @@ def record_to_csv_line(self, record): return ','.join( [ json.dumps(flatten[name], ensure_ascii=False) - if name in flatten and (flatten[name] == 0 or flatten[name]) else '' + if name in flatten and (flatten[name] == 0 or flatten[name] or (column_type(self.flatten_schema[name]) == 'character varying' and flatten[name] == '')) else '###NULL###' for name in self.flatten_schema ] ) @@ -367,7 +367,7 @@ def load_csv(self, file, count, size_bytes): temp_table = self.table_name(stream_schema_message['stream'], is_temporary=True) cur.execute(self.create_table_query(table_name=temp_table, is_temporary=True)) - copy_sql = "COPY {} ({}) FROM STDIN WITH (FORMAT CSV, ESCAPE '\\')".format( + copy_sql = "COPY {} ({}) FROM STDIN WITH (FORMAT CSV, ESCAPE '\\', NULL '###NULL###')".format( temp_table, ', '.join(self.column_names()) ) From 1cc2531bae24633156bf55e5a64cb9ed35e1ee3c Mon Sep 17 00:00:00 2001 From: Thomas Briggs Date: Mon, 14 Feb 2022 15:39:41 -0500 Subject: [PATCH 2/2] Don't load the logging config every time we check a column type --- target_postgres/db_sync.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/target_postgres/db_sync.py b/target_postgres/db_sync.py index 62d87dd..90a0b6d 100644 --- a/target_postgres/db_sync.py +++ b/target_postgres/db_sync.py @@ -37,7 +37,7 @@ def validate_config(config): # pylint: disable=fixme -def column_type(schema_property): +def column_type(schema_property, logger=None): property_type = schema_property['type'] property_format = schema_property['format'] if 'format' in schema_property else None col_type = 'character varying' @@ -69,7 +69,8 @@ def column_type(schema_property): elif 'boolean' in property_type: col_type = 'boolean' - get_logger('target_postgres').debug("schema_property: %s -> col_type: %s", schema_property, col_type) + if logger: + logger.debug("schema_property: %s -> col_type: %s", schema_property, col_type) return col_type @@ -78,8 +79,8 @@ def safe_column_name(name): return '"{}"'.format(name).lower() -def column_clause(name, schema_property): - return '{} {}'.format(safe_column_name(name), column_type(schema_property)) +def column_clause(name, schema_property, logger): + return '{} {}'.format(safe_column_name(name), column_type(schema_property, logger)) def flatten_key(k, parent_key, sep): @@ -436,7 +437,8 @@ def create_table_query(self, table_name=None, is_temporary=False): columns = [ column_clause( name, - schema + schema, + self.logger ) for (name, schema) in self.flatten_schema.items() ] @@ -537,7 +539,8 @@ def update_columns(self): columns_to_add = [ column_clause( name, - properties_schema + properties_schema, + self.logger ) for (name, properties_schema) in self.flatten_schema.items() if name.lower() not in columns_dict @@ -549,11 +552,12 @@ def update_columns(self): columns_to_replace = [ (safe_column_name(name), column_clause( name, - properties_schema + properties_schema, + self.logger )) for (name, properties_schema) in self.flatten_schema.items() if name.lower() in columns_dict and - columns_dict[name.lower()]['data_type'].lower() != column_type(properties_schema).lower() + columns_dict[name.lower()]['data_type'].lower() != column_type(properties_schema, self.logger).lower() ] for (column_name, column) in columns_to_replace: