Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure shards provide correct blocks #2981

Merged
merged 11 commits into from
Jan 13, 2023
16 changes: 11 additions & 5 deletions fog/types/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ use core::str::FromStr;
use prost::Message;
use serde::{Deserialize, Serialize};

/// The string that delimits the start and end blocks in a string that
/// represents a BlockRange.
pub const BLOCK_RANGE_DELIMITER: &str = "-";

/// A half-open [a, b) range of blocks
#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Message, Serialize, Deserialize)]
pub struct BlockRange {
Expand Down Expand Up @@ -70,7 +74,7 @@ impl FromStr for BlockRange {

fn from_str(s: &str) -> Result<Self, Self::Err> {
let block_indices: Vec<u64> = s
.split(',')
.split(BLOCK_RANGE_DELIMITER)
.map(|index_str| index_str.trim().parse())
.collect::<Result<Vec<_>, _>>()
.map_err(|_| "BlockRange index is not a number.")?;
Expand Down Expand Up @@ -134,7 +138,7 @@ mod tests {
fn from_string_well_formatted_creates_block_range() {
let start_block = 0;
let end_block = 10;
let block_range_str = format!("{},{}", start_block, end_block);
let block_range_str = format!("{start_block}{BLOCK_RANGE_DELIMITER}{end_block}");

let result = BlockRange::from_str(&block_range_str);

Expand All @@ -148,7 +152,7 @@ mod tests {
fn from_string_well_formatted_with_whitespace_creates_block_range() {
let start_block = 0;
let end_block = 10;
let block_range_str = format!(" {} , {} ", start_block, end_block);
let block_range_str = format!("{start_block}{BLOCK_RANGE_DELIMITER}{end_block}");

let result = BlockRange::from_str(&block_range_str);

Expand All @@ -163,7 +167,9 @@ mod tests {
let start_block = 0;
let end_block = 10;
let third_block = 10;
let block_range_str = format!("{},{},{}", start_block, end_block, third_block);
let block_range_str = format!(
"{start_block}{BLOCK_RANGE_DELIMITER}{end_block}{BLOCK_RANGE_DELIMITER}{third_block}"
);

let result = BlockRange::from_str(&block_range_str);

Expand All @@ -174,7 +180,7 @@ mod tests {
fn from_string_non_numbers_errors() {
let start_block = 'a';
let end_block = 'b';
let block_range_str = format!("{},{}", start_block, end_block);
let block_range_str = format!("{start_block}{BLOCK_RANGE_DELIMITER}{end_block}");

let result = BlockRange::from_str(&block_range_str);

Expand Down
20 changes: 13 additions & 7 deletions fog/view/server/src/bin/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ use mc_common::{logger::log, time::SystemTimeProvider};
use mc_fog_api::view_grpc::FogViewStoreApiClient;
use mc_fog_view_enclave::{SgxViewEnclave, ENCLAVE_FILE};
use mc_fog_view_server::{
config::FogViewRouterConfig, fog_view_router_server::FogViewRouterServer,
config::FogViewRouterConfig,
fog_view_router_server::{FogViewRouterServer, Shard},
sharding_strategy::{EpochShardingStrategy, ShardingStrategy},
};
use mc_util_cli::ParserWithBuildInfo;
use mc_util_grpc::ConnectionUriGrpcioChannel;
use std::{
collections::HashMap,
env,
sync::{Arc, RwLock},
};
Expand All @@ -40,8 +41,7 @@ fn main() {
logger.clone(),
);

// TODO: Remove and get from a config.
let mut fog_view_store_grpc_clients = HashMap::new();
let mut shards = Vec::new();
let grpc_env = Arc::new(
grpcio::EnvBuilder::new()
.name_prefix("Main-RPC".to_string())
Expand All @@ -52,16 +52,22 @@ fn main() {
ChannelBuilder::default_channel_builder(grpc_env.clone())
.connect_to_uri(&shard_uri, &logger),
);
fog_view_store_grpc_clients.insert(shard_uri, Arc::new(fog_view_store_grpc_client));

// TODO: update this logic once we introduce other types of sharding strategies.
let epoch_sharding_strategy = EpochShardingStrategy::try_from(shard_uri.clone())
.expect("Could not get sharding strategy");
let block_range = epoch_sharding_strategy.get_block_range();
let shard = Shard::new(shard_uri, Arc::new(fog_view_store_grpc_client), block_range);
shards.push(shard);
}
let fog_view_store_grpc_clients = Arc::new(RwLock::new(fog_view_store_grpc_clients));
let shards = Arc::new(RwLock::new(shards));

let ias_client = Client::new(&config.ias_api_key).expect("Could not create IAS client");
let mut router_server = FogViewRouterServer::new(
config,
sgx_enclave,
ias_client,
fog_view_store_grpc_clients,
shards,
SystemTimeProvider::default(),
logger,
);
Expand Down
7 changes: 4 additions & 3 deletions fog/view/server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,9 @@ pub struct FogViewRouterConfig {
#[clap(long, env = "MC_CLIENT_LISTEN_URI")]
pub client_listen_uri: RouterClientListenUri,

/// gRPC listening URI for Fog View Stores.
#[clap(long, env = "MC_CLIENT_LISTEN_URI")]
/// gRPC listening URI for Fog View Stores. Should be indexed the same as
/// the `sharding_strategies` field.
#[clap(long, env = "MC_VIEW_SHARD_URIS")]
pub shard_uris: Vec<FogViewStoreUri>,

/// PEM-formatted keypair to send with an Attestation Request.
Expand All @@ -147,7 +148,7 @@ pub struct FogViewRouterConfig {
pub omap_capacity: u64,

/// Router admin listening URI.
#[clap(long)]
#[clap(long, env = "MC_ADMIN_LISTEN_URI")]
pub admin_listen_uri: AdminUri,

/// The chain id of the network we are a part of
Expand Down
35 changes: 30 additions & 5 deletions fog/view/server/src/fog_view_router_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,15 @@ use mc_common::{
time::TimeProvider,
};
use mc_fog_api::view_grpc;
use mc_fog_types::common::BlockRange;
use mc_fog_uri::{ConnectionUri, FogViewStoreUri};
use mc_fog_view_enclave::ViewEnclaveProxy;
use mc_sgx_report_cache_untrusted::ReportCacheThread;
use mc_util_grpc::{
AnonymousAuthenticator, Authenticator, ConnectionUriGrpcioServer, ReadinessIndicator,
TokenAuthenticator,
};
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
use std::sync::{Arc, RwLock};

pub struct FogViewRouterServer<E, RC>
where
Expand All @@ -43,6 +41,33 @@ where
report_cache_thread: Option<ReportCacheThread>,
}

/// A shard that fulfills a portion of the router's query requests.
#[derive(Clone)]
pub struct Shard {
/// The uri that this shard listens on.
pub uri: FogViewStoreUri,

/// The gRPC client that is used to communicate with the shard.
pub grpc_client: Arc<view_grpc::FogViewStoreApiClient>,

/// The `BlockRange` that this shard is responsible for providing.
pub block_range: BlockRange,
}

impl Shard {
pub fn new(
uri: FogViewStoreUri,
grpc_client: Arc<view_grpc::FogViewStoreApiClient>,
block_range: BlockRange,
) -> Self {
Self {
uri,
grpc_client,
block_range,
}
}
}

impl<E, RC> FogViewRouterServer<E, RC>
where
E: ViewEnclaveProxy,
Expand All @@ -53,7 +78,7 @@ where
config: FogViewRouterConfig,
enclave: E,
ra_client: RC,
shards: Arc<RwLock<HashMap<FogViewStoreUri, Arc<view_grpc::FogViewStoreApiClient>>>>,
shards: Arc<RwLock<Vec<Shard>>>,
time_provider: impl TimeProvider + 'static,
logger: Logger,
) -> FogViewRouterServer<E, RC>
Expand Down
24 changes: 10 additions & 14 deletions fog/view/server/src/fog_view_router_service.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,27 @@
// Copyright (c) 2018-2022 The MobileCoin Foundation

use crate::{router_request_handler, SVC_COUNTERS};
use crate::{fog_view_router_server::Shard, router_request_handler, SVC_COUNTERS};
use futures::{executor::block_on, FutureExt, TryFutureExt};
use grpcio::{DuplexSink, RequestStream, RpcContext, UnarySink};
use mc_attest_api::attest;
use mc_common::logger::{log, Logger};
use mc_fog_api::{
view::{FogViewRouterRequest, FogViewRouterResponse},
view_grpc::{FogViewApi, FogViewRouterApi, FogViewStoreApiClient},
view_grpc::{FogViewApi, FogViewRouterApi},
};
use mc_fog_uri::FogViewStoreUri;
use mc_fog_view_enclave_api::ViewEnclaveProxy;
use mc_util_grpc::{check_request_chain_id, rpc_logger, send_result, Authenticator};
use mc_util_metrics::ServiceMetrics;
use mc_util_telemetry::tracer;
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
use std::sync::{Arc, RwLock};

#[derive(Clone)]
pub struct FogViewRouterService<E>
where
E: ViewEnclaveProxy,
{
enclave: E,
shard_clients: Arc<RwLock<HashMap<FogViewStoreUri, Arc<FogViewStoreApiClient>>>>,
shards: Arc<RwLock<Vec<Shard>>>,
chain_id: String,
/// GRPC request authenticator.
authenticator: Arc<dyn Authenticator + Send + Sync>,
Expand All @@ -40,14 +36,14 @@ impl<E: ViewEnclaveProxy> FogViewRouterService<E> {
/// perform view store authentication on each one.
pub fn new(
enclave: E,
shard_clients: Arc<RwLock<HashMap<FogViewStoreUri, Arc<FogViewStoreApiClient>>>>,
shards: Arc<RwLock<Vec<Shard>>>,
chain_id: String,
authenticator: Arc<dyn Authenticator + Send + Sync>,
logger: Logger,
) -> Self {
Self {
enclave,
shard_clients,
shards,
chain_id,
authenticator,
logger,
Expand All @@ -69,11 +65,11 @@ where
let logger = logger.clone();
// TODO: Confirm that we don't need to perform the authenticator logic. I think
// we don't because of streaming...
Comment on lines 66 to 67
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Total aside: Could you clarify this TODO?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty sure that the authenticator enforces unary authentication logic, so I don't think it applies for streaming. I can look more into this though, but will make changes in a follow-up PR if necessary.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I guess what I'm saying is: what logic would go in here?

let shard_clients = self.shard_clients.read().expect("RwLock poisoned");
let shards = self.shards.read().expect("RwLock poisoned");
let method_name = ServiceMetrics::get_method_name(&ctx);
let future = router_request_handler::handle_requests(
method_name,
shard_clients.values().cloned().collect(),
shards.clone(),
self.enclave.clone(),
requests,
responses,
Expand Down Expand Up @@ -134,12 +130,12 @@ where
}

// This will block the async API. We should use some sort of differentiator...
let shard_clients = self.shard_clients.read().expect("RwLock poisoned");
let shards = self.shards.read().expect("RwLock poisoned");
let tracer = tracer!();
let result = block_on(router_request_handler::handle_query_request(
request,
self.enclave.clone(),
shard_clients.values().cloned().collect(),
shards.clone(),
self.logger.clone(),
&tracer,
))
Expand Down
31 changes: 16 additions & 15 deletions fog/view/server/src/router_admin_service.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
// Copyright (c) 2018-2022 The MobileCoin Foundation

use crate::SVC_COUNTERS;
use crate::{
fog_view_router_server::Shard,
sharding_strategy::{EpochShardingStrategy, ShardingStrategy},
SVC_COUNTERS,
};
use grpcio::{ChannelBuilder, RpcContext, RpcStatus, UnarySink};
use itertools::Itertools;
use mc_common::logger::{log, Logger};
use mc_fog_api::{
view::AddShardRequest,
Expand All @@ -14,26 +17,19 @@ use mc_util_grpc::{
ConnectionUriGrpcioChannel, Empty,
};
use std::{
collections::HashMap,
str::FromStr,
sync::{Arc, RwLock},
};

#[derive(Clone)]
pub struct FogViewRouterAdminService {
shard_clients: Arc<RwLock<HashMap<FogViewStoreUri, Arc<FogViewStoreApiClient>>>>,
shards: Arc<RwLock<Vec<Shard>>>,
logger: Logger,
}

impl FogViewRouterAdminService {
pub fn new(
shard_clients: Arc<RwLock<HashMap<FogViewStoreUri, Arc<FogViewStoreApiClient>>>>,
logger: Logger,
) -> Self {
Self {
shard_clients,
logger,
}
pub fn new(shards: Arc<RwLock<Vec<Shard>>>, logger: Logger) -> Self {
Self { shards, logger }
}

fn add_shard_impl(&mut self, shard_uri: &str, logger: &Logger) -> Result<Empty, RpcStatus> {
Expand All @@ -44,8 +40,11 @@ impl FogViewRouterAdminService {
logger,
)
})?;
let mut shard_clients = self.shard_clients.write().expect("RwLock Poisoned");
if shard_clients.keys().contains(&view_store_uri) {
let mut shards = self.shards.write().expect("RwLock Poisoned");
if shards
.iter()
.any(|shard| shard.uri.clone() == view_store_uri)
{
let error = rpc_precondition_error(
"add_shard",
format!("Shard uri {} already exists in the shard list", shard_uri),
Expand All @@ -62,7 +61,9 @@ impl FogViewRouterAdminService {
ChannelBuilder::default_channel_builder(grpc_env)
.connect_to_uri(&view_store_uri, logger),
);
shard_clients.insert(view_store_uri, Arc::new(view_store_client));
let block_range = EpochShardingStrategy::default().get_block_range();
let shard = Shard::new(view_store_uri, Arc::new(view_store_client), block_range);
shards.push(shard);

Ok(Empty::new())
}
Expand Down
Loading