From 12aa64069d93f4b7014a5fc8d024a76b2d049780 Mon Sep 17 00:00:00 2001 From: kamille Date: Fri, 21 Oct 2022 19:04:59 +0800 Subject: [PATCH] address CR and add test. --- Cargo.lock | 26 +-- components/message_queue/Cargo.toml | 14 +- components/message_queue/src/kafka/config.rs | 79 +++---- .../message_queue/src/kafka/kafka_impl.rs | 216 +++++++++++------- components/message_queue/src/kafka/mod.rs | 4 + components/message_queue/src/lib.rs | 32 +-- components/message_queue/src/tests.rs | 0 components/message_queue/src/tests/cases.rs | 116 ++++++++++ components/message_queue/src/tests/mod.rs | 7 + components/message_queue/src/tests/util.rs | 36 +++ 10 files changed, 371 insertions(+), 159 deletions(-) delete mode 100644 components/message_queue/src/tests.rs create mode 100644 components/message_queue/src/tests/cases.rs create mode 100644 components/message_queue/src/tests/mod.rs create mode 100644 components/message_queue/src/tests/util.rs diff --git a/Cargo.lock b/Cargo.lock index 177c27f600..086ac2adc2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1424,19 +1424,6 @@ dependencies = [ "num_cpus", ] -[[package]] -name = "dashmap" -version = "5.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc" -dependencies = [ - "cfg-if 1.0.0", - "hashbrown", - "lock_api 0.4.9", - "once_cell", - "parking_lot_core 0.9.3", -] - [[package]] name = "datafusion" version = "9.0.0" @@ -3133,16 +3120,16 @@ name = "message_queue" version = "0.1.0" dependencies = [ "async-trait", + "chrono", "common_util", - "dashmap 5.4.0", "futures 0.3.21", "log", - "rand 0.7.3", "rskafka", "serde", "serde_derive", "snafu 0.6.10", - "time 0.3.15", + "tokio 1.20.1", + "uuid 1.1.2", ] [[package]] @@ -4840,11 +4827,11 @@ dependencies = [ [[package]] name = "rskafka" version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47f86cd4975252119d94a5c202548a0ef037f7263b44be3130577bc4e4e9288a" +source = "git+https://github.com/influxdata/rskafka.git?rev=00988a564b1db0249d858065fc110476c075efad#00988a564b1db0249d858065fc110476c075efad" dependencies = [ "async-trait", "bytes 1.2.1", + "chrono", "crc32c", "flate2", "futures 0.3.21", @@ -4855,7 +4842,6 @@ dependencies = [ "rand 0.8.5", "snap", "thiserror", - "time 0.3.15", "tokio 1.20.1", "tracing", ] @@ -6938,7 +6924,7 @@ version = "0.0.1" source = "git+https://github.com/tikv/yatp.git?rev=4b71f8abd86890f0d1e95778c2b6bf5a9ee4c502#4b71f8abd86890f0d1e95778c2b6bf5a9ee4c502" dependencies = [ "crossbeam-deque 0.7.4", - "dashmap 3.11.10", + "dashmap", "fail", "lazy_static", "num_cpus", diff --git a/components/message_queue/Cargo.toml b/components/message_queue/Cargo.toml index 277a6ed15b..efc0937a95 100644 --- a/components/message_queue/Cargo.toml +++ b/components/message_queue/Cargo.toml @@ -12,13 +12,17 @@ common_util = { workspace = true } serde = { workspace = true } serde_derive = { workspace = true } snafu = { workspace = true } -rskafka = { version = "0.3.0", default-features = false, features = ["compression-gzip", "compression-lz4", "compression-snappy"] } -# rskafka = "0.3.0" -time = "0.3.15" +chrono = { workspace = true } async-trait = { workspace = true } -dashmap = "5.4.0" log = { workspace = true } futures = { workspace = true } +tokio = { workspace = true } + +[dependencies.rskafka] +git = "https://github.com/influxdata/rskafka.git" +rev = "00988a564b1db0249d858065fc110476c075efad" +default-features = false +features = ["compression-gzip", 
"compression-lz4", "compression-snappy"] [dev-dependencies] -rand = { workspace = true } +uuid = { version = "1.0", features = ["v4"] } diff --git a/components/message_queue/src/kafka/config.rs b/components/message_queue/src/kafka/config.rs index 6bf631a392..d302e7334b 100644 --- a/components/message_queue/src/kafka/config.rs +++ b/components/message_queue/src/kafka/config.rs @@ -1,3 +1,7 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +//! Kafka implementation's config + use serde_derive::{Deserialize, Serialize}; /// Generic client config that is used for consumers, producers as well as admin @@ -6,85 +10,76 @@ use serde_derive::{Deserialize, Serialize}; #[serde(default)] pub struct Config { pub client_config: ClientConfig, - pub topic_creation_config: TopicCreationConfig, - pub wal_config: WalConfig, + pub topic_management_config: TopicManagementConfig, + pub consumer_config: ConsumerConfig, + // TODO: may need some config options for producer, + // but it seems nothing needed now. } -/// Generic client config that is used for consumers, producers as well as admin -/// operations (like "create topic"). #[derive(Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(default)] pub struct ClientConfig { - /// The endpoint of boost broker, must be set and will panic if not. + /// The endpoint of boost broker, must be set and will panic if found it + /// None. pub boost_broker: Option, /// Maximum message size in bytes. /// - /// extracted from `max_message_size`. Defaults to `None` (rskafka default). + /// Defaults to `None` (rskafka default). pub max_message_size: Option, /// Optional SOCKS5 proxy to use for connecting to the brokers. /// - /// extracted from `socks5_proxy`. Defaults to `None`. + /// Defaults to `None`. pub socks5_proxy: Option, } /// Config for topic creation. #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(default)] -pub struct TopicCreationConfig { +pub struct TopicManagementConfig { /// Replication factor. /// /// Extracted from `replication_factor` option. Defaults to `1`. - pub replication_factor: i16, + pub create_replication_factor: i16, - /// Timeout in ms. + /// The maximum amount of time to wait while creating topic. /// - /// Extracted from `timeout_ms` option. Defaults to `5_000`. - pub timeout_ms: i32, + /// Defaults to `5_000`. + pub create_max_wait_ms: i32, + + /// The maximum amount of time to wait while deleting records in topic. + /// + /// Defaults to `5_000`. + pub delete_max_wait_ms: i32, } -impl Default for TopicCreationConfig { +impl Default for TopicManagementConfig { fn default() -> Self { Self { - replication_factor: 1, - timeout_ms: 5000, + create_replication_factor: 1, + create_max_wait_ms: 5000, + delete_max_wait_ms: 5000, } } } /// Config for consumers. -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)] #[serde(default)] -pub struct WalConfig { - /// Will wait for at least `min_batch_size` bytes of data +pub struct ConsumerConfig { + /// The maximum amount of time to wait for data before returning. /// - /// Extracted from `consumer_max_wait_ms`. Defaults to `None` (rskafka - /// default). - pub reader_max_wait_ms: Option, + /// Defaults to `None` (rskafka default). + pub max_wait_ms: Option, - /// The maximum amount of data to fetch in a single batch + /// The maximum amount of data for the consumer to fetch in a single batch. /// - /// Extracted from `consumer_min_batch_size`. 
diff --git a/components/message_queue/src/kafka/kafka_impl.rs b/components/message_queue/src/kafka/kafka_impl.rs
index db3e8ae4f8..6421cf3d49 100644
--- a/components/message_queue/src/kafka/kafka_impl.rs
+++ b/components/message_queue/src/kafka/kafka_impl.rs
@@ -1,8 +1,11 @@
-use std::{cmp::Ordering, fmt::Display, sync::Arc};
+// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+
+//! Kafka implementation's details
+
+use std::{cmp::Ordering, collections::HashMap, fmt::Display, sync::Arc};
 
 use async_trait::async_trait;
 use common_util::define_result;
-use dashmap::DashMap;
 use futures::StreamExt;
 use log::info;
 use rskafka::{
@@ -10,22 +13,26 @@ use rskafka::{
         consumer::{StartOffset, StreamConsumer, StreamConsumerBuilder},
         controller::ControllerClient,
         error::{Error as RskafkaError, ProtocolError},
-        partition::{Compression, OffsetAt, PartitionClient},
+        partition::{Compression, OffsetAt, PartitionClient, UnknownTopicHandling},
         Client, ClientBuilder,
     },
     record::{Record, RecordAndOffset},
 };
-use snafu::{ensure, ResultExt, Snafu};
+use snafu::{ensure, Backtrace, ResultExt, Snafu};
+use tokio::sync::RwLock;
 
-use super::config::WalConfig;
-use crate::{kafka::config::Config, ConsumeAllIterator, Message, MessageAndOffset, MessageQueue};
+use crate::{
+    kafka::config::{Config, ConsumerConfig},
+    ConsumeIterator, Message, MessageAndOffset, MessageQueue, Offset,
+};
 
-/// The topic(with just one partition) client for Kafka
+/// The topic (with just one partition) client for Kafka
 //
-/// `Arc` is needed to ensure its lifetime. Because in future's gc process,
-/// it may has removed from pool but still is still being used.
+/// `Arc` is needed to ensure its lifetime, because during a future gc process
+/// a client may have been removed from the pool while still being in use.
 type TopicClientRef = Arc<PartitionClient>;
-const PARTITIONS_NUM: i32 = 1;
+const PARTITION_NUM: i32 = 1;
+const DEFAULT_PARTITION: i32 = 0;
 
 #[derive(Debug, Snafu)]
 pub enum Error {
@@ -60,19 +67,24 @@ pub enum Error {
     },
 
     #[snafu(display(
-        "Race happened in scanning partition in topic:{}, when:{}",
+        "Race happened in scanning partition in topic:{}, when:{}, msg:[{}], backtrace:{}",
         topic_name,
-        when
+        when,
+        msg,
+        backtrace
     ))]
     ConsumeAllRace {
         topic_name: String,
         when: ConsumeAllWhen,
+        msg: String,
+        backtrace: Backtrace,
     },
 
-    #[snafu(display("Timeout happened while polling the stream for consuming all data in topic:{}, timeout_opt:{}", topic_name, timeout_opt))]
+    #[snafu(display("Timeout happened while polling the stream for consuming all data in topic:{}, timeout_opt:{}, backtrace:{}", topic_name, timeout_opt, backtrace))]
     ConsumeAllTimeout {
         topic_name: String,
         timeout_opt: String,
+        backtrace: Backtrace,
     },
 
     #[snafu(display(
@@ -87,10 +99,12 @@ pub enum Error {
         source: RskafkaError,
     },
 
-    #[snafu(display("Unknown error occurred, msg:{}", msg))]
-    Unknown { msg: String },
+    #[snafu(display("Unknown error occurred, msg:[{}], backtrace:{}", msg, backtrace))]
+    Unknown { msg: String, backtrace: Backtrace },
 }
 
+define_result!(Error);
+
 #[derive(Debug)]
 pub enum ConsumeAllWhen {
     Start,
@@ -108,14 +122,12 @@ impl Display for ConsumeAllWhen {
     }
 }
 
-define_result!(Error);
-
-struct KafkaImpl {
+pub struct KafkaImpl {
     config: Config,
     client: Client,
     controller_client: ControllerClient,
     // TODO: maybe gc is needed for `partition_client_pool`.
-    topic_client_pool: DashMap<String, TopicClientRef>,
+    topic_client_pool: RwLock<HashMap<String, TopicClientRef>>,
 }
 
 impl KafkaImpl {
@@ -123,7 +135,7 @@ impl KafkaImpl {
         info!("Kafka init, config:{:?}", config);
 
         if config.client_config.boost_broker.is_none() {
-            panic!("The boost_broker must be set");
+            panic!("The boost broker must be set");
         }
 
         let mut client_builder =
@@ -140,55 +152,79 @@ impl KafkaImpl {
             config,
             client,
             controller_client,
-            topic_client_pool: DashMap::default(),
+            topic_client_pool: RwLock::new(HashMap::new()),
         })
     }
 
-    fn get_or_create_topic_client(
+    async fn get_or_create_topic_client(
         &self,
         topic_name: &str,
     ) -> std::result::Result<TopicClientRef, RskafkaError> {
-        Ok(self
-            .topic_client_pool
-            .entry(topic_name.to_string())
-            .or_insert(Arc::new(
-                self.client.partition_client(topic_name, PARTITIONS_NUM)?,
-            ))
-            .clone())
+        {
+            let topic_client_pool = self.topic_client_pool.read().await;
+            // If found, just return it.
+            if let Some(client) = topic_client_pool.get(topic_name) {
+                return Ok(client.clone());
+            }
+        }
+
+        // Otherwise, double-check under the write lock: another thread may
+        // have inserted the client meanwhile, so create it only if it is
+        // still missing.
+        let mut topic_client_pool = self.topic_client_pool.write().await;
+        if let Some(client) = topic_client_pool.get(topic_name) {
+            Ok(client.clone())
+        } else {
+            let client = Arc::new(
+                self.client
+                    .partition_client(topic_name, DEFAULT_PARTITION, UnknownTopicHandling::Retry)
+                    .await?,
+            );
+            topic_client_pool.insert(topic_name.to_string(), client.clone());
+            Ok(client)
+        }
     }
 }
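The read-probe-then-write-recheck in `get_or_create_topic_client` is the usual double-checked pattern for an async `RwLock`-guarded cache; the second lookup is what prevents two tasks from both creating a client after racing past the read lock. A standalone sketch of the same idea (names and the `String` payload are illustrative only):

```rust
use std::{collections::HashMap, sync::Arc};

use tokio::sync::RwLock;

async fn get_or_insert(cache: &RwLock<HashMap<String, Arc<String>>>, key: &str) -> Arc<String> {
    {
        // Fast path: a cheap shared-lock probe.
        let read = cache.read().await;
        if let Some(v) = read.get(key) {
            return v.clone();
        }
    }

    // Slow path: re-check under the exclusive lock, since another task may
    // have inserted the entry between the two lock acquisitions.
    let mut write = cache.write().await;
    if let Some(v) = write.get(key) {
        return v.clone();
    }
    let v = Arc::new(format!("value_for_{}", key));
    write.insert(key.to_string(), v.clone());
    v
}
```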
 
 #[async_trait]
 impl MessageQueue for KafkaImpl {
-    type ConsumeAllIterator = KafkaConsumeAllIterator;
+    type ConsumeIterator = KafkaConsumeIterator;
     type Error = Error;
 
     async fn create_topic_if_not_exist(&self, topic_name: &str) -> Result<()> {
         // Check in partition_client_pool first, maybe has exist.
-        if self.topic_client_pool.contains_key(topic_name) {
-            info!(
-                "Topic:{} has exist in kafka and connection to the topic is still alive.",
-                topic_name
-            );
-            return Ok(());
+        {
+            let topic_client_pool = self.topic_client_pool.read().await;
+
+            if topic_client_pool.contains_key(topic_name) {
+                info!(
+                    "Topic:{} already exists in kafka and the connection to it is still alive",
+                    topic_name
+                );
+                return Ok(());
+            }
         }
 
         // Create topic in Kafka.
-        let topic_creation_config = &self.config.topic_creation_config;
+        let topic_management_config = &self.config.topic_management_config;
         info!("Try to create topic:{} in kafka", topic_name);
         let result = self
             .controller_client
             .create_topic(
                 topic_name,
-                PARTITIONS_NUM,
-                topic_creation_config.replication_factor,
-                topic_creation_config.timeout_ms,
+                PARTITION_NUM,
+                topic_management_config.create_replication_factor,
+                topic_management_config.create_max_wait_ms,
             )
             .await;
 
         match result {
             // Race condition between check and creation action, that's OK.
-            Ok(_) | Err(RskafkaError::ServerError(ProtocolError::TopicAlreadyExists, ..)) => Ok(()),
+            Ok(_)
+            | Err(RskafkaError::ServerError {
+                protocol_error: ProtocolError::TopicAlreadyExists,
+                ..
+            }) => Ok(()),
             Err(e) => Err(e).context(CreateTopic {
                 topic_name: topic_name.to_string(),
@@ -196,14 +232,15 @@ impl MessageQueue for KafkaImpl {
             }),
         }
     }
 
-    async fn produce(&self, topic_name: &str, message: Vec<Message>) -> Result<Vec<i64>> {
+    async fn produce(&self, topic_name: &str, messages: Vec<Message>) -> Result<Vec<Offset>> {
         let topic_client = self
             .get_or_create_topic_client(topic_name)
+            .await
             .context(Produce {
                 topic_name: topic_name.to_string(),
             })?;
 
-        let records: Vec<Record> = message.into_iter().map(|m| m.into()).collect();
+        let records: Vec<Record> = messages.into_iter().map(|m| m.into()).collect();
         Ok(topic_client
             .produce(records, Compression::default())
             .await
             .context(Produce {
                 topic_name: topic_name.to_string(),
             })?)
     }
 
@@ -212,48 +249,64 @@ impl MessageQueue for KafkaImpl {
-    async fn consume_all(&self, topic_name: &str) -> Result<KafkaConsumeAllIterator> {
-        let topic_client = self
-            .get_or_create_topic_client(topic_name)
-            .context(ConsumeAll {
-                topic_name: topic_name.to_string(),
-                when: ConsumeAllWhen::Start,
-            })?;
-        KafkaConsumeAllIterator::new(topic_name, self.config.wal_config.clone(), topic_client).await
+    async fn consume_all(&self, topic_name: &str) -> Result<KafkaConsumeIterator> {
+        info!("Need to consume all data in kafka topic:{}", topic_name);
+
+        let topic_client =
+            self.get_or_create_topic_client(topic_name)
+                .await
+                .context(ConsumeAll {
+                    topic_name: topic_name.to_string(),
+                    when: ConsumeAllWhen::Start,
+                })?;
+        KafkaConsumeIterator::new(
+            topic_name,
+            self.config.consumer_config.clone(),
+            topic_client,
+        )
+        .await
     }
 
-    async fn delete_up_to(&self, topic_name: &str, offset: i64) -> Result<()> {
-        let topic_client = self
-            .get_or_create_topic_client(topic_name)
-            .context(DeleteUpTo {
-                topic_name: topic_name.to_string(),
-                offset,
-            })?;
+    async fn delete_up_to(&self, topic_name: &str, offset: Offset) -> Result<()> {
+        let topic_client =
+            self.get_or_create_topic_client(topic_name)
+                .await
+                .context(DeleteUpTo {
+                    topic_name: topic_name.to_string(),
+                    offset,
+                })?;
+
         topic_client
-            .delete_records(offset, self.config.wal_config.reader_max_wait_ms.unwrap())
+            .delete_records(
+                offset,
+                self.config.topic_management_config.delete_max_wait_ms,
+            )
             .await
             .context(DeleteUpTo {
                 topic_name: topic_name.to_string(),
                 offset,
             })?;
+
         Ok(())
     }
 
     // TODO: should design a stream consume method for slave node to fetch wals.
 }
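Putting the trait methods together, the intended call flow looks roughly like this (a sketch only: the broker address and topic name are placeholders, and `generate_test_data` is the helper added under `tests::util` later in this patch, available with the `test` feature):

```rust
use message_queue::{
    kafka::{config::Config, kafka_impl::KafkaImpl},
    tests::util::generate_test_data,
    ConsumeIterator, MessageQueue,
};

async fn example() -> Result<(), Box<dyn std::error::Error>> {
    let mut config = Config::default();
    config.client_config.boost_broker = Some("127.0.0.1:9092".to_string());
    let mq = KafkaImpl::new(config).await?;

    mq.create_topic_if_not_exist("example_topic").await?;
    let offsets = mq.produce("example_topic", generate_test_data(3)).await?;
    println!("produced at offsets: {:?}", offsets);

    // Drain everything currently in the topic; the iterator stops at the
    // high watermark snapshotted when it was created.
    let mut iter = mq.consume_all("example_topic").await?;
    while let Some(message_and_offset) = iter.next_message().await {
        let message_and_offset = message_and_offset?;
        println!("got offset {}", message_and_offset.offset);
    }
    Ok(())
}
```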
 
-struct KafkaConsumeAllIterator {
+pub struct KafkaConsumeIterator {
     topic_name: String,
-    consuming_stream: Option<StreamConsumer>,
+    stream_consumer: Option<StreamConsumer>,
     high_watermark: i64,
 }
 
-impl KafkaConsumeAllIterator {
+impl KafkaConsumeIterator {
     pub async fn new(
         topic_name: &str,
-        config: WalConfig,
+        config: ConsumerConfig,
         topic_client: TopicClientRef,
     ) -> Result<Self> {
+        info!("Init consumer of topic:{}, config:{:?}", topic_name, config);
+
         // We should make sure the partition is not empty firstly.
         let start_offset = topic_client
@@ -274,24 +327,28 @@ impl KafkaConsumeIterator {
         ensure!(
             start_offset <= high_watermark,
             ConsumeAllRace {
-                topic_name: topic_name.to_string(),
+                topic_name,
+                msg: format!(
+                    "high watermark:{} is smaller than start offset:{}",
+                    high_watermark, start_offset
+                ),
                 when: ConsumeAllWhen::InitIterator
             }
         );
 
         let mut stream_builder = StreamConsumerBuilder::new(topic_client, StartOffset::Earliest);
-        let consuming_stream = if start_offset < high_watermark {
+        let stream_consumer = if start_offset < high_watermark {
             // If not empty, make consuming stream.
-            if let Some(reader_max_wait_ms) = config.reader_max_wait_ms {
-                stream_builder = stream_builder.with_max_wait_ms(reader_max_wait_ms)
+            if let Some(max_wait_ms) = config.max_wait_ms {
+                stream_builder = stream_builder.with_max_wait_ms(max_wait_ms)
             }
 
-            if let Some(reader_min_batch_size) = config.reader_min_batch_size {
-                stream_builder = stream_builder.with_min_batch_size(reader_min_batch_size)
+            if let Some(min_batch_size) = config.min_batch_size {
+                stream_builder = stream_builder.with_min_batch_size(min_batch_size)
             }
 
-            if let Some(reader_max_batch_size) = config.reader_max_batch_size {
-                stream_builder = stream_builder.with_min_batch_size(reader_max_batch_size)
+            if let Some(max_batch_size) = config.max_batch_size {
+                stream_builder = stream_builder.with_max_batch_size(max_batch_size)
             }
 
             Some(stream_builder.build())
@@ -299,20 +356,20 @@ impl KafkaConsumeIterator {
         } else {
             None
         };
 
-        Ok(KafkaConsumeAllIterator {
+        Ok(KafkaConsumeIterator {
             topic_name: topic_name.to_string(),
-            consuming_stream,
+            stream_consumer,
             high_watermark,
         })
     }
 }
 
 #[async_trait]
-impl ConsumeAllIterator for KafkaConsumeAllIterator {
+impl ConsumeIterator for KafkaConsumeIterator {
     type Error = Error;
 
     async fn next_message(&mut self) -> Option<Result<MessageAndOffset>> {
-        let stream = match &mut self.consuming_stream {
+        let stream = match &mut self.stream_consumer {
             Some(stream) => stream,
             None => {
                 return None;
             }
@@ -325,6 +382,7 @@ impl ConsumeIterator for KafkaConsumeIterator {
             Ordering::Greater => Some(
                 ConsumeAllRace {
                     topic_name: self.topic_name.clone(),
+                    msg: format!("remote high watermark:{} is greater than local:{}", high_watermark, self.high_watermark),
                     when: ConsumeAllWhen::PollStream,
                 }
                 .fail(),
             ),
@@ -333,7 +391,9 @@ impl ConsumeIterator for KafkaConsumeIterator {
             Ordering::Less => Some(
                 Unknown {
                     msg: format!(
-                        "High watermark decrease while consuming all data in topic:{}",
+                        "remote high watermark:{} is less than local:{} in topic:{}; it should not decrease while consuming all data",
+                        high_watermark,
+                        self.high_watermark,
                         self.topic_name
                     ),
                 }
                 .fail(),
             ),
@@ -343,7 +403,7 @@ impl ConsumeIterator for KafkaConsumeIterator {
             Ordering::Equal => {
                 if record.offset + 1 == self.high_watermark {
                     info!("Consume all data successfully in topic:{}", self.topic_name);
-                    self.consuming_stream = None;
+                    self.stream_consumer = None;
                 }
 
                 Some(Ok(record.into()))
@@ -358,7 +418,7 @@ impl ConsumeIterator for KafkaConsumeIterator {
             None => Some(
                 Unknown {
                     msg: format!(
-                        "Consuming stream return None due to unknown cause, topic:{}",
+                        "consuming stream returned None due to unknown cause, topic:{}",
                         self.topic_name
                     ),
                 }
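The termination check above leans on Kafka's offset semantics: offsets in a partition are dense, and the high watermark is one past the offset of the last available record, so a `consume_all` pass that snapshots the watermark at construction is finished exactly when it has yielded the record at `high_watermark - 1`. Stated as a standalone predicate (a sketch, not code from this patch):

```rust
/// Records occupy the half-open offset range `[start_offset, high_watermark)`,
/// so consuming everything that existed when `high_watermark` was snapshotted
/// is done once the record at `high_watermark - 1` has been returned.
fn consume_all_finished(last_returned_offset: i64, high_watermark: i64) -> bool {
    last_returned_offset + 1 == high_watermark
}
```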
diff --git a/components/message_queue/src/kafka/mod.rs b/components/message_queue/src/kafka/mod.rs
index 33b00f0245..961a34d4c9 100644
--- a/components/message_queue/src/kafka/mod.rs
+++ b/components/message_queue/src/kafka/mod.rs
@@ -1,2 +1,6 @@
+// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+
+//! Message queue component's Kafka implementation
+
 pub mod config;
 pub mod kafka_impl;
diff --git a/components/message_queue/src/lib.rs b/components/message_queue/src/lib.rs
index 4831157150..1e3db2c493 100644
--- a/components/message_queue/src/lib.rs
+++ b/components/message_queue/src/lib.rs
@@ -1,28 +1,32 @@
+// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+
+//! Message queue component
+
 pub mod kafka;
+#[cfg(any(test, feature = "test"))]
+pub mod tests;
 
 use std::{collections::BTreeMap, result::Result};
 
 use async_trait::async_trait;
-use time::OffsetDateTime;
+use chrono::{DateTime, Utc};
+
+pub type Offset = i64;
 
-/// Topic's producer
-///
-/// In ceresdb every topic just has one partition.
-/// Because there won't be a huge number of topic(the upper bound is just about
-/// 1000).
+/// Message queue interface supporting the methods needed by the wal module.
 #[async_trait]
pub trait MessageQueue: Send + Sync + 'static {
     type Error: std::error::Error + Send + Sync + 'static;
-    type ConsumeAllIterator: ConsumeAllIterator + Send;
+    type ConsumeIterator: ConsumeIterator + Send;
 
     async fn create_topic_if_not_exist(&self, topic_name: &str) -> Result<(), Self::Error>;
     async fn produce(
         &self,
         topic_name: &str,
-        message: Vec<Message>,
-    ) -> Result<Vec<i64>, Self::Error>;
-    async fn consume_all(&self, topic_name: &str) -> Result<Self::ConsumeAllIterator, Self::Error>;
-    async fn delete_up_to(&self, topic_name: &str, offset: i64) -> Result<(), Self::Error>;
+        messages: Vec<Message>,
+    ) -> Result<Vec<Offset>, Self::Error>;
+    async fn consume_all(&self, topic_name: &str) -> Result<Self::ConsumeIterator, Self::Error>;
+    async fn delete_up_to(&self, topic_name: &str, offset: Offset) -> Result<(), Self::Error>;
     // TODO: should design a stream consume method for slave node to fetch wals.
 }
 
@@ -32,18 +36,18 @@ pub struct Message {
     pub key: Option<Vec<u8>>,
     pub value: Option<Vec<u8>>,
     pub headers: BTreeMap<String, Vec<u8>>,
-    pub timestamp: OffsetDateTime,
+    pub timestamp: DateTime<Utc>,
 }
 
 /// Record that has offset information attached.
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct MessageAndOffset {
     pub message: Message,
-    pub offset: i64,
+    pub offset: Offset,
 }
 
 #[async_trait]
-pub trait ConsumeAllIterator {
+pub trait ConsumeIterator {
     type Error: std::error::Error + Send + Sync + 'static;
 
     async fn next_message(&mut self) -> Option<Result<MessageAndOffset, Self::Error>>;
 }
diff --git a/components/message_queue/src/tests.rs b/components/message_queue/src/tests.rs
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/components/message_queue/src/tests/cases.rs b/components/message_queue/src/tests/cases.rs
new file mode 100644
index 0000000000..61776134bc
--- /dev/null
+++ b/components/message_queue/src/tests/cases.rs
@@ -0,0 +1,116 @@
+// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+
+//! Test cases for message queue
+
+use crate::{
+    kafka::{config::Config, kafka_impl::KafkaImpl},
+    tests::util::{generate_test_data, random_topic_name},
+    ConsumeIterator, Message, MessageQueue,
+};
+
+#[tokio::test]
+#[ignore]
+async fn test_kafka() {
+    let mut config = Config::default();
+    config.client_config.boost_broker = Some("127.0.0.1:9011".to_string());
+    let kafka_impl = KafkaImpl::new(config).await.unwrap();
+
+    run_message_queue_test(kafka_impl).await;
+}
+
+async fn test_create_topic<T: MessageQueue>(message_queue: &T) {
+    assert!(message_queue
+        .create_topic_if_not_exist(random_topic_name().as_str())
+        .await
+        .is_ok());
+    // Creating a topic that already exists is ok.
+    assert!(message_queue
+        .create_topic_if_not_exist(random_topic_name().as_str())
+        .await
+        .is_ok());
+}
+
+async fn run_message_queue_test<T: MessageQueue>(message_queue: T) {
+    test_create_topic(&message_queue).await;
+
+    test_simple_produce_consume(&message_queue).await;
+
+    test_delete(&message_queue).await;
+
+    test_consume_empty_topic(&message_queue).await;
+}
+
+async fn test_simple_produce_consume<T: MessageQueue>(message_queue: &T) {
+    let topic_name = random_topic_name();
+    assert!(message_queue
+        .create_topic_if_not_exist(topic_name.as_str())
+        .await
+        .is_ok());
+
+    // Call produce to push messages at first, then call consume to pull back and
+    // compare.
+    let test_messages = generate_test_data(10);
+    assert!(message_queue
+        .produce(&topic_name, test_messages.clone())
+        .await
+        .is_ok());
+    consume_all_and_compare(message_queue, &topic_name, 0, &test_messages).await;
+}
+
+async fn test_delete<T: MessageQueue>(message_queue: &T) {
+    let topic_name = random_topic_name();
+    assert!(message_queue
+        .create_topic_if_not_exist(topic_name.as_str())
+        .await
+        .is_ok());
+
+    // Test consume and produce.
+    let test_messages = generate_test_data(10);
+    assert!(message_queue
+        .produce(&topic_name, test_messages.clone())
+        .await
+        .is_ok());
+    consume_all_and_compare(message_queue, &topic_name, 0, &test_messages).await;
+
+    // Test consume after deleting.
+    assert!(message_queue.delete_up_to(&topic_name, 3).await.is_ok());
+    consume_all_and_compare(message_queue, &topic_name, 3, &test_messages).await;
+}
+
+async fn consume_all_and_compare<T: MessageQueue>(
+    message_queue: &T,
+    topic_name: &str,
+    start_offset: i64,
+    test_messages: &[Message],
+) {
+    let iter = message_queue.consume_all(topic_name).await;
+    assert!(iter.is_ok());
+    let mut iter = iter.unwrap();
+    let mut offset = start_offset;
+    let mut cnt = 0;
+    while let Some(message_and_offset) = iter.next_message().await {
+        assert!(message_and_offset.is_ok());
+        let message_and_offset = message_and_offset.unwrap();
+        assert_eq!(message_and_offset.offset, offset);
+        assert_eq!(message_and_offset.message, test_messages[offset as usize]);
+
+        offset += 1;
+        cnt += 1;
+    }
+    assert_eq!(cnt, test_messages.len() as i64 - start_offset);
+}
+
+async fn test_consume_empty_topic<T: MessageQueue>(message_queue: &T) {
+    let topic_name = random_topic_name();
+    assert!(message_queue
+        .create_topic_if_not_exist(topic_name.as_str())
+        .await
+        .is_ok());
+
+    // Consuming an empty topic should yield no messages.
+    let iter = message_queue.consume_all(&topic_name).await;
+    assert!(iter.is_ok());
+    let mut iter = iter.unwrap();
+    assert!(iter.next_message().await.is_none());
+}
diff --git a/components/message_queue/src/tests/mod.rs b/components/message_queue/src/tests/mod.rs
new file mode 100644
index 0000000000..33922c6bb2
--- /dev/null
+++ b/components/message_queue/src/tests/mod.rs
@@ -0,0 +1,7 @@
+// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+
+//! Tests for message queue
+
+#[cfg_attr(feature = "test", allow(dead_code, unused_imports))]
+mod cases;
+pub mod util;
diff --git a/components/message_queue/src/tests/util.rs b/components/message_queue/src/tests/util.rs
new file mode 100644
index 0000000000..abd80b88df
--- /dev/null
+++ b/components/message_queue/src/tests/util.rs
@@ -0,0 +1,36 @@
+// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+
+//! Test util for message queue
+
+use std::collections::BTreeMap;
+
+use chrono::{DateTime, Duration, TimeZone, Utc};
+
+use crate::Message;
+
+pub fn generate_test_data(cnt: usize) -> Vec<Message> {
+    let mut messages = Vec::with_capacity(cnt);
+    let base_ts = Utc.timestamp_millis(1337);
+    for i in 0..cnt {
+        let key = format!("test_key_{}", i);
+        let val = format!("test_val_{}", i);
+        let timestamp = base_ts + Duration::milliseconds(i as i64);
+
+        messages.push(message(key.as_bytes(), val.as_bytes(), timestamp));
+    }
+
+    messages
+}
+
+fn message(key: &[u8], value: &[u8], timestamp: DateTime<Utc>) -> Message {
+    Message {
+        key: Some(key.to_vec()),
+        value: Some(value.to_vec()),
+        headers: BTreeMap::new(),
+        timestamp,
+    }
+}
+
+pub fn random_topic_name() -> String {
+    format!("test_topic_{}", uuid::Uuid::new_v4())
+}
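A note on running the new tests: `test_kafka` is `#[ignore]`d because it needs a live broker at the configured address (`127.0.0.1:9011` above). Assuming such a broker is up, it can be invoked explicitly with the standard cargo flag: `cargo test -p message_queue -- --ignored`.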