Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
import rpc transactions sequentially (#10051)
Browse files Browse the repository at this point in the history
* import rpc transactions sequentially

* use impl trait in argument position, renamed ProspectiveDispatcher to WithPostSign

* grouped imports

* integrates PostSign with ProspectiveSigner

* fix spaces, removed unnecessary type cast and duplicate polling

* clean up code style

* Apply suggestions from code review
  • Loading branch information
seunlanlege authored and 5chdn committed Feb 12, 2019
1 parent fdd6ac9 commit 6d69a5d
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 65 deletions.
176 changes: 128 additions & 48 deletions rpc/src/v1/helpers/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ use types::basic_account::BasicAccount;
use types::ids::BlockId;

use jsonrpc_core::{BoxFuture, Result, Error};
use jsonrpc_core::futures::{future, Future, Poll, Async};
use jsonrpc_core::futures::{future, Future, Poll, Async, IntoFuture};
use jsonrpc_core::futures::future::Either;
use v1::helpers::{errors, nonce, TransactionRequest, FilledTransactionRequest, ConfirmationPayload};
use v1::types::{
H256 as RpcH256, H520 as RpcH520, Bytes as RpcBytes,
H520 as RpcH520, Bytes as RpcBytes,
RichRawTransaction as RpcRichRawTransaction,
ConfirmationPayload as RpcConfirmationPayload,
ConfirmationResponse,
Expand All @@ -69,12 +69,20 @@ pub trait Dispatcher: Send + Sync + Clone {
fn fill_optional_fields(&self, request: TransactionRequest, default_sender: Address, force_nonce: bool)
-> BoxFuture<FilledTransactionRequest>;

/// Sign the given transaction request without dispatching, fetching appropriate nonce.
fn sign(&self, accounts: Arc<AccountProvider>, filled: FilledTransactionRequest, password: SignWith)
-> BoxFuture<WithToken<SignedTransaction>>;
/// Sign the given transaction request, fetching appropriate nonce and executing the PostSign action
fn sign<P>(
&self,
accounts: Arc<AccountProvider>,
filled: FilledTransactionRequest,
password: SignWith,
post_sign: P
) -> BoxFuture<P::Item>
where
P: PostSign + 'static,
<P::Out as futures::future::IntoFuture>::Future: Send;

/// Converts a `SignedTransaction` into `RichRawTransaction`
fn enrich(&self, SignedTransaction) -> RpcRichRawTransaction;
fn enrich(&self, signed: SignedTransaction) -> RpcRichRawTransaction;

/// "Dispatch" a local transaction.
fn dispatch_transaction(&self, signed_transaction: PendingTransaction)
Expand Down Expand Up @@ -164,19 +172,30 @@ impl<C: miner::BlockChainClient + BlockChainClient, M: MinerService> Dispatcher
}))
}

fn sign(&self, accounts: Arc<AccountProvider>, filled: FilledTransactionRequest, password: SignWith)
-> BoxFuture<WithToken<SignedTransaction>>
fn sign<P>(
&self,
accounts: Arc<AccountProvider>,
filled: FilledTransactionRequest,
password: SignWith,
post_sign: P
) -> BoxFuture<P::Item>
where
P: PostSign + 'static,
<P::Out as futures::future::IntoFuture>::Future: Send
{
let chain_id = self.client.signing_chain_id();

if let Some(nonce) = filled.nonce {
return Box::new(future::done(sign_transaction(&*accounts, filled, chain_id, nonce, password)));
}

let state = self.state_nonce(&filled.from);
let reserved = self.nonces.lock().reserve(filled.from, state);
let future = sign_transaction(&*accounts, filled, chain_id, nonce, password)
.into_future()
.and_then(move |signed| post_sign.execute(signed));
Box::new(future)
} else {
let state = self.state_nonce(&filled.from);
let reserved = self.nonces.lock().reserve(filled.from, state);

Box::new(ProspectiveSigner::new(accounts, filled, chain_id, reserved, password))
Box::new(ProspectiveSigner::new(accounts, filled, chain_id, reserved, password, post_sign))
}
}

fn enrich(&self, signed_transaction: SignedTransaction) -> RpcRichRawTransaction {
Expand Down Expand Up @@ -396,12 +415,24 @@ impl Dispatcher for LightDispatcher {
}))
}

fn sign(&self, accounts: Arc<AccountProvider>, filled: FilledTransactionRequest, password: SignWith)
-> BoxFuture<WithToken<SignedTransaction>>
fn sign<P>(
&self,
accounts: Arc<AccountProvider>,
filled: FilledTransactionRequest,
password: SignWith,
post_sign: P
) -> BoxFuture<P::Item>
where
P: PostSign + 'static,
<P::Out as futures::future::IntoFuture>::Future: Send
{
let chain_id = self.client.signing_chain_id();
let nonce = filled.nonce.expect("nonce is always provided; qed");
Box::new(future::done(sign_transaction(&*accounts, filled, chain_id, nonce, password)))

let future = sign_transaction(&*accounts, filled, chain_id, nonce, password)
.into_future()
.and_then(move |signed| post_sign.execute(signed));
Box::new(future)
}

fn enrich(&self, signed_transaction: SignedTransaction) -> RpcRichRawTransaction {
Expand Down Expand Up @@ -449,28 +480,60 @@ fn sign_transaction(
#[derive(Debug, Clone, Copy)]
enum ProspectiveSignerState {
TryProspectiveSign,
WaitForPostSign,
WaitForNonce,
Finish,
}

struct ProspectiveSigner {
struct ProspectiveSigner<P: PostSign> {
accounts: Arc<AccountProvider>,
filled: FilledTransactionRequest,
chain_id: Option<u64>,
reserved: nonce::Reserved,
password: SignWith,
state: ProspectiveSignerState,
prospective: Option<Result<WithToken<SignedTransaction>>>,
prospective: Option<WithToken<SignedTransaction>>,
ready: Option<nonce::Ready>,
post_sign: Option<P>,
post_sign_future: Option<<P::Out as IntoFuture>::Future>
}

impl ProspectiveSigner {
/// action to execute after signing
/// e.g importing a transaction into the chain
pub trait PostSign: Send {
/// item that this PostSign returns
type Item: Send;
/// incase you need to perform async PostSign actions
type Out: IntoFuture<Item = Self::Item, Error = Error> + Send;
/// perform an action with the signed transaction
fn execute(self, signer: WithToken<SignedTransaction>) -> Self::Out;
}

impl PostSign for () {
type Item = WithToken<SignedTransaction>;
type Out = Result<Self::Item>;
fn execute(self, signed: WithToken<SignedTransaction>) -> Self::Out {
Ok(signed)
}
}

impl<F: Send, T: Send> PostSign for F
where F: FnOnce(WithToken<SignedTransaction>) -> Result<T>
{
type Item = T;
type Out = Result<Self::Item>;
fn execute(self, signed: WithToken<SignedTransaction>) -> Self::Out {
(self)(signed)
}
}

impl<P: PostSign> ProspectiveSigner<P> {
pub fn new(
accounts: Arc<AccountProvider>,
filled: FilledTransactionRequest,
chain_id: Option<u64>,
reserved: nonce::Reserved,
password: SignWith,
post_sign: P
) -> Self {
// If the account is permanently unlocked we can try to sign
// using prospective nonce. This should speed up sending
Expand All @@ -491,6 +554,8 @@ impl ProspectiveSigner {
},
prospective: None,
ready: None,
post_sign: Some(post_sign),
post_sign_future: None
}
}

Expand All @@ -509,8 +574,8 @@ impl ProspectiveSigner {
}
}

impl Future for ProspectiveSigner {
type Item = WithToken<SignedTransaction>;
impl<P: PostSign> Future for ProspectiveSigner<P> {
type Item = P::Item;
type Error = Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
Expand All @@ -523,32 +588,45 @@ impl Future for ProspectiveSigner {
match self.poll_reserved()? {
Async::NotReady => {
self.state = WaitForNonce;
self.prospective = Some(self.sign(self.reserved.prospective_value()));
self.prospective = Some(self.sign(self.reserved.prospective_value())?);
},
Async::Ready(nonce) => {
self.state = Finish;
self.prospective = Some(self.sign(nonce.value()));
self.state = WaitForPostSign;
self.post_sign_future = Some(self.post_sign.take()
.expect("post_sign is set on creation; qed")
.execute(self.sign(nonce.value())?)
.into_future());
self.ready = Some(nonce);
},
}
},
WaitForNonce => {
let nonce = try_ready!(self.poll_reserved());
let result = match (self.prospective.take(), nonce.matches_prospective()) {
let prospective = match (self.prospective.take(), nonce.matches_prospective()) {
(Some(prospective), true) => prospective,
_ => self.sign(nonce.value()),
_ => self.sign(nonce.value())?,
};
self.state = Finish;
self.prospective = Some(result);
self.ready = Some(nonce);
self.state = WaitForPostSign;
self.post_sign_future = Some(self.post_sign.take()
.expect("post_sign is set on creation; qed")
.execute(prospective)
.into_future());
},
Finish => {
if let (Some(result), Some(nonce)) = (self.prospective.take(), self.ready.take()) {
// Mark nonce as used on successful signing
return result.map(move |tx| {
nonce.mark_used();
Async::Ready(tx)
})
WaitForPostSign => {
if let Some(mut fut) = self.post_sign_future.as_mut() {
match fut.poll()? {
Async::Ready(item) => {
let nonce = self.ready
.take()
.expect("nonce is set before state transitions to WaitForPostSign; qed");
nonce.mark_used();
return Ok(Async::Ready(item))
},
Async::NotReady => {
return Ok(Async::NotReady)
}
}
} else {
panic!("Poll after ready.");
}
Expand Down Expand Up @@ -655,19 +733,21 @@ pub fn execute<D: Dispatcher + 'static>(
match payload {
ConfirmationPayload::SendTransaction(request) => {
let condition = request.condition.clone().map(Into::into);
Box::new(dispatcher.sign(accounts, request, pass)
.map(move |v| v.map(move |tx| PendingTransaction::new(tx, condition)))
.map(WithToken::into_tuple)
.map(|(tx, token)| (tx, token, dispatcher))
.and_then(|(tx, tok, dispatcher)| {
dispatcher.dispatch_transaction(tx)
.map(RpcH256::from)
.map(ConfirmationResponse::SendTransaction)
.map(move |h| WithToken::from((h, tok)))
}))
let cloned_dispatcher = dispatcher.clone();
let post_sign = move |with_token_signed: WithToken<SignedTransaction>| {
let (signed, token) = with_token_signed.into_tuple();
let signed_transaction = PendingTransaction::new(signed, condition);
cloned_dispatcher.dispatch_transaction(signed_transaction)
.map(|hash| (hash, token))
};
let future = dispatcher.sign(accounts, request, pass, post_sign)
.map(|(hash, token)| {
WithToken::from((ConfirmationResponse::SendTransaction(hash.into()), token))
});
Box::new(future)
},
ConfirmationPayload::SignTransaction(request) => {
Box::new(dispatcher.sign(accounts, request, pass)
Box::new(dispatcher.sign(accounts, request, pass, ())
.map(move |result| result
.map(move |tx| dispatcher.enrich(tx))
.map(ConfirmationResponse::SignTransaction)
Expand Down
48 changes: 31 additions & 17 deletions rpc/src/v1/impls/personal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use std::sync::Arc;
use std::time::Duration;

use bytes::{Bytes, ToPretty};
use bytes::Bytes;
use ethcore::account_provider::AccountProvider;
use types::transaction::PendingTransaction;
use ethereum_types::{H520, U128, Address};
Expand All @@ -27,7 +27,7 @@ use ethkey::{public_to_address, recover, Signature};
use jsonrpc_core::{BoxFuture, Result};
use jsonrpc_core::futures::{future, Future};
use v1::helpers::{errors, eip191};
use v1::helpers::dispatch::{self, eth_data_hash, Dispatcher, SignWith};
use v1::helpers::dispatch::{self, eth_data_hash, Dispatcher, SignWith, PostSign, WithToken};
use v1::traits::Personal;
use v1::types::{
H160 as RpcH160, H256 as RpcH256, H520 as RpcH520, U128 as RpcU128,
Expand All @@ -41,6 +41,7 @@ use v1::types::{
use v1::metadata::Metadata;
use eip_712::{EIP712, hash_structured_data};
use jsonrpc_core::types::Value;
use transaction::SignedTransaction;

/// Account management (personal) rpc implementation.
pub struct PersonalClient<D: Dispatcher> {
Expand Down Expand Up @@ -68,7 +69,16 @@ impl<D: Dispatcher> PersonalClient<D> {
}

impl<D: Dispatcher + 'static> PersonalClient<D> {
fn do_sign_transaction(&self, _meta: Metadata, request: TransactionRequest, password: String) -> BoxFuture<(PendingTransaction, D)> {
fn do_sign_transaction<P>(
&self,
_meta: Metadata,
request: TransactionRequest,
password: String,
post_sign: P
) -> BoxFuture<P::Item>
where P: PostSign + 'static,
<P::Out as futures::future::IntoFuture>::Future: Send
{
let dispatcher = self.dispatcher.clone();
let accounts = self.accounts.clone();

Expand All @@ -86,11 +96,7 @@ impl<D: Dispatcher + 'static> PersonalClient<D> {

Box::new(dispatcher.fill_optional_fields(request.into(), default, false)
.and_then(move |filled| {
let condition = filled.condition.clone().map(Into::into);
dispatcher.sign(accounts, filled, SignWith::Password(password.into()))
.map(|tx| tx.into_value())
.map(move |tx| PendingTransaction::new(tx, condition))
.map(move |tx| (tx, dispatcher))
dispatcher.sign(accounts, filled, SignWith::Password(password.into()), post_sign)
})
)
}
Expand Down Expand Up @@ -223,18 +229,26 @@ impl<D: Dispatcher + 'static> Personal for PersonalClient<D> {
}

fn sign_transaction(&self, meta: Metadata, request: TransactionRequest, password: String) -> BoxFuture<RpcRichRawTransaction> {
Box::new(self.do_sign_transaction(meta, request, password)
.map(|(pending_tx, dispatcher)| dispatcher.enrich(pending_tx.transaction)))
let condition = request.condition.clone().map(Into::into);
let dispatcher = self.dispatcher.clone();
Box::new(self.do_sign_transaction(meta, request, password, ())
.map(move |tx| PendingTransaction::new(tx.into_value(), condition))
.map(move |pending_tx| dispatcher.enrich(pending_tx.transaction)))
}

fn send_transaction(&self, meta: Metadata, request: TransactionRequest, password: String) -> BoxFuture<RpcH256> {
Box::new(self.do_sign_transaction(meta, request, password)
.and_then(|(pending_tx, dispatcher)| {
let chain_id = pending_tx.chain_id();
trace!(target: "miner", "send_transaction: dispatching tx: {} for chain ID {:?}",
::rlp::encode(&*pending_tx).pretty(), chain_id);

dispatcher.dispatch_transaction(pending_tx).map(Into::into)
let condition = request.condition.clone().map(Into::into);
let dispatcher = self.dispatcher.clone();
Box::new(self.do_sign_transaction(meta, request, password, move |signed: WithToken<SignedTransaction>| {
dispatcher.dispatch_transaction(
PendingTransaction::new(
signed.into_value(),
condition
)
)
})
.and_then(|hash| {
Ok(RpcH256::from(hash))
})
)
}
Expand Down

0 comments on commit 6d69a5d

Please sign in to comment.