Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAT: Improve error message when data mismatches schema #4753

Merged
merged 4 commits into from
Jul 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.1.12
Improve error message when data mismatches schema: https://github.com/airbytehq/airbyte/pull/4753

## 0.1.11
Fix error in the naming of method `test_match_expected` for class `TestSpec`.

Expand All @@ -21,4 +24,4 @@ Add: `test_spec` additionally checks if Dockerfile has `ENV AIRBYTE_ENTRYPOINT`
Add test whether PKs present and not None if `source_defined_primary_key` defined: https://github.com/airbytehq/airbyte/pull/4140

## 0.1.5
Add configurable timeout for the acceptance tests: https://github.com/airbytehq/airbyte/pull/4296
Add configurable timeout for the acceptance tests: https://github.com/airbytehq/airbyte/pull/4296
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ COPY setup.py ./
COPY pytest.ini ./
RUN pip install .

LABEL io.airbyte.version=0.1.11
LABEL io.airbyte.version=0.1.12
LABEL io.airbyte.name=airbyte/source-acceptance-test

ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin"]
Original file line number Diff line number Diff line change
Expand Up @@ -134,16 +134,16 @@ def test_read(
output = docker_runner.call_read(connector_config, configured_catalog)
records = [message.record for message in output if message.type == Type.RECORD]
counter = Counter(record.stream for record in records)

if inputs.validate_schema:
streams_with_errors = set()
for record, errors in verify_records_schema(records, configured_catalog):
if record.stream not in streams_with_errors:
logging.error(f"The {record.stream} stream has the following schema errors: {errors}")
streams_with_errors.add(record.stream)

if streams_with_errors:
pytest.fail(f"Please check your json_schema in selected streams {streams_with_errors}.")
bar = "-" * 80
streams_errors = verify_records_schema(records, configured_catalog)
for stream_name, errors in streams_errors.items():
errors = map(str, errors.values())
str_errors = f"\n{bar}\n".join(errors)
logging.error(f"The {stream_name} stream has the following schema errors:\n{str_errors}")

if streams_errors:
pytest.fail(f"Please check your json_schema in selected streams {streams_errors.keys()}.")

all_streams = set(stream.stream.name for stream in configured_catalog.streams)
streams_with_records = set(counter.keys())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,33 @@
#

import logging
from typing import Iterator, List, Tuple
from collections import defaultdict
from typing import List, Mapping

from airbyte_cdk.models import AirbyteRecordMessage, ConfiguredAirbyteCatalog
from jsonschema import Draft4Validator, ValidationError


def verify_records_schema(
records: List[AirbyteRecordMessage], catalog: ConfiguredAirbyteCatalog
) -> Iterator[Tuple[AirbyteRecordMessage, List[ValidationError]]]:
) -> Mapping[str, Mapping[str, ValidationError]]:
"""Check records against their schemas from the catalog, yield error messages.
Only first record with error will be yielded for each stream.
"""
validators = {}
for stream in catalog.streams:
validators[stream.stream.name] = Draft4Validator(stream.stream.json_schema)

stream_errors = defaultdict(dict)

for record in records:
validator = validators.get(record.stream)
if not validator:
logging.error(f"Record from the {record.stream} stream that is not in the catalog.")
continue

errors = list(validator.iter_errors(record.data))
if errors:
yield record, sorted(errors, key=str)
for error in errors:
stream_errors[record.stream][str(error.schema_path)] = error

return stream_errors
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,10 @@ def test_verify_records_schema(configured_catalog: ConfiguredAirbyteCatalog):

records = [AirbyteRecordMessage(stream="my_stream", data=record, emitted_at=0) for record in records]

records_with_errors, record_errors = zip(*verify_records_schema(records, configured_catalog))
errors = [[error.message for error in errors] for errors in record_errors]
streams_with_errors = verify_records_schema(records, configured_catalog)
errors = [error.message for error in streams_with_errors["my_stream"].values()]

assert len(records_with_errors) == 3, "only 3 out of 4 records have errors"
assert records_with_errors[0] == records[0], "1st record should have errors"
assert records_with_errors[1] == records[1], "2nd record should have errors"
assert records_with_errors[2] == records[3], "4th record should have errors"
assert errors[0] == ["'text' is not of type 'number'", "123 is not of type 'null', 'string'"]
assert errors[1] == ["None is not of type 'number'", "None is not of type 'string'"]
assert errors[2] == ["'text' is not of type 'number'"]
assert "my_stream" in streams_with_errors
assert len(streams_with_errors) == 1, "only one stream"
assert len(streams_with_errors["my_stream"]) == 3, "only first error for each field"
assert errors == ["123 is not of type 'null', 'string'", "'text' is not of type 'number'", "None is not of type 'string'"]