Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
girarda committed Nov 5, 2022
1 parent d78628f commit 7fd5cc9
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 31 deletions.
11 changes: 6 additions & 5 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
from airbyte_cdk.sources.source import Source
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.http import HttpStream
from airbyte_cdk.sources.utils.record_helper import data_to_airbyte_record
from airbyte_cdk.sources.utils.record_helper import \
stream_data_to_airbyte_message
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, \
split_config
from airbyte_cdk.utils.event_timing import create_timer
Expand Down Expand Up @@ -242,8 +243,8 @@ def _read_incremental(
)
record_counter = 0
for message_counter, record_data_or_message in enumerate(records, start=1):
message = data_to_airbyte_record(stream_name, record_data_or_message, stream_instance.transformer,
stream_instance.get_json_schema())
message = stream_data_to_airbyte_message(stream_name, record_data_or_message, stream_instance.transformer,
stream_instance.get_json_schema())
yield message
if message.type == MessageType.RECORD:
record = message.record
Expand Down Expand Up @@ -288,8 +289,8 @@ def _read_full_refresh(
cursor_field=configured_stream.cursor_field,
)
for record_data_or_message in record_data_or_messages:
message = data_to_airbyte_record(stream_instance.name, record_data_or_message, stream_instance.transformer,
stream_instance.get_json_schema())
message = stream_data_to_airbyte_message(stream_instance.name, record_data_or_message, stream_instance.transformer,
stream_instance.get_json_schema())
yield message
if message.type == MessageType.RECORD:
total_records_counter += 1
Expand Down
9 changes: 7 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer

RecordDataOrLogOrTraceMessage = Union[Mapping[str, Any], AirbyteRecordMessage, AirbyteLogMessage, AirbyteTraceMessage]
# A stream's read method can return one of the following types:
# Mapping[str, Any]: The content of an AirbyteRecordMessage
# AirbyteRecordMessage: An AirbyteRecordMessage
# AirbyteLogMessage: A log message
# AirbyteTraceMessage: A trace message
StreamData = Union[Mapping[str, Any], AirbyteRecordMessage, AirbyteLogMessage, AirbyteTraceMessage]


def package_name_from_class(cls: object) -> str:
Expand Down Expand Up @@ -100,7 +105,7 @@ def read_records(
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[RecordDataOrLogOrTraceMessage]:
) -> Iterable[StreamData]:
"""
This method should be overridden by subclasses to read records based on the inputs
"""
Expand Down
6 changes: 3 additions & 3 deletions airbyte-cdk/python/airbyte_cdk/sources/utils/record_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, \
AirbyteTraceMessage, AirbyteLogMessage
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.streams.core import RecordDataOrLogOrTraceMessage
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer


def data_to_airbyte_record(
def stream_data_to_airbyte_message(
stream_name: str,
data_or_message: RecordDataOrLogOrTraceMessage,
data_or_message: StreamData,
transformer: TypeTransformer = TypeTransformer(TransformConfig.NoTransform),
schema: Mapping[str, Any] = None,
) -> AirbyteMessage:
Expand Down
35 changes: 18 additions & 17 deletions airbyte-cdk/python/unit_tests/sources/test_abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.streams import IncrementalMixin, Stream
from airbyte_cdk.sources.utils.record_helper import data_to_airbyte_record
from airbyte_cdk.sources.utils.record_helper import \
stream_data_to_airbyte_message
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

logger = logging.getLogger("airbyte")
Expand Down Expand Up @@ -888,35 +889,35 @@ def test_emit_non_records(self, mocker, per_stream_enabled):

expected = _fix_emitted_at([
# stream 1 slice 1
data_to_airbyte_record("s1", stream_output[0]),
data_to_airbyte_record("s1", stream_output[1]),
data_to_airbyte_record("s1", stream_output[2]),
stream_data_to_airbyte_message("s1", stream_output[0]),
stream_data_to_airbyte_message("s1", stream_output[1]),
stream_data_to_airbyte_message("s1", stream_output[2]),
_as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}),
data_to_airbyte_record("s1", stream_output[3]),
stream_data_to_airbyte_message("s1", stream_output[3]),
_as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}),
# stream 1 slice 2
data_to_airbyte_record("s1", stream_output[0]),
data_to_airbyte_record("s1", stream_output[1]),
data_to_airbyte_record("s1", stream_output[2]),
stream_data_to_airbyte_message("s1", stream_output[0]),
stream_data_to_airbyte_message("s1", stream_output[1]),
stream_data_to_airbyte_message("s1", stream_output[2]),
_as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}),
data_to_airbyte_record("s1", stream_output[3]),
stream_data_to_airbyte_message("s1", stream_output[3]),
_as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}),
# stream 2 slice 1
data_to_airbyte_record("s2", stream_output[0]),
data_to_airbyte_record("s2", stream_output[1]),
data_to_airbyte_record("s2", stream_output[2]),
stream_data_to_airbyte_message("s2", stream_output[0]),
stream_data_to_airbyte_message("s2", stream_output[1]),
stream_data_to_airbyte_message("s2", stream_output[2]),
_as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state(
{"s1": state, "s2": state}),
data_to_airbyte_record("s2", stream_output[3]),
stream_data_to_airbyte_message("s2", stream_output[3]),
_as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state(
{"s1": state, "s2": state}),
# stream 2 slice 2
data_to_airbyte_record("s2", stream_output[0]),
data_to_airbyte_record("s2", stream_output[1]),
data_to_airbyte_record("s2", stream_output[2]),
stream_data_to_airbyte_message("s2", stream_output[0]),
stream_data_to_airbyte_message("s2", stream_output[1]),
stream_data_to_airbyte_message("s2", stream_output[2]),
_as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state(
{"s1": state, "s2": state}),
data_to_airbyte_record("s2", stream_output[3]),
stream_data_to_airbyte_message("s2", stream_output[3]),
_as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state(
{"s1": state, "s2": state}),
])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
AirbyteLogMessage, Level, AirbyteTraceMessage, TraceType, AirbyteStateMessage, \
AirbyteStateType
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.utils.record_helper import data_to_airbyte_record
from airbyte_cdk.sources.utils.record_helper import \
stream_data_to_airbyte_message

NOW = 1234567
STREAM_NAME = "my_stream"
Expand All @@ -36,7 +37,7 @@
def test_data_or_record_to_airbyte_record(test_name, data, expected_message):
transformer = MagicMock()
schema = {}
message = data_to_airbyte_record(STREAM_NAME, data, transformer, schema)
message = stream_data_to_airbyte_message(STREAM_NAME, data, transformer, schema)
message.record.emitted_at = NOW

if isinstance(data, dict):
Expand Down Expand Up @@ -64,7 +65,7 @@ def test_data_or_record_to_airbyte_record(test_name, data, expected_message):
def test_log_or_trace_to_message(test_name, data, expected_message):
transformer = MagicMock()
schema = {}
message = data_to_airbyte_record(STREAM_NAME, data, transformer, schema)
message = stream_data_to_airbyte_message(STREAM_NAME, data, transformer, schema)

assert not transformer.transform.called
assert expected_message == message
Expand All @@ -82,4 +83,4 @@ def test_state_message_to_message(test_name, data):
transformer = MagicMock()
schema = {}
with pytest.raises(ValueError):
data_to_airbyte_record(STREAM_NAME, data, transformer, schema)
stream_data_to_airbyte_message(STREAM_NAME, data, transformer, schema)

0 comments on commit 7fd5cc9

Please sign in to comment.