From f5f7b0372e327b5ed99eb76ed39c6df8ab00d5b7 Mon Sep 17 00:00:00 2001 From: Suraj <153008493+Suraj-Vishwakarma70@users.noreply.github.com> Date: Tue, 9 Jul 2024 21:08:01 +0530 Subject: [PATCH] Fix: Instead of failing, raise a warning if stream selection is performed before config is present (#285) --- airbyte/sources/base.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/airbyte/sources/base.py b/airbyte/sources/base.py index e860974d..404214d6 100644 --- a/airbyte/sources/base.py +++ b/airbyte/sources/base.py @@ -81,6 +81,7 @@ def __init__( self._discovered_catalog: AirbyteCatalog | None = None self._spec: ConnectorSpecification | None = None self._selected_stream_names: list[str] = [] + self._to_be_selected_streams: list[str] | str = [] if config is not None: self.set_config(config, validate=validate) if streams is not None: @@ -100,12 +101,29 @@ def set_streams(self, streams: list[str]) -> None: ) self.select_streams(streams) + def _log_warning_preselected_stream(self, streams: str | list[str]) -> None: + """Logs a warning message indicating stream selection which are not selected yet""" + if streams == "*": + print( + "Warning: Config is not set yet. All streams will be selected after config is set." + ) + else: + print( + "Warning: Config is not set yet. " + f"Streams to be selected after config is set: {streams}" + ) + def select_all_streams(self) -> None: """Select all streams. This is a more streamlined equivalent to: > source.select_streams(source.get_available_streams()). """ + if self._config_dict is None: + self._to_be_selected_streams = "*" + self._log_warning_preselected_stream(self._to_be_selected_streams) + return + self._selected_stream_names = self.get_available_streams() def select_streams(self, streams: str | list[str]) -> None: @@ -116,6 +134,11 @@ def select_streams(self, streams: str | list[str]) -> None: Currently, if this is not set, all streams will be read. """ + if self._config_dict is None: + self._to_be_selected_streams = streams + self._log_warning_preselected_stream(streams) + return + if streams == "*": self.select_all_streams() return @@ -159,6 +182,10 @@ def set_config( self._config_dict = config + if self._to_be_selected_streams: + self.select_streams(self._to_be_selected_streams) + self._to_be_selected_streams = [] + def get_config(self) -> dict[str, Any]: """Get the config for the connector.""" return self._config