From 900308e853f4f9499c39f76cbb20378d80c2a1ac Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Wed, 31 Jul 2024 15:27:23 -0700 Subject: [PATCH 1/3] Chore: Add '--no-cache' to example perf profiling script --- examples/run_perf_test_reads.py | 35 ++++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/examples/run_perf_test_reads.py b/examples/run_perf_test_reads.py index e5dee85f..b3a25043 100644 --- a/examples/run_perf_test_reads.py +++ b/examples/run_perf_test_reads.py @@ -45,8 +45,7 @@ approximately 1 MB. Based on this: 25K records/second is approximately 5.5 MB/s. - The E2E stream is assumed to be 180 bytes, meaning 5_500 records is approximately 1 MB. Based on this: 40K records/second is approximately 7.2 MB/s - and 61K records/second is approximately 11 MB/s. - + and 60K records/second is approximately 10.5 MB/s. """ from __future__ import annotations @@ -58,6 +57,7 @@ import airbyte as ab from airbyte.caches import BigQueryCache, CacheBase, SnowflakeCache from airbyte.secrets.google_gsm import GoogleGSMSecretManager +from typing_extensions import Literal if TYPE_CHECKING: from airbyte.sources.base import Source @@ -78,7 +78,12 @@ def get_gsm_secret_json(secret_name: str) -> dict: return secret.parse_json() -def get_cache(cache_type: str) -> CacheBase: +def get_cache( + cache_type: Literal["duckdb", "snowflake", "bigquery", False], +) -> CacheBase | Literal[False]: + if cache_type is False: + return False + if cache_type == "duckdb": return ab.new_local_cache() @@ -168,14 +173,14 @@ def get_destination(destination_type: str) -> ab.Destination: def main( e: int | None = None, n: int | None = None, - cache_type: str = "duckdb", + cache_type: Literal["duckdb", "bigquery", "snowflake", False] = "duckdb", source_alias: str = "e2e", destination_type: str | None = None, ) -> None: num_records: int = n or 5 * (10 ** (e or 3)) - cache_type = cache_type or "duckdb" + cache_type = "duckdb" if cache_type is None else cache_type - cache: CacheBase = get_cache( + cache: CacheBase | False = get_cache( cache_type=cache_type, ) source: Source = get_source( @@ -183,10 +188,13 @@ def main( num_records=num_records, ) source.check() - read_result = source.read(cache) - if destination_type: - destination = get_destination(destination_type=destination_type) - destination.write(read_result) + destination = get_destination(destination_type=destination_type) + if cache is not False: + read_result = source.read(cache) + if destination_type: + destination.write(read_result) + else: + destination.write(source, cache=False) if __name__ == "__main__": @@ -216,6 +224,11 @@ def main( choices=["duckdb", "snowflake", "bigquery"], default="duckdb", ) + parser.add_argument( + "--no-cache", + action="store_true", + help="Disable caching.", + ) parser.add_argument( "--source", type=str, @@ -238,7 +251,7 @@ def main( main( e=args.e, n=args.n, - cache_type=args.cache, + cache_type=args.cache if not args.no_cache else False, source_alias=args.source, destination_type=args.destination, ) From 73064ca6599729c91b732fd4777c5f57db9a7a21 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Wed, 31 Jul 2024 15:40:49 -0700 Subject: [PATCH 2/3] add raw throughput test examples --- examples/run_perf_test_reads.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/examples/run_perf_test_reads.py b/examples/run_perf_test_reads.py index b3a25043..82d76ae5 100644 --- a/examples/run_perf_test_reads.py +++ b/examples/run_perf_test_reads.py @@ -40,6 +40,15 @@ poetry run python ./examples/run_perf_test_reads.py -e=5 --destination=e2e ``` +Testing raw PyAirbyte throughput with and without caching: + +```bash +# Test raw PyAirbyte throughput with caching (Source->Cache): +poetry run python ./examples/run_perf_test_reads.py -e=5 +# Test raw PyAirbyte throughput without caching (Source->Destination): +poetry run python ./examples/run_perf_test_reads.py -e=5 --destination=e2e --no-cache +``` + Note: - The Faker stream ('purchases') is assumed to be 220 bytes, meaning 4_500 records is approximately 1 MB. Based on this: 25K records/second is approximately 5.5 MB/s. @@ -188,7 +197,8 @@ def main( num_records=num_records, ) source.check() - destination = get_destination(destination_type=destination_type) + if destination_type: + destination = get_destination(destination_type=destination_type) if cache is not False: read_result = source.read(cache) if destination_type: From 1e3d0f64c09f932df8c4dbfb82b006ff90b009ce Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Wed, 31 Jul 2024 16:13:17 -0700 Subject: [PATCH 3/3] fix lint issue --- examples/run_perf_test_reads.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/run_perf_test_reads.py b/examples/run_perf_test_reads.py index 82d76ae5..87c7f450 100644 --- a/examples/run_perf_test_reads.py +++ b/examples/run_perf_test_reads.py @@ -189,7 +189,7 @@ def main( num_records: int = n or 5 * (10 ** (e or 3)) cache_type = "duckdb" if cache_type is None else cache_type - cache: CacheBase | False = get_cache( + cache: CacheBase | Literal[False] = get_cache( cache_type=cache_type, ) source: Source = get_source(