fix(server): correct batch materialization logic in BatchAccumulator (#1555)

This commit refactors the `BatchAccumulator` by removing the
`capacity` field and simplifying the logic for materializing
batches. The `materialize_batch_and_maybe_update_state` method
is renamed to `materialize_batch_and_update_state`, and the
logic for handling message offsets and timestamps is streamlined.

The changes improve the clarity and efficiency of the batch processing
logic, addressing issues with the previous handling of `has_remainder`.
Additionally, the logic in `writing_messages.rs` is updated to reflect
these changes, ensuring that unsaved messages are correctly managed
based on batch size.

Now, instead of having a hard limit of `messages_required_to_save`, we trigger the save once the capacity is exceeded and write all accumulated messages to disk, instead of handling a remainder.
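To make that flow concrete, below is a minimal, self-contained sketch using toy types and names (not the server's actual `BatchAccumulator` API): messages are buffered until the configured capacity is reached, then the whole buffer is drained as a single batch and the accumulator state is reset, rather than carrying a remainder forward.

```rust
// Minimal sketch with toy types (not the real server structs): buffer messages
// until the configured capacity is reached, then flush the entire buffer as one
// batch and reset the accumulator, instead of keeping a remainder behind.
#[derive(Debug)]
struct Message {
    offset: u64,
}

struct Accumulator {
    capacity: usize,
    messages: Vec<Message>,
}

impl Accumulator {
    fn new(capacity: usize) -> Self {
        Self {
            capacity,
            messages: Vec::with_capacity(capacity),
        }
    }

    /// Appends a message and, once capacity is reached, returns the entire
    /// buffered batch, leaving the accumulator empty.
    fn append(&mut self, msg: Message) -> Option<Vec<Message>> {
        self.messages.push(msg);
        if self.messages.len() >= self.capacity {
            Some(std::mem::take(&mut self.messages))
        } else {
            None
        }
    }
}

fn main() {
    let mut acc = Accumulator::new(3);
    for offset in 0..5u64 {
        if let Some(batch) = acc.append(Message { offset }) {
            // This is the point where the server would persist the batch to disk.
            println!(
                "flushing {} messages, last offset {}",
                batch.len(),
                batch.last().unwrap().offset
            );
        }
    }
}
```

In the actual change, this flush is driven by the caller through `materialize_batch_and_update_state`, which likewise consumes the entire message buffer and resets the base offset, size, current offset, and timestamp fields.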
hubcio authored Feb 18, 2025
1 parent 61cb1bc commit 5c48f2d
Showing 7 changed files with 35 additions and 37 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock


2 changes: 1 addition & 1 deletion server/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.4.206"
version = "0.4.207"
edition = "2021"
build = "src/build.rs"
license = "Apache-2.0"
52 changes: 23 additions & 29 deletions server/src/streaming/batching/batch_accumulator.rs
@@ -1,5 +1,4 @@
use super::message_batch::{RetainedMessageBatch, RETAINED_BATCH_HEADER_LEN};
use crate::streaming::local_sizeable::LocalSizeable;
use crate::streaming::models::messages::RetainedMessage;
use bytes::BytesMut;
use iggy::utils::byte_size::IggyByteSize;
@@ -12,7 +11,6 @@ pub struct BatchAccumulator {
current_size: IggyByteSize,
current_offset: u64,
current_timestamp: u64,
capacity: u64,
messages: Vec<Arc<RetainedMessage>>,
}

@@ -23,7 +21,6 @@ impl BatchAccumulator {
current_size: IggyByteSize::from(0),
current_offset: 0,
current_timestamp: 0,
capacity: capacity as u64,
messages: Vec::with_capacity(capacity),
}
}
@@ -41,11 +38,13 @@
start_offset: u64,
end_offset: u64,
) -> Vec<Arc<RetainedMessage>> {
self.messages
.iter()
.filter(|msg| msg.offset >= start_offset && msg.offset <= end_offset)
.cloned()
.collect()
let start_idx = self
.messages
.partition_point(|msg| msg.offset < start_offset);
let end_idx = self
.messages
.partition_point(|msg| msg.offset <= end_offset);
self.messages[start_idx..end_idx].to_vec()
}
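Editorial note on the lookup above: the linear `filter` scan is replaced by two `partition_point` binary searches, which relies on the buffered messages being kept in ascending offset order. A tiny standalone illustration of how the two calls bracket an inclusive offset range on a sorted slice:

```rust
fn main() {
    // Offsets are assumed to be sorted in ascending order, as they are for
    // messages appended to the accumulator.
    let offsets: [u64; 6] = [10, 11, 12, 15, 20, 21];
    let (start_offset, end_offset) = (12u64, 20u64);

    // Index of the first element with offset >= start_offset.
    let start_idx = offsets.partition_point(|&o| o < start_offset);
    // One past the last element with offset <= end_offset.
    let end_idx = offsets.partition_point(|&o| o <= end_offset);

    // The inclusive range [start_offset, end_offset] maps to [12, 15, 20].
    assert_eq!(&offsets[start_idx..end_idx], &[12, 15, 20]);
}
```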

pub fn is_empty(&self) -> bool {
@@ -68,42 +67,37 @@
self.base_offset
}

pub fn materialize_batch_and_maybe_update_state(&mut self) -> (bool, RetainedMessageBatch) {
pub fn materialize_batch_and_update_state(&mut self) -> RetainedMessageBatch {
let batch_base_offset = self.base_offset;
let batch_last_offset_delta = (self.current_offset - self.base_offset) as u32;
let split_point = std::cmp::min(self.capacity as usize, self.messages.len());

let num_messages = self.messages.len();
let last_batch_timestamp = if num_messages > 0 {
self.messages[num_messages - 1].timestamp
} else {
0
};

let messages = std::mem::take(&mut self.messages);
let mut bytes = BytesMut::with_capacity(self.current_size.as_bytes_u64() as usize);
let last_batch_timestamp = self
.messages
.get(split_point - 1)
.map_or(0, |msg| msg.timestamp);
for message in self.messages.drain(..split_point) {
for message in messages {
message.extend(&mut bytes);
}

let has_remainder = !self.messages.is_empty();
if has_remainder {
self.base_offset = self.messages.first().unwrap().offset;
self.current_size = self
.messages
.iter()
.map(|msg| msg.get_size_bytes())
.sum::<IggyByteSize>();
self.current_offset = self.messages.last().unwrap().offset;
self.current_timestamp = self.messages.last().unwrap().timestamp;
}
self.base_offset = 0;
self.current_size = IggyByteSize::from(0);
self.current_offset = 0;
self.current_timestamp = 0;

let batch_payload = bytes.freeze();
let batch_payload_len = IggyByteSize::from(batch_payload.len() as u64);
let batch = RetainedMessageBatch::new(
RetainedMessageBatch::new(
batch_base_offset,
batch_last_offset_delta,
last_batch_timestamp,
batch_payload_len,
batch_payload,
);
(has_remainder, batch)
)
}
}

2 changes: 2 additions & 0 deletions server/src/streaming/segments/indexes/index.rs
@@ -12,6 +12,8 @@ pub struct Index {
impl PartialEq<Self> for Index {
fn eq(&self, other: &Self) -> bool {
self.offset == other.offset
&& self.position == other.position
&& self.timestamp == other.timestamp
}
}

8 changes: 5 additions & 3 deletions server/src/streaming/segments/indexes/index_reader.rs
@@ -14,7 +14,7 @@ use std::{
},
};
use tokio::task::spawn_blocking;
use tracing::{error, trace, warn};
use tracing::{error, trace};

/// A dedicated struct for reading from the index file.
#[derive(Debug)]
@@ -127,6 +127,7 @@ impl SegmentIndexReader {
return Err(IggyError::CannotReadFile);
}
};
let mut last_index = Index::default();
for chunk in buf.chunks_exact(INDEX_SIZE as usize) {
let current_index = parse_index(chunk)
.with_error_context(|e| format!("Failed to parse index {}: {e}", self.file_path))?;
@@ -139,9 +140,10 @@
index_range.end = current_index;
break;
}
last_index = current_index;
}
if index_range.start == Index::default() {
warn!("Failed to find index >= {}", relative_start_offset);
if index_range.end == Index::default() {
index_range.end = last_index;
}
Ok(Some(index_range))
}
2 changes: 1 addition & 1 deletion server/src/streaming/segments/reading_messages.rs
@@ -299,7 +299,7 @@ impl Segment {
start_offset,
end_offset
);
let messages_count = (start_offset + end_offset) as usize;
let messages_count = (start_offset + end_offset + 1) as usize;
let messages = self
.load_batches_by_range(index_range)
.await
4 changes: 2 additions & 2 deletions server/src/streaming/segments/writing_messages.rs
@@ -101,9 +101,9 @@ impl Segment {
self.partition_id
);

let (has_remainder, batch) = batch_accumulator.materialize_batch_and_maybe_update_state();
let batch = batch_accumulator.materialize_batch_and_update_state();
let batch_size = batch.get_size_bytes();
if has_remainder {
if batch_size > 0 {
self.unsaved_messages = Some(batch_accumulator);
}
let confirmation = match confirmation {
