simplify and refactor perf_test example script
aaronsteers committed Oct 6, 2024
1 parent ba1a8b8 commit f2ce489
Showing 2 changed files with 52 additions and 75 deletions.
1 change: 1 addition & 0 deletions airbyte/cli.py
@@ -76,6 +76,7 @@
     """For example, --config='{password: "SECRET:MY_PASSWORD"}'."""
 )

+
 def _resolve_config(
     config: str,
 ) -> dict[str, Any]:
126 changes: 51 additions & 75 deletions examples/run_perf_test_reads.py
@@ -2,80 +2,77 @@
 """
 Simple script to get performance profile of read throughput.

-This script accepts a single argument `-e=SCALE` as a power of 10.
+This script accepts a single argument `-n=NUM_RECORDS` with record count
+provided as a regular number or in scientific notation.

--e=2 is equivalent to 500 records.
--e=3 is equivalent to 5_000 records.
--e=4 is equivalent to 50_000 records.
--e=5 is equivalent to 500_000 records.
--e=6 is equivalent to 5_000_000 records.
+When providing in scientific notation:

-Use smaller values of `e` (2-3) to understand read and overhead costs.
-Use larger values of `e` (4-5) to understand write throughput at scale.
+-n=5e2 is equivalent to 500 records.
+-n=5e3 is equivalent to 5_000 records.
+-n=5e4 is equivalent to 50_000 records.
+-n=5e5 is equivalent to 500_000 records.
+-n=5e6 is equivalent to 5_000_000 records.

 For performance profiling, use `viztracer` to generate a flamegraph:

 ```
-poetry run viztracer --open -- ./examples/run_perf_test_reads.py -e=3
-poetry run viztracer --open -- ./examples/run_perf_test_reads.py -e=5
+poetry run viztracer --open -- ./examples/run_perf_test_reads.py -n=1e3
+poetry run viztracer --open -- ./examples/run_perf_test_reads.py -n=1e5
 ```

 To run without profiling, prefix script name with `poetry run python`:

 ```
 # Run with 5_000 records
-poetry run python ./examples/run_perf_test_reads.py -e=3
+poetry run python ./examples/run_perf_test_reads.py -n=1e3

 # Run with 500_000 records
-poetry run python ./examples/run_perf_test_reads.py -e=5
+poetry run python ./examples/run_perf_test_reads.py -n=1e5

 # Load 5_000 records to Snowflake
-poetry run python ./examples/run_perf_test_reads.py -e=3 --cache=snowflake
+poetry run python ./examples/run_perf_test_reads.py -n=1e3 --cache=snowflake

 # Load 5_000 records to BigQuery
-poetry run python ./examples/run_perf_test_reads.py -e=3 --cache=bigquery
+poetry run python ./examples/run_perf_test_reads.py -n=1e3 --cache=bigquery
 ```

 You can also use this script to test destination load performance:

 ```bash
 # Load 5_000 records to BigQuery
-poetry run python ./examples/run_perf_test_reads.py -e=5 --destination=e2e
+poetry run python ./examples/run_perf_test_reads.py -n=1e3 --destination=e2e
 ```

 Testing raw PyAirbyte throughput with and without caching:

 ```bash
 # Test raw PyAirbyte throughput with caching (Source->Cache):
-poetry run python ./examples/run_perf_test_reads.py -e=5
+poetry run python ./examples/run_perf_test_reads.py -n=1e3

 # Test raw PyAirbyte throughput without caching (Source->Destination):
-poetry run python ./examples/run_perf_test_reads.py -e=5 --destination=e2e --no-cache
+poetry run python ./examples/run_perf_test_reads.py -n=1e3 --destination=e2e --no-cache
 ```

 Testing Python CDK throughput:

 ```bash
-# Test max throughput:
-poetry run python ./examples/run_perf_test_reads.py -n=2400000 --source=hardcoded --destination=e2e
+# Test max throughput with 2.4 million records:
+poetry run python ./examples/run_perf_test_reads.py -n=2.4e6 --source=hardcoded --destination=e2e

 # Analyze tracing data:
-poetry run viztracer --open -- ./examples/run_perf_test_reads.py -e=3 --source=hardcoded --destination=e2e
+poetry run viztracer --open -- ./examples/run_perf_test_reads.py -n=1e3 --source=hardcoded --destination=e2e
 ```

 Note:
 - The Faker stream ('purchases') is assumed to be 220 bytes, meaning 4_500 records is
   approximately 1 MB. Based on this: 25K records/second is approximately 5.5 MB/s.
 - The E2E stream is assumed to be 180 bytes, meaning 5_500 records is
   approximately 1 MB. Based on this: 40K records/second is approximately 7.2 MB/s
   and 60K records/second is approximately 10.5 MB/s.
 """

 from __future__ import annotations

 import argparse
 import tempfile
+from decimal import Decimal
 from typing import TYPE_CHECKING

 import airbyte as ab
 from airbyte.caches import BigQueryCache, CacheBase, SnowflakeCache
+from airbyte.destinations import Destination, get_noop_destination
 from airbyte.secrets.google_gsm import GoogleGSMSecretManager
+from airbyte.sources import get_benchmark_source
 from typing_extensions import Literal

 if TYPE_CHECKING:
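The new `-n` handling leans on `decimal.Decimal` so record counts can be given either as plain integers or in scientific notation, without float rounding. A minimal standalone sketch of the conversion; `parse_num_records` is a hypothetical helper mirroring the inline logic this commit adds:

```python
from decimal import Decimal


def parse_num_records(n: int | str) -> int:
    """Normalize a record count given as an int or a scientific-notation string."""
    if isinstance(n, str):
        # Decimal("2.4e6") is exact; float() could round very large counts.
        return int(Decimal(n))
    return n


assert parse_num_records("5e2") == 500
assert parse_num_records("2.4e6") == 2_400_000
assert parse_num_records(50_000) == 50_000
```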
@@ -142,8 +139,11 @@ def get_cache(

 def get_source(
     source_alias: str,
-    num_records: int,
+    num_records: int | str,
 ) -> Source:
+    if isinstance(num_records, str):
+        num_records = int(Decimal(num_records))
+
     if source_alias == "faker":
         return ab.get_source(
             "source-faker",
@@ -152,20 +152,8 @@ def get_source(
             streams=["purchases"],
         )

-    if source_alias == "e2e":
-        return ab.get_source(
-            "source-e2e",
-            docker_image="airbyte/source-e2e-test:latest",
-            streams="*",
-            config={
-                "type": "BENCHMARK",
-                "schema": "FIVE_STRING_COLUMNS",
-                "terminationCondition": {
-                    "type": "MAX_RECORDS",
-                    "max": num_records,
-                },
-            },
-        )
+    if source_alias in ["e2e", "benchmark"]:
+        return get_benchmark_source(num_records=num_records)

     if source_alias == "hardcoded":
         return ab.get_source(
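The hand-written `source-e2e` benchmark config collapses into PyAirbyte's `get_benchmark_source()` helper. A sketch of the two forms side by side, using only names that appear in this diff (the record count is illustrative):

```python
import airbyte as ab
from airbyte.sources import get_benchmark_source

# New one-liner: the helper replaces the inline BENCHMARK config.
source = get_benchmark_source(num_records=500_000)

# What the removed block spelled out by hand, kept here for comparison:
legacy_source = ab.get_source(
    "source-e2e",
    docker_image="airbyte/source-e2e-test:latest",
    streams="*",
    config={
        "type": "BENCHMARK",
        "schema": "FIVE_STRING_COLUMNS",
        "terminationCondition": {"type": "MAX_RECORDS", "max": 500_000},
    },
)

source.check()  # Same validation step the script performs.
```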
@@ -180,32 +168,19 @@


 def get_destination(destination_type: str) -> ab.Destination:
-    if destination_type == "e2e":
-        return ab.get_destination(
-            name="destination-e2e-test",
-            config={
-                "test_destination": {
-                    "test_destination_type": "LOGGING",
-                    "logging_config": {
-                        "logging_type": "FirstN",
-                        "max_entry_count": 100,
-                    },
-                }
-            },
-            docker_image="airbyte/destination-e2e-test:latest",
-        )
+    if destination_type in ["e2e", "noop"]:
+        return get_noop_destination()

     raise ValueError(f"Unknown destination type: {destination_type}")  # noqa: TRY003
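Likewise, the inline `destination-e2e-test` LOGGING config gives way to `get_noop_destination()`. A sketch of the no-cache path this enables, restricted to calls that appear in this diff (`check()`, `write(source, cache=False)`); the record count is illustrative:

```python
from airbyte.destinations import get_noop_destination
from airbyte.sources import get_benchmark_source

source = get_benchmark_source(num_records=1_000_000)
source.check()

# The no-op destination discards records, so timings isolate source read
# and transport overhead from any real write costs.
destination = get_noop_destination()
destination.write(source, cache=False)  # Source -> Destination, no cache.
```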


 def main(
-    e: int | None = None,
-    n: int | None = None,
+    n: int | str = "5e5",
     cache_type: Literal["duckdb", "bigquery", "snowflake", False] = "duckdb",
     source_alias: str = "e2e",
     destination_type: str | None = None,
 ) -> None:
-    num_records: int = n or 5 * (10 ** (e or 3))
+    num_records = int(Decimal(n))
     cache_type = "duckdb" if cache_type is None else cache_type

     cache: CacheBase | Literal[False] = get_cache(
@@ -216,34 +191,31 @@ def main(
         num_records=num_records,
     )
     source.check()
+    destination: Destination | None = None

     if destination_type:
         destination = get_destination(destination_type=destination_type)

     if cache is not False:
         read_result = source.read(cache)
-        if destination_type:
+        if destination:
             destination.write(read_result)
     else:
+        assert (
+            destination is not None
+        ), "Destination is required when caching is disabled."
         destination.write(source, cache=False)
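With a cache enabled, the refactored `main()` reads into the cache first and only then forwards the materialized result to the destination. A standalone sketch of that path, assuming `ab.get_default_cache()` for the default local DuckDB cache:

```python
import airbyte as ab
from airbyte.destinations import get_noop_destination
from airbyte.sources import get_benchmark_source

source = get_benchmark_source(num_records=500_000)
source.check()

# Source -> Cache: records are materialized in the local cache first.
read_result = source.read(ab.get_default_cache())

# Cache -> Destination: forward the already-cached records, matching the
# `destination.write(read_result)` branch above.
destination = get_noop_destination()
destination.write(read_result)
```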


 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description="Run performance test reads.")
-    parser.add_argument(
-        "-e",
-        type=int,
-        help=(
-            "The scale, as a power of 10."
-            "Recommended values: 2-3 (500 or 5_000) for read and overhead costs, "
-            " 4-6 (50K or 5MM) for write throughput. "
-            "This is mutually exclusive with -n."
-        ),
-    )
     parser.add_argument(
         "-n",
-        type=int,
+        type=str,
         help=(
             "The number of records to generate in the source. "
-            "This is mutually exclusive with -e."
+            "This can be provided in scientific notation, for instance "
+            "'2.4e6' for 2.4 million and '5e5' for 500K."
         ),
     )
     parser.add_argument(
@@ -266,7 +238,12 @@ def main(
             "while the `faker` source runs natively in Python. The 'hardcoded' source is "
             "similar to the 'e2e' source, but written in Python."
         ),
-        choices=["faker", "e2e", "hardcoded"],
+        choices=[
+            "benchmark",
+            "e2e",
+            "hardcoded",
+            "faker",
+        ],
         default="hardcoded",
     )
     parser.add_argument(
@@ -279,7 +256,6 @@ def main(
     args = parser.parse_args()

     main(
-        e=args.e,
         n=args.n,
         cache_type=args.cache if not args.no_cache else False,
         source_alias=args.source,
