From 65002d4145d649ff8cc94d3cf8a597d8242885fc Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Tue, 6 Feb 2024 11:40:33 +0100 Subject: [PATCH] airbyte-lib: Fix processed records counter (#34857) --- airbyte-lib/airbyte_lib/progress.py | 2 +- airbyte-lib/airbyte_lib/source.py | 12 ++++++------ .../fixtures/source-test/source_test/run.py | 1 + 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/airbyte-lib/airbyte_lib/progress.py b/airbyte-lib/airbyte_lib/progress.py index 14e1044801a0..0a2d76e6f993 100644 --- a/airbyte-lib/airbyte_lib/progress.py +++ b/airbyte-lib/airbyte_lib/progress.py @@ -200,7 +200,7 @@ def log_records_read(self, new_total_count: int) -> None: # We want to update the display more often when the count is low, and less # often when the count is high. updated_period = min( - MAX_UPDATE_FREQUENCY, 10 ** math.floor(math.log10(self.total_records_read) / 4) + MAX_UPDATE_FREQUENCY, 10 ** math.floor(math.log10(max(self.total_records_read, 1)) / 4) ) if self.total_records_read % updated_period != 0: return diff --git a/airbyte-lib/airbyte_lib/source.py b/airbyte-lib/airbyte_lib/source.py index ab76922c13eb..b033f27ff586 100644 --- a/airbyte-lib/airbyte_lib/source.py +++ b/airbyte-lib/airbyte_lib/source.py @@ -13,7 +13,6 @@ from airbyte_protocol.models import ( AirbyteCatalog, AirbyteMessage, - AirbyteRecordMessage, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, ConnectorSpecification, @@ -363,7 +362,7 @@ def _read( cache_info: CacheTelemetryInfo, *, force_full_refresh: bool, - ) -> Iterable[AirbyteRecordMessage]: + ) -> Iterable[AirbyteMessage]: """ Call read on the connector. @@ -372,7 +371,7 @@ def _read( * Generate a configured catalog that syncs all streams in full_refresh mode * Write the configured catalog and the config to a temporary file * execute the connector with read --config --catalog - * Listen to the messages and return the AirbyteRecordMessages that come along. + * Listen to the messages and return the AirbyteMessage that come along. """ # Ensure discovered and configured catalog properties are cached before we start reading _ = self.discovered_catalog @@ -462,14 +461,15 @@ def _execute(self, args: list[str]) -> Iterator[AirbyteMessage]: def _tally_records( self, - messages: Iterable[AirbyteRecordMessage], - ) -> Generator[AirbyteRecordMessage, Any, None]: + messages: Iterable[AirbyteMessage], + ) -> Generator[AirbyteMessage, Any, None]: """This method simply tallies the number of records processed and yields the messages.""" self._processed_records = 0 # Reset the counter before we start progress.reset(len(self._selected_stream_names or [])) for message in messages: - self._processed_records += 1 + if message.type is Type.RECORD: + self._processed_records += 1 yield message progress.log_records_read(self._processed_records) diff --git a/airbyte-lib/tests/integration_tests/fixtures/source-test/source_test/run.py b/airbyte-lib/tests/integration_tests/fixtures/source-test/source_test/run.py index d17502ba4c42..213f94fd23a7 100644 --- a/airbyte-lib/tests/integration_tests/fixtures/source-test/source_test/run.py +++ b/airbyte-lib/tests/integration_tests/fixtures/source-test/source_test/run.py @@ -124,6 +124,7 @@ def run(): args = parse_args() catalog = get_json_file(args["--catalog"]) config = get_json_file(args["--config"]) + print(json.dumps({"type": "LOG", "log": {"level": "INFO", "message": "Starting sync"}})) for stream in catalog["streams"]: if stream["stream"]["name"] == "stream1": print(json.dumps(sample_record1_stream1))