diff --git a/fog/api/proto/view.proto b/fog/api/proto/view.proto index f34b0c533c..1b3d3f7bed 100644 --- a/fog/api/proto/view.proto +++ b/fog/api/proto/view.proto @@ -36,16 +36,23 @@ message FogViewRouterResponse { } } -message FogViewStoreDecryptionError { - /// An error message that describes the decryption error. - string error_message = 1; -} - message MultiViewStoreQueryRequest { /// A list of queries encrypted for Fog View Stores. repeated attest.Message queries = 1; } +/// The status associated with a MultiViewStoreQueryResponse +enum MultiViewStoreQueryResponseStatus { + /// The Fog View Store successfully fulfilled the request. + SUCCESS = 0; + /// The Fog View Store is unable to decrypt a query within the MultiViewStoreQuery. It needs to be authenticated + /// by the router. + AUTHENTICATION_ERROR = 1; + /// The Fog View Store is not ready to service a MultiViewStoreQueryRequest. This might be because the store has + /// not loaded enough blocks yet. + NOT_READY = 2; +} + message MultiViewStoreQueryResponse { /// Optional field that gets set when the Fog View Store is able to decrypt a query /// included in the MultiViewStoreQueryRequest and create a query response for that @@ -58,9 +65,8 @@ message MultiViewStoreQueryResponse { /// described by this URI. string fog_view_store_uri = 2; - /// Optional error that gets returned when the Fog View Store - /// cannot decrypt the MultiViewStoreQuery. - FogViewStoreDecryptionError decryption_error = 3; + /// Status that gets returned when the Fog View Store services a MultiViewStoreQueryRequest. + MultiViewStoreQueryResponseStatus status = 3; } /// Fulfills requests sent directly by a Fog client, e.g. a mobile phone using the SDK. diff --git a/fog/view/server/src/fog_view_service.rs b/fog/view/server/src/fog_view_service.rs index 5aed893ed7..45369f83f3 100644 --- a/fog/view/server/src/fog_view_service.rs +++ b/fog/view/server/src/fog_view_service.rs @@ -1,16 +1,20 @@ // Copyright (c) 2018-2022 The MobileCoin Foundation -use crate::{config::ClientListenUri, server::DbPollSharedState}; +use crate::{ + config::ClientListenUri, server::DbPollSharedState, sharding_strategy::ShardingStrategy, +}; use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; use mc_attest_api::attest; use mc_common::logger::{log, Logger}; use mc_fog_api::{ - view::{MultiViewStoreQueryRequest, MultiViewStoreQueryResponse}, + view::{ + MultiViewStoreQueryRequest, MultiViewStoreQueryResponse, MultiViewStoreQueryResponseStatus, + }, view_grpc::{FogViewApi, FogViewStoreApi}, }; use mc_fog_recovery_db_iface::RecoveryDb; use mc_fog_types::view::QueryRequestAAD; -use mc_fog_uri::ConnectionUri; +use mc_fog_uri::{ConnectionUri, FogViewStoreUri}; use mc_fog_view_enclave::{Error as ViewEnclaveError, ViewEnclaveProxy}; use mc_fog_view_enclave_api::UntrustedQueryResponse; use mc_util_grpc::{ @@ -22,7 +26,12 @@ use mc_util_telemetry::{tracer, Tracer}; use std::sync::{Arc, Mutex}; #[derive(Clone)] -pub struct FogViewService { +pub struct FogViewService +where + E: ViewEnclaveProxy, + DB: RecoveryDb + Send + Sync, + SS: ShardingStrategy, +{ /// Enclave providing access to the Recovery DB enclave: E, @@ -40,9 +49,17 @@ pub struct FogViewService { /// Slog logger object logger: Logger, + + /// Dictates what blocks to process. + sharding_strategy: SS, } -impl FogViewService { +impl FogViewService +where + E: ViewEnclaveProxy, + DB: RecoveryDb + Send + Sync, + SS: ShardingStrategy, +{ /// Creates a new fog-view-service node (but does not create sockets and /// start it etc.) pub fn new( @@ -51,6 +68,7 @@ impl FogViewService { db_poll_shared_state: Arc>, authenticator: Arc, client_listen_uri: ClientListenUri, + sharding_strategy: SS, logger: Logger, ) -> Self { Self { @@ -59,6 +77,7 @@ impl FogViewService { db_poll_shared_state, authenticator, client_listen_uri, + sharding_strategy, logger, } } @@ -151,6 +170,37 @@ impl FogViewService { }) } + fn process_queries( + &mut self, + fog_view_store_uri: FogViewStoreUri, + queries: Vec, + ) -> MultiViewStoreQueryResponse { + let mut response = MultiViewStoreQueryResponse::new(); + response.set_fog_view_store_uri(fog_view_store_uri.url().to_string()); + for query in queries.into_iter() { + let result = self.query_impl(query); + // Only one of the query messages in an MVSQR is intended for this store + if let Ok(attested_message) = result { + { + let shared_state = self.db_poll_shared_state.lock().expect("mutex poisoned"); + if !self + .sharding_strategy + .is_ready_to_serve_tx_outs(shared_state.processed_block_count.into()) + { + response.set_status(MultiViewStoreQueryResponseStatus::NOT_READY); + } else { + response.set_query_response(attested_message); + response.set_status(MultiViewStoreQueryResponseStatus::SUCCESS); + } + } + return response; + } + } + + response.set_status(MultiViewStoreQueryResponseStatus::AUTHENTICATION_ERROR); + response + } + // Helper function that is common fn enclave_err_to_rpc_status(&self, context: &str, src: ViewEnclaveError) -> RpcStatus { // Treat prost-decode error as an invalid arg, @@ -169,7 +219,12 @@ impl FogViewService { } // Implement grpc trait -impl FogViewApi for FogViewService { +impl FogViewApi for FogViewService +where + E: ViewEnclaveProxy, + DB: RecoveryDb + Send + Sync, + SS: ShardingStrategy, +{ fn auth( &mut self, ctx: RpcContext, @@ -204,7 +259,12 @@ impl FogViewApi for FogViewSe } /// Implement the FogViewStoreService gRPC trait. -impl FogViewStoreApi for FogViewService { +impl FogViewStoreApi for FogViewService +where + E: ViewEnclaveProxy, + DB: RecoveryDb + Send + Sync, + SS: ShardingStrategy, +{ fn auth( &mut self, ctx: RpcContext, @@ -235,20 +295,7 @@ impl FogViewStoreApi for FogV return send_result(ctx, sink, err.into(), logger); } if let ClientListenUri::Store(fog_view_store_uri) = self.client_listen_uri.clone() { - let mut response = MultiViewStoreQueryResponse::new(); - response.set_fog_view_store_uri(fog_view_store_uri.url().to_string()); - for query in request.queries { - let result = self.query_impl(query); - if let Ok(attested_message) = result { - response.set_query_response(attested_message); - return send_result(ctx, sink, Ok(response), logger); - } - } - - let decryption_error = response.mut_decryption_error(); - decryption_error.set_error_message( - "Could not decrypt a query embedded in the MultiViewStoreQuery".to_string(), - ); + let response = self.process_queries(fog_view_store_uri, request.queries.into_vec()); send_result(ctx, sink, Ok(response), logger) } else { let rpc_permissions_error = rpc_permissions_error( diff --git a/fog/view/server/src/server.rs b/fog/view/server/src/server.rs index 368f2f90af..d7595b9017 100644 --- a/fog/view/server/src/server.rs +++ b/fog/view/server/src/server.rs @@ -81,7 +81,7 @@ where enclave.clone(), recovery_db.clone(), readiness_indicator.clone(), - sharding_strategy, + sharding_strategy.clone(), logger.clone(), ); @@ -108,6 +108,7 @@ where db_poll_thread.get_shared_state(), client_authenticator, config.client_listen_uri.clone(), + sharding_strategy, logger.clone(), )); log::debug!(logger, "Constructed View GRPC Service"); @@ -223,6 +224,9 @@ pub struct DbPollSharedState { /// The cumulative txo count of the last known block. pub last_known_block_cumulative_txo_count: u64, + + /// The number of blocks that have been processed. + pub processed_block_count: u64, } /// A thread that periodically pushes new tx data from db to enclave @@ -628,6 +632,8 @@ where // Track that this block was processed. self.enclave_block_tracker .block_processed(ingress_key, block_index); + let mut shared_state = self.shared_state.lock().expect("mutex poisoned"); + shared_state.processed_block_count += 1; // Update metrics counters::BLOCKS_ADDED_COUNT.inc(); diff --git a/fog/view/server/src/shard_responses_processor.rs b/fog/view/server/src/shard_responses_processor.rs index 59d213297e..74aeaf18d8 100644 --- a/fog/view/server/src/shard_responses_processor.rs +++ b/fog/view/server/src/shard_responses_processor.rs @@ -3,7 +3,10 @@ use crate::error::RouterServerError; use mc_attest_api::attest; use mc_common::ResponderId; -use mc_fog_api::{view::MultiViewStoreQueryResponse, view_grpc::FogViewStoreApiClient}; +use mc_fog_api::{ + view::{MultiViewStoreQueryResponse, MultiViewStoreQueryResponseStatus}, + view_grpc::FogViewStoreApiClient, +}; use mc_fog_uri::FogViewStoreUri; use std::{str::FromStr, sync::Arc}; @@ -46,16 +49,23 @@ pub fn process_shard_responses( let mut new_query_responses = Vec::new(); for (shard_client, mut response) in clients_and_responses { - // We did not receive a query_response for this shard.Therefore, we need to: - // (a) retry the query - // (b) authenticate with the Fog View Store that returned the decryption_error let store_uri = FogViewStoreUri::from_str(response.get_fog_view_store_uri())?; - if response.has_decryption_error() { - shard_clients_for_retry.push(shard_client); - view_store_uris_for_authentication.push(store_uri); - } else { - let store_responder_id = ResponderId::from_str(&store_uri.to_string())?; - new_query_responses.push((store_responder_id, response.take_query_response())); + match response.get_status() { + MultiViewStoreQueryResponseStatus::SUCCESS => { + let store_responder_id = ResponderId::from_str(&store_uri.to_string())?; + new_query_responses.push((store_responder_id, response.take_query_response())); + } + // The shard was unable to produce a query response because the Fog View Store + // it contacted isn't authenticated with the Fog View Router. Therefore + // we need to (a) retry the query (b) authenticate with the Fog View + // Store. + MultiViewStoreQueryResponseStatus::AUTHENTICATION_ERROR => { + shard_clients_for_retry.push(shard_client); + view_store_uris_for_authentication.push(store_uri); + } + // Don't do anything if the Fog View Store isn't ready. It's already authenticated, + // hasn't returned a new query response, and shouldn't be retried yet. + MultiViewStoreQueryResponseStatus::NOT_READY => (), } } @@ -88,11 +98,15 @@ mod tests { FogViewStoreScheme::DEFAULT_INSECURE_PORT, ); successful_response.set_fog_view_store_uri(view_uri_string); + successful_response.set_status(MultiViewStoreQueryResponseStatus::SUCCESS); successful_response } - fn create_failed_mvq_response(client_index: usize) -> MultiViewStoreQueryResponse { + fn create_failed_mvq_response( + client_index: usize, + status: MultiViewStoreQueryResponseStatus, + ) -> MultiViewStoreQueryResponse { let mut failed_response = MultiViewStoreQueryResponse::new(); let view_uri_string = format!( "{}://node{}.test.mobilecoin.com:{}", @@ -101,9 +115,7 @@ mod tests { FogViewStoreScheme::DEFAULT_INSECURE_PORT, ); failed_response.set_fog_view_store_uri(view_uri_string); - failed_response - .mut_decryption_error() - .set_error_message("Could not decrypt shard response".to_string()); + failed_response.set_status(status); failed_response } @@ -175,10 +187,13 @@ mod tests { } #[test_with_logger] - fn one_failed_response_one_pending_shard_client(logger: Logger) { + fn one_auth_error_response_one_pending_shard_client(logger: Logger) { let client_index = 0; let grpc_client = create_grpc_client(client_index, logger.clone()); - let failed_mvq_response = create_failed_mvq_response(client_index); + let failed_mvq_response = create_failed_mvq_response( + client_index, + MultiViewStoreQueryResponseStatus::AUTHENTICATION_ERROR, + ); let clients_and_responses = vec![(grpc_client, failed_mvq_response)]; let result = process_shard_responses(clients_and_responses); @@ -190,10 +205,13 @@ mod tests { } #[test_with_logger] - fn one_failed_response_one_pending_authentications(logger: Logger) { + fn one_auth_error_response_one_pending_authentications(logger: Logger) { let client_index: usize = 0; let grpc_client = create_grpc_client(client_index, logger.clone()); - let failed_mvq_response = create_failed_mvq_response(client_index); + let failed_mvq_response = create_failed_mvq_response( + client_index, + MultiViewStoreQueryResponseStatus::AUTHENTICATION_ERROR, + ); let clients_and_responses = vec![(grpc_client, failed_mvq_response)]; let result = process_shard_responses(clients_and_responses); @@ -205,10 +223,29 @@ mod tests { } #[test_with_logger] - fn one_failed_response_zero_new_query_responses(logger: Logger) { + fn one_auth_error_response_zero_new_query_responses(logger: Logger) { + let client_index: usize = 0; + let grpc_client = create_grpc_client(client_index, logger.clone()); + let failed_mvq_response = create_failed_mvq_response( + client_index, + MultiViewStoreQueryResponseStatus::AUTHENTICATION_ERROR, + ); + let clients_and_responses = vec![(grpc_client, failed_mvq_response)]; + + let result = process_shard_responses(clients_and_responses); + + assert!(result.is_ok()); + + let new_query_responses = result.unwrap().new_query_responses; + assert!(new_query_responses.is_empty()); + } + + #[test_with_logger] + fn one_not_ready_response_zero_new_query_responses(logger: Logger) { let client_index: usize = 0; let grpc_client = create_grpc_client(client_index, logger.clone()); - let failed_mvq_response = create_failed_mvq_response(client_index); + let failed_mvq_response = + create_failed_mvq_response(client_index, MultiViewStoreQueryResponseStatus::NOT_READY); let clients_and_responses = vec![(grpc_client, failed_mvq_response)]; let result = process_shard_responses(clients_and_responses); @@ -220,14 +257,49 @@ mod tests { } #[test_with_logger] - fn mixed_failed_and_successful_responses_processes_correctly(logger: Logger) { + fn one_not_ready_response_zero_pending_authentications(logger: Logger) { + let client_index: usize = 0; + let grpc_client = create_grpc_client(client_index, logger.clone()); + let failed_mvq_response = + create_failed_mvq_response(client_index, MultiViewStoreQueryResponseStatus::NOT_READY); + let clients_and_responses = vec![(grpc_client, failed_mvq_response)]; + + let result = process_shard_responses(clients_and_responses); + + assert!(result.is_ok()); + + let view_store_uris_for_authentication = result.unwrap().view_store_uris_for_authentication; + assert_eq!(view_store_uris_for_authentication.len(), 0); + } + + #[test_with_logger] + fn one_not_ready_response_zero_pending_shard_clients(logger: Logger) { + let client_index: usize = 0; + let grpc_client = create_grpc_client(client_index, logger.clone()); + let failed_mvq_response = + create_failed_mvq_response(client_index, MultiViewStoreQueryResponseStatus::NOT_READY); + let clients_and_responses = vec![(grpc_client, failed_mvq_response)]; + + let result = process_shard_responses(clients_and_responses); + + assert!(result.is_ok()); + + let shard_clients_for_retry = result.unwrap().shard_clients_for_retry; + assert_eq!(shard_clients_for_retry.len(), 0); + } + + #[test_with_logger] + fn mixed_auth_error_and_successful_responses_processes_correctly(logger: Logger) { const NUMBER_OF_FAILURES: usize = 11; const NUMBER_OF_SUCCESSES: usize = 8; let mut clients_and_responses = Vec::new(); for i in 0..NUMBER_OF_FAILURES { let grpc_client = create_grpc_client(i, logger.clone()); - let failed_mvq_response = create_failed_mvq_response(i); + let failed_mvq_response = create_failed_mvq_response( + i, + MultiViewStoreQueryResponseStatus::AUTHENTICATION_ERROR, + ); clients_and_responses.push((grpc_client, failed_mvq_response)); } for i in 0..NUMBER_OF_SUCCESSES { diff --git a/fog/view/server/src/sharding_strategy.rs b/fog/view/server/src/sharding_strategy.rs index 7e7d9be83e..766038dae1 100644 --- a/fog/view/server/src/sharding_strategy.rs +++ b/fog/view/server/src/sharding_strategy.rs @@ -6,7 +6,7 @@ //! TxOuts across Fog View Store instances. use mc_blockchain_types::BlockIndex; -use mc_fog_types::common::BlockRange; +use mc_fog_types::{common::BlockRange, BlockCount}; use serde::Serialize; use std::str::FromStr; @@ -14,6 +14,13 @@ use std::str::FromStr; pub trait ShardingStrategy { /// Returns true if the Fog View Store should process this block. fn should_process_block(&self, block_index: BlockIndex) -> bool; + + /// Returns true if the Fog View Store is ready to serve TxOuts to the + /// client. + /// + /// Different sharding strategies might be ready to serve TxOuts when + /// different conditions have been met. + fn is_ready_to_serve_tx_outs(&self, processed_block_count: BlockCount) -> bool; } /// Determines whether or not to process a block's TxOuts based on the "epoch" @@ -33,6 +40,10 @@ impl ShardingStrategy for EpochShardingStrategy { fn should_process_block(&self, block_index: BlockIndex) -> bool { self.epoch_block_range.contains(block_index) } + + fn is_ready_to_serve_tx_outs(&self, processed_block_count: BlockCount) -> bool { + self.have_enough_blocks_been_processed(processed_block_count) + } } impl Default for EpochShardingStrategy { @@ -48,6 +59,22 @@ impl EpochShardingStrategy { pub fn new(epoch_block_range: BlockRange) -> Self { Self { epoch_block_range } } + + fn have_enough_blocks_been_processed(&self, processed_block_count: BlockCount) -> bool { + if self.is_first_epoch() { + return true; + } + + let epoch_block_range_length = + self.epoch_block_range.end_block - self.epoch_block_range.start_block; + let minimum_processed_block_count = epoch_block_range_length / 2; + + u64::from(processed_block_count) >= minimum_processed_block_count + } + + fn is_first_epoch(&self) -> bool { + self.epoch_block_range.start_block == 0 + } } impl FromStr for EpochShardingStrategy { @@ -140,4 +167,75 @@ mod epoch_sharding_strategy_tests { assert!(!should_process_block) } + + #[test] + fn is_ready_to_serve_tx_outs_allows_0_in_0_to_100_shard() { + // The first epoch has a start block == 0. + const START_BLOCK: BlockIndex = 0; + const END_BLOCK_EXCLUSIVE: BlockIndex = 100; + let epoch_block_range = BlockRange::new(START_BLOCK, END_BLOCK_EXCLUSIVE); + let epoch_sharding_strategy = EpochShardingStrategy::new(epoch_block_range); + + let is_ready_to_serve_txouts = epoch_sharding_strategy.is_ready_to_serve_tx_outs(0.into()); + + assert!(is_ready_to_serve_txouts) + } + + #[test] + fn is_ready_to_serve_tx_outs_allows_70_in_0_to_100_shard() { + // The first epoch has a start block == 0. + const START_BLOCK: BlockIndex = 0; + const END_BLOCK_EXCLUSIVE: BlockIndex = 100; + let epoch_block_range = BlockRange::new(START_BLOCK, END_BLOCK_EXCLUSIVE); + let epoch_sharding_strategy = EpochShardingStrategy::new(epoch_block_range); + + let is_ready_to_serve_txouts = epoch_sharding_strategy.is_ready_to_serve_tx_outs(70.into()); + + assert!(is_ready_to_serve_txouts) + } + + #[test] + fn is_ready_to_serve_tx_outs_not_first_shard_prevents_less_than_minimum() { + const START_BLOCK: BlockIndex = 100; + const END_BLOCK_EXCLUSIVE: BlockIndex = 111; + let epoch_block_range_length = END_BLOCK_EXCLUSIVE - START_BLOCK; + let minimum_processed_block_count = epoch_block_range_length / 2; + let epoch_block_range = BlockRange::new(START_BLOCK, END_BLOCK_EXCLUSIVE); + let epoch_sharding_strategy = EpochShardingStrategy::new(epoch_block_range); + + let is_ready_to_serve_txouts = epoch_sharding_strategy + .is_ready_to_serve_tx_outs((minimum_processed_block_count - 1).into()); + + assert!(!is_ready_to_serve_txouts) + } + + #[test] + fn is_ready_to_serve_tx_outs_not_first_shard_allows_minimum() { + const START_BLOCK: BlockIndex = 100; + const END_BLOCK_EXCLUSIVE: BlockIndex = 111; + let epoch_block_range_length = END_BLOCK_EXCLUSIVE - START_BLOCK; + let minimum_processed_block_count = epoch_block_range_length / 2; + let epoch_block_range = BlockRange::new(START_BLOCK, END_BLOCK_EXCLUSIVE); + let epoch_sharding_strategy = EpochShardingStrategy::new(epoch_block_range); + + let is_ready_to_serve_txouts = + epoch_sharding_strategy.is_ready_to_serve_tx_outs(minimum_processed_block_count.into()); + + assert!(is_ready_to_serve_txouts) + } + + #[test] + fn is_ready_to_serve_tx_outs_not_first_shard_allows_over_minimum() { + const START_BLOCK: BlockIndex = 100; + const END_BLOCK_EXCLUSIVE: BlockIndex = 110; + let epoch_block_range_length = END_BLOCK_EXCLUSIVE - START_BLOCK; + let minimum_processed_block_count = epoch_block_range_length / 2; + let epoch_block_range = BlockRange::new(START_BLOCK, END_BLOCK_EXCLUSIVE); + let epoch_sharding_strategy = EpochShardingStrategy::new(epoch_block_range); + + let is_ready_to_serve_txouts = epoch_sharding_strategy + .is_ready_to_serve_tx_outs((minimum_processed_block_count + 1).into()); + + assert!(is_ready_to_serve_txouts) + } }