Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

connector builder: Emit message at start of slice #25180

Merged
merged 16 commits into from
Apr 14, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ def get_limits(config: Mapping[str, Any]) -> TestReadLimits:
def create_source(config: Mapping[str, Any], limits: TestReadLimits) -> ManifestDeclarativeSource:
manifest = config["__injected_declarative_manifest"]
return ManifestDeclarativeSource(
source_config=manifest, component_factory=ModelToComponentFactory(
emit_connector_builder_messages=True,
emit_connector_builder_messages=True,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move the flag from the ModelToComponentFactory to the Source

source_config=manifest,
component_factory=ModelToComponentFactory(
limit_pages_fetched_per_slice=limits.max_pages_per_slice,
limit_slices_fetched=limits.max_slices)
)
Expand Down
12 changes: 10 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ def _read_incremental(
has_slices = False
for _slice in slices:
has_slices = True
if logger.isEnabledFor(logging.DEBUG):
if self.log_slice_message(logger):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move to log_slice_message method so it can be overwritten

yield AirbyteMessage(
type=MessageType.LOG,
log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}"),
Expand Down Expand Up @@ -277,6 +277,14 @@ def _read_incremental(
checkpoint = self._checkpoint_state(stream_instance, stream_state, state_manager)
yield checkpoint

def log_slice_message(self, logger: logging.Logger):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we call this should_log_slice_message() since the method says whether we should emit the log message, it doesn't actually log it. And type hint for return

"""

:param logger:
:return:
"""
return logger.isEnabledFor(logging.DEBUG)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for consistency, we still look at the log level


def _read_full_refresh(
self,
logger: logging.Logger,
Expand All @@ -290,7 +298,7 @@ def _read_full_refresh(
)
total_records_counter = 0
for _slice in slices:
if logger.isEnabledFor(logging.DEBUG):
if self.log_slice_message(logger):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use the new method

yield AirbyteMessage(
type=MessageType.LOG,
log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,13 @@ class ManifestDeclarativeSource(DeclarativeSource):

VALID_TOP_LEVEL_FIELDS = {"check", "definitions", "schemas", "spec", "streams", "type", "version"}

def __init__(self, source_config: ConnectionDefinition, debug: bool = False, component_factory: ModelToComponentFactory = None):
def __init__(
self,
source_config: ConnectionDefinition,
debug: bool = False,
emit_connector_builder_messages: bool = False,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add emit_connector_builder_messages to the constructor since it's not in the ModelToComponentFactory anymore

component_factory: ModelToComponentFactory = None,
brianjlai marked this conversation as resolved.
Show resolved Hide resolved
):
"""
:param source_config(Mapping[str, Any]): The manifest of low-code components that describe the source connector
:param debug(bool): True if debug mode is enabled
Expand All @@ -53,6 +59,7 @@ def __init__(self, source_config: ConnectionDefinition, debug: bool = False, com
propagated_source_config = ManifestComponentTransformer().propagate_types_and_parameters("", resolved_source_config, {})
self._source_config = propagated_source_config
self._debug = debug
self._emit_connector_builder_messages = emit_connector_builder_messages
self._constructor = component_factory if component_factory else ModelToComponentFactory()

self._validate_source()
Expand All @@ -66,7 +73,9 @@ def connection_checker(self) -> ConnectionChecker:
check = self._source_config["check"]
if "type" not in check:
check["type"] = "CheckStream"
check_stream = self._constructor.create_component(CheckStreamModel, check, dict())
check_stream = self._constructor.create_component(
CheckStreamModel, check, dict(), emit_connector_builder_messages=self._emit_connector_builder_messages
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pass the field as argument since the ModelToComponentFactory doesn't know about it

)
if isinstance(check_stream, ConnectionChecker):
return check_stream
else:
Expand All @@ -76,7 +85,9 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
self._emit_manifest_debug_message(extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)})

source_streams = [
self._constructor.create_component(DeclarativeStreamModel, stream_config, config)
self._constructor.create_component(
DeclarativeStreamModel, stream_config, config, emit_connector_builder_messages=self._emit_connector_builder_messages
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pass the field as argument since the ModelToComponentFactory doesn't know about it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pass the field as argument since the ModelToComponentFactory doesn't know about it

)
for stream_config in self._stream_configs(self._source_config)
]

Expand Down Expand Up @@ -118,6 +129,9 @@ def read(
self._configure_logger_level(logger)
yield from super().read(logger, config, catalog, state)

def log_slice_message(self, logger: logging.Logger):
return self._emit_connector_builder_messages or super(self).log_slice_message(logger)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log the slice message if _emit_connector_builder_messages or if the debug flag is turned on for consistency


def _configure_logger_level(self, logger: logging.Logger):
"""
Set the log level to logging.DEBUG if debug mode is enabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ def __init__(
self._init_mappings()
self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice
self._limit_slices_fetched = limit_slices_fetched
self._emit_connector_builder_messages = emit_connector_builder_messages
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels more difficult to have to propagate emit_connector_builder_messages through every constructor in the this factory.

Should we instead just leave this code as is and as part of manifest_declarative_source.py:__init__() we continue to pass emit_connector_builder_messages as a constructor parameter as we are already doing. In that file it would look something like this:

self._source_config = propagated_source_config
self._debug = debug
self._emit_connector_builder_messages = emit_connector_builder_messages
self._constructor = component_factory if component_factory else ModelToComponentFactory(emit_connector_builder_messages=True)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


def _init_mappings(self):
self.PYDANTIC_MODEL_TO_CONSTRUCTOR: [Type[BaseModel], Callable] = {
Expand Down Expand Up @@ -176,7 +175,15 @@ def _init_mappings(self):
# Needed for the case where we need to perform a second parse on the fields of a custom component
self.TYPE_NAME_TO_MODEL = {cls.__name__: cls for cls in self.PYDANTIC_MODEL_TO_CONSTRUCTOR}

def create_component(self, model_type: Type[BaseModel], component_definition: ComponentDefinition, config: Config, **kwargs) -> type:
def create_component(
self,
model_type: Type[BaseModel],
component_definition: ComponentDefinition,
config: Config,
*,
emit_connector_builder_messages: bool = False,
**kwargs,
) -> type:
"""
Takes a given Pydantic model type and Mapping representing a component definition and creates a declarative component and
subcomponents which will be used at runtime. This is done by first parsing the mapping into a Pydantic model and then creating
Expand All @@ -185,6 +192,7 @@ def create_component(self, model_type: Type[BaseModel], component_definition: Co
:param model_type: The type of declarative component that is being initialized
:param component_definition: The mapping that represents a declarative component
:param config: The connector config that is provided by the customer
:param emit_connector_builder_messages: Flag specifying whether the source must emit log messages at the start of the slice
:return: The declarative component to be used at runtime
"""

Expand All @@ -197,7 +205,9 @@ def create_component(self, model_type: Type[BaseModel], component_definition: Co
if not isinstance(declarative_component_model, model_type):
raise ValueError(f"Expected {model_type.__name__} component, but received {declarative_component_model.__class__.__name__}")

return self._create_component_from_model(model=declarative_component_model, config=config, **kwargs)
return self._create_component_from_model(
model=declarative_component_model, config=config, emit_connector_builder_messages=emit_connector_builder_messages, **kwargs
)

def _create_component_from_model(self, model: BaseModel, config: Config, **kwargs) -> Any:
if model.__class__ not in self.PYDANTIC_MODEL_TO_CONSTRUCTOR:
Expand Down Expand Up @@ -296,7 +306,7 @@ def create_custom_component(self, model, config: Config, **kwargs) -> type:
model_value["type"] = derived_type

if self._is_component(model_value):
model_args[model_field] = self._create_nested_component(model, model_field, model_value, config)
model_args[model_field] = self._create_nested_component(model, model_field, model_value, config, **kwargs)
elif isinstance(model_value, list):
vals = []
for v in model_value:
Expand All @@ -305,7 +315,7 @@ def create_custom_component(self, model, config: Config, **kwargs) -> type:
if derived_type:
v["type"] = derived_type
if self._is_component(v):
vals.append(self._create_nested_component(model, model_field, v, config))
vals.append(self._create_nested_component(model, model_field, v, config, **kwargs))
else:
vals.append(v)
model_args[model_field] = vals
Expand Down Expand Up @@ -351,7 +361,7 @@ def _extract_missing_parameters(error: TypeError) -> List[str]:
else:
return []

def _create_nested_component(self, model, model_field: str, model_value: Any, config: Config) -> Any:
def _create_nested_component(self, model, model_field: str, model_value: Any, config: Config, **kwargs) -> Any:
type_name = model_value.get("type", None)
if not type_name:
# If no type is specified, we can assume this is a dictionary object which can be returned instead of a subcomponent
Expand All @@ -371,7 +381,7 @@ def _create_nested_component(self, model, model_field: str, model_value: Any, co
constructor_kwargs = inspect.getfullargspec(model_constructor).kwonlyargs
model_parameters = model_value.get("$parameters", {})
matching_parameters = {kwarg: model_parameters[kwarg] for kwarg in constructor_kwargs if kwarg in model_parameters}
return self._create_component_from_model(model=parsed_model, config=config, **matching_parameters)
return self._create_component_from_model(model=parsed_model, config=config, **matching_parameters, **kwargs)
except TypeError as error:
missing_parameters = self._extract_missing_parameters(error)
if missing_parameters:
Expand Down Expand Up @@ -432,7 +442,9 @@ def create_datetime_based_cursor(self, model: DatetimeBasedCursorModel, config:
parameters=model.parameters,
)

def create_declarative_stream(self, model: DeclarativeStreamModel, config: Config, **kwargs) -> DeclarativeStream:
def create_declarative_stream(
self, model: DeclarativeStreamModel, config: Config, emit_connector_builder_messages: bool, **kwargs
) -> DeclarativeStream:
# When constructing a declarative stream, we assemble the incremental_sync component and retriever's partition_router field
# components if they exist into a single CartesianProductStreamSlicer. This is then passed back as an argument when constructing the
# Retriever. This is done in the declarative stream not the retriever to support custom retrievers. The custom create methods in
Expand All @@ -441,7 +453,13 @@ def create_declarative_stream(self, model: DeclarativeStreamModel, config: Confi

primary_key = model.primary_key.__root__ if model.primary_key else None
retriever = self._create_component_from_model(
model=model.retriever, config=config, name=model.name, primary_key=primary_key, stream_slicer=combined_slicers
model=model.retriever,
config=config,
name=model.name,
primary_key=primary_key,
stream_slicer=combined_slicers,
emit_connector_builder_messages=emit_connector_builder_messages,
**kwargs,
)

cursor_field = model.incremental_sync.cursor_field if model.incremental_sync else None
Expand Down Expand Up @@ -522,7 +540,7 @@ def create_default_error_handler(self, model: DefaultErrorHandlerModel, config:
parameters=model.parameters,
)

def create_default_paginator(self, model: DefaultPaginatorModel, config: Config, *, url_base: str) -> DefaultPaginator:
def create_default_paginator(self, model: DefaultPaginatorModel, config: Config, *, url_base: str, **kwargs) -> DefaultPaginator:
decoder = self._create_component_from_model(model=model.decoder, config=config) if model.decoder else JsonDecoder(parameters={})
page_size_option = (
self._create_component_from_model(model=model.page_size_option, config=config) if model.page_size_option else None
Expand Down Expand Up @@ -553,10 +571,10 @@ def create_dpath_extractor(self, model: DpathExtractorModel, config: Config, **k
def create_exponential_backoff_strategy(model: ExponentialBackoffStrategyModel, config: Config) -> ExponentialBackoffStrategy:
return ExponentialBackoffStrategy(factor=model.factor, parameters=model.parameters, config=config)

def create_http_requester(self, model: HttpRequesterModel, config: Config, *, name: str) -> HttpRequester:
def create_http_requester(self, model: HttpRequesterModel, config: Config, *, name: str, **kwargs) -> HttpRequester:
authenticator = self._create_component_from_model(model=model.authenticator, config=config) if model.authenticator else None
error_handler = (
self._create_component_from_model(model=model.error_handler, config=config)
self._create_component_from_model(model=model.error_handler, config=config, **kwargs)
if model.error_handler
else DefaultErrorHandler(backoff_strategies=[], response_filters=[], config=config, parameters=model.parameters)
)
Expand Down Expand Up @@ -695,7 +713,7 @@ def create_page_increment(model: PageIncrementModel, config: Config, **kwargs) -
return PageIncrement(page_size=model.page_size, start_from_page=model.start_from_page, parameters=model.parameters)

def create_parent_stream_config(self, model: ParentStreamConfigModel, config: Config, **kwargs) -> ParentStreamConfig:
declarative_stream = self._create_component_from_model(model.stream, config=config)
declarative_stream = self._create_component_from_model(model.stream, config=config, **kwargs)
request_option = self._create_component_from_model(model.request_option, config=config) if model.request_option else None
return ParentStreamConfig(
parent_key=model.parent_key,
Expand Down Expand Up @@ -750,6 +768,7 @@ def create_simple_retriever(
config: Config,
*,
name: str,
emit_connector_builder_messages: bool,
primary_key: Optional[Union[str, List[str], List[List[str]]]],
stream_slicer: Optional[StreamSlicer],
) -> SimpleRetriever:
Expand All @@ -762,7 +781,7 @@ def create_simple_retriever(
else NoPagination(parameters={})
)

if self._limit_slices_fetched or self._emit_connector_builder_messages:
if self._limit_slices_fetched or emit_connector_builder_messages:
return SimpleRetrieverTestReadDecorator(
name=name,
paginator=paginator,
Expand Down Expand Up @@ -799,7 +818,7 @@ def create_substream_partition_router(self, model: SubstreamPartitionRouterModel
if model.parent_stream_configs:
parent_stream_configs.extend(
[
self._create_component_from_model(model=parent_stream_config, config=config)
self._create_component_from_model(model=parent_stream_config, config=config, **kwargs)
for parent_stream_config in model.parent_stream_configs
]
)
Expand Down
Loading