-
Notifications
You must be signed in to change notification settings - Fork 115
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
docs(sdk): add stream builder documentation and examples (#1543)
This PR adds example code for the recently introduced IggyStream, IggyStreamConsumer, and IggyStreamProducer builder so that new users have an easy to follow blueprint. Furthermore, this PR adds documentation to all fields of the newly introduced IggyConsumerConfig and IggyProducerConfig. Lastly, this PR adds a comprehensively documented sample configuration for each of the new configuration types. This is the last PR in the Epilogue series, so it all ends now :-) --------- Signed-off-by: Marvin Hansen <marvin.hansen@gmail.com>
- Loading branch information
1 parent
48ba0ce
commit e94a0a1
Showing
16 changed files
with
519 additions
and
53 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
use crate::shared::args::Args; | ||
use iggy::client_provider; | ||
use iggy::client_provider::ClientProviderConfig; | ||
use iggy::clients::client::IggyClient; | ||
use iggy::error::IggyError; | ||
use std::sync::Arc; | ||
|
||
/// Builds an Iggy client using the provided stream and topic identifiers. | ||
/// | ||
/// # Arguments | ||
/// | ||
/// * `stream_id` - The identifier of the stream. | ||
/// * `topic_id` - The identifier of the topic. | ||
/// | ||
/// # Returns | ||
/// | ||
/// A `Result` wrapping the `IggyClient` instance or an `IggyError`. | ||
/// | ||
pub async fn build_client( | ||
stream_id: &str, | ||
topic_id: &str, | ||
connect: bool, | ||
) -> Result<IggyClient, IggyError> { | ||
let args = Args::new(stream_id.to_string(), topic_id.to_string()); | ||
build_client_from_args(args.to_sdk_args(), connect).await | ||
} | ||
|
||
/// Builds an Iggy client using the provided `Args`. | ||
/// | ||
/// # Arguments | ||
/// | ||
/// * `args` - The `Args` to use to build the client. | ||
/// | ||
/// # Returns | ||
/// | ||
/// A `Result` wrapping the `IggyClient` instance or an `IggyError`. | ||
/// | ||
pub async fn build_client_from_args( | ||
args: iggy::args::Args, | ||
connect: bool, | ||
) -> Result<IggyClient, IggyError> { | ||
// Build client provider configuration | ||
let client_provider_config = Arc::new( | ||
ClientProviderConfig::from_args(args).expect("Failed to create client provider config"), | ||
); | ||
|
||
// Build client_provider | ||
let client = client_provider::get_raw_client(client_provider_config, connect) | ||
.await | ||
.expect("Failed to build client provider"); | ||
|
||
// Build client | ||
let client = match IggyClient::builder().with_client(client).build() { | ||
Ok(client) => client, | ||
Err(e) => return Err(e), | ||
}; | ||
|
||
Ok(client) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,6 @@ | ||
pub mod args; | ||
pub mod client; | ||
pub mod messages; | ||
pub mod messages_generator; | ||
pub mod stream; | ||
pub mod system; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
use iggy::clients::consumer::ReceivedMessage; | ||
use iggy::consumer_ext::MessageConsumer; | ||
use iggy::error::IggyError; | ||
|
||
#[derive(Debug)] | ||
pub struct PrintEventConsumer {} | ||
|
||
impl MessageConsumer for PrintEventConsumer { | ||
async fn consume(&self, message: ReceivedMessage) -> Result<(), IggyError> { | ||
// Extract message payload as raw bytes | ||
let raw_message = message.message.payload.as_ref(); | ||
// Convert raw bytes into string | ||
let message = String::from_utf8_lossy(raw_message); | ||
// Print message | ||
println!("Message received: {}", message); | ||
|
||
Ok(()) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
use crate::shared::stream::PrintEventConsumer; | ||
use iggy::client::{Client, StreamClient}; | ||
use iggy::consumer_ext::IggyConsumerMessageExt; | ||
use iggy::error::IggyError; | ||
use iggy::messages::send_messages::Message; | ||
use iggy::stream_builder::{IggyStream, IggyStreamConfig}; | ||
use iggy_examples::shared; | ||
use std::str::FromStr; | ||
use tokio::sync::oneshot; | ||
|
||
#[tokio::main] | ||
async fn main() -> Result<(), IggyError> { | ||
println!("Build iggy client and connect it."); | ||
let client = shared::client::build_client("test_stream", "test_topic", true).await?; | ||
|
||
println!("Build iggy producer & consumer"); | ||
// For customization, use the `new` or `from_stream_topic` constructor | ||
let stream_config = IggyStreamConfig::default(); | ||
let (producer, consumer) = IggyStream::build(&client, &stream_config).await?; | ||
|
||
println!("Start message stream"); | ||
let (sender, receiver) = oneshot::channel(); | ||
tokio::spawn(async move { | ||
match consumer | ||
// PrintEventConsumer is imported from examples/src/shared/stream.rs | ||
.consume_messages(&PrintEventConsumer {}, receiver) | ||
.await | ||
{ | ||
Ok(_) => {} | ||
Err(err) => eprintln!("Failed to consume messages: {err}"), | ||
} | ||
}); | ||
|
||
println!("Send 3 test messages..."); | ||
producer.send_one(Message::from_str("Hello World")?).await?; | ||
producer.send_one(Message::from_str("Hola Iggy")?).await?; | ||
producer.send_one(Message::from_str("Hi Apache")?).await?; | ||
|
||
// Wait a bit for all messages to arrive. | ||
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; | ||
|
||
println!("Stop the message stream and shutdown iggy client"); | ||
sender.send(()).expect("Failed to send shutdown signal"); | ||
client.delete_stream(stream_config.stream_id()).await?; | ||
client.shutdown().await?; | ||
|
||
Ok(()) | ||
} |
98 changes: 98 additions & 0 deletions
98
examples/src/stream-builder/stream-consumer-config/main.rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
use iggy::client::Client; | ||
use iggy::clients::consumer::{AutoCommit, AutoCommitWhen}; | ||
use iggy::consumer::ConsumerKind; | ||
use iggy::consumer_ext::IggyConsumerMessageExt; | ||
use iggy::error::IggyError; | ||
use iggy::identifier::Identifier; | ||
use iggy::messages::poll_messages::PollingStrategy; | ||
use iggy::stream_builder::{IggyConsumerConfig, IggyStreamConsumer}; | ||
use iggy::utils::duration::IggyDuration; | ||
use iggy_examples::shared::stream::PrintEventConsumer; | ||
use std::str::FromStr; | ||
use tokio::sync::oneshot; | ||
|
||
const IGGY_URL: &str = "iggy://iggy:iggy@localhost:8090"; | ||
|
||
#[tokio::main] | ||
async fn main() -> Result<(), IggyError> { | ||
let stream = "test_stream"; | ||
let topic = "test_topic"; | ||
|
||
let config = IggyConsumerConfig::builder() | ||
// Set the stream identifier and name. | ||
.stream_id(Identifier::from_str_value(stream)?) | ||
.stream_name(stream) | ||
// Set the topic identifier and name | ||
.topic_id(Identifier::from_str_value(topic)?) | ||
.topic_name(topic) | ||
// The auto-commit configuration for storing the message offset on the server. | ||
// * Disabled: The auto-commit is disabled and the offset must be stored manually by the consumer. | ||
// * Interval: The auto-commit is enabled and the offset is stored on the server after a certain interval. | ||
// * IntervalOrWhen: The auto-commit is enabled and the offset is stored on the server after a certain interval or depending on the mode when consuming the messages. | ||
// * IntervalOrAfter: [This requires the `IggyConsumerMessageExt` trait when using `consume_messages()`.] | ||
// The auto-commit is enabled and the offset is stored on the server after a certain interval or depending on the mode after consuming the messages. | ||
// * When: The auto-commit is enabled and the offset is stored on the server depending on the mode when consuming the messages. | ||
// * After: [This requires the `IggyConsumerMessageExt` trait when using `consume_messages()`.] | ||
// The auto-commit is enabled and the offset is stored on the server depending on the mode after consuming the messages. | ||
.auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages)) | ||
// The max number of messages to send in a batch. The greater the batch size, the higher the throughput for bulk data. | ||
// Note, there is a tradeoff between batch size and latency, so you want to benchmark your setup. | ||
// Note, this only applies to batch send messages. Single messages are sent immediately. | ||
.batch_size(100) | ||
// Create the stream if it doesn't exist. | ||
.create_stream_if_not_exists(true) | ||
// Create the topic if it doesn't exist. | ||
.create_topic_if_not_exists(true) | ||
// The name of the consumer. Must be unique. | ||
.consumer_name("test_consumer".to_string()) | ||
// The type of consumer. It can be either `Consumer` or `ConsumerGroup`. ConsumerGroup is default. | ||
.consumer_kind(ConsumerKind::ConsumerGroup) | ||
// Sets the number of partitions for ConsumerKind `Consumer`. Does not apply to `ConsumerGroup`. | ||
.partitions_count(1) | ||
// The polling interval for messages. | ||
.polling_interval(IggyDuration::from_str("5ms").unwrap()) | ||
// `PollingStrategy` specifies from where to start polling messages. | ||
// It has the following kinds: | ||
// - `Offset` - start polling from the specified offset. | ||
// - `Timestamp` - start polling from the specified timestamp. | ||
// - `First` - start polling from the first message in the partition. This enables messages replay in order of arrival. | ||
// - `Last` - start polling from the last message in the partition. This disables messages replay since only the latest message is pulled. | ||
// - `Next` - start polling from the next message after the last polled message based on the stored consumer offset. | ||
.polling_strategy(PollingStrategy::last()) | ||
// Sets the polling retry interval in case of server disconnection. | ||
.polling_retry_interval(IggyDuration::new_from_secs(1)) | ||
// Sets the number of retries and the interval when initializing the consumer if the stream or topic is not found. | ||
// Might be useful when the stream or topic is created dynamically by the producer. | ||
// The retry only occurs when configured and is disabled by default. | ||
// When you want to retry at most 5 times with an interval of 1 second, | ||
// you set `init_retries` to 5 and `init_interval` to 1 second. | ||
.init_retries(5) | ||
.init_interval(IggyDuration::new_from_secs(1)) | ||
// Optionally, set a custom client side encryptor for encrypting the messages' payloads. Currently only Aes256Gcm is supported. | ||
// Key must be identical to the one used by the producer; thus ensure secure key exchange i.e. K8s secret etc. | ||
// Note, this is independent of server side encryption meaning you can add client encryption, server encryption, or both. | ||
// .encryptor(Arc::new(EncryptorKind::Aes256Gcm(Aes256GcmEncryptor::new(&[1; 32])?))) | ||
.build(); | ||
|
||
let (client, consumer) = IggyStreamConsumer::with_client_from_url(IGGY_URL, &config).await?; | ||
|
||
println!("Start message stream"); | ||
let (tx, rx) = oneshot::channel(); | ||
tokio::spawn(async move { | ||
match consumer | ||
// PrintEventConsumer is imported from examples/src/shared/stream.rs | ||
.consume_messages(&PrintEventConsumer {}, rx) | ||
.await | ||
{ | ||
Ok(_) => {} | ||
Err(err) => eprintln!("Failed to consume messages: {err}"), | ||
} | ||
}); | ||
|
||
// Wait a bit for all messages to arrive. | ||
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; | ||
println!("Stop the message stream and shutdown iggy client"); | ||
tx.send(()).expect("Failed to send shutdown signal"); | ||
client.shutdown().await?; | ||
Ok(()) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
use iggy::client::Client; | ||
use iggy::consumer_ext::IggyConsumerMessageExt; | ||
use iggy::error::IggyError; | ||
use iggy::stream_builder::{IggyConsumerConfig, IggyStreamConsumer}; | ||
use iggy_examples::shared::stream::PrintEventConsumer; | ||
use tokio::sync::oneshot; | ||
|
||
const IGGY_URL: &str = "iggy://iggy:iggy@localhost:8090"; | ||
|
||
#[tokio::main] | ||
async fn main() -> Result<(), IggyError> { | ||
println!("Build iggy client & consumer"); | ||
//For customization, use the `new` or `from_stream_topic` constructor | ||
let config = IggyConsumerConfig::default(); | ||
let (client, consumer) = IggyStreamConsumer::with_client_from_url(IGGY_URL, &config).await?; | ||
|
||
println!("Start message stream"); | ||
let (tx, rx) = oneshot::channel(); | ||
tokio::spawn(async move { | ||
match consumer | ||
// PrintEventConsumer is imported from examples/src/shared/stream.rs | ||
.consume_messages(&PrintEventConsumer {}, rx) | ||
.await | ||
{ | ||
Ok(_) => {} | ||
Err(err) => eprintln!("Failed to consume messages: {err}"), | ||
} | ||
}); | ||
|
||
// Wait a bit for all messages to arrive. | ||
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; | ||
println!("Stop the message stream and shutdown iggy client"); | ||
tx.send(()).expect("Failed to send shutdown signal"); | ||
client.shutdown().await?; | ||
Ok(()) | ||
} |
Oops, something went wrong.