Skip to content

Commit

Permalink
Fix IPNS publish race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
ianopolous committed Dec 13, 2023
1 parent f86c885 commit 143c845
Showing 1 changed file with 18 additions and 17 deletions.
35 changes: 18 additions & 17 deletions src/main/java/org/peergos/protocol/dht/Kademlia.java
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ public CompletableFuture<Void> publishValue(Multihash publisher,

byte[] key = IPNS.getKey(publisher);
Id keyId = Id.create(Hash.sha256(key), 256);
SortedSet<RoutingEntry> toQuery = Collections.synchronizedSortedSet(new TreeSet<>((a, b) -> compareKeys(a, b, keyId)));
SortedSet<RoutingEntry> toQuery = new TreeSet<>((a, b) -> compareKeys(a, b, keyId));
List<PeerAddresses> localClosest = engine.getKClosestPeers(key);
int queryParallelism = 3;
toQuery.addAll(localClosest.stream()
Expand All @@ -349,29 +349,30 @@ public CompletableFuture<Void> publishValue(Multihash publisher,
List<RoutingEntry> thisRound = toQuery.stream()
.limit(queryParallelism)
.collect(Collectors.toList());
List<? extends Future<?>> futures = thisRound.stream()
List<? extends Future<List<RoutingEntry>>> futures = thisRound.stream()
.map(r -> {
toQuery.remove(r);
queried.add(r.addresses.peerId);
return ioExec.submit(() -> getCloserPeers(key, r.addresses, us).thenApply(res -> {
for (PeerAddresses peer : res) {
if (! queried.contains(peer.peerId)) {
Id peerKey = Id.create(Hash.sha256(IPNS.getKey(peer.peerId)), 256);
RoutingEntry e = new RoutingEntry(peerKey, peer);
toQuery.add(e);
}
}
ioExec.submit(() -> {
if (putValue(publisher, signedRecord, r.addresses, us))
publishes.add(r.addresses.peerId);
});
return true;
}));
List<RoutingEntry> more = new ArrayList<>();
for (PeerAddresses peer : res) {
if (! queried.contains(peer.peerId)) {
Id peerKey = Id.create(Hash.sha256(IPNS.getKey(peer.peerId)), 256);
RoutingEntry e = new RoutingEntry(peerKey, peer);
more.add(e);
}
}
ioExec.submit(() -> {
if (putValue(publisher, signedRecord, r.addresses, us))
publishes.add(r.addresses.peerId);
});
return more;
}).join());
})
.collect(Collectors.toList());
futures.forEach(f -> {
try {
f.get();
toQuery.addAll(f.get());
} catch (Exception e) {}
});
// exit early if we have enough results
Expand All @@ -380,7 +381,7 @@ public CompletableFuture<Void> publishValue(Multihash publisher,
if (toQuery.size() == remaining) {
// publish to closest remaining nodes
while (publishes.size() < minPublishes) {
List<RoutingEntry> closest = new TreeSet<>(toQuery).stream()
List<RoutingEntry> closest = toQuery.stream()
.limit(minPublishes - publishes.size() + 5)
.collect(Collectors.toList());
List<? extends Future<?>> lastFutures = closest.stream()
Expand Down

0 comments on commit 143c845

Please sign in to comment.