Skip to content

Commit

Permalink
Merge pull request #6278 from ariesdevil/refactor-grpc-client
Browse files Browse the repository at this point in the history
[metasrv] refactor grpc client
  • Loading branch information
BohuTANG authored Jun 28, 2022
2 parents a591c6c + 2982c9d commit 06d8886
Showing 1 changed file with 54 additions and 80 deletions.
134 changes: 54 additions & 80 deletions common/meta/grpc/src/grpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,27 +140,6 @@ impl ItemManager for MetaChannelManager {
}
}

pub struct MetaGrpcClient {
conn_pool: Pool<MetaChannelManager>,
endpoints: RwLock<Vec<String>>,
username: String,
password: String,
token: RwLock<Option<Vec<u8>>>,
current_endpoint: Arc<Mutex<Option<String>>>,
unhealthy_endpoints: Mutex<TtlHashMap<String, ()>>,
auto_sync_interval: Option<Duration>,

/// Dedicated runtime to support meta client background tasks.
///
/// In order not to let a blocking operation(such as calling the new PipelinePullingExecutor) in a tokio runtime block meta-client background tasks.
/// If a background task is blocked, no meta-client will be able to proceed if meta-client is reused.
///
/// Note that a thread_pool tokio runtime does not help: a scheduled tokio-task resides in `filo_slot` won't be stolen by other tokio-workers.
/// TODO: dead code
#[allow(dead_code)]
rt: Arc<Runtime>,
}

/// A handle to access meta-client worker.
/// The worker will be actually running in a dedicated runtime: `MetaGrpcClient.rt`.
pub struct ClientHandle {
Expand Down Expand Up @@ -244,6 +223,26 @@ impl ClientHandle {
}
}

pub struct MetaGrpcClient {
conn_pool: Pool<MetaChannelManager>,
endpoints: RwLock<Vec<String>>,
username: String,
password: String,
current_endpoint: Arc<Mutex<Option<String>>>,
unhealthy_endpoints: Mutex<TtlHashMap<String, ()>>,
auto_sync_interval: Option<Duration>,

/// Dedicated runtime to support meta client background tasks.
///
/// In order not to let a blocking operation(such as calling the new PipelinePullingExecutor) in a tokio runtime block meta-client background tasks.
/// If a background task is blocked, no meta-client will be able to proceed if meta-client is reused.
///
/// Note that a thread_pool tokio runtime does not help: a scheduled tokio-task resides in `filo_slot` won't be stolen by other tokio-workers.
/// TODO: dead code
#[allow(dead_code)]
rt: Arc<Runtime>,
}

impl MetaGrpcClient {
/// Create a new client of metasrv.
///
Expand Down Expand Up @@ -301,7 +300,6 @@ impl MetaGrpcClient {
auto_sync_interval,
username: username.to_string(),
password: password.to_string(),
token: RwLock::new(None),
rt: rt.clone(),
});

Expand Down Expand Up @@ -393,8 +391,8 @@ impl MetaGrpcClient {
start.elapsed().as_millis() as f64,
);

if let Err(err) = res {
match err {
if let Err(result) = res {
match result {
Err(err) => {
label_counter_with_val_and_labels(
META_GRPC_CLIENT_REQUEST_FAILED,
Expand Down Expand Up @@ -446,11 +444,6 @@ impl MetaGrpcClient {
MetaServiceClient<InterceptedService<Channel, AuthInterceptor>>,
MetaError,
> {
{
let mut current_endpoint = self.current_endpoint.lock();
*current_endpoint = None;
}

let mut eps = self.get_endpoints().await;
debug_assert!(!eps.is_empty());

Expand All @@ -466,46 +459,35 @@ impl MetaGrpcClient {
Ok(c) => {
let mut client = MetaServiceClient::new(c.clone());

let mut t = self.token.write().await;
match t.clone() {
Some(t) => {
let new_token = Self::handshake(
&mut client,
&METACLI_COMMIT_SEMVER,
&MIN_METASRV_SEMVER,
&self.username,
&self.password,
)
.await;
match new_token {
Ok(token) => {
return Ok(MetaServiceClient::with_interceptor(c, AuthInterceptor {
token: t,
}))
token,
}));
}
None => {
let new_token = Self::handshake(
&mut client,
&METACLI_COMMIT_SEMVER,
&MIN_METASRV_SEMVER,
&self.username,
&self.password,
)
.await;
match new_token {
Ok(token) => {
*t = Some(token.clone());
return Ok(MetaServiceClient::with_interceptor(
c,
AuthInterceptor { token },
));
}
Err(e) => {
tracing::warn!("handshake error when make client: {:?}", e);
{
let mut ue = self.unhealthy_endpoints.lock();
ue.insert(addr.to_string(), ());
}
if is_last {
// reach to last addr
return Err(e);
}
continue;
}
Err(e) => {
tracing::warn!("handshake error when make client: {:?}", e);
{
let mut ue = self.unhealthy_endpoints.lock();
ue.insert(addr.to_string(), ());
}
if is_last {
// reach to last addr
return Err(e);
}
continue;
}
};
}

Err(e) => {
{
let mut ue = self.unhealthy_endpoints.lock();
Expand All @@ -529,14 +511,12 @@ impl MetaGrpcClient {
eps.first().unwrap().clone()
};
let ch = self.conn_pool.get(&addr).await;
{
let mut current_endpoint = self.current_endpoint.lock();
*current_endpoint = Some(addr.clone());
}
match ch {
Ok(c) => {
{
let mut current_endpoint = self.current_endpoint.lock();
*current_endpoint = Some(addr.clone());
}
Ok(c)
}
Ok(c) => Ok(c),
Err(e) => {
tracing::warn!(
"grpc_client create channel with {} faild, err: {:?}",
Expand Down Expand Up @@ -894,22 +874,16 @@ impl MetaGrpcClient {
Ok(reply)
}
async fn mark_as_unhealthy(&self) {
{
let ca = self.current_endpoint.lock();
let mut ue = self.unhealthy_endpoints.lock();
ue.insert((*ca).as_ref().unwrap().clone(), ());
}
{
let mut token = self.token.write().await;
*token = None;
}
let ca = self.current_endpoint.lock();
let mut ue = self.unhealthy_endpoints.lock();
ue.insert((*ca).as_ref().unwrap().clone(), ());
}
}

fn status_is_retryable(status: &Status) -> bool {
matches!(
status.code(),
Code::Unauthenticated | Code::Internal | Code::Unavailable
Code::Unauthenticated | Code::Unavailable | Code::Internal
)
}

Expand Down

0 comments on commit 06d8886

Please sign in to comment.