Skip to content

Commit

Permalink
Merge pull request #6 from tabokie/221109-pd-2-raft-v2
Browse files Browse the repository at this point in the history
221109 pd 2 raft v2
  • Loading branch information
tonyxuqqi committed Nov 14, 2022
2 parents b681af6 + 062550f commit 7516a0d
Show file tree
Hide file tree
Showing 15 changed files with 964 additions and 14 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions components/raftstore-v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,25 @@ engine_traits = { workspace = true }
error_code = { workspace = true }
fail = "0.5"
file_system = { workspace = true }
fs2 = "0.4"
futures = { version = "0.3", features = ["compat"] }
keys = { workspace = true }
kvproto = { git = "https://github.com/pingcap/kvproto.git" }
log_wrappers = { workspace = true }
pd_client = { workspace = true }
prometheus = { version = "0.13", features = ["nightly"] }
protobuf = { version = "2.8", features = ["bytes"] }
raft = { version = "0.7.0", default-features = false, features = ["protobuf-codec"] }
raft-proto = { version = "0.7.0" }
raftstore = { workspace = true }
resource_metering = { workspace = true }
slog = "2.3"
smallvec = "1.4"
tikv_util = { workspace = true }
time = "0.1"
tracker = { workspace = true }
txn_types = { workspace = true }
yatp = { git = "https://github.com/tikv/yatp.git", branch = "master" }

[dev-dependencies]
engine_test = { workspace = true }
Expand Down
31 changes: 29 additions & 2 deletions components/raftstore-v2/src/batch/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use kvproto::{
metapb::Store,
raft_serverpb::{PeerState, RaftMessage},
};
use pd_client::PdClient;
use raft::INVALID_ID;
use raftstore::store::{
fsm::store::PeerTickBatch, local_metrics::RaftMetrics, Config, ReadRunner, ReadTask,
Expand All @@ -43,6 +44,7 @@ use crate::{
fsm::{PeerFsm, PeerFsmDelegate, SenderFsmPair, StoreFsm, StoreFsmDelegate, StoreMeta},
raft::Storage,
router::{PeerMsg, PeerTick, StoreMsg},
worker::{PdRunner, PdTask},
Error, Result,
};

Expand Down Expand Up @@ -70,6 +72,7 @@ pub struct StoreContext<EK: KvEngine, ER: RaftEngine, T> {
pub tablet_factory: Arc<dyn TabletFactory<EK>>,
pub apply_pool: FuturePool,
pub read_scheduler: Scheduler<ReadTask<EK>>,
pub pd_scheduler: Scheduler<PdTask>,
pub snap_mgr: TabletSnapManager,
}

Expand Down Expand Up @@ -218,6 +221,7 @@ struct StorePollerBuilder<EK: KvEngine, ER: RaftEngine, T> {
trans: T,
router: StoreRouter<EK, ER>,
read_scheduler: Scheduler<ReadTask<EK>>,
pd_scheduler: Scheduler<PdTask>,
write_senders: WriteSenders<EK, ER>,
apply_pool: FuturePool,
logger: Logger,
Expand All @@ -234,6 +238,7 @@ impl<EK: KvEngine, ER: RaftEngine, T> StorePollerBuilder<EK, ER, T> {
trans: T,
router: StoreRouter<EK, ER>,
read_scheduler: Scheduler<ReadTask<EK>>,
pd_scheduler: Scheduler<PdTask>,
store_writers: &mut StoreWriters<EK, ER>,
logger: Logger,
store_meta: Arc<Mutex<StoreMeta<EK>>>,
Expand All @@ -257,6 +262,7 @@ impl<EK: KvEngine, ER: RaftEngine, T> StorePollerBuilder<EK, ER, T> {
trans,
router,
read_scheduler,
pd_scheduler,
apply_pool,
logger,
write_senders: store_writers.senders(),
Expand Down Expand Up @@ -330,6 +336,7 @@ where
tablet_factory: self.tablet_factory.clone(),
apply_pool: self.apply_pool.clone(),
read_scheduler: self.read_scheduler.clone(),
pd_scheduler: self.pd_scheduler.clone(),
snap_mgr: self.snap_mgr.clone(),
};
let cfg_tracker = self.cfg.clone().tracker("raftstore".to_string());
Expand All @@ -342,13 +349,15 @@ where
struct Workers<EK: KvEngine, ER: RaftEngine> {
/// Worker for fetching raft logs asynchronously
async_read_worker: Worker,
pd_worker: Worker,
store_writers: StoreWriters<EK, ER>,
}

impl<EK: KvEngine, ER: RaftEngine> Default for Workers<EK, ER> {
fn default() -> Self {
Self {
async_read_worker: Worker::new("async-read-worker"),
pd_worker: Worker::new("pd-worker"),
store_writers: StoreWriters::default(),
}
}
Expand All @@ -362,30 +371,47 @@ pub struct StoreSystem<EK: KvEngine, ER: RaftEngine> {
}

impl<EK: KvEngine, ER: RaftEngine> StoreSystem<EK, ER> {
pub fn start<T>(
pub fn start<T, C>(
&mut self,
store_id: u64,
cfg: Arc<VersionTrack<Config>>,
raft_engine: ER,
tablet_factory: Arc<dyn TabletFactory<EK>>,
trans: T,
pd_client: Arc<C>,
router: &StoreRouter<EK, ER>,
store_meta: Arc<Mutex<StoreMeta<EK>>>,
snap_mgr: TabletSnapManager,
) -> Result<()>
where
T: Transport + 'static,
C: PdClient + 'static,
{
let router_clone = router.clone();
// pd_client.handle_reconnect(move || {
// router_clone.broadcast_normal(|| PeerMsg::HeartbeatPd);
// });

let mut workers = Workers::default();
workers
.store_writers
.spawn(store_id, raft_engine.clone(), None, router, &trans, &cfg)?;

let mut read_runner = ReadRunner::new(router.clone(), raft_engine.clone());
read_runner.set_snap_mgr(snap_mgr.clone());
let read_scheduler = workers
.async_read_worker
.start("async-read-worker", read_runner);
let pd_scheduler = workers.pd_worker.start(
"pd-worker",
PdRunner::new(
pd_client,
raft_engine.clone(),
tablet_factory.clone(),
router.clone(),
workers.pd_worker.remote(),
self.logger.clone(),
),
);

let mut builder = StorePollerBuilder::new(
cfg.clone(),
Expand All @@ -395,6 +421,7 @@ impl<EK: KvEngine, ER: RaftEngine> StoreSystem<EK, ER> {
trans,
router.clone(),
read_scheduler,
pd_scheduler,
&mut workers.store_writers,
self.logger.clone(),
store_meta.clone(),
Expand Down
2 changes: 1 addition & 1 deletion components/raftstore-v2/src/fsm/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,9 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> PeerFsmDelegate<'a, EK, ER,
fn on_tick(&mut self, tick: PeerTick) {
match tick {
PeerTick::Raft => self.on_raft_tick(),
PeerTick::PdHeartbeat => self.on_pd_heartbeat(),
PeerTick::RaftLogGc => unimplemented!(),
PeerTick::SplitRegionCheck => unimplemented!(),
PeerTick::PdHeartbeat => unimplemented!(),
PeerTick::CheckMerge => unimplemented!(),
PeerTick::CheckPeerStaleState => unimplemented!(),
PeerTick::EntryCacheEvict => unimplemented!(),
Expand Down
40 changes: 33 additions & 7 deletions components/raftstore-v2/src/fsm/store.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.

use std::time::SystemTime;
use std::time::{Duration, SystemTime};

use batch_system::Fsm;
use collections::HashMap;
use engine_traits::{KvEngine, RaftEngine};
use futures::{compat::Future01CompatExt, FutureExt};
use raftstore::store::{Config, ReadDelegate};
use slog::{o, Logger};
use tikv_util::mpsc::{self, LooseBoundedSender, Receiver};
use slog::{info, o, Logger};
use tikv_util::{
future::poll_future_notify,
is_zero_duration,
mpsc::{self, LooseBoundedSender, Receiver},
};

use crate::{
batch::StoreContext,
Expand Down Expand Up @@ -74,7 +79,7 @@ impl Store {
}

pub struct StoreFsm {
store: Store,
pub store: Store,
receiver: Receiver<StoreMsg>,
}

Expand Down Expand Up @@ -118,8 +123,8 @@ impl Fsm for StoreFsm {
}

pub struct StoreFsmDelegate<'a, EK: KvEngine, ER: RaftEngine, T> {
fsm: &'a mut StoreFsm,
store_ctx: &'a mut StoreContext<EK, ER, T>,
pub fsm: &'a mut StoreFsm,
pub store_ctx: &'a mut StoreContext<EK, ER, T>,
}

impl<'a, EK: KvEngine, ER: RaftEngine, T> StoreFsmDelegate<'a, EK, ER, T> {
Expand All @@ -139,8 +144,29 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T> StoreFsmDelegate<'a, EK, ER, T> {
);
}

pub fn schedule_tick(&mut self, tick: StoreTick, timeout: Duration) {
if !is_zero_duration(&timeout) {
let mb = self.store_ctx.router.control_mailbox();
let logger = self.fsm.store.logger().clone();
let delay = self.store_ctx.timer.delay(timeout).compat().map(move |_| {
if let Err(e) = mb.force_send(StoreMsg::Tick(tick)) {
info!(
logger,
"failed to schedule store tick, are we shutting down?";
"tick" => ?tick,
"err" => ?e
);
}
});
poll_future_notify(delay);
}
}

fn on_tick(&mut self, tick: StoreTick) {
unimplemented!()
match tick {
StoreTick::PdStoreHeartbeat => self.on_pd_store_heartbeat(),
_ => unimplemented!(),
}
}

pub fn handle_msgs(&mut self, store_msg_buf: &mut Vec<StoreMsg>) {
Expand Down
2 changes: 2 additions & 0 deletions components/raftstore-v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#![allow(unused)]
#![feature(let_else)]
#![feature(array_windows)]
#![feature(div_duration)]

mod batch;
mod bootstrap;
Expand All @@ -32,6 +33,7 @@ mod operation;
mod raft;
pub mod router;
mod tablet;
mod worker;

pub(crate) use batch::StoreContext;
pub use batch::{create_store_batch_system, StoreRouter, StoreSystem};
Expand Down
1 change: 1 addition & 0 deletions components/raftstore-v2/src/operation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

mod command;
mod life;
mod pd;
mod query;
mod ready;

Expand Down
Loading

0 comments on commit 7516a0d

Please sign in to comment.