Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit

Permalink
swarm/pss: Rebase on master after kad depth change + handler refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
nolash committed Nov 26, 2018
1 parent 695a5cc commit 0bb8fe3
Showing 1 changed file with 50 additions and 19 deletions.
69 changes: 50 additions & 19 deletions swarm/pss/pss.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,10 +897,38 @@ func (p *Pss) forward(msg *PssMsg) error {

// send with kademlia
// find the closest peer to the recipient and attempt to send

// number of sends performed. enables us to evaluate whether send was at all successful
sent := 0
p.Kademlia.EachConn(to, 256, func(sp *network.Peer, po int, isproxbin bool) bool {

// TODO: debug, remove in production
// calculate proximity to recipient address
ponow, _ := p.Kademlia.Pof(p.BaseAddr(), to, 0)

// The effective depth is the same as nearest neighbor depth OR
// the amount of address bytes in the neighbor, whichever is shallower
// this term aliasing has the effect of considering ALL connected peers
// who match the address prefix as nearest neighbors, and we will forward
// to all of them.
effectiveDepth := p.Kademlia.NeighbourhoodDepth()
darkRadius := len(msg.To) * 8
if darkRadius < addressLength*8 && effectiveDepth > darkRadius {
effectiveDepth = darkRadius
}

// Set to depth on the first successful send
cutoffDepth := 0

p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int, isproxbin bool) bool {
info := sp.Info()

// the cutoffDepth will be set after the first successful send.
// that means that before a send has been made OR the peer returned
// is still within the effective depth, we will pass through this check
if po < cutoffDepth {
return false
}

// check if the peer is running pss
var ispss bool
for _, cap := range info.Caps {
Expand All @@ -915,39 +943,42 @@ func (p *Pss) forward(msg *PssMsg) error {
}

// get the protocol peer from the forwarding peer cache
sendMsg := fmt.Sprintf("MSG TO %x FROM %x VIA %x", to, p.BaseAddr(), sp.Address())
p.fwdPoolMu.RLock()
pp := p.fwdPool[sp.Info().ID]
p.fwdPoolMu.RUnlock()

// TODO: debug, remove in production
// calculate proximity from returned kademlia peer to destination and log it
powill, _ := p.Kademlia.Pof(sp.Address(), to, 0)
log.Debug("forward", "topic", label(msg.Payload.Topic[:]), "self", label(p.BaseAddr()), "to", label(sp.Address()), "dest", label(to), "po", ponow, "advance", powill-ponow)
println(p.Kademlia.String())

// attempt to send the message
// short circuit to next iteration pass when it fails
err := pp.Send(context.TODO(), msg)
if err != nil {
metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1)
log.Error(err.Error())
return true
}
sent++
log.Trace(fmt.Sprintf("%v: successfully forwarded", sendMsg))

// continue forwarding if:
// - if the peer is end recipient but the full address has not been disclosed
// - if the peer address matches the partial address fully
// - if the peer is in proxbin
if len(msg.To) < addressLength && bytes.Equal(msg.To, sp.Address()[:len(msg.To)]) {
log.Trace(fmt.Sprintf("Pss keep forwarding: Partial address + full partial match"))
return true
} else if isproxbin {
log.Trace(fmt.Sprintf("%x is in proxbin, keep forwarding", common.ToHex(sp.Address())))
return true

// If the po is at addresslength (TODO: how can it be greater?)
// it means that the peer address is identical to the message address
// and that peer must be the final recipient
// further forwarding is thus not needed
if po >= addressLength*8 {
return false
}
// at this point we stop forwarding, and the state is as follows:
// - the peer is end recipient and we have full address
// - we are not in proxbin (directed routing)
// - partial addresses don't fully match
return false

// activate the cutoff when we have a successful send
if sent == 1 {
cutoffDepth = effectiveDepth
}
return true
})

// if we failed to send to anyone, re-insert message in the send-queue
if sent == 0 {
log.Debug("unable to forward to any peers")
if err := p.enqueue(msg); err != nil {
Expand Down

0 comments on commit 0bb8fe3

Please sign in to comment.