diff --git a/Cargo.lock b/Cargo.lock index 1df0e5677f..b1b9ccea90 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4494,6 +4494,7 @@ dependencies = [ "futures", "grpcio", "hex", + "itertools", "lazy_static", "mc-attest-api", "mc-attest-core", diff --git a/fog/view/server/Cargo.toml b/fog/view/server/Cargo.toml index 860130097b..ff6cfe0708 100644 --- a/fog/view/server/Cargo.toml +++ b/fog/view/server/Cargo.toml @@ -20,6 +20,7 @@ displaydoc = { version = "0.2", default-features = false } futures = "0.3" grpcio = "0.11.0" hex = "0.4" +itertools = "0.10" lazy_static = "1.4" # mobilecoin diff --git a/fog/view/server/src/bin/router.rs b/fog/view/server/src/bin/router.rs index 60b337c40f..63d1f1e2d8 100644 --- a/fog/view/server/src/bin/router.rs +++ b/fog/view/server/src/bin/router.rs @@ -13,7 +13,12 @@ use mc_fog_view_server::{ }; use mc_util_cli::ParserWithBuildInfo; use mc_util_grpc::ConnectionUriGrpcioChannel; -use std::{env, str::FromStr, sync::Arc}; +use std::{ + collections::HashMap, + env, + str::FromStr, + sync::{Arc, RwLock}, +}; fn main() { mc_common::setup_panic_handler(); @@ -38,7 +43,7 @@ fn main() { ); // TODO: Remove and get from a config. - let mut fog_view_store_grpc_clients = Vec::new(); + let mut fog_view_store_grpc_clients = HashMap::new(); let grpc_env = Arc::new( grpcio::EnvBuilder::new() .name_prefix("Main-RPC".to_string()) @@ -54,8 +59,9 @@ fn main() { ChannelBuilder::default_channel_builder(grpc_env.clone()) .connect_to_uri(&shard_uri, &logger), ); - fog_view_store_grpc_clients.push(fog_view_store_grpc_client); + fog_view_store_grpc_clients.insert(shard_uri, Arc::new(fog_view_store_grpc_client)); } + let fog_view_store_grpc_clients = Arc::new(RwLock::new(fog_view_store_grpc_clients)); let ias_client = Client::new(&config.ias_api_key).expect("Could not create IAS client"); let mut router_server = FogViewRouterServer::new( diff --git a/fog/view/server/src/config.rs b/fog/view/server/src/config.rs index 5c99a3c4af..a5682aaaa9 100644 --- a/fog/view/server/src/config.rs +++ b/fog/view/server/src/config.rs @@ -7,7 +7,7 @@ use clap::Parser; use mc_attest_core::ProviderId; use mc_common::ResponderId; use mc_fog_sql_recovery_db::SqlRecoveryDbConnectionConfig; -use mc_fog_uri::{FogViewRouterUri, FogViewStoreUri, FogViewUri}; +use mc_fog_uri::{FogViewRouterAdminUri, FogViewRouterUri, FogViewStoreUri, FogViewUri}; use mc_util_parse::parse_duration_in_seconds; use mc_util_uri::AdminUri; use serde::Serialize; @@ -160,4 +160,8 @@ pub struct FogViewRouterConfig { /// to disk by linux kernel. #[clap(long, default_value = "1048576", env = "MC_OMAP_CAPACITY")] pub omap_capacity: u64, + + /// Router admin listening URI. + #[clap(long)] + pub admin_listen_uri: FogViewRouterAdminUri, } diff --git a/fog/view/server/src/fog_view_router_server.rs b/fog/view/server/src/fog_view_router_server.rs index 9597f623ac..32e0b5dc37 100644 --- a/fog/view/server/src/fog_view_router_server.rs +++ b/fog/view/server/src/fog_view_router_server.rs @@ -4,23 +4,30 @@ //! Constructible from config (for testability) and with a mechanism for //! stopping it -use crate::{config::FogViewRouterConfig, counters, fog_view_router_service::FogViewRouterService}; +use crate::{ + config::FogViewRouterConfig, counters, fog_view_router_service::FogViewRouterService, + router_admin_service::FogViewRouterAdminService, +}; use futures::executor::block_on; use mc_attest_net::RaClient; use mc_common::logger::{log, Logger}; use mc_fog_api::view_grpc; -use mc_fog_uri::ConnectionUri; +use mc_fog_uri::{ConnectionUri, FogViewStoreUri}; use mc_fog_view_enclave::ViewEnclaveProxy; use mc_sgx_report_cache_untrusted::ReportCacheThread; use mc_util_grpc::{ConnectionUriGrpcioServer, ReadinessIndicator}; -use std::sync::Arc; +use std::{ + collections::HashMap, + sync::{Arc, RwLock}, +}; pub struct FogViewRouterServer where E: ViewEnclaveProxy, RC: RaClient + Send + Sync + 'static, { - server: grpcio::Server, + router_server: grpcio::Server, + admin_server: grpcio::Server, enclave: E, config: FogViewRouterConfig, logger: Logger, @@ -38,7 +45,7 @@ where config: FogViewRouterConfig, enclave: E, ra_client: RC, - shards: Vec, + shards: Arc>>>, logger: Logger, ) -> FogViewRouterServer where @@ -53,10 +60,15 @@ where ); let fog_view_router_service = view_grpc::create_fog_view_router_api( - FogViewRouterService::new(enclave.clone(), shards, logger.clone()), + FogViewRouterService::new(enclave.clone(), shards.clone(), logger.clone()), ); log::debug!(logger, "Constructed Fog View Router GRPC Service"); + let fog_view_router_admin_service = view_grpc::create_fog_view_router_admin_api( + FogViewRouterAdminService::new(shards, logger.clone()), + ); + log::debug!(logger, "Constructed Fog View Router Admin GRPC Service"); + // Health check service let health_service = mc_util_grpc::HealthService::new(Some(readiness_indicator.into()), logger.clone()) @@ -68,15 +80,21 @@ where "Starting Fog View Router server on {}", config.client_listen_uri.addr(), ); - let server_builder = grpcio::ServerBuilder::new(env) + let router_server_builder = grpcio::ServerBuilder::new(env.clone()) .register_service(fog_view_router_service) .register_service(health_service) .bind_using_uri(&config.client_listen_uri, logger.clone()); - let server = server_builder.build().unwrap(); + let admin_server_builder = grpcio::ServerBuilder::new(env) + .register_service(fog_view_router_admin_service) + .bind_using_uri(&config.admin_listen_uri, logger.clone()); + + let router_server = router_server_builder.build().unwrap(); + let admin_server = admin_server_builder.build().unwrap(); Self { - server, + router_server, + admin_server, enclave, config, logger, @@ -97,9 +115,18 @@ where ) .expect("failed starting report cache thread"), ); - self.server.start(); - for (host, port) in self.server.bind_addrs() { - log::info!(self.logger, "API listening on {}:{}", host, port); + self.router_server.start(); + for (host, port) in self.router_server.bind_addrs() { + log::info!(self.logger, "Router API listening on {}:{}", host, port); + } + self.admin_server.start(); + for (host, port) in self.admin_server.bind_addrs() { + log::info!( + self.logger, + "Router Admin API listening on {}:{}", + host, + port + ); } } @@ -108,7 +135,8 @@ where if let Some(ref mut thread) = self.report_cache_thread.take() { thread.stop().expect("Could not stop report cache thread"); } - block_on(self.server.shutdown()).expect("Could not stop grpc server"); + block_on(self.router_server.shutdown()).expect("Could not stop router grpc server"); + block_on(self.admin_server.shutdown()).expect("Could not stop admin router server"); } } diff --git a/fog/view/server/src/fog_view_router_service.rs b/fog/view/server/src/fog_view_router_service.rs index 5f6e8ecc5d..9f94ca582b 100644 --- a/fog/view/server/src/fog_view_router_service.rs +++ b/fog/view/server/src/fog_view_router_service.rs @@ -8,10 +8,14 @@ use mc_fog_api::{ view::{FogViewRouterRequest, FogViewRouterResponse}, view_grpc::{FogViewRouterApi, FogViewStoreApiClient}, }; +use mc_fog_uri::FogViewStoreUri; use mc_fog_view_enclave_api::ViewEnclaveProxy; use mc_util_grpc::rpc_logger; use mc_util_metrics::SVC_COUNTERS; -use std::sync::Arc; +use std::{ + collections::HashMap, + sync::{Arc, RwLock}, +}; #[derive(Clone)] pub struct FogViewRouterService @@ -19,7 +23,7 @@ where E: ViewEnclaveProxy, { enclave: E, - shard_clients: Vec>, + shard_clients: Arc>>>, logger: Logger, } @@ -29,8 +33,11 @@ impl FogViewRouterService { /// /// TODO: Add a `view_store_clients` parameter of type FogApiClient, and /// perform view store authentication on each one. - pub fn new(enclave: E, shard_clients: Vec, logger: Logger) -> Self { - let shard_clients = shard_clients.into_iter().map(Arc::new).collect(); + pub fn new( + enclave: E, + shard_clients: Arc>>>, + logger: Logger, + ) -> Self { Self { enclave, shard_clients, @@ -55,8 +62,9 @@ where let logger = logger.clone(); // TODO: Confirm that we don't need to perform the authenticator logic. I think // we don't because of streaming... + let shard_clients = self.shard_clients.read().expect("RwLock poisoned"); let future = router_request_handler::handle_requests( - self.shard_clients.clone(), + shard_clients.values().cloned().collect(), self.enclave.clone(), requests, responses, diff --git a/fog/view/server/src/lib.rs b/fog/view/server/src/lib.rs index c202dce361..bddf7490d0 100644 --- a/fog/view/server/src/lib.rs +++ b/fog/view/server/src/lib.rs @@ -13,5 +13,6 @@ pub mod sharding_strategy; mod block_tracker; mod counters; mod db_fetcher; +mod router_admin_service; mod router_request_handler; mod shard_responses_processor; diff --git a/fog/view/server/src/router_admin_service.rs b/fog/view/server/src/router_admin_service.rs new file mode 100644 index 0000000000..f1bf738091 --- /dev/null +++ b/fog/view/server/src/router_admin_service.rs @@ -0,0 +1,84 @@ +// Copyright (c) 2018-2022 The MobileCoin Foundation + +use grpcio::{ChannelBuilder, RpcContext, RpcStatus, UnarySink}; +use itertools::Itertools; +use mc_common::logger::{log, Logger}; +use mc_fog_api::{ + view::AddShardRequest, + view_grpc::{FogViewRouterAdminApi, FogViewStoreApiClient}, +}; +use mc_fog_uri::FogViewStoreUri; +use mc_util_grpc::{ + rpc_invalid_arg_error, rpc_logger, rpc_precondition_error, send_result, + ConnectionUriGrpcioChannel, Empty, +}; +use mc_util_metrics::SVC_COUNTERS; +use std::{ + collections::HashMap, + str::FromStr, + sync::{Arc, RwLock}, +}; + +#[derive(Clone)] +pub struct FogViewRouterAdminService { + shard_clients: Arc>>>, + logger: Logger, +} + +impl FogViewRouterAdminService { + pub fn new( + shard_clients: Arc>>>, + logger: Logger, + ) -> Self { + Self { + shard_clients, + logger, + } + } + + fn add_shard_impl(&mut self, shard_uri: &str, logger: &Logger) -> Result { + let view_store_uri = FogViewStoreUri::from_str(shard_uri).map_err(|_| { + rpc_invalid_arg_error( + "add_shard", + format!("Shard uri string {} is invalid", shard_uri), + logger, + ) + })?; + let mut shard_clients = self.shard_clients.write().expect("RwLock Poisoned"); + if shard_clients.keys().contains(&view_store_uri) { + let error = rpc_precondition_error( + "add_shard", + format!("Shard uri {} already exists in the shard list", shard_uri), + logger, + ); + return Err(error); + } + let grpc_env = Arc::new( + grpcio::EnvBuilder::new() + .name_prefix("add-shard".to_string()) + .build(), + ); + let view_store_client = FogViewStoreApiClient::new( + ChannelBuilder::default_channel_builder(grpc_env) + .connect_to_uri(&view_store_uri, logger), + ); + shard_clients.insert(view_store_uri, Arc::new(view_store_client)); + + Ok(Empty::new()) + } +} + +impl FogViewRouterAdminApi for FogViewRouterAdminService { + fn add_shard(&mut self, ctx: RpcContext, request: AddShardRequest, sink: UnarySink) { + log::info!(self.logger, "Request received in add_shard fn"); + let _timer = SVC_COUNTERS.req(&ctx); + mc_common::logger::scoped_global_logger(&rpc_logger(&ctx, &self.logger), |logger| { + send_result( + ctx, + sink, + self.add_shard_impl(request.get_shard_uri(), logger), + logger, + ); + }); + } +}