Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/auto compactor #401

Merged
merged 9 commits into from
Aug 4, 2023
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion curp-external-api/src/snapshot.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::error::Error;

use async_trait::async_trait;
use engine::Snapshot as EngineSnapshot;
use std::error::Error;

/// The snapshot allocation is handled by the upper-level application
#[allow(clippy::module_name_repetitions)] // it's re-exported in lib
Expand Down
3 changes: 1 addition & 2 deletions curp/src/log_entry.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use std::sync::Arc;

use serde::{Deserialize, Serialize};

use curp_external_api::LogIndex;
use serde::{Deserialize, Serialize};

/// Log entry
#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
5 changes: 2 additions & 3 deletions curp/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::sync::Arc;

use serde::{de::DeserializeOwned, Serialize};

pub use self::proto::protocol_server::ProtocolServer;
pub(crate) use self::proto::{
fetch_read_state_response::ReadState,
propose_response::ExeResult,
Expand All @@ -13,8 +12,8 @@ pub(crate) use self::proto::{
WaitSyncedRequest, WaitSyncedResponse,
};
pub use self::proto::{
propose_response, protocol_client, FetchLeaderRequest, FetchLeaderResponse, ProposeRequest,
ProposeResponse,
propose_response, protocol_client, protocol_server::ProtocolServer, FetchLeaderRequest,
FetchLeaderResponse, ProposeRequest, ProposeResponse,
};
use crate::{
cmd::{Command, ProposeId},
Expand Down
10 changes: 5 additions & 5 deletions curp/src/server/cmd_worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,16 +265,16 @@ pub(super) fn start_cmd_workers<
mod tests {
use std::time::Duration;

use tokio::{sync::mpsc, time::Instant};
use tracing_test::traced_test;

use super::*;
use crate::log_entry::LogEntry;
use curp_test_utils::{
mock_role_change, sleep_millis, sleep_secs,
test_cmd::{TestCE, TestCommand},
};
use test_macros::abort_on_panic;
use tokio::{sync::mpsc, time::Instant};
use tracing_test::traced_test;

use super::*;
use crate::log_entry::LogEntry;

// This should happen in fast path in most cases
#[traced_test]
Expand Down
9 changes: 1 addition & 8 deletions curp/src/server/curp_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tokio::{
time::MissedTickBehavior,
};
use tracing::{debug, error, info, warn};
use utils::config::{ClientTimeout, CurpConfig};
use utils::config::CurpConfig;

use super::{
cmd_board::{CmdBoardRef, CommandBoard},
Expand All @@ -24,7 +24,6 @@ use super::{
storage::{StorageApi, StorageError},
};
use crate::{
client::Client,
cmd::{Command, CommandExecutor},
error::RpcError,
log_entry::LogEntry,
Expand Down Expand Up @@ -274,12 +273,6 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {

/// Spawned tasks
impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
/// get curp inner client from `CurpNode`
#[inline]
pub(crate) async fn inner_client(&self, client_timeout: ClientTimeout) -> Client<C> {
self.curp.inner_client(client_timeout).await
}

/// Tick periodically
async fn election_task(
curp: Arc<RawCurp<C, RC>>,
Expand Down
2 changes: 1 addition & 1 deletion curp/src/server/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ async fn gc_cmd_board<C: Command + 'static>(cmd_board: CmdBoardRef<C>, interval:
mod tests {
use std::{sync::Arc, time::Duration};

use curp_test_utils::{sleep_secs, test_cmd::TestCommand};
use parking_lot::{Mutex, RwLock};
use test_macros::abort_on_panic;

Expand All @@ -84,7 +85,6 @@ mod tests {
spec_pool::{SpecPoolRef, SpeculativePool},
},
};
use curp_test_utils::{sleep_secs, test_cmd::TestCommand};

#[tokio::test]
#[abort_on_panic]
Expand Down
12 changes: 1 addition & 11 deletions curp/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,10 @@ use tokio_stream::wrappers::TcpListenerStream;
#[cfg(not(madsim))]
use tracing::info;
use tracing::instrument;
use utils::{
config::{ClientTimeout, CurpConfig},
tracing::Extract,
};
use utils::{config::CurpConfig, tracing::Extract};

use self::curp_node::{CurpError, CurpNode};
use crate::{
client::Client,
cmd::{Command, CommandExecutor},
error::ServerError,
members::ClusterMember,
Expand Down Expand Up @@ -303,12 +299,6 @@ impl<C: Command + 'static, RC: RoleChange + 'static> Rpc<C, RC> {
pub fn leader_rx(&self) -> broadcast::Receiver<Option<ServerId>> {
self.inner.leader_rx()
}

/// Get an inner client
#[inline]
pub async fn inner_client(&self, client_timeout: ClientTimeout) -> Client<C> {
self.inner.inner_client(client_timeout).await
}
}

impl From<CurpError> for tonic::Status {
Expand Down
2 changes: 1 addition & 1 deletion curp/src/server/raw_curp/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,10 +372,10 @@ impl<C: 'static + Command> Log<C> {
mod tests {
use std::{iter::repeat, ops::Index, sync::Arc};

use curp_test_utils::test_cmd::TestCommand;
use utils::config::{default_batch_max_size, default_log_entries_cap};

use super::*;
use curp_test_utils::test_cmd::TestCommand;

// impl index for test is handy
impl<C: 'static + Command> Index<usize> for Log<C> {
Expand Down
13 changes: 1 addition & 12 deletions curp/src/server/raw_curp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use tracing::{
log::{log_enabled, Level},
};
use utils::{
config::{ClientTimeout, CurpConfig},
config::CurpConfig,
parking_lot_lock::{MutexMap, RwLockMap},
};

Expand All @@ -39,7 +39,6 @@ use self::{
};
use super::cmd_worker::CEEventTxApi;
use crate::{
client::Client,
cmd::{Command, ProposeId},
error::ProposeError,
log_entry::LogEntry,
Expand Down Expand Up @@ -627,16 +626,6 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
raw_curp
}

/// get curp inner client
pub(crate) async fn inner_client(&self, timeout: ClientTimeout) -> Client<C> {
Client::<C>::new(
Some(self.id().clone()),
self.ctx.cluster_info.all_members(),
timeout,
)
.await
}

/// Create a new `RawCurp`
/// `is_leader` will only take effect when all servers start from a fresh state
#[allow(clippy::too_many_arguments)] // only called once
Expand Down
2 changes: 1 addition & 1 deletion curp/src/server/raw_curp/tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::time::Instant;

use curp_test_utils::{mock_role_change, test_cmd::TestCommand};
use test_macros::abort_on_panic;
use tokio::{sync::oneshot, time::sleep};
use tracing_test::traced_test;
Expand All @@ -18,7 +19,6 @@ use crate::{
},
LogIndex,
};
use curp_test_utils::{mock_role_change, test_cmd::TestCommand};

// Hooks for tests
impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
Expand Down
5 changes: 2 additions & 3 deletions curp/src/snapshot.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
#[allow(clippy::module_name_repetitions)] // it's re-exported in lib
pub use curp_external_api::snapshot::SnapshotAllocator;

use std::fmt::Debug;

#[allow(clippy::module_name_repetitions)] // it's re-exported in lib
pub use curp_external_api::snapshot::SnapshotAllocator;
use engine::Snapshot as EngineSnapshot;

/// Snapshot
Expand Down
34 changes: 34 additions & 0 deletions scripts/validation_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,25 @@ run_with_expect() {
fi
}

# run a command with expect output, based on key word match
# args:
# $1: command to run
# $2: expect output
run_with_match() {
cmd="${1}"
res=$(eval ${cmd} 2>&1)
expect=$(echo -e ${2})
if echo "${res}" | grep -q "${expect}"; then
echo "command run success"
else
echo "command run failed"
echo "command: ${cmd}"
echo "expect: ${expect}"
echo "result: ${res}"
exit 1
fi
}

# validate kv requests
kv_validation() {
echo "kv validation test running..."
Expand Down Expand Up @@ -189,6 +208,21 @@ maintenance_validation() {
echo "maintenance validation test passed"
}

# validate compact requests
compact_validation() {
echo "compact validation test running..."
for value in "value1" "value2" "value3" "value4" "value5" "value6";
do
run_with_expect "${ETCDCTL} put key ${value}" "OK"
done
run_with_expect "${ETCDCTL} get --rev=4 key" "key\nvalue3"
run_with_expect "${ETCDCTL} compact 5" "compacted revision 5"
run_with_match "${ETCDCTL} get --rev=4 key" "required revision 4 has been compacted, compacted revision is 5"
run_with_match "${ETCDCTL} watch --rev=4 key " "watch was canceled (etcdserver: mvcc: required revision has been compacted)"
echo "compact validation test pass..."
}

compact_validation
kv_validation
watch_validation
lease_validation
Expand Down
3 changes: 1 addition & 2 deletions simulation/tests/it/curp/server_election.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use curp_test_utils::{init_logger, sleep_secs, test_cmd::TestCommand};
use utils::config::ClientTimeout;

use simulation::curp_group::CurpGroup;
use utils::config::ClientTimeout;

/// Wait some time for the election to finish, and get the leader to ensure that the election is
/// completed.
Expand Down
3 changes: 1 addition & 2 deletions simulation/tests/it/curp/server_recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ use std::sync::Arc;
use curp_test_utils::{init_logger, sleep_secs, test_cmd::TestCommand, TEST_TABLE};
use engine::StorageEngine;
use itertools::Itertools;
use simulation::curp_group::{CurpGroup, ProposeRequest};
use tracing::debug;
use utils::config::ClientTimeout;

use simulation::curp_group::{CurpGroup, ProposeRequest};

#[madsim::test]
async fn leader_crash_and_recovery() {
init_logger();
Expand Down
1 change: 1 addition & 0 deletions utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ toml = "0.5"
thiserror = "1.0.31"
tracing-appender = "0.2"
derive_builder = "0.12.0"
clippy-utilities = "0.2.0"

[dev-dependencies]
opentelemetry-jaeger = "0.17.0"
Expand Down
Loading