Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Airbyte CDK (File-based CDK): Stop the sync if the record could not be parsed #32589

Merged
merged 28 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
1f8e430
added skipping to configs; raised the error in parsers
bazarnov Nov 15, 2023
f705091
updated changelog and bumped the version
bazarnov Nov 16, 2023
dfd6f7b
Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai…
bazarnov Nov 16, 2023
c8d0798
fixed conflicts
bazarnov Nov 16, 2023
9f49ce1
Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai…
bazarnov Nov 20, 2023
8496edf
fixed unit tests
bazarnov Nov 20, 2023
1b4690c
removed wrong imports
bazarnov Nov 20, 2023
b901302
updated changelog
bazarnov Nov 20, 2023
f360b11
bumped the version, fixed flake8 errors
bazarnov Nov 20, 2023
7c00411
updated
bazarnov Nov 22, 2023
90a4b52
Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai…
bazarnov Nov 22, 2023
21ec4cd
fixed mypy checks
bazarnov Nov 22, 2023
833d42d
added TestFileBasedErrorCollector unit test
bazarnov Nov 23, 2023
e214faf
Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai…
bazarnov Dec 20, 2023
a8d0c2c
updated after review
bazarnov Dec 20, 2023
83eb4fe
Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai…
bazarnov Dec 20, 2023
2dad69f
Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai…
bazarnov Dec 21, 2023
ece7c9a
resolved conflicts
bazarnov Dec 21, 2023
313b9da
Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai…
bazarnov Dec 28, 2023
aa94515
Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai…
bazarnov Jan 3, 2024
3e70c0f
updated after latest review
bazarnov Jan 3, 2024
ce123d5
Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai…
bazarnov Jan 9, 2024
7be49d2
updated after the latest review
bazarnov Jan 9, 2024
02a71e7
Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai…
bazarnov Jan 9, 2024
235bdda
fix mypy issues, flake issues
bazarnov Jan 9, 2024
5f9ef23
Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai…
bazarnov Jan 11, 2024
9a41270
updated
bazarnov Jan 11, 2024
3bbb4c1
Merge remote-tracking branch 'origin/master' into baz/airbyte-cdk/rai…
bazarnov Jan 11, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airbyte-cdk/python/.bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.58.5
current_version = 0.58.6
commit = False

[bumpversion:file:setup.py]
Expand Down
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.58.6
File CDK: Added logic to emit logged `RecordParseError` errors and raise a single `AirbyteTracedException` at the end of the sync, instead of silently skipping the parsing errors. PR: https://github.com/airbytehq/airbyte/pull/32589

## 0.58.5
Handle private network exception as config error

Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

# needs to be the same as CDK
LABEL io.airbyte.version=0.58.5
LABEL io.airbyte.version=0.58.6
LABEL io.airbyte.name=airbyte/source-declarative-manifest
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


from airbyte_cdk.utils.oneof_option_config import OneOfOptionConfig
from pydantic import BaseModel, Field

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


from airbyte_cdk.utils.oneof_option_config import OneOfOptionConfig
from pydantic import BaseModel, Field

Expand Down
27 changes: 26 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/sources/file_based/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
#

from enum import Enum
from typing import Union
from typing import Any, List, Union

from airbyte_cdk.models import AirbyteMessage, FailureType
from airbyte_cdk.utils import AirbyteTracedException


Expand Down Expand Up @@ -40,6 +41,30 @@ class FileBasedSourceError(Enum):
UNDEFINED_VALIDATION_POLICY = "The validation policy defined in the config does not exist for the source."


class FileBasedErrorsCollector:
"""
The placeholder for all errors collected.
"""

errors: List[AirbyteMessage] = []

def yield_and_raise_collected(self) -> Any:
if self.errors:
# emit collected logged messages
yield from self.errors
# clean the collector
self.errors.clear()
# raising the single exception
raise AirbyteTracedException(
internal_message="Please check the logged errors for more information.",
message="Some errors occured while reading from the source.",
failure_type=FailureType.config_error,
)

def collect(self, logged_error: AirbyteMessage) -> None:
self.errors.append(logged_error)


class BaseFileBasedSourceError(Exception):
def __init__(self, error: Union[FileBasedSourceError, str], **kwargs): # type: ignore # noqa
if isinstance(error, FileBasedSourceError):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig, ValidationPolicy
from airbyte_cdk.sources.file_based.discovery_policy import AbstractDiscoveryPolicy, DefaultDiscoveryPolicy
from airbyte_cdk.sources.file_based.exceptions import ConfigValidationError, FileBasedSourceError
from airbyte_cdk.sources.file_based.exceptions import ConfigValidationError, FileBasedErrorsCollector, FileBasedSourceError
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
from airbyte_cdk.sources.file_based.file_types import default_parsers
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
Expand Down Expand Up @@ -49,6 +49,7 @@ def __init__(
self.stream_schemas = {s.stream.name: s.stream.json_schema for s in catalog.streams} if catalog else {}
self.cursor_cls = cursor_cls
self.logger = logging.getLogger(f"airbyte.{self.name}")
self.errors_collector: FileBasedErrorsCollector = FileBasedErrorsCollector()

def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
"""
Expand Down Expand Up @@ -106,6 +107,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
parsers=self.parsers,
validation_policy=self._validate_and_get_validation_policy(stream_config),
cursor=self.cursor_cls(stream_config),
errors_collector=self.errors_collector,
)
)
return streams
Expand All @@ -121,6 +123,8 @@ def read(
state: Optional[Union[List[AirbyteStateMessage], MutableMapping[str, Any]]] = None,
) -> Iterator[AirbyteMessage]:
yield from super().read(logger, config, catalog, state)
# emit all the errors collected
yield from self.errors_collector.yield_and_raise_collected()
# count streams using a certain parser
parsed_config = self._get_parsed_config(config)
for parser, count in Counter(stream.format.filetype for stream in parsed_config.streams).items():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import fastavro
from airbyte_cdk.sources.file_based.config.avro_format import AvroFormat
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, RecordParseError
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
Expand Down Expand Up @@ -144,15 +145,20 @@ def parse_records(
if not isinstance(avro_format, AvroFormat):
raise ValueError(f"Expected ParquetFormat, got {avro_format}")

with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
avro_reader = fastavro.reader(fp)
schema = avro_reader.writer_schema
schema_field_name_to_type = {field["name"]: field["type"] for field in schema["fields"]}
for record in avro_reader:
yield {
record_field: self._to_output_value(avro_format, schema_field_name_to_type[record_field], record[record_field])
for record_field, record_value in schema_field_name_to_type.items()
}
line_no = 0
try:
with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
avro_reader = fastavro.reader(fp)
schema = avro_reader.writer_schema
schema_field_name_to_type = {field["name"]: field["type"] for field in schema["fields"]}
for record in avro_reader:
line_no += 1
yield {
record_field: self._to_output_value(avro_format, schema_field_name_to_type[record_field], record[record_field])
for record_field, record_value in schema_field_name_to_type.items()
}
except Exception as exc:
raise RecordParseError(FileBasedSourceError.ERROR_PARSING_RECORD, filename=file.uri, lineno=line_no) from exc

@property
def file_read_mode(self) -> FileReadMode:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,17 +178,25 @@ def parse_records(
logger: logging.Logger,
discovered_schema: Optional[Mapping[str, SchemaType]],
) -> Iterable[Dict[str, Any]]:
config_format = _extract_format(config)
if discovered_schema:
property_types = {col: prop["type"] for col, prop in discovered_schema["properties"].items()} # type: ignore # discovered_schema["properties"] is known to be a mapping
deduped_property_types = CsvParser._pre_propcess_property_types(property_types)
else:
deduped_property_types = {}
cast_fn = CsvParser._get_cast_function(deduped_property_types, config_format, logger, config.schemaless)
data_generator = self._csv_reader.read_data(config, file, stream_reader, logger, self.file_read_mode)
for row in data_generator:
yield CsvParser._to_nullable(cast_fn(row), deduped_property_types, config_format.null_values, config_format.strings_can_be_null)
data_generator.close()
line_no = 0
try:
config_format = _extract_format(config)
if discovered_schema:
property_types = {col: prop["type"] for col, prop in discovered_schema["properties"].items()} # type: ignore # discovered_schema["properties"] is known to be a mapping
deduped_property_types = CsvParser._pre_propcess_property_types(property_types)
else:
deduped_property_types = {}
cast_fn = CsvParser._get_cast_function(deduped_property_types, config_format, logger, config.schemaless)
data_generator = self._csv_reader.read_data(config, file, stream_reader, logger, self.file_read_mode)
for row in data_generator:
line_no += 1
yield CsvParser._to_nullable(
cast_fn(row), deduped_property_types, config_format.null_values, config_format.strings_can_be_null
)
except RecordParseError as parse_err:
raise RecordParseError(FileBasedSourceError.ERROR_PARSING_RECORD, filename=file.uri, lineno=line_no) from parse_err
finally:
data_generator.close()

@property
def file_read_mode(self) -> FileReadMode:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def _parse_jsonl_entries(
break

if had_json_parsing_error and not yielded_at_least_once:
raise RecordParseError(FileBasedSourceError.ERROR_PARSING_RECORD)
raise RecordParseError(FileBasedSourceError.ERROR_PARSING_RECORD, filename=file.uri, lineno=line)

@staticmethod
def _instantiate_accumulator(line: Union[bytes, str]) -> Union[bytes, str]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import pyarrow as pa
import pyarrow.parquet as pq
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig, ParquetFormat
from airbyte_cdk.sources.file_based.exceptions import ConfigValidationError, FileBasedSourceError
from airbyte_cdk.sources.file_based.exceptions import ConfigValidationError, FileBasedSourceError, RecordParseError
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
Expand Down Expand Up @@ -64,19 +64,27 @@ def parse_records(
if not isinstance(parquet_format, ParquetFormat):
logger.info(f"Expected ParquetFormat, got {parquet_format}")
raise ConfigValidationError(FileBasedSourceError.CONFIG_VALIDATION_ERROR)
with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
reader = pq.ParquetFile(fp)
partition_columns = {x.split("=")[0]: x.split("=")[1] for x in self._extract_partitions(file.uri)}
for row_group in range(reader.num_row_groups):
batch = reader.read_row_group(row_group)
for row in range(batch.num_rows):
yield {
**{
column: ParquetParser._to_output_value(batch.column(column)[row], parquet_format)
for column in batch.column_names
},
**partition_columns,
}

line_no = 0
try:
bazarnov marked this conversation as resolved.
Show resolved Hide resolved
with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
reader = pq.ParquetFile(fp)
partition_columns = {x.split("=")[0]: x.split("=")[1] for x in self._extract_partitions(file.uri)}
for row_group in range(reader.num_row_groups):
batch = reader.read_row_group(row_group)
for row in range(batch.num_rows):
line_no += 1
yield {
**{
column: ParquetParser._to_output_value(batch.column(column)[row], parquet_format)
for column in batch.column_names
},
**partition_columns,
}
except Exception as exc:
raise RecordParseError(
FileBasedSourceError.ERROR_PARSING_RECORD, filename=file.uri, lineno=f"{row_group=}, {line_no=}"
) from exc

@staticmethod
def _extract_partitions(filepath: str) -> List[str]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from airbyte_cdk.sources.file_based.availability_strategy import AbstractFileBasedAvailabilityStrategy
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig, PrimaryKeyType
from airbyte_cdk.sources.file_based.discovery_policy import AbstractDiscoveryPolicy
from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, RecordParseError, UndefinedParserError
from airbyte_cdk.sources.file_based.exceptions import FileBasedErrorsCollector, FileBasedSourceError, RecordParseError, UndefinedParserError
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
Expand Down Expand Up @@ -44,6 +44,7 @@ def __init__(
discovery_policy: AbstractDiscoveryPolicy,
parsers: Dict[Type[Any], FileTypeParser],
validation_policy: AbstractSchemaValidationPolicy,
errors_collector: FileBasedErrorsCollector,
):
super().__init__()
self.config = config
Expand All @@ -53,6 +54,7 @@ def __init__(
self._discovery_policy = discovery_policy
self._availability_strategy = availability_strategy
self._parsers = parsers
self.errors_collector = errors_collector

@property
@abstractmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,14 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Airbyte
except RecordParseError:
# Increment line_no because the exception was raised before we could increment it
line_no += 1
yield AirbyteMessage(
type=MessageType.LOG,
log=AirbyteLogMessage(
level=Level.ERROR,
message=f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream={self.name} file={file.uri} line_no={line_no} n_skipped={n_skipped}",
stack_trace=traceback.format_exc(),
self.errors_collector.collect(
AirbyteMessage(
type=MessageType.LOG,
log=AirbyteLogMessage(
level=Level.ERROR,
message=f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream={self.name} file={file.uri} line_no={line_no} n_skipped={n_skipped}",
stack_trace=traceback.format_exc(),
),
),
)

Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
name="airbyte-cdk",
# The version of the airbyte-cdk package is used at runtime to validate manifests. That validation must be
# updated if our semver format changes such as using release candidate versions.
version="0.58.5",
version="0.58.6",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
Loading
Loading