From ec25875ebc2b3921446669dc448dabd6627f99c0 Mon Sep 17 00:00:00 2001 From: Catherine Noll Date: Sun, 30 Jul 2023 20:57:11 -0400 Subject: [PATCH 1/2] File-based CDK: allow null values for all inferred columns --- .../sources/file_based/schema_helpers.py | 4 +- .../stream/default_file_based_stream.py | 25 +++++- .../file_based/file_types/test_avro_parser.py | 12 +-- .../file_based/scenarios/avro_scenarios.py | 84 +++++++++---------- .../file_based/scenarios/csv_scenarios.py | 58 ++++++------- .../scenarios/incremental_scenarios.py | 80 +++++++++--------- .../file_based/scenarios/jsonl_scenarios.py | 76 ++++++++--------- .../file_based/scenarios/parquet_scenarios.py | 66 +++++++-------- .../scenarios/user_input_schema_scenarios.py | 6 +- .../sources/file_based/test_scenarios.py | 2 +- 10 files changed, 218 insertions(+), 195 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/schema_helpers.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/schema_helpers.py index af5043a2a20b..597c919e39e6 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/schema_helpers.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/schema_helpers.py @@ -176,7 +176,9 @@ def conforms_to_schema(record: Mapping[str, Any], schema: Mapping[str, Any]) -> value = record.get(column) if value is not None: - if expected_type == "object": + if isinstance(expected_type, list): + return any(is_equal_or_narrower_type(value, e) for e in expected_type) + elif expected_type == "object": return isinstance(value, dict) elif expected_type == "array": if not isinstance(value, list): diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py index d09b6eb914ec..f923b2739162 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py @@ -186,7 +186,25 @@ def list_files(self) -> List[RemoteFile]: def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]: loop = asyncio.get_event_loop() - return loop.run_until_complete(self._infer_schema(files)) + schema = loop.run_until_complete(self._infer_schema(files)) + return self._fill_nulls(schema) + + @staticmethod + def _fill_nulls(schema: Mapping[str, Any]) -> Mapping[str, Any]: + if isinstance(schema, dict): + for k, v in schema.items(): + if k == "type": + if isinstance(v, list): + if "null" not in v: + schema[k] = ["null"] + v + elif v != "null": + schema[k] = ["null", v] + else: + DefaultFileBasedStream._fill_nulls(v) + elif isinstance(schema, list): + for item in schema: + DefaultFileBasedStream._fill_nulls(item) + return schema async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]: """ @@ -208,7 +226,10 @@ async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]: # number of concurrent tasks drops below the number allowed. done, pending_tasks = await asyncio.wait(pending_tasks, return_when=asyncio.FIRST_COMPLETED) for task in done: - base_schema = merge_schemas(base_schema, task.result()) + try: + base_schema = merge_schemas(base_schema, task.result()) + except Exception as exc: + self.logger.error(f"An error occurred inferring the schema. \n {traceback.format_exc()}", exc_info=exc) return base_schema diff --git a/airbyte-cdk/python/unit_tests/sources/file_based/file_types/test_avro_parser.py b/airbyte-cdk/python/unit_tests/sources/file_based/file_types/test_avro_parser.py index 597b12fe18b3..40684985abec 100644 --- a/airbyte-cdk/python/unit_tests/sources/file_based/file_types/test_avro_parser.py +++ b/airbyte-cdk/python/unit_tests/sources/file_based/file_types/test_avro_parser.py @@ -139,17 +139,17 @@ id="test_decimal_missing_precision"), pytest.param(_default_avro_format, {"type": "bytes", "logicalType": "decimal", "precision": 9}, None, ValueError, id="test_decimal_missing_scale"), - pytest.param(_default_avro_format, {"type": "bytes", "logicalType": "uuid"}, {"type": "string"}, None, id="test_uuid"), - pytest.param(_default_avro_format, {"type": "int", "logicalType": "date"}, {"type": "string", "format": "date"}, None, + pytest.param(_default_avro_format, {"type": "bytes", "logicalType": "uuid"}, {"type": ["null", "string"]}, None, id="test_uuid"), + pytest.param(_default_avro_format, {"type": "int", "logicalType": "date"}, {"type": ["null", "string"], "format": "date"}, None, id="test_date"), - pytest.param(_default_avro_format, {"type": "int", "logicalType": "time-millis"}, {"type": "integer"}, None, id="test_time_millis"), - pytest.param(_default_avro_format, {"type": "long", "logicalType": "time-micros"}, {"type": "integer"}, None, + pytest.param(_default_avro_format, {"type": "int", "logicalType": "time-millis"}, {"type": ["null", "integer"]}, None, id="test_time_millis"), + pytest.param(_default_avro_format, {"type": "long", "logicalType": "time-micros"}, {"type": ["null", "integer"]}, None, id="test_time_micros"), pytest.param( _default_avro_format, - {"type": "long", "logicalType": "timestamp-millis"}, {"type": "string", "format": "date-time"}, None, id="test_timestamp_millis" + {"type": "long", "logicalType": "timestamp-millis"}, {"type": ["null", "string"], "format": "date-time"}, None, id="test_timestamp_millis" ), - pytest.param(_default_avro_format, {"type": "long", "logicalType": "timestamp-micros"}, {"type": "string"}, None, + pytest.param(_default_avro_format, {"type": "long", "logicalType": "timestamp-micros"}, {"type": ["null", "string"]}, None, id="test_timestamp_micros"), pytest.param( _default_avro_format, diff --git a/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/avro_scenarios.py b/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/avro_scenarios.py index 8a1f2db4786c..b13672649f3e 100644 --- a/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/avro_scenarios.py +++ b/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/avro_scenarios.py @@ -243,8 +243,8 @@ "json_schema": { "type": "object", "properties": { - "col1": {"type": "string"}, - "col2": {"type": "integer"}, + "col1": {"type": ["null", "string"]}, + "col2": {"type": ["null", "integer"]}, "_ab_source_file_last_modified": {"type": "string"}, "_ab_source_file_url": {"type": "string"}, }, @@ -327,19 +327,19 @@ "json_schema": { "type": "object", "properties": { - "col_double": {"type": "string"}, - "col_string": {"type": "string"}, + "col_double": {"type": ["null", "string"]}, + "col_string": {"type": ["null", "string"]}, "col_album": { "properties": { - "album": {"type": "string"}, + "album": {"type": ["null", "string"]}, }, - "type": "object", + "type": ["null", "object"], }, "col_song": { "properties": { - "title": {"type": "string"}, + "title": {"type": ["null", "string"]}, }, - "type": "object", + "type": ["null", "object"], }, "_ab_source_file_last_modified": {"type": "string"}, "_ab_source_file_url": {"type": "string"}, @@ -422,28 +422,28 @@ "json_schema": { "type": "object", "properties": { - "col_array": {"items": {"type": "string"}, "type": "array"}, - "col_bool": {"type": "boolean"}, - "col_bytes": {"type": "string"}, - "col_double": {"type": "string"}, - "col_enum": {"enum": ["POP_ROCK", "INDIE_ROCK", "ALTERNATIVE_ROCK"], "type": "string"}, - "col_fixed": {"pattern": "^[0-9A-Fa-f]{8}$", "type": "string"}, - "col_float": {"type": "number"}, - "col_int": {"type": "integer"}, - "col_long": {"type": "integer"}, - "col_map": {"additionalProperties": {"type": "string"}, "type": "object"}, + "col_array": {"items": {"type": ["null", "string"]}, "type": ["null", "array"]}, + "col_bool": {"type": ["null", "boolean"]}, + "col_bytes": {"type": ["null", "string"]}, + "col_double": {"type": ["null", "string"]}, + "col_enum": {"enum": ["POP_ROCK", "INDIE_ROCK", "ALTERNATIVE_ROCK"], "type": ["null", "string"]}, + "col_fixed": {"pattern": "^[0-9A-Fa-f]{8}$", "type": ["null", "string"]}, + "col_float": {"type": ["null", "number"]}, + "col_int": {"type": ["null", "integer"]}, + "col_long": {"type": ["null", "integer"]}, + "col_map": {"additionalProperties": {"type": ["null", "string"]}, "type": ["null", "object"]}, "col_record": { - "properties": {"artist": {"type": "string"}, "song": {"type": "string"}, "year": {"type": "integer"}}, - "type": "object", + "properties": {"artist": {"type": ["null", "string"]}, "song": {"type": ["null", "string"]}, "year": {"type": ["null", "integer"]}}, + "type": ["null", "object"], }, - "col_string": {"type": "string"}, - "col_decimal": {"pattern": "^-?\\d{(1, 5)}(?:\\.\\d(1, 5))?$", "type": "string"}, - "col_uuid": {"type": "string"}, - "col_date": {"format": "date", "type": "string"}, - "col_time_millis": {"type": "integer"}, - "col_time_micros": {"type": "integer"}, - "col_timestamp_millis": {"format": "date-time", "type": "string"}, - "col_timestamp_micros": {"type": "string"}, + "col_string": {"type": ["null", "string"]}, + "col_decimal": {"pattern": "^-?\\d{(1, 5)}(?:\\.\\d(1, 5))?$", "type": ["null", "string"]}, + "col_uuid": {"type": ["null", "string"]}, + "col_date": {"format": "date", "type": ["null", "string"]}, + "col_time_millis": {"type": ["null", "integer"]}, + "col_time_micros": {"type": ["null", "integer"]}, + "col_timestamp_millis": {"format": "date-time", "type": ["null", "string"]}, + "col_timestamp_micros": {"type": ["null", "string"]}, "_ab_source_file_last_modified": {"type": "string"}, "_ab_source_file_url": {"type": "string"}, }, @@ -587,10 +587,10 @@ "json_schema": { "type": "object", "properties": { - "col_title": {"type": "string"}, - "col_album": {"type": "string", "enum": ["SUMMERS_GONE", "IN_RETURN", "A_MOMENT_APART", "THE_LAST_GOODBYE"]}, - "col_year": {"type": "integer"}, - "col_vocals": {"type": "boolean"}, + "col_title": {"type": ["null", "string"]}, + "col_album": {"type": ["null", "string"], "enum": ["SUMMERS_GONE", "IN_RETURN", "A_MOMENT_APART", "THE_LAST_GOODBYE"]}, + "col_year": {"type": ["null", "integer"]}, + "col_vocals": {"type": ["null", "boolean"]}, "_ab_source_file_last_modified": {"type": "string"}, "_ab_source_file_url": {"type": "string"}, }, @@ -604,12 +604,12 @@ "json_schema": { "type": "object", "properties": { - "col_name": {"type": "string"}, + "col_name": {"type": ["null", "string"]}, "col_location": { - "properties": {"country": {"type": "string"}, "state": {"type": "string"}, "city": {"type": "string"}}, - "type": "object", + "properties": {"country": {"type": ["null", "string"]}, "state": {"type": ["null", "string"]}, "city": {"type": ["null", "string"]}}, + "type": ["null", "object"], }, - "col_attendance": {"type": "integer"}, + "col_attendance": {"type": ["null", "integer"]}, "_ab_source_file_last_modified": {"type": "string"}, "_ab_source_file_url": {"type": "string"}, }, @@ -698,19 +698,19 @@ "json_schema": { "type": "object", "properties": { - "col_double": {"type": "number"}, - "col_string": {"type": "string"}, + "col_double": {"type": ["null", "number"]}, + "col_string": {"type": ["null", "string"]}, "col_album": { "properties": { - "album": {"type": "string"}, + "album": {"type": ["null", "string"]}, }, - "type": "object", + "type": ["null", "object"], }, "col_song": { "properties": { - "title": {"type": "string"}, + "title": {"type": ["null", "string"]}, }, - "type": "object", + "type": ["null", "object"], }, "_ab_source_file_last_modified": {"type": "string"}, "_ab_source_file_url": {"type": "string"}, diff --git a/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/csv_scenarios.py b/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/csv_scenarios.py index 1897592830ae..06cca171b865 100644 --- a/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/csv_scenarios.py +++ b/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/csv_scenarios.py @@ -236,8 +236,8 @@ "json_schema": { "type": "object", "properties": { - "col1": {"type": "string"}, - "col2": {"type": "string"}, + "col1": {"type": ["null", "string"]}, + "col2": {"type": ["null", "string"]}, "_ab_source_file_last_modified": {"type": "string"}, "_ab_source_file_url": {"type": "string"}, }, @@ -317,9 +317,9 @@ "json_schema": { "type": "object", "properties": { - "col1": {"type": "string"}, - "col2": {"type": "string"}, - "col3": {"type": "string"}, + "col1": {"type": ["null", "string"]}, + "col2": {"type": ["null", "string"]}, + "col3": {"type": ["null", "string"]}, "_ab_source_file_last_modified": {"type": "string"}, "_ab_source_file_url": {"type": "string"}, }, @@ -419,8 +419,8 @@ "json_schema": { "type": "object", "properties": { - "col1": {"type": "string"}, - "col2": {"type": "string"}, + "col1": {"type": ["null", "string"]}, + "col2": {"type": ["null", "string"]}, "_ab_source_file_last_modified": {"type": "string"}, "_ab_source_file_url": {"type": "string"}, }, @@ -514,8 +514,8 @@ "json_schema": { "type": "object", "properties": { - "col1": {"type": "string"}, - "col2": {"type": "string"}, + "col1": {"type": ["null", "string"]}, + "col2": {"type": ["null", "string"]}, "_ab_source_file_last_modified": {"type": "string"}, "_ab_source_file_url": {"type": "string"}, }, @@ -583,8 +583,8 @@ "json_schema": { "type": "object", "properties": { - "col1": {"type": "string"}, - "col2": {"type": "string"}, + "col1": {"type": ["null", "string"]}, + "col2": {"type": ["null", "string"]}, "_ab_source_file_last_modified": {"type": "string"}, "_ab_source_file_url": {"type": "string"}, }, @@ -670,9 +670,9 @@ "json_schema": { "type": "object", "properties": { - "col1": {"type": "string"}, - "col2": {"type": "string"}, - "col3": {"type": "string"}, + "col1": {"type": ["null", "string"]}, + "col2": {"type": ["null", "string"]}, + "col3": {"type": ["null", "string"]}, "_ab_source_file_last_modified": {"type": "string"}, "_ab_source_file_url": {"type": "string"}, }, @@ -686,7 +686,7 @@ "json_schema": { "type": "object", "properties": { - "col3": {"type": "string"}, + "col3": {"type": ["null", "string"]}, "_ab_source_file_last_modified": {"type": "string"}, "_ab_source_file_url": {"type": "string"}, }, @@ -787,13 +787,13 @@ "type": "object", "properties": { "col1": { - "type": "string", + "type": ["null", "string"], }, "col2": { - "type": "string", + "type": ["null", "string"], }, "col3": { - "type": "string", + "type": ["null", "string"], }, "_ab_source_file_last_modified": {"type": "string"}, "_ab_source_file_url": {"type": "string"}, @@ -895,13 +895,13 @@ "type": "object", "properties": { "col1": { - "type": "string", + "type": ["null", "string"], }, "col2": { - "type": "string", + "type": ["null", "string"], }, "col3": { - "type": "string", + "type": ["null", "string"], }, "_ab_source_file_last_modified": {"type": "string"}, "_ab_source_file_url": {"type": "string"}, @@ -1021,13 +1021,13 @@ "type": "object", "properties": { "col1": { - "type": "string", + "type": ["null", "string"], }, "col2": { - "type": "string", + "type": ["null", "string"], }, "col3": { - "type": "string", + "type": ["null", "string"], }, "_ab_source_file_last_modified": {"type": "string"}, "_ab_source_file_url": {"type": "string"}, @@ -1043,7 +1043,7 @@ "type": "object", "properties": { "col3": { - "type": "string", + "type": ["null", "string"], }, "_ab_source_file_last_modified": {"type": "string"}, "_ab_source_file_url": {"type": "string"}, @@ -1139,8 +1139,8 @@ "json_schema": { "type": "object", "properties": { - "col1": {"type": "string"}, - "col2": {"type": "string"}, + "col1": {"type": ["null", "string"]}, + "col2": {"type": ["null", "string"]}, "_ab_source_file_last_modified": {"type": "string"}, "_ab_source_file_url": {"type": "string"}, }, @@ -1339,7 +1339,7 @@ "json_schema": { "type": "object", "properties": { - "col3": {"type": "string"}, + "col3": {"type": ["null", "string"]}, "_ab_source_file_last_modified": {"type": "string"}, "_ab_source_file_url": {"type": "string"}, }, @@ -1513,7 +1513,7 @@ "json_schema": { "type": "object", "properties": { - "col3": {"type": "string"}, + "col3": {"type": ["null", "string"]}, "_ab_source_file_last_modified": {"type": "string"}, "_ab_source_file_url": {"type": "string"}, }, diff --git a/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/incremental_scenarios.py b/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/incremental_scenarios.py index 87e71c0064e6..c25a5fe48823 100644 --- a/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/incremental_scenarios.py +++ b/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/incremental_scenarios.py @@ -71,9 +71,9 @@ "type": "object", "properties": { "col1": { - "type": "string", + "type": ["null", "string"], }, "col2": { - "type": "string", + "type": ["null", "string"], }, "_ab_source_file_last_modified": { "type": "string" @@ -154,9 +154,9 @@ "type": "object", "properties": { "col1": { - "type": "string", + "type": ["null", "string"], }, "col2": { - "type": "string", + "type": ["null", "string"], }, "_ab_source_file_last_modified": { "type": "string" @@ -239,9 +239,9 @@ "type": "object", "properties": { "col1": { - "type": "string", + "type": ["null", "string"], }, "col2": { - "type": "string", + "type": ["null", "string"], }, "_ab_source_file_last_modified": { "type": "string" @@ -297,9 +297,9 @@ "type": "object", "properties": { "col1": { - "type": "string", + "type": ["null", "string"], }, "col2": { - "type": "string", + "type": ["null", "string"], }, "_ab_source_file_last_modified": { "type": "string" @@ -376,13 +376,13 @@ "type": "object", "properties": { "col1": { - "type": "string", + "type": ["null", "string"], }, "col2": { - "type": "string", + "type": ["null", "string"], }, "col3": { - "type": "string", + "type": ["null", "string"], }, "_ab_source_file_last_modified": { "type": "string" @@ -460,9 +460,9 @@ "type": "object", "properties": { "col1": { - "type": "string", + "type": ["null", "string"], }, "col2": { - "type": "string", + "type": ["null", "string"], }, "_ab_source_file_last_modified": { "type": "string" @@ -551,13 +551,13 @@ "type": "object", "properties": { "col1": { - "type": "string", + "type": ["null", "string"], }, "col2": { - "type": "string", + "type": ["null", "string"], }, "col3": { - "type": "string", + "type": ["null", "string"], }, "_ab_source_file_last_modified": { "type": "string" @@ -656,13 +656,13 @@ "type": "object", "properties": { "col1": { - "type": "string", + "type": ["null", "string"], }, "col2": { - "type": "string", + "type": ["null", "string"], }, "col3": { - "type": "string", + "type": ["null", "string"], }, "_ab_source_file_last_modified": { "type": "string" @@ -767,13 +767,13 @@ "type": "object", "properties": { "col1": { - "type": "string", + "type": ["null", "string"], }, "col2": { - "type": "string", + "type": ["null", "string"], }, "col3": { - "type": "string", + "type": ["null", "string"], }, "_ab_source_file_last_modified": { "type": "string" @@ -887,13 +887,13 @@ "type": "object", "properties": { "col1": { - "type": "string", + "type": ["null", "string"], }, "col2": { - "type": "string", + "type": ["null", "string"], }, "col3": { - "type": "string", + "type": ["null", "string"], }, "_ab_source_file_last_modified": { "type": "string" @@ -1001,13 +1001,13 @@ "type": "object", "properties": { "col1": { - "type": "string", + "type": ["null", "string"], }, "col2": { - "type": "string", + "type": ["null", "string"], }, "col3": { - "type": "string", + "type": ["null", "string"], }, "_ab_source_file_last_modified": { "type": "string" @@ -1145,13 +1145,13 @@ "type": "object", "properties": { "col1": { - "type": "string", + "type": ["null", "string"], }, "col2": { - "type": "string", + "type": ["null", "string"], }, "col3": { - "type": "string", + "type": ["null", "string"], }, "_ab_source_file_last_modified": { "type": "string" @@ -1262,13 +1262,13 @@ "type": "object", "properties": { "col1": { - "type": "string", + "type": ["null", "string"], }, "col2": { - "type": "string", + "type": ["null", "string"], }, "col3": { - "type": "string", + "type": ["null", "string"], }, "_ab_source_file_last_modified": { "type": "string" @@ -1378,13 +1378,13 @@ "type": "object", "properties": { "col1": { - "type": "string", + "type": ["null", "string"], }, "col2": { - "type": "string", + "type": ["null", "string"], }, "col3": { - "type": "string", + "type": ["null", "string"], }, "_ab_source_file_last_modified": { "type": "string" @@ -1500,13 +1500,13 @@ "type": "object", "properties": { "col1": { - "type": "string", + "type": ["null", "string"], }, "col2": { - "type": "string", + "type": ["null", "string"], }, "col3": { - "type": "string", + "type": ["null", "string"], }, "_ab_source_file_last_modified": { "type": "string" diff --git a/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/jsonl_scenarios.py b/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/jsonl_scenarios.py index 0ceaf1c7d06c..81b427ed1a7e 100644 --- a/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/jsonl_scenarios.py +++ b/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/jsonl_scenarios.py @@ -42,16 +42,16 @@ "type": "object", "properties": { "col1": { - "type": "string" + "type": ["null", "string"], }, "col2": { - "type": "string" + "type": ["null", "string"], }, "_ab_source_file_last_modified": { - "type": "string" + "type": "string", }, "_ab_source_file_url": { - "type": "string" + "type": "string", }, }, }, @@ -116,19 +116,19 @@ "type": "object", "properties": { "col1": { - "type": "string" + "type": ["null", "string"], }, "col2": { - "type": "string" + "type": ["null", "string"], }, "col3": { - "type": "string" + "type": ["null", "string"], }, "_ab_source_file_last_modified": { - "type": "string" + "type": "string", }, "_ab_source_file_url": { - "type": "string" + "type": "string", }, } }, @@ -197,15 +197,15 @@ "type": "object", "properties": { "col1": { - "type": "string" + "type": ["null", "string"], }, "col2": { - "type": "string" + "type": ["null", "string"], }, "_ab_source_file_last_modified": { - "type": "string" + "type": "string", }, "_ab_source_file_url": { - "type": "string" + "type": "string", }, } }, @@ -275,15 +275,15 @@ "type": "object", "properties": { "col1": { - "type": "string" + "type": ["null", "string"], }, "col2": { - "type": "string" + "type": ["null", "string"], }, "_ab_source_file_last_modified": { - "type": "string" + "type": "string", }, "_ab_source_file_url": { - "type": "string" + "type": "string", }, } }, @@ -346,13 +346,13 @@ "type": "object", "properties": { "col1": { - "type": "string" + "type": ["null", "string"], }, "_ab_source_file_last_modified": { - "type": "string" + "type": "string", }, "_ab_source_file_url": { - "type": "string" + "type": "string", }, } }, @@ -429,19 +429,19 @@ "type": "object", "properties": { "col1": { - "type": "integer" + "type": ["null", "integer"] }, "col2": { - "type": "string" + "type": ["null", "string"], }, "col3": { - "type": "number" + "type": ["null", "number"] }, "_ab_source_file_last_modified": { - "type": "string" + "type": "string", }, "_ab_source_file_url": { - "type": "string" + "type": "string", }, }, }, @@ -455,13 +455,13 @@ "type": "object", "properties": { "col3": { - "type": "number" + "type": ["null", "number"] }, "_ab_source_file_last_modified": { - "type": "string" + "type": "string", }, "_ab_source_file_url": { - "type": "string" + "type": "string", }, }, }, @@ -539,10 +539,10 @@ "type": "object" }, "_ab_source_file_last_modified": { - "type": "string" + "type": "string", }, "_ab_source_file_url": { - "type": "string" + "type": "string", }, } }, @@ -620,10 +620,10 @@ "type": "object" }, "_ab_source_file_last_modified": { - "type": "string" + "type": "string", }, "_ab_source_file_url": { - "type": "string" + "type": "string", }, }, }, @@ -637,13 +637,13 @@ "type": "object", "properties": { "col3": { - "type": "number" + "type": ["null", "number"] }, "_ab_source_file_last_modified": { - "type": "string" + "type": "string", }, "_ab_source_file_url": { - "type": "string" + "type": "string", }, }, }, @@ -709,13 +709,13 @@ "type": "integer" }, "col2": { - "type": "string" + "type": "string", }, "_ab_source_file_last_modified": { - "type": "string" + "type": "string", }, "_ab_source_file_url": { - "type": "string" + "type": "string", }, }, }, diff --git a/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/parquet_scenarios.py b/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/parquet_scenarios.py index cab40377673f..ce0c2c23fbcd 100644 --- a/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/parquet_scenarios.py +++ b/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/parquet_scenarios.py @@ -185,10 +185,10 @@ "type": "object", "properties": { "col1": { - "type": "string" + "type": ["null", "string"] }, "col2": { - "type": "string" + "type": ["null", "string"] }, "_ab_source_file_last_modified": { "type": "string" @@ -233,13 +233,13 @@ "type": "object", "properties": { "col1": { - "type": "string" + "type": ["null", "string"] }, "col2": { - "type": "string" + "type": ["null", "string"] }, "col3": { - "type": "string" + "type": ["null", "string"] }, "_ab_source_file_last_modified": { "type": "string" @@ -296,80 +296,80 @@ "type": "object", "properties": { "col_bool": { - "type": "boolean" + "type": ["null", "boolean"], }, "col_int8": { - "type": "integer" + "type": ["null", "integer"], }, "col_int16": { - "type": "integer" + "type": ["null", "integer"], }, "col_int32": { - "type": "integer" + "type": ["null", "integer"], }, "col_uint8": { - "type": "integer" + "type": ["null", "integer"], }, "col_uint16": { - "type": "integer" + "type": ["null", "integer"], }, "col_uint32": { - "type": "integer" + "type": ["null", "integer"], }, "col_uint64": { - "type": "integer" + "type": ["null", "integer"], }, "col_float32": { - "type": "number" + "type": ["null", "number"], }, "col_float64": { - "type": "number" + "type": ["null", "number"], }, "col_string": { - "type": "string" + "type": ["null", "string"], }, "col_date32": { - "type": "string", + "type": ["null", "string"], "format": "date" }, "col_date64": { - "type": "string", + "type": ["null", "string"], "format": "date" }, "col_timestamp_without_tz": { - "type": "string", + "type": ["null", "string"], "format": "date-time" }, "col_timestamp_with_tz": { - "type": "string", + "type": ["null", "string"], "format": "date-time" }, "col_time32s": { - "type": "string", + "type": ["null", "string"], }, "col_time32ms": { - "type": "string", + "type": ["null", "string"], }, "col_time64us": { - "type": "string", + "type": ["null", "string"], }, "col_struct": { - "type": "object", + "type": ["null", "object"], }, "col_list": { - "type": "array", + "type": ["null", "array"], }, "col_duration": { - "type": "integer", + "type": ["null", "integer"], }, "col_binary": { - "type": "string", + "type": ["null", "string"], }, "_ab_source_file_last_modified": { - "type": "string" + "type": "string", }, "_ab_source_file_url": { - "type": "string" + "type": "string", }, }, }, @@ -443,7 +443,7 @@ "type": "object", "properties": { "col1": { - "type": "string" + "type": ["null", "string"] }, "_ab_source_file_last_modified": { "type": "string" @@ -500,7 +500,7 @@ "type": "object", "properties": { "col1": { - "type": "string" + "type": ["null", "string"] }, "_ab_source_file_last_modified": { "type": "string" @@ -557,7 +557,7 @@ "type": "object", "properties": { "col1": { - "type": "number" + "type": ["null", "number"] }, "_ab_source_file_last_modified": { "type": "string" @@ -611,7 +611,7 @@ "type": "object", "properties": { "col1": { - "type": "number" + "type": ["null", "number"] }, "_ab_source_file_last_modified": { "type": "string" diff --git a/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/user_input_schema_scenarios.py b/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/user_input_schema_scenarios.py index 98e0c4217247..9a5f84921551 100644 --- a/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/user_input_schema_scenarios.py +++ b/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/user_input_schema_scenarios.py @@ -319,7 +319,7 @@ "type": "object", "properties": { "col1": { - "type": "string", + "type": ["null", "string"], }, "_ab_source_file_last_modified": { "type": "string" @@ -515,7 +515,7 @@ "type": "object", "properties": { "col1": { - "type": "string", + "type": ["null", "string"], }, "_ab_source_file_last_modified": { "type": "string" @@ -654,7 +654,7 @@ "type": "object", "properties": { "col1": { - "type": "string", + "type": ["null", "string"], }, "_ab_source_file_last_modified": { "type": "string" diff --git a/airbyte-cdk/python/unit_tests/sources/file_based/test_scenarios.py b/airbyte-cdk/python/unit_tests/sources/file_based/test_scenarios.py index 6cd51b51d51c..6c93a8a0053b 100644 --- a/airbyte-cdk/python/unit_tests/sources/file_based/test_scenarios.py +++ b/airbyte-cdk/python/unit_tests/sources/file_based/test_scenarios.py @@ -241,7 +241,7 @@ def _verify_read_output(output: Dict[str, Any], scenario: TestScenario) -> None: if "record" in actual: for key, value in actual["record"]["data"].items(): if isinstance(value, float): - assert math.isclose(value, expected["data"][key], abs_tol=1e-04) + assert math.isclose(value, float(expected["data"][key]), abs_tol=1e-04) else: assert value == expected["data"][key] assert actual["record"]["stream"] == expected["stream"] From 8baa8702f939514eae4e0e4647d71533cd41dda1 Mon Sep 17 00:00:00 2001 From: Catherine Noll Date: Mon, 31 Jul 2023 15:01:09 -0400 Subject: [PATCH 2/2] Add unit tests --- .../stream/test_default_file_based_stream.py | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 airbyte-cdk/python/unit_tests/sources/file_based/stream/test_default_file_based_stream.py diff --git a/airbyte-cdk/python/unit_tests/sources/file_based/stream/test_default_file_based_stream.py b/airbyte-cdk/python/unit_tests/sources/file_based/stream/test_default_file_based_stream.py new file mode 100644 index 000000000000..4889b85f1e8a --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/file_based/stream/test_default_file_based_stream.py @@ -0,0 +1,48 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from typing import Any, Mapping + +import pytest +from airbyte_cdk.sources.file_based.stream.default_file_based_stream import DefaultFileBasedStream + + +@pytest.mark.parametrize( + "input_schema, expected_output", + [ + pytest.param({}, {}, id="empty-schema"), + pytest.param( + {"type": "string"}, + {"type": ["null", "string"]}, + id="simple-schema", + ), + pytest.param( + {"type": ["string"]}, + {"type": ["null", "string"]}, + id="simple-schema-list-type", + ), + pytest.param( + {"type": ["null", "string"]}, + {"type": ["null", "string"]}, + id="simple-schema-already-has-null", + ), + pytest.param( + {"properties": {"type": "string"}}, + {"properties": {"type": ["null", "string"]}}, + id="nested-schema", + ), + pytest.param( + {"items": {"type": "string"}}, + {"items": {"type": ["null", "string"]}}, + id="array-schema", + ), + pytest.param( + {"type": "object", "properties": {"prop": {"type": "string"}}}, + {"type": ["null", "object"], "properties": {"prop": {"type": ["null", "string"]}}}, + id="deeply-nested-schema", + ), + ], +) +def test_fill_nulls(input_schema: Mapping[str, Any], expected_output: Mapping[str, Any]) -> None: + assert DefaultFileBasedStream._fill_nulls(input_schema) == expected_output