Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Improve progress prints #302

Merged
merged 3 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 45 additions & 28 deletions airbyte/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ def _get_elapsed_time_str(seconds: float) -> str:
Minutes are always included after 1 minute elapsed.
Hours are always included after 1 hour elapsed.
"""
if seconds <= 2: # noqa: PLR2004 # Magic numbers OK here.
# Less than 1 minute elapsed
return f"{seconds:.2f} seconds"

if seconds <= 60: # noqa: PLR2004 # Magic numbers OK here.
# Less than 1 minute elapsed
return f"{seconds:.0f} seconds"
Expand Down Expand Up @@ -128,6 +132,7 @@ def __init__(
# Reads
self.read_start_time = time.time()
self.read_end_time: float | None = None
self.first_record_received_time: float | None = None
self.total_records_read = 0

# Writes
Expand Down Expand Up @@ -245,6 +250,7 @@ def reset(self, num_streams_expected: int) -> None:

# Reads
self.read_start_time = time.time()
self.first_record_received_time = None
self.read_end_time = None
self.total_records_read = 0

Expand All @@ -263,12 +269,22 @@ def reset(self, num_streams_expected: int) -> None:
self._start_rich_view()

@property
def elapsed_seconds(self) -> int:
def elapsed_seconds(self) -> float:
"""Return the number of seconds elapsed since the read operation started."""
if self.finalize_end_time:
return int(self.finalize_end_time - self.read_start_time)
return self.finalize_end_time - self.read_start_time

return time.time() - self.read_start_time

@property
def elapsed_read_time(self) -> float:
"""Return the number of seconds elapsed since the read operation started."""
if self.finalize_start_time:
return self.finalize_start_time - (
self.first_record_received_time or self.read_start_time
)

return int(time.time() - self.read_start_time)
return time.time() - (self.first_record_received_time or self.read_start_time)

@property
def elapsed_time_string(self) -> str:
Expand Down Expand Up @@ -297,13 +313,13 @@ def elapsed_read_time_string(self) -> str:
return _get_elapsed_time_str(self.elapsed_read_seconds)

@property
def elapsed_finalization_seconds(self) -> int:
def elapsed_finalization_seconds(self) -> float:
"""Return the number of seconds elapsed since the read operation started."""
if self.finalize_start_time is None:
return 0
if self.finalize_end_time is None:
return int(time.time() - self.finalize_start_time)
return int(self.finalize_end_time - self.finalize_start_time)
return time.time() - self.finalize_start_time
return self.finalize_end_time - self.finalize_start_time

@property
def elapsed_finalization_time_str(self) -> str:
Expand All @@ -312,6 +328,9 @@ def elapsed_finalization_time_str(self) -> str:

def log_records_read(self, new_total_count: int) -> None:
"""Load a number of records read."""
if self.first_record_received_time is None:
self.first_record_received_time = time.time()

self.total_records_read = new_total_count

# This is some math to make updates adaptive to the scale of records read.
Expand Down Expand Up @@ -399,50 +418,48 @@ def _get_status_message(self) -> str:
# Format start time as a friendly string in local timezone:
start_time_str = _to_time_str(self.read_start_time)
records_per_second: float = 0.0
if self.elapsed_read_seconds > 0:
records_per_second = round(
float(self.total_records_read) / self.elapsed_read_seconds,
ndigits=1,
)
if self.elapsed_read_time > 0:
records_per_second = self.total_records_read / self.elapsed_read_time

status_message = (
f"## Read Progress\n\n"
f"Started reading at {start_time_str}.\n\n"
f"Read **{self.total_records_read:,}** records "
f"### Read Progress\n\n"
f"**Started reading from source at `{start_time_str}`:**\n\n"
f"- Read **{self.total_records_read:,}** records "
f"over **{self.elapsed_read_time_string}** "
f"({records_per_second:,} records / second).\n\n"
f"({records_per_second:,.1f} records / second).\n\n"
)
if self.total_records_written > 0:
status_message += (
f"Wrote **{self.total_records_written:,}** records "
f"over {self.total_batches_written:,} batches.\n\n"
f"- Cached **{self.total_records_written:,}** records "
f"into {self.total_batches_written:,} local cache file(s).\n\n"
)
if self.read_end_time is not None:
read_end_time_str = _to_time_str(self.read_end_time)
status_message += f"Finished reading at {read_end_time_str}.\n\n"
status_message += f"- Finished reading from source at `{read_end_time_str}`.\n\n"
if self.finalize_start_time is not None:
finalize_start_time_str = _to_time_str(self.finalize_start_time)
status_message += f"Started finalizing streams at {finalize_start_time_str}.\n\n"
status_message += f"**Started cache processing at `{finalize_start_time_str}`:**\n\n"
status_message += (
f"Finalized **{self.total_batches_finalized}** batches "
f"over {self.elapsed_finalization_time_str}.\n\n"
f"- Processed **{self.total_batches_finalized}** cache "
f"file(s) over **{self.elapsed_finalization_time_str}**.\n\n"
)
if self.finalize_end_time is not None:
completion_time_str = _to_time_str(self.finalize_end_time)
status_message += f"- Finished cache processing at `{completion_time_str}`.\n\n"

if self.finalized_stream_names:
status_message += (
f"Completed {len(self.finalized_stream_names)} "
f"**Completed processing {len(self.finalized_stream_names)} "
+ (f"out of {self.num_streams_expected} " if self.num_streams_expected else "")
+ "streams:\n\n"
+ "streams:**\n\n"
)
for stream_name in self.finalized_stream_names:
status_message += f" - {stream_name}\n"

status_message += "\n\n"

if self.finalize_end_time is not None:
completion_time_str = _to_time_str(self.finalize_end_time)
status_message += (
f"Completed writing at {completion_time_str}. "
f"Total time elapsed: {self.elapsed_time_string}\n\n"
)
status_message += f"**Total time elapsed: {self.elapsed_time_string}**\n\n"
status_message += "\n------------------------------------------------\n"

return status_message
Expand Down
50 changes: 29 additions & 21 deletions tests/unit_tests/test_progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,24 +105,29 @@ def _assert_lines(expected_lines, actual_lines: list[str] | str):
if isinstance(actual_lines, list):
actual_lines = "\n".join(actual_lines)
for line in expected_lines:
assert line in actual_lines, f"Missing line: {line}"
assert (
line in actual_lines
), f"Missing line:\n{line}\n\nIn lines:\n\n{actual_lines}"


def test_get_status_message_after_finalizing_records():
# Test that we can render the initial status message before starting to read
with freeze_time("2022-01-01 00:00:00"):
progress = ReadProgress()
expected_lines = [
"Started reading at 00:00:00.",
"Read **0** records over **0 seconds** (0.0 records / second).",
"Started reading from source at `00:00:00`",
"Read **0** records over **0.00 seconds** (0.0 records / second).",
]
_assert_lines(expected_lines, progress._get_status_message())

# We need to read one record to start the "time since first record" timer
progress.log_records_read(1)

# Test after reading some records
with freeze_time("2022-01-01 00:01:00"):
progress.log_records_read(100)
expected_lines = [
"Started reading at 00:00:00.",
"Started reading from source at `00:00:00`",
"Read **100** records over **60 seconds** (1.7 records / second).",
]
_assert_lines(expected_lines, progress._get_status_message())
Expand All @@ -132,23 +137,26 @@ def test_get_status_message_after_finalizing_records():
progress = ReadProgress()
progress.reset(1)
expected_lines = [
"Started reading at 00:00:00.",
"Read **0** records over **0 seconds** (0.0 records / second).",
"Started reading from source at `00:00:00`",
"Read **0** records over **0.00 seconds** (0.0 records / second).",
]
_assert_lines(expected_lines, progress._get_status_message())

# We need to read one record to start the "time since first record" timer
progress.log_records_read(1)

# Test after writing some records and starting to finalize
with freeze_time("2022-01-02 00:01:00"):
progress.log_records_read(100)
progress.log_batch_written("stream1", 50)
progress.log_batches_finalizing("stream1", 1)
expected_lines = [
"## Read Progress",
"Started reading at 00:00:00.",
"Started reading from source at `00:00:00`",
"Read **100** records over **60 seconds** (1.7 records / second).",
"Wrote **50** records over 1 batches.",
"Finished reading at 00:01:00.",
"Started finalizing streams at 00:01:00.",
"Cached **50** records into 1 local cache file(s).",
"Finished reading from source at `00:01:00`",
"Started cache processing at `00:01:00`",
]
_assert_lines(expected_lines, progress._get_status_message())

Expand All @@ -157,12 +165,12 @@ def test_get_status_message_after_finalizing_records():
progress.log_batches_finalized("stream1", 1)
expected_lines = [
"## Read Progress",
"Started reading at 00:00:00.",
"Started reading from source at `00:00:00`",
"Read **100** records over **60 seconds** (1.7 records / second).",
"Wrote **50** records over 1 batches.",
"Finished reading at 00:01:00.",
"Started finalizing streams at 00:01:00.",
"Finalized **1** batches over 60 seconds.",
"Cached **50** records into 1 local cache file(s).",
"Finished reading from source at `00:01:00`",
"Started cache processing at `00:01:00`",
"Processed **1** cache file(s) over **60 seconds**",
]
_assert_lines(expected_lines, progress._get_status_message())

Expand All @@ -172,13 +180,13 @@ def test_get_status_message_after_finalizing_records():
message = progress._get_status_message()
expected_lines = [
"## Read Progress",
"Started reading at 00:00:00.",
"Started reading from source at `00:00:00`",
"Read **100** records over **60 seconds** (1.7 records / second).",
"Wrote **50** records over 1 batches.",
"Finished reading at 00:01:00.",
"Started finalizing streams at 00:01:00.",
"Finalized **1** batches over 60 seconds.",
"Completed 1 out of 1 streams:",
"Cached **50** records into 1 local cache file(s).",
"Finished reading from source at `00:01:00`",
"Started cache processing at `00:01:00`",
"Processed **1** cache file(s) over **60 seconds",
"Completed processing 1 out of 1 streams",
"- stream1",
"Total time elapsed: 2min 0s",
]
Expand Down