Skip to content

Commit

Permalink
Fix max topic size config (#1154)
Browse files Browse the repository at this point in the history
  • Loading branch information
spetz authored Aug 17, 2024
1 parent 5fbac2b commit b7f64ab
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 73 deletions.
75 changes: 17 additions & 58 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion examples/src/multi-tenant/producer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use iggy::models::permissions::{Permissions, StreamPermissions};
use iggy::models::user_status::UserStatus;
use iggy::users::defaults::{DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME};
use iggy::utils::duration::IggyDuration;
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::topic_size::MaxTopicSize;
use iggy_examples::shared::args::Args;
use std::collections::HashMap;
use std::env;
Expand Down Expand Up @@ -247,7 +249,12 @@ async fn create_producers(
.batch_size(batch_size)
.send_interval(IggyDuration::from_str(interval).expect("Invalid duration"))
.partitioning(Partitioning::balanced())
.create_topic_if_not_exists(partitions_count, None)
.create_topic_if_not_exists(
partitions_count,
None,
IggyExpiry::ServerDefault,
MaxTopicSize::ServerDefault,
)
.build();
producer.init().await?;
producers.push(TenantProducer::new(
Expand Down
9 changes: 8 additions & 1 deletion examples/src/new-sdk/producer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use iggy::clients::client::IggyClient;
use iggy::clients::producer::IggyProducer;
use iggy::messages::send_messages::{Message, Partitioning};
use iggy::utils::duration::IggyDuration;
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::topic_size::MaxTopicSize;
use iggy_examples::shared::args::Args;
use iggy_examples::shared::messages_generator::MessagesGenerator;
use std::error::Error;
Expand All @@ -28,7 +30,12 @@ async fn main() -> anyhow::Result<(), Box<dyn Error>> {
.batch_size(args.messages_per_batch)
.send_interval(IggyDuration::from_str(&args.interval)?)
.partitioning(Partitioning::balanced())
.create_topic_if_not_exists(3, None)
.create_topic_if_not_exists(
3,
None,
IggyExpiry::ServerDefault,
MaxTopicSize::ServerDefault,
)
.build();
producer.init().await?;
produce_messages(&args, &producer).await?;
Expand Down
2 changes: 1 addition & 1 deletion integration/tests/server/scenarios/system_scenario.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
let updated_topic_name = format!("{}-updated", TOPIC_NAME);
let updated_message_expiry = 1000;
let message_expiry_duration = updated_message_expiry.into();
let updated_max_topic_size = MaxTopicSize::Custom(IggyByteSize::from(0x1337));
let updated_max_topic_size = MaxTopicSize::Custom(IggyByteSize::from_str("2 GB").unwrap());
let updated_replication_factor = 5;

client
Expand Down
2 changes: 1 addition & 1 deletion sdk/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy"
version = "0.6.10"
version = "0.6.11"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2021"
license = "MIT"
Expand Down
20 changes: 18 additions & 2 deletions sdk/src/clients/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ pub struct IggyProducer {
create_topic_if_not_exists: bool,
topic_partitions_count: u32,
topic_replication_factor: Option<u8>,
topic_message_expiry: IggyExpiry,
topic_max_size: MaxTopicSize,
default_partitioning: Arc<Partitioning>,
can_send_immediately: bool,
last_sent_at: Arc<AtomicU64>,
Expand All @@ -65,6 +67,8 @@ impl IggyProducer {
create_topic_if_not_exists: bool,
topic_partitions_count: u32,
topic_replication_factor: Option<u8>,
topic_message_expiry: IggyExpiry,
topic_max_size: MaxTopicSize,
retry_interval: IggyDuration,
) -> Self {
Self {
Expand All @@ -84,6 +88,8 @@ impl IggyProducer {
create_topic_if_not_exists,
topic_partitions_count,
topic_replication_factor,
topic_message_expiry,
topic_max_size,
default_partitioning: Arc::new(Partitioning::balanced()),
can_send_immediately: interval.is_none(),
last_sent_at: Arc::new(AtomicU64::new(0)),
Expand Down Expand Up @@ -154,8 +160,8 @@ impl IggyProducer {
CompressionAlgorithm::None,
self.topic_replication_factor,
id,
IggyExpiry::ServerDefault,
MaxTopicSize::ServerDefault,
self.topic_message_expiry,
self.topic_max_size,
)
.await?;
}
Expand Down Expand Up @@ -419,6 +425,8 @@ pub struct IggyProducerBuilder {
topic_partitions_count: u32,
topic_replication_factor: Option<u8>,
retry_interval: IggyDuration,
pub topic_message_expiry: IggyExpiry,
pub topic_max_size: MaxTopicSize,
}

impl IggyProducerBuilder {
Expand Down Expand Up @@ -448,6 +456,8 @@ impl IggyProducerBuilder {
topic_partitions_count: 1,
topic_replication_factor: None,
retry_interval: IggyDuration::ONE_SECOND,
topic_message_expiry: IggyExpiry::ServerDefault,
topic_max_size: MaxTopicSize::ServerDefault,
}
}

Expand Down Expand Up @@ -566,11 +576,15 @@ impl IggyProducerBuilder {
self,
partitions_count: u32,
replication_factor: Option<u8>,
message_expiry: IggyExpiry,
max_size: MaxTopicSize,
) -> Self {
Self {
create_topic_if_not_exists: true,
topic_partitions_count: partitions_count,
topic_replication_factor: replication_factor,
topic_message_expiry: message_expiry,
topic_max_size: max_size,
..self
}
}
Expand Down Expand Up @@ -607,6 +621,8 @@ impl IggyProducerBuilder {
self.create_topic_if_not_exists,
self.topic_partitions_count,
self.topic_replication_factor,
self.topic_message_expiry,
self.topic_max_size,
self.retry_interval,
)
}
Expand Down
4 changes: 4 additions & 0 deletions sdk/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::utils::byte_size::IggyByteSize;
use crate::utils::topic_size::MaxTopicSize;
use strum::{EnumDiscriminants, FromRepr, IntoStaticStr};
use thiserror::Error;

Expand Down Expand Up @@ -194,6 +196,8 @@ pub enum IggyError {
MissingTopics(u32) = 1017,
#[error("Missing partitions for topic with ID: {0} for stream with ID: {1}.")]
MissingPartitions(u32, u32) = 1018,
#[error("Max topic size cannot be lower than segment size. Max topic size: {0} < segment size: {1}.")]
InvalidTopicSize(MaxTopicSize, IggyByteSize) = 1019,
#[error("Cannot create topics directory for stream with ID: {0}, Path: {1}")]
CannotCreateTopicsDirectory(u32, String) = 2000,
#[error(
Expand Down
4 changes: 2 additions & 2 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.4.11"
version = "0.4.12"
edition = "2021"
build = "src/build.rs"

Expand All @@ -15,7 +15,7 @@ async-stream = "0.3.5"
async-trait = "0.1.80"
atone = "0.3.7"
axum = "0.7.5"
axum-server = { version = "0.6.0", features = ["tls-rustls"] }
axum-server = { version = "0.7.1", features = ["tls-rustls"] }
bcrypt = "0.15.1"
bincode = "1.3.3"
blake3 = "1.5.1"
Expand Down
Loading

0 comments on commit b7f64ab

Please sign in to comment.