Skip to content

Commit

Permalink
#17047 Airbyte CDK: Improve error for returning non-iterable from con…
Browse files Browse the repository at this point in the history
…nectors parse_response (#17626)

* Improve airbyte cdk invalid message data type error message

* Test cdk invalid message data type custom error is raised

* Fix test to pass stream as a string

* Add valid record message data input type test

* Add object type and value to AirbyteRecordMessage validator message

Co-authored-by: Alexandre Girard <alexandre@airbyte.io>
  • Loading branch information
Gitznik and girarda authored Oct 6, 2022
1 parent bd5fb35 commit d9ad272
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 1 deletion.
12 changes: 11 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from enum import Enum
from typing import Any, Dict, List, Optional, Union

from pydantic import AnyUrl, BaseModel, Extra, Field
from pydantic import AnyUrl, BaseModel, Extra, Field, validator


class Type(Enum):
Expand All @@ -35,6 +35,16 @@ class Config:
description="when the data was emitted from the source. epoch in millisecond.",
)

@validator("data", pre=True)
def data_is_dict(cls: AirbyteRecordMessage, value: Dict[str, Any]):
if isinstance(value, dict):
return value
raise ValueError("Data object is not a dictionary. "
"This can happen when the parse_response method directly returns the response.json, "
"instead of yielding a it/elements of it."
f"Object instead is {type(value)} with value: {value}"
)


class AirbyteStateType(Enum):
GLOBAL = "GLOBAL"
Expand Down
12 changes: 12 additions & 0 deletions airbyte-cdk/python/unit_tests/sources/test_abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -800,3 +800,15 @@ def test_checkpoint_state_from_stream_instance():
assert actual_message == _as_state(
{"teams": {"updated_at": "2022-09-11"}, "managers": {"updated": "expected_here"}}, "managers", {"updated": "expected_here"}
)


def test_airbyte_record_message_custom_data_validation_error():
invalid_data = "Not a dict"
with pytest.raises(ValueError):
AirbyteRecordMessage(stream="stream", data=invalid_data, emitted_at=GLOBAL_EMITTED_AT)


def test_airbyte_record_message_valid_data():
valid_data = {"foo": "bar"}
AirbyteRecordMessage(stream="stream", data=valid_data, emitted_at=GLOBAL_EMITTED_AT)

0 comments on commit d9ad272

Please sign in to comment.