From 277b5ec1021c04fded75df5b3d740e3f21e2d787 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Mon, 21 Aug 2023 19:28:45 +0800 Subject: [PATCH 1/3] refactor: move after sync result to protobuf Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- .github/scripts/install_deps.sh | 1 + curp-external-api/src/cmd.rs | 2 +- curp-test-utils/src/test_cmd.rs | 35 +++++- curp/build.rs | 6 +- curp/proto/command.proto | 51 ++++++++ curp/proto/error.proto | 29 +++-- curp/proto/message.proto | 45 +------ curp/src/client.rs | 10 +- curp/src/error.rs | 65 ++++++++-- curp/src/lib.rs | 3 +- curp/src/rpc/connect.rs | 8 +- curp/src/rpc/mod.rs | 133 ++++++++++++-------- curp/src/server/curp_node.rs | 6 +- curp/src/server/gc.rs | 6 +- curp/tests/common/curp_group.rs | 9 +- curp/tests/server.rs | 4 +- simulation/tests/it/curp/server_recovery.rs | 2 +- xline/src/server/command.rs | 48 +++++-- xline/src/storage/execute_error.rs | 26 ++-- xlineapi/proto/command.proto | 79 ++++++------ xlineapi/proto/error.proto | 48 +++---- xlineapi/src/lib.rs | 10 +- 22 files changed, 405 insertions(+), 221 deletions(-) create mode 100644 curp/proto/command.proto diff --git a/.github/scripts/install_deps.sh b/.github/scripts/install_deps.sh index d4aee8da5..3932261ff 100644 --- a/.github/scripts/install_deps.sh +++ b/.github/scripts/install_deps.sh @@ -5,3 +5,4 @@ apt-get install -y cmake g++ expect wget https://github.com/protocolbuffers/protobuf/releases/download/v21.10/protoc-21.10-linux-x86_64.zip unzip protoc-21.10-linux-x86_64.zip -d .local mv "$(pwd)/.local/bin/protoc" /bin/ +mv "$(pwd)/.local/include/google" /usr/include/ diff --git a/curp-external-api/src/cmd.rs b/curp-external-api/src/cmd.rs index eb30b10a5..55469b45d 100644 --- a/curp-external-api/src/cmd.rs +++ b/curp-external-api/src/cmd.rs @@ -34,7 +34,7 @@ pub trait Command: type ER: std::fmt::Debug + Send + Sync + Clone + Serialize + DeserializeOwned + PbSerialize; /// After_sync result - type ASR: std::fmt::Debug + Send + Sync + Clone + Serialize + DeserializeOwned; + type ASR: std::fmt::Debug + Send + Sync + Clone + Serialize + DeserializeOwned + PbSerialize; /// Get keys of the command fn keys(&self) -> &[Self::K]; diff --git a/curp-test-utils/src/test_cmd.rs b/curp-test-utils/src/test_cmd.rs index c70353bd8..8e239f341 100644 --- a/curp-test-utils/src/test_cmd.rs +++ b/curp-test-utils/src/test_cmd.rs @@ -154,6 +154,37 @@ impl TestCommand { } } +/// LogIndex used in Command::ASR +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +pub struct LogIndexResult(LogIndex); + +impl From for LogIndexResult { + fn from(value: LogIndex) -> Self { + Self(value) + } +} + +impl From for LogIndex { + fn from(value: LogIndexResult) -> Self { + value.0 + } +} + +// The `TestCommandResult` is only for internal use, so we donnot have to serialize it to protobuf format +impl PbSerialize for LogIndexResult { + fn encode(&self) -> Vec { + bincode::serialize(self).unwrap_or_else(|_| { + unreachable!("test cmd result should always be successfully serialized") + }) + } + + fn decode(buf: &[u8]) -> Result { + Ok(bincode::deserialize(buf).unwrap_or_else(|_| { + unreachable!("test cmd result should always be successfully serialized") + })) + } +} + impl Command for TestCommand { type Error = ExecuteError; @@ -163,7 +194,7 @@ impl Command for TestCommand { type ER = TestCommandResult; - type ASR = LogIndex; + type ASR = LogIndexResult; fn keys(&self) -> &[Self::K] { &self.keys @@ -316,7 +347,7 @@ impl CommandExecutor for TestCE { cmd.cmd_type, cmd.id() ); - Ok(index) + Ok(index.into()) } fn last_applied(&self) -> Result { diff --git a/curp/build.rs b/curp/build.rs index 4c595e8ab..1b3db3350 100644 --- a/curp/build.rs +++ b/curp/build.rs @@ -4,7 +4,11 @@ fn main() { tonic_build::configure() .compile_with_config( prost_config, - &["./proto/message.proto", "./proto/error.proto"], + &[ + "./proto/message.proto", + "./proto/error.proto", + "./proto/command.proto", + ], &["./proto/"], ) .unwrap_or_else(|e| panic!("Failed to compile proto, error is {:?}", e)); diff --git a/curp/proto/command.proto b/curp/proto/command.proto new file mode 100644 index 000000000..26c422f90 --- /dev/null +++ b/curp/proto/command.proto @@ -0,0 +1,51 @@ +// Message types for curp client +syntax = "proto3"; + +package commandpb; + +import "error.proto"; + +message ProposeRequest { + // The serialized command + // The original type is `Command` + bytes command = 1; +} + +message ProposeResponse { + optional uint64 leader_id = 1; + uint64 term = 2; + oneof exe_result { + CmdResult result = 3; + errorpb.ProposeError error = 4; + } +} + +message WaitSyncedRequest { + string propose_id = 1; +} + +message WaitSyncedResponse { + message Success { + // The serialized command after sync result + // The original type is Command::ASR + bytes after_sync_result = 1; + // The serialized command execute result + // The original type is Command::ER + bytes exe_result = 2; + } + oneof sync_result { + Success success = 1; + errorpb.CommandSyncError error = 2; + } +} + +message CmdResult { + oneof result { + // The serialized command execute result + // The original type is Command::ER + bytes er = 1; + // The serialized command error + // The original type is Command::Error + bytes error = 2; + } +} diff --git a/curp/proto/error.proto b/curp/proto/error.proto index 01f7be89e..e3c45663f 100644 --- a/curp/proto/error.proto +++ b/curp/proto/error.proto @@ -1,23 +1,29 @@ +// Message error types for curp client syntax = "proto3"; -package errorpb; +import "google/protobuf/empty.proto"; -// empty type -message Empty { -} +package errorpb; message ProposeError { oneof propose_error { - Empty key_conflict = 1; - Empty duplicated = 2; + google.protobuf.Empty key_conflict = 1; + google.protobuf.Empty duplicated = 2; SyncError sync_error = 3; string encode_error = 4; } } -message RedirectData { - optional uint64 server_id = 1; - uint64 term = 2; +message CommandSyncError { + oneof command_sync_error { + SyncError sync = 1; + // The serialized command error + // The original type is Command::Error + bytes execute = 2; + // The serialized command error + // The original type is Command::Error + bytes after_sync = 3; + } } message SyncError { @@ -26,3 +32,8 @@ message SyncError { string other = 2; } } + +message RedirectData { + optional uint64 server_id = 1; + uint64 term = 2; +} diff --git a/curp/proto/message.proto b/curp/proto/message.proto index 4f3533aa3..a907cee0e 100644 --- a/curp/proto/message.proto +++ b/curp/proto/message.proto @@ -2,30 +2,7 @@ syntax = "proto3"; package messagepb; -// Propose command from client to servers -message ProposeRequest { - // The serialized command - // Original type is Command trait - bytes command = 1; -} - -// The original type is Result -message CmdResult { - oneof result { - bytes er = 1; - bytes error = 2; - } -} - -message ProposeResponse { - optional uint64 leader_id = 1; - uint64 term = 2; - oneof exe_result { - CmdResult result = 3; - // The original type is ProposeError - bytes error = 4; - } -} +import "command.proto"; message FetchLeaderRequest { } @@ -44,21 +21,6 @@ message FetchClusterResponse { uint64 term = 3; } -message WaitSyncedRequest { - bytes id = 1; -} - -message WaitSyncedResponse { - message Success { - bytes after_sync_result = 1; - bytes exe_result = 2; - } - oneof sync_result { - Success success = 1; - bytes error = 2; - } -} - message AppendEntriesRequest { uint64 term = 1; uint64 leader_id = 2; @@ -117,8 +79,9 @@ message FetchReadStateResponse { } service Protocol { - rpc Propose(ProposeRequest) returns (ProposeResponse); - rpc WaitSynced(WaitSyncedRequest) returns (WaitSyncedResponse); + rpc Propose(commandpb.ProposeRequest) returns (commandpb.ProposeResponse); + rpc WaitSynced(commandpb.WaitSyncedRequest) + returns (commandpb.WaitSyncedResponse); rpc AppendEntries(AppendEntriesRequest) returns (AppendEntriesResponse); rpc Vote(VoteRequest) returns (VoteResponse); rpc InstallSnapshot(stream InstallSnapshotRequest) diff --git a/curp/src/client.rs b/curp/src/client.rs index cde73cfdd..8002e211d 100644 --- a/curp/src/client.rs +++ b/curp/src/client.rs @@ -3,7 +3,7 @@ use std::{ time::Duration, }; -use curp_external_api::cmd::PbSerialize; +use curp_external_api::cmd::PbSerializeError; use dashmap::DashMap; use event_listener::Event; use futures::{pin_mut, stream::FuturesUnordered, StreamExt}; @@ -306,7 +306,7 @@ where .get_connect(leader_id) .unwrap_or_else(|| unreachable!("leader {leader_id} not found")) .wait_synced( - WaitSyncedRequest::new(cmd.id()).map_err(Into::::into)?, + WaitSyncedRequest::new(cmd.id()), *self.timeout.wait_synced_timeout(), ) .await @@ -380,7 +380,11 @@ where Ok(resp) => { let resp = resp.into_inner(); if let Some(rpc::ExeResult::Error(ref e)) = resp.exe_result { - let err = ProposeError::decode(e)?; + let err: ProposeError = e + .clone() + .propose_error + .ok_or(PbSerializeError::EmptyField)? + .try_into()?; if matches!(err, ProposeError::Duplicated) { return Ok(()); } diff --git a/curp/src/error.rs b/curp/src/error.rs index 4eea3e4eb..005c5e5ae 100644 --- a/curp/src/error.rs +++ b/curp/src/error.rs @@ -7,7 +7,10 @@ use thiserror::Error; use crate::{ members::ServerId, - rpc::{Empty, PbProposeError, PbProposeErrorOuter, PbSyncError, RedirectData}, + rpc::{ + PbCommandSyncError, PbCommandSyncErrorOuter, PbProposeError, PbProposeErrorOuter, + PbSyncError, PbSyncErrorOuter, RedirectData, + }, }; /// Error type of client builder @@ -103,14 +106,12 @@ impl From for PbProposeError { #[inline] fn from(err: ProposeError) -> Self { match err { - ProposeError::KeyConflict => PbProposeError::KeyConflict(Empty {}), - ProposeError::Duplicated => PbProposeError::Duplicated(Empty {}), - ProposeError::SyncedError(e) => { - PbProposeError::SyncError(crate::rpc::PbSyncErrorOuter { - sync_error: Some(e.into()), - }) - } - ProposeError::EncodeError(e) => PbProposeError::EncodeError(e), + ProposeError::KeyConflict => PbProposeError::KeyConflict(()), + ProposeError::Duplicated => PbProposeError::Duplicated(()), + ProposeError::SyncedError(e) => PbProposeError::SyncError(PbSyncErrorOuter { + sync_error: Some(e.into()), + }), + ProposeError::EncodeError(s) => PbProposeError::EncodeError(s), } } } @@ -228,6 +229,52 @@ pub(crate) enum CommandSyncError { AfterSync(C::Error), } +impl From> for PbCommandSyncError { + fn from(err: CommandSyncError) -> Self { + match err { + CommandSyncError::Sync(e) => PbCommandSyncError::Sync(PbSyncErrorOuter { + sync_error: Some(e.into()), + }), + CommandSyncError::Execute(e) => PbCommandSyncError::Execute(e.encode()), + CommandSyncError::AfterSync(e) => PbCommandSyncError::AfterSync(e.encode()), + } + } +} + +impl TryFrom for CommandSyncError { + type Error = PbSerializeError; + + fn try_from(err: PbCommandSyncError) -> Result { + Ok(match err { + PbCommandSyncError::Sync(e) => { + CommandSyncError::Sync(e.sync_error.ok_or(PbSerializeError::EmptyField)?.into()) + } + PbCommandSyncError::Execute(e) => { + CommandSyncError::Execute(::Error::decode(&e)?) + } + PbCommandSyncError::AfterSync(e) => { + CommandSyncError::AfterSync(::Error::decode(&e)?) + } + }) + } +} + +impl PbSerialize for CommandSyncError { + fn encode(&self) -> Vec { + PbCommandSyncErrorOuter { + command_sync_error: Some(self.clone().into()), + } + .encode_to_vec() + } + + fn decode(buf: &[u8]) -> Result { + PbCommandSyncErrorOuter::decode(buf)? + .command_sync_error + .ok_or(PbSerializeError::EmptyField)? + .try_into() + } +} + impl From for CommandSyncError { fn from(err: SyncError) -> Self { Self::Sync(err) diff --git a/curp/src/lib.rs b/curp/src/lib.rs index ba9f465fe..5d45f3169 100644 --- a/curp/src/lib.rs +++ b/curp/src/lib.rs @@ -160,8 +160,7 @@ pub use curp_external_api::LogIndex; pub use rpc::ProtocolServer; /// Expose for madsim simulation tests. pub use rpc::{ - propose_response, protocol_client, FetchLeaderRequest, FetchLeaderResponse, ProposeRequest, - ProposeResponse, + protocol_client, FetchLeaderRequest, FetchLeaderResponse, ProposeRequest, ProposeResponse, }; pub use snapshot::SnapshotAllocator; diff --git a/curp/src/rpc/connect.rs b/curp/src/rpc/connect.rs index f13e9f71b..5a087ed50 100644 --- a/curp/src/rpc/connect.rs +++ b/curp/src/rpc/connect.rs @@ -16,10 +16,10 @@ use crate::{ error::RpcError, members::ServerId, rpc::{ - proto::protocol_client::ProtocolClient, AppendEntriesRequest, AppendEntriesResponse, - FetchLeaderRequest, FetchLeaderResponse, FetchReadStateRequest, FetchReadStateResponse, - InstallSnapshotRequest, InstallSnapshotResponse, ProposeRequest, ProposeResponse, - VoteRequest, VoteResponse, WaitSyncedRequest, WaitSyncedResponse, + proto::messagepb::protocol_client::ProtocolClient, AppendEntriesRequest, + AppendEntriesResponse, FetchLeaderRequest, FetchLeaderResponse, FetchReadStateRequest, + FetchReadStateResponse, InstallSnapshotRequest, InstallSnapshotResponse, ProposeRequest, + ProposeResponse, VoteRequest, VoteResponse, WaitSyncedRequest, WaitSyncedResponse, }, snapshot::Snapshot, }; diff --git a/curp/src/rpc/mod.rs b/curp/src/rpc/mod.rs index 0d42cd38b..cf9a4c375 100644 --- a/curp/src/rpc/mod.rs +++ b/curp/src/rpc/mod.rs @@ -3,25 +3,6 @@ use std::{collections::HashMap, sync::Arc}; use curp_external_api::cmd::{PbSerialize, PbSerializeError}; use serde::{de::DeserializeOwned, Serialize}; -use self::proto::{cmd_result::Result as CmdResultInner, CmdResult}; -pub(crate) use self::proto::{ - fetch_read_state_response::ReadState, - propose_error::ProposeError as PbProposeError, - propose_response::ExeResult, - protocol_server::Protocol, - sync_error::SyncError as PbSyncError, - wait_synced_response::{Success, SyncResult as SyncResultRaw}, - AppendEntriesRequest, AppendEntriesResponse, Empty, FetchClusterRequest, FetchClusterResponse, - FetchReadStateRequest, FetchReadStateResponse, IdSet, InstallSnapshotRequest, - InstallSnapshotResponse, ProposeError as PbProposeErrorOuter, RedirectData, - SyncError as PbSyncErrorOuter, VoteRequest, VoteResponse, WaitSyncedRequest, - WaitSyncedResponse, -}; -pub use self::proto::{ - propose_response, protocol_client, protocol_server::ProtocolServer, FetchLeaderRequest, - FetchLeaderResponse, ProposeRequest, ProposeResponse, -}; - use crate::{ cmd::{Command, ProposeId}, error::{CommandSyncError, ProposeError, SyncError}, @@ -30,6 +11,33 @@ use crate::{ LogIndex, }; +use self::proto::commandpb::{cmd_result::Result as CmdResultInner, CmdResult}; +pub(crate) use self::proto::{ + commandpb::{ + propose_response::ExeResult, + wait_synced_response::{Success, SyncResult as SyncResultRaw}, + WaitSyncedRequest, WaitSyncedResponse, + }, + errorpb::{ + command_sync_error::CommandSyncError as PbCommandSyncError, + propose_error::ProposeError as PbProposeError, sync_error::SyncError as PbSyncError, + CommandSyncError as PbCommandSyncErrorOuter, ProposeError as PbProposeErrorOuter, + RedirectData, SyncError as PbSyncErrorOuter, + }, + messagepb::{ + fetch_read_state_response::ReadState, protocol_server::Protocol, AppendEntriesRequest, + AppendEntriesResponse, FetchClusterRequest, FetchClusterResponse, FetchReadStateRequest, + FetchReadStateResponse, IdSet, InstallSnapshotRequest, InstallSnapshotResponse, + VoteRequest, VoteResponse, + }, +}; +pub use self::proto::{ + commandpb::{ProposeRequest, ProposeResponse}, + messagepb::{ + protocol_client, protocol_server::ProtocolServer, FetchLeaderRequest, FetchLeaderResponse, + }, +}; + /// Rpc connect pub(crate) mod connect; pub(crate) use connect::connect; @@ -50,8 +58,15 @@ pub(crate) use connect::connect; unused_results )] mod proto { - tonic::include_proto!("messagepb"); - tonic::include_proto!("errorpb"); + pub(crate) mod messagepb { + tonic::include_proto!("messagepb"); + } + pub(crate) mod commandpb { + tonic::include_proto!("commandpb"); + } + pub(crate) mod errorpb { + tonic::include_proto!("errorpb"); + } } impl FetchLeaderRequest { @@ -137,11 +152,13 @@ impl ProposeResponse { } /// Create an error propose response - pub(crate) fn new_error(leader_id: Option, term: u64, error: &ProposeError) -> Self { + pub(crate) fn new_error(leader_id: Option, term: u64, error: ProposeError) -> Self { Self { leader_id, term, - exe_result: Some(ExeResult::Error(error.encode())), + exe_result: Some(ExeResult::Error(PbProposeErrorOuter { + propose_error: Some(error.into()), + })), } } @@ -169,7 +186,14 @@ impl ProposeResponse { }; Ok(success(Some(cmd_result))) } - Some(ExeResult::Error(ref buf)) => Ok(failure(ProposeError::decode(buf)?)), + Some(ExeResult::Error(ref err)) => { + let propose_error = err + .clone() + .propose_error + .ok_or(PbSerializeError::EmptyField)? + .try_into()?; + Ok(failure(propose_error)) + } None => Ok(success(None)), } } @@ -177,77 +201,82 @@ impl ProposeResponse { impl WaitSyncedRequest { /// Create a `WaitSynced` request - pub(crate) fn new(id: &ProposeId) -> bincode::Result { - Ok(Self { - id: bincode::serialize(id)?, - }) + pub(crate) fn new(id: &ProposeId) -> Self { + Self { + propose_id: id.clone().into_inner(), + } } /// Get the propose id - pub(crate) fn id(&self) -> bincode::Result { - bincode::deserialize(&self.id) + pub(crate) fn propose_id(&self) -> ProposeId { + ProposeId::new(self.propose_id.clone()) } } impl WaitSyncedResponse { /// Create a success response - pub(crate) fn new_success(asr: &C::ASR, er: &C::ER) -> bincode::Result { - Ok(Self { + pub(crate) fn new_success(asr: &C::ASR, er: &C::ER) -> Self { + Self { sync_result: Some(SyncResultRaw::Success(Success { - after_sync_result: bincode::serialize(&asr)?, - exe_result: bincode::serialize(&er)?, + after_sync_result: asr.encode(), + exe_result: er.encode(), })), - }) + } } /// Create an error response - pub(crate) fn new_error(err: &CommandSyncError) -> bincode::Result { - Ok(Self { - sync_result: Some(SyncResultRaw::Error(bincode::serialize(&err)?)), - }) + pub(crate) fn new_error(err: CommandSyncError) -> Self { + Self { + sync_result: Some(SyncResultRaw::Error(PbCommandSyncErrorOuter { + command_sync_error: Some(err.into()), + })), + } } /// Create a new response from execution result and `after_sync` result pub(crate) fn new_from_result( er: Option>, asr: Option>, - ) -> bincode::Result { + ) -> Self { match (er, asr) { - (None, Some(_)) => { + (None | Some(Err(_)), Some(_)) => { unreachable!("should not call after sync if execution fails") } (None, None) => WaitSyncedResponse::new_error::( - &SyncError::Other("can't get er result".to_owned()).into(), + SyncError::Other("can't get er result".to_owned()).into(), ), // this is highly unlikely to happen, - (Some(Err(_)), Some(_)) => { - unreachable!("should not call after_sync when exe failed") - } (Some(Err(err)), None) => { - WaitSyncedResponse::new_error(&CommandSyncError::::Execute(err)) + WaitSyncedResponse::new_error(CommandSyncError::::Execute(err)) } // The er is ignored as the propose has failed (Some(Ok(_er)), Some(Err(err))) => { - WaitSyncedResponse::new_error(&CommandSyncError::::AfterSync(err)) + WaitSyncedResponse::new_error(CommandSyncError::::AfterSync(err)) } (Some(Ok(er)), Some(Ok(asr))) => WaitSyncedResponse::new_success::(&asr, &er), // The er is ignored as the propose has failed (Some(Ok(_er)), None) => { WaitSyncedResponse::new_error::( - &SyncError::Other("can't get after sync result".to_owned()).into(), + SyncError::Other("can't get after sync result".to_owned()).into(), ) // this is highly unlikely to happen, } } } /// Into deserialized result - pub(crate) fn into(self) -> bincode::Result> { + pub(crate) fn into(self) -> Result, PbSerializeError> { let res = match self.sync_result { None => unreachable!("WaitSyncedResponse should contain valid sync_result"), Some(SyncResultRaw::Success(success)) => SyncResult::Success { - asr: bincode::deserialize(&success.after_sync_result)?, - er: bincode::deserialize(&success.exe_result)?, + asr: ::ASR::decode(&success.after_sync_result)?, + er: ::ER::decode(&success.exe_result)?, }, - Some(SyncResultRaw::Error(err)) => SyncResult::Error(bincode::deserialize(&err)?), + Some(SyncResultRaw::Error(err)) => { + let cmd_sync_err = err + .command_sync_error + .ok_or(PbSerializeError::EmptyField)? + .try_into()?; + SyncResult::Error(cmd_sync_err) + } }; Ok(res) } diff --git a/curp/src/server/curp_node.rs b/curp/src/server/curp_node.rs index 05b2623a0..d00a7083e 100644 --- a/curp/src/server/curp_node.rs +++ b/curp/src/server/curp_node.rs @@ -133,7 +133,7 @@ impl CurpNode { ProposeResponse::new_result::(leader_id, term, &er_res) } Ok(false) => ProposeResponse::new_empty(leader_id, term), - Err(err) => ProposeResponse::new_error(leader_id, term, &err), + Err(err) => ProposeResponse::new_error(leader_id, term, err), }; Ok(resp) @@ -186,11 +186,11 @@ impl CurpNode { &self, req: WaitSyncedRequest, ) -> Result { - let id = req.id()?; + let id = req.propose_id(); debug!("{} get wait synced request for cmd({id})", self.curp.id()); let (er, asr) = CommandBoard::wait_for_er_asr(&self.cmd_board, &id).await; - let resp = WaitSyncedResponse::new_from_result::(Some(er), asr)?; + let resp = WaitSyncedResponse::new_from_result::(Some(er), asr); debug!("{} wait synced for cmd({id}) finishes", self.curp.id()); Ok(resp) diff --git a/curp/src/server/gc.rs b/curp/src/server/gc.rs index b1abcf1d4..0183cc5e9 100644 --- a/curp/src/server/gc.rs +++ b/curp/src/server/gc.rs @@ -108,12 +108,12 @@ mod tests { board .write() .asr_buffer - .insert(ProposeId::new("1".to_owned()), Ok(0)); + .insert(ProposeId::new("1".to_owned()), Ok(0.into())); tokio::time::sleep(Duration::from_millis(100)).await; board .write() .asr_buffer - .insert(ProposeId::new("2".to_owned()), Ok(0)); + .insert(ProposeId::new("2".to_owned()), Ok(0.into())); // at 600ms tokio::time::sleep(Duration::from_millis(400)).await; @@ -124,7 +124,7 @@ mod tests { board .write() .asr_buffer - .insert(ProposeId::new("3".to_owned()), Ok(0)); + .insert(ProposeId::new("3".to_owned()), Ok(0.into())); // at 1100ms, the first two kv should be removed tokio::time::sleep(Duration::from_millis(500)).await; diff --git a/curp/tests/common/curp_group.rs b/curp/tests/common/curp_group.rs index f33aaf9f9..5c872bd28 100644 --- a/curp/tests/common/curp_group.rs +++ b/curp/tests/common/curp_group.rs @@ -25,7 +25,14 @@ use utils::config::{ClientTimeout, CurpConfigBuilder, StorageConfig}; pub mod proto { tonic::include_proto!("messagepb"); } -pub use proto::{protocol_client::ProtocolClient, ProposeRequest, ProposeResponse}; +pub mod commandpb { + tonic::include_proto!("commandpb"); +} +pub mod errorpb { + tonic::include_proto!("errorpb"); +} +pub use commandpb::{ProposeRequest, ProposeResponse}; +pub use proto::protocol_client::ProtocolClient; use self::proto::{FetchLeaderRequest, FetchLeaderResponse}; diff --git a/curp/tests/server.rs b/curp/tests/server.rs index 52c0dea9e..ffbec5cd3 100644 --- a/curp/tests/server.rs +++ b/curp/tests/server.rs @@ -13,7 +13,7 @@ use test_macros::abort_on_panic; use utils::config::ClientTimeout; use crate::common::curp_group::{ - proto::propose_response::ExeResult, CurpGroup, ProposeRequest, ProposeResponse, + commandpb::propose_response::ExeResult, CurpGroup, ProposeRequest, ProposeResponse, }; mod common; @@ -73,7 +73,7 @@ async fn synced_propose() { let (er, index) = client.propose(cmd.clone(), false).await.unwrap(); assert_eq!(er, TestCommandResult::new(vec![], vec![])); - assert_eq!(index.unwrap(), 1); // log[0] is a fake one + assert_eq!(index.unwrap(), 1.into()); // log[0] is a fake one for exe_rx in group.exe_rxs() { let (cmd1, er) = exe_rx.recv().await.unwrap(); diff --git a/simulation/tests/it/curp/server_recovery.rs b/simulation/tests/it/curp/server_recovery.rs index 8b09c2b12..548d03932 100644 --- a/simulation/tests/it/curp/server_recovery.rs +++ b/simulation/tests/it/curp/server_recovery.rs @@ -281,7 +281,7 @@ async fn old_leader_will_keep_original_states() { let cmd0 = TestCommand::new_put(vec![0], 0); let (er, index) = client.propose(cmd0, false).await.unwrap(); assert_eq!(er.values, vec![]); - assert_eq!(index.unwrap(), 1); + assert_eq!(index.unwrap(), 1.into()); // 1: disable all others to prevent the cmd1 to be synced let leader1 = group.get_leader().await.0; diff --git a/xline/src/server/command.rs b/xline/src/server/command.rs index f7f648d4b..cf4807475 100644 --- a/xline/src/server/command.rs +++ b/xline/src/server/command.rs @@ -16,15 +16,12 @@ use engine::Snapshot; use itertools::Itertools; use prost::Message; use serde::{Deserialize, Serialize}; -use xlineapi::{PbCommand, PbKeyRange}; +use xlineapi::{PbCommand, PbCommandResponse, PbKeyRange, PbSyncResponse}; use super::barriers::{IdBarrier, IndexBarrier}; use crate::{ revision_number::RevisionNumberGenerator, - rpc::{ - Request, RequestBackend, RequestWithToken, RequestWrapper, ResponseWrapper, - ResponseWrapperOuter, - }, + rpc::{Request, RequestBackend, RequestWithToken, RequestWrapper, ResponseWrapper}, storage::{db::WriteOp, storage_api::StorageApi, AuthStore, ExecuteError, KvStore, LeaseStore}, }; @@ -580,7 +577,7 @@ pub struct CommandResponse { impl PbSerialize for CommandResponse { #[inline] fn encode(&self) -> Vec { - ResponseWrapperOuter { + PbCommandResponse { response_wrapper: Some(self.response.clone()), } .encode_to_vec() @@ -588,8 +585,10 @@ impl PbSerialize for CommandResponse { #[inline] fn decode(buf: &[u8]) -> Result { + let pb_cmd_resp = PbCommandResponse::decode(buf)?; + Ok(CommandResponse { - response: ResponseWrapperOuter::decode(buf)? + response: pb_cmd_resp .response_wrapper .ok_or(PbSerializeError::EmptyField)?, }) @@ -634,6 +633,36 @@ impl SyncResponse { } } +impl From for SyncResponse { + #[inline] + fn from(resp: PbSyncResponse) -> Self { + Self { + revision: resp.revision, + } + } +} + +impl From for PbSyncResponse { + #[inline] + fn from(resp: SyncResponse) -> Self { + Self { + revision: resp.revision, + } + } +} + +impl PbSerialize for SyncResponse { + #[inline] + fn encode(&self) -> Vec { + PbSyncResponse::from(*self).encode_to_vec() + } + + #[inline] + fn decode(buf: &[u8]) -> Result { + Ok(PbSyncResponse::decode(buf)?.into()) + } +} + #[async_trait::async_trait] impl CurpCommand for Command { type Error = ExecuteError; @@ -660,7 +689,7 @@ impl PbSerialize for Command { let rpc_cmd = PbCommand { keys: cmd.keys.into_iter().map(Into::into).collect(), request: Some(cmd.request.into()), - id: cmd.id.into_inner(), + propose_id: cmd.id.into_inner(), }; rpc_cmd.encode_to_vec() } @@ -674,7 +703,7 @@ impl PbSerialize for Command { .request .ok_or(PbSerializeError::EmptyField)? .try_into()?, - id: ProposeId::new(rpc_cmd.id), + id: ProposeId::new(rpc_cmd.propose_id), }) } } @@ -732,6 +761,7 @@ pub(super) fn propose_err_to_status(err: CommandProposeError) -> tonic: CommandProposeError::Propose(ProposeError::SyncedError(e)) => { tonic::Status::unknown(e.to_string()) } + CommandProposeError::Propose(ProposeError::EncodeError(e)) => tonic::Status::internal(e), _ => unreachable!("propose err {err:?}"), } } diff --git a/xline/src/storage/execute_error.rs b/xline/src/storage/execute_error.rs index 261b65dd5..adf2ced74 100644 --- a/xline/src/storage/execute_error.rs +++ b/xline/src/storage/execute_error.rs @@ -4,7 +4,7 @@ use curp::cmd::{PbSerialize, PbSerializeError}; use prost::Message; use serde::{Deserialize, Serialize}; use thiserror::Error; -use xlineapi::{Empty, PbExecuteError, PbExecuteErrorOuter, PbRevisions, PbValidationError}; +use xlineapi::{PbExecuteError, PbExecuteErrorOuter, PbRevisions, PbValidationError}; use crate::request_validation::ValidationError; @@ -154,7 +154,7 @@ impl From for PbExecuteError { ExecuteError::InvalidRequest(s) => { PbExecuteError::InvalidRequest(PbValidationError::from(s).into()) } - ExecuteError::KeyNotFound => PbExecuteError::KeyNotFound(Empty {}), + ExecuteError::KeyNotFound => PbExecuteError::KeyNotFound(()), ExecuteError::RevisionTooLarge(required_revision, current_revision) => { PbExecuteError::RevisionTooLarge(PbRevisions { required_revision, @@ -171,24 +171,24 @@ impl From for PbExecuteError { ExecuteError::LeaseExpired(l) => PbExecuteError::LeaseExpired(l), ExecuteError::LeaseTtlTooLarge(l) => PbExecuteError::LeaseTtlTooLarge(l), ExecuteError::LeaseAlreadyExists(l) => PbExecuteError::LeaseAlreadyExists(l), - ExecuteError::AuthNotEnabled => PbExecuteError::AuthNotEnabled(Empty {}), - ExecuteError::AuthFailed => PbExecuteError::AuthFailed(Empty {}), + ExecuteError::AuthNotEnabled => PbExecuteError::AuthNotEnabled(()), + ExecuteError::AuthFailed => PbExecuteError::AuthFailed(()), ExecuteError::UserNotFound(u) => PbExecuteError::UserNotFound(u), ExecuteError::UserAlreadyExists(u) => PbExecuteError::UserAlreadyExists(u), ExecuteError::UserAlreadyHasRole(user, role) => { PbExecuteError::UserAlreadyHasRole(xlineapi::PbUserRole { user, role }) } - ExecuteError::NoPasswordUser => PbExecuteError::NoPasswordUser(Empty {}), + ExecuteError::NoPasswordUser => PbExecuteError::NoPasswordUser(()), ExecuteError::RoleNotFound(r) => PbExecuteError::RoleNotFound(r), ExecuteError::RoleAlreadyExists(r) => PbExecuteError::RoleAlreadyExists(r), ExecuteError::RoleNotGranted(r) => PbExecuteError::RoleNotGranted(r), - ExecuteError::RootRoleNotExist => PbExecuteError::RootRoleNotExist(Empty {}), - ExecuteError::PermissionNotGranted => PbExecuteError::PermissionNotGranted(Empty {}), - ExecuteError::PermissionNotGiven => PbExecuteError::PermissionNotGiven(Empty {}), - ExecuteError::InvalidAuthManagement => PbExecuteError::InvalidAuthManagement(Empty {}), - ExecuteError::InvalidAuthToken => PbExecuteError::InvalidAuthToken(Empty {}), - ExecuteError::TokenManagerNotInit => PbExecuteError::TokenManagerNotInit(Empty {}), - ExecuteError::TokenNotProvided => PbExecuteError::TokenNotProvided(Empty {}), + ExecuteError::RootRoleNotExist => PbExecuteError::RootRoleNotExist(()), + ExecuteError::PermissionNotGranted => PbExecuteError::PermissionNotGranted(()), + ExecuteError::PermissionNotGiven => PbExecuteError::PermissionNotGiven(()), + ExecuteError::InvalidAuthManagement => PbExecuteError::InvalidAuthManagement(()), + ExecuteError::InvalidAuthToken => PbExecuteError::InvalidAuthToken(()), + ExecuteError::TokenManagerNotInit => PbExecuteError::TokenManagerNotInit(()), + ExecuteError::TokenNotProvided => PbExecuteError::TokenNotProvided(()), ExecuteError::TokenOldRevision(required_revision, current_revision) => { PbExecuteError::TokenOldRevision(PbRevisions { required_revision, @@ -196,7 +196,7 @@ impl From for PbExecuteError { }) } ExecuteError::DbError(e) => PbExecuteError::DbError(e), - ExecuteError::PermissionDenied => PbExecuteError::PermissionDenied(Empty {}), + ExecuteError::PermissionDenied => PbExecuteError::PermissionDenied(()), } } } diff --git a/xlineapi/proto/command.proto b/xlineapi/proto/command.proto index 7c35e5f3d..4f1f3f557 100644 --- a/xlineapi/proto/command.proto +++ b/xlineapi/proto/command.proto @@ -3,10 +3,52 @@ package commandpb; import "rpc.proto"; +// Command message Command { repeated KeyRange keys = 1; RequestWithToken request = 2; - string id = 3; + string propose_id = 3; +} + +// Command::ER +message CommandResponse { + oneof response_wrapper { + etcdserverpb.RangeResponse range_response = 1; + etcdserverpb.PutResponse put_response = 2; + etcdserverpb.DeleteRangeResponse delete_range_response = 3; + etcdserverpb.TxnResponse txn_response = 4; + etcdserverpb.CompactionResponse compaction_response = 5; + etcdserverpb.AuthEnableResponse auth_enable_response = 6; + etcdserverpb.AuthDisableResponse auth_disable_response = 7; + etcdserverpb.AuthStatusResponse auth_status_response = 8; + etcdserverpb.AuthRoleAddResponse auth_role_add_response = 9; + etcdserverpb.AuthRoleDeleteResponse auth_role_delete_response = 10; + etcdserverpb.AuthRoleGetResponse auth_role_get_response = 11; + etcdserverpb.AuthRoleGrantPermissionResponse + auth_role_grant_permission_response = 12; + etcdserverpb.AuthRoleListResponse auth_role_list_response = 13; + etcdserverpb.AuthRoleRevokePermissionResponse + auth_role_revoke_permission_response = 14; + etcdserverpb.AuthUserAddResponse auth_user_add_response = 15; + etcdserverpb.AuthUserChangePasswordResponse + auth_user_change_password_response = 16; + etcdserverpb.AuthUserDeleteResponse auth_user_delete_response = 17; + etcdserverpb.AuthUserGetResponse auth_user_get_response = 18; + etcdserverpb.AuthUserGrantRoleResponse auth_user_grant_role_response = + 19; + etcdserverpb.AuthUserListResponse auth_user_list_response = 20; + etcdserverpb.AuthUserRevokeRoleResponse auth_user_revoke_role_response = + 21; + etcdserverpb.AuthenticateResponse authenticate_response = 22; + etcdserverpb.LeaseGrantResponse lease_grant_response = 23; + etcdserverpb.LeaseRevokeResponse lease_revoke_response = 24; + etcdserverpb.LeaseLeasesResponse lease_leases_response = 25; + } +} + +// Command::ASR +message SyncResponse { + int64 revision = 1; } message KeyRange { @@ -48,38 +90,3 @@ message RequestWithToken { etcdserverpb.LeaseLeasesRequest lease_leases_request = 26; } } - -message ResponseWrapper { - oneof response_wrapper { - etcdserverpb.RangeResponse range_response = 1; - etcdserverpb.PutResponse put_response = 2; - etcdserverpb.DeleteRangeResponse delete_range_response = 3; - etcdserverpb.TxnResponse txn_response = 4; - etcdserverpb.CompactionResponse compaction_response = 5; - etcdserverpb.AuthEnableResponse auth_enable_response = 6; - etcdserverpb.AuthDisableResponse auth_disable_response = 7; - etcdserverpb.AuthStatusResponse auth_status_response = 8; - etcdserverpb.AuthRoleAddResponse auth_role_add_response = 9; - etcdserverpb.AuthRoleDeleteResponse auth_role_delete_response = 10; - etcdserverpb.AuthRoleGetResponse auth_role_get_response = 11; - etcdserverpb.AuthRoleGrantPermissionResponse - auth_role_grant_permission_response = 12; - etcdserverpb.AuthRoleListResponse auth_role_list_response = 13; - etcdserverpb.AuthRoleRevokePermissionResponse - auth_role_revoke_permission_response = 14; - etcdserverpb.AuthUserAddResponse auth_user_add_response = 15; - etcdserverpb.AuthUserChangePasswordResponse - auth_user_change_password_response = 16; - etcdserverpb.AuthUserDeleteResponse auth_user_delete_response = 17; - etcdserverpb.AuthUserGetResponse auth_user_get_response = 18; - etcdserverpb.AuthUserGrantRoleResponse auth_user_grant_role_response = - 19; - etcdserverpb.AuthUserListResponse auth_user_list_response = 20; - etcdserverpb.AuthUserRevokeRoleResponse auth_user_revoke_role_response = - 21; - etcdserverpb.AuthenticateResponse authenticate_response = 22; - etcdserverpb.LeaseGrantResponse lease_grant_response = 23; - etcdserverpb.LeaseRevokeResponse lease_revoke_response = 24; - etcdserverpb.LeaseLeasesResponse lease_leases_response = 25; - } -} diff --git a/xlineapi/proto/error.proto b/xlineapi/proto/error.proto index 9bad13661..f32d0fd68 100644 --- a/xlineapi/proto/error.proto +++ b/xlineapi/proto/error.proto @@ -2,48 +2,38 @@ syntax = "proto3"; package errorpb; -message Empty { -} - -message Revisions { - int64 required_revision = 1; - int64 current_revision = 2; -} - -message UserRole { - string user = 1; - string role = 2; -} +import "google/protobuf/empty.proto"; +// Command::Error message ExecuteError { oneof error { VALIDATION_ERROR invalid_request = 1; - Empty key_not_found = 2; + google.protobuf.Empty key_not_found = 2; Revisions revision_too_large = 3; Revisions revision_compacted = 4; int64 lease_not_found = 5; int64 lease_expired = 6; int64 lease_ttl_too_large = 7; int64 lease_already_exists = 8; - Empty auth_not_enabled = 9; - Empty auth_failed = 10; + google.protobuf.Empty auth_not_enabled = 9; + google.protobuf.Empty auth_failed = 10; string user_not_found = 11; string user_already_exists = 12; UserRole user_already_has_role = 13; - Empty no_password_user = 14; + google.protobuf.Empty no_password_user = 14; string role_not_found = 15; string role_already_exists = 16; string role_not_granted = 17; - Empty root_role_not_exist = 18; - Empty permission_not_granted = 19; - Empty permission_not_given = 20; - Empty invalid_auth_management = 21; - Empty invalid_auth_token = 22; - Empty token_manager_not_init = 23; - Empty token_not_provided = 24; + google.protobuf.Empty root_role_not_exist = 18; + google.protobuf.Empty permission_not_granted = 19; + google.protobuf.Empty permission_not_given = 20; + google.protobuf.Empty invalid_auth_management = 21; + google.protobuf.Empty invalid_auth_token = 22; + google.protobuf.Empty token_manager_not_init = 23; + google.protobuf.Empty token_not_provided = 24; Revisions token_old_revision = 25; string db_error = 26; - Empty permission_denied = 27; + google.protobuf.Empty permission_denied = 27; } enum VALIDATION_ERROR { @@ -60,3 +50,13 @@ message ExecuteError { PERMISSION_NOT_GIVEN = 10; } } + +message Revisions { + int64 required_revision = 1; + int64 current_revision = 2; +} + +message UserRole { + string user = 1; + string role = 2; +} diff --git a/xlineapi/src/lib.rs b/xlineapi/src/lib.rs index 29b5ed4b2..b2ba6baf4 100644 --- a/xlineapi/src/lib.rs +++ b/xlineapi/src/lib.rs @@ -193,14 +193,14 @@ use serde::{Deserialize, Serialize}; pub use self::{ authpb::{permission::Type, Permission, Role, User, UserAddOptions}, commandpb::{ - request_with_token::RequestWrapper, response_wrapper::ResponseWrapper, - Command as PbCommand, KeyRange as PbKeyRange, RequestWithToken as PbRequestWithToken, - ResponseWrapper as ResponseWrapperOuter, + command_response::ResponseWrapper, request_with_token::RequestWrapper, + Command as PbCommand, CommandResponse as PbCommandResponse, KeyRange as PbKeyRange, + RequestWithToken as PbRequestWithToken, SyncResponse as PbSyncResponse, }, errorpb::{ execute_error::Error as PbExecuteError, - execute_error::ValidationError as PbValidationError, Empty, - ExecuteError as PbExecuteErrorOuter, Revisions as PbRevisions, UserRole as PbUserRole, + execute_error::ValidationError as PbValidationError, ExecuteError as PbExecuteErrorOuter, + Revisions as PbRevisions, UserRole as PbUserRole, }, etcdserverpb::{ auth_client::AuthClient, From afd14cebd7268fa3a47156cb0e1e2ce7b5274a6a Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Wed, 23 Aug 2023 12:57:20 +0800 Subject: [PATCH 2/3] test: add tests for command sync serialization Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- curp-test-utils/src/test_cmd.rs | 2 +- curp/src/error.rs | 17 +++++++++++++++++ xline/src/server/command.rs | 9 +++++++++ 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/curp-test-utils/src/test_cmd.rs b/curp-test-utils/src/test_cmd.rs index 8e239f341..e8d55967e 100644 --- a/curp-test-utils/src/test_cmd.rs +++ b/curp-test-utils/src/test_cmd.rs @@ -30,7 +30,7 @@ fn next_id() -> u64 { } #[derive(Error, Debug, Clone, Serialize, Deserialize)] -pub struct ExecuteError(String); +pub struct ExecuteError(pub String); impl Display for ExecuteError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { diff --git a/curp/src/error.rs b/curp/src/error.rs index 005c5e5ae..273ee16a7 100644 --- a/curp/src/error.rs +++ b/curp/src/error.rs @@ -283,6 +283,8 @@ impl From for CommandSyncError { #[cfg(test)] mod test { + use curp_test_utils::test_cmd::{ExecuteError, TestCommand}; + use super::*; #[test] @@ -292,4 +294,19 @@ mod test { ::decode(&err.encode()).expect("decode should success"); assert!(matches!(err, _decoded_err)); } + + #[test] + fn cmd_sync_error_serialization_is_ok() { + let err: CommandSyncError = + CommandSyncError::Sync(SyncError::Other("msg".to_owned())); + let _decoded_err = as PbSerialize>::decode(&err.encode()) + .expect("decode should success"); + assert!(matches!(err, _decoded_err)); + + let err1: CommandSyncError = + CommandSyncError::Execute(ExecuteError("msg".to_owned())); + let _decoded_err1 = as PbSerialize>::decode(&err1.encode()) + .expect("decode should success"); + assert!(matches!(err1, _decoded_err1)); + } } diff --git a/xline/src/server/command.rs b/xline/src/server/command.rs index cf4807475..b8dcf194c 100644 --- a/xline/src/server/command.rs +++ b/xline/src/server/command.rs @@ -612,6 +612,7 @@ impl CommandResponse { } /// Sync Response +#[cfg_attr(test, derive(PartialEq, Eq))] #[derive(Debug, Copy, Clone, Serialize, Deserialize)] pub struct SyncResponse { /// Revision of this request @@ -1025,4 +1026,12 @@ mod test { .expect("decode should success"); assert_eq!(cmd_resp, decoded_cmd_resp); } + + #[test] + fn sync_resp_serialization_is_ok() { + let sync_resp = SyncResponse::new(1); + let decoded_sync_resp = ::decode(&sync_resp.encode()) + .expect("decode should success"); + assert_eq!(sync_resp, decoded_sync_resp); + } } From b30fa3390b59c61eeee0bbf8cf550df7aa836bbf Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Wed, 6 Sep 2023 10:27:51 +0800 Subject: [PATCH 3/3] chore: rename SyncError to WaitSyncError Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> chore: rename `PbSerialize` to `PbCodec` Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- curp-external-api/src/cmd.rs | 12 ++--- curp-test-utils/src/test_cmd.rs | 10 ++--- curp/proto/error.proto | 8 ++-- curp/src/client.rs | 14 ++++-- curp/src/error.rs | 70 ++++++++++++++++-------------- curp/src/rpc/mod.rs | 13 +++--- xline/src/server/command.rs | 16 +++---- xline/src/storage/execute_error.rs | 8 ++-- 8 files changed, 81 insertions(+), 70 deletions(-) diff --git a/curp-external-api/src/cmd.rs b/curp-external-api/src/cmd.rs index 55469b45d..e365672ec 100644 --- a/curp-external-api/src/cmd.rs +++ b/curp-external-api/src/cmd.rs @@ -10,10 +10,10 @@ use crate::LogIndex; /// Command to execute on the server side #[async_trait] pub trait Command: - Sync + Send + DeserializeOwned + Serialize + std::fmt::Debug + Clone + ConflictCheck + PbSerialize + Sync + Send + DeserializeOwned + Serialize + std::fmt::Debug + Clone + ConflictCheck + PbCodec { /// Error type - type Error: Send + Sync + Clone + std::error::Error + Serialize + DeserializeOwned + PbSerialize; + type Error: Send + Sync + Clone + std::error::Error + Serialize + DeserializeOwned + PbCodec; /// K (key) is used to tell confliction /// The key can be a single key or a key range @@ -31,10 +31,10 @@ pub trait Command: type PR: std::fmt::Debug + Send + Sync + Clone + Serialize + DeserializeOwned; /// Execution result - type ER: std::fmt::Debug + Send + Sync + Clone + Serialize + DeserializeOwned + PbSerialize; + type ER: std::fmt::Debug + Send + Sync + Clone + Serialize + DeserializeOwned + PbCodec; /// After_sync result - type ASR: std::fmt::Debug + Send + Sync + Clone + Serialize + DeserializeOwned + PbSerialize; + type ASR: std::fmt::Debug + Send + Sync + Clone + Serialize + DeserializeOwned + PbCodec; /// Get keys of the command fn keys(&self) -> &[Self::K]; @@ -158,8 +158,8 @@ where async fn reset(&self, snapshot: Option<(Snapshot, LogIndex)>) -> Result<(), C::Error>; } -/// Serializaion for protobuf -pub trait PbSerialize: Sized { +/// Codec for encoding and decoding data into/from the Protobuf format +pub trait PbCodec: Sized { /// Encode fn encode(&self) -> Vec; /// Decode diff --git a/curp-test-utils/src/test_cmd.rs b/curp-test-utils/src/test_cmd.rs index e8d55967e..07d53f98a 100644 --- a/curp-test-utils/src/test_cmd.rs +++ b/curp-test-utils/src/test_cmd.rs @@ -10,7 +10,7 @@ use std::{ use async_trait::async_trait; use clippy_utilities::NumericCast; use curp_external_api::{ - cmd::{Command, CommandExecutor, ConflictCheck, PbSerialize, ProposeId}, + cmd::{Command, CommandExecutor, ConflictCheck, PbCodec, ProposeId}, LogIndex, }; use engine::{Engine, EngineType, Snapshot, SnapshotApi, StorageEngine, WriteOperation}; @@ -39,7 +39,7 @@ impl Display for ExecuteError { } // The `ExecuteError` is only for internal use, so we do not have to serialize it to protobuf format -impl PbSerialize for ExecuteError { +impl PbCodec for ExecuteError { fn encode(&self) -> Vec { self.0.clone().into_bytes() } @@ -94,7 +94,7 @@ impl TestCommandResult { } // The `TestCommandResult` is only for internal use, so we do not have to serialize it to protobuf format -impl PbSerialize for TestCommandResult { +impl PbCodec for TestCommandResult { fn encode(&self) -> Vec { bincode::serialize(self).unwrap_or_else(|_| { unreachable!("test cmd result should always be successfully serialized") @@ -171,7 +171,7 @@ impl From for LogIndex { } // The `TestCommandResult` is only for internal use, so we donnot have to serialize it to protobuf format -impl PbSerialize for LogIndexResult { +impl PbCodec for LogIndexResult { fn encode(&self) -> Vec { bincode::serialize(self).unwrap_or_else(|_| { unreachable!("test cmd result should always be successfully serialized") @@ -219,7 +219,7 @@ impl ConflictCheck for TestCommand { } // The `TestCommand` is only for internal use, so we donnot have to serialize it to protobuf format -impl PbSerialize for TestCommand { +impl PbCodec for TestCommand { fn encode(&self) -> Vec { bincode::serialize(self) .unwrap_or_else(|_| unreachable!("test cmd should always be successfully serialized")) diff --git a/curp/proto/error.proto b/curp/proto/error.proto index e3c45663f..b4021dcaa 100644 --- a/curp/proto/error.proto +++ b/curp/proto/error.proto @@ -9,14 +9,14 @@ message ProposeError { oneof propose_error { google.protobuf.Empty key_conflict = 1; google.protobuf.Empty duplicated = 2; - SyncError sync_error = 3; + WaitSyncError wait_sync_error = 3; string encode_error = 4; } } message CommandSyncError { oneof command_sync_error { - SyncError sync = 1; + WaitSyncError wait_sync = 1; // The serialized command error // The original type is Command::Error bytes execute = 2; @@ -26,8 +26,8 @@ message CommandSyncError { } } -message SyncError { - oneof sync_error { +message WaitSyncError { + oneof wait_sync_error { RedirectData redirect = 1; string other = 2; } diff --git a/curp/src/client.rs b/curp/src/client.rs index 8002e211d..770560225 100644 --- a/curp/src/client.rs +++ b/curp/src/client.rs @@ -16,7 +16,8 @@ use utils::{config::ClientTimeout, parking_lot_lock::RwLockMap}; use crate::{ cmd::{Command, ProposeId}, error::{ - ClientBuildError, CommandProposeError, CommandSyncError, ProposeError, RpcError, SyncError, + ClientBuildError, CommandProposeError, CommandSyncError, ProposeError, RpcError, + WaitSyncError, }, members::ServerId, rpc::{ @@ -326,7 +327,10 @@ where debug!("slow round for cmd({}) succeeded", cmd.id()); return Ok((asr, er)); } - SyncResult::Error(CommandSyncError::Sync(SyncError::Redirect(server_id, term))) => { + SyncResult::Error(CommandSyncError::WaitSync(WaitSyncError::Redirect( + server_id, + term, + ))) => { let new_leader = server_id.and_then(|id| { self.state.map_write(|mut state| { (state.term <= term).then(|| { @@ -338,8 +342,10 @@ where }); self.resend_propose(Arc::clone(&cmd), new_leader).await?; // resend the propose to the new leader } - SyncResult::Error(CommandSyncError::Sync(e)) => { - return Err(ProposeError::SyncedError(SyncError::Other(e.to_string())).into()); + SyncResult::Error(CommandSyncError::WaitSync(e)) => { + return Err( + ProposeError::SyncedError(WaitSyncError::Other(e.to_string())).into(), + ); } SyncResult::Error(CommandSyncError::Execute(e)) => { return Err(CommandProposeError::Execute(e)); diff --git a/curp/src/error.rs b/curp/src/error.rs index 273ee16a7..ba44bcb0e 100644 --- a/curp/src/error.rs +++ b/curp/src/error.rs @@ -1,6 +1,6 @@ use std::io; -use curp_external_api::cmd::{Command, PbSerialize, PbSerializeError}; +use curp_external_api::cmd::{Command, PbCodec, PbSerializeError}; use prost::Message; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -9,7 +9,7 @@ use crate::{ members::ServerId, rpc::{ PbCommandSyncError, PbCommandSyncErrorOuter, PbProposeError, PbProposeErrorOuter, - PbSyncError, PbSyncErrorOuter, RedirectData, + PbWaitSyncError, PbWaitSyncErrorOuter, RedirectData, }, }; @@ -80,7 +80,7 @@ pub enum ProposeError { Duplicated, /// Command syncing error #[error("syncing error {0}")] - SyncedError(SyncError), + SyncedError(WaitSyncError), /// Encode error #[error("encode error: {0}")] EncodeError(String), @@ -94,9 +94,11 @@ impl TryFrom for ProposeError { Ok(match err { PbProposeError::KeyConflict(_) => ProposeError::KeyConflict, PbProposeError::Duplicated(_) => ProposeError::Duplicated, - PbProposeError::SyncError(e) => { - ProposeError::SyncedError(e.sync_error.ok_or(PbSerializeError::EmptyField)?.into()) - } + PbProposeError::WaitSyncError(e) => ProposeError::SyncedError( + e.wait_sync_error + .ok_or(PbSerializeError::EmptyField)? + .into(), + ), PbProposeError::EncodeError(s) => ProposeError::EncodeError(s), }) } @@ -108,8 +110,8 @@ impl From for PbProposeError { match err { ProposeError::KeyConflict => PbProposeError::KeyConflict(()), ProposeError::Duplicated => PbProposeError::Duplicated(()), - ProposeError::SyncedError(e) => PbProposeError::SyncError(PbSyncErrorOuter { - sync_error: Some(e.into()), + ProposeError::SyncedError(e) => PbProposeError::WaitSyncError(PbWaitSyncErrorOuter { + wait_sync_error: Some(e.into()), }), ProposeError::EncodeError(s) => PbProposeError::EncodeError(s), } @@ -123,7 +125,7 @@ impl From for ProposeError { } } -impl PbSerialize for ProposeError { +impl PbCodec for ProposeError { #[inline] fn encode(&self) -> Vec { PbProposeErrorOuter { @@ -188,7 +190,7 @@ pub enum CommandProposeError { #[derive(Clone, Error, Serialize, Deserialize, Debug)] #[allow(clippy::module_name_repetitions)] // this-error generate code false-positive #[non_exhaustive] -pub enum SyncError { +pub enum WaitSyncError { /// If client sent a wait synced request to a non-leader #[error("redirect to {0:?}, term {1}")] Redirect(Option, u64), @@ -197,23 +199,23 @@ pub enum SyncError { Other(String), } -impl From for SyncError { +impl From for WaitSyncError { #[inline] - fn from(err: PbSyncError) -> Self { + fn from(err: PbWaitSyncError) -> Self { match err { - PbSyncError::Redirect(data) => SyncError::Redirect(data.server_id, data.term), - PbSyncError::Other(s) => SyncError::Other(s), + PbWaitSyncError::Redirect(data) => WaitSyncError::Redirect(data.server_id, data.term), + PbWaitSyncError::Other(s) => WaitSyncError::Other(s), } } } -impl From for PbSyncError { - fn from(err: SyncError) -> Self { +impl From for PbWaitSyncError { + fn from(err: WaitSyncError) -> Self { match err { - SyncError::Redirect(server_id, term) => { - PbSyncError::Redirect(RedirectData { server_id, term }) + WaitSyncError::Redirect(server_id, term) => { + PbWaitSyncError::Redirect(RedirectData { server_id, term }) } - SyncError::Other(s) => PbSyncError::Other(s), + WaitSyncError::Other(s) => PbWaitSyncError::Other(s), } } } @@ -222,7 +224,7 @@ impl From for PbSyncError { #[derive(Clone, Debug, Serialize, Deserialize)] pub(crate) enum CommandSyncError { /// If wait sync went wrong - Sync(SyncError), + WaitSync(WaitSyncError), /// If the execution of the cmd went wrong Execute(C::Error), /// If after sync of the cmd went wrong @@ -232,8 +234,8 @@ pub(crate) enum CommandSyncError { impl From> for PbCommandSyncError { fn from(err: CommandSyncError) -> Self { match err { - CommandSyncError::Sync(e) => PbCommandSyncError::Sync(PbSyncErrorOuter { - sync_error: Some(e.into()), + CommandSyncError::WaitSync(e) => PbCommandSyncError::WaitSync(PbWaitSyncErrorOuter { + wait_sync_error: Some(e.into()), }), CommandSyncError::Execute(e) => PbCommandSyncError::Execute(e.encode()), CommandSyncError::AfterSync(e) => PbCommandSyncError::AfterSync(e.encode()), @@ -246,9 +248,11 @@ impl TryFrom for CommandSyncError { fn try_from(err: PbCommandSyncError) -> Result { Ok(match err { - PbCommandSyncError::Sync(e) => { - CommandSyncError::Sync(e.sync_error.ok_or(PbSerializeError::EmptyField)?.into()) - } + PbCommandSyncError::WaitSync(e) => CommandSyncError::WaitSync( + e.wait_sync_error + .ok_or(PbSerializeError::EmptyField)? + .into(), + ), PbCommandSyncError::Execute(e) => { CommandSyncError::Execute(::Error::decode(&e)?) } @@ -259,7 +263,7 @@ impl TryFrom for CommandSyncError { } } -impl PbSerialize for CommandSyncError { +impl PbCodec for CommandSyncError { fn encode(&self) -> Vec { PbCommandSyncErrorOuter { command_sync_error: Some(self.clone().into()), @@ -275,9 +279,9 @@ impl PbSerialize for CommandSyncError { } } -impl From for CommandSyncError { - fn from(err: SyncError) -> Self { - Self::Sync(err) +impl From for CommandSyncError { + fn from(err: WaitSyncError) -> Self { + Self::WaitSync(err) } } @@ -291,21 +295,21 @@ mod test { fn propose_error_serialization_is_ok() { let err = ProposeError::Duplicated; let _decoded_err = - ::decode(&err.encode()).expect("decode should success"); + ::decode(&err.encode()).expect("decode should success"); assert!(matches!(err, _decoded_err)); } #[test] fn cmd_sync_error_serialization_is_ok() { let err: CommandSyncError = - CommandSyncError::Sync(SyncError::Other("msg".to_owned())); - let _decoded_err = as PbSerialize>::decode(&err.encode()) + CommandSyncError::WaitSync(WaitSyncError::Other("msg".to_owned())); + let _decoded_err = as PbCodec>::decode(&err.encode()) .expect("decode should success"); assert!(matches!(err, _decoded_err)); let err1: CommandSyncError = CommandSyncError::Execute(ExecuteError("msg".to_owned())); - let _decoded_err1 = as PbSerialize>::decode(&err1.encode()) + let _decoded_err1 = as PbCodec>::decode(&err1.encode()) .expect("decode should success"); assert!(matches!(err1, _decoded_err1)); } diff --git a/curp/src/rpc/mod.rs b/curp/src/rpc/mod.rs index cf9a4c375..45c7e74b4 100644 --- a/curp/src/rpc/mod.rs +++ b/curp/src/rpc/mod.rs @@ -1,11 +1,11 @@ use std::{collections::HashMap, sync::Arc}; -use curp_external_api::cmd::{PbSerialize, PbSerializeError}; +use curp_external_api::cmd::{PbCodec, PbSerializeError}; use serde::{de::DeserializeOwned, Serialize}; use crate::{ cmd::{Command, ProposeId}, - error::{CommandSyncError, ProposeError, SyncError}, + error::{CommandSyncError, ProposeError, WaitSyncError}, log_entry::LogEntry, members::ServerId, LogIndex, @@ -20,9 +20,10 @@ pub(crate) use self::proto::{ }, errorpb::{ command_sync_error::CommandSyncError as PbCommandSyncError, - propose_error::ProposeError as PbProposeError, sync_error::SyncError as PbSyncError, + propose_error::ProposeError as PbProposeError, + wait_sync_error::WaitSyncError as PbWaitSyncError, CommandSyncError as PbCommandSyncErrorOuter, ProposeError as PbProposeErrorOuter, - RedirectData, SyncError as PbSyncErrorOuter, + RedirectData, WaitSyncError as PbWaitSyncErrorOuter, }, messagepb::{ fetch_read_state_response::ReadState, protocol_server::Protocol, AppendEntriesRequest, @@ -243,7 +244,7 @@ impl WaitSyncedResponse { unreachable!("should not call after sync if execution fails") } (None, None) => WaitSyncedResponse::new_error::( - SyncError::Other("can't get er result".to_owned()).into(), + WaitSyncError::Other("can't get er result".to_owned()).into(), ), // this is highly unlikely to happen, (Some(Err(err)), None) => { WaitSyncedResponse::new_error(CommandSyncError::::Execute(err)) @@ -256,7 +257,7 @@ impl WaitSyncedResponse { // The er is ignored as the propose has failed (Some(Ok(_er)), None) => { WaitSyncedResponse::new_error::( - SyncError::Other("can't get after sync result".to_owned()).into(), + WaitSyncError::Other("can't get after sync result".to_owned()).into(), ) // this is highly unlikely to happen, } } diff --git a/xline/src/server/command.rs b/xline/src/server/command.rs index b8dcf194c..bff8f4e54 100644 --- a/xline/src/server/command.rs +++ b/xline/src/server/command.rs @@ -6,7 +6,7 @@ use std::{ use curp::{ cmd::{ - Command as CurpCommand, CommandExecutor as CurpCommandExecutor, ConflictCheck, PbSerialize, + Command as CurpCommand, CommandExecutor as CurpCommandExecutor, ConflictCheck, PbCodec, PbSerializeError, ProposeId, }, error::{CommandProposeError, ProposeError}, @@ -574,7 +574,7 @@ pub struct CommandResponse { response: ResponseWrapper, } -impl PbSerialize for CommandResponse { +impl PbCodec for CommandResponse { #[inline] fn encode(&self) -> Vec { PbCommandResponse { @@ -652,7 +652,7 @@ impl From for PbSyncResponse { } } -impl PbSerialize for SyncResponse { +impl PbCodec for SyncResponse { #[inline] fn encode(&self) -> Vec { PbSyncResponse::from(*self).encode_to_vec() @@ -683,7 +683,7 @@ impl CurpCommand for Command { } } -impl PbSerialize for Command { +impl PbCodec for Command { #[inline] fn encode(&self) -> Vec { let cmd = self.clone(); @@ -1015,14 +1015,14 @@ mod test { ProposeId::new("id".to_owned()), ); let decoded_cmd = - ::decode(&cmd.encode()).expect("decode should success"); + ::decode(&cmd.encode()).expect("decode should success"); assert_eq!(cmd, decoded_cmd); } #[test] fn command_resp_serialization_is_ok() { let cmd_resp = CommandResponse::new(ResponseWrapper::PutResponse(PutResponse::default())); - let decoded_cmd_resp = ::decode(&cmd_resp.encode()) + let decoded_cmd_resp = ::decode(&cmd_resp.encode()) .expect("decode should success"); assert_eq!(cmd_resp, decoded_cmd_resp); } @@ -1030,8 +1030,8 @@ mod test { #[test] fn sync_resp_serialization_is_ok() { let sync_resp = SyncResponse::new(1); - let decoded_sync_resp = ::decode(&sync_resp.encode()) - .expect("decode should success"); + let decoded_sync_resp = + ::decode(&sync_resp.encode()).expect("decode should success"); assert_eq!(sync_resp, decoded_sync_resp); } } diff --git a/xline/src/storage/execute_error.rs b/xline/src/storage/execute_error.rs index adf2ced74..ec1cc96c6 100644 --- a/xline/src/storage/execute_error.rs +++ b/xline/src/storage/execute_error.rs @@ -1,6 +1,6 @@ #![allow(clippy::integer_arithmetic)] // introduced by `strum_macros::EnumIter` -use curp::cmd::{PbSerialize, PbSerializeError}; +use curp::cmd::{PbCodec, PbSerializeError}; use prost::Message; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -201,7 +201,7 @@ impl From for PbExecuteError { } } -impl PbSerialize for ExecuteError { +impl PbCodec for ExecuteError { #[inline] fn encode(&self) -> Vec { PbExecuteErrorOuter { @@ -326,8 +326,8 @@ mod test { #[test] fn serialization_is_ok() { for err in ExecuteError::iter() { - let _decoded_err = ::decode(&err.encode()) - .expect("decode should success"); + let _decoded_err = + ::decode(&err.encode()).expect("decode should success"); assert!(matches!(err, _decoded_err)); } }