diff --git a/Cargo.lock b/Cargo.lock index 227ebce73d3cd..a29348204a23e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1709,8 +1709,10 @@ dependencies = [ "maplit", "num", "once_cell", + "openraft", "pretty_assertions", "serde", + "serde_json", "tempfile", "tracing", ] @@ -1781,7 +1783,6 @@ dependencies = [ "num-derive", "num-traits", "once_cell", - "opendal", "openraft", "prost", "prost-build", @@ -1891,12 +1892,10 @@ version = "0.1.0" dependencies = [ "anyhow", "chrono", - "common-datavalues", "common-expression", "common-meta-app", "common-meta-types", "common-protos", - "common-storage", "convert_case 0.6.0", "enumflags2", "maplit", @@ -2978,6 +2977,7 @@ dependencies = [ "common-meta-types", "common-metrics", "common-tracing", + "derive_more", "env_logger 0.9.3", "futures", "itertools", diff --git a/Cargo.toml b/Cargo.toml index 1b0b52e793078..428eaa8401d8c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,6 +90,9 @@ sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafus opendal = { version = "0.26.2" } ordered-float = { version = "3.4.0", default-features = false } +# type helper +derive_more = "0.99.17" + # error anyhow = { version = "1.0.65" } anyerror = { version = "=0.1.7" } diff --git a/src/meta/client/Cargo.toml b/src/meta/client/Cargo.toml index d7bff1533e453..82d6270f246eb 100644 --- a/src/meta/client/Cargo.toml +++ b/src/meta/client/Cargo.toml @@ -23,7 +23,7 @@ common-meta-types = { path = "../types" } common-metrics = { path = "../../common/metrics" } common-tracing = { path = "../../common/tracing" } -derive_more = "0.99.17" +derive_more = { workspace = true } futures = "0.3.24" once_cell = "1.15.0" parking_lot = "0.12.1" diff --git a/src/meta/proto-conv/Cargo.toml b/src/meta/proto-conv/Cargo.toml index 9a6a293658581..7106e83d77c72 100644 --- a/src/meta/proto-conv/Cargo.toml +++ b/src/meta/proto-conv/Cargo.toml @@ -11,12 +11,10 @@ doctest = false test = false [dependencies] -common-datavalues = { path = "../../query/datavalues" } common-expression = { path = "../../query/expression" } common-meta-app = { path = "../app" } common-meta-types = { path = "../types" } common-protos = { path = "../protos" } -common-storage = { path = "../../common/storage" } chrono = { workspace = true } num = "0.4.0" diff --git a/src/meta/raft-store/Cargo.toml b/src/meta/raft-store/Cargo.toml index a1553905e1ae7..4f4833d6112ba 100644 --- a/src/meta/raft-store/Cargo.toml +++ b/src/meta/raft-store/Cargo.toml @@ -24,17 +24,20 @@ common-meta-stoerr = { path = "../stoerr" } common-meta-types = { path = "../types" } common-tracing = { path = "../../common/tracing" } +openraft = { workspace = true } + # crates.io deps anyhow = { workspace = true } async-trait = "0.1.57" byteorder = "1.4.3" chrono = { workspace = true } -derive_more = "0.99.17" +derive_more = { workspace = true } hostname = "0.3.1" maplit = "1.0.2" num = "0.4.0" once_cell = "1.15.0" serde = { workspace = true } +serde_json = { workspace = true } tracing = "0.1.36" [dev-dependencies] diff --git a/src/meta/types/src/applied_state.rs b/src/meta/raft-store/src/applied_state.rs similarity index 84% rename from src/meta/types/src/applied_state.rs rename to src/meta/raft-store/src/applied_state.rs index 5b4dc411adb14..069e9cfa376ab 100644 --- a/src/meta/types/src/applied_state.rs +++ b/src/meta/raft-store/src/applied_state.rs @@ -1,4 +1,4 @@ -// Copyright 2021 Datafuse Labs. +// Copyright 2023 Datafuse Labs. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,21 +13,25 @@ // limitations under the License. use std::fmt; -use std::fmt::Debug; use std::fmt::Formatter; +use common_meta_types::protobuf::RaftReply; +use common_meta_types::Change; +use common_meta_types::Node; +use common_meta_types::TxnReply; use openraft::AppDataResponse; -use serde::Deserialize; -use serde::Serialize; - -use crate::Change; -use crate::Node; -use crate::TxnReply; /// The state of an applied raft log. /// Normally it includes two fields: the state before applying and the state after applying the log. #[derive( - Serialize, Deserialize, Debug, Clone, PartialEq, Eq, derive_more::From, derive_more::TryInto, + serde::Serialize, + serde::Deserialize, + Debug, + Clone, + PartialEq, + Eq, + derive_more::From, + derive_more::TryInto, )] pub enum AppliedState { Seq { @@ -81,7 +85,7 @@ impl AppliedState { ref prev, ref result, } => prev != result, - AppliedState::KV(ref ch) => ch.changed(), + AppliedState::KV(ref ch) => ch.is_changed(), AppliedState::None => false, AppliedState::TxnReply(txn) => txn.success, } @@ -123,3 +127,13 @@ impl AppliedState { } } } + +impl From for RaftReply { + fn from(msg: AppliedState) -> Self { + let data = serde_json::to_string(&msg).expect("fail to serialize"); + RaftReply { + data, + error: "".to_string(), + } + } +} diff --git a/src/meta/raft-store/src/lib.rs b/src/meta/raft-store/src/lib.rs index 12301f35f1e49..125a6e4354f07 100644 --- a/src/meta/raft-store/src/lib.rs +++ b/src/meta/raft-store/src/lib.rs @@ -19,3 +19,5 @@ pub mod key_spaces; pub mod log; pub mod state; pub mod state_machine; + +pub mod applied_state; diff --git a/src/meta/raft-store/src/state_machine/client_last_resp.rs b/src/meta/raft-store/src/state_machine/client_last_resp.rs index 5a331e5e1dba4..f0e743a216c36 100644 --- a/src/meta/raft-store/src/state_machine/client_last_resp.rs +++ b/src/meta/raft-store/src/state_machine/client_last_resp.rs @@ -12,10 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use common_meta_types::AppliedState; use serde::Deserialize; use serde::Serialize; +use crate::applied_state::AppliedState; + /// Client last response that is stored in SledTree /// raft state: A mapping of client serial IDs to their state info: /// (serial, RaftResponse) diff --git a/src/meta/raft-store/src/state_machine/sm.rs b/src/meta/raft-store/src/state_machine/sm.rs index f0d8404888d83..b04892bfa0ef1 100644 --- a/src/meta/raft-store/src/state_machine/sm.rs +++ b/src/meta/raft-store/src/state_machine/sm.rs @@ -35,7 +35,6 @@ use common_meta_types::protobuf as pb; use common_meta_types::txn_condition; use common_meta_types::txn_op; use common_meta_types::txn_op_response; -use common_meta_types::AppliedState; use common_meta_types::Change; use common_meta_types::Cmd; use common_meta_types::ConditionResult; @@ -71,6 +70,7 @@ use tracing::debug; use tracing::error; use tracing::info; +use crate::applied_state::AppliedState; use crate::config::RaftConfig; use crate::key_spaces::ClientLastResps; use crate::key_spaces::Expire; diff --git a/src/meta/raft-store/src/state_machine/sm_kv_api_impl.rs b/src/meta/raft-store/src/state_machine/sm_kv_api_impl.rs index f9b7795493d7d..6d98dcef67e4b 100644 --- a/src/meta/raft-store/src/state_machine/sm_kv_api_impl.rs +++ b/src/meta/raft-store/src/state_machine/sm_kv_api_impl.rs @@ -13,7 +13,6 @@ // limitations under the License. use common_meta_kvapi::kvapi; -use common_meta_types::AppliedState; use common_meta_types::Cmd; use common_meta_types::GetKVReply; use common_meta_types::KVAppError; @@ -26,6 +25,7 @@ use common_meta_types::UpsertKVReply; use common_meta_types::UpsertKVReq; use tracing::debug; +use crate::applied_state::AppliedState; use crate::state_machine::StateMachine; #[async_trait::async_trait] diff --git a/src/meta/raft-store/tests/it/state_machine/mod.rs b/src/meta/raft-store/tests/it/state_machine/mod.rs index 49e179650e060..2cbddfd37ef3e 100644 --- a/src/meta/raft-store/tests/it/state_machine/mod.rs +++ b/src/meta/raft-store/tests/it/state_machine/mod.rs @@ -17,9 +17,9 @@ use std::time::UNIX_EPOCH; use common_base::base::tokio; use common_meta_kvapi::kvapi::KVApi; +use common_meta_raft_store::applied_state::AppliedState; use common_meta_raft_store::state_machine::StateMachine; use common_meta_sled_store::openraft; -use common_meta_types::AppliedState; use common_meta_types::Change; use common_meta_types::Cmd; use common_meta_types::Endpoint; diff --git a/src/meta/service/Cargo.toml b/src/meta/service/Cargo.toml index a50b3561b0f3a..346c7719a8cdf 100644 --- a/src/meta/service/Cargo.toml +++ b/src/meta/service/Cargo.toml @@ -48,6 +48,7 @@ anyhow = { workspace = true } async-trait = "0.1.57" backon = "0.2.0" clap = { workspace = true } +derive_more = { workspace = true } futures = "0.3.24" itertools = "0.10.5" metrics = "0.20.1" diff --git a/src/meta/service/src/lib.rs b/src/meta/service/src/lib.rs index 7d302e61fbb19..3943a5fb75a99 100644 --- a/src/meta/service/src/lib.rs +++ b/src/meta/service/src/lib.rs @@ -18,6 +18,7 @@ pub mod api; pub mod configs; pub mod export; pub mod logging; +pub mod message; pub mod meta_service; pub mod metrics; pub mod network; diff --git a/src/meta/service/src/message.rs b/src/meta/service/src/message.rs new file mode 100644 index 0000000000000..8ccb27a175880 --- /dev/null +++ b/src/meta/service/src/message.rs @@ -0,0 +1,131 @@ +// Copyright 2023 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_meta_raft_store::applied_state::AppliedState; +use common_meta_sled_store::openraft::NodeId; +use common_meta_types::protobuf::RaftRequest; +use common_meta_types::Endpoint; +use common_meta_types::GetKVReply; +use common_meta_types::GetKVReq; +use common_meta_types::ListKVReply; +use common_meta_types::ListKVReq; +use common_meta_types::LogEntry; +use common_meta_types::MGetKVReply; +use common_meta_types::MGetKVReq; + +#[derive(serde::Serialize, serde::Deserialize, Debug, Default, Clone, PartialEq, Eq)] +pub struct JoinRequest { + pub node_id: NodeId, + pub endpoint: Endpoint, + + #[serde(skip)] + #[deprecated(note = "it is listening addr, not advertise addr")] + pub grpc_api_addr: String, + + pub grpc_api_advertise_address: Option, +} + +impl JoinRequest { + pub fn new( + node_id: NodeId, + endpoint: Endpoint, + grpc_api_advertise_address: Option, + ) -> Self { + Self { + node_id, + endpoint, + grpc_api_advertise_address: grpc_api_advertise_address.map(|x| x.to_string()), + ..Default::default() + } + } +} + +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)] +pub struct LeaveRequest { + pub node_id: NodeId, +} + +#[derive( + serde::Serialize, + serde::Deserialize, + Debug, + Clone, + PartialEq, + Eq, + derive_more::From, + derive_more::TryInto, +)] +pub enum ForwardRequestBody { + Ping, + + Join(JoinRequest), + Leave(LeaveRequest), + + Write(LogEntry), + + GetKV(GetKVReq), + MGetKV(MGetKVReq), + ListKV(ListKVReq), +} + +/// A request that is forwarded from one raft node to another +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)] +pub struct ForwardRequest { + /// Forward the request to leader if the node received this request is not leader. + pub forward_to_leader: u64, + + pub body: ForwardRequestBody, +} + +impl ForwardRequest { + pub fn decr_forward(&mut self) { + self.forward_to_leader -= 1; + } +} + +#[derive( + serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq, derive_more::TryInto, +)] +#[allow(clippy::large_enum_variant)] +pub enum ForwardResponse { + #[try_into(ignore)] + Pong, + + Join(()), + Leave(()), + AppliedState(AppliedState), + + GetKV(GetKVReply), + MGetKV(MGetKVReply), + ListKV(ListKVReply), +} + +impl tonic::IntoRequest for ForwardRequest { + fn into_request(self) -> tonic::Request { + let mes = RaftRequest { + data: serde_json::to_string(&self).expect("fail to serialize"), + }; + tonic::Request::new(mes) + } +} + +impl TryFrom for ForwardRequest { + type Error = tonic::Status; + + fn try_from(mes: RaftRequest) -> Result { + let req = serde_json::from_str(&mes.data) + .map_err(|e| tonic::Status::invalid_argument(e.to_string()))?; + Ok(req) + } +} diff --git a/src/meta/service/src/meta_service/meta_leader.rs b/src/meta/service/src/meta_service/meta_leader.rs index fb5cb882b4814..60729709fcd24 100644 --- a/src/meta/service/src/meta_service/meta_leader.rs +++ b/src/meta/service/src/meta_service/meta_leader.rs @@ -16,13 +16,11 @@ use std::sync::Arc; use common_base::base::tokio::sync::RwLockReadGuard; use common_meta_kvapi::kvapi::KVApi; +use common_meta_raft_store::applied_state::AppliedState; use common_meta_raft_store::state_machine::StateMachine; use common_meta_sled_store::openraft::error::RemoveLearnerError; use common_meta_stoerr::MetaStorageError; -use common_meta_types::AppliedState; use common_meta_types::Cmd; -use common_meta_types::ForwardRequest; -use common_meta_types::ForwardResponse; use common_meta_types::LogEntry; use common_meta_types::MetaDataError; use common_meta_types::MetaDataReadError; @@ -37,10 +35,12 @@ use tracing::debug; use tracing::error; use tracing::info; +use crate::message::ForwardRequest; +use crate::message::ForwardRequestBody; +use crate::message::ForwardResponse; +use crate::message::JoinRequest; +use crate::message::LeaveRequest; use crate::meta_service::raftmeta::MetaRaft; -use crate::meta_service::ForwardRequestBody; -use crate::meta_service::JoinRequest; -use crate::meta_service::LeaveRequest; use crate::meta_service::MetaNode; use crate::metrics::ProposalPending; use crate::store::RaftStore; diff --git a/src/meta/service/src/meta_service/meta_node_kv_api_impl.rs b/src/meta/service/src/meta_service/meta_node_kv_api_impl.rs index 5a7a916102c1f..6b0a70e75e8a9 100644 --- a/src/meta/service/src/meta_service/meta_node_kv_api_impl.rs +++ b/src/meta/service/src/meta_service/meta_node_kv_api_impl.rs @@ -14,7 +14,7 @@ use async_trait::async_trait; use common_meta_kvapi::kvapi; -use common_meta_types::AppliedState; +use common_meta_raft_store::applied_state::AppliedState; use common_meta_types::Cmd; use common_meta_types::GetKVReply; use common_meta_types::GetKVReq; diff --git a/src/meta/service/src/meta_service/meta_service_impl.rs b/src/meta/service/src/meta_service/meta_service_impl.rs index 47efb6b27d7be..a725740afbd71 100644 --- a/src/meta/service/src/meta_service/meta_service_impl.rs +++ b/src/meta/service/src/meta_service/meta_service_impl.rs @@ -21,18 +21,18 @@ use std::sync::Arc; use std::time::Instant; use anyerror::AnyError; +use common_meta_raft_store::applied_state::AppliedState; use common_meta_types::protobuf::raft_service_server::RaftService; use common_meta_types::protobuf::RaftReply; use common_meta_types::protobuf::RaftRequest; -use common_meta_types::AppliedState; -use common_meta_types::ForwardRequest; use common_meta_types::InvalidReply; use common_meta_types::LogEntry; use common_meta_types::MetaError; use common_meta_types::MetaNetworkError; use tonic::codegen::futures_core::Stream; -use crate::meta_service::ForwardRequestBody; +use crate::message::ForwardRequest; +use crate::message::ForwardRequestBody; use crate::meta_service::MetaNode; use crate::metrics::raft_metrics; use crate::metrics::server_metrics; diff --git a/src/meta/service/src/meta_service/mod.rs b/src/meta/service/src/meta_service/mod.rs index 090efc3b93805..fd689a346ddfb 100644 --- a/src/meta/service/src/meta_service/mod.rs +++ b/src/meta/service/src/meta_service/mod.rs @@ -12,13 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub use common_meta_types::ForwardRequest; -pub use common_meta_types::ForwardRequestBody; -pub use common_meta_types::JoinRequest; -pub use common_meta_types::LeaveRequest; pub use meta_service_impl::RaftServiceImpl; pub use raftmeta::MetaNode; +pub use crate::message::ForwardRequest; +pub use crate::message::ForwardRequestBody; +pub use crate::message::JoinRequest; +pub use crate::message::LeaveRequest; + pub mod meta_leader; mod meta_node_kv_api_impl; pub mod meta_service_impl; diff --git a/src/meta/service/src/meta_service/raftmeta.rs b/src/meta/service/src/meta_service/raftmeta.rs index 49575e5e37220..c4dce7a065c58 100644 --- a/src/meta/service/src/meta_service/raftmeta.rs +++ b/src/meta/service/src/meta_service/raftmeta.rs @@ -28,6 +28,7 @@ use common_base::base::tokio::task::JoinHandle; use common_base::base::tokio::time::Instant; use common_grpc::ConnectionFactory; use common_grpc::DNSResolver; +use common_meta_raft_store::applied_state::AppliedState; use common_meta_raft_store::config::RaftConfig; use common_meta_raft_store::key_spaces::GenericKV; use common_meta_sled_store::openraft; @@ -39,16 +40,12 @@ use common_meta_stoerr::MetaStorageError; use common_meta_types::protobuf::raft_service_client::RaftServiceClient; use common_meta_types::protobuf::raft_service_server::RaftServiceServer; use common_meta_types::protobuf::WatchRequest; -use common_meta_types::AppliedState; use common_meta_types::Cmd; use common_meta_types::ConnectionError; use common_meta_types::Endpoint; use common_meta_types::ForwardRPCError; -use common_meta_types::ForwardRequest; -use common_meta_types::ForwardResponse; use common_meta_types::ForwardToLeader; use common_meta_types::InvalidReply; -use common_meta_types::LeaveRequest; use common_meta_types::LogEntry; use common_meta_types::MetaAPIError; use common_meta_types::MetaDataError; @@ -74,9 +71,12 @@ use tracing::warn; use tracing::Instrument; use crate::configs::Config as MetaConfig; +use crate::message::ForwardRequest; +use crate::message::ForwardRequestBody; +use crate::message::ForwardResponse; +use crate::message::JoinRequest; +use crate::message::LeaveRequest; use crate::meta_service::meta_leader::MetaLeader; -use crate::meta_service::ForwardRequestBody; -use crate::meta_service::JoinRequest; use crate::meta_service::RaftServiceImpl; use crate::metrics::server_metrics; use crate::network::Network; diff --git a/src/meta/service/src/store/mod.rs b/src/meta/service/src/store/mod.rs index ce55c66d03bfe..62e9210b68afb 100644 --- a/src/meta/service/src/store/mod.rs +++ b/src/meta/service/src/store/mod.rs @@ -15,8 +15,8 @@ mod store_bare; mod to_storage_error; +use common_meta_raft_store::applied_state::AppliedState; use common_meta_sled_store::openraft::StoreExt; -use common_meta_types::AppliedState; use common_meta_types::LogEntry; pub use store_bare::RaftStoreBare; pub use to_storage_error::ToStorageError; diff --git a/src/meta/service/src/store/store_bare.rs b/src/meta/service/src/store/store_bare.rs index e0d68ef3a4255..11272524f5772 100644 --- a/src/meta/service/src/store/store_bare.rs +++ b/src/meta/service/src/store/store_bare.rs @@ -21,6 +21,7 @@ use anyerror::AnyError; use common_base::base::tokio::sync::RwLock; use common_base::base::tokio::sync::RwLockWriteGuard; use common_exception::WithContext; +use common_meta_raft_store::applied_state::AppliedState; use common_meta_raft_store::config::RaftConfig; use common_meta_raft_store::log::RaftLog; use common_meta_raft_store::state::RaftState; @@ -36,7 +37,6 @@ use common_meta_sled_store::openraft::ErrorVerb; use common_meta_sled_store::openraft::Membership; use common_meta_sled_store::openraft::StateMachineChanges; use common_meta_stoerr::MetaStorageError; -use common_meta_types::AppliedState; use common_meta_types::Endpoint; use common_meta_types::LogEntry; use common_meta_types::MetaError; diff --git a/src/meta/service/tests/it/meta_node/meta_node_lifecycle.rs b/src/meta/service/tests/it/meta_node/meta_node_lifecycle.rs index 4bd0e7612386c..99abead1f6b3f 100644 --- a/src/meta/service/tests/it/meta_node/meta_node_lifecycle.rs +++ b/src/meta/service/tests/it/meta_node/meta_node_lifecycle.rs @@ -28,10 +28,10 @@ use common_meta_types::NodeId; use common_meta_types::UpsertKV; use databend_meta::configs; use databend_meta::init_meta_ut; -use databend_meta::meta_service::ForwardRequest; -use databend_meta::meta_service::ForwardRequestBody; -use databend_meta::meta_service::JoinRequest; -use databend_meta::meta_service::LeaveRequest; +use databend_meta::message::ForwardRequest; +use databend_meta::message::ForwardRequestBody; +use databend_meta::message::JoinRequest; +use databend_meta::message::LeaveRequest; use databend_meta::meta_service::MetaNode; use maplit::btreeset; use pretty_assertions::assert_eq; diff --git a/src/meta/service/tests/it/meta_node/meta_node_seq_api.rs b/src/meta/service/tests/it/meta_node/meta_node_seq_api.rs index 207c36e4a62c6..63fbb50fb6b53 100644 --- a/src/meta/service/tests/it/meta_node/meta_node_seq_api.rs +++ b/src/meta/service/tests/it/meta_node/meta_node_seq_api.rs @@ -16,8 +16,8 @@ //! It can also be used by apps to generate mono incremental seq numbers. use common_base::base::tokio; +use common_meta_raft_store::applied_state::AppliedState; use common_meta_types::protobuf::raft_service_client::RaftServiceClient; -use common_meta_types::AppliedState; use common_meta_types::Cmd; use common_meta_types::LogEntry; use common_meta_types::MetaError; diff --git a/src/meta/service/tests/it/store.rs b/src/meta/service/tests/it/store.rs index 2512e21199dda..570e3149b54e1 100644 --- a/src/meta/service/tests/it/store.rs +++ b/src/meta/service/tests/it/store.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use std::sync::Mutex; use common_base::base::tokio; +use common_meta_raft_store::applied_state::AppliedState; use common_meta_raft_store::state_machine::testing::pretty_snapshot; use common_meta_raft_store::state_machine::testing::snapshot_logs; use common_meta_raft_store::state_machine::SerializableSnapshot; @@ -29,7 +30,6 @@ use common_meta_sled_store::openraft::LogId; use common_meta_sled_store::openraft::Membership; use common_meta_sled_store::openraft::RaftStorage; use common_meta_sled_store::openraft::StorageHelper; -use common_meta_types::AppliedState; use common_meta_types::LogEntry; use databend_meta::init_meta_ut; use databend_meta::store::RaftStoreBare; diff --git a/src/meta/service/tests/it/tests/meta_node.rs b/src/meta/service/tests/it/tests/meta_node.rs index 268a300ef866e..b6b5730b5e202 100644 --- a/src/meta/service/tests/it/tests/meta_node.rs +++ b/src/meta/service/tests/it/tests/meta_node.rs @@ -18,9 +18,9 @@ use std::collections::BTreeSet; use std::sync::Arc; use std::time::Duration; +use common_meta_raft_store::applied_state::AppliedState; use common_meta_sled_store::openraft::NodeId; use common_meta_sled_store::openraft::State; -use common_meta_types::AppliedState; use common_meta_types::Node; use databend_meta::meta_service::MetaNode; use databend_meta::Opened; diff --git a/src/meta/service/tests/it/tests/service.rs b/src/meta/service/tests/it/tests/service.rs index 989bdc85e1069..9643cbc2d7124 100644 --- a/src/meta/service/tests/it/tests/service.rs +++ b/src/meta/service/tests/it/tests/service.rs @@ -26,10 +26,10 @@ use common_meta_client::MetaGrpcClient; use common_meta_kvapi::kvapi; use common_meta_sled_store::openraft::NodeId; use common_meta_types::protobuf::raft_service_client::RaftServiceClient; -use common_meta_types::ForwardRequest; -use common_meta_types::ForwardRequestBody; use databend_meta::api::GrpcServer; use databend_meta::configs; +use databend_meta::message::ForwardRequest; +use databend_meta::message::ForwardRequestBody; use databend_meta::meta_service::MetaNode; use tracing::info; use tracing::warn; diff --git a/src/meta/sled-store/src/sled_tree.rs b/src/meta/sled-store/src/sled_tree.rs index 829e2fba6238c..950c461bd473a 100644 --- a/src/meta/sled-store/src/sled_tree.rs +++ b/src/meta/sled-store/src/sled_tree.rs @@ -417,7 +417,7 @@ impl<'a> TransactionSledTree<'a> { } self.changes - .push(Change::new_with_id(key.to_string(), prev, result)) + .push(Change::new(prev, result).with_id(key.to_string())) } } diff --git a/src/meta/types/Cargo.toml b/src/meta/types/Cargo.toml index 7fcccaaafe0cc..e6b4de9455169 100644 --- a/src/meta/types/Cargo.toml +++ b/src/meta/types/Cargo.toml @@ -11,19 +11,18 @@ doctest = false test = false [features] -storage-hdfs = ["opendal/services-hdfs"] +storage-hdfs = [] [dependencies] common-exception = { path = "../../common/exception" } common-io = { path = "../../common/io" } common-meta-stoerr = { path = "../stoerr" } -opendal = { workspace = true } openraft = { workspace = true } anyerror = { workspace = true } chrono = { workspace = true } -derive_more = "0.99.17" +derive_more = { workspace = true } enumflags2 = { version = "0.7.5", features = ["serde"] } hex = "0.4.3" num-derive = "0.3.3" diff --git a/src/meta/types/src/change.rs b/src/meta/types/src/change.rs index 32184def7e660..067048da7522d 100644 --- a/src/meta/types/src/change.rs +++ b/src/meta/types/src/change.rs @@ -36,8 +36,8 @@ where impl Change where - ID: Clone + PartialEq + std::fmt::Debug, - T: Clone + PartialEq + std::fmt::Debug, + ID: Clone + PartialEq + Debug, + T: Clone + PartialEq + Debug, { pub fn new(prev: Option>, result: Option>) -> Self { Change { @@ -47,20 +47,9 @@ where } } - pub fn new_with_id(id: ID, prev: Option>, result: Option>) -> Self { - Change { - ident: Some(id), - prev, - result, - } - } - - pub fn nochange_with_id(id: ID, prev: Option>) -> Self { - Change { - ident: Some(id), - prev: prev.clone(), - result: prev, - } + pub fn with_id(mut self, id: ID) -> Self { + self.ident = Some(id); + self } /// Maps `Option>` to `Option` for `prev` and `result`. @@ -88,12 +77,12 @@ where self.map(|x| x.data) } - pub fn changed(&self) -> bool { + pub fn is_changed(&self) -> bool { self.prev != self.result } - /// Assumes it is the state change of an add operation and return Ok if the add operation succeed. - /// Otherwise it returns the error the user provided function built with existing value. + /// Assumes it is a state change of an add operation and return Ok if the add operation succeed. + /// Otherwise it returns an error that is built by provided function. pub fn added_or_else(self, f: F) -> Result, E> where F: FnOnce(SeqV) -> E { let (prev, result) = self.unpack(); diff --git a/src/meta/types/src/errors/meta_raft_errors.rs b/src/meta/types/src/errors/meta_raft_errors.rs index 601506d5bb7a0..2ea339f47e436 100644 --- a/src/meta/types/src/errors/meta_raft_errors.rs +++ b/src/meta/types/src/errors/meta_raft_errors.rs @@ -19,20 +19,11 @@ pub use openraft::error::Fatal; pub use openraft::error::ForwardToLeader; pub use openraft::error::InProgress; pub use openraft::error::InitializeError; -use openraft::NodeId; use serde::Deserialize; use serde::Serialize; use crate::MetaOperationError; -#[derive(thiserror::Error, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] -pub enum RetryableError { - /// Trying to write to a non-leader returns the latest leader the raft node knows, - /// to indicate the client to retry. - #[error("request must be forwarded to leader: {leader}")] - ForwardToLeader { leader: NodeId }, -} - /// Collection of errors that occur when writing a raft-log to local raft node. /// This does not include the errors raised when writing a membership log. #[derive(thiserror::Error, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] diff --git a/src/meta/types/src/lib.rs b/src/meta/types/src/lib.rs index 64eac46bd90df..a6a53fe8c9018 100644 --- a/src/meta/types/src/lib.rs +++ b/src/meta/types/src/lib.rs @@ -18,7 +18,6 @@ //! This crate defines data types used in meta data storage service. -mod applied_state; mod change; mod cluster; mod cmd; @@ -63,7 +62,6 @@ pub mod protobuf { pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("meta_descriptor"); } -pub use applied_state::AppliedState; pub use change::Change; pub use cluster::Node; pub use cluster::NodeInfo; @@ -112,7 +110,6 @@ pub use errors::meta_raft_errors::ForwardToLeader; pub use errors::meta_raft_errors::InitializeError; pub use errors::meta_raft_errors::RaftChangeMembershipError; pub use errors::meta_raft_errors::RaftWriteError; -pub use errors::meta_raft_errors::RetryableError; pub use errors::meta_startup_errors::MetaStartupError; pub use errors::rpc_errors::ForwardRPCError; pub use kv_message::GetKVReply; @@ -126,11 +123,6 @@ pub use kv_message::UpsertKVReq; pub use log_entry::LogEntry; pub use match_seq::MatchSeq; pub use match_seq::MatchSeqExt; -pub use message::ForwardRequest; -pub use message::ForwardRequestBody; -pub use message::ForwardResponse; -pub use message::JoinRequest; -pub use message::LeaveRequest; pub use operation::GCDroppedDataReply; pub use operation::GCDroppedDataReq; pub use operation::MetaId; diff --git a/src/meta/types/src/message.rs b/src/meta/types/src/message.rs index f92bdbe4e3b8a..73072a37869db 100644 --- a/src/meta/types/src/message.rs +++ b/src/meta/types/src/message.rs @@ -16,131 +16,15 @@ use openraft::raft::AppendEntriesRequest; use openraft::raft::InstallSnapshotRequest; use openraft::raft::VoteRequest; use serde::de::DeserializeOwned; -use serde::Deserialize; use serde::Serialize; -use thiserror::Error; use crate::protobuf::RaftReply; use crate::protobuf::RaftRequest; -use crate::AppliedState; -use crate::Endpoint; -use crate::GetKVReply; -use crate::GetKVReq; use crate::InvalidReply; -use crate::ListKVReply; -use crate::ListKVReq; use crate::LogEntry; -use crate::MGetKVReply; -use crate::MGetKVReq; -use crate::NodeId; use crate::TxnOpResponse; use crate::TxnReply; -#[derive(Error, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] -pub enum RetryableError { - /// Trying to write to a non-leader returns the latest leader the raft node knows, - /// to indicate the client to retry. - #[error("request must be forwarded to leader: {leader}")] - ForwardToLeader { leader: NodeId }, -} - -#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, Eq)] -pub struct JoinRequest { - pub node_id: NodeId, - pub endpoint: Endpoint, - - #[serde(skip)] - #[deprecated(note = "it is listening addr, not advertise addr")] - pub grpc_api_addr: String, - - pub grpc_api_advertise_address: Option, -} - -impl JoinRequest { - pub fn new( - node_id: NodeId, - endpoint: Endpoint, - grpc_api_advertise_address: Option, - ) -> Self { - Self { - node_id, - endpoint, - grpc_api_advertise_address: grpc_api_advertise_address.map(|x| x.to_string()), - ..Default::default() - } - } -} - -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] -pub struct LeaveRequest { - pub node_id: NodeId, -} - -#[derive( - Serialize, Deserialize, Debug, Clone, PartialEq, Eq, derive_more::From, derive_more::TryInto, -)] -pub enum ForwardRequestBody { - Ping, - - Join(JoinRequest), - Leave(LeaveRequest), - - Write(LogEntry), - - GetKV(GetKVReq), - MGetKV(MGetKVReq), - ListKV(ListKVReq), -} - -/// A request that is forwarded from one raft node to another -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] -pub struct ForwardRequest { - /// Forward the request to leader if the node received this request is not leader. - pub forward_to_leader: u64, - - pub body: ForwardRequestBody, -} - -impl ForwardRequest { - pub fn decr_forward(&mut self) { - self.forward_to_leader -= 1; - } -} - -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, derive_more::TryInto)] -#[allow(clippy::large_enum_variant)] -pub enum ForwardResponse { - #[try_into(ignore)] - Pong, - - Join(()), - Leave(()), - AppliedState(AppliedState), - - GetKV(GetKVReply), - MGetKV(MGetKVReply), - ListKV(ListKVReply), -} - -impl tonic::IntoRequest for ForwardRequest { - fn into_request(self) -> tonic::Request { - let mes = RaftRequest { - data: serde_json::to_string(&self).expect("fail to serialize"), - }; - tonic::Request::new(mes) - } -} - -impl TryFrom for ForwardRequest { - type Error = tonic::Status; - - fn try_from(mes: RaftRequest) -> Result { - let req = serde_json::from_str(&mes.data) - .map_err(|e| tonic::Status::invalid_argument(e.to_string()))?; - Ok(req) - } -} - impl tonic::IntoRequest for LogEntry { fn into_request(self) -> tonic::Request { let mes = RaftRequest { @@ -213,25 +97,6 @@ impl tonic::IntoRequest for &VoteRequest { tonic::Request::new(mes) } } -impl From for RaftReply { - fn from(err: RetryableError) -> Self { - let error = serde_json::to_string(&err).expect("fail to serialize"); - RaftReply { - data: "".to_string(), - error, - } - } -} - -impl From for RaftReply { - fn from(msg: AppliedState) -> Self { - let data = serde_json::to_string(&msg).expect("fail to serialize"); - RaftReply { - data, - error: "".to_string(), - } - } -} impl From for Result where diff --git a/src/meta/types/src/user_info.rs b/src/meta/types/src/user_info.rs index ae7b5013720e3..3080c31798fb7 100644 --- a/src/meta/types/src/user_info.rs +++ b/src/meta/types/src/user_info.rs @@ -74,6 +74,15 @@ impl UserInfo { pub fn has_option_flag(&self, flag: UserOptionFlag) -> bool { self.option.has_option_flag(flag) } + + pub fn update_auth_option(&mut self, auth: Option, option: Option) { + if let Some(auth_info) = auth { + self.auth_info = auth_info; + }; + if let Some(user_option) = option { + self.option = user_option; + }; + } } impl TryFrom> for UserInfo { @@ -176,3 +185,33 @@ impl std::fmt::Display for UserOptionFlag { } } } + +#[cfg(test)] +mod tests { + use enumflags2::BitFlags; + + use crate::AuthInfo; + use crate::UserInfo; + use crate::UserOption; + + #[test] + fn test_user_update_auth_option() -> anyhow::Result<()> { + let mut u = UserInfo::new("a", "b", AuthInfo::None); + + // None does not take effect + { + let mut u2 = u.clone(); + u2.update_auth_option(None, None); + assert_eq!(u2, u); + } + + // Some updates the corresponding fields + { + u.update_auth_option(Some(AuthInfo::JWT), Some(UserOption::new(BitFlags::all()))); + assert_eq!(AuthInfo::JWT, u.auth_info); + assert_eq!(BitFlags::all(), u.option.flags); + } + + Ok(()) + } +} diff --git a/src/query/management/src/user/user_api.rs b/src/query/management/src/user/user_api.rs index 589fa7d4aba61..4bdcba50fdf26 100644 --- a/src/query/management/src/user/user_api.rs +++ b/src/query/management/src/user/user_api.rs @@ -13,12 +13,10 @@ // limitations under the License. use common_exception::Result; -use common_meta_types::AuthInfo; use common_meta_types::MatchSeq; use common_meta_types::SeqV; use common_meta_types::UserIdentity; use common_meta_types::UserInfo; -use common_meta_types::UserOption; #[async_trait::async_trait] pub trait UserApi: Sync + Send { @@ -30,9 +28,13 @@ pub trait UserApi: Sync + Send { /// General user's grants update. /// - /// It fetches the role that matches the specified seq number, update it in place, then write it back with the seq it sees. + /// It fetches the user that matches the specified seq number, update it in place, then write it back with the seq it sees. /// /// Seq number ensures there is no other write happens between get and set. + /// Example: + /// ```ignore + /// self.update_user_with(user_ident, MatchSeq::GE(1), |ui: &mut UserInfo| ui.update_auth_option(foo())).await; + /// ``` async fn update_user_with( &self, user: UserIdentity, @@ -42,13 +44,5 @@ pub trait UserApi: Sync + Send { where F: FnOnce(&mut UserInfo) + Send; - async fn update_user( - &self, - user: UserIdentity, - auth_info: Option, - user_option: Option, - seq: MatchSeq, - ) -> Result>; - async fn drop_user(&self, user: UserIdentity, seq: MatchSeq) -> Result<()>; } diff --git a/src/query/management/src/user/user_mgr.rs b/src/query/management/src/user/user_mgr.rs index 494df8c6f46c5..7a274e2a5b258 100644 --- a/src/query/management/src/user/user_mgr.rs +++ b/src/query/management/src/user/user_mgr.rs @@ -18,7 +18,6 @@ use common_base::base::escape_for_key; use common_exception::ErrorCode; use common_exception::Result; use common_meta_kvapi::kvapi; -use common_meta_types::AuthInfo; use common_meta_types::KVAppError; use common_meta_types::MatchSeq; use common_meta_types::MatchSeqExt; @@ -27,7 +26,6 @@ use common_meta_types::SeqV; use common_meta_types::UpsertKVReq; use common_meta_types::UserIdentity; use common_meta_types::UserInfo; -use common_meta_types::UserOption; use crate::serde::deserialize_struct; use crate::serde::serialize_struct; @@ -140,11 +138,6 @@ impl UserApi for UserMgr { Ok(r) } - /// General user's grants update. - /// - /// It fetch the role that matches the specified seq number, update it in place, then write it back with the seq it sees. - /// - /// Seq number ensures there is no other write happens between get and set. async fn update_user_with( &self, user: UserIdentity, @@ -168,25 +161,6 @@ impl UserApi for UserMgr { Ok(Some(seq)) } - // TODO: it deserve a better name. - async fn update_user( - &self, - user: UserIdentity, - new_auth_info: Option, - new_user_option: Option, - seq: MatchSeq, - ) -> Result> { - self.update_user_with(user, seq, |ui: &mut UserInfo| { - if let Some(auth_info) = new_auth_info { - ui.auth_info = auth_info; - }; - if let Some(user_option) = new_user_option { - ui.option = user_option; - }; - }) - .await - } - async fn drop_user(&self, user: UserIdentity, seq: MatchSeq) -> Result<()> { let user_key = format_user_key(&user.username, &user.hostname); let key = format!("{}/{}", self.user_prefix, escape_for_key(&user_key)?); diff --git a/src/query/management/tests/it/user.rs b/src/query/management/tests/it/user.rs index 4778a68cfe270..488dc3511a83c 100644 --- a/src/query/management/tests/it/user.rs +++ b/src/query/management/tests/it/user.rs @@ -532,26 +532,22 @@ mod update { let kv = Arc::new(kv); let user_mgr = UserMgr::create(kv, "tenant1")?; - let res = user_mgr.update_user( - user_info.identity(), - Some(new_test_auth_info(full)), - None, - test_seq, - ); + let res = user_mgr.update_user_with(user_info.identity(), test_seq, |ui: &mut UserInfo| { + ui.update_auth_option(Some(new_test_auth_info(full)), None) + }); assert!(res.await.is_ok()); Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - async fn test_update_user_partial_unknown() -> common_exception::Result<()> { + async fn test_update_user_with_conflict_when_writing_back() -> common_exception::Result<()> { let test_user_name = "name"; let test_hostname = "localhost"; let test_key = format!( "__fd_users/tenant1/{}", escape_for_key(&format_user_key(test_user_name, test_hostname))? ); - let test_seq = MatchSeq::GE(0); // if partial update, and get_kv returns None // update_kv should NOT be called @@ -564,11 +560,10 @@ mod update { let kv = Arc::new(kv); let user_mgr = UserMgr::create(kv, "tenant1")?; - let res = user_mgr.update_user( + let res = user_mgr.update_user_with( UserIdentity::new(test_user_name, test_hostname), - Some(new_test_auth_info(false)), - None, - test_seq, + MatchSeq::GE(0), + |ui: &mut UserInfo| ui.update_auth_option(Some(new_test_auth_info(false)), None), ); assert_eq!( res.await.unwrap_err().code(), @@ -578,14 +573,13 @@ mod update { } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - async fn test_update_user_full_unknown() -> common_exception::Result<()> { + async fn test_update_user_with_complete() -> common_exception::Result<()> { let test_user_name = "name"; let test_hostname = "localhost"; let test_key = format!( "__fd_users/tenant1/{}", escape_for_key(&format_user_key(test_user_name, test_hostname))? ); - let test_seq = MatchSeq::GE(1); let user_info = UserInfo::new(test_user_name, test_hostname, default_test_auth_info()); let prev_value = serialize_struct(&user_info, ErrorCode::IllegalUserInfoFormat, || "")?; @@ -611,16 +605,9 @@ mod update { let kv = Arc::new(kv); let user_mgr = UserMgr::create(kv, "tenant1")?; - let res = user_mgr.update_user( - user_info.identity(), - Some(new_test_auth_info(true)), - None, - test_seq, - ); - assert_eq!( - res.await.unwrap_err().code(), - ErrorCode::UnknownUser("").code() - ); + let _ = user_mgr + .update_user_with(user_info.identity(), MatchSeq::GE(1), |_x| {}) + .await; Ok(()) } } diff --git a/src/query/users/src/user_mgr.rs b/src/query/users/src/user_mgr.rs index 53902b08ce184..26f8a8f7fcf87 100644 --- a/src/query/users/src/user_mgr.rs +++ b/src/query/users/src/user_mgr.rs @@ -224,7 +224,9 @@ impl UserApiProvider { ) -> Result> { let client = self.get_user_api_client(tenant)?; let update_user = client - .update_user(user, auth_info, user_option, MatchSeq::GE(1)) + .update_user_with(user, MatchSeq::GE(1), |ui: &mut UserInfo| { + ui.update_auth_option(auth_info, user_option) + }) .await; match update_user {