feat(eap): Mutations consumer MVP #6216

Merged · 23 commits · Sep 25, 2024

Changes from 9 commits
17 changes: 17 additions & 0 deletions rust_snuba/src/arroyo_utils.rs
@@ -0,0 +1,17 @@
//! Some helper functions that work around Arroyo's ergonomics; they should
//! eventually make it into Arroyo itself.

use rust_arroyo::processing::strategies::InvalidMessage;
use rust_arroyo::types::{InnerMessage, Message};

/// Build the `InvalidMessage` error that Arroyo's DLQ machinery expects from a
/// broker-backed message, capturing its partition and offset. Fails for
/// non-broker messages, which carry no offset.
pub fn invalid_message_err<T>(message: &Message<T>) -> Result<InvalidMessage, anyhow::Error> {
let InnerMessage::BrokerMessage(ref msg) = message.inner_message else {
return Err(anyhow::anyhow!("Unexpected message type"));
};

let err = InvalidMessage {
partition: msg.partition,
offset: msg.offset,
};

Ok(err)
}
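
To make the intended call pattern concrete, here is a hedged usage sketch; the function, payload type, and logging are illustrative, not part of this diff. A strategy that cannot process a message converts it into Arroyo's InvalidMessage so the DLQ machinery gets the right partition and offset.

use rust_arroyo::backends::kafka::types::KafkaPayload;

// Hypothetical strategy helper: reject a message whose payload cannot be
// handled, preserving partition/offset for DLQ routing.
fn reject(message: &Message<KafkaPayload>) -> Result<(), InvalidMessage> {
    match invalid_message_err(message) {
        // Broker-backed message: surface InvalidMessage so Arroyo DLQs it.
        Ok(invalid) => Err(invalid),
        // Non-broker message (e.g. synthesized by a join/reduce step): there
        // is no offset to report, so the caller must handle it another way.
        Err(err) => {
            tracing::error!("cannot route message to DLQ: {err}");
            Ok(())
        }
    }
}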
83 changes: 57 additions & 26 deletions rust_snuba/src/consumer.rs
@@ -21,6 +21,7 @@ use crate::factory::ConsumerStrategyFactory;
use crate::logging::{setup_logging, setup_sentry};
use crate::metrics::global_tags::set_global_tag;
use crate::metrics::statsd::StatsDBackend;
+use crate::mutations_factory::MutConsumerStrategyFactory;
use crate::processors;
use crate::types::{InsertOrReplacement, KafkaMessageMetadata};

@@ -38,6 +39,7 @@ pub fn consumer(
enforce_schema: bool,
max_poll_interval_ms: usize,
async_inserts: bool,
+    allow_mutability: bool,
python_max_queue_depth: Option<usize>,
health_check_file: Option<&str>,
stop_at_timestamp: Option<i64>,
@@ -61,6 +63,7 @@
stop_at_timestamp,
batch_write_timeout_ms,
max_bytes_before_external_group_by,
+    allow_mutability,
)
});
}
@@ -82,6 +85,7 @@ pub fn consumer_impl(
stop_at_timestamp: Option<i64>,
batch_write_timeout_ms: Option<u64>,
max_bytes_before_external_group_by: Option<usize>,
+    allow_mutability: bool,
) -> usize {
setup_logging();

@@ -228,33 +232,60 @@ pub fn consumer_impl(
None
};

-    let factory = ConsumerStrategyFactory {
-        storage_config: first_storage,
-        env_config,
-        logical_topic_name,
-        max_batch_size,
-        max_batch_time,
-        processing_concurrency: ConcurrencyConfig::new(concurrency),
-        clickhouse_concurrency: ConcurrencyConfig::new(clickhouse_concurrency),
-        commitlog_concurrency: ConcurrencyConfig::new(2),
-        replacements_concurrency: ConcurrencyConfig::new(4),
-        async_inserts,
-        python_max_queue_depth,
-        use_rust_processor,
-        health_check_file: health_check_file.map(ToOwned::to_owned),
-        enforce_schema,
-        commit_log_producer,
-        replacements_config,
-        physical_consumer_group: consumer_group.to_owned(),
-        physical_topic_name: Topic::new(&consumer_config.raw_topic.physical_topic_name),
-        accountant_topic_config: consumer_config.accountant_topic,
-        stop_at_timestamp,
-        batch_write_timeout,
-        max_bytes_before_external_group_by,
-    };
-
     let topic = Topic::new(&consumer_config.raw_topic.physical_topic_name);
-    let processor = StreamProcessor::with_kafka(config, factory, topic, dlq_policy);
+
+    let processor = if allow_mutability {
+        let mut_factory = MutConsumerStrategyFactory {
+            storage_config: first_storage,
+            env_config,
+            logical_topic_name,
+            max_batch_size,
+            max_batch_time,
+            processing_concurrency: ConcurrencyConfig::new(concurrency),
+            clickhouse_concurrency: ConcurrencyConfig::new(clickhouse_concurrency),
+            async_inserts,
+            python_max_queue_depth,
+            use_rust_processor,
+            health_check_file: health_check_file.map(ToOwned::to_owned),
+            enforce_schema,
+            commit_log_producer,
+            physical_consumer_group: consumer_group.to_owned(),
+            physical_topic_name: Topic::new(&consumer_config.raw_topic.physical_topic_name),
+            accountant_topic_config: consumer_config.accountant_topic,
+            stop_at_timestamp,
+            batch_write_timeout,
+            max_bytes_before_external_group_by,
+        };
+
+        StreamProcessor::with_kafka(config, mut_factory, topic, dlq_policy)
+    } else {
+        let factory = ConsumerStrategyFactory {
+            storage_config: first_storage,
+            env_config,
+            logical_topic_name,
+            max_batch_size,
+            max_batch_time,
+            processing_concurrency: ConcurrencyConfig::new(concurrency),
+            clickhouse_concurrency: ConcurrencyConfig::new(clickhouse_concurrency),
+            commitlog_concurrency: ConcurrencyConfig::new(2),
+            replacements_concurrency: ConcurrencyConfig::new(4),
+            async_inserts,
+            python_max_queue_depth,
+            use_rust_processor,
+            health_check_file: health_check_file.map(ToOwned::to_owned),
+            enforce_schema,
+            commit_log_producer,
+            replacements_config,
+            physical_consumer_group: consumer_group.to_owned(),
+            physical_topic_name: Topic::new(&consumer_config.raw_topic.physical_topic_name),
+            accountant_topic_config: consumer_config.accountant_topic,
+            stop_at_timestamp,
+            batch_write_timeout,
+            max_bytes_before_external_group_by,
+        };
+
+        StreamProcessor::with_kafka(config, factory, topic, dlq_policy)
+    };

let mut handle = processor.get_handle();

4 changes: 4 additions & 0 deletions rust_snuba/src/lib.rs
@@ -1,8 +1,11 @@
+mod arroyo_utils;
mod config;
mod consumer;
mod factory;
mod logging;
mod metrics;
+mod mutations;
+mod mutations_factory;
mod processors;
mod runtime_config;
mod strategies;
@@ -27,6 +30,7 @@ pub use config::{
};
pub use factory::ConsumerStrategyFactory;
pub use metrics::statsd::StatsDBackend;
+pub use mutations_factory::MutConsumerStrategyFactory;
pub use processors::{ProcessingFunction, ProcessingFunctionType, PROCESSORS};
pub use strategies::noop::Noop;
pub use strategies::python::PythonTransformStep;
158 changes: 158 additions & 0 deletions rust_snuba/src/mutations/clickhouse.rs
@@ -0,0 +1,158 @@
use std::time::Duration;

use rust_arroyo::processing::strategies::run_task_in_threads::{
RunTaskError, RunTaskFunc, TaskRunner,
};
use rust_arroyo::types::Message;
use serde::Serialize;

use reqwest::header::{HeaderMap, HeaderValue, ACCEPT_ENCODING, CONNECTION};
use reqwest::{Client, ClientBuilder};

use crate::mutations::parser::MutationBatch;
use crate::processors::eap_spans::{AttributeMap, PrimaryKey};

#[derive(Clone)]
pub struct ClickhouseWriter {
url: String,
table: String,
client: Client,
}

impl ClickhouseWriter {
pub fn new(
hostname: &str,
http_port: u16,
table: &str,
database: &str,
clickhouse_user: &str,
clickhouse_password: &str,
batch_write_timeout: Option<Duration>,
) -> Self {
let mut headers = HeaderMap::with_capacity(5);
headers.insert(CONNECTION, HeaderValue::from_static("keep-alive"));
headers.insert(ACCEPT_ENCODING, HeaderValue::from_static("gzip,deflate"));
headers.insert(
    "X-ClickHouse-User",
    HeaderValue::from_str(clickhouse_user).unwrap(),
);
headers.insert(
"X-ClickHouse-Key",
HeaderValue::from_str(clickhouse_password).unwrap(),
);
headers.insert(
"X-ClickHouse-Database",
HeaderValue::from_str(database).unwrap(),
);

let url = format!("http://{hostname}:{http_port}");

let mut client_builder = ClientBuilder::new().default_headers(headers);

if let Some(timeout) = batch_write_timeout {
client_builder = client_builder.timeout(timeout);
}

Self {
url,
table: table.to_owned(),
client: client_builder.build().unwrap(),
}
}
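
For illustration, constructing a writer might look like the following sketch; the host, port, table, and credentials are placeholders, and only the constructor shape comes from this diff.

use std::time::Duration;

// Placeholder values for a local ClickHouse instance.
let writer = ClickhouseWriter::new(
    "localhost",                   // ClickHouse HTTP host
    8123,                          // default HTTP port
    "eap_spans_local",             // hypothetical target table
    "default",                     // database
    "default",                     // user
    "",                            // password
    Some(Duration::from_secs(30)), // bound each batch INSERT
);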

async fn process_message(&self, message: &Message<MutationBatch>) -> anyhow::Result<()> {
let body = format_query(&self.table, message.payload());

self.client
.post(&self.url)
.body(body)
.send()
.await?
.error_for_status()?;

Ok(())
}
}

impl TaskRunner<MutationBatch, (), anyhow::Error> for ClickhouseWriter {
fn get_task(&self, message: Message<MutationBatch>) -> RunTaskFunc<(), anyhow::Error> {
let slf = self.clone();

Box::pin(async move {
slf.process_message(&message)
.await
.map_err(RunTaskError::Other)?;
message.try_map(|_| Ok(()))
})
}
}
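
This TaskRunner implementation is what lets the writer run inside Arroyo's thread-pool strategy. A hedged sketch of the wiring the mutations factory presumably performs (the downstream step and strategy name are assumptions; the constructor shape mirrors RunTaskInThreads usage elsewhere in rust_snuba):

use rust_arroyo::processing::strategies::run_task_in_threads::RunTaskInThreads;

// Drive ClickhouseWriter::get_task futures concurrently, forwarding completed
// (now ()-typed) messages to a downstream step, e.g. an offset commit.
let write_step = RunTaskInThreads::new(
    commit_step,                        // hypothetical downstream strategy
    Box::new(writer),                   // the ClickhouseWriter above
    &clickhouse_concurrency,            // a ConcurrencyConfig
    Some("clickhouse_mutation_writer"), // hypothetical name for metrics
);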

fn format_query(table: &str, batch: &MutationBatch) -> Vec<u8> {
    let mut attr_columns = String::new();
    // attr_combined_columns is intentionally ordered the same as the ClickHouse schema.
    // INSERT INTO .. SELECT FROM .. matches up columns by position only, and ignores names.
    // Subqueries don't have this property.
    let mut attr_combined_columns = String::new();
    for i in 0..20 {
        attr_columns.push_str(&format!(",attr_str_{i} Map(String, String)"));

        attr_combined_columns.push_str(&format!(
            ",mapUpdate(old_data.attr_str_{i}, new_data.attr_str_{i}) AS attr_str_{i}"
        ));
    }

    for i in 0..20 {
        attr_columns.push_str(&format!(",attr_num_{i} Map(String, Float64)"));
        attr_combined_columns.push_str(&format!(
            ",mapUpdate(old_data.attr_num_{i}, new_data.attr_num_{i}) AS attr_num_{i}"
        ));
    }

    let mut body = format!(
        "
        INSERT INTO {table}
        SELECT old_data.* EXCEPT ('attr_.*') {attr_combined_columns}
        FROM {table} old_data
        JOIN (SELECT * FROM input(
            'organization_id UInt64, trace_id UUID, span_id UInt64, _sort_timestamp DateTime {attr_columns}'
        )) new_data
        ON old_data.organization_id = new_data.organization_id
        and old_data.trace_id = new_data.trace_id
        and old_data.span_id = new_data.span_id
        and old_data._sort_timestamp = new_data._sort_timestamp
        and old_data.sign = 1

        FORMAT JSONEachRow\n"
    )
    .into_bytes();

    for (filter, update) in &batch.0 {
        let mut attributes = AttributeMap::default();
        for (k, v) in &update.attr_str {
            attributes.insert_str(k.clone(), v.clone());
        }

        for (k, v) in &update.attr_num {
            attributes.insert_num(k.clone(), *v);
        }

        let row = MutationRow {
            filter: filter.clone(),
            attributes,
        };

        serde_json::to_writer(&mut body, &row).unwrap();
        body.push(b'\n');
    }

    body
}

Review thread on this INSERT query:

Member: TODO: Write cancellation rows

Member Author: @untitaker, two thoughts on this query:

1. We could rewrite it to avoid the JOIN by using a WITH clause; we would need to look into the performance differences.
2. What if we tried a UNION to get both the cancellation and the update in this one query? (SELECT ... UNION SELECT ...)

I'm also wondering if there's a way to do something like INSERT INTO ... VALUES (SELECT ...) (SELECT ...) so we can have two insertions back to back. Just some thoughts -- I can try these out on some local data.

Member: I think WITH conflicts directly with input() in my tests. I was also trying to use WITH to make the cancellation rows happen (using both WITH and UNION ALL). I'm not sure how it would help to avoid the JOIN; if you have a specific query, let's discuss 1:1 or just push it, if it works already. As for having two insertions back to back: then we should just merge those MutationBatches at the JSON level; I don't think the query needs to adapt to accommodate larger batches.
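
To make the generated payload concrete: with the attr loops truncated to i = 0 (the real query repeats both columns for i in 0..20) and a hypothetical table name, the rendered request body looks roughly like the sketch below. input() tells ClickHouse to parse the JSONEachRow rows following the statement with the given schema, so a whole batch ships as a single HTTP POST.

INSERT INTO eap_spans_local
SELECT old_data.* EXCEPT ('attr_.*') ,mapUpdate(old_data.attr_str_0, new_data.attr_str_0) AS attr_str_0 ,mapUpdate(old_data.attr_num_0, new_data.attr_num_0) AS attr_num_0
FROM eap_spans_local old_data
JOIN (SELECT * FROM input(
    'organization_id UInt64, trace_id UUID, span_id UInt64, _sort_timestamp DateTime ,attr_str_0 Map(String, String) ,attr_num_0 Map(String, Float64)'
)) new_data
ON old_data.organization_id = new_data.organization_id
and old_data.trace_id = new_data.trace_id
and old_data.span_id = new_data.span_id
and old_data._sort_timestamp = new_data._sort_timestamp
and old_data.sign = 1
FORMAT JSONEachRow
(one JSON-encoded mutation row per line follows; see the MutationRow sketch below)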

#[derive(Serialize, Default)]
struct MutationRow {
#[serde(flatten)]
attributes: AttributeMap,

#[serde(flatten)]
filter: PrimaryKey,
}
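
Since both fields are flattened, each serialized row is one flat JSON object. A sketch of a single line, assuming PrimaryKey contributes the four key columns used in the join and AttributeMap the attr_str_{i}/attr_num_{i} maps (the exact serde field names live in processors::eap_spans, not in this diff):

{"organization_id":1,"trace_id":"00000000-0000-0000-0000-000000000000","span_id":12345,"_sort_timestamp":"2024-09-25 00:00:00","attr_str_0":{"release":"v1.0"},"attr_num_3":{"duration_ms":100.0}}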
2 changes: 2 additions & 0 deletions rust_snuba/src/mutations/mod.rs
@@ -0,0 +1,2 @@
pub mod clickhouse;
pub mod parser;