Skip to content

Commit

Permalink
feat: meta provides the ability to distribute lock (#961)
Browse files Browse the repository at this point in the history
* add DistLock trait and a implement based etcd

wip

impl lock grpc service for meta-srv

reuse the etcd client instead of repeatedly creating etcd client

add some docs and comments

add some comment

meta client support distribute lock

fix: dead lock

self-cr

* cr

* rename "expire" -> "expire_secs"
  • Loading branch information
fengys1996 committed Feb 13, 2023
1 parent be897ef commit c1a9f84
Show file tree
Hide file tree
Showing 18 changed files with 712 additions and 12 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-time = { path = "../common/time" }
datatypes = { path = "../datatypes" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "966161508646f575801bcf05f47ed283ec231d68" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "3e6349be127b65a8b42a38cda9d527ec423ca77d" }
prost.workspace = true
snafu = { version = "0.7", features = ["backtraces"] }
tonic.workspace = true
Expand Down
125 changes: 125 additions & 0 deletions src/meta-client/examples/lock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use meta_client::client::{MetaClient, MetaClientBuilder};
use meta_client::rpc::lock::{LockRequest, UnlockRequest};
use tracing::{info, subscriber};
use tracing_subscriber::FmtSubscriber;

fn main() {
subscriber::set_global_default(FmtSubscriber::builder().finish()).unwrap();
run();
}

#[tokio::main]
async fn run() {
let id = (1000u64, 2000u64);
let config = ChannelConfig::new()
.timeout(Duration::from_secs(30))
.connect_timeout(Duration::from_secs(5))
.tcp_nodelay(true);
let channel_manager = ChannelManager::with_config(config);
let mut meta_client = MetaClientBuilder::new(id.0, id.1)
.enable_lock()
.channel_manager(channel_manager)
.build();
meta_client.start(&["127.0.0.1:3002"]).await.unwrap();

run_normal(meta_client.clone()).await;

run_multi_thread(meta_client.clone()).await;

run_multi_thread_with_one_timeout(meta_client).await;
}

async fn run_normal(meta_client: MetaClient) {
let name = "lock_name".as_bytes().to_vec();
let expire_secs = 60;

let lock_req = LockRequest { name, expire_secs };

let lock_result = meta_client.lock(lock_req).await.unwrap();
let key = lock_result.key;
info!(
"lock success! Returned key: {}",
String::from_utf8(key.clone()).unwrap()
);

// It is recommended that time of holding lock is less than the timeout of the grpc channel
info!("do some work, take 3 seconds");
tokio::time::sleep(Duration::from_secs(3)).await;

let unlock_req = UnlockRequest { key };

meta_client.unlock(unlock_req).await.unwrap();
info!("unlock success!");
}

async fn run_multi_thread(meta_client: MetaClient) {
let meta_client_clone = meta_client.clone();
let join1 = tokio::spawn(async move {
run_normal(meta_client_clone.clone()).await;
});

tokio::time::sleep(Duration::from_secs(1)).await;

let join2 = tokio::spawn(async move {
run_normal(meta_client).await;
});

join1.await.unwrap();
join2.await.unwrap();
}

async fn run_multi_thread_with_one_timeout(meta_client: MetaClient) {
let meta_client_clone = meta_client.clone();
let join1 = tokio::spawn(async move {
run_with_timeout(meta_client_clone.clone()).await;
});

tokio::time::sleep(Duration::from_secs(1)).await;

let join2 = tokio::spawn(async move {
run_normal(meta_client).await;
});

join1.await.unwrap();
join2.await.unwrap();
}

async fn run_with_timeout(meta_client: MetaClient) {
let name = "lock_name".as_bytes().to_vec();
let expire_secs = 5;

let lock_req = LockRequest { name, expire_secs };

let lock_result = meta_client.lock(lock_req).await.unwrap();
let key = lock_result.key;
info!(
"lock success! Returned key: {}",
String::from_utf8(key.clone()).unwrap()
);

// It is recommended that time of holding lock is less than the timeout of the grpc channel
info!("do some work, take 20 seconds");
tokio::time::sleep(Duration::from_secs(20)).await;

let unlock_req = UnlockRequest { key };

meta_client.unlock(unlock_req).await.unwrap();
info!("unlock success!");
}
44 changes: 39 additions & 5 deletions src/meta-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,22 @@

mod heartbeat;
mod load_balance;
mod lock;
mod router;
mod store;

use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_telemetry::info;
use heartbeat::Client as HeartbeatClient;
use lock::Client as LockClient;
use router::Client as RouterClient;
use snafu::OptionExt;
use store::Client as StoreClient;

pub use self::heartbeat::{HeartbeatSender, HeartbeatStream};
use crate::error;
use crate::error::Result;
use crate::rpc::lock::{LockRequest, LockResponse, UnlockRequest};
use crate::rpc::router::DeleteRequest;
use crate::rpc::{
BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, CreateRequest,
Expand All @@ -42,6 +45,7 @@ pub struct MetaClientBuilder {
enable_heartbeat: bool,
enable_router: bool,
enable_store: bool,
enable_lock: bool,
channel_manager: Option<ChannelManager>,
}

Expand Down Expand Up @@ -74,6 +78,13 @@ impl MetaClientBuilder {
}
}

pub fn enable_lock(self) -> Self {
Self {
enable_lock: true,
..self
}
}

pub fn channel_manager(self, channel_manager: ChannelManager) -> Self {
Self {
channel_manager: Some(channel_manager),
Expand All @@ -88,9 +99,7 @@ impl MetaClientBuilder {
MetaClient::new(self.id)
};

if let (false, false, false) =
(self.enable_heartbeat, self.enable_router, self.enable_store)
{
if !(self.enable_heartbeat || self.enable_router || self.enable_store || self.enable_lock) {
panic!("At least one client needs to be enabled.")
}

Expand All @@ -103,7 +112,10 @@ impl MetaClientBuilder {
client.router = Some(RouterClient::new(self.id, mgr.clone()));
}
if self.enable_store {
client.store = Some(StoreClient::new(self.id, mgr));
client.store = Some(StoreClient::new(self.id, mgr.clone()));
}
if self.enable_lock {
client.lock = Some(LockClient::new(self.id, mgr));
}

client
Expand All @@ -117,6 +129,7 @@ pub struct MetaClient {
heartbeat: Option<HeartbeatClient>,
router: Option<RouterClient>,
store: Option<StoreClient>,
lock: Option<LockClient>,
}

impl MetaClient {
Expand Down Expand Up @@ -151,10 +164,15 @@ impl MetaClient {
info!("Router client started");
}
if let Some(client) = &mut self.store {
client.start(urls).await?;
client.start(urls.clone()).await?;
info!("Store client started");
}

if let Some(client) = &mut self.lock {
client.start(urls).await?;
info!("Lock client started");
}

Ok(())
}

Expand Down Expand Up @@ -260,6 +278,15 @@ impl MetaClient {
.try_into()
}

pub async fn lock(&self, req: LockRequest) -> Result<LockResponse> {
self.lock_client()?.lock(req.into()).await.map(Into::into)
}

pub async fn unlock(&self, req: UnlockRequest) -> Result<()> {
self.lock_client()?.unlock(req.into()).await?;
Ok(())
}

#[inline]
pub fn heartbeat_client(&self) -> Result<HeartbeatClient> {
self.heartbeat.clone().context(error::NotStartedSnafu {
Expand All @@ -281,6 +308,13 @@ impl MetaClient {
})
}

#[inline]
pub fn lock_client(&self) -> Result<LockClient> {
self.lock.clone().context(error::NotStartedSnafu {
name: "lock_client",
})
}

#[inline]
pub fn channel_config(&self) -> &ChannelConfig {
self.channel_manager.config()
Expand Down
Loading

0 comments on commit c1a9f84

Please sign in to comment.