Skip to content

Commit

Permalink
Change: move snapshot type definition from storage traits to `RaftTyp…
Browse files Browse the repository at this point in the history
…eConfig`

Similar to `NodeId` or `Entry`, `SnapshotData` is also a data type that
is specified by the application and needs to be defined in
`RaftTypeConfig`, which is a collection of all application types.

Public types changes:

- Add `SnapshotData` to `RaftTypeConfig`:
  ```rust
  pub trait RaftTypeConfig {
      /// ...
      type SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static;
  }
  ```
- Remove associated type `SnapshotData` from `storage::RaftStorage`.
- Remove associated type `SnapshotData` from `storage::v2::RaftStateMachine`.

Corresponding API changes:

- Change `storage::RaftSnapshotBuilder<C: RaftTypeConfig, SNAPSHOT_DATA>` to `RaftSnapshotBuilder<C>`
- Change `storage::Snapshot<NID: NodeId, N: Node, SNAPSHOT_DATA>` to `storage::Snapshot<C>`

Upgrade tip:

Update generic type parameter in application types to pass compilation.
  • Loading branch information
drmingdrmer committed Apr 26, 2023
1 parent a28d552 commit 84539cb
Show file tree
Hide file tree
Showing 34 changed files with 286 additions and 355 deletions.
2 changes: 1 addition & 1 deletion cluster_benchmark/tests/benchmark/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ use openraft::Raft;
use openraft::RaftNetwork;
use openraft::RaftNetworkFactory;

use crate::store::Config as MemConfig;
use crate::store::LogStore;
use crate::store::NodeId;
use crate::store::StateMachineStore;
use crate::store::TypeConfig as MemConfig;

pub type BenchRaft = Raft<MemConfig, Router, Arc<LogStore>, Arc<StateMachineStore>>;

Expand Down
36 changes: 18 additions & 18 deletions cluster_benchmark/tests/benchmark/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use openraft::Entry;
use openraft::EntryPayload;
use openraft::LogId;
use openraft::RaftLogId;
use openraft::RaftTypeConfig;
use openraft::SnapshotMeta;
use openraft::StorageError;
use openraft::StorageIOError;
Expand All @@ -38,7 +39,8 @@ pub struct ClientResponse {}
pub type NodeId = u64;

openraft::declare_raft_types!(
pub Config: D = ClientRequest, R = ClientResponse, NodeId = NodeId, Node = (), Entry = Entry<Config>
pub TypeConfig: D = ClientRequest, R = ClientResponse, NodeId = NodeId, Node = (),
Entry = Entry<TypeConfig>, SnapshotData = Cursor<Vec<u8>>
);

#[derive(Debug)]
Expand All @@ -55,7 +57,7 @@ pub struct StateMachine {

pub struct LogStore {
vote: RwLock<Option<Vote<NodeId>>>,
log: RwLock<BTreeMap<u64, Entry<Config>>>,
log: RwLock<BTreeMap<u64, Entry<TypeConfig>>>,
last_purged_log_id: RwLock<Option<LogId<NodeId>>>,
}

Expand Down Expand Up @@ -98,11 +100,11 @@ impl StateMachineStore {
}

#[async_trait]
impl RaftLogReader<Config> for Arc<LogStore> {
impl RaftLogReader<TypeConfig> for Arc<LogStore> {
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
&mut self,
range: RB,
) -> Result<Vec<Entry<Config>>, StorageError<NodeId>> {
) -> Result<Vec<Entry<TypeConfig>>, StorageError<NodeId>> {
let mut entries = vec![];
{
let log = self.log.read().await;
Expand All @@ -114,7 +116,7 @@ impl RaftLogReader<Config> for Arc<LogStore> {
Ok(entries)
}

async fn get_log_state(&mut self) -> Result<LogState<Config>, StorageError<NodeId>> {
async fn get_log_state(&mut self) -> Result<LogState<TypeConfig>, StorageError<NodeId>> {
let log = self.log.read().await;
let last_serialized = log.iter().rev().next().map(|(_, ent)| ent);

Expand All @@ -138,9 +140,9 @@ impl RaftLogReader<Config> for Arc<LogStore> {
}

#[async_trait]
impl RaftSnapshotBuilder<Config, Cursor<Vec<u8>>> for Arc<StateMachineStore> {
impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
#[tracing::instrument(level = "trace", skip(self))]
async fn build_snapshot(&mut self) -> Result<Snapshot<NodeId, (), Cursor<Vec<u8>>>, StorageError<NodeId>> {
async fn build_snapshot(&mut self) -> Result<Snapshot<TypeConfig>, StorageError<NodeId>> {
let data;
let last_applied_log;
let last_membership;
Expand Down Expand Up @@ -190,7 +192,7 @@ impl RaftSnapshotBuilder<Config, Cursor<Vec<u8>>> for Arc<StateMachineStore> {
}

#[async_trait]
impl RaftLogStorage<Config> for Arc<LogStore> {
impl RaftLogStorage<TypeConfig> for Arc<LogStore> {
#[tracing::instrument(level = "trace", skip(self))]
async fn save_vote(&mut self, vote: &Vote<NodeId>) -> Result<(), StorageError<NodeId>> {
let mut v = self.vote.write().await;
Expand Down Expand Up @@ -225,7 +227,7 @@ impl RaftLogStorage<Config> for Arc<LogStore> {

#[tracing::instrument(level = "trace", skip_all)]
async fn append<I>(&mut self, entries: I, callback: LogFlushed<NodeId>) -> Result<(), StorageError<NodeId>>
where I: IntoIterator<Item = Entry<Config>> + Send {
where I: IntoIterator<Item = Entry<TypeConfig>> + Send {
{
let mut log = self.log.write().await;
log.extend(entries.into_iter().map(|entry| (entry.get_log_id().index, entry)));
Expand All @@ -242,9 +244,7 @@ impl RaftLogStorage<Config> for Arc<LogStore> {
}

#[async_trait]
impl RaftStateMachine<Config> for Arc<StateMachineStore> {
type SnapshotData = Cursor<Vec<u8>>;

impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
async fn applied_state(
&mut self,
) -> Result<(Option<LogId<NodeId>>, StoredMembership<NodeId, ()>), StorageError<NodeId>> {
Expand All @@ -253,7 +253,7 @@ impl RaftStateMachine<Config> for Arc<StateMachineStore> {
}

async fn apply<I>(&mut self, entries: I) -> Result<Vec<ClientResponse>, StorageError<NodeId>>
where I: IntoIterator<Item = Entry<Config>> + Send {
where I: IntoIterator<Item = Entry<TypeConfig>> + Send {
let mut sm = self.sm.write().await;

let it = entries.into_iter();
Expand All @@ -275,15 +275,17 @@ impl RaftStateMachine<Config> for Arc<StateMachineStore> {
}

#[tracing::instrument(level = "trace", skip(self))]
async fn begin_receiving_snapshot(&mut self) -> Result<Box<Self::SnapshotData>, StorageError<NodeId>> {
async fn begin_receiving_snapshot(
&mut self,
) -> Result<Box<<TypeConfig as RaftTypeConfig>::SnapshotData>, StorageError<NodeId>> {
Ok(Box::new(Cursor::new(Vec::new())))
}

#[tracing::instrument(level = "trace", skip(self, snapshot))]
async fn install_snapshot(
&mut self,
meta: &SnapshotMeta<NodeId, ()>,
snapshot: Box<Self::SnapshotData>,
snapshot: Box<<TypeConfig as RaftTypeConfig>::SnapshotData>,
) -> Result<(), StorageError<NodeId>> {
let new_snapshot = StoredSnapshot {
meta: meta.clone(),
Expand All @@ -305,9 +307,7 @@ impl RaftStateMachine<Config> for Arc<StateMachineStore> {
}

#[tracing::instrument(level = "trace", skip(self))]
async fn get_current_snapshot(
&mut self,
) -> Result<Option<Snapshot<NodeId, (), Self::SnapshotData>>, StorageError<NodeId>> {
async fn get_current_snapshot(&mut self) -> Result<Option<Snapshot<TypeConfig>>, StorageError<NodeId>> {
match &*self.current_snapshot.read().await {
Some(snapshot) => {
let data = snapshot.data.clone();
Expand Down
4 changes: 2 additions & 2 deletions cluster_benchmark/tests/benchmark/store/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ use openraft::testing::StoreBuilder;
use openraft::testing::Suite;
use openraft::StorageError;

use crate::store::Config;
use crate::store::LogStore;
use crate::store::NodeId;
use crate::store::StateMachineStore;
use crate::store::TypeConfig;

struct Builder {}
#[async_trait]
impl StoreBuilder<Config, Arc<LogStore>, Arc<StateMachineStore>> for Builder {
impl StoreBuilder<TypeConfig, Arc<LogStore>, Arc<StateMachineStore>> for Builder {
async fn build(&self) -> Result<((), Arc<LogStore>, Arc<StateMachineStore>), StorageError<NodeId>> {
let log_store = LogStore::new_async().await;
let sm = Arc::new(StateMachineStore::new());
Expand Down
4 changes: 3 additions & 1 deletion examples/raft-kv-memstore/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![allow(clippy::uninlined_format_args)]
#![deny(unused_qualifications)]

use std::io::Cursor;
use std::sync::Arc;

use actix_web::middleware;
Expand Down Expand Up @@ -29,7 +30,8 @@ pub type NodeId = u64;

openraft::declare_raft_types!(
/// Declare the type configuration for example K/V store.
pub TypeConfig: D = Request, R = Response, NodeId = NodeId, Node = BasicNode, Entry = openraft::Entry<TypeConfig>
pub TypeConfig: D = Request, R = Response, NodeId = NodeId, Node = BasicNode,
Entry = openraft::Entry<TypeConfig>, SnapshotData = Cursor<Vec<u8>>
);

pub type LogStore = Adaptor<TypeConfig, Arc<Store>>;
Expand Down
16 changes: 8 additions & 8 deletions examples/raft-kv-memstore/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use openraft::LogId;
use openraft::RaftLogReader;
use openraft::RaftSnapshotBuilder;
use openraft::RaftStorage;
use openraft::RaftTypeConfig;
use openraft::SnapshotMeta;
use openraft::StorageError;
use openraft::StorageIOError;
Expand Down Expand Up @@ -123,9 +124,9 @@ impl RaftLogReader<TypeConfig> for Arc<Store> {
}

#[async_trait]
impl RaftSnapshotBuilder<TypeConfig, Cursor<Vec<u8>>> for Arc<Store> {
impl RaftSnapshotBuilder<TypeConfig> for Arc<Store> {
#[tracing::instrument(level = "trace", skip(self))]
async fn build_snapshot(&mut self) -> Result<Snapshot<NodeId, BasicNode, Cursor<Vec<u8>>>, StorageError<NodeId>> {
async fn build_snapshot(&mut self) -> Result<Snapshot<TypeConfig>, StorageError<NodeId>> {
let data;
let last_applied_log;
let last_membership;
Expand Down Expand Up @@ -176,7 +177,6 @@ impl RaftSnapshotBuilder<TypeConfig, Cursor<Vec<u8>>> for Arc<Store> {

#[async_trait]
impl RaftStorage<TypeConfig> for Arc<Store> {
type SnapshotData = Cursor<Vec<u8>>;
type LogReader = Self;
type SnapshotBuilder = Self;

Expand Down Expand Up @@ -277,15 +277,17 @@ impl RaftStorage<TypeConfig> for Arc<Store> {
}

#[tracing::instrument(level = "trace", skip(self))]
async fn begin_receiving_snapshot(&mut self) -> Result<Box<Self::SnapshotData>, StorageError<NodeId>> {
async fn begin_receiving_snapshot(
&mut self,
) -> Result<Box<<TypeConfig as RaftTypeConfig>::SnapshotData>, StorageError<NodeId>> {
Ok(Box::new(Cursor::new(Vec::new())))
}

#[tracing::instrument(level = "trace", skip(self, snapshot))]
async fn install_snapshot(
&mut self,
meta: &SnapshotMeta<NodeId, BasicNode>,
snapshot: Box<Self::SnapshotData>,
snapshot: Box<<TypeConfig as RaftTypeConfig>::SnapshotData>,
) -> Result<(), StorageError<NodeId>> {
tracing::info!(
{ snapshot_size = snapshot.get_ref().len() },
Expand All @@ -312,9 +314,7 @@ impl RaftStorage<TypeConfig> for Arc<Store> {
}

#[tracing::instrument(level = "trace", skip(self))]
async fn get_current_snapshot(
&mut self,
) -> Result<Option<Snapshot<NodeId, BasicNode, Self::SnapshotData>>, StorageError<NodeId>> {
async fn get_current_snapshot(&mut self) -> Result<Option<Snapshot<TypeConfig>>, StorageError<NodeId>> {
match &*self.current_snapshot.read().await {
Some(snapshot) => {
let data = snapshot.data.clone();
Expand Down
10 changes: 4 additions & 6 deletions examples/raft-kv-rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#![deny(unused_qualifications)]

use std::fmt::Display;
use std::io::Cursor;
use std::path::Path;
use std::sync::Arc;

Expand Down Expand Up @@ -33,17 +34,14 @@ pub struct Node {

impl Display for Node {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"ExampleNode {{ rpc_addr: {}, api_addr: {} }}",
self.rpc_addr, self.api_addr
)
write!(f, "Node {{ rpc_addr: {}, api_addr: {} }}", self.rpc_addr, self.api_addr)
}
}

openraft::declare_raft_types!(
/// Declare the type configuration for example K/V store.
pub TypeConfig: D = Request, R = Response, NodeId = NodeId, Node = Node, Entry = openraft::Entry<TypeConfig>
pub TypeConfig: D = Request, R = Response, NodeId = NodeId, Node = Node,
Entry = openraft::Entry<TypeConfig>, SnapshotData = Cursor<Vec<u8>>
);

pub type LogStore = Adaptor<TypeConfig, Arc<Store>>;
Expand Down
16 changes: 8 additions & 8 deletions examples/raft-kv-rocksdb/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use openraft::LogId;
use openraft::RaftLogReader;
use openraft::RaftSnapshotBuilder;
use openraft::RaftStorage;
use openraft::RaftTypeConfig;
use openraft::SnapshotMeta;
use openraft::StorageError;
use openraft::StorageIOError;
Expand Down Expand Up @@ -363,9 +364,9 @@ impl RaftLogReader<TypeConfig> for Arc<Store> {
}

#[async_trait]
impl RaftSnapshotBuilder<TypeConfig, Cursor<Vec<u8>>> for Arc<Store> {
impl RaftSnapshotBuilder<TypeConfig> for Arc<Store> {
#[tracing::instrument(level = "trace", skip(self))]
async fn build_snapshot(&mut self) -> Result<Snapshot<NodeId, Node, Cursor<Vec<u8>>>, StorageError<NodeId>> {
async fn build_snapshot(&mut self) -> Result<Snapshot<TypeConfig>, StorageError<NodeId>> {
let data;
let last_applied_log;
let last_membership;
Expand Down Expand Up @@ -411,7 +412,6 @@ impl RaftSnapshotBuilder<TypeConfig, Cursor<Vec<u8>>> for Arc<Store> {

#[async_trait]
impl RaftStorage<TypeConfig> for Arc<Store> {
type SnapshotData = Cursor<Vec<u8>>;
type LogReader = Self;
type SnapshotBuilder = Self;

Expand Down Expand Up @@ -508,15 +508,17 @@ impl RaftStorage<TypeConfig> for Arc<Store> {
}

#[tracing::instrument(level = "trace", skip(self))]
async fn begin_receiving_snapshot(&mut self) -> Result<Box<Self::SnapshotData>, StorageError<NodeId>> {
async fn begin_receiving_snapshot(
&mut self,
) -> Result<Box<<TypeConfig as RaftTypeConfig>::SnapshotData>, StorageError<NodeId>> {
Ok(Box::new(Cursor::new(Vec::new())))
}

#[tracing::instrument(level = "trace", skip(self, snapshot))]
async fn install_snapshot(
&mut self,
meta: &SnapshotMeta<NodeId, Node>,
snapshot: Box<Self::SnapshotData>,
snapshot: Box<<TypeConfig as RaftTypeConfig>::SnapshotData>,
) -> Result<(), StorageError<NodeId>> {
tracing::info!(
{ snapshot_size = snapshot.get_ref().len() },
Expand All @@ -541,9 +543,7 @@ impl RaftStorage<TypeConfig> for Arc<Store> {
}

#[tracing::instrument(level = "trace", skip(self))]
async fn get_current_snapshot(
&mut self,
) -> Result<Option<Snapshot<NodeId, Node, Self::SnapshotData>>, StorageError<NodeId>> {
async fn get_current_snapshot(&mut self) -> Result<Option<Snapshot<TypeConfig>>, StorageError<NodeId>> {
match Store::get_current_snapshot_(self)? {
Some(snapshot) => {
let data = snapshot.data.clone();
Expand Down
Loading

0 comments on commit 84539cb

Please sign in to comment.