refactor: make mq interface simpler. (apache#337)
* make mq interface simpler.

* address CR.
Rachelint authored Oct 24, 2022
1 parent cc40ec3 commit e5dbcbb
Showing 3 changed files with 230 additions and 150 deletions.
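The diff rendered below covers only `components/message_queue/src/kafka/kafka_impl.rs`; the trait it implements lives in `components/message_queue/src/lib.rs`, which is among the three changed files but is not shown on this page. As orientation, here is a sketch of the simplified interface reconstructed from the implementation diff: the enum variants and method names are taken from the code below, while the exact trait bounds, type definitions, and signatures are assumptions.

```rust
use async_trait::async_trait;

/// Message offset within a partition.
pub type Offset = i64;

/// Which single offset of a topic to fetch (see `fetch_offset` below).
#[derive(Debug, Clone, Copy)]
pub enum OffsetType {
    EarliestOffset,
    HighWaterMark,
}

/// Where a consuming stream should begin.
#[derive(Debug, Clone, Copy)]
pub enum StartOffset {
    Earliest,
    Latest,
    At(Offset),
}

/// Placeholder payload types; the real `Message` carries more fields.
pub struct Message(pub Vec<u8>);
pub struct MessageAndOffset {
    pub message: Message,
    pub offset: Offset,
}

#[async_trait]
pub trait ConsumeIterator {
    type Error: std::error::Error + Send + Sync;

    /// Yield the next record plus the partition's current high watermark.
    async fn next_message(&mut self) -> Result<(MessageAndOffset, Offset), Self::Error>;
}

#[async_trait]
pub trait MessageQueue: Send + Sync {
    type Error: std::error::Error + Send + Sync;
    type ConsumeIterator: ConsumeIterator;

    /// Fetch one offset of the topic, replacing the offset bookkeeping that
    /// `consume_all` used to hide inside its iterator.
    async fn fetch_offset(
        &self,
        topic_name: &str,
        offset_type: OffsetType,
    ) -> Result<Offset, Self::Error>;

    /// Open a stream at `start_offset`; the caller decides when to stop,
    /// e.g. at a previously fetched high watermark.
    async fn consume(
        &self,
        topic_name: &str,
        start_offset: StartOffset,
    ) -> Result<Self::ConsumeIterator, Self::Error>;
}
```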
234 changes: 96 additions & 138 deletions components/message_queue/src/kafka/kafka_impl.rs
@@ -2,28 +2,28 @@

//! Kafka implementation details
use std::{cmp::Ordering, collections::HashMap, fmt::Display, sync::Arc};
use std::{collections::HashMap, fmt::Display, sync::Arc};

use async_trait::async_trait;
use common_util::define_result;
use futures::StreamExt;
use log::info;
use rskafka::{
client::{
consumer::{StartOffset, StreamConsumer, StreamConsumerBuilder},
consumer::{StartOffset as KafkaStartOffset, StreamConsumer, StreamConsumerBuilder},
controller::ControllerClient,
error::{Error as RskafkaError, ProtocolError},
partition::{Compression, OffsetAt, PartitionClient, UnknownTopicHandling},
Client, ClientBuilder,
},
record::{Record, RecordAndOffset},
};
use snafu::{ensure, Backtrace, ResultExt, Snafu};
use snafu::{Backtrace, ResultExt, Snafu};
use tokio::sync::RwLock;

use crate::{
kafka::config::{Config, ConsumerConfig},
ConsumeIterator, Message, MessageAndOffset, MessageQueue, Offset,
ConsumeIterator, Message, MessageAndOffset, MessageQueue, Offset, OffsetType, StartOffset,
};

/// The topic (with just one partition) client for Kafka
@@ -48,43 +48,34 @@ pub enum Error {
source: RskafkaError,
},

#[snafu(display("Failed to produce to kafka topic:{}, err:{}", topic_name, source))]
Produce {
topic_name: String,
source: RskafkaError,
},

#[snafu(display(
"Failed to consume all data in topic:{} when:{}, source:{}",
"Failed to fetch offset(type:{}) from kafka topic:{}, err:{}",
offset_type,
topic_name,
when,
source
))]
ConsumeAll {
FetchOffset {
topic_name: String,
source: RskafkaError,
offset_type: OffsetType,
},

#[snafu(display("Failed to produce to kafka topic:{}, err:{}", topic_name, source))]
Produce {
topic_name: String,
source: RskafkaError,
when: ConsumeAllWhen,
},

#[snafu(display(
"Race happened in scanning partition in topic:{}, when:{}, msg:[{}], backtrace:{}",
"Failed to consume in topic:{} when:{}, source:{}",
topic_name,
when,
msg,
backtrace
source
))]
ConsumeAllRace {
topic_name: String,
when: ConsumeAllWhen,
msg: String,
backtrace: Backtrace,
},

#[snafu(display("Timeout happened while polling the stream for consuming all data in topic:{}, timeout_opt:{}, backtrace:{}", topic_name, timeout_opt, backtrace))]
ConsumeAllTimeout {
Consume {
topic_name: String,
timeout_opt: String,
backtrace: Backtrace,
source: RskafkaError,
when: ConsumeWhen,
},

#[snafu(display(
@@ -106,18 +97,16 @@ pub enum Error {
define_result!(Error);

#[derive(Debug)]
pub enum ConsumeAllWhen {
pub enum ConsumeWhen {
Start,
InitIterator,
PollStream,
}

impl Display for ConsumeAllWhen {
impl Display for ConsumeWhen {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ConsumeAllWhen::Start => f.write_str("start"),
ConsumeAllWhen::InitIterator => f.write_str("init_iterator"),
ConsumeAllWhen::PollStream => f.write_str("poll_stream"),
ConsumeWhen::Start => f.write_str("start"),
ConsumeWhen::PollStream => f.write_str("poll_stream"),
}
}
}
@@ -249,22 +238,44 @@ impl MessageQueue for KafkaImpl {
})?)
}

async fn consume_all(&self, topic_name: &str) -> Result<KafkaConsumeIterator> {
info!("Need to consume all data in kafka topic:{}", topic_name);

async fn fetch_offset(&self, topic_name: &str, offset_type: OffsetType) -> Result<Offset> {
let topic_client =
self.get_or_create_topic_client(topic_name)
.await
.context(ConsumeAll {
.context(FetchOffset {
topic_name: topic_name.to_string(),
when: ConsumeAllWhen::Start,
offset_type,
})?;
KafkaConsumeIterator::new(

topic_client
.get_offset(offset_type.into())
.await
.context(FetchOffset {
topic_name: topic_name.to_string(),
offset_type,
})
}

async fn consume(
&self,
topic_name: &str,
start_offset: StartOffset,
) -> Result<KafkaConsumeIterator> {
info!("Consume data in kafka topic:{}", topic_name);

let topic_client = self
.get_or_create_topic_client(topic_name)
.await
.context(Consume {
topic_name: topic_name.to_string(),
when: ConsumeWhen::Start,
})?;
Ok(KafkaConsumeIterator::new(
topic_name,
self.config.consumer_config.clone(),
topic_client,
)
.await
start_offset,
))
}

async fn delete_up_to(&self, topic_name: &str, offset: Offset) -> Result<()> {
@@ -289,56 +300,26 @@ impl MessageQueue for KafkaImpl {

Ok(())
}

// TODO: we should design a stream consume method for slave nodes to fetch WALs.
}

pub struct KafkaConsumeIterator {
topic_name: String,
stream_consumer: Option<StreamConsumer>,
high_watermark: i64,
stream_consumer: StreamConsumer,
}

impl KafkaConsumeIterator {
pub async fn new(
pub fn new(
topic_name: &str,
config: ConsumerConfig,
topic_client: TopicClientRef,
) -> Result<Self> {
start_offset: StartOffset,
) -> Self {
info!("Init consumer of topic:{}, config:{:?}", topic_name, config);

// We should first make sure the partition is not empty.
let start_offset =
topic_client
.get_offset(OffsetAt::Earliest)
.await
.context(ConsumeAll {
topic_name: topic_name.to_string(),
when: ConsumeAllWhen::InitIterator,
})?;
let high_watermark =
topic_client
.get_offset(OffsetAt::Latest)
.await
.context(ConsumeAll {
topic_name: topic_name.to_string(),
when: ConsumeAllWhen::InitIterator,
})?;
ensure!(
start_offset <= high_watermark,
ConsumeAllRace {
topic_name,
msg: format!(
"high watermark:{} is smaller than start offset:{}",
high_watermark, start_offset
),
when: ConsumeAllWhen::InitIterator
}
);
// If not empty, make consuming stream.
let stream_consumer = {
let mut stream_builder = StreamConsumerBuilder::new(topic_client, start_offset.into());

let mut stream_builder = StreamConsumerBuilder::new(topic_client, StartOffset::Earliest);
let stream_consumer = if start_offset < high_watermark {
// If not empty, make consuming stream.
if let Some(max_wait_ms) = config.max_wait_ms {
stream_builder = stream_builder.with_max_wait_ms(max_wait_ms)
}
@@ -351,79 +332,37 @@ impl KafkaConsumeIterator {
stream_builder = stream_builder.with_max_batch_size(max_batch_size)
}

Some(stream_builder.build())
} else {
None
stream_builder.build()
};

Ok(KafkaConsumeIterator {
KafkaConsumeIterator {
topic_name: topic_name.to_string(),
stream_consumer,
high_watermark,
})
}
}
}

#[async_trait]
impl ConsumeIterator for KafkaConsumeIterator {
type Error = Error;

async fn next_message(&mut self) -> Option<Result<MessageAndOffset>> {
let stream = match &mut self.stream_consumer {
Some(stream) => stream,
None => {
return None;
}
};

async fn next_message(&mut self) -> Result<(MessageAndOffset, Offset)> {
// Return message and offset from buffer.
match stream.next().await {
Some(Ok((record, high_watermark))) => match high_watermark.cmp(&self.high_watermark) {
Ordering::Greater => Some(
ConsumeAllRace {
topic_name: self.topic_name.clone(),
msg: format!("remote high watermark:{} is greater than local:{}", high_watermark, self.high_watermark),
when: ConsumeAllWhen::PollStream,
}
.fail(),
),
match self.stream_consumer.next().await {
Some(Ok((record, high_watermark))) => Ok((record.into(), high_watermark)),

Ordering::Less => Some(
Unknown {
msg: format!(
"remote high watermark:{} is less than local:{} in topic:{}, it shouldn't decrease while consuming all data",
high_watermark,
self.high_watermark,
self.topic_name
),
}
.fail(),
),

Ordering::Equal => {
if record.offset + 1 == self.high_watermark {
info!("Consume all data successfully in topic:{}", self.topic_name);
self.stream_consumer = None;
}

Some(Ok(record.into()))
}
},

Some(Err(e)) => Some(Err(e).context(ConsumeAll {
Some(Err(e)) => Err(e).context(Consume {
topic_name: self.topic_name.clone(),
when: ConsumeAllWhen::PollStream,
})),

None => Some(
Unknown {
msg: format!(
"consuming stream return None due to unknown cause, topic:{}",
self.topic_name
),
}
.fail(),
),
when: ConsumeWhen::PollStream,
}),

None => Unknown {
msg: format!(
"consuming stream return None due to unknown cause, topic:{}",
self.topic_name
),
}
.fail(),
}
}
}
Expand Down Expand Up @@ -454,3 +393,22 @@ impl From<RecordAndOffset> for MessageAndOffset {
}
}
}

impl From<StartOffset> for KafkaStartOffset {
fn from(start_offset: StartOffset) -> Self {
match start_offset {
StartOffset::Earliest => KafkaStartOffset::Earliest,
StartOffset::Latest => KafkaStartOffset::Latest,
StartOffset::At(offset) => KafkaStartOffset::At(offset),
}
}
}

impl From<OffsetType> for OffsetAt {
fn from(offset_type: OffsetType) -> Self {
match offset_type {
OffsetType::EarliestOffset => OffsetAt::Earliest,
OffsetType::HighWaterMark => OffsetAt::Latest,
}
}
}
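
For reference, a caller-side sketch (not part of this commit) of how the removed `consume_all` behavior can be rebuilt on top of the two new primitives: snapshot the high watermark once, then consume from the earliest offset until the stream reaches it. The crate paths and the `message`/`offset` fields of `MessageAndOffset` are assumptions.

```rust
// Hypothetical helper; crate paths and field names are assumed, not taken
// from this commit.
use message_queue::{
    kafka::kafka_impl::{KafkaImpl, Result},
    ConsumeIterator, Message, MessageQueue, OffsetType, StartOffset,
};

/// Consume every message that existed in the topic when the call started.
async fn consume_all(mq: &KafkaImpl, topic: &str) -> Result<Vec<Message>> {
    // Snapshot the partition bounds up front, as the old iterator did in
    // `KafkaConsumeIterator::new`.
    let earliest = mq.fetch_offset(topic, OffsetType::EarliestOffset).await?;
    let high_watermark = mq.fetch_offset(topic, OffsetType::HighWaterMark).await?;

    let mut messages = Vec::new();
    if earliest >= high_watermark {
        // Empty partition: the old code special-cased this instead of
        // polling a stream that would never yield.
        return Ok(messages);
    }

    let mut iter = mq.consume(topic, StartOffset::Earliest).await?;
    loop {
        let (msg_and_offset, _latest_watermark) = iter.next_message().await?;
        let done = msg_and_offset.offset + 1 == high_watermark;
        messages.push(msg_and_offset.message);
        if done {
            // Reached the snapshotted high watermark, i.e. all pre-existing
            // data has been consumed.
            return Ok(messages);
        }
    }
}
```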