Skip to content

Commit

Permalink
support lock service
Browse files Browse the repository at this point in the history
  • Loading branch information
themanforfree committed Jan 31, 2023
1 parent 1a15496 commit e97e892
Show file tree
Hide file tree
Showing 11 changed files with 443 additions and 183 deletions.
39 changes: 34 additions & 5 deletions xline/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::collections::HashMap;
use std::{collections::HashMap, fmt::Debug};

use curp::{client::Client as CurpClient, cmd::ProposeId};
use etcd_client::{
AuthClient, Client as EtcdClient, LeaseKeepAliveStream, LeaseKeeper, WatchClient,
AuthClient, Client as EtcdClient, KvClient, LeaseClient, LeaseKeepAliveStream, LeaseKeeper,
LockClient, WatchClient,
};
use utils::config::ClientTimeout;
use uuid::Uuid;
Expand Down Expand Up @@ -30,7 +31,6 @@ pub mod errors;
pub mod kv_types;

/// Xline client
#[allow(missing_debug_implementations)] // EtcdClient doesn't implement Debug
pub struct Client {
/// Name of the client
name: String,
Expand All @@ -42,6 +42,17 @@ pub struct Client {
use_curp_client: bool,
}

impl Debug for Client {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Client")
.field("name", &self.name)
.field("use_curp_client", &self.use_curp_client)
.field("curp_client", &self.curp_client)
.finish()
}
}

impl Client {
/// New `Client`
///
Expand Down Expand Up @@ -238,15 +249,33 @@ impl Client {
Ok(response.into())
}

/// Gets an kv client.
#[inline]
pub fn kv_client(&self) -> KvClient {
self.etcd_client.kv_client()
}

/// Gets an auth client.
#[inline]
pub fn auth_client(&mut self) -> AuthClient {
pub fn auth_client(&self) -> AuthClient {
self.etcd_client.auth_client()
}

/// Gets an watch client.
#[inline]
pub fn watch_client(&mut self) -> WatchClient {
pub fn watch_client(&self) -> WatchClient {
self.etcd_client.watch_client()
}

/// Gets an lock client.
#[inline]
pub fn lock_client(&self) -> LockClient {
self.etcd_client.lock_client()
}

/// Gets an lease client.
#[inline]
pub fn lease_client(&self) -> LeaseClient {
self.etcd_client.lease_client()
}
}
2 changes: 1 addition & 1 deletion xline/src/server/kv_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ impl Kv for KvServer {
) -> Result<tonic::Response<TxnResponse>, tonic::Status> {
debug!("Receive TxnRequest {:?}", request);
Self::check_txn_request(request.get_ref())?;
let is_fast_path = true;
let is_fast_path = false; // lock need revision of txn
let (cmd_res, sync_res) = self.propose(request, is_fast_path).await?;

let mut res = Self::parse_response_op(cmd_res.decode().into());
Expand Down
6 changes: 3 additions & 3 deletions xline/src/server/lease_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ const DEFAULT_LEASE_REQUEST_TIME: Duration = Duration::from_millis(500);

/// Lease Server
#[derive(Debug)]
#[allow(dead_code)] // Remove this after feature is completed
pub(crate) struct LeaseServer {
/// Lease storage
storage: Arc<LeaseStore>,
Expand Down Expand Up @@ -72,13 +71,14 @@ impl LeaseServer {
async fn revoke_expired_leases_task(lease_server: Arc<LeaseServer>) {
loop {
// only leader will check expired lease
if lease_server.state.is_leader() {
if lease_server.is_leader() {
for id in lease_server.storage.find_expired_leases() {
let _handle = tokio::spawn({
let s = Arc::clone(&lease_server);
let token_option = lease_server.auth_storage.root_token();
async move {
let mut request = tonic::Request::new(LeaseRevokeRequest { id });
if let Ok(token) = s.auth_storage.root_token() {
if let Ok(token) = token_option {
let _ignore = request.metadata_mut().insert(
"token",
token.parse().unwrap_or_else(|e| {
Expand Down
Loading

0 comments on commit e97e892

Please sign in to comment.