Skip to content

Commit

Permalink
Fix: Improve progress updates, including list of streams finalized (#147)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronsteers authored Mar 28, 2024
1 parent 7652c33 commit 2e28882
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 4 deletions.
2 changes: 2 additions & 0 deletions airbyte/_processors/sql/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,8 @@ def write_stream_data(
finally:
self._drop_temp_table(temp_table_name, if_exists=True)

progress.log_stream_finalized(stream_name)

# Return the batch handles as measure of work completed.
return batches_to_finalize

Expand Down
2 changes: 1 addition & 1 deletion airbyte/sources/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ def _tally_records(

for message in messages:
yield message
progress.log_records_read(self._processed_records)
progress.log_records_read(new_total_count=self._processed_records)

def _log_sync_start(
self,
Expand Down
2 changes: 1 addition & 1 deletion examples/run_faker_to_motherduck.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
api_key=MOTHERDUCK_API_KEY,
)

result = source.read(cache=cache)
result = source.read(cache=cache, force_full_refresh=True)

for name, records in result.streams.items():
print(f"Stream {name}: {len(records)} records")
6 changes: 6 additions & 0 deletions examples/run_snowflake_faker.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""
Usage:
poetry install
poetry run python examples/run_snowflake_faker.py
"""

from __future__ import annotations

import airbyte as ab
Expand Down
19 changes: 18 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ airbyte-source-faker = "^6.0.0"
tomli = "^2.0"
responses = "^0.25.0"
airbyte-source-pokeapi = "^0.2.0"
pytest-mock = "^3.14.0"

[build-system]
requires = ["poetry-core>=1.0.0", "poetry-dynamic-versioning>=1.0.0,<2.0.0"]
Expand Down
3 changes: 2 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,9 +325,10 @@ def pytest_generate_tests(metafunc: pytest.Metafunc) -> None:
can pass across all cache types.
"""
all_cache_type_fixtures: dict[str, str] = {
"BigQuery": "new_bigquery_cache",
# Ordered by priority (fastest first)
"DuckDB": "new_duckdb_cache",
"Postgres": "new_postgres_cache",
"BigQuery": "new_bigquery_cache",
"Snowflake": "new_snowflake_cache",
}
if is_windows():
Expand Down
39 changes: 39 additions & 0 deletions tests/integration_tests/test_all_cache_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import airbyte as ab
from airbyte._executor import _get_bin_dir
from airbyte.progress import progress, ReadProgress


# Product count is always the same, regardless of faker scale.
Expand Down Expand Up @@ -117,6 +118,22 @@ def test_pokeapi_read(
assert len(list(result.cache.streams["pokemon"])) == 1


@pytest.fixture(scope="function")
def progress_mock(
    mocker: pytest.MockerFixture,
) -> ReadProgress:
    """Return the shared `progress` object with spies attached to its logging methods.

    The methods are wrapped with `mocker.spy`, not replaced: each call still runs
    the real implementation, while the spy records it so tests can check
    `call_count`.
    """
    # NOTE(review): `MockerFixture` is provided by the `pytest_mock` package; the
    # `pytest.MockerFixture` annotation above presumably only works because
    # annotations are not evaluated at runtime in this module - TODO confirm.
    spied_method_names = (
        "reset",
        "log_records_read",
        "log_batch_written",
        "log_batches_finalizing",
        "log_batches_finalized",
        "log_stream_finalized",
        "log_success",
    )
    for method_name in spied_method_names:
        mocker.spy(progress, method_name)

    return progress


# Uncomment this line if you want to see performance trace logs.
# You can render perf traces using the viztracer CLI or the VS Code VizTracer Extension.
#@viztracer.trace_and_save(output_dir=".pytest_cache/snowflake_trace/")
Expand All @@ -125,11 +142,33 @@ def test_pokeapi_read(
def test_faker_read(
    source_faker_seed_a: ab.Source,
    new_generic_cache: ab.caches.CacheBase,
    progress_mock: ReadProgress,
) -> None:
    """Test a forced full-refresh read using the "replace" write strategy.

    Besides the number of records landing in the cache, this checks the counters
    and spied method calls exposed by the `progress_mock` fixture.
    """
    read_result = source_faker_seed_a.read(
        new_generic_cache,
        write_strategy="replace",
        force_full_refresh=True,
    )
    configured_count = source_faker_seed_a._config["count"]

    # The expectations below only hold when 'users' is the single selected stream.

    # Record totals and per-record read logging.
    assert progress_mock.total_records_read == configured_count
    assert progress_mock.total_records_written == configured_count
    assert progress_mock.log_records_read.call_count >= configured_count
    assert progress_mock.reset.call_count == 1

    # Exactly one batch is written and then finalized.
    assert progress_mock.log_batch_written.call_count == 1
    assert progress_mock.total_batches_written == 1
    assert progress_mock.log_batches_finalizing.call_count == 1
    assert progress_mock.log_batches_finalized.call_count == 1
    assert progress_mock.total_batches_finalized == 1

    # The stream is reported as finalized once, and success is logged once.
    assert progress_mock.finalized_stream_names == {"users"}
    assert progress_mock.log_stream_finalized.call_count == 1
    assert progress_mock.log_success.call_count == 1

    # The rendered status message must show the real record count, not zero.
    status_text: str = progress_mock._get_status_message()
    assert "Read **0** records" not in status_text
    assert f"Read **{configured_count}** records" in status_text

    assert len(list(read_result.cache.streams["users"])) == FAKER_SCALE_A


Expand Down

0 comments on commit 2e28882

Please sign in to comment.