Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
76c2af8
feat: add dataset request batching
gmega Jun 5, 2025
93a355c
feat: cap how many blocks we can pack in a single message
gmega Jun 6, 2025
62c4056
update engine tests; add BlockAddress hashing tests
gmega Jun 6, 2025
c268741
replace list operations with sets
gmega Jun 6, 2025
25b9884
optimize remaining list joins so they're not quadratic
gmega Jun 9, 2025
8f7ddff
adapt existing tests to new data structures, remove vestigial tests
gmega Jun 9, 2025
7a8c7f9
Update codex/stores/networkstore.nim
gmega Jun 9, 2025
3c758d2
fix: refresh timestamp before issuing request to prevent flood of kno…
gmega Jun 10, 2025
36a895c
feat: add SafeAsyncIter chaining
gmega Jun 27, 2025
b037978
feat: remove quadratic joins in cancelBlocks; use SafeAsyncIterator f…
gmega Jun 30, 2025
7b00624
feat: allow futures to be returned out-of-order to decrease memory co…
gmega Jun 30, 2025
934fc26
chore: remove file committed by accident
gmega Jun 30, 2025
422c976
feat: modify retry mechanism; add DHT guard rails; improve block canc…
gmega Jul 2, 2025
1d96fdd
feat: drop peer on activity timeout
gmega Jul 2, 2025
d225cbc
fix: fix block exchange test to stricter protocol; minor refactor
gmega Jul 3, 2025
d4d75cf
fix: fix testdiscovery so it works with stricter block protocol
gmega Jul 3, 2025
f6185da
feat: add stopgap "adaptive" refresh
gmega Jul 3, 2025
78e2fd4
feat: add block knowledge request mechanism, implement tests
gmega Jul 8, 2025
788164a
fix: randomize block refresh time, optimize context store checks
gmega Jul 9, 2025
048f728
Implement load-balanced peer selection for block requests
cnanakos Sep 12, 2025
0fc4ca5
feat: implement weighted random peer selection for load balancing
cnanakos Sep 15, 2025
909ddbc
perf: optimize further fetchBatched
cnanakos Sep 15, 2025
806b14f
feat: add peer count limits to discovery engine
cnanakos Sep 16, 2025
89d8ca3
perf: optimize block batch size from 500 to 50 blocks per message
cnanakos Sep 24, 2025
9b51bb2
feat: add strategic runtime metrics for block exchange monitoring
cnanakos Sep 26, 2025
67f99f4
fix: resolve stuck peer refresh state preventing block discovery
cnanakos Sep 26, 2025
ea98fd2
refactor: make markRequested idempotent
cnanakos Sep 29, 2025
5e74fea
chore: apply nph formatting
cnanakos Sep 30, 2025
372151e
fix: assign selectPeer field in BlockExcEngine ctor
cnanakos Sep 30, 2025
a7b4c3d
refactor: remove makeRandomDataset helper function
cnanakos Oct 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion codex/blockexchange/engine/discovery.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
## those terms.

import std/sequtils
import std/algorithm

import pkg/chronos
import pkg/libp2p/cid
Expand Down Expand Up @@ -38,6 +39,7 @@ const
DefaultConcurrentDiscRequests = 10
DefaultDiscoveryTimeout = 1.minutes
DefaultMinPeersPerBlock = 3
DefaultMaxPeersPerBlock = 8
DefaultDiscoveryLoopSleep = 3.seconds

type DiscoveryEngine* = ref object of RootObj
Expand All @@ -51,11 +53,32 @@ type DiscoveryEngine* = ref object of RootObj
discoveryLoop*: Future[void].Raising([]) # Discovery loop task handle
discoveryQueue*: AsyncQueue[Cid] # Discovery queue
trackedFutures*: TrackedFutures # Tracked Discovery tasks futures
minPeersPerBlock*: int # Max number of peers with block
minPeersPerBlock*: int # Min number of peers with block
maxPeersPerBlock*: int # Max number of peers with block
discoveryLoopSleep: Duration # Discovery loop sleep
inFlightDiscReqs*: Table[Cid, Future[seq[SignedPeerRecord]]]
# Inflight discovery requests

proc cleanupExcessPeers(b: DiscoveryEngine, cid: Cid) {.gcsafe, raises: [].} =
var haves = b.peers.peersHave(cid)
let count = haves.len - b.maxPeersPerBlock
if count <= 0:
return

haves.sort(
proc(a, b: BlockExcPeerCtx): int =
cmp(a.lastExchange, b.lastExchange)
)

let toRemove = haves[0 ..< count]
for peer in toRemove:
try:
peer.cleanPresence(BlockAddress.init(cid))
trace "Removed block presence from peer", cid, peer = peer.id
except CatchableError as exc:
error "Failed to clean presence for peer",
cid, peer = peer.id, error = exc.msg, name = exc.name

proc discoveryQueueLoop(b: DiscoveryEngine) {.async: (raises: []).} =
try:
while b.discEngineRunning:
Expand All @@ -78,8 +101,16 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async: (raises: []).} =
trace "Discovery request already in progress", cid
continue

trace "Running discovery task for cid", cid

let haves = b.peers.peersHave(cid)

if haves.len > b.maxPeersPerBlock:
trace "Cleaning up excess peers",
cid, peers = haves.len, max = b.maxPeersPerBlock
b.cleanupExcessPeers(cid)
continue

if haves.len < b.minPeersPerBlock:
let request = b.discovery.find(cid)
b.inFlightDiscReqs[cid] = request
Expand Down Expand Up @@ -156,6 +187,7 @@ proc new*(
concurrentDiscReqs = DefaultConcurrentDiscRequests,
discoveryLoopSleep = DefaultDiscoveryLoopSleep,
minPeersPerBlock = DefaultMinPeersPerBlock,
maxPeersPerBlock = DefaultMaxPeersPerBlock,
): DiscoveryEngine =
## Create a discovery engine instance for advertising services
##
Expand All @@ -171,4 +203,5 @@ proc new*(
inFlightDiscReqs: initTable[Cid, Future[seq[SignedPeerRecord]]](),
discoveryLoopSleep: discoveryLoopSleep,
minPeersPerBlock: minPeersPerBlock,
maxPeersPerBlock: maxPeersPerBlock,
)
Loading
Loading