Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
Add GetNodeDataFromPeerTask. (#597)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajsutton authored and mbaxter committed Jan 18, 2019
1 parent f3a0d4c commit 2fa4cb9
Show file tree
Hide file tree
Showing 8 changed files with 181 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.List;

public final class BlockBodiesMessage extends AbstractMessageData {

public static BlockBodiesMessage readFrom(final MessageData message) {
Expand Down Expand Up @@ -53,7 +55,7 @@ public int getCode() {
return EthPV62.BLOCK_BODIES;
}

public <C> Iterable<BlockBody> bodies(final ProtocolSchedule<C> protocolSchedule) {
public <C> List<BlockBody> bodies(final ProtocolSchedule<C> protocolSchedule) {
final BlockHashFunction blockHashFunction =
ScheduleBasedBlockHashFunction.create(protocolSchedule);
return new BytesValueRLPInput(data, false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.util.Optional;
import java.util.stream.Collectors;

import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -103,7 +102,7 @@ protected Optional<List<Block>> processResponse(
}

final BlockBodiesMessage bodiesMessage = BlockBodiesMessage.readFrom(message);
final List<BlockBody> bodies = Lists.newArrayList(bodiesMessage.bodies(protocolSchedule));
final List<BlockBody> bodies = bodiesMessage.bodies(protocolSchedule);
if (bodies.size() == 0) {
// Message contains no data - nothing to do
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.tasks;

import static java.util.Collections.emptyList;

import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPeerRequestTask;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.RequestManager.ResponseStream;
import tech.pegasys.pantheon.ethereum.eth.messages.EthPV63;
import tech.pegasys.pantheon.ethereum.eth.messages.NodeDataMessage;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class GetNodeDataFromPeerTask extends AbstractPeerRequestTask<List<BytesValue>> {

private static final Logger LOG = LogManager.getLogger();

private final Set<Hash> hashes;

private GetNodeDataFromPeerTask(
final EthContext ethContext,
final Collection<Hash> hashes,
final LabelledMetric<OperationTimer> ethTasksTimer) {
super(ethContext, EthPV63.GET_NODE_DATA, ethTasksTimer);
this.hashes = new HashSet<>(hashes);
}

public static GetNodeDataFromPeerTask forHashes(
final EthContext ethContext,
final Collection<Hash> hashes,
final LabelledMetric<OperationTimer> ethTasksTimer) {
return new GetNodeDataFromPeerTask(ethContext, hashes, ethTasksTimer);
}

@Override
protected ResponseStream sendRequest(final EthPeer peer) throws PeerNotConnected {
LOG.debug("Requesting {} node data entries from peer {}.", hashes.size(), peer);
return peer.getNodeData(hashes);
}

@Override
protected Optional<List<BytesValue>> processResponse(
final boolean streamClosed, final MessageData message, final EthPeer peer) {
if (streamClosed) {
// We don't record this as a useless response because it's impossible to know if a peer has
// the data we're requesting.
return Optional.of(emptyList());
}
final NodeDataMessage nodeDataMessage = NodeDataMessage.readFrom(message);
final List<BytesValue> nodeData = nodeDataMessage.nodeData();
if (nodeData.isEmpty()) {
return Optional.empty();
} else if (nodeData.size() > hashes.size()) {
// Can't be the response to our request
return Optional.empty();
}

if (nodeData.stream().anyMatch(data -> !hashes.contains(Hash.hash(data)))) {
// Message contains unrequested data, must not be the response to our request.
return Optional.empty();
}
return Optional.of(nodeData);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -211,11 +211,12 @@ public static Responder blockchainResponder(
*/
public static <C> Responder partialResponder(
final Blockchain blockchain,
final WorldStateArchive worldStateArchive,
final ProtocolSchedule<C> protocolSchedule,
final float portion) {
checkArgument(portion >= 0.0 && portion <= 1.0, "Portion is in the range [0.0..1.0]");

final Responder fullResponder = blockchainResponder(blockchain);
final Responder fullResponder = blockchainResponder(blockchain, worldStateArchive);
return (cap, msg) -> {
final Optional<MessageData> maybeResponse = fullResponder.respond(cap, msg);
if (!maybeResponse.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -83,9 +82,10 @@ protected abstract void assertResultMatchesExpectation(
T requestedData, R response, EthPeer respondingPeer);

@Test
public void completesWhenPeersAreResponsive() throws ExecutionException, InterruptedException {
public void completesWhenPeersAreResponsive() {
// Setup a responsive peer
final Responder responder = RespondingEthPeer.blockchainResponder(blockchain);
final Responder responder =
RespondingEthPeer.blockchainResponder(blockchain, protocolContext.getWorldStateArchive());
final RespondingEthPeer respondingPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);

Expand All @@ -109,8 +109,7 @@ public void completesWhenPeersAreResponsive() throws ExecutionException, Interru
}

@Test
public void doesNotCompleteWhenPeersDoNotRespond()
throws ExecutionException, InterruptedException {
public void doesNotCompleteWhenPeersDoNotRespond() {
// Setup a unresponsive peer
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);

Expand All @@ -129,7 +128,7 @@ public void doesNotCompleteWhenPeersDoNotRespond()
}

@Test
public void cancel() throws ExecutionException, InterruptedException {
public void cancel() {
// Setup a unresponsive peer
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public abstract class PeerMessageTaskTest<T> extends AbstractMessageTaskTest<T,
public void completesWhenPeerReturnsPartialResult() {
// Setup a partially responsive peer
final Responder responder =
RespondingEthPeer.partialResponder(blockchain, protocolSchedule, 0.5f);
RespondingEthPeer.partialResponder(
blockchain, protocolContext.getWorldStateArchive(), protocolSchedule, 0.5f);
final RespondingEthPeer respondingEthPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public void failsWhenPeerReturnsPartialResultThenStops() {

// Setup a partially responsive peer and a non-responsive peer
final Responder partialResponder =
RespondingEthPeer.partialResponder(blockchain, protocolSchedule, 0.5f);
RespondingEthPeer.partialResponder(
blockchain, protocolContext.getWorldStateArchive(), protocolSchedule, 0.5f);
final Responder emptyResponder = RespondingEthPeer.emptyResponder();
final RespondingEthPeer respondingPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager);
Expand Down Expand Up @@ -96,10 +97,18 @@ public void completesWhenPeerReturnsPartialResult()
final CompletableFuture<T> future = task.run();

// Respond with partial data up until complete.
respondingPeer.respond(RespondingEthPeer.partialResponder(blockchain, protocolSchedule, 0.25f));
respondingPeer.respond(RespondingEthPeer.partialResponder(blockchain, protocolSchedule, 0.50f));
respondingPeer.respond(RespondingEthPeer.partialResponder(blockchain, protocolSchedule, 0.75f));
respondingPeer.respond(RespondingEthPeer.partialResponder(blockchain, protocolSchedule, 1.0f));
respondingPeer.respond(
RespondingEthPeer.partialResponder(
blockchain, protocolContext.getWorldStateArchive(), protocolSchedule, 0.25f));
respondingPeer.respond(
RespondingEthPeer.partialResponder(
blockchain, protocolContext.getWorldStateArchive(), protocolSchedule, 0.50f));
respondingPeer.respond(
RespondingEthPeer.partialResponder(
blockchain, protocolContext.getWorldStateArchive(), protocolSchedule, 0.75f));
respondingPeer.respond(
RespondingEthPeer.partialResponder(
blockchain, protocolContext.getWorldStateArchive(), protocolSchedule, 1.0f));

assertThat(future.isDone()).isTrue();
assertResultMatchesExpectation(requestedData, future.get(), respondingPeer.getEthPeer());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.tasks;

import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;

import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPeerTask.PeerTaskResult;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthTask;
import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.PeerMessageTaskTest;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.ArrayList;
import java.util.List;

public class GetNodeDataFromPeerTaskTest extends PeerMessageTaskTest<List<BytesValue>> {

@Override
protected List<BytesValue> generateDataToBeRequested() {
final List<BytesValue> requestedData = new ArrayList<>();
for (int i = 0; i < 3; i++) {
final BlockHeader blockHeader = blockchain.getBlockHeader(10 + i).get();
requestedData.add(
protocolContext.getWorldStateArchive().getNodeData(blockHeader.getStateRoot()).get());
}
return requestedData;
}

@Override
protected EthTask<PeerTaskResult<List<BytesValue>>> createTask(
final List<BytesValue> requestedData) {
final List<Hash> hashes = requestedData.stream().map(Hash::hash).collect(toList());
return GetNodeDataFromPeerTask.forHashes(ethContext, hashes, ethTasksTimer);
}

@Override
protected void assertPartialResultMatchesExpectation(
final List<BytesValue> requestedData, final List<BytesValue> partialResponse) {
assertThat(partialResponse.size()).isLessThanOrEqualTo(requestedData.size());
assertThat(partialResponse.size()).isGreaterThan(0);
assertThat(requestedData).containsAll(partialResponse);
}

@Override
protected void assertResultMatchesExpectation(
final List<BytesValue> requestedData,
final PeerTaskResult<List<BytesValue>> response,
final EthPeer respondingPeer) {
assertThat(response.getResult()).containsExactlyInAnyOrderElementsOf(requestedData);
assertThat(response.getPeer()).isEqualTo(respondingPeer);
}
}

0 comments on commit 2fa4cb9

Please sign in to comment.