Skip to content

Commit

Permalink
rpcdaemon: replace UnaryRpc with rpc::unary_rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
canepat committed Jan 7, 2025
1 parent d840a60 commit c33e4d1
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 127 deletions.
5 changes: 1 addition & 4 deletions silkworm/rpc/commands/trace_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include <silkworm/infra/concurrency/private_service.hpp>
#include <silkworm/infra/concurrency/shared_service.hpp>
#include <silkworm/rpc/common/worker_pool.hpp>
#include <silkworm/rpc/ethbackend/backend.hpp>
#include <silkworm/rpc/ethdb/database.hpp>
#include <silkworm/rpc/json/stream.hpp>

Expand All @@ -43,8 +42,7 @@ class TraceRpcApi {
block_cache_{must_use_shared_service<BlockCache>(ioc_)},
state_cache_{must_use_shared_service<db::kv::api::StateCache>(ioc_)},
database_{must_use_private_service<ethdb::Database>(ioc_)},
workers_{workers},
backend_{must_use_private_service<ethbackend::BackEnd>(ioc_)} {}
workers_{workers} {}

virtual ~TraceRpcApi() = default;

Expand All @@ -70,7 +68,6 @@ class TraceRpcApi {
db::kv::api::StateCache* state_cache_;
ethdb::Database* database_;
WorkerPool& workers_;
ethbackend::BackEnd* backend_;

friend class silkworm::rpc::json_rpc::RequestHandler;
};
Expand Down
51 changes: 24 additions & 27 deletions silkworm/rpc/ethbackend/remote_backend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,30 @@
#include <silkworm/core/types/address.hpp>
#include <silkworm/infra/common/clock_time.hpp>
#include <silkworm/infra/common/log.hpp>
#include <silkworm/infra/grpc/client/unary_rpc.hpp>
#include <silkworm/infra/grpc/client/call.hpp>
#include <silkworm/infra/grpc/common/conversion.hpp>
#include <silkworm/rpc/json/types.hpp>

namespace silkworm::rpc::ethbackend {

namespace proto = ::remote;
using Stub = proto::ETHBACKEND::StubInterface;

RemoteBackEnd::RemoteBackEnd(
boost::asio::io_context& ioc,
const std::shared_ptr<grpc::Channel>& channel,
agrpc::GrpcContext& grpc_context)
: RemoteBackEnd(ioc.get_executor(), ::remote::ETHBACKEND::NewStub(channel), grpc_context) {}

RemoteBackEnd::RemoteBackEnd(boost::asio::io_context::executor_type executor,
std::unique_ptr<::remote::ETHBACKEND::StubInterface> stub,
std::unique_ptr<Stub> stub,
agrpc::GrpcContext& grpc_context)
: executor_(std::move(executor)), stub_(std::move(stub)), grpc_context_(grpc_context) {}

Task<evmc::address> RemoteBackEnd::etherbase() {
const auto start_time = clock_time::now();
UnaryRpc<&::remote::ETHBACKEND::StubInterface::AsyncEtherbase> eb_rpc{*stub_, grpc_context_};
const auto reply = co_await eb_rpc.finish_on(executor_, ::remote::EtherbaseRequest{});
const proto::EtherbaseRequest request;
const auto reply = co_await rpc::unary_rpc(&Stub::AsyncEtherbase, *stub_, request, grpc_context_);
evmc::address evmc_address;
if (reply.has_address()) {
const auto& h160_address = reply.address();
Expand All @@ -58,35 +61,35 @@ Task<evmc::address> RemoteBackEnd::etherbase() {

Task<uint64_t> RemoteBackEnd::protocol_version() {
const auto start_time = clock_time::now();
UnaryRpc<&::remote::ETHBACKEND::StubInterface::AsyncProtocolVersion> pv_rpc{*stub_, grpc_context_};
const auto reply = co_await pv_rpc.finish_on(executor_, ::remote::ProtocolVersionRequest{});
const proto::ProtocolVersionRequest request;
const auto reply = co_await rpc::unary_rpc(&Stub::AsyncProtocolVersion, *stub_, request, grpc_context_);
const auto pv = reply.id();
SILK_TRACE << "RemoteBackEnd::protocol_version version=" << pv << " t=" << clock_time::since(start_time);
co_return pv;
}

Task<BlockNum> RemoteBackEnd::net_version() {
const auto start_time = clock_time::now();
UnaryRpc<&::remote::ETHBACKEND::StubInterface::AsyncNetVersion> nv_rpc{*stub_, grpc_context_};
const auto reply = co_await nv_rpc.finish_on(executor_, ::remote::NetVersionRequest{});
const proto::NetVersionRequest request;
const auto reply = co_await rpc::unary_rpc(&Stub::AsyncNetVersion, *stub_, request, grpc_context_);
const auto nv = reply.id();
SILK_TRACE << "RemoteBackEnd::net_version version=" << nv << " t=" << clock_time::since(start_time);
co_return nv;
}

Task<std::string> RemoteBackEnd::client_version() {
const auto start_time = clock_time::now();
UnaryRpc<&::remote::ETHBACKEND::StubInterface::AsyncClientVersion> cv_rpc{*stub_, grpc_context_};
const auto reply = co_await cv_rpc.finish_on(executor_, ::remote::ClientVersionRequest{});
const proto::ClientVersionRequest request;
const auto reply = co_await rpc::unary_rpc(&Stub::AsyncClientVersion, *stub_, request, grpc_context_);
const auto& cv = reply.node_name();
SILK_TRACE << "RemoteBackEnd::client_version version=" << cv << " t=" << clock_time::since(start_time);
co_return cv;
}

Task<uint64_t> RemoteBackEnd::net_peer_count() {
const auto start_time = clock_time::now();
UnaryRpc<&::remote::ETHBACKEND::StubInterface::AsyncNetPeerCount> npc_rpc{*stub_, grpc_context_};
const auto reply = co_await npc_rpc.finish_on(executor_, ::remote::NetPeerCountRequest{});
const proto::NetPeerCountRequest request;
const auto reply = co_await rpc::unary_rpc(&Stub::AsyncNetPeerCount, *stub_, request, grpc_context_);
const auto count = reply.count();
SILK_TRACE << "RemoteBackEnd::net_peer_count count=" << count << " t=" << clock_time::since(start_time);
co_return count;
Expand All @@ -95,8 +98,8 @@ Task<uint64_t> RemoteBackEnd::net_peer_count() {
Task<NodeInfos> RemoteBackEnd::engine_node_info() {
NodeInfos node_info_list;
const auto start_time = clock_time::now();
UnaryRpc<&::remote::ETHBACKEND::StubInterface::AsyncNodeInfo> ni_rpc{*stub_, grpc_context_};
const auto reply = co_await ni_rpc.finish_on(executor_, ::remote::NodesInfoRequest{});
const proto::NodesInfoRequest request;
const auto reply = co_await rpc::unary_rpc(&Stub::AsyncNodeInfo, *stub_, request, grpc_context_);
for (int i = 0; i < reply.nodes_info_size(); ++i) {
NodeInfo node_info;
const auto& backend_node_info = reply.nodes_info(i);
Expand All @@ -119,9 +122,8 @@ Task<NodeInfos> RemoteBackEnd::engine_node_info() {

Task<PeerInfos> RemoteBackEnd::peers() {
const auto start_time = clock_time::now();
UnaryRpc<&::remote::ETHBACKEND::StubInterface::AsyncPeers> peers_rpc{*stub_, grpc_context_};
::google::protobuf::Empty request;
const auto reply = co_await peers_rpc.finish_on(executor_, request);
const ::google::protobuf::Empty request;
const auto reply = co_await rpc::unary_rpc(&Stub::AsyncPeers, *stub_, request, grpc_context_);
PeerInfos peer_infos;
peer_infos.reserve(static_cast<size_t>(reply.peers_size()));
for (const auto& peer : reply.peers()) {
Expand All @@ -145,11 +147,10 @@ Task<PeerInfos> RemoteBackEnd::peers() {

Task<bool> RemoteBackEnd::get_block(BlockNum block_num, const HashAsSpan& hash, bool read_senders, silkworm::Block& block) {
const auto start_time = clock_time::now();
UnaryRpc<&::remote::ETHBACKEND::StubInterface::AsyncBlock> get_block_rpc{*stub_, grpc_context_};
::remote::BlockRequest request;
request.set_block_height(block_num);
request.set_allocated_block_hash(h256_from_bytes(hash).release());
const auto reply = co_await get_block_rpc.finish_on(executor_, request);
const auto reply = co_await rpc::unary_rpc(&Stub::AsyncBlock, *stub_, request, grpc_context_);

Check warning on line 153 in silkworm/rpc/ethbackend/remote_backend.cpp

View check run for this annotation

Codecov / codecov/patch

silkworm/rpc/ethbackend/remote_backend.cpp#L153

Added line #L153 was not covered by tests
ByteView block_rlp{string_view_to_byte_view(reply.block_rlp())};
if (const auto decode_result{rlp::decode(block_rlp, block)}; !decode_result) {
co_return false;
Expand All @@ -171,10 +172,9 @@ Task<bool> RemoteBackEnd::get_block(BlockNum block_num, const HashAsSpan& hash,

Task<std::optional<BlockNum>> RemoteBackEnd::get_block_num_from_txn_hash(const HashAsSpan& hash) {
const auto start_time = clock_time::now();
UnaryRpc<&::remote::ETHBACKEND::StubInterface::AsyncTxnLookup> txn_lookup_rpc{*stub_, grpc_context_};
::remote::TxnLookupRequest request;
request.set_allocated_txn_hash(h256_from_bytes(hash).release());
const auto reply = co_await txn_lookup_rpc.finish_on(executor_, request);
const auto reply = co_await rpc::unary_rpc(&Stub::AsyncTxnLookup, *stub_, request, grpc_context_);
if (reply.block_number() == 0) {
co_return std::nullopt;
}
Expand All @@ -185,10 +185,9 @@ Task<std::optional<BlockNum>> RemoteBackEnd::get_block_num_from_txn_hash(const H

Task<std::optional<BlockNum>> RemoteBackEnd::get_block_num_from_hash(const HashAsSpan& hash) {
const auto start_time = clock_time::now();
UnaryRpc<&::remote::ETHBACKEND::StubInterface::AsyncHeaderNumber> header_number_rpc{*stub_, grpc_context_};
::remote::HeaderNumberRequest request;
request.set_allocated_hash(h256_from_bytes(hash).release());
const auto reply = co_await header_number_rpc.finish_on(executor_, request);
const auto reply = co_await rpc::unary_rpc(&Stub::AsyncHeaderNumber, *stub_, request, grpc_context_);
if (!reply.has_number()) {
co_return std::nullopt;
}
Expand All @@ -199,10 +198,9 @@ Task<std::optional<BlockNum>> RemoteBackEnd::get_block_num_from_hash(const HashA

Task<std::optional<evmc::bytes32>> RemoteBackEnd::get_block_hash_from_block_num(BlockNum block_num) {
const auto start_time = clock_time::now();
UnaryRpc<&::remote::ETHBACKEND::StubInterface::AsyncCanonicalHash> canonical_hsh_rpc{*stub_, grpc_context_};
::remote::CanonicalHashRequest request;
request.set_block_number(block_num);
const auto reply = co_await canonical_hsh_rpc.finish_on(executor_, request);
const auto reply = co_await rpc::unary_rpc(&Stub::AsyncCanonicalHash, *stub_, request, grpc_context_);
evmc::bytes32 hash;
if (reply.has_hash() == 0) {
co_return std::nullopt;
Expand All @@ -215,10 +213,9 @@ Task<std::optional<evmc::bytes32>> RemoteBackEnd::get_block_hash_from_block_num(

Task<std::optional<Bytes>> RemoteBackEnd::canonical_body_for_storage(BlockNum block_num) {
const auto start_time = clock_time::now();
UnaryRpc<&::remote::ETHBACKEND::StubInterface::AsyncCanonicalBodyForStorage> canonical_body_for_storage_rpc{*stub_, grpc_context_};
::remote::CanonicalBodyForStorageRequest request;
request.set_blocknumber(block_num);
const auto reply = co_await canonical_body_for_storage_rpc.finish_on(executor_, request);
const auto reply = co_await rpc::unary_rpc(&Stub::AsyncCanonicalBodyForStorage, *stub_, request, grpc_context_);
SILK_TRACE << "RemoteBackEnd::canonical_body_for_storage block_num=" << block_num
<< " t=" << clock_time::since(start_time);
if (reply.body().empty()) {
Expand Down
31 changes: 15 additions & 16 deletions silkworm/rpc/ethbackend/remote_backend_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@
#include <string>
#include <utility>

#include <agrpc/test.hpp>
#include <catch2/catch_test_macros.hpp>
#include <evmc/evmc.hpp>
#include <gmock/gmock.h>

#include <silkworm/core/common/bytes_to_string.hpp>
#include <silkworm/core/common/util.hpp>
#include <silkworm/infra/grpc/client/call.hpp>
#include <silkworm/infra/grpc/test_util/grpc_actions.hpp>
#include <silkworm/infra/grpc/test_util/grpc_responder.hpp>
#include <silkworm/interfaces/remote/ethbackend_mock.grpc.pb.h>
Expand Down Expand Up @@ -83,7 +82,7 @@ TEST_CASE_METHOD(EthBackendTest, "BackEnd::etherbase", "[silkworm][rpc][ethbacke

SECTION("call etherbase and get error") {
EXPECT_CALL(reader, Finish).WillOnce(test::finish_cancelled(grpc_context_));
CHECK_THROWS_AS((run<&ethbackend::RemoteBackEnd::etherbase>()), boost::system::system_error);
CHECK_THROWS_AS((run<&ethbackend::RemoteBackEnd::etherbase>()), rpc::GrpcStatusError);
}
}

Expand All @@ -107,7 +106,7 @@ TEST_CASE_METHOD(EthBackendTest, "BackEnd::protocol_version", "[silkworm][rpc][e

SECTION("call protocol_version and get error") {
EXPECT_CALL(reader, Finish).WillOnce(test::finish_cancelled(grpc_context_));
CHECK_THROWS_AS((run<&ethbackend::RemoteBackEnd::protocol_version>()), boost::system::system_error);
CHECK_THROWS_AS((run<&ethbackend::RemoteBackEnd::protocol_version>()), rpc::GrpcStatusError);
}
}

Expand All @@ -131,7 +130,7 @@ TEST_CASE_METHOD(EthBackendTest, "BackEnd::net_version", "[silkworm][rpc][ethbac

SECTION("call net_version and get error") {
EXPECT_CALL(reader, Finish).WillOnce(test::finish_cancelled(grpc_context_));
CHECK_THROWS_AS((run<&ethbackend::RemoteBackEnd::net_version>()), boost::system::system_error);
CHECK_THROWS_AS((run<&ethbackend::RemoteBackEnd::net_version>()), rpc::GrpcStatusError);
}
}

Expand All @@ -155,7 +154,7 @@ TEST_CASE_METHOD(EthBackendTest, "BackEnd::client_version", "[silkworm][rpc][eth

SECTION("call client_version and get error") {
EXPECT_CALL(reader, Finish).WillOnce(test::finish_cancelled(grpc_context_));
CHECK_THROWS_AS((run<&ethbackend::RemoteBackEnd::client_version>()), boost::system::system_error);
CHECK_THROWS_AS((run<&ethbackend::RemoteBackEnd::client_version>()), rpc::GrpcStatusError);
}
}

Expand All @@ -182,7 +181,7 @@ TEST_CASE_METHOD(EthBackendTest, "BackEnd::get_block_num_from_txn_hash", "[silkw

SECTION("call get_block_num_from_txn_hash and get error") {
EXPECT_CALL(reader, Finish).WillOnce(test::finish_cancelled(grpc_context_));
CHECK_THROWS_AS((run<&ethbackend::RemoteBackEnd::get_block_num_from_txn_hash>(hash.bytes)), boost::system::system_error);
CHECK_THROWS_AS((run<&ethbackend::RemoteBackEnd::get_block_num_from_txn_hash>(hash.bytes)), rpc::GrpcStatusError);
}
}

Expand Down Expand Up @@ -214,7 +213,7 @@ TEST_CASE_METHOD(EthBackendTest, "BackEnd::get_block_num_from_hash", "[silkworm]

SECTION("call get_block_num_from_hash and get error") {
EXPECT_CALL(reader, Finish).WillOnce(test::finish_cancelled(grpc_context_));
CHECK_THROWS_AS((run<&ethbackend::RemoteBackEnd::get_block_num_from_hash>(hash.bytes)), boost::system::system_error);
CHECK_THROWS_AS((run<&ethbackend::RemoteBackEnd::get_block_num_from_hash>(hash.bytes)), rpc::GrpcStatusError);
}
}

Expand All @@ -233,7 +232,7 @@ TEST_CASE_METHOD(EthBackendTest, "BackEnd::canonical_body_for_storage", "[silkwo

SECTION("call get_block_num_from_hash and get error") {
EXPECT_CALL(reader, Finish).WillOnce(test::finish_cancelled(grpc_context_));
CHECK_THROWS_AS((run<&ethbackend::RemoteBackEnd::canonical_body_for_storage>(block_num)), boost::system::system_error);
CHECK_THROWS_AS((run<&ethbackend::RemoteBackEnd::canonical_body_for_storage>(block_num)), rpc::GrpcStatusError);
}
}

Expand All @@ -258,7 +257,7 @@ TEST_CASE_METHOD(EthBackendTest, "BackEnd::get_block_hash_from_block_num", "[sil

SECTION("call get_block_hash_from_block_num and get error") {
EXPECT_CALL(reader, Finish).WillOnce(test::finish_cancelled(grpc_context_));
CHECK_THROWS_AS((run<&ethbackend::RemoteBackEnd::get_block_hash_from_block_num>(block_num)), boost::system::system_error);
CHECK_THROWS_AS((run<&ethbackend::RemoteBackEnd::get_block_hash_from_block_num>(block_num)), rpc::GrpcStatusError);
}
}

Expand All @@ -282,7 +281,7 @@ TEST_CASE_METHOD(EthBackendTest, "BackEnd::net_peer_count", "[silkworm][rpc][eth

SECTION("call net_peer_count and get error") {
EXPECT_CALL(reader, Finish).WillOnce(test::finish_cancelled(grpc_context_));
CHECK_THROWS_AS((run<&ethbackend::RemoteBackEnd::net_peer_count>()), boost::system::system_error);
CHECK_THROWS_AS((run<&ethbackend::RemoteBackEnd::net_peer_count>()), rpc::GrpcStatusError);
}
}

Expand All @@ -304,8 +303,8 @@ TEST_CASE_METHOD(EthBackendTest, "BackEnd::node_info", "[silkworm][rpc][ethbacke
ports_ref->set_listener(30000);
reply->set_allocated_ports(ports_ref);
std::string protocols = std::string(R"({"eth": {"network":5, "difficulty":10790000, "genesis":"0xbf7e331f7f7c1dd2e05159666b3bf8bc7a8a3a9eb1d518969eab529dd9b88c1a",)");
protocols += R"( "config": {"ChainName":"goerli", "chainId":5, "consensus":"clique", "homesteadBlock":0, "daoForkSupport":true, "eip150Block":0,)";
protocols += R"( "eip150Hash":"0x0000000000000000000000000000000000000000000000000000000000000000", "eip155Block":0, "byzantiumBlock":0, "constantinopleBlock":0,)";
protocols += R"("config": {"ChainName":"goerli", "chainId":5, "consensus":"clique", "homesteadBlock":0, "daoForkSupport":true, "eip150Block":0,)";
protocols += R"("eip150Hash":"0x0000000000000000000000000000000000000000000000000000000000000000", "eip155Block":0, "byzantiumBlock":0, "constantinopleBlock":0,)";
protocols += R"("petersburgBlock":0, "istanbulBlock":1561651, "berlinBlock":4460644, "londonBlock":5062605, "terminalTotalDifficulty":10790000,)";
protocols += R"("terminalTotalDifficultyPassed":true, "clique": {"period":15, "epoch":30000}},)";
protocols += R"("head":"0x11fce21bdebbcf09e1e2e37b874729c17518cd342fcf0959659e650fa45f9768"}})";
Expand Down Expand Up @@ -382,7 +381,7 @@ TEST_CASE_METHOD(EthBackendTest, "BackEnd::node_info", "[silkworm][rpc][ethbacke
SECTION("call engine_get_payload and get error") {
EXPECT_CALL(reader, Finish).WillOnce(test::finish_cancelled(grpc_context_));
CHECK_THROWS_AS((run<&ethbackend::RemoteBackEnd::engine_get_payload>(0u)), boost::system::system_error);
CHECK_THROWS_AS((run<&ethbackend::RemoteBackEnd::engine_get_payload>(0u)), rpc::GrpcStatusError);
}
}
Expand Down Expand Up @@ -491,7 +490,7 @@ TEST_CASE_METHOD(EthBackendTest, "BackEnd::engine_new_payload", "[silkworm][rpc]
SECTION("call engine_new_payload and get error [i=" + std::to_string(i) + "]") {
EXPECT_CALL(reader, Finish).WillOnce(test::finish_cancelled(grpc_context_));
CHECK_THROWS_AS((run<&ethbackend::RemoteBackEnd::engine_new_payload>(new_payload_request)), boost::system::system_error);
CHECK_THROWS_AS((run<&ethbackend::RemoteBackEnd::engine_new_payload>(new_payload_request)), rpc::GrpcStatusError);
}
}
}
Expand Down Expand Up @@ -538,7 +537,7 @@ TEST_CASE_METHOD(EthBackendTest, "BackEnd::engine_forkchoice_updated", "[silkwor
SECTION("call engine_forkchoice_updated_v1 and get error") {
EXPECT_CALL(reader, Finish).WillOnce(test::finish_cancelled(grpc_context_));
CHECK_THROWS_AS((run<&ethbackend::RemoteBackEnd::engine_forkchoice_updated>(forkchoice_request)), boost::system::system_error);
CHECK_THROWS_AS((run<&ethbackend::RemoteBackEnd::engine_forkchoice_updated>(forkchoice_request)), rpc::GrpcStatusError);
}
}*/

Expand Down
Loading

0 comments on commit c33e4d1

Please sign in to comment.