🎉 Airbyte CDK (File-based CDK): Stop the sync if the record could not be parsed #32589

Merged (28 commits, Jan 11, 2024)
Changes from 1 commit

Commits (28):
1f8e430  added skipping to configs; raised the error in parsers (bazarnov, Nov 15, 2023)
f705091  updated changelog and bumped the version (bazarnov, Nov 16, 2023)
dfd6f7b  Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai… (bazarnov, Nov 16, 2023)
c8d0798  fixed conflicts (bazarnov, Nov 16, 2023)
9f49ce1  Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai… (bazarnov, Nov 20, 2023)
8496edf  fixed unit tests (bazarnov, Nov 20, 2023)
1b4690c  removed wrong imports (bazarnov, Nov 20, 2023)
b901302  updated changelog (bazarnov, Nov 20, 2023)
f360b11  bumped the version, fixed flake8 errors (bazarnov, Nov 20, 2023)
7c00411  updated (bazarnov, Nov 22, 2023)
90a4b52  Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai… (bazarnov, Nov 22, 2023)
21ec4cd  fixed mypy checks (bazarnov, Nov 22, 2023)
833d42d  added TestFileBasedErrorCollector unit test (bazarnov, Nov 23, 2023)
e214faf  Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai… (bazarnov, Dec 20, 2023)
a8d0c2c  updated after review (bazarnov, Dec 20, 2023)
83eb4fe  Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai… (bazarnov, Dec 20, 2023)
2dad69f  Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai… (bazarnov, Dec 21, 2023)
ece7c9a  resolved conflicts (bazarnov, Dec 21, 2023)
313b9da  Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai… (bazarnov, Dec 28, 2023)
aa94515  Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai… (bazarnov, Jan 3, 2024)
3e70c0f  updated after latest review (bazarnov, Jan 3, 2024)
ce123d5  Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai… (bazarnov, Jan 9, 2024)
7be49d2  updated after the latest review (bazarnov, Jan 9, 2024)
02a71e7  Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai… (bazarnov, Jan 9, 2024)
235bdda  fix mypy issues, flake issues (bazarnov, Jan 9, 2024)
5f9ef23  Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai… (bazarnov, Jan 11, 2024)
9a41270  updated (bazarnov, Jan 11, 2024)
3bbb4c1  Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai… (bazarnov, Jan 11, 2024)
updated after the latest review
bazarnov committed Jan 9, 2024
commit 7be49d283f9e320b0a4bf72f89d0f1576b4c992b
@@ -82,7 +82,9 @@ def parse_records(
                    **partition_columns,
                }
        except Exception as exc:
-            raise RecordParseError(FileBasedSourceError.ERROR_PARSING_RECORD, filename=file.uri, lineno=f"{row_group=}, {line_no=}") from exc
+            raise RecordParseError(
+                FileBasedSourceError.ERROR_PARSING_RECORD, filename=file.uri, lineno=f"{row_group=}, {line_no=}"
+            ) from exc

    @staticmethod
    def _extract_partitions(filepath: str) -> List[str]:
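The hunk above only reflows the raise to satisfy the line-length check; the behavior is unchanged. The pattern itself is the heart of this PR: any failure while converting a row (here in what appears to be the Parquet parser, given row_group) is wrapped in a RecordParseError that carries the file URI and the position of the bad record, so the stream can collect it and fail the sync once all files are processed. A minimal sketch of that parser-side pattern, with parse_row and its parameters hypothetical and the import path assumed; RecordParseError and FileBasedSourceError are the real names used in the hunk:

from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, RecordParseError


def parse_row(raw_row: dict, file_uri: str, row_group: int, line_no: int) -> dict:
    # Hypothetical helper: convert one raw row into a record, wrapping any
    # failure so the caller can collect it instead of crashing mid-file.
    try:
        return {column: value for column, value in raw_row.items()}
    except Exception as exc:
        # f"{row_group=}, {line_no=}" renders as e.g. "row_group=0, line_no=42",
        # which makes the resulting log message self-describing.
        raise RecordParseError(
            FileBasedSourceError.ERROR_PARSING_RECORD,
            filename=file_uri,
            lineno=f"{row_group=}, {line_no=}",
        ) from exc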
@@ -22,184 +22,188 @@
from airbyte_cdk.sources.file_based.stream.default_file_based_stream import DefaultFileBasedStream
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


class MockFormat:
pass


@pytest.mark.parametrize(
"input_schema, expected_output",
[
pytest.param({}, {}, id="empty-schema"),
pytest.param(
{"type": "string"},
{"type": ["null", "string"]},
id="simple-schema",
),
pytest.param(
{"type": ["string"]},
{"type": ["null", "string"]},
id="simple-schema-list-type",
),
pytest.param(
{"type": ["null", "string"]},
{"type": ["null", "string"]},
id="simple-schema-already-has-null",
),
pytest.param(
{"properties": {"type": "string"}},
{"properties": {"type": ["null", "string"]}},
id="nested-schema",
),
pytest.param(
{"items": {"type": "string"}},
{"items": {"type": ["null", "string"]}},
id="array-schema",
),
pytest.param(
{"type": "object", "properties": {"prop": {"type": "string"}}},
{"type": ["null", "object"], "properties": {"prop": {"type": ["null", "string"]}}},
id="deeply-nested-schema",
),
],
)
def test_fill_nulls(input_schema: Mapping[str, Any], expected_output: Mapping[str, Any]) -> None:
assert DefaultFileBasedStream._fill_nulls(input_schema) == expected_output
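The parametrized cases above pin down what _fill_nulls must do: recursively walk the JSON schema and make every declared "type" nullable. A minimal sketch consistent with those expectations follows; the actual CDK implementation may differ in details:

from typing import Any


def fill_nulls(schema: Any) -> Any:
    # Recursively make every "type" entry nullable: "string" becomes
    # ["null", "string"], and list types gain "null" if it is missing.
    if isinstance(schema, dict):
        for key, value in schema.items():
            if key == "type":
                if isinstance(value, list):
                    if "null" not in value:
                        schema[key] = ["null"] + value
                else:
                    schema[key] = ["null", value]
            else:
                fill_nulls(value)
    elif isinstance(schema, list):
        for item in schema:
            fill_nulls(item)
    return schema


assert fill_nulls({"items": {"type": "string"}}) == {"items": {"type": ["null", "string"]}}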


class DefaultFileBasedStreamTest(unittest.TestCase):
_NOW = datetime(2022, 10, 22, tzinfo=timezone.utc)
_A_RECORD = {"a_record": 1}

def setUp(self) -> None:
self._stream_config = Mock()
self._stream_config.format = MockFormat()
self._stream_config.name = "a stream name"
self._catalog_schema = Mock()
self._stream_reader = Mock(spec=AbstractFileBasedStreamReader)
self._availability_strategy = Mock(spec=AbstractFileBasedAvailabilityStrategy)
self._discovery_policy = Mock(spec=AbstractDiscoveryPolicy)
self._parser = Mock(spec=FileTypeParser)
self._validation_policy = Mock(spec=AbstractSchemaValidationPolicy)
self._validation_policy.name = "validation policy name"
self._cursor = Mock(spec=AbstractFileBasedCursor)

self._stream = DefaultFileBasedStream(
config=self._stream_config,
catalog_schema=self._catalog_schema,
stream_reader=self._stream_reader,
availability_strategy=self._availability_strategy,
discovery_policy=self._discovery_policy,
parsers={MockFormat: self._parser},
validation_policy=self._validation_policy,
cursor=self._cursor,
errors_collector=FileBasedErrorsCollector(),
)

def test_when_read_records_from_slice_then_return_records(self) -> None:
self._parser.parse_records.return_value = [self._A_RECORD]
messages = list(self._stream.read_records_from_slice({"files": [RemoteFile(uri="uri", last_modified=self._NOW)]}))
assert list(map(lambda message: message.record.data["data"], messages)) == [self._A_RECORD]

def test_given_exception_when_read_records_from_slice_then_do_process_other_files(self) -> None:
"""
The current behavior for source-s3 v3 does not fail the sync on some errors, so we keep that behavior for now. One example
that is easy to reproduce is a file with a gzip extension that is not actually a gzip file: the reader will fail to open the
file, but the sync won't fail.
Ticket: https://github.com/airbytehq/airbyte/issues/29680
"""
self._parser.parse_records.side_effect = [ValueError("An error"), [self._A_RECORD]]

messages = list(
self._stream.read_records_from_slice(
{
"files": [
RemoteFile(uri="invalid_file", last_modified=self._NOW),
RemoteFile(uri="valid_file", last_modified=self._NOW),
]
}
)
)

assert messages[0].log.level == Level.ERROR
assert messages[1].record.data["data"] == self._A_RECORD

def test_given_traced_exception_when_read_records_from_slice_then_fail(self) -> None:
"""
When a traced exception is raised, the stream shouldn't try to handle it but should pass it on to the caller.
"""
self._parser.parse_records.side_effect = [AirbyteTracedException("An error")]

with pytest.raises(AirbyteTracedException):
list(
self._stream.read_records_from_slice(
{
"files": [
RemoteFile(uri="invalid_file", last_modified=self._NOW),
RemoteFile(uri="valid_file", last_modified=self._NOW),
]
}
)
)

def test_given_exception_after_skipping_records_when_read_records_from_slice_then_send_warning(self) -> None:
self._stream_config.schemaless = False
self._validation_policy.record_passes_validation_policy.return_value = False
self._parser.parse_records.side_effect = [self._iter([self._A_RECORD, ValueError("An error")])]

messages = list(
self._stream.read_records_from_slice(
{
"files": [
RemoteFile(uri="invalid_file", last_modified=self._NOW),
RemoteFile(uri="valid_file", last_modified=self._NOW),
]
}
)
)

assert messages[0].log.level == Level.ERROR
assert messages[1].log.level == Level.WARN

def test_override_max_n_files_for_schema_inference_is_respected(self) -> None:
self._discovery_policy.n_concurrent_requests = 1
self._discovery_policy.get_max_n_files_for_schema_inference.return_value = 3
self._stream.config.input_schema = None
self._stream.config.schemaless = None
self._parser.infer_schema.return_value = {"data": {"type": "string"}}
files = [RemoteFile(uri=f"file{i}", last_modified=self._NOW) for i in range(10)]
self._stream_reader.get_matching_files.return_value = files

schema = self._stream.get_json_schema()

assert schema == {
"type": "object",
"properties": {
"_ab_source_file_last_modified": {"type": "string"},
"_ab_source_file_url": {"type": "string"},
"data": {"type": ["null", "string"]},
},
}
assert self._parser.infer_schema.call_count == 3

def _iter(self, x: Iterable[Any]) -> Iterator[Any]:
for item in x:
if isinstance(item, Exception):
raise item
yield item
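Taken together, these tests pin down the stream's error contract: an ordinary exception while parsing one file yields an ERROR log message and the remaining files are still processed; an AirbyteTracedException propagates to the caller; and when records were skipped by the validation policy before the failure, a WARN about the skips follows the ERROR. A rough sketch of a read loop with that shape, much simplified relative to the real DefaultFileBasedStream.read_records_from_slice, which also builds Airbyte record/log messages and feeds the errors collector:

import logging
from typing import Any, Callable, Iterable, Iterator

from airbyte_cdk.utils.traced_exception import AirbyteTracedException

logger = logging.getLogger("airbyte.file_based_stream")


def read_files(files: Iterable[Any], parse_records: Callable[[Any], Iterable[Any]]) -> Iterator[Any]:
    # Sketch of the contract exercised by the tests: log-and-continue on
    # ordinary parser errors, but let traced exceptions abort the slice.
    # `files` is assumed to hold RemoteFile-like objects with a `uri` attribute.
    for file in files:
        try:
            yield from parse_records(file)
        except AirbyteTracedException:
            raise  # already user-facing; the caller handles it
        except Exception:
            logger.exception("Error parsing records from file %s; moving on to the next file", file.uri)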


class TestFileBasedErrorCollector:

    test_error_collector: FileBasedErrorsCollector = FileBasedErrorsCollector()

    @pytest.mark.parametrize(
-        "stream, file, line_no, n_skipped",
+        "stream, file, line_no, n_skipped, collector_expected_len",
        (
-            ("stream_1", "test.csv", 1, 1),
+            ("stream_1", "test.csv", 1, 1, 1),
+            ("stream_2", "test2.csv", 2, 2, 2),
        ),
        ids=[
            "Single error",
            "Multiple errors",
        ]
    )
-    def test_collect_parsing_error(self, stream, file, line_no, n_skipped) -> None:
+    def test_collect_parsing_error(self, stream, file, line_no, n_skipped, collector_expected_len) -> None:
        test_error_pattern = "Error parsing record."
        # format the error body
        test_error = AirbyteMessage(
@@ -212,6 +216,8 @@ def test_collect_parsing_error(self, stream, file, line_no, n_skipped) -> None:
        ),
        # collecting the error
        self.test_error_collector.collect(test_error)
+        # check the error has been collected
+        assert len(self.test_error_collector.errors) == collector_expected_len
        # check for the pattern presence in the collected errors
        for error in self.test_error_collector.errors:
            assert test_error_pattern in error[0].log.message
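The assertions above only touch collect() and the errors list, but they imply the collector's role in this PR: parsers no longer abort the whole sync mid-file, the stream turns each RecordParseError into an ERROR log message, the collector accumulates those messages, and the sync is failed once at the end if anything was collected. A minimal stand-in illustrating that life cycle; the class below is a hypothetical sketch, not the CDK's FileBasedErrorsCollector:

from typing import Any, Iterator, List


class ErrorsCollectorSketch:
    # Hypothetical stand-in: accumulate per-record parsing errors during the
    # sync and decide at the end whether the sync as a whole has failed.

    def __init__(self) -> None:
        self.errors: List[Any] = []

    def collect(self, logged_error: Any) -> None:
        # Store the error message instead of stopping the file that raised it.
        self.errors.append(logged_error)

    def yield_and_raise_collected(self) -> Iterator[Any]:
        # Called once after all slices are read: surface every collected
        # error, then mark the sync as failed if there was at least one.
        if self.errors:
            yield from self.errors
            raise RuntimeError(f"{len(self.errors)} record(s) could not be parsed; failing the sync")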