Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: move types, remove obsolete codes #9922

Merged
merged 5 commits into from
Feb 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafus
opendal = { version = "0.26.2" }
ordered-float = { version = "3.4.0", default-features = false }

# type helper
derive_more = "0.99.17"

# error
anyhow = { version = "1.0.65" }
anyerror = { version = "=0.1.7" }
Expand Down
2 changes: 1 addition & 1 deletion src/meta/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ common-meta-types = { path = "../types" }
common-metrics = { path = "../../common/metrics" }
common-tracing = { path = "../../common/tracing" }

derive_more = "0.99.17"
derive_more = { workspace = true }
futures = "0.3.24"
once_cell = "1.15.0"
parking_lot = "0.12.1"
Expand Down
2 changes: 0 additions & 2 deletions src/meta/proto-conv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@ doctest = false
test = false

[dependencies]
common-datavalues = { path = "../../query/datavalues" }
common-expression = { path = "../../query/expression" }
common-meta-app = { path = "../app" }
common-meta-types = { path = "../types" }
common-protos = { path = "../protos" }
common-storage = { path = "../../common/storage" }

chrono = { workspace = true }
num = "0.4.0"
Expand Down
5 changes: 4 additions & 1 deletion src/meta/raft-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,20 @@ common-meta-stoerr = { path = "../stoerr" }
common-meta-types = { path = "../types" }
common-tracing = { path = "../../common/tracing" }

openraft = { workspace = true }

# crates.io deps
anyhow = { workspace = true }
async-trait = "0.1.57"
byteorder = "1.4.3"
chrono = { workspace = true }
derive_more = "0.99.17"
derive_more = { workspace = true }
hostname = "0.3.1"
maplit = "1.0.2"
num = "0.4.0"
once_cell = "1.15.0"
serde = { workspace = true }
serde_json = { workspace = true }
tracing = "0.1.36"

[dev-dependencies]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 Datafuse Labs.
// Copyright 2023 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -13,21 +13,25 @@
// limitations under the License.

use std::fmt;
use std::fmt::Debug;
use std::fmt::Formatter;

use common_meta_types::protobuf::RaftReply;
use common_meta_types::Change;
use common_meta_types::Node;
use common_meta_types::TxnReply;
use openraft::AppDataResponse;
use serde::Deserialize;
use serde::Serialize;

use crate::Change;
use crate::Node;
use crate::TxnReply;

/// The state of an applied raft log.
/// Normally it includes two fields: the state before applying and the state after applying the log.
#[derive(
Serialize, Deserialize, Debug, Clone, PartialEq, Eq, derive_more::From, derive_more::TryInto,
serde::Serialize,
serde::Deserialize,
Debug,
Clone,
PartialEq,
Eq,
derive_more::From,
derive_more::TryInto,
)]
pub enum AppliedState {
Seq {
Expand Down Expand Up @@ -81,7 +85,7 @@ impl AppliedState {
ref prev,
ref result,
} => prev != result,
AppliedState::KV(ref ch) => ch.changed(),
AppliedState::KV(ref ch) => ch.is_changed(),
AppliedState::None => false,
AppliedState::TxnReply(txn) => txn.success,
}
Expand Down Expand Up @@ -123,3 +127,13 @@ impl AppliedState {
}
}
}

impl From<AppliedState> for RaftReply {
fn from(msg: AppliedState) -> Self {
let data = serde_json::to_string(&msg).expect("fail to serialize");
RaftReply {
data,
error: "".to_string(),
}
}
}
2 changes: 2 additions & 0 deletions src/meta/raft-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,5 @@ pub mod key_spaces;
pub mod log;
pub mod state;
pub mod state_machine;

pub mod applied_state;
3 changes: 2 additions & 1 deletion src/meta/raft-store/src/state_machine/client_last_resp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_meta_types::AppliedState;
use serde::Deserialize;
use serde::Serialize;

use crate::applied_state::AppliedState;

/// Client last response that is stored in SledTree
/// raft state: A mapping of client serial IDs to their state info:
/// (serial, RaftResponse)
Expand Down
2 changes: 1 addition & 1 deletion src/meta/raft-store/src/state_machine/sm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use common_meta_types::protobuf as pb;
use common_meta_types::txn_condition;
use common_meta_types::txn_op;
use common_meta_types::txn_op_response;
use common_meta_types::AppliedState;
use common_meta_types::Change;
use common_meta_types::Cmd;
use common_meta_types::ConditionResult;
Expand Down Expand Up @@ -71,6 +70,7 @@ use tracing::debug;
use tracing::error;
use tracing::info;

use crate::applied_state::AppliedState;
use crate::config::RaftConfig;
use crate::key_spaces::ClientLastResps;
use crate::key_spaces::Expire;
Expand Down
2 changes: 1 addition & 1 deletion src/meta/raft-store/src/state_machine/sm_kv_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use common_meta_kvapi::kvapi;
use common_meta_types::AppliedState;
use common_meta_types::Cmd;
use common_meta_types::GetKVReply;
use common_meta_types::KVAppError;
Expand All @@ -26,6 +25,7 @@ use common_meta_types::UpsertKVReply;
use common_meta_types::UpsertKVReq;
use tracing::debug;

use crate::applied_state::AppliedState;
use crate::state_machine::StateMachine;

#[async_trait::async_trait]
Expand Down
2 changes: 1 addition & 1 deletion src/meta/raft-store/tests/it/state_machine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use std::time::UNIX_EPOCH;

use common_base::base::tokio;
use common_meta_kvapi::kvapi::KVApi;
use common_meta_raft_store::applied_state::AppliedState;
use common_meta_raft_store::state_machine::StateMachine;
use common_meta_sled_store::openraft;
use common_meta_types::AppliedState;
use common_meta_types::Change;
use common_meta_types::Cmd;
use common_meta_types::Endpoint;
Expand Down
1 change: 1 addition & 0 deletions src/meta/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ anyhow = { workspace = true }
async-trait = "0.1.57"
backon = "0.2.0"
clap = { workspace = true }
derive_more = { workspace = true }
futures = "0.3.24"
itertools = "0.10.5"
metrics = "0.20.1"
Expand Down
1 change: 1 addition & 0 deletions src/meta/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub mod api;
pub mod configs;
pub mod export;
pub mod logging;
pub mod message;
pub mod meta_service;
pub mod metrics;
pub mod network;
Expand Down
131 changes: 131 additions & 0 deletions src/meta/service/src/message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright 2023 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_meta_raft_store::applied_state::AppliedState;
use common_meta_sled_store::openraft::NodeId;
use common_meta_types::protobuf::RaftRequest;
use common_meta_types::Endpoint;
use common_meta_types::GetKVReply;
use common_meta_types::GetKVReq;
use common_meta_types::ListKVReply;
use common_meta_types::ListKVReq;
use common_meta_types::LogEntry;
use common_meta_types::MGetKVReply;
use common_meta_types::MGetKVReq;

#[derive(serde::Serialize, serde::Deserialize, Debug, Default, Clone, PartialEq, Eq)]
pub struct JoinRequest {
pub node_id: NodeId,
pub endpoint: Endpoint,

#[serde(skip)]
#[deprecated(note = "it is listening addr, not advertise addr")]
pub grpc_api_addr: String,

pub grpc_api_advertise_address: Option<String>,
}

impl JoinRequest {
pub fn new(
node_id: NodeId,
endpoint: Endpoint,
grpc_api_advertise_address: Option<impl ToString>,
) -> Self {
Self {
node_id,
endpoint,
grpc_api_advertise_address: grpc_api_advertise_address.map(|x| x.to_string()),
..Default::default()
}
}
}

#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct LeaveRequest {
pub node_id: NodeId,
}

#[derive(
serde::Serialize,
serde::Deserialize,
Debug,
Clone,
PartialEq,
Eq,
derive_more::From,
derive_more::TryInto,
)]
pub enum ForwardRequestBody {
Ping,

Join(JoinRequest),
Leave(LeaveRequest),

Write(LogEntry),

GetKV(GetKVReq),
MGetKV(MGetKVReq),
ListKV(ListKVReq),
}

/// A request that is forwarded from one raft node to another
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct ForwardRequest {
/// Forward the request to leader if the node received this request is not leader.
pub forward_to_leader: u64,

pub body: ForwardRequestBody,
}

impl ForwardRequest {
pub fn decr_forward(&mut self) {
self.forward_to_leader -= 1;
}
}

#[derive(
serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq, derive_more::TryInto,
)]
#[allow(clippy::large_enum_variant)]
pub enum ForwardResponse {
#[try_into(ignore)]
Pong,

Join(()),
Leave(()),
AppliedState(AppliedState),

GetKV(GetKVReply),
MGetKV(MGetKVReply),
ListKV(ListKVReply),
}

impl tonic::IntoRequest<RaftRequest> for ForwardRequest {
fn into_request(self) -> tonic::Request<RaftRequest> {
let mes = RaftRequest {
data: serde_json::to_string(&self).expect("fail to serialize"),
};
tonic::Request::new(mes)
}
}

impl TryFrom<RaftRequest> for ForwardRequest {
type Error = tonic::Status;

fn try_from(mes: RaftRequest) -> Result<Self, Self::Error> {
let req = serde_json::from_str(&mes.data)
.map_err(|e| tonic::Status::invalid_argument(e.to_string()))?;
Ok(req)
}
}
12 changes: 6 additions & 6 deletions src/meta/service/src/meta_service/meta_leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,11 @@ use std::sync::Arc;

use common_base::base::tokio::sync::RwLockReadGuard;
use common_meta_kvapi::kvapi::KVApi;
use common_meta_raft_store::applied_state::AppliedState;
use common_meta_raft_store::state_machine::StateMachine;
use common_meta_sled_store::openraft::error::RemoveLearnerError;
use common_meta_stoerr::MetaStorageError;
use common_meta_types::AppliedState;
use common_meta_types::Cmd;
use common_meta_types::ForwardRequest;
use common_meta_types::ForwardResponse;
use common_meta_types::LogEntry;
use common_meta_types::MetaDataError;
use common_meta_types::MetaDataReadError;
Expand All @@ -37,10 +35,12 @@ use tracing::debug;
use tracing::error;
use tracing::info;

use crate::message::ForwardRequest;
use crate::message::ForwardRequestBody;
use crate::message::ForwardResponse;
use crate::message::JoinRequest;
use crate::message::LeaveRequest;
use crate::meta_service::raftmeta::MetaRaft;
use crate::meta_service::ForwardRequestBody;
use crate::meta_service::JoinRequest;
use crate::meta_service::LeaveRequest;
use crate::meta_service::MetaNode;
use crate::metrics::ProposalPending;
use crate::store::RaftStore;
Expand Down
Loading