From 1bd85edf2023c1a3ad490e04eaa89173398a7b2e Mon Sep 17 00:00:00 2001 From: kamille Date: Mon, 24 Oct 2022 16:45:57 +0800 Subject: [PATCH 1/2] make mq interface simpler. --- .../message_queue/src/kafka/kafka_impl.rs | 256 ++++++++---------- components/message_queue/src/lib.rs | 37 ++- components/message_queue/src/tests/cases.rs | 80 +++++- 3 files changed, 225 insertions(+), 148 deletions(-) diff --git a/components/message_queue/src/kafka/kafka_impl.rs b/components/message_queue/src/kafka/kafka_impl.rs index 6421cf3d49..cf1868263b 100644 --- a/components/message_queue/src/kafka/kafka_impl.rs +++ b/components/message_queue/src/kafka/kafka_impl.rs @@ -2,7 +2,7 @@ //! Kafka implementation's detail -use std::{cmp::Ordering, collections::HashMap, fmt::Display, sync::Arc}; +use std::{collections::HashMap, fmt::Display, sync::Arc}; use async_trait::async_trait; use common_util::define_result; @@ -10,7 +10,7 @@ use futures::StreamExt; use log::info; use rskafka::{ client::{ - consumer::{StartOffset, StreamConsumer, StreamConsumerBuilder}, + consumer::{StartOffset as KafkaStartOffset, StreamConsumer, StreamConsumerBuilder}, controller::ControllerClient, error::{Error as RskafkaError, ProtocolError}, partition::{Compression, OffsetAt, PartitionClient, UnknownTopicHandling}, @@ -18,12 +18,12 @@ use rskafka::{ }, record::{Record, RecordAndOffset}, }; -use snafu::{ensure, Backtrace, ResultExt, Snafu}; +use snafu::{Backtrace, ResultExt, Snafu}; use tokio::sync::RwLock; use crate::{ kafka::config::{Config, ConsumerConfig}, - ConsumeIterator, Message, MessageAndOffset, MessageQueue, Offset, + ConsumeIterator, Message, MessageAndOffset, MessageQueue, Offset, StartOffset, }; /// The topic (with just one partition) client for Kafka @@ -48,43 +48,34 @@ pub enum Error { source: RskafkaError, }, - #[snafu(display("Failed to produce to kafka topic:{}, err:{}", topic_name, source))] - Produce { - topic_name: String, - source: RskafkaError, - }, - #[snafu(display( - "Failed to consume all data in topic:{} when:{}, source:{}", + "Failed to fetch offset(type:{}) from kafka topic:{}, err:{}", + offset_type, topic_name, - when, source ))] - ConsumeAll { + FetchOffset { + topic_name: String, + source: RskafkaError, + offset_type: OffsetType, + }, + + #[snafu(display("Failed to produce to kafka topic:{}, err:{}", topic_name, source))] + Produce { topic_name: String, source: RskafkaError, - when: ConsumeAllWhen, }, #[snafu(display( - "Race happened in scanning partition in topic:{}, when:{}, msg:[{}], backtrace:{}", + "Failed to consume in topic:{} when:{}, source:{}", topic_name, when, - msg, - backtrace + source ))] - ConsumeAllRace { - topic_name: String, - when: ConsumeAllWhen, - msg: String, - backtrace: Backtrace, - }, - - #[snafu(display("Timeout happened while polling the stream for consuming all data in topic:{}, timeout_opt:{}, backtrace:{}", topic_name, timeout_opt, backtrace))] - ConsumeAllTimeout { + Consume { topic_name: String, - timeout_opt: String, - backtrace: Backtrace, + source: RskafkaError, + when: ConsumeWhen, }, #[snafu(display( @@ -106,18 +97,31 @@ pub enum Error { define_result!(Error); #[derive(Debug)] -pub enum ConsumeAllWhen { +pub enum ConsumeWhen { Start, - InitIterator, PollStream, } -impl Display for ConsumeAllWhen { +impl Display for ConsumeWhen { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - ConsumeAllWhen::Start => f.write_str("start"), - ConsumeAllWhen::InitIterator => f.write_str("init_iterator"), - ConsumeAllWhen::PollStream => 
f.write_str("poll_stream"),
+            ConsumeWhen::Start => f.write_str("start"),
+            ConsumeWhen::PollStream => f.write_str("poll_stream"),
         }
     }
 }
+
+#[derive(Debug)]
+pub enum OffsetType {
+    EarliestOffset,
+    HighWaterMark,
+}
+
+impl Display for OffsetType {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            OffsetType::EarliestOffset => f.write_str("earliest_offset"),
+            OffsetType::HighWaterMark => f.write_str("high_watermark"),
+        }
+    }
+}
@@ -249,22 +253,62 @@ impl MessageQueue for KafkaImpl {
         })?)
     }
 
-    async fn consume_all(&self, topic_name: &str) -> Result<KafkaConsumeIterator> {
-        info!("Need to consume all data in kafka topic:{}", topic_name);
+    async fn get_earliest_offset(&self, topic_name: &str) -> Result<Offset> {
+        let topic_client =
+            self.get_or_create_topic_client(topic_name)
+                .await
+                .context(FetchOffset {
+                    topic_name: topic_name.to_string(),
+                    offset_type: OffsetType::EarliestOffset,
+                })?;
+
+        topic_client
+            .get_offset(OffsetAt::Earliest)
+            .await
+            .context(FetchOffset {
+                topic_name: topic_name.to_string(),
+                offset_type: OffsetType::EarliestOffset,
+            })
+    }
 
+    async fn get_high_watermark(&self, topic_name: &str) -> Result<Offset> {
         let topic_client =
             self.get_or_create_topic_client(topic_name)
                 .await
-                .context(ConsumeAll {
+                .context(FetchOffset {
                     topic_name: topic_name.to_string(),
-                    when: ConsumeAllWhen::Start,
+                    offset_type: OffsetType::HighWaterMark,
                 })?;
-        KafkaConsumeIterator::new(
+
+        topic_client
+            .get_offset(OffsetAt::Latest)
+            .await
+            .context(FetchOffset {
+                topic_name: topic_name.to_string(),
+                offset_type: OffsetType::HighWaterMark,
+            })
+    }
+
+    async fn consume(
+        &self,
+        topic_name: &str,
+        start_offset: StartOffset,
+    ) -> Result<KafkaConsumeIterator> {
+        info!("Consume data in kafka topic:{}", topic_name);
+
+        let topic_client = self
+            .get_or_create_topic_client(topic_name)
+            .await
+            .context(Consume {
+                topic_name: topic_name.to_string(),
+                when: ConsumeWhen::Start,
+            })?;
+        Ok(KafkaConsumeIterator::new(
             topic_name,
             self.config.consumer_config.clone(),
             topic_client,
-        )
-        .await
+            start_offset,
+        ))
     }
 
     async fn delete_up_to(&self, topic_name: &str, offset: Offset) -> Result<()> {
@@ -289,56 +333,26 @@ impl MessageQueue for KafkaImpl {
 
         Ok(())
     }
-
-    // TODO: should design a stream consume method for slave node to fetch wals.
 }
 
 pub struct KafkaConsumeIterator {
     topic_name: String,
-    stream_consumer: Option<StreamConsumer>,
-    high_watermark: i64,
+    stream_consumer: StreamConsumer,
 }
 
 impl KafkaConsumeIterator {
-    pub async fn new(
+    pub fn new(
         topic_name: &str,
         config: ConsumerConfig,
         topic_client: TopicClientRef,
-    ) -> Result<Self> {
+        start_offset: StartOffset,
+    ) -> Self {
         info!("Init consumer of topic:{}, config:{:?}", topic_name, config);
 
-        // We should make sure the partition is not empty firstly.
-        let start_offset =
-            topic_client
-                .get_offset(OffsetAt::Earliest)
-                .await
-                .context(ConsumeAll {
-                    topic_name: topic_name.to_string(),
-                    when: ConsumeAllWhen::InitIterator,
-                })?;
-        let high_watermark =
-            topic_client
-                .get_offset(OffsetAt::Latest)
-                .await
-                .context(ConsumeAll {
-                    topic_name: topic_name.to_string(),
-                    when: ConsumeAllWhen::InitIterator,
-                })?;
-        ensure!(
-            start_offset <= high_watermark,
-            ConsumeAllRace {
-                topic_name,
-                msg: format!(
-                    "high watermark:{} is smaller than start offset:{}",
-                    high_watermark, start_offset
-                ),
-                when: ConsumeAllWhen::InitIterator
-            }
-        );
+        // Build the consuming stream, starting from the given offset.
+        let stream_consumer = {
+            let mut stream_builder = StreamConsumerBuilder::new(topic_client, start_offset.into());
 
-        let mut stream_builder = StreamConsumerBuilder::new(topic_client, StartOffset::Earliest);
-        let stream_consumer = if start_offset < high_watermark {
-            // If not empty, make consuming stream.
             if let Some(max_wait_ms) = config.max_wait_ms {
                 stream_builder = stream_builder.with_max_wait_ms(max_wait_ms)
             }
@@ -351,16 +365,13 @@ impl KafkaConsumeIterator {
                 stream_builder = stream_builder.with_min_batch_size(max_batch_size)
             }
 
-            Some(stream_builder.build())
-        } else {
-            None
+            stream_builder.build()
         };
 
-        Ok(KafkaConsumeIterator {
+        KafkaConsumeIterator {
             topic_name: topic_name.to_string(),
             stream_consumer,
-            high_watermark,
-        })
+        }
     }
 }
 
@@ -368,62 +379,23 @@ impl KafkaConsumeIterator {
 impl ConsumeIterator for KafkaConsumeIterator {
     type Error = Error;
 
-    async fn next_message(&mut self) -> Option<Result<MessageAndOffset>> {
-        let stream = match &mut self.stream_consumer {
-            Some(stream) => stream,
-            None => {
-                return None;
-            }
-        };
-
+    async fn next_message(&mut self) -> Result<(MessageAndOffset, Offset)> {
         // Return message and offset from buffer.
-        match stream.next().await {
-            Some(Ok((record, high_watermark))) => match high_watermark.cmp(&self.high_watermark) {
-                Ordering::Greater => Some(
-                    ConsumeAllRace {
-                        topic_name: self.topic_name.clone(),
-                        msg: format!("remote high watermark:{} is greater than local:{}", high_watermark, self.high_watermark),
-                        when: ConsumeAllWhen::PollStream,
-                    }
-                    .fail(),
-                ),
-
-                Ordering::Less => Some(
-                    Unknown {
-                        msg: format!(
-                            "remote high watermark:{} is less than local:{} in topic:{}, it shouldn't decrease while consuming all data",
-                            high_watermark,
-                            self.high_watermark,
-                            self.topic_name
-                        ),
-                    }
-                    .fail(),
-                ),
-
-                Ordering::Equal => {
-                    if record.offset + 1 == self.high_watermark {
-                        info!("Consume all data successfully in topic:{}", self.topic_name);
-                        self.stream_consumer = None;
-                    }
+        match self.stream_consumer.next().await {
+            Some(Ok((record, high_watermark))) => Ok((record.into(), high_watermark)),
 
-                    Some(Ok(record.into()))
-                }
-            },
-
-            Some(Err(e)) => Some(Err(e).context(ConsumeAll {
+            Some(Err(e)) => Err(e).context(Consume {
                 topic_name: self.topic_name.clone(),
-                when: ConsumeAllWhen::PollStream,
-            })),
-
-            None => Some(
-                Unknown {
-                    msg: format!(
-                        "consuming stream return None due to unknown cause, topic:{}",
-                        self.topic_name
-                    ),
-                }
-                .fail(),
-            ),
+                when: ConsumeWhen::PollStream,
+            }),
+
+            None => Unknown {
+                msg: format!(
+                    "consuming stream returned None due to unknown cause, topic:{}",
+                    self.topic_name
+                ),
+            }
+            .fail(),
         }
     }
 }
@@ -454,3 +426,13 @@ impl From<RecordAndOffset> for MessageAndOffset {
         }
     }
 }
+
+impl From<StartOffset> for KafkaStartOffset {
+    fn from(start_offset: StartOffset) -> Self {
+        match start_offset {
+            StartOffset::Earliest => KafkaStartOffset::Earliest,
+            StartOffset::Latest => KafkaStartOffset::Latest,
+            StartOffset::At(offset) => KafkaStartOffset::At(offset),
+        }
+    }
+}
diff --git a/components/message_queue/src/lib.rs b/components/message_queue/src/lib.rs
index 1e3db2c493..daae33282f 100644
--- a/components/message_queue/src/lib.rs
+++ b/components/message_queue/src/lib.rs
@@ -20,12 +20,23 @@ pub trait MessageQueue: Send + Sync + 'static {
     type ConsumeIterator: ConsumeIterator + Send;
 
     async fn create_topic_if_not_exist(&self, topic_name: &str) -> Result<(), Self::Error>;
+
+    async fn get_earliest_offset(&self, topic_name: &str) -> Result<Offset, Self::Error>;
+
+    async fn get_high_watermark(&self, topic_name: &str) -> Result<Offset, Self::Error>;
+
     async fn produce(
         &self,
         topic_name: &str,
messages: Vec<Message>,
     ) -> Result<Vec<Offset>, Self::Error>;
 
-    async fn consume_all(&self, topic_name: &str) -> Result<Self::ConsumeIterator, Self::Error>;
+
+    async fn consume(
+        &self,
+        topic_name: &str,
+        start_offset: StartOffset,
+    ) -> Result<Self::ConsumeIterator, Self::Error>;
+
     async fn delete_up_to(&self, topic_name: &str, offset: Offset) -> Result<(), Self::Error>;
 
     // TODO: should design a stream consume method for slave node to fetch wals.
 }
@@ -50,5 +61,27 @@ pub struct MessageAndOffset {
 pub trait ConsumeIterator {
     type Error: std::error::Error + Send + Sync + 'static;
 
-    async fn next_message(&mut self) -> Option<Result<MessageAndOffset, Self::Error>>;
+    async fn next_message(&mut self) -> Result<(MessageAndOffset, Offset), Self::Error>;
+}
+
+/// At which position shall the stream start.
+#[derive(Debug, Clone, Copy)]
+pub enum StartOffset {
+    /// At the earlist known offset.
+    ///
+    /// This might be larger than 0 if some records were already deleted due to
+    /// a retention policy or delete records.
+    Earliest,
+
+    /// At the latest known offset.
+    ///
+    /// This is helpful if you only want ot process new data.
+    Latest,
+
+    /// At a specific offset.
+    ///
+    /// NOTICE: if the setting start offset smaller than the earliest
+    /// offset in topic('s partition), it will be reset to the earliest
+    /// offset automatically.
+    At(Offset),
 }
diff --git a/components/message_queue/src/tests/cases.rs b/components/message_queue/src/tests/cases.rs
index 61776134bc..9075ef4c71 100644
--- a/components/message_queue/src/tests/cases.rs
+++ b/components/message_queue/src/tests/cases.rs
@@ -2,10 +2,14 @@
 
 //! Test cases for message queue
 
+use std::time::Duration;
+
+use tokio::time::timeout;
+
 use crate::{
     kafka::{config::Config, kafka_impl::KafkaImpl},
     tests::util::{generate_test_data, random_topic_name},
-    ConsumeIterator, Message, MessageQueue,
+    ConsumeIterator, Message, MessageQueue, StartOffset,
 };
 
 #[tokio::test]
@@ -38,6 +42,8 @@ async fn run_message_queue_test<T: MessageQueue>(message_queue: T) {
     test_delete(&message_queue).await;
 
     test_consume_empty_topic(&message_queue).await;
+
+    test_consume_fetch_offset(&message_queue).await;
 }
 
 async fn test_simple_produce_consume<T: MessageQueue>(message_queue: &T) {
@@ -83,19 +89,28 @@ async fn consume_all_and_compare<T: MessageQueue>(
     message_queue: &T,
     topic_name: &str,
     start_offset: i64,
     test_messages: &[Message],
 ) {
-    let iter = message_queue.consume_all(topic_name).await;
+    let iter = message_queue
+        .consume(topic_name, StartOffset::Earliest)
+        .await;
+    let high_watermark = message_queue.get_high_watermark(topic_name).await.unwrap();
     assert!(iter.is_ok());
     let mut iter = iter.unwrap();
     let mut offset = start_offset;
     let mut cnt = 0;
-    while let Some(message_and_offset) = iter.next_message().await {
-        assert!(message_and_offset.is_ok());
-        let message_and_offset = message_and_offset.unwrap();
+
+    loop {
+        let res = iter.next_message().await;
+        assert!(res.is_ok());
+        let (message_and_offset, _) = res.unwrap();
         assert_eq!(message_and_offset.offset, offset);
         assert_eq!(message_and_offset.message, test_messages[offset as usize]);
         offset += 1;
         cnt += 1;
+
+        if message_and_offset.offset + 1 == high_watermark {
+            break;
+        }
     }
     assert_eq!(cnt, test_messages.len() as i64 - start_offset);
 }
@@ -109,8 +124,55 @@ async fn test_consume_empty_topic<T: MessageQueue>(message_queue: &T) {
 
     // Call produce to push messages at first, then call consume to pull back and
     // compare. 
-    let iter = message_queue.consume_all(&topic_name).await;
-    assert!(iter.is_ok());
-    let mut iter = iter.unwrap();
-    assert!(iter.next_message().await.is_none());
+    let mut iter = message_queue
+        .consume(&topic_name, StartOffset::Earliest)
+        .await
+        .unwrap();
+    assert!(timeout(Duration::from_millis(1000), iter.next_message())
+        .await
+        .is_err());
+}
+
+async fn test_consume_fetch_offset<T: MessageQueue>(message_queue: &T) {
+    let topic_name = random_topic_name();
+    assert!(message_queue
+        .create_topic_if_not_exist(topic_name.as_str())
+        .await
+        .is_ok());
+
+    // At the beginning, the topic's partition is empty, earliest offset and high
+    // watermark should be zero.
+    let earliest_offset = message_queue
+        .get_earliest_offset(&topic_name)
+        .await
+        .unwrap();
+    let high_watermark = message_queue.get_high_watermark(&topic_name).await.unwrap();
+    assert_eq!(earliest_offset, 0);
+    assert_eq!(high_watermark, 0);
+
+    // We produce some messages into it; the earliest offset is still zero, but
+    // the high watermark will equal the number of messages.
+    let test_messages = generate_test_data(10);
+    assert!(message_queue
+        .produce(&topic_name, test_messages.clone())
+        .await
+        .is_ok());
+    let earliest_offset = message_queue
+        .get_earliest_offset(&topic_name)
+        .await
+        .unwrap();
+    let high_watermark = message_queue.get_high_watermark(&topic_name).await.unwrap();
+    assert_eq!(earliest_offset, 0);
+    assert_eq!(high_watermark, 10);
+
+    // We delete some messages later, and the earliest offset will become the
+    // offset we deleted up to.
+    assert!(message_queue.delete_up_to(&topic_name, 3).await.is_ok());
+    let earliest_offset = message_queue
+        .get_earliest_offset(&topic_name)
+        .await
+        .unwrap();
+    let high_watermark = message_queue.get_high_watermark(&topic_name).await.unwrap();
+    assert_eq!(earliest_offset, 3);
+    assert_eq!(high_watermark, 10);
 }

From 5404c35ca0e86fd9b468654d25438ec5a1fd6e25 Mon Sep 17 00:00:00 2001
From: kamille
Date: Mon, 24 Oct 2022 17:01:33 +0800
Subject: [PATCH 2/2] address CR.

---
 .../message_queue/src/kafka/kafka_impl.rs     | 52 +++++--------------
 components/message_queue/src/lib.rs           | 35 +++++++++----
 components/message_queue/src/tests/cases.rs   | 28 +++++++---
 3 files changed, 59 insertions(+), 56 deletions(-)

diff --git a/components/message_queue/src/kafka/kafka_impl.rs b/components/message_queue/src/kafka/kafka_impl.rs
index cf1868263b..ab14855fea 100644
--- a/components/message_queue/src/kafka/kafka_impl.rs
+++ b/components/message_queue/src/kafka/kafka_impl.rs
@@ -23,7 +23,7 @@ use tokio::sync::RwLock;
 
 use crate::{
     kafka::config::{Config, ConsumerConfig},
-    ConsumeIterator, Message, MessageAndOffset, MessageQueue, Offset, StartOffset,
+    ConsumeIterator, Message, MessageAndOffset, MessageQueue, Offset, OffsetType, StartOffset,
 };
 
 /// The topic (with just one partition) client for Kafka
@@ -111,21 +111,6 @@ impl Display for ConsumeWhen {
     }
 }
 
-#[derive(Debug)]
-pub enum OffsetType {
-    EarliestOffset,
-    HighWaterMark,
-}
-
-impl Display for OffsetType {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        match self {
-            OffsetType::EarliestOffset => f.write_str("earliest_offset"),
-            OffsetType::HighWaterMark => f.write_str("high_watermark"),
-        }
-    }
-}
-
 pub struct KafkaImpl {
     config: Config,
     client: Client,
@@ -253,39 +238,21 @@ impl MessageQueue for KafkaImpl {
         })?)
}
 
-    async fn get_earliest_offset(&self, topic_name: &str) -> Result<Offset> {
+    async fn fetch_offset(&self, topic_name: &str, offset_type: OffsetType) -> Result<Offset> {
         let topic_client =
             self.get_or_create_topic_client(topic_name)
                 .await
                 .context(FetchOffset {
                     topic_name: topic_name.to_string(),
-                    offset_type: OffsetType::EarliestOffset,
+                    offset_type,
                 })?;
 
         topic_client
-            .get_offset(OffsetAt::Earliest)
+            .get_offset(offset_type.into())
             .await
             .context(FetchOffset {
                 topic_name: topic_name.to_string(),
-                offset_type: OffsetType::EarliestOffset,
-            })
-    }
-
-    async fn get_high_watermark(&self, topic_name: &str) -> Result<Offset> {
-        let topic_client =
-            self.get_or_create_topic_client(topic_name)
-                .await
-                .context(FetchOffset {
-                    topic_name: topic_name.to_string(),
-                    offset_type: OffsetType::HighWaterMark,
-                })?;
-
-        topic_client
-            .get_offset(OffsetAt::Latest)
-            .await
-            .context(FetchOffset {
-                topic_name: topic_name.to_string(),
-                offset_type: OffsetType::HighWaterMark,
+                offset_type,
             })
     }
@@ -436,3 +403,12 @@ impl From<StartOffset> for KafkaStartOffset {
         }
     }
 }
+
+impl From<OffsetType> for OffsetAt {
+    fn from(offset_type: OffsetType) -> Self {
+        match offset_type {
+            OffsetType::EarliestOffset => OffsetAt::Earliest,
+            OffsetType::HighWaterMark => OffsetAt::Latest,
+        }
+    }
+}
diff --git a/components/message_queue/src/lib.rs b/components/message_queue/src/lib.rs
index daae33282f..971ee1b5e8 100644
--- a/components/message_queue/src/lib.rs
+++ b/components/message_queue/src/lib.rs
@@ -6,7 +6,7 @@ pub mod kafka;
 #[cfg(any(test, feature = "test"))]
 pub mod tests;
 
-use std::{collections::BTreeMap, result::Result};
+use std::{collections::BTreeMap, fmt::Display, result::Result};
 
 use async_trait::async_trait;
 use chrono::{DateTime, Utc};
@@ -21,9 +21,11 @@ pub trait MessageQueue: Send + Sync + 'static {
 
     async fn create_topic_if_not_exist(&self, topic_name: &str) -> Result<(), Self::Error>;
 
-    async fn get_earliest_offset(&self, topic_name: &str) -> Result<Offset, Self::Error>;
-
-    async fn get_high_watermark(&self, topic_name: &str) -> Result<Offset, Self::Error>;
+    async fn fetch_offset(
+        &self,
+        topic_name: &str,
+        offset_type: OffsetType,
+    ) -> Result<Offset, Self::Error>;
 
     async fn produce(
         &self,
@@ -67,21 +69,34 @@ pub trait ConsumeIterator {
 /// At which position shall the stream start.
 #[derive(Debug, Clone, Copy)]
 pub enum StartOffset {
-    /// At the earlist known offset.
+    /// At the earliest known offset.
     ///
     /// This might be larger than 0 if some records were already deleted due to
-    /// a retention policy or delete records.
+    /// a retention policy or delete operations.
     Earliest,
 
     /// At the latest known offset.
    ///
-    /// This is helpful if you only want ot process new data.
+    /// This is helpful if you only want to process new data.
     Latest,
 
     /// At a specific offset.
     ///
-    /// NOTICE: if the setting start offset smaller than the earliest
-    /// offset in topic('s partition), it will be reset to the earliest
-    /// offset automatically.
+    /// Note that specifying an offset that is unknown will result in an error.
At(Offset),
 }
+
+#[derive(Debug, Clone, Copy)]
+pub enum OffsetType {
+    EarliestOffset,
+    HighWaterMark,
+}
+
+impl Display for OffsetType {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            OffsetType::EarliestOffset => f.write_str("earliest_offset"),
+            OffsetType::HighWaterMark => f.write_str("high_watermark"),
+        }
+    }
+}
diff --git a/components/message_queue/src/tests/cases.rs b/components/message_queue/src/tests/cases.rs
index 9075ef4c71..4b2c562775 100644
--- a/components/message_queue/src/tests/cases.rs
+++ b/components/message_queue/src/tests/cases.rs
@@ -9,7 +9,7 @@ use tokio::time::timeout;
 use crate::{
     kafka::{config::Config, kafka_impl::KafkaImpl},
     tests::util::{generate_test_data, random_topic_name},
-    ConsumeIterator, Message, MessageQueue, StartOffset,
+    ConsumeIterator, Message, MessageQueue, OffsetType, StartOffset,
 };
 
 #[tokio::test]
@@ -92,7 +92,10 @@ async fn consume_all_and_compare<T: MessageQueue>(
     let iter = message_queue
         .consume(topic_name, StartOffset::Earliest)
         .await;
-    let high_watermark = message_queue.get_high_watermark(topic_name).await.unwrap();
+    let high_watermark = message_queue
+        .fetch_offset(topic_name, OffsetType::HighWaterMark)
+        .await
+        .unwrap();
     assert!(iter.is_ok());
     let mut iter = iter.unwrap();
     let mut offset = start_offset;
@@ -143,10 +146,13 @@ async fn test_consume_fetch_offset<T: MessageQueue>(message_queue: &T) {
     // At the beginning, the topic's partition is empty, earliest offset and high
     // watermark should be zero.
     let earliest_offset = message_queue
-        .get_earliest_offset(&topic_name)
+        .fetch_offset(&topic_name, OffsetType::EarliestOffset)
+        .await
+        .unwrap();
+    let high_watermark = message_queue
+        .fetch_offset(&topic_name, OffsetType::HighWaterMark)
         .await
         .unwrap();
-    let high_watermark = message_queue.get_high_watermark(&topic_name).await.unwrap();
     assert_eq!(earliest_offset, 0);
     assert_eq!(high_watermark, 0);
 
@@ -158,10 +164,13 @@ async fn test_consume_fetch_offset<T: MessageQueue>(message_queue: &T) {
         .await
         .is_ok());
     let earliest_offset = message_queue
-        .get_earliest_offset(&topic_name)
+        .fetch_offset(&topic_name, OffsetType::EarliestOffset)
+        .await
+        .unwrap();
+    let high_watermark = message_queue
+        .fetch_offset(&topic_name, OffsetType::HighWaterMark)
         .await
         .unwrap();
-    let high_watermark = message_queue.get_high_watermark(&topic_name).await.unwrap();
     assert_eq!(earliest_offset, 0);
     assert_eq!(high_watermark, 10);
 
@@ -169,10 +178,13 @@ async fn test_consume_fetch_offset<T: MessageQueue>(message_queue: &T) {
     // offset we deleted up to.
     assert!(message_queue.delete_up_to(&topic_name, 3).await.is_ok());
     let earliest_offset = message_queue
-        .get_earliest_offset(&topic_name)
+        .fetch_offset(&topic_name, OffsetType::EarliestOffset)
+        .await
+        .unwrap();
+    let high_watermark = message_queue
+        .fetch_offset(&topic_name, OffsetType::HighWaterMark)
         .await
         .unwrap();
-    let high_watermark = message_queue.get_high_watermark(&topic_name).await.unwrap();
     assert_eq!(earliest_offset, 3);
     assert_eq!(high_watermark, 10);
 }
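
For anyone tracing the reworked interface end to end, a minimal sketch of a caller against the final state of these two patches follows. It is illustrative only and not part of either commit: read_to_watermark is a hypothetical helper, the crate is assumed to be importable as message_queue, and the early return on an empty partition reflects the behavior exercised by test_consume_empty_topic (next_message blocks rather than terminating).

use message_queue::{
    kafka::kafka_impl::KafkaImpl,
    ConsumeIterator, MessageQueue, OffsetType, StartOffset,
};

// Hypothetical helper: drain everything currently readable in `topic`, then stop.
async fn read_to_watermark(
    mq: &KafkaImpl,
    topic: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // fetch_offset() replaces the old get_earliest_offset/get_high_watermark
    // pair; the high watermark marks the end of the currently readable range.
    let earliest = mq.fetch_offset(topic, OffsetType::EarliestOffset).await?;
    let high_watermark = mq.fetch_offset(topic, OffsetType::HighWaterMark).await?;
    if earliest == high_watermark {
        // Empty partition: next_message() would block, so return early.
        return Ok(());
    }

    // consume() no longer decides when "all data" has been read; the caller
    // owns the termination condition.
    let mut iter = mq.consume(topic, StartOffset::Earliest).await?;
    loop {
        let (message_and_offset, _watermark) = iter.next_message().await?;
        // ... handle message_and_offset.message here ...
        if message_and_offset.offset + 1 >= high_watermark {
            break;
        }
    }
    Ok(())
}

The snapshot of the high watermark taken before consuming is what bounds the loop; that is exactly the termination logic these patches move out of KafkaConsumeIterator and into callers.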