-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Stream returns AirbyteMessages #18572
Changes from all commits
5f88aa6
0de8395
e5acdcf
8a63af7
d0dcb52
adf07fa
c68e01d
258990d
b0f80a7
68528b2
b31f9bd
893beab
c3964c9
c69774a
152f5df
8ea5e85
1684823
b08a9c4
4fd9ed0
06b1c70
3e4b135
4702f4d
ae7b7fb
3d86bce
9dc4486
d3852f3
179d689
65c416c
b13fe7a
0fff056
01dcd2a
5d46bb0
600c195
0b48fea
613e876
ea2089d
e2d11b3
d901e1c
0600e59
82e46c5
50bb3ed
acfcec8
3fa6a00
048508d
3e85dca
6d9d0aa
464b247
4acd199
0c36dcb
46a807e
bad305b
d78628f
7fd5cc9
0cbd28e
a1a139e
b1d62cd
1da2ed1
5dac7c2
594ad06
2d91e9a
cff30a1
2f91b80
3fc697c
62d95eb
003b860
27150ba
26c3289
b36842a
fe3b01a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,15 +4,12 @@ | |
|
||
import logging | ||
from abc import ABC, abstractmethod | ||
from datetime import datetime | ||
from functools import lru_cache | ||
from typing import Any, Dict, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Union | ||
|
||
from airbyte_cdk.models import ( | ||
AirbyteCatalog, | ||
AirbyteConnectionStatus, | ||
AirbyteMessage, | ||
AirbyteRecordMessage, | ||
AirbyteStateMessage, | ||
ConfiguredAirbyteCatalog, | ||
ConfiguredAirbyteStream, | ||
|
@@ -24,8 +21,8 @@ | |
from airbyte_cdk.sources.source import Source | ||
from airbyte_cdk.sources.streams import Stream | ||
from airbyte_cdk.sources.streams.http.http import HttpStream | ||
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message | ||
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, split_config | ||
from airbyte_cdk.sources.utils.transform import TypeTransformer | ||
from airbyte_cdk.utils.event_timing import create_timer | ||
from airbyte_cdk.utils.traced_exception import AirbyteTracedException | ||
|
||
|
@@ -241,20 +238,27 @@ def _read_incremental( | |
stream_state=stream_state, | ||
cursor_field=configured_stream.cursor_field or None, | ||
) | ||
for record_counter, record_data in enumerate(records, start=1): | ||
yield self._as_airbyte_record(stream_name, record_data) | ||
stream_state = stream_instance.get_updated_state(stream_state, record_data) | ||
checkpoint_interval = stream_instance.state_checkpoint_interval | ||
if checkpoint_interval and record_counter % checkpoint_interval == 0: | ||
yield self._checkpoint_state(stream_instance, stream_state, state_manager) | ||
|
||
total_records_counter += 1 | ||
# This functionality should ideally live outside of this method | ||
# but since state is managed inside this method, we keep track | ||
# of it here. | ||
if self._limit_reached(internal_config, total_records_counter): | ||
# Break from slice loop to save state and exit from _read_incremental function. | ||
break | ||
record_counter = 0 | ||
for message_counter, record_data_or_message in enumerate(records, start=1): | ||
message = stream_data_to_airbyte_message( | ||
stream_name, record_data_or_message, stream_instance.transformer, stream_instance.get_json_schema() | ||
) | ||
yield message | ||
if message.type == MessageType.RECORD: | ||
record = message.record | ||
stream_state = stream_instance.get_updated_state(stream_state, record.data) | ||
checkpoint_interval = stream_instance.state_checkpoint_interval | ||
record_counter += 1 | ||
if checkpoint_interval and record_counter % checkpoint_interval == 0: | ||
yield self._checkpoint_state(stream_instance, stream_state, state_manager) | ||
|
||
total_records_counter += 1 | ||
# This functionality should ideally live outside of this method | ||
# but since state is managed inside this method, we keep track | ||
# of it here. | ||
if self._limit_reached(internal_config, total_records_counter): | ||
# Break from slice loop to save state and exit from _read_incremental function. | ||
break | ||
|
||
yield self._checkpoint_state(stream_instance, stream_state, state_manager) | ||
if self._limit_reached(internal_config, total_records_counter): | ||
|
@@ -277,50 +281,32 @@ def _read_full_refresh( | |
total_records_counter = 0 | ||
for _slice in slices: | ||
logger.debug("Processing stream slice", extra={"slice": _slice}) | ||
records = stream_instance.read_records( | ||
record_data_or_messages = stream_instance.read_records( | ||
stream_slice=_slice, | ||
sync_mode=SyncMode.full_refresh, | ||
cursor_field=configured_stream.cursor_field, | ||
) | ||
for record in records: | ||
yield self._as_airbyte_record(configured_stream.stream.name, record) | ||
total_records_counter += 1 | ||
if self._limit_reached(internal_config, total_records_counter): | ||
return | ||
for record_data_or_message in record_data_or_messages: | ||
message = stream_data_to_airbyte_message( | ||
stream_instance.name, record_data_or_message, stream_instance.transformer, stream_instance.get_json_schema() | ||
) | ||
yield message | ||
if message.type == MessageType.RECORD: | ||
total_records_counter += 1 | ||
if self._limit_reached(internal_config, total_records_counter): | ||
return | ||
|
||
def _checkpoint_state(self, stream: Stream, stream_state, state_manager: ConnectorStateManager): | ||
# First attempt to retrieve the current state using the stream's state property. We receive an AttributeError if the state | ||
# property is not implemented by the stream instance and as a fallback, use the stream_state retrieved from the stream | ||
# instance's deprecated get_updated_state() method. | ||
try: | ||
state_manager.update_state_for_stream(stream.name, stream.namespace, stream.state) | ||
|
||
except AttributeError: | ||
state_manager.update_state_for_stream(stream.name, stream.namespace, stream_state) | ||
return state_manager.create_state_message(stream.name, stream.namespace, send_per_stream_state=self.per_stream_state_enabled) | ||
|
||
@lru_cache(maxsize=None) | ||
def _get_stream_transformer_and_schema(self, stream_name: str) -> Tuple[TypeTransformer, Mapping[str, Any]]: | ||
""" | ||
Lookup stream's transform object and jsonschema based on stream name. | ||
This function would be called a lot so using caching to save on costly | ||
get_json_schema operation. | ||
:param stream_name name of stream from catalog. | ||
:return tuple with stream transformer object and discover json schema. | ||
""" | ||
stream_instance = self._stream_to_instance_map[stream_name] | ||
return stream_instance.transformer, stream_instance.get_json_schema() | ||
|
||
def _as_airbyte_record(self, stream_name: str, data: Mapping[str, Any]): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. extracted to a function so it can be reused by the SimpleRetriever |
||
now_millis = int(datetime.now().timestamp() * 1000) | ||
transformer, schema = self._get_stream_transformer_and_schema(stream_name) | ||
# Transform object fields according to config. Most likely you will | ||
# need it to normalize values against json schema. By default no action | ||
# taken unless configured. See | ||
# docs/connector-development/cdk-python/schemas.md for details. | ||
transformer.transform(data, schema) # type: ignore | ||
message = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=now_millis) | ||
return AirbyteMessage(type=MessageType.RECORD, record=message) | ||
|
||
@staticmethod | ||
def _apply_log_level_to_stream_logger(logger: logging.Logger, stream_instance: Stream): | ||
""" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,14 +6,24 @@ | |
import inspect | ||
import logging | ||
from abc import ABC, abstractmethod | ||
from functools import lru_cache | ||
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union | ||
|
||
import airbyte_cdk.sources.utils.casing as casing | ||
from airbyte_cdk.models import AirbyteStream, SyncMode | ||
from airbyte_cdk.models import AirbyteLogMessage, AirbyteRecordMessage, AirbyteStream, AirbyteTraceMessage, SyncMode | ||
|
||
# list of all possible HTTP methods which can be used for sending of request bodies | ||
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader | ||
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer | ||
from deprecated.classic import deprecated | ||
|
||
# A stream's read method can return one of the following types: | ||
# Mapping[str, Any]: The content of an AirbyteRecordMessage | ||
# AirbyteRecordMessage: An AirbyteRecordMessage | ||
# AirbyteLogMessage: A log message | ||
# AirbyteTraceMessage: A trace message | ||
StreamData = Union[Mapping[str, Any], AirbyteRecordMessage, AirbyteLogMessage, AirbyteTraceMessage] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not attached to this typedef, but the full union definition is quite cumbersome |
||
|
||
|
||
def package_name_from_class(cls: object) -> str: | ||
"""Find the package name given a class name""" | ||
|
@@ -94,11 +104,12 @@ def read_records( | |
cursor_field: List[str] = None, | ||
stream_slice: Mapping[str, Any] = None, | ||
stream_state: Mapping[str, Any] = None, | ||
) -> Iterable[Mapping[str, Any]]: | ||
) -> Iterable[StreamData]: | ||
""" | ||
This method should be overridden by subclasses to read records based on the inputs | ||
""" | ||
|
||
@lru_cache(maxsize=None) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
def get_json_schema(self) -> Mapping[str, Any]: | ||
""" | ||
:return: A dict of the JSON schema representing this stream. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,7 @@ | ||
# | ||
# Copyright (c) 2021 Airbyte, Inc., all rights reserved. | ||
# Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
# | ||
|
||
# Initialize Utils Package | ||
|
||
__all__ = ["record_helper"] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
# | ||
# Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
# | ||
|
||
import datetime | ||
from typing import Any, Mapping | ||
|
||
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, AirbyteRecordMessage, AirbyteTraceMessage | ||
from airbyte_cdk.models import Type as MessageType | ||
from airbyte_cdk.sources.streams.core import StreamData | ||
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer | ||
|
||
|
||
def stream_data_to_airbyte_message( | ||
stream_name: str, | ||
data_or_message: StreamData, | ||
transformer: TypeTransformer = TypeTransformer(TransformConfig.NoTransform), | ||
schema: Mapping[str, Any] = None, | ||
) -> AirbyteMessage: | ||
if schema is None: | ||
schema = {} | ||
|
||
if isinstance(data_or_message, dict): | ||
data = data_or_message | ||
now_millis = int(datetime.datetime.now().timestamp() * 1000) | ||
# Transform object fields according to config. Most likely you will | ||
# need it to normalize values against json schema. By default no action | ||
# taken unless configured. See | ||
# docs/connector-development/cdk-python/schemas.md for details. | ||
transformer.transform(data, schema) # type: ignore | ||
message = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=now_millis) | ||
return AirbyteMessage(type=MessageType.RECORD, record=message) | ||
elif isinstance(data_or_message, AirbyteRecordMessage): | ||
return AirbyteMessage(type=MessageType.RECORD, record=data_or_message) | ||
elif isinstance(data_or_message, AirbyteTraceMessage): | ||
return AirbyteMessage(type=MessageType.TRACE, trace=data_or_message) | ||
elif isinstance(data_or_message, AirbyteLogMessage): | ||
return AirbyteMessage(type=MessageType.LOG, log=data_or_message) | ||
else: | ||
raise ValueError(f"Unexpected type for data_or_message: {type(data_or_message)}") |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,7 @@ | |
|
||
|
||
from typing import Any, Iterable, List, Mapping | ||
from unittest import mock | ||
|
||
import pytest | ||
from airbyte_cdk.models import AirbyteStream, SyncMode | ||
|
@@ -173,3 +174,11 @@ def test_wrapped_primary_key_various_argument(test_input, expected): | |
wrapped = Stream._wrapped_primary_key(test_input) | ||
|
||
assert wrapped == expected | ||
|
||
|
||
@mock.patch("airbyte_cdk.sources.utils.schema_helpers.ResourceSchemaLoader.get_schema") | ||
def test_get_json_schema_is_cached(mocked_method): | ||
stream = StreamStubFullRefresh() | ||
for i in range(5): | ||
stream.get_json_schema() | ||
assert mocked_method.call_count == 1 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. manually confirmed this test fails if @lru_cache is deleted |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same logic moved inside the
if
block because we only want to update the state and record counter on records