Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Improve progress prints #302

Merged
merged 3 commits into from
Jul 18, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 42 additions & 24 deletions airbyte/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ def _get_elapsed_time_str(seconds: float) -> str:
Minutes are always included after 1 minute elapsed.
Hours are always included after 1 hour elapsed.
"""
if seconds <= 2: # noqa: PLR2004 # Magic numbers OK here.
# Less than 1 minute elapsed
return f"{seconds:.2f} seconds"

if seconds <= 60: # noqa: PLR2004 # Magic numbers OK here.
# Less than 1 minute elapsed
return f"{seconds:.0f} seconds"
Expand Down Expand Up @@ -245,6 +249,7 @@ def reset(self, num_streams_expected: int) -> None:

# Reads
self.read_start_time = time.time()
self.first_record_received_time: float | None = None
self.read_end_time = None
self.total_records_read = 0

Expand All @@ -263,12 +268,22 @@ def reset(self, num_streams_expected: int) -> None:
self._start_rich_view()

@property
def elapsed_seconds(self) -> int:
def elapsed_seconds(self) -> float:
"""Return the number of seconds elapsed since the read operation started."""
if self.finalize_end_time:
return self.finalize_end_time - self.read_start_time

return time.time() - self.read_start_time

@property
def elapsed_seconds_since_first_record(self) -> int:
"""Return the number of seconds elapsed since the read operation started."""
if self.finalize_end_time:
return int(self.finalize_end_time - self.read_start_time)
return int(
self.finalize_end_time - (self.first_record_received_time or self.read_start_time)
)

return int(time.time() - self.read_start_time)
return int(time.time() - (self.first_record_received_time or self.read_start_time))

@property
def elapsed_time_string(self) -> str:
Expand Down Expand Up @@ -297,13 +312,13 @@ def elapsed_read_time_string(self) -> str:
return _get_elapsed_time_str(self.elapsed_read_seconds)

@property
def elapsed_finalization_seconds(self) -> int:
def elapsed_finalization_seconds(self) -> float:
"""Return the number of seconds elapsed since the read operation started."""
if self.finalize_start_time is None:
return 0
if self.finalize_end_time is None:
return int(time.time() - self.finalize_start_time)
return int(self.finalize_end_time - self.finalize_start_time)
return time.time() - self.finalize_start_time
return self.finalize_end_time - self.finalize_start_time

@property
def elapsed_finalization_time_str(self) -> str:
Expand All @@ -312,6 +327,9 @@ def elapsed_finalization_time_str(self) -> str:

def log_records_read(self, new_total_count: int) -> None:
"""Load a number of records read."""
if self.first_record_received_time is None:
self.first_record_received_time = time.time()

self.total_records_read = new_total_count

# This is some math to make updates adaptive to the scale of records read.
Expand Down Expand Up @@ -399,50 +417,50 @@ def _get_status_message(self) -> str:
# Format start time as a friendly string in local timezone:
start_time_str = _to_time_str(self.read_start_time)
records_per_second: float = 0.0
if self.elapsed_read_seconds > 0:
if self.elapsed_seconds_since_first_record > 0:
records_per_second = round(
float(self.total_records_read) / self.elapsed_read_seconds,
float(self.total_records_read) / self.elapsed_seconds_since_first_record,
ndigits=1,
)
status_message = (
f"## Read Progress\n\n"
f"Started reading at {start_time_str}.\n\n"
f"Read **{self.total_records_read:,}** records "
f"### Read Progress\n\n"
f"**Started reading from source at `{start_time_str}`:**\n\n"
f"- Read **{self.total_records_read:,}** records "
f"over **{self.elapsed_read_time_string}** "
f"({records_per_second:,} records / second).\n\n"
)
if self.total_records_written > 0:
status_message += (
f"Wrote **{self.total_records_written:,}** records "
f"over {self.total_batches_written:,} batches.\n\n"
f"- Cached **{self.total_records_written:,}** records "
f"into {self.total_batches_written:,} local cache file(s).\n\n"
)
if self.read_end_time is not None:
read_end_time_str = _to_time_str(self.read_end_time)
status_message += f"Finished reading at {read_end_time_str}.\n\n"
status_message += f"- Finished reading from source at `{read_end_time_str}`.\n\n"
if self.finalize_start_time is not None:
finalize_start_time_str = _to_time_str(self.finalize_start_time)
status_message += f"Started finalizing streams at {finalize_start_time_str}.\n\n"
status_message += f"**Started cache processing at `{finalize_start_time_str}`:**\n\n"
status_message += (
f"Finalized **{self.total_batches_finalized}** batches "
f"over {self.elapsed_finalization_time_str}.\n\n"
f"- Processed **{self.total_batches_finalized}** cache "
f"file(s) over **{self.elapsed_finalization_time_str}**.\n\n"
)
if self.finalize_end_time is not None:
completion_time_str = _to_time_str(self.finalize_end_time)
status_message += f"- Finished cache processing at `{completion_time_str}`.\n\n"

if self.finalized_stream_names:
status_message += (
f"Completed {len(self.finalized_stream_names)} "
f"**Completed processing {len(self.finalized_stream_names)} "
+ (f"out of {self.num_streams_expected} " if self.num_streams_expected else "")
+ "streams:\n\n"
+ "streams:**\n\n"
)
for stream_name in self.finalized_stream_names:
status_message += f" - {stream_name}\n"

status_message += "\n\n"

if self.finalize_end_time is not None:
completion_time_str = _to_time_str(self.finalize_end_time)
status_message += (
f"Completed writing at {completion_time_str}. "
f"Total time elapsed: {self.elapsed_time_string}\n\n"
)
status_message += f"**Total time elapsed: {self.elapsed_time_string}**\n\n"
status_message += "\n------------------------------------------------\n"

return status_message
Expand Down
Loading