Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 267: Remove runtime drop error message #311

Merged
merged 11 commits into from
Sep 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,7 @@ base64 = "0.12"
tokio = "1.1"
async-trait = "0.1"
tracing = "0.1"
tracing-subscriber = "0.2"
tracing-subscriber = "0.2"

[dev-dependencies]
serial_test = "*"
13 changes: 9 additions & 4 deletions config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,11 @@ impl ClientConfigBuilder {
mod tests {
use super::*;
use base64::encode;
use serial_test::serial;
use std::net::Ipv4Addr;

#[test]
#[serial]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why these tests can't be run in parallel. This is testing config values.
If it's modifying environment variables, can we split that out into its own test so it don't affect others too much?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it's because the env var is a shared resource and running unit tests in parallel will have issues.
I have been googling around and cannot find a better way to deal with shared resource like env var in the unit test. Either we run it consecutively or run it using only 1 thread.

fn test_get_set() {
let config = ClientConfigBuilder::default()
.max_connections_in_pool(15 as u32)
Expand All @@ -246,6 +248,7 @@ mod tests {
}

#[test]
#[serial]
fn test_get_default() {
let config = ClientConfigBuilder::default()
.controller_uri(MOCK_CONTROLLER_URI)
Expand All @@ -259,6 +262,7 @@ mod tests {
}

#[test]
#[serial]
fn test_extract_credentials() {
let rt = tokio::runtime::Runtime::new().expect("create runtime");
// test empty env
Expand Down Expand Up @@ -307,12 +311,13 @@ mod tests {
}

#[test]
#[serial]
fn test_extract_tls_cert_path() {
// test default
fs::create_dir_all(DEFAULT_TLS_CERT_PATH).expect("create default cert path");
fs::File::create(format!("{}/foo.crt", DEFAULT_TLS_CERT_PATH)).expect("create crt");
let config = ClientConfigBuilder::default()
.controller_uri("127.0.0.2:9091".to_string())
.controller_uri("tls://127.0.0.2:9091".to_string())
.is_tls_enabled(true)
.build()
.unwrap();
Expand Down Expand Up @@ -353,15 +358,15 @@ mod tests {
fs::File::create(format!("./bar/foo.crt")).expect("create crt");
env::set_var("pravega_client_tls_cert_path", "./bar");
let config = ClientConfigBuilder::default()
.controller_uri("127.0.0.2:9091".to_string())
.controller_uri("tls://127.0.0.2:9091".to_string())
.is_tls_enabled(true)
.build()
.unwrap();
assert_eq!(config.trustcerts.len(), 1);
// test with file path
env::set_var("pravega_client_tls_cert_path", "./bar/foo.crt");
let config = ClientConfigBuilder::default()
.controller_uri("127.0.0.2:9091".to_string())
.controller_uri("tls://127.0.0.2:9091".to_string())
.is_tls_enabled(true)
.build()
.unwrap();
Expand All @@ -370,7 +375,7 @@ mod tests {
// test with invalid path
let result = std::panic::catch_unwind(|| {
ClientConfigBuilder::default()
.controller_uri("127.0.0.2:9091".to_string())
.controller_uri("tls://127.0.0.2:9091".to_string())
.is_tls_enabled(true)
.build()
.unwrap()
Expand Down
6 changes: 3 additions & 3 deletions controller-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ pravega-client-retry = {path = "../retry", version = "0.2"}
pravega-connection-pool = {path = "../connection_pool", version = "0.2" }
pravega-client-config = {path = "../config", version = "0.2"}
async-trait = "0.1"
prost = "0.7"
prost = "0.8"
snafu = "0.6"
tokio = { version = "1.1", features = ["full"] }
tonic = { version = "0.4", features = ["tls"] }
tonic = { version = "0.5", features = ["tls"] }
derive_more = "0.99.9"
ordered-float = "2.7"
uuid = {version = "0.8", features = ["v4"]}
Expand All @@ -39,7 +39,7 @@ num-derive = "0.3"
num-traits = "0.2"

[build-dependencies]
tonic-build = "0.4"
tonic-build = "0.5"

[[bin]]
name = "controller-cli"
Expand Down
2 changes: 1 addition & 1 deletion controller-client/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ fn main() {
.build()
.expect("creating config");
// create a controller client.
let controller_client = ControllerClientImpl::new(config, &rt);
let controller_client = ControllerClientImpl::new(config, rt.handle());
match opt.cmd {
Command::CreateScope { scope_name } => {
let scope_result = rt.block_on(controller_client.create_scope(&Scope::from(scope_name)));
Expand Down
56 changes: 36 additions & 20 deletions controller-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,14 @@ use std::convert::{From, Into};
use std::io::BufReader;
use std::str::FromStr;
use std::sync::Arc;
use tokio::runtime::Runtime;
use tokio::runtime::Handle;
use tokio::sync::RwLock;
use tokio_rustls::rustls::ClientConfig as RustlsClientConfig;
use tonic::codegen::http::uri::InvalidUri;
use tonic::codegen::InterceptedService;
use tonic::service::Interceptor;
use tonic::transport::{Channel, ClientTlsConfig, Endpoint, Uri};
use tonic::{metadata::MetadataValue, Code, Request, Status};
use tonic::{metadata::MetadataValue, Code, Status};
use tracing::{debug, info, warn};

#[allow(non_camel_case_types)]
Expand Down Expand Up @@ -325,9 +327,24 @@ pub trait ControllerClient: Send + Sync {
async fn check_scale(&self, stream: &ScopedStream, scale_epoch: i32) -> ResultRetry<bool>;
}

#[derive(Clone)]
struct AuthInterceptor {
token: Option<String>,
}

impl Interceptor for AuthInterceptor {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you elaborate more on this change? I think it is unrelated to the PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's the necessary change for upgrading Tonic version. I am following the example here

fn call(&mut self, mut request: tonic::Request<()>) -> std::result::Result<tonic::Request<()>, Status> {
if let Some(ref token_string) = self.token {
let meta_token = MetadataValue::from_str(token_string).expect("convert to metadata value");
request.metadata_mut().insert(AUTHORIZATION, meta_token);
}
Ok(request)
}
}

pub struct ControllerClientImpl {
config: ClientConfig,
channel: RwLock<ControllerServiceClient<Channel>>,
channel: RwLock<ControllerServiceClient<InterceptedService<Channel, AuthInterceptor>>>,
}

async fn get_channel(config: &ClientConfig) -> Channel {
Expand Down Expand Up @@ -631,18 +648,17 @@ impl ControllerClientImpl {
/// The requests will be load balanced across multiple connections and every connection supports
/// multiplexing of requests.
///
pub fn new(config: ClientConfig, rt: &Runtime) -> Self {
pub fn new(config: ClientConfig, handle: &Handle) -> Self {
// actual connection is established lazily.
let ch = rt.block_on(get_channel(&config));
let ch = handle.block_on(get_channel(&config));
let client = if config.is_auth_enabled {
let token = rt.block_on(config.credentials.get_request_metadata());
let token = MetadataValue::from_str(&token).expect("convert to metadata value");
ControllerServiceClient::with_interceptor(ch, move |mut req: Request<()>| {
req.metadata_mut().insert(AUTHORIZATION, token.clone());
Ok(req)
})
let token = handle.block_on(config.credentials.get_request_metadata());
let auth_interceptor = AuthInterceptor { token: Some(token) };

ControllerServiceClient::with_interceptor(ch, auth_interceptor)
} else {
ControllerServiceClient::new(ch)
let auth_interceptor = AuthInterceptor { token: None };
ControllerServiceClient::with_interceptor(ch, auth_interceptor)
};

ControllerClientImpl {
Expand All @@ -661,7 +677,8 @@ impl ControllerClientImpl {
} else {
let ch = get_channel(&self.config).await;
let mut x = self.channel.write().await;
*x = ControllerServiceClient::new(ch);
let auth_interceptor = AuthInterceptor { token: None };
*x = ControllerServiceClient::with_interceptor(ch, auth_interceptor);
}
}

Expand All @@ -671,7 +688,9 @@ impl ControllerClientImpl {
/// which runs the connection in a background task and provides a `mpsc` channel interface.
/// Due to this cloning the `Channel` type is cheap and encouraged.
///
async fn get_controller_client(&self) -> ControllerServiceClient<Channel> {
async fn get_controller_client(
&self,
) -> ControllerServiceClient<InterceptedService<Channel, AuthInterceptor>> {
if self.config.is_auth_enabled && self.config.credentials.is_expired() {
// get_request_metadata internally checks if token expired before sending request to the server,
// race condition might happen here but eventually only one request will be sent.
Expand All @@ -684,11 +703,8 @@ impl ControllerClientImpl {
let ch = get_channel(&self.config).await;
let mut x = self.channel.write().await;
let token = self.config.credentials.get_request_metadata().await;
let token = MetadataValue::from_str(&token).expect("convert to metadata value");
*x = ControllerServiceClient::with_interceptor(ch, move |mut req: Request<()>| {
req.metadata_mut().insert(AUTHORIZATION, token.clone());
Ok(req)
});
let auth_interceptor = AuthInterceptor { token: Some(token) };
*x = ControllerServiceClient::with_interceptor(ch, auth_interceptor);
}

// Method used to translate grpc errors to ControllerError.
Expand Down Expand Up @@ -1609,7 +1625,7 @@ mod test {
.controller_uri(PravegaNodeUri::from("127.0.0.2:9091".to_string()))
.build()
.unwrap();
let controller = ControllerClientImpl::new(config.clone(), &rt);
let controller = ControllerClientImpl::new(config.clone(), &rt.handle());
let scope = Scope {
name: "scope".to_string(),
};
Expand Down
Loading