Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: conditionally compile wal impls #1272

Merged
merged 17 commits into from
Oct 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ ignore:
- "integration_test"
- "meta_client"
- "remote_engine_client"
- "wal/src/message_queue_impl"
- "wal/src/table_kv_impl"
- "src/wal/src/message_queue_impl"
- "src/wal/src/table_kv_impl"
12 changes: 6 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,28 @@ init:

build-debug:
ls -alh
cd $(DIR); cargo build
cd $(DIR); cargo build $(CARGO_FEATURE_FLAGS)

build:
ls -alh
cd $(DIR); cargo build --release
cd $(DIR); cargo build --release $(CARGO_FEATURE_FLAGS)

build-slim:
ls -alh
cd $(DIR); cargo build --profile release-slim
cd $(DIR); cargo build --profile release-slim $(CARGO_FEATURE_FLAGS)

build-asan:
ls -alh
export RUSTFLAGS=-Zsanitizer=address RUSTDOCFLAGS=-Zsanitizer=address
cd $(DIR); cargo build -Zbuild-std --target x86_64-unknown-linux-gnu --release
cd $(DIR); cargo build -Zbuild-std --target x86_64-unknown-linux-gnu --release $(CARGO_FEATURE_FLAGS)

build-arm64:
ls -alh
cd $(DIR); cargo build --release --no-default-features
cd $(DIR); cargo build --release --no-default-features $(CARGO_FEATURE_FLAGS)

build-with-console:
ls -alh
cd $(DIR); RUSTFLAGS="--cfg tokio_unstable" cargo build --release
cd $(DIR); RUSTFLAGS="--cfg tokio_unstable" cargo build --release $(CARGO_FEATURE_FLAGS)

test:
cd $(DIR); cargo test --workspace -- --test-threads=4
Expand Down
6 changes: 5 additions & 1 deletion analytic_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ workspace = true
workspace = true

[features]
default = ["wal-rocksdb", "wal-table-kv", "wal-message-queue"]
test = ["tempfile"]
wal-table-kv = ["wal/wal-table-kv"]
wal-message-queue = ["wal/wal-message-queue"]
wal-rocksdb = ["wal/wal-rocksdb"]

[dependencies]
# In alphabetical order
Expand Down Expand Up @@ -82,4 +86,4 @@ pin-project-lite = { workspace = true }
rand = { workspace = true }
tempfile = { workspace = true }
test_util = { workspace = true }
wal = { workspace = true }
wal = { workspace = true, features = ["wal-message-queue", "wal-rocksdb", "wal-table-kv"] }
2 changes: 1 addition & 1 deletion analytic_engine/src/manifest/details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ mod tests {
use object_store::LocalFileSystem;
use runtime::Runtime;
use table_engine::table::{SchemaId, TableId, TableSeqGenerator};
use wal::rocks_impl::manager::Builder as WalBuilder;
use wal::rocksdb_impl::manager::Builder as WalBuilder;

use super::*;
use crate::{
Expand Down
2 changes: 1 addition & 1 deletion analytic_engine/src/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use time_ext::ReadableDuration;
use wal::{
config::StorageConfig,
manager::{OpenedWals, WalRuntimes, WalsOpener},
rocks_impl::{config::RocksDBStorageConfig, manager::RocksDBWalsOpener},
rocksdb_impl::{config::RocksDBStorageConfig, manager::RocksDBWalsOpener},
table_kv_impl::wal::MemWalsOpener,
};

Expand Down
2 changes: 1 addition & 1 deletion benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ time_ext = { workspace = true }
tokio = { workspace = true }
toml_ext = { workspace = true }
trace_metric = { workspace = true }
wal = { workspace = true }
wal = { workspace = true, features = ["wal-rocksdb", "wal-message-queue", "wal-table-kv"] }
zstd = { workspace = true }

[dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ build-meta:
./build_meta.sh

build-ceresdb:
cd .. && cargo build --bin ceresdb-server
cd .. && cargo build --bin ceresdb-server --features wal-table-kv,wal-message-queue,wal-rocksdb

build-test:
cargo build
Expand Down
6 changes: 6 additions & 0 deletions src/ceresdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ workspace = true
[package.edition]
workspace = true

[features]
default = ["wal-rocksdb", "wal-table-kv", "wal-message-queue"]
wal-table-kv = ["wal/wal-table-kv"]
wal-message-queue = ["wal/wal-message-queue"]
wal-rocksdb = ["wal/wal-rocksdb"]

[dependencies]
analytic_engine = { workspace = true }
catalog = { workspace = true }
Expand Down
45 changes: 39 additions & 6 deletions src/ceresdb/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@ use tracing_util::{
use wal::{
config::StorageConfig,
manager::{WalRuntimes, WalsOpener},
message_queue_impl::wal::KafkaWalsOpener,
rocks_impl::manager::RocksDBWalsOpener,
table_kv_impl::wal::ObkvWalsOpener,
};

use crate::{
Expand Down Expand Up @@ -96,18 +93,54 @@ pub fn run_server(config: Config, log_runtime: RuntimeLevel) {
runtimes.default_runtime.block_on(async {
match config.analytic.wal {
StorageConfig::RocksDB(_) => {
run_server_with_runtimes::<RocksDBWalsOpener>(config, engine_runtimes, log_runtime)
#[cfg(feature = "wal-rocksdb")]
{
use wal::rocksdb_impl::manager::RocksDBWalsOpener;
run_server_with_runtimes::<RocksDBWalsOpener>(
config,
engine_runtimes,
log_runtime,
)
.await
}
#[cfg(not(feature = "wal-rocksdb"))]
{
panic!("RocksDB WAL not bundled!");
}
}

StorageConfig::Obkv(_) => {
run_server_with_runtimes::<ObkvWalsOpener>(config, engine_runtimes, log_runtime)
#[cfg(feature = "wal-table-kv")]
{
use wal::table_kv_impl::wal::ObkvWalsOpener;
run_server_with_runtimes::<ObkvWalsOpener>(
config,
engine_runtimes,
log_runtime,
)
.await;
}
#[cfg(not(feature = "wal-table-kv"))]
{
panic!("Table KV WAL not bundled!");
}
}

StorageConfig::Kafka(_) => {
run_server_with_runtimes::<KafkaWalsOpener>(config, engine_runtimes, log_runtime)
#[cfg(feature = "wal-message-queue")]
{
use wal::message_queue_impl::wal::KafkaWalsOpener;
run_server_with_runtimes::<KafkaWalsOpener>(
config,
engine_runtimes,
log_runtime,
)
.await;
}
#[cfg(not(feature = "wal-message-queue"))]
{
panic!("Message Queue WAL not bundled!");
}
}
}
});
Expand Down
14 changes: 12 additions & 2 deletions src/wal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,20 @@ workspace = true
[package.edition]
workspace = true

[features]
wal-message-queue = ["dep:message_queue"]
wal-table-kv = ["dep:table_kv"]
wal-rocksdb = ["dep:rocksdb"]

[[test]]
name = "read_write"
required-features = ["wal-message-queue", "wal-table-kv", "wal-rocksdb"]

[dependencies.rocksdb]
git = "https://github.com/tikv/rust-rocksdb.git"
rev = "f04f4dd8eacc30e67c24bc2529a6d9c6edb85f8f"
features = ["portable"]
optional = true

[dependencies]
async-trait = { workspace = true }
Expand All @@ -41,7 +51,7 @@ generic_error = { workspace = true }
lazy_static = { workspace = true }
logger = { workspace = true }
macros = { workspace = true }
message_queue = { workspace = true }
message_queue = { workspace = true, optional = true }
prometheus = { workspace = true }
prost = { workspace = true }
runtime = { workspace = true }
Expand All @@ -50,7 +60,7 @@ serde_json = { workspace = true }
size_ext = { workspace = true }
smallvec = { workspace = true }
snafu = { workspace = true }
table_kv = { workspace = true }
table_kv = { workspace = true, optional = true }
time_ext = { workspace = true }
timed_task = { workspace = true }
tokio = { workspace = true }
Expand Down
21 changes: 17 additions & 4 deletions src/wal/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,23 @@

use serde::{Deserialize, Serialize};

use crate::{
message_queue_impl::config::KafkaStorageConfig, rocks_impl::config::RocksDBStorageConfig,
table_kv_impl::config::ObkvStorageConfig,
};
#[cfg(feature = "wal-rocksdb")]
pub type RocksDBStorageConfig = crate::rocksdb_impl::config::RocksDBStorageConfig;
#[cfg(not(feature = "wal-rocksdb"))]
#[derive(Debug, Default, Clone, Deserialize, Serialize)]
pub struct RocksDBStorageConfig;

#[cfg(feature = "wal-table-kv")]
pub type ObkvStorageConfig = crate::table_kv_impl::config::ObkvStorageConfig;
#[cfg(not(feature = "wal-table-kv"))]
#[derive(Debug, Default, Clone, Deserialize, Serialize)]
pub struct ObkvStorageConfig;

#[cfg(feature = "wal-message-queue")]
pub type KafkaStorageConfig = crate::message_queue_impl::config::KafkaStorageConfig;
#[cfg(not(feature = "wal-message-queue"))]
#[derive(Debug, Default, Clone, Deserialize, Serialize)]
pub struct KafkaStorageConfig;

/// Options for wal storage backend
#[derive(Debug, Clone, Deserialize, Serialize)]
Expand Down
7 changes: 3 additions & 4 deletions src/wal/src/kv_encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -750,8 +750,7 @@ mod tests {
use super::*;
use crate::{
kv_encoder::CommonLogKey,
log_batch::PayloadDecoder,
tests::util::{TestPayload, TestPayloadDecoder},
log_batch::{MemoryPayload, MemoryPayloadDecoder, PayloadDecoder},
};

#[test]
Expand All @@ -771,9 +770,9 @@ mod tests {
assert_eq!(log_key, decoded_key);
}

let decoder = TestPayloadDecoder;
let decoder = MemoryPayloadDecoder;
for val in 0..8 {
let payload = TestPayload { val };
let payload = MemoryPayload { val };

encoding.encode_value(&mut buf, &payload).unwrap();

Expand Down
8 changes: 4 additions & 4 deletions src/wal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ pub mod config;
pub mod kv_encoder;
pub mod log_batch;
pub mod manager;
#[cfg(feature = "wal-message-queue")]
pub mod message_queue_impl;
pub(crate) mod metrics;
pub mod rocks_impl;
#[cfg(feature = "wal-rocksdb")]
pub mod rocksdb_impl;
#[cfg(feature = "wal-table-kv")]
pub mod table_kv_impl;

#[cfg(test)]
mod tests;
44 changes: 40 additions & 4 deletions src/wal/src/log_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@

use std::fmt::Debug;

use bytes_ext::{Buf, BufMut};
use bytes_ext::{Buf, BufMut, SafeBuf, SafeBufMut};
use common_types::{table::TableId, SequenceNumber};

use crate::manager::WalLocation;
use crate::manager::{Error, WalLocation};

pub trait Payload: Send + Sync + Debug {
type Error: std::error::Error + Send + Sync + 'static;
Expand All @@ -30,6 +30,30 @@ pub trait Payload: Send + Sync + Debug {
fn encode_to<B: BufMut>(&self, buf: &mut B) -> Result<(), Self::Error>;
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct MemoryPayload {
pub val: u32,
}

impl Payload for MemoryPayload {
type Error = Error;

fn encode_size(&self) -> usize {
4
}

fn encode_to<B: BufMut>(&self, buf: &mut B) -> Result<(), Self::Error> {
buf.try_put_u32(self.val).expect("must write");
Ok(())
}
}

impl From<&u32> for MemoryPayload {
fn from(v: &u32) -> Self {
Self { val: *v }
}
}

#[derive(Debug)]
pub struct LogEntry<P> {
pub table_id: TableId,
Expand All @@ -46,8 +70,8 @@ pub struct LogWriteEntry {
/// A batch of `LogWriteEntry`s.
#[derive(Debug)]
pub struct LogWriteBatch {
pub(crate) location: WalLocation,
pub(crate) entries: Vec<LogWriteEntry>,
pub location: WalLocation,
pub entries: Vec<LogWriteEntry>,
}

impl LogWriteBatch {
Expand Down Expand Up @@ -89,3 +113,15 @@ pub trait PayloadDecoder: Send + Sync {
/// Decode `Target` from the `bytes`.
fn decode<B: Buf>(&self, buf: &mut B) -> Result<Self::Target, Self::Error>;
}

pub struct MemoryPayloadDecoder;

impl PayloadDecoder for MemoryPayloadDecoder {
type Error = Error;
type Target = MemoryPayload;

fn decode<B: SafeBuf>(&self, buf: &mut B) -> Result<Self::Target, Self::Error> {
let val = buf.try_get_u32().expect("should succeed to read u32");
Ok(MemoryPayload { val })
}
}
8 changes: 5 additions & 3 deletions src/wal/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,11 @@ pub mod error {
backtrace: Backtrace,
},

#[cfg(feature = "wal-table-kv")]
#[snafu(display("Failed to open obkv, err:{}", source))]
OpenObkv { source: table_kv::obkv::Error },

#[cfg(feature = "wal-message-queue")]
#[snafu(display("Failed to open kafka, err:{}", source))]
OpenKafka {
source: message_queue::kafka::kafka_impl::Error,
Expand Down Expand Up @@ -523,7 +525,7 @@ mod tests {
use runtime::{self, Runtime};

use super::*;
use crate::{log_batch::LogEntry, tests::util::TestPayloadDecoder};
use crate::log_batch::{LogEntry, MemoryPayloadDecoder};

#[derive(Debug, Clone)]
struct TestIterator {
Expand Down Expand Up @@ -605,7 +607,7 @@ mod tests {

loop {
buffer = iter
.next_log_entries(TestPayloadDecoder, buffer)
.next_log_entries(MemoryPayloadDecoder, buffer)
.await
.unwrap();
for entry in buffer.iter() {
Expand All @@ -629,7 +631,7 @@ mod tests {
let mut buffer = VecDeque::with_capacity(3);
loop {
buffer = iter
.next_log_entries(TestPayloadDecoder, buffer)
.next_log_entries(MemoryPayloadDecoder, buffer)
.await
.unwrap();
for entry in buffer.iter() {
Expand Down
Loading