connector builder: Emit message at start of slice (airbytehq#25180)
* Move condition for yielding the slice message to an overwritable method

* Automated Commit - Formatting Changes

* yield the slice log messages

* same for incremental

* refactor

* Revert "refactor"

This reverts commit c594365.

* move flag from factory to source

* set the flag

* remove debug print

* halfmock

* clean up

* Add a test for a single page

* Add another test

* Pass the flag

* rename

---------

Co-authored-by: girarda <girarda@users.noreply.github.com>
2 people authored and btkcodedev committed Apr 26, 2023
1 parent 3360cd4 commit 1fdf3b8
Showing 4 changed files with 173 additions and 28 deletions.
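In outline, the change pulls the condition that gates the per-slice log message out of _read_incremental / _read_full_refresh and into an overridable hook, should_log_slice_message, and threads an emit_connector_builder_messages flag through the connector-builder entry point so that hook always returns True there. A minimal sketch of the pattern, using simplified stand-ins for AbstractSource and ManifestDeclarativeSource (only the method name and the flag name come from this commit; the class names, read_slices, and the prefix value are illustrative assumptions):

import logging
from typing import Any, Iterable, Mapping


class SketchSource:
    # Default behaviour mirrors the CDK: only log slice boundaries when DEBUG logging is enabled.
    SLICE_LOG_PREFIX = "slice:"  # assumed value, for illustration only

    def should_log_slice_message(self, logger: logging.Logger) -> bool:
        return logger.isEnabledFor(logging.DEBUG)

    def read_slices(self, logger: logging.Logger, slices: Iterable[Mapping[str, Any]]) -> Iterable[str]:
        for _slice in slices:
            if self.should_log_slice_message(logger):
                # The CDK yields an AirbyteMessage(type=LOG, ...); a plain string keeps the sketch short.
                yield f"{self.SLICE_LOG_PREFIX}{_slice}"
            # ... records for the slice would be read and yielded here ...


class SketchBuilderSource(SketchSource):
    # Connector-builder mode: emit the slice message regardless of the configured log level.
    def __init__(self, emit_connector_builder_messages: bool = False) -> None:
        self._emit_connector_builder_messages = emit_connector_builder_messages

    def should_log_slice_message(self, logger: logging.Logger) -> bool:
        return self._emit_connector_builder_messages or super().should_log_slice_message(logger)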
@@ -45,7 +45,9 @@ def get_limits(config: Mapping[str, Any]) -> TestReadLimits:
def create_source(config: Mapping[str, Any], limits: TestReadLimits) -> ManifestDeclarativeSource:
manifest = config["__injected_declarative_manifest"]
return ManifestDeclarativeSource(
source_config=manifest, component_factory=ModelToComponentFactory(
emit_connector_builder_messages=True,
source_config=manifest,
component_factory=ModelToComponentFactory(
emit_connector_builder_messages=True,
limit_pages_fetched_per_slice=limits.max_pages_per_slice,
limit_slices_fetched=limits.max_slices)
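Read as a whole, the hunk above leaves the connector builder's create_source passing the new flag both to the source and to the component factory. A hedged reconstruction of the resulting function (argument order and indentation are a best guess from the hunk and from the __init__ signature added further down):

def create_source(config: Mapping[str, Any], limits: TestReadLimits) -> ManifestDeclarativeSource:
    manifest = config["__injected_declarative_manifest"]
    return ManifestDeclarativeSource(
        emit_connector_builder_messages=True,  # the source itself now knows it runs in builder mode
        source_config=manifest,
        component_factory=ModelToComponentFactory(
            emit_connector_builder_messages=True,
            limit_pages_fetched_per_slice=limits.max_pages_per_slice,
            limit_slices_fetched=limits.max_slices,
        ),
    )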
12 changes: 10 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
@@ -237,7 +237,7 @@ def _read_incremental(
has_slices = False
for _slice in slices:
has_slices = True
if logger.isEnabledFor(logging.DEBUG):
if self.should_log_slice_message(logger):
yield AirbyteMessage(
type=MessageType.LOG,
log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}"),
@@ -277,6 +277,14 @@ def _read_incremental(
checkpoint = self._checkpoint_state(stream_instance, stream_state, state_manager)
yield checkpoint

def should_log_slice_message(self, logger: logging.Logger):
"""
:param logger: the logger used while reading the stream
:return: True if a log message should be emitted for the current slice
"""
return logger.isEnabledFor(logging.DEBUG)

def _read_full_refresh(
self,
logger: logging.Logger,
@@ -290,7 +298,7 @@ def _read_full_refresh(
)
total_records_counter = 0
for _slice in slices:
if logger.isEnabledFor(logging.DEBUG):
if self.should_log_slice_message(logger):
yield AirbyteMessage(
type=MessageType.LOG,
log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}"),
@@ -36,7 +36,13 @@ class ManifestDeclarativeSource(DeclarativeSource):

VALID_TOP_LEVEL_FIELDS = {"check", "definitions", "schemas", "spec", "streams", "type", "version"}

def __init__(self, source_config: ConnectionDefinition, debug: bool = False, component_factory: ModelToComponentFactory = None):
def __init__(
self,
source_config: ConnectionDefinition,
debug: bool = False,
emit_connector_builder_messages: bool = False,
component_factory: ModelToComponentFactory = None,
):
"""
:param source_config(Mapping[str, Any]): The manifest of low-code components that describe the source connector
:param debug(bool): True if debug mode is enabled
@@ -53,7 +59,8 @@ def __init__(self, source_config: ConnectionDefinition, debug: bool = False, com
propagated_source_config = ManifestComponentTransformer().propagate_types_and_parameters("", resolved_source_config, {})
self._source_config = propagated_source_config
self._debug = debug
self._constructor = component_factory if component_factory else ModelToComponentFactory()
self._emit_connector_builder_messages = emit_connector_builder_messages
self._constructor = component_factory if component_factory else ModelToComponentFactory(emit_connector_builder_messages)

self._validate_source()

@@ -66,7 +73,9 @@ def connection_checker(self) -> ConnectionChecker:
check = self._source_config["check"]
if "type" not in check:
check["type"] = "CheckStream"
check_stream = self._constructor.create_component(CheckStreamModel, check, dict())
check_stream = self._constructor.create_component(
CheckStreamModel, check, dict(), emit_connector_builder_messages=self._emit_connector_builder_messages
)
if isinstance(check_stream, ConnectionChecker):
return check_stream
else:
@@ -76,7 +85,9 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
self._emit_manifest_debug_message(extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)})

source_streams = [
self._constructor.create_component(DeclarativeStreamModel, stream_config, config)
self._constructor.create_component(
DeclarativeStreamModel, stream_config, config, emit_connector_builder_messages=self._emit_connector_builder_messages
)
for stream_config in self._stream_configs(self._source_config)
]

@@ -118,6 +129,9 @@ def read(
self._configure_logger_level(logger)
yield from super().read(logger, config, catalog, state)

def should_log_slice_message(self, logger: logging.Logger):
return self._emit_connector_builder_messages or super().should_log_slice_message(logger)

def _configure_logger_level(self, logger: logging.Logger):
"""
Set the log level to logging.DEBUG if debug mode is enabled
@@ -9,6 +9,7 @@
from unittest.mock import patch

import pytest
import requests
from airbyte_cdk import connector_builder
from airbyte_cdk.connector_builder.connector_builder_handler import (
DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE,
@@ -22,10 +23,22 @@
)
from airbyte_cdk.connector_builder.main import handle_connector_builder_request, handle_request, read_stream
from airbyte_cdk.connector_builder.models import LogMessage, StreamRead, StreamReadSlicesInner, StreamReadSlicesInnerPagesInner
from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.models import (
AirbyteLogMessage,
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStream,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
DestinationSyncMode,
Level,
SyncMode,
)
from airbyte_cdk.models import Type
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.declarative.retrievers import SimpleRetrieverTestReadDecorator
from airbyte_cdk.sources.streams.core import Stream
from airbyte_cdk.sources.streams.http import HttpStream
from unit_tests.connector_builder.utils import create_configured_catalog
@@ -34,23 +47,29 @@
_stream_primary_key = "id"
_stream_url_base = "https://api.sendgrid.com"
_stream_options = {"name": _stream_name, "primary_key": _stream_primary_key, "url_base": _stream_url_base}
_page_size = 2

MANIFEST = {
"version": "0.30.3",
"definitions": {
"schema_loader": {"name": "{{ options.stream_name }}", "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml"},
"retriever": {
"paginator": {
"type": "DefaultPaginator",
"page_size": 10,
"page_size": _page_size,
"page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"},
"page_token_option": {"inject_into": "path", "type": "RequestPath"},
"pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}", "page_size": 10},
"pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}", "page_size": _page_size},
},
"partition_router": {
"type": "ListPartitionRouter",
"values": ["0", "1", "2", "3", "4", "5", "6", "7"],
"cursor_field": "item_id"
},
""
"requester": {
"path": "/v3/marketing/lists",
"authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"},
"request_parameters": {"page_size": "10"},
"request_parameters": {"a_param": "10"},
},
"record_selector": {"extractor": {"field_path": ["result"]}},
},
@@ -59,7 +78,6 @@
{
"type": "DeclarativeStream",
"$parameters": _stream_options,
"schema_loader": {"$ref": "#/definitions/schema_loader"},
"retriever": "#/definitions/retriever",
},
],
@@ -169,39 +187,35 @@ def test_resolve_manifest(valid_resolve_manifest_config_file):
"type": "DeclarativeSource",
"version": "0.30.3",
"definitions": {
"schema_loader": {"name": "{{ options.stream_name }}", "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml"},
"retriever": {
"paginator": {
"type": "DefaultPaginator",
"page_size": 10,
"page_size": _page_size,
"page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"},
"page_token_option": {"inject_into": "path", "type": "RequestPath"},
"pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}", "page_size": 10},
"pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}", "page_size": _page_size},
},
"partition_router": {
"type": "ListPartitionRouter",
"values": ["0", "1", "2", "3", "4", "5", "6", "7"],
"cursor_field": "item_id",
},
"requester": {
"path": "/v3/marketing/lists",
"authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"},
"request_parameters": {"page_size": "10"},
"request_parameters": {"a_param": "10"},
},
"record_selector": {"extractor": {"field_path": ["result"]}},
},
},
"streams": [
{
"type": "DeclarativeStream",
"schema_loader": {
"type": "JsonFileSchemaLoader",
"name": "{{ options.stream_name }}",
"file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml",
"primary_key": _stream_primary_key,
"url_base": _stream_url_base,
"$parameters": _stream_options,
},
"retriever": {
"type": "SimpleRetriever",
"paginator": {
"type": "DefaultPaginator",
"page_size": 10,
"page_size": _page_size,
"page_size_option": {
"type": "RequestOption",
"inject_into": "request_parameter",
@@ -226,7 +240,7 @@ def test_resolve_manifest(valid_resolve_manifest_config_file):
"primary_key": _stream_primary_key,
"url_base": _stream_url_base,
"$parameters": _stream_options,
"page_size": 10,
"page_size": _page_size,
},
"name": _stream_name,
"primary_key": _stream_primary_key,
@@ -244,7 +258,16 @@ def test_resolve_manifest(valid_resolve_manifest_config_file):
"url_base": _stream_url_base,
"$parameters": _stream_options,
},
"request_parameters": {"page_size": "10"},
"request_parameters": {"a_param": "10"},
"name": _stream_name,
"primary_key": _stream_primary_key,
"url_base": _stream_url_base,
"$parameters": _stream_options,
},
"partition_router": {
"type": "ListPartitionRouter",
"values": ["0", "1", "2", "3", "4", "5", "6", "7"],
"cursor_field": "item_id",
"name": _stream_name,
"primary_key": _stream_primary_key,
"url_base": _stream_url_base,
@@ -512,3 +535,101 @@ def test_create_source():
assert isinstance(source, ManifestDeclarativeSource)
assert source._constructor._limit_pages_fetched_per_slice == limits.max_pages_per_slice
assert source._constructor._limit_slices_fetched == limits.max_slices


def request_log_message(request: dict) -> AirbyteMessage:
return AirbyteMessage(type=Type.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"request:{json.dumps(request)}"))


def response_log_message(response: dict) -> AirbyteMessage:
return AirbyteMessage(type=Type.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"response:{json.dumps(response)}"))


def _create_request():
url = "https://example.com/api"
headers = {'Content-Type': 'application/json'}
return requests.Request('POST', url, headers=headers, json={"key": "value"}).prepare()


def _create_response(body):
response = requests.Response()
response.status_code = 200
response._content = bytes(json.dumps(body), "utf-8")
response.headers["Content-Type"] = "application/json"
return response


def _create_page(response_body):
return _create_request(), _create_response(response_body)


@patch.object(HttpStream, "_fetch_next_page", side_effect=(_create_page({"result": [{"id": 0}, {"id": 1}],"_metadata": {"next": "next"}}), _create_page({"result": [{"id": 2}],"_metadata": {"next": "next"}})) * 10)
def test_read_source(mock_http_stream):
"""
This test sort of acts as an integration test for the connector builder.
Each slice has two pages.
The first page has two records.
The second page has one record.
The response._metadata.next field in the first page tells the paginator to fetch the next page.
"""
max_records = 100
max_pages_per_slice = 2
max_slices = 3
limits = TestReadLimits(max_records, max_pages_per_slice, max_slices)

catalog = ConfiguredAirbyteCatalog(streams=[
ConfiguredAirbyteStream(stream=AirbyteStream(name=_stream_name, json_schema={}, supported_sync_modes=[SyncMode.full_refresh]), sync_mode=SyncMode.full_refresh, destination_sync_mode=DestinationSyncMode.append)
])

config = {"__injected_declarative_manifest": MANIFEST}

source = create_source(config, limits)

output_data = read_stream(source, config, catalog, limits).record.data
slices = output_data["slices"]

assert len(slices) == max_slices
for s in slices:
pages = s["pages"]
assert len(pages) == max_pages_per_slice

first_page, second_page = pages[0], pages[1]
assert len(first_page["records"]) == _page_size
assert len(second_page["records"]) == 1

streams = source.streams(config)
for s in streams:
assert isinstance(s.retriever, SimpleRetrieverTestReadDecorator)


@patch.object(HttpStream, "_fetch_next_page", side_effect=(_create_page({"result": [{"id": 0}, {"id": 1}],"_metadata": {"next": "next"}}), _create_page({"result": [{"id": 2}],"_metadata": {"next": "next"}})))
def test_read_source_single_page_single_slice(mock_http_stream):
max_records = 100
max_pages_per_slice = 1
max_slices = 1
limits = TestReadLimits(max_records, max_pages_per_slice, max_slices)

catalog = ConfiguredAirbyteCatalog(streams=[
ConfiguredAirbyteStream(stream=AirbyteStream(name=_stream_name, json_schema={}, supported_sync_modes=[SyncMode.full_refresh]), sync_mode=SyncMode.full_refresh, destination_sync_mode=DestinationSyncMode.append)
])

config = {"__injected_declarative_manifest": MANIFEST}

source = create_source(config, limits)

output_data = read_stream(source, config, catalog, limits).record.data
slices = output_data["slices"]

assert len(slices) == max_slices
for s in slices:
pages = s["pages"]
assert len(pages) == max_pages_per_slice

first_page = pages[0]
assert len(first_page["records"]) == _page_size

streams = source.streams(config)
for s in streams:
assert isinstance(s.retriever, SimpleRetrieverTestReadDecorator)
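
Finally, a short usage sketch of the behaviour these tests rely on, assuming the MANIFEST fixture above and the constructor flag and override added in this commit:

import logging

source = ManifestDeclarativeSource(source_config=MANIFEST, emit_connector_builder_messages=True)
logger = logging.getLogger("airbyte")
logger.setLevel(logging.INFO)
# True because of the flag, even though the logger is not at DEBUG level,
# so the per-slice log message is emitted and the builder can group pages by slice.
assert source.should_log_slice_message(logger)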
