Skip to content

Commit

Permalink
Improve unsaved_messages buffer and impl flush_unsaved_buffer endpoint (
Browse files Browse the repository at this point in the history
#1203)

Improve unsaved_messages buffer by batching chunks of messages before
saving and implement `flush_unaved_buffer` method for `IggyClient`
  • Loading branch information
numinnex committed Sep 1, 2024
1 parent 934c672 commit c128b0b
Show file tree
Hide file tree
Showing 36 changed files with 758 additions and 412 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion configs/server.json
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@
"path": "partitions",
"enforce_fsync": false,
"validate_checksum": false,
"messages_required_to_save": 10000
"messages_required_to_save": 5000
},
"segment": {
"size": "1 GB",
Expand Down
2 changes: 1 addition & 1 deletion configs/server.toml
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ validate_checksum = false
# The threshold of buffered messages before triggering a save to disk (integer).
# Specifies how many messages accumulate before persisting to storage.
# Adjusting this can balance between write performance and data durability.
messages_required_to_save = 10_000
messages_required_to_save = 5_000

# Segment configuration
[system.segment]
Expand Down
24 changes: 23 additions & 1 deletion integration/tests/data_integrity/verify_after_server_restart.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::bench::run_bench_and_wait_for_finish;
use iggy::client::SystemClient;
use iggy::client::{MessageClient, SystemClient};
use iggy::clients::client::IggyClient;
use iggy::identifier::Identifier;
use iggy::utils::byte_size::IggyByteSize;
use integration::{
tcp_client::TcpClientFactory,
Expand All @@ -11,6 +12,7 @@ use integration::{
use serial_test::parallel;
use std::{collections::HashMap, str::FromStr};

// TODO(numminex) - Move the message generation method from benchmark run to a special method.
#[tokio::test]
#[parallel]
async fn should_fill_data_and_verify_after_restart() {
Expand All @@ -37,10 +39,30 @@ async fn should_fill_data_and_verify_after_restart() {
amount_of_data_to_process,
);

let default_bench_stream_identifiers: [Identifier; 10] = [
Identifier::numeric(3000001).unwrap(),
Identifier::numeric(3000002).unwrap(),
Identifier::numeric(3000003).unwrap(),
Identifier::numeric(3000004).unwrap(),
Identifier::numeric(3000005).unwrap(),
Identifier::numeric(3000006).unwrap(),
Identifier::numeric(3000007).unwrap(),
Identifier::numeric(3000008).unwrap(),
Identifier::numeric(3000009).unwrap(),
Identifier::numeric(3000010).unwrap(),
];

// 4. Connect and login to newly started server
let client = TcpClientFactory { server_addr }.create_client().await;
let client = IggyClient::create(client, None, None);
login_root(&client).await;
let topic_id = Identifier::numeric(1).unwrap();
for stream_id in default_bench_stream_identifiers {
client
.flush_unsaved_buffer(&stream_id, &topic_id, 1, false)
.await
.unwrap();
}

// 5. Save stats from the first server
let stats = client.get_stats().await.unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const T2_NAME: &str = "test-topic-2";
const MESSAGE_PAYLOAD_SIZE_BYTES: u64 = 57;
const MSG_SIZE: u64 = 16 + 8 + 8 + 4 + 4 + 4 + 1 + MESSAGE_PAYLOAD_SIZE_BYTES; // number of bytes in a single message
const MSGS_COUNT: u64 = 117; // number of messages in a single topic after one pass of appending
const MSGS_SIZE: u64 = MSG_SIZE * MSGS_COUNT + 8 + 4 + 8 + 4; // number of bytes in a single topic after one pass of appending
const MSGS_SIZE: u64 = MSG_SIZE * MSGS_COUNT; // number of bytes in a single topic after one pass of appending

pub async fn run(client_factory: &dyn ClientFactory) {
let _ = tracing_subscriber::fmt::try_init();
Expand Down
2 changes: 1 addition & 1 deletion integration/tests/server/scenarios/system_scenario.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
assert_eq!(topic.name, TOPIC_NAME);
assert_eq!(topic.partitions_count, PARTITIONS_COUNT);
assert_eq!(topic.partitions.len(), PARTITIONS_COUNT as usize);
assert_eq!(topic.size, 55914);
assert_eq!(topic.size, 55890);
assert_eq!(topic.messages_count, MESSAGES_COUNT as u64);
let topic_partition = topic.partitions.get((PARTITION_ID - 1) as usize).unwrap();
assert_eq!(topic_partition.id, PARTITION_ID);
Expand Down
109 changes: 40 additions & 69 deletions integration/tests/streaming/segment.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use crate::streaming::common::test_setup::TestSetup;
use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use iggy::bytes_serializable::BytesSerializable;
use iggy::models::messages::{MessageState, PolledMessage};
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::{checksum, timestamp::IggyTimestamp};
use server::streaming::batching::message_batch::RetainedMessageBatch;
use server::streaming::models::messages::RetainedMessage;
use server::streaming::segments::segment;
use server::streaming::segments::segment::{INDEX_EXTENSION, LOG_EXTENSION, TIME_INDEX_EXTENSION};
use server::streaming::sizeable::Sizeable;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use tokio::fs;
Expand Down Expand Up @@ -151,40 +151,29 @@ async fn should_persist_and_load_segment_with_messages() {
)
.await;
let messages_count = 10;
let mut base_offset = 0;
let mut last_timestamp = IggyTimestamp::zero().as_micros();
let mut batch_buffer = BytesMut::new();
let mut messages = Vec::new();
let mut batch_size = 0u64;
for i in 0..messages_count {
let message = create_message(i, "test", IggyTimestamp::now());
if i == 0 {
base_offset = message.offset;
}
if i == messages_count - 1 {
last_timestamp = message.timestamp;
}

let retained_message = RetainedMessage {
let retained_message = Arc::new(RetainedMessage {
id: message.id,
offset: message.offset,
timestamp: message.timestamp,
checksum: message.checksum,
message_state: message.state,
headers: message.headers.map(|headers| headers.to_bytes()),
payload: message.payload.clone(),
};
retained_message.extend(&mut batch_buffer);
});
batch_size += retained_message.get_size_bytes() as u64;
messages.push(retained_message);
}
let batch = Arc::new(RetainedMessageBatch::new(
base_offset,
messages_count as u32 - 1,
last_timestamp,
batch_buffer.len() as u32,
batch_buffer.freeze(),
));
segment.append_batch(batch).await.unwrap();

segment
.append_batch(batch_size, messages_count as u32, &messages)
.await
.unwrap();
segment.persist_messages().await.unwrap();

let mut loaded_segment = segment::Segment::create(
stream_id,
topic_id,
Expand Down Expand Up @@ -247,40 +236,28 @@ async fn given_all_expired_messages_segment_should_be_expired() {
let messages_count = 10;
let now = IggyTimestamp::now();
let mut expired_timestamp = (now.as_micros() - 2 * message_expiry_ms).into();
let mut base_offset = 0;
let mut last_timestamp = 0;
let mut batch_buffer = BytesMut::new();
let mut batch_size = 0u64;
let mut messages = Vec::new();
for i in 0..messages_count {
let message = create_message(i, "test", expired_timestamp);
expired_timestamp = (expired_timestamp.as_micros() + 1).into();
if i == 0 {
base_offset = message.offset;
}
if i == messages_count - 1 {
last_timestamp = message.timestamp;
}

let retained_message = RetainedMessage {
let retained_message = Arc::new(RetainedMessage {
id: message.id,
offset: message.offset,
timestamp: message.timestamp,
checksum: message.checksum,
message_state: message.state,
headers: message.headers.map(|headers| headers.to_bytes()),
payload: message.payload.clone(),
};
retained_message.extend(&mut batch_buffer);
});
batch_size += retained_message.get_size_bytes() as u64;
messages.push(retained_message);
}
let batch = Arc::new(RetainedMessageBatch::new(
base_offset,
messages_count as u32 - 1,
last_timestamp,
batch_buffer.len() as u32,
batch_buffer.freeze(),
));

segment.append_batch(batch).await.unwrap();

segment
.append_batch(batch_size, messages_count as u32, &messages)
.await
.unwrap();
segment.persist_messages().await.unwrap();

segment.is_closed = true;
Expand Down Expand Up @@ -330,27 +307,21 @@ async fn given_at_least_one_not_expired_message_segment_should_not_be_expired()
let expired_message = create_message(0, "test", expired_timestamp.into());
let not_expired_message = create_message(1, "test", not_expired_timestamp.into());

let mut expired_batch_buffer = BytesMut::new();
let expired_retained_message = RetainedMessage {
let expired_retained_message = Arc::new(RetainedMessage {
id: expired_message.id,
offset: expired_message.offset,
timestamp: expired_message.timestamp,
checksum: expired_message.checksum,
message_state: expired_message.state,
headers: expired_message.headers.map(|headers| headers.to_bytes()),
payload: expired_message.payload.clone(),
};
expired_retained_message.extend(&mut expired_batch_buffer);
let expired_batch = Arc::new(RetainedMessageBatch::new(
0,
1,
expired_timestamp,
expired_batch_buffer.len() as u32,
expired_batch_buffer.freeze(),
));
});
let mut expired_messages = Vec::new();
let expired_message_size = expired_retained_message.get_size_bytes() as u64;
expired_messages.push(expired_retained_message);

let mut not_expired_batch_buffer = BytesMut::new();
let not_expired_retained_message = RetainedMessage {
let mut not_expired_messages = Vec::new();
let not_expired_retained_message = Arc::new(RetainedMessage {
id: not_expired_message.id,
offset: not_expired_message.offset,
timestamp: not_expired_message.timestamp,
Expand All @@ -360,18 +331,18 @@ async fn given_at_least_one_not_expired_message_segment_should_not_be_expired()
.headers
.map(|headers| headers.to_bytes()),
payload: not_expired_message.payload.clone(),
};
not_expired_retained_message.extend(&mut not_expired_batch_buffer);
let not_expired_batch = Arc::new(RetainedMessageBatch::new(
1,
1,
expired_timestamp,
not_expired_batch_buffer.len() as u32,
not_expired_batch_buffer.freeze(),
));
});
let not_expired_message_size = not_expired_retained_message.get_size_bytes() as u64;
not_expired_messages.push(not_expired_retained_message);

segment.append_batch(expired_batch).await.unwrap();
segment.append_batch(not_expired_batch).await.unwrap();
segment
.append_batch(expired_message_size, 1, &expired_messages)
.await
.unwrap();
segment
.append_batch(not_expired_message_size, 1, &not_expired_messages)
.await
.unwrap();
segment.persist_messages().await.unwrap();

let is_expired = segment.is_expired(now).await;
Expand Down
2 changes: 1 addition & 1 deletion sdk/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy"
version = "0.6.13"
version = "0.6.14"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2021"
license = "MIT"
Expand Down
19 changes: 19 additions & 0 deletions sdk/src/binary/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::command::{POLL_MESSAGES_CODE, SEND_MESSAGES_CODE};
use crate::consumer::Consumer;
use crate::error::IggyError;
use crate::identifier::Identifier;
use crate::messages::flush_unsaved_buffer::FlushUnsavedBuffer;
use crate::messages::poll_messages::PollingStrategy;
use crate::messages::send_messages::{Message, Partitioning};
use crate::messages::{poll_messages, send_messages};
Expand Down Expand Up @@ -55,4 +56,22 @@ impl<B: BinaryClient> MessageClient for B {
.await?;
Ok(())
}

async fn flush_unsaved_buffer(
&self,
stream_id: &Identifier,
topic_id: &Identifier,
partition_id: u32,
fsync: bool,
) -> Result<(), IggyError> {
fail_if_not_authenticated(self).await?;
self.send_with_response(&FlushUnsavedBuffer {
stream_id: stream_id.clone(),
topic_id: topic_id.clone(),
partition_id,
fsync,
})
.await?;
Ok(())
}
}
9 changes: 9 additions & 0 deletions sdk/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,15 @@ pub trait MessageClient {
partitioning: &Partitioning,
messages: &mut [Message],
) -> Result<(), IggyError>;
/// Force flush of the `unsaved_messages` buffer to disk, optionally fsyncing the data.
#[allow(clippy::too_many_arguments)]
async fn flush_unsaved_buffer(
&self,
stream_id: &Identifier,
topic_id: &Identifier,
partition_id: u32,
fsync: bool,
) -> Result<(), IggyError>;
}

/// This trait defines the methods to interact with the consumer offset module.
Expand Down
14 changes: 14 additions & 0 deletions sdk/src/clients/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,20 @@ impl MessageClient for IggyClient {
.send_messages(stream_id, topic_id, partitioning, messages)
.await
}

async fn flush_unsaved_buffer(
&self,
stream_id: &Identifier,
topic_id: &Identifier,
partition_id: u32,
fsync: bool,
) -> Result<(), IggyError> {
self.client
.read()
.await
.flush_unsaved_buffer(stream_id, topic_id, partition_id, fsync)
.await
}
}

#[async_trait]
Expand Down
3 changes: 3 additions & 0 deletions sdk/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ pub const POLL_MESSAGES: &str = "message.poll";
pub const POLL_MESSAGES_CODE: u32 = 100;
pub const SEND_MESSAGES: &str = "message.send";
pub const SEND_MESSAGES_CODE: u32 = 101;
pub const FLUSH_UNSAVED_BUFFER: &str = "message.flush_unsaved_buffer";
pub const FLUSH_UNSAVED_BUFFER_CODE: u32 = 102;
pub const GET_CONSUMER_OFFSET: &str = "consumer_offset.get";
pub const GET_CONSUMER_OFFSET_CODE: u32 = 120;
pub const STORE_CONSUMER_OFFSET: &str = "consumer_offset.store";
Expand Down Expand Up @@ -114,6 +116,7 @@ pub fn get_name_from_code(code: u32) -> Result<&'static str, IggyError> {
LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE => Ok(LOGIN_WITH_PERSONAL_ACCESS_TOKEN),
SEND_MESSAGES_CODE => Ok(SEND_MESSAGES),
POLL_MESSAGES_CODE => Ok(POLL_MESSAGES),
FLUSH_UNSAVED_BUFFER_CODE => Ok(FLUSH_UNSAVED_BUFFER),
STORE_CONSUMER_OFFSET_CODE => Ok(STORE_CONSUMER_OFFSET),
GET_CONSUMER_OFFSET_CODE => Ok(GET_CONSUMER_OFFSET),
GET_STREAM_CODE => Ok(GET_STREAM),
Expand Down
Loading

0 comments on commit c128b0b

Please sign in to comment.