Skip to content

Commit

Permalink
Implement shard management API sans auth (#2354)
Browse files Browse the repository at this point in the history
* Implement shard management API

* Implement James's suggestions
  • Loading branch information
samdealy committed Sep 28, 2022
1 parent 789ffb8 commit 53ee79c
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 22 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions fog/view/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ displaydoc = { version = "0.2", default-features = false }
futures = "0.3"
grpcio = "0.11.0"
hex = "0.4"
itertools = "0.10"
lazy_static = "1.4"

# mobilecoin
Expand Down
12 changes: 9 additions & 3 deletions fog/view/server/src/bin/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ use mc_fog_view_server::{
};
use mc_util_cli::ParserWithBuildInfo;
use mc_util_grpc::ConnectionUriGrpcioChannel;
use std::{env, str::FromStr, sync::Arc};
use std::{
collections::HashMap,
env,
str::FromStr,
sync::{Arc, RwLock},
};

fn main() {
mc_common::setup_panic_handler();
Expand All @@ -38,7 +43,7 @@ fn main() {
);

// TODO: Remove and get from a config.
let mut fog_view_store_grpc_clients = Vec::new();
let mut fog_view_store_grpc_clients = HashMap::new();
let grpc_env = Arc::new(
grpcio::EnvBuilder::new()
.name_prefix("Main-RPC".to_string())
Expand All @@ -54,8 +59,9 @@ fn main() {
ChannelBuilder::default_channel_builder(grpc_env.clone())
.connect_to_uri(&shard_uri, &logger),
);
fog_view_store_grpc_clients.push(fog_view_store_grpc_client);
fog_view_store_grpc_clients.insert(shard_uri, Arc::new(fog_view_store_grpc_client));
}
let fog_view_store_grpc_clients = Arc::new(RwLock::new(fog_view_store_grpc_clients));

let ias_client = Client::new(&config.ias_api_key).expect("Could not create IAS client");
let mut router_server = FogViewRouterServer::new(
Expand Down
6 changes: 5 additions & 1 deletion fog/view/server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use clap::Parser;
use mc_attest_core::ProviderId;
use mc_common::ResponderId;
use mc_fog_sql_recovery_db::SqlRecoveryDbConnectionConfig;
use mc_fog_uri::{FogViewRouterUri, FogViewStoreUri, FogViewUri};
use mc_fog_uri::{FogViewRouterAdminUri, FogViewRouterUri, FogViewStoreUri, FogViewUri};
use mc_util_parse::parse_duration_in_seconds;
use mc_util_uri::AdminUri;
use serde::Serialize;
Expand Down Expand Up @@ -160,4 +160,8 @@ pub struct FogViewRouterConfig {
/// to disk by linux kernel.
#[clap(long, default_value = "1048576", env = "MC_OMAP_CAPACITY")]
pub omap_capacity: u64,

/// Router admin listening URI.
#[clap(long)]
pub admin_listen_uri: FogViewRouterAdminUri,
}
54 changes: 41 additions & 13 deletions fog/view/server/src/fog_view_router_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,30 @@
//! Constructible from config (for testability) and with a mechanism for
//! stopping it
use crate::{config::FogViewRouterConfig, counters, fog_view_router_service::FogViewRouterService};
use crate::{
config::FogViewRouterConfig, counters, fog_view_router_service::FogViewRouterService,
router_admin_service::FogViewRouterAdminService,
};
use futures::executor::block_on;
use mc_attest_net::RaClient;
use mc_common::logger::{log, Logger};
use mc_fog_api::view_grpc;
use mc_fog_uri::ConnectionUri;
use mc_fog_uri::{ConnectionUri, FogViewStoreUri};
use mc_fog_view_enclave::ViewEnclaveProxy;
use mc_sgx_report_cache_untrusted::ReportCacheThread;
use mc_util_grpc::{ConnectionUriGrpcioServer, ReadinessIndicator};
use std::sync::Arc;
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};

pub struct FogViewRouterServer<E, RC>
where
E: ViewEnclaveProxy,
RC: RaClient + Send + Sync + 'static,
{
server: grpcio::Server,
router_server: grpcio::Server,
admin_server: grpcio::Server,
enclave: E,
config: FogViewRouterConfig,
logger: Logger,
Expand All @@ -38,7 +45,7 @@ where
config: FogViewRouterConfig,
enclave: E,
ra_client: RC,
shards: Vec<view_grpc::FogViewStoreApiClient>,
shards: Arc<RwLock<HashMap<FogViewStoreUri, Arc<view_grpc::FogViewStoreApiClient>>>>,
logger: Logger,
) -> FogViewRouterServer<E, RC>
where
Expand All @@ -53,10 +60,15 @@ where
);

let fog_view_router_service = view_grpc::create_fog_view_router_api(
FogViewRouterService::new(enclave.clone(), shards, logger.clone()),
FogViewRouterService::new(enclave.clone(), shards.clone(), logger.clone()),
);
log::debug!(logger, "Constructed Fog View Router GRPC Service");

let fog_view_router_admin_service = view_grpc::create_fog_view_router_admin_api(
FogViewRouterAdminService::new(shards, logger.clone()),
);
log::debug!(logger, "Constructed Fog View Router Admin GRPC Service");

// Health check service
let health_service =
mc_util_grpc::HealthService::new(Some(readiness_indicator.into()), logger.clone())
Expand All @@ -68,15 +80,21 @@ where
"Starting Fog View Router server on {}",
config.client_listen_uri.addr(),
);
let server_builder = grpcio::ServerBuilder::new(env)
let router_server_builder = grpcio::ServerBuilder::new(env.clone())
.register_service(fog_view_router_service)
.register_service(health_service)
.bind_using_uri(&config.client_listen_uri, logger.clone());

let server = server_builder.build().unwrap();
let admin_server_builder = grpcio::ServerBuilder::new(env)
.register_service(fog_view_router_admin_service)
.bind_using_uri(&config.admin_listen_uri, logger.clone());

let router_server = router_server_builder.build().unwrap();
let admin_server = admin_server_builder.build().unwrap();

Self {
server,
router_server,
admin_server,
enclave,
config,
logger,
Expand All @@ -97,9 +115,18 @@ where
)
.expect("failed starting report cache thread"),
);
self.server.start();
for (host, port) in self.server.bind_addrs() {
log::info!(self.logger, "API listening on {}:{}", host, port);
self.router_server.start();
for (host, port) in self.router_server.bind_addrs() {
log::info!(self.logger, "Router API listening on {}:{}", host, port);
}
self.admin_server.start();
for (host, port) in self.admin_server.bind_addrs() {
log::info!(
self.logger,
"Router Admin API listening on {}:{}",
host,
port
);
}
}

Expand All @@ -108,7 +135,8 @@ where
if let Some(ref mut thread) = self.report_cache_thread.take() {
thread.stop().expect("Could not stop report cache thread");
}
block_on(self.server.shutdown()).expect("Could not stop grpc server");
block_on(self.router_server.shutdown()).expect("Could not stop router grpc server");
block_on(self.admin_server.shutdown()).expect("Could not stop admin router server");
}
}

Expand Down
18 changes: 13 additions & 5 deletions fog/view/server/src/fog_view_router_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,22 @@ use mc_fog_api::{
view::{FogViewRouterRequest, FogViewRouterResponse},
view_grpc::{FogViewRouterApi, FogViewStoreApiClient},
};
use mc_fog_uri::FogViewStoreUri;
use mc_fog_view_enclave_api::ViewEnclaveProxy;
use mc_util_grpc::rpc_logger;
use mc_util_metrics::SVC_COUNTERS;
use std::sync::Arc;
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};

#[derive(Clone)]
pub struct FogViewRouterService<E>
where
E: ViewEnclaveProxy,
{
enclave: E,
shard_clients: Vec<Arc<FogViewStoreApiClient>>,
shard_clients: Arc<RwLock<HashMap<FogViewStoreUri, Arc<FogViewStoreApiClient>>>>,
logger: Logger,
}

Expand All @@ -29,8 +33,11 @@ impl<E: ViewEnclaveProxy> FogViewRouterService<E> {
///
/// TODO: Add a `view_store_clients` parameter of type FogApiClient, and
/// perform view store authentication on each one.
pub fn new(enclave: E, shard_clients: Vec<FogViewStoreApiClient>, logger: Logger) -> Self {
let shard_clients = shard_clients.into_iter().map(Arc::new).collect();
pub fn new(
enclave: E,
shard_clients: Arc<RwLock<HashMap<FogViewStoreUri, Arc<FogViewStoreApiClient>>>>,
logger: Logger,
) -> Self {
Self {
enclave,
shard_clients,
Expand All @@ -55,8 +62,9 @@ where
let logger = logger.clone();
// TODO: Confirm that we don't need to perform the authenticator logic. I think
// we don't because of streaming...
let shard_clients = self.shard_clients.read().expect("RwLock poisoned");
let future = router_request_handler::handle_requests(
self.shard_clients.clone(),
shard_clients.values().cloned().collect(),
self.enclave.clone(),
requests,
responses,
Expand Down
1 change: 1 addition & 0 deletions fog/view/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ pub mod sharding_strategy;
mod block_tracker;
mod counters;
mod db_fetcher;
mod router_admin_service;
mod router_request_handler;
mod shard_responses_processor;
84 changes: 84 additions & 0 deletions fog/view/server/src/router_admin_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright (c) 2018-2022 The MobileCoin Foundation

use grpcio::{ChannelBuilder, RpcContext, RpcStatus, UnarySink};
use itertools::Itertools;
use mc_common::logger::{log, Logger};
use mc_fog_api::{
view::AddShardRequest,
view_grpc::{FogViewRouterAdminApi, FogViewStoreApiClient},
};
use mc_fog_uri::FogViewStoreUri;
use mc_util_grpc::{
rpc_invalid_arg_error, rpc_logger, rpc_precondition_error, send_result,
ConnectionUriGrpcioChannel, Empty,
};
use mc_util_metrics::SVC_COUNTERS;
use std::{
collections::HashMap,
str::FromStr,
sync::{Arc, RwLock},
};

#[derive(Clone)]
pub struct FogViewRouterAdminService {
shard_clients: Arc<RwLock<HashMap<FogViewStoreUri, Arc<FogViewStoreApiClient>>>>,
logger: Logger,
}

impl FogViewRouterAdminService {
pub fn new(
shard_clients: Arc<RwLock<HashMap<FogViewStoreUri, Arc<FogViewStoreApiClient>>>>,
logger: Logger,
) -> Self {
Self {
shard_clients,
logger,
}
}

fn add_shard_impl(&mut self, shard_uri: &str, logger: &Logger) -> Result<Empty, RpcStatus> {
let view_store_uri = FogViewStoreUri::from_str(shard_uri).map_err(|_| {
rpc_invalid_arg_error(
"add_shard",
format!("Shard uri string {} is invalid", shard_uri),
logger,
)
})?;
let mut shard_clients = self.shard_clients.write().expect("RwLock Poisoned");
if shard_clients.keys().contains(&view_store_uri) {
let error = rpc_precondition_error(
"add_shard",
format!("Shard uri {} already exists in the shard list", shard_uri),
logger,
);
return Err(error);
}
let grpc_env = Arc::new(
grpcio::EnvBuilder::new()
.name_prefix("add-shard".to_string())
.build(),
);
let view_store_client = FogViewStoreApiClient::new(
ChannelBuilder::default_channel_builder(grpc_env)
.connect_to_uri(&view_store_uri, logger),
);
shard_clients.insert(view_store_uri, Arc::new(view_store_client));

Ok(Empty::new())
}
}

impl FogViewRouterAdminApi for FogViewRouterAdminService {
fn add_shard(&mut self, ctx: RpcContext, request: AddShardRequest, sink: UnarySink<Empty>) {
log::info!(self.logger, "Request received in add_shard fn");
let _timer = SVC_COUNTERS.req(&ctx);
mc_common::logger::scoped_global_logger(&rpc_logger(&ctx, &self.logger), |logger| {
send_result(
ctx,
sink,
self.add_shard_impl(request.get_shard_uri(), logger),
logger,
);
});
}
}

0 comments on commit 53ee79c

Please sign in to comment.