Skip to content

Commit

Permalink
fix: discv4 ensure no duplicate processing in crawls
Browse files Browse the repository at this point in the history
  • Loading branch information
dennis-tra committed Oct 22, 2024
1 parent 494697b commit c51ab62
Showing 1 changed file with 43 additions and 40 deletions.
83 changes: 43 additions & 40 deletions core/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,9 @@ func (e *Engine[I, R]) Run(ctx context.Context) (map[string]I, error) {
e.tasksChan = nil
break
}
if _, found := e.inflight[string(task.ID())]; found {
break
}
e.peerQueue.Push(string(task.ID()), task, 1)

e.enqueueTask(task)

case observeFn, more := <-e.telemetry.obsChan:
// an opentelemetry gauge wants to perform an observation
if !more {
Expand Down Expand Up @@ -386,42 +385,7 @@ func (e *Engine[I, R]) handlePeerResult(ctx context.Context, result Result[R]) {

// process the new tasks that came out of handling the peer result
for _, task := range newTasks {
mapKey := string(task.ID())

// Don't add this peer to the queue if we're currently querying it
if _, isInflight := e.inflight[mapKey]; isInflight {
continue
}

// Don't add the peer to the queue if we have already processed it
if _, processed := e.processed[mapKey]; processed {
continue
}

// Check if we have already queued this peer. If so, merge the new
// information with the already existing ones.
queuedTask, isQueued := e.peerQueue.Find(mapKey)
if isQueued {
task = task.Merge(queuedTask)
}

// If we don't know any multi addresses for the peer yet, we push it
// to the end of our priority queue by giving it a low priority. If we
// find that peer again in another routing table, we might find another
// multi address. In that case, we update the set of addresses and
// increase the priority.
priority := 1
if len(e.maddrFilter(task.Addrs())) == 0 {
priority = 0
}

// If the peer was already queued we only update its priority. If the
// peer wasn't queued, we push it to the queue.
if isQueued {
e.peerQueue.Update(mapKey, task, priority)
} else {
e.peerQueue.Push(mapKey, task, priority)
}
e.enqueueTask(task)
}

logEntry.WithFields(map[string]interface{}{
Expand All @@ -448,6 +412,45 @@ func (e *Engine[I, R]) handleWriteResult(ctx context.Context, result Result[Writ
}).Debugln("Handled writer result")
}

// enqueueTask places the given task into the engine's peer queue unless the
// peer it identifies is currently in flight or has already been processed.
// A peer that is already queued is not pushed a second time; instead the
// incoming task is merged with the queued one and the queue entry is updated.
func (e *Engine[I, R]) enqueueTask(task I) {
	key := string(task.ID())

	// Nothing to do for peers we are querying right now.
	if _, ok := e.inflight[key]; ok {
		return
	}

	// Nothing to do for peers we are already done with.
	if _, ok := e.processed[key]; ok {
		return
	}

	// A peer may show up several times before its turn comes. Fold what we
	// just learned into the entry that is already waiting in the queue.
	existing, alreadyQueued := e.peerQueue.Find(key)
	if alreadyQueued {
		task = task.Merge(existing)
	}

	// Peers without any address that survives the multi address filter get
	// the low priority (0) so that they end up at the back of the queue.
	// Should the same peer be discovered again with a usable address, the
	// merged task passes the filter and is bumped to the regular priority (1).
	priority := 0
	if len(e.maddrFilter(task.Addrs())) > 0 {
		priority = 1
	}

	// New peers are pushed; peers that were already queued are updated in
	// place with the merged task and the recomputed priority.
	if !alreadyQueued {
		e.peerQueue.Push(key, task, priority)
		return
	}
	e.peerQueue.Update(key, task, priority)
}

// reachedProcessingLimit returns true if the processing limit is configured
// (aka != 0) and the processed peers exceed this limit.
func (e *Engine[I, R]) reachedProcessingLimit() bool {
Expand Down

0 comments on commit c51ab62

Please sign in to comment.