diff --git a/rust_snuba/benches/processors.rs b/rust_snuba/benches/processors.rs index 98eb2aae01..58cf9a75fc 100644 --- a/rust_snuba/benches/processors.rs +++ b/rust_snuba/benches/processors.rs @@ -98,7 +98,6 @@ fn create_factory( }, stop_at_timestamp: None, batch_write_timeout: None, - max_bytes_before_external_group_by: None, }; Box::new(factory) } diff --git a/rust_snuba/src/consumer.rs b/rust_snuba/src/consumer.rs index bf5e14e3d9..9da36d0ff2 100644 --- a/rust_snuba/src/consumer.rs +++ b/rust_snuba/src/consumer.rs @@ -45,7 +45,6 @@ pub fn consumer( health_check_file: Option<&str>, stop_at_timestamp: Option<i64>, batch_write_timeout_ms: Option<u64>, - max_bytes_before_external_group_by: Option<usize>, max_dlq_buffer_length: Option<usize>, ) -> usize { py.allow_threads(|| { @@ -64,7 +63,6 @@ pub fn consumer( health_check_file, stop_at_timestamp, batch_write_timeout_ms, - max_bytes_before_external_group_by, mutations_mode, max_dlq_buffer_length, ) @@ -87,7 +85,6 @@ pub fn consumer_impl( health_check_file: Option<&str>, stop_at_timestamp: Option<i64>, batch_write_timeout_ms: Option<u64>, - max_bytes_before_external_group_by: Option<usize>, mutations_mode: bool, max_dlq_buffer_length: Option<usize>, ) -> usize { @@ -278,7 +275,6 @@ pub fn consumer_impl( accountant_topic_config: consumer_config.accountant_topic, stop_at_timestamp, batch_write_timeout, - max_bytes_before_external_group_by, }; StreamProcessor::with_kafka(config, factory, topic, dlq_policy) diff --git a/rust_snuba/src/factory.rs b/rust_snuba/src/factory.rs index 640cfcb840..0e4eb8e811 100644 --- a/rust_snuba/src/factory.rs +++ b/rust_snuba/src/factory.rs @@ -57,7 +57,6 @@ pub struct ConsumerStrategyFactory { pub accountant_topic_config: config::TopicConfig, pub stop_at_timestamp: Option<i64>, pub batch_write_timeout: Option<Duration>, - pub max_bytes_before_external_group_by: Option<usize>, } impl ProcessingStrategyFactory for ConsumerStrategyFactory { @@ -121,7 +120,6 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactory { 
&self.storage_config.clickhouse_cluster.password, self.async_inserts, self.batch_write_timeout, - self.max_bytes_before_external_group_by, ); let accumulator = Arc::new( diff --git a/rust_snuba/src/strategies/clickhouse/batch.rs b/rust_snuba/src/strategies/clickhouse/batch.rs index 970d61958b..b43e5acc49 100644 --- a/rust_snuba/src/strategies/clickhouse/batch.rs +++ b/rust_snuba/src/strategies/clickhouse/batch.rs @@ -43,7 +43,6 @@ impl BatchFactory { clickhouse_password: &str, async_inserts: bool, batch_write_timeout: Option<Duration>, - max_bytes_before_external_group_by: Option<usize>, ) -> Self { let mut headers = HeaderMap::with_capacity(5); headers.insert(CONNECTION, HeaderValue::from_static("keep-alive")); @@ -71,12 +70,6 @@ impl BatchFactory { } } - if let Some(max_bytes_before_external_group_by) = max_bytes_before_external_group_by { - let mut query_segment: String = "&max_bytes_before_external_group_by=".to_owned(); - query_segment.push_str(&max_bytes_before_external_group_by.to_string()); - query_params.push_str(&query_segment) - } - let url = format!("http://{hostname}:{http_port}?{query_params}"); let query = format!("INSERT INTO {table} FORMAT JSONEachRow"); @@ -275,7 +268,6 @@ mod tests { "", false, None, - None, ); let mut batch = factory.new_batch(); @@ -311,43 +303,6 @@ mod tests { "", true, None, - None, - ); - - let mut batch = factory.new_batch(); - - batch - .write_rows(&RowData::from_encoded_rows(vec![ - br#"{"hello": "world"}"#.to_vec() - ])) - .unwrap(); - - concurrency.handle().block_on(batch.finish()).unwrap(); - - mock.assert(); - } - - #[test] - fn test_write_with_external_groupby() { - crate::testutils::initialize_python(); - let server = MockServer::start(); - let mock = server.mock(|when, then| { - when.method(POST).path("/").body("{\"hello\": \"world\"}\n"); - then.status(200).body("hi"); - }); - - let concurrency = ConcurrencyConfig::new(1); - let factory = BatchFactory::new( - &server.host(), - server.port(), - "testtable", - "testdb", - 
&concurrency, - "default", - "", - true, - None, - Some(500_000), ); let mut batch = factory.new_batch(); @@ -382,7 +337,6 @@ mod tests { "", false, None, - None, ); let mut batch = factory.new_batch(); @@ -415,7 +369,6 @@ mod tests { "", false, None, - None, ); let mut batch = factory.new_batch(); @@ -452,7 +405,6 @@ mod tests { // pass in an unreasonably short timeout // which prevents the client request from reaching Clickhouse Some(Duration::from_millis(0)), - None, ); let mut batch = factory.new_batch(); @@ -487,7 +439,6 @@ mod tests { true, // pass in a reasonable timeout Some(Duration::from_millis(1000)), - None, ); let mut batch = factory.new_batch(); diff --git a/snuba/cli/rust_consumer.py b/snuba/cli/rust_consumer.py index 5e823831ba..3bdc74a155 100644 --- a/snuba/cli/rust_consumer.py +++ b/snuba/cli/rust_consumer.py @@ -178,16 +178,6 @@ default=None, help="Optional timeout for batch writer client connecting and sending request to Clickhouse", ) -@click.option( - "--max-bytes-before-external-group-by", - type=int, - default=None, - help=""" - Allow batching on disk for GROUP BY queries. This is a test mitigation for whether a - materialized view is causing OOM on inserts. If successful, this should be set in storage config. - If not successful, this option should be removed. - """, -) def rust_consumer( *, storage_names: Sequence[str], @@ -217,7 +207,6 @@ def rust_consumer( enforce_schema: bool, stop_at_timestamp: Optional[int], batch_write_timeout_ms: Optional[int], - max_bytes_before_external_group_by: Optional[int], mutations_mode: bool, max_dlq_buffer_length: Optional[int] ) -> None: @@ -271,7 +260,6 @@ def rust_consumer( health_check_file, stop_at_timestamp, batch_write_timeout_ms, - max_bytes_before_external_group_by, max_dlq_buffer_length, )