Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Rework/optimize partition size computation #2243

Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/fluvio-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ nix = "0.23.1"
fluvio-types = { path = "../fluvio-types", features = [
"events",
], version = "0.3.1" }
fluvio-future = { version = "0.3.13", features = ["fs", "mmap", "zero_copy"] }
fluvio-future = { version = "0.3.15", features = ["fs", "mmap", "zero_copy"] }
fluvio-protocol = { path = "../fluvio-protocol" }
dataplane = { path = "../fluvio-dataplane-protocol", package = "fluvio-dataplane-protocol", features = [
"file",
Expand Down
4 changes: 4 additions & 0 deletions crates/fluvio-storage/src/records.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ impl FileRecordsSlice {
})
}

pub fn get_len(&self) -> u64 {
self.len
}

pub fn get_base_offset(&self) -> Offset {
self.base_offset
}
Expand Down
51 changes: 13 additions & 38 deletions crates/fluvio-storage/src/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ use std::mem;
use std::sync::Arc;

use fluvio_future::file_slice::AsyncFileSlice;
use fluvio_future::fs::read_dir;
use fluvio_protocol::Encoder;
use futures_lite::StreamExt;
use tracing::{debug, trace, warn, instrument, info, error};
use tracing::{debug, trace, warn, instrument, info};
use async_trait::async_trait;

use fluvio_future::fs::{create_dir_all, remove_dir_all};
Expand Down Expand Up @@ -103,40 +101,17 @@ impl ReplicaStorage for FileReplica {
/// return the size in byte
#[instrument(skip(self))]
async fn get_partition_size(&self) -> Result<u64, ErrorCode> {
let mut entries = read_dir(&self.option.base_dir).await.map_err(|err| {
error!(
"failed to open the log base dir \"{}\": {}",
self.option.base_dir.to_string_lossy(),
err
);
ErrorCode::StorageError
})?;
let active_len = self.active_segment.get_msg_log().get_pos();

let mut total_size = None;
while let Ok(Some(entry)) = entries.try_next().await {
let os_file_name = entry.file_name();

if os_file_name.to_string_lossy().ends_with(".log") {
let metadata = entry.metadata().await.map_err(|err| {
error!(
"fetching metadata for file \"{}\" failed: {err}",
os_file_name.to_string_lossy(),
);
ErrorCode::StorageError
})?;
let file_len = metadata.len();
debug!("Log segment found. Add {file_len} byte to the total_size.");
*total_size.get_or_insert(0) += file_len;
}
}
debug!("Active segment length: {active_len}.");
Afourcat marked this conversation as resolved.
Show resolved Hide resolved
let total_prev_segments_len = {
let reader = self.prev_segments.read().await;
reader.get_total_logs_len() as u32
sehz marked this conversation as resolved.
Show resolved Hide resolved
};
debug!("Cumulated previous segments length: {total_prev_segments_len}.");
Afourcat marked this conversation as resolved.
Show resolved Hide resolved
let total_len = active_len + total_prev_segments_len;

total_size.ok_or_else(|| {
error!(
"no log file found in {}",
self.option.base_dir.to_string_lossy()
);
ErrorCode::SpuError
})
return Ok(total_len.into());
}

/// write records to this replica
Expand Down Expand Up @@ -721,11 +696,11 @@ mod tests {
.await
.expect("delete base dir");

let error = replica
let size = replica
.get_partition_size()
.await
.expect_err("error partition size");
assert_eq!(error, dataplane::ErrorCode::StorageError)
.expect("error partition size");
assert_eq!(size, 79);
sehz marked this conversation as resolved.
Show resolved Hide resolved
}

/// test fetch only committed records
Expand Down
8 changes: 8 additions & 0 deletions crates/fluvio-storage/src/segments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,14 @@ impl SegmentList {
self.segments.len()
}

#[instrument(skip(self))]
pub fn get_total_logs_len(&self) -> usize {
self.segments
.values()
.map(|segment| segment.get_msg_log().get_len() as usize)
.sum()
}

#[instrument(skip(self, segment))]
fn add_segment(&mut self, segment: ReadSegment) -> Offset {
debug!(
Expand Down
69 changes: 69 additions & 0 deletions tests/cli/smoke_tests/e2e-segment-partitions.bats
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#!/usr/bin/env bats

TEST_HELPER_DIR="$BATS_TEST_DIRNAME/../test_helper"
export TEST_HELPER_DIR

load "$TEST_HELPER_DIR"/tools_check.bash
load "$TEST_HELPER_DIR"/fluvio_dev.bash
load "$TEST_HELPER_DIR"/bats-support/load.bash
load "$TEST_HELPER_DIR"/bats-assert/load.bash

setup_file() {
TOPIC_NAME=$(random_string)
export TOPIC_NAME
debug_msg "Topic name: $TOPIC_NAME"

TOPIC_NAME_2=$(random_string)
export TOPIC_NAME_2
debug_msg "Topic name: $TOPIC_NAME_2"

TOPIC_NAME_3=$(random_string)
export TOPIC_NAME_3
debug_msg "Topic name: $TOPIC_NAME_3"

MESSAGE="$(random_string 1000)"
export MESSAGE
debug_msg "$MESSAGE"
}

teardown_file() {
run timeout 15s "$FLUVIO_BIN" topic delete "$TOPIC_NAME"
run timeout 15s "$FLUVIO_BIN" topic delete "$TOPIC_NAME_2"
run timeout 15s "$FLUVIO_BIN" topic delete "$TOPIC_NAME_3"
}

@test "Create a topic" {
debug_msg "topic: $TOPIC_NAME with segment size of 1024"
run timeout 15s "$FLUVIO_BIN" topic create "$TOPIC_NAME" --segment-size 1024
assert_success

debug_msg "topic: $TOPIC_NAME with segment size of 2048"
run timeout 15s "$FLUVIO_BIN" topic create "$TOPIC_NAME_2" --segment-size 2048
assert_success

debug_msg "topic: $TOPIC_NAME with segment size of 4096"
run timeout 15s "$FLUVIO_BIN" topic create "$TOPIC_NAME_3" --segment-size 4096
assert_success
}

@test "Produce message" {
run bash -c 'echo "$MESSAGE" | timeout 15s "$FLUVIO_BIN" produce "$TOPIC_NAME"'
run bash -c 'echo "$MESSAGE" | timeout 15s "$FLUVIO_BIN" produce "$TOPIC_NAME"'

run bash -c 'echo "$MESSAGE" | timeout 15s "$FLUVIO_BIN" produce "$TOPIC_NAME_2"'
run bash -c 'echo "$MESSAGE" | timeout 15s "$FLUVIO_BIN" produce "$TOPIC_NAME_2"'
run bash -c 'echo "$MESSAGE" | timeout 15s "$FLUVIO_BIN" produce "$TOPIC_NAME_2"'

run bash -c 'echo "$MESSAGE" | timeout 15s "$FLUVIO_BIN" produce "$TOPIC_NAME_3"'
run bash -c 'echo "$MESSAGE" | timeout 15s "$FLUVIO_BIN" produce "$TOPIC_NAME_3"'
run bash -c 'echo "$MESSAGE" | timeout 15s "$FLUVIO_BIN" produce "$TOPIC_NAME_3"'
run bash -c 'echo "$MESSAGE" | timeout 15s "$FLUVIO_BIN" produce "$TOPIC_NAME_3"'
run bash -c 'echo "$MESSAGE" | timeout 15s "$FLUVIO_BIN" produce "$TOPIC_NAME_3"'
assert_success
}

@test "Check Input Argument Error" {
run timeout 15s kubectl logs pod/fluvio-spg-main-0
debug_msg "Checking for \'Invalid Argument\' Error in SPG logs"
refute_output --partial 'message: "Invalid argument"'
}