Commit 062550f

Merge branch 'multi_rocks_test' into 221109-pd-2-raft-v2
tabokie authored Nov 14, 2022
2 parents 38778ad + b681af6
Showing 54 changed files with 2,363 additions and 341 deletions.
Cargo.lock: 2 changes (1 addition, 1 deletion)

Some generated files are not rendered by default.

Cargo.toml: 6 changes (3 additions, 3 deletions)
@@ -1,6 +1,6 @@
 [package]
 name = "tikv"
-version = "6.4.0-alpha"
+version = "6.5.0-alpha"
 authors = ["The TiKV Authors"]
 description = "A distributed transactional key-value database powered by Rust and Raft"
 license = "Apache-2.0"
@@ -220,8 +220,8 @@ procinfo = { git = "https://github.com/tikv/procinfo-rs", rev = "6599eb9dca74229
 # When you modify TiKV cooperatively with kvproto, this will be useful to submit the PR to TiKV and the PR to
 # kvproto at the same time.
 # After the PR to kvproto is merged, remember to comment this out and run `cargo update -p kvproto`.
-[patch.'https://github.com/pingcap/kvproto']
-# kvproto = { git = "https://github.com/your_github_id/kvproto", branch = "your_branch" }
+# [patch.'https://github.com/pingcap/kvproto']
+# kvproto = { git = "https://github.com/your_github_id/kvproto", branch="your_branch" }

 [workspace]
 # See https://github.com/rust-lang/rfcs/blob/master/text/2957-cargo-features2.md

components/engine_test/src/lib.rs: 7 changes (6 additions, 1 deletion)
@@ -126,12 +126,17 @@ pub mod kv {
         db_opt: DbOptions,
         cf_opts: Vec<(&'static str, KvTestCfOptions)>,
     ) -> Self {
-        Self {
+        let f = Self {
             root_path: root_path.to_path_buf(),
             db_opt,
             cf_opts,
             root_db: Arc::new(Mutex::default()),
+        };
+        let tablet_path = f.tablets_path();
+        if !tablet_path.exists() {
+            std::fs::create_dir_all(f.tablets_path()).unwrap();
         }
+        f
     }

     fn create_tablet(&self, tablet_path: &Path) -> Result<KvTestEngine> {
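
The new constructor body eagerly creates the factory's tablets directory, so later calls that create or open tablets can assume the parent directory exists. A minimal standalone sketch of the same pattern follows; the type and method names are illustrative, not the real engine_test API, and note that std::fs::create_dir_all is idempotent, so the exists() check in the committed code is a fast path rather than a correctness requirement.

use std::path::{Path, PathBuf};

struct TestTabletFactory {
    root_path: PathBuf,
}

impl TestTabletFactory {
    // Derived location for all tablets managed by this factory.
    fn tablets_path(&self) -> PathBuf {
        self.root_path.join("tablets")
    }

    fn new(root_path: &Path) -> Self {
        let f = TestTabletFactory {
            root_path: root_path.to_path_buf(),
        };
        // Create the directory up front; create_dir_all succeeds even if
        // it already exists, mirroring the exists()-guarded call above.
        std::fs::create_dir_all(f.tablets_path()).unwrap();
        f
    }
}

fn main() {
    let dir = std::env::temp_dir().join("tablet_factory_demo");
    let f = TestTabletFactory::new(&dir);
    assert!(f.tablets_path().exists());
}
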
components/raftstore-v2/src/batch/store.rs: 23 changes (16 additions, 7 deletions)
@@ -2,6 +2,7 @@

 use std::{
     ops::{Deref, DerefMut},
+    path::Path,
     sync::{Arc, Mutex},
     time::Duration,
 };
@@ -22,7 +23,7 @@ use pd_client::PdClient;
 use raft::INVALID_ID;
 use raftstore::store::{
     fsm::store::PeerTickBatch, local_metrics::RaftMetrics, Config, ReadRunner, ReadTask,
-    StoreWriters, Transport, WriteSenders,
+    StoreWriters, TabletSnapManager, Transport, WriteSenders,
 };
 use slog::Logger;
 use tikv_util::{
@@ -72,6 +73,7 @@ pub struct StoreContext<EK: KvEngine, ER: RaftEngine, T> {
     pub apply_pool: FuturePool,
     pub read_scheduler: Scheduler<ReadTask<EK>>,
     pub pd_scheduler: Scheduler<PdTask>,
+    pub snap_mgr: TabletSnapManager,
 }

 /// A [`PollHandler`] that handles updates of [`StoreFsm`]s and [`PeerFsm`]s.
@@ -224,6 +226,7 @@ struct StorePollerBuilder<EK: KvEngine, ER: RaftEngine, T> {
     apply_pool: FuturePool,
     logger: Logger,
     store_meta: Arc<Mutex<StoreMeta<EK>>>,
+    snap_mgr: TabletSnapManager,
 }

 impl<EK: KvEngine, ER: RaftEngine, T> StorePollerBuilder<EK, ER, T> {
@@ -239,6 +242,7 @@ impl<EK: KvEngine, ER: RaftEngine, T> StorePollerBuilder<EK, ER, T> {
         store_writers: &mut StoreWriters<EK, ER>,
         logger: Logger,
         store_meta: Arc<Mutex<StoreMeta<EK>>>,
+        snap_mgr: TabletSnapManager,
     ) -> Self {
         let pool_size = cfg.value().apply_batch_system.pool_size;
         let max_pool_size = std::cmp::max(
@@ -263,6 +267,7 @@ impl<EK: KvEngine, ER: RaftEngine, T> StorePollerBuilder<EK, ER, T> {
             logger,
             write_senders: store_writers.senders(),
             store_meta,
+            snap_mgr,
         }
     }

@@ -332,6 +337,7 @@ where
             apply_pool: self.apply_pool.clone(),
             read_scheduler: self.read_scheduler.clone(),
             pd_scheduler: self.pd_scheduler.clone(),
+            snap_mgr: self.snap_mgr.clone(),
         };
         let cfg_tracker = self.cfg.clone().tracker("raftstore".to_string());
         StorePoller::new(poll_ctx, cfg_tracker)
@@ -375,6 +381,7 @@ impl<EK: KvEngine, ER: RaftEngine> StoreSystem<EK, ER> {
         pd_client: Arc<C>,
         router: &StoreRouter<EK, ER>,
         store_meta: Arc<Mutex<StoreMeta<EK>>>,
+        snap_mgr: TabletSnapManager,
     ) -> Result<()>
     where
         T: Transport + 'static,
@@ -389,10 +396,11 @@ impl<EK: KvEngine, ER: RaftEngine> StoreSystem<EK, ER> {
         workers
             .store_writers
             .spawn(store_id, raft_engine.clone(), None, router, &trans, &cfg)?;
-        let read_scheduler = workers.async_read_worker.start(
-            "async-read-worker",
-            ReadRunner::new(router.clone(), raft_engine.clone()),
-        );
+        let mut read_runner = ReadRunner::new(router.clone(), raft_engine.clone());
+        read_runner.set_snap_mgr(snap_mgr.clone());
+        let read_scheduler = workers
+            .async_read_worker
+            .start("async-read-worker", read_runner);
         let pd_scheduler = workers.pd_worker.start(
             "pd-worker",
             PdRunner::new(
@@ -417,6 +425,7 @@ impl<EK: KvEngine, ER: RaftEngine> StoreSystem<EK, ER> {
             &mut workers.store_writers,
             self.logger.clone(),
             store_meta.clone(),
+            snap_mgr,
         );
         self.workers = Some(workers);
         let peers = builder.init()?;
@@ -485,7 +494,7 @@ impl<EK: KvEngine, ER: RaftEngine> StoreRouter<EK, ER> {
     ) -> std::result::Result<(), TrySendError<Box<RaftMessage>>> {
         let id = msg.get_region_id();
         let peer_msg = PeerMsg::RaftMessage(msg);
-        let store_msg = match self.try_send(id, peer_msg) {
+        let store_msg = match self.router.try_send(id, peer_msg) {
             Either::Left(Ok(())) => return Ok(()),
             Either::Left(Err(TrySendError::Full(PeerMsg::RaftMessage(m)))) => {
                 return Err(TrySendError::Full(m));
@@ -496,7 +505,7 @@ impl<EK: KvEngine, ER: RaftEngine> StoreRouter<EK, ER> {
             Either::Right(PeerMsg::RaftMessage(m)) => StoreMsg::RaftMessage(m),
             _ => unreachable!(),
         };
-        match self.send_control(store_msg) {
+        match self.router.send_control(store_msg) {
            Ok(()) => Ok(()),
            Err(TrySendError::Full(StoreMsg::RaftMessage(m))) => Err(TrySendError::Full(m)),
            Err(TrySendError::Disconnected(StoreMsg::RaftMessage(m))) => {
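
The last two hunks qualify the router calls as self.router.try_send(...) and self.router.send_control(...). A plausible reading (the full StoreRouter definition is collapsed in this view, so this is an assumption) is that the wrapper both dereferences to an inner router and defines same-named inherent methods, and the unqualified self.try_send(...) was resolving to the wrong one. A toy example of that Rust method-resolution pitfall, with hypothetical types:

use std::ops::Deref;

struct InnerRouter;

impl InnerRouter {
    fn try_send(&self) -> &'static str {
        "inner"
    }
}

struct Wrapper {
    router: InnerRouter,
}

impl Deref for Wrapper {
    type Target = InnerRouter;
    fn deref(&self) -> &InnerRouter {
        &self.router
    }
}

impl Wrapper {
    // An inherent method always wins over the Deref target's method.
    fn try_send(&self) -> &'static str {
        // Writing `self.try_send()` here would recurse forever; naming
        // the field picks the inner router's method unambiguously.
        self.router.try_send()
    }
}

fn main() {
    let w = Wrapper { router: InnerRouter };
    assert_eq!(w.try_send(), "inner");
}
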
components/raftstore-v2/src/fsm/peer.rs: 11 changes (7 additions, 4 deletions)
@@ -225,10 +225,13 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> PeerFsmDelegate<'a, EK, ER,
             PeerMsg::Persisted {
                 peer_id,
                 ready_number,
-            } => self
-                .fsm
-                .peer_mut()
-                .on_persisted(self.store_ctx, peer_id, ready_number),
+                need_scheduled,
+            } => self.fsm.peer_mut().on_persisted(
+                self.store_ctx,
+                peer_id,
+                ready_number,
+                need_scheduled,
+            ),
             PeerMsg::LogsFetched(fetched_logs) => {
                 self.fsm.peer_mut().on_logs_fetched(fetched_logs)
             }

(file header not captured)
@@ -95,6 +95,7 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
         util::check_conf_change(
             &ctx.cfg,
             self.raft_group(),
+            self.region(),
             self.peer(),
             changes.as_ref(),
             &cc,

components/raftstore-v2/src/operation/command/mod.rs: 5 changes (4 additions, 1 deletion)
@@ -199,7 +199,10 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
         proposal_ctx: Vec<u8>,
     ) -> Result<u64> {
         store_ctx.raft_metrics.propose.normal.inc();
-        PEER_PROPOSE_LOG_SIZE_HISTOGRAM.observe(data.len() as f64);
+        store_ctx
+            .raft_metrics
+            .propose_log_size
+            .observe(data.len() as f64);
         if data.len() as u64 > store_ctx.cfg.raft_entry_max_size.0 {
             return Err(Error::RaftEntryTooLarge {
                 region_id: self.region_id(),
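
The proposal-size observation moves off the global PEER_PROPOSE_LOG_SIZE_HISTOGRAM and into the RaftMetrics carried by the store context (note the local_metrics::RaftMetrics import in batch/store.rs above). The usual motivation for this pattern is that per-poller local metrics can be updated without synchronization and flushed to the shared collector in batches. A simplified sketch of that idea, using hypothetical types rather than the actual RaftMetrics layout:

use std::sync::atomic::{AtomicU64, Ordering};

// Stand-in for a shared, contended metrics sink (e.g. a prometheus histogram).
#[derive(Default)]
struct GlobalHistogram {
    sum: AtomicU64,
    count: AtomicU64,
}

impl GlobalHistogram {
    fn observe_batch(&self, sum: u64, count: u64) {
        // One contended update per flush instead of one per proposal.
        self.sum.fetch_add(sum, Ordering::Relaxed);
        self.count.fetch_add(count, Ordering::Relaxed);
    }
}

// Per-poller metrics kept in the store context; observing is a plain
// field update with no atomics involved.
#[derive(Default)]
struct LocalHistogram {
    sum: u64,
    count: u64,
}

impl LocalHistogram {
    fn observe(&mut self, v: f64) {
        self.sum += v as u64;
        self.count += 1;
    }

    fn flush(&mut self, global: &GlobalHistogram) {
        global.observe_batch(self.sum, self.count);
        self.sum = 0;
        self.count = 0;
    }
}

fn main() {
    let global = GlobalHistogram::default();
    let mut local = LocalHistogram::default();
    local.observe(128.0);
    local.observe(256.0);
    local.flush(&global);
    assert_eq!(global.count.load(Ordering::Relaxed), 2);
}
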
components/raftstore-v2/src/operation/query/local.rs: 2 changes (1 addition, 1 deletion)
@@ -565,7 +565,7 @@ mod tests {
         region1.set_region_epoch(epoch13.clone());
         let term6 = 6;
         let mut lease = Lease::new(Duration::seconds(10), Duration::milliseconds(2500));
-        let read_progress = Arc::new(RegionReadProgress::new(&region1, 1, 1, "".to_owned()));
+        let read_progress = Arc::new(RegionReadProgress::new(&region1, 1, 1, 1));

         let mut cmd = RaftCmdRequest::default();
         let mut header = RaftRequestHeader::default();

components/raftstore-v2/src/operation/ready/async_writer.rs: 4 changes (3 additions, 1 deletion)
@@ -210,12 +210,13 @@
 }

 impl<EK: KvEngine, ER: RaftEngine> PersistedNotifier for StoreRouter<EK, ER> {
-    fn notify(&self, region_id: u64, peer_id: u64, ready_number: u64) {
+    fn notify(&self, region_id: u64, peer_id: u64, ready_number: u64, need_scheduled: bool) {
         if let Err(e) = self.force_send(
             region_id,
             PeerMsg::Persisted {
                 peer_id,
                 ready_number,
+                need_scheduled,
             },
         ) {
             warn!(
@@ -224,6 +225,7 @@ impl<EK: KvEngine, ER: RaftEngine> PersistedNotifier for StoreRouter<EK, ER> {
                 "region_id" => region_id,
                 "peer_id" => peer_id,
                 "ready_number" => ready_number,
+                "need_scheduled" => need_scheduled,
                 "error" => ?e,
             );
         }

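
Read together with the fsm/peer.rs and ready/mod.rs hunks, the new flag travels a single path: the write worker reports persistence through PersistedNotifier::notify, the router wraps it into PeerMsg::Persisted, and the peer FSM hands it to Peer::on_persisted. A compressed sketch of that plumbing with deliberately simplified types (illustrative only, not the real raftstore definitions):

enum PeerMsg {
    Persisted {
        peer_id: u64,
        ready_number: u64,
        // True when a persisted snapshot still needs the peer to install
        // its new tablet and schedule the apply FSM.
        need_scheduled: bool,
    },
}

trait PersistedNotifier {
    fn notify(&self, region_id: u64, peer_id: u64, ready_number: u64, need_scheduled: bool);
}

struct ToyRouter;

impl PersistedNotifier for ToyRouter {
    fn notify(&self, region_id: u64, peer_id: u64, ready_number: u64, need_scheduled: bool) {
        // The real implementation force-sends to the peer's mailbox and
        // logs a warning if the send fails.
        println!(
            "region {}: peer {} persisted ready {} (need_scheduled: {})",
            region_id, peer_id, ready_number, need_scheduled
        );
        let _ = PeerMsg::Persisted { peer_id, ready_number, need_scheduled };
    }
}

fn main() {
    ToyRouter.notify(2, 3, 7, true);
}
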
components/raftstore-v2/src/operation/ready/mod.rs: 54 changes (44 additions, 10 deletions)
@@ -20,16 +20,25 @@
 mod async_writer;
 mod snapshot;

-use std::cmp;
+use std::{cmp, path::PathBuf, sync::Arc};

-use engine_traits::{KvEngine, RaftEngine};
+use engine_traits::{KvEngine, MiscExt, OpenOptions, RaftEngine, TabletFactory};
 use error_code::ErrorCodeExt;
-use kvproto::raft_serverpb::RaftMessage;
+use kvproto::raft_serverpb::{PeerState, RaftMessage, RaftSnapshotData};
 use protobuf::Message as _;
-use raft::{eraftpb, Ready};
-use raftstore::store::{util, ExtraStates, FetchedLogs, Transport, WriteTask};
-use slog::{debug, error, trace, warn};
-use tikv_util::time::{duration_to_sec, monotonic_raw_now};
+use raft::{
+    eraftpb::{self, MessageType, Snapshot},
+    Ready,
+};
+use raftstore::{
+    coprocessor::ApplySnapshotObserver,
+    store::{util, ExtraStates, FetchedLogs, SnapKey, TabletSnapKey, Transport, WriteTask},
+};
+use slog::{debug, error, info, trace, warn};
+use tikv_util::{
+    box_err,
+    time::{duration_to_sec, monotonic_raw_now},
+};

 pub use self::{
     async_writer::AsyncWriter,
@@ -40,6 +49,7 @@ use crate::{
     fsm::PeerFsmDelegate,
     raft::{Peer, Storage},
     router::{ApplyTask, PeerTick},
+    Result,
 };

 impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> PeerFsmDelegate<'a, EK, ER, T> {
@@ -314,7 +324,7 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
         let ready_number = ready.number();
         let mut write_task = WriteTask::new(self.region_id(), self.peer_id(), ready_number);
         self.storage_mut()
-            .handle_raft_ready(&mut ready, &mut write_task);
+            .handle_raft_ready(&mut ready, &mut write_task, ctx);
         if !ready.persisted_messages().is_empty() {
             write_task.messages = ready
                 .take_persisted_messages()
@@ -363,11 +373,23 @@
         ctx: &mut StoreContext<EK, ER, T>,
         peer_id: u64,
         ready_number: u64,
+        need_scheduled: bool,
     ) {
         if peer_id != self.peer_id() {
             error!(self.logger, "peer id not matched"; "persisted_peer_id" => peer_id, "persisted_number" => ready_number);
             return;
         }
+        if need_scheduled {
+            self.storage_mut().after_applied_snapshot();
+            let suffix = self.storage().raft_state().last_index;
+            let region_id = self.storage().get_region_id();
+            let tablet = ctx
+                .tablet_factory
+                .open_tablet(region_id, Some(suffix), OpenOptions::default())
+                .unwrap();
+            self.tablet_mut().set(tablet);
+            self.schedule_apply_fsm(ctx);
+        }
         let persisted_message = self
             .async_writer
             .on_persisted(ctx, ready_number, &self.logger);
@@ -401,11 +423,23 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
 impl<EK: KvEngine, ER: RaftEngine> Storage<EK, ER> {
     /// Apply the ready to the storage. If there is any states need to be
     /// persisted, it will be written to `write_task`.
-    fn handle_raft_ready(&mut self, ready: &mut Ready, write_task: &mut WriteTask<EK, ER>) {
+    fn handle_raft_ready<T: Transport>(
+        &mut self,
+        ready: &mut Ready,
+        write_task: &mut WriteTask<EK, ER>,
+        ctx: &mut StoreContext<EK, ER, T>,
+    ) {
         let prev_raft_state = self.entry_storage().raft_state().clone();
         let ever_persisted = self.ever_persisted();

-        // TODO: handle snapshot
+        if !ready.snapshot().is_empty() {
+            let _ = self.apply_snapshot(
+                ready.snapshot(),
+                write_task,
+                ctx.snap_mgr.clone(),
+                ctx.tablet_factory.clone(),
+            );
+        }

         let entry_storage = self.entry_storage_mut();
         if !ready.entries().is_empty() {

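
One detail worth spelling out from the on_persisted hunk: with need_scheduled set, the peer finalizes the snapshot (after_applied_snapshot), reopens a tablet keyed by the region id plus the raft last_index as a suffix, swaps it into the peer, and only then schedules the apply FSM. In the multi-rocksdb (tablet) layout each (region, suffix) pair denotes a distinct tablet, so applying a snapshot lands in a fresh tablet rather than mutating the old one. A hedged sketch of the suffix-keyed lookup; the naming scheme is illustrative, and the real TabletFactory encapsulates it:

use std::path::{Path, PathBuf};

// Illustrative only: derive a per-(region, suffix) tablet directory the way
// a tablet factory might, where the suffix is the raft last_index recorded
// when the snapshot was persisted.
fn tablet_path(root: &Path, region_id: u64, suffix: u64) -> PathBuf {
    root.join("tablets").join(format!("{}_{}", region_id, suffix))
}

fn main() {
    let before = tablet_path(Path::new("/data/tikv"), 2, 5);
    let after = tablet_path(Path::new("/data/tikv"), 2, 11);
    // A snapshot at a higher index yields a different tablet directory.
    assert_ne!(before, after);
}
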
(Diffs for the remaining changed files were not loaded.)
