Skip to content

Commit

Permalink
KafkaSinkCluster: refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Aug 15, 2023
1 parent 997b6f8 commit d30562a
Showing 1 changed file with 129 additions and 122 deletions.
251 changes: 129 additions & 122 deletions shotover/src/transforms/kafka/sink_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ use async_trait::async_trait;
use dashmap::DashMap;
use kafka_protocol::messages::metadata_request::MetadataRequestTopic;
use kafka_protocol::messages::{
ApiKey, FindCoordinatorRequest, GroupId, HeartbeatRequest, JoinGroupRequest, MetadataRequest,
MetadataResponse, OffsetFetchRequest, RequestHeader, SyncGroupRequest, TopicName,
ApiKey, BrokerId, FindCoordinatorRequest, GroupId, HeartbeatRequest, JoinGroupRequest,
MetadataRequest, MetadataResponse, OffsetFetchRequest, RequestHeader, SyncGroupRequest,
TopicName,
};
use kafka_protocol::protocol::{Builder, StrBytes};
use rand::rngs::SmallRng;
Expand Down Expand Up @@ -60,8 +61,8 @@ pub struct KafkaSinkClusterBuilder {
shotover_nodes: Vec<KafkaAddress>,
connect_timeout: Duration,
read_timeout: Option<Duration>,
coordinator_broker_id: Arc<DashMap<StrBytes, i32>>,
topics: Arc<DashMap<StrBytes, Topic>>,
group_to_coordinator_broker: Arc<DashMap<GroupId, BrokerId>>,
topics: Arc<DashMap<TopicName, Topic>>,
nodes_shared: Arc<RwLock<Vec<KafkaNode>>>,
}

Expand Down Expand Up @@ -91,7 +92,7 @@ impl KafkaSinkClusterBuilder {
shotover_nodes,
connect_timeout: Duration::from_millis(connect_timeout_ms),
read_timeout: receive_timeout,
coordinator_broker_id: Arc::new(DashMap::new()),
group_to_coordinator_broker: Arc::new(DashMap::new()),
topics: Arc::new(DashMap::new()),
nodes_shared: Arc::new(RwLock::new(vec![])),
}
Expand All @@ -108,7 +109,7 @@ impl TransformBuilder for KafkaSinkClusterBuilder {
read_timeout: self.read_timeout,
nodes: vec![],
nodes_shared: self.nodes_shared.clone(),
coordinator_broker_id: self.coordinator_broker_id.clone(),
group_to_coordinator_broker: self.group_to_coordinator_broker.clone(),
topics: self.topics.clone(),
rng: SmallRng::from_rng(rand::thread_rng()).unwrap(),
})
Expand All @@ -131,8 +132,8 @@ pub struct KafkaSinkCluster {
read_timeout: Option<Duration>,
nodes: Vec<KafkaNode>,
nodes_shared: Arc<RwLock<Vec<KafkaNode>>>,
coordinator_broker_id: Arc<DashMap<StrBytes, i32>>,
topics: Arc<DashMap<StrBytes, Topic>>,
group_to_coordinator_broker: Arc<DashMap<GroupId, BrokerId>>,
topics: Arc<DashMap<TopicName, Topic>>,
rng: SmallRng,
}

Expand Down Expand Up @@ -182,15 +183,16 @@ impl Transform for KafkaSinkCluster {
}

impl KafkaSinkCluster {
fn store_topic(&self, topics: &mut Vec<StrBytes>, topic: TopicName) {
if self.topics.get(&topic.0).is_none() && !topics.contains(&topic.0) {
topics.push(topic.0);
fn store_topic(&self, topics: &mut Vec<TopicName>, topic: TopicName) {
if self.topics.get(&topic).is_none() && !topics.contains(&topic) {
topics.push(topic);
}
}

fn store_group(&self, groups: &mut Vec<StrBytes>, group_id: GroupId) {
if self.coordinator_broker_id.get(&group_id.0).is_none() && !groups.contains(&group_id.0) {
groups.push(group_id.0);
fn store_group(&self, groups: &mut Vec<GroupId>, group_id: GroupId) {
if self.group_to_coordinator_broker.get(&group_id).is_none() && !groups.contains(&group_id)
{
groups.push(group_id);
}
}

Expand Down Expand Up @@ -249,105 +251,13 @@ impl KafkaSinkCluster {
}

for group in groups {
let request = Message::from_frame(Frame::Kafka(KafkaFrame::Request {
header: RequestHeader::builder()
.request_api_key(ApiKey::FindCoordinatorKey as i16)
.request_api_version(2)
.correlation_id(0)
.client_id(None)
.unknown_tagged_fields(Default::default())
.build()
.unwrap(),
body: RequestBody::FindCoordinator(
FindCoordinatorRequest::builder()
.coordinator_keys(vec![])
.key_type(0)
.key(group.clone())
.unknown_tagged_fields(Default::default())
.build()
.unwrap(),
),
}));

let connection = self
.nodes
.choose_mut(&mut self.rng)
.unwrap()
.get_connection(self.connect_timeout)
.await?;
let (tx, rx) = oneshot::channel();
connection
.send(Request {
message: request,
return_chan: Some(tx),
})
.map_err(|_| anyhow!("Failed to send"))?;
let mut response = rx.await.unwrap().response.unwrap();
match response.frame() {
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::FindCoordinator(coordinator),
..
})) => {
self.coordinator_broker_id
.insert(group, coordinator.node_id.0);
}
other => {
return Err(anyhow!(
"Unexpected message returned to metadata request {other:?}"
))
}
}
let coordinator = self.find_coordinator_of_group(group.clone()).await?;
self.group_to_coordinator_broker.insert(group, coordinator);
}

if !topics.is_empty() {
let request = Message::from_frame(Frame::Kafka(KafkaFrame::Request {
header: RequestHeader::builder()
.request_api_key(ApiKey::MetadataKey as i16)
.request_api_version(4)
.correlation_id(0)
.client_id(None)
.unknown_tagged_fields(Default::default())
.build()
.unwrap(),
body: RequestBody::Metadata(
MetadataRequest::builder()
.topics(Some(
topics
.into_iter()
.map(|name| {
MetadataRequestTopic::builder()
.name(Some(TopicName(name)))
.topic_id(Uuid::nil())
.unknown_tagged_fields(Default::default())
.build()
.unwrap()
})
.collect(),
))
.allow_auto_topic_creation(false)
.include_cluster_authorized_operations(false)
.include_topic_authorized_operations(false)
.unknown_tagged_fields(Default::default())
.build()
.unwrap(),
),
}));

let connection = self
.nodes
.choose_mut(&mut self.rng)
.unwrap()
.get_connection(self.connect_timeout)
.await?;
let (tx, rx) = oneshot::channel();
connection
.send(Request {
message: request,
return_chan: Some(tx),
})
.map_err(|_| anyhow!("Failed to send"))?;
let mut response = rx.await.unwrap().response.unwrap();
match response.frame() {
let mut metadata = self.get_metadata_of_topics(topics).await?;
match metadata.frame() {
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::Metadata(metadata),
..
Expand Down Expand Up @@ -464,28 +374,28 @@ impl KafkaSinkCluster {
body: RequestBody::Heartbeat(heartbeat),
..
})) => {
let group_id = heartbeat.group_id.0.clone();
let group_id = heartbeat.group_id.clone();
results.push(self.route_to_coordinator(message, group_id).await?);
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::SyncGroup(sync_group),
..
})) => {
let group_id = sync_group.group_id.0.clone();
let group_id = sync_group.group_id.clone();
results.push(self.route_to_coordinator(message, group_id).await?);
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::OffsetFetch(offset_fetch),
..
})) => {
let group_id = offset_fetch.group_id.0.clone();
let group_id = offset_fetch.group_id.clone();
results.push(self.route_to_coordinator(message, group_id).await?);
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::JoinGroup(join_group),
..
})) => {
let group_id = join_group.group_id.0.clone();
let group_id = join_group.group_id.clone();
results.push(self.route_to_coordinator(message, group_id).await?);
}

Expand All @@ -511,6 +421,101 @@ impl KafkaSinkCluster {
Ok(results)
}

async fn find_coordinator_of_group(&mut self, group: GroupId) -> Result<BrokerId> {
let request = Message::from_frame(Frame::Kafka(KafkaFrame::Request {
header: RequestHeader::builder()
.request_api_key(ApiKey::FindCoordinatorKey as i16)
.request_api_version(2)
.correlation_id(0)
.client_id(None)
.unknown_tagged_fields(Default::default())
.build()
.unwrap(),
body: RequestBody::FindCoordinator(
FindCoordinatorRequest::builder()
.coordinator_keys(vec![])
.key_type(0)
.key(group.0)
.unknown_tagged_fields(Default::default())
.build()
.unwrap(),
),
}));

let connection = self
.nodes
.choose_mut(&mut self.rng)
.unwrap()
.get_connection(self.connect_timeout)
.await?;
let (tx, rx) = oneshot::channel();
connection
.send(Request {
message: request,
return_chan: Some(tx),
})
.map_err(|_| anyhow!("Failed to send"))?;
let mut response = rx.await.unwrap().response.unwrap();
match response.frame() {
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::FindCoordinator(coordinator),
..
})) => Ok(coordinator.node_id),
other => Err(anyhow!(
"Unexpected message returned to metadata request {other:?}"
)),
}
}
async fn get_metadata_of_topics(&mut self, topics: Vec<TopicName>) -> Result<Message> {
let request = Message::from_frame(Frame::Kafka(KafkaFrame::Request {
header: RequestHeader::builder()
.request_api_key(ApiKey::MetadataKey as i16)
.request_api_version(4)
.correlation_id(0)
.client_id(None)
.unknown_tagged_fields(Default::default())
.build()
.unwrap(),
body: RequestBody::Metadata(
MetadataRequest::builder()
.topics(Some(
topics
.into_iter()
.map(|name| {
MetadataRequestTopic::builder()
.name(Some(name))
.topic_id(Uuid::nil())
.unknown_tagged_fields(Default::default())
.build()
.unwrap()
})
.collect(),
))
.allow_auto_topic_creation(false)
.include_cluster_authorized_operations(false)
.include_topic_authorized_operations(false)
.unknown_tagged_fields(Default::default())
.build()
.unwrap(),
),
}));

let connection = self
.nodes
.choose_mut(&mut self.rng)
.unwrap()
.get_connection(self.connect_timeout)
.await?;
let (tx, rx) = oneshot::channel();
connection
.send(Request {
message: request,
return_chan: Some(tx),
})
.map_err(|_| anyhow!("Failed to send"))?;
Ok(rx.await.unwrap().response.unwrap())
}

async fn receive_responses(
&mut self,
find_coordinator_requests: &[FindCoordinator],
Expand Down Expand Up @@ -540,8 +545,8 @@ impl KafkaSinkCluster {

if *version <= 3 {
if request.key_type == 0 {
self.coordinator_broker_id
.insert(request.key.clone(), find_coordinator.node_id.0);
self.group_to_coordinator_broker
.insert(GroupId(request.key.clone()), find_coordinator.node_id);
}
rewrite_address(
&self.shotover_nodes,
Expand All @@ -551,8 +556,10 @@ impl KafkaSinkCluster {
} else {
for coordinator in &mut find_coordinator.coordinators {
if request.key_type == 0 {
self.coordinator_broker_id
.insert(coordinator.key.clone(), find_coordinator.node_id.0);
self.group_to_coordinator_broker.insert(
GroupId(coordinator.key.clone()),
find_coordinator.node_id,
);
}
rewrite_address(
&self.shotover_nodes,
Expand Down Expand Up @@ -597,11 +604,11 @@ impl KafkaSinkCluster {
async fn route_to_coordinator(
&mut self,
message: Message,
group_name: StrBytes,
group_id: GroupId,
) -> Result<oneshot::Receiver<Response>> {
let mut connection = None;
for node in &mut self.nodes {
if let Some(broker_id) = self.coordinator_broker_id.get(&group_name) {
if let Some(broker_id) = self.group_to_coordinator_broker.get(&group_id) {
if node.broker_id == *broker_id {
connection = Some(node.get_connection(self.connect_timeout).await?.clone());
}
Expand All @@ -610,7 +617,7 @@ impl KafkaSinkCluster {
let connection = match connection {
Some(connection) => connection,
None => {
tracing::warn!("no known coordinator for {group_name:?}, routing message to a random node so that a NOT_COORDINATOR or similar error is returned to the client");
tracing::warn!("no known coordinator for {group_id:?}, routing message to a random node so that a NOT_COORDINATOR or similar error is returned to the client");
self.nodes
.choose_mut(&mut self.rng)
.unwrap()
Expand Down Expand Up @@ -653,7 +660,7 @@ impl KafkaSinkCluster {

for topic in &metadata.topics {
self.topics.insert(
topic.0.clone().0,
topic.0.clone(),
Topic {
partitions: topic
.1
Expand Down

0 comments on commit d30562a

Please sign in to comment.