Skip to content

Commit

Permalink
[feature] hyperledger-iroha#2940: Add an ability to cancel submitting…
Browse files Browse the repository at this point in the history
… a transaction

Signed-off-by: Vladimir Pesterev <pesterev@pm.me>
  • Loading branch information
pesterev committed Feb 6, 2023
1 parent 7316336 commit f204298
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 22 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ tokio = { version = "1.23.0", features = ["rt"] }
tokio-tungstenite = { version = "0.16", features = ["native-tls"] }
tungstenite = { version = "0.16", features = ["native-tls"] }
futures-util = "0.3.25"
crossbeam-channel = "0.5"


[dev-dependencies]
Expand Down
168 changes: 147 additions & 21 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
)]
use std::{collections::HashMap, fmt::Debug, marker::PhantomData, thread, time::Duration};

use crossbeam_channel::{bounded, Receiver, Select};
use derive_more::{DebugCustom, Display};
use eyre::{eyre, Result, WrapErr};
use futures_util::StreamExt;
Expand All @@ -32,7 +33,6 @@ use crate::{
};

const APPLICATION_JSON: &str = "application/json";

/// General trait for all response handlers
pub trait ResponseHandler<T = Vec<u8>> {
/// What is the output of the handler
Expand Down Expand Up @@ -380,6 +380,15 @@ impl Client {
self.submit_all([isi])
}

pub fn submit_with_cancellation(
&self,
instruction: impl Into<Instruction>,
cancellation: Receiver<()>,
) -> Result<HashOf<VersionedSignedTransaction>> {
let isi = instruction.into();
self.submit_all_with_cancellation([isi], cancellation)
}

/// Instructions API entry point. Submits several Iroha Special Instructions to `Iroha` peers.
/// Returns submitted transaction's hash or error string.
///
Expand All @@ -392,6 +401,18 @@ impl Client {
self.submit_all_with_metadata(instructions, UnlimitedMetadata::new())
}

pub fn submit_all_with_cancellation(
&self,
instructions: impl IntoIterator<Item = Instruction>,
cancellation: Receiver<()>,
) -> Result<HashOf<VersionedSignedTransaction>> {
self.submit_all_with_metadata_and_cancellation(
instructions,
UnlimitedMetadata::new(),
cancellation,
)
}

/// Instructions API entry point. Submits one Iroha Special Instruction to `Iroha` peers.
/// Allows to specify [`Metadata`] of [`Transaction`].
/// Returns submitted transaction's hash or error string.
Expand Down Expand Up @@ -420,6 +441,18 @@ impl Client {
self.submit_transaction(self.build_transaction(instructions.into(), metadata)?)
}

pub fn submit_all_with_metadata_and_cancellation(
&self,
instructions: impl IntoIterator<Item = Instruction>,
metadata: UnlimitedMetadata,
cancellation: Receiver<()>,
) -> Result<HashOf<VersionedSignedTransaction>> {
self.submit_transaction_with_cancellation(
self.build_transaction(instructions.into(), metadata)?,
Some(cancellation),
)
}

/// Submit a prebuilt transaction.
/// Returns submitted transaction's hash or error string.
///
Expand All @@ -429,15 +462,41 @@ impl Client {
&self,
transaction: SignedTransaction,
) -> Result<HashOf<VersionedSignedTransaction>> {
iroha_logger::trace!(tx=?transaction);
let (req, hash, resp_handler) =
self.prepare_transaction_request::<DefaultRequestBuilder>(transaction)?;
let response = req
.build()?
.send()
.wrap_err_with(|| format!("Failed to send transaction with hash {hash:?}"))?;
resp_handler.handle(response)?;
Ok(hash)
self.submit_transaction_with_cancellation(transaction, None)
}

pub fn submit_transaction_with_cancellation(
&self,
transaction: SignedTransaction,
cancellation: Option<Receiver<()>>,
) -> Result<HashOf<VersionedSignedTransaction>> {
thread::scope(move |scope| {
iroha_logger::trace!(tx=?transaction);
let (req_builder, hash, resp_handler) =
self.prepare_transaction_request::<DefaultRequestBuilder>(transaction)?;
let mut sel = Select::new();

if let Some(rx) = cancellation.as_ref() {
sel.recv(rx);
}

let (tx, rx) = bounded(1);
let submitting_index = sel.recv(&rx);
scope.spawn(move || {
let res = req_builder
.build()
.and_then(|req| req.send())
.wrap_err_with(|| format!("Failed to send transaction with hash {hash:?}"))
.and_then(|response| resp_handler.handle(response))
.map(|_| hash);
let _ = tx.send(res);
});

match sel.ready() {
i if i == submitting_index => rx.recv()?,
_ => return Err(eyre!("A request has been cancelled")),
}
})
}

/// Submit the prebuilt transaction and wait until it is either rejected or committed.
Expand All @@ -448,24 +507,48 @@ impl Client {
pub fn submit_transaction_blocking(
&self,
transaction: SignedTransaction,
) -> Result<HashOf<VersionedSignedTransaction>> {
self.submit_transaction_blocking_with_cancellation(transaction, None)
}

pub fn submit_transaction_blocking_with_cancellation(
&self,
transaction: SignedTransaction,
cancellation: Option<Receiver<()>>,
) -> Result<HashOf<VersionedSignedTransaction>> {
let (init_sender, init_receiver) = tokio::sync::oneshot::channel();
let hash = transaction.hash();

thread::scope(|spawner| {
let submitter_handle = spawner.spawn(move || -> Result<()> {
// Do not submit transaction if event listener is failed to initialize
if init_receiver
.blocking_recv()
.wrap_err("Failed to receive init message.")?
{
self.submit_transaction(transaction)?;
thread::scope(move |spawner| {
let submitter_handle = spawner.spawn({
let cancellation = cancellation.clone();
move || -> Result<()> {
// Do not submit transaction if event listener is failed to initialize
if init_receiver
.blocking_recv()
.wrap_err("Failed to receive init message.")?
{
self.submit_transaction_with_cancellation(transaction, cancellation)?;
}
Ok(())
}
Ok(())
});

let confirmation_res = self.listen_for_tx_confirmation(init_sender, hash);

let mut sel = Select::new();
if let Some(rx) = cancellation.as_ref() {
sel.recv(rx);
}
let (tx, rx) = bounded(1);
spawner.spawn(move || {
let res = self.listen_for_tx_confirmation(init_sender, hash);
// Skip a disconnection error because it means a request has been cancelled.
let _ = tx.send(res);
});
let confirmation_index = sel.recv(&rx);
let confirmation_res = match sel.ready() {
i if i == confirmation_index => rx.recv().expect("This channel was disconnected"),
_ => return Err(eyre!("A request has been cancelled")),
};
match submitter_handle.join() {
Ok(Ok(())) => confirmation_res,
Ok(Err(e)) => Err(e).wrap_err("Transaction submitter thread exited with error"),
Expand Down Expand Up @@ -579,6 +662,14 @@ impl Client {
self.submit_all_blocking(vec![instruction.into()])
}

pub fn submit_blocking_with_cancellation(
&self,
instruction: impl Into<Instruction>,
cancellation: Receiver<()>,
) -> Result<HashOf<VersionedSignedTransaction>> {
self.submit_all_blocking_with_cancellation(vec![instruction.into()], cancellation)
}

/// Submits and waits until the transaction is either rejected or committed.
/// Returns rejection reason if transaction was rejected.
///
Expand All @@ -591,6 +682,18 @@ impl Client {
self.submit_all_blocking_with_metadata(instructions, UnlimitedMetadata::new())
}

pub fn submit_all_blocking_with_cancellation(
&self,
instructions: impl IntoIterator<Item = Instruction>,
cancellation: Receiver<()>,
) -> Result<HashOf<VersionedSignedTransaction>> {
self.submit_all_blocking_with_metadata_and_cancellation(
instructions,
UnlimitedMetadata::new(),
cancellation,
)
}

/// Submits and waits until the transaction is either rejected or committed.
/// Allows to specify [`Metadata`] of [`Transaction`].
/// Returns rejection reason if transaction was rejected.
Expand All @@ -605,6 +708,19 @@ impl Client {
self.submit_all_blocking_with_metadata(vec![instruction.into()], metadata)
}

pub fn submit_blocking_with_metadata_and_cancellation(
&self,
instruction: impl Into<Instruction>,
metadata: UnlimitedMetadata,
cancellation: Receiver<()>,
) -> Result<HashOf<VersionedSignedTransaction>> {
self.submit_all_blocking_with_metadata_and_cancellation(
vec![instruction.into()],
metadata,
cancellation,
)
}

/// Submits and waits until the transaction is either rejected or committed.
/// Allows to specify [`Metadata`] of [`Transaction`].
/// Returns rejection reason if transaction was rejected.
Expand All @@ -620,6 +736,16 @@ impl Client {
self.submit_transaction_blocking(transaction)
}

pub fn submit_all_blocking_with_metadata_and_cancellation(
&self,
instructions: impl IntoIterator<Item = Instruction>,
metadata: UnlimitedMetadata,
cancellation: Receiver<()>,
) -> Result<HashOf<VersionedSignedTransaction>> {
let transaction = self.build_transaction(instructions.into(), metadata)?;
self.submit_transaction_blocking_with_cancellation(transaction, Some(cancellation))
}

/// Lower-level Query API entry point. Prepares an http-request and returns it with an http-response handler.
///
/// # Errors
Expand Down
7 changes: 6 additions & 1 deletion client/tests/integration/asset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,13 @@ fn client_register_asset_should_add_asset_once_but_not_twice() -> Result<()> {
})
})?;

let res = test_client.submit_blocking(register_asset);
if let Ok(hash) = res {
dbg!("some shit", hash);
}

// But registering an asset to account already having one should fail
assert!(test_client.submit_blocking(register_asset).is_err());
assert!(res.is_err());

Ok(())
}
Expand Down
39 changes: 39 additions & 0 deletions client/tests/integration/cancellation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use std::str::FromStr;

use eyre::Result;
use iroha_core::tx::{Account, AccountId, RegisterBox};
use test_network::*;

#[test]
fn client_cancel_to_submit_transaction() -> Result<()> {
let (_rt, _peer, client) = <PeerBuilder>::new().with_port(10_680).start_with_runtime();

let new_acc_id = AccountId::from_str("bobby@wonderland").expect("Valid");
let register_new_acc = RegisterBox::new(Account::new(new_acc_id, []));

let (tx, rx) = crossbeam_channel::bounded(1);
tx.send(()).expect("Sent successfully");

assert!(client
.submit_with_cancellation(register_new_acc, rx)
.is_err());

Ok(())
}

#[test]
fn client_cancel_to_blocking_submit_transaction() -> Result<()> {
let (_rt, _peer, client) = <PeerBuilder>::new().with_port(10_690).start_with_runtime();

let new_acc_id = AccountId::from_str("bobby@wonderland").expect("Valid");
let register_new_acc = RegisterBox::new(Account::new(new_acc_id, []));

let (tx, rx) = crossbeam_channel::bounded(1);
tx.send(()).expect("Sent successfully");

assert!(client
.submit_blocking_with_cancellation(register_new_acc, rx)
.is_err());

Ok(())
}
1 change: 1 addition & 0 deletions client/tests/integration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod add_domain;
mod asset;
mod asset_propagation;
mod burn_public_keys;
mod cancellation;
mod config;
mod connected_peers;
mod events;
Expand Down

0 comments on commit f204298

Please sign in to comment.