Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add readiness API to ShardingStrategy #2353

Merged
merged 5 commits into from
Aug 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions fog/api/proto/view.proto
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,23 @@ message FogViewRouterResponse {
}
}

message FogViewStoreDecryptionError {
/// An error message that describes the decryption error.
string error_message = 1;
}

message MultiViewStoreQueryRequest {
/// A list of queries encrypted for Fog View Stores.
repeated attest.Message queries = 1;
}

/// The status associated with a MultiViewStoreQueryResponse
enum MultiViewStoreQueryResponseStatus {
/// The Fog View Store successfully fulfilled the request.
SUCCESS = 0;
/// The Fog View Store is unable to decrypt a query within the MultiViewStoreQuery. It needs to be authenticated
/// by the router.
AUTHENTICATION_ERROR = 1;
/// The Fog View Store is not ready to service a MultiViewStoreQueryRequest. This might be because the store has
/// not loaded enough blocks yet.
NOT_READY = 2;
}

message MultiViewStoreQueryResponse {
/// Optional field that gets set when the Fog View Store is able to decrypt a query
/// included in the MultiViewStoreQueryRequest and create a query response for that
Expand All @@ -58,9 +65,8 @@ message MultiViewStoreQueryResponse {
/// described by this URI.
string fog_view_store_uri = 2;

/// Optional error that gets returned when the Fog View Store
/// cannot decrypt the MultiViewStoreQuery.
FogViewStoreDecryptionError decryption_error = 3;
/// Status that gets returned when the Fog View Store services a MultiViewStoreQueryRequest.
MultiViewStoreQueryResponseStatus status = 3;
}

/// Fulfills requests sent directly by a Fog client, e.g. a mobile phone using the SDK.
Expand Down
89 changes: 68 additions & 21 deletions fog/view/server/src/fog_view_service.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
// Copyright (c) 2018-2022 The MobileCoin Foundation

use crate::{config::ClientListenUri, server::DbPollSharedState};
use crate::{
config::ClientListenUri, server::DbPollSharedState, sharding_strategy::ShardingStrategy,
};
use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink};
use mc_attest_api::attest;
use mc_common::logger::{log, Logger};
use mc_fog_api::{
view::{MultiViewStoreQueryRequest, MultiViewStoreQueryResponse},
view::{
MultiViewStoreQueryRequest, MultiViewStoreQueryResponse, MultiViewStoreQueryResponseStatus,
},
view_grpc::{FogViewApi, FogViewStoreApi},
};
use mc_fog_recovery_db_iface::RecoveryDb;
use mc_fog_types::view::QueryRequestAAD;
use mc_fog_uri::ConnectionUri;
use mc_fog_uri::{ConnectionUri, FogViewStoreUri};
use mc_fog_view_enclave::{Error as ViewEnclaveError, ViewEnclaveProxy};
use mc_fog_view_enclave_api::UntrustedQueryResponse;
use mc_util_grpc::{
Expand All @@ -22,7 +26,12 @@ use mc_util_telemetry::{tracer, Tracer};
use std::sync::{Arc, Mutex};

#[derive(Clone)]
pub struct FogViewService<E: ViewEnclaveProxy, DB: RecoveryDb + Send + Sync> {
pub struct FogViewService<E, DB, SS>
where
E: ViewEnclaveProxy,
DB: RecoveryDb + Send + Sync,
SS: ShardingStrategy,
{
/// Enclave providing access to the Recovery DB
enclave: E,

Expand All @@ -40,9 +49,17 @@ pub struct FogViewService<E: ViewEnclaveProxy, DB: RecoveryDb + Send + Sync> {

/// Slog logger object
logger: Logger,

/// Dictates what blocks to process.
sharding_strategy: SS,
}

impl<E: ViewEnclaveProxy, DB: RecoveryDb + Send + Sync> FogViewService<E, DB> {
impl<E, DB, SS> FogViewService<E, DB, SS>
where
E: ViewEnclaveProxy,
DB: RecoveryDb + Send + Sync,
SS: ShardingStrategy,
{
/// Creates a new fog-view-service node (but does not create sockets and
/// start it etc.)
pub fn new(
Expand All @@ -51,6 +68,7 @@ impl<E: ViewEnclaveProxy, DB: RecoveryDb + Send + Sync> FogViewService<E, DB> {
db_poll_shared_state: Arc<Mutex<DbPollSharedState>>,
authenticator: Arc<dyn Authenticator + Send + Sync>,
client_listen_uri: ClientListenUri,
sharding_strategy: SS,
logger: Logger,
) -> Self {
Self {
Expand All @@ -59,6 +77,7 @@ impl<E: ViewEnclaveProxy, DB: RecoveryDb + Send + Sync> FogViewService<E, DB> {
db_poll_shared_state,
authenticator,
client_listen_uri,
sharding_strategy,
logger,
}
}
Expand Down Expand Up @@ -151,6 +170,37 @@ impl<E: ViewEnclaveProxy, DB: RecoveryDb + Send + Sync> FogViewService<E, DB> {
})
}

fn process_queries(
&mut self,
fog_view_store_uri: FogViewStoreUri,
queries: Vec<attest::Message>,
) -> MultiViewStoreQueryResponse {
let mut response = MultiViewStoreQueryResponse::new();
response.set_fog_view_store_uri(fog_view_store_uri.url().to_string());
for query in queries.into_iter() {
let result = self.query_impl(query);
// Only one of the query messages in an MVSQR is intended for this store
if let Ok(attested_message) = result {
{
let shared_state = self.db_poll_shared_state.lock().expect("mutex poisoned");
if !self
.sharding_strategy
.is_ready_to_serve_tx_outs(shared_state.processed_block_count.into())
{
response.set_status(MultiViewStoreQueryResponseStatus::NOT_READY);
} else {
response.set_query_response(attested_message);
response.set_status(MultiViewStoreQueryResponseStatus::SUCCESS);
}
}
return response;
}
}

response.set_status(MultiViewStoreQueryResponseStatus::AUTHENTICATION_ERROR);
response
}

// Helper function that is common
fn enclave_err_to_rpc_status(&self, context: &str, src: ViewEnclaveError) -> RpcStatus {
// Treat prost-decode error as an invalid arg,
Expand All @@ -169,7 +219,12 @@ impl<E: ViewEnclaveProxy, DB: RecoveryDb + Send + Sync> FogViewService<E, DB> {
}

// Implement grpc trait
impl<E: ViewEnclaveProxy, DB: RecoveryDb + Send + Sync> FogViewApi for FogViewService<E, DB> {
impl<E, DB, SS> FogViewApi for FogViewService<E, DB, SS>
where
E: ViewEnclaveProxy,
DB: RecoveryDb + Send + Sync,
SS: ShardingStrategy,
{
fn auth(
&mut self,
ctx: RpcContext,
Expand Down Expand Up @@ -204,7 +259,12 @@ impl<E: ViewEnclaveProxy, DB: RecoveryDb + Send + Sync> FogViewApi for FogViewSe
}

/// Implement the FogViewStoreService gRPC trait.
impl<E: ViewEnclaveProxy, DB: RecoveryDb + Send + Sync> FogViewStoreApi for FogViewService<E, DB> {
impl<E, DB, SS> FogViewStoreApi for FogViewService<E, DB, SS>
where
E: ViewEnclaveProxy,
DB: RecoveryDb + Send + Sync,
SS: ShardingStrategy,
{
fn auth(
&mut self,
ctx: RpcContext,
Expand Down Expand Up @@ -235,20 +295,7 @@ impl<E: ViewEnclaveProxy, DB: RecoveryDb + Send + Sync> FogViewStoreApi for FogV
return send_result(ctx, sink, err.into(), logger);
}
if let ClientListenUri::Store(fog_view_store_uri) = self.client_listen_uri.clone() {
let mut response = MultiViewStoreQueryResponse::new();
response.set_fog_view_store_uri(fog_view_store_uri.url().to_string());
for query in request.queries {
let result = self.query_impl(query);
if let Ok(attested_message) = result {
response.set_query_response(attested_message);
return send_result(ctx, sink, Ok(response), logger);
}
}

let decryption_error = response.mut_decryption_error();
decryption_error.set_error_message(
"Could not decrypt a query embedded in the MultiViewStoreQuery".to_string(),
);
let response = self.process_queries(fog_view_store_uri, request.queries.into_vec());
send_result(ctx, sink, Ok(response), logger)
} else {
let rpc_permissions_error = rpc_permissions_error(
Expand Down
8 changes: 7 additions & 1 deletion fog/view/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ where
enclave.clone(),
recovery_db.clone(),
readiness_indicator.clone(),
sharding_strategy,
sharding_strategy.clone(),
logger.clone(),
);

Expand All @@ -108,6 +108,7 @@ where
db_poll_thread.get_shared_state(),
client_authenticator,
config.client_listen_uri.clone(),
sharding_strategy,
logger.clone(),
));
log::debug!(logger, "Constructed View GRPC Service");
Expand Down Expand Up @@ -223,6 +224,9 @@ pub struct DbPollSharedState {

/// The cumulative txo count of the last known block.
pub last_known_block_cumulative_txo_count: u64,

/// The number of blocks that have been processed.
pub processed_block_count: u64,
}

/// A thread that periodically pushes new tx data from db to enclave
Expand Down Expand Up @@ -628,6 +632,8 @@ where
// Track that this block was processed.
self.enclave_block_tracker
.block_processed(ingress_key, block_index);
let mut shared_state = self.shared_state.lock().expect("mutex poisoned");
shared_state.processed_block_count += 1;

// Update metrics
counters::BLOCKS_ADDED_COUNT.inc();
Expand Down
Loading