Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rpc): implement getrawmempool RPC method #3851

Merged
merged 9 commits into from
Mar 12, 2022
37 changes: 36 additions & 1 deletion zebra-rpc/src/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
//! So this implementation follows the `lightwalletd` client implementation.

use futures::{FutureExt, TryFutureExt};
use hex::FromHex;
use hex::{FromHex, ToHex};
use jsonrpc_core::{self, BoxFuture, Error, ErrorCode, Result};
use jsonrpc_derive::rpc;
use tower::{buffer::Buffer, Service, ServiceExt};
Expand Down Expand Up @@ -102,6 +102,12 @@ pub trait Rpc {
///
#[rpc(name = "getbestblockhash")]
fn get_best_block_hash(&self) -> BoxFuture<Result<GetBestBlockHash>>;

/// Returns all transaction ids in the memory pool, as a JSON array.
///
/// zcashd reference: <https://zcash.github.io/rpc/getrawmempool.html>
#[rpc(name = "getrawmempool")]
fn get_raw_mempool(&self) -> BoxFuture<Result<Vec<String>>>;
}

/// RPC method implementations.
Expand Down Expand Up @@ -286,6 +292,35 @@ where
}
.boxed()
}

fn get_raw_mempool(&self) -> BoxFuture<Result<Vec<String>>> {
let mut mempool = self.mempool.clone();

async move {
let request = mempool::Request::TransactionIds;

let response = mempool
.ready()
.and_then(|service| service.call(request))
.await
.map_err(|error| Error {
code: ErrorCode::ServerError(0),
message: error.to_string(),
data: None,
})?;

match response {
mempool::Response::TransactionIds(unmined_transaction_ids) => {
Ok(unmined_transaction_ids
.iter()
.map(|id| id.mined_id().encode_hex())
.collect())
}
_ => unreachable!("unmatched response to a transactionids request"),
}
}
.boxed()
}
}

#[derive(serde::Serialize, serde::Deserialize)]
Expand Down
83 changes: 76 additions & 7 deletions zebra-rpc/src/methods/tests/prop.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use std::collections::HashSet;

use hex::ToHex;
use jsonrpc_core::{Error, ErrorCode};
use proptest::prelude::*;
use thiserror::Error;
use tower::buffer::Buffer;

use zebra_chain::{
serialization::{ZcashDeserialize, ZcashSerialize},
transaction::{Transaction, UnminedTx},
transaction::{Transaction, UnminedTx, UnminedTxId},
};
use zebra_node_services::mempool;
use zebra_state::BoxError;
Expand All @@ -23,7 +26,11 @@ proptest! {
runtime.block_on(async move {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();
let rpc = RpcImpl::new("RPC test".to_owned(), Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1));
let rpc = RpcImpl::new(
"RPC test".to_owned(),
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
);
let hash = SentTransactionHash(transaction.hash());

let transaction_bytes = transaction
Expand Down Expand Up @@ -65,7 +72,11 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();

let rpc = RpcImpl::new("RPC test".to_owned(), Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1));
let rpc = RpcImpl::new(
"RPC test".to_owned(),
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
);

let transaction_bytes = transaction
.zcash_serialize_to_vec()
Expand All @@ -84,7 +95,6 @@ proptest! {

state.expect_no_requests().await?;


let result = send_task
.await
.expect("Sending raw transactions should not panic");
Expand Down Expand Up @@ -113,7 +123,11 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();

let rpc = RpcImpl::new("RPC test".to_owned(), Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1));
let rpc = RpcImpl::new(
"RPC test".to_owned(),
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
);

let transaction_bytes = transaction
.zcash_serialize_to_vec()
Expand Down Expand Up @@ -168,7 +182,11 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();

let rpc = RpcImpl::new("RPC test".to_owned(), Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1));
let rpc = RpcImpl::new(
"RPC test".to_owned(),
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
);

let send_task = tokio::spawn(rpc.send_raw_transaction(non_hex_string));

Expand Down Expand Up @@ -212,7 +230,11 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();

let rpc = RpcImpl::new("RPC test".to_owned(), Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1));
let rpc = RpcImpl::new(
"RPC test".to_owned(),
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
);

let send_task = tokio::spawn(rpc.send_raw_transaction(hex::encode(random_bytes)));

Expand All @@ -237,6 +259,53 @@ proptest! {
Ok::<_, TestCaseError>(())
})?;
}

/// Test that the `getrawmempool` method forwards the transactions in the mempool.
///
/// Make the mock mempool service return a list of transaction IDs, and check that the RPC call
/// returns those IDs as hexadecimal strings.
#[test]
fn mempool_transactions_are_sent_to_caller(transaction_ids in any::<HashSet<UnminedTxId>>())
{
let runtime = zebra_test::init_async();
let _guard = runtime.enter();

// CORRECTNESS: Nothing in this test depends on real time, so we can speed it up.
tokio::time::pause();

runtime.block_on(async move {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();

let rpc = RpcImpl::new(
"RPC test".to_owned(),
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
);

let call_task = tokio::spawn(rpc.get_raw_mempool());
let expected_response: Vec<String> = transaction_ids
.iter()
.map(|id| id.mined_id().encode_hex())
.collect();

mempool
.expect_request(mempool::Request::TransactionIds)
.await?
.respond(mempool::Response::TransactionIds(transaction_ids));

mempool.expect_no_requests().await?;
state.expect_no_requests().await?;

let result = call_task
.await
.expect("Sending raw transactions should not panic");

prop_assert_eq!(result, Ok(expected_response));

Ok::<_, TestCaseError>(())
})?;
}
}

#[derive(Clone, Copy, Debug, Error)]
Expand Down
13 changes: 0 additions & 13 deletions zebrad/tests/acceptance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1708,20 +1708,7 @@ fn lightwalletd_integration() -> Result<()> {
//
// TODO: add extra checks when we add new Zebra RPCs

// Unimplemented getrawmempool (repeated, in a separate lightwalletd thread)
//
// zcash/lightwalletd exits with a fatal error here.
// adityapk00/lightwalletd keeps trying the mempool,
// but it sometimes skips the "Method not found" log line.
//
// If a refresh is pending, we can get "Mempool refresh error" before the Ingestor log,
// and "Another refresh is in progress" after it.
let result =
lightwalletd.expect_stdout_line_matches("(Mempool refresh error: -32601: Method not found)|(Another refresh in progress, returning)");
let (_, zebrad) = zebrad.kill_on_error(result)?;

// Cleanup both processes

let result = lightwalletd.kill();
let (_, mut zebrad) = zebrad.kill_on_error(result)?;
zebrad.kill()?;
Expand Down