Skip to content

Commit

Permalink
Add getrawsnapshot RPC
Browse files Browse the repository at this point in the history
In functional tests we use p2p interface to retrieve the snapshot
before broadcasting to other nodes. It works fine but now we want to
extend tests and broadcast the valid snapshot but which is not
part of finalized epoch. The issue is that from p2p we can't
receive the non-finalized snapshot to simulate this scenario.

Having an RPC command to retrieve any available snapshot
will help us to extend fast sync tests.

Signed-off-by: Kostiantyn Stepaniuk <kostia@thirdhash.com>
  • Loading branch information
Kostiantyn Stepaniuk committed Apr 2, 2019
1 parent 59ec8c7 commit 9554954
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 32 deletions.
36 changes: 36 additions & 0 deletions src/snapshot/rpc_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,41 @@ UniValue gettipsnapshot(const JSONRPCRequest &request) {
return root;
}

UniValue getrawsnapshot(const JSONRPCRequest &request) {
if (request.fHelp) {
throw std::runtime_error(
"getrawsnapshot\n"
"\nReturns hex string that contains snapshot data\n"
"\nArguments:\n"
"1. \"snapshothash\" (hex, required) snapshot that must be returned.\n"
"\nExamples:\n" +
HelpExampleCli("getrawsnapshot", "34aa7d3aabd5df086d0ff0b110fbd9d21bb4fc7163af34d08286a2e846f6be03") +
HelpExampleRpc("getrawsnapshot", "34aa7d3aabd5df086d0ff0b110fbd9d21bb4fc7163af34d08286a2e846f6be03"));
}

LOCK(cs_snapshot);

uint256 snapshot_hash = uint256S(request.params[0].get_str());
std::unique_ptr<Indexer> idx = SnapshotIndex::OpenSnapshot(snapshot_hash);
if (!idx) {
throw JSONRPCError(RPC_INVALID_ADDRESS_OR_KEY, "Snapshot was not found");
}

Snapshot snapshot;
snapshot.snapshot_hash = idx->GetSnapshotHeader().snapshot_hash;
snapshot.utxo_subset_index = 0;

Iterator iter(std::move(idx));
while (iter.Valid()) {
snapshot.utxo_subsets.emplace_back(iter.GetUTXOSubset());
iter.Next();
}

CDataStream stream(SER_NETWORK, PROTOCOL_VERSION);
stream << snapshot;
return HexStr(stream.begin(), stream.end());
}

// clang-format off
static const CRPCCommand commands[] = {
// category name actor (function) argNames
Expand All @@ -270,6 +305,7 @@ static const CRPCCommand commands[] = {
{ "snapshot", "listsnapshots", &listsnapshots, {""} },
{ "snapshot", "gettipsnapshot", &gettipsnapshot, {}},
{ "snapshot", "calcsnapshothash", &calcsnapshothash, {}},
{ "snapshot", "getrawsnapshot", &getrawsnapshot, {"snapshothash"}},
};
// clang-format on

Expand Down
44 changes: 12 additions & 32 deletions test/functional/p2p_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def on_getdata(self, message):
if i.hash in self.parent_blocks:
self.send_message(msg_witness_block(self.parent_blocks[i.hash]))

def update_snapshot_header_from(self, node):
def update_snapshot_from(self, node):
# take the latest finalized
res = next(s for s in reversed(node.listsnapshots()) if s['snapshot_finalized'])
self.snapshot_header = SnapshotHeader(
Expand All @@ -119,6 +119,8 @@ def update_snapshot_header_from(self, node):
chain_work=uint256_from_rev_hex(res['chain_work']),
total_utxo_subsets=res['total_utxo_subsets'],
)
snapshot = FromHex(Snapshot(), node.getrawsnapshot(res['snapshot_hash']))
self.snapshot_data = snapshot.utxo_subsets

def update_headers_and_blocks_from(self, node):
self.headers = []
Expand Down Expand Up @@ -292,7 +294,7 @@ def test_p2p_schema(self):
serving_p2p = syncing_node.add_p2p_connection(BaseNode(), services=SERVICE_FLAGS_WITH_SNAPSHOT)

# configure serving_p2p to have snapshot header and parent block
serving_p2p.update_snapshot_header_from(serving_node)
serving_p2p.update_snapshot_from(serving_node)
serving_p2p.update_headers_and_blocks_from(serving_node)

network_thread_start()
Expand Down Expand Up @@ -373,19 +375,10 @@ def test_sync_with_restarts(self):

# configure p2p to have snapshot header and parent block
p2p = node.add_p2p_connection(WaitNode(), services=SERVICE_FLAGS_WITH_SNAPSHOT)
p2p.update_snapshot_header_from(snap_node)
p2p.update_snapshot_from(snap_node)
p2p.update_headers_and_blocks_from(snap_node)

# helper p2p connection to fetch snapshot content
snap_p2p = snap_node.add_p2p_connection(BaseNode())

network_thread_start()
snap_p2p.wait_for_verack()

# fetch snapshot content for p2p
snap_p2p.wait_for_verack()
p2p.snapshot_data = snap_p2p.fetch_snapshot_data(p2p.snapshot_header)
snap_node.disconnect_p2ps()

# test 1. the node can be restarted after it discovered the snapshot
wait_until(lambda: p2p.snapshot_chunk1_requested, timeout=10)
Expand Down Expand Up @@ -474,34 +467,25 @@ def test_invalid_snapshot(self):
wait_until(lambda: has_valid_snapshot(snap_node, 4), timeout=10)

valid_p2p = node.add_p2p_connection(WaitNode(), services=SERVICE_FLAGS_WITH_SNAPSHOT)
valid_p2p.update_snapshot_header_from(snap_node)
valid_p2p.update_snapshot_from(snap_node)

# create the second snapshot and store it in broken_p2p
snap_node.generatetoaddress(5, snap_node.getnewaddress('', 'bech32'))
assert_equal(snap_node.getblockcount(), 16)
wait_until(lambda: has_valid_snapshot(snap_node, 9), timeout=10)

broken_p2p = node.add_p2p_connection(WaitNode(), services=SERVICE_FLAGS_WITH_SNAPSHOT)
broken_p2p.update_snapshot_header_from(snap_node)
broken_p2p.update_snapshot_from(snap_node)
broken_p2p.snapshot_data[-1].outputs[0].nValue *= 2 # break snapshot
broken_p2p.update_headers_and_blocks_from(snap_node)
valid_p2p.update_headers_and_blocks_from(snap_node)

# helper p2p connection to fetch snapshot content
snap_p2p = snap_node.add_p2p_connection(BaseNode())

network_thread_start()

# make sure that node knows about both peers
valid_p2p.wait_for_verack()
broken_p2p.wait_for_verack()

# add snapshot data to p2p
snap_p2p.wait_for_verack()
valid_p2p.snapshot_data = snap_p2p.fetch_snapshot_data(valid_p2p.snapshot_header)
broken_p2p.snapshot_data = snap_p2p.fetch_snapshot_data(broken_p2p.snapshot_header)
broken_p2p.snapshot_data[-1].outputs[0].nValue *= 2 # break snapshot
snap_node.disconnect_p2ps()

# node must pick the best snapshot
wait_until(lambda: broken_p2p.snapshot_chunk1_requested, timeout=10)
broken_p2p.return_snapshot_chunk1 = True
Expand Down Expand Up @@ -555,25 +539,21 @@ def test_cannot_sync_with_snapshot(self):
full_snap_p2p = sync_node.add_p2p_connection(WaitNode(), services=SERVICE_FLAGS_WITH_SNAPSHOT)
no_snap_p2p = sync_node.add_p2p_connection(WaitNode())
for p2p in [full_snap_p2p, no_snap_p2p]:
p2p.update_snapshot_header_from(snap_node)
p2p.update_snapshot_from(snap_node)

# add the best snapshot to half_snap_p2p
snap_node.generatetoaddress(5, snap_node.getnewaddress('', 'bech32'))
assert_equal(snap_node.getblockcount(), 16)
wait_until(lambda: has_valid_snapshot(snap_node, 9), timeout=10)
half_snap_p2p = sync_node.add_p2p_connection(WaitNode(), services=SERVICE_FLAGS_WITH_SNAPSHOT)
half_snap_p2p.update_snapshot_header_from(snap_node)
half_snap_p2p.update_snapshot_from(snap_node)
for p2p in [half_snap_p2p, full_snap_p2p, no_snap_p2p]:
p2p.update_headers_and_blocks_from(snap_node)

# retrieve snapshot data
helper_p2p = snap_node.add_p2p_connection(BaseNode())
network_thread_start()
helper_p2p.wait_for_verack()
full_snap_p2p.snapshot_data = helper_p2p.fetch_snapshot_data(full_snap_p2p.snapshot_header)
half_snap_p2p.snapshot_data = helper_p2p.fetch_snapshot_data(half_snap_p2p.snapshot_header)
self.stop_node(snap_node.index)

network_thread_start()

# test 1. the node requests snapshot from peers that have service flag set
full_snap_p2p.wait_for_verack()
half_snap_p2p.wait_for_verack()
Expand Down

0 comments on commit 9554954

Please sign in to comment.