diff --git a/Cargo.toml b/Cargo.toml index 64fcaf3..7804d6b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "etcd-rs" -version = "0.5.0" +version = "0.6.0" authors = ["ccc "] edition = "2018" keywords = ["etcd", "future", "async"] @@ -11,9 +11,9 @@ documentation = "https://docs.rs/etcd-rs" license = "MIT" [dependencies] -tonic = { version = "0.4", features = ["tls"] } +tonic = { version = "0.5", features = ["tls"] } bytes = "1.0" -prost = "0.7" +prost = "0.8" tokio = "1.0" tokio-stream = "^0.1" async-stream = "0.3" @@ -26,4 +26,4 @@ http = "0.2" tokio = { version = "1.0", features = ["full"] } [build-dependencies] -tonic-build = "0.4" +tonic-build = "0.5" diff --git a/src/auth/mod.rs b/src/auth/mod.rs index f119a9c..d19e197 100644 --- a/src/auth/mod.rs +++ b/src/auth/mod.rs @@ -4,6 +4,7 @@ pub use authenticate::{AuthenticateRequest, AuthenticateResponse}; use tonic::transport::Channel; +use crate::client::Interceptor; use crate::proto::etcdserverpb::auth_client::AuthClient; use crate::Result; @@ -11,11 +12,15 @@ use crate::Result; #[derive(Clone)] pub struct Auth { client: AuthClient, + interceptor: Interceptor, } impl Auth { - pub(crate) fn new(client: AuthClient) -> Self { - Self { client } + pub(crate) fn new(client: AuthClient, interceptor: Interceptor) -> Self { + Self { + client, + interceptor, + } } /// Performs an authenticating operation. @@ -25,7 +30,7 @@ impl Auth { pub async fn authenticate(&mut self, req: AuthenticateRequest) -> Result { let resp = self .client - .authenticate(tonic::Request::new(req.into())) + .authenticate(self.interceptor.intercept(tonic::Request::new(req.into()))) .await?; Ok(resp.into_inner().into()) diff --git a/src/client.rs b/src/client.rs index 9a04527..ad691ec 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,8 +1,9 @@ use std::sync::Arc; use futures::Stream; +use tonic::metadata::Ascii; use tonic::transport::ClientTlsConfig; -use tonic::{metadata::MetadataValue, transport::Channel, Interceptor, Request}; +use tonic::{metadata::MetadataValue, transport::Channel, Request}; use crate::proto::etcdserverpb::{ auth_client::AuthClient, kv_client::KvClient, lease_client::LeaseClient, @@ -25,6 +26,20 @@ pub struct Client { inner: Arc, } +#[derive(Clone)] +pub(crate) struct Interceptor { + token: Option>, +} + +impl Interceptor { + pub(crate) fn intercept(&self, mut req: Request) -> Request { + if let Some(token) = self.token.as_ref() { + req.metadata_mut().insert("authorization", token.clone()); + } + req + } +} + #[allow(dead_code)] pub(crate) struct Inner { channel: Channel, @@ -54,7 +69,7 @@ impl Client { let channel = Self::get_channel(&cfg)?; - let mut auth_client = Auth::new(AuthClient::new(channel)); + let mut auth_client = Auth::new(AuthClient::new(channel), Interceptor { token: None }); let token = match cfg.auth.as_ref() { Some((name, password)) => auth_client @@ -74,41 +89,18 @@ impl Client { pub async fn connect(cfg: ClientConfig) -> Result { // If authentication provided, generates token before connecting. let token = Self::generate_auth_token(&cfg).await?; + let token = token.map(|token| MetadataValue::from_str(&token).unwrap()); - let auth_interceptor = token.map(|token| { - let token = MetadataValue::from_str(&token).unwrap(); - Interceptor::new(move |mut req: Request<()>| { - req.metadata_mut().insert("authorization", token.clone()); - Ok(req) - }) - }); + let interceptor = Interceptor { token }; let channel = Self::get_channel(&cfg)?; - let inner = { - let (auth_client, kv_client, watch_client, lease_client) = - if let Some(auth_interceptor) = auth_interceptor { - ( - AuthClient::with_interceptor(channel.clone(), auth_interceptor.clone()), - KvClient::with_interceptor(channel.clone(), auth_interceptor.clone()), - WatchClient::with_interceptor(channel.clone(), auth_interceptor.clone()), - LeaseClient::with_interceptor(channel.clone(), auth_interceptor), - ) - } else { - ( - AuthClient::new(channel.clone()), - KvClient::new(channel.clone()), - WatchClient::new(channel.clone()), - LeaseClient::new(channel.clone()), - ) - }; - Inner { - channel, - auth_client: Auth::new(auth_client), - kv_client: Kv::new(kv_client), - watch_client: Watch::new(watch_client), - lease_client: Lease::new(lease_client), - } + let inner = Inner { + auth_client: Auth::new(AuthClient::new(channel.clone()), interceptor.clone()), + kv_client: Kv::new(KvClient::new(channel.clone()), interceptor.clone()), + watch_client: Watch::new(WatchClient::new(channel.clone()), interceptor.clone()), + lease_client: Lease::new(LeaseClient::new(channel.clone()), interceptor.clone()), + channel, }; Ok(Self { diff --git a/src/kv/mod.rs b/src/kv/mod.rs index f31b547..786d2fe 100644 --- a/src/kv/mod.rs +++ b/src/kv/mod.rs @@ -10,6 +10,7 @@ pub use txn::{TxnCmp, TxnOp, TxnOpResponse, TxnRequest, TxnResponse}; use tonic::transport::Channel; +use crate::client::Interceptor; use crate::proto::etcdserverpb::kv_client::KvClient; use crate::proto::mvccpb; use crate::Result as Res; @@ -18,23 +19,33 @@ use crate::Result as Res; #[derive(Clone)] pub struct Kv { client: KvClient, + interceptor: Interceptor, } impl Kv { - pub(crate) fn new(client: KvClient) -> Self { - Self { client } + pub(crate) fn new(client: KvClient, interceptor: Interceptor) -> Self { + Self { + client, + interceptor, + } } /// Performs a key-value saving operation. pub async fn put(&mut self, req: PutRequest) -> Res { - let resp = self.client.put(tonic::Request::new(req.into())).await?; + let resp = self + .client + .put(self.interceptor.intercept(tonic::Request::new(req.into()))) + .await?; Ok(resp.into_inner().into()) } /// Performs a key-value fetching operation. pub async fn range(&mut self, req: RangeRequest) -> Res { - let resp = self.client.range(tonic::Request::new(req.into())).await?; + let resp = self + .client + .range(self.interceptor.intercept(tonic::Request::new(req.into()))) + .await?; Ok(resp.into_inner().into()) } @@ -43,7 +54,7 @@ impl Kv { pub async fn delete(&mut self, req: DeleteRequest) -> Res { let resp = self .client - .delete_range(tonic::Request::new(req.into())) + .delete_range(self.interceptor.intercept(tonic::Request::new(req.into()))) .await?; Ok(resp.into_inner().into()) @@ -51,7 +62,10 @@ impl Kv { /// Performs a transaction operation. pub async fn txn(&mut self, req: TxnRequest) -> Res { - let resp = self.client.txn(tonic::Request::new(req.into())).await?; + let resp = self + .client + .txn(self.interceptor.intercept(tonic::Request::new(req.into()))) + .await?; Ok(resp.into_inner().into()) } diff --git a/src/lease/mod.rs b/src/lease/mod.rs index 117bfa2..054a5c6 100644 --- a/src/lease/mod.rs +++ b/src/lease/mod.rs @@ -77,9 +77,12 @@ pub use grant::{LeaseGrantRequest, LeaseGrantResponse}; pub use keep_alive::{LeaseKeepAliveRequest, LeaseKeepAliveResponse}; pub use revoke::{LeaseRevokeRequest, LeaseRevokeResponse}; -use crate::lazy::{Lazy, Shutdown}; use crate::proto::etcdserverpb; use crate::proto::etcdserverpb::lease_client::LeaseClient; +use crate::{ + client::Interceptor, + lazy::{Lazy, Shutdown}, +}; use crate::{Error, Result}; mod grant; @@ -95,12 +98,14 @@ struct LeaseKeepAliveTunnel { } impl LeaseKeepAliveTunnel { - fn new(mut client: LeaseClient) -> Self { + fn new(mut client: LeaseClient, interceptor: Interceptor) -> Self { let (req_sender, req_receiver) = unbounded_channel::(); let (resp_sender, resp_receiver) = unbounded_channel::>(); let (shutdown_tx, shutdown_rx) = oneshot::channel(); - let request = tonic::Request::new(UnboundedReceiverStream::new(req_receiver)); + let request = interceptor.intercept(tonic::Request::new(UnboundedReceiverStream::new( + req_receiver, + ))); // monitor inbound watch response and transfer to the receiver tokio::spawn(async move { @@ -151,17 +156,22 @@ impl Shutdown for LeaseKeepAliveTunnel { pub struct Lease { client: LeaseClient, keep_alive_tunnel: Arc>, + interceptor: Interceptor, } impl Lease { - pub(crate) fn new(client: LeaseClient) -> Self { + pub(crate) fn new(client: LeaseClient, interceptor: Interceptor) -> Self { let keep_alive_tunnel = { let client = client.clone(); - Arc::new(Lazy::new(move || LeaseKeepAliveTunnel::new(client.clone()))) + let interceptor = interceptor.clone(); + Arc::new(Lazy::new(move || { + LeaseKeepAliveTunnel::new(client.clone(), interceptor.clone()) + })) }; Self { client, keep_alive_tunnel, + interceptor, } } @@ -169,7 +179,7 @@ impl Lease { pub async fn grant(&mut self, req: LeaseGrantRequest) -> Result { let resp = self .client - .lease_grant(tonic::Request::new(req.into())) + .lease_grant(self.interceptor.intercept(tonic::Request::new(req.into()))) .await?; Ok(resp.into_inner().into()) @@ -179,7 +189,7 @@ impl Lease { pub async fn revoke(&mut self, req: LeaseRevokeRequest) -> Result { let resp = self .client - .lease_revoke(tonic::Request::new(req.into())) + .lease_revoke(self.interceptor.intercept(tonic::Request::new(req.into()))) .await?; Ok(resp.into_inner().into()) diff --git a/src/watch/mod.rs b/src/watch/mod.rs index 80e70ea..0e3d76f 100644 --- a/src/watch/mod.rs +++ b/src/watch/mod.rs @@ -56,13 +56,16 @@ use tonic::transport::Channel; pub use watch::{WatchCancelRequest, WatchCreateRequest, WatchResponse}; -use crate::lazy::{Lazy, Shutdown}; use crate::proto::etcdserverpb; use crate::proto::etcdserverpb::watch_client::WatchClient; use crate::proto::mvccpb; use crate::Error; use crate::KeyValue; use crate::Result; +use crate::{ + client::Interceptor, + lazy::{Lazy, Shutdown}, +}; mod watch; @@ -75,7 +78,7 @@ struct WatchTunnel { } impl WatchTunnel { - fn new(mut client: WatchClient) -> Self { + fn new(mut client: WatchClient, interceptor: Interceptor) -> Self { let (req_sender, req_receiver) = unbounded_channel::(); let (resp_sender, resp_receiver) = unbounded_channel::>>(); @@ -86,7 +89,7 @@ impl WatchTunnel { tokio::spawn(async move { let mut shutdown_rx = shutdown_rx.fuse(); let mut inbound = futures::select! { - res = client.watch(request).fuse() => { + res = client.watch(interceptor.intercept(request)).fuse() => { match res { Err(e) => { resp_sender.send(Err(From::from(e))).unwrap(); @@ -142,18 +145,19 @@ impl Shutdown for WatchTunnel { /// Watch client. #[derive(Clone)] pub struct Watch { - client: WatchClient, tunnel: Arc>, } impl Watch { - pub(crate) fn new(client: WatchClient) -> Self { + pub(crate) fn new(client: WatchClient, interceptor: Interceptor) -> Self { let tunnel = { let client = client.clone(); - Arc::new(Lazy::new(move || WatchTunnel::new(client.clone()))) + Arc::new(Lazy::new(move || { + WatchTunnel::new(client.clone(), interceptor.clone()) + })) }; - Self { client, tunnel } + Self { tunnel } } /// Performs a watch operation.