Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
girarda committed Apr 13, 2023
1 parent 4c343fe commit c594365
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 28 deletions.
13 changes: 2 additions & 11 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ class AbstractSource(Source, ABC):
in this class to create an Airbyte Specification compliant Source.
"""

SLICE_LOG_PREFIX = "slice:"

@abstractmethod
def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
Expand Down Expand Up @@ -237,11 +236,7 @@ def _read_incremental(
has_slices = False
for _slice in slices:
has_slices = True
if self.log_slice_message(logger):
yield AirbyteMessage(
type=MessageType.LOG,
log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}"),
)
yield from stream_instance.start_of_slice_messages(logger, _slice)
records = stream_instance.read_records(
sync_mode=SyncMode.incremental,
stream_slice=_slice,
Expand Down Expand Up @@ -298,11 +293,7 @@ def _read_full_refresh(
)
total_records_counter = 0
for _slice in slices:
if self.log_slice_message(logger):
yield AirbyteMessage(
type=MessageType.LOG,
log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}"),
)
yield from stream_instance.start_of_slice_messages(logger, _slice)
record_data_or_messages = stream_instance.read_records(
stream_slice=_slice,
sync_mode=SyncMode.full_refresh,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import logging
from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

Expand Down Expand Up @@ -129,3 +129,9 @@ def stream_slices(
"""
# this is not passing the cursor field because it is known at init time
return self.retriever.stream_slices(sync_mode=sync_mode, stream_state=stream_state)


@dataclass
class DeclarativeStreamTestReadDecorator(DeclarativeStream):
def log_slice_message(self, logger: logging.Logger):
return True
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,6 @@ def read(
self._configure_logger_level(logger)
yield from super().read(logger, config, catalog, state)

def log_slice_message(self, logger: logging.Logger):
# Commented so the unit tests fail
return self._constructor._emit_connector_builder_messages or super(self).log_slice_message(logger)

def _configure_logger_level(self, logger: logging.Logger):
"""
Set the log level to logging.DEBUG if debug mode is enabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
)
from airbyte_cdk.sources.declarative.checks import CheckStream
from airbyte_cdk.sources.declarative.datetime import MinMaxDatetime
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream, DeclarativeStreamTestReadDecorator
from airbyte_cdk.sources.declarative.decoders import JsonDecoder
from airbyte_cdk.sources.declarative.extractors import DpathExtractor, RecordFilter, RecordSelector
from airbyte_cdk.sources.declarative.incremental import DatetimeBasedCursor
Expand Down Expand Up @@ -458,16 +458,29 @@ def create_declarative_stream(self, model: DeclarativeStreamModel, config: Confi
if model.transformations:
for transformation_model in model.transformations:
transformations.append(self._create_component_from_model(model=transformation_model, config=config))
return DeclarativeStream(
name=model.name,
primary_key=primary_key,
retriever=retriever,
schema_loader=schema_loader,
stream_cursor_field=cursor_field or "",
transformations=transformations,
config=config,
parameters=model.parameters,
)
if self._emit_connector_builder_messages:
return DeclarativeStreamTestReadDecorator(
name=model.name,
primary_key=primary_key,
retriever=retriever,
schema_loader=schema_loader,
stream_cursor_field=cursor_field or "",
transformations=transformations,
config=config,
parameters=model.parameters,
)
else:
return DeclarativeStream(
name=model.name,
primary_key=primary_key,
retriever=retriever,
schema_loader=schema_loader,
stream_cursor_field=cursor_field or "",
transformations=transformations,
config=config,
parameters=model.parameters,
)


def _merge_stream_slicers(self, model: DeclarativeStreamModel, config: Config) -> Optional[StreamSlicer]:
incremental_sync = (
Expand Down
21 changes: 20 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/sources/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@


import inspect
import json
import logging
import typing
from abc import ABC, abstractmethod
from functools import lru_cache
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union

import airbyte_cdk.sources.utils.casing as casing
from airbyte_cdk.models import AirbyteLogMessage, AirbyteStream, AirbyteTraceMessage, SyncMode
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, AirbyteStream, AirbyteTraceMessage, Level, SyncMode
from airbyte_cdk.models import Type as MessageType

# list of all possible HTTP methods which can be used for sending of request bodies
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
Expand Down Expand Up @@ -75,6 +77,8 @@ class Stream(ABC):
Base abstract class for an Airbyte Stream. Makes no assumption of the Stream's underlying transport protocol.
"""

SLICE_LOG_PREFIX = "slice:"

# Use self.logger in subclasses to log any messages
@property
def logger(self):
Expand Down Expand Up @@ -247,6 +251,21 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
"""
return {}

def start_of_slice_messages(self, logger: logging.Logger, stream_slice: Mapping[str, Any]) -> typing.Iterator[AirbyteMessage]:
if self.log_slice_message(logger):
yield AirbyteMessage(
type=MessageType.LOG,
log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(stream_slice, default=str)}"),
)

def log_slice_message(self, logger: logging.Logger):
"""
:param logger:
:return:
"""
return logger.isEnabledFor(logging.DEBUG)

@staticmethod
def _wrapped_primary_key(keys: Optional[Union[str, List[str], List[List[str]]]]) -> Optional[List[List[str]]]:
"""
Expand Down

0 comments on commit c594365

Please sign in to comment.