From d8965cabeac103a5f73e5402dcd7bf6088f84e2e Mon Sep 17 00:00:00 2001
From: awygle
Date: Tue, 3 Jan 2023 11:03:31 -0800
Subject: [PATCH] Key Image Router Service (#2898)

* Key Image Router Service

* Remove unneeded dead_code annotation

* Update fog/ledger/server/src/error.rs

Co-authored-by: Nick Santana

* Update fog/ledger/server/src/key_image_router_service.rs

Co-authored-by: Nick Santana

* Clean up commented-out code

Co-authored-by: Nick Santana

* Fix misnamed type in a comment

Co-authored-by: Nick Santana

* Address PR feedback around logging and comments.

* Address error in loop termination logic.

* Parameterize allowed number of retries for query loop

* Update based on changes from previous PRs

* Don't create 'groups' in `mod` or `use` declarations.

Co-authored-by: NotGyro
Co-authored-by: Nick Santana
---
 fog/ledger/server/src/error.rs                |  89 ++++
 .../server/src/key_image_router_service.rs    |  87 ++++
 fog/ledger/server/src/lib.rs                  |   3 +
 fog/ledger/server/src/router_handlers.rs      | 376 ++++++++++++++++++
 4 files changed, 555 insertions(+)
 create mode 100644 fog/ledger/server/src/error.rs
 create mode 100644 fog/ledger/server/src/key_image_router_service.rs
 create mode 100644 fog/ledger/server/src/router_handlers.rs

diff --git a/fog/ledger/server/src/error.rs b/fog/ledger/server/src/error.rs
new file mode 100644
index 0000000000..917a703869
--- /dev/null
+++ b/fog/ledger/server/src/error.rs
@@ -0,0 +1,89 @@
+// Copyright (c) 2018-2022 The MobileCoin Foundation
+
+use displaydoc::Display;
+use grpcio::RpcStatus;
+use mc_common::logger::Logger;
+use mc_fog_ledger_enclave_api::Error as LedgerEnclaveError;
+use mc_sgx_report_cache_untrusted::Error as ReportCacheError;
+use mc_util_grpc::{rpc_internal_error, rpc_permissions_error};
+
+// NOTE: `displaydoc` turns each variant's `///` comment into its `Display`
+// output, so rewording those comments changes the rendered error text, not
+// just the documentation.
+#[derive(Debug, Display)]
+pub enum RouterServerError {
+    /// Error related to contacting Fog Ledger Store: {0}
+    LedgerStoreError(String),
+    /// Ledger Enclave error: {0}
+    Enclave(LedgerEnclaveError),
+}
+
+impl From<grpcio::Error> for RouterServerError {
+    fn from(src: grpcio::Error) -> Self {
+        RouterServerError::LedgerStoreError(format!("{}", src))
+    }
+}
+
+impl From<mc_common::ResponderIdParseError> for RouterServerError {
+    fn from(src: mc_common::ResponderIdParseError) -> Self {
+        RouterServerError::LedgerStoreError(format!("{}", src))
+    }
+}
+
+impl From<mc_util_uri::UriParseError> for RouterServerError {
+    fn from(src: mc_util_uri::UriParseError) -> Self {
+        RouterServerError::LedgerStoreError(format!("{}", src))
+    }
+}
+
+impl From<mc_util_uri::UriConversionError> for RouterServerError {
+    fn from(src: mc_util_uri::UriConversionError) -> Self {
+        RouterServerError::LedgerStoreError(format!("{}", src))
+    }
+}
+
+/// Converts a [`RouterServerError`] into the [`RpcStatus`] returned to the
+/// caller.
+///
+/// Ledger Store errors are reported through `rpc_internal_error` and enclave
+/// errors through `rpc_permissions_error`, each tagged with `context`.
+pub fn router_server_err_to_rpc_status(
+    context: &str,
+    src: RouterServerError,
+    logger: Logger,
+) -> RpcStatus {
+    match src {
+        RouterServerError::LedgerStoreError(_) => {
+            rpc_internal_error(context, format!("{}", src), &logger)
+        }
+        RouterServerError::Enclave(_) => {
+            rpc_permissions_error(context, format!("{}", src), &logger)
+        }
+    }
+}
+
+impl From<LedgerEnclaveError> for RouterServerError {
+    fn from(src: LedgerEnclaveError) -> Self {
+        RouterServerError::Enclave(src)
+    }
+}
+
+#[derive(Display)]
+pub enum LedgerServerError {
+    /// Ledger Enclave error: {0}
+    Enclave(LedgerEnclaveError),
+    /// Report cache error: {0}
+    ReportCache(ReportCacheError),
+}
+
+impl From<LedgerEnclaveError> for LedgerServerError {
+    fn from(src: LedgerEnclaveError) -> Self {
+        LedgerServerError::Enclave(src)
+    }
+}
+
+impl From<ReportCacheError> for LedgerServerError {
+    fn from(src: ReportCacheError) -> Self {
+        Self::ReportCache(src)
+    }
+}
diff --git a/fog/ledger/server/src/key_image_router_service.rs b/fog/ledger/server/src/key_image_router_service.rs
new file mode 100644
index 0000000000..65f979701d
--- /dev/null
+++ b/fog/ledger/server/src/key_image_router_service.rs
@@ -0,0 +1,87 @@
+// Copyright (c) 2018-2022 The MobileCoin Foundation
+
+use crate::router_handlers;
+use futures::{FutureExt, TryFutureExt};
+use grpcio::{DuplexSink, RequestStream, RpcContext};
+use mc_common::logger::{log, Logger};
+use mc_fog_api::{
+    ledger::{LedgerRequest, LedgerResponse},
+    ledger_grpc::{self, LedgerApi},
+};
+use mc_fog_ledger_enclave::LedgerEnclaveProxy;
+use mc_fog_uri::KeyImageStoreUri;
+use mc_util_grpc::rpc_logger;
+use mc_util_metrics::SVC_COUNTERS;
+use std::{
+    collections::HashMap,
+    sync::{Arc, RwLock},
+};
+
+/// gRPC service implementing the streaming `LedgerApi`: every request on the
+/// stream is handed to `router_handlers`, which routes queries to the shards.
+#[derive(Clone)]
+pub struct KeyImageRouterService<E>
+where
+    E: LedgerEnclaveProxy,
+{
+    enclave: E,
+    shards: Arc<RwLock<HashMap<KeyImageStoreUri, Arc<ledger_grpc::KeyImageStoreApiClient>>>>,
+    query_retries: usize,
+    logger: Logger,
+}
+
+impl<E: LedgerEnclaveProxy> KeyImageRouterService<E> {
+    /// Creates a new KeyImageRouterService that can be used by a gRPC server
+    /// to fulfill gRPC requests.
+    #[allow(dead_code)] // FIXME
+    pub fn new(
+        enclave: E,
+        shards: Arc<RwLock<HashMap<KeyImageStoreUri, Arc<ledger_grpc::KeyImageStoreApiClient>>>>,
+        query_retries: usize,
+        logger: Logger,
+    ) -> Self {
+        Self {
+            enclave,
+            shards,
+            query_retries,
+            logger,
+        }
+    }
+}
+
+impl<E> LedgerApi for KeyImageRouterService<E>
+where
+    E: LedgerEnclaveProxy,
+{
+    /// Serves one streaming call: spawns a future on `ctx` that answers the
+    /// incoming requests through `router_handlers::handle_requests`.
+    fn request(
+        &mut self,
+        ctx: RpcContext,
+        requests: RequestStream<LedgerRequest>,
+        responses: DuplexSink<LedgerResponse>,
+    ) {
+        let _timer = SVC_COUNTERS.req(&ctx);
+        mc_common::logger::scoped_global_logger(&rpc_logger(&ctx, &self.logger), |logger| {
+            log::warn!(logger, "Streaming GRPC Ledger API only partially implemented.");
+            let logger = logger.clone();
+
+            // Hand the future its own list of shard clients, copied out of the
+            // map while the read lock is held.
+            let shards = self.shards.read().expect("RwLock poisoned");
+            let future = router_handlers::handle_requests(
+                shards.values().cloned().collect(),
+                self.enclave.clone(),
+                requests,
+                responses,
+                self.query_retries,
+                logger.clone(),
+            )
+            .map_err(move |err| log::error!(&logger, "failed to reply: {}", err))
+            // TODO: Do more with the error than just push it to the log.
+            .map(|_| ());
+
+            ctx.spawn(future)
+        });
+    }
+}
diff --git a/fog/ledger/server/src/lib.rs b/fog/ledger/server/src/lib.rs
index 0eb4c73b8e..02dc89a338 100644
--- a/fog/ledger/server/src/lib.rs
+++ b/fog/ledger/server/src/lib.rs
@@ -5,8 +5,11 @@ mod block_service;
 mod config;
 mod counters;
 mod db_fetcher;
+mod error;
+mod key_image_router_service;
 mod key_image_service;
 mod merkle_proof_service;
+mod router_handlers;
 mod server;
 mod untrusted_tx_out_service;
 
diff --git a/fog/ledger/server/src/router_handlers.rs b/fog/ledger/server/src/router_handlers.rs
new file mode 100644
index 0000000000..2bf217de08
--- /dev/null
+++ b/fog/ledger/server/src/router_handlers.rs
@@ -0,0 +1,376 @@
+// Copyright (c) 2018-2022 The MobileCoin Foundation
+
+use crate::error::{router_server_err_to_rpc_status, RouterServerError};
+use futures::{future::try_join_all, SinkExt, TryStreamExt};
+use grpcio::{ChannelBuilder, DuplexSink, RequestStream, RpcStatus, WriteFlags};
+use mc_attest_api::attest;
+use mc_attest_enclave_api::{EnclaveMessage, NonceSession};
+use mc_common::{
+    logger::{log, Logger},
+    ResponderId,
+};
+use mc_fog_api::{
+    ledger::{
+        LedgerRequest, LedgerResponse, MultiKeyImageStoreRequest, MultiKeyImageStoreResponse,
+        MultiKeyImageStoreResponseStatus,
+    },
+    ledger_grpc::KeyImageStoreApiClient,
+};
+use mc_fog_ledger_enclave::LedgerEnclaveProxy;
+use mc_fog_uri::{ConnectionUri, KeyImageStoreUri};
+use mc_util_grpc::{rpc_invalid_arg_error, ConnectionUriGrpcioChannel};
+use std::{collections::BTreeMap, str::FromStr, sync::Arc};
+
+/// Handles a series of requests sent by the Fog Ledger Router client,
+/// routing them out to shards.
+///
+/// Each response is sent on `responses` before the next request is read. The
+/// first request that fails ends the stream with that request's `RpcStatus`.
+pub async fn handle_requests<E>(
+    shard_clients: Vec<Arc<KeyImageStoreApiClient>>,
+    enclave: E,
+    mut requests: RequestStream<LedgerRequest>,
+    mut responses: DuplexSink<LedgerResponse>,
+    query_retries: usize,
+    logger: Logger,
+) -> Result<(), grpcio::Error>
+where
+    E: LedgerEnclaveProxy,
+{
+    while let Some(request) = requests.try_next().await? {
+        let result = handle_request(
+            request,
+            shard_clients.clone(),
+            enclave.clone(),
+            query_retries,
+            logger.clone(),
+        )
+        .await;
+        match result {
+            Ok(response) => responses.send((response, WriteFlags::default())).await?,
+            Err(rpc_status) => return responses.fail(rpc_status).await,
+        }
+    }
+    responses.close().await?;
+    Ok(())
+}
+
+/// Handles a client's request by performing either an authentication or a
+/// query.
+pub async fn handle_request<E>(
+    mut request: LedgerRequest,
+    shard_clients: Vec<Arc<KeyImageStoreApiClient>>,
+    enclave: E,
+    query_retries: usize,
+    logger: Logger,
+) -> Result<LedgerResponse, RpcStatus>
+where
+    E: LedgerEnclaveProxy,
+{
+    if request.has_auth() {
+        handle_auth_request(enclave, request.take_auth(), logger)
+    } else if request.has_check_key_images() {
+        handle_query_request(
+            request.take_check_key_images(),
+            enclave,
+            shard_clients,
+            query_retries,
+            logger,
+        )
+        .await
+        // TODO: Handle other cases here as they are added, such as the merkle
+        // proof service.
+    } else {
+        let rpc_status = rpc_invalid_arg_error(
+            "Invalid LedgerRequest request",
+            "Neither the check_key_images nor auth fields were set".to_string(),
+            &logger,
+        );
+        Err(rpc_status)
+    }
+}
+
+/// The result of processing the MultiKeyImageStoreResponse from each Fog
+/// Ledger Shard.
+pub struct ProcessedShardResponseData {
+    /// gRPC clients for Shards that need to be retried for a successful
+    /// response.
+    pub shard_clients_for_retry: Vec<Arc<KeyImageStoreApiClient>>,
+
+    /// Uris for individual Fog Ledger Stores that need to be authenticated with
+    /// by the Fog Router. It should only have entries if
+    /// `shard_clients_for_retry` has entries.
+    pub store_uris_for_authentication: Vec<KeyImageStoreUri>,
+
+    /// New, successfully processed query responses.
+    pub new_query_responses: Vec<(ResponderId, attest::NonceMessage)>,
+}
+
+impl ProcessedShardResponseData {
+    /// Bundles the three result lists into a `ProcessedShardResponseData`.
+    pub fn new(
+        shard_clients_for_retry: Vec<Arc<KeyImageStoreApiClient>>,
+        store_uris_for_authentication: Vec<KeyImageStoreUri>,
+        new_query_responses: Vec<(ResponderId, attest::NonceMessage)>,
+    ) -> Self {
+        ProcessedShardResponseData {
+            shard_clients_for_retry,
+            store_uris_for_authentication,
+            new_query_responses,
+        }
+    }
+}
+
+/// Processes the MultiKeyImageStoreResponses returned by each Ledger Shard.
+///
+/// Successful responses are collected under the store's responder id. Shards
+/// that reported an authentication error or were not ready are returned for
+/// retry; the former are also listed for authentication.
+pub fn process_shard_responses(
+    clients_and_responses: Vec<(Arc<KeyImageStoreApiClient>, MultiKeyImageStoreResponse)>,
+    logger: Logger,
+) -> Result<ProcessedShardResponseData, RouterServerError> {
+    let mut shard_clients_for_retry = Vec::new();
+    let mut store_uris_for_authentication = Vec::new();
+    let mut new_query_responses = Vec::new();
+
+    for (shard_client, mut response) in clients_and_responses {
+        let store_uri = KeyImageStoreUri::from_str(response.get_store_uri())?;
+        match response.get_status() {
+            MultiKeyImageStoreResponseStatus::SUCCESS => {
+                let store_responder_id = store_uri.responder_id()?;
+                new_query_responses.push((store_responder_id, response.take_query_response()));
+            }
+            MultiKeyImageStoreResponseStatus::AUTHENTICATION_ERROR => {
+                // We did not receive a query response for this shard. Therefore, we need to:
+                // (a) retry the query
+                // (b) authenticate with the Ledger Store that returned the decryption_error
+                shard_clients_for_retry.push(shard_client);
+                store_uris_for_authentication.push(store_uri);
+            }
+            // The store could not answer yet: keep its client so the caller's
+            // retry loop queries it again, without re-authenticating.
+            MultiKeyImageStoreResponseStatus::NOT_READY => {
+                shard_clients_for_retry.push(shard_client);
+            }
+            // This is an unexpected error - we should never see this
+            MultiKeyImageStoreResponseStatus::UNKNOWN => {
+                log::error!(
+                    logger,
+                    "Received a response with status 'UNKNOWN' from store {}",
+                    store_uri
+                );
+            }
+        }
+    }
+
+    Ok(ProcessedShardResponseData::new(
+        shard_clients_for_retry,
+        store_uris_for_authentication,
+        new_query_responses,
+    ))
+}
+
+/// Handles a client's authentication request.
+fn handle_auth_request<E>(
+    enclave: E,
+    auth_message: attest::AuthMessage,
+    logger: Logger,
+) -> Result<LedgerResponse, RpcStatus>
+where
+    E: LedgerEnclaveProxy,
+{
+    let (client_auth_response, _) = enclave.client_accept(auth_message.into()).map_err(|err| {
+        router_server_err_to_rpc_status("Auth: e client accept", err.into(), logger)
+    })?;
+
+    let mut response = LedgerResponse::new();
+    response.mut_auth().set_data(client_auth_response.into());
+    Ok(response)
+}
+
+/// Handles a client's query request.
+///
+/// The query is decrypted and sealed once, then routed to the shards until
+/// every shard has answered or the `query_retries` budget is used up.
+async fn handle_query_request<E>(
+    query: attest::Message,
+    enclave: E,
+    shard_clients: Vec<Arc<KeyImageStoreApiClient>>,
+    query_retries: usize,
+    logger: Logger,
+) -> Result<LedgerResponse, RpcStatus>
+where
+    E: LedgerEnclaveProxy,
+{
+    let mut query_responses: BTreeMap<ResponderId, EnclaveMessage<NonceSession>> = BTreeMap::new();
+    let mut shards_to_query = shard_clients.clone();
+    let sealed_query = enclave
+        .decrypt_and_seal_query(query.into())
+        .map_err(|err| {
+            router_server_err_to_rpc_status(
+                "Key Images Query: internal encryption error",
+                err.into(),
+                logger.clone(),
+            )
+        })?;
+
+    // The retry logic here is:
+    // Set retries remaining to query_retries
+    // Send query and process responses
+    // If there's a response from every shard, we're done
+    // If a store asked for authentication, authenticate with it and repeat
+    // Otherwise (shards not ready, or not enough responses), decrement
+    // remaining_retries and query the outstanding shards again
+    let mut remaining_retries = query_retries;
+    while remaining_retries > 0 {
+        let multi_ledger_store_query_request = enclave
+            .create_multi_key_image_store_query_data(sealed_query.clone())
+            .map_err(|err| {
+                router_server_err_to_rpc_status(
+                    "Key Images Query: internal encryption error",
+                    err.into(),
+                    logger.clone(),
+                )
+            })?
+            .into();
+        let clients_and_responses =
+            route_query(&multi_ledger_store_query_request, shards_to_query.clone())
+                .await
+                .map_err(|err| {
+                    router_server_err_to_rpc_status(
+                        "Key Images Query: internal query routing error",
+                        err,
+                        logger.clone(),
+                    )
+                })?;
+
+        let processed_shard_response_data =
+            process_shard_responses(clients_and_responses, logger.clone()).map_err(|err| {
+                router_server_err_to_rpc_status(
+                    "Key Images Query: internal query response processing",
+                    err,
+                    logger.clone(),
+                )
+            })?;
+
+        for (store_responder_id, new_query_response) in processed_shard_response_data
+            .new_query_responses
+            .into_iter()
+        {
+            query_responses.insert(store_responder_id, new_query_response.into());
+        }
+
+        if query_responses.len() >= shard_clients.len() {
+            break;
+        }
+
+        shards_to_query = processed_shard_response_data.shard_clients_for_retry;
+        let stores_to_authenticate = processed_shard_response_data.store_uris_for_authentication;
+        if stores_to_authenticate.is_empty() {
+            // No handshake is pending, so the outstanding shards were merely
+            // not ready (or unknown): re-querying them costs one retry.
+            remaining_retries -= 1;
+        } else {
+            // NOTE(review): authentication passes do not consume a retry, so a
+            // store that keeps answering AUTHENTICATION_ERROR would keep this
+            // loop alive - confirm that cannot happen.
+            authenticate_ledger_stores(enclave.clone(), stores_to_authenticate, logger.clone())
+                .await?;
+        }
+    }
+
+    if remaining_retries == 0 {
+        return Err(router_server_err_to_rpc_status(
+            "Key Images Query: timed out connecting to key image stores",
+            RouterServerError::LedgerStoreError(format!(
+                "Received {} responses which failed to advance the MultiKeyImageStoreRequest",
+                query_retries
+            )),
+            logger.clone(),
+        ));
+    }
+
+    let query_response = enclave
+        .collate_shard_query_responses(sealed_query, query_responses)
+        .map_err(|err| {
+            router_server_err_to_rpc_status(
+                "Key Images Query: shard response collation error",
+                RouterServerError::Enclave(err),
+                logger.clone(),
+            )
+        })?;
+
+    let mut response = LedgerResponse::new();
+    response.set_check_key_image_response(query_response.into());
+    Ok(response)
+}
+
+/// Sends a client's query request to all of the Fog Ledger shards.
+async fn route_query(
+    request: &MultiKeyImageStoreRequest,
+    shard_clients: Vec<Arc<KeyImageStoreApiClient>>,
+) -> Result<Vec<(Arc<KeyImageStoreApiClient>, MultiKeyImageStoreResponse)>, RouterServerError> {
+    let responses = shard_clients
+        .into_iter()
+        .map(|shard_client| query_shard(request, shard_client));
+    try_join_all(responses).await
+}
+
+/// Sends a client's query request to one of the Fog Ledger shards.
+async fn query_shard(
+    request: &MultiKeyImageStoreRequest,
+    shard_client: Arc<KeyImageStoreApiClient>,
+) -> Result<(Arc<KeyImageStoreApiClient>, MultiKeyImageStoreResponse), RouterServerError> {
+    let client_unary_receiver = shard_client.multi_key_image_store_query_async(request)?;
+    let response = client_unary_receiver.await?;
+
+    Ok((shard_client, response))
+}
+
+/// Authenticates Fog Ledger Stores that have previously not been authenticated.
+async fn authenticate_ledger_stores<E: LedgerEnclaveProxy>(
+    enclave: E,
+    ledger_store_uris: Vec<KeyImageStoreUri>,
+    logger: Logger,
+) -> Result<Vec<()>, RpcStatus> {
+    let pending_auth_requests = ledger_store_uris
+        .into_iter()
+        .map(|store_uri| authenticate_ledger_store(enclave.clone(), store_uri, logger.clone()));
+
+    try_join_all(pending_auth_requests).await.map_err(|err| {
+        router_server_err_to_rpc_status(
+            "Key Images Query: cannot authenticate with each Fog Ledger Store:",
+            err,
+            logger.clone(),
+        )
+    })
+}
+
+/// Authenticates a Fog Ledger Store that has previously not been authenticated.
+async fn authenticate_ledger_store<E: LedgerEnclaveProxy>(
+    enclave: E,
+    ledger_store_url: KeyImageStoreUri,
+    logger: Logger,
+) -> Result<(), RouterServerError> {
+    // Use the same responder id that `process_shard_responses` files this
+    // store's query responses under.
+    let ledger_store_id = ledger_store_url.responder_id()?;
+    let client_auth_request = enclave.ledger_store_init(ledger_store_id.clone())?;
+    let grpc_env = Arc::new(
+        grpcio::EnvBuilder::new()
+            .name_prefix("authenticate-ledger-store".to_string())
+            .build(),
+    );
+    let ledger_store_client = KeyImageStoreApiClient::new(
+        ChannelBuilder::default_channel_builder(grpc_env)
+            .connect_to_uri(&ledger_store_url, &logger),
+    );
+
+    let auth_unary_receiver = ledger_store_client.auth_async(&client_auth_request.into())?;
+    let auth_response = auth_unary_receiver.await?;
+
+    enclave.ledger_store_connect(ledger_store_id, auth_response.into())?;
+
+    Ok(())
+}