Skip to content

Commit

Permalink
Initial take on API endpoint implementation (#224)
Browse files Browse the repository at this point in the history
* pass LedgerDb to service

* fix rpc logger usage

* get_archive_blocks_impl

* propose_tx_impl

* fetch_fog_report_impl

* get_last_block_info_impl
  • Loading branch information
eranrund authored Jan 13, 2022
1 parent 420a0c0 commit 6db6f8b
Show file tree
Hide file tree
Showing 7 changed files with 322 additions and 43 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 11 additions & 1 deletion validator/api/proto/validator_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,20 @@ service ValidatorAPI {

rpc ProposeTx(external.Tx) returns (consensus_common.ProposeTxResponse);

rpc FetchFogReport(FetchFogReportRequest) returns (report.ReportResponse);
rpc FetchFogReport(FetchFogReportRequest) returns (FetchFogReportResponse);
}

enum FetchFogReportResult {
Ok = 0;
NoReports = 1;
}

message FetchFogReportRequest {
// Fog report server URI.
string uri = 1;
}

message FetchFogReportResponse {
FetchFogReportResult result = 1;
report.ReportResponse report = 2;
}
5 changes: 5 additions & 0 deletions validator/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@ mc-validator-api = { path = "../api" }

mc-attest-verifier = { path = "../../mobilecoin/attest/verifier" }
mc-common = { path = "../../mobilecoin/common", default-features = false, features = ["loggers"] }
mc-connection = { path = "../../mobilecoin/connection" }
mc-consensus-enclave-measurement = { path = "../../mobilecoin/consensus/enclave/measurement" }
mc-fog-report-connection = { path = "../../mobilecoin/fog/report/connection" }
mc-ledger-db = { path = "../../mobilecoin/ledger/db" }
mc-ledger-sync = { path = "../../mobilecoin/ledger/sync" }
mc-transaction-core = { path = "../../mobilecoin/transaction/core" }
mc-util-grpc = { path = "../../mobilecoin/util/grpc" }
mc-util-parse = { path = "../../mobilecoin/util/parse" }
mc-util-uri = { path = "../../mobilecoin/util/uri" }

grpcio = "0.9.0"
structopt = "0.3"
rayon = "1.5"
6 changes: 3 additions & 3 deletions validator/service/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,16 @@ fn main() {

// Start ledger sync thread.
let _ledger_sync_service_thread = LedgerSyncServiceThread::new(
ledger_db,
peer_manager,
ledger_db.clone(),
peer_manager.clone(),
network_state,
transactions_fetcher,
config.poll_interval,
logger.clone(),
);

// Start GRPC service.
let _service = Service::new(&config.listen_uri, logger);
let _service = Service::new(&config.listen_uri, ledger_db, peer_manager, logger);

// Sleep indefinitely.
loop {
Expand Down
79 changes: 68 additions & 11 deletions validator/service/src/blockchain_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,98 @@
use grpcio::{RpcContext, RpcStatus, Service, UnarySink};
use mc_common::logger::Logger;
use mc_util_grpc::{rpc_logger, send_result};
use mc_connection::{BlockchainConnection, ConnectionManager, RetryableBlockchainConnection};
use mc_ledger_db::{Ledger, LedgerDB};
use mc_transaction_core::constants::MINIMUM_FEE;
use mc_util_grpc::{rpc_database_err, rpc_logger, send_result};
use mc_validator_api::{
consensus_common::LastBlockInfoResponse,
consensus_common_grpc::{create_blockchain_api, BlockchainApi as GrpcBlockchainApi},
empty::Empty,
};
use rayon::prelude::*; // For par_iter

#[derive(Clone)]
pub struct BlockchainApi {
pub struct BlockchainApi<BC: BlockchainConnection + 'static> {
/// Ledger DB.
ledger_db: LedgerDB,

/// Connection manager.
conn_manager: ConnectionManager<BC>,

/// Logger.
logger: Logger,
}

impl BlockchainApi {
pub fn new(logger: Logger) -> Self {
Self { logger }
impl<BC: BlockchainConnection + 'static> Clone for BlockchainApi<BC> {
fn clone(&self) -> Self {
Self {
ledger_db: self.ledger_db.clone(),
conn_manager: self.conn_manager.clone(),
logger: self.logger.clone(),
}
}
}

impl<BC: BlockchainConnection + 'static> BlockchainApi<BC> {
pub fn new(ledger_db: LedgerDB, conn_manager: ConnectionManager<BC>, logger: Logger) -> Self {
Self {
ledger_db,
conn_manager,
logger,
}
}

pub fn into_service(self) -> Service {
create_blockchain_api(self)
}

fn get_last_block_info_impl(&self) -> Result<LastBlockInfoResponse, RpcStatus> {
todo!()
fn get_last_block_info_impl(
&self,
logger: &Logger,
) -> Result<LastBlockInfoResponse, RpcStatus> {
let num_blocks = self
.ledger_db
.num_blocks()
.map_err(|err| rpc_database_err(err, logger))?;

let mut resp = LastBlockInfoResponse::new();
resp.set_index(num_blocks - 1);

// Iterate an owned list of connections in parallel, get the block info for
// each, and extract the fee. If no fees are returned, use the hard-coded
// minimum.
let minimum_fee = self
.conn_manager
.conns()
.par_iter()
.filter_map(|conn| conn.fetch_block_info(std::iter::empty()).ok())
.filter_map(|block_info| {
// Cleanup the protobuf default fee
if block_info.minimum_fee == 0 {
None
} else {
Some(block_info.minimum_fee)
}
})
.max()
.unwrap_or(MINIMUM_FEE);

resp.set_minimum_fee(minimum_fee);

Ok(resp)
}
}

impl GrpcBlockchainApi for BlockchainApi {
impl<BC: BlockchainConnection + 'static> GrpcBlockchainApi for BlockchainApi<BC> {
fn get_last_block_info(
&mut self,
ctx: RpcContext,
_request: Empty,
sink: UnarySink<LastBlockInfoResponse>,
) {
let logger = rpc_logger(&ctx, &self.logger);
send_result(ctx, sink, self.get_last_block_info_impl(), &logger)
mc_common::logger::scoped_global_logger(&rpc_logger(&ctx, &self.logger), |logger| {
send_result(ctx, sink, self.get_last_block_info_impl(logger), logger)
})
}

// TODO: GetBlocks is purposefully unimplemented since it is unclear if it will
Expand Down
16 changes: 13 additions & 3 deletions validator/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
use crate::{blockchain_api::BlockchainApi, validator_api::ValidatorApi};
use grpcio::{EnvBuilder, ServerBuilder};
use mc_common::logger::{log, Logger};
use mc_connection::{BlockchainConnection, ConnectionManager, UserTxConnection};
use mc_ledger_db::LedgerDB;
use mc_util_grpc::{BuildInfoService, ConnectionUriGrpcioServer, HealthService};
use mc_validator_api::ValidatorUri;
use std::sync::Arc;
Expand All @@ -15,18 +17,26 @@ pub struct Service {
}

impl Service {
pub fn new(listen_uri: &ValidatorUri, logger: Logger) -> Self {
pub fn new<C: BlockchainConnection + UserTxConnection + 'static>(
listen_uri: &ValidatorUri,
ledger_db: LedgerDB,
conn_manager: ConnectionManager<C>,
logger: Logger,
) -> Self {
// Build info API service.
let build_info_service = BuildInfoService::new(logger.clone()).into_service();

// Health check service.
let health_service = HealthService::new(None, logger.clone()).into_service();

// Validator API service.
let validator_service = ValidatorApi::new(logger.clone()).into_service();
let validator_service =
ValidatorApi::new(ledger_db.clone(), conn_manager.clone(), logger.clone())
.into_service();

// Blockchain API service.
let blockchain_service = BlockchainApi::new(logger.clone()).into_service();
let blockchain_service =
BlockchainApi::new(ledger_db, conn_manager, logger.clone()).into_service();

// Package service into grpc server.
log::info!(logger, "Starting validator API Service on {}", listen_uri);
Expand Down
Loading

0 comments on commit 6db6f8b

Please sign in to comment.