Skip to content

Commit

Permalink
address CR.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Oct 24, 2022
1 parent 1bd85ed commit 301dbff
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 14 deletions.
4 changes: 2 additions & 2 deletions components/message_queue/src/kafka/kafka_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ impl MessageQueue for KafkaImpl {
})?)
}

async fn get_earliest_offset(&self, topic_name: &str) -> Result<Offset> {
async fn fetch_earliest_offset(&self, topic_name: &str) -> Result<Offset> {
let topic_client =
self.get_or_create_topic_client(topic_name)
.await
Expand All @@ -271,7 +271,7 @@ impl MessageQueue for KafkaImpl {
})
}

async fn get_high_watermark(&self, topic_name: &str) -> Result<Offset> {
async fn fetch_high_watermark(&self, topic_name: &str) -> Result<Offset> {
let topic_client =
self.get_or_create_topic_client(topic_name)
.await
Expand Down
10 changes: 5 additions & 5 deletions components/message_queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ pub trait MessageQueue: Send + Sync + 'static {

async fn create_topic_if_not_exist(&self, topic_name: &str) -> Result<(), Self::Error>;

async fn get_earliest_offset(&self, topic_name: &str) -> Result<Offset, Self::Error>;
async fn fetch_earliest_offset(&self, topic_name: &str) -> Result<Offset, Self::Error>;

async fn get_high_watermark(&self, topic_name: &str) -> Result<Offset, Self::Error>;
async fn fetch_high_watermark(&self, topic_name: &str) -> Result<Offset, Self::Error>;

async fn produce(
&self,
Expand Down Expand Up @@ -67,15 +67,15 @@ pub trait ConsumeIterator {
/// At which position shall the stream start.
#[derive(Debug, Clone, Copy)]
pub enum StartOffset {
/// At the earlist known offset.
/// At the earliest known offset.
///
/// This might be larger than 0 if some records were already deleted due to
/// a retention policy or delete records.
/// a retention policy or delete operations.
Earliest,

/// At the latest known offset.
///
/// This is helpful if you only want ot process new data.
/// This is helpful if you only want to process new data.
Latest,

/// At a specific offset.
Expand Down
26 changes: 19 additions & 7 deletions components/message_queue/src/tests/cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,10 @@ async fn consume_all_and_compare<T: MessageQueue>(
let iter = message_queue
.consume(topic_name, StartOffset::Earliest)
.await;
let high_watermark = message_queue.get_high_watermark(topic_name).await.unwrap();
let high_watermark = message_queue
.fetch_high_watermark(topic_name)
.await
.unwrap();
assert!(iter.is_ok());
let mut iter = iter.unwrap();
let mut offset = start_offset;
Expand Down Expand Up @@ -143,10 +146,13 @@ async fn test_consume_fetch_offset<T: MessageQueue>(message_queue: &T) {
// At the beginning, the topic's partition is empty, earliest offset and high
// watermark should be zero.
let earliest_offset = message_queue
.get_earliest_offset(&topic_name)
.fetch_earliest_offset(&topic_name)
.await
.unwrap();
let high_watermark = message_queue
.fetch_high_watermark(&topic_name)
.await
.unwrap();
let high_watermark = message_queue.get_high_watermark(&topic_name).await.unwrap();
assert_eq!(earliest_offset, 0);
assert_eq!(high_watermark, 0);

Expand All @@ -158,21 +164,27 @@ async fn test_consume_fetch_offset<T: MessageQueue>(message_queue: &T) {
.await
.is_ok());
let earliest_offset = message_queue
.get_earliest_offset(&topic_name)
.fetch_earliest_offset(&topic_name)
.await
.unwrap();
let high_watermark = message_queue
.fetch_high_watermark(&topic_name)
.await
.unwrap();
let high_watermark = message_queue.get_high_watermark(&topic_name).await.unwrap();
assert_eq!(earliest_offset, 0);
assert_eq!(high_watermark, 10);

// We delete some messages later, and the earliest offset will become the offset
// which is deleted to.
assert!(message_queue.delete_up_to(&topic_name, 3).await.is_ok());
let earliest_offset = message_queue
.get_earliest_offset(&topic_name)
.fetch_earliest_offset(&topic_name)
.await
.unwrap();
let high_watermark = message_queue
.fetch_high_watermark(&topic_name)
.await
.unwrap();
let high_watermark = message_queue.get_high_watermark(&topic_name).await.unwrap();
assert_eq!(earliest_offset, 3);
assert_eq!(high_watermark, 10);
}

0 comments on commit 301dbff

Please sign in to comment.