From b8ffdedccbc2b73ecf78319af16ea087598e30a2 Mon Sep 17 00:00:00 2001 From: Sam Date: Wed, 4 Jan 2023 14:24:51 -0500 Subject: [PATCH 01/11] Add sharding_strategies field to config --- fog/view/server/src/config.rs | 10 +++++++-- fog/view/server/test-utils/src/lib.rs | 29 ++++++++++++++++++--------- 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/fog/view/server/src/config.rs b/fog/view/server/src/config.rs index 01b7ba5c63..863aa525a4 100644 --- a/fog/view/server/src/config.rs +++ b/fog/view/server/src/config.rs @@ -121,8 +121,14 @@ pub struct FogViewRouterConfig { #[clap(long, env = "MC_CLIENT_LISTEN_URI")] pub client_listen_uri: RouterClientListenUri, - /// gRPC listening URI for Fog View Stores. - #[clap(long, env = "MC_CLIENT_LISTEN_URI")] + /// Sharding Strategies for each Shard. Should be indexed the same as the + /// `shard_uris` field. + #[clap(long, env = "MC_VIEW_SHARDING_STRATEGIES")] + pub sharding_strategies: Vec, + + /// gRPC listening URI for Fog View Stores. Should be indexed the same as + /// the `sharding_stratgies` field. + #[clap(long, env = "MC_VIEW_SHARD_URIS")] pub shard_uris: Vec, /// PEM-formatted keypair to send with an Attestation Request. diff --git a/fog/view/server/test-utils/src/lib.rs b/fog/view/server/test-utils/src/lib.rs index 75cf074c74..e1d695bcfd 100644 --- a/fog/view/server/test-utils/src/lib.rs +++ b/fog/view/server/test-utils/src/lib.rs @@ -72,7 +72,7 @@ pub struct RouterTestEnvironment { impl RouterTestEnvironment { /// Creates a `RouterTestEnvironment` for the router integration tests. pub fn new(omap_capacity: u64, store_block_ranges: Vec, logger: Logger) -> Self { - let (db_test_context, store_servers, store_clients, store_uris) = + let (db_test_context, store_servers, store_clients, shard_uris, sharding_strategies) = Self::create_view_stores(omap_capacity, store_block_ranges, logger.clone()); let port = portpicker::pick_unused_port().expect("pick_unused_port"); let router_uri = @@ -81,14 +81,14 @@ impl RouterTestEnvironment { let port = portpicker::pick_unused_port().expect("pick_unused_port"); let admin_listen_uri = AdminUri::from_str(&format!("insecure-mca://127.0.0.1:{}", port)).unwrap(); - let config = FogViewRouterConfig { chain_id: "local".to_string(), client_responder_id: router_uri .responder_id() .expect("Could not get responder id for Fog View Router."), ias_api_key: Default::default(), - shard_uris: store_uris, + sharding_strategies, + shard_uris, ias_spid: Default::default(), client_listen_uri: RouterClientListenUri::Streaming(router_uri.clone()), client_auth_token_max_lifetime: Default::default(), @@ -113,7 +113,7 @@ impl RouterTestEnvironment { store_block_ranges: Vec, logger: Logger, ) -> Self { - let (db_test_context, store_servers, store_clients, store_uris) = + let (db_test_context, store_servers, store_clients, shard_uris, sharding_strategies) = Self::create_view_stores(omap_capacity, store_block_ranges, logger.clone()); let port = portpicker::pick_unused_port().expect("pick_unused_port"); let router_uri = @@ -129,7 +129,8 @@ impl RouterTestEnvironment { .expect("Could not get responder id for Fog View Router."), ias_api_key: Default::default(), ias_spid: Default::default(), - shard_uris: store_uris, + sharding_strategies, + shard_uris, client_listen_uri: RouterClientListenUri::Unary(router_uri.clone()), client_auth_token_max_lifetime: Default::default(), client_auth_token_secret: None, @@ -218,12 +219,14 @@ impl RouterTestEnvironment { Vec, Arc>>>, Vec, + Vec, ) { let db_test_context = SqlRecoveryDbTestContext::new(logger.clone()); let db = db_test_context.get_db_instance(); let mut store_servers = Vec::new(); let mut store_clients = HashMap::new(); - let mut store_uris: Vec = Vec::new(); + let mut shard_uris: Vec = Vec::new(); + let mut sharding_strategies: Vec = Vec::new(); for (i, store_block_range) in store_block_ranges.into_iter().enumerate() { let (store, store_uri) = { @@ -233,9 +236,11 @@ impl RouterTestEnvironment { port )) .unwrap(); - store_uris.push(uri.clone()); let epoch_sharding_strategy = EpochShardingStrategy::new(store_block_range); + let sharding_strategy = Epoch(epoch_sharding_strategy); + shard_uris.push(uri.clone()); + sharding_strategies.push(sharding_strategy.clone()); let config = ViewConfig { chain_id: "local".to_string(), @@ -247,7 +252,7 @@ impl RouterTestEnvironment { ias_api_key: Default::default(), admin_listen_uri: Default::default(), client_auth_token_max_lifetime: Default::default(), - sharding_strategy: ShardingStrategy::Epoch(epoch_sharding_strategy), + sharding_strategy, postgres_config: Default::default(), block_query_batch_size: 2, }; @@ -291,7 +296,13 @@ impl RouterTestEnvironment { let store_clients = Arc::new(RwLock::new(store_clients)); - (db_test_context, store_servers, store_clients, store_uris) + ( + db_test_context, + store_servers, + store_clients, + shard_uris, + sharding_strategies, + ) } } From 8e9bce9a7f07907b8084a061c8017d075cb59beb Mon Sep 17 00:00:00 2001 From: Sam Date: Wed, 4 Jan 2023 17:14:41 -0500 Subject: [PATCH 02/11] Plumb block range from config down to shard processor --- fog/view/server/src/bin/router.rs | 28 ++- fog/view/server/src/fog_view_router_server.rs | 35 ++- .../server/src/fog_view_router_service.rs | 24 +-- fog/view/server/src/router_admin_service.rs | 40 ++-- fog/view/server/src/router_request_handler.rs | 71 +++---- .../server/src/shard_responses_processor.rs | 201 +++++++++++------- fog/view/server/test-utils/src/lib.rs | 18 +- 7 files changed, 241 insertions(+), 176 deletions(-) diff --git a/fog/view/server/src/bin/router.rs b/fog/view/server/src/bin/router.rs index 6a5ef16208..494c68248f 100644 --- a/fog/view/server/src/bin/router.rs +++ b/fog/view/server/src/bin/router.rs @@ -8,12 +8,13 @@ use mc_common::{logger::log, time::SystemTimeProvider}; use mc_fog_api::view_grpc::FogViewStoreApiClient; use mc_fog_view_enclave::{SgxViewEnclave, ENCLAVE_FILE}; use mc_fog_view_server::{ - config::FogViewRouterConfig, fog_view_router_server::FogViewRouterServer, + config::{FogViewRouterConfig, ShardingStrategy::Epoch}, + fog_view_router_server::{FogViewRouterServer, Shard}, + sharding_strategy::ShardingStrategy, }; use mc_util_cli::ParserWithBuildInfo; use mc_util_grpc::ConnectionUriGrpcioChannel; use std::{ - collections::HashMap, env, sync::{Arc, RwLock}, }; @@ -40,28 +41,39 @@ fn main() { logger.clone(), ); - // TODO: Remove and get from a config. - let mut fog_view_store_grpc_clients = HashMap::new(); + let mut shards = Vec::new(); let grpc_env = Arc::new( grpcio::EnvBuilder::new() .name_prefix("Main-RPC".to_string()) .build(), ); - for shard_uri in config.shard_uris.clone() { + for (i, shard_uri) in config.shard_uris.clone().into_iter().enumerate() { let fog_view_store_grpc_client = FogViewStoreApiClient::new( ChannelBuilder::default_channel_builder(grpc_env.clone()) .connect_to_uri(&shard_uri, &logger), ); - fog_view_store_grpc_clients.insert(shard_uri, Arc::new(fog_view_store_grpc_client)); + + let sharding_strategy = config + .sharding_strategies + .get(i) + .unwrap_or_else(|| panic!("Couldn't find shard at index {}", i)); + let Epoch(epoch_sharding_strategy) = sharding_strategy; + let block_range = epoch_sharding_strategy.get_block_range(); + let shard = Shard::new( + shard_uri.clone(), + Arc::new(fog_view_store_grpc_client), + block_range, + ); + shards.push(shard); } - let fog_view_store_grpc_clients = Arc::new(RwLock::new(fog_view_store_grpc_clients)); + let shards = Arc::new(RwLock::new(shards)); let ias_client = Client::new(&config.ias_api_key).expect("Could not create IAS client"); let mut router_server = FogViewRouterServer::new( config, sgx_enclave, ias_client, - fog_view_store_grpc_clients, + shards, SystemTimeProvider::default(), logger, ); diff --git a/fog/view/server/src/fog_view_router_server.rs b/fog/view/server/src/fog_view_router_server.rs index 5866c2283c..bb92aa813a 100644 --- a/fog/view/server/src/fog_view_router_server.rs +++ b/fog/view/server/src/fog_view_router_server.rs @@ -17,6 +17,7 @@ use mc_common::{ time::TimeProvider, }; use mc_fog_api::view_grpc; +use mc_fog_types::common::BlockRange; use mc_fog_uri::{ConnectionUri, FogViewStoreUri}; use mc_fog_view_enclave::ViewEnclaveProxy; use mc_sgx_report_cache_untrusted::ReportCacheThread; @@ -24,10 +25,7 @@ use mc_util_grpc::{ AnonymousAuthenticator, Authenticator, ConnectionUriGrpcioServer, ReadinessIndicator, TokenAuthenticator, }; -use std::{ - collections::HashMap, - sync::{Arc, RwLock}, -}; +use std::sync::{Arc, RwLock}; pub struct FogViewRouterServer where @@ -43,6 +41,33 @@ where report_cache_thread: Option, } +/// A shard that fulfills a portion of the router's query requests. +#[derive(Clone)] +pub struct Shard { + /// The uri that this shard listens on. + pub uri: FogViewStoreUri, + + /// The gRPC client that is used to communicate with the shard. + pub grpc_client: Arc, + + /// The `BlockRange` that this shard is responsible for providing. + pub block_range: BlockRange, +} + +impl Shard { + pub fn new( + uri: FogViewStoreUri, + grpc_client: Arc, + block_range: BlockRange, + ) -> Self { + Self { + uri, + grpc_client, + block_range, + } + } +} + impl FogViewRouterServer where E: ViewEnclaveProxy, @@ -53,7 +78,7 @@ where config: FogViewRouterConfig, enclave: E, ra_client: RC, - shards: Arc>>>, + shards: Arc>>, time_provider: impl TimeProvider + 'static, logger: Logger, ) -> FogViewRouterServer diff --git a/fog/view/server/src/fog_view_router_service.rs b/fog/view/server/src/fog_view_router_service.rs index a10579f309..c5646594ec 100644 --- a/fog/view/server/src/fog_view_router_service.rs +++ b/fog/view/server/src/fog_view_router_service.rs @@ -1,23 +1,19 @@ // Copyright (c) 2018-2022 The MobileCoin Foundation -use crate::{router_request_handler, SVC_COUNTERS}; +use crate::{fog_view_router_server::Shard, router_request_handler, SVC_COUNTERS}; use futures::{executor::block_on, FutureExt, TryFutureExt}; use grpcio::{DuplexSink, RequestStream, RpcContext, UnarySink}; use mc_attest_api::attest; use mc_common::logger::{log, Logger}; use mc_fog_api::{ view::{FogViewRouterRequest, FogViewRouterResponse}, - view_grpc::{FogViewApi, FogViewRouterApi, FogViewStoreApiClient}, + view_grpc::{FogViewApi, FogViewRouterApi}, }; -use mc_fog_uri::FogViewStoreUri; use mc_fog_view_enclave_api::ViewEnclaveProxy; use mc_util_grpc::{check_request_chain_id, rpc_logger, send_result, Authenticator}; use mc_util_metrics::ServiceMetrics; use mc_util_telemetry::tracer; -use std::{ - collections::HashMap, - sync::{Arc, RwLock}, -}; +use std::sync::{Arc, RwLock}; #[derive(Clone)] pub struct FogViewRouterService @@ -25,7 +21,7 @@ where E: ViewEnclaveProxy, { enclave: E, - shard_clients: Arc>>>, + shards: Arc>>, chain_id: String, /// GRPC request authenticator. authenticator: Arc, @@ -40,14 +36,14 @@ impl FogViewRouterService { /// perform view store authentication on each one. pub fn new( enclave: E, - shard_clients: Arc>>>, + shards: Arc>>, chain_id: String, authenticator: Arc, logger: Logger, ) -> Self { Self { enclave, - shard_clients, + shards, chain_id, authenticator, logger, @@ -69,11 +65,11 @@ where let logger = logger.clone(); // TODO: Confirm that we don't need to perform the authenticator logic. I think // we don't because of streaming... - let shard_clients = self.shard_clients.read().expect("RwLock poisoned"); + let shards = self.shards.read().expect("RwLock poisoned"); let method_name = ServiceMetrics::get_method_name(&ctx); let future = router_request_handler::handle_requests( method_name, - shard_clients.values().cloned().collect(), + shards.clone(), self.enclave.clone(), requests, responses, @@ -134,12 +130,12 @@ where } // This will block the async API. We should use some sort of differentiator... - let shard_clients = self.shard_clients.read().expect("RwLock poisoned"); + let shards = self.shards.read().expect("RwLock poisoned"); let tracer = tracer!(); let result = block_on(router_request_handler::handle_query_request( request, self.enclave.clone(), - shard_clients.values().cloned().collect(), + shards.clone(), self.logger.clone(), &tracer, )) diff --git a/fog/view/server/src/router_admin_service.rs b/fog/view/server/src/router_admin_service.rs index 123bc5c372..5c8acb8a7a 100644 --- a/fog/view/server/src/router_admin_service.rs +++ b/fog/view/server/src/router_admin_service.rs @@ -1,9 +1,15 @@ // Copyright (c) 2018-2022 The MobileCoin Foundation -use crate::SVC_COUNTERS; +use crate::{ + fog_view_router_server::Shard, + sharding_strategy::{EpochShardingStrategy, ShardingStrategy}, + SVC_COUNTERS, +}; use grpcio::{ChannelBuilder, RpcContext, RpcStatus, UnarySink}; -use itertools::Itertools; -use mc_common::logger::{log, Logger}; +use mc_common::{ + logger::{log, Logger}, + HashSet, +}; use mc_fog_api::{ view::AddShardRequest, view_grpc::{FogViewRouterAdminApi, FogViewStoreApiClient}, @@ -14,26 +20,19 @@ use mc_util_grpc::{ ConnectionUriGrpcioChannel, Empty, }; use std::{ - collections::HashMap, str::FromStr, sync::{Arc, RwLock}, }; #[derive(Clone)] pub struct FogViewRouterAdminService { - shard_clients: Arc>>>, + shards: Arc>>, logger: Logger, } impl FogViewRouterAdminService { - pub fn new( - shard_clients: Arc>>>, - logger: Logger, - ) -> Self { - Self { - shard_clients, - logger, - } + pub fn new(shards: Arc>>, logger: Logger) -> Self { + Self { shards, logger } } fn add_shard_impl(&mut self, shard_uri: &str, logger: &Logger) -> Result { @@ -44,8 +43,13 @@ impl FogViewRouterAdminService { logger, ) })?; - let mut shard_clients = self.shard_clients.write().expect("RwLock Poisoned"); - if shard_clients.keys().contains(&view_store_uri) { + let mut shards = self.shards.write().expect("RwLock Poisoned"); + if shards + .iter() + .map(|shard| shard.uri.clone()) + .collect::>() + .contains(&view_store_uri) + { let error = rpc_precondition_error( "add_shard", format!("Shard uri {} already exists in the shard list", shard_uri), @@ -62,7 +66,11 @@ impl FogViewRouterAdminService { ChannelBuilder::default_channel_builder(grpc_env) .connect_to_uri(&view_store_uri, logger), ); - shard_clients.insert(view_store_uri, Arc::new(view_store_client)); + // TODO: Add block range or sharding strategy to this... + // Check to make sure this block range isn't already covered... + let block_range = EpochShardingStrategy::default().get_block_range(); + let shard = Shard::new(view_store_uri, Arc::new(view_store_client), block_range); + shards.push(shard); Ok(Empty::new()) } diff --git a/fog/view/server/src/router_request_handler.rs b/fog/view/server/src/router_request_handler.rs index 3257d351b4..d4be90db31 100644 --- a/fog/view/server/src/router_request_handler.rs +++ b/fog/view/server/src/router_request_handler.rs @@ -2,6 +2,7 @@ use crate::{ error::{router_server_err_to_rpc_status, RouterServerError}, + fog_view_router_server::Shard, shard_responses_processor, SVC_COUNTERS, }; use futures::{future::try_join_all, SinkExt, TryStreamExt}; @@ -27,7 +28,7 @@ const RETRY_COUNT: usize = 3; /// Handles a series of requests sent by the Fog Router client. pub async fn handle_requests( method_name: GrpcMethodName, - shard_clients: Vec>, + shards: Vec, enclave: E, mut requests: RequestStream, mut responses: DuplexSink, @@ -38,13 +39,7 @@ where { while let Some(request) = requests.try_next().await? { let _timer = SVC_COUNTERS.req_impl(&method_name); - let result = handle_request( - request, - shard_clients.clone(), - enclave.clone(), - logger.clone(), - ) - .await; + let result = handle_request(request, shards.clone(), enclave.clone(), logger.clone()).await; // Perform prometheus logic before the match statement to ensure that // this logic is executed. @@ -65,7 +60,7 @@ where /// query. pub async fn handle_request( mut request: FogViewRouterRequest, - shard_clients: Vec>, + shards: Vec, enclave: E, logger: Logger, ) -> Result @@ -78,15 +73,9 @@ where handle_auth_request(enclave, request.take_auth(), logger) }) } else if request.has_query() { - handle_query_request( - request.take_query(), - enclave, - shard_clients, - logger, - &tracer, - ) - .with_context(create_context(&tracer, "router_query")) - .await + handle_query_request(request.take_query(), enclave, shards, logger, &tracer) + .with_context(create_context(&tracer, "router_query")) + .await } else { let rpc_status = rpc_invalid_arg_error( "Inavlid FogViewRouterRequest request", @@ -119,7 +108,7 @@ where pub async fn handle_query_request( query: attest::Message, enclave: E, - shard_clients: Vec>, + shards: Vec, logger: Logger, tracer: &BoxedTracer, ) -> Result @@ -139,7 +128,7 @@ where let query_responses = get_query_responses( sealed_query.clone(), enclave.clone(), - shard_clients.clone(), + shards.clone(), logger.clone(), ) .with_context(create_context(tracer, "router_get_query_responses")) @@ -165,14 +154,13 @@ where async fn get_query_responses( sealed_query: SealedClientMessage, enclave: E, - mut shard_clients: Vec>, + mut shards: Vec, logger: Logger, ) -> Result, RpcStatus> where E: ViewEnclaveProxy, { - let mut query_responses: Vec = - Vec::with_capacity(shard_clients.len()); + let mut query_responses: Vec = Vec::with_capacity(shards.len()); let mut remaining_tries = RETRY_COUNT; while remaining_tries > 0 { let multi_view_store_query_request = enclave @@ -185,16 +173,15 @@ where ) })? .into(); - let clients_and_responses = - route_query(&multi_view_store_query_request, shard_clients.clone()) - .await - .map_err(|err| { - router_server_err_to_rpc_status( - "Query: internal query routing error", - err, - logger.clone(), - ) - })?; + let clients_and_responses = route_query(&multi_view_store_query_request, shards.clone()) + .await + .map_err(|err| { + router_server_err_to_rpc_status( + "Query: internal query routing error", + err, + logger.clone(), + ) + })?; let processed_shard_response_data = shard_responses_processor::process_shard_responses( clients_and_responses, @@ -215,8 +202,8 @@ where query_responses.push(multi_view_store_query_response); } - shard_clients = processed_shard_response_data.shard_clients_for_retry; - if shard_clients.is_empty() { + shards = processed_shard_response_data.shards_for_retry; + if shards.is_empty() { break; } @@ -251,9 +238,9 @@ where /// Sends a client's query request to all of the Fog View shards. async fn route_query( request: &MultiViewStoreQueryRequest, - shard_clients: Vec>, -) -> Result, MultiViewStoreQueryResponse)>, RouterServerError> { - let responses = shard_clients + shards: Vec, +) -> Result, RouterServerError> { + let responses = shards .into_iter() .map(|shard_client| query_shard(request, shard_client)); try_join_all(responses).await @@ -262,12 +249,12 @@ async fn route_query( /// Sends a client's query request to one of the Fog View shards. async fn query_shard( request: &MultiViewStoreQueryRequest, - shard_client: Arc, -) -> Result<(Arc, MultiViewStoreQueryResponse), RouterServerError> { - let client_unary_receiver = shard_client.multi_view_store_query_async(request)?; + shard: Shard, +) -> Result<(Shard, MultiViewStoreQueryResponse), RouterServerError> { + let client_unary_receiver = shard.grpc_client.multi_view_store_query_async(request)?; let response = client_unary_receiver.await?; - Ok((shard_client, response.try_into()?)) + Ok((shard, response.try_into()?)) } /// Authenticates Fog View Stores that have previously not been authenticated. diff --git a/fog/view/server/src/shard_responses_processor.rs b/fog/view/server/src/shard_responses_processor.rs index 5f8a0bff62..cf713d6821 100644 --- a/fog/view/server/src/shard_responses_processor.rs +++ b/fog/view/server/src/shard_responses_processor.rs @@ -1,18 +1,17 @@ // Copyright (c) 2018-2022 The MobileCoin Foundation -use crate::error::RouterServerError; +use crate::{error::RouterServerError, fog_view_router_server::Shard}; use mc_common::logger::{log, Logger}; -use mc_fog_api::view_grpc::FogViewStoreApiClient; use mc_fog_types::view::MultiViewStoreQueryResponse; use mc_fog_uri::FogViewStoreUri; -use std::{str::FromStr, sync::Arc}; +use std::str::FromStr; /// The result of processing the MultiViewStoreQueryResponse from each Fog View /// Shard. pub struct ProcessedShardResponseData { /// gRPC clients for Shards that need to be retried for a successful /// response. - pub shard_clients_for_retry: Vec>, + pub shards_for_retry: Vec, /// Uris for *individual* Fog View Stores that need to be authenticated with /// by the Fog Router. It should only have entries if @@ -25,12 +24,12 @@ pub struct ProcessedShardResponseData { impl ProcessedShardResponseData { pub fn new( - shard_clients_for_retry: Vec>, + shards_for_retry: Vec, view_store_uris_for_authentication: Vec, new_query_responses: Vec, ) -> Self { ProcessedShardResponseData { - shard_clients_for_retry, + shards_for_retry, view_store_uris_for_authentication, multi_view_store_query_responses: new_query_responses, } @@ -39,14 +38,16 @@ impl ProcessedShardResponseData { /// Processes the MultiViewStoreQueryResponses returned by each Fog View Shard. pub fn process_shard_responses( - clients_and_responses: Vec<(Arc, MultiViewStoreQueryResponse)>, + shards_and_responses: Vec<(Shard, MultiViewStoreQueryResponse)>, logger: Logger, ) -> Result { - let mut shard_clients_for_retry = Vec::new(); + let mut shards_for_retry = Vec::new(); let mut view_store_uris_for_authentication = Vec::new(); let mut new_query_responses = Vec::new(); - for (shard_client, response) in clients_and_responses { + for (shard, response) in shards_and_responses { + // TODO: Add check here and throw appropriate error if the shard provides the + // wrong block range. match response.status { mc_fog_types::view::MultiViewStoreQueryResponseStatus::Unknown => { log::error!( @@ -54,7 +55,7 @@ pub fn process_shard_responses( "Received a response with status 'unknown' from store{}", FogViewStoreUri::from_str(&response.store_uri)? ); - shard_clients_for_retry.push(shard_client); + shards_for_retry.push(shard); } mc_fog_types::view::MultiViewStoreQueryResponseStatus::Success => { new_query_responses.push(response.clone()); @@ -64,7 +65,7 @@ pub fn process_shard_responses( // we need to (a) retry the query (b) authenticate with the Fog View // Store. mc_fog_types::view::MultiViewStoreQueryResponseStatus::AuthenticationError => { - shard_clients_for_retry.push(shard_client); + shards_for_retry.push(shard); view_store_uris_for_authentication .push(FogViewStoreUri::from_str(&response.store_uri)?); } @@ -75,7 +76,7 @@ pub fn process_shard_responses( } Ok(ProcessedShardResponseData::new( - shard_clients_for_retry, + shards_for_retry, view_store_uris_for_authentication, new_query_responses, )) @@ -84,13 +85,20 @@ pub fn process_shard_responses( #[cfg(test)] mod tests { use super::*; + use crate::sharding_strategy::{EpochShardingStrategy, ShardingStrategy}; use grpcio::ChannelBuilder; use mc_common::logger::{test_with_logger, Logger}; + use mc_fog_api::view_grpc::FogViewStoreApiClient; + use mc_fog_types::common::BlockRange; use mc_fog_uri::FogViewStoreScheme; use mc_util_grpc::ConnectionUriGrpcioChannel; use mc_util_uri::UriScheme; + use std::sync::Arc; - fn create_successful_mvq_response(client_index: usize) -> MultiViewStoreQueryResponse { + fn create_successful_mvq_response( + shard_index: usize, + block_range: BlockRange, + ) -> MultiViewStoreQueryResponse { let mut successful_response = mc_fog_api::view::MultiViewStoreQueryResponse::new(); let client_auth_request = Vec::new(); successful_response @@ -99,10 +107,11 @@ mod tests { let view_uri_string = format!( "{}://node{}.test.mobilecoin.com:{}", FogViewStoreScheme::SCHEME_INSECURE, - client_index, + shard_index, FogViewStoreScheme::DEFAULT_INSECURE_PORT, ); successful_response.set_store_uri(view_uri_string); + successful_response.set_block_range(mc_fog_api::fog_common::BlockRange::from(&block_range)); successful_response .set_status(mc_fog_api::view::MultiViewStoreQueryResponseStatus::SUCCESS); @@ -112,17 +121,19 @@ mod tests { } fn create_failed_mvq_response( - client_index: usize, + shard_index: usize, + block_range: BlockRange, status: mc_fog_api::view::MultiViewStoreQueryResponseStatus, ) -> MultiViewStoreQueryResponse { let mut failed_response = mc_fog_api::view::MultiViewStoreQueryResponse::new(); let view_uri_string = format!( "{}://node{}.test.mobilecoin.com:{}", FogViewStoreScheme::SCHEME_INSECURE, - client_index, + shard_index, FogViewStoreScheme::DEFAULT_INSECURE_PORT, ); failed_response.set_store_uri(view_uri_string); + failed_response.set_block_range(mc_fog_api::fog_common::BlockRange::from(&block_range)); failed_response.set_status(status); failed_response @@ -130,14 +141,14 @@ mod tests { .expect("Couldn't convert MultiViewStoreQueryResponse proto to internal struct") } - fn create_grpc_client(i: usize, logger: Logger) -> Arc { + fn create_shard(i: usize, block_range: BlockRange, logger: Logger) -> Shard { let view_uri_string = format!( "{}://node{}.test.mobilecoin.com:{}", FogViewStoreScheme::SCHEME_INSECURE, i, FogViewStoreScheme::DEFAULT_INSECURE_PORT, ); - let view_uri = FogViewStoreUri::from_str(&view_uri_string).unwrap(); + let uri = FogViewStoreUri::from_str(&view_uri_string).unwrap(); let grpc_env = Arc::new( grpcio::EnvBuilder::new() .name_prefix("processor-test".to_string()) @@ -145,35 +156,39 @@ mod tests { ); let grpc_client = FogViewStoreApiClient::new( - ChannelBuilder::default_channel_builder(grpc_env).connect_to_uri(&view_uri, &logger), + ChannelBuilder::default_channel_builder(grpc_env).connect_to_uri(&uri, &logger), ); - Arc::new(grpc_client) + Shard::new(uri, Arc::new(grpc_client), block_range) } #[test_with_logger] - fn one_successful_response_no_shard_clients(logger: Logger) { - let client_index = 0; - let grpc_client = create_grpc_client(client_index, logger.clone()); - let successful_mvq_response = create_successful_mvq_response(client_index); - let clients_and_responses = vec![(grpc_client, successful_mvq_response)]; + fn one_successful_response_no_shards(logger: Logger) { + let shard_index = 0; + let sharding_strategy = EpochShardingStrategy::default(); + let block_range = sharding_strategy.get_block_range(); + let shard = create_shard(shard_index, block_range.clone(), logger.clone()); + let successful_mvq_response = create_successful_mvq_response(shard_index, block_range); + let shards_and_responses = vec![(shard, successful_mvq_response)]; - let result = process_shard_responses(clients_and_responses, logger.clone()); + let result = process_shard_responses(shards_and_responses, logger.clone()); assert!(result.is_ok()); - let shard_clients_for_retry = result.unwrap().shard_clients_for_retry; - assert!(shard_clients_for_retry.is_empty()); + let shards_for_retry = result.unwrap().shards_for_retry; + assert!(shards_for_retry.is_empty()); } #[test_with_logger] fn one_successful_response_no_pending_authentications(logger: Logger) { - let client_index = 0; - let grpc_client = create_grpc_client(client_index, logger.clone()); - let successful_mvq_response = create_successful_mvq_response(client_index); - let clients_and_responses = vec![(grpc_client, successful_mvq_response)]; + let shard_index = 0; + let sharding_strategy = EpochShardingStrategy::default(); + let block_range = sharding_strategy.get_block_range(); + let shard = create_shard(shard_index, block_range.clone(), logger.clone()); + let successful_mvq_response = create_successful_mvq_response(shard_index, block_range); + let shards_and_responses = vec![(shard, successful_mvq_response)]; - let result = process_shard_responses(clients_and_responses, logger.clone()); + let result = process_shard_responses(shards_and_responses, logger.clone()); assert!(result.is_ok()); @@ -183,12 +198,14 @@ mod tests { #[test_with_logger] fn one_successful_response_one_new_query_response(logger: Logger) { - let client_index = 0; - let grpc_client = create_grpc_client(client_index, logger.clone()); - let successful_mvq_response = create_successful_mvq_response(client_index); - let clients_and_responses = vec![(grpc_client, successful_mvq_response)]; + let shard_index = 0; + let sharding_strategy = EpochShardingStrategy::default(); + let block_range = sharding_strategy.get_block_range(); + let shard = create_shard(shard_index, block_range.clone(), logger.clone()); + let successful_mvq_response = create_successful_mvq_response(shard_index, block_range); + let shards_and_responses = vec![(shard, successful_mvq_response)]; - let result = process_shard_responses(clients_and_responses, logger.clone()); + let result = process_shard_responses(shards_and_responses, logger.clone()); assert!(result.is_ok()); @@ -197,34 +214,40 @@ mod tests { } #[test_with_logger] - fn one_auth_error_response_one_pending_shard_client(logger: Logger) { - let client_index = 0; - let grpc_client = create_grpc_client(client_index, logger.clone()); + fn one_auth_error_response_one_pending_shard(logger: Logger) { + let shard_index = 0; + let sharding_strategy = EpochShardingStrategy::default(); + let block_range = sharding_strategy.get_block_range(); + let shard = create_shard(shard_index, block_range.clone(), logger.clone()); let failed_mvq_response = create_failed_mvq_response( - client_index, + shard_index, + block_range, mc_fog_api::view::MultiViewStoreQueryResponseStatus::AUTHENTICATION_ERROR, ); - let clients_and_responses = vec![(grpc_client, failed_mvq_response)]; + let shards_and_responses = vec![(shard, failed_mvq_response)]; - let result = process_shard_responses(clients_and_responses, logger.clone()); + let result = process_shard_responses(shards_and_responses, logger.clone()); assert!(result.is_ok()); - let shard_clients_for_retry = result.unwrap().shard_clients_for_retry; - assert_eq!(shard_clients_for_retry.len(), 1); + let shards_for_retry = result.unwrap().shards_for_retry; + assert_eq!(shards_for_retry.len(), 1); } #[test_with_logger] fn one_auth_error_response_one_pending_authentications(logger: Logger) { - let client_index: usize = 0; - let grpc_client = create_grpc_client(client_index, logger.clone()); + let shard_index: usize = 0; + let sharding_strategy = EpochShardingStrategy::default(); + let block_range = sharding_strategy.get_block_range(); + let shard = create_shard(shard_index, block_range.clone(), logger.clone()); let failed_mvq_response = create_failed_mvq_response( - client_index, + shard_index, + block_range, mc_fog_api::view::MultiViewStoreQueryResponseStatus::AUTHENTICATION_ERROR, ); - let clients_and_responses = vec![(grpc_client, failed_mvq_response)]; + let shards_and_responses = vec![(shard, failed_mvq_response)]; - let result = process_shard_responses(clients_and_responses, logger.clone()); + let result = process_shard_responses(shards_and_responses, logger.clone()); assert!(result.is_ok()); @@ -234,15 +257,18 @@ mod tests { #[test_with_logger] fn one_auth_error_response_zero_new_query_responses(logger: Logger) { - let client_index: usize = 0; - let grpc_client = create_grpc_client(client_index, logger.clone()); + let shard_index: usize = 0; + let sharding_strategy = EpochShardingStrategy::default(); + let block_range = sharding_strategy.get_block_range(); + let shard = create_shard(shard_index, block_range.clone(), logger.clone()); let failed_mvq_response = create_failed_mvq_response( - client_index, + shard_index, + block_range, mc_fog_api::view::MultiViewStoreQueryResponseStatus::AUTHENTICATION_ERROR, ); - let clients_and_responses = vec![(grpc_client, failed_mvq_response)]; + let shards_and_responses = vec![(shard, failed_mvq_response)]; - let result = process_shard_responses(clients_and_responses, logger.clone()); + let result = process_shard_responses(shards_and_responses, logger.clone()); assert!(result.is_ok()); @@ -252,15 +278,18 @@ mod tests { #[test_with_logger] fn one_not_ready_response_zero_new_query_responses(logger: Logger) { - let client_index: usize = 0; - let grpc_client = create_grpc_client(client_index, logger.clone()); + let shard_index: usize = 0; + let sharding_strategy = EpochShardingStrategy::default(); + let block_range = sharding_strategy.get_block_range(); + let shard = create_shard(shard_index, block_range.clone(), logger.clone()); let failed_mvq_response = create_failed_mvq_response( - client_index, + shard_index, + block_range, mc_fog_api::view::MultiViewStoreQueryResponseStatus::NOT_READY, ); - let clients_and_responses = vec![(grpc_client, failed_mvq_response)]; + let shards_and_responses = vec![(shard, failed_mvq_response)]; - let result = process_shard_responses(clients_and_responses, logger.clone()); + let result = process_shard_responses(shards_and_responses, logger.clone()); assert!(result.is_ok()); @@ -270,15 +299,18 @@ mod tests { #[test_with_logger] fn one_not_ready_response_zero_pending_authentications(logger: Logger) { - let client_index: usize = 0; - let grpc_client = create_grpc_client(client_index, logger.clone()); + let shard_index: usize = 0; + let sharding_strategy = EpochShardingStrategy::default(); + let block_range = sharding_strategy.get_block_range(); + let shard = create_shard(shard_index, block_range.clone(), logger.clone()); let failed_mvq_response = create_failed_mvq_response( - client_index, + shard_index, + block_range, mc_fog_api::view::MultiViewStoreQueryResponseStatus::NOT_READY, ); - let clients_and_responses = vec![(grpc_client, failed_mvq_response)]; + let shards_and_responses = vec![(shard, failed_mvq_response)]; - let result = process_shard_responses(clients_and_responses, logger.clone()); + let result = process_shard_responses(shards_and_responses, logger.clone()); assert!(result.is_ok()); @@ -288,19 +320,21 @@ mod tests { #[test_with_logger] fn one_not_ready_response_zero_pending_shard_clients(logger: Logger) { - let client_index: usize = 0; - let grpc_client = create_grpc_client(client_index, logger.clone()); + let shard_index: usize = 0; + let sharding_strategy = EpochShardingStrategy::default(); + let block_range = sharding_strategy.get_block_range(); + let shard = create_shard(shard_index, block_range.clone(), logger.clone()); let failed_mvq_response = create_failed_mvq_response( - client_index, + shard_index, + block_range, mc_fog_api::view::MultiViewStoreQueryResponseStatus::NOT_READY, ); - let clients_and_responses = vec![(grpc_client, failed_mvq_response)]; - - let result = process_shard_responses(clients_and_responses, logger.clone()); + let shards_and_responses = vec![(shard, failed_mvq_response)]; + let result = process_shard_responses(shards_and_responses, logger.clone()); assert!(result.is_ok()); - let shard_clients_for_retry = result.unwrap().shard_clients_for_retry; + let shard_clients_for_retry = result.unwrap().shards_for_retry; assert_eq!(shard_clients_for_retry.len(), 0); } @@ -309,28 +343,31 @@ mod tests { const NUMBER_OF_FAILURES: usize = 11; const NUMBER_OF_SUCCESSES: usize = 8; - let mut clients_and_responses = Vec::new(); + let mut shards_and_clients = Vec::new(); for i in 0..NUMBER_OF_FAILURES { - let grpc_client = create_grpc_client(i, logger.clone()); + let block_range = BlockRange::new_from_length(i as u64, 1); + let shard = create_shard(i, block_range.clone(), logger.clone()); let failed_mvq_response = create_failed_mvq_response( i, + block_range, mc_fog_api::view::MultiViewStoreQueryResponseStatus::AUTHENTICATION_ERROR, ); - clients_and_responses.push((grpc_client, failed_mvq_response)); + shards_and_clients.push((shard, failed_mvq_response)); } for i in 0..NUMBER_OF_SUCCESSES { - let client_index = i + NUMBER_OF_FAILURES; - let grpc_client = create_grpc_client(client_index, logger.clone()); - let successful_mvq_response = create_successful_mvq_response(client_index); - clients_and_responses.push((grpc_client, successful_mvq_response)); + let shard_index = i + NUMBER_OF_FAILURES; + let block_range = BlockRange::new_from_length(shard_index as u64, 1); + let shard = create_shard(shard_index, block_range.clone(), logger.clone()); + let successful_mvq_response = create_successful_mvq_response(shard_index, block_range); + shards_and_clients.push((shard, successful_mvq_response)); } - let result = process_shard_responses(clients_and_responses, logger.clone()); + let result = process_shard_responses(shards_and_clients, logger.clone()); assert!(result.is_ok()); let processed_shard_response_data = result.unwrap(); assert_eq!( - processed_shard_response_data.shard_clients_for_retry.len(), + processed_shard_response_data.shards_for_retry.len(), NUMBER_OF_FAILURES ); assert_eq!( diff --git a/fog/view/server/test-utils/src/lib.rs b/fog/view/server/test-utils/src/lib.rs index e1d695bcfd..7398f3b549 100644 --- a/fog/view/server/test-utils/src/lib.rs +++ b/fog/view/server/test-utils/src/lib.rs @@ -28,7 +28,7 @@ use mc_fog_view_server::{ FogViewRouterConfig, MobileAcctViewConfig as ViewConfig, RouterClientListenUri, ShardingStrategy, ShardingStrategy::Epoch, }, - fog_view_router_server::FogViewRouterServer, + fog_view_router_server::{FogViewRouterServer, Shard}, server::ViewServer, sharding_strategy::EpochShardingStrategy, }; @@ -36,7 +36,6 @@ use mc_transaction_core::BlockVersion; use mc_util_grpc::{ConnectionUriGrpcioChannel, GrpcRetryConfig}; use mc_util_uri::{AdminUri, ConnectionUri}; use std::{ - collections::HashMap, str::FromStr, sync::{Arc, RwLock}, thread::sleep, @@ -151,7 +150,7 @@ impl RouterTestEnvironment { fn create_router_server( config: FogViewRouterConfig, - store_clients: Arc>>>, + shards: Arc>>, logger: &Logger, ) -> FogViewRouterServer { let enclave = SgxViewEnclave::new( @@ -166,7 +165,7 @@ impl RouterTestEnvironment { config, enclave, ra_client, - store_clients, + shards, SystemTimeProvider::default(), logger.clone(), ); @@ -217,14 +216,14 @@ impl RouterTestEnvironment { ) -> ( SqlRecoveryDbTestContext, Vec, - Arc>>>, + Arc>>, Vec, Vec, ) { let db_test_context = SqlRecoveryDbTestContext::new(logger.clone()); let db = db_test_context.get_db_instance(); let mut store_servers = Vec::new(); - let mut store_clients = HashMap::new(); + let mut shards = Vec::new(); let mut shard_uris: Vec = Vec::new(); let mut sharding_strategies: Vec = Vec::new(); @@ -237,7 +236,7 @@ impl RouterTestEnvironment { )) .unwrap(); - let epoch_sharding_strategy = EpochShardingStrategy::new(store_block_range); + let epoch_sharding_strategy = EpochShardingStrategy::new(store_block_range.clone()); let sharding_strategy = Epoch(epoch_sharding_strategy); shard_uris.push(uri.clone()); sharding_strategies.push(sharding_strategy.clone()); @@ -291,10 +290,11 @@ impl RouterTestEnvironment { ChannelBuilder::default_channel_builder(grpc_env) .connect_to_uri(&store_uri, &logger), ); - store_clients.insert(store_uri, Arc::new(store_client)); + let shard = Shard::new(store_uri, Arc::new(store_client), store_block_range); + shards.push(shard); } - let store_clients = Arc::new(RwLock::new(store_clients)); + let store_clients = Arc::new(RwLock::new(shards)); ( db_test_context, From ee07193dfb78c8702c7a8d7c1603e3e73679a39e Mon Sep 17 00:00:00 2001 From: Sam Date: Thu, 5 Jan 2023 14:10:54 -0500 Subject: [PATCH 03/11] Throw error if block ranges do not match --- .../server/src/shard_responses_processor.rs | 20 +++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/fog/view/server/src/shard_responses_processor.rs b/fog/view/server/src/shard_responses_processor.rs index cf713d6821..e0c16dfc55 100644 --- a/fog/view/server/src/shard_responses_processor.rs +++ b/fog/view/server/src/shard_responses_processor.rs @@ -46,8 +46,9 @@ pub fn process_shard_responses( let mut new_query_responses = Vec::new(); for (shard, response) in shards_and_responses { - // TODO: Add check here and throw appropriate error if the shard provides the - // wrong block range. + if response.block_range != shard.block_range { + return Err(RouterServerError::ViewStoreError(format!("The shard response's block range {} does not match the shard's configured block range {}.", response.block_range, shard.block_range))); + } match response.status { mc_fog_types::view::MultiViewStoreQueryResponseStatus::Unknown => { log::error!( @@ -383,4 +384,19 @@ mod tests { NUMBER_OF_SUCCESSES ); } + + #[test_with_logger] + fn shard_block_range_does_not_match_configured_block_range(logger: Logger) { + let shard_index: usize = 0; + let configured_block_range = BlockRange::new(0, 10); + let shard = create_shard(shard_index, configured_block_range, logger.clone()); + + let response_block_range = BlockRange::new(100, 110); + let response = create_successful_mvq_response(shard_index, response_block_range); + let shards_and_responses = vec![(shard, response)]; + + let result = process_shard_responses(shards_and_responses, logger.clone()); + + assert!(result.is_err()); + } } From 1bfbd9a402f7494cd604507a3fada39448f61e8a Mon Sep 17 00:00:00 2001 From: Sam Date: Fri, 6 Jan 2023 14:26:10 -0500 Subject: [PATCH 04/11] Fix python tests --- tools/fog-local-network/fog_conformance_tests.py | 2 ++ tools/fog-local-network/fog_local_network.py | 2 ++ tools/fog-local-network/local_fog.py | 10 ++++++++-- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/tools/fog-local-network/fog_conformance_tests.py b/tools/fog-local-network/fog_conformance_tests.py index fe951fe0b2..b9f1905fc4 100755 --- a/tools/fog-local-network/fog_conformance_tests.py +++ b/tools/fog-local-network/fog_conformance_tests.py @@ -484,6 +484,7 @@ def run(self, args): admin_port = BASE_VIEW_STORE_ADMIN_PORT, admin_http_gateway_port = BASE_VIEW_STORE_ADMIN_HTTP_GATEWAY_PORT, release = self.release, + sharding_strategy= 'default' ) self.fog_view_store.start() @@ -495,6 +496,7 @@ def run(self, args): admin_http_gateway_port = BASE_VIEW_ADMIN_HTTP_GATEWAY_PORT, release = self.release, shard_uris = [self.fog_view_store.get_client_listen_uri()], + sharding_strategies= [self.fog_view_store.get_sharding_strategy()] ) self.fog_view_router.start() diff --git a/tools/fog-local-network/fog_local_network.py b/tools/fog-local-network/fog_local_network.py index 63468c8d36..12ddb20852 100644 --- a/tools/fog-local-network/fog_local_network.py +++ b/tools/fog-local-network/fog_local_network.py @@ -97,6 +97,7 @@ def start(self): admin_port = BASE_VIEW_STORE_ADMIN_PORT, admin_http_gateway_port = BASE_VIEW_STORE_ADMIN_HTTP_GATEWAY_PORT, release = True, + sharding_strategy= 'default' ) self.fog_view_store.start() @@ -108,6 +109,7 @@ def start(self): admin_http_gateway_port = BASE_VIEW_ADMIN_HTTP_GATEWAY_PORT, release = True, shard_uris = [self.fog_view_store.get_client_listen_uri()], + sharding_strategies= [self.fog_view_store.get_sharding_strategy()], ) self.fog_view_router.start() diff --git a/tools/fog-local-network/local_fog.py b/tools/fog-local-network/local_fog.py index e200695113..ec49e52469 100644 --- a/tools/fog-local-network/local_fog.py +++ b/tools/fog-local-network/local_fog.py @@ -177,7 +177,7 @@ def report_lost_ingress_key(self, lost_key): class FogViewRouter: - def __init__(self, name, client_responder_id, client_port, admin_port, admin_http_gateway_port, shard_uris, release): + def __init__(self, name, client_responder_id, client_port, admin_port, admin_http_gateway_port, shard_uris, sharding_strategies, release): self.name = name self.client_responder_id = client_responder_id @@ -189,6 +189,7 @@ def __init__(self, name, client_responder_id, client_port, admin_port, admin_htt self.admin_http_gateway_port = admin_http_gateway_port self.shard_uris = shard_uris + self.sharding_strategies = sharding_strategies self.release = release self.target_dir = target_dir(self.release) @@ -210,6 +211,7 @@ def start(self): f'--client-responder-id={self.client_responder_id}', f'--ias-api-key={IAS_API_KEY}', f'--shard-uris={",".join(self.shard_uris)}', + f'--sharding-strategies={",".join(self.sharding_strategies)}', f'--ias-spid={IAS_SPID}', f'--admin-listen-uri=insecure-mca://{LISTEN_HOST}:{self.admin_port}/', ]) @@ -228,12 +230,13 @@ def stop(self): self.admin_http_gateway_process = None class FogViewStore: - def __init__(self, name, client_port, admin_port, admin_http_gateway_port, release): + def __init__(self, name, client_port, admin_port, admin_http_gateway_port, release, sharding_strategy): self.name = name self.client_port = client_port self.client_responder_id = f'{LISTEN_HOST}:{self.client_port}' self.client_listen_url = f'insecure-fog-view-store://{LISTEN_HOST}:{self.client_port}/' + self.sharding_strategy = sharding_strategy self.admin_port = admin_port self.admin_http_gateway_port = admin_http_gateway_port @@ -249,6 +252,8 @@ def __repr__(self): def get_client_listen_uri(self): return self.client_listen_url + def get_sharding_strategy(self): + return self.sharding_strategy def start(self): self.stop() @@ -259,6 +264,7 @@ def start(self): f'exec {self.target_dir}/fog_view_server', f'--client-listen-uri={self.client_listen_url}', f'--client-responder-id={self.client_responder_id}', + f'--sharding-strategy={self.sharding_strategy}', f'--ias-api-key={IAS_API_KEY}', f'--ias-spid={IAS_SPID}', f'--admin-listen-uri=insecure-mca://{LISTEN_HOST}:{self.admin_port}/', From 4efcae6fd9d4820d7a28c8302a4c05ba50a89852 Mon Sep 17 00:00:00 2001 From: Sam Dealy Date: Wed, 11 Jan 2023 10:37:49 -0800 Subject: [PATCH 05/11] Implement James's and Andrew's suggestions --- fog/types/src/common.rs | 6 ++++- fog/view/server/src/bin/router.rs | 20 ++++++---------- fog/view/server/src/config.rs | 5 ---- fog/view/server/src/router_admin_service.rs | 11 ++------- fog/view/server/src/sharding_strategy.rs | 26 ++++++++++++++++++++- fog/view/server/test-utils/src/lib.rs | 25 ++++++-------------- 6 files changed, 46 insertions(+), 47 deletions(-) diff --git a/fog/types/src/common.rs b/fog/types/src/common.rs index ab412f40fa..dbba778e68 100644 --- a/fog/types/src/common.rs +++ b/fog/types/src/common.rs @@ -5,6 +5,10 @@ use core::str::FromStr; use prost::Message; use serde::{Deserialize, Serialize}; +/// The string that delimits the start and end blocks in a string that +/// represents a BlockRange. +pub const BLOCK_RANGE_DELIMITER: &str = "-"; + /// A half-open [a, b) range of blocks #[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Message, Serialize, Deserialize)] pub struct BlockRange { @@ -70,7 +74,7 @@ impl FromStr for BlockRange { fn from_str(s: &str) -> Result { let block_indices: Vec = s - .split(',') + .split(BLOCK_RANGE_DELIMITER) .map(|index_str| index_str.trim().parse()) .collect::, _>>() .map_err(|_| "BlockRange index is not a number.")?; diff --git a/fog/view/server/src/bin/router.rs b/fog/view/server/src/bin/router.rs index 494c68248f..fcfe543018 100644 --- a/fog/view/server/src/bin/router.rs +++ b/fog/view/server/src/bin/router.rs @@ -8,9 +8,9 @@ use mc_common::{logger::log, time::SystemTimeProvider}; use mc_fog_api::view_grpc::FogViewStoreApiClient; use mc_fog_view_enclave::{SgxViewEnclave, ENCLAVE_FILE}; use mc_fog_view_server::{ - config::{FogViewRouterConfig, ShardingStrategy::Epoch}, + config::FogViewRouterConfig, fog_view_router_server::{FogViewRouterServer, Shard}, - sharding_strategy::ShardingStrategy, + sharding_strategy::{EpochShardingStrategy, ShardingStrategy}, }; use mc_util_cli::ParserWithBuildInfo; use mc_util_grpc::ConnectionUriGrpcioChannel; @@ -47,23 +47,17 @@ fn main() { .name_prefix("Main-RPC".to_string()) .build(), ); - for (i, shard_uri) in config.shard_uris.clone().into_iter().enumerate() { + for shard_uri in config.shard_uris.clone() { let fog_view_store_grpc_client = FogViewStoreApiClient::new( ChannelBuilder::default_channel_builder(grpc_env.clone()) .connect_to_uri(&shard_uri, &logger), ); - let sharding_strategy = config - .sharding_strategies - .get(i) - .unwrap_or_else(|| panic!("Couldn't find shard at index {}", i)); - let Epoch(epoch_sharding_strategy) = sharding_strategy; + // TODO: update this logic once we introduce other types of sharding strategies. + let epoch_sharding_strategy = EpochShardingStrategy::try_from(shard_uri.clone()) + .expect("Could not get sharding strategy"); let block_range = epoch_sharding_strategy.get_block_range(); - let shard = Shard::new( - shard_uri.clone(), - Arc::new(fog_view_store_grpc_client), - block_range, - ); + let shard = Shard::new(shard_uri, Arc::new(fog_view_store_grpc_client), block_range); shards.push(shard); } let shards = Arc::new(RwLock::new(shards)); diff --git a/fog/view/server/src/config.rs b/fog/view/server/src/config.rs index 863aa525a4..113bf3bbdd 100644 --- a/fog/view/server/src/config.rs +++ b/fog/view/server/src/config.rs @@ -121,11 +121,6 @@ pub struct FogViewRouterConfig { #[clap(long, env = "MC_CLIENT_LISTEN_URI")] pub client_listen_uri: RouterClientListenUri, - /// Sharding Strategies for each Shard. Should be indexed the same as the - /// `shard_uris` field. - #[clap(long, env = "MC_VIEW_SHARDING_STRATEGIES")] - pub sharding_strategies: Vec, - /// gRPC listening URI for Fog View Stores. Should be indexed the same as /// the `sharding_stratgies` field. #[clap(long, env = "MC_VIEW_SHARD_URIS")] diff --git a/fog/view/server/src/router_admin_service.rs b/fog/view/server/src/router_admin_service.rs index 5c8acb8a7a..c34c6ab28b 100644 --- a/fog/view/server/src/router_admin_service.rs +++ b/fog/view/server/src/router_admin_service.rs @@ -6,10 +6,7 @@ use crate::{ SVC_COUNTERS, }; use grpcio::{ChannelBuilder, RpcContext, RpcStatus, UnarySink}; -use mc_common::{ - logger::{log, Logger}, - HashSet, -}; +use mc_common::logger::{log, Logger}; use mc_fog_api::{ view::AddShardRequest, view_grpc::{FogViewRouterAdminApi, FogViewStoreApiClient}, @@ -46,9 +43,7 @@ impl FogViewRouterAdminService { let mut shards = self.shards.write().expect("RwLock Poisoned"); if shards .iter() - .map(|shard| shard.uri.clone()) - .collect::>() - .contains(&view_store_uri) + .any(|shard| shard.uri.clone() == view_store_uri) { let error = rpc_precondition_error( "add_shard", @@ -66,8 +61,6 @@ impl FogViewRouterAdminService { ChannelBuilder::default_channel_builder(grpc_env) .connect_to_uri(&view_store_uri, logger), ); - // TODO: Add block range or sharding strategy to this... - // Check to make sure this block range isn't already covered... let block_range = EpochShardingStrategy::default().get_block_range(); let shard = Shard::new(view_store_uri, Arc::new(view_store_client), block_range); shards.push(shard); diff --git a/fog/view/server/src/sharding_strategy.rs b/fog/view/server/src/sharding_strategy.rs index a0866a3a17..92acbf48cc 100644 --- a/fog/view/server/src/sharding_strategy.rs +++ b/fog/view/server/src/sharding_strategy.rs @@ -6,7 +6,12 @@ //! TxOuts across Fog View Store instances. use mc_blockchain_types::BlockIndex; -use mc_fog_types::{common::BlockRange, BlockCount}; +use mc_fog_types::{ + common::{BlockRange, BLOCK_RANGE_DELIMITER}, + BlockCount, +}; +use mc_fog_uri::FogViewStoreUri; +use mc_util_uri::ConnectionUri; use serde::Serialize; use std::str::FromStr; @@ -39,6 +44,17 @@ pub struct EpochShardingStrategy { epoch_block_range: BlockRange, } +impl TryFrom for EpochShardingStrategy { + type Error = String; + + fn try_from(src: FogViewStoreUri) -> Result { + let sharding_strategy_string = src + .get_param("sharding_strategy") + .unwrap_or_else(|| "default".to_string()); + EpochShardingStrategy::from_str(&sharding_strategy_string) + } +} + impl ShardingStrategy for EpochShardingStrategy { fn should_process_block(&self, block_index: BlockIndex) -> bool { self.epoch_block_range.contains(block_index) @@ -61,6 +77,14 @@ impl Default for EpochShardingStrategy { } } +impl ToString for EpochShardingStrategy { + fn to_string(&self) -> String { + let start_block = self.epoch_block_range.start_block; + let end_block = self.epoch_block_range.end_block; + format!("{start_block}{BLOCK_RANGE_DELIMITER}{end_block}") + } +} + impl EpochShardingStrategy { #[allow(dead_code)] pub fn new(epoch_block_range: BlockRange) -> Self { diff --git a/fog/view/server/test-utils/src/lib.rs b/fog/view/server/test-utils/src/lib.rs index 7398f3b549..b51d1752ca 100644 --- a/fog/view/server/test-utils/src/lib.rs +++ b/fog/view/server/test-utils/src/lib.rs @@ -26,7 +26,7 @@ use mc_fog_view_protocol::FogViewConnection; use mc_fog_view_server::{ config::{ FogViewRouterConfig, MobileAcctViewConfig as ViewConfig, RouterClientListenUri, - ShardingStrategy, ShardingStrategy::Epoch, + ShardingStrategy::Epoch, }, fog_view_router_server::{FogViewRouterServer, Shard}, server::ViewServer, @@ -71,7 +71,7 @@ pub struct RouterTestEnvironment { impl RouterTestEnvironment { /// Creates a `RouterTestEnvironment` for the router integration tests. pub fn new(omap_capacity: u64, store_block_ranges: Vec, logger: Logger) -> Self { - let (db_test_context, store_servers, store_clients, shard_uris, sharding_strategies) = + let (db_test_context, store_servers, store_clients, shard_uris) = Self::create_view_stores(omap_capacity, store_block_ranges, logger.clone()); let port = portpicker::pick_unused_port().expect("pick_unused_port"); let router_uri = @@ -86,7 +86,6 @@ impl RouterTestEnvironment { .responder_id() .expect("Could not get responder id for Fog View Router."), ias_api_key: Default::default(), - sharding_strategies, shard_uris, ias_spid: Default::default(), client_listen_uri: RouterClientListenUri::Streaming(router_uri.clone()), @@ -112,7 +111,7 @@ impl RouterTestEnvironment { store_block_ranges: Vec, logger: Logger, ) -> Self { - let (db_test_context, store_servers, store_clients, shard_uris, sharding_strategies) = + let (db_test_context, store_servers, store_clients, shard_uris) = Self::create_view_stores(omap_capacity, store_block_ranges, logger.clone()); let port = portpicker::pick_unused_port().expect("pick_unused_port"); let router_uri = @@ -128,7 +127,6 @@ impl RouterTestEnvironment { .expect("Could not get responder id for Fog View Router."), ias_api_key: Default::default(), ias_spid: Default::default(), - sharding_strategies, shard_uris, client_listen_uri: RouterClientListenUri::Unary(router_uri.clone()), client_auth_token_max_lifetime: Default::default(), @@ -218,28 +216,25 @@ impl RouterTestEnvironment { Vec, Arc>>, Vec, - Vec, ) { let db_test_context = SqlRecoveryDbTestContext::new(logger.clone()); let db = db_test_context.get_db_instance(); let mut store_servers = Vec::new(); let mut shards = Vec::new(); let mut shard_uris: Vec = Vec::new(); - let mut sharding_strategies: Vec = Vec::new(); for (i, store_block_range) in store_block_ranges.into_iter().enumerate() { let (store, store_uri) = { let port = portpicker::pick_unused_port().expect("pick_unused_port"); + let epoch_sharding_strategy = EpochShardingStrategy::new(store_block_range.clone()); let uri = FogViewStoreUri::from_str(&format!( - "insecure-fog-view-store://127.0.0.1:{}", - port + "insecure-fog-view-store://127.0.0.1:{port}?sharding_strategy={}", + epoch_sharding_strategy.to_string() )) .unwrap(); - let epoch_sharding_strategy = EpochShardingStrategy::new(store_block_range.clone()); let sharding_strategy = Epoch(epoch_sharding_strategy); shard_uris.push(uri.clone()); - sharding_strategies.push(sharding_strategy.clone()); let config = ViewConfig { chain_id: "local".to_string(), @@ -296,13 +291,7 @@ impl RouterTestEnvironment { let store_clients = Arc::new(RwLock::new(shards)); - ( - db_test_context, - store_servers, - store_clients, - shard_uris, - sharding_strategies, - ) + (db_test_context, store_servers, store_clients, shard_uris) } } From 9d09b4fc587e4cec5ea55c83ff09ab7aa76df011 Mon Sep 17 00:00:00 2001 From: Sam Dealy Date: Wed, 11 Jan 2023 16:04:20 -0800 Subject: [PATCH 06/11] Fix tests --- fog/types/src/common.rs | 8 ++++---- tools/fog-local-network/fog_local_network.py | 1 - tools/fog-local-network/local_fog.py | 6 ++---- 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/fog/types/src/common.rs b/fog/types/src/common.rs index dbba778e68..91d886e727 100644 --- a/fog/types/src/common.rs +++ b/fog/types/src/common.rs @@ -138,7 +138,7 @@ mod tests { fn from_string_well_formatted_creates_block_range() { let start_block = 0; let end_block = 10; - let block_range_str = format!("{},{}", start_block, end_block); + let block_range_str = format!("{start_block}{BLOCK_RANGE_DELIMITER}{end_block}"); let result = BlockRange::from_str(&block_range_str); @@ -152,7 +152,7 @@ mod tests { fn from_string_well_formatted_with_whitespace_creates_block_range() { let start_block = 0; let end_block = 10; - let block_range_str = format!(" {} , {} ", start_block, end_block); + let block_range_str = format!("{start_block}{BLOCK_RANGE_DELIMITER}{end_block}"); let result = BlockRange::from_str(&block_range_str); @@ -167,7 +167,7 @@ mod tests { let start_block = 0; let end_block = 10; let third_block = 10; - let block_range_str = format!("{},{},{}", start_block, end_block, third_block); + let block_range_str = format!("{start_block}{BLOCK_RANGE_DELIMITER}{end_block}{BLOCK_RANGE_DELIMITER}{third_block}"); let result = BlockRange::from_str(&block_range_str); @@ -178,7 +178,7 @@ mod tests { fn from_string_non_numbers_errors() { let start_block = 'a'; let end_block = 'b'; - let block_range_str = format!("{},{}", start_block, end_block); + let block_range_str = format!("{start_block}{BLOCK_RANGE_DELIMITER}{end_block}"); let result = BlockRange::from_str(&block_range_str); diff --git a/tools/fog-local-network/fog_local_network.py b/tools/fog-local-network/fog_local_network.py index 12ddb20852..b698351790 100644 --- a/tools/fog-local-network/fog_local_network.py +++ b/tools/fog-local-network/fog_local_network.py @@ -109,7 +109,6 @@ def start(self): admin_http_gateway_port = BASE_VIEW_ADMIN_HTTP_GATEWAY_PORT, release = True, shard_uris = [self.fog_view_store.get_client_listen_uri()], - sharding_strategies= [self.fog_view_store.get_sharding_strategy()], ) self.fog_view_router.start() diff --git a/tools/fog-local-network/local_fog.py b/tools/fog-local-network/local_fog.py index ec49e52469..276ad60703 100644 --- a/tools/fog-local-network/local_fog.py +++ b/tools/fog-local-network/local_fog.py @@ -177,7 +177,7 @@ def report_lost_ingress_key(self, lost_key): class FogViewRouter: - def __init__(self, name, client_responder_id, client_port, admin_port, admin_http_gateway_port, shard_uris, sharding_strategies, release): + def __init__(self, name, client_responder_id, client_port, admin_port, admin_http_gateway_port, shard_uris, release): self.name = name self.client_responder_id = client_responder_id @@ -189,7 +189,6 @@ def __init__(self, name, client_responder_id, client_port, admin_port, admin_htt self.admin_http_gateway_port = admin_http_gateway_port self.shard_uris = shard_uris - self.sharding_strategies = sharding_strategies self.release = release self.target_dir = target_dir(self.release) @@ -211,7 +210,6 @@ def start(self): f'--client-responder-id={self.client_responder_id}', f'--ias-api-key={IAS_API_KEY}', f'--shard-uris={",".join(self.shard_uris)}', - f'--sharding-strategies={",".join(self.sharding_strategies)}', f'--ias-spid={IAS_SPID}', f'--admin-listen-uri=insecure-mca://{LISTEN_HOST}:{self.admin_port}/', ]) @@ -235,7 +233,7 @@ def __init__(self, name, client_port, admin_port, admin_http_gateway_port, relea self.client_port = client_port self.client_responder_id = f'{LISTEN_HOST}:{self.client_port}' - self.client_listen_url = f'insecure-fog-view-store://{LISTEN_HOST}:{self.client_port}/' + self.client_listen_url = f'insecure-fog-view-store://{LISTEN_HOST}:{self.client_port}/?sharding_strategy={self.sharding_strategy}' self.sharding_strategy = sharding_strategy self.admin_port = admin_port From 45593f6410b6c19bfcca3f48321a0e25a2a71525 Mon Sep 17 00:00:00 2001 From: Sam Dealy Date: Wed, 11 Jan 2023 18:06:37 -0800 Subject: [PATCH 07/11] Run cargo fmt --- fog/types/src/common.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/fog/types/src/common.rs b/fog/types/src/common.rs index 91d886e727..a7fe504f41 100644 --- a/fog/types/src/common.rs +++ b/fog/types/src/common.rs @@ -167,7 +167,9 @@ mod tests { let start_block = 0; let end_block = 10; let third_block = 10; - let block_range_str = format!("{start_block}{BLOCK_RANGE_DELIMITER}{end_block}{BLOCK_RANGE_DELIMITER}{third_block}"); + let block_range_str = format!( + "{start_block}{BLOCK_RANGE_DELIMITER}{end_block}{BLOCK_RANGE_DELIMITER}{third_block}" + ); let result = BlockRange::from_str(&block_range_str); From a818bb362117352108adf7b8d079cf24cbe767a4 Mon Sep 17 00:00:00 2001 From: Sam Dealy Date: Wed, 11 Jan 2023 18:09:38 -0800 Subject: [PATCH 08/11] Fix python tests --- tools/fog-local-network/fog_conformance_tests.py | 1 - tools/fog-local-network/local_fog.py | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/tools/fog-local-network/fog_conformance_tests.py b/tools/fog-local-network/fog_conformance_tests.py index b9f1905fc4..93d860e88b 100755 --- a/tools/fog-local-network/fog_conformance_tests.py +++ b/tools/fog-local-network/fog_conformance_tests.py @@ -496,7 +496,6 @@ def run(self, args): admin_http_gateway_port = BASE_VIEW_ADMIN_HTTP_GATEWAY_PORT, release = self.release, shard_uris = [self.fog_view_store.get_client_listen_uri()], - sharding_strategies= [self.fog_view_store.get_sharding_strategy()] ) self.fog_view_router.start() diff --git a/tools/fog-local-network/local_fog.py b/tools/fog-local-network/local_fog.py index 276ad60703..2b559ca2ed 100644 --- a/tools/fog-local-network/local_fog.py +++ b/tools/fog-local-network/local_fog.py @@ -233,6 +233,7 @@ def __init__(self, name, client_port, admin_port, admin_http_gateway_port, relea self.client_port = client_port self.client_responder_id = f'{LISTEN_HOST}:{self.client_port}' + self.sharding_strategy = sharding_strategy self.client_listen_url = f'insecure-fog-view-store://{LISTEN_HOST}:{self.client_port}/?sharding_strategy={self.sharding_strategy}' self.sharding_strategy = sharding_strategy @@ -250,8 +251,6 @@ def __repr__(self): def get_client_listen_uri(self): return self.client_listen_url - def get_sharding_strategy(self): - return self.sharding_strategy def start(self): self.stop() From 5b12aac9ce7f97bbe11f6d182d381ab31b9f784f Mon Sep 17 00:00:00 2001 From: Sam Dealy Date: Wed, 11 Jan 2023 18:58:55 -0800 Subject: [PATCH 09/11] Fix default parse --- fog/view/server/src/sharding_strategy.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/fog/view/server/src/sharding_strategy.rs b/fog/view/server/src/sharding_strategy.rs index 92acbf48cc..4bb5e43c55 100644 --- a/fog/view/server/src/sharding_strategy.rs +++ b/fog/view/server/src/sharding_strategy.rs @@ -112,6 +112,9 @@ impl FromStr for EpochShardingStrategy { type Err = String; fn from_str(s: &str) -> Result { + if s.eq("default") { + return Ok(EpochShardingStrategy::default()); + } if let Ok(block_range) = BlockRange::from_str(s) { return Ok(Self::new(block_range)); } From 1081b1f4bfcffd07f826eb4728292dc5cbef6c32 Mon Sep 17 00:00:00 2001 From: Sam Dealy Date: Thu, 12 Jan 2023 14:40:56 -0800 Subject: [PATCH 10/11] Add admin uri env var --- fog/view/server/src/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fog/view/server/src/config.rs b/fog/view/server/src/config.rs index 113bf3bbdd..5c205f3759 100644 --- a/fog/view/server/src/config.rs +++ b/fog/view/server/src/config.rs @@ -148,7 +148,7 @@ pub struct FogViewRouterConfig { pub omap_capacity: u64, /// Router admin listening URI. - #[clap(long)] + #[clap(long, env = "MC_ADMIN_LISTEN_URI")] pub admin_listen_uri: AdminUri, /// The chain id of the network we are a part of From 9c37f7effd6933518990e8204f02d2e32709a50b Mon Sep 17 00:00:00 2001 From: Sam Dealy Date: Thu, 12 Jan 2023 14:44:11 -0800 Subject: [PATCH 11/11] Fix typo --- fog/view/server/src/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fog/view/server/src/config.rs b/fog/view/server/src/config.rs index 5c205f3759..a9bd1cfda7 100644 --- a/fog/view/server/src/config.rs +++ b/fog/view/server/src/config.rs @@ -122,7 +122,7 @@ pub struct FogViewRouterConfig { pub client_listen_uri: RouterClientListenUri, /// gRPC listening URI for Fog View Stores. Should be indexed the same as - /// the `sharding_stratgies` field. + /// the `sharding_strategies` field. #[clap(long, env = "MC_VIEW_SHARD_URIS")] pub shard_uris: Vec,