Skip to content

Commit

Permalink
Merge branch 'main' into release_0.4.1
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Jul 23, 2024
2 parents d64d1ad + dfdd714 commit f9064bd
Show file tree
Hide file tree
Showing 8 changed files with 324 additions and 23 deletions.
154 changes: 148 additions & 6 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::collections::HashMap;
use test_helpers::connection::kafka::{
Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ExpectedResponse,
KafkaConnectionBuilder, NewPartition, NewTopic, Record, ResourcePatternType, ResourceSpecifier,
ResourceType,
ResourceType, TopicPartition,
};

async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
Expand All @@ -13,6 +14,11 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
num_partitions: 1,
replication_factor: 1,
},
NewTopic {
name: "partitions1_with_offset",
num_partitions: 1,
replication_factor: 1,
},
NewTopic {
name: "partitions3",
num_partitions: 3,
Expand Down Expand Up @@ -75,7 +81,9 @@ pub async fn produce_consume_partitions1(
)
.await;

let mut consumer = connection_builder.connect_consumer(topic_name).await;
let mut consumer = connection_builder
.connect_consumer(topic_name, "some_group")
.await;
consumer
.assert_consume(ExpectedResponse {
message: "initial".to_owned(),
Expand Down Expand Up @@ -116,6 +124,7 @@ pub async fn produce_consume_partitions1(
offset: Some(i * 2 + 1),
})
.await;

consumer
.assert_consume(ExpectedResponse {
message: "Message2".to_owned(),
Expand All @@ -127,9 +136,11 @@ pub async fn produce_consume_partitions1(
}
}

// if we create a new consumer it will start from the begginning since `auto.offset.reset = earliest`
// if we create a new consumer it will start from the beginning since auto.offset.reset = earliest and enable.auto.commit false
// so we test that we can access all records ever created on this topic
let mut consumer = connection_builder.connect_consumer(topic_name).await;
let mut consumer = connection_builder
.connect_consumer(topic_name, "some_group")
.await;
consumer
.assert_consume(ExpectedResponse {
message: "initial".to_owned(),
Expand Down Expand Up @@ -158,10 +169,138 @@ pub async fn produce_consume_partitions1(
}
}

/// Checks that explicitly committed consumer offsets are honoured on a single partition topic.
///
/// Three records ("Initial", "Message1", "Message2") are produced to `topic_name`.
/// After consuming each of the first two, the `consumer_group_with_offsets` group commits
/// offsets 1 and 2 respectively. Consumers that connect afterwards are then expected to see:
/// * "Message2" at offset 2 when they join `consumer_group_with_offsets`
/// * "Initial" at offset 0 when they join `consumer_group_without_offsets`
pub async fn produce_consume_commit_offsets_partitions1(
    connection_builder: &KafkaConnectionBuilder,
    topic_name: &str,
) {
    let committing_group = "consumer_group_with_offsets";

    // Every record in this test shares the same key and topic, only message and offset vary.
    let expect_record = |message: &str, offset: i64| ExpectedResponse {
        message: message.to_owned(),
        key: Some("Key".to_owned()),
        topic_name: topic_name.to_owned(),
        offset: Some(offset),
    };

    // Builds the single entry offset map for partition 0 of the topic under test.
    let position_on_partition0 = |offset: i64| {
        HashMap::from([(
            TopicPartition {
                topic_name: topic_name.to_owned(),
                partition: 0,
            },
            offset,
        )])
    };

    // Phase 1: interleave producing, consuming and committing.
    // The producer and consumer are dropped at the end of this scope, before any other
    // consumer connects.
    {
        let producer = connection_builder.connect_producer(1).await;

        let initial = Record {
            payload: "Initial",
            topic_name,
            key: Some("Key"),
        };
        producer.assert_produce(initial, Some(0)).await;

        let mut consumer = connection_builder
            .connect_consumer(topic_name, committing_group)
            .await;
        consumer.assert_consume(expect_record("Initial", 0)).await;

        // The offset to be committed should be lastProcessedMessageOffset + 1
        consumer.assert_commit_offsets(position_on_partition0(1));

        let message1 = Record {
            payload: "Message1",
            topic_name,
            key: Some("Key"),
        };
        producer.assert_produce(message1, Some(1)).await;
        consumer.assert_consume(expect_record("Message1", 1)).await;

        consumer.assert_commit_offsets(position_on_partition0(2));

        // Message2 is produced but deliberately never consumed or committed in this scope.
        let message2 = Record {
            payload: "Message2",
            topic_name,
            key: Some("Key"),
        };
        producer.assert_produce(message2, Some(2)).await;
    }

    // Phase 2: two successive fresh consumers in the committing group.
    // The first should consume Message2, which is at the last uncommitted offset.
    // The second should still consume Message2, as its offset has not been committed.
    for _ in 0..2 {
        let mut consumer = connection_builder
            .connect_consumer(topic_name, committing_group)
            .await;
        consumer.assert_consume(expect_record("Message2", 2)).await;
    }

    // Phase 3: a new consumer in another group should consume from the beginning
    // since auto.offset.reset = earliest and enable.auto.commit false
    {
        let mut consumer = connection_builder
            .connect_consumer(topic_name, "consumer_group_without_offsets")
            .await;
        consumer.assert_consume(expect_record("Initial", 0)).await;
    }
}

async fn produce_consume_partitions3(connection_builder: &KafkaConnectionBuilder) {
let topic_name = "partitions3";
let producer = connection_builder.connect_producer(1).await;
let mut consumer = connection_builder.connect_consumer(topic_name).await;
let mut consumer = connection_builder
.connect_consumer(topic_name, "some_group")
.await;

for _ in 0..5 {
producer
Expand Down Expand Up @@ -222,7 +361,9 @@ async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) {
.await;
}

let mut consumer = connection_builder.connect_consumer(topic_name).await;
let mut consumer = connection_builder
.connect_consumer(topic_name, "some_group")
.await;

for j in 0..10 {
consumer
Expand All @@ -240,6 +381,7 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
admin_setup(connection_builder).await;
produce_consume_partitions1(connection_builder, "partitions1").await;
produce_consume_partitions1(connection_builder, "unknown_topic").await;
produce_consume_commit_offsets_partitions1(connection_builder, "partitions1_with_offset").await;
produce_consume_partitions3(connection_builder).await;

// Only run this test case on the java driver,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
services:
cassandra-one:
image: shotover/cassandra-test:5.0-beta1-r2
image: shotover/cassandra-test:5.0-rc1-r3
ports:
- "9043:9042"
environment:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ networks:

services:
cassandra-one:
image: &image shotover/cassandra-test:5.0-beta1-r2
image: &image shotover/cassandra-test:5.0-rc1-r3
networks:
cluster_subnet:
ipv4_address: 172.16.1.2
Expand Down
7 changes: 7 additions & 0 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,13 @@ impl KafkaSinkCluster {
};
self.route_to_coordinator(message, group_id);
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::OffsetCommit(offset_commit),
..
})) => {
let group_id = offset_commit.group_id.clone();
self.route_to_coordinator(message, group_id);
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::JoinGroup(join_group),
..
Expand Down
68 changes: 65 additions & 3 deletions test-helpers/src/connection/kafka/cpp.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::collections::{HashMap, HashSet};
// Allow direct usage of the APIs when the feature is enabled
pub use rdkafka;

use super::{ExpectedResponse, NewPartition, Record};
use super::{ExpectedResponse, NewPartition, Record, TopicPartition};
use anyhow::Result;
use pretty_assertions::assert_eq;
use rdkafka::admin::AdminClient;
Expand Down Expand Up @@ -61,11 +62,11 @@ impl KafkaConnectionBuilderCpp {
}
}

pub async fn connect_consumer(&self, topic_name: &str) -> KafkaConsumerCpp {
pub async fn connect_consumer(&self, topic_name: &str, group: &str) -> KafkaConsumerCpp {
let consumer: StreamConsumer = self
.client
.clone()
.set("group.id", "some_group")
.set("group.id", group)
.set("session.timeout.ms", "6000")
.set("auto.offset.reset", "earliest")
.set("enable.auto.commit", "false")
Expand Down Expand Up @@ -157,6 +158,67 @@ impl KafkaConsumerCpp {
offset: Some(message.offset()),
}
}

/// Synchronously commits `offsets` through the underlying rdkafka consumer.
///
/// The offset to be committed should be lastProcessedMessageOffset + 1.
///
/// # Panics
/// Panics if the topic partition list cannot be built or if the commit fails.
pub fn commit(&self, offsets: &HashMap<TopicPartition, i64>) {
    // `TopicPartitionList::from_topic_map` is keyed by `(topic, partition)` tuples,
    // so translate our `TopicPartition` keys into that shape.
    let mut by_topic_partition: HashMap<(String, i32), rdkafka::Offset> =
        HashMap::with_capacity(offsets.len());
    for (topic_partition, &offset) in offsets {
        by_topic_partition.insert(
            (
                topic_partition.topic_name.clone(),
                topic_partition.partition,
            ),
            rdkafka::Offset::Offset(offset),
        );
    }

    let commit_list = rdkafka::TopicPartitionList::from_topic_map(&by_topic_partition).unwrap();

    // The commit is issued in `CommitMode::Sync`, so it is wrapped in `block_in_place`.
    tokio::task::block_in_place(|| {
        let outcome = self
            .consumer
            .commit(&commit_list, rdkafka::consumer::CommitMode::Sync);
        outcome.unwrap()
    });
}

/// Queries the committed offsets for each of the given `partitions`.
///
/// Returns a map from topic partition to committed offset. Partitions for which the
/// reported offset is anything other than a concrete `rdkafka::Offset::Offset` value
/// are left out of the result.
///
/// # Panics
/// Panics if a partition cannot be added to the query list or if the query fails.
/// The query is given a 30 second timeout.
pub fn committed_offsets(
    &self,
    partitions: HashSet<TopicPartition>,
) -> HashMap<TopicPartition, i64> {
    // This TopicPartitionList is only used to ask for the committed offsets of the
    // partitions, hence every offset in it is set to Invalid.
    let mut query = rdkafka::TopicPartitionList::with_capacity(partitions.len());
    for TopicPartition {
        topic_name,
        partition,
    } in &partitions
    {
        query
            .add_partition_offset(topic_name.as_str(), *partition, rdkafka::Offset::Invalid)
            .expect("Failed to add the topic and partition");
    }

    let committed = tokio::task::block_in_place(|| {
        let timeout = Timeout::After(Duration::from_secs(30));
        self.consumer.committed_offsets(query, timeout).unwrap()
    });

    // Keep only the entries that carry a concrete offset.
    let offsets: HashMap<TopicPartition, i64> = committed
        .elements()
        .into_iter()
        .filter_map(|element| match element.offset() {
            rdkafka::Offset::Offset(offset) => Some((
                TopicPartition {
                    topic_name: element.topic().to_string(),
                    partition: element.partition(),
                },
                offset,
            )),
            _ => None,
        })
        .collect();

    offsets
}
}

pub struct KafkaAdminCpp {
Expand Down
Loading

0 comments on commit f9064bd

Please sign in to comment.