Skip to content

Commit

Permalink
KafkaSinkCluster minor refactor (#1554)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Apr 2, 2024
1 parent 1ae9f50 commit fa67547
Showing 1 changed file with 115 additions and 101 deletions.
216 changes: 115 additions & 101 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,112 +471,15 @@ impl KafkaSinkCluster {
match message.frame() {
// route to partition leader
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::Produce(produce),
body: RequestBody::Produce(_),
..
})) => {
let mut connection = None;
// assume that all topics in this message have the same routing requirements
let (topic_name, topic_data) = produce
.topic_data
.iter()
.next()
.ok_or_else(|| anyhow!("No topics in produce message"))?;
if let Some(topic) = self.topic_by_name.get(&topic_name.0) {
// assume that all partitions in this topic have the same routing requirements
let partition = &topic.partitions[topic_data
.partition_data
.first()
.ok_or_else(|| anyhow!("No partitions in topic"))?
.index
as usize];
for node in &mut self.nodes {
if node.broker_id == partition.leader_id {
connection = Some(node.broker_id);
}
}
}
let destination = match connection {
Some(connection) => connection,
None => {
tracing::warn!("no known partition leader for {topic_name:?}, routing message to a random node so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
self.nodes.choose(&mut self.rng).unwrap().broker_id
}
};

self.pending_requests.push_back(PendingRequest::Routed {
destination,
request: message,
})
}
})) => self.route_produce_request(message)?,

// route to random partition replica
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::Fetch(fetch),
body: RequestBody::Fetch(_),
..
})) => {
let destination = if fetch.session_id == 0 {
// assume that all topics in this message have the same routing requirements
let topic = fetch
.topics
.first()
.ok_or_else(|| anyhow!("No topics in fetch message"))?;

// This way of constructing topic_meta is kind of crazy, but it works around borrow checker limitations
// Old clients only specify the topic name and some newer clients only specify the topic id.
// So we need to check the id first and then fallback to the name.
let topic_name = &topic.topic;
let topic_by_id = self.topic_by_id.get(&topic.topic_id);
let topic_by_name;
let mut topic_meta = topic_by_id.as_deref();
if topic_meta.is_none() {
topic_by_name = self.topic_by_name.get(&topic.topic);
topic_meta = topic_by_name.as_deref();
}

let destination = if let Some(topic_meta) = topic_meta {
let partition_index = topic
.partitions
.first()
.ok_or_else(|| anyhow!("No partitions in topic"))?
.partition
as usize;
// assume that all partitions in this topic have the same routing requirements
if let Some(partition) = topic_meta.partitions.get(partition_index) {
self.nodes
.iter_mut()
.filter(|node| {
partition.replica_nodes.contains(&node.broker_id)
})
.choose(&mut self.rng)
.unwrap()
.broker_id
} else {
let partition_len = topic_meta.partitions.len();
tracing::warn!("no known partition replica for {topic_name:?} at partition index {partition_index} out of {partition_len} partitions, routing message to a random node so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
self.nodes.choose(&mut self.rng).unwrap().broker_id
}
} else {
tracing::warn!("no known partition replica for {topic_name:?}, routing message to a random node so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
self.nodes.choose(&mut self.rng).unwrap().broker_id
};
self.fetch_request_destinations
.insert(message.id(), destination);
destination
} else {
// route via session id
if let Some(destination) =
self.fetch_session_id_to_broker.get(&fetch.session_id)
{
*destination
} else {
todo!()
}
};
self.pending_requests.push_back(PendingRequest::Routed {
destination,
request: message,
})
}
})) => self.route_fetch_request(message)?,

// route to group coordinator
Some(Frame::Kafka(KafkaFrame::Request {
Expand Down Expand Up @@ -643,6 +546,117 @@ impl KafkaSinkCluster {
Ok(())
}

/// Queues a produce request for delivery to the leader of the partition it targets.
///
/// Only the first topic, and the first partition of that topic, are inspected:
/// every topic/partition in the request is assumed to share the same routing requirements.
/// When no leader can be determined (unknown topic, partition index outside the known
/// partitions, or the leader is not one of the known nodes) the request is routed to a
/// random node instead, so that the broker reports the error to the client.
///
/// If `message` is not a produce request nothing is queued and `Ok(())` is returned.
///
/// # Errors
/// Returns an error when the request contains no topics, when the first topic contains
/// no partitions, or when a random node is needed but no nodes are known.
fn route_produce_request(&mut self, mut message: Message) -> Result<()> {
    if let Some(Frame::Kafka(KafkaFrame::Request {
        body: RequestBody::Produce(produce),
        ..
    })) = message.frame()
    {
        // assume that all topics in this message have the same routing requirements
        let (topic_name, topic_data) = produce
            .topic_data
            .iter()
            .next()
            .ok_or_else(|| anyhow!("No topics in produce message"))?;

        let mut connection = None;
        if let Some(topic) = self.topic_by_name.get(&topic_name.0) {
            // assume that all partitions in this topic have the same routing requirements
            let partition_index = topic_data
                .partition_data
                .first()
                .ok_or_else(|| anyhow!("No partitions in topic"))?
                .index as usize;
            // The index comes from the client, so it may not match our metadata.
            // Use a checked lookup: an out of range index (including a negative one, which
            // wraps to a huge usize) falls through to the random node fallback below
            // rather than panicking.
            if let Some(partition) = topic.partitions.get(partition_index) {
                connection = self
                    .nodes
                    .iter()
                    .find(|node| node.broker_id == partition.leader_id)
                    .map(|node| node.broker_id);
            }
        }

        let destination = match connection {
            Some(connection) => connection,
            None => {
                tracing::warn!("no known partition leader for {topic_name:?}, routing message to a random node so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
                self.nodes
                    .choose(&mut self.rng)
                    .ok_or_else(|| anyhow!("No known nodes to route produce request to"))?
                    .broker_id
            }
        };

        self.pending_requests.push_back(PendingRequest::Routed {
            destination,
            request: message,
        });
    }

    Ok(())
}

/// Queues a fetch request for delivery to a broker that can serve it.
///
/// * Requests with `session_id == 0` are routed to a randomly chosen known node that is a
///   replica of the first partition of the first topic (all topics/partitions in the request
///   are assumed to share the same routing requirements). The chosen destination is recorded
///   in `fetch_request_destinations` keyed by the message id. When no suitable replica can be
///   determined the request goes to a random node so that the broker reports the error.
/// * Requests with any other `session_id` are routed to the broker recorded for that
///   session in `fetch_session_id_to_broker`.
///
/// If `message` is not a fetch request nothing is queued and `Ok(())` is returned.
///
/// # Errors
/// Returns an error when the request contains no topics, when the first topic contains no
/// partitions, when a random node is needed but no nodes are known, or when the request
/// references a fetch session id that has no recorded broker.
fn route_fetch_request(&mut self, mut message: Message) -> Result<()> {
    if let Some(Frame::Kafka(KafkaFrame::Request {
        body: RequestBody::Fetch(fetch),
        ..
    })) = message.frame()
    {
        let destination = if fetch.session_id == 0 {
            // assume that all topics in this message have the same routing requirements
            let topic = fetch
                .topics
                .first()
                .ok_or_else(|| anyhow!("No topics in fetch message"))?;

            // This way of constructing topic_meta is kind of crazy, but it works around borrow checker limitations
            // Old clients only specify the topic name and some newer clients only specify the topic id.
            // So we need to check the id first and then fallback to the name.
            let topic_name = &topic.topic;
            let topic_by_id = self.topic_by_id.get(&topic.topic_id);
            let topic_by_name;
            let mut topic_meta = topic_by_id.as_deref();
            if topic_meta.is_none() {
                topic_by_name = self.topic_by_name.get(&topic.topic);
                topic_meta = topic_by_name.as_deref();
            }

            let destination = if let Some(topic_meta) = topic_meta {
                let partition_index = topic
                    .partitions
                    .first()
                    .ok_or_else(|| anyhow!("No partitions in topic"))?
                    .partition as usize;
                // assume that all partitions in this topic have the same routing requirements
                if let Some(partition) = topic_meta.partitions.get(partition_index) {
                    let replica = self
                        .nodes
                        .iter()
                        .filter(|node| partition.replica_nodes.contains(&node.broker_id))
                        .choose(&mut self.rng)
                        .map(|node| node.broker_id);
                    match replica {
                        Some(broker_id) => broker_id,
                        // None of the nodes we know about is a replica for this partition.
                        // Fall back to a random node instead of panicking.
                        None => {
                            tracing::warn!("no known node is a replica for {topic_name:?} at partition index {partition_index}, routing message to a random node so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
                            self.nodes
                                .choose(&mut self.rng)
                                .ok_or_else(|| anyhow!("No known nodes to route fetch request to"))?
                                .broker_id
                        }
                    }
                } else {
                    let partition_len = topic_meta.partitions.len();
                    tracing::warn!("no known partition replica for {topic_name:?} at partition index {partition_index} out of {partition_len} partitions, routing message to a random node so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
                    self.nodes
                        .choose(&mut self.rng)
                        .ok_or_else(|| anyhow!("No known nodes to route fetch request to"))?
                        .broker_id
                }
            } else {
                tracing::warn!("no known partition replica for {topic_name:?}, routing message to a random node so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
                self.nodes
                    .choose(&mut self.rng)
                    .ok_or_else(|| anyhow!("No known nodes to route fetch request to"))?
                    .broker_id
            };
            self.fetch_request_destinations
                .insert(message.id(), destination);
            destination
        } else {
            // route via session id
            if let Some(destination) = self.fetch_session_id_to_broker.get(&fetch.session_id) {
                *destination
            } else {
                // The session id is supplied by the client, so an unknown value must not
                // bring the process down: report it as an error instead of panicking.
                return Err(anyhow!(
                    "Fetch request references unknown fetch session id {}",
                    fetch.session_id
                ));
            }
        };
        self.pending_requests.push_back(PendingRequest::Routed {
            destination,
            request: message,
        });
    }

    Ok(())
}

async fn find_coordinator_of_group(
&mut self,
group: GroupId,
Expand Down

0 comments on commit fa67547

Please sign in to comment.