Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: command after sync serialization #422

Merged
merged 3 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/scripts/install_deps.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ apt-get install -y cmake g++ expect
wget https://github.com/protocolbuffers/protobuf/releases/download/v21.10/protoc-21.10-linux-x86_64.zip
unzip protoc-21.10-linux-x86_64.zip -d .local
mv "$(pwd)/.local/bin/protoc" /bin/
mv "$(pwd)/.local/include/google" /usr/include/
12 changes: 6 additions & 6 deletions curp-external-api/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ use crate::LogIndex;
/// Command to execute on the server side
#[async_trait]
pub trait Command:
Sync + Send + DeserializeOwned + Serialize + std::fmt::Debug + Clone + ConflictCheck + PbSerialize
Sync + Send + DeserializeOwned + Serialize + std::fmt::Debug + Clone + ConflictCheck + PbCodec
{
/// Error type
type Error: Send + Sync + Clone + std::error::Error + Serialize + DeserializeOwned + PbSerialize;
type Error: Send + Sync + Clone + std::error::Error + Serialize + DeserializeOwned + PbCodec;

/// K (key) is used to tell confliction
/// The key can be a single key or a key range
Expand All @@ -31,10 +31,10 @@ pub trait Command:
type PR: std::fmt::Debug + Send + Sync + Clone + Serialize + DeserializeOwned;

/// Execution result
type ER: std::fmt::Debug + Send + Sync + Clone + Serialize + DeserializeOwned + PbSerialize;
type ER: std::fmt::Debug + Send + Sync + Clone + Serialize + DeserializeOwned + PbCodec;

/// After_sync result
type ASR: std::fmt::Debug + Send + Sync + Clone + Serialize + DeserializeOwned;
type ASR: std::fmt::Debug + Send + Sync + Clone + Serialize + DeserializeOwned + PbCodec;

/// Get keys of the command
fn keys(&self) -> &[Self::K];
Expand Down Expand Up @@ -158,8 +158,8 @@ where
async fn reset(&self, snapshot: Option<(Snapshot, LogIndex)>) -> Result<(), C::Error>;
}

/// Serializaion for protobuf
pub trait PbSerialize: Sized {
/// Codec for encoding and decoding data into/from the Protobuf format
pub trait PbCodec: Sized {
/// Encode
fn encode(&self) -> Vec<u8>;
/// Decode
Expand Down
45 changes: 38 additions & 7 deletions curp-test-utils/src/test_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
use async_trait::async_trait;
use clippy_utilities::NumericCast;
use curp_external_api::{
cmd::{Command, CommandExecutor, ConflictCheck, PbSerialize, ProposeId},
cmd::{Command, CommandExecutor, ConflictCheck, PbCodec, ProposeId},
LogIndex,
};
use engine::{Engine, EngineType, Snapshot, SnapshotApi, StorageEngine, WriteOperation};
Expand All @@ -30,7 +30,7 @@ fn next_id() -> u64 {
}

#[derive(Error, Debug, Clone, Serialize, Deserialize)]
pub struct ExecuteError(String);
pub struct ExecuteError(pub String);

impl Display for ExecuteError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand All @@ -39,7 +39,7 @@ impl Display for ExecuteError {
}

// The `ExecuteError` is only for internal use, so we do not have to serialize it to protobuf format
impl PbSerialize for ExecuteError {
impl PbCodec for ExecuteError {
fn encode(&self) -> Vec<u8> {
self.0.clone().into_bytes()
}
Expand Down Expand Up @@ -94,7 +94,7 @@ impl TestCommandResult {
}

// The `TestCommandResult` is only for internal use, so we do not have to serialize it to protobuf format
impl PbSerialize for TestCommandResult {
impl PbCodec for TestCommandResult {
fn encode(&self) -> Vec<u8> {
bincode::serialize(self).unwrap_or_else(|_| {
unreachable!("test cmd result should always be successfully serialized")
Expand Down Expand Up @@ -154,6 +154,37 @@ impl TestCommand {
}
}

/// LogIndex used in Command::ASR
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub struct LogIndexResult(LogIndex);
Phoenix500526 marked this conversation as resolved.
Show resolved Hide resolved

impl From<LogIndex> for LogIndexResult {
fn from(value: LogIndex) -> Self {
Self(value)
}
}

impl From<LogIndexResult> for LogIndex {
fn from(value: LogIndexResult) -> Self {
value.0
}
}

// The `TestCommandResult` is only for internal use, so we donnot have to serialize it to protobuf format
impl PbCodec for LogIndexResult {
fn encode(&self) -> Vec<u8> {
bincode::serialize(self).unwrap_or_else(|_| {
unreachable!("test cmd result should always be successfully serialized")
})
}

fn decode(buf: &[u8]) -> Result<Self, curp_external_api::cmd::PbSerializeError> {
Ok(bincode::deserialize(buf).unwrap_or_else(|_| {
unreachable!("test cmd result should always be successfully serialized")
}))
}
}

impl Command for TestCommand {
type Error = ExecuteError;

Expand All @@ -163,7 +194,7 @@ impl Command for TestCommand {

type ER = TestCommandResult;

type ASR = LogIndex;
type ASR = LogIndexResult;

fn keys(&self) -> &[Self::K] {
&self.keys
Expand All @@ -188,7 +219,7 @@ impl ConflictCheck for TestCommand {
}

// The `TestCommand` is only for internal use, so we donnot have to serialize it to protobuf format
impl PbSerialize for TestCommand {
impl PbCodec for TestCommand {
fn encode(&self) -> Vec<u8> {
bincode::serialize(self)
.unwrap_or_else(|_| unreachable!("test cmd should always be successfully serialized"))
Expand Down Expand Up @@ -316,7 +347,7 @@ impl CommandExecutor<TestCommand> for TestCE {
cmd.cmd_type,
cmd.id()
);
Ok(index)
Ok(index.into())
}

fn last_applied(&self) -> Result<LogIndex, ExecuteError> {
Expand Down
6 changes: 5 additions & 1 deletion curp/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ fn main() {
tonic_build::configure()
.compile_with_config(
prost_config,
&["./proto/message.proto", "./proto/error.proto"],
&[
"./proto/message.proto",
"./proto/error.proto",
"./proto/command.proto",
],
&["./proto/"],
)
.unwrap_or_else(|e| panic!("Failed to compile proto, error is {:?}", e));
Expand Down
51 changes: 51 additions & 0 deletions curp/proto/command.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Message types for curp client
syntax = "proto3";

package commandpb;

import "error.proto";

message ProposeRequest {
// The serialized command
// The original type is `Command`
bytes command = 1;
}

message ProposeResponse {
optional uint64 leader_id = 1;
uint64 term = 2;
oneof exe_result {
CmdResult result = 3;
errorpb.ProposeError error = 4;
}
}

message WaitSyncedRequest {
string propose_id = 1;
}

message WaitSyncedResponse {
message Success {
// The serialized command after sync result
// The original type is Command::ASR
bytes after_sync_result = 1;
// The serialized command execute result
// The original type is Command::ER
bytes exe_result = 2;
}
oneof sync_result {
Success success = 1;
errorpb.CommandSyncError error = 2;
}
}

message CmdResult {
oneof result {
// The serialized command execute result
// The original type is Command::ER
bytes er = 1;
// The serialized command error
// The original type is Command::Error
bytes error = 2;
}
}
35 changes: 23 additions & 12 deletions curp/proto/error.proto
Original file line number Diff line number Diff line change
@@ -1,28 +1,39 @@
// Message error types for curp client
syntax = "proto3";

package errorpb;
import "google/protobuf/empty.proto";

// empty type
message Empty {
}
package errorpb;

message ProposeError {
oneof propose_error {
Empty key_conflict = 1;
Empty duplicated = 2;
SyncError sync_error = 3;
google.protobuf.Empty key_conflict = 1;
google.protobuf.Empty duplicated = 2;
WaitSyncError wait_sync_error = 3;
string encode_error = 4;
}
}

message RedirectData {
optional uint64 server_id = 1;
uint64 term = 2;
message CommandSyncError {
oneof command_sync_error {
WaitSyncError wait_sync = 1;
// The serialized command error
// The original type is Command::Error
bytes execute = 2;
// The serialized command error
// The original type is Command::Error
bytes after_sync = 3;
}
}

message SyncError {
oneof sync_error {
message WaitSyncError {
oneof wait_sync_error {
RedirectData redirect = 1;
string other = 2;
}
}

message RedirectData {
optional uint64 server_id = 1;
uint64 term = 2;
}
45 changes: 4 additions & 41 deletions curp/proto/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,7 @@ syntax = "proto3";

package messagepb;

// Propose command from client to servers
message ProposeRequest {
// The serialized command
// Original type is Command trait
bytes command = 1;
}

// The original type is Result<Command::ER, Command::Error>
message CmdResult {
oneof result {
bytes er = 1;
bytes error = 2;
}
}

message ProposeResponse {
optional uint64 leader_id = 1;
uint64 term = 2;
oneof exe_result {
CmdResult result = 3;
// The original type is ProposeError
bytes error = 4;
}
}
import "command.proto";

message FetchLeaderRequest {
}
Expand All @@ -44,21 +21,6 @@ message FetchClusterResponse {
uint64 term = 3;
}

message WaitSyncedRequest {
bytes id = 1;
}

message WaitSyncedResponse {
message Success {
bytes after_sync_result = 1;
bytes exe_result = 2;
}
oneof sync_result {
Success success = 1;
bytes error = 2;
}
}

message AppendEntriesRequest {
uint64 term = 1;
uint64 leader_id = 2;
Expand Down Expand Up @@ -117,8 +79,9 @@ message FetchReadStateResponse {
}

service Protocol {
rpc Propose(ProposeRequest) returns (ProposeResponse);
rpc WaitSynced(WaitSyncedRequest) returns (WaitSyncedResponse);
rpc Propose(commandpb.ProposeRequest) returns (commandpb.ProposeResponse);
rpc WaitSynced(commandpb.WaitSyncedRequest)
returns (commandpb.WaitSyncedResponse);
rpc AppendEntries(AppendEntriesRequest) returns (AppendEntriesResponse);
rpc Vote(VoteRequest) returns (VoteResponse);
rpc InstallSnapshot(stream InstallSnapshotRequest)
Expand Down
24 changes: 17 additions & 7 deletions curp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
time::Duration,
};

use curp_external_api::cmd::PbSerialize;
use curp_external_api::cmd::PbSerializeError;
use dashmap::DashMap;
use event_listener::Event;
use futures::{pin_mut, stream::FuturesUnordered, StreamExt};
Expand All @@ -16,7 +16,8 @@ use utils::{config::ClientTimeout, parking_lot_lock::RwLockMap};
use crate::{
cmd::{Command, ProposeId},
error::{
ClientBuildError, CommandProposeError, CommandSyncError, ProposeError, RpcError, SyncError,
ClientBuildError, CommandProposeError, CommandSyncError, ProposeError, RpcError,
WaitSyncError,
},
members::ServerId,
rpc::{
Expand Down Expand Up @@ -306,7 +307,7 @@ where
.get_connect(leader_id)
.unwrap_or_else(|| unreachable!("leader {leader_id} not found"))
.wait_synced(
WaitSyncedRequest::new(cmd.id()).map_err(Into::<ProposeError>::into)?,
WaitSyncedRequest::new(cmd.id()),
*self.timeout.wait_synced_timeout(),
)
.await
Expand All @@ -326,7 +327,10 @@ where
debug!("slow round for cmd({}) succeeded", cmd.id());
return Ok((asr, er));
}
SyncResult::Error(CommandSyncError::Sync(SyncError::Redirect(server_id, term))) => {
SyncResult::Error(CommandSyncError::WaitSync(WaitSyncError::Redirect(
server_id,
term,
))) => {
let new_leader = server_id.and_then(|id| {
self.state.map_write(|mut state| {
(state.term <= term).then(|| {
Expand All @@ -338,8 +342,10 @@ where
});
self.resend_propose(Arc::clone(&cmd), new_leader).await?; // resend the propose to the new leader
}
SyncResult::Error(CommandSyncError::Sync(e)) => {
return Err(ProposeError::SyncedError(SyncError::Other(e.to_string())).into());
SyncResult::Error(CommandSyncError::WaitSync(e)) => {
return Err(
ProposeError::SyncedError(WaitSyncError::Other(e.to_string())).into(),
);
}
SyncResult::Error(CommandSyncError::Execute(e)) => {
return Err(CommandProposeError::Execute(e));
Expand Down Expand Up @@ -380,7 +386,11 @@ where
Ok(resp) => {
let resp = resp.into_inner();
if let Some(rpc::ExeResult::Error(ref e)) = resp.exe_result {
let err = ProposeError::decode(e)?;
let err: ProposeError = e
.clone()
.propose_error
.ok_or(PbSerializeError::EmptyField)?
.try_into()?;
if matches!(err, ProposeError::Duplicated) {
return Ok(());
}
Expand Down
Loading