Skip to content

Commit

Permalink
Feature: add trait RaftLogStorageExt to provide additional raft-log…
Browse files Browse the repository at this point in the history
… methods

The `RaftLogReaderExt::blocking_append()` method enables the caller to
append logs to storage in a blocking manner, eliminating the need to
create and await a callback. This method simplifies the process of
writing tests.
  • Loading branch information
drmingdrmer committed Mar 4, 2024
1 parent 273232c commit 884f0da
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 39 deletions.
1 change: 1 addition & 0 deletions openraft/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub use log_store_ext::RaftLogReaderExt;
use macros::add_async_trait;
pub use snapshot_signature::SnapshotSignature;
pub use v2::RaftLogStorage;
pub use v2::RaftLogStorageExt;
pub use v2::RaftStateMachine;

use crate::display_ext::DisplayOption;
Expand Down
3 changes: 3 additions & 0 deletions openraft/src/storage/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
//! [`RaftStorage`](`crate::storage::RaftStorage`). [`RaftLogStorage`] is responsible for storing
//! logs, and [`RaftStateMachine`] is responsible for storing state machine and snapshot.

mod raft_log_storage_ext;

use macros::add_async_trait;
pub use raft_log_storage_ext::RaftLogStorageExt;

use crate::storage::callback::LogFlushed;
use crate::storage::v2::sealed::Sealed;
Expand Down
45 changes: 45 additions & 0 deletions openraft/src/storage/v2/raft_log_storage_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use anyerror::AnyError;
use macros::add_async_trait;

use crate::storage::LogFlushed;
use crate::storage::RaftLogStorage;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::AsyncRuntime;
use crate::OptionalSend;
use crate::RaftTypeConfig;
use crate::StorageError;
use crate::StorageIOError;

/// Extension trait for RaftLogStorage to provide utility methods.
///
/// All methods in this trait are provided with default implementation.
#[add_async_trait]
pub trait RaftLogStorageExt<C>: RaftLogStorage<C>
where C: RaftTypeConfig
{
/// Blocking mode append log entries to the storage.
///
/// It blocks until the callback is called by the underlying storage implementation.
async fn blocking_append<I>(&mut self, entries: I) -> Result<(), StorageError<C::NodeId>>
where
I: IntoIterator<Item = C::Entry> + OptionalSend,
I::IntoIter: OptionalSend,
{
let (tx, rx) = AsyncRuntimeOf::<C>::oneshot();

let callback = LogFlushed::new(None, tx);
self.append(entries, callback).await?;
rx.await
.map_err(|e| StorageIOError::write_logs(AnyError::error(e)))?
.map_err(|e| StorageIOError::write_logs(AnyError::error(e)))?;

Ok(())
}
}

impl<C, T> RaftLogStorageExt<C> for T
where
T: RaftLogStorage<C>,
C: RaftTypeConfig,
{
}
26 changes: 0 additions & 26 deletions openraft/src/testing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,13 @@ mod suite;

use std::collections::BTreeSet;

use anyerror::AnyError;
pub use store_builder::StoreBuilder;
pub use suite::Suite;

use crate::entry::RaftEntry;
use crate::log_id::RaftLogId;
use crate::storage::LogFlushed;
use crate::storage::RaftLogStorage;
use crate::AsyncRuntime;
use crate::CommittedLeaderId;
use crate::LogId;
use crate::RaftTypeConfig;
use crate::StorageError;
use crate::StorageIOError;

/// Builds a log id, for testing purposes.
pub fn log_id<NID: crate::NodeId>(term: u64, node_id: NID, index: u64) -> LogId<NID> {
Expand All @@ -43,22 +36,3 @@ pub fn membership_ent<C: RaftTypeConfig>(
crate::Membership::new(config, None),
)
}

/// Append to log and wait for the log to be flushed.
pub async fn blocking_append<C: RaftTypeConfig, LS: RaftLogStorage<C>, I>(
log_store: &mut LS,
entries: I,
) -> Result<(), StorageError<C::NodeId>>
where
I: IntoIterator<Item = C::Entry>,
{
let entries = entries.into_iter().collect::<Vec<_>>();
let last_log_id = entries.last().map(|e| *e.get_log_id()).unwrap();

let (tx, rx) = <C::AsyncRuntime as AsyncRuntime>::oneshot();
let cb = LogFlushed::new(Some(last_log_id), tx);
log_store.append(entries, cb).await?;
rx.await.unwrap().map_err(|e| StorageIOError::write_logs(AnyError::error(e)))?;

Ok(())
}
7 changes: 3 additions & 4 deletions tests/tests/append_entries/t11_append_inconsistent_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use anyhow::Result;
use maplit::btreeset;
use openraft::storage::RaftLogReaderExt;
use openraft::storage::RaftLogStorage;
use openraft::testing;
use openraft::storage::RaftLogStorageExt;
use openraft::testing::blank_ent;
use openraft::Config;
use openraft::ServerState;
Expand Down Expand Up @@ -57,9 +57,8 @@ async fn append_inconsistent_log() -> Result<()> {
r2.shutdown().await?;

for i in log_index + 1..=100 {
testing::blocking_append(&mut sto0, [blank_ent(2, 0, i)]).await?;

testing::blocking_append(&mut sto2, [blank_ent(3, 0, i)]).await?;
sto0.blocking_append([blank_ent(2, 0, i)]).await?;
sto2.blocking_append([blank_ent(3, 0, i)]).await?;
}

sto0.save_vote(&Vote::new(2, 0)).await?;
Expand Down
6 changes: 3 additions & 3 deletions tests/tests/elect/t10_elect_compare_last_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::time::Duration;
use anyhow::Result;
use maplit::btreeset;
use openraft::storage::RaftLogStorage;
use openraft::testing;
use openraft::storage::RaftLogStorageExt;
use openraft::testing::blank_ent;
use openraft::testing::membership_ent;
use openraft::Config;
Expand Down Expand Up @@ -37,7 +37,7 @@ async fn elect_compare_last_log() -> Result<()> {
{
sto0.save_vote(&Vote::new(10, 0)).await?;

testing::blocking_append(&mut sto0, [
sto0.blocking_append([
//
blank_ent(0, 0, 0),
membership_ent(2, 0, 1, vec![btreeset! {0,1}]),
Expand All @@ -49,7 +49,7 @@ async fn elect_compare_last_log() -> Result<()> {
{
sto1.save_vote(&Vote::new(10, 0)).await?;

testing::blocking_append(&mut sto1, [
sto1.blocking_append([
blank_ent(0, 0, 0),
membership_ent(1, 0, 1, vec![btreeset! {0,1}]),
blank_ent(1, 0, 2),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use anyhow::Result;
use maplit::btreeset;
use openraft::testing;
use openraft::storage::RaftLogStorageExt;
use openraft::testing::log_id;
use openraft::Config;
use openraft::Entry;
Expand Down Expand Up @@ -39,7 +39,7 @@ async fn new_leader_auto_commit_uniform_config() -> Result<()> {
router.remove_node(0);

{
testing::blocking_append(&mut sto, [Entry {
sto.blocking_append([Entry {
log_id: log_id(1, 0, log_index + 1),
payload: EntryPayload::Membership(Membership::new(
vec![btreeset! {0}, btreeset! {0,1,2}],
Expand Down
4 changes: 2 additions & 2 deletions tests/tests/snapshot_building/t10_build_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use openraft::network::RaftNetwork;
use openraft::network::RaftNetworkFactory;
use openraft::raft::AppendEntriesRequest;
use openraft::storage::RaftLogReaderExt;
use openraft::testing;
use openraft::storage::RaftLogStorageExt;
use openraft::testing::blank_ent;
use openraft::CommittedLeaderId;
use openraft::Config;
Expand Down Expand Up @@ -80,7 +80,7 @@ async fn build_snapshot() -> Result<()> {

// Add a new node and assert that it received the same snapshot.
let (mut sto1, sm1) = router.new_store();
testing::blocking_append(&mut sto1, [blank_ent(0, 0, 0), Entry {
sto1.blocking_append([blank_ent(0, 0, 0), Entry {
log_id: LogId::new(CommittedLeaderId::new(1, 0), 1),
payload: EntryPayload::Membership(Membership::new(vec![btreeset! {0}], None)),
}])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use openraft::network::RaftNetworkFactory;
use openraft::raft::AppendEntriesRequest;
use openraft::raft::InstallSnapshotRequest;
use openraft::storage::RaftLogStorage;
use openraft::storage::RaftLogStorageExt;
use openraft::storage::RaftStateMachine;
use openraft::testing;
use openraft::testing::blank_ent;
use openraft::testing::log_id;
use openraft::testing::membership_ent;
Expand Down Expand Up @@ -60,7 +60,7 @@ async fn snapshot_delete_conflicting_logs() -> Result<()> {

// When the node starts, it will become candidate and increment its vote to (5,0)
sto0.save_vote(&Vote::new(4, 0)).await?;
testing::blocking_append(&mut sto0, [
sto0.blocking_append([
// manually insert the initializing log
membership_ent(0, 0, 0, vec![btreeset! {0}]),
])
Expand Down

0 comments on commit 884f0da

Please sign in to comment.