Implement kafka controller routing #1419

Merged 4 commits on Jan 15, 2024
159 changes: 142 additions & 17 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -1,15 +1,136 @@
use std::time::Duration;
use test_helpers::rdkafka::admin::{
AdminClient, AdminOptions, AlterConfig, NewPartitions, NewTopic, OwnedResourceSpecifier,
ResourceSpecifier, TopicReplication,
};
use test_helpers::rdkafka::client::DefaultClientContext;
use test_helpers::rdkafka::config::ClientConfig;
use test_helpers::rdkafka::consumer::{Consumer, StreamConsumer};
use test_helpers::rdkafka::producer::{FutureProducer, FutureRecord};
use test_helpers::rdkafka::types::RDKafkaErrorCode;
use test_helpers::rdkafka::util::Timeout;
use test_helpers::rdkafka::Message;

-async fn produce_consume(brokers: &str, topic_name: &str) {
-let producer: FutureProducer = ClientConfig::new()
-.set("bootstrap.servers", brokers)
async fn admin(config: ClientConfig) {
let admin: AdminClient<DefaultClientContext> = config.create().unwrap();
admin
.create_topics(
&[
NewTopic {
name: "foo",
num_partitions: 1,
replication: TopicReplication::Fixed(1),
config: vec![],
},
NewTopic {
name: "acks0",
num_partitions: 1,
replication: TopicReplication::Fixed(1),
config: vec![],
},
NewTopic {
name: "to_delete",
num_partitions: 1,
replication: TopicReplication::Fixed(1),
config: vec![],
},
],
&AdminOptions::new().operation_timeout(Some(Timeout::After(Duration::from_secs(30)))),
)
.await
.unwrap();

let results = admin
.create_partitions(
&[NewPartitions {
// TODO: modify topic "foo" instead so that we can test our handling of that with an interesting partition + replication count
topic_name: "to_delete",
new_partition_count: 2,
assignment: None,
}],
&AdminOptions::new().operation_timeout(Some(Timeout::After(Duration::from_secs(30)))),
)
.await
.unwrap();
for result in results {
let result = result.unwrap();
assert_eq!(result, "to_delete")
}

let results = admin
.describe_configs(
// TODO: test ResourceSpecifier::Broker and ResourceSpecifier::Group as well.
// Will need to find a way to get a valid broker id and to create a group.
&[ResourceSpecifier::Topic("to_delete")],
&AdminOptions::new().operation_timeout(Some(Timeout::After(Duration::from_secs(30)))),
)
.await
.unwrap();
for result in results {
let result = result.unwrap();
assert_eq!(
result.specifier,
OwnedResourceSpecifier::Topic("to_delete".to_owned())
);
}

let results = admin
.alter_configs(
&[AlterConfig {
specifier: ResourceSpecifier::Topic("to_delete"),
entries: [("foo", "bar")].into(),
}],
&AdminOptions::new().operation_timeout(Some(Timeout::After(Duration::from_secs(30)))),
)
.await
.unwrap();
for result in results {
assert_eq!(
result.unwrap(),
OwnedResourceSpecifier::Topic("to_delete".to_owned())
);
}

let results = admin
.delete_topics(
&["to_delete"],
&AdminOptions::new().operation_timeout(Some(Timeout::After(Duration::from_secs(30)))),
)
.await
.unwrap();
for result in results {
assert_eq!(result.unwrap(), "to_delete");
}
}

async fn admin_cleanup(config: ClientConfig) {
let admin: AdminClient<DefaultClientContext> = config.create().unwrap();
let results = admin
// The cpp driver will lock up when running certain commands after a delete_groups if the delete_groups targets a group that doesn't exist.
// So just make sure to run it against a group that does exist.
.delete_groups(
&["some_group"],
&AdminOptions::new().operation_timeout(Some(Timeout::After(Duration::from_secs(30)))),
)
.await
.unwrap();
for result in results {
match result {
Ok(result) => assert_eq!(result, "some_group"),
Err(err) => assert_eq!(
err,
// Allow this error, which can occur due to a race condition in the test, but do not allow any other error types
("some_group".to_owned(), RDKafkaErrorCode::NonEmptyGroup)
),
}
}
}

async fn produce_consume(client: ClientConfig) {
let topic_name = "foo";
let producer: FutureProducer = client
.clone()
.set("message.timeout.ms", "5000")
// internal driver debug logs are emitted to tokio tracing, assuming the appropriate filter is used by the tracing subscriber
.set("debug", "all")
.create()
.unwrap();

@@ -21,13 +142,12 @@ async fn produce_consume(brokers: &str, topic_name: &str) {
.unwrap();
assert_eq!(delivery_status, (0, 0));

-let consumer: StreamConsumer = ClientConfig::new()
-.set("bootstrap.servers", brokers)
let consumer: StreamConsumer = client
.clone()
.set("group.id", "some_group")
.set("session.timeout.ms", "6000")
.set("auto.offset.reset", "earliest")
.set("enable.auto.commit", "false")
.set("debug", "all")
.create()
.unwrap();
consumer.subscribe(&[topic_name]).unwrap();
@@ -44,13 +164,12 @@ async fn produce_consume(brokers: &str, topic_name: &str) {
assert_eq!(0, message.partition());
}

-async fn produce_consume_acks0(brokers: &str) {
async fn produce_consume_acks0(client: ClientConfig) {
let topic_name = "acks0";
-let producer: FutureProducer = ClientConfig::new()
-.set("bootstrap.servers", brokers)
let producer: FutureProducer = client
.clone()
.set("message.timeout.ms", "5000")
.set("acks", "0")
.set("debug", "all")
.create()
.unwrap();

@@ -71,13 +190,12 @@ async fn produce_consume_acks0(brokers: &str) {
.unwrap();
}

-let consumer: StreamConsumer = ClientConfig::new()
-.set("bootstrap.servers", brokers)
let consumer: StreamConsumer = client
.clone()
.set("group.id", "some_group")
.set("session.timeout.ms", "6000")
.set("auto.offset.reset", "earliest")
.set("enable.auto.commit", "false")
.set("debug", "all")
.create()
.unwrap();
consumer.subscribe(&[topic_name]).unwrap();
@@ -97,6 +215,13 @@ async fn produce_consume_acks0(brokers: &str) {
}

pub async fn basic(address: &str) {
-produce_consume(address, "foo").await;
-produce_consume_acks0(address).await;
let mut client = ClientConfig::new();
client
.set("bootstrap.servers", address)
// internal driver debug logs are emitted to tokio tracing, assuming the appropriate filter is used by the tracing subscriber
.set("debug", "all");
admin(client.clone()).await;
produce_consume(client.clone()).await;
produce_consume_acks0(client.clone()).await;
admin_cleanup(client.clone()).await;
}
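
For orientation, a minimal sketch of how the reworked basic helper might be driven from an integration test. The test name, listen address, and setup steps are assumptions for illustration, not part of this diff; the real harness in kafka_int_tests also starts the docker-compose cluster and a shotover instance first.

// Hypothetical call site for `basic` above; the listen address is an
// assumption. With a tracing subscriber whose filter admits librdkafka
// events, the `.set("debug", "all")` driver logs become visible, as the
// comment in `basic` notes.
#[tokio::test(flavor = "multi_thread")]
async fn cluster_single_shotover() {
    // ...start the docker-compose kafka cluster and shotover here...
    basic("127.0.0.1:9192").await;
}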
@@ -14,16 +14,14 @@ services:
networks:
cluster_subnet:
ipv4_address: 172.16.1.2
-environment:
-&environment
environment: &environment
KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://172.16.1.2:9092"
ALLOW_PLAINTEXT_LISTENER: "yes"
KAFKA_KRAFT_CLUSTER_ID: "abcdefghijklmnopqrstuv"
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka0:9093,1@kafka1:9093,2@kafka2:9093"
KAFKA_CFG_NODE_ID: 0
-volumes:
-&volumes
volumes: &volumes
- type: tmpfs
target: /bitnami/kafka
kafka1:
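A note on the compose change above: folding the &environment and &volumes YAML anchors onto their key lines is purely cosmetic; the anchored maps defined on kafka0 are still available for reuse by the kafka1/kafka2 services, presumably via *environment and *volumes aliases in the portion of the file not shown here.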
17 changes: 16 additions & 1 deletion shotover/src/frame/kafka.rs
@@ -2,7 +2,8 @@ use crate::codec::kafka::RequestHeader as CodecRequestHeader;
use anyhow::{anyhow, Context, Result};
use bytes::{BufMut, Bytes, BytesMut};
use kafka_protocol::messages::{
-ApiKey, DescribeClusterResponse, FetchRequest, FetchResponse, FindCoordinatorRequest,
ApiKey, CreateTopicsRequest, DeleteGroupsRequest, DeleteTopicsRequest, DescribeClusterResponse,
DescribeConfigsRequest, FetchRequest, FetchResponse, FindCoordinatorRequest,
FindCoordinatorResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest,
JoinGroupResponse, LeaderAndIsrRequest, ListOffsetsRequest, ListOffsetsResponse,
MetadataRequest, MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceRequest,
@@ -81,6 +82,10 @@ pub enum RequestBody {
FindCoordinator(FindCoordinatorRequest),
LeaderAndIsr(LeaderAndIsrRequest),
Heartbeat(HeartbeatRequest),
CreateTopics(CreateTopicsRequest),
DeleteTopics(DeleteTopicsRequest),
DeleteGroups(DeleteGroupsRequest),
DescribeConfigs(DescribeConfigsRequest),
Unknown { api_key: ApiKey, message: Bytes },
}

@@ -156,6 +161,12 @@ impl KafkaFrame {
}
ApiKey::LeaderAndIsrKey => RequestBody::LeaderAndIsr(decode(&mut bytes, version)?),
ApiKey::HeartbeatKey => RequestBody::Heartbeat(decode(&mut bytes, version)?),
ApiKey::CreateTopicsKey => RequestBody::CreateTopics(decode(&mut bytes, version)?),
ApiKey::DeleteTopicsKey => RequestBody::DeleteTopics(decode(&mut bytes, version)?),
ApiKey::DeleteGroupsKey => RequestBody::DeleteGroups(decode(&mut bytes, version)?),
ApiKey::DescribeConfigsKey => {
RequestBody::DescribeConfigs(decode(&mut bytes, version)?)
}
api_key => RequestBody::Unknown {
api_key,
message: bytes,
@@ -229,6 +240,10 @@ impl KafkaFrame {
RequestBody::FindCoordinator(x) => encode(x, bytes, version)?,
RequestBody::LeaderAndIsr(x) => encode(x, bytes, version)?,
RequestBody::Heartbeat(x) => encode(x, bytes, version)?,
RequestBody::CreateTopics(x) => encode(x, bytes, version)?,
RequestBody::DeleteTopics(x) => encode(x, bytes, version)?,
RequestBody::DeleteGroups(x) => encode(x, bytes, version)?,
RequestBody::DescribeConfigs(x) => encode(x, bytes, version)?,
RequestBody::Unknown { message, .. } => bytes.extend_from_slice(&message),
}
}
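
The new RequestBody variants are the crux of controller routing: once CreateTopics, DeleteTopics, DeleteGroups, and DescribeConfigs are decoded instead of falling through to Unknown, a transform can inspect them and pick a destination broker. A minimal sketch of that dispatch follows, assuming hypothetical Destination and route_request names (and simplifying real Kafka semantics, where DeleteGroups targets the group coordinator rather than the controller):

// Sketch only: `KafkaFrame` and `RequestBody` come from
// shotover/src/frame/kafka.rs above; `Destination` and `route_request`
// are hypothetical names, not shotover's actual API.
enum Destination {
    Controller, // the elected controller broker
    AnyBroker,  // any broker; produce/fetch would route by partition leader
}

fn route_request(frame: &KafkaFrame) -> Destination {
    match frame {
        KafkaFrame::Request { body, .. } => match body {
            // Topic admin operations must be handled by the controller.
            RequestBody::CreateTopics(_)
            | RequestBody::DeleteTopics(_)
            // Simplification: DeleteGroups really goes to the group
            // coordinator and DescribeConfigs to the owning broker.
            | RequestBody::DeleteGroups(_)
            | RequestBody::DescribeConfigs(_) => Destination::Controller,
            _ => Destination::AnyBroker,
        },
        _ => Destination::AnyBroker,
    }
}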