From 143c845a4c0b8a3eef3c75ca2ad17299ccc10ebc Mon Sep 17 00:00:00 2001 From: ian Date: Wed, 13 Dec 2023 17:11:36 +0000 Subject: [PATCH] Fix IPNS publish race condition --- .../org/peergos/protocol/dht/Kademlia.java | 35 ++++++++++--------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/src/main/java/org/peergos/protocol/dht/Kademlia.java b/src/main/java/org/peergos/protocol/dht/Kademlia.java index 93c7d591..730f6bf6 100644 --- a/src/main/java/org/peergos/protocol/dht/Kademlia.java +++ b/src/main/java/org/peergos/protocol/dht/Kademlia.java @@ -336,7 +336,7 @@ public CompletableFuture publishValue(Multihash publisher, byte[] key = IPNS.getKey(publisher); Id keyId = Id.create(Hash.sha256(key), 256); - SortedSet toQuery = Collections.synchronizedSortedSet(new TreeSet<>((a, b) -> compareKeys(a, b, keyId))); + SortedSet toQuery = new TreeSet<>((a, b) -> compareKeys(a, b, keyId)); List localClosest = engine.getKClosestPeers(key); int queryParallelism = 3; toQuery.addAll(localClosest.stream() @@ -349,29 +349,30 @@ public CompletableFuture publishValue(Multihash publisher, List thisRound = toQuery.stream() .limit(queryParallelism) .collect(Collectors.toList()); - List> futures = thisRound.stream() + List>> futures = thisRound.stream() .map(r -> { toQuery.remove(r); queried.add(r.addresses.peerId); return ioExec.submit(() -> getCloserPeers(key, r.addresses, us).thenApply(res -> { - for (PeerAddresses peer : res) { - if (! queried.contains(peer.peerId)) { - Id peerKey = Id.create(Hash.sha256(IPNS.getKey(peer.peerId)), 256); - RoutingEntry e = new RoutingEntry(peerKey, peer); - toQuery.add(e); - } - } - ioExec.submit(() -> { - if (putValue(publisher, signedRecord, r.addresses, us)) - publishes.add(r.addresses.peerId); - }); - return true; - })); + List more = new ArrayList<>(); + for (PeerAddresses peer : res) { + if (! queried.contains(peer.peerId)) { + Id peerKey = Id.create(Hash.sha256(IPNS.getKey(peer.peerId)), 256); + RoutingEntry e = new RoutingEntry(peerKey, peer); + more.add(e); + } + } + ioExec.submit(() -> { + if (putValue(publisher, signedRecord, r.addresses, us)) + publishes.add(r.addresses.peerId); + }); + return more; + }).join()); }) .collect(Collectors.toList()); futures.forEach(f -> { try { - f.get(); + toQuery.addAll(f.get()); } catch (Exception e) {} }); // exit early if we have enough results @@ -380,7 +381,7 @@ public CompletableFuture publishValue(Multihash publisher, if (toQuery.size() == remaining) { // publish to closest remaining nodes while (publishes.size() < minPublishes) { - List closest = new TreeSet<>(toQuery).stream() + List closest = toQuery.stream() .limit(minPublishes - publishes.size() + 5) .collect(Collectors.toList()); List> lastFutures = closest.stream()