diff --git a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py index e5e4aca812bf..6e5dfadf89b5 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py @@ -22,7 +22,8 @@ from airbyte_cdk.sources.source import Source from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.http.http import HttpStream -from airbyte_cdk.sources.utils.record_helper import data_to_airbyte_record +from airbyte_cdk.sources.utils.record_helper import \ + stream_data_to_airbyte_message from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, \ split_config from airbyte_cdk.utils.event_timing import create_timer @@ -242,8 +243,8 @@ def _read_incremental( ) record_counter = 0 for message_counter, record_data_or_message in enumerate(records, start=1): - message = data_to_airbyte_record(stream_name, record_data_or_message, stream_instance.transformer, - stream_instance.get_json_schema()) + message = stream_data_to_airbyte_message(stream_name, record_data_or_message, stream_instance.transformer, + stream_instance.get_json_schema()) yield message if message.type == MessageType.RECORD: record = message.record @@ -288,8 +289,8 @@ def _read_full_refresh( cursor_field=configured_stream.cursor_field, ) for record_data_or_message in record_data_or_messages: - message = data_to_airbyte_record(stream_instance.name, record_data_or_message, stream_instance.transformer, - stream_instance.get_json_schema()) + message = stream_data_to_airbyte_message(stream_instance.name, record_data_or_message, stream_instance.transformer, + stream_instance.get_json_schema()) yield message if message.type == MessageType.RECORD: total_records_counter += 1 diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py index dfa466cacd35..e304d3455928 100644 --- 
a/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py @@ -18,7 +18,12 @@ from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer -RecordDataOrLogOrTraceMessage = Union[Mapping[str, Any], AirbyteRecordMessage, AirbyteLogMessage, AirbyteTraceMessage] +# Each item yielded by a stream's read_records method is one of the following types: +# Mapping[str, Any]: The content of an AirbyteRecordMessage +# AirbyteRecordMessage: An AirbyteRecordMessage +# AirbyteLogMessage: A log message +# AirbyteTraceMessage: A trace message +StreamData = Union[Mapping[str, Any], AirbyteRecordMessage, AirbyteLogMessage, AirbyteTraceMessage] def package_name_from_class(cls: object) -> str: @@ -100,7 +105,7 @@ def read_records( cursor_field: List[str] = None, stream_slice: Mapping[str, Any] = None, stream_state: Mapping[str, Any] = None, - ) -> Iterable[RecordDataOrLogOrTraceMessage]: + ) -> Iterable[StreamData]: """ This method should be overridden by subclasses to read records based on the inputs """ diff --git a/airbyte-cdk/python/airbyte_cdk/sources/utils/record_helper.py b/airbyte-cdk/python/airbyte_cdk/sources/utils/record_helper.py index 78faa713d538..734cb5754062 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/utils/record_helper.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/utils/record_helper.py @@ -8,13 +8,13 @@ from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, \ AirbyteTraceMessage, AirbyteLogMessage from airbyte_cdk.models import Type as MessageType -from airbyte_cdk.sources.streams.core import RecordDataOrLogOrTraceMessage +from airbyte_cdk.sources.streams.core import StreamData from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer -def data_to_airbyte_record( +def stream_data_to_airbyte_message( stream_name: str, - data_or_message: RecordDataOrLogOrTraceMessage, + data_or_message: 
StreamData, transformer: TypeTransformer = TypeTransformer(TransformConfig.NoTransform), schema: Mapping[str, Any] = None, ) -> AirbyteMessage: diff --git a/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py b/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py index 9d0f8bb41197..e886c0d2d663 100644 --- a/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py +++ b/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py @@ -34,7 +34,8 @@ from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager from airbyte_cdk.sources.streams import IncrementalMixin, Stream -from airbyte_cdk.sources.utils.record_helper import data_to_airbyte_record +from airbyte_cdk.sources.utils.record_helper import \ + stream_data_to_airbyte_message from airbyte_cdk.utils.traced_exception import AirbyteTracedException logger = logging.getLogger("airbyte") @@ -888,35 +889,35 @@ def test_emit_non_records(self, mocker, per_stream_enabled): expected = _fix_emitted_at([ # stream 1 slice 1 - data_to_airbyte_record("s1", stream_output[0]), - data_to_airbyte_record("s1", stream_output[1]), - data_to_airbyte_record("s1", stream_output[2]), + stream_data_to_airbyte_message("s1", stream_output[0]), + stream_data_to_airbyte_message("s1", stream_output[1]), + stream_data_to_airbyte_message("s1", stream_output[2]), _as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}), - data_to_airbyte_record("s1", stream_output[3]), + stream_data_to_airbyte_message("s1", stream_output[3]), _as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}), # stream 1 slice 2 - data_to_airbyte_record("s1", stream_output[0]), - data_to_airbyte_record("s1", stream_output[1]), - data_to_airbyte_record("s1", stream_output[2]), + stream_data_to_airbyte_message("s1", stream_output[0]), + stream_data_to_airbyte_message("s1", stream_output[1]), + 
stream_data_to_airbyte_message("s1", stream_output[2]), _as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}), - data_to_airbyte_record("s1", stream_output[3]), + stream_data_to_airbyte_message("s1", stream_output[3]), _as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}), # stream 2 slice 1 - data_to_airbyte_record("s2", stream_output[0]), - data_to_airbyte_record("s2", stream_output[1]), - data_to_airbyte_record("s2", stream_output[2]), + stream_data_to_airbyte_message("s2", stream_output[0]), + stream_data_to_airbyte_message("s2", stream_output[1]), + stream_data_to_airbyte_message("s2", stream_output[2]), _as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state( {"s1": state, "s2": state}), - data_to_airbyte_record("s2", stream_output[3]), + stream_data_to_airbyte_message("s2", stream_output[3]), _as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state( {"s1": state, "s2": state}), # stream 2 slice 2 - data_to_airbyte_record("s2", stream_output[0]), - data_to_airbyte_record("s2", stream_output[1]), - data_to_airbyte_record("s2", stream_output[2]), + stream_data_to_airbyte_message("s2", stream_output[0]), + stream_data_to_airbyte_message("s2", stream_output[1]), + stream_data_to_airbyte_message("s2", stream_output[2]), _as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state( {"s1": state, "s2": state}), - data_to_airbyte_record("s2", stream_output[3]), + stream_data_to_airbyte_message("s2", stream_output[3]), _as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state( {"s1": state, "s2": state}), ]) diff --git a/airbyte-cdk/python/unit_tests/sources/utils/test_record_helper.py b/airbyte-cdk/python/unit_tests/sources/utils/test_record_helper.py index 2d9f2290df3e..94991e49c447 100644 --- a/airbyte-cdk/python/unit_tests/sources/utils/test_record_helper.py +++ 
b/airbyte-cdk/python/unit_tests/sources/utils/test_record_helper.py @@ -10,7 +10,8 @@ AirbyteLogMessage, Level, AirbyteTraceMessage, TraceType, AirbyteStateMessage, \ AirbyteStateType from airbyte_cdk.models import Type as MessageType -from airbyte_cdk.sources.utils.record_helper import data_to_airbyte_record +from airbyte_cdk.sources.utils.record_helper import \ + stream_data_to_airbyte_message NOW = 1234567 STREAM_NAME = "my_stream" @@ -36,7 +37,7 @@ def test_data_or_record_to_airbyte_record(test_name, data, expected_message): transformer = MagicMock() schema = {} - message = data_to_airbyte_record(STREAM_NAME, data, transformer, schema) + message = stream_data_to_airbyte_message(STREAM_NAME, data, transformer, schema) message.record.emitted_at = NOW if isinstance(data, dict): @@ -64,7 +65,7 @@ def test_data_or_record_to_airbyte_record(test_name, data, expected_message): def test_log_or_trace_to_message(test_name, data, expected_message): transformer = MagicMock() schema = {} - message = data_to_airbyte_record(STREAM_NAME, data, transformer, schema) + message = stream_data_to_airbyte_message(STREAM_NAME, data, transformer, schema) assert not transformer.transform.called assert expected_message == message @@ -82,4 +83,4 @@ def test_state_message_to_message(test_name, data): transformer = MagicMock() schema = {} with pytest.raises(ValueError): - data_to_airbyte_record(STREAM_NAME, data, transformer, schema) + stream_data_to_airbyte_message(STREAM_NAME, data, transformer, schema)