Skip to content

Commit

Permalink
Merge pull request #776 from drmingdrmer/51-eng-c
Browse files Browse the repository at this point in the history
Refactor: replace Engine and Command type parameter (NID, N, Entry) with C
  • Loading branch information
drmingdrmer authored Apr 18, 2023
2 parents 74cf0b3 + f01ff9a commit 8c61a24
Show file tree
Hide file tree
Showing 47 changed files with 264 additions and 356 deletions.
7 changes: 2 additions & 5 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ where
/// A controlling handle to the [`RaftStateMachine`] worker.
pub(crate) sm_handle: sm::Handle<C, SM>,

pub(crate) engine: Engine<C::NodeId, C::Node, C::Entry>,
pub(crate) engine: Engine<C>,

pub(crate) leader_data: Option<LeaderData<C, SM::SnapshotData>>,

Expand Down Expand Up @@ -1379,10 +1379,7 @@ where
LS: RaftLogStorage<C>,
SM: RaftStateMachine<C>,
{
async fn run_command<'e>(
&mut self,
cmd: Command<C::NodeId, C::Node, C::Entry>,
) -> Result<Option<Command<C::NodeId, C::Node, C::Entry>>, StorageError<C::NodeId>> {
async fn run_command<'e>(&mut self, cmd: Command<C>) -> Result<Option<Command<C>>, StorageError<C::NodeId>> {
if let Some(condition) = cmd.condition() {
match condition {
Condition::LogFlushed { .. } => {
Expand Down
73 changes: 38 additions & 35 deletions openraft/src/engine/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use tokio::sync::oneshot;

use crate::core::sm;
use crate::engine::CommandKind;
use crate::entry::RaftEntry;
use crate::error::Infallible;
use crate::error::InitializeError;
use crate::error::InstallSnapshotError;
Expand All @@ -19,16 +18,14 @@ use crate::LogId;
use crate::MetricsChangeFlags;
use crate::Node;
use crate::NodeId;
use crate::RaftTypeConfig;
use crate::SnapshotMeta;
use crate::Vote;

/// Commands to send to `RaftRuntime` to execute, to update the application state.
#[derive(Debug)]
pub(crate) enum Command<NID, N, Ent>
where
NID: NodeId,
N: Node,
Ent: RaftEntry<NID, N>,
pub(crate) enum Command<C>
where C: RaftTypeConfig
{
/// Becomes a leader, i.e., its `vote` is granted by a quorum.
/// The runtime initializes leader data when receives this command.
Expand All @@ -38,37 +35,40 @@ where
QuitLeader,

/// Append one entry.
AppendEntry { entry: Ent },
AppendEntry { entry: C::Entry },

/// Append a `range` of entries.
AppendInputEntries { entries: Vec<Ent> },
AppendInputEntries { entries: Vec<C::Entry> },

/// Append a blank log.
///
/// One of the usage is when a leader is established, a blank log is written to commit the
/// state.
AppendBlankLog { log_id: LogId<NID> },
AppendBlankLog { log_id: LogId<C::NodeId> },

/// Replicate the committed log id to other nodes
ReplicateCommitted { committed: Option<LogId<NID>> },
ReplicateCommitted { committed: Option<LogId<C::NodeId>> },

/// Commit entries that are already in the store, upto `upto`, inclusive.
/// And send applied result to the client that proposed the entry.
LeaderCommit {
// TODO: pass the log id list?
// TODO: merge LeaderCommit and FollowerCommit
already_committed: Option<LogId<NID>>,
upto: LogId<NID>,
already_committed: Option<LogId<C::NodeId>>,
upto: LogId<C::NodeId>,
},

/// Commit entries that are already in the store, upto `upto`, inclusive.
FollowerCommit {
already_committed: Option<LogId<NID>>,
upto: LogId<NID>,
already_committed: Option<LogId<C::NodeId>>,
upto: LogId<C::NodeId>,
},

/// Replicate log entries or snapshot to a target.
Replicate { target: NID, req: Inflight<NID> },
Replicate {
target: C::NodeId,
req: Inflight<C::NodeId>,
},

/// Membership config changed, need to update replication streams.
/// The Runtime has to close all old replications and start new ones.
Expand All @@ -77,27 +77,30 @@ where
/// updated.
RebuildReplicationStreams {
/// Targets to replicate to.
targets: Vec<(NID, ProgressEntry<NID>)>,
targets: Vec<(C::NodeId, ProgressEntry<C::NodeId>)>,
},

// TODO(3): it also update the progress of a leader.
// Add doc:
// `target` can also be the leader id.
/// As the state of replication to `target` is updated, the metrics should be updated.
UpdateProgressMetrics { target: NID, matching: LogId<NID> },
UpdateProgressMetrics {
target: C::NodeId,
matching: LogId<C::NodeId>,
},

/// Save vote to storage
SaveVote { vote: Vote<NID> },
SaveVote { vote: Vote<C::NodeId> },

/// Send vote to all other members
SendVote { vote_req: VoteRequest<NID> },
SendVote { vote_req: VoteRequest<C::NodeId> },

/// Purge log from the beginning to `upto`, inclusive.
PurgeLog { upto: LogId<NID> },
PurgeLog { upto: LogId<C::NodeId> },

/// Delete logs that conflict with the leader from a follower/learner since log id `since`,
/// inclusive.
DeleteConflictLog { since: LogId<NID> },
DeleteConflictLog { since: LogId<C::NodeId> },

// TODO(3): put all state machine related commands in a separate enum.
/// Build a snapshot.
Expand All @@ -108,25 +111,28 @@ where

/// Install a snapshot data file: e.g., replace state machine with snapshot, save snapshot
/// data.
InstallSnapshot { snapshot_meta: SnapshotMeta<NID, N> },
InstallSnapshot {
snapshot_meta: SnapshotMeta<C::NodeId, C::Node>,
},

// TODO: remove this, use InstallSnapshot instead.
/// A received snapshot does not need to be installed, just drop buffered snapshot data.
CancelSnapshot { snapshot_meta: SnapshotMeta<NID, N> },
CancelSnapshot {
snapshot_meta: SnapshotMeta<C::NodeId, C::Node>,
},

/// Send result to caller
Respond {
when: Option<Condition<NID>>,
resp: Respond<NID, N>,
when: Option<Condition<C::NodeId>>,
resp: Respond<C::NodeId, C::Node>,
},
}

/// For unit testing
impl<NID, N, Ent> PartialEq for Command<NID, N, Ent>
impl<C> PartialEq for Command<C>
where
NID: NodeId,
N: Node,
Ent: RaftEntry<NID, N> + PartialEq,
C: RaftTypeConfig,
C::Entry: PartialEq,
{
#[rustfmt::skip]
fn eq(&self, other: &Self) -> bool {
Expand Down Expand Up @@ -155,11 +161,8 @@ where
}
}

impl<NID, N, Ent> Command<NID, N, Ent>
where
NID: NodeId,
N: Node,
Ent: RaftEntry<NID, N>,
impl<C> Command<C>
where C: RaftTypeConfig
{
/// Update the flag of the metrics that needs to be updated when this command is executed.
pub(crate) fn update_metrics_flags(&self, flags: &mut MetricsChangeFlags) {
Expand Down Expand Up @@ -218,7 +221,7 @@ where
/// Return the condition the command waits for if any.
#[allow(dead_code)]
#[rustfmt::skip]
pub(crate) fn condition(&self) -> Option<&Condition<NID>> {
pub(crate) fn condition(&self) -> Option<&Condition<C::NodeId>> {
match self {
Command::BecomeLeader => None,
Command::QuitLeader => None,
Expand Down
Loading

0 comments on commit 8c61a24

Please sign in to comment.