Skip to content

Commit

Permalink
Fix: Instead of failing, raise a warning if stream selection is perfo…
Browse files Browse the repository at this point in the history
…rmed before config is present (#285)
  • Loading branch information
Suraj-Vishwakarma70 authored Jul 9, 2024
1 parent 2682ed2 commit f5f7b03
Showing 1 changed file with 27 additions and 0 deletions.
27 changes: 27 additions & 0 deletions airbyte/sources/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def __init__(
self._discovered_catalog: AirbyteCatalog | None = None
self._spec: ConnectorSpecification | None = None
self._selected_stream_names: list[str] = []
self._to_be_selected_streams: list[str] | str = []
if config is not None:
self.set_config(config, validate=validate)
if streams is not None:
Expand All @@ -100,12 +101,29 @@ def set_streams(self, streams: list[str]) -> None:
)
self.select_streams(streams)

def _log_warning_preselected_stream(self, streams: str | list[str]) -> None:
"""Logs a warning message indicating stream selection which are not selected yet"""
if streams == "*":
print(
"Warning: Config is not set yet. All streams will be selected after config is set."
)
else:
print(
"Warning: Config is not set yet. "
f"Streams to be selected after config is set: {streams}"
)

def select_all_streams(self) -> None:
"""Select all streams.
This is a more streamlined equivalent to:
> source.select_streams(source.get_available_streams()).
"""
if self._config_dict is None:
self._to_be_selected_streams = "*"
self._log_warning_preselected_stream(self._to_be_selected_streams)
return

self._selected_stream_names = self.get_available_streams()

def select_streams(self, streams: str | list[str]) -> None:
Expand All @@ -116,6 +134,11 @@ def select_streams(self, streams: str | list[str]) -> None:
Currently, if this is not set, all streams will be read.
"""
if self._config_dict is None:
self._to_be_selected_streams = streams
self._log_warning_preselected_stream(streams)
return

if streams == "*":
self.select_all_streams()
return
Expand Down Expand Up @@ -159,6 +182,10 @@ def set_config(

self._config_dict = config

if self._to_be_selected_streams:
self.select_streams(self._to_be_selected_streams)
self._to_be_selected_streams = []

def get_config(self) -> dict[str, Any]:
"""Get the config for the connector."""
return self._config
Expand Down

0 comments on commit f5f7b03

Please sign in to comment.