From 257a25ce5606155bf9d53a995434310cec89c046 Mon Sep 17 00:00:00 2001 From: fys <1113014250@qq.com> Date: Tue, 7 Feb 2023 23:16:48 +0800 Subject: [PATCH] add DistLock trait and a implement based etcd wip impl lock grpc service for meta-srv reuse the etcd client instead of repeatedly creating etcd client add some docs and comments add some comment meta client support distribute lock fix: dead lock self-cr --- Cargo.lock | 2 +- src/api/Cargo.toml | 2 +- src/meta-client/examples/lock.rs | 125 +++++++++++++++++ src/meta-client/src/client.rs | 49 ++++++- src/meta-client/src/client/lock.rs | 184 +++++++++++++++++++++++++ src/meta-client/src/rpc.rs | 1 + src/meta-client/src/rpc/lock.rs | 115 ++++++++++++++++ src/meta-srv/src/bootstrap.rs | 21 ++- src/meta-srv/src/election/etcd.rs | 10 +- src/meta-srv/src/error.rs | 25 ++++ src/meta-srv/src/lib.rs | 1 + src/meta-srv/src/lock.rs | 42 ++++++ src/meta-srv/src/lock/etcd.rs | 76 ++++++++++ src/meta-srv/src/metasrv.rs | 7 + src/meta-srv/src/metasrv/builder.rs | 10 ++ src/meta-srv/src/service.rs | 1 + src/meta-srv/src/service/lock.rs | 53 +++++++ src/meta-srv/src/service/store/etcd.rs | 4 + 18 files changed, 716 insertions(+), 12 deletions(-) create mode 100644 src/meta-client/examples/lock.rs create mode 100644 src/meta-client/src/client/lock.rs create mode 100644 src/meta-client/src/rpc/lock.rs create mode 100644 src/meta-srv/src/lock.rs create mode 100644 src/meta-srv/src/lock/etcd.rs create mode 100644 src/meta-srv/src/service/lock.rs diff --git a/Cargo.lock b/Cargo.lock index a98a7b047a02..81e86efd41a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2972,7 +2972,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=966161508646f575801bcf05f47ed283ec231d68#966161508646f575801bcf05f47ed283ec231d68" +source = 
"git+https://github.com/GreptimeTeam/greptime-proto.git?rev=b846743da398407097d4549617485f59067281c2#b846743da398407097d4549617485f59067281c2" dependencies = [ "prost 0.11.6", "tonic", diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 7892210163bb..9ec6266ef7e7 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -10,7 +10,7 @@ common-base = { path = "../common/base" } common-error = { path = "../common/error" } common-time = { path = "../common/time" } datatypes = { path = "../datatypes" } -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "966161508646f575801bcf05f47ed283ec231d68" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b846743da398407097d4549617485f59067281c2" } prost.workspace = true snafu = { version = "0.7", features = ["backtraces"] } tonic.workspace = true diff --git a/src/meta-client/examples/lock.rs b/src/meta-client/examples/lock.rs new file mode 100644 index 000000000000..a0aa59cae746 --- /dev/null +++ b/src/meta-client/examples/lock.rs @@ -0,0 +1,125 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::time::Duration; + +use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; +use meta_client::client::{MetaClient, MetaClientBuilder}; +use meta_client::rpc::lock::{LockRequest, UnlockRequest}; +use tracing::{info, subscriber}; +use tracing_subscriber::FmtSubscriber; + +fn main() { + subscriber::set_global_default(FmtSubscriber::builder().finish()).unwrap(); + run(); +} + +#[tokio::main] +async fn run() { + let id = (1000u64, 2000u64); + let config = ChannelConfig::new() + .timeout(Duration::from_secs(30)) + .connect_timeout(Duration::from_secs(5)) + .tcp_nodelay(true); + let channel_manager = ChannelManager::with_config(config); + let mut meta_client = MetaClientBuilder::new(id.0, id.1) + .enable_lock() + .channel_manager(channel_manager) + .build(); + meta_client.start(&["127.0.0.1:3002"]).await.unwrap(); + + run_normal(meta_client.clone()).await; + + run_multi_thread(meta_client.clone()).await; + + run_multi_thread_with_one_timeout(meta_client).await; +} + +async fn run_normal(meta_client: MetaClient) { + let name = "lock_name".as_bytes().to_vec(); + let expire = 60; + + let lock_req = LockRequest { name, expire }; + + let lock_result = meta_client.lock(lock_req).await.unwrap(); + let key = lock_result.key; + info!( + "lock success! 
Returned key: {}", + String::from_utf8(key.clone()).unwrap() + ); + + // It is recommended that time of holding lock is less than the timeout of the grpc channel + info!("do some work, take 3 seconds"); + tokio::time::sleep(Duration::from_secs(3)).await; + + let unlock_req = UnlockRequest { key }; + + meta_client.unlock(unlock_req).await.unwrap(); + info!("unlock success!"); +} + +async fn run_multi_thread(meta_client: MetaClient) { + let meta_client_clone = meta_client.clone(); + let join1 = tokio::spawn(async move { + run_normal(meta_client_clone.clone()).await; + }); + + tokio::time::sleep(Duration::from_secs(1)).await; + + let join2 = tokio::spawn(async move { + run_normal(meta_client).await; + }); + + join1.await.unwrap(); + join2.await.unwrap(); +} + +async fn run_multi_thread_with_one_timeout(meta_client: MetaClient) { + let meta_client_clone = meta_client.clone(); + let join1 = tokio::spawn(async move { + run_with_timeout(meta_client_clone.clone()).await; + }); + + tokio::time::sleep(Duration::from_secs(1)).await; + + let join2 = tokio::spawn(async move { + run_normal(meta_client).await; + }); + + join1.await.unwrap(); + join2.await.unwrap(); +} + +async fn run_with_timeout(meta_client: MetaClient) { + let name = "lock_name".as_bytes().to_vec(); + let expire = 5; + + let lock_req = LockRequest { name, expire }; + + let lock_result = meta_client.lock(lock_req).await.unwrap(); + let key = lock_result.key; + info!( + "lock success! 
Returned key: {}", + String::from_utf8(key.clone()).unwrap() + ); + + // It is recommended that time of holding lock is less than the timeout of the grpc channel + info!("do some work, take 20 seconds"); + tokio::time::sleep(Duration::from_secs(20)).await; + + let unlock_req = UnlockRequest { key }; + + meta_client.unlock(unlock_req).await.unwrap(); + info!("unlock success!"); +} diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 8ae5682e9e2f..1444e2beffb0 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -14,12 +14,14 @@ mod heartbeat; mod load_balance; +mod lock; mod router; mod store; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_telemetry::info; use heartbeat::Client as HeartbeatClient; +use lock::Client as LockClient; use router::Client as RouterClient; use snafu::OptionExt; use store::Client as StoreClient; @@ -27,6 +29,7 @@ use store::Client as StoreClient; pub use self::heartbeat::{HeartbeatSender, HeartbeatStream}; use crate::error; use crate::error::Result; +use crate::rpc::lock::{LockRequest, LockResponse, UnlockRequest}; use crate::rpc::router::DeleteRequest; use crate::rpc::{ BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, CreateRequest, @@ -42,6 +45,7 @@ pub struct MetaClientBuilder { enable_heartbeat: bool, enable_router: bool, enable_store: bool, + enable_lock: bool, channel_manager: Option, } @@ -74,6 +78,13 @@ impl MetaClientBuilder { } } + pub fn enable_lock(self) -> Self { + Self { + enable_lock: true, + ..self + } + } + pub fn channel_manager(self, channel_manager: ChannelManager) -> Self { Self { channel_manager: Some(channel_manager), @@ -88,9 +99,12 @@ impl MetaClientBuilder { MetaClient::new(self.id) }; - if let (false, false, false) = - (self.enable_heartbeat, self.enable_router, self.enable_store) - { + if let (false, false, false, false) = ( + self.enable_heartbeat, + self.enable_router, + self.enable_store, + 
self.enable_lock, + ) { panic!("At least one client needs to be enabled.") } @@ -103,7 +117,10 @@ impl MetaClientBuilder { client.router = Some(RouterClient::new(self.id, mgr.clone())); } if self.enable_store { - client.store = Some(StoreClient::new(self.id, mgr)); + client.store = Some(StoreClient::new(self.id, mgr.clone())); + } + if self.enable_lock { + client.lock = Some(LockClient::new(self.id, mgr)); } client @@ -117,6 +134,7 @@ pub struct MetaClient { heartbeat: Option, router: Option, store: Option, + lock: Option, } impl MetaClient { @@ -151,10 +169,15 @@ impl MetaClient { info!("Router client started"); } if let Some(client) = &mut self.store { - client.start(urls).await?; + client.start(urls.clone()).await?; info!("Store client started"); } + if let Some(client) = &mut self.lock { + client.start(urls).await?; + info!("Lock client started"); + } + Ok(()) } @@ -260,6 +283,15 @@ impl MetaClient { .try_into() } + pub async fn lock(&self, req: LockRequest) -> Result { + self.lock_client()?.lock(req.into()).await.map(Into::into) + } + + pub async fn unlock(&self, req: UnlockRequest) -> Result<()> { + self.lock_client()?.unlock(req.into()).await?; + Ok(()) + } + #[inline] pub fn heartbeat_client(&self) -> Result { self.heartbeat.clone().context(error::NotStartedSnafu { @@ -281,6 +313,13 @@ impl MetaClient { }) } + #[inline] + pub fn lock_client(&self) -> Result { + self.lock.clone().context(error::NotStartedSnafu { + name: "lock_client", + }) + } + #[inline] pub fn channel_config(&self) -> &ChannelConfig { self.channel_manager.config() diff --git a/src/meta-client/src/client/lock.rs b/src/meta-client/src/client/lock.rs new file mode 100644 index 000000000000..eddef25f3e46 --- /dev/null +++ b/src/meta-client/src/client/lock.rs @@ -0,0 +1,184 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashSet; +use std::sync::Arc; + +use api::v1::meta::lock_client::LockClient; +use api::v1::meta::{LockRequest, LockResponse, UnlockRequest, UnlockResponse}; +use common_grpc::channel_manager::ChannelManager; +use snafu::{ensure, OptionExt, ResultExt}; +use tokio::sync::RwLock; +use tonic::transport::Channel; + +use crate::client::{load_balance, Id}; +use crate::error; +use crate::error::Result; + +#[derive(Clone, Debug)] +pub struct Client { + inner: Arc>, +} + +impl Client { + pub fn new(id: Id, channel_manager: ChannelManager) -> Self { + let inner = Arc::new(RwLock::new(Inner { + id, + channel_manager, + peers: vec![], + })); + + Self { inner } + } + + pub async fn start(&mut self, urls: A) -> Result<()> + where + U: AsRef, + A: AsRef<[U]>, + { + let mut inner = self.inner.write().await; + inner.start(urls).await + } + + pub async fn is_started(&self) -> bool { + let inner = self.inner.read().await; + inner.is_started() + } + + pub async fn lock(&self, req: LockRequest) -> Result { + let inner = self.inner.read().await; + inner.lock(req).await + } + + pub async fn unlock(&self, req: UnlockRequest) -> Result { + let inner = self.inner.read().await; + inner.unlock(req).await + } +} + +#[derive(Debug)] +struct Inner { + id: Id, + channel_manager: ChannelManager, + peers: Vec, +} + +impl Inner { + async fn start(&mut self, urls: A) -> Result<()> + where + U: AsRef, + A: AsRef<[U]>, + { + ensure!( + !self.is_started(), + error::IllegalGrpcClientStateSnafu { + err_msg: "Lock client already started", + } + ); + + self.peers = 
urls + .as_ref() + .iter() + .map(|url| url.as_ref().to_string()) + .collect::>() + .drain() + .collect::>(); + + Ok(()) + } + + fn random_client(&self) -> Result> { + let len = self.peers.len(); + let peer = load_balance::random_get(len, |i| Some(&self.peers[i])).context( + error::IllegalGrpcClientStateSnafu { + err_msg: "Empty peers, lock client may not start yet", + }, + )?; + + self.make_client(peer) + } + + fn make_client(&self, addr: impl AsRef) -> Result> { + let channel = self + .channel_manager + .get(addr) + .context(error::CreateChannelSnafu)?; + + Ok(LockClient::new(channel)) + } + + #[inline] + fn is_started(&self) -> bool { + !self.peers.is_empty() + } + + async fn lock(&self, mut req: LockRequest) -> Result { + let mut client = self.random_client()?; + req.set_header(self.id); + let res = client.lock(req).await.context(error::TonicStatusSnafu)?; + + Ok(res.into_inner()) + } + + async fn unlock(&self, mut req: UnlockRequest) -> Result { + let mut client = self.random_client()?; + req.set_header(self.id); + let res = client.unlock(req).await.context(error::TonicStatusSnafu)?; + + Ok(res.into_inner()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_start_client() { + let mut client = Client::new((0, 0), ChannelManager::default()); + assert!(!client.is_started().await); + client + .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) + .await + .unwrap(); + assert!(client.is_started().await); + } + + #[tokio::test] + async fn test_already_start() { + let mut client = Client::new((0, 0), ChannelManager::default()); + client + .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) + .await + .unwrap(); + assert!(client.is_started().await); + let res = client.start(&["127.0.0.1:1002"]).await; + assert!(res.is_err()); + assert!(matches!( + res.err(), + Some(error::Error::IllegalGrpcClientState { .. 
}) + )); + } + + #[tokio::test] + async fn test_start_with_duplicate_peers() { + let mut client = Client::new((0, 0), ChannelManager::default()); + client + .start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"]) + .await + .unwrap(); + + assert_eq!(1, client.inner.write().await.peers.len()); + } +} diff --git a/src/meta-client/src/rpc.rs b/src/meta-client/src/rpc.rs index 66844a00bbdb..2264f0033b65 100644 --- a/src/meta-client/src/rpc.rs +++ b/src/meta-client/src/rpc.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod lock; pub mod router; mod store; pub mod util; diff --git a/src/meta-client/src/rpc/lock.rs b/src/meta-client/src/rpc/lock.rs new file mode 100644 index 000000000000..76738bca675e --- /dev/null +++ b/src/meta-client/src/rpc/lock.rs @@ -0,0 +1,115 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use api::v1::meta::{ + LockRequest as PbLockRequest, LockResponse as PbLockResponse, UnlockRequest as PbUnlockRequest, +}; + +#[derive(Debug)] +pub struct LockRequest { + pub name: Vec, + pub expire: i64, +} + +impl From for PbLockRequest { + fn from(req: LockRequest) -> Self { + Self { + header: None, + name: req.name, + expire: req.expire, + } + } +} + +#[derive(Debug)] +pub struct LockResponse { + pub key: Vec, +} + +impl From for LockResponse { + fn from(resp: PbLockResponse) -> Self { + Self { key: resp.key } + } +} + +#[derive(Debug)] +pub struct UnlockRequest { + pub key: Vec, +} + +impl From for PbUnlockRequest { + fn from(req: UnlockRequest) -> Self { + Self { + header: None, + key: req.key.to_vec(), + } + } +} + +#[cfg(test)] +mod tests { + use api::v1::meta::{ + LockRequest as PbLockRequest, LockResponse as PbLockResponse, + UnlockRequest as PbUnlockRequest, + }; + + use super::LockRequest; + use crate::rpc::lock::{LockResponse, UnlockRequest}; + + #[test] + fn test_convert_lock_req() { + let lock_req = LockRequest { + name: "lock_1".as_bytes().to_vec(), + expire: 1, + }; + let pb_lock_req: PbLockRequest = lock_req.into(); + + let expected = PbLockRequest { + header: None, + name: "lock_1".as_bytes().to_vec(), + expire: 1, + }; + + assert_eq!(expected, pb_lock_req); + } + + #[test] + fn test_convert_unlock_req() { + let unlock_req = UnlockRequest { + key: "lock_1_12378123".as_bytes().to_vec(), + }; + let pb_unlock_req: PbUnlockRequest = unlock_req.into(); + + let expected = PbUnlockRequest { + header: None, + key: "lock_1_12378123".as_bytes().to_vec(), + }; + + assert_eq!(expected, pb_unlock_req); + } + + #[test] + fn test_convert_lock_response() { + let pb_lock_resp = PbLockResponse { + header: None, + key: "lock_1_12378123".as_bytes().to_vec(), + }; + + let lock_resp: LockResponse = pb_lock_resp.into(); + + let expected_key = "lock_1_12378123".as_bytes().to_vec(); + + assert_eq!(expected_key, lock_resp.key); + } +} diff --git 
a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 5ca0ac3809fb..243512e16874 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -16,8 +16,10 @@ use std::sync::Arc; use api::v1::meta::cluster_server::ClusterServer; use api::v1::meta::heartbeat_server::HeartbeatServer; +use api::v1::meta::lock_server::LockServer; use api::v1::meta::router_server::RouterServer; use api::v1::meta::store_server::StoreServer; +use etcd_client::Client; use snafu::ResultExt; use tokio::net::TcpListener; use tokio_stream::wrappers::TcpListenerStream; @@ -25,6 +27,7 @@ use tonic::transport::server::Router; use crate::cluster::MetaPeerClient; use crate::election::etcd::EtcdElection; +use crate::lock::etcd::EtcdLock; use crate::metasrv::builder::MetaSrvBuilder; use crate::metasrv::{MetaSrv, MetaSrvOptions, SelectorRef}; use crate::selector::lease_based::LeaseBasedSelector; @@ -65,16 +68,25 @@ pub fn router(meta_srv: MetaSrv) -> Router { .add_service(RouterServer::new(meta_srv.clone())) .add_service(StoreServer::new(meta_srv.clone())) .add_service(ClusterServer::new(meta_srv.clone())) + .add_service(LockServer::new(meta_srv.clone())) .add_service(admin::make_admin_service(meta_srv)) } pub async fn make_meta_srv(opts: MetaSrvOptions) -> Result { - let (kv_store, election) = if opts.use_memory_store { - (Arc::new(MemStore::new()) as _, None) + let (kv_store, election, lock) = if opts.use_memory_store { + (Arc::new(MemStore::new()) as _, None, None) } else { + let etcd_endpoints = [&opts.store_addr]; + let etcd_client = Client::connect(etcd_endpoints, None) + .await + .context(error::ConnectEtcdSnafu)?; ( - EtcdStore::with_endpoints([&opts.store_addr]).await?, - Some(EtcdElection::with_endpoints(&opts.server_addr, [&opts.store_addr]).await?), + EtcdStore::with_etcd_client(etcd_client.clone())?, + Some(EtcdElection::with_etcd_client( + &opts.server_addr, + etcd_client.clone(), + )?), + Some(EtcdLock::with_etcd_client(etcd_client)?), ) }; @@ -95,6 
+107,7 @@ pub async fn make_meta_srv(opts: MetaSrvOptions) -> Result { .selector(selector) .election(election) .meta_peer_client(meta_peer_client) + .lock(lock) .build() .await; diff --git a/src/meta-srv/src/election/etcd.rs b/src/meta-srv/src/election/etcd.rs index fc4ceef57857..ee19eede976b 100644 --- a/src/meta-srv/src/election/etcd.rs +++ b/src/meta-srv/src/election/etcd.rs @@ -38,11 +38,19 @@ impl EtcdElection { E: AsRef, S: AsRef<[E]>, { - let leader_value = leader_value.as_ref().into(); let client = Client::connect(endpoints, None) .await .context(error::ConnectEtcdSnafu)?; + Self::with_etcd_client(leader_value, client) + } + + pub fn with_etcd_client(leader_value: E, client: Client) -> Result + where + E: AsRef, + { + let leader_value = leader_value.as_ref().into(); + Ok(Arc::new(Self { leader_value, client, diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 2e6b49e6ffdd..b08fd2f2dcdb 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -218,6 +218,27 @@ pub enum Error { #[snafu(backtrace)] source: BoxedError, }, + + #[snafu(display("Failed to lock based on etcd, source: {}", source))] + Lock { + source: etcd_client::Error, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to unlock based on etcd, source: {}", source))] + Unlock { + source: etcd_client::Error, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to grant lease, source: {}", source))] + LeaseGrant { + source: etcd_client::Error, + backtrace: Backtrace, + }, + + #[snafu(display("Distributed lock is not configured"))] + LockNotConfig { backtrace: Backtrace }, } pub type Result = std::result::Result; @@ -254,6 +275,10 @@ impl ErrorExt for Error { | Error::IsNotLeader { .. } | Error::NoMetaPeerClient { .. } | Error::InvalidHttpBody { .. } + | Error::Lock { .. } + | Error::Unlock { .. } + | Error::LeaseGrant { .. } + | Error::LockNotConfig { .. } | Error::StartGrpc { .. } => StatusCode::Internal, Error::EmptyKey { .. 
} | Error::EmptyTableName { .. } diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index c0b00e446b98..8e95448ff50e 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -20,6 +20,7 @@ pub mod error; pub mod handler; pub mod keys; pub mod lease; +pub mod lock; pub mod metasrv; #[cfg(feature = "mock")] pub mod mocks; diff --git a/src/meta-srv/src/lock.rs b/src/meta-srv/src/lock.rs new file mode 100644 index 000000000000..fe9cfbab859e --- /dev/null +++ b/src/meta-srv/src/lock.rs @@ -0,0 +1,42 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod etcd; + +use std::sync::Arc; + +use crate::error::Result; + +pub type Key = Vec; + +// default expire time: 10 seconds +pub const DEFAULT_EXPIRE_TIME: u64 = 10; + +pub struct Opts { + // If the expiration time is exceeded and currently holds the lock, the lock is + // automatically released. The unit is second. + pub expire: Option, +} + +#[async_trait::async_trait] +pub trait DistLock: Send + Sync { + // Lock acquires a distributed shared lock on a given named lock. On success, it + // will return a unique key that exists so long as the lock is held by the caller. + async fn lock(&self, name: Vec, opts: Opts) -> Result; + + // Unlock takes a key returned by Lock and releases the hold on lock.
+ async fn unlock(&self, key: Vec) -> Result<()>; +} + +pub type DistLockRef = Arc; diff --git a/src/meta-srv/src/lock/etcd.rs b/src/meta-srv/src/lock/etcd.rs new file mode 100644 index 000000000000..39992162ed34 --- /dev/null +++ b/src/meta-srv/src/lock/etcd.rs @@ -0,0 +1,76 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use etcd_client::{Client, LockOptions}; +use snafu::ResultExt; + +use super::{DistLock, DistLockRef, Opts, DEFAULT_EXPIRE_TIME}; +use crate::error; +use crate::error::Result; + +/// An implementation of distributed lock based on etcd. The Clone of EtcdLock is cheap.
+#[derive(Clone)] +pub struct EtcdLock { + client: Client, +} + +impl EtcdLock { + pub async fn with_endpoints(endpoints: S) -> Result + where + E: AsRef, + S: AsRef<[E]>, + { + let client = Client::connect(endpoints, None) + .await + .context(error::ConnectEtcdSnafu)?; + + Self::with_etcd_client(client) + } + + pub fn with_etcd_client(client: Client) -> Result { + Ok(Arc::new(EtcdLock { client })) + } +} + +#[async_trait::async_trait] +impl DistLock for EtcdLock { + async fn lock(&self, name: Vec, opts: Opts) -> Result> { + let expire = opts.expire.unwrap_or(DEFAULT_EXPIRE_TIME) as i64; + + let mut client = self.client.clone(); + + let resp = client + .lease_grant(expire, None) + .await + .context(error::LeaseGrantSnafu)?; + + let lease_id = resp.id(); + let lock_opts = LockOptions::new().with_lease(lease_id); + + let resp = client + .lock(name, Some(lock_opts)) + .await + .context(error::LockSnafu)?; + + Ok(resp.key().to_vec()) + } + + async fn unlock(&self, key: Vec) -> Result<()> { + let mut client = self.client.clone(); + let _ = client.unlock(key).await.context(error::UnlockSnafu)?; + Ok(()) + } +} diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 62633cc950b8..985c2db6a2fa 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -24,6 +24,7 @@ use serde::{Deserialize, Serialize}; use crate::cluster::MetaPeerClient; use crate::election::Election; use crate::handler::HeartbeatHandlerGroup; +use crate::lock::DistLockRef; use crate::selector::{Selector, SelectorType}; use crate::sequence::SequenceRef; use crate::service::store::kv::{KvStoreRef, ResetableKvStoreRef}; @@ -99,6 +100,7 @@ pub struct MetaSrv { handler_group: HeartbeatHandlerGroup, election: Option, meta_peer_client: Option, + lock: Option, } impl MetaSrv { @@ -174,6 +176,11 @@ impl MetaSrv { self.meta_peer_client.clone() } + #[inline] + pub fn lock(&self) -> Option { + self.lock.clone() + } + #[inline] pub fn new_ctx(&self) -> Context { let 
datanode_lease_secs = self.options().datanode_lease_secs; diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 730dc3d54a76..aec93289a8cc 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -20,6 +20,7 @@ use crate::handler::{ CheckLeaderHandler, CollectStatsHandler, HeartbeatHandlerGroup, KeepLeaseHandler, OnLeaderStartHandler, PersistStatsHandler, ResponseHeaderHandler, }; +use crate::lock::DistLockRef; use crate::metasrv::{ElectionRef, MetaSrv, MetaSrvOptions, SelectorRef, TABLE_ID_SEQ}; use crate::selector::lease_based::LeaseBasedSelector; use crate::sequence::Sequence; @@ -35,6 +36,7 @@ pub struct MetaSrvBuilder { handler_group: Option, election: Option, meta_peer_client: Option, + lock: Option, } impl MetaSrvBuilder { @@ -47,6 +49,7 @@ impl MetaSrvBuilder { meta_peer_client: None, election: None, options: None, + lock: None, } } @@ -85,6 +88,11 @@ impl MetaSrvBuilder { self } + pub fn lock(mut self, lock: Option) -> Self { + self.lock = lock; + self + } + pub async fn build(self) -> MetaSrv { let started = Arc::new(AtomicBool::new(false)); @@ -96,6 +104,7 @@ impl MetaSrvBuilder { in_memory, selector, handler_group, + lock, } = self; let options = options.unwrap_or_default(); @@ -136,6 +145,7 @@ impl MetaSrvBuilder { handler_group, election, meta_peer_client, + lock, } } } diff --git a/src/meta-srv/src/service.rs b/src/meta-srv/src/service.rs index d8a5eaf4c98e..cb386f82ff8c 100644 --- a/src/meta-srv/src/service.rs +++ b/src/meta-srv/src/service.rs @@ -20,6 +20,7 @@ use tonic::{Response, Status}; pub mod admin; pub mod cluster; mod heartbeat; +pub mod lock; pub mod router; pub mod store; diff --git a/src/meta-srv/src/service/lock.rs b/src/meta-srv/src/service/lock.rs new file mode 100644 index 000000000000..56509dec87b6 --- /dev/null +++ b/src/meta-srv/src/service/lock.rs @@ -0,0 +1,53 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the 
"License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::meta::{lock_server, LockRequest, LockResponse, UnlockRequest, UnlockResponse}; +use snafu::OptionExt; +use tonic::{Request, Response}; + +use super::GrpcResult; +use crate::error; +use crate::lock::Opts; +use crate::metasrv::MetaSrv; + +#[async_trait::async_trait] +impl lock_server::Lock for MetaSrv { + async fn lock(&self, request: Request) -> GrpcResult { + let LockRequest { name, expire, .. } = request.into_inner(); + let expire = Some(expire as u64); + + let lock = self.lock().context(error::LockNotConfigSnafu)?; + let key = lock.lock(name, Opts { expire }).await?; + + let resp = LockResponse { + key, + ..Default::default() + }; + + Ok(Response::new(resp)) + } + + async fn unlock(&self, request: Request) -> GrpcResult { + let UnlockRequest { key, .. } = request.into_inner(); + + let lock = self.lock().context(error::LockNotConfigSnafu)?; + let _ = lock.unlock(key).await?; + + let resp = UnlockResponse { + ..Default::default() + }; + + Ok(Response::new(resp)) + } +} diff --git a/src/meta-srv/src/service/store/etcd.rs b/src/meta-srv/src/service/store/etcd.rs index 38dd9220f4f3..91cd1f7c16ca 100644 --- a/src/meta-srv/src/service/store/etcd.rs +++ b/src/meta-srv/src/service/store/etcd.rs @@ -43,6 +43,10 @@ impl EtcdStore { .await .context(error::ConnectEtcdSnafu)?; + Self::with_etcd_client(client) + } + + pub fn with_etcd_client(client: Client) -> Result { Ok(Arc::new(Self { client })) } }