Skip to content

Commit

Permalink
txBroadcast: Stabilize to version 1 (#4169)
Browse files Browse the repository at this point in the history
This PR stabilizes the txBroadcast API to version 1.

Ideally needs:
- #4050
- #3772

cc @paritytech/subxt-team

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
  • Loading branch information
lexnv authored Apr 19, 2024
1 parent 69f4373 commit 148d942
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 51 deletions.
8 changes: 8 additions & 0 deletions prdoc/pr_4169.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
title: Stabilize transactionBroadcast RPC class to version 1

doc:
- audience: Node Dev
description: |
The transactionBroadcast RPC API is stabilized to version 1.

crates: [ ]
5 changes: 3 additions & 2 deletions substrate/client/rpc-spec-v2/src/transaction/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,15 @@ pub trait TransactionBroadcastApi {
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[method(name = "transaction_unstable_broadcast", raw_method)]
#[method(name = "transaction_v1_broadcast", raw_method)]
async fn broadcast(&self, bytes: Bytes) -> RpcResult<Option<String>>;

/// Broadcast an extrinsic to the chain.
///
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[method(name = "transaction_unstable_stop", raw_method)]
#[method(name = "transaction_v1_stop", raw_method)]
async fn stop_broadcast(&self, operation_id: String) -> Result<(), ErrorBroadcast>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ async fn tx_broadcast_enters_pool() {
let xt = hex_string(&uxt.encode());

let operation_id: String =
tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap();
tx_api.call("transaction_v1_broadcast", rpc_params![&xt]).await.unwrap();

// Announce block 1 to `transaction_unstable_broadcast`.
// Announce block 1 to `transaction_v1_broadcast`.
client_mock.trigger_import_stream(block_1_header).await;

// Ensure the tx propagated from `transaction_unstable_broadcast` to the transaction pool.
// Ensure the tx propagated from `transaction_v1_broadcast` to the transaction pool.
let event = get_next_event!(&mut pool_middleware);
assert_eq!(
event,
Expand Down Expand Up @@ -84,10 +84,7 @@ async fn tx_broadcast_enters_pool() {

// The future broadcast awaits for the finalized status to be reached.
// Force the future to exit by calling stop.
let _: () = tx_api
.call("transaction_unstable_stop", rpc_params![&operation_id])
.await
.unwrap();
let _: () = tx_api.call("transaction_v1_stop", rpc_params![&operation_id]).await.unwrap();

// Ensure the broadcast future finishes.
let _ = get_next_event!(&mut exec_middleware.recv);
Expand All @@ -101,7 +98,7 @@ async fn tx_broadcast_invalid_tx() {

// Invalid parameters.
let err = tx_api
.call::<_, serde_json::Value>("transaction_unstable_broadcast", [1u8])
.call::<_, serde_json::Value>("transaction_v1_broadcast", [1u8])
.await
.unwrap_err();
assert_matches!(err,
Expand All @@ -113,7 +110,7 @@ async fn tx_broadcast_invalid_tx() {
// Invalid transaction that cannot be decoded. The broadcast silently exits.
let xt = "0xdeadbeef";
let operation_id: String =
tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap();
tx_api.call("transaction_v1_broadcast", rpc_params![&xt]).await.unwrap();

assert_eq!(0, pool.status().ready);

Expand All @@ -124,7 +121,7 @@ async fn tx_broadcast_invalid_tx() {
// When the operation is not active, either from the tx being finalized or a
// terminal error; the stop method should return an error.
let err = tx_api
.call::<_, serde_json::Value>("transaction_unstable_stop", rpc_params![&operation_id])
.call::<_, serde_json::Value>("transaction_v1_stop", rpc_params![&operation_id])
.await
.unwrap_err();
assert_matches!(err,
Expand All @@ -138,7 +135,7 @@ async fn tx_stop_with_invalid_operation_id() {

// Make an invalid stop call.
let err = tx_api
.call::<_, serde_json::Value>("transaction_unstable_stop", ["invalid_operation_id"])
.call::<_, serde_json::Value>("transaction_v1_stop", ["invalid_operation_id"])
.await
.unwrap_err();
assert_matches!(err,
Expand All @@ -161,15 +158,13 @@ async fn tx_broadcast_resubmits_future_nonce_tx() {
let future_uxt = uxt(Alice, ALICE_NONCE + 1);
let future_xt = hex_string(&future_uxt.encode());

let future_operation_id: String = tx_api
.call("transaction_unstable_broadcast", rpc_params![&future_xt])
.await
.unwrap();
let future_operation_id: String =
tx_api.call("transaction_v1_broadcast", rpc_params![&future_xt]).await.unwrap();

// Announce block 1 to `transaction_unstable_broadcast`.
// Announce block 1 to `transaction_v1_broadcast`.
client_mock.trigger_import_stream(block_1_header).await;

// Ensure the tx propagated from `transaction_unstable_broadcast` to the transaction pool.
// Ensure the tx propagated from `transaction_v1_broadcast` to the transaction pool.
let event = get_next_event!(&mut pool_middleware);
assert_eq!(
event,
Expand All @@ -188,13 +183,11 @@ async fn tx_broadcast_resubmits_future_nonce_tx() {
let block_2_header = api.push_block(2, vec![], true);
let block_2 = block_2_header.hash();

let operation_id: String = tx_api
.call("transaction_unstable_broadcast", rpc_params![&current_xt])
.await
.unwrap();
let operation_id: String =
tx_api.call("transaction_v1_broadcast", rpc_params![&current_xt]).await.unwrap();
assert_ne!(future_operation_id, operation_id);

// Announce block 2 to `transaction_unstable_broadcast`.
// Announce block 2 to `transaction_v1_broadcast`.
client_mock.trigger_import_stream(block_2_header).await;

// Collect the events of both transactions.
Expand Down Expand Up @@ -249,12 +242,12 @@ async fn tx_broadcast_stop_after_broadcast_finishes() {
let xt = hex_string(&uxt.encode());

let operation_id: String =
tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap();
tx_api.call("transaction_v1_broadcast", rpc_params![&xt]).await.unwrap();

// Announce block 1 to `transaction_unstable_broadcast`.
// Announce block 1 to `transaction_v1_broadcast`.
client_mock.trigger_import_stream(block_1_header).await;

// Ensure the tx propagated from `transaction_unstable_broadcast` to the transaction
// Ensure the tx propagated from `transaction_v1_broadcast` to the transaction
// pool.inner_pool.
let event = get_next_event!(&mut pool_middleware);
assert_eq!(
Expand Down Expand Up @@ -303,7 +296,7 @@ async fn tx_broadcast_stop_after_broadcast_finishes() {
// The operation ID is no longer valid, check that the broadcast future
// cleared out the inner state of the operation.
let err = tx_api
.call::<_, serde_json::Value>("transaction_unstable_stop", rpc_params![&operation_id])
.call::<_, serde_json::Value>("transaction_v1_stop", rpc_params![&operation_id])
.await
.unwrap_err();
assert_matches!(err,
Expand All @@ -328,14 +321,14 @@ async fn tx_broadcast_resubmits_invalid_tx() {
let uxt = uxt(Alice, ALICE_NONCE);
let xt = hex_string(&uxt.encode());
let _operation_id: String =
tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap();
tx_api.call("transaction_v1_broadcast", rpc_params![&xt]).await.unwrap();

let block_1_header = api.push_block(1, vec![], true);
let block_1 = block_1_header.hash();
// Announce block 1 to `transaction_unstable_broadcast`.
// Announce block 1 to `transaction_v1_broadcast`.
client_mock.trigger_import_stream(block_1_header).await;

// Ensure the tx propagated from `transaction_unstable_broadcast` to the transaction pool.
// Ensure the tx propagated from `transaction_v1_broadcast` to the transaction pool.
let event = get_next_event!(&mut pool_middleware);
assert_eq!(
event,
Expand All @@ -355,7 +348,7 @@ async fn tx_broadcast_resubmits_invalid_tx() {
pool.inner_pool.maintain(event).await;
assert_eq!(1, pool.inner_pool.status().ready);

// Ensure the `transaction_unstable_broadcast` is aware of the invalid transaction.
// Ensure the `transaction_v1_broadcast` is aware of the invalid transaction.
let event = get_next_event!(&mut pool_middleware);
// Because we have received an `Invalid` status, we try to broadcast the transaction with the
// next announced block.
Expand Down Expand Up @@ -388,7 +381,7 @@ async fn tx_broadcast_resubmits_invalid_tx() {
pool.inner_pool.maintain(event).await;
assert_eq!(0, pool.inner_pool.status().ready);

// Announce block to `transaction_unstable_broadcast`.
// Announce block to `transaction_v1_broadcast`.
client_mock.trigger_import_stream(block_3_header).await;

let event = get_next_event!(&mut pool_middleware);
Expand Down Expand Up @@ -456,12 +449,10 @@ async fn tx_broadcast_resubmits_dropped_tx() {
// are immediately dropped.
api.set_priority(&current_uxt, 10);

let current_operation_id: String = tx_api
.call("transaction_unstable_broadcast", rpc_params![&current_xt])
.await
.unwrap();
let current_operation_id: String =
tx_api.call("transaction_v1_broadcast", rpc_params![&current_xt]).await.unwrap();

// Announce block 1 to `transaction_unstable_broadcast`.
// Announce block 1 to `transaction_v1_broadcast`.
let block_1_header = api.push_block(1, vec![], true);
let event =
ChainEvent::Finalized { hash: block_1_header.hash(), tree_route: Arc::from(vec![]) };
Expand All @@ -480,10 +471,8 @@ async fn tx_broadcast_resubmits_dropped_tx() {

// The future tx has priority 2, smaller than the current 10.
api.set_priority(&future_uxt, 2);
let future_operation_id: String = tx_api
.call("transaction_unstable_broadcast", rpc_params![&future_xt])
.await
.unwrap();
let future_operation_id: String =
tx_api.call("transaction_v1_broadcast", rpc_params![&future_xt]).await.unwrap();
assert_ne!(current_operation_id, future_operation_id);

let block_2_header = api.push_block(2, vec![], true);
Expand Down Expand Up @@ -535,12 +524,12 @@ async fn tx_broadcast_limit_reached() {
let xt = hex_string(&uxt.encode());

let operation_id: String =
tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap();
tx_api.call("transaction_v1_broadcast", rpc_params![&xt]).await.unwrap();

// Announce block 1 to `transaction_unstable_broadcast`.
// Announce block 1 to `transaction_v1_broadcast`.
client_mock.trigger_import_stream(block_1_header).await;

// Ensure the tx propagated from `transaction_unstable_broadcast` to the transaction pool.
// Ensure the tx propagated from `transaction_v1_broadcast` to the transaction pool.
let event = get_next_event!(&mut pool_middleware);
assert_eq!(
event,
Expand All @@ -552,23 +541,20 @@ async fn tx_broadcast_limit_reached() {
assert_eq!(1, exec_middleware.num_tasks());

let operation_id_limit_reached: Option<String> =
tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap();
tx_api.call("transaction_v1_broadcast", rpc_params![&xt]).await.unwrap();
assert!(operation_id_limit_reached.is_none(), "No operation ID => tx was rejected");

// We still have in flight one operation.
assert_eq!(1, exec_middleware.num_tasks());

// Force the future to exit by calling stop.
let _: () = tx_api
.call("transaction_unstable_stop", rpc_params![&operation_id])
.await
.unwrap();
let _: () = tx_api.call("transaction_v1_stop", rpc_params![&operation_id]).await.unwrap();

// Ensure the broadcast future finishes.
let _ = get_next_event!(&mut exec_middleware.recv);
assert_eq!(0, exec_middleware.num_tasks());

// Can resubmit again now.
let _operation_id: String =
tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap();
tx_api.call("transaction_v1_broadcast", rpc_params![&xt]).await.unwrap();
}

0 comments on commit 148d942

Please sign in to comment.