Skip to content

Commit

Permalink
engine: refactor MetricsFlusher and add fallback storage IO measuri…
Browse files Browse the repository at this point in the history
…ng (tikv#9458)

Signed-off-by: tabokie <xy.tao@outlook.com>

### What is changed and how it works?

What's Changed:

Remove the metrics flusher from engine_traits and use a background worker instead. Add an IO metrics task that reports statistics from iosnooper, or from the IO rate limiter if BCC is unavailable.

### Check List <!--REMOVE the items that are not applicable-->

Tests <!-- At least one of them must be included. -->

- Unit test

### Release note <!-- bugfixes or new feature need a release note -->

No release note
  • Loading branch information
tabokie authored Jan 28, 2021
1 parent 579f5a4 commit 20b0e95
Show file tree
Hide file tree
Showing 23 changed files with 357 additions and 428 deletions.
12 changes: 1 addition & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,6 @@ members = [
"components/test_pd",
"components/tikv_alloc",
"components/match_template",
"components/engine_traits/tests",
"components/codec",
"components/configuration",
"components/panic_hook",
Expand Down
3 changes: 2 additions & 1 deletion cmd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ portable = ["tikv/portable"]
sse = ["tikv/sse"]
mem-profiling = ["tikv/mem-profiling"]
failpoints = ["tikv/failpoints"]
bcc-iosnoop = ["tikv/bcc-iosnoop"]
protobuf-codec = [
"backup/protobuf-codec",
"cdc/protobuf-codec",
Expand Down Expand Up @@ -86,7 +87,7 @@ error_code = { path = "../components/error_code", default-features = false }
file_system = { path = "../components/file_system" }
fs2 = "0.4"
futures = "0.3"
tokio = { version = "0.2", features = ["rt-threaded"] }
tokio = { version = "0.2", features = ["rt-threaded", "time"] }
grpcio = { version = "0.7", default-features = false, features = ["openssl-vendored"] }
hex = "0.4"
keys = { path = "../components/keys", default-features = false }
Expand Down
103 changes: 69 additions & 34 deletions cmd/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::{
net::SocketAddr,
path::{Path, PathBuf},
sync::{atomic::AtomicU64, Arc, Mutex},
time::{Duration, Instant},
};

use concurrency_manager::ConcurrencyManager;
Expand All @@ -25,11 +26,16 @@ use engine_rocks::{
RocksEngine,
};
use engine_traits::{
compaction_job::CompactionJobInfo, EngineFileSystemInspector, Engines, MetricsFlusher,
RaftEngine, CF_DEFAULT, CF_WRITE,
compaction_job::CompactionJobInfo, EngineFileSystemInspector, Engines, RaftEngine, CF_DEFAULT,
CF_WRITE,
};
use file_system::{
set_io_rate_limiter, BytesFetcher, IORateLimiter, MetricsManager as IOMetricsManager,
};
use fs2::FileExt;
use futures::compat::Stream01CompatExt;
use futures::executor::block_on;
use futures::stream::StreamExt;
use grpcio::{EnvBuilder, Environment};
use kvproto::{
backup::create_backup, cdcpb::create_change_data, deadlock::create_deadlock,
Expand Down Expand Up @@ -74,18 +80,17 @@ use tikv::{
mvcc::MvccConsistencyCheckObserver,
},
};
use tikv_util::config::{ReadableSize, VersionTrack};
use tikv_util::{
check_environment_variables,
config::ensure_dir_exist,
config::{ensure_dir_exist, ReadableSize, VersionTrack},
sys::sys_quota::SysQuota,
time::Monitor,
worker::{Builder as WorkerBuilder, FutureWorker, Worker},
timer::GLOBAL_TIMER_HANDLE,
worker::{Builder as WorkerBuilder, FutureWorker, LazyWorker, Worker},
};
use tokio::runtime::Builder;

use crate::{setup::*, signal_handler};
use tikv_util::worker::LazyWorker;

/// Run a TiKV server. Returns when the server is shutdown by the user, in which
/// case the server will be properly stopped.
Expand All @@ -110,11 +115,8 @@ pub fn run_tikv(config: TiKvConfig) {

macro_rules! run_impl {
($ER: ty) => {{
let enable_io_snoop = config.enable_io_snoop;
let mut tikv = TiKVServer::<$ER>::init(config);
if enable_io_snoop {
tikv.init_io_snooper();
}
let fetcher = tikv.init_io_utility();
tikv.check_conflict_addr();
tikv.init_fs();
tikv.init_yatp();
Expand All @@ -124,7 +126,7 @@ pub fn run_tikv(config: TiKvConfig) {
let gc_worker = tikv.init_gc_worker();
let server_config = tikv.init_servers(&gc_worker);
tikv.register_services();
tikv.init_metrics_flusher();
tikv.init_metrics_flusher(fetcher);
tikv.run_server(server_config);
tikv.run_status_server();

Expand All @@ -142,6 +144,8 @@ pub fn run_tikv(config: TiKvConfig) {

const RESERVED_OPEN_FDS: u64 = 1000;

/// Period of the timer that drives metrics flushing: 10 seconds.
const DEFAULT_METRICS_FLUSH_INTERVAL: Duration = Duration::from_millis(10_000);

/// A complete TiKV server.
struct TiKVServer<ER: RaftEngine> {
config: TiKvConfig,
Expand Down Expand Up @@ -841,25 +845,36 @@ impl<ER: RaftEngine> TiKVServer<ER> {
}
}

fn init_metrics_flusher(&mut self) {
let mut metrics_flusher = Box::new(MetricsFlusher::new(
self.engines.as_ref().unwrap().engines.clone(),
));

// Start metrics flusher
if let Err(e) = metrics_flusher.start() {
error!(%e; "failed to start metrics flusher");
}

self.to_stop.push(metrics_flusher);
fn init_io_utility(&mut self) -> BytesFetcher {
let io_snooper_on = self.config.enable_io_snoop
&& file_system::init_io_snooper()
.map_err(|e| error!(%e; "failed to init io snooper"))
.is_ok();
let (fetcher, limiter) = if io_snooper_on {
(BytesFetcher::FromIOSnooper(), IORateLimiter::new(0, false))
} else {
let limiter = IORateLimiter::new(0, true);
(BytesFetcher::FromRateLimiter(limiter.statistics()), limiter)
};
set_io_rate_limiter(Some(Arc::new(limiter)));
fetcher
}

fn init_io_snooper(&mut self) {
if let Err(e) = file_system::init_io_snooper() {
error!(%e; "failed to init io snooper");
} else {
info!("init io snooper successfully"; "pid" => nix::unistd::getpid().to_string());
}
/// Spawns the periodic metrics task on the background worker.
///
/// A timer stream starting now and ticking every `DEFAULT_METRICS_FLUSH_INTERVAL`
/// drives the task. On each successful tick, the engine metrics manager and the
/// IO metrics manager (built from `fetcher`) are flushed with the same timestamp.
/// The task ends as soon as the stream yields an error or finishes.
fn init_metrics_flusher(&mut self, fetcher: BytesFetcher) {
    let handle = self.background_worker.clone_raw_handle();
    let mut ticks = GLOBAL_TIMER_HANDLE
        .interval(Instant::now(), DEFAULT_METRICS_FLUSH_INTERVAL)
        .compat();
    // Panics if the engines have not been initialized yet.
    let engines = self.engines.as_ref().unwrap().engines.clone();
    let mut engine_flusher = EngineMetricsManager::new(engines);
    let mut io_flusher = IOMetricsManager::new(fetcher);
    handle.spawn(async move {
        while let Some(Ok(_)) = ticks.next().await {
            // Both managers observe the same instant for this round.
            let tick_time = Instant::now();
            engine_flusher.flush(tick_time);
            io_flusher.flush(tick_time);
        }
    });
}

fn run_server(&mut self, server_config: Arc<ServerConfig>) {
Expand Down Expand Up @@ -1146,12 +1161,6 @@ where
}
}

impl<ER: RaftEngine> Stop for MetricsFlusher<RocksEngine, ER> {
fn stop(mut self: Box<Self>) {
(*self).stop()
}
}

impl Stop for Worker {
fn stop(self: Box<Self>) {
Worker::stop(&self);
Expand All @@ -1163,3 +1172,29 @@ impl<T: fmt::Display + Send + 'static> Stop for LazyWorker<T> {
self.stop_worker();
}
}

/// Minimum time between two resets of engine statistics: 60 seconds.
const DEFAULT_ENGINE_METRICS_RESET_INTERVAL: Duration = Duration::from_millis(60_000);

/// Flushes metrics of the KV and Raft engines and periodically resets their
/// statistics.
pub struct EngineMetricsManager<R: RaftEngine> {
// Engines whose metrics are flushed and whose statistics are reset.
engines: Engines<RocksEngine, R>,
// Instant of the most recent statistics reset; starts at construction time.
last_reset: Instant,
}

impl<R: RaftEngine> EngineMetricsManager<R> {
    /// Creates a manager for `engines`; the reset timer starts at the current
    /// instant.
    pub fn new(engines: Engines<RocksEngine, R>) -> Self {
        let last_reset = Instant::now();
        Self {
            engines,
            last_reset,
        }
    }

    /// Flushes the metrics of both engines (labelled "kv" and "raft").
    ///
    /// If at least `DEFAULT_ENGINE_METRICS_RESET_INTERVAL` has passed between
    /// the last reset and `now`, the statistics of both engines are reset and
    /// `now` becomes the new reset time.
    pub fn flush(&mut self, now: Instant) {
        self.engines.kv.flush_metrics("kv");
        self.engines.raft.flush_metrics("raft");

        let since_reset = now.duration_since(self.last_reset);
        if since_reset < DEFAULT_ENGINE_METRICS_RESET_INTERVAL {
            // Not due for a reset yet.
            return;
        }
        self.engines.kv.reset_statistics();
        self.engines.raft.reset_statistics();
        self.last_reset = now;
    }
}
33 changes: 17 additions & 16 deletions components/engine_rocks/src/file_system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,17 @@ mod tests {
use crate::raw::{ColumnFamilyOptions, DBCompressionType};
use crate::raw_util::{new_engine_opt, CFOptions};
use engine_traits::{CompactExt, CF_DEFAULT};
use file_system::{set_io_rate_limiter, BytesRecorder, IOOp, IORateLimiter, IOType};
use file_system::{IOOp, IORateLimiter, IORateLimiterStatistics, IOType, WithIORateLimiter};
use keys::data_key;
use rocksdb::Writable;
use rocksdb::{DBOptions, DB};
use std::sync::Arc;
use tempfile::Builder;

fn new_test_db(dir: &str) -> (Arc<DB>, Arc<BytesRecorder>) {
let recorder = Arc::new(BytesRecorder::new());
set_io_rate_limiter(IORateLimiter::new(10000, Some(recorder.clone())));
fn new_test_db(dir: &str) -> (Arc<DB>, Arc<IORateLimiterStatistics>, WithIORateLimiter) {
let limiter = Arc::new(IORateLimiter::new(1, true));
let stats = limiter.statistics();
let guard = WithIORateLimiter::new(Some(limiter));
let mut db_opts = DBOptions::new();
db_opts.add_event_listener(RocksEventListener::new("test_db"));
let env = get_env(Some(Arc::new(EngineFileSystemInspector::new())), None).unwrap();
Expand All @@ -63,7 +64,7 @@ mod tests {
let db = Arc::new(
new_engine_opt(dir, db_opts, vec![CFOptions::new(CF_DEFAULT, cf_opts)]).unwrap(),
);
(db, recorder)
(db, stats, guard)
}

#[test]
Expand All @@ -74,21 +75,21 @@ mod tests {
.tempdir()
.unwrap();

let (db, recorder) = new_test_db(temp_dir.path().to_str().unwrap());
let (db, stats, _guard) = new_test_db(temp_dir.path().to_str().unwrap());
let value = vec![b'v'; value_size];

db.put(&data_key(b"a1"), &value).unwrap();
db.put(&data_key(b"a2"), &value).unwrap();
db.flush(true /*sync*/).unwrap();
assert!(recorder.fetch(IOType::Flush, IOOp::Write) > value_size * 2);
assert!(recorder.fetch(IOType::Flush, IOOp::Write) < value_size * 3);
recorder.reset();
assert!(stats.fetch(IOType::Flush, IOOp::Write) > value_size * 2);
assert!(stats.fetch(IOType::Flush, IOOp::Write) < value_size * 3);
stats.reset();
db.put(&data_key(b"a2"), &value).unwrap();
db.put(&data_key(b"a3"), &value).unwrap();
db.flush(true /*sync*/).unwrap();
assert!(recorder.fetch(IOType::Flush, IOOp::Write) > value_size * 2);
assert!(recorder.fetch(IOType::Flush, IOOp::Write) < value_size * 3);
recorder.reset();
assert!(stats.fetch(IOType::Flush, IOOp::Write) > value_size * 2);
assert!(stats.fetch(IOType::Flush, IOOp::Write) < value_size * 3);
stats.reset();
db.c()
.compact_range(
CF_DEFAULT, None, /*start_key*/
Expand All @@ -97,9 +98,9 @@ mod tests {
1, /*max_subcompactions*/
)
.unwrap();
assert!(recorder.fetch(IOType::Compaction, IOOp::Read) > value_size * 4);
assert!(recorder.fetch(IOType::Compaction, IOOp::Read) < value_size * 5);
assert!(recorder.fetch(IOType::Compaction, IOOp::Write) > value_size * 3);
assert!(recorder.fetch(IOType::Compaction, IOOp::Write) < value_size * 4);
assert!(stats.fetch(IOType::Compaction, IOOp::Read) > value_size * 4);
assert!(stats.fetch(IOType::Compaction, IOOp::Read) < value_size * 5);
assert!(stats.fetch(IOType::Compaction, IOOp::Write) > value_size * 3);
assert!(stats.fetch(IOType::Compaction, IOOp::Write) < value_size * 4);
}
}
3 changes: 0 additions & 3 deletions components/engine_traits/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,6 @@ extern crate tikv_alloc;
#[cfg(test)]
#[macro_use]
extern crate serde_derive;
#[macro_use]
extern crate slog_global;

// These modules contain traits that need to be implemented by engines, either
Expand Down Expand Up @@ -337,8 +336,6 @@ pub use raft_engine::{CacheStats, RaftEngine, RaftLogBatch};

// These modules need further scrutiny

pub mod metrics_flusher;
pub use crate::metrics_flusher::*;
pub mod compaction_job;
pub mod util;
pub use compaction_job::*;
Expand Down
77 changes: 0 additions & 77 deletions components/engine_traits/src/metrics_flusher.rs

This file was deleted.

Loading

0 comments on commit 20b0e95

Please sign in to comment.