From 0b33f46b9360e290c426ded165643b45a3e15539 Mon Sep 17 00:00:00 2001 From: Sam Dealy Date: Wed, 22 Jun 2022 16:46:09 -0400 Subject: [PATCH] First commit so as to save work --- fog/view/server/src/error.rs | 52 +++- fog/view/server/src/fog_view_router_server.rs | 95 ------- .../server/src/fog_view_router_service.rs | 267 ++++++++++++------ 3 files changed, 236 insertions(+), 178 deletions(-) diff --git a/fog/view/server/src/error.rs b/fog/view/server/src/error.rs index 473dcfaec2..c695d5a536 100644 --- a/fog/view/server/src/error.rs +++ b/fog/view/server/src/error.rs @@ -1,8 +1,58 @@ // Copyright (c) 2018-2022 The MobileCoin Foundation use displaydoc::Display; +use grpcio::RpcStatus; +use mc_common::logger::Logger; use mc_fog_view_enclave::Error as ViewEnclaveError; use mc_sgx_report_cache_untrusted::Error as ReportCacheError; +use mc_util_grpc::{rpc_internal_error, rpc_permissions_error}; + +#[derive(Debug, Display)] +pub enum RouterServerError { + /// Error related to contacting Fog View Store: {0} + ViewStoreError(String), + /// View Enclave error: {0} + Enclave(ViewEnclaveError), +} + +impl From for RouterServerError { + fn from(src: grpcio::Error) -> Self { + RouterServerError::ViewStoreError(format!("{}", src)) + } +} + +impl From for RouterServerError { + fn from(src: mc_common::ResponderIdParseError) -> Self { + RouterServerError::ViewStoreError(format!("{}", src)) + } +} + +impl From for RouterServerError { + fn from(src: mc_util_uri::UriParseError) -> Self { + RouterServerError::ViewStoreError(format!("{}", src)) + } +} + +pub fn router_server_err_to_rpc_status( + context: &str, + src: RouterServerError, + logger: Logger, +) -> RpcStatus { + match src { + RouterServerError::ViewStoreError(_) => { + rpc_internal_error(context, format!("{}", src), &logger) + } + RouterServerError::Enclave(_) => { + rpc_permissions_error(context, format!("{}", src), &logger) + } + } +} + +impl From for RouterServerError { + fn from(src: ViewEnclaveError) -> Self { + RouterServerError::Enclave(src) + } +} #[derive(Debug, Display)] pub enum ViewServerError { @@ -26,4 +76,4 @@ impl From for ViewServerError { fn from(src: ReportCacheError) -> Self { Self::ReportCache(src) } -} +} \ No newline at end of file diff --git a/fog/view/server/src/fog_view_router_server.rs b/fog/view/server/src/fog_view_router_server.rs index 1c58cbbd19..e69de29bb2 100644 --- a/fog/view/server/src/fog_view_router_server.rs +++ b/fog/view/server/src/fog_view_router_server.rs @@ -1,95 +0,0 @@ -// Copyright (c) 2018-2022 The MobileCoin Foundation - -//! Server object containing a view node -//! Constructible from config (for testability) and with a mechanism for -//! stopping it - -use crate::{config::FogViewRouterConfig, fog_view_router_service::FogViewRouterService}; -use futures::executor::block_on; -use mc_common::logger::{log, Logger}; -use mc_fog_api::view_grpc; -use mc_fog_uri::{ConnectionUri, FogViewStoreUri}; -use mc_fog_view_enclave::ViewEnclaveProxy; -use mc_util_grpc::{ConnectionUriGrpcioServer, ReadinessIndicator}; -use std::sync::Arc; - -pub struct FogViewRouterServer -where - E: ViewEnclaveProxy, -{ - server: grpcio::Server, - _enclave: E, - logger: Logger, -} - -impl FogViewRouterServer -where - E: ViewEnclaveProxy, -{ - /// Creates a new view router server instance - pub fn new( - config: FogViewRouterConfig, - enclave: E, - shards: Vec, - logger: Logger, - ) -> FogViewRouterServer { - let readiness_indicator = ReadinessIndicator::default(); - - let env = Arc::new( - grpcio::EnvBuilder::new() - .name_prefix("Main-RPC".to_string()) - .build(), - ); - - let fog_view_router_service = view_grpc::create_fog_view_router_api( - FogViewRouterService::new(enclave.clone(), shards, logger.clone()), - ); - log::debug!(logger, "Constructed Fog View Router GRPC Service"); - - // Health check service - let health_service = - mc_util_grpc::HealthService::new(Some(readiness_indicator.into()), logger.clone()) - .into_service(); - - // Package service into grpc server - log::info!( - logger, - "Starting Fog View Router server on {}", - config.client_listen_uri.addr(), - ); - let server_builder = grpcio::ServerBuilder::new(env) - .register_service(fog_view_router_service) - .register_service(health_service) - .bind_using_uri(&config.client_listen_uri, logger.clone()); - - let server = server_builder.build().unwrap(); - - Self { - server, - _enclave: enclave, - logger, - } - } - - /// Starts the server - pub fn start(&mut self) { - self.server.start(); - for (host, port) in self.server.bind_addrs() { - log::info!(self.logger, "API listening on {}:{}", host, port); - } - } - - /// Stops the server - pub fn stop(&mut self) { - block_on(self.server.shutdown()).expect("Could not stop grpc server"); - } -} - -impl Drop for FogViewRouterServer -where - E: ViewEnclaveProxy, -{ - fn drop(&mut self) { - self.stop(); - } -} diff --git a/fog/view/server/src/fog_view_router_service.rs b/fog/view/server/src/fog_view_router_service.rs index 25090ca49f..e5b569e8fa 100644 --- a/fog/view/server/src/fog_view_router_service.rs +++ b/fog/view/server/src/fog_view_router_service.rs @@ -1,34 +1,43 @@ // Copyright (c) 2018-2022 The MobileCoin Foundation +use crate::error::{router_server_err_to_rpc_status, RouterServerError}; use futures::{future::try_join_all, FutureExt, SinkExt, TryFutureExt, TryStreamExt}; -use grpcio::{DuplexSink, RequestStream, RpcContext, WriteFlags}; +use grpcio::{ChannelBuilder, DuplexSink, RequestStream, RpcContext, RpcStatus, WriteFlags}; use mc_attest_api::attest; -use mc_common::logger::{log, Logger}; +use mc_common::{ + logger::{log, Logger}, + ResponderId, +}; use mc_fog_api::{ - view::{FogViewRouterRequest, FogViewRouterResponse}, - view_grpc::FogViewRouterApi, + view::{ + FogViewRouterRequest, FogViewRouterResponse, MultiViewStoreQueryRequest, + MultiViewStoreQueryResponse, + }, + view_grpc::{FogViewApiClient, FogViewRouterApi}, }; -use mc_fog_uri::FogViewStoreUri; +use mc_fog_uri::FogViewUri; use mc_fog_view_enclave_api::ViewEnclaveProxy; -use mc_util_grpc::{rpc_logger, rpc_permissions_error}; +use mc_util_grpc::{rpc_invalid_arg_error, rpc_logger, ConnectionUriGrpcioChannel}; use mc_util_metrics::SVC_COUNTERS; -use std::sync::Arc; +use std::{str::FromStr, sync::Arc}; + +const RETRY_COUNT: usize = 3; #[derive(Clone)] pub struct FogViewRouterService { enclave: E, - shards: Vec>, + shard_clients: Vec>, logger: Logger, } impl FogViewRouterService { /// Creates a new FogViewRouterService that can be used by a gRPC server to /// fulfill gRPC requests. - pub fn new(enclave: E, shards: Vec, logger: Logger) -> Self { - let shards = shards.into_iter().map(Arc::new).collect(); + pub fn new(enclave: E, shard_clients: Vec, logger: Logger) -> Self { + let shard_clients = shard_clients.into_iter().map(Arc::new).collect(); Self { enclave, - shards, + shard_clients, logger, } } @@ -47,103 +56,197 @@ impl FogViewRouterApi for FogViewRouterService { let logger = logger.clone(); // TODO: Confirm that we don't need to perform the authenticator logic. I think // we don't because of streaming... - let future = handle_request( - self.shards.clone(), + let future = handle_requests( + self.shard_clients.clone(), self.enclave.clone(), requests, responses, logger.clone(), ) - .map_err(move |err: grpcio::Error| log::error!(&logger, "failed to reply: {}", err)) - // TODO: Do stuff with the error - .map(|_| ()); + .map_err(move |err: grpcio::Error| log::error!(&logger, "failed to reply: {}", err)) + // TODO: Do stuff with the error + .map(|_| ()); ctx.spawn(future) }); } } - -/// Receives a client's request and performs either authentication or a query. -async fn handle_request( - shards: Vec>, +async fn handle_requests( + shard_clients: Vec>, enclave: E, mut requests: RequestStream, mut responses: DuplexSink, logger: Logger, ) -> Result<(), grpcio::Error> { - while let Some(mut request) = requests.try_next().await? { - if request.has_auth() { - match enclave.client_accept(request.take_auth().into()) { - Ok((enclave_response, _)) => { - let mut response = FogViewRouterResponse::new(); - response.mut_auth().set_data(enclave_response.into()); - responses - .send((response.clone(), WriteFlags::default())) - .await?; - } - Err(client_error) => { - log::debug!( - &logger, - "ViewEnclaveApi::client_accept failed: {}", - client_error - ); - let rpc_permissions_error = rpc_permissions_error( - "client_auth", - format!("Permission denied: {:?}", client_error), - &logger, - ); - return responses.fail(rpc_permissions_error).await; - } - } - } else if request.has_query() { - let query: attest::Message = request.take_query(); - // TODO: In the next PR, use this _shard_query_data to construct a - // MultiViewStoreQuery and send it off to the Fog View Load - // Balancers. - let _multi_view_store_query_data = - enclave.create_multi_view_store_query_data(query.into()); - let _result = route_query(shards.clone(), logger.clone()).await; - - let response = FogViewRouterResponse::new(); - responses - .send((response.clone(), WriteFlags::default())) - .await?; - } else { - // TODO: Throw some sort of error though not sure - // that's necessary. + while let Some(request) = requests.try_next().await? { + let result = handle_request( + request, + shard_clients.clone(), + enclave.clone(), + logger.clone(), + ) + .await; + match result { + Ok(response) => responses.send((response, WriteFlags::default())).await?, + Err(rpc_status) => return responses.fail(rpc_status).await, } } - responses.close().await?; Ok(()) } -// TODO: This method will be responsible for contacting each shard, passing -// along a MultiViewStoreQuery message. It will eventually return a Vec of -// encrypted QueryResponses that the caller of this method will transform into -// one FogViewRouterResponse to return to the client. -async fn route_query( - shards: Vec>, +/// Receives a client's request and performs either authentication or a query. +async fn handle_request( + mut request: FogViewRouterRequest, + shard_clients: Vec>, + enclave: E, logger: Logger, -) -> Result, String> { - let mut futures = Vec::new(); - for (i, shard) in shards.iter().enumerate() { - let future = contact_shard(i, shard.clone(), logger.clone()); - futures.push(future); +) -> Result { + if request.has_auth() { + return handle_auth_request(enclave, request.take_auth(), logger).await; + } else if request.has_query() { + return handle_query_request(request.take_query(), enclave, shard_clients, logger).await; + } else { + let rpc_status = rpc_invalid_arg_error( + "Inavlid FogViewRouterRequest request", + "Neither the query nor auth fields were set".to_string(), + &logger, + ); + Err(rpc_status) } +} + +async fn handle_auth_request( + enclave: E, + auth_message: attest::AuthMessage, + logger: Logger, +) -> Result { + let (client_auth_response, _) = enclave.client_accept(auth_message.into()).map_err(|err| { + router_server_err_to_rpc_status("Auth: e client accept", err.into(), logger) + })?; - try_join_all(futures).await + let mut response = FogViewRouterResponse::new(); + response.mut_auth().set_data(client_auth_response.into()); + Ok(response) } -// TODO: Pass along the MultiViewStoreQuery to the individual shard. -// This method will eventually return an encrypted QueryResponse that the -// router will decrypt and collate with all of the other shards' responses. -async fn contact_shard( - index: usize, - shard: Arc, +async fn handle_query_request( + query: attest::Message, + enclave: E, + shard_clients: Vec>, logger: Logger, -) -> Result { - log::info!(logger, "Contacting shard {} at index {}", shard, index); +) -> Result { + let mut query_responses: Vec = Vec::with_capacity(shard_clients.len()); + let mut shard_clients = shard_clients.clone(); + // TODO: use retry crate? + for _ in 0..RETRY_COUNT { + let multi_view_store_query_request: MultiViewStoreQueryRequest = enclave + .create_multi_view_store_query_data(query.clone().into()) + .map_err(|err| { + router_server_err_to_rpc_status( + "Query: internal encryption error", + err.into(), + logger.clone(), + ) + })? + .into(); + let clients_and_responses: Vec<(Arc, MultiViewStoreQueryResponse)> = + route_query(&multi_view_store_query_request, shard_clients.clone()) + .await + .map_err(|err| { + router_server_err_to_rpc_status( + "Query: internal query routing error", + err, + logger.clone(), + ) + })?; + + shard_clients = Vec::new(); + let mut pending_auth_requests = Vec::new(); + for (shard_client, mut response) in clients_and_responses { + // We did not receive a query_response for this shard.Therefore, we need to: + // (a) retry the query + // (b) authenticate with the Fog View Store that returned the decryption_error + if response.has_decryption_error() { + shard_clients.push(shard_client); + let store_uri = + FogViewUri::from_str(&response.get_decryption_error().fog_view_store_uri) + .map_err(|err| { + router_server_err_to_rpc_status( + "Query: invalid uri returned from Fog View Store", + err.into(), + logger.clone(), + ) + })?; + let auth_future = auth_store(enclave.clone(), store_uri, logger.clone()); + pending_auth_requests.push(auth_future); + } else { + query_responses.push(response.take_query_response()); + } + } - Ok(0) + try_join_all(pending_auth_requests).await.map_err(|err| { + router_server_err_to_rpc_status( + "Query: cannot authenticate with each Fog View Store:", + err.into(), + logger.clone(), + ) + })?; + + // We've successfully retrieved responses from each shard so we can break. + if shard_clients.is_empty() { + break; + } + } + + // TODO: Collate the query_responses into one response for the client. Make an + // enclave method for this. + let response = FogViewRouterResponse::new(); + Ok(response) +} + +async fn route_query( + request: &MultiViewStoreQueryRequest, + shard_clients: Vec>, +) -> Result, MultiViewStoreQueryResponse)>, RouterServerError> { + let mut responses = Vec::with_capacity(shard_clients.len()); + for shard_client in shard_clients { + let response = query_shard(request, shard_client.clone()); + responses.push(response); + } + try_join_all(responses).await } + +async fn query_shard( + request: &MultiViewStoreQueryRequest, + shard_client: Arc, +) -> Result<(Arc, MultiViewStoreQueryResponse), RouterServerError> { + let client_unary_receiver = shard_client.multi_view_store_query_async(request)?; + let response = client_unary_receiver.await?; + + Ok((shard_client, response)) +} + +async fn auth_store( + enclave: E, + store_uri: FogViewUri, + logger: Logger, +) -> Result<(), RouterServerError> { + let store_responder_id = ResponderId::from_str(&store_uri.to_string())?; + let client_auth_request = enclave.client_init(store_responder_id.clone())?; + let grpc_env = Arc::new( + grpcio::EnvBuilder::new() + .name_prefix("Main-RPC".to_string()) + .build(), + ); + let store_client = FogViewApiClient::new( + ChannelBuilder::default_channel_builder(grpc_env).connect_to_uri(&store_uri, &logger), + ); + + let auth_unary_receiver = store_client.auth_async(&client_auth_request.into())?; + let auth_response = auth_unary_receiver.await?; + + let result = enclave.client_connect(store_responder_id, auth_response.into())?; + + Ok(result) +} \ No newline at end of file