Skip to content

Commit

Permalink
airbyte-lib: Fix processed records counter (#34857)
Browse files Browse the repository at this point in the history
  • Loading branch information
Joe Reuter authored Feb 6, 2024
1 parent 5b91b5b commit 65002d4
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 7 deletions.
2 changes: 1 addition & 1 deletion airbyte-lib/airbyte_lib/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def log_records_read(self, new_total_count: int) -> None:
# We want to update the display more often when the count is low, and less
# often when the count is high.
updated_period = min(
MAX_UPDATE_FREQUENCY, 10 ** math.floor(math.log10(self.total_records_read) / 4)
MAX_UPDATE_FREQUENCY, 10 ** math.floor(math.log10(max(self.total_records_read, 1)) / 4)
)
if self.total_records_read % updated_period != 0:
return
Expand Down
12 changes: 6 additions & 6 deletions airbyte-lib/airbyte_lib/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from airbyte_protocol.models import (
AirbyteCatalog,
AirbyteMessage,
AirbyteRecordMessage,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
ConnectorSpecification,
Expand Down Expand Up @@ -363,7 +362,7 @@ def _read(
cache_info: CacheTelemetryInfo,
*,
force_full_refresh: bool,
) -> Iterable[AirbyteRecordMessage]:
) -> Iterable[AirbyteMessage]:
"""
Call read on the connector.
Expand All @@ -372,7 +371,7 @@ def _read(
* Generate a configured catalog that syncs all streams in full_refresh mode
* Write the configured catalog and the config to a temporary file
* execute the connector with read --config <config_file> --catalog <catalog_file>
* Listen to the messages and return the AirbyteRecordMessages that come along.
* Listen to the messages and return the AirbyteMessages that come along.
"""
# Ensure discovered and configured catalog properties are cached before we start reading
_ = self.discovered_catalog
Expand Down Expand Up @@ -462,14 +461,15 @@ def _execute(self, args: list[str]) -> Iterator[AirbyteMessage]:

def _tally_records(
self,
messages: Iterable[AirbyteRecordMessage],
) -> Generator[AirbyteRecordMessage, Any, None]:
messages: Iterable[AirbyteMessage],
) -> Generator[AirbyteMessage, Any, None]:
"""This method simply tallies the number of records processed and yields the messages."""
self._processed_records = 0 # Reset the counter before we start
progress.reset(len(self._selected_stream_names or []))

for message in messages:
self._processed_records += 1
if message.type is Type.RECORD:
self._processed_records += 1
yield message
progress.log_records_read(self._processed_records)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def run():
args = parse_args()
catalog = get_json_file(args["--catalog"])
config = get_json_file(args["--config"])
print(json.dumps({"type": "LOG", "log": {"level": "INFO", "message": "Starting sync"}}))
for stream in catalog["streams"]:
if stream["stream"]["name"] == "stream1":
print(json.dumps(sample_record1_stream1))
Expand Down

0 comments on commit 65002d4

Please sign in to comment.