Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronsteers committed Jul 18, 2024
1 parent 83df743 commit 44524e3
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 33 deletions.
22 changes: 10 additions & 12 deletions airbyte/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ def reset(self, num_streams_expected: int) -> None:

# Reads
self.read_start_time = time.time()
self.first_record_received_time: float | None = None
self.first_record_received_time = None
self.read_end_time = None
self.total_records_read = 0

Expand All @@ -277,14 +277,14 @@ def elapsed_seconds(self) -> float:
return time.time() - self.read_start_time

@property
def elapsed_seconds_since_first_record(self) -> int:
def elapsed_read_time(self) -> float:
"""Return the number of seconds elapsed since the read operation started."""
if self.finalize_end_time:
return int(
self.finalize_end_time - (self.first_record_received_time or self.read_start_time)
if self.finalize_start_time:
return self.finalize_start_time - (
self.first_record_received_time or self.read_start_time
)

return int(time.time() - (self.first_record_received_time or self.read_start_time))
return time.time() - (self.first_record_received_time or self.read_start_time)

@property
def elapsed_time_string(self) -> str:
Expand Down Expand Up @@ -418,17 +418,15 @@ def _get_status_message(self) -> str:
# Format start time as a friendly string in local timezone:
start_time_str = _to_time_str(self.read_start_time)
records_per_second: float = 0.0
if self.elapsed_seconds_since_first_record > 0:
records_per_second = round(
float(self.total_records_read) / self.elapsed_seconds_since_first_record,
ndigits=1,
)
if self.elapsed_read_time > 0:
records_per_second = self.total_records_read / self.elapsed_read_time

status_message = (
f"### Read Progress\n\n"
f"**Started reading from source at `{start_time_str}`:**\n\n"
f"- Read **{self.total_records_read:,}** records "
f"over **{self.elapsed_read_time_string}** "
f"({records_per_second:,} records / second).\n\n"
f"({records_per_second:,.1f} records / second).\n\n"
)
if self.total_records_written > 0:
status_message += (
Expand Down
50 changes: 29 additions & 21 deletions tests/unit_tests/test_progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,24 +105,29 @@ def _assert_lines(expected_lines, actual_lines: list[str] | str):
if isinstance(actual_lines, list):
actual_lines = "\n".join(actual_lines)
for line in expected_lines:
assert line in actual_lines, f"Missing line: {line}"
assert (
line in actual_lines
), f"Missing line:\n{line}\n\nIn lines:\n\n{actual_lines}"


def test_get_status_message_after_finalizing_records():
# Test that we can render the initial status message before starting to read
with freeze_time("2022-01-01 00:00:00"):
progress = ReadProgress()
expected_lines = [
"Started reading at 00:00:00.",
"Read **0** records over **0 seconds** (0.0 records / second).",
"Started reading from source at `00:00:00`",
"Read **0** records over **0.00 seconds** (0.0 records / second).",
]
_assert_lines(expected_lines, progress._get_status_message())

# We need to read one record to start the "time since first record" timer
progress.log_records_read(1)

# Test after reading some records
with freeze_time("2022-01-01 00:01:00"):
progress.log_records_read(100)
expected_lines = [
"Started reading at 00:00:00.",
"Started reading from source at `00:00:00`",
"Read **100** records over **60 seconds** (1.7 records / second).",
]
_assert_lines(expected_lines, progress._get_status_message())
Expand All @@ -132,23 +137,26 @@ def test_get_status_message_after_finalizing_records():
progress = ReadProgress()
progress.reset(1)
expected_lines = [
"Started reading at 00:00:00.",
"Read **0** records over **0 seconds** (0.0 records / second).",
"Started reading from source at `00:00:00`",
"Read **0** records over **0.00 seconds** (0.0 records / second).",
]
_assert_lines(expected_lines, progress._get_status_message())

# We need to read one record to start the "time since first record" timer
progress.log_records_read(1)

# Test after writing some records and starting to finalize
with freeze_time("2022-01-02 00:01:00"):
progress.log_records_read(100)
progress.log_batch_written("stream1", 50)
progress.log_batches_finalizing("stream1", 1)
expected_lines = [
"## Read Progress",
"Started reading at 00:00:00.",
"Started reading from source at `00:00:00`",
"Read **100** records over **60 seconds** (1.7 records / second).",
"Wrote **50** records over 1 batches.",
"Finished reading at 00:01:00.",
"Started finalizing streams at 00:01:00.",
"Cached **50** records into 1 local cache file(s).",
"Finished reading from source at `00:01:00`",
"Started cache processing at `00:01:00`",
]
_assert_lines(expected_lines, progress._get_status_message())

Expand All @@ -157,12 +165,12 @@ def test_get_status_message_after_finalizing_records():
progress.log_batches_finalized("stream1", 1)
expected_lines = [
"## Read Progress",
"Started reading at 00:00:00.",
"Started reading from source at `00:00:00`",
"Read **100** records over **60 seconds** (1.7 records / second).",
"Wrote **50** records over 1 batches.",
"Finished reading at 00:01:00.",
"Started finalizing streams at 00:01:00.",
"Finalized **1** batches over 60 seconds.",
"Cached **50** records into 1 local cache file(s).",
"Finished reading from source at `00:01:00`",
"Started cache processing at `00:01:00`",
"Processed **1** cache file(s) over **60 seconds**",
]
_assert_lines(expected_lines, progress._get_status_message())

Expand All @@ -172,13 +180,13 @@ def test_get_status_message_after_finalizing_records():
message = progress._get_status_message()
expected_lines = [
"## Read Progress",
"Started reading at 00:00:00.",
"Started reading from source at `00:00:00`",
"Read **100** records over **60 seconds** (1.7 records / second).",
"Wrote **50** records over 1 batches.",
"Finished reading at 00:01:00.",
"Started finalizing streams at 00:01:00.",
"Finalized **1** batches over 60 seconds.",
"Completed 1 out of 1 streams:",
"Cached **50** records into 1 local cache file(s).",
"Finished reading from source at `00:01:00`",
"Started cache processing at `00:01:00`",
"Processed **1** cache file(s) over **60 seconds",
"Completed processing 1 out of 1 streams",
"- stream1",
"Total time elapsed: 2min 0s",
]
Expand Down

0 comments on commit 44524e3

Please sign in to comment.