Commit

rukai committed Jan 10, 2024
1 parent 402e575 commit e0cc667
Showing 5 changed files with 153 additions and 20 deletions.
30 changes: 23 additions & 7 deletions shotover-proxy/benches/windsock/kafka.rs
@@ -16,6 +16,8 @@ use shotover::transforms::TransformConfig;
use std::sync::Arc;
use std::{collections::HashMap, time::Duration};
use test_helpers::docker_compose::docker_compose;
use test_helpers::rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use test_helpers::rdkafka::client::DefaultClientContext;
use test_helpers::rdkafka::config::ClientConfig;
use test_helpers::rdkafka::consumer::{Consumer, StreamConsumer};
use test_helpers::rdkafka::producer::{FutureProducer, FutureRecord};
@@ -347,6 +349,24 @@ impl Bench for KafkaBench {
// only one string field so we just directly store the value in resources
let broker_address = resources;

let admin: AdminClient<DefaultClientContext> = ClientConfig::new()
.set("bootstrap.servers", broker_address)
.create()
.unwrap();
admin
.create_topics(
&[NewTopic {
name: "topic_foo",
num_partitions: 1,
replication: TopicReplication::Fixed(1),
config: vec![],
}],
&AdminOptions::new()
.operation_timeout(Some(Timeout::After(Duration::from_secs(60)))),
)
.await
.unwrap();

let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", broker_address)
.set("message.timeout.ms", "5000")
@@ -413,14 +433,10 @@ struct BenchTaskProducerKafka {
#[async_trait]
impl BenchTaskProducer for BenchTaskProducerKafka {
async fn produce_one(&self) -> Result<(), String> {
-        let key = rand::random::<[u8; 4]>();
+        // key is set to None which will result in round robin routing between all brokers
+        let record: FutureRecord<(), _> = FutureRecord::to("topic_foo").payload(&self.message);
         self.producer
-            .send(
-                FutureRecord::to("topic_foo")
-                    .payload(&self.message)
-                    .key(&key),
-                Timeout::Never,
-            )
+            .send(record, Timeout::Never)
.await
// Take just the error, ignoring the message contents because large messages result in unreadable noise in the logs.
.map_err(|e| format!("{:?}", e.0))
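
Aside on the produce_one change above: dropping the per-record key changes how librdkafka assigns partitions. A minimal side-by-side sketch of the two record shapes (hypothetical record_shapes function, assuming rdkafka's standard FutureRecord builder as used in this file):

use test_helpers::rdkafka::producer::FutureRecord;

fn record_shapes<'a>(message: &'a [u8], key: &'a [u8; 4]) {
    // Keyed: librdkafka hashes the key to pick a partition, so records with
    // the same key always land on the same partition (and the same broker).
    let _keyed: FutureRecord<[u8; 4], [u8]> =
        FutureRecord::to("topic_foo").payload(message).key(key);

    // Unkeyed: with no key to hash, the default partitioner spreads records
    // across all of the topic's partitions, and hence across all brokers.
    let _unkeyed: FutureRecord<(), [u8]> = FutureRecord::to("topic_foo").payload(message);
}
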
49 changes: 47 additions & 2 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -1,10 +1,54 @@
use std::time::Duration;
use test_helpers::rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use test_helpers::rdkafka::client::DefaultClientContext;
use test_helpers::rdkafka::config::ClientConfig;
use test_helpers::rdkafka::consumer::{Consumer, StreamConsumer};
use test_helpers::rdkafka::producer::{FutureProducer, FutureRecord};
use test_helpers::rdkafka::util::Timeout;
use test_helpers::rdkafka::Message;

-async fn produce_consume(brokers: &str, topic_name: &str) {
+async fn admin(brokers: &str) {
let admin: AdminClient<DefaultClientContext> = ClientConfig::new()
.set("bootstrap.servers", brokers)
.create()
.unwrap();
admin
.create_topics(
&[
NewTopic {
name: "foo",
num_partitions: 1,
replication: TopicReplication::Fixed(1),
config: vec![],
},
NewTopic {
name: "acks0",
num_partitions: 1,
replication: TopicReplication::Fixed(1),
config: vec![],
},
NewTopic {
name: "to_delete",
num_partitions: 1,
replication: TopicReplication::Fixed(1),
config: vec![],
},
],
&AdminOptions::new().operation_timeout(Some(Timeout::After(Duration::from_secs(30)))),
)
.await
.unwrap();
admin
.delete_topics(
&["to_delete"],
&AdminOptions::new().operation_timeout(Some(Timeout::After(Duration::from_secs(30)))),
)
.await
.unwrap();
}

async fn produce_consume(brokers: &str) {
let topic_name = "foo";
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("message.timeout.ms", "5000")
@@ -97,6 +141,7 @@ async fn produce_consume_acks0(brokers: &str) {
}

pub async fn basic(address: &str) {
-    produce_consume(address, "foo").await;
+    admin(address).await;
+    produce_consume(address).await;
produce_consume_acks0(address).await;
}
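
The new admin() helper drives CreateTopics and DeleteTopics through shotover before the produce/consume tests run. One possible hardening (hypothetical assert_topic_deleted helper, not part of this commit) would verify the deletion actually took effect by fetching cluster metadata:

use std::time::Duration;
use test_helpers::rdkafka::config::ClientConfig;
use test_helpers::rdkafka::consumer::{BaseConsumer, Consumer};
use test_helpers::rdkafka::util::Timeout;

fn assert_topic_deleted(brokers: &str) {
    // A throwaway consumer is the simplest handle for a metadata query.
    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .create()
        .unwrap();
    let metadata = consumer
        .fetch_metadata(None, Timeout::After(Duration::from_secs(10)))
        .unwrap();
    // Topic deletion is asynchronous server side, so a robust version would
    // poll until the topic disappears rather than asserting once.
    assert!(metadata.topics().iter().all(|t| t.name() != "to_delete"));
}
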
@@ -14,16 +14,14 @@ services:
networks:
cluster_subnet:
ipv4_address: 172.16.1.2
-    environment:
-      &environment
+    environment: &environment
KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://172.16.1.2:9092"
ALLOW_PLAINTEXT_LISTENER: "yes"
KAFKA_KRAFT_CLUSTER_ID: "abcdefghijklmnopqrstuv"
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka0:9093,1@kafka1:9093,2@kafka2:9093"
KAFKA_CFG_NODE_ID: 0
-    volumes:
-      &volumes
+    volumes: &volumes
- type: tmpfs
target: /bitnami/kafka
kafka1:
17 changes: 12 additions & 5 deletions shotover/src/frame/kafka.rs
@@ -2,11 +2,12 @@ use crate::codec::kafka::RequestHeader as CodecRequestHeader;
use anyhow::{anyhow, Context, Result};
use bytes::{BufMut, Bytes, BytesMut};
use kafka_protocol::messages::{
-    ApiKey, DescribeClusterResponse, FetchRequest, FetchResponse, FindCoordinatorRequest,
-    FindCoordinatorResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest,
-    JoinGroupResponse, LeaderAndIsrRequest, ListOffsetsRequest, ListOffsetsResponse,
-    MetadataRequest, MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceRequest,
-    ProduceResponse, RequestHeader, ResponseHeader, SyncGroupRequest, SyncGroupResponse,
+    ApiKey, CreateTopicsRequest, DeleteTopicsRequest, DescribeClusterResponse, FetchRequest,
+    FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, HeartbeatRequest,
+    HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaderAndIsrRequest,
+    ListOffsetsRequest, ListOffsetsResponse, MetadataRequest, MetadataResponse, OffsetFetchRequest,
+    OffsetFetchResponse, ProduceRequest, ProduceResponse, RequestHeader, ResponseHeader,
+    SyncGroupRequest, SyncGroupResponse,
};
use kafka_protocol::protocol::{Decodable, Encodable, HeaderVersion, StrBytes};
use std::fmt::{Display, Formatter, Result as FmtResult};
@@ -81,6 +82,8 @@ pub enum RequestBody {
FindCoordinator(FindCoordinatorRequest),
LeaderAndIsr(LeaderAndIsrRequest),
Heartbeat(HeartbeatRequest),
CreateTopics(CreateTopicsRequest),
DeleteTopics(DeleteTopicsRequest),
Unknown { api_key: ApiKey, message: Bytes },
}

@@ -156,6 +159,8 @@ impl KafkaFrame {
}
ApiKey::LeaderAndIsrKey => RequestBody::LeaderAndIsr(decode(&mut bytes, version)?),
ApiKey::HeartbeatKey => RequestBody::Heartbeat(decode(&mut bytes, version)?),
ApiKey::CreateTopicsKey => RequestBody::CreateTopics(decode(&mut bytes, version)?),
ApiKey::DeleteTopicsKey => RequestBody::DeleteTopics(decode(&mut bytes, version)?),
api_key => RequestBody::Unknown {
api_key,
message: bytes,
@@ -229,6 +234,8 @@ impl KafkaFrame {
RequestBody::FindCoordinator(x) => encode(x, bytes, version)?,
RequestBody::LeaderAndIsr(x) => encode(x, bytes, version)?,
RequestBody::Heartbeat(x) => encode(x, bytes, version)?,
RequestBody::CreateTopics(x) => encode(x, bytes, version)?,
RequestBody::DeleteTopics(x) => encode(x, bytes, version)?,
RequestBody::Unknown { message, .. } => bytes.extend_from_slice(&message),
}
}
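
With the two new variants, transforms can pattern-match topic-management requests instead of falling through to RequestBody::Unknown. A minimal sketch (hypothetical describe helper, assuming shotover's public frame::kafka module as extended above):

use shotover::frame::kafka::{KafkaFrame, RequestBody};

fn describe(frame: &KafkaFrame) -> &'static str {
    match frame {
        KafkaFrame::Request { body: RequestBody::CreateTopics(_), .. } => "create topics",
        KafkaFrame::Request { body: RequestBody::DeleteTopics(_), .. } => "delete topics",
        KafkaFrame::Request { .. } => "other request",
        KafkaFrame::Response { .. } => "response",
    }
}
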
71 changes: 69 additions & 2 deletions shotover/src/transforms/kafka/sink_cluster.rs
@@ -25,6 +25,7 @@ use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::hash::Hasher;
use std::net::SocketAddr;
use std::sync::atomic::AtomicI64;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, oneshot, RwLock};
@@ -63,6 +64,7 @@ pub struct KafkaSinkClusterBuilder {
shotover_nodes: Vec<KafkaAddress>,
connect_timeout: Duration,
read_timeout: Option<Duration>,
controller_broker: Arc<AtomicBrokerId>,
group_to_coordinator_broker: Arc<DashMap<GroupId, BrokerId>>,
topics: Arc<DashMap<TopicName, Topic>>,
nodes_shared: Arc<RwLock<Vec<KafkaNode>>>,
@@ -94,6 +96,7 @@ impl KafkaSinkClusterBuilder {
shotover_nodes,
connect_timeout: Duration::from_millis(connect_timeout_ms),
read_timeout: receive_timeout,
controller_broker: Arc::new(AtomicBrokerId::new()),
group_to_coordinator_broker: Arc::new(DashMap::new()),
topics: Arc::new(DashMap::new()),
nodes_shared: Arc::new(RwLock::new(vec![])),
@@ -111,6 +114,7 @@ impl TransformBuilder for KafkaSinkClusterBuilder {
read_timeout: self.read_timeout,
nodes: vec![],
nodes_shared: self.nodes_shared.clone(),
controller_broker: self.controller_broker.clone(),
group_to_coordinator_broker: self.group_to_coordinator_broker.clone(),
topics: self.topics.clone(),
rng: SmallRng::from_rng(rand::thread_rng()).unwrap(),
@@ -126,6 +130,28 @@ impl TransformBuilder for KafkaSinkClusterBuilder {
}
}

struct AtomicBrokerId(AtomicI64);

impl AtomicBrokerId {
fn new() -> Self {
AtomicBrokerId(i64::MAX.into())
}

fn set(&self, value: BrokerId) {
self.0
.store(value.0.into(), std::sync::atomic::Ordering::Relaxed)
}

/// Returns `None` when set has never been called.
/// Otherwise returns `Some` containing the latest set value.
fn get(&self) -> Option<BrokerId> {
match self.0.load(std::sync::atomic::Ordering::Relaxed) {
i64::MAX => None,
other => Some(BrokerId(other as i32)),
}
}
}

pub struct KafkaSinkCluster {
first_contact_points: Vec<String>,
shotover_nodes: Vec<KafkaAddress>,
@@ -134,6 +160,7 @@ pub struct KafkaSinkCluster {
read_timeout: Option<Duration>,
nodes: Vec<KafkaNode>,
nodes_shared: Arc<RwLock<Vec<KafkaNode>>>,
controller_broker: Arc<AtomicBrokerId>,
group_to_coordinator_broker: Arc<DashMap<GroupId, BrokerId>>,
topics: Arc<DashMap<TopicName, Topic>>,
rng: SmallRng,
@@ -263,7 +290,8 @@ impl KafkaSinkCluster {
self.add_node_if_new(node).await;
}

-        if !topics.is_empty() {
+        // request and process metadata if we are missing topics or the controller broker id
+        if !topics.is_empty() || self.controller_broker.get().is_none() {
let mut metadata = self.get_metadata_of_topics(topics).await?;
match metadata.frame() {
Some(Frame::Kafka(KafkaFrame::Response {
@@ -378,6 +406,7 @@ impl KafkaSinkCluster {
results.push(rx);
}

// route to group coordinator
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::Heartbeat(heartbeat),
..
@@ -407,6 +436,12 @@ impl KafkaSinkCluster {
results.push(self.route_to_coordinator(message, group_id).await?);
}

// route to controller broker
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::CreateTopics(_),
..
})) => results.push(self.route_to_controller(message).await?),

// route to random node
_ => {
let connection = self
@@ -473,7 +508,7 @@ impl KafkaSinkCluster {
connection: None,
}),
            other => Err(anyhow!(
-                "Unexpected message returned to metadata request {other:?}"
+                "Unexpected message returned to findcoordinator request {other:?}"
            )),
}
}
@@ -608,6 +643,36 @@ impl KafkaSinkCluster {
Ok(responses)
}

async fn route_to_controller(
&mut self,
message: Message,
) -> Result<oneshot::Receiver<Response>> {
let broker_id = self.controller_broker.get().unwrap();

let connection = if let Some(node) =
self.nodes.iter_mut().find(|x| x.broker_id == *broker_id)
{
node.get_connection(self.connect_timeout).await?.clone()
} else {
tracing::warn!("no known broker with id {broker_id:?}, routing message to a random node so that a NOT_CONTROLLER or similar error is returned to the client");
self.nodes
.choose_mut(&mut self.rng)
.unwrap()
.get_connection(self.connect_timeout)
.await?
.clone()
};

let (tx, rx) = oneshot::channel();
connection
.send(Request {
message,
return_chan: Some(tx),
})
.map_err(|_| anyhow!("Failed to send"))?;
Ok(rx)
}

async fn route_to_coordinator(
&mut self,
message: Message,
Expand Down Expand Up @@ -656,6 +721,8 @@ impl KafkaSinkCluster {
self.add_node_if_new(node).await;
}

self.controller_broker.set(metadata.controller_id);

for topic in &metadata.topics {
self.topics.insert(
topic.0.clone(),
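
The controller routing above hinges on AtomicBrokerId's sentinel encoding: i64::MAX marks "never set", and every real BrokerId is an i32, so it always fits below the sentinel and the shared value needs no lock or Option wrapper. A quick behavioral sketch (hypothetical test, reusing the types defined in this file):

#[test]
fn atomic_broker_id_sentinel() {
    let controller = AtomicBrokerId::new();
    // No metadata response has been processed yet, so no controller is known.
    assert_eq!(controller.get(), None);
    // After the metadata handler stores metadata.controller_id (the set call
    // above), readers observe the broker id.
    controller.set(BrokerId(3));
    assert_eq!(controller.get(), Some(BrokerId(3)));
}
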
