diff --git a/.github/workflows/rust_build.yml b/.github/workflows/rust_build.yml index 30ea791da..a37415f17 100644 --- a/.github/workflows/rust_build.yml +++ b/.github/workflows/rust_build.yml @@ -26,4 +26,4 @@ jobs: - name: Unit Test working-directory: ./rust - run: cargo test \ No newline at end of file + run: cargo test -- --nocapture \ No newline at end of file diff --git a/rust/Cargo.toml b/rust/Cargo.toml index dae0b60ee..689ab0e5c 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -18,6 +18,18 @@ name = "rocketmq" version = "0.1.0" edition = "2021" +rust-version = "1.61" +authors = [ + "SSpirits ", + "Zhanhui Li ", +] + +license = "MIT/Apache-2.0" +readme = "./README.md" +repository = "https://github.com/apache/rocketmq-clients" +documentation = "https://docs.rs/rocketmq" +description = "Rust client for Apache RocketMQ" +keywords = ["rocketmq", "api", "client", "sdk", "grpc"] [dependencies] tokio = { version = "1", features = ["full"] } @@ -42,16 +54,15 @@ slog-json = "2.6.1" opentelemetry = { version = "0.19.0", features = ["metrics", "rt-tokio"] } opentelemetry-otlp = { version = "0.12.0", features = ["metrics", "grpc-tonic"] } +minitrace = "0.4" byteorder = "1" mac_address = "1.1.4" hex = "0.4.3" -time = "0.3.19" +time = "0.3" once_cell = "1.9.0" tokio-stream="0.1.12" -minitrace = "0.4.1" - mockall = "0.11.4" mockall_double= "0.3.0" diff --git a/rust/README.md b/rust/README.md index e8621d5fd..01814945d 100644 --- a/rust/README.md +++ b/rust/README.md @@ -6,13 +6,13 @@ Here is the rust implementation of the client for [Apache RocketMQ](https://rocketmq.apache.org/). Different from the [remoting-based client](https://github.com/apache/rocketmq/tree/develop/client), the current implementation is based on separating architecture for computing and storage, which is the more recommended way to access the RocketMQ service. -Here are some preparations you may need to know (or refer to [here](https://rocketmq.apache.org/docs/quickStart/02quickstart)). +Here are some preparations you may need to know [Quick Start](https://rocketmq.apache.org/docs/quickStart/02quickstart). ## Getting Started ### Requirements -1. rust and cargo +1. rust toolchain, rocketmq's MSRV is 1.61. 2. protoc 3.15.0+ 3. setup name server, broker, and [proxy](https://github.com/apache/rocketmq/tree/develop/proxy). diff --git a/rust/examples/producer.rs b/rust/examples/producer.rs index c1b00d1ce..a7ad0ab70 100644 --- a/rust/examples/producer.rs +++ b/rust/examples/producer.rs @@ -15,7 +15,7 @@ * limitations under the License. */ use rocketmq::conf::{ClientOption, ProducerOption}; -use rocketmq::model::message::MessageImpl; +use rocketmq::model::message::MessageBuilder; use rocketmq::Producer; #[tokio::main] @@ -34,9 +34,9 @@ async fn main() { producer.start().await.unwrap(); // build message - let message = MessageImpl::builder() + let message = MessageBuilder::builder() .set_topic("test_topic") - .set_tags("test_tag") + .set_tag("test_tag") .set_body("hello world".as_bytes().to_vec()) .build() .unwrap(); diff --git a/rust/src/client.rs b/rust/src/client.rs index fe5cf340c..11e4419ab 100644 --- a/rust/src/client.rs +++ b/rust/src/client.rs @@ -486,7 +486,8 @@ impl Client { } #[cfg(test)] -mod tests { +pub(crate) mod tests { + use lazy_static::lazy_static; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::thread::sleep; @@ -507,6 +508,11 @@ mod tests { use super::*; + lazy_static! { + // The lock is used to prevent the mocking static function at same time during parallel testing. + pub(crate) static ref MTX: Mutex<()> = Mutex::new(()); + } + fn new_client_for_test() -> Client { Client { logger: terminal_logger(), diff --git a/rust/src/conf.rs b/rust/src/conf.rs index bb6faf6d2..7ecb69179 100644 --- a/rust/src/conf.rs +++ b/rust/src/conf.rs @@ -14,9 +14,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +//! Configuration of RocketMQ rust client. + use crate::model::common::ClientType; +#[allow(unused_imports)] +use crate::producer::Producer; +#[allow(unused_imports)] +use crate::simple_consumer::SimpleConsumer; use std::time::Duration; +/// [`ClientOption`] is the configuration of internal client, which manages the connection and request with RocketMQ proxy. #[derive(Debug, Clone)] pub struct ClientOption { pub(crate) client_type: ClientType, @@ -43,41 +51,55 @@ impl Default for ClientOption { } impl ClientOption { + /// Get the access url of RocketMQ proxy pub fn access_url(&self) -> &str { &self.access_url } + /// Set the access url of RocketMQ proxy pub fn set_access_url(&mut self, access_url: impl Into) { self.access_url = access_url.into(); } + /// Whether to enable tls pub fn enable_tls(&self) -> bool { self.enable_tls } + /// Set whether to enable tls, default is true pub fn set_enable_tls(&mut self, enable_tls: bool) { self.enable_tls = enable_tls; } + /// Get the timeout of connection and generic request pub fn timeout(&self) -> &Duration { &self.timeout } + /// Set the timeout of connection and generic request, default is 3 seconds pub fn set_timeout(&mut self, timeout: Duration) { self.timeout = timeout; } + /// Get the await duration during long polling pub fn long_polling_timeout(&self) -> &Duration { &self.long_polling_timeout } + /// Set the await duration during long polling, default is 40 seconds + /// + /// This option only affects receive requests, it means timeout for a receive request will be `long_polling_timeout` + `timeout` pub fn set_long_polling_timeout(&mut self, long_polling_timeout: Duration) { self.long_polling_timeout = long_polling_timeout; } } +/// Log format for output. #[derive(Debug, Clone, Eq, PartialEq)] pub enum LoggingFormat { + /// Print log in terminal Terminal, + /// Print log in json file Json, } +/// The configuration of [`Producer`]. #[derive(Debug, Clone)] pub struct ProducerOption { logging_format: LoggingFormat, @@ -100,23 +122,29 @@ impl Default for ProducerOption { } impl ProducerOption { + /// Get the logging format of producer pub fn logging_format(&self) -> &LoggingFormat { &self.logging_format } + /// Set the logging format for producer pub fn set_logging_format(&mut self, logging_format: LoggingFormat) { self.logging_format = logging_format; } + /// Whether to prefetch route info pub fn prefetch_route(&self) -> &bool { &self.prefetch_route } + /// Set whether to prefetch route info, default is true pub fn set_prefetch_route(&mut self, prefetch_route: bool) { self.prefetch_route = prefetch_route; } + /// Get which topic(s) to messages to pub fn topics(&self) -> &Option> { &self.topics } + /// Set which topic(s) to messages to, it will prefetch route info for these topics when the producer starts pub fn set_topics(&mut self, topics: Vec>) { self.topics = Some(topics.into_iter().map(|t| t.into()).collect()); } @@ -129,14 +157,17 @@ impl ProducerOption { self.namespace = name_space.into(); } + /// Whether to validate message type pub fn validate_message_type(&self) -> bool { self.validate_message_type } + /// Set whether to validate message type, default is true pub fn set_validate_message_type(&mut self, validate_message_type: bool) { self.validate_message_type = validate_message_type; } } +/// The configuration of [`SimpleConsumer`]. #[derive(Debug, Clone)] pub struct SimpleConsumerOption { logging_format: LoggingFormat, @@ -159,30 +190,38 @@ impl Default for SimpleConsumerOption { } impl SimpleConsumerOption { + /// Set the logging format of simple consumer pub fn logging_format(&self) -> &LoggingFormat { &self.logging_format } + /// set the logging format for simple consumer pub fn set_logging_format(&mut self, logging_format: LoggingFormat) { self.logging_format = logging_format; } + /// Get the consumer group of simple consumer pub fn consumer_group(&self) -> &str { &self.consumer_group } + /// Set the consumer group of simple consumer pub fn set_consumer_group(&mut self, consumer_group: impl Into) { self.consumer_group = consumer_group.into(); } + /// Whether to prefetch route info pub fn prefetch_route(&self) -> &bool { &self.prefetch_route } + /// Set whether to prefetch route info, default is true pub fn set_prefetch_route(&mut self, prefetch_route: bool) { self.prefetch_route = prefetch_route; } + /// Set which topic(s) to receive messages pub fn topics(&self) -> &Option> { &self.topics } + /// Set which topic(s) to receive messages, it will prefetch route info for these topics when the simple consumer starts pub fn set_topics(&mut self, topics: Vec>) { self.topics = Some(topics.into_iter().map(|t| t.into()).collect()); } diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 7152611a0..1a13a5a96 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -14,6 +14,111 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +//! # The Rust Implementation of Apache RocketMQ Client +//! +//! Here is the official rust client for [Apache RocketMQ](https://rocketmq.apache.org/) +//! providing async/await API powered by tokio runtime. +//! +//! Different from the [remoting-based client](https://github.com/apache/rocketmq/tree/develop/client), +//! the current implementation is based on separating architecture for computing and storage, +//! which is the more recommended way to access the RocketMQ service. +//! +//! Here are some preparations you may need to know: [RocketMQ Quick Start](https://rocketmq.apache.org/docs/quickStart/02quickstart). +//! +//! ## Examples +//! +//! Basic usage: +//! +//! ### Producer +//! ```rust,no_run +//! use rocketmq::conf::{ClientOption, ProducerOption}; +//! use rocketmq::model::message::MessageBuilder; +//! use rocketmq::Producer; +//! +//! #[tokio::main] +//! async fn main() { +//! // recommend to specify which topic(s) you would like to send message to +//! // producer will prefetch topic route when start and failed fast if topic not exist +//! let mut producer_option = ProducerOption::default(); +//! producer_option.set_topics(vec!["test_topic"]); +//! +//! // set which rocketmq proxy to connect +//! let mut client_option = ClientOption::default(); +//! client_option.set_access_url("localhost:8081"); +//! +//! // build and start producer +//! let producer = Producer::new(producer_option, client_option).unwrap(); +//! producer.start().await.unwrap(); +//! +//! // build message +//! let message = MessageBuilder::builder() +//! .set_topic("test_topic") +//! .set_tag("test_tag") +//! .set_body("hello world".as_bytes().to_vec()) +//! .build() +//! .unwrap(); +//! +//! // send message to rocketmq proxy +//! let result = producer.send_one(message).await; +//! debug_assert!(result.is_ok(), "send message failed: {:?}", result); +//! println!( +//! "send message success, message_id={}", +//! result.unwrap().message_id +//! ); +//! } +//! ``` +//! +//! ### Simple Consumer +//! ```rust,no_run +//! use rocketmq::conf::{ClientOption, SimpleConsumerOption}; +//! use rocketmq::model::common::{FilterExpression, FilterType}; +//! use rocketmq::SimpleConsumer; +//! +//! #[tokio::main] +//! async fn main() { +//! // recommend to specify which topic(s) you would like to send message to +//! // simple consumer will prefetch topic route when start and failed fast if topic not exist +//! let mut consumer_option = SimpleConsumerOption::default(); +//! consumer_option.set_topics(vec!["test_topic"]); +//! consumer_option.set_consumer_group("SimpleConsumerGroup"); +//! +//! // set which rocketmq proxy to connect +//! let mut client_option = ClientOption::default(); +//! client_option.set_access_url("localhost:8081"); +//! +//! // build and start simple consumer +//! let consumer = SimpleConsumer::new(consumer_option, client_option).unwrap(); +//! consumer.start().await.unwrap(); +//! +//! // pop message from rocketmq proxy +//! let receive_result = consumer +//! .receive( +//! "test_topic".to_string(), +//! &FilterExpression::new(FilterType::Tag, "test_tag"), +//! ) +//! .await; +//! debug_assert!( +//! receive_result.is_ok(), +//! "receive message failed: {:?}", +//! receive_result.unwrap_err() +//! ); +//! +//! let messages = receive_result.unwrap(); +//! for message in messages { +//! println!("receive message: {:?}", message); +//! // ack message to rocketmq proxy +//! let ack_result = consumer.ack(message).await; +//! debug_assert!( +//! ack_result.is_ok(), +//! "ack message failed: {:?}", +//! ack_result.unwrap_err() +//! ); +//! } +//! } +//! ``` +//! + #[allow(dead_code)] pub mod conf; mod error; diff --git a/rust/src/model/common.rs b/rust/src/model/common.rs index a488b1525..4023b9593 100644 --- a/rust/src/model/common.rs +++ b/rust/src/model/common.rs @@ -14,6 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +//! Common data model of RocketMQ rust client. + use std::net::IpAddr; use std::sync::atomic::AtomicUsize; use std::sync::Arc; @@ -45,6 +48,7 @@ pub(crate) enum RouteStatus { Found(Arc), } +/// Access points for receive messages or querying topic routes. #[derive(Debug, Clone)] pub struct Endpoints { endpoint_url: String, @@ -58,7 +62,7 @@ impl Endpoints { const ENDPOINT_SEPARATOR: &'static str = ","; const ADDRESS_SEPARATOR: &'static str = ":"; - pub fn from_url(endpoint_url: &str) -> Result { + pub(crate) fn from_url(endpoint_url: &str) -> Result { if endpoint_url.is_empty() { return Err(ClientError::new( ErrorKind::Config, @@ -161,42 +165,67 @@ impl Endpoints { } } + /// Get endpoint url pub fn endpoint_url(&self) -> &str { &self.endpoint_url } + /// Get address scheme of endpoint pub fn scheme(&self) -> AddressScheme { self.scheme } - pub fn inner(&self) -> &pb::Endpoints { + pub(crate) fn inner(&self) -> &pb::Endpoints { &self.inner } - pub fn into_inner(self) -> pb::Endpoints { + #[allow(dead_code)] + pub(crate) fn into_inner(self) -> pb::Endpoints { self.inner } } +/// Filter type for message filtering. +/// +/// RocketMQ allows to filter messages by tag or SQL. #[derive(Clone, Copy)] #[repr(i32)] pub enum FilterType { + /// Filter by tag Tag = 1, + /// Filter by SQL Sql = 2, } +/// Filter expression for message filtering. pub struct FilterExpression { pub(crate) filter_type: FilterType, pub(crate) expression: String, } impl FilterExpression { + /// Create a new filter expression + /// + /// # Arguments + /// + /// * `filter_type` - set filter type + /// * `expression` - set message tag or SQL query string pub fn new(filter_type: FilterType, expression: impl Into) -> Self { FilterExpression { filter_type, expression: expression.into(), } } + + /// Get filter type + pub fn filter_type(&self) -> FilterType { + self.filter_type + } + + /// Get message tag or SQL query string + pub fn expression(&self) -> &str { + &self.expression + } } #[cfg(test)] diff --git a/rust/src/model/message.rs b/rust/src/model/message.rs index 5ca53df1a..79a025082 100644 --- a/rust/src/model/message.rs +++ b/rust/src/model/message.rs @@ -14,12 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +//! Message data model of RocketMQ rust client. + use crate::error::{ClientError, ErrorKind}; use crate::model::common::Endpoints; use crate::model::message_id::UNIQ_ID_GENERATOR; use crate::pb; use std::collections::HashMap; +/// [`Message`] is the data model for sending. pub trait Message { fn take_message_id(&mut self) -> String; fn take_topic(&mut self) -> String; @@ -31,12 +35,11 @@ pub trait Message { fn take_delivery_timestamp(&mut self) -> Option; } -#[derive(Debug)] -pub struct MessageImpl { +pub(crate) struct MessageImpl { pub(crate) message_id: String, pub(crate) topic: String, pub(crate) body: Option>, - pub(crate) tags: Option, + pub(crate) tag: Option, pub(crate) keys: Option>, pub(crate) properties: Option>, pub(crate) message_group: Option, @@ -57,7 +60,7 @@ impl Message for MessageImpl { } fn take_tag(&mut self) -> Option { - self.tags.take() + self.tag.take() } fn take_keys(&mut self) -> Vec { @@ -77,14 +80,22 @@ impl Message for MessageImpl { } } -impl MessageImpl { +/// [`MessageBuilder`] is the builder for [`Message`]. +pub struct MessageBuilder { + message: MessageImpl, +} + +impl MessageBuilder { + const OPERATION_BUILD_MESSAGE: &'static str = "build_message"; + + /// Create a new [`MessageBuilder`] for building a message. [Read more](https://rocketmq.apache.org/docs/domainModel/04message/) pub fn builder() -> MessageBuilder { MessageBuilder { message: MessageImpl { message_id: UNIQ_ID_GENERATOR.lock().next_id(), topic: "".to_string(), body: None, - tags: None, + tag: None, keys: None, properties: None, message_group: None, @@ -93,6 +104,13 @@ impl MessageImpl { } } + /// Create a new [`MessageBuilder`] for building a fifo message. [Read more](https://rocketmq.apache.org/docs/featureBehavior/03fifomessage) + /// + /// # Arguments + /// + /// * `topic` - topic of the message + /// * `body` - message body + /// * `message_group` - message group, messages with same message group will be delivered in FIFO order pub fn fifo_message_builder( topic: impl Into, body: Vec, @@ -103,7 +121,7 @@ impl MessageImpl { message_id: UNIQ_ID_GENERATOR.lock().next_id(), topic: topic.into(), body: Some(body), - tags: None, + tag: None, keys: None, properties: None, message_group: Some(message_group.into()), @@ -112,6 +130,13 @@ impl MessageImpl { } } + /// Create a new [`MessageBuilder`] for building a delay message. [Read more](https://rocketmq.apache.org/docs/featureBehavior/02delaymessage) + /// + /// # Arguments + /// + /// * `topic` - topic of the message + /// * `body` - message body + /// * `delay_time` - delivery timestamp of message, specify when to deliver the message pub fn delay_message_builder( topic: impl Into, body: Vec, @@ -122,7 +147,7 @@ impl MessageImpl { message_id: UNIQ_ID_GENERATOR.lock().next_id(), topic: topic.into(), body: Some(body), - tags: None, + tag: None, keys: None, properties: None, message_group: None, @@ -130,35 +155,32 @@ impl MessageImpl { }, } } -} - -pub struct MessageBuilder { - message: MessageImpl, -} - -impl MessageBuilder { - const OPERATION_BUILD_MESSAGE: &'static str = "build_message"; + /// Set topic for message, which is required pub fn set_topic(mut self, topic: impl Into) -> Self { self.message.topic = topic.into(); self } + /// Set message body, which is required pub fn set_body(mut self, body: Vec) -> Self { self.message.body = Some(body); self } - pub fn set_tags(mut self, tags: impl Into) -> Self { - self.message.tags = Some(tags.into()); + /// Set message tag + pub fn set_tag(mut self, tag: impl Into) -> Self { + self.message.tag = Some(tag.into()); self } + /// Set message keys pub fn set_keys(mut self, keys: Vec>) -> Self { self.message.keys = Some(keys.into_iter().map(|k| k.into()).collect()); self } + /// Set message properties pub fn set_properties( mut self, properties: HashMap, impl Into>, @@ -172,11 +194,17 @@ impl MessageBuilder { self } + /// Set message group, which is required for fifo message. [Read more](https://rocketmq.apache.org/docs/featureBehavior/03fifomessage) + /// + /// The message group could not be set with delivery timestamp at the same time pub fn set_message_group(mut self, message_group: impl Into) -> Self { self.message.message_group = Some(message_group.into()); self } + /// Set delivery timestamp, which is required for delay message. [Read more](https://rocketmq.apache.org/docs/featureBehavior/02delaymessage) + /// + /// The delivery timestamp could not be set with message group at the same time pub fn set_delivery_timestamp(mut self, delivery_timestamp: i64) -> Self { self.message.delivery_timestamp = Some(delivery_timestamp); self @@ -197,7 +225,8 @@ impl MessageBuilder { Ok(()) } - pub fn build(self) -> Result { + /// Build message + pub fn build(self) -> Result { self.check_message().map_err(|e| { ClientError::new(ErrorKind::InvalidMessage, &e, Self::OPERATION_BUILD_MESSAGE) })?; @@ -205,6 +234,7 @@ impl MessageBuilder { } } +/// [`AckMessageEntry`] is the data model for ack message. pub trait AckMessageEntry { fn topic(&self) -> String; fn message_id(&self) -> String; @@ -212,6 +242,9 @@ pub trait AckMessageEntry { fn endpoints(&self) -> &Endpoints; } +/// [`MessageView`] is the data model for receive message. +/// +/// [`MessageView`] has implemented [`AckMessageEntry`] trait. #[derive(Debug)] pub struct MessageView { pub(crate) message_id: String, @@ -267,46 +300,57 @@ impl MessageView { } } + /// Get message id pub fn message_id(&self) -> &str { &self.message_id } + /// Get topic of message pub fn topic(&self) -> &str { &self.topic } + /// Get message body pub fn body(&self) -> &[u8] { &self.body } + /// Get message tag pub fn tag(&self) -> Option<&str> { self.tag.as_deref() } + /// Get message keys pub fn keys(&self) -> &[String] { &self.keys } + /// Get message properties pub fn properties(&self) -> &HashMap { &self.properties } + /// Get message group of fifo message. [Read more](https://rocketmq.apache.org/docs/featureBehavior/03fifomessage) pub fn message_group(&self) -> Option<&str> { self.message_group.as_deref() } + /// Get delivery timestamp of delay message. [Read more](https://rocketmq.apache.org/docs/featureBehavior/02delaymessage) pub fn delivery_timestamp(&self) -> Option { self.delivery_timestamp } + /// Get born host of message pub fn born_host(&self) -> &str { &self.born_host } + /// Get born timestamp of message pub fn born_timestamp(&self) -> i64 { self.born_timestamp } + /// Get delivery attempt of message pub fn delivery_attempt(&self) -> i32 { self.delivery_attempt } @@ -320,10 +364,10 @@ mod tests { fn common_test_message() { let mut properties = HashMap::new(); properties.insert("key", "value".to_string()); - let message = MessageImpl::builder() + let message = MessageBuilder::builder() .set_topic("test") .set_body(vec![1, 2, 3]) - .set_tags("tag") + .set_tag("tag") .set_keys(vec!["key"]) .set_properties(properties) .build(); @@ -340,14 +384,14 @@ mod tests { properties }); - let message = MessageImpl::builder() + let message = MessageBuilder::builder() .set_topic("test") .set_body(vec![1, 2, 3]) .set_message_group("message_group") .set_delivery_timestamp(123456789) .build(); assert!(message.is_err()); - let err = message.unwrap_err(); + let err = message.err().unwrap(); assert_eq!(err.kind, ErrorKind::InvalidMessage); assert_eq!( err.message, @@ -355,14 +399,15 @@ mod tests { ); let message = - MessageImpl::fifo_message_builder("test", vec![1, 2, 3], "message_group").build(); + MessageBuilder::fifo_message_builder("test", vec![1, 2, 3], "message_group").build(); let mut message = message.unwrap(); assert_eq!( message.take_message_group(), Some("message_group".to_string()) ); - let message = MessageImpl::delay_message_builder("test", vec![1, 2, 3], 123456789).build(); + let message = + MessageBuilder::delay_message_builder("test", vec![1, 2, 3], 123456789).build(); let mut message = message.unwrap(); assert_eq!(message.take_delivery_timestamp(), Some(123456789)); } diff --git a/rust/src/model/mod.rs b/rust/src/model/mod.rs index aef326189..8ac95e28c 100644 --- a/rust/src/model/mod.rs +++ b/rust/src/model/mod.rs @@ -14,6 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +//! Data model of RocketMQ rust client. + pub mod common; pub mod message; pub(crate) mod message_id; diff --git a/rust/src/producer.rs b/rust/src/producer.rs index fc5656736..697b70e30 100644 --- a/rust/src/producer.rs +++ b/rust/src/producer.rs @@ -15,11 +15,6 @@ * limitations under the License. */ -//! Publish messages of various types to brokers. -//! -//! `Producer` is a thin wrapper of internal `Client` struct that shoulders the actual workloads. -//! Most of its methods take shared reference so that application developers may use it at will. - use std::time::{SystemTime, UNIX_EPOCH}; use mockall_double::double; @@ -39,9 +34,12 @@ use crate::util::{ }; use crate::{log, pb}; -/// `Producer` is the core struct, to which application developers should turn, when publishing messages to brokers. +/// [`Producer`] is the core struct, to which application developers should turn, when publishing messages to RocketMQ proxy. +/// +/// [`Producer`] is a thin wrapper of internal client struct that shoulders the actual workloads. +/// Most of its methods take shared reference so that application developers may use it at will. /// -/// `Producer` is `Send` and `Sync` by design, so that developers may get started easily. +/// [`Producer`] is `Send` and `Sync` by design, so that developers may get started easily. #[derive(Debug)] pub struct Producer { option: ProducerOption, @@ -52,6 +50,12 @@ pub struct Producer { impl Producer { const OPERATION_SEND_MESSAGE: &'static str = "producer.send_message"; + /// Create a new producer instance + /// + /// # Arguments + /// + /// * `option` - producer option + /// * `client_option` - client option pub fn new(option: ProducerOption, client_option: ClientOption) -> Result { let client_option = ClientOption { client_type: ClientType::Producer, @@ -68,6 +72,7 @@ impl Producer { }) } + /// Start the producer pub async fn start(&self) -> Result<(), ClientError> { if let Some(topics) = self.option.topics() { for topic in topics { @@ -172,6 +177,11 @@ impl Producer { Ok((topic, last_message_group.unwrap(), pb_messages)) } + /// Send a single message + /// + /// # Arguments + /// + /// * `message` - the message to send pub async fn send_one( &self, message: impl message::Message, @@ -180,6 +190,11 @@ impl Producer { Ok(results[0].clone()) } + /// Send a batch of messages + /// + /// # Arguments + /// + /// * `messages` - A vector that holds the messages to send pub async fn send( &self, messages: Vec, @@ -210,7 +225,7 @@ mod tests { use crate::error::ErrorKind; use crate::log::terminal_logger; use crate::model::common::Route; - use crate::model::message::MessageImpl; + use crate::model::message::{MessageBuilder, MessageImpl}; use crate::pb::{Broker, Code, MessageQueue, Status}; use std::sync::Arc; @@ -226,6 +241,8 @@ mod tests { #[tokio::test] async fn producer_start() -> Result<(), ClientError> { + let _m = crate::client::tests::MTX.lock(); + let ctx = Client::new_context(); ctx.expect().return_once(|_, _, _| { let mut client = Client::default(); @@ -252,10 +269,10 @@ mod tests { #[tokio::test] async fn producer_transform_messages_to_protobuf() { let producer = new_producer_for_test(); - let messages = vec![MessageImpl::builder() + let messages = vec![MessageBuilder::builder() .set_topic("DefaultCluster") .set_body("hello world".as_bytes().to_vec()) - .set_tags("tag") + .set_tag("tag") .set_keys(vec!["key"]) .set_properties(vec![("key", "value")].into_iter().collect()) .set_message_group("message_group".to_string()) @@ -295,7 +312,7 @@ mod tests { message_id: "".to_string(), topic: "".to_string(), body: None, - tags: None, + tag: None, keys: None, properties: None, message_group: None, @@ -308,12 +325,12 @@ mod tests { assert_eq!(err.message, "message topic is empty"); let messages = vec![ - MessageImpl::builder() + MessageBuilder::builder() .set_topic("DefaultCluster") .set_body("hello world".as_bytes().to_vec()) .build() .unwrap(), - MessageImpl::builder() + MessageBuilder::builder() .set_topic("DefaultCluster_dup") .set_body("hello world".as_bytes().to_vec()) .build() @@ -326,13 +343,13 @@ mod tests { assert_eq!(err.message, "Not all messages have the same topic."); let messages = vec![ - MessageImpl::builder() + MessageBuilder::builder() .set_topic("DefaultCluster") .set_body("hello world".as_bytes().to_vec()) .set_message_group("message_group") .build() .unwrap(), - MessageImpl::builder() + MessageBuilder::builder() .set_topic("DefaultCluster") .set_body("hello world".as_bytes().to_vec()) .set_message_group("message_group_dup") @@ -384,7 +401,7 @@ mod tests { }); producer .send_one( - MessageImpl::builder() + MessageBuilder::builder() .set_topic("test_topic") .set_body(vec![]) .build() diff --git a/rust/src/simple_consumer.rs b/rust/src/simple_consumer.rs index d06c64101..eefa93b1b 100644 --- a/rust/src/simple_consumer.rs +++ b/rust/src/simple_consumer.rs @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + use std::time::Duration; use mockall_double::double; @@ -30,6 +31,15 @@ use crate::util::{ }; use crate::{log, pb}; +/// [`SimpleConsumer`] is a lightweight consumer to consume messages from RocketMQ proxy. +/// +/// If you want to fully control the message consumption operation by yourself, +/// the simple consumer should be your first consideration. +/// +/// [`SimpleConsumer`] is a thin wrapper of internal client struct that shoulders the actual workloads. +/// Most of its methods take shared reference so that application developers may use it at will. +/// +/// [`SimpleConsumer`] is `Send` and `Sync` by design, so that developers may get started easily. #[derive(Debug)] pub struct SimpleConsumer { option: SimpleConsumerOption, @@ -41,6 +51,7 @@ impl SimpleConsumer { const OPERATION_START_SIMPLE_CONSUMER: &'static str = "simple_consumer.start"; const OPERATION_RECEIVE_MESSAGE: &'static str = "simple_consumer.receive_message"; + /// Create a new simple consumer instance pub fn new( option: SimpleConsumerOption, client_option: ClientOption, @@ -61,6 +72,7 @@ impl SimpleConsumer { }) } + /// Start the simple consumer pub async fn start(&self) -> Result<(), ClientError> { if self.option.consumer_group().is_empty() { return Err(ClientError::new( @@ -83,6 +95,12 @@ impl SimpleConsumer { Ok(()) } + /// receive messages from the specified topic + /// + /// # Arguments + /// + /// * `topic` - the topic for receiving messages + /// * `expression` - the subscription for the topic pub async fn receive( &self, topic: impl AsRef, @@ -92,6 +110,14 @@ impl SimpleConsumer { .await } + /// receive messages from the specified topic with batch size and invisible duration + /// + /// # Arguments + /// + /// * `topic` - the topic for receiving messages + /// * `expression` - the subscription for the topic + /// * `batch_size` - max message num of server returned + /// * `invisible_duration` - set the invisible duration of messages that return from the server, these messages will not be visible to other consumers unless timeout pub async fn receive_with_batch_size( &self, topic: &str, @@ -122,6 +148,13 @@ impl SimpleConsumer { .collect()) } + /// Ack the specified message + /// + /// It is important to acknowledge every consumed message, otherwise, they will be received again after the invisible duration + /// + /// # Arguments + /// + /// * `ack_entry` - special message view with handle want to ack pub async fn ack(&self, ack_entry: impl AckMessageEntry + 'static) -> Result<(), ClientError> { self.client.ack_message(ack_entry).await?; Ok(()) @@ -142,6 +175,8 @@ mod tests { #[tokio::test] async fn simple_consumer_start() -> Result<(), ClientError> { + let _m = crate::client::tests::MTX.lock(); + let ctx = Client::new_context(); ctx.expect().return_once(|_, _, _| { let mut client = Client::default();