
🎉 Airbyte CDK (File-based CDK): Stop the sync if the record could not be parsed #32589

Merged · 28 commits · Jan 11, 2024
Changes from 1 commit
Commits
1f8e430
added skipping to configs; raised the error in parsers
bazarnov Nov 15, 2023
f705091
updated changelog and bumped the version
bazarnov Nov 16, 2023
dfd6f7b
Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai…
bazarnov Nov 16, 2023
c8d0798
fixed conflicts
bazarnov Nov 16, 2023
9f49ce1
Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai…
bazarnov Nov 20, 2023
8496edf
fixed unit tests
bazarnov Nov 20, 2023
1b4690c
removed wrong imports
bazarnov Nov 20, 2023
b901302
updated changelog
bazarnov Nov 20, 2023
f360b11
bumped the version, fixed flake8 errors
bazarnov Nov 20, 2023
7c00411
updated
bazarnov Nov 22, 2023
90a4b52
Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai…
bazarnov Nov 22, 2023
21ec4cd
fixed mypy checks
bazarnov Nov 22, 2023
833d42d
added TestFileBasedErrorCollector unit test
bazarnov Nov 23, 2023
e214faf
Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai…
bazarnov Dec 20, 2023
a8d0c2c
updated after review
bazarnov Dec 20, 2023
83eb4fe
Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai…
bazarnov Dec 20, 2023
2dad69f
Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai…
bazarnov Dec 21, 2023
ece7c9a
resolved conflicts
bazarnov Dec 21, 2023
313b9da
Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai…
bazarnov Dec 28, 2023
aa94515
Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai…
bazarnov Jan 3, 2024
3e70c0f
updated after latest review
bazarnov Jan 3, 2024
ce123d5
Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai…
bazarnov Jan 9, 2024
7be49d2
updated after the latest review
bazarnov Jan 9, 2024
02a71e7
Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai…
bazarnov Jan 9, 2024
235bdda
fix mypy issues, flake issues
bazarnov Jan 9, 2024
5f9ef23
Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai…
bazarnov Jan 11, 2024
9a41270
updated
bazarnov Jan 11, 2024
3bbb4c1
Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai…
bazarnov Jan 11, 2024
Commit 1f8e430f5d54da2ea4733546785c1fe8a4b75687 (verified signature)
added skipping to configs; raised the error in parsers
bazarnov committed Nov 15, 2023
airbyte-cdk/python/airbyte_cdk/sources/file_based/config/avro_format.py
@@ -2,6 +2,8 @@
 # Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 #
 
+from typing import Optional
+
 from airbyte_cdk.utils.oneof_option_config import OneOfOptionConfig
 from pydantic import BaseModel, Field
 
@@ -21,3 +23,10 @@ class Config(OneOfOptionConfig):
         description="Whether to convert double fields to strings. This is recommended if you have decimal numbers with a high degree of precision because there can be a loss of precision when handling floating point numbers.",
         default=False,
     )
+
+    skip_unprocessable_file_types: Optional[bool] = Field(
+        default=True,
+        title="Skip Unprocessable File Types",
+        description="If true, skip files that cannot be parsed because of their file type and log a warning. If false, fail the sync. Corrupted files with valid file types will still result in a failed sync.",
+        always_show=True,
+    )
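
For context, a minimal sketch of how the new option would surface in a file-based source configuration (the stream name and surrounding values are illustrative, not taken from this PR):

    # Hypothetical config snippet: format options for one stream of a file-based source.
    config = {
        "streams": [
            {
                "name": "users",  # illustrative stream name
                "format": {
                    "filetype": "avro",
                    "double_as_string": False,
                    # New in this commit; defaults to True (skip the file and log a warning).
                    # Set to False to fail the sync on unparsable files instead.
                    "skip_unprocessable_file_types": False,
                },
            }
        ]
    }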
airbyte-cdk/python/airbyte_cdk/sources/file_based/config/csv_format.py
@@ -147,6 +147,12 @@ class Config(OneOfOptionConfig):
         description="How to infer the types of the columns. If none, inference defaults to strings.",
         airbyte_hidden=True,
     )
+    skip_unprocessable_file_types: Optional[bool] = Field(
+        default=True,
+        title="Skip Unprocessable File Types",
+        description="If true, skip files that cannot be parsed because of their file type and log a warning. If false, fail the sync. Corrupted files with valid file types will still result in a failed sync.",
+        always_show=True,
+    )
 
     @validator("delimiter")
     def validate_delimiter(cls, v: str) -> str:
airbyte-cdk/python/airbyte_cdk/sources/file_based/config/parquet_format.py
@@ -2,6 +2,8 @@
 # Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 #
 
+from typing import Optional
+
 from airbyte_cdk.utils.oneof_option_config import OneOfOptionConfig
 from pydantic import BaseModel, Field
 
@@ -21,3 +23,10 @@ class Config(OneOfOptionConfig):
         description="Whether to convert decimal fields to floats. There is a loss of precision when converting decimals to floats, so this is not recommended.",
         default=False,
     )
+
+    skip_unprocessable_file_types: Optional[bool] = Field(
+        default=True,
+        title="Skip Unprocessable File Types",
+        description="If true, skip files that cannot be parsed because of their file type and log a warning. If false, fail the sync. Corrupted files with valid file types will still result in a failed sync.",
+        always_show=True,
+    )
airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/avro_parser.py
@@ -8,6 +8,7 @@
 import fastavro
 from airbyte_cdk.sources.file_based.config.avro_format import AvroFormat
 from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
+from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, RecordParseError
 from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
 from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
 from airbyte_cdk.sources.file_based.remote_file import RemoteFile
@@ -138,15 +139,23 @@ def parse_records(
         if not isinstance(avro_format, AvroFormat):
             raise ValueError(f"Expected AvroFormat, got {avro_format}")
 
-        with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
-            avro_reader = fastavro.reader(fp)
-            schema = avro_reader.writer_schema
-            schema_field_name_to_type = {field["name"]: field["type"] for field in schema["fields"]}
-            for record in avro_reader:
-                yield {
-                    record_field: self._to_output_value(avro_format, schema_field_name_to_type[record_field], record[record_field])
-                    for record_field, record_value in schema_field_name_to_type.items()
-                }
+        line_no = 0
+        try:
+            with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
+                avro_reader = fastavro.reader(fp)
+                schema = avro_reader.writer_schema
+                schema_field_name_to_type = {field["name"]: field["type"] for field in schema["fields"]}
+                for record in avro_reader:
+                    line_no += 1
+                    yield {
+                        record_field: self._to_output_value(avro_format, schema_field_name_to_type[record_field], record[record_field])
+                        for record_field, record_value in schema_field_name_to_type.items()
+                    }
+        except Exception:
+            if avro_format.skip_unprocessable_file_types:
+                logger.warn(f"AvroParser: File {file.uri} cannot be parsed and will be skipped.")
+            else:
+                raise RecordParseError(FileBasedSourceError.ERROR_PARSING_RECORD, filename=file.uri, lineno=line_no)
 
     @property
     def file_read_mode(self) -> FileReadMode:
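
The counting-generator pattern the parsers adopt, shown in isolation (a runnable sketch with assumed names, not code from this PR): records are counted as they are yielded so that a mid-file failure can report how far parsing got.

    from typing import Any, Dict, Iterable

    def parse_with_skip(rows: Iterable[Dict[str, Any]], skip_on_error: bool) -> Iterable[Dict[str, Any]]:
        line_no = 0  # mirrors the parsers' line_no counter
        try:
            for row in rows:
                line_no += 1
                yield row
        except Exception:
            if skip_on_error:
                # the parsers log a warning and skip the rest of the file here
                print(f"file skipped after {line_no} records")
            else:
                # the parsers raise RecordParseError(..., lineno=line_no) here
                raise

Because the body is a generator, the try/except only runs while a consumer is iterating; nothing executes until the first next() call.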
airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/csv_parser.py
@@ -172,17 +172,28 @@ def parse_records(
         logger: logging.Logger,
         discovered_schema: Optional[Mapping[str, SchemaType]],
     ) -> Iterable[Dict[str, Any]]:
-        config_format = _extract_format(config)
-        if discovered_schema:
-            property_types = {col: prop["type"] for col, prop in discovered_schema["properties"].items()}  # type: ignore # discovered_schema["properties"] is known to be a mapping
-            deduped_property_types = CsvParser._pre_propcess_property_types(property_types)
-        else:
-            deduped_property_types = {}
-        cast_fn = CsvParser._get_cast_function(deduped_property_types, config_format, logger, config.schemaless)
-        data_generator = self._csv_reader.read_data(config, file, stream_reader, logger, self.file_read_mode)
-        for row in data_generator:
-            yield CsvParser._to_nullable(cast_fn(row), deduped_property_types, config_format.null_values, config_format.strings_can_be_null)
-        data_generator.close()
+        line_no = 0
+        try:
+            config_format = _extract_format(config)
+            if discovered_schema:
+                property_types = {col: prop["type"] for col, prop in discovered_schema["properties"].items()}  # type: ignore # discovered_schema["properties"] is known to be a mapping
+                deduped_property_types = CsvParser._pre_propcess_property_types(property_types)
+            else:
+                deduped_property_types = {}
+            cast_fn = CsvParser._get_cast_function(deduped_property_types, config_format, logger, config.schemaless)
+            data_generator = self._csv_reader.read_data(config, file, stream_reader, logger, self.file_read_mode)
+            for row in data_generator:
+                line_no += 1
+                yield CsvParser._to_nullable(
+                    cast_fn(row), deduped_property_types, config_format.null_values, config_format.strings_can_be_null
+                )
+        except RecordParseError:
+            if config_format.skip_unprocessable_file_types:
+                logger.warn(f"CsvParser: File {file.uri} cannot be parsed and will be skipped.")
+            else:
+                raise RecordParseError(FileBasedSourceError.ERROR_PARSING_RECORD, filename=file.uri, lineno=line_no)
+        finally:
+            data_generator.close()
 
     @property
     def file_read_mode(self) -> FileReadMode:
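
Moving data_generator.close() into a finally block guarantees the reader generator is shut down even when casting fails mid-file. A small runnable sketch with a stand-in reader (not the CDK's CsvReader): closing a generator raises GeneratorExit inside it, which lets the reader's own cleanup run.

    def reader():
        try:
            yield from ({"col": i} for i in range(3))
        finally:
            print("file handle released")  # stand-in for fp.close()

    gen = reader()
    next(gen)    # start iterating
    gen.close()  # prints "file handle released"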
airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/jsonl_parser.py
@@ -113,7 +113,7 @@ def _parse_jsonl_entries(
             break
 
         if had_json_parsing_error and not yielded_at_least_once:
-            raise RecordParseError(FileBasedSourceError.ERROR_PARSING_RECORD)
+            raise RecordParseError(FileBasedSourceError.ERROR_PARSING_RECORD, filename=file.uri, lineno=line)
 
     @staticmethod
     def _instantiate_accumulator(line: Union[bytes, str]) -> Union[bytes, str]:
airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/parquet_parser.py
@@ -11,7 +11,7 @@
 import pyarrow as pa
 import pyarrow.parquet as pq
 from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig, ParquetFormat
-from airbyte_cdk.sources.file_based.exceptions import ConfigValidationError, FileBasedSourceError
+from airbyte_cdk.sources.file_based.exceptions import ConfigValidationError, FileBasedSourceError, RecordParseError
 from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
 from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
 from airbyte_cdk.sources.file_based.remote_file import RemoteFile
@@ -58,19 +58,28 @@ def parse_records(
         if not isinstance(parquet_format, ParquetFormat):
             logger.info(f"Expected ParquetFormat, got {parquet_format}")
             raise ConfigValidationError(FileBasedSourceError.CONFIG_VALIDATION_ERROR)
-        with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
-            reader = pq.ParquetFile(fp)
-            partition_columns = {x.split("=")[0]: x.split("=")[1] for x in self._extract_partitions(file.uri)}
-            for row_group in range(reader.num_row_groups):
-                batch = reader.read_row_group(row_group)
-                for row in range(batch.num_rows):
-                    yield {
-                        **{
-                            column: ParquetParser._to_output_value(batch.column(column)[row], parquet_format)
-                            for column in batch.column_names
-                        },
-                        **partition_columns,
-                    }
+
+        line_no = 0
+        try:
+            with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
+                reader = pq.ParquetFile(fp)
+                partition_columns = {x.split("=")[0]: x.split("=")[1] for x in self._extract_partitions(file.uri)}
+                for row_group in range(reader.num_row_groups):
+                    batch = reader.read_row_group(row_group)
+                    for row in range(batch.num_rows):
+                        line_no += 1
+                        yield {
+                            **{
+                                column: ParquetParser._to_output_value(batch.column(column)[row], parquet_format)
+                                for column in batch.column_names
+                            },
+                            **partition_columns,
+                        }
+        except Exception as parse_error:
+            if parquet_format.skip_unprocessable_file_types:
+                logger.warn(f"ParquetParser: File `{file.uri}` cannot be parsed and will be skipped. Error Details: {parse_error}.")
+            else:
+                raise RecordParseError(FileBasedSourceError.ERROR_PARSING_RECORD, filename=file.uri, lineno=f"{row_group=}, {line_no=}")
 
     @staticmethod
     def _extract_partitions(filepath: str) -> List[str]:
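
One detail worth flagging: the lineno passed to RecordParseError here is a formatted string rather than an int, using the f-string debug syntax available since Python 3.8. A quick illustration:

    row_group, line_no = 2, 17
    f"{row_group=}, {line_no=}"  # evaluates to 'row_group=2, line_no=17'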
airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py
@@ -109,16 +109,16 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[AirbyteMessage]:
                     )
                     break
 
-            except RecordParseError:
+            except RecordParseError as parse_error:
                 # Increment line_no because the exception was raised before we could increment it
                 line_no += 1
-                yield AirbyteMessage(
-                    type=MessageType.LOG,
-                    log=AirbyteLogMessage(
-                        level=Level.ERROR,
-                        message=f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream={self.name} file={file.uri} line_no={line_no} n_skipped={n_skipped}",
-                        stack_trace=traceback.format_exc(),
-                    ),
-                )
+                # We need to explicitly stop the sync if a RecordParseError is raised from the `parse_records()` method.
+                # reference issue: https://github.com/airbytehq/airbyte/issues/31605
+                raise AirbyteTracedException(
+                    internal_message=FileBasedSourceError.ERROR_PARSING_RECORD.value,
+                    message=f"Error while parsing the record: stream={self.name} file={file.uri} line_no={line_no} n_skipped={n_skipped}",
+                    exception=parse_error,
+                    failure_type=FailureType.system_error,
+                )
 
             except Exception:
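
The net behavior change: before this commit the stream yielded an ERROR log message and kept syncing; now the RecordParseError is re-raised as an AirbyteTracedException, so the sync stops with a trace. A hedged sketch of what a caller would observe (stream, stream_slice, and process are assumed to exist; this is not code from the PR):

    from airbyte_cdk.utils.traced_exception import AirbyteTracedException

    try:
        for message in stream.read_records_from_slice(stream_slice):
            process(message)  # hypothetical downstream handling
    except AirbyteTracedException as exc:
        # the platform renders this failure as a trace message and stops the sync
        print(exc.message)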