Skip to content

Commit

Permalink
feat: impl DoNothing wal (apache#1311)
Browse files Browse the repository at this point in the history
## Rationale
In some deployments when latest data is allowed to lost, wal is not a
required module.

Also, wal may cost too much resources if write throughput is high, by
disable it, we can save those resources.


## Detailed Changes
- Add a `DoNothing` wal implementation
- Introduce `WalConfig`, and add a new `disable_data` flag in it.
- This `Config` wrap old `StorageConfig`, and use `#[serde(flatten)]` to
keep backwards compatible.

## Test Plan
Manually
  • Loading branch information
jiacai2050 authored Nov 20, 2023
1 parent 7de0519 commit 046cdd3
Show file tree
Hide file tree
Showing 10 changed files with 242 additions and 117 deletions.
6 changes: 3 additions & 3 deletions analytic_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use object_store::config::StorageOptions;
use serde::{Deserialize, Serialize};
use size_ext::ReadableSize;
use time_ext::ReadableDuration;
use wal::config::StorageConfig;
use wal::config::Config as WalConfig;

pub use crate::{compaction::scheduler::SchedulerConfig, table_options::TableOptions};

Expand Down Expand Up @@ -112,7 +112,7 @@ pub struct Config {
/// + RocksDB
/// + OBKV
/// + Kafka
pub wal: StorageConfig,
pub wal: WalConfig,

/// Recover mode
///
Expand Down Expand Up @@ -188,7 +188,7 @@ impl Default for Config {
max_bytes_per_write_batch: None,
mem_usage_sampling_interval: ReadableDuration::secs(0),
wal_encode: WalEncodeConfig::default(),
wal: StorageConfig::RocksDB(Box::default()),
wal: WalConfig::default(),
remote_engine_client: remote_engine_client::config::Config::default(),
recover_mode: RecoverMode::TableBased,
metrics: MetricsOptions::default(),
Expand Down
42 changes: 26 additions & 16 deletions analytic_engine/src/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use table_engine::{
use tempfile::TempDir;
use time_ext::ReadableDuration;
use wal::{
config::StorageConfig,
config::{Config as WalConfig, StorageConfig},
manager::{OpenedWals, WalRuntimes, WalsOpener},
rocksdb_impl::{config::RocksDBStorageConfig, manager::RocksDBWalsOpener},
table_kv_impl::wal::MemWalsOpener,
Expand Down Expand Up @@ -506,10 +506,13 @@ impl Builder {
data_dir: dir.path().to_str().unwrap().to_string(),
}),
},
wal: StorageConfig::RocksDB(Box::new(RocksDBStorageConfig {
data_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
})),
wal: WalConfig {
storage: StorageConfig::RocksDB(Box::new(RocksDBStorageConfig {
data_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
})),
disable_data: false,
},
..Default::default()
};

Expand Down Expand Up @@ -581,11 +584,13 @@ impl Default for RocksDBEngineBuildContext {
data_dir: dir.path().to_str().unwrap().to_string(),
}),
},

wal: StorageConfig::RocksDB(Box::new(RocksDBStorageConfig {
data_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
})),
wal: WalConfig {
storage: StorageConfig::RocksDB(Box::new(RocksDBStorageConfig {
data_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
})),
disable_data: false,
},
..Default::default()
};

Expand Down Expand Up @@ -614,11 +619,13 @@ impl Clone for RocksDBEngineBuildContext {
};

config.storage = storage;
config.wal = StorageConfig::RocksDB(Box::new(RocksDBStorageConfig {
data_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
}));

config.wal = WalConfig {
storage: StorageConfig::RocksDB(Box::new(RocksDBStorageConfig {
data_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
})),
disable_data: false,
};
Self {
config,
open_method: self.open_method,
Expand Down Expand Up @@ -674,7 +681,10 @@ impl Default for MemoryEngineBuildContext {
data_dir: dir.path().to_str().unwrap().to_string(),
}),
},
wal: StorageConfig::Obkv(Box::default()),
wal: WalConfig {
storage: StorageConfig::Obkv(Box::default()),
disable_data: false,
},
..Default::default()
};

Expand Down
14 changes: 13 additions & 1 deletion src/ceresdb/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ fn build_engine_runtimes(config: &RuntimeConfig) -> EngineRuntimes {
}
}

fn validate_config(config: &Config) {
let is_data_wal_disabled = config.analytic.wal.disable_data;
if is_data_wal_disabled {
let is_cluster = config.cluster_deployment.is_some();
if !is_cluster {
panic!("Invalid config, we can only disable data wal in cluster deployments")
}
}
}

/// Run a server, returns when the server is shutdown by user
pub fn run_server(config: Config, log_runtime: RuntimeLevel) {
let runtimes = Arc::new(build_engine_runtimes(&config.runtime));
Expand All @@ -108,8 +118,10 @@ pub fn run_server(config: Config, log_runtime: RuntimeLevel) {

info!("Server starts up, config:{:#?}", config);

validate_config(&config);

runtimes.default_runtime.block_on(async {
match config.analytic.wal {
match config.analytic.wal.storage {
StorageConfig::RocksDB(_) => {
#[cfg(feature = "wal-rocksdb")]
{
Expand Down
21 changes: 21 additions & 0 deletions src/wal/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,27 @@ pub type KafkaStorageConfig = crate::message_queue_impl::config::KafkaStorageCon
#[derive(Debug, Default, Clone, Deserialize, Serialize)]
pub struct KafkaStorageConfig;

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Config {
// The flatten attribute inlines keys from a field into the parent struct.
// That's to say `storage` has no real usage, it's just a placeholder.
#[serde(flatten)]
pub storage: StorageConfig,
/// If true, data wal will return Ok directly, without any IO operations.
// Note: this is only used for test, we shouldn't enable this in production.
#[serde(default)]
pub disable_data: bool,
}

impl Default for Config {
fn default() -> Self {
Self {
storage: StorageConfig::RocksDB(Box::default()),
disable_data: false,
}
}
}

/// Options for wal storage backend
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(tag = "type")]
Expand Down
74 changes: 74 additions & 0 deletions src/wal/src/dummy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2023 The CeresDB Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use async_trait::async_trait;
use common_types::SequenceNumber;

use crate::{
log_batch::LogWriteBatch,
manager::{
BatchLogIteratorAdapter, ReadContext, ReadRequest, RegionId, Result, ScanContext,
ScanRequest, WalLocation, WalManager, WriteContext,
},
};

#[derive(Debug)]
pub struct DoNothing;

#[async_trait]
impl WalManager for DoNothing {
async fn sequence_num(&self, _location: WalLocation) -> Result<SequenceNumber> {
Ok(0)
}

async fn mark_delete_entries_up_to(
&self,
_location: WalLocation,
_sequence_num: SequenceNumber,
) -> Result<()> {
Ok(())
}

async fn close_region(&self, _region: RegionId) -> Result<()> {
Ok(())
}

async fn close_gracefully(&self) -> Result<()> {
Ok(())
}

async fn read_batch(
&self,
_ctx: &ReadContext,
_req: &ReadRequest,
) -> Result<BatchLogIteratorAdapter> {
Ok(BatchLogIteratorAdapter::empty())
}

async fn write(&self, _ctx: &WriteContext, _batch: &LogWriteBatch) -> Result<SequenceNumber> {
Ok(0)
}

async fn scan(
&self,
_ctx: &ScanContext,
_req: &ScanRequest,
) -> Result<BatchLogIteratorAdapter> {
Ok(BatchLogIteratorAdapter::empty())
}

async fn get_statistics(&self) -> Option<String> {
None
}
}
1 change: 1 addition & 0 deletions src/wal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#![feature(trait_alias)]

pub mod config;
mod dummy;
pub mod kv_encoder;
pub mod log_batch;
pub mod manager;
Expand Down
11 changes: 9 additions & 2 deletions src/wal/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use runtime::Runtime;
use snafu::ResultExt;

use crate::{
config::StorageConfig,
config::Config,
log_batch::{LogEntry, LogWriteBatch, PayloadDecodeContext, PayloadDecoder},
metrics::WAL_WRITE_BYTES_HISTOGRAM,
};
Expand Down Expand Up @@ -400,6 +400,13 @@ impl BatchLogIteratorAdapter {
}
}

pub fn empty() -> Self {
Self {
iter: None,
batch_size: 1,
}
}

async fn simulated_async_next<D, F>(
&mut self,
decoder: D,
Expand Down Expand Up @@ -549,7 +556,7 @@ pub(crate) const MANIFEST_DIR_NAME: &str = "manifest";

#[async_trait]
pub trait WalsOpener: Send + Sync + Default {
async fn open_wals(&self, config: &StorageConfig, runtimes: WalRuntimes) -> Result<OpenedWals>;
async fn open_wals(&self, config: &Config, runtimes: WalRuntimes) -> Result<OpenedWals>;
}

#[cfg(test)]
Expand Down
25 changes: 15 additions & 10 deletions src/wal/src/message_queue_impl/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use runtime::Runtime;
use snafu::ResultExt;

use crate::{
config::StorageConfig,
config::{Config, StorageConfig},
log_batch::{LogEntry, LogWriteBatch},
manager::{
self, error::*, AsyncLogIterator, BatchLogIteratorAdapter, OpenedWals, ReadContext,
Expand Down Expand Up @@ -138,8 +138,8 @@ pub struct KafkaWalsOpener;

#[async_trait]
impl WalsOpener for KafkaWalsOpener {
async fn open_wals(&self, config: &StorageConfig, runtimes: WalRuntimes) -> Result<OpenedWals> {
let kafka_wal_config = match config {
async fn open_wals(&self, config: &Config, runtimes: WalRuntimes) -> Result<OpenedWals> {
let kafka_wal_config = match &config.storage {
StorageConfig::Kafka(config) => config.clone(),
_ => {
return InvalidWalConfig {
Expand All @@ -156,12 +156,17 @@ impl WalsOpener for KafkaWalsOpener {
let kafka = KafkaImpl::new(kafka_wal_config.kafka.clone())
.await
.context(OpenKafka)?;
let data_wal = MessageQueueImpl::new(
WAL_DIR_NAME.to_string(),
kafka.clone(),
default_runtime.clone(),
kafka_wal_config.data_namespace,
);
let data_wal = if config.disable_data {
Arc::new(crate::dummy::DoNothing) as Arc<_>
} else {
let data_wal = MessageQueueImpl::new(
WAL_DIR_NAME.to_string(),
kafka.clone(),
default_runtime.clone(),
kafka_wal_config.data_namespace,
);
Arc::new(data_wal) as Arc<_>
};

let manifest_wal = MessageQueueImpl::new(
MANIFEST_DIR_NAME.to_string(),
Expand All @@ -171,7 +176,7 @@ impl WalsOpener for KafkaWalsOpener {
);

Ok(OpenedWals {
data_wal: Arc::new(data_wal),
data_wal,
manifest_wal: Arc::new(manifest_wal),
})
}
Expand Down
Loading

0 comments on commit 046cdd3

Please sign in to comment.