Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

fix(light-rpc): Make light_sync generic #10238

Merged
merged 2 commits into from
Feb 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 45 additions & 10 deletions ethcore/sync/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ use network::IpFilter;
use private_tx::PrivateTxHandler;
use types::transaction::UnverifiedTransaction;

use super::light_sync::SyncInfo;

/// Parity sync protocol
pub const WARP_SYNC_PROTOCOL_ID: ProtocolId = *b"par";
/// Ethereum sync protocol
Expand Down Expand Up @@ -804,6 +806,24 @@ pub trait LightSyncProvider {
fn transactions_stats(&self) -> BTreeMap<H256, TransactionStats>;
}

/// Wrapper around `light_sync::SyncInfo` to expose those methods without the concrete type `LightSync`
pub trait LightSyncInfo: Send + Sync {
/// Get the highest block advertised on the network.
fn highest_block(&self) -> Option<u64>;

/// Get the block number at the time of sync start.
fn start_block(&self) -> u64;

/// Whether major sync is underway.
fn is_major_importing(&self) -> bool;
}

/// Execute a closure with a protocol context.
pub trait LightNetworkDispatcher {
/// Execute a closure with a protocol context.
fn with_context<F, T>(&self, f: F) -> Option<T> where F: FnOnce(&::light::net::BasicContext) -> T;
}

/// Configuration for the light sync.
pub struct LightSyncParams<L> {
/// Network configuration.
Expand All @@ -823,7 +843,7 @@ pub struct LightSyncParams<L> {
/// Service for light synchronization.
pub struct LightSync {
proto: Arc<LightProtocol>,
sync: Arc<::light_sync::SyncInfo + Sync + Send>,
sync: Arc<SyncInfo + Sync + Send>,
attached_protos: Vec<AttachedProtocol>,
network: NetworkService,
subprotocol_name: [u8; 3],
Expand Down Expand Up @@ -874,15 +894,6 @@ impl LightSync {
})
}

/// Execute a closure with a protocol context.
pub fn with_context<F, T>(&self, f: F) -> Option<T>
where F: FnOnce(&::light::net::BasicContext) -> T
{
self.network.with_context_eval(
self.subprotocol_name,
move |ctx| self.proto.with_context(&ctx, f),
)
}
}

impl ::std::ops::Deref for LightSync {
Expand All @@ -891,6 +902,16 @@ impl ::std::ops::Deref for LightSync {
fn deref(&self) -> &Self::Target { &*self.sync }
}


impl LightNetworkDispatcher for LightSync {
fn with_context<F, T>(&self, f: F) -> Option<T> where F: FnOnce(&::light::net::BasicContext) -> T {
self.network.with_context_eval(
self.subprotocol_name,
move |ctx| self.proto.with_context(&ctx, f),
)
}
}

impl ManageNetwork for LightSync {
fn accept_unreserved_peers(&self) {
self.network.set_non_reserved_mode(NonReservedPeerMode::Accept);
Expand Down Expand Up @@ -991,3 +1012,17 @@ impl LightSyncProvider for LightSync {
Default::default() // TODO
}
}

impl LightSyncInfo for LightSync {
fn highest_block(&self) -> Option<u64> {
(*self.sync).highest_block()
}

fn start_block(&self) -> u64 {
(*self.sync).start_block()
}

fn is_major_importing(&self) -> bool {
(*self.sync).is_major_importing()
}
}
2 changes: 1 addition & 1 deletion parity/light_helpers/epoch_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::{Arc, Weak};

use ethcore::engines::{EthEngine, StateDependentProof};
use ethcore::machine::EthereumMachine;
use sync::LightSync;
use sync::{LightSync, LightNetworkDispatcher};
use types::encoded;
use types::header::Header;
use types::receipt::Receipt;
Expand Down
2 changes: 1 addition & 1 deletion parity/light_helpers/queue_cull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::sync::Arc;
use std::time::Duration;

use ethcore::client::ClientIoMessage;
use sync::LightSync;
use sync::{LightSync, LightNetworkDispatcher};
use io::{IoContext, IoHandler, TimerToken};

use light::client::LightChainClient;
Expand Down
40 changes: 31 additions & 9 deletions rpc/src/v1/helpers/dispatch/light.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use light::client::LightChainClient;
use light::on_demand::{request, OnDemand};
use parking_lot::{Mutex, RwLock};
use stats::Corpus;
use sync::LightSync;
use sync::{LightSyncProvider, LightNetworkDispatcher, ManageNetwork};
use types::basic_account::BasicAccount;
use types::ids::BlockId;
use types::transaction::{SignedTransaction, PendingTransaction, Error as TransactionError};
Expand All @@ -37,10 +37,9 @@ use v1::types::{RichRawTransaction as RpcRichRawTransaction,};
use super::{Dispatcher, Accounts, SignWith, PostSign};

/// Dispatcher for light clients -- fetches default gas price, next nonce, etc. from network.
#[derive(Clone)]
pub struct LightDispatcher {
pub struct LightDispatcher<S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static> {
/// Sync service.
pub sync: Arc<LightSync>,
pub sync: Arc<S>,
/// Header chain client.
pub client: Arc<LightChainClient>,
/// On-demand request service.
Expand All @@ -55,12 +54,15 @@ pub struct LightDispatcher {
pub gas_price_percentile: usize,
}

impl LightDispatcher {
impl<S> LightDispatcher<S>
where
S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static
{
/// Create a new `LightDispatcher` from its requisite parts.
///
/// For correct operation, the OnDemand service is assumed to be registered as a network handler,
pub fn new(
sync: Arc<LightSync>,
sync: Arc<S>,
client: Arc<LightChainClient>,
on_demand: Arc<OnDemand>,
cache: Arc<Mutex<LightDataCache>>,
Expand Down Expand Up @@ -115,7 +117,27 @@ impl LightDispatcher {
}
}

impl Dispatcher for LightDispatcher {
impl<S> Clone for LightDispatcher<S>
where
S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static
{
fn clone(&self) -> Self {
Self {
sync: self.sync.clone(),
client: self.client.clone(),
on_demand: self.on_demand.clone(),
cache: self.cache.clone(),
transaction_queue: self.transaction_queue.clone(),
nonces: self.nonces.clone(),
gas_price_percentile: self.gas_price_percentile
}
}
}

impl<S> Dispatcher for LightDispatcher<S>
where
S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static
{
// Ignore the `force_nonce` flag in order to always query the network when fetching the nonce and
// the account state. If the nonce is specified in the transaction use that nonce instead but do the
// network request anyway to the account state (balance)
Expand Down Expand Up @@ -217,8 +239,8 @@ impl Dispatcher for LightDispatcher {

/// Get a recent gas price corpus.
// TODO: this could be `impl Trait`.
pub fn fetch_gas_price_corpus(
sync: Arc<LightSync>,
pub fn fetch_gas_price_corpus<S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static>(
sync: Arc<S>,
client: Arc<LightChainClient>,
on_demand: Arc<OnDemand>,
cache: Arc<Mutex<LightDataCache>>,
Expand Down
68 changes: 57 additions & 11 deletions rpc/src/v1/helpers/light_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
//! Helpers for fetching blockchain data either from the light client or the network.

use std::cmp;
use std::clone::Clone;
use std::sync::Arc;

use types::basic_account::BasicAccount;
Expand All @@ -40,7 +41,9 @@ use light::on_demand::{
use light::on_demand::error::Error as OnDemandError;
use light::request::Field;

use sync::LightSync;

use sync::{LightNetworkDispatcher, ManageNetwork, LightSyncProvider};

use ethereum_types::{U256, Address};
use hash::H256;
use parking_lot::Mutex;
Expand All @@ -52,10 +55,12 @@ use v1::helpers::{CallRequest as CallRequestHelper, errors, dispatch};
use v1::types::{BlockNumber, CallRequest, Log, Transaction};

const NO_INVALID_BACK_REFS_PROOF: &str = "Fails only on invalid back-references; back-references here known to be valid; qed";

const WRONG_RESPONSE_AMOUNT_TYPE_PROOF: &str = "responses correspond directly with requests in amount and type; qed";

pub fn light_all_transactions(dispatch: &Arc<dispatch::LightDispatcher>) -> impl Iterator<Item=PendingTransaction> {
pub fn light_all_transactions<S>(dispatch: &Arc<dispatch::LightDispatcher<S>>) -> impl Iterator<Item=PendingTransaction>
where
S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static
{
let txq = dispatch.transaction_queue.read();
let chain_info = dispatch.client.chain_info();

Expand All @@ -66,20 +71,36 @@ pub fn light_all_transactions(dispatch: &Arc<dispatch::LightDispatcher>) -> impl

/// Helper for fetching blockchain data either from the light client or the network
/// as necessary.
#[derive(Clone)]
pub struct LightFetch {
pub struct LightFetch<S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static>
{
/// The light client.
pub client: Arc<LightChainClient>,
/// The on-demand request service.
pub on_demand: Arc<OnDemand>,
/// Handle to the network.
pub sync: Arc<LightSync>,
pub sync: Arc<S>,
/// The light data cache.
pub cache: Arc<Mutex<Cache>>,
/// Gas Price percentile
pub gas_price_percentile: usize,
}

impl<S> Clone for LightFetch<S>
where
S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static
{
fn clone(&self) -> Self {
Self {
client: self.client.clone(),
on_demand: self.on_demand.clone(),
sync: self.sync.clone(),
cache: self.cache.clone(),
gas_price_percentile: self.gas_price_percentile
}
}
}


/// Extract a transaction at given index.
pub fn extract_transaction_at_index(block: encoded::Block, index: usize) -> Option<Transaction> {
block.transactions().into_iter().nth(index)
Expand Down Expand Up @@ -115,7 +136,10 @@ fn extract_header(res: &[OnDemandResponse], header: HeaderRef) -> Option<encoded
}
}

impl LightFetch {
impl<S> LightFetch<S>
where
S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static
{
// push the necessary requests onto the request chain to get the header by the given ID.
// yield a header reference which other requests can use.
fn make_header_requests(&self, id: BlockId, reqs: &mut Vec<OnDemandRequest>) -> Result<HeaderRef> {
Expand Down Expand Up @@ -635,20 +659,42 @@ impl LightFetch {
}
}

#[derive(Clone)]
struct ExecuteParams {
struct ExecuteParams<S>
where
S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static
{
from: Address,
tx: EthTransaction,
hdr: encoded::Header,
env_info: ::vm::EnvInfo,
engine: Arc<::ethcore::engines::EthEngine>,
on_demand: Arc<OnDemand>,
sync: Arc<LightSync>,
sync: Arc<S>,
}

impl<S> Clone for ExecuteParams<S>
where
S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static
{
fn clone(&self) -> Self {
Self {
from: self.from.clone(),
tx: self.tx.clone(),
hdr: self.hdr.clone(),
env_info: self.env_info.clone(),
engine: self.engine.clone(),
on_demand: self.on_demand.clone(),
sync: self.sync.clone()
}
}
}

// Has a peer execute the transaction with given params. If `gas_known` is false, this will set the `gas value` to the
// `required gas value` unless it exceeds the block gas limit
fn execute_read_only_tx(gas_known: bool, params: ExecuteParams) -> impl Future<Item = ExecutionResult, Error = Error> + Send {
fn execute_read_only_tx<S>(gas_known: bool, params: ExecuteParams<S>) -> impl Future<Item = ExecutionResult, Error = Error> + Send
where
S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static
{
if !gas_known {
Box::new(future::loop_fn(params, |mut params| {
execute_read_only_tx(true, params.clone()).and_then(move |res| {
Expand Down
21 changes: 13 additions & 8 deletions rpc/src/v1/impls/eth_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ use light::client::{LightChainClient, LightChainNotify};
use light::on_demand::OnDemand;
use parity_runtime::Executor;
use parking_lot::{RwLock, Mutex};
use sync::LightSync;

use sync::{LightSyncProvider, LightNetworkDispatcher, ManageNetwork};

use types::encoded;
use types::filter::Filter as EthFilter;

Expand Down Expand Up @@ -87,12 +89,15 @@ impl<C> EthPubSubClient<C> {
}
}

impl EthPubSubClient<LightFetch> {
impl<S> EthPubSubClient<LightFetch<S>>
where
S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static
{
/// Creates a new `EthPubSubClient` for `LightClient`.
pub fn light(
client: Arc<LightChainClient>,
on_demand: Arc<OnDemand>,
sync: Arc<LightSync>,
sync: Arc<S>,
cache: Arc<Mutex<Cache>>,
executor: Executor,
gas_price_percentile: usize,
Expand Down Expand Up @@ -189,7 +194,10 @@ pub trait LightClient: Send + Sync {
fn logs(&self, filter: EthFilter) -> BoxFuture<Vec<Log>>;
}

impl LightClient for LightFetch {
impl<S> LightClient for LightFetch<S>
where
S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static
{
fn block_header(&self, id: BlockId) -> Option<encoded::Header> {
self.client.block_header(id)
}
Expand All @@ -200,10 +208,7 @@ impl LightClient for LightFetch {
}

impl<C: LightClient> LightChainNotify for ChainNotificationHandler<C> {
fn new_headers(
&self,
enacted: &[H256],
) {
fn new_headers(&self, enacted: &[H256]) {
let headers = enacted
.iter()
.filter_map(|hash| self.client.block_header(BlockId::Hash(*hash)))
Expand Down
Loading