Skip to content

Commit

Permalink
ref(async-inserts): add clickhouse-concurrency cli arg
Browse files Browse the repository at this point in the history
  • Loading branch information
MeredithAnya committed Jun 4, 2024
1 parent 8a4a698 commit 4339bc5
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 1 deletion.
5 changes: 4 additions & 1 deletion rust_snuba/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub fn consumer(
no_strict_offset_reset: bool,
consumer_config_raw: &str,
concurrency: usize,
clickhouse_concurrency: usize,
use_rust_processor: bool,
enforce_schema: bool,
max_poll_interval_ms: usize,
Expand All @@ -48,6 +49,7 @@ pub fn consumer(
no_strict_offset_reset,
consumer_config_raw,
concurrency,
clickhouse_concurrency,
use_rust_processor,
enforce_schema,
max_poll_interval_ms,
Expand All @@ -66,6 +68,7 @@ pub fn consumer_impl(
no_strict_offset_reset: bool,
consumer_config_raw: &str,
concurrency: usize,
clickhouse_concurrency: usize,
use_rust_processor: bool,
enforce_schema: bool,
max_poll_interval_ms: usize,
Expand Down Expand Up @@ -215,7 +218,7 @@ pub fn consumer_impl(
max_batch_size,
max_batch_time,
processing_concurrency: ConcurrencyConfig::new(concurrency),
clickhouse_concurrency: ConcurrencyConfig::new(2),
clickhouse_concurrency: ConcurrencyConfig::new(clickhouse_concurrency),
commitlog_concurrency: ConcurrencyConfig::new(2),
replacements_concurrency: ConcurrencyConfig::new(4),
async_inserts,
Expand Down
7 changes: 7 additions & 0 deletions snuba/cli/rust_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@
"--concurrency",
type=int,
)
@click.option(
"--clickhouse-concurrency",
type=int,
help="Number of concurrent clickhouse batches at one time.",
)
@click.option(
"--use-rust-processor/--use-python-processor",
"use_rust_processor",
Expand Down Expand Up @@ -172,6 +177,7 @@ def rust_consumer(
max_batch_time_ms: int,
log_level: str,
concurrency: Optional[int],
clickhouse_concurrency: Optional[int],
use_rust_processor: bool,
group_instance_id: Optional[str],
max_poll_interval_ms: int,
Expand Down Expand Up @@ -215,6 +221,7 @@ def rust_consumer(
no_strict_offset_reset,
consumer_config_raw,
concurrency or 1,
clickhouse_concurrency or 2,
use_rust_processor,
enforce_schema,
max_poll_interval_ms,
Expand Down

0 comments on commit 4339bc5

Please sign in to comment.