Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Fix task queue so that the updated failure count for requests is stored #893

Merged
merged 3 commits into from
Feb 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.services.queue.BytesTaskQueue;
import tech.pegasys.pantheon.services.queue.BytesTaskQueueAdapter;
import tech.pegasys.pantheon.services.queue.RocksDbTaskQueue;
import tech.pegasys.pantheon.services.queue.TaskQueue;

Expand Down Expand Up @@ -170,8 +168,7 @@ private static void ensureDirectoryExists(final File dir) {

/**
 * Creates the queue of {@link NodeDataRequest} tasks by calling {@code RocksDbTaskQueue.create}
 * with the supplied data directory and metrics system.
 *
 * <p>NOTE(review): this listing shows a diff with its +/- markers stripped, so two
 * implementations appear back to back. The {@code BytesTaskQueue} / {@code
 * BytesTaskQueueAdapter} statements look like the removed version and the final {@code return}
 * like its replacement; confirm against the merged file.
 *
 * @param dataDirectory directory handed to {@code RocksDbTaskQueue.create}
 * @param metricsSystem metrics system handed to {@code RocksDbTaskQueue.create}
 * @return the task queue holding {@link NodeDataRequest} entries
 */
private static TaskQueue<NodeDataRequest> createWorldStateDownloaderQueue(
final Path dataDirectory, final MetricsSystem metricsSystem) {
// Earlier form: a raw bytes queue wrapped in an adapter that applies the (de)serializers.
final BytesTaskQueue bytesQueue = RocksDbTaskQueue.create(dataDirectory, metricsSystem);
return new BytesTaskQueueAdapter<>(
bytesQueue, NodeDataRequest::serialize, NodeDataRequest::deserialize);
// Later form: the (de)serializers are passed straight to RocksDbTaskQueue.create.
return RocksDbTaskQueue.create(
dataDirectory, NodeDataRequest::serialize, NodeDataRequest::deserialize, metricsSystem);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.services.queue.TaskQueue;
import tech.pegasys.pantheon.services.queue.TaskQueue.Task;
import tech.pegasys.pantheon.util.ExceptionUtils;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.time.Duration;
Expand All @@ -43,6 +44,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -73,6 +75,7 @@ private enum Status {
private volatile CompletableFuture<Void> future;
private volatile Status status = Status.IDLE;
private volatile BytesValue rootNode;
private final AtomicInteger highestRetryCount = new AtomicInteger(0);

public WorldStateDownloader(
final EthContext ethContext,
Expand Down Expand Up @@ -106,6 +109,12 @@ public WorldStateDownloader(
MetricCategory.SYNCHRONIZER,
"world_state_retried_requests_total",
"Total number of node data requests repeated as part of fast sync world state download");

metricsSystem.createIntegerGauge(
MetricCategory.SYNCHRONIZER,
"world_state_node_request_failures_max",
"Highest number of times a node data request has been retried in this download",
highestRetryCount::get);
}

public CompletableFuture<Void> run(final BlockHeader header) {
Expand All @@ -115,16 +124,17 @@ public CompletableFuture<Void> run(final BlockHeader header) {
header.getHash());
synchronized (this) {
if (status == Status.RUNNING) {
CompletableFuture<Void> failed = new CompletableFuture<>();
final CompletableFuture<Void> failed = new CompletableFuture<>();
failed.completeExceptionally(
new IllegalStateException(
"Cannot run an already running " + this.getClass().getSimpleName()));
return failed;
}
status = Status.RUNNING;
future = createFuture();
highestRetryCount.set(0);

Hash stateRoot = header.getStateRoot();
final Hash stateRoot = header.getStateRoot();
if (worldStateStorage.isWorldStateAvailable(stateRoot)) {
// If we're requesting data for an existing world state, we're already done
markDone();
Expand All @@ -144,23 +154,23 @@ public void cancel() {
private void requestNodeData(final BlockHeader header) {
if (sendingRequests.compareAndSet(false, true)) {
while (shouldRequestNodeData()) {
Optional<EthPeer> maybePeer = ethContext.getEthPeers().idlePeer(header.getNumber());
final Optional<EthPeer> maybePeer = ethContext.getEthPeers().idlePeer(header.getNumber());

if (!maybePeer.isPresent()) {
// If no peer is available, wait and try again
waitForNewPeer().whenComplete((r, t) -> requestNodeData(header));
break;
} else {
EthPeer peer = maybePeer.get();
final EthPeer peer = maybePeer.get();

// Collect data to be requested
List<Task<NodeDataRequest>> toRequest = new ArrayList<>();
final List<Task<NodeDataRequest>> toRequest = new ArrayList<>();
while (toRequest.size() < hashCountPerRequest) {
Task<NodeDataRequest> pendingRequestTask = pendingRequests.dequeue();
final Task<NodeDataRequest> pendingRequestTask = pendingRequests.dequeue();
if (pendingRequestTask == null) {
break;
}
NodeDataRequest pendingRequest = pendingRequestTask.getData();
final NodeDataRequest pendingRequest = pendingRequestTask.getData();
final Optional<BytesValue> existingData =
pendingRequest.getExistingData(worldStateStorage);
if (existingData.isPresent()) {
Expand All @@ -176,7 +186,7 @@ private void requestNodeData(final BlockHeader header) {
sendAndProcessRequests(peer, toRequest, header)
.whenComplete(
(task, error) -> {
boolean done;
final boolean done;
synchronized (this) {
outstandingRequests.remove(task);
done =
Expand Down Expand Up @@ -217,13 +227,13 @@ private CompletableFuture<AbstractPeerTask<List<BytesValue>>> sendAndProcessRequ
final EthPeer peer,
final List<Task<NodeDataRequest>> requestTasks,
final BlockHeader blockHeader) {
List<Hash> hashes =
final List<Hash> hashes =
requestTasks.stream()
.map(Task::getData)
.map(NodeDataRequest::getHash)
.distinct()
.collect(Collectors.toList());
AbstractPeerTask<List<BytesValue>> ethTask =
final AbstractPeerTask<List<BytesValue>> ethTask =
GetNodeDataFromPeerTask.forHashes(ethContext, hashes, ethTasksTimer).assignPeer(peer);
outstandingRequests.add(ethTask);
return ethTask
Expand All @@ -232,14 +242,15 @@ private CompletableFuture<AbstractPeerTask<List<BytesValue>>> sendAndProcessRequ
.thenApply(this::mapNodeDataByHash)
.handle(
(data, err) -> {
boolean requestFailed = err != null;
Updater storageUpdater = worldStateStorage.updater();
for (Task<NodeDataRequest> task : requestTasks) {
NodeDataRequest request = task.getData();
BytesValue matchingData = requestFailed ? null : data.get(request.getHash());
final boolean requestFailed = err != null;
final Updater storageUpdater = worldStateStorage.updater();
for (final Task<NodeDataRequest> task : requestTasks) {
final NodeDataRequest request = task.getData();
final BytesValue matchingData = requestFailed ? null : data.get(request.getHash());
if (matchingData == null) {
retriedRequestsTotal.inc();
int requestFailures = request.trackFailure();
final int requestFailures = request.trackFailure();
updateHighestRetryCount(requestFailures);
if (requestFailures > maxNodeRequestRetries) {
handleStalledDownload();
}
Expand All @@ -263,6 +274,14 @@ private CompletableFuture<AbstractPeerTask<List<BytesValue>>> sendAndProcessRequ
});
}

/**
 * Raises the recorded highest retry count to {@code requestFailures} when that value is larger
 * than the one currently stored; a smaller or equal value leaves the stored count unchanged.
 *
 * <p>The comparison and the write happen as one atomic update, so concurrent callers cannot
 * replace a larger count with a smaller one.
 *
 * @param requestFailures failure count just reported for a node data request
 */
private void updateHighestRetryCount(final int requestFailures) {
  // Atomic "keep the maximum" update on the shared counter.
  highestRetryCount.accumulateAndGet(requestFailures, Math::max);
}

private synchronized void queueChildRequests(final NodeDataRequest request) {
if (status == Status.RUNNING) {
request.getChildRequests().forEach(pendingRequests::enqueue);
Expand All @@ -277,15 +296,17 @@ private synchronized CompletableFuture<Void> getFuture() {
}

private CompletableFuture<Void> createFuture() {
CompletableFuture<Void> future = new CompletableFuture<>();
final CompletableFuture<Void> future = new CompletableFuture<>();
future.whenComplete(
(res, err) -> {
// Handle cancellations
if (future.isCancelled()) {
LOG.info("World state download cancelled");
doCancelDownload();
} else if (err != null) {
LOG.info("World state download failed. ", err);
if (!(ExceptionUtils.rootCause(err) instanceof StalledDownloadException)) {
LOG.info("World state download failed. ", err);
}
doCancelDownload();
}
});
Expand All @@ -297,14 +318,14 @@ private synchronized void handleStalledDownload() {
"Download stalled due to too many failures to retrieve node data (>"
+ maxNodeRequestRetries
+ " failures)";
WorldStateDownloaderException e = new StalledDownloadException(message);
final WorldStateDownloaderException e = new StalledDownloadException(message);
future.completeExceptionally(e);
}

/**
 * Sets the status to {@code CANCELLED}, clears the pending request queue and calls {@code
 * cancel()} on every entry in {@code outstandingRequests}. The method is {@code synchronized}
 * on this instance.
 *
 * <p>NOTE(review): the two consecutive for-headers below appear to be the before/after versions
 * of one line in a diff whose +/- markers were stripped; presumably only the variant declaring
 * the loop variable {@code final} is current.
 */
private synchronized void doCancelDownload() {
status = Status.CANCELLED;
// Drop work that has not been sent yet.
pendingRequests.clear();
for (EthTask<?> outstandingRequest : outstandingRequests) {
for (final EthTask<?> outstandingRequest : outstandingRequests) {
// Cancel each request that is still outstanding.
outstandingRequest.cancel();
}
}
Expand All @@ -323,7 +344,7 @@ private boolean isRootState(final BlockHeader blockHeader, final NodeDataRequest

/**
 * Builds a lookup table of the supplied node data keyed by {@code Hash.hash(value)}.
 *
 * <p>If two entries produce the same hash, the later one replaces the earlier one in the map.
 *
 * <p>NOTE(review): the two {@code dataByHash} declarations below appear to be the before/after
 * versions of one line in a diff whose +/- markers were stripped; presumably only the {@code
 * final} variant is current.
 *
 * @param data node data values to index
 * @return a new {@link HashMap} from each value's hash to the value itself
 */
private Map<Hash, BytesValue> mapNodeDataByHash(final List<BytesValue> data) {
// Map data by hash
Map<Hash, BytesValue> dataByHash = new HashMap<>();
final Map<Hash, BytesValue> dataByHash = new HashMap<>();
// The key is computed from the value itself.
data.stream().forEach(d -> dataByHash.put(Hash.hash(d), d));
return dataByHash;
}
Expand Down

This file was deleted.

This file was deleted.

Loading