Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
[NC-2195] Commit world state continuously (#809)
Browse files Browse the repository at this point in the history
  • Loading branch information
mbaxter authored Feb 8, 2019
1 parent c05576c commit 7e9d6b0
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import static org.assertj.core.api.Assertions.assertThat;

import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.storage.keyvalue.KeyValueStorageWorldStateStorage.Updater;
import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie;
import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage;
import tech.pegasys.pantheon.util.bytes.BytesValue;
Expand Down Expand Up @@ -151,6 +152,29 @@ public void getNodeData_saveAndGetRegularValue() {
assertThat(storage.getNodeData(Hash.hash(bytes))).contains(bytes);
}

@Test
public void reconcilesNonConflictingUpdaters() {
BytesValue bytesA = BytesValue.fromHexString("0x12");
BytesValue bytesB = BytesValue.fromHexString("0x1234");
BytesValue bytesC = BytesValue.fromHexString("0x123456");

KeyValueStorageWorldStateStorage storage = emptyStorage();
Updater updaterA = storage.updater();
Updater updaterB = storage.updater();

updaterA.putCode(bytesA);
updaterB.putCode(bytesA);
updaterB.putCode(bytesB);
updaterA.putCode(bytesC);

updaterA.commit();
updaterB.commit();

assertThat(storage.getCode(Hash.hash(bytesA))).contains(bytesA);
assertThat(storage.getCode(Hash.hash(bytesB))).contains(bytesB);
assertThat(storage.getCode(Hash.hash(bytesC))).contains(bytesC);
}

private KeyValueStorageWorldStateStorage emptyStorage() {
return new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.WaitForPeerTask;
import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.services.queue.BigQueue;
Expand Down Expand Up @@ -51,7 +52,6 @@ private enum Status {

private final EthContext ethContext;
private final BigQueue<NodeDataRequest> pendingRequests;
private final WorldStateStorage.Updater worldStateStorageUpdater;
private final int hashCountPerRequest;
private final int maxOutstandingRequests;
private final AtomicInteger outstandingRequests = new AtomicInteger(0);
Expand All @@ -74,7 +74,6 @@ public WorldStateDownloader(
this.hashCountPerRequest = hashCountPerRequest;
this.maxOutstandingRequests = maxOutstandingRequests;
this.ethTasksTimer = ethTasksTimer;
this.worldStateStorageUpdater = worldStateStorage.updater();
}

public CompletableFuture<Void> run(final BlockHeader header) {
Expand Down Expand Up @@ -135,7 +134,6 @@ private void requestNodeData(final BlockHeader header) {
(res, error) -> {
if (outstandingRequests.decrementAndGet() == 0 && pendingRequests.isEmpty()) {
// We're done
worldStateStorageUpdater.commit();
markDone();
} else {
// Send out additional requests
Expand Down Expand Up @@ -182,14 +180,15 @@ private CompletableFuture<?> sendAndProcessRequests(
.whenComplete(
(data, err) -> {
boolean requestFailed = err != null;
Updater storageUpdater = worldStateStorage.updater();
for (NodeDataRequest request : requests) {
BytesValue matchingData = requestFailed ? null : data.get(request.getHash());
if (matchingData == null) {
pendingRequests.enqueue(request);
} else {
// Persist request data
request.setData(matchingData);
request.persist(worldStateStorageUpdater);
request.persist(storageUpdater);

// Queue child requests
request
Expand All @@ -198,6 +197,7 @@ private CompletableFuture<?> sendAndProcessRequests(
.forEach(pendingRequests::enqueue);
}
}
storageUpdater.commit();
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder;
import tech.pegasys.pantheon.ethereum.eth.messages.EthPV63;
import tech.pegasys.pantheon.ethereum.eth.messages.GetNodeDataMessage;
import tech.pegasys.pantheon.ethereum.mainnet.MainnetProtocolSchedule;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.rlp.RLP;
import tech.pegasys.pantheon.ethereum.storage.keyvalue.KeyValueStorageWorldStateStorage;
Expand Down Expand Up @@ -189,6 +190,11 @@ public void canRecoverFromTimeouts() {
assertAccountsMatch(localWorldState, accounts);
}

@Test
public void handlesPartialResponsesFromNetwork() {
downloadAvailableWorldStateFromPeers(5, 100, 10, 10, this::respondPartially);
}

@Test
public void doesNotRequestKnownCodeFromNetwork() {
BlockDataGenerator dataGen = new BlockDataGenerator(1);
Expand Down Expand Up @@ -479,6 +485,16 @@ private void downloadAvailableWorldStateFromPeers(
final int accountCount,
final int hashesPerRequest,
final int maxOutstandingRequests) {
downloadAvailableWorldStateFromPeers(
peerCount, accountCount, hashesPerRequest, maxOutstandingRequests, this::respondFully);
}

private void downloadAvailableWorldStateFromPeers(
final int peerCount,
final int accountCount,
final int hashesPerRequest,
final int maxOutstandingRequests,
final NetworkResponder networkResponder) {
final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();
final int trailingPeerCount = 5;
BlockDataGenerator dataGen = new BlockDataGenerator(1);
Expand Down Expand Up @@ -537,12 +553,15 @@ private void downloadAvailableWorldStateFromPeers(
CompletableFuture<?> result = downloader.run(header);

// Respond to node data requests
Responder responder =
// Send one round of full responses, so that we can get multiple requests queued up
Responder fullResponder =
RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive);
while (!result.isDone()) {
for (RespondingEthPeer peer : usefulPeers) {
peer.respond(responder);
}
for (RespondingEthPeer peer : usefulPeers) {
peer.respond(fullResponder);
}
// Respond to remaining queued requests in custom way
if (!result.isDone()) {
networkResponder.respond(usefulPeers, remoteWorldStateArchive, result);
}

// Check that trailing peers were not queried for data
Expand All @@ -562,6 +581,57 @@ private void downloadAvailableWorldStateFromPeers(
}
}

private void respondFully(
final List<RespondingEthPeer> peers,
final WorldStateArchive remoteWorldStateArchive,
final CompletableFuture<?> downloaderFuture) {
Responder responder =
RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive);
while (!downloaderFuture.isDone()) {
for (RespondingEthPeer peer : peers) {
peer.respond(responder);
}
}
}

private void respondPartially(
final List<RespondingEthPeer> peers,
final WorldStateArchive remoteWorldStateArchive,
final CompletableFuture<?> downloaderFuture) {
Responder fullResponder =
RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive);
Responder partialResponder =
RespondingEthPeer.partialResponder(
mock(Blockchain.class), remoteWorldStateArchive, MainnetProtocolSchedule.create(), .5f);
Responder emptyResponder = RespondingEthPeer.emptyResponder();

// Send a few partial responses
for (int i = 0; i < 5; i++) {
for (RespondingEthPeer peer : peers) {
peer.respond(partialResponder);
}
}

// Downloader should not complete with partial responses
assertThat(downloaderFuture).isNotDone();

// Send a few empty responses
for (int i = 0; i < 3; i++) {
for (RespondingEthPeer peer : peers) {
peer.respond(emptyResponder);
}
}

// Downloader should not complete with empty responses
assertThat(downloaderFuture).isNotDone();

while (!downloaderFuture.isDone()) {
for (RespondingEthPeer peer : peers) {
peer.respond(fullResponder);
}
}
}

private void assertAccountsMatch(
final WorldState worldState, final List<Account> expectedAccounts) {
for (Account expectedAccount : expectedAccounts) {
Expand All @@ -577,4 +647,12 @@ private void assertAccountsMatch(
assertThat(actualStorage).isEqualTo(expectedStorage);
}
}

@FunctionalInterface
private interface NetworkResponder {
void respond(
final List<RespondingEthPeer> peers,
final WorldStateArchive remoteWorldStateArchive,
final CompletableFuture<?> downloaderFuture);
}
}

0 comments on commit 7e9d6b0

Please sign in to comment.