From c9ae671c040515103d34cc92f8ed2c8c9c20ebd1 Mon Sep 17 00:00:00 2001 From: Sam Dealy Date: Thu, 23 Jun 2022 11:47:33 -0400 Subject: [PATCH 01/10] Remove FogViewStoreUri --- fog/uri/src/lib.rs | 26 -------------------------- fog/view/server/src/bin/router.rs | 2 +- 2 files changed, 1 insertion(+), 27 deletions(-) diff --git a/fog/uri/src/lib.rs b/fog/uri/src/lib.rs index a363cde82b..b8cbd33e06 100644 --- a/fog/uri/src/lib.rs +++ b/fog/uri/src/lib.rs @@ -18,20 +18,6 @@ impl UriScheme for FogViewRouterScheme { const DEFAULT_INSECURE_PORT: u16 = 3225; } -/// Fog View Store Scheme -#[derive(Debug, Hash, Ord, PartialOrd, Eq, PartialEq, Clone)] -pub struct FogViewStoreScheme {} - -impl UriScheme for FogViewStoreScheme { - /// The part before the '://' of a URL. - const SCHEME_SECURE: &'static str = "fog-view-store"; - const SCHEME_INSECURE: &'static str = "insecure-fog-view-store"; - - /// Default port numbers - const DEFAULT_SECURE_PORT: u16 = 443; - const DEFAULT_INSECURE_PORT: u16 = 3225; -} - /// Fog View Uri Scheme #[derive(Debug, Hash, Ord, PartialOrd, Eq, PartialEq, Clone)] pub struct FogViewScheme {} @@ -95,8 +81,6 @@ pub type FogIngestUri = Uri; pub type FogLedgerUri = Uri; /// Uri used when talking to fog view router service. pub type FogViewRouterUri = Uri; -/// Uri used when talking to fog view store service. -pub type FogViewStoreUri = Uri; /// Uri used when talking to fog-view service, with the right default ports and /// scheme. pub type FogViewUri = Uri; @@ -171,16 +155,6 @@ mod tests { ResponderId::from_str("node1.test.mobilecoin.com:3225").unwrap() ); assert!(!uri.use_tls()); - - let uri = - FogViewStoreUri::from_str("insecure-fog-view-store://node1.test.mobilecoin.com:3225/") - .unwrap(); - assert_eq!(uri.addr(), "node1.test.mobilecoin.com:3225"); - assert_eq!( - uri.responder_id().unwrap(), - ResponderId::from_str("node1.test.mobilecoin.com:3225").unwrap() - ); - assert!(!uri.use_tls()); } #[test] diff --git a/fog/view/server/src/bin/router.rs b/fog/view/server/src/bin/router.rs index b091fc7cca..1808548508 100644 --- a/fog/view/server/src/bin/router.rs +++ b/fog/view/server/src/bin/router.rs @@ -37,7 +37,7 @@ fn main() { let mut shard_uris: Vec = Vec::new(); for i in 0..50 { let shard_uri_string = format!( - "insecure-fog-view-store://node{}.test.mobilecoin.com:3225", + "insecure-fog-view://node{}.test.mobilecoin.com:3225", i ); let shard_uri = FogViewStoreUri::from_str(&shard_uri_string).unwrap(); From 0f7e4e7dfb29a4c4fb7b2213998c2044c5a787c5 Mon Sep 17 00:00:00 2001 From: Sam Dealy Date: Thu, 23 Jun 2022 11:46:08 -0400 Subject: [PATCH 02/10] Implement core Fog Router Service logic --- Cargo.lock | 1 + fog/api/Cargo.toml | 1 + fog/api/src/conversions.rs | 28 +- fog/view/server/src/bin/router.rs | 25 +- fog/view/server/src/error.rs | 50 +++ fog/view/server/src/fog_view_router_server.rs | 35 +-- .../server/src/fog_view_router_service.rs | 294 +++++++++++++----- 7 files changed, 326 insertions(+), 108 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 814f7ccaa1..f79703bab8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3293,6 +3293,7 @@ dependencies = [ "mc-api", "mc-attest-api", "mc-attest-core", + "mc-attest-enclave-api", "mc-consensus-api", "mc-crypto-keys", "mc-fog-enclave-connection", diff --git a/fog/api/Cargo.toml b/fog/api/Cargo.toml index 9f9825f198..602c4581b3 100644 --- a/fog/api/Cargo.toml +++ b/fog/api/Cargo.toml @@ -15,6 +15,7 @@ protobuf = "2.27.1" mc-api = { path = "../../api" } mc-attest-api = { path = "../../attest/api" } mc-attest-core = { path = "../../attest/core" } +mc-attest-enclave-api = { path = "../../attest/enclave-api" } mc-consensus-api = { path = "../../consensus/api" } mc-crypto-keys = { path = "../../crypto/keys" } mc-fog-enclave-connection = { path = "../enclave_connection" } diff --git a/fog/api/src/conversions.rs b/fog/api/src/conversions.rs index 95f9a124da..6577d6d9fa 100644 --- a/fog/api/src/conversions.rs +++ b/fog/api/src/conversions.rs @@ -2,11 +2,37 @@ // // Contains helper methods that enable conversions for Fog Api types. -use crate::{fog_common, ingest_common}; +use crate::{fog_common, ingest_common, view::MultiViewStoreQueryRequest}; use mc_api::ConversionError; +use mc_attest_api::attest; +use mc_attest_enclave_api::{ClientSession, EnclaveMessage}; use mc_crypto_keys::CompressedRistrettoPublic; use mc_fog_types::common; +impl From>> + for MultiViewStoreQueryRequest +{ + fn from(enclave_messages: Vec>) -> MultiViewStoreQueryRequest { + let attested_query_messages: Vec = enclave_messages + .into_iter() + .map(|enclave_message| enclave_message.into()) + .collect(); + let mut multi_view_store_query_request = MultiViewStoreQueryRequest::new(); + multi_view_store_query_request.set_queries(attested_query_messages.into()); + + multi_view_store_query_request + } +} + +impl From> for MultiViewStoreQueryRequest { + fn from(attested_query_messages: Vec) -> MultiViewStoreQueryRequest { + let mut multi_view_store_query_request = MultiViewStoreQueryRequest::new(); + multi_view_store_query_request.set_queries(attested_query_messages.into()); + + multi_view_store_query_request + } +} + impl From<&common::BlockRange> for fog_common::BlockRange { fn from(common_block_range: &common::BlockRange) -> fog_common::BlockRange { let mut proto_block_range = fog_common::BlockRange::new(); diff --git a/fog/view/server/src/bin/router.rs b/fog/view/server/src/bin/router.rs index 1808548508..48727e7fef 100644 --- a/fog/view/server/src/bin/router.rs +++ b/fog/view/server/src/bin/router.rs @@ -2,14 +2,17 @@ #![deny(missing_docs)] //! MobileCoin Fog View Router target +use grpcio::ChannelBuilder; use mc_common::logger::log; -use mc_fog_uri::FogViewStoreUri; +use mc_fog_api::view_grpc::FogViewApiClient; +use mc_fog_uri::FogViewUri; use mc_fog_view_enclave::{SgxViewEnclave, ENCLAVE_FILE}; use mc_fog_view_server::{ config::FogViewRouterConfig, fog_view_router_server::FogViewRouterServer, }; use mc_util_cli::ParserWithBuildInfo; -use std::{env, str::FromStr}; +use mc_util_grpc::ConnectionUriGrpcioChannel; +use std::{env, str::FromStr, sync::Arc}; fn main() { mc_common::setup_panic_handler(); @@ -34,17 +37,27 @@ fn main() { ); // TODO: Remove and get from a config. - let mut shard_uris: Vec = Vec::new(); + let mut fog_view_grpc_clients: Vec = Vec::new(); + let grpc_env = Arc::new( + grpcio::EnvBuilder::new() + .name_prefix("Main-RPC".to_string()) + .build(), + ); for i in 0..50 { let shard_uri_string = format!( "insecure-fog-view://node{}.test.mobilecoin.com:3225", i ); - let shard_uri = FogViewStoreUri::from_str(&shard_uri_string).unwrap(); - shard_uris.push(shard_uri); + let shard_uri = FogViewUri::from_str(&shard_uri_string).unwrap(); + let fog_view_grpc_client = FogViewApiClient::new( + ChannelBuilder::default_channel_builder(grpc_env.clone()) + .connect_to_uri(&shard_uri, &logger), + ); + fog_view_grpc_clients.push(fog_view_grpc_client); } - let mut router_server = FogViewRouterServer::new(config, sgx_enclave, shard_uris, logger); + let mut router_server = + FogViewRouterServer::new(config, sgx_enclave, fog_view_grpc_clients, logger); router_server.start(); loop { diff --git a/fog/view/server/src/error.rs b/fog/view/server/src/error.rs index 473dcfaec2..ee6786701a 100644 --- a/fog/view/server/src/error.rs +++ b/fog/view/server/src/error.rs @@ -1,8 +1,58 @@ // Copyright (c) 2018-2022 The MobileCoin Foundation use displaydoc::Display; +use grpcio::RpcStatus; +use mc_common::logger::Logger; use mc_fog_view_enclave::Error as ViewEnclaveError; use mc_sgx_report_cache_untrusted::Error as ReportCacheError; +use mc_util_grpc::{rpc_internal_error, rpc_permissions_error}; + +#[derive(Debug, Display)] +pub enum RouterServerError { + /// Error related to contacting Fog View Store: {0} + ViewStoreError(String), + /// View Enclave error: {0} + Enclave(ViewEnclaveError), +} + +impl From for RouterServerError { + fn from(src: grpcio::Error) -> Self { + RouterServerError::ViewStoreError(format!("{}", src)) + } +} + +impl From for RouterServerError { + fn from(src: mc_common::ResponderIdParseError) -> Self { + RouterServerError::ViewStoreError(format!("{}", src)) + } +} + +impl From for RouterServerError { + fn from(src: mc_util_uri::UriParseError) -> Self { + RouterServerError::ViewStoreError(format!("{}", src)) + } +} + +pub fn router_server_err_to_rpc_status( + context: &str, + src: RouterServerError, + logger: Logger, +) -> RpcStatus { + match src { + RouterServerError::ViewStoreError(_) => { + rpc_internal_error(context, format!("{}", src), &logger) + } + RouterServerError::Enclave(_) => { + rpc_permissions_error(context, format!("{}", src), &logger) + } + } +} + +impl From for RouterServerError { + fn from(src: ViewEnclaveError) -> Self { + RouterServerError::Enclave(src) + } +} #[derive(Debug, Display)] pub enum ViewServerError { diff --git a/fog/view/server/src/fog_view_router_server.rs b/fog/view/server/src/fog_view_router_server.rs index 1c58cbbd19..da887e8851 100644 --- a/fog/view/server/src/fog_view_router_server.rs +++ b/fog/view/server/src/fog_view_router_server.rs @@ -8,31 +8,27 @@ use crate::{config::FogViewRouterConfig, fog_view_router_service::FogViewRouterS use futures::executor::block_on; use mc_common::logger::{log, Logger}; use mc_fog_api::view_grpc; -use mc_fog_uri::{ConnectionUri, FogViewStoreUri}; +use mc_fog_uri::ConnectionUri; use mc_fog_view_enclave::ViewEnclaveProxy; use mc_util_grpc::{ConnectionUriGrpcioServer, ReadinessIndicator}; use std::sync::Arc; -pub struct FogViewRouterServer -where - E: ViewEnclaveProxy, -{ +pub struct FogViewRouterServer { server: grpcio::Server, - _enclave: E, logger: Logger, } -impl FogViewRouterServer -where - E: ViewEnclaveProxy, -{ +impl FogViewRouterServer { /// Creates a new view router server instance - pub fn new( + pub fn new( config: FogViewRouterConfig, enclave: E, - shards: Vec, + shards: Vec, logger: Logger, - ) -> FogViewRouterServer { + ) -> FogViewRouterServer + where + E: ViewEnclaveProxy, + { let readiness_indicator = ReadinessIndicator::default(); let env = Arc::new( @@ -42,7 +38,7 @@ where ); let fog_view_router_service = view_grpc::create_fog_view_router_api( - FogViewRouterService::new(enclave.clone(), shards, logger.clone()), + FogViewRouterService::new(enclave, shards, logger.clone()), ); log::debug!(logger, "Constructed Fog View Router GRPC Service"); @@ -64,11 +60,7 @@ where let server = server_builder.build().unwrap(); - Self { - server, - _enclave: enclave, - logger, - } + Self { server, logger } } /// Starts the server @@ -85,10 +77,7 @@ where } } -impl Drop for FogViewRouterServer -where - E: ViewEnclaveProxy, -{ +impl Drop for FogViewRouterServer { fn drop(&mut self) { self.stop(); } diff --git a/fog/view/server/src/fog_view_router_service.rs b/fog/view/server/src/fog_view_router_service.rs index 25090ca49f..fae4be0450 100644 --- a/fog/view/server/src/fog_view_router_service.rs +++ b/fog/view/server/src/fog_view_router_service.rs @@ -1,34 +1,46 @@ // Copyright (c) 2018-2022 The MobileCoin Foundation +use crate::error::{router_server_err_to_rpc_status, RouterServerError}; use futures::{future::try_join_all, FutureExt, SinkExt, TryFutureExt, TryStreamExt}; -use grpcio::{DuplexSink, RequestStream, RpcContext, WriteFlags}; +use grpcio::{ChannelBuilder, DuplexSink, RequestStream, RpcContext, RpcStatus, WriteFlags}; use mc_attest_api::attest; -use mc_common::logger::{log, Logger}; +use mc_common::{ + logger::{log, Logger}, + ResponderId, +}; use mc_fog_api::{ - view::{FogViewRouterRequest, FogViewRouterResponse}, - view_grpc::FogViewRouterApi, + view::{ + FogViewRouterRequest, FogViewRouterResponse, MultiViewStoreQueryRequest, + MultiViewStoreQueryResponse, + }, + view_grpc::{FogViewApiClient, FogViewRouterApi}, }; -use mc_fog_uri::FogViewStoreUri; +use mc_fog_uri::FogViewUri; use mc_fog_view_enclave_api::ViewEnclaveProxy; -use mc_util_grpc::{rpc_logger, rpc_permissions_error}; +use mc_util_grpc::{rpc_invalid_arg_error, rpc_logger, ConnectionUriGrpcioChannel}; use mc_util_metrics::SVC_COUNTERS; -use std::sync::Arc; +use std::{future::Future, str::FromStr, sync::Arc}; + +const RETRY_COUNT: usize = 3; #[derive(Clone)] pub struct FogViewRouterService { enclave: E, - shards: Vec>, + shard_clients: Vec>, logger: Logger, } impl FogViewRouterService { /// Creates a new FogViewRouterService that can be used by a gRPC server to /// fulfill gRPC requests. - pub fn new(enclave: E, shards: Vec, logger: Logger) -> Self { - let shards = shards.into_iter().map(Arc::new).collect(); + /// + /// TODO: Add a `view_store_clients` parameter of type FogApiClient, and + /// perform view store authentication on each one. + pub fn new(enclave: E, shard_clients: Vec, logger: Logger) -> Self { + let shard_clients = shard_clients.into_iter().map(Arc::new).collect(); Self { enclave, - shards, + shard_clients, logger, } } @@ -47,8 +59,8 @@ impl FogViewRouterApi for FogViewRouterService { let logger = logger.clone(); // TODO: Confirm that we don't need to perform the authenticator logic. I think // we don't because of streaming... - let future = handle_request( - self.shards.clone(), + let future = handle_requests( + self.shard_clients.clone(), self.enclave.clone(), requests, responses, @@ -63,87 +75,213 @@ impl FogViewRouterApi for FogViewRouterService { } } -/// Receives a client's request and performs either authentication or a query. -async fn handle_request( - shards: Vec>, +/// Handles a series of requests sent by the Fog Router client. +async fn handle_requests( + shard_clients: Vec>, enclave: E, mut requests: RequestStream, mut responses: DuplexSink, logger: Logger, ) -> Result<(), grpcio::Error> { - while let Some(mut request) = requests.try_next().await? { - if request.has_auth() { - match enclave.client_accept(request.take_auth().into()) { - Ok((enclave_response, _)) => { - let mut response = FogViewRouterResponse::new(); - response.mut_auth().set_data(enclave_response.into()); - responses - .send((response.clone(), WriteFlags::default())) - .await?; - } - Err(client_error) => { - log::debug!( - &logger, - "ViewEnclaveApi::client_accept failed: {}", - client_error - ); - let rpc_permissions_error = rpc_permissions_error( - "client_auth", - format!("Permission denied: {:?}", client_error), - &logger, - ); - return responses.fail(rpc_permissions_error).await; - } - } - } else if request.has_query() { - let query: attest::Message = request.take_query(); - // TODO: In the next PR, use this _shard_query_data to construct a - // MultiViewStoreQuery and send it off to the Fog View Load - // Balancers. - let _multi_view_store_query_data = - enclave.create_multi_view_store_query_data(query.into()); - let _result = route_query(shards.clone(), logger.clone()).await; - - let response = FogViewRouterResponse::new(); - responses - .send((response.clone(), WriteFlags::default())) - .await?; - } else { - // TODO: Throw some sort of error though not sure - // that's necessary. + while let Some(request) = requests.try_next().await? { + let result = handle_request( + request, + shard_clients.clone(), + enclave.clone(), + logger.clone(), + ) + .await; + match result { + Ok(response) => responses.send((response, WriteFlags::default())).await?, + Err(rpc_status) => return responses.fail(rpc_status).await, } } - responses.close().await?; Ok(()) } -// TODO: This method will be responsible for contacting each shard, passing -// along a MultiViewStoreQuery message. It will eventually return a Vec of -// encrypted QueryResponses that the caller of this method will transform into -// one FogViewRouterResponse to return to the client. -async fn route_query( - shards: Vec>, +/// Handles a client's request by performing either an authentication or a +/// query. +async fn handle_request( + mut request: FogViewRouterRequest, + shard_clients: Vec>, + enclave: E, + logger: Logger, +) -> Result { + if request.has_auth() { + return handle_auth_request(enclave, request.take_auth(), logger).await; + } else if request.has_query() { + return handle_query_request(request.take_query(), enclave, shard_clients, logger).await; + } else { + let rpc_status = rpc_invalid_arg_error( + "Inavlid FogViewRouterRequest request", + "Neither the query nor auth fields were set".to_string(), + &logger, + ); + Err(rpc_status) + } +} + +/// Handles a client's authentication request. +async fn handle_auth_request( + enclave: E, + auth_message: attest::AuthMessage, + logger: Logger, +) -> Result { + let (client_auth_response, _) = enclave.client_accept(auth_message.into()).map_err(|err| { + router_server_err_to_rpc_status("Auth: e client accept", err.into(), logger) + })?; + + let mut response = FogViewRouterResponse::new(); + response.mut_auth().set_data(client_auth_response.into()); + Ok(response) +} + +/// Handles a client's query request. +async fn handle_query_request( + query: attest::Message, + enclave: E, + shard_clients: Vec>, + logger: Logger, +) -> Result { + let mut query_responses: Vec = Vec::with_capacity(shard_clients.len()); + let shard_clients = shard_clients.clone(); + // TODO: use retry crate? + for _ in 0..RETRY_COUNT { + let multi_view_store_query_request: MultiViewStoreQueryRequest = enclave + .create_multi_view_store_query_data(query.clone().into()) + .map_err(|err| { + router_server_err_to_rpc_status( + "Query: internal encryption error", + err.into(), + logger.clone(), + ) + })? + .into(); + let clients_and_responses: Vec<(Arc, MultiViewStoreQueryResponse)> = + route_query(&multi_view_store_query_request, shard_clients.clone()) + .await + .map_err(|err| { + router_server_err_to_rpc_status( + "Query: internal query routing error", + err, + logger.clone(), + ) + })?; + + let (shard_clients, pending_auth_requests, mut new_query_responses) = + process_shard_responses(clients_and_responses, enclave.clone(), logger.clone()) + .map_err(|err| { + router_server_err_to_rpc_status( + "Query: internal query response processing", + err, + logger.clone(), + ) + })?; + query_responses.append(&mut new_query_responses); + + try_join_all(pending_auth_requests).await.map_err(|err| { + router_server_err_to_rpc_status( + "Query: cannot authenticate with each Fog View Store:", + err, + logger.clone(), + ) + })?; + + // We've successfully retrieved responses from each shard so we can break. + if shard_clients.is_empty() { + break; + } + } + + // TODO: Collate the query_responses into one response for the client. Make an + // enclave method for this. + let response = FogViewRouterResponse::new(); + Ok(response) +} + +fn process_shard_responses( + clients_and_responses: Vec<(Arc, MultiViewStoreQueryResponse)>, + enclave: E, logger: Logger, -) -> Result, String> { - let mut futures = Vec::new(); - for (i, shard) in shards.iter().enumerate() { - let future = contact_shard(i, shard.clone(), logger.clone()); - futures.push(future); +) -> Result< + ( + Vec>, + Vec>>, + Vec, + ), + RouterServerError, +> { + let mut shard_clients_for_retry = Vec::new(); + let mut pending_auth_requests = Vec::new(); + let mut new_query_responses = Vec::new(); + for (shard_client, mut response) in clients_and_responses { + // We did not receive a query_response for this shard.Therefore, we need to: + // (a) retry the query + // (b) authenticate with the Fog View Store that returned the decryption_error + if response.has_decryption_error() { + shard_clients_for_retry.push(shard_client); + let store_uri = + FogViewUri::from_str(&response.get_decryption_error().fog_view_store_uri)?; + let auth_future = authenticate_view_store(enclave.clone(), store_uri, logger.clone()); + pending_auth_requests.push(auth_future); + } else { + new_query_responses.push(response.take_query_response()); + } } - try_join_all(futures).await + Ok(( + shard_clients_for_retry, + pending_auth_requests, + new_query_responses, + )) } -// TODO: Pass along the MultiViewStoreQuery to the individual shard. -// This method will eventually return an encrypted QueryResponse that the -// router will decrypt and collate with all of the other shards' responses. -async fn contact_shard( - index: usize, - shard: Arc, +/// Sends a client's query request to all of the Fog View shards. +async fn route_query( + request: &MultiViewStoreQueryRequest, + shard_clients: Vec>, +) -> Result, MultiViewStoreQueryResponse)>, RouterServerError> { + let mut responses = Vec::with_capacity(shard_clients.len()); + for shard_client in shard_clients { + let response = query_shard(request, shard_client.clone()); + responses.push(response); + } + try_join_all(responses).await +} + +/// Sends a client's query request to one of the Fog View shards. +async fn query_shard( + request: &MultiViewStoreQueryRequest, + shard_client: Arc, +) -> Result<(Arc, MultiViewStoreQueryResponse), RouterServerError> { + let client_unary_receiver = shard_client.multi_view_store_query_async(request)?; + let response = client_unary_receiver.await?; + + Ok((shard_client, response)) +} + +/// Authenticates a Fog View Store that has previously not been authenticated. +async fn authenticate_view_store( + enclave: E, + view_store_url: FogViewUri, logger: Logger, -) -> Result { - log::info!(logger, "Contacting shard {} at index {}", shard, index); +) -> Result<(), RouterServerError> { + let view_store_id = ResponderId::from_str(&view_store_url.to_string())?; + let client_auth_request = enclave.view_store_init(view_store_id.clone())?; + let grpc_env = Arc::new( + grpcio::EnvBuilder::new() + .name_prefix("Main-RPC".to_string()) + .build(), + ); + let view_store_client = FogViewApiClient::new( + ChannelBuilder::default_channel_builder(grpc_env).connect_to_uri(&view_store_url, &logger), + ); + + let auth_unary_receiver = view_store_client.auth_async(&client_auth_request.into())?; + let auth_response = auth_unary_receiver.await?; + + let result = enclave.view_store_connect(view_store_id, auth_response.into())?; - Ok(0) + Ok(result) } From b11b09b7475cebd078a1f57fab21e6fcb3423aad Mon Sep 17 00:00:00 2001 From: Sam Dealy Date: Fri, 24 Jun 2022 12:26:02 -0400 Subject: [PATCH 03/10] Refactor module structure and add tests --- Cargo.lock | 1 - fog/view/server/Cargo.toml | 1 - fog/view/server/src/bin/router.rs | 5 +- .../src/fog_view_router_request_handler.rs | 214 ++++++++++++++++ .../server/src/fog_view_router_service.rs | 238 +----------------- .../src/fog_view_shard_responses_processor.rs | 236 +++++++++++++++++ fog/view/server/src/lib.rs | 2 + 7 files changed, 462 insertions(+), 235 deletions(-) create mode 100644 fog/view/server/src/fog_view_router_request_handler.rs create mode 100644 fog/view/server/src/fog_view_shard_responses_processor.rs diff --git a/Cargo.lock b/Cargo.lock index f79703bab8..dbc194a089 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4476,7 +4476,6 @@ dependencies = [ "mc-fog-kex-rng", "mc-fog-recovery-db-iface", "mc-fog-sql-recovery-db", - "mc-fog-test-infra", "mc-fog-types", "mc-fog-uri", "mc-fog-view-connection", diff --git a/fog/view/server/Cargo.toml b/fog/view/server/Cargo.toml index a0d573af0f..941c968302 100644 --- a/fog/view/server/Cargo.toml +++ b/fog/view/server/Cargo.toml @@ -69,7 +69,6 @@ mc-util-serial = { path = "../../../util/serial" } mc-util-test-helper = { path = "../../../util/test-helper" } mc-util-uri = { path = "../../../util/uri" } -mc-fog-test-infra = { path = "../../test_infra" } mc-fog-types = { path = "../../types" } mc-fog-view-connection = { path = "../connection" } mc-fog-view-enclave-measurement = { path = "../enclave/measurement" } diff --git a/fog/view/server/src/bin/router.rs b/fog/view/server/src/bin/router.rs index 48727e7fef..6f14fddd72 100644 --- a/fog/view/server/src/bin/router.rs +++ b/fog/view/server/src/bin/router.rs @@ -44,10 +44,7 @@ fn main() { .build(), ); for i in 0..50 { - let shard_uri_string = format!( - "insecure-fog-view://node{}.test.mobilecoin.com:3225", - i - ); + let shard_uri_string = format!("insecure-fog-view://node{}.test.mobilecoin.com:3225", i); let shard_uri = FogViewUri::from_str(&shard_uri_string).unwrap(); let fog_view_grpc_client = FogViewApiClient::new( ChannelBuilder::default_channel_builder(grpc_env.clone()) diff --git a/fog/view/server/src/fog_view_router_request_handler.rs b/fog/view/server/src/fog_view_router_request_handler.rs new file mode 100644 index 0000000000..4768afcf48 --- /dev/null +++ b/fog/view/server/src/fog_view_router_request_handler.rs @@ -0,0 +1,214 @@ +// Copyright (c) 2018-2022 The MobileCoin Foundation + +use crate::{ + error::{router_server_err_to_rpc_status, RouterServerError}, + fog_view_shard_responses_processor, +}; + +use futures::{future::try_join_all, SinkExt, TryStreamExt}; +use grpcio::{ChannelBuilder, DuplexSink, RequestStream, RpcStatus, WriteFlags}; +use mc_attest_api::attest; +use mc_common::{logger::Logger, ResponderId}; +use mc_fog_api::{ + view::{ + FogViewRouterRequest, FogViewRouterResponse, MultiViewStoreQueryRequest, + MultiViewStoreQueryResponse, + }, + view_grpc::FogViewApiClient, +}; +use mc_fog_uri::FogViewUri; +use mc_fog_view_enclave_api::ViewEnclaveProxy; +use mc_util_grpc::{rpc_invalid_arg_error, ConnectionUriGrpcioChannel}; +use std::{str::FromStr, sync::Arc}; + +const RETRY_COUNT: usize = 3; + +/// Handles a series of requests sent by the Fog Router client. +pub async fn handle_requests( + shard_clients: Vec>, + enclave: E, + mut requests: RequestStream, + mut responses: DuplexSink, + logger: Logger, +) -> Result<(), grpcio::Error> { + while let Some(request) = requests.try_next().await? { + let result = handle_request( + request, + shard_clients.clone(), + enclave.clone(), + logger.clone(), + ) + .await; + match result { + Ok(response) => responses.send((response, WriteFlags::default())).await?, + Err(rpc_status) => return responses.fail(rpc_status).await, + } + } + responses.close().await?; + Ok(()) +} + +/// Handles a client's request by performing either an authentication or a +/// query. +pub async fn handle_request( + mut request: FogViewRouterRequest, + shard_clients: Vec>, + enclave: E, + logger: Logger, +) -> Result { + if request.has_auth() { + return handle_auth_request(enclave, request.take_auth(), logger).await; + } else if request.has_query() { + return handle_query_request(request.take_query(), enclave, shard_clients, logger).await; + } else { + let rpc_status = rpc_invalid_arg_error( + "Inavlid FogViewRouterRequest request", + "Neither the query nor auth fields were set".to_string(), + &logger, + ); + Err(rpc_status) + } +} + +/// Handles a client's authentication request. +async fn handle_auth_request( + enclave: E, + auth_message: attest::AuthMessage, + logger: Logger, +) -> Result { + let (client_auth_response, _) = enclave.client_accept(auth_message.into()).map_err(|err| { + router_server_err_to_rpc_status("Auth: e client accept", err.into(), logger) + })?; + + let mut response = FogViewRouterResponse::new(); + response.mut_auth().set_data(client_auth_response.into()); + Ok(response) +} + +/// Handles a client's query request. +async fn handle_query_request( + query: attest::Message, + enclave: E, + shard_clients: Vec>, + logger: Logger, +) -> Result { + let mut query_responses: Vec = Vec::with_capacity(shard_clients.len()); + let mut shard_clients = shard_clients.clone(); + // TODO: use retry crate? + for _ in 0..RETRY_COUNT { + let multi_view_store_query_request: MultiViewStoreQueryRequest = enclave + .create_multi_view_store_query_data(query.clone().into()) + .map_err(|err| { + router_server_err_to_rpc_status( + "Query: internal encryption error", + err.into(), + logger.clone(), + ) + })? + .into(); + let clients_and_responses: Vec<(Arc, MultiViewStoreQueryResponse)> = + route_query(&multi_view_store_query_request, shard_clients.clone()) + .await + .map_err(|err| { + router_server_err_to_rpc_status( + "Query: internal query routing error", + err, + logger.clone(), + ) + })?; + + let mut processed_shard_response_data = + fog_view_shard_responses_processor::process_shard_responses(clients_and_responses) + .map_err(|err| { + router_server_err_to_rpc_status( + "Query: internal query response processing", + err, + logger.clone(), + ) + })?; + + query_responses.append(&mut processed_shard_response_data.new_query_responses); + shard_clients = processed_shard_response_data.shard_clients_for_retry; + if shard_clients.is_empty() { + break; + } + + authenticate_view_stores( + enclave.clone(), + processed_shard_response_data.view_store_uris_for_authentication, + logger.clone(), + ) + .await?; + } + + // TODO: Collate the query_responses into one response for the client. Make an + // enclave method for this. + let response = FogViewRouterResponse::new(); + Ok(response) +} + +/// Sends a client's query request to all of the Fog View shards. +async fn route_query( + request: &MultiViewStoreQueryRequest, + shard_clients: Vec>, +) -> Result, MultiViewStoreQueryResponse)>, RouterServerError> { + let responses = shard_clients + .into_iter() + .map(|shard_client| query_shard(request, shard_client.clone())); + try_join_all(responses).await +} + +/// Sends a client's query request to one of the Fog View shards. +async fn query_shard( + request: &MultiViewStoreQueryRequest, + shard_client: Arc, +) -> Result<(Arc, MultiViewStoreQueryResponse), RouterServerError> { + let client_unary_receiver = shard_client.multi_view_store_query_async(request)?; + let response = client_unary_receiver.await?; + + Ok((shard_client, response)) +} + +/// Authenticates Fog View Stores that have previously not been authenticated. +pub async fn authenticate_view_stores( + enclave: E, + view_store_uris: Vec, + logger: Logger, +) -> Result, RpcStatus> { + let pending_auth_requests = view_store_uris + .into_iter() + .map(|store_uri| authenticate_view_store(enclave.clone(), store_uri, logger.clone())); + + try_join_all(pending_auth_requests).await.map_err(|err| { + router_server_err_to_rpc_status( + "Query: cannot authenticate with each Fog View Store:", + err, + logger.clone(), + ) + }) +} + +/// Authenticates a Fog View Store that has previously not been authenticated. +pub async fn authenticate_view_store( + enclave: E, + view_store_url: FogViewUri, + logger: Logger, +) -> Result<(), RouterServerError> { + let view_store_id = ResponderId::from_str(&view_store_url.to_string())?; + let client_auth_request = enclave.view_store_init(view_store_id.clone())?; + let grpc_env = Arc::new( + grpcio::EnvBuilder::new() + .name_prefix("Main-RPC".to_string()) + .build(), + ); + let view_store_client = FogViewApiClient::new( + ChannelBuilder::default_channel_builder(grpc_env).connect_to_uri(&view_store_url, &logger), + ); + + let auth_unary_receiver = view_store_client.auth_async(&client_auth_request.into())?; + let auth_response = auth_unary_receiver.await?; + + let result = enclave.view_store_connect(view_store_id, auth_response.into())?; + + Ok(result) +} diff --git a/fog/view/server/src/fog_view_router_service.rs b/fog/view/server/src/fog_view_router_service.rs index fae4be0450..8dccfd665c 100644 --- a/fog/view/server/src/fog_view_router_service.rs +++ b/fog/view/server/src/fog_view_router_service.rs @@ -1,27 +1,18 @@ // Copyright (c) 2018-2022 The MobileCoin Foundation -use crate::error::{router_server_err_to_rpc_status, RouterServerError}; -use futures::{future::try_join_all, FutureExt, SinkExt, TryFutureExt, TryStreamExt}; -use grpcio::{ChannelBuilder, DuplexSink, RequestStream, RpcContext, RpcStatus, WriteFlags}; -use mc_attest_api::attest; -use mc_common::{ - logger::{log, Logger}, - ResponderId, -}; +use crate::fog_view_router_request_handler; + +use futures::{FutureExt, TryFutureExt}; +use grpcio::{DuplexSink, RequestStream, RpcContext}; +use mc_common::logger::{log, Logger}; use mc_fog_api::{ - view::{ - FogViewRouterRequest, FogViewRouterResponse, MultiViewStoreQueryRequest, - MultiViewStoreQueryResponse, - }, + view::{FogViewRouterRequest, FogViewRouterResponse}, view_grpc::{FogViewApiClient, FogViewRouterApi}, }; -use mc_fog_uri::FogViewUri; use mc_fog_view_enclave_api::ViewEnclaveProxy; -use mc_util_grpc::{rpc_invalid_arg_error, rpc_logger, ConnectionUriGrpcioChannel}; +use mc_util_grpc::rpc_logger; use mc_util_metrics::SVC_COUNTERS; -use std::{future::Future, str::FromStr, sync::Arc}; - -const RETRY_COUNT: usize = 3; +use std::sync::Arc; #[derive(Clone)] pub struct FogViewRouterService { @@ -59,7 +50,7 @@ impl FogViewRouterApi for FogViewRouterService { let logger = logger.clone(); // TODO: Confirm that we don't need to perform the authenticator logic. I think // we don't because of streaming... - let future = handle_requests( + let future = fog_view_router_request_handler::handle_requests( self.shard_clients.clone(), self.enclave.clone(), requests, @@ -74,214 +65,3 @@ impl FogViewRouterApi for FogViewRouterService { }); } } - -/// Handles a series of requests sent by the Fog Router client. -async fn handle_requests( - shard_clients: Vec>, - enclave: E, - mut requests: RequestStream, - mut responses: DuplexSink, - logger: Logger, -) -> Result<(), grpcio::Error> { - while let Some(request) = requests.try_next().await? { - let result = handle_request( - request, - shard_clients.clone(), - enclave.clone(), - logger.clone(), - ) - .await; - match result { - Ok(response) => responses.send((response, WriteFlags::default())).await?, - Err(rpc_status) => return responses.fail(rpc_status).await, - } - } - responses.close().await?; - Ok(()) -} - -/// Handles a client's request by performing either an authentication or a -/// query. -async fn handle_request( - mut request: FogViewRouterRequest, - shard_clients: Vec>, - enclave: E, - logger: Logger, -) -> Result { - if request.has_auth() { - return handle_auth_request(enclave, request.take_auth(), logger).await; - } else if request.has_query() { - return handle_query_request(request.take_query(), enclave, shard_clients, logger).await; - } else { - let rpc_status = rpc_invalid_arg_error( - "Inavlid FogViewRouterRequest request", - "Neither the query nor auth fields were set".to_string(), - &logger, - ); - Err(rpc_status) - } -} - -/// Handles a client's authentication request. -async fn handle_auth_request( - enclave: E, - auth_message: attest::AuthMessage, - logger: Logger, -) -> Result { - let (client_auth_response, _) = enclave.client_accept(auth_message.into()).map_err(|err| { - router_server_err_to_rpc_status("Auth: e client accept", err.into(), logger) - })?; - - let mut response = FogViewRouterResponse::new(); - response.mut_auth().set_data(client_auth_response.into()); - Ok(response) -} - -/// Handles a client's query request. -async fn handle_query_request( - query: attest::Message, - enclave: E, - shard_clients: Vec>, - logger: Logger, -) -> Result { - let mut query_responses: Vec = Vec::with_capacity(shard_clients.len()); - let shard_clients = shard_clients.clone(); - // TODO: use retry crate? - for _ in 0..RETRY_COUNT { - let multi_view_store_query_request: MultiViewStoreQueryRequest = enclave - .create_multi_view_store_query_data(query.clone().into()) - .map_err(|err| { - router_server_err_to_rpc_status( - "Query: internal encryption error", - err.into(), - logger.clone(), - ) - })? - .into(); - let clients_and_responses: Vec<(Arc, MultiViewStoreQueryResponse)> = - route_query(&multi_view_store_query_request, shard_clients.clone()) - .await - .map_err(|err| { - router_server_err_to_rpc_status( - "Query: internal query routing error", - err, - logger.clone(), - ) - })?; - - let (shard_clients, pending_auth_requests, mut new_query_responses) = - process_shard_responses(clients_and_responses, enclave.clone(), logger.clone()) - .map_err(|err| { - router_server_err_to_rpc_status( - "Query: internal query response processing", - err, - logger.clone(), - ) - })?; - query_responses.append(&mut new_query_responses); - - try_join_all(pending_auth_requests).await.map_err(|err| { - router_server_err_to_rpc_status( - "Query: cannot authenticate with each Fog View Store:", - err, - logger.clone(), - ) - })?; - - // We've successfully retrieved responses from each shard so we can break. - if shard_clients.is_empty() { - break; - } - } - - // TODO: Collate the query_responses into one response for the client. Make an - // enclave method for this. - let response = FogViewRouterResponse::new(); - Ok(response) -} - -fn process_shard_responses( - clients_and_responses: Vec<(Arc, MultiViewStoreQueryResponse)>, - enclave: E, - logger: Logger, -) -> Result< - ( - Vec>, - Vec>>, - Vec, - ), - RouterServerError, -> { - let mut shard_clients_for_retry = Vec::new(); - let mut pending_auth_requests = Vec::new(); - let mut new_query_responses = Vec::new(); - for (shard_client, mut response) in clients_and_responses { - // We did not receive a query_response for this shard.Therefore, we need to: - // (a) retry the query - // (b) authenticate with the Fog View Store that returned the decryption_error - if response.has_decryption_error() { - shard_clients_for_retry.push(shard_client); - let store_uri = - FogViewUri::from_str(&response.get_decryption_error().fog_view_store_uri)?; - let auth_future = authenticate_view_store(enclave.clone(), store_uri, logger.clone()); - pending_auth_requests.push(auth_future); - } else { - new_query_responses.push(response.take_query_response()); - } - } - - Ok(( - shard_clients_for_retry, - pending_auth_requests, - new_query_responses, - )) -} - -/// Sends a client's query request to all of the Fog View shards. -async fn route_query( - request: &MultiViewStoreQueryRequest, - shard_clients: Vec>, -) -> Result, MultiViewStoreQueryResponse)>, RouterServerError> { - let mut responses = Vec::with_capacity(shard_clients.len()); - for shard_client in shard_clients { - let response = query_shard(request, shard_client.clone()); - responses.push(response); - } - try_join_all(responses).await -} - -/// Sends a client's query request to one of the Fog View shards. -async fn query_shard( - request: &MultiViewStoreQueryRequest, - shard_client: Arc, -) -> Result<(Arc, MultiViewStoreQueryResponse), RouterServerError> { - let client_unary_receiver = shard_client.multi_view_store_query_async(request)?; - let response = client_unary_receiver.await?; - - Ok((shard_client, response)) -} - -/// Authenticates a Fog View Store that has previously not been authenticated. -async fn authenticate_view_store( - enclave: E, - view_store_url: FogViewUri, - logger: Logger, -) -> Result<(), RouterServerError> { - let view_store_id = ResponderId::from_str(&view_store_url.to_string())?; - let client_auth_request = enclave.view_store_init(view_store_id.clone())?; - let grpc_env = Arc::new( - grpcio::EnvBuilder::new() - .name_prefix("Main-RPC".to_string()) - .build(), - ); - let view_store_client = FogViewApiClient::new( - ChannelBuilder::default_channel_builder(grpc_env).connect_to_uri(&view_store_url, &logger), - ); - - let auth_unary_receiver = view_store_client.auth_async(&client_auth_request.into())?; - let auth_response = auth_unary_receiver.await?; - - let result = enclave.view_store_connect(view_store_id, auth_response.into())?; - - Ok(result) -} diff --git a/fog/view/server/src/fog_view_shard_responses_processor.rs b/fog/view/server/src/fog_view_shard_responses_processor.rs new file mode 100644 index 0000000000..2751cbc97d --- /dev/null +++ b/fog/view/server/src/fog_view_shard_responses_processor.rs @@ -0,0 +1,236 @@ +// Copyright (c) 2018-2022 The MobileCoin Foundation + +use crate::error::RouterServerError; + +use mc_attest_api::attest; +use mc_fog_api::{view::MultiViewStoreQueryResponse, view_grpc::FogViewApiClient}; +use mc_fog_uri::FogViewUri; +use std::{str::FromStr, sync::Arc}; + +/// The result of processing the MultiViewStoreQueryResponse from each Fog View +/// Shard. +pub struct ProcessedShardResponseData { + /// gRPC clients for Shards that need to be retried for a successful + /// response. + pub shard_clients_for_retry: Vec>, + + /// Uris for *individual* Fog View Stores that need to be authenticated with + /// by the Fog Router. + pub view_store_uris_for_authentication: Vec, + + /// New, successfully processed query responses. + pub new_query_responses: Vec, +} + +impl ProcessedShardResponseData { + pub fn new( + shard_clients_for_retry: Vec>, + view_store_uris_for_authentication: Vec, + new_query_responses: Vec, + ) -> Self { + ProcessedShardResponseData { + shard_clients_for_retry, + view_store_uris_for_authentication, + new_query_responses, + } + } +} + +/// Processes the MultiViewStoreQueryResponses returned by each Fog View Shard. +pub fn process_shard_responses( + clients_and_responses: Vec<(Arc, MultiViewStoreQueryResponse)>, +) -> Result { + let mut shard_clients_for_retry = Vec::new(); + let mut view_store_uris_for_authentication = Vec::new(); + let mut new_query_responses = Vec::new(); + + for (shard_client, mut response) in clients_and_responses { + // We did not receive a query_response for this shard.Therefore, we need to: + // (a) retry the query + // (b) authenticate with the Fog View Store that returned the decryption_error + if response.has_decryption_error() { + shard_clients_for_retry.push(shard_client); + let store_uri = + FogViewUri::from_str(&response.get_decryption_error().fog_view_store_uri)?; + view_store_uris_for_authentication.push(store_uri); + } else { + new_query_responses.push(response.take_query_response()); + } + } + + Ok(ProcessedShardResponseData::new( + shard_clients_for_retry, + view_store_uris_for_authentication, + new_query_responses, + )) +} + +#[cfg(test)] +mod tests { + use super::*; + use grpcio::ChannelBuilder; + use mc_common::logger::{test_with_logger, Logger}; + use mc_util_grpc::ConnectionUriGrpcioChannel; + + fn create_successful_mvq_response() -> MultiViewStoreQueryResponse { + let mut successful_response = MultiViewStoreQueryResponse::new(); + let client_auth_request = vec![].into(); + successful_response + .mut_query_response() + .set_data(client_auth_request); + + successful_response + } + + fn create_failed_mvq_response(i: usize) -> MultiViewStoreQueryResponse { + let mut failed_response = MultiViewStoreQueryResponse::new(); + let view_uri_string = format!("insecure-fog-view://node{}.test.mobilecoin.com:3225", i); + failed_response + .mut_decryption_error() + .set_fog_view_store_uri(view_uri_string); + + failed_response + } + + fn create_grpc_client(i: usize, logger: Logger) -> Arc { + let view_uri_string = format!("insecure-fog-view://node{}.test.mobilecoin.com:3225", i); + let view_uri = FogViewUri::from_str(&view_uri_string).unwrap(); + let grpc_env = Arc::new( + grpcio::EnvBuilder::new() + .name_prefix("Main-RPC".to_string()) + .build(), + ); + + let grpc_client = FogViewApiClient::new( + ChannelBuilder::default_channel_builder(grpc_env.clone()) + .connect_to_uri(&view_uri, &logger), + ); + + Arc::new(grpc_client) + } + + #[test_with_logger] + fn one_successful_response_no_shard_clients(logger: Logger) { + let grpc_client = create_grpc_client(0, logger.clone()); + let successful_mvq_response = create_successful_mvq_response(); + let clients_and_responses = vec![(grpc_client, successful_mvq_response)]; + + let result = process_shard_responses(clients_and_responses); + + assert!(result.is_ok()); + + let shard_clients_for_retry = result.unwrap().shard_clients_for_retry; + assert!(shard_clients_for_retry.is_empty()); + } + + #[test_with_logger] + fn one_successful_response_no_pending_authentications(logger: Logger) { + let grpc_client = create_grpc_client(0, logger.clone()); + let successful_mvq_response = create_successful_mvq_response(); + let clients_and_responses = vec![(grpc_client, successful_mvq_response)]; + + let result = process_shard_responses(clients_and_responses); + + assert!(result.is_ok()); + + let view_store_uris_for_authentication = result.unwrap().view_store_uris_for_authentication; + assert!(view_store_uris_for_authentication.is_empty()); + } + + #[test_with_logger] + fn one_successful_response_one_new_query_response(logger: Logger) { + let grpc_client = create_grpc_client(0, logger.clone()); + let successful_mvq_response = create_successful_mvq_response(); + let clients_and_responses = vec![(grpc_client, successful_mvq_response)]; + + let result = process_shard_responses(clients_and_responses); + + assert!(result.is_ok()); + + let new_query_response = result.unwrap().new_query_responses; + assert_eq!(new_query_response.len(), 1); + } + + #[test_with_logger] + fn one_failed_response_one_pending_shard_client(logger: Logger) { + let client_index: usize = 0; + let grpc_client = create_grpc_client(client_index, logger.clone()); + let failed_mvq_response = create_failed_mvq_response(client_index); + let clients_and_responses = vec![(grpc_client, failed_mvq_response)]; + + let result = process_shard_responses(clients_and_responses); + + assert!(result.is_ok()); + + let shard_clients_for_retry = result.unwrap().shard_clients_for_retry; + assert_eq!(shard_clients_for_retry.len(), 1); + } + + #[test_with_logger] + fn one_failed_response_one_pending_authentications(logger: Logger) { + let client_index: usize = 0; + let grpc_client = create_grpc_client(client_index, logger.clone()); + let failed_mvq_response = create_failed_mvq_response(client_index); + let clients_and_responses = vec![(grpc_client, failed_mvq_response)]; + + let result = process_shard_responses(clients_and_responses); + + assert!(result.is_ok()); + + let view_store_uris_for_authentication = result.unwrap().view_store_uris_for_authentication; + assert_eq!(view_store_uris_for_authentication.len(), 1); + } + + #[test_with_logger] + fn one_failed_response_zero_new_query_responses(logger: Logger) { + let client_index: usize = 0; + let grpc_client = create_grpc_client(client_index, logger.clone()); + let failed_mvq_response = create_failed_mvq_response(client_index); + let clients_and_responses = vec![(grpc_client, failed_mvq_response)]; + + let result = process_shard_responses(clients_and_responses); + + assert!(result.is_ok()); + + let new_query_responses = result.unwrap().new_query_responses; + assert!(new_query_responses.is_empty()); + } + + #[test_with_logger] + fn mixed_failed_and_successful_responses_processes_correctly(logger: Logger) { + let number_of_failures: usize = 11; + let number_of_successes: usize = 8; + + let mut clients_and_responses = Vec::new(); + for i in 0..number_of_failures { + let grpc_client = create_grpc_client(i, logger.clone()); + let failed_mvq_response = create_failed_mvq_response(i); + clients_and_responses.push((grpc_client, failed_mvq_response)); + } + for i in 0..number_of_successes { + let client_index = i + number_of_failures; + let grpc_client = create_grpc_client(client_index, logger.clone()); + let successful_mvq_response = create_successful_mvq_response(); + clients_and_responses.push((grpc_client, successful_mvq_response)); + } + + let result = process_shard_responses(clients_and_responses); + assert!(result.is_ok()); + let processed_shard_response_data = result.unwrap(); + + assert_eq!( + processed_shard_response_data.shard_clients_for_retry.len(), + number_of_failures + ); + assert_eq!( + processed_shard_response_data + .view_store_uris_for_authentication + .len(), + number_of_failures + ); + assert_eq!( + processed_shard_response_data.new_query_responses.len(), + number_of_successes + ); + } +} diff --git a/fog/view/server/src/lib.rs b/fog/view/server/src/lib.rs index 8e391e5819..2b6664b154 100644 --- a/fog/view/server/src/lib.rs +++ b/fog/view/server/src/lib.rs @@ -12,3 +12,5 @@ pub mod server; mod block_tracker; mod counters; mod db_fetcher; +mod fog_view_router_request_handler; +mod fog_view_shard_responses_processor; From 5e4a78f40345c8fd80fa821ea90bcd7b6ad7bcae Mon Sep 17 00:00:00 2001 From: Sam Dealy Date: Fri, 24 Jun 2022 13:47:32 -0400 Subject: [PATCH 04/10] Use where clauses --- .../src/fog_view_router_request_handler.rs | 28 +++++++++++++------ .../server/src/fog_view_router_service.rs | 10 +++++-- 2 files changed, 28 insertions(+), 10 deletions(-) diff --git a/fog/view/server/src/fog_view_router_request_handler.rs b/fog/view/server/src/fog_view_router_request_handler.rs index 4768afcf48..c7a3bf4dc8 100644 --- a/fog/view/server/src/fog_view_router_request_handler.rs +++ b/fog/view/server/src/fog_view_router_request_handler.rs @@ -24,13 +24,16 @@ use std::{str::FromStr, sync::Arc}; const RETRY_COUNT: usize = 3; /// Handles a series of requests sent by the Fog Router client. -pub async fn handle_requests( +pub async fn handle_requests( shard_clients: Vec>, enclave: E, mut requests: RequestStream, mut responses: DuplexSink, logger: Logger, -) -> Result<(), grpcio::Error> { +) -> Result<(), grpcio::Error> +where + E: ViewEnclaveProxy, +{ while let Some(request) = requests.try_next().await? { let result = handle_request( request, @@ -50,12 +53,15 @@ pub async fn handle_requests( /// Handles a client's request by performing either an authentication or a /// query. -pub async fn handle_request( +pub async fn handle_request( mut request: FogViewRouterRequest, shard_clients: Vec>, enclave: E, logger: Logger, -) -> Result { +) -> Result +where + E: ViewEnclaveProxy, +{ if request.has_auth() { return handle_auth_request(enclave, request.take_auth(), logger).await; } else if request.has_query() { @@ -71,11 +77,14 @@ pub async fn handle_request( } /// Handles a client's authentication request. -async fn handle_auth_request( +async fn handle_auth_request( enclave: E, auth_message: attest::AuthMessage, logger: Logger, -) -> Result { +) -> Result +where + E: ViewEnclaveProxy, +{ let (client_auth_response, _) = enclave.client_accept(auth_message.into()).map_err(|err| { router_server_err_to_rpc_status("Auth: e client accept", err.into(), logger) })?; @@ -86,12 +95,15 @@ async fn handle_auth_request( } /// Handles a client's query request. -async fn handle_query_request( +async fn handle_query_request( query: attest::Message, enclave: E, shard_clients: Vec>, logger: Logger, -) -> Result { +) -> Result +where + E: ViewEnclaveProxy, +{ let mut query_responses: Vec = Vec::with_capacity(shard_clients.len()); let mut shard_clients = shard_clients.clone(); // TODO: use retry crate? diff --git a/fog/view/server/src/fog_view_router_service.rs b/fog/view/server/src/fog_view_router_service.rs index 8dccfd665c..a4d9e6306e 100644 --- a/fog/view/server/src/fog_view_router_service.rs +++ b/fog/view/server/src/fog_view_router_service.rs @@ -15,7 +15,10 @@ use mc_util_metrics::SVC_COUNTERS; use std::sync::Arc; #[derive(Clone)] -pub struct FogViewRouterService { +pub struct FogViewRouterService +where + E: ViewEnclaveProxy, +{ enclave: E, shard_clients: Vec>, logger: Logger, @@ -37,7 +40,10 @@ impl FogViewRouterService { } } -impl FogViewRouterApi for FogViewRouterService { +impl FogViewRouterApi for FogViewRouterService +where + E: ViewEnclaveProxy, +{ fn request( &mut self, ctx: RpcContext, From 674b7c6251b020fa3127db81f19d58ed43a1d958 Mon Sep 17 00:00:00 2001 From: Sam Dealy Date: Wed, 13 Jul 2022 11:24:53 -0700 Subject: [PATCH 05/10] Implement nick's suggestions --- fog/api/src/conversions.rs | 8 ++------ fog/view/server/src/bin/router.rs | 2 +- fog/view/server/src/fog_view_router_request_handler.rs | 4 ++-- fog/view/server/src/fog_view_shard_responses_processor.rs | 6 +++--- 4 files changed, 8 insertions(+), 12 deletions(-) diff --git a/fog/api/src/conversions.rs b/fog/api/src/conversions.rs index 6577d6d9fa..78843e94ec 100644 --- a/fog/api/src/conversions.rs +++ b/fog/api/src/conversions.rs @@ -13,14 +13,10 @@ impl From>) -> MultiViewStoreQueryRequest { - let attested_query_messages: Vec = enclave_messages + enclave_messages .into_iter() .map(|enclave_message| enclave_message.into()) - .collect(); - let mut multi_view_store_query_request = MultiViewStoreQueryRequest::new(); - multi_view_store_query_request.set_queries(attested_query_messages.into()); - - multi_view_store_query_request + .collect::>().into() } } diff --git a/fog/view/server/src/bin/router.rs b/fog/view/server/src/bin/router.rs index 6f14fddd72..a8ebf01a18 100644 --- a/fog/view/server/src/bin/router.rs +++ b/fog/view/server/src/bin/router.rs @@ -37,7 +37,7 @@ fn main() { ); // TODO: Remove and get from a config. - let mut fog_view_grpc_clients: Vec = Vec::new(); + let mut fog_view_grpc_clients= Vec::new(); let grpc_env = Arc::new( grpcio::EnvBuilder::new() .name_prefix("Main-RPC".to_string()) diff --git a/fog/view/server/src/fog_view_router_request_handler.rs b/fog/view/server/src/fog_view_router_request_handler.rs index c7a3bf4dc8..ba1eb88dbd 100644 --- a/fog/view/server/src/fog_view_router_request_handler.rs +++ b/fog/view/server/src/fog_view_router_request_handler.rs @@ -63,7 +63,7 @@ where E: ViewEnclaveProxy, { if request.has_auth() { - return handle_auth_request(enclave, request.take_auth(), logger).await; + return handle_auth_request(enclave, request.take_auth(), logger); } else if request.has_query() { return handle_query_request(request.take_query(), enclave, shard_clients, logger).await; } else { @@ -77,7 +77,7 @@ where } /// Handles a client's authentication request. -async fn handle_auth_request( +fn handle_auth_request( enclave: E, auth_message: attest::AuthMessage, logger: Logger, diff --git a/fog/view/server/src/fog_view_shard_responses_processor.rs b/fog/view/server/src/fog_view_shard_responses_processor.rs index 2751cbc97d..508ddb67ee 100644 --- a/fog/view/server/src/fog_view_shard_responses_processor.rs +++ b/fog/view/server/src/fog_view_shard_responses_processor.rs @@ -15,7 +15,7 @@ pub struct ProcessedShardResponseData { pub shard_clients_for_retry: Vec>, /// Uris for *individual* Fog View Stores that need to be authenticated with - /// by the Fog Router. + /// by the Fog Router. It should only have entries if `shard_clients_for_retry` has entries. pub view_store_uris_for_authentication: Vec, /// New, successfully processed query responses. @@ -74,7 +74,7 @@ mod tests { fn create_successful_mvq_response() -> MultiViewStoreQueryResponse { let mut successful_response = MultiViewStoreQueryResponse::new(); - let client_auth_request = vec![].into(); + let client_auth_request = Vec::new(); successful_response .mut_query_response() .set_data(client_auth_request); @@ -153,7 +153,7 @@ mod tests { #[test_with_logger] fn one_failed_response_one_pending_shard_client(logger: Logger) { - let client_index: usize = 0; + let client_index = 0; let grpc_client = create_grpc_client(client_index, logger.clone()); let failed_mvq_response = create_failed_mvq_response(client_index); let clients_and_responses = vec![(grpc_client, failed_mvq_response)]; From 3345abb42f7c82a481b6c0dbd7595adbeb126bb1 Mon Sep 17 00:00:00 2001 From: Sam Dealy Date: Wed, 13 Jul 2022 11:54:46 -0700 Subject: [PATCH 06/10] Implement second round of nick's suggestions --- fog/view/server/src/fog_view_router_request_handler.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/fog/view/server/src/fog_view_router_request_handler.rs b/fog/view/server/src/fog_view_router_request_handler.rs index ba1eb88dbd..4d8a14d87b 100644 --- a/fog/view/server/src/fog_view_router_request_handler.rs +++ b/fog/view/server/src/fog_view_router_request_handler.rs @@ -108,7 +108,7 @@ where let mut shard_clients = shard_clients.clone(); // TODO: use retry crate? for _ in 0..RETRY_COUNT { - let multi_view_store_query_request: MultiViewStoreQueryRequest = enclave + let multi_view_store_query_request = enclave .create_multi_view_store_query_data(query.clone().into()) .map_err(|err| { router_server_err_to_rpc_status( @@ -118,7 +118,7 @@ where ) })? .into(); - let clients_and_responses: Vec<(Arc, MultiViewStoreQueryResponse)> = + let clients_and_responses = route_query(&multi_view_store_query_request, shard_clients.clone()) .await .map_err(|err| { @@ -182,7 +182,7 @@ async fn query_shard( } /// Authenticates Fog View Stores that have previously not been authenticated. -pub async fn authenticate_view_stores( +async fn authenticate_view_stores( enclave: E, view_store_uris: Vec, logger: Logger, @@ -201,7 +201,7 @@ pub async fn authenticate_view_stores( } /// Authenticates a Fog View Store that has previously not been authenticated. -pub async fn authenticate_view_store( +async fn authenticate_view_store( enclave: E, view_store_url: FogViewUri, logger: Logger, From 886321a4dea88ae0ac30cf0c1ddda11e599a931c Mon Sep 17 00:00:00 2001 From: Sam Dealy Date: Wed, 13 Jul 2022 13:54:06 -0700 Subject: [PATCH 07/10] Implement lint suggestions --- Cargo.lock | 1 + fog/api/src/conversions.rs | 3 ++- fog/view/enclave/trusted/src/lib.rs | 8 ++++++-- fog/view/server/Cargo.toml | 1 + fog/view/server/src/bin/router.rs | 2 +- fog/view/server/src/fog_view_router_request_handler.rs | 6 +++--- fog/view/server/src/fog_view_shard_responses_processor.rs | 6 +++--- 7 files changed, 17 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dbc194a089..f79703bab8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4476,6 +4476,7 @@ dependencies = [ "mc-fog-kex-rng", "mc-fog-recovery-db-iface", "mc-fog-sql-recovery-db", + "mc-fog-test-infra", "mc-fog-types", "mc-fog-uri", "mc-fog-view-connection", diff --git a/fog/api/src/conversions.rs b/fog/api/src/conversions.rs index 78843e94ec..965ccb88c3 100644 --- a/fog/api/src/conversions.rs +++ b/fog/api/src/conversions.rs @@ -16,7 +16,8 @@ impl From>().into() + .collect::>() + .into() } } diff --git a/fog/view/enclave/trusted/src/lib.rs b/fog/view/enclave/trusted/src/lib.rs index dd8f16397c..9c66580fc2 100644 --- a/fog/view/enclave/trusted/src/lib.rs +++ b/fog/view/enclave/trusted/src/lib.rs @@ -115,8 +115,12 @@ pub fn ecall_dispatcher(inbuf: &[u8]) -> Result, sgx_status_t> { } ViewEnclaveRequest::GetIasReport => serialize(&ENCLAVE.get_ias_report()), ViewEnclaveRequest::ClientAccept(msg) => serialize(&ENCLAVE.client_accept(msg)), - ViewEnclaveRequest::ViewStoreInit(view_store_id) => serialize(&ENCLAVE.view_store_init(view_store_id)), - ViewEnclaveRequest::ViewStoreConnect(view_store_id, msg) => serialize(&ENCLAVE.view_store_connect(view_store_id, msg)), + ViewEnclaveRequest::ViewStoreInit(view_store_id) => { + serialize(&ENCLAVE.view_store_init(view_store_id)) + } + ViewEnclaveRequest::ViewStoreConnect(view_store_id, msg) => { + serialize(&ENCLAVE.view_store_connect(view_store_id, msg)) + } ViewEnclaveRequest::ClientClose(session) => serialize(&ENCLAVE.client_close(session)), ViewEnclaveRequest::Query(req, untrusted_query_response) => { serialize(&ENCLAVE.query(req, untrusted_query_response)) diff --git a/fog/view/server/Cargo.toml b/fog/view/server/Cargo.toml index 941c968302..a0d573af0f 100644 --- a/fog/view/server/Cargo.toml +++ b/fog/view/server/Cargo.toml @@ -69,6 +69,7 @@ mc-util-serial = { path = "../../../util/serial" } mc-util-test-helper = { path = "../../../util/test-helper" } mc-util-uri = { path = "../../../util/uri" } +mc-fog-test-infra = { path = "../../test_infra" } mc-fog-types = { path = "../../types" } mc-fog-view-connection = { path = "../connection" } mc-fog-view-enclave-measurement = { path = "../enclave/measurement" } diff --git a/fog/view/server/src/bin/router.rs b/fog/view/server/src/bin/router.rs index a8ebf01a18..eebe7dce96 100644 --- a/fog/view/server/src/bin/router.rs +++ b/fog/view/server/src/bin/router.rs @@ -37,7 +37,7 @@ fn main() { ); // TODO: Remove and get from a config. - let mut fog_view_grpc_clients= Vec::new(); + let mut fog_view_grpc_clients = Vec::new(); let grpc_env = Arc::new( grpcio::EnvBuilder::new() .name_prefix("Main-RPC".to_string()) diff --git a/fog/view/server/src/fog_view_router_request_handler.rs b/fog/view/server/src/fog_view_router_request_handler.rs index 4d8a14d87b..cc39750fe4 100644 --- a/fog/view/server/src/fog_view_router_request_handler.rs +++ b/fog/view/server/src/fog_view_router_request_handler.rs @@ -63,9 +63,9 @@ where E: ViewEnclaveProxy, { if request.has_auth() { - return handle_auth_request(enclave, request.take_auth(), logger); + handle_auth_request(enclave, request.take_auth(), logger) } else if request.has_query() { - return handle_query_request(request.take_query(), enclave, shard_clients, logger).await; + handle_query_request(request.take_query(), enclave, shard_clients, logger).await } else { let rpc_status = rpc_invalid_arg_error( "Inavlid FogViewRouterRequest request", @@ -166,7 +166,7 @@ async fn route_query( ) -> Result, MultiViewStoreQueryResponse)>, RouterServerError> { let responses = shard_clients .into_iter() - .map(|shard_client| query_shard(request, shard_client.clone())); + .map(|shard_client| query_shard(request, shard_client)); try_join_all(responses).await } diff --git a/fog/view/server/src/fog_view_shard_responses_processor.rs b/fog/view/server/src/fog_view_shard_responses_processor.rs index 508ddb67ee..12d3ffbc87 100644 --- a/fog/view/server/src/fog_view_shard_responses_processor.rs +++ b/fog/view/server/src/fog_view_shard_responses_processor.rs @@ -15,7 +15,8 @@ pub struct ProcessedShardResponseData { pub shard_clients_for_retry: Vec>, /// Uris for *individual* Fog View Stores that need to be authenticated with - /// by the Fog Router. It should only have entries if `shard_clients_for_retry` has entries. + /// by the Fog Router. It should only have entries if + /// `shard_clients_for_retry` has entries. pub view_store_uris_for_authentication: Vec, /// New, successfully processed query responses. @@ -102,8 +103,7 @@ mod tests { ); let grpc_client = FogViewApiClient::new( - ChannelBuilder::default_channel_builder(grpc_env.clone()) - .connect_to_uri(&view_uri, &logger), + ChannelBuilder::default_channel_builder(grpc_env).connect_to_uri(&view_uri, &logger), ); Arc::new(grpc_client) From 44f335d041e687399d275313187cc28b2fbd2011 Mon Sep 17 00:00:00 2001 From: Sam Dealy Date: Wed, 13 Jul 2022 17:38:30 -0700 Subject: [PATCH 08/10] Implement james's suggestions --- fog/view/server/src/fog_view_router_server.rs | 2 +- .../server/src/fog_view_router_service.rs | 4 ++-- fog/view/server/src/lib.rs | 4 ++-- ...t_handler.rs => router_request_handler.rs} | 24 +++++++++---------- fog/view/server/src/server.rs | 2 +- ...cessor.rs => shard_responses_processor.rs} | 7 +++--- 6 files changed, 21 insertions(+), 22 deletions(-) rename fog/view/server/src/{fog_view_router_request_handler.rs => router_request_handler.rs} (93%) rename fog/view/server/src/{fog_view_shard_responses_processor.rs => shard_responses_processor.rs} (98%) diff --git a/fog/view/server/src/fog_view_router_server.rs b/fog/view/server/src/fog_view_router_server.rs index da887e8851..8564353bb3 100644 --- a/fog/view/server/src/fog_view_router_server.rs +++ b/fog/view/server/src/fog_view_router_server.rs @@ -33,7 +33,7 @@ impl FogViewRouterServer { let env = Arc::new( grpcio::EnvBuilder::new() - .name_prefix("Main-RPC".to_string()) + .name_prefix("Fog-view-router-server".to_string()) .build(), ); diff --git a/fog/view/server/src/fog_view_router_service.rs b/fog/view/server/src/fog_view_router_service.rs index a4d9e6306e..4e4e9f7fba 100644 --- a/fog/view/server/src/fog_view_router_service.rs +++ b/fog/view/server/src/fog_view_router_service.rs @@ -1,6 +1,6 @@ // Copyright (c) 2018-2022 The MobileCoin Foundation -use crate::fog_view_router_request_handler; +use crate::router_request_handler; use futures::{FutureExt, TryFutureExt}; use grpcio::{DuplexSink, RequestStream, RpcContext}; @@ -56,7 +56,7 @@ where let logger = logger.clone(); // TODO: Confirm that we don't need to perform the authenticator logic. I think // we don't because of streaming... - let future = fog_view_router_request_handler::handle_requests( + let future = router_request_handler::handle_requests( self.shard_clients.clone(), self.enclave.clone(), requests, diff --git a/fog/view/server/src/lib.rs b/fog/view/server/src/lib.rs index 2b6664b154..42a029f277 100644 --- a/fog/view/server/src/lib.rs +++ b/fog/view/server/src/lib.rs @@ -12,5 +12,5 @@ pub mod server; mod block_tracker; mod counters; mod db_fetcher; -mod fog_view_router_request_handler; -mod fog_view_shard_responses_processor; +mod router_request_handler; +mod shard_responses_processor; diff --git a/fog/view/server/src/fog_view_router_request_handler.rs b/fog/view/server/src/router_request_handler.rs similarity index 93% rename from fog/view/server/src/fog_view_router_request_handler.rs rename to fog/view/server/src/router_request_handler.rs index cc39750fe4..e88714fec2 100644 --- a/fog/view/server/src/fog_view_router_request_handler.rs +++ b/fog/view/server/src/router_request_handler.rs @@ -2,9 +2,8 @@ use crate::{ error::{router_server_err_to_rpc_status, RouterServerError}, - fog_view_shard_responses_processor, + shard_responses_processor, }; - use futures::{future::try_join_all, SinkExt, TryStreamExt}; use grpcio::{ChannelBuilder, DuplexSink, RequestStream, RpcStatus, WriteFlags}; use mc_attest_api::attest; @@ -129,15 +128,16 @@ where ) })?; - let mut processed_shard_response_data = - fog_view_shard_responses_processor::process_shard_responses(clients_and_responses) - .map_err(|err| { - router_server_err_to_rpc_status( - "Query: internal query response processing", - err, - logger.clone(), - ) - })?; + let mut processed_shard_response_data = shard_responses_processor::process_shard_responses( + clients_and_responses, + ) + .map_err(|err| { + router_server_err_to_rpc_status( + "Query: internal query response processing", + err, + logger.clone(), + ) + })?; query_responses.append(&mut processed_shard_response_data.new_query_responses); shard_clients = processed_shard_response_data.shard_clients_for_retry; @@ -210,7 +210,7 @@ async fn authenticate_view_store( let client_auth_request = enclave.view_store_init(view_store_id.clone())?; let grpc_env = Arc::new( grpcio::EnvBuilder::new() - .name_prefix("Main-RPC".to_string()) + .name_prefix("authenticate-view-store".to_string()) .build(), ); let view_store_client = FogViewApiClient::new( diff --git a/fog/view/server/src/server.rs b/fog/view/server/src/server.rs index cf94c4e7a0..2d238a15ae 100644 --- a/fog/view/server/src/server.rs +++ b/fog/view/server/src/server.rs @@ -79,7 +79,7 @@ where let env = Arc::new( grpcio::EnvBuilder::new() - .name_prefix("Main-RPC".to_string()) + .name_prefix("Fog-view-server".to_string()) .build(), ); diff --git a/fog/view/server/src/fog_view_shard_responses_processor.rs b/fog/view/server/src/shard_responses_processor.rs similarity index 98% rename from fog/view/server/src/fog_view_shard_responses_processor.rs rename to fog/view/server/src/shard_responses_processor.rs index 12d3ffbc87..e7b86bac35 100644 --- a/fog/view/server/src/fog_view_shard_responses_processor.rs +++ b/fog/view/server/src/shard_responses_processor.rs @@ -1,7 +1,6 @@ // Copyright (c) 2018-2022 The MobileCoin Foundation use crate::error::RouterServerError; - use mc_attest_api::attest; use mc_fog_api::{view::MultiViewStoreQueryResponse, view_grpc::FogViewApiClient}; use mc_fog_uri::FogViewUri; @@ -98,7 +97,7 @@ mod tests { let view_uri = FogViewUri::from_str(&view_uri_string).unwrap(); let grpc_env = Arc::new( grpcio::EnvBuilder::new() - .name_prefix("Main-RPC".to_string()) + .name_prefix("processor-test".to_string()) .build(), ); @@ -198,8 +197,8 @@ mod tests { #[test_with_logger] fn mixed_failed_and_successful_responses_processes_correctly(logger: Logger) { - let number_of_failures: usize = 11; - let number_of_successes: usize = 8; + const number_of_failures: usize = 11; + const number_of_successes: usize = 8; let mut clients_and_responses = Vec::new(); for i in 0..number_of_failures { From 7484dde30adb24cf27e9d0abc11d3894e4e8cede Mon Sep 17 00:00:00 2001 From: Sam Dealy Date: Wed, 13 Jul 2022 18:55:34 -0700 Subject: [PATCH 09/10] Fix naming error --- fog/view/server/src/shard_responses_processor.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/fog/view/server/src/shard_responses_processor.rs b/fog/view/server/src/shard_responses_processor.rs index e7b86bac35..6b0fbd45a3 100644 --- a/fog/view/server/src/shard_responses_processor.rs +++ b/fog/view/server/src/shard_responses_processor.rs @@ -197,17 +197,17 @@ mod tests { #[test_with_logger] fn mixed_failed_and_successful_responses_processes_correctly(logger: Logger) { - const number_of_failures: usize = 11; - const number_of_successes: usize = 8; + const NUMBER_OF_FAILURES: usize = 11; + const NUMBER_OF_SUCCESSES: usize = 8; let mut clients_and_responses = Vec::new(); - for i in 0..number_of_failures { + for i in 0..NUMBER_OF_FAILURES { let grpc_client = create_grpc_client(i, logger.clone()); let failed_mvq_response = create_failed_mvq_response(i); clients_and_responses.push((grpc_client, failed_mvq_response)); } - for i in 0..number_of_successes { - let client_index = i + number_of_failures; + for i in 0..NUMBER_OF_SUCCESSES { + let client_index = i + NUMBER_OF_FAILURES; let grpc_client = create_grpc_client(client_index, logger.clone()); let successful_mvq_response = create_successful_mvq_response(); clients_and_responses.push((grpc_client, successful_mvq_response)); @@ -219,17 +219,17 @@ mod tests { assert_eq!( processed_shard_response_data.shard_clients_for_retry.len(), - number_of_failures + NUMBER_OF_FAILURES ); assert_eq!( processed_shard_response_data .view_store_uris_for_authentication .len(), - number_of_failures + NUMBER_OF_FAILURES ); assert_eq!( processed_shard_response_data.new_query_responses.len(), - number_of_successes + NUMBER_OF_SUCCESSES ); } } From a44dbe0542079243154097ac11264f4987408df9 Mon Sep 17 00:00:00 2001 From: Sam Dealy Date: Thu, 14 Jul 2022 12:40:26 -0700 Subject: [PATCH 10/10] Fix whitespace --- fog/view/server/src/fog_view_router_service.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/fog/view/server/src/fog_view_router_service.rs b/fog/view/server/src/fog_view_router_service.rs index 4e4e9f7fba..e7a602d3c4 100644 --- a/fog/view/server/src/fog_view_router_service.rs +++ b/fog/view/server/src/fog_view_router_service.rs @@ -1,7 +1,6 @@ // Copyright (c) 2018-2022 The MobileCoin Foundation use crate::router_request_handler; - use futures::{FutureExt, TryFutureExt}; use grpcio::{DuplexSink, RequestStream, RpcContext}; use mc_common::logger::{log, Logger};