From aea02e1b41ae4774e8ab9b72939a98fb59922861 Mon Sep 17 00:00:00 2001 From: V Singh <143428836+vspanxcode@users.noreply.github.com> Date: Mon, 8 Jul 2024 06:22:09 +0530 Subject: [PATCH] Feat: Enable logging when new stream starts (#288) --- airbyte/sources/base.py | 7 +++++++ tests/unit_tests/test_lowcode_connectors.py | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/airbyte/sources/base.py b/airbyte/sources/base.py index c1cd4b3a..c2944a8b 100644 --- a/airbyte/sources/base.py +++ b/airbyte/sources/base.py @@ -75,6 +75,7 @@ def __init__( self.executor = executor self.name = name self._processed_records = 0 + self._stream_names_observed: set[str] = set() self._config_dict: dict[str, Any] | None = None self._last_log_messages: list[str] = [] self._discovered_catalog: AirbyteCatalog | None = None @@ -582,6 +583,9 @@ def _execute(self, args: list[str]) -> Iterator[AirbyteMessage]: message: AirbyteMessage = AirbyteMessage.model_validate_json(json_data=line) if message.type is Type.RECORD: self._processed_records += 1 + if message.record.stream not in self._stream_names_observed: + self._stream_names_observed.add(message.record.stream) + self._log_stream_read_start(message.record.stream) if message.type == Type.LOG: self._add_to_logs(message.log.message) if message.type == Type.TRACE and message.trace.type == TraceType.ERROR: @@ -620,6 +624,9 @@ def _log_sync_start( event_type=EventType.SYNC, ) + def _log_stream_read_start(self, stream: str) -> None: + print(f"Read started on stream: {stream} at {pendulum.now().format('HH:mm:ss')}...") + def _log_sync_success( self, *, diff --git a/tests/unit_tests/test_lowcode_connectors.py b/tests/unit_tests/test_lowcode_connectors.py index 0147b721..502825b8 100644 --- a/tests/unit_tests/test_lowcode_connectors.py +++ b/tests/unit_tests/test_lowcode_connectors.py @@ -28,4 +28,4 @@ def test_nocode_execution(connector_name: str, config: dict) -> None: source.read() for name, records in source.read().streams.items(): assert name - assert len(records) > 0 + assert len(records) > 0, f"No records were returned from the '{name}' stream."