diff --git a/airbyte/_processors/sql/base.py b/airbyte/_processors/sql/base.py index 2ea272f3..0d8b6f98 100644 --- a/airbyte/_processors/sql/base.py +++ b/airbyte/_processors/sql/base.py @@ -548,6 +548,8 @@ def write_stream_data( finally: self._drop_temp_table(temp_table_name, if_exists=True) + progress.log_stream_finalized(stream_name) + # Return the batch handles as measure of work completed. return batches_to_finalize diff --git a/airbyte/sources/base.py b/airbyte/sources/base.py index a6054186..5dea2d54 100644 --- a/airbyte/sources/base.py +++ b/airbyte/sources/base.py @@ -611,7 +611,7 @@ def _tally_records( for message in messages: yield message - progress.log_records_read(self._processed_records) + progress.log_records_read(new_total_count=self._processed_records) def _log_sync_start( self, diff --git a/examples/run_faker_to_motherduck.py b/examples/run_faker_to_motherduck.py index 6fa024f9..aecee90a 100644 --- a/examples/run_faker_to_motherduck.py +++ b/examples/run_faker_to_motherduck.py @@ -35,7 +35,7 @@ api_key=MOTHERDUCK_API_KEY, ) -result = source.read(cache=cache) +result = source.read(cache=cache, force_full_refresh=True) for name, records in result.streams.items(): print(f"Stream {name}: {len(records)} records") diff --git a/examples/run_snowflake_faker.py b/examples/run_snowflake_faker.py index d0ea5910..3e5f7b8f 100644 --- a/examples/run_snowflake_faker.py +++ b/examples/run_snowflake_faker.py @@ -1,4 +1,10 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. +""" +Usage: + poetry install + poetry run python examples/run_snowflake_faker.py +""" + from __future__ import annotations import airbyte as ab diff --git a/poetry.lock b/poetry.lock index ed8c5fd7..5cd17835 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1893,6 +1893,23 @@ pytest = ">=4.0,<8.0" docker-compose-v1 = ["docker-compose (>=1.27.3,<2.0)"] tests = ["pytest-pycodestyle (>=2.0.0,<3.0)", "pytest-pylint (>=0.14.1,<1.0)", "requests (>=2.22.0,<3.0)"] +[[package]] +name = "pytest-mock" +version = "3.14.0" +description = "Thin-wrapper around the mock package for easier use with pytest" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pytest-mock-3.14.0.tar.gz", hash = "sha256:2719255a1efeceadbc056d6bf3df3d1c5015530fb40cf347c0f9afac88410bd0"}, + {file = "pytest_mock-3.14.0-py3-none-any.whl", hash = "sha256:0b72c38033392a5f4621342fe11e9219ac11ec9d375f8e2a0c164539e0d70f6f"}, +] + +[package.dependencies] +pytest = ">=6.2.5" + +[package.extras] +dev = ["pre-commit", "pytest-asyncio", "tox"] + [[package]] name = "pytest-mypy" version = "0.10.3" @@ -2759,4 +2776,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = ">=3.9,<4.0" -content-hash = "6d0f5ac339648d6d1ed37194baa79af2302a09c6ce2850575bd4c5081f7fc5e4" +content-hash = "794c7c2ea986106433a869349d1463fa3cc0a2bfb18d98f4ab85e2c635100623" diff --git a/pyproject.toml b/pyproject.toml index 50656e18..dc8cda2f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -60,6 +60,7 @@ airbyte-source-faker = "^6.0.0" tomli = "^2.0" responses = "^0.25.0" airbyte-source-pokeapi = "^0.2.0" +pytest-mock = "^3.14.0" [build-system] requires = ["poetry-core>=1.0.0", "poetry-dynamic-versioning>=1.0.0,<2.0.0"] diff --git a/tests/conftest.py b/tests/conftest.py index 88dcf215..30d8a7ed 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -325,9 +325,10 @@ def pytest_generate_tests(metafunc: pytest.Metafunc) -> None: can pass across all cache types. """ all_cache_type_fixtures: dict[str, str] = { - "BigQuery": "new_bigquery_cache", + # Ordered by priority (fastest first) "DuckDB": "new_duckdb_cache", "Postgres": "new_postgres_cache", + "BigQuery": "new_bigquery_cache", "Snowflake": "new_snowflake_cache", } if is_windows(): diff --git a/tests/integration_tests/test_all_cache_types.py b/tests/integration_tests/test_all_cache_types.py index fb61d652..5b6f8f46 100644 --- a/tests/integration_tests/test_all_cache_types.py +++ b/tests/integration_tests/test_all_cache_types.py @@ -15,6 +15,7 @@ import airbyte as ab from airbyte._executor import _get_bin_dir +from airbyte.progress import progress, ReadProgress # Product count is always the same, regardless of faker scale. @@ -117,6 +118,22 @@ def test_pokeapi_read( assert len(list(result.cache.streams["pokemon"])) == 1 +@pytest.fixture(scope="function") +def progress_mock( + mocker: pytest.MockerFixture, +) -> ReadProgress: + """Fixture to return a mocked version of progress.progress.""" + # Mock the progress object. + mocker.spy(progress, 'reset') + mocker.spy(progress, 'log_records_read') + mocker.spy(progress, 'log_batch_written') + mocker.spy(progress, 'log_batches_finalizing') + mocker.spy(progress, 'log_batches_finalized') + mocker.spy(progress, 'log_stream_finalized') + mocker.spy(progress, 'log_success') + return progress + + # Uncomment this line if you want to see performance trace logs. # You can render perf traces using the viztracer CLI or the VS Code VizTracer Extension. #@viztracer.trace_and_save(output_dir=".pytest_cache/snowflake_trace/") @@ -125,11 +142,33 @@ def test_pokeapi_read( def test_faker_read( source_faker_seed_a: ab.Source, new_generic_cache: ab.caches.CacheBase, + progress_mock: ReadProgress, ) -> None: """Test that the append strategy works as expected.""" result = source_faker_seed_a.read( new_generic_cache, write_strategy="replace", force_full_refresh=True ) + configured_count = source_faker_seed_a._config["count"] + + # These numbers expect only 'users' stream selected: + + assert progress_mock.total_records_read == configured_count + assert progress_mock.total_records_written == configured_count + assert progress_mock.log_records_read.call_count >= configured_count + assert progress_mock.reset.call_count == 1 + assert progress_mock.log_batch_written.call_count == 1 + assert progress_mock.total_batches_written == 1 + assert progress_mock.log_batches_finalizing.call_count == 1 + assert progress_mock.log_batches_finalized.call_count == 1 + assert progress_mock.total_batches_finalized == 1 + assert progress_mock.finalized_stream_names == {"users"} + assert progress_mock.log_stream_finalized.call_count == 1 + assert progress_mock.log_success.call_count == 1 + + status_msg: str = progress_mock._get_status_message() + assert "Read **0** records" not in status_msg + assert f"Read **{configured_count}** records" in status_msg + assert len(list(result.cache.streams["users"])) == FAKER_SCALE_A