Skip to content

Commit

Permalink
Consolidate Index and TimeIndex into single Index
Browse files Browse the repository at this point in the history
  • Loading branch information
hubcio committed Oct 30, 2024
1 parent 4396aef commit d9e9481
Show file tree
Hide file tree
Showing 45 changed files with 315 additions and 1,314 deletions.
50 changes: 29 additions & 21 deletions .github/workflows/backwards_compatibility.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ jobs:
runs-on: ubuntu-latest
needs: check_commit_message
if: ${{ needs.check_commit_message.outputs.should_skip != 'true' }}
env:
BRANCH_NAME: ${{ github.head_ref || github.ref_name }}
steps:
- run: echo "${{ needs.check_commit_message.outputs.should_skip == 'true' }}"

Expand All @@ -87,7 +89,10 @@ jobs:
with:
key: "v2"

- name: Build (PR)
- name: Reset to origin/master
run: git reset --hard origin/master

- name: Build (origin/master)
run: cargo build

- uses: JarvusInnovations/background-action@v1
Expand All @@ -98,52 +103,55 @@ jobs:
wait-on: tcp:localhost:8090
wait-for: 1m
log-output: true
log-output-if: true
log-output-if: timeout
tail: true

- name: Run send bench (PR)
- name: Run send bench (origin/master)
timeout-minutes: 1
run: target/debug/iggy-bench --warmup-time 0 --verbose send --message-batches 50 --messages-per-batch 100 tcp

- name: Run poll bench (origin/master)
timeout-minutes: 1
run: target/debug/iggy-bench --warmup-time 0 --verbose send --message-batches 25 --messages-per-batch 25 tcp
run: target/debug/iggy-bench --warmup-time 0 --verbose poll --message-batches 50 --messages-per-batch 100 tcp

- name: Stop iggy-server
- name: Stop iggy-server (origin/master)
timeout-minutes: 1
run: pkill iggy-server && while pgrep -l iggy-server; do sleep 1; done;
run: pkill -15 iggy-server && while pgrep -l iggy-server; do sleep 2; done;

- name: Print iggy-server logs (PR)
- name: Print iggy-server logs (origin/master)
run: cat local_data/logs/iggy*

- name: Remove iggy-server logs (PR)
- name: Print server logs (origin/master)
run: cat local_data/logs/iggy*

- name: Remove iggy-server logs (origin/master)
run: rm local_data/logs/iggy*

- name: Reset to origin/master
run: git reset --hard origin/master
- name: Reset to pull request branch (PR)
run: git fetch && git reset --hard origin/$BRANCH_NAME

- name: Build (origin/master)
- name: Build (PR)
run: cargo build

- uses: JarvusInnovations/background-action@v1
name: Run iggy-server in background (origin/master)
name: Run iggy-server in background (PR)
with:
run: |
target/debug/iggy-server &
wait-on: tcp:localhost:8090
wait-for: 1m
log-output: true
log-output-if: true
log-output-if: timeout
tail: true

- name: Run poll bench (origin/master)
- name: Run poll bench (PR)
timeout-minutes: 1
run: target/debug/iggy-bench --warmup-time 0 --verbose poll --message-batches 25 --messages-per-batch 25 tcp
run: target/debug/iggy-bench --warmup-time 0 --verbose poll --message-batches 50 --messages-per-batch 100 tcp

- name: Run send bench (origin/master)
timeout-minutes: 1
run: target/debug/iggy-bench --warmup-time 0 --verbose send --message-batches 25 --messages-per-batch 25 tcp

- name: Stop iggy-server
- name: Stop iggy-server (PR)
timeout-minutes: 1
run: pkill iggy-server && while pgrep -l iggy-server; do sleep 1; done;

- name: Print server logs (origin/master)
- name: Print server logs (PR)
run: cat local_data/logs/iggy*

3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion configs/server.json
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@
"segment": {
"size": "1 GB",
"cache_indexes": true,
"cache_time_indexes": true,
"message_expiry": "none",
"archive_expired": false
},
Expand Down
7 changes: 1 addition & 6 deletions configs/server.toml
Original file line number Diff line number Diff line change
Expand Up @@ -437,16 +437,11 @@ message_expiry = "none"
# Configures whether expired segments are archived (boolean) or just deleted without archiving.
archive_expired = false

# Controls whether to cache indexes for segment access (boolean).
# Controls whether to cache indexes (time and positional) for segment access (boolean).
# `true` keeps indexes in memory, speeding up data retrieval.
# `false` reads indexes from disk, which can conserve memory at the cost of access speed.
cache_indexes = true

# Determines whether to cache time-based indexes for segments (boolean).
# `true` allows faster timestamp-based data retrieval by keeping indexes in memory.
# `false` conserves memory by reading time indexes from disk, which may slow down access.
cache_time_indexes = true

# Message deduplication configuration
[system.message_deduplication]
# Controls whether message deduplication is enabled (boolean).
Expand Down
4 changes: 1 addition & 3 deletions integration/tests/streaming/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use iggy::utils::timestamp::IggyTimestamp;
use server::state::system::PartitionState;
use server::streaming::batching::appendable_batch_info::AppendableBatchInfo;
use server::streaming::partitions::partition::Partition;
use server::streaming::segments::segment::{INDEX_EXTENSION, LOG_EXTENSION, TIME_INDEX_EXTENSION};
use server::streaming::segments::segment::{INDEX_EXTENSION, LOG_EXTENSION};
use std::sync::atomic::{AtomicU32, AtomicU64};
use std::sync::Arc;
use tokio::fs;
Expand Down Expand Up @@ -202,10 +202,8 @@ async fn assert_persisted_partition(partition_path: &str, with_segment: bool) {
let segment_path = format!("{}/{:0>20}", partition_path, start_offset);
let log_path = format!("{}.{}", segment_path, LOG_EXTENSION);
let index_path = format!("{}.{}", segment_path, INDEX_EXTENSION);
let time_index_path = format!("{}.{}", segment_path, TIME_INDEX_EXTENSION);
assert!(fs::metadata(&log_path).await.is_ok());
assert!(fs::metadata(&index_path).await.is_ok());
assert!(fs::metadata(&time_index_path).await.is_ok());
}
}

Expand Down
5 changes: 1 addition & 4 deletions integration/tests/streaming/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use iggy::utils::expiry::IggyExpiry;
use iggy::utils::{checksum, timestamp::IggyTimestamp};
use server::streaming::models::messages::RetainedMessage;
use server::streaming::segments::segment;
use server::streaming::segments::segment::{INDEX_EXTENSION, LOG_EXTENSION, TIME_INDEX_EXTENSION};
use server::streaming::segments::segment::{INDEX_EXTENSION, LOG_EXTENSION};
use server::streaming::sizeable::Sizeable;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
Expand Down Expand Up @@ -111,7 +111,6 @@ async fn should_load_existing_segment_from_disk() {
assert_eq!(loaded_segment.is_closed, segment.is_closed);
assert_eq!(loaded_segment.log_path, segment.log_path);
assert_eq!(loaded_segment.index_path, segment.index_path);
assert_eq!(loaded_segment.time_index_path, segment.time_index_path);
assert!(loaded_messages.is_empty());
}
}
Expand Down Expand Up @@ -353,10 +352,8 @@ async fn assert_persisted_segment(partition_path: &str, start_offset: u64) {
let segment_path = format!("{}/{:0>20}", partition_path, start_offset);
let log_path = format!("{}.{}", segment_path, LOG_EXTENSION);
let index_path = format!("{}.{}", segment_path, INDEX_EXTENSION);
let time_index_path = format!("{}.{}", segment_path, TIME_INDEX_EXTENSION);
assert!(fs::metadata(&log_path).await.is_ok());
assert!(fs::metadata(&index_path).await.is_ok());
assert!(fs::metadata(&time_index_path).await.is_ok());
}

fn create_message(offset: u64, payload: &str, timestamp: IggyTimestamp) -> PolledMessage {
Expand Down
6 changes: 6 additions & 0 deletions sdk/src/utils/duration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ impl IggyDuration {
IggyDuration { duration }
}

pub fn new_from_secs(secs: u64) -> IggyDuration {
IggyDuration {
duration: Duration::from_secs(secs),
}
}

pub fn as_human_time_string(&self) -> String {
format!("{}", format_duration(self.duration))
}
Expand Down
30 changes: 23 additions & 7 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.4.60"
version = "0.4.61"
edition = "2021"
build = "src/build.rs"

Expand All @@ -12,7 +12,6 @@ tokio-console = ["dep:console-subscriber", "tokio/tracing"]
[dependencies]
ahash = { version = "0.8.11" }
anyhow = "1.0.86"
async-stream = "0.3.5"
async-trait = "0.1.82"
atone = "0.3.7"
axum = "0.7.5"
Expand All @@ -36,13 +35,29 @@ moka = { version = "0.12.5", features = ["future"] }
openssl = { version = "0.10.66", features = ["vendored"] }
opentelemetry = { version = "0.26.0", features = ["trace", "logs"] }
opentelemetry-appender-tracing = { version = "0.26.0", features = ["log"] }
opentelemetry-otlp = { version = "0.26.0", features = ["logs", "trace", "grpc-tonic", "http", "http-proto", "reqwest-client", "tokio"] }
opentelemetry-otlp = { version = "0.26.0", features = [
"logs",
"trace",
"grpc-tonic",
"http",
"http-proto",
"reqwest-client",
"tokio",
] }
opentelemetry-semantic-conventions = { version = "0.26.0" }
opentelemetry_sdk = { version = "0.26.0", features = ["rt-tokio", "logs", "trace", "tokio"] }
opentelemetry_sdk = { version = "0.26.0", features = [
"rt-tokio",
"logs",
"trace",
"tokio",
] }
prometheus-client = "0.22.2"
quinn = { version = "0.11.5" }
rcgen = "0.13.1"
reqwest = { version = "0.12.4", features = ["rustls-tls", "rustls-tls-no-provider"] }
reqwest = { version = "0.12.4", features = [
"rustls-tls",
"rustls-tls-no-provider",
] }
ring = "0.17.8"
rmp-serde = "1.3.0"
rust-s3 = { version = "0.34.0", features = ["default"] }
Expand Down Expand Up @@ -79,10 +94,11 @@ tikv-jemallocator = { version = "0.6", optional = true }
[build-dependencies]
figment = { version = "0.10.18", features = ["json", "toml", "env"] }
serde_json = "1.0.127"
vergen-git2 = { version = "1.0.0", features = ["build",
vergen-git2 = { version = "1.0.0", features = [
"build",
"cargo",
"rustc",
"si"
"si",
] }

[[bin]]
Expand Down
6 changes: 1 addition & 5 deletions server/src/channels/commands/maintain_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,11 +439,7 @@ async fn archive_segments(
}

let segment = segment.unwrap();
let files = [
segment.index_path.as_ref(),
segment.time_index_path.as_ref(),
segment.log_path.as_ref(),
];
let files = [segment.index_path.as_ref(), segment.log_path.as_ref()];
if let Err(error) = archiver.archive(&files, None).await {
error!(
"Failed to archive segment with start offset: {} for stream ID: {}, topic ID: {}, partition ID: {}. Error: {}",
Expand Down
2 changes: 1 addition & 1 deletion server/src/channels/commands/print_sysinfo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl ServerCommand<SysInfoPrintCommand> for SysInfoPrintExecutor {
/ stats.total_memory.as_bytes_u64() as f64)
* 100f64;

info!("CPU: {:.2}% / {:.2}% (IggyUsage/Total), Mem: {:.2}% / {} / {} / {} (Free/IggyUsage/TotalUsed/Total), Clients: {}, Messages processed: {}, Read: {}, Written: {}, Run Time: {} s",
info!("CPU: {:.2}% / {:.2}% (IggyUsage/Total), Mem: {:.2}% / {} / {} / {} (Free/IggyUsage/TotalUsed/Total), Clients: {}, Messages processed: {}, Read: {}, Written: {}, Uptime: {}",
stats.cpu_usage,
stats.total_cpu_usage,
free_memory_percent,
Expand Down
Loading

0 comments on commit d9e9481

Please sign in to comment.