refactor: make mq interface simpler. #337

Merged · 2 commits · Oct 24, 2022
234 changes: 96 additions & 138 deletions in components/message_queue/src/kafka/kafka_impl.rs
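What the refactor does: the one-shot `consume_all` on the `MessageQueue` trait is replaced by two smaller primitives, `fetch_offset` (ask the broker for one well-known offset) and `consume` (open a plain consuming stream from a caller-chosen start position). The sketch below summarizes the new surface; the method signatures are copied from the Kafka impl in this diff, while the two enums are reconstructed from the `From` conversions at the bottom of the file, so treat the exact shapes as an approximation of the crate-root definitions, not the literal source.

```rust
// Sketch of the simplified interface (trait plumbing and derives elided).
pub enum OffsetType {
    EarliestOffset, // earliest offset still retained in the partition
    HighWaterMark,  // offset one past the last produced record
}

pub enum StartOffset {
    Earliest,   // begin at the earliest retained record
    Latest,     // begin at the next record to be produced
    At(Offset), // begin at an exact offset
}

// On the Kafka implementation:
//   async fn fetch_offset(&self, topic_name: &str, offset_type: OffsetType) -> Result<Offset>;
//   async fn consume(&self, topic_name: &str, start_offset: StartOffset) -> Result<KafkaConsumeIterator>;
// and on the returned iterator:
//   async fn next_message(&mut self) -> Result<(MessageAndOffset, Offset)>;
// where the second tuple element is the partition high watermark observed
// at read time.
```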
@@ -2,28 +2,28 @@

//! Kafka implementation's detail

use std::{cmp::Ordering, collections::HashMap, fmt::Display, sync::Arc};
use std::{collections::HashMap, fmt::Display, sync::Arc};

use async_trait::async_trait;
use common_util::define_result;
use futures::StreamExt;
use log::info;
use rskafka::{
client::{
consumer::{StartOffset, StreamConsumer, StreamConsumerBuilder},
consumer::{StartOffset as KafkaStartOffset, StreamConsumer, StreamConsumerBuilder},
controller::ControllerClient,
error::{Error as RskafkaError, ProtocolError},
partition::{Compression, OffsetAt, PartitionClient, UnknownTopicHandling},
Client, ClientBuilder,
},
record::{Record, RecordAndOffset},
};
use snafu::{ensure, Backtrace, ResultExt, Snafu};
use snafu::{Backtrace, ResultExt, Snafu};
use tokio::sync::RwLock;

use crate::{
kafka::config::{Config, ConsumerConfig},
ConsumeIterator, Message, MessageAndOffset, MessageQueue, Offset,
ConsumeIterator, Message, MessageAndOffset, MessageQueue, Offset, OffsetType, StartOffset,
};

/// The topic (with just one partition) client for Kafka
@@ -48,43 +48,34 @@ pub enum Error {
source: RskafkaError,
},

#[snafu(display("Failed to produce to kafka topic:{}, err:{}", topic_name, source))]
Produce {
topic_name: String,
source: RskafkaError,
},

#[snafu(display(
"Failed to consume all data in topic:{} when:{}, source:{}",
"Failed to fetch offset(type:{}) from kafka topic:{}, err:{}",
offset_type,
topic_name,
when,
source
))]
ConsumeAll {
FetchOffset {
topic_name: String,
source: RskafkaError,
offset_type: OffsetType,
},

#[snafu(display("Failed to produce to kafka topic:{}, err:{}", topic_name, source))]
Produce {
topic_name: String,
source: RskafkaError,
when: ConsumeAllWhen,
},

#[snafu(display(
"Race happened in scanning partition in topic:{}, when:{}, msg:[{}], backtrace:{}",
"Failed to consume in topic:{} when:{}, source:{}",
topic_name,
when,
msg,
backtrace
source
))]
ConsumeAllRace {
topic_name: String,
when: ConsumeAllWhen,
msg: String,
backtrace: Backtrace,
},

#[snafu(display("Timeout happened while polling the stream for consuming all data in topic:{}, timeout_opt:{}, backtrace:{}", topic_name, timeout_opt, backtrace))]
ConsumeAllTimeout {
Consume {
topic_name: String,
timeout_opt: String,
backtrace: Backtrace,
source: RskafkaError,
when: ConsumeWhen,
},

#[snafu(display(
@@ -106,18 +97,16 @@ pub enum Error {
define_result!(Error);

#[derive(Debug)]
pub enum ConsumeAllWhen {
pub enum ConsumeWhen {
Start,
InitIterator,
PollStream,
}

impl Display for ConsumeAllWhen {
impl Display for ConsumeWhen {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ConsumeAllWhen::Start => f.write_str("start"),
ConsumeAllWhen::InitIterator => f.write_str("init_iterator"),
ConsumeAllWhen::PollStream => f.write_str("poll_stream"),
ConsumeWhen::Start => f.write_str("start"),
ConsumeWhen::PollStream => f.write_str("poll_stream"),
}
}
}
@@ -249,22 +238,44 @@ impl MessageQueue for KafkaImpl {
})?)
}

async fn consume_all(&self, topic_name: &str) -> Result<KafkaConsumeIterator> {
info!("Need to consume all data in kafka topic:{}", topic_name);

async fn fetch_offset(&self, topic_name: &str, offset_type: OffsetType) -> Result<Offset> {
let topic_client =
self.get_or_create_topic_client(topic_name)
.await
.context(ConsumeAll {
.context(FetchOffset {
topic_name: topic_name.to_string(),
when: ConsumeAllWhen::Start,
offset_type,
})?;
KafkaConsumeIterator::new(

topic_client
.get_offset(offset_type.into())
.await
.context(FetchOffset {
topic_name: topic_name.to_string(),
offset_type,
})
}

async fn consume(
&self,
topic_name: &str,
start_offset: StartOffset,
) -> Result<KafkaConsumeIterator> {
info!("Consume data in kafka topic:{}", topic_name);

let topic_client = self
.get_or_create_topic_client(topic_name)
.await
.context(Consume {
topic_name: topic_name.to_string(),
when: ConsumeWhen::Start,
})?;
Ok(KafkaConsumeIterator::new(
topic_name,
self.config.consumer_config.clone(),
topic_client,
)
.await
start_offset,
))
}

async fn delete_up_to(&self, topic_name: &str, offset: Offset) -> Result<()> {
@@ -289,56 +300,26 @@ impl MessageQueue for KafkaImpl {

Ok(())
}

// TODO: should design a stream consume method for slave node to fetch wals.
}
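
With `consume_all` gone, a caller that still needs "read everything up to now" composes it from the two primitives. A minimal sketch, assuming the crate paths visible in this file and a public `offset` field on `MessageAndOffset` (the old implementation compared `record.offset + 1` against the high watermark, so the field is assumed to carry that meaning); the helper itself is hypothetical, not part of this PR:

```rust
use message_queue::{
    kafka::kafka_impl::{KafkaImpl, Result},
    ConsumeIterator, MessageAndOffset, MessageQueue, OffsetType, StartOffset,
};

/// Read everything currently in `topic`, then stop.
async fn consume_to_high_watermark(
    kafka: &KafkaImpl,
    topic: &str,
) -> Result<Vec<MessageAndOffset>> {
    // Snapshot both ends of the partition before opening the stream.
    let earliest = kafka.fetch_offset(topic, OffsetType::EarliestOffset).await?;
    let high_watermark = kafka.fetch_offset(topic, OffsetType::HighWaterMark).await?;

    // Empty partition: the old code handled this by not building a stream
    // at all; here the caller just returns early.
    let mut messages = Vec::new();
    if earliest >= high_watermark {
        return Ok(messages);
    }

    let mut iterator = kafka.consume(topic, StartOffset::Earliest).await?;
    loop {
        let (message_and_offset, _watermark_now) = iterator.next_message().await?;
        // `offset` field assumed; the old implementation stopped once
        // record.offset + 1 reached the high watermark.
        let reached_end = message_and_offset.offset + 1 >= high_watermark;
        messages.push(message_and_offset);
        if reached_end {
            return Ok(messages);
        }
    }
}
```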

pub struct KafkaConsumeIterator {
topic_name: String,
stream_consumer: Option<StreamConsumer>,
high_watermark: i64,
stream_consumer: StreamConsumer,
}

impl KafkaConsumeIterator {
pub async fn new(
pub fn new(
topic_name: &str,
config: ConsumerConfig,
topic_client: TopicClientRef,
) -> Result<Self> {
start_offset: StartOffset,
) -> Self {
info!("Init consumer of topic:{}, config:{:?}", topic_name, config);

// We should make sure the partition is not empty firstly.
let start_offset =
topic_client
.get_offset(OffsetAt::Earliest)
.await
.context(ConsumeAll {
topic_name: topic_name.to_string(),
when: ConsumeAllWhen::InitIterator,
})?;
let high_watermark =
topic_client
.get_offset(OffsetAt::Latest)
.await
.context(ConsumeAll {
topic_name: topic_name.to_string(),
when: ConsumeAllWhen::InitIterator,
})?;
ensure!(
start_offset <= high_watermark,
ConsumeAllRace {
topic_name,
msg: format!(
"high watermark:{} is smaller than start offset:{}",
high_watermark, start_offset
),
when: ConsumeAllWhen::InitIterator
}
);
// If not empty, make consuming stream.
let stream_consumer = {
let mut stream_builder = StreamConsumerBuilder::new(topic_client, start_offset.into());

let mut stream_builder = StreamConsumerBuilder::new(topic_client, StartOffset::Earliest);
let stream_consumer = if start_offset < high_watermark {
// If not empty, make consuming stream.
if let Some(max_wait_ms) = config.max_wait_ms {
stream_builder = stream_builder.with_max_wait_ms(max_wait_ms)
}
@@ -351,79 +332,37 @@ impl KafkaConsumeIterator {
stream_builder = stream_builder.with_min_batch_size(max_batch_size)
}

Some(stream_builder.build())
} else {
None
stream_builder.build()
};

Ok(KafkaConsumeIterator {
KafkaConsumeIterator {
topic_name: topic_name.to_string(),
stream_consumer,
high_watermark,
})
}
}
}

#[async_trait]
impl ConsumeIterator for KafkaConsumeIterator {
type Error = Error;

async fn next_message(&mut self) -> Option<Result<MessageAndOffset>> {
let stream = match &mut self.stream_consumer {
Some(stream) => stream,
None => {
return None;
}
};

async fn next_message(&mut self) -> Result<(MessageAndOffset, Offset)> {
// Return message and offset from buffer.
match stream.next().await {
Some(Ok((record, high_watermark))) => match high_watermark.cmp(&self.high_watermark) {
Ordering::Greater => Some(
ConsumeAllRace {
topic_name: self.topic_name.clone(),
msg: format!("remote high watermark:{} is greater than local:{}", high_watermark, self.high_watermark),
when: ConsumeAllWhen::PollStream,
}
.fail(),
),
match self.stream_consumer.next().await {
Some(Ok((record, high_watermark))) => Ok((record.into(), high_watermark)),

Ordering::Less => Some(
Unknown {
msg: format!(
"remote high watermark:{} is less than local:{} in topic:{}, it shouldn't decrease while consuming all data",
high_watermark,
self.high_watermark,
self.topic_name
),
}
.fail(),
),

Ordering::Equal => {
if record.offset + 1 == self.high_watermark {
info!("Consume all data successfully in topic:{}", self.topic_name);
self.stream_consumer = None;
}

Some(Ok(record.into()))
}
},

Some(Err(e)) => Some(Err(e).context(ConsumeAll {
Some(Err(e)) => Err(e).context(Consume {
topic_name: self.topic_name.clone(),
when: ConsumeAllWhen::PollStream,
})),

None => Some(
Unknown {
msg: format!(
"consuming stream return None due to unknown cause, topic:{}",
self.topic_name
),
}
.fail(),
),
when: ConsumeWhen::PollStream,
}),

None => Unknown {
msg: format!(
"consuming stream return None due to unknown cause, topic:{}",
self.topic_name
),
}
.fail(),
}
}
}
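
Note the contract change here: `next_message` now returns `Result<(MessageAndOffset, Offset)>` instead of `Option<Result<MessageAndOffset>>`. The stream no longer reports a natural end, and the internal high-watermark tracking (with its `Ordering` comparisons and race errors) is gone; the high watermark returned alongside each record is what a caller uses to decide when to stop, as in the sketch after the `MessageQueue` impl above.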
@@ -454,3 +393,22 @@ impl From<RecordAndOffset> for MessageAndOffset {
}
}
}

impl From<StartOffset> for KafkaStartOffset {
fn from(start_offset: StartOffset) -> Self {
match start_offset {
StartOffset::Earliest => KafkaStartOffset::Earliest,
StartOffset::Latest => KafkaStartOffset::Latest,
StartOffset::At(offset) => KafkaStartOffset::At(offset),
}
}
}

impl From<OffsetType> for OffsetAt {
fn from(offset_type: OffsetType) -> Self {
match offset_type {
OffsetType::EarliestOffset => OffsetAt::Earliest,
OffsetType::HighWaterMark => OffsetAt::Latest,
}
}
}
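
These `From` conversions keep rskafka types off the public interface. One place `StartOffset::At` is likely to matter is resuming after a restart; a hypothetical fragment (the helper and its bookkeeping are made up for illustration, the paths mirror this file's imports):

```rust
use message_queue::{
    kafka::kafka_impl::{KafkaConsumeIterator, KafkaImpl, Result},
    MessageQueue, Offset, StartOffset,
};

/// Resume consumption just past the last offset this caller has processed.
async fn resume_after(
    kafka: &KafkaImpl,
    topic: &str,
    last_processed: Offset,
) -> Result<KafkaConsumeIterator> {
    // StartOffset::At maps straight onto rskafka's StartOffset::At via the
    // conversion above.
    kafka.consume(topic, StartOffset::At(last_processed + 1)).await
}
```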