From b7f64ab6c32a55291315962e9073e45b3db7193e Mon Sep 17 00:00:00 2001 From: Piotr Gankiewicz Date: Sat, 17 Aug 2024 20:34:38 +0200 Subject: [PATCH] Fix max topic size config (#1154) --- Cargo.lock | 75 +++++-------------- examples/src/multi-tenant/producer/main.rs | 9 ++- examples/src/new-sdk/producer/main.rs | 9 ++- .../tests/server/scenarios/system_scenario.rs | 2 +- sdk/Cargo.toml | 2 +- sdk/src/clients/producer.rs | 20 ++++- sdk/src/error.rs | 4 + server/Cargo.toml | 4 +- server/src/streaming/streams/topics.rs | 31 +++++++- server/src/streaming/topics/topic.rs | 12 ++- 10 files changed, 95 insertions(+), 73 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 89fa23ba7..67d2e28f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -624,9 +624,9 @@ dependencies = [ [[package]] name = "axum-server" -version = "0.6.0" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1ad46c3ec4e12f4a4b6835e173ba21c25e484c9d02b49770bf006ce5367c036" +checksum = "56bac90848f6a9393ac03c63c640925c4b7c8ca21654de40d53f55964667c7d8" dependencies = [ "arc-swap", "bytes", @@ -637,10 +637,11 @@ dependencies = [ "hyper 1.4.1", "hyper-util", "pin-project-lite", - "rustls 0.21.12", + "rustls", "rustls-pemfile", + "rustls-pki-types", "tokio", - "tokio-rustls 0.24.1", + "tokio-rustls", "tower", "tower-service", ] @@ -2135,10 +2136,10 @@ dependencies = [ "http 1.1.0", "hyper 1.4.1", "hyper-util", - "rustls 0.23.12", + "rustls", "rustls-pki-types", "tokio", - "tokio-rustls 0.26.0", + "tokio-rustls", "tower-service", ] @@ -2244,7 +2245,7 @@ dependencies = [ [[package]] name = "iggy" -version = "0.6.10" +version = "0.6.11" dependencies = [ "aes-gcm", "anyhow", @@ -2276,7 +2277,7 @@ dependencies = [ "reqwest", "reqwest-middleware", "reqwest-retry", - "rustls 0.23.12", + "rustls", "serde", "serde_derive", "serde_json", @@ -3410,7 +3411,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash", - "rustls 0.23.12", + "rustls", "thiserror", "tokio", "tracing", @@ -3426,7 +3427,7 @@ dependencies = [ "rand", "ring", "rustc-hash", - "rustls 0.23.12", + "rustls", "rustls-platform-verifier", "slab", "thiserror", @@ -3884,18 +3885,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "rustls" -version = "0.21.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" -dependencies = [ - "log", - "ring", - "rustls-webpki 0.101.7", - "sct", -] - [[package]] name = "rustls" version = "0.23.12" @@ -3907,7 +3896,7 @@ dependencies = [ "once_cell", "ring", "rustls-pki-types", - "rustls-webpki 0.102.6", + "rustls-webpki", "subtle", "zeroize", ] @@ -3952,10 +3941,10 @@ dependencies = [ "jni", "log", "once_cell", - "rustls 0.23.12", + "rustls", "rustls-native-certs", "rustls-platform-verifier-android", - "rustls-webpki 0.102.6", + "rustls-webpki", "security-framework", "security-framework-sys", "webpki-roots", @@ -3968,16 +3957,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "84e217e7fdc8466b5b35d30f8c0a30febd29173df4a3a0c2115d306b9c4117ad" -[[package]] -name = "rustls-webpki" -version = "0.101.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "rustls-webpki" version = "0.102.6" @@ -4052,16 +4031,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" -[[package]] -name = "sct" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "sdd" version = "1.7.0" @@ -4256,7 +4225,7 @@ dependencies = [ [[package]] name = "server" -version = "0.4.11" +version = "0.4.12" dependencies = [ "anyhow", "async-stream", @@ -4286,7 +4255,7 @@ dependencies = [ "ring", "rmp-serde", "rust-s3", - "rustls 0.23.12", + "rustls", "rustls-pemfile", "serde", "serde_json", @@ -4778,23 +4747,13 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-rustls" -version = "0.24.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" -dependencies = [ - "rustls 0.21.12", - "tokio", -] - [[package]] name = "tokio-rustls" version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" dependencies = [ - "rustls 0.23.12", + "rustls", "rustls-pki-types", "tokio", ] diff --git a/examples/src/multi-tenant/producer/main.rs b/examples/src/multi-tenant/producer/main.rs index c7713a1dc..d6a18992b 100644 --- a/examples/src/multi-tenant/producer/main.rs +++ b/examples/src/multi-tenant/producer/main.rs @@ -11,6 +11,8 @@ use iggy::models::permissions::{Permissions, StreamPermissions}; use iggy::models::user_status::UserStatus; use iggy::users::defaults::{DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME}; use iggy::utils::duration::IggyDuration; +use iggy::utils::expiry::IggyExpiry; +use iggy::utils::topic_size::MaxTopicSize; use iggy_examples::shared::args::Args; use std::collections::HashMap; use std::env; @@ -247,7 +249,12 @@ async fn create_producers( .batch_size(batch_size) .send_interval(IggyDuration::from_str(interval).expect("Invalid duration")) .partitioning(Partitioning::balanced()) - .create_topic_if_not_exists(partitions_count, None) + .create_topic_if_not_exists( + partitions_count, + None, + IggyExpiry::ServerDefault, + MaxTopicSize::ServerDefault, + ) .build(); producer.init().await?; producers.push(TenantProducer::new( diff --git a/examples/src/new-sdk/producer/main.rs b/examples/src/new-sdk/producer/main.rs index 82468c182..aa35b8fa5 100644 --- a/examples/src/new-sdk/producer/main.rs +++ b/examples/src/new-sdk/producer/main.rs @@ -5,6 +5,8 @@ use iggy::clients::client::IggyClient; use iggy::clients::producer::IggyProducer; use iggy::messages::send_messages::{Message, Partitioning}; use iggy::utils::duration::IggyDuration; +use iggy::utils::expiry::IggyExpiry; +use iggy::utils::topic_size::MaxTopicSize; use iggy_examples::shared::args::Args; use iggy_examples::shared::messages_generator::MessagesGenerator; use std::error::Error; @@ -28,7 +30,12 @@ async fn main() -> anyhow::Result<(), Box> { .batch_size(args.messages_per_batch) .send_interval(IggyDuration::from_str(&args.interval)?) .partitioning(Partitioning::balanced()) - .create_topic_if_not_exists(3, None) + .create_topic_if_not_exists( + 3, + None, + IggyExpiry::ServerDefault, + MaxTopicSize::ServerDefault, + ) .build(); producer.init().await?; produce_messages(&args, &producer).await?; diff --git a/integration/tests/server/scenarios/system_scenario.rs b/integration/tests/server/scenarios/system_scenario.rs index 8fea2707a..a4c528111 100644 --- a/integration/tests/server/scenarios/system_scenario.rs +++ b/integration/tests/server/scenarios/system_scenario.rs @@ -561,7 +561,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { let updated_topic_name = format!("{}-updated", TOPIC_NAME); let updated_message_expiry = 1000; let message_expiry_duration = updated_message_expiry.into(); - let updated_max_topic_size = MaxTopicSize::Custom(IggyByteSize::from(0x1337)); + let updated_max_topic_size = MaxTopicSize::Custom(IggyByteSize::from_str("2 GB").unwrap()); let updated_replication_factor = 5; client diff --git a/sdk/Cargo.toml b/sdk/Cargo.toml index b3aff7355..745991cf8 100644 --- a/sdk/Cargo.toml +++ b/sdk/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iggy" -version = "0.6.10" +version = "0.6.11" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2021" license = "MIT" diff --git a/sdk/src/clients/producer.rs b/sdk/src/clients/producer.rs index dee492126..b79832af3 100644 --- a/sdk/src/clients/producer.rs +++ b/sdk/src/clients/producer.rs @@ -42,6 +42,8 @@ pub struct IggyProducer { create_topic_if_not_exists: bool, topic_partitions_count: u32, topic_replication_factor: Option, + topic_message_expiry: IggyExpiry, + topic_max_size: MaxTopicSize, default_partitioning: Arc, can_send_immediately: bool, last_sent_at: Arc, @@ -65,6 +67,8 @@ impl IggyProducer { create_topic_if_not_exists: bool, topic_partitions_count: u32, topic_replication_factor: Option, + topic_message_expiry: IggyExpiry, + topic_max_size: MaxTopicSize, retry_interval: IggyDuration, ) -> Self { Self { @@ -84,6 +88,8 @@ impl IggyProducer { create_topic_if_not_exists, topic_partitions_count, topic_replication_factor, + topic_message_expiry, + topic_max_size, default_partitioning: Arc::new(Partitioning::balanced()), can_send_immediately: interval.is_none(), last_sent_at: Arc::new(AtomicU64::new(0)), @@ -154,8 +160,8 @@ impl IggyProducer { CompressionAlgorithm::None, self.topic_replication_factor, id, - IggyExpiry::ServerDefault, - MaxTopicSize::ServerDefault, + self.topic_message_expiry, + self.topic_max_size, ) .await?; } @@ -419,6 +425,8 @@ pub struct IggyProducerBuilder { topic_partitions_count: u32, topic_replication_factor: Option, retry_interval: IggyDuration, + pub topic_message_expiry: IggyExpiry, + pub topic_max_size: MaxTopicSize, } impl IggyProducerBuilder { @@ -448,6 +456,8 @@ impl IggyProducerBuilder { topic_partitions_count: 1, topic_replication_factor: None, retry_interval: IggyDuration::ONE_SECOND, + topic_message_expiry: IggyExpiry::ServerDefault, + topic_max_size: MaxTopicSize::ServerDefault, } } @@ -566,11 +576,15 @@ impl IggyProducerBuilder { self, partitions_count: u32, replication_factor: Option, + message_expiry: IggyExpiry, + max_size: MaxTopicSize, ) -> Self { Self { create_topic_if_not_exists: true, topic_partitions_count: partitions_count, topic_replication_factor: replication_factor, + topic_message_expiry: message_expiry, + topic_max_size: max_size, ..self } } @@ -607,6 +621,8 @@ impl IggyProducerBuilder { self.create_topic_if_not_exists, self.topic_partitions_count, self.topic_replication_factor, + self.topic_message_expiry, + self.topic_max_size, self.retry_interval, ) } diff --git a/sdk/src/error.rs b/sdk/src/error.rs index 8b798092f..ecfa34d9e 100644 --- a/sdk/src/error.rs +++ b/sdk/src/error.rs @@ -1,3 +1,5 @@ +use crate::utils::byte_size::IggyByteSize; +use crate::utils::topic_size::MaxTopicSize; use strum::{EnumDiscriminants, FromRepr, IntoStaticStr}; use thiserror::Error; @@ -194,6 +196,8 @@ pub enum IggyError { MissingTopics(u32) = 1017, #[error("Missing partitions for topic with ID: {0} for stream with ID: {1}.")] MissingPartitions(u32, u32) = 1018, + #[error("Max topic size cannot be lower than segment size. Max topic size: {0} < segment size: {1}.")] + InvalidTopicSize(MaxTopicSize, IggyByteSize) = 1019, #[error("Cannot create topics directory for stream with ID: {0}, Path: {1}")] CannotCreateTopicsDirectory(u32, String) = 2000, #[error( diff --git a/server/Cargo.toml b/server/Cargo.toml index 6797caa77..07ff02279 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "server" -version = "0.4.11" +version = "0.4.12" edition = "2021" build = "src/build.rs" @@ -15,7 +15,7 @@ async-stream = "0.3.5" async-trait = "0.1.80" atone = "0.3.7" axum = "0.7.5" -axum-server = { version = "0.6.0", features = ["tls-rustls"] } +axum-server = { version = "0.7.1", features = ["tls-rustls"] } bcrypt = "0.15.1" bincode = "1.3.3" blake3 = "1.5.1" diff --git a/server/src/streaming/streams/topics.rs b/server/src/streaming/streams/topics.rs index 2d3567d7d..47597b118 100644 --- a/server/src/streaming/streams/topics.rs +++ b/server/src/streaming/streams/topics.rs @@ -1,3 +1,4 @@ +use crate::configs::system::SystemConfig; use crate::streaming::streams::stream::Stream; use crate::streaming::topics::topic::Topic; use iggy::compression::compression_algorithm::CompressionAlgorithm; @@ -26,6 +27,7 @@ impl Stream { max_topic_size: MaxTopicSize, replication_factor: u8, ) -> Result { + let max_topic_size = get_max_topic_size(max_topic_size, &self.config)?; let name = text::to_lowercase_non_whitespace(name); if self.topics_ids.contains_key(&name) { return Err(IggyError::TopicNameAlreadyExists(name, self.stream_id)); @@ -83,6 +85,7 @@ impl Stream { max_topic_size: MaxTopicSize, replication_factor: u8, ) -> Result<(), IggyError> { + let max_topic_size = get_max_topic_size(max_topic_size, &self.config)?; let topic_id; { let topic = self.get_topic(id)?; @@ -112,9 +115,9 @@ impl Stream { self.topics_ids.insert(updated_name.clone(), topic_id); let topic = self.get_topic_mut(id)?; - let max_topic_size = match max_topic_size { - MaxTopicSize::ServerDefault => topic.config.topic.max_size, - _ => max_topic_size, + let message_expiry = match message_expiry { + IggyExpiry::ServerDefault => topic.config.segment.message_expiry, + _ => message_expiry, }; topic.name = updated_name; @@ -226,6 +229,25 @@ impl Stream { } } +fn get_max_topic_size( + max_topic_size: MaxTopicSize, + config: &SystemConfig, +) -> Result { + match max_topic_size { + MaxTopicSize::ServerDefault => Ok(config.topic.max_size), + _ => { + if max_topic_size.as_bytes_u64() >= config.segment.size.as_bytes_u64() { + Ok(max_topic_size) + } else { + Err(IggyError::InvalidTopicSize( + max_topic_size, + config.segment.size, + )) + } + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -242,8 +264,9 @@ mod tests { let topic_name = "test_topic"; let message_expiry = IggyExpiry::NeverExpire; let compression_algorithm = CompressionAlgorithm::None; - let max_topic_size = MaxTopicSize::Custom(IggyByteSize::from(100)); let config = Arc::new(SystemConfig::default()); + let max_topic_size = 2 * config.segment.size.as_bytes_u64(); + let max_topic_size = MaxTopicSize::Custom(IggyByteSize::from(max_topic_size)); let storage = Arc::new(get_test_system_storage()); let mut stream = Stream::create(stream_id, stream_name, config, storage); stream diff --git a/server/src/streaming/topics/topic.rs b/server/src/streaming/topics/topic.rs index c7cdc7fa2..37980a429 100644 --- a/server/src/streaming/topics/topic.rs +++ b/server/src/streaming/topics/topic.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; use std::sync::Arc; use tokio::sync::RwLock; +use tracing::info; const ALMOST_FULL_THRESHOLD: f64 = 0.9; @@ -110,9 +111,9 @@ impl Topic { consumer_groups_ids: HashMap::new(), current_consumer_group_id: AtomicU32::new(1), current_partition_id: AtomicU32::new(1), - message_expiry: match config.segment.message_expiry { - IggyExpiry::NeverExpire => message_expiry, - value => value, + message_expiry: match message_expiry { + IggyExpiry::ServerDefault => config.segment.message_expiry, + _ => message_expiry, }, compression_algorithm, max_topic_size: match max_topic_size { @@ -124,6 +125,11 @@ impl Topic { created_at: IggyTimestamp::now(), }; + info!( + "Received message expiry: {}, set expiry: {}", + message_expiry, topic.message_expiry + ); + topic.add_partitions(partitions_count)?; Ok(topic) }