From 0b3df1340ec61fb7c0e2199a6f25a6b1c3ff100b Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Thu, 18 Jul 2024 09:54:11 -0700 Subject: [PATCH 1/3] feat: improve progress prints --- airbyte/progress.py | 66 ++++++++++++++++++++++++++++----------------- 1 file changed, 42 insertions(+), 24 deletions(-) diff --git a/airbyte/progress.py b/airbyte/progress.py index 70a568da..f30ff7b2 100644 --- a/airbyte/progress.py +++ b/airbyte/progress.py @@ -92,6 +92,10 @@ def _get_elapsed_time_str(seconds: float) -> str: Minutes are always included after 1 minute elapsed. Hours are always included after 1 hour elapsed. """ + if seconds <= 2: # noqa: PLR2004 # Magic numbers OK here. + # Less than 1 minute elapsed + return f"{seconds:.2f} seconds" + if seconds <= 60: # noqa: PLR2004 # Magic numbers OK here. # Less than 1 minute elapsed return f"{seconds:.0f} seconds" @@ -245,6 +249,7 @@ def reset(self, num_streams_expected: int) -> None: # Reads self.read_start_time = time.time() + self.first_record_received_time: float | None = None self.read_end_time = None self.total_records_read = 0 @@ -263,12 +268,22 @@ def reset(self, num_streams_expected: int) -> None: self._start_rich_view() @property - def elapsed_seconds(self) -> int: + def elapsed_seconds(self) -> float: + """Return the number of seconds elapsed since the read operation started.""" + if self.finalize_end_time: + return self.finalize_end_time - self.read_start_time + + return time.time() - self.read_start_time + + @property + def elapsed_seconds_since_first_record(self) -> int: """Return the number of seconds elapsed since the read operation started.""" if self.finalize_end_time: - return int(self.finalize_end_time - self.read_start_time) + return int( + self.finalize_end_time - (self.first_record_received_time or self.read_start_time) + ) - return int(time.time() - self.read_start_time) + return int(time.time() - (self.first_record_received_time or self.read_start_time)) @property def elapsed_time_string(self) -> str: @@ -297,13 +312,13 @@ def elapsed_read_time_string(self) -> str: return _get_elapsed_time_str(self.elapsed_read_seconds) @property - def elapsed_finalization_seconds(self) -> int: + def elapsed_finalization_seconds(self) -> float: """Return the number of seconds elapsed since the read operation started.""" if self.finalize_start_time is None: return 0 if self.finalize_end_time is None: - return int(time.time() - self.finalize_start_time) - return int(self.finalize_end_time - self.finalize_start_time) + return time.time() - self.finalize_start_time + return self.finalize_end_time - self.finalize_start_time @property def elapsed_finalization_time_str(self) -> str: @@ -312,6 +327,9 @@ def elapsed_finalization_time_str(self) -> str: def log_records_read(self, new_total_count: int) -> None: """Load a number of records read.""" + if self.first_record_received_time is None: + self.first_record_received_time = time.time() + self.total_records_read = new_total_count # This is some math to make updates adaptive to the scale of records read. @@ -399,38 +417,42 @@ def _get_status_message(self) -> str: # Format start time as a friendly string in local timezone: start_time_str = _to_time_str(self.read_start_time) records_per_second: float = 0.0 - if self.elapsed_read_seconds > 0: + if self.elapsed_seconds_since_first_record > 0: records_per_second = round( - float(self.total_records_read) / self.elapsed_read_seconds, + float(self.total_records_read) / self.elapsed_seconds_since_first_record, ndigits=1, ) status_message = ( - f"## Read Progress\n\n" - f"Started reading at {start_time_str}.\n\n" - f"Read **{self.total_records_read:,}** records " + f"### Read Progress\n\n" + f"**Started reading from source at `{start_time_str}`:**\n\n" + f"- Read **{self.total_records_read:,}** records " f"over **{self.elapsed_read_time_string}** " f"({records_per_second:,} records / second).\n\n" ) if self.total_records_written > 0: status_message += ( - f"Wrote **{self.total_records_written:,}** records " - f"over {self.total_batches_written:,} batches.\n\n" + f"- Cached **{self.total_records_written:,}** records " + f"into {self.total_batches_written:,} local cache file(s).\n\n" ) if self.read_end_time is not None: read_end_time_str = _to_time_str(self.read_end_time) - status_message += f"Finished reading at {read_end_time_str}.\n\n" + status_message += f"- Finished reading from source at `{read_end_time_str}`.\n\n" if self.finalize_start_time is not None: finalize_start_time_str = _to_time_str(self.finalize_start_time) - status_message += f"Started finalizing streams at {finalize_start_time_str}.\n\n" + status_message += f"**Started cache processing at `{finalize_start_time_str}`:**\n\n" status_message += ( - f"Finalized **{self.total_batches_finalized}** batches " - f"over {self.elapsed_finalization_time_str}.\n\n" + f"- Processed **{self.total_batches_finalized}** cache " + f"file(s) over **{self.elapsed_finalization_time_str}**.\n\n" ) + if self.finalize_end_time is not None: + completion_time_str = _to_time_str(self.finalize_end_time) + status_message += f"- Finished cache processing at `{completion_time_str}`.\n\n" + if self.finalized_stream_names: status_message += ( - f"Completed {len(self.finalized_stream_names)} " + f"**Completed processing {len(self.finalized_stream_names)} " + (f"out of {self.num_streams_expected} " if self.num_streams_expected else "") - + "streams:\n\n" + + "streams:**\n\n" ) for stream_name in self.finalized_stream_names: status_message += f" - {stream_name}\n" @@ -438,11 +460,7 @@ def _get_status_message(self) -> str: status_message += "\n\n" if self.finalize_end_time is not None: - completion_time_str = _to_time_str(self.finalize_end_time) - status_message += ( - f"Completed writing at {completion_time_str}. " - f"Total time elapsed: {self.elapsed_time_string}\n\n" - ) + status_message += f"**Total time elapsed: {self.elapsed_time_string}**\n\n" status_message += "\n------------------------------------------------\n" return status_message From 83df743cb0597b910b08319f819b18cfd08fb2fe Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Thu, 18 Jul 2024 09:58:37 -0700 Subject: [PATCH 2/3] fix: missing declaration in constructor --- airbyte/progress.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte/progress.py b/airbyte/progress.py index f30ff7b2..35999a07 100644 --- a/airbyte/progress.py +++ b/airbyte/progress.py @@ -132,6 +132,7 @@ def __init__( # Reads self.read_start_time = time.time() self.read_end_time: float | None = None + self.first_record_received_time: float | None = None self.total_records_read = 0 # Writes From 44524e30a62b4f00b9ab025cafbcae6ceb66a090 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Thu, 18 Jul 2024 10:48:35 -0700 Subject: [PATCH 3/3] fix tests --- airbyte/progress.py | 22 +++++++------- tests/unit_tests/test_progress.py | 50 ++++++++++++++++++------------- 2 files changed, 39 insertions(+), 33 deletions(-) diff --git a/airbyte/progress.py b/airbyte/progress.py index 35999a07..fcf0f777 100644 --- a/airbyte/progress.py +++ b/airbyte/progress.py @@ -250,7 +250,7 @@ def reset(self, num_streams_expected: int) -> None: # Reads self.read_start_time = time.time() - self.first_record_received_time: float | None = None + self.first_record_received_time = None self.read_end_time = None self.total_records_read = 0 @@ -277,14 +277,14 @@ def elapsed_seconds(self) -> float: return time.time() - self.read_start_time @property - def elapsed_seconds_since_first_record(self) -> int: + def elapsed_read_time(self) -> float: """Return the number of seconds elapsed since the read operation started.""" - if self.finalize_end_time: - return int( - self.finalize_end_time - (self.first_record_received_time or self.read_start_time) + if self.finalize_start_time: + return self.finalize_start_time - ( + self.first_record_received_time or self.read_start_time ) - return int(time.time() - (self.first_record_received_time or self.read_start_time)) + return time.time() - (self.first_record_received_time or self.read_start_time) @property def elapsed_time_string(self) -> str: @@ -418,17 +418,15 @@ def _get_status_message(self) -> str: # Format start time as a friendly string in local timezone: start_time_str = _to_time_str(self.read_start_time) records_per_second: float = 0.0 - if self.elapsed_seconds_since_first_record > 0: - records_per_second = round( - float(self.total_records_read) / self.elapsed_seconds_since_first_record, - ndigits=1, - ) + if self.elapsed_read_time > 0: + records_per_second = self.total_records_read / self.elapsed_read_time + status_message = ( f"### Read Progress\n\n" f"**Started reading from source at `{start_time_str}`:**\n\n" f"- Read **{self.total_records_read:,}** records " f"over **{self.elapsed_read_time_string}** " - f"({records_per_second:,} records / second).\n\n" + f"({records_per_second:,.1f} records / second).\n\n" ) if self.total_records_written > 0: status_message += ( diff --git a/tests/unit_tests/test_progress.py b/tests/unit_tests/test_progress.py index 245073d9..7c3bb634 100644 --- a/tests/unit_tests/test_progress.py +++ b/tests/unit_tests/test_progress.py @@ -105,7 +105,9 @@ def _assert_lines(expected_lines, actual_lines: list[str] | str): if isinstance(actual_lines, list): actual_lines = "\n".join(actual_lines) for line in expected_lines: - assert line in actual_lines, f"Missing line: {line}" + assert ( + line in actual_lines + ), f"Missing line:\n{line}\n\nIn lines:\n\n{actual_lines}" def test_get_status_message_after_finalizing_records(): @@ -113,16 +115,19 @@ def test_get_status_message_after_finalizing_records(): with freeze_time("2022-01-01 00:00:00"): progress = ReadProgress() expected_lines = [ - "Started reading at 00:00:00.", - "Read **0** records over **0 seconds** (0.0 records / second).", + "Started reading from source at `00:00:00`", + "Read **0** records over **0.00 seconds** (0.0 records / second).", ] _assert_lines(expected_lines, progress._get_status_message()) + # We need to read one record to start the "time since first record" timer + progress.log_records_read(1) + # Test after reading some records with freeze_time("2022-01-01 00:01:00"): progress.log_records_read(100) expected_lines = [ - "Started reading at 00:00:00.", + "Started reading from source at `00:00:00`", "Read **100** records over **60 seconds** (1.7 records / second).", ] _assert_lines(expected_lines, progress._get_status_message()) @@ -132,11 +137,14 @@ def test_get_status_message_after_finalizing_records(): progress = ReadProgress() progress.reset(1) expected_lines = [ - "Started reading at 00:00:00.", - "Read **0** records over **0 seconds** (0.0 records / second).", + "Started reading from source at `00:00:00`", + "Read **0** records over **0.00 seconds** (0.0 records / second).", ] _assert_lines(expected_lines, progress._get_status_message()) + # We need to read one record to start the "time since first record" timer + progress.log_records_read(1) + # Test after writing some records and starting to finalize with freeze_time("2022-01-02 00:01:00"): progress.log_records_read(100) @@ -144,11 +152,11 @@ def test_get_status_message_after_finalizing_records(): progress.log_batches_finalizing("stream1", 1) expected_lines = [ "## Read Progress", - "Started reading at 00:00:00.", + "Started reading from source at `00:00:00`", "Read **100** records over **60 seconds** (1.7 records / second).", - "Wrote **50** records over 1 batches.", - "Finished reading at 00:01:00.", - "Started finalizing streams at 00:01:00.", + "Cached **50** records into 1 local cache file(s).", + "Finished reading from source at `00:01:00`", + "Started cache processing at `00:01:00`", ] _assert_lines(expected_lines, progress._get_status_message()) @@ -157,12 +165,12 @@ def test_get_status_message_after_finalizing_records(): progress.log_batches_finalized("stream1", 1) expected_lines = [ "## Read Progress", - "Started reading at 00:00:00.", + "Started reading from source at `00:00:00`", "Read **100** records over **60 seconds** (1.7 records / second).", - "Wrote **50** records over 1 batches.", - "Finished reading at 00:01:00.", - "Started finalizing streams at 00:01:00.", - "Finalized **1** batches over 60 seconds.", + "Cached **50** records into 1 local cache file(s).", + "Finished reading from source at `00:01:00`", + "Started cache processing at `00:01:00`", + "Processed **1** cache file(s) over **60 seconds**", ] _assert_lines(expected_lines, progress._get_status_message()) @@ -172,13 +180,13 @@ def test_get_status_message_after_finalizing_records(): message = progress._get_status_message() expected_lines = [ "## Read Progress", - "Started reading at 00:00:00.", + "Started reading from source at `00:00:00`", "Read **100** records over **60 seconds** (1.7 records / second).", - "Wrote **50** records over 1 batches.", - "Finished reading at 00:01:00.", - "Started finalizing streams at 00:01:00.", - "Finalized **1** batches over 60 seconds.", - "Completed 1 out of 1 streams:", + "Cached **50** records into 1 local cache file(s).", + "Finished reading from source at `00:01:00`", + "Started cache processing at `00:01:00`", + "Processed **1** cache file(s) over **60 seconds", + "Completed processing 1 out of 1 streams", "- stream1", "Total time elapsed: 2min 0s", ]