From ea5e86ce188eae7148dd0b1a0a2d2ebe761044a6 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Thu, 13 Apr 2023 11:18:15 -0700 Subject: [PATCH 01/15] Move condition for yielding the slice message to an overwritable method --- .../python/airbyte_cdk/sources/abstract_source.py | 10 +++++++++- .../sources/declarative/manifest_declarative_source.py | 5 +++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py index 4beec964909f..355756a2eff5 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py @@ -277,6 +277,14 @@ def _read_incremental( checkpoint = self._checkpoint_state(stream_instance, stream_state, state_manager) yield checkpoint + def log_slice_message(self, logger: logging.Logger): + """ + + :param logger: + :return: + """ + return logger.isEnabledFor(logging.DEBUG) + def _read_full_refresh( self, logger: logging.Logger, @@ -290,7 +298,7 @@ def _read_full_refresh( ) total_records_counter = 0 for _slice in slices: - if logger.isEnabledFor(logging.DEBUG): + if self.log_slice_message(logger): yield AirbyteMessage( type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}"), diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py index 9ddb5c8819b9..5c7f6b19e1b5 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -118,6 +118,11 @@ def read( self._configure_logger_level(logger) yield from super().read(logger, config, catalog, state) + def log_slice_message(self, logger: logging.Logger): + # Commented so the unit tests fail + #return 
self._constructor._emit_connector_builder_messages or super(self).log_slice_message(logger) + return super(self).log_slice_message(logger) + def _configure_logger_level(self, logger: logging.Logger): """ Set the log level to logging.DEBUG if debug mode is enabled From 5e8c5b6e1f7efe2a992173a761c1c945c15e9a45 Mon Sep 17 00:00:00 2001 From: girarda Date: Thu, 13 Apr 2023 18:21:36 +0000 Subject: [PATCH 02/15] Automated Commit - Formatting Changes --- .../sources/declarative/manifest_declarative_source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py index 5c7f6b19e1b5..5ba261c31ad5 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -120,7 +120,7 @@ def read( def log_slice_message(self, logger: logging.Logger): # Commented so the unit tests fail - #return self._constructor._emit_connector_builder_messages or super(self).log_slice_message(logger) + # return self._constructor._emit_connector_builder_messages or super(self).log_slice_message(logger) return super(self).log_slice_message(logger) def _configure_logger_level(self, logger: logging.Logger): From ba9081a32cb699369f18c50cdc5e4656e92e936c Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Thu, 13 Apr 2023 13:11:08 -0700 Subject: [PATCH 03/15] yield the slice log messages --- .../manifest_declarative_source.py | 3 +- .../test_connector_builder_handler.py | 88 ++++++++++++++----- 2 files changed, 68 insertions(+), 23 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py index 5c7f6b19e1b5..690678a87e36 100644 --- 
a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -120,8 +120,7 @@ def read( def log_slice_message(self, logger: logging.Logger): # Commented so the unit tests fail - #return self._constructor._emit_connector_builder_messages or super(self).log_slice_message(logger) - return super(self).log_slice_message(logger) + return self._constructor._emit_connector_builder_messages or super(self).log_slice_message(logger) def _configure_logger_level(self, logger: logging.Logger): """ diff --git a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py index 0ebadda79485..1f9389e91ad0 100644 --- a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py +++ b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py @@ -5,6 +5,7 @@ import copy import dataclasses import json +import logging from unittest import mock from unittest.mock import patch @@ -22,7 +23,7 @@ ) from airbyte_cdk.connector_builder.main import handle_connector_builder_request, handle_request, read_stream from airbyte_cdk.connector_builder.models import LogMessage, StreamRead, StreamReadSlicesInner, StreamReadSlicesInnerPagesInner -from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, ConfiguredAirbyteCatalog +from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, DestinationSyncMode, SyncMode, Type, AirbyteLogMessage, Level from airbyte_cdk.models import Type as MessageType from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource @@ -34,23 +35,29 @@ _stream_primary_key = "id" _stream_url_base = 
"https://api.sendgrid.com" _stream_options = {"name": _stream_name, "primary_key": _stream_primary_key, "url_base": _stream_url_base} +_page_size = 2 MANIFEST = { "version": "0.30.3", "definitions": { - "schema_loader": {"name": "{{ options.stream_name }}", "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml"}, "retriever": { "paginator": { "type": "DefaultPaginator", - "page_size": 10, + "page_size": _page_size, "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"}, "page_token_option": {"inject_into": "path", "type": "RequestPath"}, - "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}", "page_size": 10}, + "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}", "page_size": _page_size}, }, + "partition_router": { + "type": "ListPartitionRouter", + "values": ["0", "1", "2", "3", "4", "5", "6", "7"], + "cursor_field": "item_id" + }, + "" "requester": { "path": "/v3/marketing/lists", "authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"}, - "request_parameters": {"page_size": "10"}, + "request_parameters": {"a_param": "10"}, }, "record_selector": {"extractor": {"field_path": ["result"]}}, }, @@ -59,7 +66,6 @@ { "type": "DeclarativeStream", "$parameters": _stream_options, - "schema_loader": {"$ref": "#/definitions/schema_loader"}, "retriever": "#/definitions/retriever", }, ], @@ -169,19 +175,23 @@ def test_resolve_manifest(valid_resolve_manifest_config_file): "type": "DeclarativeSource", "version": "0.30.3", "definitions": { - "schema_loader": {"name": "{{ options.stream_name }}", "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml"}, "retriever": { "paginator": { "type": "DefaultPaginator", - "page_size": 10, + "page_size": _page_size, "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"}, "page_token_option": {"inject_into": "path", "type": "RequestPath"}, - 
"pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}", "page_size": 10}, + "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}", "page_size": _page_size}, + }, + "partition_router": { + "type": "ListPartitionRouter", + "values": ["0", "1", "2", "3", "4", "5", "6", "7"], + "cursor_field": "item_id", }, "requester": { "path": "/v3/marketing/lists", "authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"}, - "request_parameters": {"page_size": "10"}, + "request_parameters": {"a_param": "10"}, }, "record_selector": {"extractor": {"field_path": ["result"]}}, }, @@ -189,19 +199,11 @@ def test_resolve_manifest(valid_resolve_manifest_config_file): "streams": [ { "type": "DeclarativeStream", - "schema_loader": { - "type": "JsonFileSchemaLoader", - "name": "{{ options.stream_name }}", - "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml", - "primary_key": _stream_primary_key, - "url_base": _stream_url_base, - "$parameters": _stream_options, - }, "retriever": { "type": "SimpleRetriever", "paginator": { "type": "DefaultPaginator", - "page_size": 10, + "page_size": _page_size, "page_size_option": { "type": "RequestOption", "inject_into": "request_parameter", @@ -226,7 +228,7 @@ def test_resolve_manifest(valid_resolve_manifest_config_file): "primary_key": _stream_primary_key, "url_base": _stream_url_base, "$parameters": _stream_options, - "page_size": 10, + "page_size": _page_size, }, "name": _stream_name, "primary_key": _stream_primary_key, @@ -244,7 +246,16 @@ def test_resolve_manifest(valid_resolve_manifest_config_file): "url_base": _stream_url_base, "$parameters": _stream_options, }, - "request_parameters": {"page_size": "10"}, + "request_parameters": {"a_param": "10"}, + "name": _stream_name, + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$parameters": _stream_options, + }, + "partition_router": { + "type": 
"ListPartitionRouter", + "values": ["0", "1", "2", "3", "4", "5", "6", "7"], + "cursor_field": "item_id", "name": _stream_name, "primary_key": _stream_primary_key, "url_base": _stream_url_base, @@ -512,3 +523,38 @@ def test_create_source(): assert isinstance(source, ManifestDeclarativeSource) assert source._constructor._limit_pages_fetched_per_slice == limits.max_pages_per_slice assert source._constructor._limit_slices_fetched == limits.max_slices + +def request_log_message(request: dict) -> AirbyteMessage: + return AirbyteMessage(type=Type.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"request:{json.dumps(request)}")) +def response_log_message(response: dict) -> AirbyteMessage: + return AirbyteMessage(type=Type.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"response:{json.dumps(response)}")) + +@patch.object(HttpStream, "_read_pages", return_value=[ + request_log_message({}), + response_log_message({}), + {"id": 0}, {"id": 1}, request_log_message({}), response_log_message({}), {"id": 2}]) +def test_read_source(mock_http_stream): + max_records = 100 + max_pages_per_slice = 2 + max_slices = 3 + limits = TestReadLimits(max_records, max_pages_per_slice, max_slices) + + catalog = ConfiguredAirbyteCatalog(streams=[ + ConfiguredAirbyteStream(stream=AirbyteStream(name=_stream_name, json_schema={}, supported_sync_modes=[SyncMode.full_refresh]), sync_mode=SyncMode.full_refresh, destination_sync_mode=DestinationSyncMode.append) + ]) + + config = {"__injected_declarative_manifest": MANIFEST} + + source = create_source(config, limits) + + output_data = read_stream(source, config, catalog, limits).record.data + slices = output_data["slices"] + + assert len(slices) == max_slices + for s in slices: + pages = s["pages"] + assert len(pages) == max_pages_per_slice + + first_page, second_page = pages[0], pages[1] + assert len(first_page["records"]) == _page_size + assert len(second_page["records"]) == 1 From 4c343feb36ee7f86963fece64ebbdd28a09f9c98 Mon Sep 17 00:00:00 
2001 From: Alexandre Girard Date: Thu, 13 Apr 2023 13:17:31 -0700 Subject: [PATCH 04/15] same for incremental --- airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py index 355756a2eff5..a49ae883f94d 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py @@ -237,7 +237,7 @@ def _read_incremental( has_slices = False for _slice in slices: has_slices = True - if logger.isEnabledFor(logging.DEBUG): + if self.log_slice_message(logger): yield AirbyteMessage( type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}"), From c594365bd82c059df9d3d41656d58d426278aa78 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Thu, 13 Apr 2023 13:43:01 -0700 Subject: [PATCH 05/15] refactor --- .../airbyte_cdk/sources/abstract_source.py | 13 ++----- .../sources/declarative/declarative_stream.py | 8 ++++- .../manifest_declarative_source.py | 4 --- .../parsers/model_to_component_factory.py | 35 +++++++++++++------ .../airbyte_cdk/sources/streams/core.py | 21 ++++++++++- 5 files changed, 53 insertions(+), 28 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py index a49ae883f94d..0ce4742decd7 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py @@ -37,7 +37,6 @@ class AbstractSource(Source, ABC): in this class to create an Airbyte Specification compliant Source. 
""" - SLICE_LOG_PREFIX = "slice:" @abstractmethod def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]: @@ -237,11 +236,7 @@ def _read_incremental( has_slices = False for _slice in slices: has_slices = True - if self.log_slice_message(logger): - yield AirbyteMessage( - type=MessageType.LOG, - log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}"), - ) + yield from stream_instance.start_of_slice_messages(logger, _slice) records = stream_instance.read_records( sync_mode=SyncMode.incremental, stream_slice=_slice, @@ -298,11 +293,7 @@ def _read_full_refresh( ) total_records_counter = 0 for _slice in slices: - if self.log_slice_message(logger): - yield AirbyteMessage( - type=MessageType.LOG, - log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}"), - ) + yield from stream_instance.start_of_slice_messages(logger, _slice) record_data_or_messages = stream_instance.read_records( stream_slice=_slice, sync_mode=SyncMode.full_refresh, diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py index c3ccdd3affe5..0a448b81474f 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py @@ -1,7 +1,7 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
# - +import logging from dataclasses import InitVar, dataclass, field from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union @@ -129,3 +129,9 @@ def stream_slices( """ # this is not passing the cursor field because it is known at init time return self.retriever.stream_slices(sync_mode=sync_mode, stream_state=stream_state) + + +@dataclass +class DeclarativeStreamTestReadDecorator(DeclarativeStream): + def log_slice_message(self, logger: logging.Logger): + return True diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py index 690678a87e36..9ddb5c8819b9 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -118,10 +118,6 @@ def read( self._configure_logger_level(logger) yield from super().read(logger, config, catalog, state) - def log_slice_message(self, logger: logging.Logger): - # Commented so the unit tests fail - return self._constructor._emit_connector_builder_messages or super(self).log_slice_message(logger) - def _configure_logger_level(self, logger: logging.Logger): """ Set the log level to logging.DEBUG if debug mode is enabled diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index c86a848b129c..b67e5b64a5bb 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -19,7 +19,7 @@ ) from airbyte_cdk.sources.declarative.checks import CheckStream from airbyte_cdk.sources.declarative.datetime import MinMaxDatetime -from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream +from 
airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream, DeclarativeStreamTestReadDecorator from airbyte_cdk.sources.declarative.decoders import JsonDecoder from airbyte_cdk.sources.declarative.extractors import DpathExtractor, RecordFilter, RecordSelector from airbyte_cdk.sources.declarative.incremental import DatetimeBasedCursor @@ -458,16 +458,29 @@ def create_declarative_stream(self, model: DeclarativeStreamModel, config: Confi if model.transformations: for transformation_model in model.transformations: transformations.append(self._create_component_from_model(model=transformation_model, config=config)) - return DeclarativeStream( - name=model.name, - primary_key=primary_key, - retriever=retriever, - schema_loader=schema_loader, - stream_cursor_field=cursor_field or "", - transformations=transformations, - config=config, - parameters=model.parameters, - ) + if self._emit_connector_builder_messages: + return DeclarativeStreamTestReadDecorator( + name=model.name, + primary_key=primary_key, + retriever=retriever, + schema_loader=schema_loader, + stream_cursor_field=cursor_field or "", + transformations=transformations, + config=config, + parameters=model.parameters, + ) + else: + return DeclarativeStream( + name=model.name, + primary_key=primary_key, + retriever=retriever, + schema_loader=schema_loader, + stream_cursor_field=cursor_field or "", + transformations=transformations, + config=config, + parameters=model.parameters, + ) + def _merge_stream_slicers(self, model: DeclarativeStreamModel, config: Config) -> Optional[StreamSlicer]: incremental_sync = ( diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py index 17c0076c3cf5..a01933c9f0c9 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py @@ -4,6 +4,7 @@ import inspect +import json import logging import typing from abc import ABC, abstractmethod 
@@ -11,7 +12,8 @@ from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union import airbyte_cdk.sources.utils.casing as casing -from airbyte_cdk.models import AirbyteLogMessage, AirbyteStream, AirbyteTraceMessage, SyncMode +from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, AirbyteStream, AirbyteTraceMessage, Level, SyncMode +from airbyte_cdk.models import Type as MessageType # list of all possible HTTP methods which can be used for sending of request bodies from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader @@ -75,6 +77,8 @@ class Stream(ABC): Base abstract class for an Airbyte Stream. Makes no assumption of the Stream's underlying transport protocol. """ + SLICE_LOG_PREFIX = "slice:" + # Use self.logger in subclasses to log any messages @property def logger(self): @@ -247,6 +251,21 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late """ return {} + def start_of_slice_messages(self, logger: logging.Logger, stream_slice: Mapping[str, Any]) -> typing.Iterator[AirbyteMessage]: + if self.log_slice_message(logger): + yield AirbyteMessage( + type=MessageType.LOG, + log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(stream_slice, default=str)}"), + ) + + def log_slice_message(self, logger: logging.Logger): + """ + + :param logger: + :return: + """ + return logger.isEnabledFor(logging.DEBUG) + @staticmethod def _wrapped_primary_key(keys: Optional[Union[str, List[str], List[List[str]]]]) -> Optional[List[List[str]]]: """ From 3a301555a41af1ffed812424106fd654643bdc7b Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Thu, 13 Apr 2023 14:57:23 -0700 Subject: [PATCH 06/15] Revert "refactor" This reverts commit c594365bd82c059df9d3d41656d58d426278aa78. 
--- .../airbyte_cdk/sources/abstract_source.py | 13 +++++-- .../sources/declarative/declarative_stream.py | 8 +---- .../manifest_declarative_source.py | 4 +++ .../parsers/model_to_component_factory.py | 35 ++++++------------- .../airbyte_cdk/sources/streams/core.py | 21 +---------- 5 files changed, 28 insertions(+), 53 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py index 0ce4742decd7..a49ae883f94d 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py @@ -37,6 +37,7 @@ class AbstractSource(Source, ABC): in this class to create an Airbyte Specification compliant Source. """ + SLICE_LOG_PREFIX = "slice:" @abstractmethod def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]: @@ -236,7 +237,11 @@ def _read_incremental( has_slices = False for _slice in slices: has_slices = True - yield from stream_instance.start_of_slice_messages(logger, _slice) + if self.log_slice_message(logger): + yield AirbyteMessage( + type=MessageType.LOG, + log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}"), + ) records = stream_instance.read_records( sync_mode=SyncMode.incremental, stream_slice=_slice, @@ -293,7 +298,11 @@ def _read_full_refresh( ) total_records_counter = 0 for _slice in slices: - yield from stream_instance.start_of_slice_messages(logger, _slice) + if self.log_slice_message(logger): + yield AirbyteMessage( + type=MessageType.LOG, + log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}"), + ) record_data_or_messages = stream_instance.read_records( stream_slice=_slice, sync_mode=SyncMode.full_refresh, diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py 
b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py index 0a448b81474f..c3ccdd3affe5 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py @@ -1,7 +1,7 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # -import logging + from dataclasses import InitVar, dataclass, field from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union @@ -129,9 +129,3 @@ def stream_slices( """ # this is not passing the cursor field because it is known at init time return self.retriever.stream_slices(sync_mode=sync_mode, stream_state=stream_state) - - -@dataclass -class DeclarativeStreamTestReadDecorator(DeclarativeStream): - def log_slice_message(self, logger: logging.Logger): - return True diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py index 9ddb5c8819b9..690678a87e36 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -118,6 +118,10 @@ def read( self._configure_logger_level(logger) yield from super().read(logger, config, catalog, state) + def log_slice_message(self, logger: logging.Logger): + # Commented so the unit tests fail + return self._constructor._emit_connector_builder_messages or super(self).log_slice_message(logger) + def _configure_logger_level(self, logger: logging.Logger): """ Set the log level to logging.DEBUG if debug mode is enabled diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index b67e5b64a5bb..c86a848b129c 100644 --- 
a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -19,7 +19,7 @@ ) from airbyte_cdk.sources.declarative.checks import CheckStream from airbyte_cdk.sources.declarative.datetime import MinMaxDatetime -from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream, DeclarativeStreamTestReadDecorator +from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.sources.declarative.decoders import JsonDecoder from airbyte_cdk.sources.declarative.extractors import DpathExtractor, RecordFilter, RecordSelector from airbyte_cdk.sources.declarative.incremental import DatetimeBasedCursor @@ -458,29 +458,16 @@ def create_declarative_stream(self, model: DeclarativeStreamModel, config: Confi if model.transformations: for transformation_model in model.transformations: transformations.append(self._create_component_from_model(model=transformation_model, config=config)) - if self._emit_connector_builder_messages: - return DeclarativeStreamTestReadDecorator( - name=model.name, - primary_key=primary_key, - retriever=retriever, - schema_loader=schema_loader, - stream_cursor_field=cursor_field or "", - transformations=transformations, - config=config, - parameters=model.parameters, - ) - else: - return DeclarativeStream( - name=model.name, - primary_key=primary_key, - retriever=retriever, - schema_loader=schema_loader, - stream_cursor_field=cursor_field or "", - transformations=transformations, - config=config, - parameters=model.parameters, - ) - + return DeclarativeStream( + name=model.name, + primary_key=primary_key, + retriever=retriever, + schema_loader=schema_loader, + stream_cursor_field=cursor_field or "", + transformations=transformations, + config=config, + parameters=model.parameters, + ) def _merge_stream_slicers(self, model: DeclarativeStreamModel, config: Config) -> 
Optional[StreamSlicer]: incremental_sync = ( diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py index a01933c9f0c9..17c0076c3cf5 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py @@ -4,7 +4,6 @@ import inspect -import json import logging import typing from abc import ABC, abstractmethod @@ -12,8 +11,7 @@ from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union import airbyte_cdk.sources.utils.casing as casing -from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, AirbyteStream, AirbyteTraceMessage, Level, SyncMode -from airbyte_cdk.models import Type as MessageType +from airbyte_cdk.models import AirbyteLogMessage, AirbyteStream, AirbyteTraceMessage, SyncMode # list of all possible HTTP methods which can be used for sending of request bodies from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader @@ -77,8 +75,6 @@ class Stream(ABC): Base abstract class for an Airbyte Stream. Makes no assumption of the Stream's underlying transport protocol. 
""" - SLICE_LOG_PREFIX = "slice:" - # Use self.logger in subclasses to log any messages @property def logger(self): @@ -251,21 +247,6 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late """ return {} - def start_of_slice_messages(self, logger: logging.Logger, stream_slice: Mapping[str, Any]) -> typing.Iterator[AirbyteMessage]: - if self.log_slice_message(logger): - yield AirbyteMessage( - type=MessageType.LOG, - log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(stream_slice, default=str)}"), - ) - - def log_slice_message(self, logger: logging.Logger): - """ - - :param logger: - :return: - """ - return logger.isEnabledFor(logging.DEBUG) - @staticmethod def _wrapped_primary_key(keys: Optional[Union[str, List[str], List[List[str]]]]) -> Optional[List[List[str]]]: """ From 7f2bf027a275a46730d607b70cbf4c1daff7f9ad Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Thu, 13 Apr 2023 15:37:07 -0700 Subject: [PATCH 07/15] move flag from factory to source --- .../connector_builder_handler.py | 5 +- .../manifest_declarative_source.py | 16 ++++-- .../parsers/model_to_component_factory.py | 52 +++++++++++++------ .../test_connector_builder_handler.py | 18 ++++++- .../test_model_to_component_factory.py | 3 +- 5 files changed, 70 insertions(+), 24 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/connector_builder/connector_builder_handler.py b/airbyte-cdk/python/airbyte_cdk/connector_builder/connector_builder_handler.py index 76909c8ca6d4..1572e70367d1 100644 --- a/airbyte-cdk/python/airbyte_cdk/connector_builder/connector_builder_handler.py +++ b/airbyte-cdk/python/airbyte_cdk/connector_builder/connector_builder_handler.py @@ -45,8 +45,9 @@ def get_limits(config: Mapping[str, Any]) -> TestReadLimits: def create_source(config: Mapping[str, Any], limits: TestReadLimits) -> ManifestDeclarativeSource: manifest = config["__injected_declarative_manifest"] return ManifestDeclarativeSource( - 
source_config=manifest, component_factory=ModelToComponentFactory( - emit_connector_builder_messages=True, + emit_connector_builder_messages=True, + source_config=manifest, + component_factory=ModelToComponentFactory( limit_pages_fetched_per_slice=limits.max_pages_per_slice, limit_slices_fetched=limits.max_slices) ) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py index 690678a87e36..c64aef4c7998 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -36,7 +36,13 @@ class ManifestDeclarativeSource(DeclarativeSource): VALID_TOP_LEVEL_FIELDS = {"check", "definitions", "schemas", "spec", "streams", "type", "version"} - def __init__(self, source_config: ConnectionDefinition, debug: bool = False, component_factory: ModelToComponentFactory = None): + def __init__( + self, + source_config: ConnectionDefinition, + debug: bool = False, + emit_connector_builder_messages: bool = False, + component_factory: ModelToComponentFactory = None, + ): """ :param source_config(Mapping[str, Any]): The manifest of low-code components that describe the source connector :param debug(bool): True if debug mode is enabled @@ -53,6 +59,7 @@ def __init__(self, source_config: ConnectionDefinition, debug: bool = False, com propagated_source_config = ManifestComponentTransformer().propagate_types_and_parameters("", resolved_source_config, {}) self._source_config = propagated_source_config self._debug = debug + self._emit_connector_builder_messages = emit_connector_builder_messages self._constructor = component_factory if component_factory else ModelToComponentFactory() self._validate_source() @@ -66,7 +73,9 @@ def connection_checker(self) -> ConnectionChecker: check = self._source_config["check"] if "type" not in check: check["type"] = 
"CheckStream" - check_stream = self._constructor.create_component(CheckStreamModel, check, dict()) + check_stream = self._constructor.create_component( + CheckStreamModel, check, dict(), emit_connector_builder_messages=self._emit_connector_builder_messages + ) if isinstance(check_stream, ConnectionChecker): return check_stream else: @@ -119,8 +128,7 @@ def read( yield from super().read(logger, config, catalog, state) def log_slice_message(self, logger: logging.Logger): - # Commented so the unit tests fail - return self._constructor._emit_connector_builder_messages or super(self).log_slice_message(logger) + return self._emit_connector_builder_messages or super(self).log_slice_message(logger) def _configure_logger_level(self, logger: logging.Logger): """ diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index c86a848b129c..589e497df233 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -117,7 +117,6 @@ def __init__( self._init_mappings() self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice self._limit_slices_fetched = limit_slices_fetched - self._emit_connector_builder_messages = emit_connector_builder_messages def _init_mappings(self): self.PYDANTIC_MODEL_TO_CONSTRUCTOR: [Type[BaseModel], Callable] = { @@ -176,7 +175,15 @@ def _init_mappings(self): # Needed for the case where we need to perform a second parse on the fields of a custom component self.TYPE_NAME_TO_MODEL = {cls.__name__: cls for cls in self.PYDANTIC_MODEL_TO_CONSTRUCTOR} - def create_component(self, model_type: Type[BaseModel], component_definition: ComponentDefinition, config: Config, **kwargs) -> type: + def create_component( + self, + model_type: Type[BaseModel], + component_definition: 
ComponentDefinition, + config: Config, + *, + emit_connector_builder_messages: bool = False, + **kwargs, + ) -> type: """ Takes a given Pydantic model type and Mapping representing a component definition and creates a declarative component and subcomponents which will be used at runtime. This is done by first parsing the mapping into a Pydantic model and then creating @@ -185,6 +192,7 @@ def create_component(self, model_type: Type[BaseModel], component_definition: Co :param model_type: The type of declarative component that is being initialized :param component_definition: The mapping that represents a declarative component :param config: The connector config that is provided by the customer + :param emit_connector_builder_messages: :return: The declarative component to be used at runtime """ @@ -197,12 +205,17 @@ def create_component(self, model_type: Type[BaseModel], component_definition: Co if not isinstance(declarative_component_model, model_type): raise ValueError(f"Expected {model_type.__name__} component, but received {declarative_component_model.__class__.__name__}") - return self._create_component_from_model(model=declarative_component_model, config=config, **kwargs) + return self._create_component_from_model( + model=declarative_component_model, + config=config, + **{**kwargs, **{"emit_connector_builder_messages": emit_connector_builder_messages}}, + ) def _create_component_from_model(self, model: BaseModel, config: Config, **kwargs) -> Any: if model.__class__ not in self.PYDANTIC_MODEL_TO_CONSTRUCTOR: raise ValueError(f"{model.__class__} with attributes {model} is not a valid component type") component_constructor = self.PYDANTIC_MODEL_TO_CONSTRUCTOR.get(model.__class__) + print(f"model: {model} KWARGS: {kwargs}") return component_constructor(model=model, config=config, **kwargs) @staticmethod @@ -296,7 +309,7 @@ def create_custom_component(self, model, config: Config, **kwargs) -> type: model_value["type"] = derived_type if 
self._is_component(model_value): - model_args[model_field] = self._create_nested_component(model, model_field, model_value, config) + model_args[model_field] = self._create_nested_component(model, model_field, model_value, config, **kwargs) elif isinstance(model_value, list): vals = [] for v in model_value: @@ -305,7 +318,7 @@ def create_custom_component(self, model, config: Config, **kwargs) -> type: if derived_type: v["type"] = derived_type if self._is_component(v): - vals.append(self._create_nested_component(model, model_field, v, config)) + vals.append(self._create_nested_component(model, model_field, v, config, **kwargs)) else: vals.append(v) model_args[model_field] = vals @@ -351,7 +364,7 @@ def _extract_missing_parameters(error: TypeError) -> List[str]: else: return [] - def _create_nested_component(self, model, model_field: str, model_value: Any, config: Config) -> Any: + def _create_nested_component(self, model, model_field: str, model_value: Any, config: Config, **kwargs) -> Any: type_name = model_value.get("type", None) if not type_name: # If no type is specified, we can assume this is a dictionary object which can be returned instead of a subcomponent @@ -371,7 +384,7 @@ def _create_nested_component(self, model, model_field: str, model_value: Any, co constructor_kwargs = inspect.getfullargspec(model_constructor).kwonlyargs model_parameters = model_value.get("$parameters", {}) matching_parameters = {kwarg: model_parameters[kwarg] for kwarg in constructor_kwargs if kwarg in model_parameters} - return self._create_component_from_model(model=parsed_model, config=config, **matching_parameters) + return self._create_component_from_model(model=parsed_model, config=config, **matching_parameters, **kwargs) except TypeError as error: missing_parameters = self._extract_missing_parameters(error) if missing_parameters: @@ -432,7 +445,9 @@ def create_datetime_based_cursor(self, model: DatetimeBasedCursorModel, config: parameters=model.parameters, ) - def 
create_declarative_stream(self, model: DeclarativeStreamModel, config: Config, **kwargs) -> DeclarativeStream: + def create_declarative_stream( + self, model: DeclarativeStreamModel, config: Config, emit_connector_builder_messages: bool, **kwargs + ) -> DeclarativeStream: # When constructing a declarative stream, we assemble the incremental_sync component and retriever's partition_router field # components if they exist into a single CartesianProductStreamSlicer. This is then passed back as an argument when constructing the # Retriever. This is done in the declarative stream not the retriever to support custom retrievers. The custom create methods in @@ -441,7 +456,13 @@ def create_declarative_stream(self, model: DeclarativeStreamModel, config: Confi primary_key = model.primary_key.__root__ if model.primary_key else None retriever = self._create_component_from_model( - model=model.retriever, config=config, name=model.name, primary_key=primary_key, stream_slicer=combined_slicers + model=model.retriever, + config=config, + name=model.name, + primary_key=primary_key, + stream_slicer=combined_slicers, + emit_connector_builder_messages=emit_connector_builder_messages, + **kwargs, ) cursor_field = model.incremental_sync.cursor_field if model.incremental_sync else None @@ -522,7 +543,7 @@ def create_default_error_handler(self, model: DefaultErrorHandlerModel, config: parameters=model.parameters, ) - def create_default_paginator(self, model: DefaultPaginatorModel, config: Config, *, url_base: str) -> DefaultPaginator: + def create_default_paginator(self, model: DefaultPaginatorModel, config: Config, *, url_base: str, **kwargs) -> DefaultPaginator: decoder = self._create_component_from_model(model=model.decoder, config=config) if model.decoder else JsonDecoder(parameters={}) page_size_option = ( self._create_component_from_model(model=model.page_size_option, config=config) if model.page_size_option else None @@ -553,10 +574,10 @@ def create_dpath_extractor(self, model: 
DpathExtractorModel, config: Config, **k def create_exponential_backoff_strategy(model: ExponentialBackoffStrategyModel, config: Config) -> ExponentialBackoffStrategy: return ExponentialBackoffStrategy(factor=model.factor, parameters=model.parameters, config=config) - def create_http_requester(self, model: HttpRequesterModel, config: Config, *, name: str) -> HttpRequester: + def create_http_requester(self, model: HttpRequesterModel, config: Config, *, name: str, **kwargs) -> HttpRequester: authenticator = self._create_component_from_model(model=model.authenticator, config=config) if model.authenticator else None error_handler = ( - self._create_component_from_model(model=model.error_handler, config=config) + self._create_component_from_model(model=model.error_handler, config=config, **kwargs) if model.error_handler else DefaultErrorHandler(backoff_strategies=[], response_filters=[], config=config, parameters=model.parameters) ) @@ -695,7 +716,7 @@ def create_page_increment(model: PageIncrementModel, config: Config, **kwargs) - return PageIncrement(page_size=model.page_size, start_from_page=model.start_from_page, parameters=model.parameters) def create_parent_stream_config(self, model: ParentStreamConfigModel, config: Config, **kwargs) -> ParentStreamConfig: - declarative_stream = self._create_component_from_model(model.stream, config=config) + declarative_stream = self._create_component_from_model(model.stream, config=config, **kwargs) request_option = self._create_component_from_model(model.request_option, config=config) if model.request_option else None return ParentStreamConfig( parent_key=model.parent_key, @@ -750,6 +771,7 @@ def create_simple_retriever( config: Config, *, name: str, + emit_connector_builder_messages: bool, primary_key: Optional[Union[str, List[str], List[List[str]]]], stream_slicer: Optional[StreamSlicer], ) -> SimpleRetriever: @@ -762,7 +784,7 @@ def create_simple_retriever( else NoPagination(parameters={}) ) - if self._limit_slices_fetched 
or self._emit_connector_builder_messages: + if self._limit_slices_fetched or emit_connector_builder_messages: return SimpleRetrieverTestReadDecorator( name=name, paginator=paginator, @@ -799,7 +821,7 @@ def create_substream_partition_router(self, model: SubstreamPartitionRouterModel if model.parent_stream_configs: parent_stream_configs.extend( [ - self._create_component_from_model(model=parent_stream_config, config=config) + self._create_component_from_model(model=parent_stream_config, config=config, **kwargs) for parent_stream_config in model.parent_stream_configs ] ) diff --git a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py index 1f9389e91ad0..9a9514e7f4ec 100644 --- a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py +++ b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py @@ -5,7 +5,6 @@ import copy import dataclasses import json -import logging from unittest import mock from unittest.mock import patch @@ -23,7 +22,18 @@ ) from airbyte_cdk.connector_builder.main import handle_connector_builder_request, handle_request, read_stream from airbyte_cdk.connector_builder.models import LogMessage, StreamRead, StreamReadSlicesInner, StreamReadSlicesInnerPagesInner -from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, DestinationSyncMode, SyncMode, Type, AirbyteLogMessage, Level +from airbyte_cdk.models import ( + AirbyteLogMessage, + AirbyteMessage, + AirbyteRecordMessage, + AirbyteStream, + ConfiguredAirbyteCatalog, + ConfiguredAirbyteStream, + DestinationSyncMode, + Level, + SyncMode, +) +from airbyte_cdk.models import Type from airbyte_cdk.models import Type as MessageType from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream from 
airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource @@ -524,11 +534,15 @@ def test_create_source(): assert source._constructor._limit_pages_fetched_per_slice == limits.max_pages_per_slice assert source._constructor._limit_slices_fetched == limits.max_slices + def request_log_message(request: dict) -> AirbyteMessage: return AirbyteMessage(type=Type.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"request:{json.dumps(request)}")) + + def response_log_message(response: dict) -> AirbyteMessage: return AirbyteMessage(type=Type.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"response:{json.dumps(response)}")) + @patch.object(HttpStream, "_read_pages", return_value=[ request_log_message({}), response_log_message({}), diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 615d484c724c..52f1318ec8d8 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -1312,7 +1312,7 @@ def test_simple_retriever_emit_log_messages(): "requester": {"type": "HttpRequester", "name": "list", "url_base": "orange.com", "path": "/v1/api"}, } - connector_builder_factory = ModelToComponentFactory(emit_connector_builder_messages=True) + connector_builder_factory = ModelToComponentFactory() retriever = connector_builder_factory.create_component( model_type=SimpleRetrieverModel, component_definition=simple_retriever_model, @@ -1320,6 +1320,7 @@ def test_simple_retriever_emit_log_messages(): name="Test", primary_key="id", stream_slicer=None, + emit_connector_builder_messages=True ) assert isinstance(retriever, SimpleRetrieverTestReadDecorator) From 970390f6d9a92d600cef16594f796355d535e992 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Thu, 13 
Apr 2023 15:53:09 -0700 Subject: [PATCH 08/15] set the flag --- .../sources/declarative/manifest_declarative_source.py | 4 +++- .../connector_builder/test_connector_builder_handler.py | 5 +++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py index c64aef4c7998..acbbf24fbca0 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -85,7 +85,9 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: self._emit_manifest_debug_message(extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}) source_streams = [ - self._constructor.create_component(DeclarativeStreamModel, stream_config, config) + self._constructor.create_component( + DeclarativeStreamModel, stream_config, config, emit_connector_builder_messages=self._emit_connector_builder_messages + ) for stream_config in self._stream_configs(self._source_config) ] diff --git a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py index 9a9514e7f4ec..ff0a9f9365a3 100644 --- a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py +++ b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py @@ -37,6 +37,7 @@ from airbyte_cdk.models import Type as MessageType from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource +from airbyte_cdk.sources.declarative.retrievers import SimpleRetrieverTestReadDecorator from airbyte_cdk.sources.streams.core import Stream from airbyte_cdk.sources.streams.http import 
HttpStream from unit_tests.connector_builder.utils import create_configured_catalog @@ -572,3 +573,7 @@ def test_read_source(mock_http_stream): first_page, second_page = pages[0], pages[1] assert len(first_page["records"]) == _page_size assert len(second_page["records"]) == 1 + + streams = source.streams(config) + for s in streams: + assert isinstance(s.retriever, SimpleRetrieverTestReadDecorator) From 9faeb6f913fbc7b939d451bb82c1e45ec316dd23 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Thu, 13 Apr 2023 15:55:48 -0700 Subject: [PATCH 09/15] remove debug print --- .../sources/declarative/parsers/model_to_component_factory.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 589e497df233..0176df4af351 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -215,7 +215,6 @@ def _create_component_from_model(self, model: BaseModel, config: Config, **kwarg if model.__class__ not in self.PYDANTIC_MODEL_TO_CONSTRUCTOR: raise ValueError(f"{model.__class__} with attributes {model} is not a valid component type") component_constructor = self.PYDANTIC_MODEL_TO_CONSTRUCTOR.get(model.__class__) - print(f"model: {model} KWARGS: {kwargs}") return component_constructor(model=model, config=config, **kwargs) @staticmethod From bc3cf4d426abb418e5710d8def7f71c8fd6f692b Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Thu, 13 Apr 2023 17:22:21 -0700 Subject: [PATCH 10/15] halfmock --- .../test_connector_builder_handler.py | 31 ++++++++++++++++--- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py 
b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py index ff0a9f9365a3..f85faa21f1e8 100644 --- a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py +++ b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py @@ -9,6 +9,8 @@ from unittest.mock import patch import pytest +import requests + from airbyte_cdk import connector_builder from airbyte_cdk.connector_builder.connector_builder_handler import ( DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE, @@ -543,12 +545,31 @@ def request_log_message(request: dict) -> AirbyteMessage: def response_log_message(response: dict) -> AirbyteMessage: return AirbyteMessage(type=Type.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"response:{json.dumps(response)}")) - -@patch.object(HttpStream, "_read_pages", return_value=[ - request_log_message({}), - response_log_message({}), - {"id": 0}, {"id": 1}, request_log_message({}), response_log_message({}), {"id": 2}]) +def create_request(): + url = "https://example.com/api" + headers = {'Content-Type': 'application/json'} + return requests.Request('POST', url, headers=headers, json={"key": "value"}).prepare() + + +def create_response(body): + response = requests.Response() + response.status_code = 200 + response._content = bytes(json.dumps(body), "utf-8") + response.headers["Content-Type"] = "application/json" + return response + +@patch.object(HttpStream, "_fetch_next_page", return_value=( + create_request(), + create_response({ + "result": [{"id": 1}, {"id": 2}], + "_metadata": {} + }))) def test_read_source(mock_http_stream): + """ + This test sort of acts as an integration test for the connector builder with the caveat that + this does not test that the request log and response log messages are emitted. 
+ Instead, this tests that the stream's retriever is a SimpleTestRestDecorator, which is known to emit the log messages + """ max_records = 100 max_pages_per_slice = 2 max_slices = 3 From d838985db75969ce7a506c159c0ebb8c9a043fae Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Thu, 13 Apr 2023 18:14:27 -0700 Subject: [PATCH 11/15] clean up --- .../parsers/model_to_component_factory.py | 6 ++-- .../test_connector_builder_handler.py | 28 +++++++++++-------- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 0176df4af351..306bd832220d 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -192,7 +192,7 @@ def create_component( :param model_type: The type of declarative component that is being initialized :param component_definition: The mapping that represents a declarative component :param config: The connector config that is provided by the customer - :param emit_connector_builder_messages: + :param emit_connector_builder_messages: Flag specifying whether the source must emit log messages at the start of the slice :return: The declarative component to be used at runtime """ @@ -206,9 +206,7 @@ def create_component( raise ValueError(f"Expected {model_type.__name__} component, but received {declarative_component_model.__class__.__name__}") return self._create_component_from_model( - model=declarative_component_model, - config=config, - **{**kwargs, **{"emit_connector_builder_messages": emit_connector_builder_messages}}, + model=declarative_component_model, config=config, emit_connector_builder_messages=emit_connector_builder_messages, **kwargs ) def _create_component_from_model(self, model: BaseModel, config: Config, 
**kwargs) -> Any: diff --git a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py index f85faa21f1e8..cfde80c2c36b 100644 --- a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py +++ b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py @@ -10,7 +10,6 @@ import pytest import requests - from airbyte_cdk import connector_builder from airbyte_cdk.connector_builder.connector_builder_handler import ( DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE, @@ -545,30 +544,35 @@ def request_log_message(request: dict) -> AirbyteMessage: def response_log_message(response: dict) -> AirbyteMessage: return AirbyteMessage(type=Type.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"response:{json.dumps(response)}")) -def create_request(): + +def _create_request(): url = "https://example.com/api" headers = {'Content-Type': 'application/json'} return requests.Request('POST', url, headers=headers, json={"key": "value"}).prepare() -def create_response(body): +def _create_response(body): response = requests.Response() response.status_code = 200 response._content = bytes(json.dumps(body), "utf-8") response.headers["Content-Type"] = "application/json" return response -@patch.object(HttpStream, "_fetch_next_page", return_value=( - create_request(), - create_response({ - "result": [{"id": 1}, {"id": 2}], - "_metadata": {} - }))) + +def _create_page(response_body): + return _create_request(), _create_response(response_body) + + +@patch.object(HttpStream, "_fetch_next_page", side_effect=(_create_page({"result": [{"id": 0}, {"id": 1}],"_metadata": {"next": "next"}}), _create_page({"result": [{"id": 2}],"_metadata": {}})) * 5) def test_read_source(mock_http_stream): """ - This test sort of acts as an integration test for the connector builder with the caveat that - this does not test that the request log and response log 
messages are emitted. - Instead, this tests that the stream's retriever is a SimpleTestRestDecorator, which is known to emit the log messages + This test sort of acts as an integration test for the connector builder. + + Each slice has two pages + The first page has two records + The second page one record + + The response._metadata.next field in the first page tells the paginator to fetch the next page. """ max_records = 100 max_pages_per_slice = 2 From 8c70f81628dfebcd60e6f8a26e0cb0dff4a8a8e3 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Thu, 13 Apr 2023 18:25:46 -0700 Subject: [PATCH 12/15] Add a test for a single page --- .../test_connector_builder_handler.py | 42 ++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py index cfde80c2c36b..c98d5161c809 100644 --- a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py +++ b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py @@ -563,7 +563,7 @@ def _create_page(response_body): return _create_request(), _create_response(response_body) -@patch.object(HttpStream, "_fetch_next_page", side_effect=(_create_page({"result": [{"id": 0}, {"id": 1}],"_metadata": {"next": "next"}}), _create_page({"result": [{"id": 2}],"_metadata": {}})) * 5) +@patch.object(HttpStream, "_fetch_next_page", side_effect=(_create_page({"result": [{"id": 0}, {"id": 1}],"_metadata": {"next": "next"}}), _create_page({"result": [{"id": 2}],"_metadata": {"next": "next"}})) * 5) def test_read_source(mock_http_stream): """ This test sort of acts as an integration test for the connector builder. 
@@ -602,3 +602,43 @@ def test_read_source(mock_http_stream): streams = source.streams(config) for s in streams: assert isinstance(s.retriever, SimpleRetrieverTestReadDecorator) + + +@patch.object(HttpStream, "_fetch_next_page", side_effect=(_create_page({"result": [{"id": 0}, {"id": 1}],"_metadata": {"next": "next"}}), )* 5) +def test_read_source_single_page(mock_http_stream): + """ + This test sort of acts as an integration test for the connector builder. + + Each slice has two pages + The first page has two records + The second page one record + + The response._metadata.next field in the first page tells the paginator to fetch the next page. + """ + max_records = 100 + max_pages_per_slice = 1 + max_slices = 3 + limits = TestReadLimits(max_records, max_pages_per_slice, max_slices) + + catalog = ConfiguredAirbyteCatalog(streams=[ + ConfiguredAirbyteStream(stream=AirbyteStream(name=_stream_name, json_schema={}, supported_sync_modes=[SyncMode.full_refresh]), sync_mode=SyncMode.full_refresh, destination_sync_mode=DestinationSyncMode.append) + ]) + + config = {"__injected_declarative_manifest": MANIFEST} + + source = create_source(config, limits) + + output_data = read_stream(source, config, catalog, limits).record.data + slices = output_data["slices"] + + assert len(slices) == max_slices + for s in slices: + pages = s["pages"] + assert len(pages) == max_pages_per_slice + + first_page = pages[0] + assert len(first_page["records"]) == _page_size + + streams = source.streams(config) + for s in streams: + assert isinstance(s.retriever, SimpleRetrieverTestReadDecorator) From 8c8a59498e38ce00876e215abf72adfa38560dc3 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Thu, 13 Apr 2023 18:27:22 -0700 Subject: [PATCH 13/15] Add another test --- .../test_connector_builder_handler.py | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py 
b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py index c98d5161c809..2ac37f24c6ca 100644 --- a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py +++ b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py @@ -563,7 +563,7 @@ def _create_page(response_body): return _create_request(), _create_response(response_body) -@patch.object(HttpStream, "_fetch_next_page", side_effect=(_create_page({"result": [{"id": 0}, {"id": 1}],"_metadata": {"next": "next"}}), _create_page({"result": [{"id": 2}],"_metadata": {"next": "next"}})) * 5) +@patch.object(HttpStream, "_fetch_next_page", side_effect=(_create_page({"result": [{"id": 0}, {"id": 1}],"_metadata": {"next": "next"}}), _create_page({"result": [{"id": 2}],"_metadata": {"next": "next"}})) * 10) def test_read_source(mock_http_stream): """ This test sort of acts as an integration test for the connector builder. @@ -604,20 +604,11 @@ def test_read_source(mock_http_stream): assert isinstance(s.retriever, SimpleRetrieverTestReadDecorator) -@patch.object(HttpStream, "_fetch_next_page", side_effect=(_create_page({"result": [{"id": 0}, {"id": 1}],"_metadata": {"next": "next"}}), )* 5) -def test_read_source_single_page(mock_http_stream): - """ - This test sort of acts as an integration test for the connector builder. - - Each slice has two pages - The first page has two records - The second page one record - - The response._metadata.next field in the first page tells the paginator to fetch the next page. 
- """ +@patch.object(HttpStream, "_fetch_next_page", side_effect=(_create_page({"result": [{"id": 0}, {"id": 1}],"_metadata": {"next": "next"}}), _create_page({"result": [{"id": 2}],"_metadata": {"next": "next"}}))) +def test_read_source_single_page_single_slice(mock_http_stream): max_records = 100 max_pages_per_slice = 1 - max_slices = 3 + max_slices = 1 limits = TestReadLimits(max_records, max_pages_per_slice, max_slices) catalog = ConfiguredAirbyteCatalog(streams=[ From a053945d5ee23e014db1e8e45116c28eeab6ad9c Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Fri, 14 Apr 2023 08:49:29 -0700 Subject: [PATCH 14/15] Pass the flag --- .../connector_builder_handler.py | 1 + .../manifest_declarative_source.py | 2 +- .../parsers/model_to_component_factory.py | 49 ++++++------------- .../test_model_to_component_factory.py | 3 +- 4 files changed, 18 insertions(+), 37 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/connector_builder/connector_builder_handler.py b/airbyte-cdk/python/airbyte_cdk/connector_builder/connector_builder_handler.py index 1572e70367d1..5637b0955e34 100644 --- a/airbyte-cdk/python/airbyte_cdk/connector_builder/connector_builder_handler.py +++ b/airbyte-cdk/python/airbyte_cdk/connector_builder/connector_builder_handler.py @@ -48,6 +48,7 @@ def create_source(config: Mapping[str, Any], limits: TestReadLimits) -> Manifest emit_connector_builder_messages=True, source_config=manifest, component_factory=ModelToComponentFactory( + emit_connector_builder_messages=True, limit_pages_fetched_per_slice=limits.max_pages_per_slice, limit_slices_fetched=limits.max_slices) ) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py index acbbf24fbca0..3bd58166bbc3 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ 
b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -60,7 +60,7 @@ def __init__( self._source_config = propagated_source_config self._debug = debug self._emit_connector_builder_messages = emit_connector_builder_messages - self._constructor = component_factory if component_factory else ModelToComponentFactory() + self._constructor = component_factory if component_factory else ModelToComponentFactory(emit_connector_builder_messages) self._validate_source() diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 306bd832220d..c86a848b129c 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -117,6 +117,7 @@ def __init__( self._init_mappings() self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice self._limit_slices_fetched = limit_slices_fetched + self._emit_connector_builder_messages = emit_connector_builder_messages def _init_mappings(self): self.PYDANTIC_MODEL_TO_CONSTRUCTOR: [Type[BaseModel], Callable] = { @@ -175,15 +176,7 @@ def _init_mappings(self): # Needed for the case where we need to perform a second parse on the fields of a custom component self.TYPE_NAME_TO_MODEL = {cls.__name__: cls for cls in self.PYDANTIC_MODEL_TO_CONSTRUCTOR} - def create_component( - self, - model_type: Type[BaseModel], - component_definition: ComponentDefinition, - config: Config, - *, - emit_connector_builder_messages: bool = False, - **kwargs, - ) -> type: + def create_component(self, model_type: Type[BaseModel], component_definition: ComponentDefinition, config: Config, **kwargs) -> type: """ Takes a given Pydantic model type and Mapping representing a component definition and creates a declarative component and subcomponents which will be used at 
runtime. This is done by first parsing the mapping into a Pydantic model and then creating @@ -192,7 +185,6 @@ def create_component( :param model_type: The type of declarative component that is being initialized :param component_definition: The mapping that represents a declarative component :param config: The connector config that is provided by the customer - :param emit_connector_builder_messages: Flag specifying whether the source must emit log messages at the start of the slice :return: The declarative component to be used at runtime """ @@ -205,9 +197,7 @@ def create_component( if not isinstance(declarative_component_model, model_type): raise ValueError(f"Expected {model_type.__name__} component, but received {declarative_component_model.__class__.__name__}") - return self._create_component_from_model( - model=declarative_component_model, config=config, emit_connector_builder_messages=emit_connector_builder_messages, **kwargs - ) + return self._create_component_from_model(model=declarative_component_model, config=config, **kwargs) def _create_component_from_model(self, model: BaseModel, config: Config, **kwargs) -> Any: if model.__class__ not in self.PYDANTIC_MODEL_TO_CONSTRUCTOR: @@ -306,7 +296,7 @@ def create_custom_component(self, model, config: Config, **kwargs) -> type: model_value["type"] = derived_type if self._is_component(model_value): - model_args[model_field] = self._create_nested_component(model, model_field, model_value, config, **kwargs) + model_args[model_field] = self._create_nested_component(model, model_field, model_value, config) elif isinstance(model_value, list): vals = [] for v in model_value: @@ -315,7 +305,7 @@ def create_custom_component(self, model, config: Config, **kwargs) -> type: if derived_type: v["type"] = derived_type if self._is_component(v): - vals.append(self._create_nested_component(model, model_field, v, config, **kwargs)) + vals.append(self._create_nested_component(model, model_field, v, config)) else: vals.append(v) 
model_args[model_field] = vals @@ -361,7 +351,7 @@ def _extract_missing_parameters(error: TypeError) -> List[str]: else: return [] - def _create_nested_component(self, model, model_field: str, model_value: Any, config: Config, **kwargs) -> Any: + def _create_nested_component(self, model, model_field: str, model_value: Any, config: Config) -> Any: type_name = model_value.get("type", None) if not type_name: # If no type is specified, we can assume this is a dictionary object which can be returned instead of a subcomponent @@ -381,7 +371,7 @@ def _create_nested_component(self, model, model_field: str, model_value: Any, co constructor_kwargs = inspect.getfullargspec(model_constructor).kwonlyargs model_parameters = model_value.get("$parameters", {}) matching_parameters = {kwarg: model_parameters[kwarg] for kwarg in constructor_kwargs if kwarg in model_parameters} - return self._create_component_from_model(model=parsed_model, config=config, **matching_parameters, **kwargs) + return self._create_component_from_model(model=parsed_model, config=config, **matching_parameters) except TypeError as error: missing_parameters = self._extract_missing_parameters(error) if missing_parameters: @@ -442,9 +432,7 @@ def create_datetime_based_cursor(self, model: DatetimeBasedCursorModel, config: parameters=model.parameters, ) - def create_declarative_stream( - self, model: DeclarativeStreamModel, config: Config, emit_connector_builder_messages: bool, **kwargs - ) -> DeclarativeStream: + def create_declarative_stream(self, model: DeclarativeStreamModel, config: Config, **kwargs) -> DeclarativeStream: # When constructing a declarative stream, we assemble the incremental_sync component and retriever's partition_router field # components if they exist into a single CartesianProductStreamSlicer. This is then passed back as an argument when constructing the # Retriever. This is done in the declarative stream not the retriever to support custom retrievers. 
The custom create methods in @@ -453,13 +441,7 @@ def create_declarative_stream( primary_key = model.primary_key.__root__ if model.primary_key else None retriever = self._create_component_from_model( - model=model.retriever, - config=config, - name=model.name, - primary_key=primary_key, - stream_slicer=combined_slicers, - emit_connector_builder_messages=emit_connector_builder_messages, - **kwargs, + model=model.retriever, config=config, name=model.name, primary_key=primary_key, stream_slicer=combined_slicers ) cursor_field = model.incremental_sync.cursor_field if model.incremental_sync else None @@ -540,7 +522,7 @@ def create_default_error_handler(self, model: DefaultErrorHandlerModel, config: parameters=model.parameters, ) - def create_default_paginator(self, model: DefaultPaginatorModel, config: Config, *, url_base: str, **kwargs) -> DefaultPaginator: + def create_default_paginator(self, model: DefaultPaginatorModel, config: Config, *, url_base: str) -> DefaultPaginator: decoder = self._create_component_from_model(model=model.decoder, config=config) if model.decoder else JsonDecoder(parameters={}) page_size_option = ( self._create_component_from_model(model=model.page_size_option, config=config) if model.page_size_option else None @@ -571,10 +553,10 @@ def create_dpath_extractor(self, model: DpathExtractorModel, config: Config, **k def create_exponential_backoff_strategy(model: ExponentialBackoffStrategyModel, config: Config) -> ExponentialBackoffStrategy: return ExponentialBackoffStrategy(factor=model.factor, parameters=model.parameters, config=config) - def create_http_requester(self, model: HttpRequesterModel, config: Config, *, name: str, **kwargs) -> HttpRequester: + def create_http_requester(self, model: HttpRequesterModel, config: Config, *, name: str) -> HttpRequester: authenticator = self._create_component_from_model(model=model.authenticator, config=config) if model.authenticator else None error_handler = ( - 
self._create_component_from_model(model=model.error_handler, config=config, **kwargs) + self._create_component_from_model(model=model.error_handler, config=config) if model.error_handler else DefaultErrorHandler(backoff_strategies=[], response_filters=[], config=config, parameters=model.parameters) ) @@ -713,7 +695,7 @@ def create_page_increment(model: PageIncrementModel, config: Config, **kwargs) - return PageIncrement(page_size=model.page_size, start_from_page=model.start_from_page, parameters=model.parameters) def create_parent_stream_config(self, model: ParentStreamConfigModel, config: Config, **kwargs) -> ParentStreamConfig: - declarative_stream = self._create_component_from_model(model.stream, config=config, **kwargs) + declarative_stream = self._create_component_from_model(model.stream, config=config) request_option = self._create_component_from_model(model.request_option, config=config) if model.request_option else None return ParentStreamConfig( parent_key=model.parent_key, @@ -768,7 +750,6 @@ def create_simple_retriever( config: Config, *, name: str, - emit_connector_builder_messages: bool, primary_key: Optional[Union[str, List[str], List[List[str]]]], stream_slicer: Optional[StreamSlicer], ) -> SimpleRetriever: @@ -781,7 +762,7 @@ def create_simple_retriever( else NoPagination(parameters={}) ) - if self._limit_slices_fetched or emit_connector_builder_messages: + if self._limit_slices_fetched or self._emit_connector_builder_messages: return SimpleRetrieverTestReadDecorator( name=name, paginator=paginator, @@ -818,7 +799,7 @@ def create_substream_partition_router(self, model: SubstreamPartitionRouterModel if model.parent_stream_configs: parent_stream_configs.extend( [ - self._create_component_from_model(model=parent_stream_config, config=config, **kwargs) + self._create_component_from_model(model=parent_stream_config, config=config) for parent_stream_config in model.parent_stream_configs ] ) diff --git 
a/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 52f1318ec8d8..615d484c724c 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -1312,7 +1312,7 @@ def test_simple_retriever_emit_log_messages(): "requester": {"type": "HttpRequester", "name": "list", "url_base": "orange.com", "path": "/v1/api"}, } - connector_builder_factory = ModelToComponentFactory() + connector_builder_factory = ModelToComponentFactory(emit_connector_builder_messages=True) retriever = connector_builder_factory.create_component( model_type=SimpleRetrieverModel, component_definition=simple_retriever_model, @@ -1320,7 +1320,6 @@ def test_simple_retriever_emit_log_messages(): name="Test", primary_key="id", stream_slicer=None, - emit_connector_builder_messages=True ) assert isinstance(retriever, SimpleRetrieverTestReadDecorator) From 75cac58f69e9ab61a31deeba6c901dbe69a759f0 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Fri, 14 Apr 2023 10:09:28 -0700 Subject: [PATCH 15/15] rename --- airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py | 6 +++--- .../sources/declarative/manifest_declarative_source.py | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py index a49ae883f94d..8cfcb0b5beaa 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py @@ -237,7 +237,7 @@ def _read_incremental( has_slices = False for _slice in slices: has_slices = True - if self.log_slice_message(logger): + if self.should_log_slice_message(logger): yield AirbyteMessage( type=MessageType.LOG, 
log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}"), @@ -277,7 +277,7 @@ def _read_incremental( checkpoint = self._checkpoint_state(stream_instance, stream_state, state_manager) yield checkpoint - def log_slice_message(self, logger: logging.Logger): + def should_log_slice_message(self, logger: logging.Logger): """ :param logger: @@ -298,7 +298,7 @@ def _read_full_refresh( ) total_records_counter = 0 for _slice in slices: - if self.log_slice_message(logger): + if self.should_log_slice_message(logger): yield AirbyteMessage( type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}"), diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py index 3bd58166bbc3..82fbb1dfb36d 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -129,8 +129,8 @@ def read( self._configure_logger_level(logger) yield from super().read(logger, config, catalog, state) - def log_slice_message(self, logger: logging.Logger): - return self._emit_connector_builder_messages or super(self).log_slice_message(logger) + def should_log_slice_message(self, logger: logging.Logger): + return self._emit_connector_builder_messages or super().should_log_slice_message(logger) def _configure_logger_level(self, logger: logging.Logger): """