diff --git a/tests/test_database_types.py b/tests/test_database_types.py index 9729b00d..fdf8784d 100644 --- a/tests/test_database_types.py +++ b/tests/test_database_types.py @@ -526,16 +526,17 @@ def get_test_db_pairs(): for source_db, source_type_categories, target_db, target_type_categories in get_test_db_pairs(): for type_category, source_types in source_type_categories.items(): # int, datetime, .. for source_type in source_types: - for target_type in target_type_categories[type_category]: - type_pairs.append( - ( - source_db, - target_db, - source_type, - target_type, - type_category, + if type_category in target_type_categories: # only cross-compatible types + for target_type in target_type_categories[type_category]: + type_pairs.append( + ( + source_db, + target_db, + source_type, + target_type, + type_category, + ) ) - ) def sanitize(name): @@ -603,8 +604,15 @@ def _insert_to_table(conn, table_path, values, coltype): elif isinstance(conn, db.BigQuery) and coltype == "datetime": values = [(i, Code(f"cast(timestamp '{sample}' as datetime)")) for i, sample in values] - if isinstance(conn, db.Redshift) and coltype == "json": - values = [(i, Code(f"JSON_PARSE('{sample}')")) for i, sample in values] + elif isinstance(conn, db.Redshift) and coltype in ("json", "jsonb"): + values = [(i, Code(f"JSON_PARSE({sample})")) for i, sample in values] + elif isinstance(conn, db.PostgreSQL) and coltype in ("json", "jsonb"): + values = [(i, Code( + "'{}'".format( + (json.dumps(sample) if isinstance(sample, (dict, list)) else sample) + .replace('\'', '\'\'') + ) + )) for i, sample in values] insert_rows_in_batches(conn, tbl, values, columns=["id", "col"]) conn.query(commit) @@ -627,9 +635,10 @@ def _create_table_with_indexes(conn, table_path, type_): else: conn.query(tbl.create()) - if conn.dialect.SUPPORTS_INDEXES: - (index_id,) = table_path + (index_id,) = table_path + if conn.dialect.SUPPORTS_INDEXES and type_ not in ('json', 'jsonb', 'array', 'struct'): conn.query(f"CREATE INDEX xa_{index_id} ON {table_name} ({quote('id')}, {quote('col')})") + if conn.dialect.SUPPORTS_INDEXES: conn.query(f"CREATE INDEX xb_{index_id} ON {table_name} ({quote('id')})") conn.query(commit) @@ -724,9 +733,11 @@ def test_types(self, source_db, target_db, source_type, target_type, type_catego checksum_duration = time.monotonic() - start expected = [] self.assertEqual(expected, diff) - self.assertEqual( - 0, differ.stats.get("rows_downloaded", 0) - ) # This may fail if the hash is different, but downloaded values are equal + + # For fuzzily diffed types, some rows can be downloaded for local comparison. This happens + # when hashes are diferent but the essential payload is not; e.g. due to json serialization. + if not {source_type, target_type} & {'json', 'jsonb', 'array', 'struct'}: + self.assertEqual(0, differ.stats.get("rows_downloaded", 0)) # This section downloads all rows to ensure that Python agrees with the # database, in terms of comparison.