diff --git a/Cargo.lock b/Cargo.lock index 5f903209b..dccf70af2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2267,7 +2267,7 @@ dependencies = [ [[package]] name = "iggy" -version = "0.6.13" +version = "0.6.14" dependencies = [ "aes-gcm", "ahash 0.8.11", @@ -4279,7 +4279,7 @@ dependencies = [ [[package]] name = "server" -version = "0.4.21" +version = "0.4.22" dependencies = [ "ahash 0.8.11", "anyhow", diff --git a/configs/server.json b/configs/server.json index 20aeb1415..82d00f815 100644 --- a/configs/server.json +++ b/configs/server.json @@ -163,7 +163,7 @@ "path": "partitions", "enforce_fsync": false, "validate_checksum": false, - "messages_required_to_save": 10000 + "messages_required_to_save": 5000 }, "segment": { "size": "1 GB", diff --git a/configs/server.toml b/configs/server.toml index f908a282e..4867cd419 100644 --- a/configs/server.toml +++ b/configs/server.toml @@ -392,7 +392,7 @@ validate_checksum = false # The threshold of buffered messages before triggering a save to disk (integer). # Specifies how many messages accumulate before persisting to storage. # Adjusting this can balance between write performance and data durability. -messages_required_to_save = 10_000 +messages_required_to_save = 5_000 # Segment configuration [system.segment] diff --git a/integration/tests/data_integrity/verify_after_server_restart.rs b/integration/tests/data_integrity/verify_after_server_restart.rs index 5179a2150..b4512511d 100644 --- a/integration/tests/data_integrity/verify_after_server_restart.rs +++ b/integration/tests/data_integrity/verify_after_server_restart.rs @@ -1,6 +1,7 @@ use crate::bench::run_bench_and_wait_for_finish; -use iggy::client::SystemClient; +use iggy::client::{MessageClient, SystemClient}; use iggy::clients::client::IggyClient; +use iggy::identifier::Identifier; use iggy::utils::byte_size::IggyByteSize; use integration::{ tcp_client::TcpClientFactory, @@ -11,6 +12,7 @@ use integration::{ use serial_test::parallel; use std::{collections::HashMap, str::FromStr}; +// TODO(numminex) - Move the message generation method from benchmark run to a special method. #[tokio::test] #[parallel] async fn should_fill_data_and_verify_after_restart() { @@ -37,10 +39,30 @@ async fn should_fill_data_and_verify_after_restart() { amount_of_data_to_process, ); + let default_bench_stream_identifiers: [Identifier; 10] = [ + Identifier::numeric(3000001).unwrap(), + Identifier::numeric(3000002).unwrap(), + Identifier::numeric(3000003).unwrap(), + Identifier::numeric(3000004).unwrap(), + Identifier::numeric(3000005).unwrap(), + Identifier::numeric(3000006).unwrap(), + Identifier::numeric(3000007).unwrap(), + Identifier::numeric(3000008).unwrap(), + Identifier::numeric(3000009).unwrap(), + Identifier::numeric(3000010).unwrap(), + ]; + // 4. Connect and login to newly started server let client = TcpClientFactory { server_addr }.create_client().await; let client = IggyClient::create(client, None, None); login_root(&client).await; + let topic_id = Identifier::numeric(1).unwrap(); + for stream_id in default_bench_stream_identifiers { + client + .flush_unsaved_buffer(&stream_id, &topic_id, 1, false) + .await + .unwrap(); + } // 5. Save stats from the first server let stats = client.get_stats().await.unwrap(); diff --git a/integration/tests/server/scenarios/stream_size_validation_scenario.rs b/integration/tests/server/scenarios/stream_size_validation_scenario.rs index 2c545136b..1d742a41e 100644 --- a/integration/tests/server/scenarios/stream_size_validation_scenario.rs +++ b/integration/tests/server/scenarios/stream_size_validation_scenario.rs @@ -17,7 +17,7 @@ const T2_NAME: &str = "test-topic-2"; const MESSAGE_PAYLOAD_SIZE_BYTES: u64 = 57; const MSG_SIZE: u64 = 16 + 8 + 8 + 4 + 4 + 4 + 1 + MESSAGE_PAYLOAD_SIZE_BYTES; // number of bytes in a single message const MSGS_COUNT: u64 = 117; // number of messages in a single topic after one pass of appending -const MSGS_SIZE: u64 = MSG_SIZE * MSGS_COUNT + 8 + 4 + 8 + 4; // number of bytes in a single topic after one pass of appending +const MSGS_SIZE: u64 = MSG_SIZE * MSGS_COUNT; // number of bytes in a single topic after one pass of appending pub async fn run(client_factory: &dyn ClientFactory) { let _ = tracing_subscriber::fmt::try_init(); diff --git a/integration/tests/server/scenarios/system_scenario.rs b/integration/tests/server/scenarios/system_scenario.rs index a4c528111..ad64b0c60 100644 --- a/integration/tests/server/scenarios/system_scenario.rs +++ b/integration/tests/server/scenarios/system_scenario.rs @@ -292,7 +292,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { assert_eq!(topic.name, TOPIC_NAME); assert_eq!(topic.partitions_count, PARTITIONS_COUNT); assert_eq!(topic.partitions.len(), PARTITIONS_COUNT as usize); - assert_eq!(topic.size, 55914); + assert_eq!(topic.size, 55890); assert_eq!(topic.messages_count, MESSAGES_COUNT as u64); let topic_partition = topic.partitions.get((PARTITION_ID - 1) as usize).unwrap(); assert_eq!(topic_partition.id, PARTITION_ID); diff --git a/integration/tests/streaming/segment.rs b/integration/tests/streaming/segment.rs index cbc379479..7673340bd 100644 --- a/integration/tests/streaming/segment.rs +++ b/integration/tests/streaming/segment.rs @@ -1,13 +1,13 @@ use crate::streaming::common::test_setup::TestSetup; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use iggy::bytes_serializable::BytesSerializable; use iggy::models::messages::{MessageState, PolledMessage}; use iggy::utils::expiry::IggyExpiry; use iggy::utils::{checksum, timestamp::IggyTimestamp}; -use server::streaming::batching::message_batch::RetainedMessageBatch; use server::streaming::models::messages::RetainedMessage; use server::streaming::segments::segment; use server::streaming::segments::segment::{INDEX_EXTENSION, LOG_EXTENSION, TIME_INDEX_EXTENSION}; +use server::streaming::sizeable::Sizeable; use std::sync::atomic::AtomicU64; use std::sync::Arc; use tokio::fs; @@ -151,19 +151,12 @@ async fn should_persist_and_load_segment_with_messages() { ) .await; let messages_count = 10; - let mut base_offset = 0; - let mut last_timestamp = IggyTimestamp::zero().as_micros(); - let mut batch_buffer = BytesMut::new(); + let mut messages = Vec::new(); + let mut batch_size = 0u64; for i in 0..messages_count { let message = create_message(i, "test", IggyTimestamp::now()); - if i == 0 { - base_offset = message.offset; - } - if i == messages_count - 1 { - last_timestamp = message.timestamp; - } - let retained_message = RetainedMessage { + let retained_message = Arc::new(RetainedMessage { id: message.id, offset: message.offset, timestamp: message.timestamp, @@ -171,20 +164,16 @@ async fn should_persist_and_load_segment_with_messages() { message_state: message.state, headers: message.headers.map(|headers| headers.to_bytes()), payload: message.payload.clone(), - }; - retained_message.extend(&mut batch_buffer); + }); + batch_size += retained_message.get_size_bytes() as u64; + messages.push(retained_message); } - let batch = Arc::new(RetainedMessageBatch::new( - base_offset, - messages_count as u32 - 1, - last_timestamp, - batch_buffer.len() as u32, - batch_buffer.freeze(), - )); - segment.append_batch(batch).await.unwrap(); + segment + .append_batch(batch_size, messages_count as u32, &messages) + .await + .unwrap(); segment.persist_messages().await.unwrap(); - let mut loaded_segment = segment::Segment::create( stream_id, topic_id, @@ -247,20 +236,13 @@ async fn given_all_expired_messages_segment_should_be_expired() { let messages_count = 10; let now = IggyTimestamp::now(); let mut expired_timestamp = (now.as_micros() - 2 * message_expiry_ms).into(); - let mut base_offset = 0; - let mut last_timestamp = 0; - let mut batch_buffer = BytesMut::new(); + let mut batch_size = 0u64; + let mut messages = Vec::new(); for i in 0..messages_count { let message = create_message(i, "test", expired_timestamp); expired_timestamp = (expired_timestamp.as_micros() + 1).into(); - if i == 0 { - base_offset = message.offset; - } - if i == messages_count - 1 { - last_timestamp = message.timestamp; - } - let retained_message = RetainedMessage { + let retained_message = Arc::new(RetainedMessage { id: message.id, offset: message.offset, timestamp: message.timestamp, @@ -268,19 +250,14 @@ async fn given_all_expired_messages_segment_should_be_expired() { message_state: message.state, headers: message.headers.map(|headers| headers.to_bytes()), payload: message.payload.clone(), - }; - retained_message.extend(&mut batch_buffer); + }); + batch_size += retained_message.get_size_bytes() as u64; + messages.push(retained_message); } - let batch = Arc::new(RetainedMessageBatch::new( - base_offset, - messages_count as u32 - 1, - last_timestamp, - batch_buffer.len() as u32, - batch_buffer.freeze(), - )); - - segment.append_batch(batch).await.unwrap(); - + segment + .append_batch(batch_size, messages_count as u32, &messages) + .await + .unwrap(); segment.persist_messages().await.unwrap(); segment.is_closed = true; @@ -330,8 +307,7 @@ async fn given_at_least_one_not_expired_message_segment_should_not_be_expired() let expired_message = create_message(0, "test", expired_timestamp.into()); let not_expired_message = create_message(1, "test", not_expired_timestamp.into()); - let mut expired_batch_buffer = BytesMut::new(); - let expired_retained_message = RetainedMessage { + let expired_retained_message = Arc::new(RetainedMessage { id: expired_message.id, offset: expired_message.offset, timestamp: expired_message.timestamp, @@ -339,18 +315,13 @@ async fn given_at_least_one_not_expired_message_segment_should_not_be_expired() message_state: expired_message.state, headers: expired_message.headers.map(|headers| headers.to_bytes()), payload: expired_message.payload.clone(), - }; - expired_retained_message.extend(&mut expired_batch_buffer); - let expired_batch = Arc::new(RetainedMessageBatch::new( - 0, - 1, - expired_timestamp, - expired_batch_buffer.len() as u32, - expired_batch_buffer.freeze(), - )); + }); + let mut expired_messages = Vec::new(); + let expired_message_size = expired_retained_message.get_size_bytes() as u64; + expired_messages.push(expired_retained_message); - let mut not_expired_batch_buffer = BytesMut::new(); - let not_expired_retained_message = RetainedMessage { + let mut not_expired_messages = Vec::new(); + let not_expired_retained_message = Arc::new(RetainedMessage { id: not_expired_message.id, offset: not_expired_message.offset, timestamp: not_expired_message.timestamp, @@ -360,18 +331,18 @@ async fn given_at_least_one_not_expired_message_segment_should_not_be_expired() .headers .map(|headers| headers.to_bytes()), payload: not_expired_message.payload.clone(), - }; - not_expired_retained_message.extend(&mut not_expired_batch_buffer); - let not_expired_batch = Arc::new(RetainedMessageBatch::new( - 1, - 1, - expired_timestamp, - not_expired_batch_buffer.len() as u32, - not_expired_batch_buffer.freeze(), - )); + }); + let not_expired_message_size = not_expired_retained_message.get_size_bytes() as u64; + not_expired_messages.push(not_expired_retained_message); - segment.append_batch(expired_batch).await.unwrap(); - segment.append_batch(not_expired_batch).await.unwrap(); + segment + .append_batch(expired_message_size, 1, &expired_messages) + .await + .unwrap(); + segment + .append_batch(not_expired_message_size, 1, ¬_expired_messages) + .await + .unwrap(); segment.persist_messages().await.unwrap(); let is_expired = segment.is_expired(now).await; diff --git a/sdk/Cargo.toml b/sdk/Cargo.toml index f9c2fc505..5c0699c9e 100644 --- a/sdk/Cargo.toml +++ b/sdk/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iggy" -version = "0.6.13" +version = "0.6.14" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2021" license = "MIT" diff --git a/sdk/src/binary/messages.rs b/sdk/src/binary/messages.rs index 65ed85206..832d40454 100644 --- a/sdk/src/binary/messages.rs +++ b/sdk/src/binary/messages.rs @@ -5,6 +5,7 @@ use crate::command::{POLL_MESSAGES_CODE, SEND_MESSAGES_CODE}; use crate::consumer::Consumer; use crate::error::IggyError; use crate::identifier::Identifier; +use crate::messages::flush_unsaved_buffer::FlushUnsavedBuffer; use crate::messages::poll_messages::PollingStrategy; use crate::messages::send_messages::{Message, Partitioning}; use crate::messages::{poll_messages, send_messages}; @@ -55,4 +56,22 @@ impl MessageClient for B { .await?; Ok(()) } + + async fn flush_unsaved_buffer( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: u32, + fsync: bool, + ) -> Result<(), IggyError> { + fail_if_not_authenticated(self).await?; + self.send_with_response(&FlushUnsavedBuffer { + stream_id: stream_id.clone(), + topic_id: topic_id.clone(), + partition_id, + fsync, + }) + .await?; + Ok(()) + } } diff --git a/sdk/src/client.rs b/sdk/src/client.rs index a61d727ee..16ca56942 100644 --- a/sdk/src/client.rs +++ b/sdk/src/client.rs @@ -326,6 +326,15 @@ pub trait MessageClient { partitioning: &Partitioning, messages: &mut [Message], ) -> Result<(), IggyError>; + /// Force flush of the `unsaved_messages` buffer to disk, optionally fsyncing the data. + #[allow(clippy::too_many_arguments)] + async fn flush_unsaved_buffer( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: u32, + fsync: bool, + ) -> Result<(), IggyError>; } /// This trait defines the methods to interact with the consumer offset module. diff --git a/sdk/src/clients/client.rs b/sdk/src/clients/client.rs index c4efeb489..fdeca8c5c 100644 --- a/sdk/src/clients/client.rs +++ b/sdk/src/clients/client.rs @@ -549,6 +549,20 @@ impl MessageClient for IggyClient { .send_messages(stream_id, topic_id, partitioning, messages) .await } + + async fn flush_unsaved_buffer( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: u32, + fsync: bool, + ) -> Result<(), IggyError> { + self.client + .read() + .await + .flush_unsaved_buffer(stream_id, topic_id, partition_id, fsync) + .await + } } #[async_trait] diff --git a/sdk/src/command.rs b/sdk/src/command.rs index 662a3f411..0045aabf9 100644 --- a/sdk/src/command.rs +++ b/sdk/src/command.rs @@ -47,6 +47,8 @@ pub const POLL_MESSAGES: &str = "message.poll"; pub const POLL_MESSAGES_CODE: u32 = 100; pub const SEND_MESSAGES: &str = "message.send"; pub const SEND_MESSAGES_CODE: u32 = 101; +pub const FLUSH_UNSAVED_BUFFER: &str = "message.flush_unsaved_buffer"; +pub const FLUSH_UNSAVED_BUFFER_CODE: u32 = 102; pub const GET_CONSUMER_OFFSET: &str = "consumer_offset.get"; pub const GET_CONSUMER_OFFSET_CODE: u32 = 120; pub const STORE_CONSUMER_OFFSET: &str = "consumer_offset.store"; @@ -114,6 +116,7 @@ pub fn get_name_from_code(code: u32) -> Result<&'static str, IggyError> { LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE => Ok(LOGIN_WITH_PERSONAL_ACCESS_TOKEN), SEND_MESSAGES_CODE => Ok(SEND_MESSAGES), POLL_MESSAGES_CODE => Ok(POLL_MESSAGES), + FLUSH_UNSAVED_BUFFER_CODE => Ok(FLUSH_UNSAVED_BUFFER), STORE_CONSUMER_OFFSET_CODE => Ok(STORE_CONSUMER_OFFSET), GET_CONSUMER_OFFSET_CODE => Ok(GET_CONSUMER_OFFSET), GET_STREAM_CODE => Ok(GET_STREAM), diff --git a/sdk/src/http/messages.rs b/sdk/src/http/messages.rs index 0f1b289b8..09b04954d 100644 --- a/sdk/src/http/messages.rs +++ b/sdk/src/http/messages.rs @@ -4,6 +4,7 @@ use crate::error::IggyError; use crate::http::client::HttpClient; use crate::http::HttpTransport; use crate::identifier::Identifier; +use crate::messages::flush_unsaved_buffer::FlushUnsavedBuffer; use crate::messages::poll_messages::{PollMessages, PollingStrategy}; use crate::messages::send_messages::{Message, Partitioning, SendMessages}; use crate::models::messages::PolledMessages; @@ -58,8 +59,43 @@ impl MessageClient for HttpClient { .await?; Ok(()) } + + async fn flush_unsaved_buffer( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: u32, + fsync: bool, + ) -> Result<(), IggyError> { + let _ = self + .get_with_query( + &get_path_flush_unsaved_buffer( + &stream_id.as_cow_str(), + &topic_id.as_cow_str(), + partition_id, + fsync, + ), + &FlushUnsavedBuffer { + stream_id: stream_id.clone(), + topic_id: topic_id.clone(), + partition_id, + fsync, + }, + ) + .await?; + Ok(()) + } } fn get_path(stream_id: &str, topic_id: &str) -> String { format!("streams/{stream_id}/topics/{topic_id}/messages") } + +fn get_path_flush_unsaved_buffer( + stream_id: &str, + topic_id: &str, + partition_id: u32, + fsync: bool, +) -> String { + format!("streams/{stream_id}/topics/{topic_id}/messages/flush/{partition_id}/fsync={fsync}") +} diff --git a/sdk/src/messages/flush_unsaved_buffer.rs b/sdk/src/messages/flush_unsaved_buffer.rs new file mode 100644 index 000000000..952db9fd6 --- /dev/null +++ b/sdk/src/messages/flush_unsaved_buffer.rs @@ -0,0 +1,116 @@ +use std::fmt::Display; + +use bytes::{BufMut, Bytes, BytesMut}; +use serde::{Deserialize, Serialize}; + +use crate::{ + bytes_serializable::BytesSerializable, + command::{Command, FLUSH_UNSAVED_BUFFER_CODE}, + error::IggyError, + identifier::Identifier, + validatable::Validatable, +}; + +/// `FlushUnsavedBuffer` command is used to force a flush of `unsaved_buffer` to disk for specific stream -> topic -> partition. +/// - `stream_id` - stream identifier +/// - `topic_id` - topic identifier +/// - `partition_id` - partition identifier +/// - `fsync` - if `true` then the data is flushed to disk and fsynced, if `false` then the data is only flushed to disk. +#[derive(Debug, Serialize, Deserialize, PartialEq, Default)] +pub struct FlushUnsavedBuffer { + pub stream_id: Identifier, + pub topic_id: Identifier, + pub partition_id: u32, + pub fsync: bool, +} + +impl FlushUnsavedBuffer { + fn fsync_stringified(&self) -> &'static str { + if self.fsync { + "f" + } else { + "n" + } + } +} + +impl Display for FlushUnsavedBuffer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}|{}|{}|{}", + self.stream_id, + self.topic_id, + self.partition_id, + self.fsync_stringified() + ) + } +} + +impl Command for FlushUnsavedBuffer { + fn code(&self) -> u32 { + FLUSH_UNSAVED_BUFFER_CODE + } +} + +impl BytesSerializable for FlushUnsavedBuffer { + fn to_bytes(&self) -> Bytes { + let stream_id_bytes = self.stream_id.to_bytes(); + let topic_id_bytes = self.topic_id.to_bytes(); + let mut bytes = + BytesMut::with_capacity(stream_id_bytes.len() + topic_id_bytes.len() + 4 + 1); + bytes.put_slice(&stream_id_bytes); + bytes.put_slice(&topic_id_bytes); + bytes.put_u32_le(self.partition_id); + bytes.put_u8(if self.fsync { 1 } else { 0 }); + bytes.freeze() + } + + fn from_bytes(bytes: Bytes) -> Result + where + Self: Sized, + { + let mut position = 0; + let stream_id = Identifier::from_bytes(bytes.clone())?; + position += stream_id.to_bytes().len(); + let topic_id = Identifier::from_bytes(bytes.slice(position..))?; + position += topic_id.to_bytes().len(); + let partition_id = u32::from_le_bytes(bytes[position..position + 4].try_into()?); + position += 4; + let fsync = bytes[position] == 1; + Ok(FlushUnsavedBuffer { + stream_id, + topic_id, + partition_id, + fsync, + }) + } +} + +impl Validatable for FlushUnsavedBuffer { + fn validate(&self) -> Result<(), IggyError> { + Ok(()) + } +} + +#[cfg(test)] +mod test { + use crate::bytes_serializable::BytesSerializable; + use crate::identifier::Identifier; + use crate::messages::flush_unsaved_buffer::FlushUnsavedBuffer; + + #[test] + fn test_flush_unsaved_buffer_serialization() { + let stream_id = Identifier::numeric(1).unwrap(); + let topic_id = Identifier::numeric(1).unwrap(); + let flush_unsaved_buffer = super::FlushUnsavedBuffer { + stream_id, + topic_id, + partition_id: 1, + fsync: false, + }; + let bytes = flush_unsaved_buffer.to_bytes(); + let deserialized_flush_unsaved_buffer = FlushUnsavedBuffer::from_bytes(bytes).unwrap(); + assert_eq!(flush_unsaved_buffer, deserialized_flush_unsaved_buffer); + } +} diff --git a/sdk/src/messages/mod.rs b/sdk/src/messages/mod.rs index d559e031d..8a588efca 100644 --- a/sdk/src/messages/mod.rs +++ b/sdk/src/messages/mod.rs @@ -1,3 +1,4 @@ +pub mod flush_unsaved_buffer; pub mod poll_messages; pub mod send_messages; diff --git a/server/Cargo.toml b/server/Cargo.toml index 9f6e9d4fe..04ba3d460 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "server" -version = "0.4.21" +version = "0.4.22" edition = "2021" build = "src/build.rs" diff --git a/server/src/binary/command.rs b/server/src/binary/command.rs index 99c6f7659..a6a077928 100644 --- a/server/src/binary/command.rs +++ b/server/src/binary/command.rs @@ -174,5 +174,8 @@ async fn try_handle( ServerCommand::LeaveConsumerGroup(command) => { leave_consumer_group_handler::handle(command, sender, session, system).await } + ServerCommand::FlushUnsavedBuffer(command) => { + flush_unsaved_buffer_handler::handle(command, sender, session, system).await + } } } diff --git a/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs b/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs new file mode 100644 index 000000000..1b75b0e83 --- /dev/null +++ b/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs @@ -0,0 +1,26 @@ +use crate::binary::sender::Sender; +use crate::streaming::session::Session; +use crate::streaming::systems::system::SharedSystem; +use anyhow::Result; +use iggy::error::IggyError; +use iggy::messages::flush_unsaved_buffer::FlushUnsavedBuffer; +use tracing::debug; + +pub async fn handle( + command: FlushUnsavedBuffer, + sender: &mut dyn Sender, + session: &Session, + system: &SharedSystem, +) -> Result<(), IggyError> { + debug!("session: {session}, command: {command}"); + let system = system.read().await; + let stream_id = command.stream_id; + let topic_id = command.topic_id; + let partition_id = command.partition_id; + let fsync = command.fsync; + system + .flush_unsaved_buffer(session, stream_id, topic_id, partition_id, fsync) + .await?; + sender.send_empty_ok_response().await?; + Ok(()) +} diff --git a/server/src/binary/handlers/messages/mod.rs b/server/src/binary/handlers/messages/mod.rs index efb605db3..37da2860d 100644 --- a/server/src/binary/handlers/messages/mod.rs +++ b/server/src/binary/handlers/messages/mod.rs @@ -1,2 +1,3 @@ +pub mod flush_unsaved_buffer_handler; pub mod poll_messages_handler; pub mod send_messages_handler; diff --git a/server/src/command.rs b/server/src/command.rs index 94e220c4d..82fb66053 100644 --- a/server/src/command.rs +++ b/server/src/command.rs @@ -1,5 +1,4 @@ use bytes::{BufMut, Bytes, BytesMut}; -use iggy::bytes_serializable::BytesSerializable; use iggy::command::*; use iggy::consumer_groups::create_consumer_group::CreateConsumerGroup; use iggy::consumer_groups::delete_consumer_group::DeleteConsumerGroup; @@ -45,6 +44,9 @@ use iggy::users::logout_user::LogoutUser; use iggy::users::update_permissions::UpdatePermissions; use iggy::users::update_user::UpdateUser; use iggy::validatable::Validatable; +use iggy::{ + bytes_serializable::BytesSerializable, messages::flush_unsaved_buffer::FlushUnsavedBuffer, +}; use std::fmt::{Display, Formatter}; use strum::EnumString; @@ -70,6 +72,7 @@ pub enum ServerCommand { LoginWithPersonalAccessToken(LoginWithPersonalAccessToken), SendMessages(SendMessages), PollMessages(PollMessages), + FlushUnsavedBuffer(FlushUnsavedBuffer), GetConsumerOffset(GetConsumerOffset), StoreConsumerOffset(StoreConsumerOffset), GetStream(GetStream), @@ -139,6 +142,7 @@ impl BytesSerializable for ServerCommand { ServerCommand::DeleteConsumerGroup(payload) => as_bytes(payload), ServerCommand::JoinConsumerGroup(payload) => as_bytes(payload), ServerCommand::LeaveConsumerGroup(payload) => as_bytes(payload), + ServerCommand::FlushUnsavedBuffer(payload) => as_bytes(payload), } } @@ -184,6 +188,9 @@ impl BytesSerializable for ServerCommand { POLL_MESSAGES_CODE => Ok(ServerCommand::PollMessages(PollMessages::from_bytes( payload, )?)), + FLUSH_UNSAVED_BUFFER_CODE => Ok(ServerCommand::FlushUnsavedBuffer( + FlushUnsavedBuffer::from_bytes(payload)?, + )), STORE_CONSUMER_OFFSET_CODE => Ok(ServerCommand::StoreConsumerOffset( StoreConsumerOffset::from_bytes(payload)?, )), @@ -298,6 +305,7 @@ impl Validatable for ServerCommand { ServerCommand::DeleteConsumerGroup(command) => command.validate(), ServerCommand::JoinConsumerGroup(command) => command.validate(), ServerCommand::LeaveConsumerGroup(command) => command.validate(), + ServerCommand::FlushUnsavedBuffer(command) => command.validate(), } } } @@ -379,6 +387,9 @@ impl Display for ServerCommand { ServerCommand::LeaveConsumerGroup(payload) => { write!(formatter, "{LEAVE_CONSUMER_GROUP}|{payload}") } + ServerCommand::FlushUnsavedBuffer(payload) => { + write!(formatter, "{FLUSH_UNSAVED_BUFFER}|{payload}") + } } } } @@ -599,6 +610,11 @@ mod tests { LEAVE_CONSUMER_GROUP_CODE, &LeaveConsumerGroup::default(), ); + assert_serialized_as_bytes_and_deserialized_from_bytes( + &ServerCommand::FlushUnsavedBuffer(FlushUnsavedBuffer::default()), + FLUSH_UNSAVED_BUFFER_CODE, + &FlushUnsavedBuffer::default(), + ); } fn assert_serialized_as_bytes_and_deserialized_from_bytes( diff --git a/server/src/compat/storage_conversion/mod.rs b/server/src/compat/storage_conversion/mod.rs index d63cc541e..0663e7058 100644 --- a/server/src/compat/storage_conversion/mod.rs +++ b/server/src/compat/storage_conversion/mod.rs @@ -234,7 +234,7 @@ impl SegmentStorage for NoopSegmentStorage { async fn save_batches( &self, _segment: &Segment, - _batches: &[Arc], + _batch: RetainedMessageBatch, ) -> Result { Ok(0) } @@ -260,7 +260,7 @@ impl SegmentStorage for NoopSegmentStorage { Ok(None) } - async fn save_index(&self, _segment: &Segment) -> Result<(), IggyError> { + async fn save_index(&self, _index_path: &str, _index: Index) -> Result<(), IggyError> { Ok(()) } @@ -283,7 +283,7 @@ impl SegmentStorage for NoopSegmentStorage { Ok(None) } - async fn save_time_index(&self, _segment: &Segment) -> Result<(), IggyError> { + async fn save_time_index(&self, _index_path: &str, _index: TimeIndex) -> Result<(), IggyError> { Ok(()) } } diff --git a/server/src/compat/storage_conversion/persistency/partitions.rs b/server/src/compat/storage_conversion/persistency/partitions.rs index 03a818478..7a65c9221 100644 --- a/server/src/compat/storage_conversion/persistency/partitions.rs +++ b/server/src/compat/storage_conversion/persistency/partitions.rs @@ -1,4 +1,5 @@ use crate::configs::system::SystemConfig; +use crate::streaming::batching::batch_accumulator::BatchAccumulator; use crate::streaming::partitions::partition::{ConsumerOffset, Partition}; use crate::streaming::segments::segment::{Segment, LOG_EXTENSION}; use anyhow::Context; @@ -189,8 +190,12 @@ pub async fn load( ); segment.load().await?; + let capacity = partition.config.partition.messages_required_to_save; if !segment.is_closed { - segment.unsaved_batches = Some(Vec::new()) + segment.unsaved_messages = Some(BatchAccumulator::new( + segment.current_offset, + capacity as usize, + )) } // If the first segment has at least a single message, we should increment the offset. diff --git a/server/src/http/messages.rs b/server/src/http/messages.rs index 5f660cde7..e29d687e5 100644 --- a/server/src/http/messages.rs +++ b/server/src/http/messages.rs @@ -22,6 +22,10 @@ pub fn router(state: Arc) -> Router { "/streams/:stream_id/topics/:topic_id/messages", get(poll_messages).post(send_messages), ) + .route( + "/streams/:stream_id/topics/:topic_id/messages/flush/:partition_id/:fsync", + get(flush_unsaved_buffer), + ) .with_state(state) } @@ -82,3 +86,23 @@ async fn send_messages( .await?; Ok(StatusCode::CREATED) } + +async fn flush_unsaved_buffer( + State(state): State>, + Extension(identity): Extension, + Path((stream_id, topic_id, partition_id, fsync)): Path<(String, String, u32, bool)>, +) -> Result { + let stream_id = Identifier::from_str_value(&stream_id)?; + let topic_id = Identifier::from_str_value(&topic_id)?; + let system = state.system.read().await; + system + .flush_unsaved_buffer( + &Session::stateless(identity.user_id, identity.ip_address), + stream_id, + topic_id, + partition_id, + fsync, + ) + .await?; + Ok(StatusCode::OK) +} diff --git a/server/src/streaming/batching/batch_accumulator.rs b/server/src/streaming/batching/batch_accumulator.rs new file mode 100644 index 000000000..86382927c --- /dev/null +++ b/server/src/streaming/batching/batch_accumulator.rs @@ -0,0 +1,109 @@ +use super::message_batch::{RetainedMessageBatch, RETAINED_BATCH_OVERHEAD}; +use crate::streaming::{models::messages::RetainedMessage, sizeable::Sizeable}; +use bytes::BytesMut; +use std::sync::Arc; + +#[derive(Debug)] +pub struct BatchAccumulator { + base_offset: u64, + current_size: u64, + current_offset: u64, + current_timestamp: u64, + capacity: u64, + messages: Vec>, +} + +impl BatchAccumulator { + pub fn new(base_offset: u64, capacity: usize) -> Self { + Self { + base_offset, + current_size: 0, + current_offset: 0, + current_timestamp: 0, + capacity: capacity as u64, + messages: Vec::with_capacity(capacity), + } + } + + pub fn append(&mut self, batch_size: u64, items: &[Arc]) { + assert!(!items.is_empty()); + self.current_size += batch_size; + self.current_offset = items.last().unwrap().offset; + self.current_timestamp = items.last().unwrap().timestamp; + self.messages.extend(items.iter().cloned()); + } + + pub fn get_messages_by_offset( + &self, + start_offset: u64, + end_offset: u64, + ) -> Vec> { + self.messages + .iter() + .filter(|msg| msg.offset >= start_offset && msg.offset <= end_offset) + .cloned() + .collect() + } + + pub fn is_empty(&self) -> bool { + self.messages.is_empty() + } + + pub fn unsaved_messages_count(&self) -> usize { + self.messages.len() + } + + pub fn batch_max_offset(&self) -> u64 { + self.current_offset + } + + pub fn batch_max_timestamp(&self) -> u64 { + self.current_timestamp + } + + pub fn batch_base_offset(&self) -> u64 { + self.base_offset + } + + pub fn get_size_bytes(&self) -> u64 { + self.current_size + RETAINED_BATCH_OVERHEAD as u64 + } + + pub fn materialize_batch_and_maybe_update_state(&mut self) -> (bool, RetainedMessageBatch) { + let batch_base_offset = self.base_offset; + let batch_last_offset_delta = (self.current_offset - self.base_offset) as u32; + let batch_max_timestamp = self.messages.last().unwrap().timestamp; + let split_point = std::cmp::min(self.capacity as usize, self.messages.len()); + let (batch, remainder) = self.messages.as_slice().split_at(split_point); + let mut bytes = BytesMut::with_capacity(self.current_size as usize); + for message in batch { + message.extend(&mut bytes); + } + + let mut remaining_messages = Vec::with_capacity(remainder.len()); + let has_remainder = !remainder.is_empty(); + if has_remainder { + self.base_offset = remainder.first().unwrap().offset; + self.current_size = remainder + .iter() + .map(|msg| msg.get_size_bytes() as u64) + .sum(); + self.current_offset = remainder.last().unwrap().offset; + self.current_timestamp = remainder.last().unwrap().timestamp; + for message in remainder { + remaining_messages.push(message.clone()); + } + self.messages = remaining_messages; + } + let batch_payload = bytes.freeze(); + let batch_payload_len = batch_payload.len() as u32; + let batch = RetainedMessageBatch::new( + batch_base_offset, + batch_last_offset_delta, + batch_max_timestamp, + batch_payload_len, + batch_payload, + ); + (has_remainder, batch) + } +} diff --git a/server/src/streaming/batching/message_batch.rs b/server/src/streaming/batching/message_batch.rs index 8985f138a..5acacc0a7 100644 --- a/server/src/streaming/batching/message_batch.rs +++ b/server/src/streaming/batching/message_batch.rs @@ -1,14 +1,9 @@ -use std::ops::Deref; - use crate::streaming::batching::batch_filter::BatchItemizer; use crate::streaming::batching::iterator::IntoMessagesIterator; use crate::streaming::models::messages::RetainedMessage; use bytes::{BufMut, Bytes, BytesMut}; -use iggy::error::IggyError::{ - self, MissingBaseOffsetRetainedMessageBatch, MissingLastOffsetDeltaRetainedMessageBatch, - MissingLengthRetainedMessageBatch, MissingMaxTimestampRetainedMessageBatch, - MissingPayloadRetainedMessageBatch, -}; + +pub const RETAINED_BATCH_OVERHEAD: u32 = 8 + 8 + 4 + 4; use crate::streaming::sizeable::Sizeable; #[derive(Debug, Clone)] @@ -37,10 +32,6 @@ impl RetainedMessageBatch { } } - pub fn builder() -> RetainedMessageBatchBuilder { - RetainedMessageBatchBuilder::new() - } - pub fn is_contained_or_overlapping_within_offset_range( &self, start_offset: u64, @@ -87,83 +78,6 @@ where impl Sizeable for RetainedMessageBatch { fn get_size_bytes(&self) -> u32 { - 8 + 4 + 8 + 4 + self.length - } -} - -impl Sizeable for T -where - T: Deref, -{ - fn get_size_bytes(&self) -> u32 { - 8 + 4 + 8 + 4 + self.length - } -} - -#[derive(Debug, Clone)] -pub struct RetainedMessageBatchBuilder { - base_offset: Option, - last_offset_delta: Option, - max_timestamp: Option, - length: Option, - payload: Option, -} - -impl RetainedMessageBatchBuilder { - fn new() -> Self { - RetainedMessageBatchBuilder { - base_offset: None, - last_offset_delta: None, - max_timestamp: None, - length: None, - payload: None, - } - } - - pub fn base_offset(mut self, base_offset: u64) -> Self { - self.base_offset = Some(base_offset); - self - } - - pub fn last_offset_delta(mut self, last_offset_delta: u32) -> Self { - self.last_offset_delta = Some(last_offset_delta); - self - } - - pub fn max_timestamp(mut self, max_timestamp: u64) -> Self { - self.max_timestamp = Some(max_timestamp); - self - } - - pub fn length(mut self, length: u32) -> Self { - self.length = Some(length); - self - } - - pub fn payload(mut self, payload: Bytes) -> Self { - self.payload = Some(payload); - self - } - - pub fn build(self) -> Result { - let base_offset = self - .base_offset - .ok_or(MissingBaseOffsetRetainedMessageBatch)?; - let last_offset_delta = self - .last_offset_delta - .ok_or(MissingLastOffsetDeltaRetainedMessageBatch)?; - let max_timestamp = self - .max_timestamp - .ok_or(MissingMaxTimestampRetainedMessageBatch)?; - let length = self.length.ok_or(MissingLengthRetainedMessageBatch)?; - let bytes = self.payload.ok_or(MissingPayloadRetainedMessageBatch)?; - - Ok(RetainedMessageBatch { - base_offset, - last_offset_delta, - max_timestamp, - length, - bytes, - }) + RETAINED_BATCH_OVERHEAD + self.length } } diff --git a/server/src/streaming/batching/mod.rs b/server/src/streaming/batching/mod.rs index 5ae0300d8..285cf0c2a 100644 --- a/server/src/streaming/batching/mod.rs +++ b/server/src/streaming/batching/mod.rs @@ -1,4 +1,5 @@ pub mod appendable_batch_info; +pub mod batch_accumulator; pub mod batch_filter; pub mod iterator; pub mod message_batch; diff --git a/server/src/streaming/models/messages.rs b/server/src/streaming/models/messages.rs index 8e0998760..105a0ee49 100644 --- a/server/src/streaming/models/messages.rs +++ b/server/src/streaming/models/messages.rs @@ -7,6 +7,7 @@ use iggy::utils::checksum; use iggy::{messages::send_messages::Message, models::messages::MessageState}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; +use std::ops::Deref; use std::sync::Arc; // It's the same as PolledMessages from Iggy models, but with the Arc instead of Message. @@ -28,21 +29,20 @@ pub struct RetainedMessage { pub payload: Bytes, } -impl TryFrom for PolledMessage { - type Error = IggyError; - fn try_from(value: RetainedMessage) -> Result { - let headers = value.headers.map(HashMap::from_bytes).transpose()?; - let messages = PolledMessage { - offset: value.offset, - state: value.message_state, - timestamp: value.timestamp, - id: value.id, - checksum: value.checksum, +impl RetainedMessage { + pub fn to_polled_message(&self) -> Result { + let headers = self.headers.clone().map(HashMap::from_bytes).transpose()?; + let message = PolledMessage { + offset: self.offset, + state: self.message_state, + timestamp: self.timestamp, + id: self.id, + checksum: self.checksum, headers, - length: value.payload.len() as u32, - payload: value.payload, + length: self.payload.len() as u32, + payload: self.payload.clone(), }; - Ok(messages) + Ok(message) } } @@ -122,3 +122,17 @@ impl Sizeable for RetainedMessage { 16 + 8 + 8 + 4 + 1 + headers_len + self.payload.len() as u32 } } + +impl Sizeable for T +where + T: Deref, +{ + fn get_size_bytes(&self) -> u32 { + let headers_len = self + .headers + .as_ref() + .map(|h| 4 + h.len() as u32) + .unwrap_or(4); + 16 + 8 + 8 + 4 + 1 + headers_len + self.payload.len() as u32 + } +} diff --git a/server/src/streaming/partitions/messages.rs b/server/src/streaming/partitions/messages.rs index bf1487d8a..bb4ff6a01 100644 --- a/server/src/streaming/partitions/messages.rs +++ b/server/src/streaming/partitions/messages.rs @@ -1,11 +1,9 @@ use crate::streaming::batching::appendable_batch_info::AppendableBatchInfo; -use crate::streaming::batching::batch_filter::BatchItemizer; -use crate::streaming::batching::message_batch::RetainedMessageBatch; +use crate::streaming::batching::iterator::IntoMessagesIterator; use crate::streaming::models::messages::RetainedMessage; use crate::streaming::partitions::partition::Partition; use crate::streaming::polling_consumer::PollingConsumer; use crate::streaming::segments::segment::Segment; -use bytes::BytesMut; use iggy::messages::send_messages::Message; use iggy::models::messages::POLLED_MESSAGE_METADATA; use iggy::utils::timestamp::IggyTimestamp; @@ -15,7 +13,6 @@ use std::time::Duration; use tracing::{trace, warn}; const EMPTY_MESSAGES: Vec = vec![]; -const EMPTY_BATCHES: Vec = vec![]; impl Partition { pub fn get_messages_count(&self) -> u64 { @@ -26,14 +23,14 @@ impl Partition { &self, timestamp: IggyTimestamp, count: u32, - ) -> Result, IggyError> { + ) -> Result>, IggyError> { trace!( "Getting messages by timestamp: {} for partition: {}...", timestamp, self.partition_id ); if self.segments.is_empty() { - return Ok(EMPTY_MESSAGES); + return Ok(EMPTY_MESSAGES.into_iter().map(Arc::new).collect()); } let timestamp = timestamp.as_micros(); @@ -128,18 +125,18 @@ impl Partition { &self, start_offset: u64, count: u32, - ) -> Result, IggyError> { + ) -> Result>, IggyError> { trace!( "Getting messages for start offset: {} for partition: {}...", start_offset, self.partition_id ); if self.segments.is_empty() { - return Ok(EMPTY_MESSAGES); + return Ok(EMPTY_MESSAGES.into_iter().map(Arc::new).collect()); } if start_offset > self.current_offset { - return Ok(EMPTY_MESSAGES); + return Ok(EMPTY_MESSAGES.into_iter().map(Arc::new).collect()); } let end_offset = self.get_end_offset(start_offset, count); @@ -151,17 +148,23 @@ impl Partition { let segments = self.filter_segments_by_offsets(start_offset, end_offset); match segments.len() { - 0 => Ok(EMPTY_MESSAGES), + 0 => Ok(EMPTY_MESSAGES.into_iter().map(Arc::new).collect()), 1 => segments[0].get_messages(start_offset, count).await, _ => Self::get_messages_from_segments(segments, start_offset, count).await, } } - pub async fn get_first_messages(&self, count: u32) -> Result, IggyError> { + pub async fn get_first_messages( + &self, + count: u32, + ) -> Result>, IggyError> { self.get_messages_by_offset(0, count).await } - pub async fn get_last_messages(&self, count: u32) -> Result, IggyError> { + pub async fn get_last_messages( + &self, + count: u32, + ) -> Result>, IggyError> { let mut count = count as u64; if count > self.current_offset + 1 { count = self.current_offset + 1 @@ -176,7 +179,7 @@ impl Partition { &self, consumer: PollingConsumer, count: u32, - ) -> Result, IggyError> { + ) -> Result>, IggyError> { let (consumer_offsets, consumer_id) = match consumer { PollingConsumer::Consumer(consumer_id, _) => (&self.consumer_offsets, consumer_id), PollingConsumer::ConsumerGroup(group_id, _) => (&self.consumer_group_offsets, group_id), @@ -200,7 +203,7 @@ impl Partition { consumer_offset.offset, self.partition_id ); - return Ok(EMPTY_MESSAGES); + return Ok(EMPTY_MESSAGES.into_iter().map(Arc::new).collect()); } let offset = consumer_offset.offset + 1; @@ -242,7 +245,7 @@ impl Partition { segments: Vec<&Segment>, offset: u64, count: u32, - ) -> Result, IggyError> { + ) -> Result>, IggyError> { let mut messages = Vec::with_capacity(segments.len()); for segment in segments { let segment_messages = segment.get_messages(offset, count).await?; @@ -258,13 +261,13 @@ impl Partition { &self, start_offset: u64, end_offset: u64, - ) -> Option> { + ) -> Option>> { let cache = self.cache.as_ref()?; if cache.is_empty() || start_offset > end_offset || end_offset > self.current_offset { return None; } - let first_buffered_offset = cache[0].base_offset; + let first_buffered_offset = cache[0].offset; trace!( "First buffered offset: {} for partition: {}", first_buffered_offset, @@ -280,7 +283,7 @@ impl Partition { pub async fn get_newest_messages_by_size( &self, size_bytes: u64, - ) -> Result>, IggyError> { + ) -> Result>, IggyError> { trace!( "Getting messages for size: {} bytes for partition: {}...", size_bytes, @@ -288,7 +291,7 @@ impl Partition { ); if self.segments.is_empty() { - return Ok(EMPTY_BATCHES.into_iter().map(Arc::new).collect()); + return Ok(EMPTY_MESSAGES.into_iter().map(Arc::new).collect()); } let mut remaining_size = size_bytes; @@ -317,11 +320,19 @@ impl Partition { break; } } - - Ok(batches) + let mut retained_messages = Vec::new(); + for batch in batches { + let messages = batch.into_messages_iter().map(Arc::new).collect::>(); + retained_messages.extend(messages); + } + Ok(retained_messages) } - fn load_messages_from_cache(&self, start_offset: u64, end_offset: u64) -> Vec { + fn load_messages_from_cache( + &self, + start_offset: u64, + end_offset: u64, + ) -> Vec> { trace!( "Loading messages from cache, start offset: {}, end offset: {}...", start_offset, @@ -329,41 +340,33 @@ impl Partition { ); if self.cache.is_none() || start_offset > end_offset { - return EMPTY_MESSAGES; + return EMPTY_MESSAGES.into_iter().map(Arc::new).collect(); } let cache = self.cache.as_ref().unwrap(); if cache.is_empty() { - return EMPTY_MESSAGES; + return EMPTY_MESSAGES.into_iter().map(Arc::new).collect(); } - let mut slice_start = 0; - for idx in (0..cache.len()).rev() { - if cache[idx].base_offset <= start_offset { - slice_start = idx; - break; - } + let first_offset = cache[0].offset; + let start_index = (start_offset - first_offset) as usize; + let end_index = usize::min(cache.len(), (end_offset - first_offset + 1) as usize); + let expected_messages_count = end_index - start_index; + + let mut messages = Vec::with_capacity(expected_messages_count); + for i in start_index..end_index { + messages.push(cache[i].clone()); } - let messages_count = (start_offset + end_offset) as usize; - let messages = cache - .iter() - .skip(slice_start) - .filter(|batch| { - batch.is_contained_or_overlapping_within_offset_range(start_offset, end_offset) - }) - .to_messages_with_filter(messages_count, &|msg| { - msg.offset >= start_offset && msg.offset <= end_offset - }); - - let expected_messages_count = (end_offset - start_offset + 1) as usize; + if messages.len() != expected_messages_count { warn!( "Loaded {} messages from cache, expected {}.", messages.len(), expected_messages_count ); - return EMPTY_MESSAGES; + return EMPTY_MESSAGES.into_iter().map(Arc::new).collect(); } + trace!( "Loaded {} messages from cache, start offset: {}, end offset: {}...", messages.len(), @@ -404,9 +407,7 @@ impl Partition { let mut max_timestamp = 0; let mut min_timestamp = 0; - let mut buffer = BytesMut::with_capacity(batch_size as usize); - let mut batch_builder = RetainedMessageBatch::builder(); - batch_builder = batch_builder.base_offset(base_offset); + let mut retained_messages = Vec::with_capacity(messages.len()); if let Some(message_deduplicator) = &self.message_deduplicator { for message in messages { if !message_deduplicator.try_insert(&message.id).await { @@ -422,8 +423,9 @@ impl Partition { min_timestamp = max_timestamp; } let message_offset = base_offset + messages_count as u64; - let message = RetainedMessage::new(message_offset, max_timestamp, message); - message.extend(&mut buffer); + let message = + Arc::new(RetainedMessage::new(message_offset, max_timestamp, message)); + retained_messages.push(message.clone()); messages_count += 1; } } else { @@ -434,8 +436,9 @@ impl Partition { min_timestamp = max_timestamp; } let message_offset = base_offset + messages_count as u64; - let message = RetainedMessage::new(message_offset, max_timestamp, message); - message.extend(&mut buffer); + let message = + Arc::new(RetainedMessage::new(message_offset, max_timestamp, message)); + retained_messages.push(message.clone()); messages_count += 1; } } @@ -452,7 +455,6 @@ impl Partition { self.update_avg_timestamp_delta(avg_timestamp_delta, min_alpha, max_alpha, dynamic_range); let last_offset = base_offset + (messages_count - 1) as u64; - let last_offset_delta = messages_count - 1; if self.should_increment_offset { self.current_offset = last_offset; } else { @@ -460,21 +462,15 @@ impl Partition { self.current_offset = last_offset; } - let batch = Arc::new( - batch_builder - .max_timestamp(max_timestamp) - .last_offset_delta(last_offset_delta) - .length(buffer.len() as u32) - .payload(buffer.freeze()) - .build()?, - ); { let last_segment = self.segments.last_mut().ok_or(IggyError::SegmentNotFound)?; - last_segment.append_batch(batch.clone()).await?; + last_segment + .append_batch(batch_size, messages_count, &retained_messages) + .await?; } if let Some(cache) = &mut self.cache { - cache.append(batch); + cache.extend(retained_messages); } self.unsaved_messages_count += messages_count; @@ -497,6 +493,28 @@ impl Partition { Ok(()) } + pub async fn flush_unsaved_buffer(&mut self, fsync: bool) -> Result<(), IggyError> { + let _fsync = fsync; + if self.unsaved_messages_count == 0 { + return Ok(()); + } + + let last_segment = self.segments.last_mut().ok_or(IggyError::SegmentNotFound)?; + trace!( + "Segment with start offset: {} for partition with ID: {} will be forcefully persisted on disk...", + last_segment.start_offset, + self.partition_id + ); + + // Make sure all of the messages from the accumulator are persisted + // no leftover from one round trip. + while last_segment.unsaved_messages.is_some() { + last_segment.persist_messages().await.unwrap(); + } + self.unsaved_messages_count = 0; + Ok(()) + } + fn update_avg_timestamp_delta( &mut self, avg_timestamp_delta: IggyDuration, diff --git a/server/src/streaming/partitions/partition.rs b/server/src/streaming/partitions/partition.rs index b6752b338..98ab72060 100644 --- a/server/src/streaming/partitions/partition.rs +++ b/server/src/streaming/partitions/partition.rs @@ -1,8 +1,8 @@ use crate::configs::system::SystemConfig; -use crate::streaming::batching::message_batch::RetainedMessageBatch; use crate::streaming::cache::buffer::SmartCache; use crate::streaming::cache::memory_tracker::CacheMemoryTracker; use crate::streaming::deduplication::message_deduplicator::MessageDeduplicator; +use crate::streaming::models::messages::RetainedMessage; use crate::streaming::segments::segment::Segment; use crate::streaming::storage::SystemStorage; use dashmap::DashMap; @@ -23,7 +23,7 @@ pub struct Partition { pub consumer_offsets_path: String, pub consumer_group_offsets_path: String, pub current_offset: u64, - pub cache: Option>>, + pub cache: Option>>, pub cached_memory_tracker: Option>, pub message_deduplicator: Option, pub unsaved_messages_count: u32, diff --git a/server/src/streaming/partitions/storage.rs b/server/src/streaming/partitions/storage.rs index 400e8e367..692a4f299 100644 --- a/server/src/streaming/partitions/storage.rs +++ b/server/src/streaming/partitions/storage.rs @@ -1,5 +1,6 @@ use crate::compat::message_conversion::message_converter::MessageFormatConverter; use crate::state::system::PartitionState; +use crate::streaming::batching::batch_accumulator::BatchAccumulator; use crate::streaming::partitions::partition::{ConsumerOffset, Partition}; use crate::streaming::persistence::persister::Persister; use crate::streaming::segments::segment::{Segment, LOG_EXTENSION}; @@ -121,8 +122,12 @@ impl PartitionStorage for FilePartitionStorage { } segment.load().await?; + let capacity = partition.config.partition.messages_required_to_save; if !segment.is_closed { - segment.unsaved_batches = Some(Vec::new()) + segment.unsaved_messages = Some(BatchAccumulator::new( + segment.current_offset, + capacity as usize, + )) } // If the first segment has at least a single message, we should increment the offset. diff --git a/server/src/streaming/segments/messages.rs b/server/src/streaming/segments/messages.rs index 93153c002..186795430 100644 --- a/server/src/streaming/segments/messages.rs +++ b/server/src/streaming/segments/messages.rs @@ -1,11 +1,11 @@ +use crate::streaming::batching::batch_accumulator::BatchAccumulator; use crate::streaming::batching::batch_filter::BatchItemizer; -use crate::streaming::batching::message_batch::RetainedMessageBatch; +use crate::streaming::batching::message_batch::{RetainedMessageBatch, RETAINED_BATCH_OVERHEAD}; use crate::streaming::models::messages::RetainedMessage; use crate::streaming::segments::index::{Index, IndexRange}; use crate::streaming::segments::segment::Segment; use crate::streaming::segments::time_index::TimeIndex; use crate::streaming::sizeable::Sizeable; -use bytes::BufMut; use iggy::error::IggyError; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -26,9 +26,9 @@ impl Segment { &self, mut offset: u64, count: u32, - ) -> Result, IggyError> { + ) -> Result>, IggyError> { if count == 0 { - return Ok(EMPTY_MESSAGES); + return Ok(EMPTY_MESSAGES.into_iter().map(Arc::new).collect()); } if offset < self.start_offset { @@ -37,21 +37,21 @@ impl Segment { let end_offset = offset + (count - 1) as u64; // In case that the partition messages buffer is disabled, we need to check the unsaved messages buffer - if self.unsaved_batches.is_none() { + if self.unsaved_messages.is_none() { return self.load_messages_from_disk(offset, end_offset).await; } - let unsaved_batches = self.unsaved_batches.as_ref().unwrap(); - if unsaved_batches.is_empty() { + let batch_accumulator = self.unsaved_messages.as_ref().unwrap(); + if batch_accumulator.is_empty() { return self.load_messages_from_disk(offset, end_offset).await; } - let first_offset = unsaved_batches[0].base_offset; + let first_offset = batch_accumulator.batch_base_offset(); if end_offset < first_offset { return self.load_messages_from_disk(offset, end_offset).await; } - let last_offset = unsaved_batches[unsaved_batches.len() - 1].get_last_offset(); + let last_offset = batch_accumulator.batch_max_offset(); if offset >= first_offset && end_offset <= last_offset { return Ok(self.load_messages_from_unsaved_buffer(offset, end_offset)); } @@ -64,7 +64,7 @@ impl Segment { Ok(messages) } - pub async fn get_all_messages(&self) -> Result, IggyError> { + pub async fn get_all_messages(&self) -> Result>, IggyError> { self.get_messages(self.start_offset, self.get_messages_count() as u32) .await } @@ -93,31 +93,16 @@ impl Segment { &self, start_offset: u64, end_offset: u64, - ) -> Vec { - let unsaved_batches = self.unsaved_batches.as_ref().unwrap(); - let slice_start = unsaved_batches - .iter() - .rposition(|batch| batch.base_offset <= start_offset) - .unwrap_or(0); - - // Take only the batch when last_offset >= relative_end_offset and it's base_offset is <= relative_end_offset - // otherwise take batches until the last_offset >= relative_end_offset and base_offset <= relative_start_offset - let messages_count = (start_offset + end_offset) as usize; - unsaved_batches[slice_start..] - .iter() - .filter(|batch| { - batch.is_contained_or_overlapping_within_offset_range(start_offset, end_offset) - }) - .to_messages_with_filter(messages_count, &|msg| { - msg.offset >= start_offset && msg.offset <= end_offset - }) + ) -> Vec> { + let batch_accumulator = self.unsaved_messages.as_ref().unwrap(); + batch_accumulator.get_messages_by_offset(start_offset, end_offset) } async fn load_messages_from_disk( &self, start_offset: u64, end_offset: u64, - ) -> Result, IggyError> { + ) -> Result>, IggyError> { trace!( "Loading messages from disk, segment start offset: {}, end offset: {}, current offset: {}...", start_offset, @@ -130,7 +115,7 @@ impl Segment { "Cannot load messages from disk, invalid offset range: {} - {}.", start_offset, end_offset ); - return Ok(EMPTY_MESSAGES); + return Ok(EMPTY_MESSAGES.into_iter().map(Arc::new).collect()); } if let Some(indices) = &self.indexes { @@ -148,7 +133,7 @@ impl Segment { start_offset, end_offset ); - return Ok(EMPTY_MESSAGES); + return Ok(EMPTY_MESSAGES.into_iter().map(Arc::new).collect()); } }; @@ -167,7 +152,7 @@ impl Segment { self.load_messages_from_segment_file(&index_range, start_offset, end_offset) .await } - None => Ok(EMPTY_MESSAGES), + None => Ok(EMPTY_MESSAGES.into_iter().map(Arc::new).collect()), } } @@ -176,7 +161,7 @@ impl Segment { index_range: &IndexRange, start_offset: u64, end_offset: u64, - ) -> Result, IggyError> { + ) -> Result>, IggyError> { let messages_count = (start_offset + end_offset) as usize; let messages = self .storage @@ -195,12 +180,14 @@ impl Segment { self.current_offset ); - Ok(messages) + Ok(messages.into_iter().map(Arc::new).collect()) } pub async fn append_batch( &mut self, - batch: Arc, + batch_size: u64, + messages_count: u32, + batch: &[Arc], ) -> Result<(), IggyError> { if self.is_closed { return Err(IggyError::SegmentClosed( @@ -208,34 +195,22 @@ impl Segment { self.partition_id, )); } - - if let Some(indexes) = &mut self.indexes { - indexes.reserve(1); - } - - if let Some(time_indexes) = &mut self.time_indexes { - time_indexes.reserve(1); - } - - let last_offset = batch.base_offset + batch.last_offset_delta as u64; - self.current_offset = last_offset; - self.end_offset = last_offset; - - self.store_offset_and_timestamp_index_for_batch(last_offset, batch.max_timestamp); - - let messages_size = batch.get_size_bytes(); - let messages_count = batch.last_offset_delta + 1; - - let unsaved_batches = self.unsaved_batches.get_or_insert_with(Vec::new); - unsaved_batches.push(batch); - self.size_bytes += messages_size; - + let messages_cap = self.config.partition.messages_required_to_save as usize; + let batch_base_offset = batch.first().unwrap().offset; + let batch_accumulator = self + .unsaved_messages + .get_or_insert_with(|| BatchAccumulator::new(batch_base_offset, messages_cap)); + batch_accumulator.append(batch_size, batch); + let curr_offset = batch_accumulator.batch_max_offset(); + + self.current_offset = curr_offset; + self.size_bytes += batch_size as u32; self.size_of_parent_stream - .fetch_add(messages_size as u64, Ordering::SeqCst); + .fetch_add(batch_size, Ordering::AcqRel); self.size_of_parent_topic - .fetch_add(messages_size as u64, Ordering::SeqCst); + .fetch_add(batch_size, Ordering::AcqRel); self.size_of_parent_partition - .fetch_add(messages_size as u64, Ordering::SeqCst); + .fetch_add(batch_size, Ordering::AcqRel); self.messages_count_of_parent_stream .fetch_add(messages_count as u64, Ordering::SeqCst); self.messages_count_of_parent_topic @@ -250,57 +225,49 @@ impl Segment { &mut self, batch_last_offset: u64, batch_max_timestamp: u64, - ) { + ) -> (Index, TimeIndex) { let relative_offset = (batch_last_offset - self.start_offset) as u32; trace!("Storing index for relative_offset: {relative_offset}"); + let index = Index { + relative_offset, + position: self.last_index_position, + }; + let time_index = TimeIndex { + relative_offset, + timestamp: batch_max_timestamp, + }; match (&mut self.indexes, &mut self.time_indexes) { (Some(indexes), Some(time_indexes)) => { - indexes.push(Index { - relative_offset, - position: self.size_bytes, - }); - time_indexes.push(TimeIndex { - relative_offset, - timestamp: batch_max_timestamp, - }); + indexes.push(index); + time_indexes.push(time_index); } (Some(indexes), None) => { - indexes.push(Index { - relative_offset, - position: self.size_bytes, - }); + indexes.push(index); } (None, Some(time_indexes)) => { - time_indexes.push(TimeIndex { - relative_offset, - timestamp: batch_max_timestamp, - }); + time_indexes.push(time_index); } (None, None) => {} }; - - // Regardless of whether caching of indexes and time_indexes is on - // store them in the unsaved buffer - self.unsaved_indexes.put_u32_le(relative_offset); - self.unsaved_indexes.put_u32_le(self.size_bytes); - self.unsaved_timestamps.put_u32_le(relative_offset); - self.unsaved_timestamps.put_u64_le(batch_max_timestamp); + (index, time_index) } pub async fn persist_messages(&mut self) -> Result { let storage = self.storage.segment.clone(); - if self.unsaved_batches.is_none() { + if self.unsaved_messages.is_none() { return Ok(0); } - let unsaved_batches = self.unsaved_batches.as_ref().unwrap(); - if unsaved_batches.is_empty() { + let mut batch_accumulator = self.unsaved_messages.take().unwrap(); + if batch_accumulator.is_empty() { return Ok(0); } + let batch_max_offset = batch_accumulator.batch_max_offset(); + let batch_max_timestamp = batch_accumulator.batch_max_timestamp(); + let (index, time_index) = + self.store_offset_and_timestamp_index_for_batch(batch_max_offset, batch_max_timestamp); - let unsaved_messages_number = (unsaved_batches.last().unwrap().get_last_offset() + 1) - - unsaved_batches.first().unwrap().base_offset; - + let unsaved_messages_number = batch_accumulator.unsaved_messages_count(); trace!( "Saving {} messages on disk in segment with start offset: {} for partition with ID: {}...", unsaved_messages_number, @@ -308,11 +275,24 @@ impl Segment { self.partition_id ); - let saved_bytes = storage.save_batches(self, unsaved_batches).await?; - storage.save_index(self).await?; - self.unsaved_indexes.clear(); - storage.save_time_index(self).await?; - self.unsaved_timestamps.clear(); + let (has_remainder, batch) = batch_accumulator.materialize_batch_and_maybe_update_state(); + let batch_size = batch.get_size_bytes(); + if has_remainder { + self.unsaved_messages = Some(batch_accumulator); + } + let saved_bytes = storage.save_batches(self, batch).await?; + storage.save_index(&self.index_path, index).await?; + storage + .save_time_index(&self.time_index_path, time_index) + .await?; + self.last_index_position += batch_size; + self.size_bytes += RETAINED_BATCH_OVERHEAD; + self.size_of_parent_stream + .fetch_add(RETAINED_BATCH_OVERHEAD as u64, Ordering::AcqRel); + self.size_of_parent_topic + .fetch_add(RETAINED_BATCH_OVERHEAD as u64, Ordering::AcqRel); + self.size_of_parent_partition + .fetch_add(RETAINED_BATCH_OVERHEAD as u64, Ordering::AcqRel); trace!( "Saved {} messages on disk in segment with start offset: {} for partition with ID: {}, total bytes written: {}.", @@ -325,15 +305,12 @@ impl Segment { if self.is_full().await { self.end_offset = self.current_offset; self.is_closed = true; - self.unsaved_batches = None; + self.unsaved_messages = None; info!( "Closed segment with start offset: {} for partition with ID: {}.", self.start_offset, self.partition_id ); - } else { - self.unsaved_batches.as_mut().unwrap().clear(); } - - Ok(unsaved_messages_number as usize) + Ok(unsaved_messages_number) } } diff --git a/server/src/streaming/segments/segment.rs b/server/src/streaming/segments/segment.rs index c8a927f5b..600240634 100644 --- a/server/src/streaming/segments/segment.rs +++ b/server/src/streaming/segments/segment.rs @@ -1,4 +1,3 @@ -use crate::compat::message_conversion::binary_schema::BinarySchema; use crate::compat::message_conversion::chunks_error::IntoTryChunksError; use crate::compat::message_conversion::conversion_writer::ConversionWriter; use crate::compat::message_conversion::message_converter::MessageFormatConverterPersister; @@ -7,12 +6,15 @@ use crate::compat::message_conversion::snapshots::retained_batch_snapshot::Retai use crate::compat::message_conversion::streams::retained_batch::RetainedBatchWriter; use crate::compat::message_conversion::streams::retained_message::RetainedMessageStream; use crate::configs::system::SystemConfig; -use crate::streaming::batching::message_batch::RetainedMessageBatch; use crate::streaming::segments::index::Index; use crate::streaming::segments::time_index::TimeIndex; use crate::streaming::sizeable::Sizeable; use crate::streaming::storage::SystemStorage; use crate::streaming::utils::file; +use crate::{ + compat::message_conversion::binary_schema::BinarySchema, + streaming::batching::batch_accumulator::BatchAccumulator, +}; use futures::{pin_mut, TryStreamExt}; use iggy::error::IggyError; use iggy::utils::expiry::IggyExpiry; @@ -39,6 +41,7 @@ pub struct Segment { pub log_path: String, pub time_index_path: String, pub size_bytes: u32, + pub last_index_position: u32, pub max_size_bytes: u32, pub size_of_parent_stream: Arc, pub size_of_parent_topic: Arc, @@ -48,12 +51,10 @@ pub struct Segment { pub messages_count_of_parent_partition: Arc, pub is_closed: bool, pub(crate) message_expiry: IggyExpiry, - pub(crate) unsaved_batches: Option>>, + pub(crate) unsaved_messages: Option, pub(crate) config: Arc, pub(crate) indexes: Option>, pub(crate) time_indexes: Option>, - pub(crate) unsaved_indexes: Vec, - pub(crate) unsaved_timestamps: Vec, pub(crate) storage: Arc, } @@ -87,6 +88,7 @@ impl Segment { index_path: Self::get_index_path(&path), time_index_path: Self::get_time_index_path(&path), size_bytes: 0, + last_index_position: 0, max_size_bytes: config.segment.size.as_bytes_u64() as u32, message_expiry: match message_expiry { IggyExpiry::ServerDefault => config.segment.message_expiry, @@ -100,9 +102,7 @@ impl Segment { true => Some(Vec::new()), false => None, }, - unsaved_indexes: Vec::new(), - unsaved_timestamps: Vec::new(), - unsaved_batches: None, + unsaved_messages: None, is_closed: false, size_of_parent_stream, size_of_parent_partition, @@ -288,7 +288,7 @@ mod tests { assert_eq!(segment.index_path, index_path); assert_eq!(segment.time_index_path, time_index_path); assert_eq!(segment.message_expiry, message_expiry); - assert!(segment.unsaved_batches.is_none()); + assert!(segment.unsaved_messages.is_none()); assert!(segment.indexes.is_some()); assert!(segment.time_indexes.is_some()); assert!(!segment.is_closed); diff --git a/server/src/streaming/segments/storage.rs b/server/src/streaming/segments/storage.rs index 215add8e8..9283e0a54 100644 --- a/server/src/streaming/segments/storage.rs +++ b/server/src/streaming/segments/storage.rs @@ -248,14 +248,11 @@ impl SegmentStorage for FileSegmentStorage { async fn save_batches( &self, segment: &Segment, - batches: &[Arc], + batch: RetainedMessageBatch, ) -> Result { - let messages_size = batches.iter().map(|batch| batch.get_size_bytes()).sum(); - - let mut bytes = BytesMut::with_capacity(messages_size as usize); - for batch in batches { - batch.extend(&mut bytes); - } + let batch_size = batch.get_size_bytes(); + let mut bytes = BytesMut::with_capacity(batch_size as usize); + batch.extend(&mut bytes); if let Err(err) = self .persister @@ -266,7 +263,7 @@ impl SegmentStorage for FileSegmentStorage { return Err(IggyError::CannotSaveMessagesToSegment(err)); } - Ok(messages_size) + Ok(batch_size) } async fn load_message_ids(&self, segment: &Segment) -> Result, IggyError> { @@ -428,12 +425,15 @@ impl SegmentStorage for FileSegmentStorage { Ok(Some(index_range)) } - async fn save_index(&self, segment: &Segment) -> Result<(), IggyError> { + async fn save_index(&self, index_path: &str, index: Index) -> Result<(), IggyError> { + let mut bytes = BytesMut::with_capacity(INDEX_SIZE as usize); + bytes.put_u32_le(index.relative_offset); + bytes.put_u32_le(index.position); if let Err(err) = self .persister - .append(&segment.index_path, &segment.unsaved_indexes) + .append(index_path, &bytes) .await - .with_context(|| format!("Failed to save index to segment: {}", segment.index_path)) + .with_context(|| format!("Failed to save index to segment: {}", index_path)) { return Err(IggyError::CannotSaveIndexToSegment(err)); } @@ -546,17 +546,15 @@ impl SegmentStorage for FileSegmentStorage { Ok(Some(index)) } - async fn save_time_index(&self, segment: &Segment) -> Result<(), IggyError> { + async fn save_time_index(&self, index_path: &str, index: TimeIndex) -> Result<(), IggyError> { + let mut bytes = BytesMut::with_capacity(TIME_INDEX_SIZE as usize); + bytes.put_u32_le(index.relative_offset); + bytes.put_u64_le(index.timestamp); if let Err(err) = self .persister - .append(&segment.time_index_path, &segment.unsaved_timestamps) + .append(index_path, &bytes) .await - .with_context(|| { - format!( - "Failed to save TimeIndex to segment: {}", - segment.time_index_path - ) - }) + .with_context(|| format!("Failed to save TimeIndex to segment: {}", index_path)) { return Err(IggyError::CannotSaveTimeIndexToSegment(err)); } diff --git a/server/src/streaming/storage.rs b/server/src/streaming/storage.rs index 5ecb54d2a..1acb71c2f 100644 --- a/server/src/streaming/storage.rs +++ b/server/src/streaming/storage.rs @@ -74,7 +74,7 @@ pub trait SegmentStorage: Send + Sync { async fn save_batches( &self, segment: &Segment, - batches: &[Arc], + batch: RetainedMessageBatch, ) -> Result; async fn load_message_ids(&self, segment: &Segment) -> Result, IggyError>; async fn load_checksums(&self, segment: &Segment) -> Result<(), IggyError>; @@ -85,7 +85,7 @@ pub trait SegmentStorage: Send + Sync { index_start_offset: u64, index_end_offset: u64, ) -> Result, IggyError>; - async fn save_index(&self, segment: &Segment) -> Result<(), IggyError>; + async fn save_index(&self, index_path: &str, index: Index) -> Result<(), IggyError>; async fn try_load_time_index_for_timestamp( &self, segment: &Segment, @@ -94,7 +94,7 @@ pub trait SegmentStorage: Send + Sync { async fn load_all_time_indexes(&self, segment: &Segment) -> Result, IggyError>; async fn load_last_time_index(&self, segment: &Segment) -> Result, IggyError>; - async fn save_time_index(&self, segment: &Segment) -> Result<(), IggyError>; + async fn save_time_index(&self, index_path: &str, index: TimeIndex) -> Result<(), IggyError>; } #[derive(Debug)] @@ -300,7 +300,7 @@ pub(crate) mod tests { async fn save_batches( &self, _segment: &Segment, - _batches: &[Arc], + _batch: RetainedMessageBatch, ) -> Result { Ok(0) } @@ -326,7 +326,7 @@ pub(crate) mod tests { Ok(None) } - async fn save_index(&self, _segment: &Segment) -> Result<(), IggyError> { + async fn save_index(&self, _index_path: &str, _index: Index) -> Result<(), IggyError> { Ok(()) } @@ -352,7 +352,11 @@ pub(crate) mod tests { Ok(None) } - async fn save_time_index(&self, _segment: &Segment) -> Result<(), IggyError> { + async fn save_time_index( + &self, + _index_path: &str, + _index: TimeIndex, + ) -> Result<(), IggyError> { Ok(()) } } diff --git a/server/src/streaming/systems/messages.rs b/server/src/streaming/systems/messages.rs index 0ad76fe88..33bf2b930 100644 --- a/server/src/streaming/systems/messages.rs +++ b/server/src/streaming/systems/messages.rs @@ -133,6 +133,26 @@ impl System { self.metrics.increment_messages(messages_count); Ok(()) } + + pub async fn flush_unsaved_buffer( + &self, + session: &Session, + stream_id: Identifier, + topic_id: Identifier, + partition_id: u32, + fsync: bool, + ) -> Result<(), IggyError> { + self.ensure_authenticated(session)?; + let topic = self.find_topic(session, &stream_id, &topic_id)?; + // Reuse those permissions as if you can append messages you can flush them + self.permissioner.append_messages( + session.get_user_id(), + topic.stream_id, + topic.topic_id, + )?; + topic.flush_unsaved_buffer(partition_id, fsync).await?; + Ok(()) + } } #[derive(Debug)] diff --git a/server/src/streaming/topics/messages.rs b/server/src/streaming/topics/messages.rs index 3fbf0700e..7187d4cd1 100644 --- a/server/src/streaming/topics/messages.rs +++ b/server/src/streaming/topics/messages.rs @@ -1,5 +1,5 @@ use crate::streaming::batching::appendable_batch_info::AppendableBatchInfo; -use crate::streaming::batching::message_batch::RetainedMessageBatch; +use crate::streaming::models::messages::RetainedMessage; use crate::streaming::polling_consumer::PollingConsumer; use crate::streaming::sizeable::Sizeable; use crate::streaming::topics::topic::Topic; @@ -15,7 +15,7 @@ use iggy::utils::timestamp::IggyTimestamp; use std::collections::HashMap; use std::sync::atomic::Ordering; use std::sync::Arc; -use tracing::{debug, info, trace, warn}; +use tracing::{info, trace, warn}; impl Topic { pub fn get_messages_count(&self) -> u64 { @@ -59,7 +59,7 @@ impl Topic { let messages = messages .into_iter() - .map(|msg| msg.try_into()) + .map(|msg| msg.to_polled_message()) .collect::, IggyError>>()?; Ok(PolledMessages { partition_id, @@ -101,6 +101,22 @@ impl Topic { .await } + pub async fn flush_unsaved_buffer( + &self, + partition_id: u32, + fsync: bool, + ) -> Result<(), IggyError> { + let partition = self.partitions.get(&partition_id); + partition + .ok_or_else(|| { + IggyError::PartitionNotFound(partition_id, self.stream_id, self.stream_id) + })? + .write() + .await + .flush_unsaved_buffer(fsync) + .await + } + async fn append_messages_to_partition( &self, appendable_batch_info: AppendableBatchInfo, @@ -175,12 +191,12 @@ impl Topic { }; trace!( - "Loading messages to cache for partition ID: {}, topic ID: {}, stream ID: {}, offset: 0 to {}...", - partition.partition_id, - partition.topic_id, - partition.stream_id, - end_offset - ); + "Loading messages to cache for partition ID: {}, topic ID: {}, stream ID: {}, offset: 0 to {}...", + partition.partition_id, + partition.topic_id, + partition.stream_id, + end_offset + ); let partition_size_bytes = partition.get_size_bytes(); let cache_limit_bytes = self.config.cache.size.clone().into(); @@ -191,34 +207,25 @@ impl Topic { let size_to_fetch_from_disk = (cache_limit_bytes as f64 * (partition_size_bytes as f64 / total_size_on_disk_bytes as f64)) as u64; - let messages = partition - .get_newest_messages_by_size(size_to_fetch_from_disk) + .get_newest_messages_by_size(size_to_fetch_from_disk as u64) .await?; - if messages.is_empty() { - debug!( - "No messages found on disk for partition ID: {}, topic ID: {}, stream ID: {}, offset: 0 to {}", - partition.partition_id, partition.topic_id, partition.stream_id, end_offset - ); - continue; - } - let sum: u64 = messages.iter().map(|m| m.get_size_bytes() as u64).sum(); if !Self::cache_integrity_check(&messages) { warn!( - "Cache integrity check failed for partition ID: {}, topic ID: {}, stream ID: {}, offset: 0 to {}. Emptying cache...", - partition.partition_id, partition.topic_id, partition.stream_id, end_offset - ); + "Cache integrity check failed for partition ID: {}, topic ID: {}, stream ID: {}, offset: 0 to {}. Emptying cache...", + partition.partition_id, partition.topic_id, partition.stream_id, end_offset + ); } else if let Some(cache) = &mut partition.cache { for message in &messages { cache.push_safe(message.clone()); } info!( - "Loaded {} messages ({} bytes) to cache for partition ID: {}, topic ID: {}, stream ID: {}, offset: 0 to {}.", - messages.len(), sum, partition.partition_id, partition.topic_id, partition.stream_id, end_offset - ); + "Loaded {} messages ({} bytes) to cache for partition ID: {}, topic ID: {}, stream ID: {}, offset: 0 to {}.", + messages.len(), sum, partition.partition_id, partition.topic_id, partition.stream_id, end_offset + ); } else { warn!( "Cache is invalid for ID: {}, topic ID: {}, stream ID: {}, offset: 0 to {}", @@ -230,19 +237,32 @@ impl Topic { Ok(()) } - fn cache_integrity_check(cache: &[Arc]) -> bool { + fn cache_integrity_check(cache: &[Arc]) -> bool { if cache.is_empty() { warn!("Cache is empty!"); return false; } + let first_offset = cache[0].offset; + let last_offset = cache[cache.len() - 1].offset; + for i in 1..cache.len() { - if cache[i].get_last_offset() <= cache[i - 1].get_last_offset() { - warn!("Offsets are not subsequent at index {} offset {}, for previous index {} offset is {}", - i, cache[i].get_last_offset(), i-1, cache[i-1].get_last_offset()); + if cache[i].offset != cache[i - 1].offset + 1 { + warn!("Offsets are not subsequent at index {} offset {}, for previous index {} offset is {}", i, cache[i].offset, i-1, cache[i-1].offset); return false; } } + + let expected_messages_count: u64 = last_offset - first_offset + 1; + if cache.len() != expected_messages_count as usize { + warn!( + "Messages count is in cache ({}) not equal to expected messages count ({})", + cache.len(), + expected_messages_count + ); + return false; + } + true }