Skip to content

Commit

Permalink
feat: improve progress prints
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronsteers committed Jul 18, 2024
1 parent 0616d8a commit 0b3df13
Showing 1 changed file with 42 additions and 24 deletions.
66 changes: 42 additions & 24 deletions airbyte/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ def _get_elapsed_time_str(seconds: float) -> str:
Minutes are always included after 1 minute elapsed.
Hours are always included after 1 hour elapsed.
"""
if seconds <= 2: # noqa: PLR2004 # Magic numbers OK here.
# Less than 1 minute elapsed
return f"{seconds:.2f} seconds"

if seconds <= 60: # noqa: PLR2004 # Magic numbers OK here.
# Less than 1 minute elapsed
return f"{seconds:.0f} seconds"
Expand Down Expand Up @@ -245,6 +249,7 @@ def reset(self, num_streams_expected: int) -> None:

# Reads
self.read_start_time = time.time()
self.first_record_received_time: float | None = None
self.read_end_time = None
self.total_records_read = 0

Expand All @@ -263,12 +268,22 @@ def reset(self, num_streams_expected: int) -> None:
self._start_rich_view()

@property
def elapsed_seconds(self) -> int:
def elapsed_seconds(self) -> float:
"""Return the number of seconds elapsed since the read operation started."""
if self.finalize_end_time:
return self.finalize_end_time - self.read_start_time

return time.time() - self.read_start_time

@property
def elapsed_seconds_since_first_record(self) -> int:
"""Return the number of seconds elapsed since the read operation started."""
if self.finalize_end_time:
return int(self.finalize_end_time - self.read_start_time)
return int(
self.finalize_end_time - (self.first_record_received_time or self.read_start_time)
)

return int(time.time() - self.read_start_time)
return int(time.time() - (self.first_record_received_time or self.read_start_time))

@property
def elapsed_time_string(self) -> str:
Expand Down Expand Up @@ -297,13 +312,13 @@ def elapsed_read_time_string(self) -> str:
return _get_elapsed_time_str(self.elapsed_read_seconds)

@property
def elapsed_finalization_seconds(self) -> int:
def elapsed_finalization_seconds(self) -> float:
"""Return the number of seconds elapsed since the read operation started."""
if self.finalize_start_time is None:
return 0
if self.finalize_end_time is None:
return int(time.time() - self.finalize_start_time)
return int(self.finalize_end_time - self.finalize_start_time)
return time.time() - self.finalize_start_time
return self.finalize_end_time - self.finalize_start_time

@property
def elapsed_finalization_time_str(self) -> str:
Expand All @@ -312,6 +327,9 @@ def elapsed_finalization_time_str(self) -> str:

def log_records_read(self, new_total_count: int) -> None:
"""Load a number of records read."""
if self.first_record_received_time is None:
self.first_record_received_time = time.time()

self.total_records_read = new_total_count

# This is some math to make updates adaptive to the scale of records read.
Expand Down Expand Up @@ -399,50 +417,50 @@ def _get_status_message(self) -> str:
# Format start time as a friendly string in local timezone:
start_time_str = _to_time_str(self.read_start_time)
records_per_second: float = 0.0
if self.elapsed_read_seconds > 0:
if self.elapsed_seconds_since_first_record > 0:
records_per_second = round(
float(self.total_records_read) / self.elapsed_read_seconds,
float(self.total_records_read) / self.elapsed_seconds_since_first_record,
ndigits=1,
)
status_message = (
f"## Read Progress\n\n"
f"Started reading at {start_time_str}.\n\n"
f"Read **{self.total_records_read:,}** records "
f"### Read Progress\n\n"
f"**Started reading from source at `{start_time_str}`:**\n\n"
f"- Read **{self.total_records_read:,}** records "
f"over **{self.elapsed_read_time_string}** "
f"({records_per_second:,} records / second).\n\n"
)
if self.total_records_written > 0:
status_message += (
f"Wrote **{self.total_records_written:,}** records "
f"over {self.total_batches_written:,} batches.\n\n"
f"- Cached **{self.total_records_written:,}** records "
f"into {self.total_batches_written:,} local cache file(s).\n\n"
)
if self.read_end_time is not None:
read_end_time_str = _to_time_str(self.read_end_time)
status_message += f"Finished reading at {read_end_time_str}.\n\n"
status_message += f"- Finished reading from source at `{read_end_time_str}`.\n\n"
if self.finalize_start_time is not None:
finalize_start_time_str = _to_time_str(self.finalize_start_time)
status_message += f"Started finalizing streams at {finalize_start_time_str}.\n\n"
status_message += f"**Started cache processing at `{finalize_start_time_str}`:**\n\n"
status_message += (
f"Finalized **{self.total_batches_finalized}** batches "
f"over {self.elapsed_finalization_time_str}.\n\n"
f"- Processed **{self.total_batches_finalized}** cache "
f"file(s) over **{self.elapsed_finalization_time_str}**.\n\n"
)
if self.finalize_end_time is not None:
completion_time_str = _to_time_str(self.finalize_end_time)
status_message += f"- Finished cache processing at `{completion_time_str}`.\n\n"

if self.finalized_stream_names:
status_message += (
f"Completed {len(self.finalized_stream_names)} "
f"**Completed processing {len(self.finalized_stream_names)} "
+ (f"out of {self.num_streams_expected} " if self.num_streams_expected else "")
+ "streams:\n\n"
+ "streams:**\n\n"
)
for stream_name in self.finalized_stream_names:
status_message += f" - {stream_name}\n"

status_message += "\n\n"

if self.finalize_end_time is not None:
completion_time_str = _to_time_str(self.finalize_end_time)
status_message += (
f"Completed writing at {completion_time_str}. "
f"Total time elapsed: {self.elapsed_time_string}\n\n"
)
status_message += f"**Total time elapsed: {self.elapsed_time_string}**\n\n"
status_message += "\n------------------------------------------------\n"

return status_message
Expand Down

0 comments on commit 0b3df13

Please sign in to comment.