Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit

Permalink
Missing file in commit
Browse files Browse the repository at this point in the history
  • Loading branch information
kortatu committed Sep 25, 2019
1 parent 97ab47e commit e666714
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 23 deletions.
10 changes: 6 additions & 4 deletions pss/forwarding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pss

import (
"fmt"
"github.com/ethersphere/swarm/log"
"math/rand"
"testing"
"time"
Expand Down Expand Up @@ -128,7 +129,7 @@ func TestForwardBasic(t *testing.T) {
}
testCases = append(testCases, c)

// test with partial addresses
//test with partial addresses
const part = 12

for i := 0; i < firstNearest; i++ {
Expand Down Expand Up @@ -198,7 +199,7 @@ func TestForwardBasic(t *testing.T) {
expected: all[indexAtPo8:],
exclusive: false,
}
testCases = append(testCases, c)
//testCases = append(testCases, c)

// luminous radius of 256 bits, proximity order 8
a4 := pot.Address{}
Expand All @@ -210,7 +211,7 @@ func TestForwardBasic(t *testing.T) {
expected: []int{indexAtPo8, indexAtPo8 + 1},
exclusive: true,
}
testCases = append(testCases, c)
//testCases = append(testCases, c)

// check correct behaviour in case send fails
for i := 2; i < firstNearest-3; i += 2 {
Expand All @@ -220,7 +221,7 @@ func TestForwardBasic(t *testing.T) {
// msg should be received by only one of the deeper peers.
a := pot.RandomAddressAt(base, po)
c = testCase{
name: fmt.Sprintf("Send direct to known, id: [%d]", i),
name: fmt.Sprintf("Send direct to known with errors, id: [%d] po=%v", i, po),
recipient: a[:],
peers: peerAddresses,
expected: all[i+1:],
Expand All @@ -238,6 +239,7 @@ func TestForwardBasic(t *testing.T) {
// this function tests the forwarding of a single message. the recipient address is passed as param,
// along with addresses of all peers, and indices of those peers which are expected to receive the message.
func testForwardMsg(t *testing.T, ps *Pss, c *testCase) {
log.Debug("Test forward msg", "name", c.name)
recipientAddr := c.recipient
peers := c.peers
expected := c.expected
Expand Down
68 changes: 49 additions & 19 deletions pss/pss.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ func (o outbox) reenqueue(slot int) {
type Pss struct {
*network.Kademlia // we can get the Kademlia address from this
*KeyStore
kademliaLB *network.KademliaLoadBalancer
forwardCache *ttlset.TTLSet
gcTicker *ticker.Ticker

Expand Down Expand Up @@ -249,6 +250,7 @@ func New(k *network.Kademlia, params *Params) (*Pss, error) {
Kademlia: k,
KeyStore: loadKeyStore(),

kademliaLB: network.NewKademliaLoadBalancer(k),
privateKey: params.privateKey,
quitC: make(chan struct{}),

Expand Down Expand Up @@ -318,6 +320,7 @@ func (p *Pss) Stop() error {
return err
}
close(p.quitC)
p.kademliaLB.Stop()
return nil
}

Expand Down Expand Up @@ -803,29 +806,56 @@ func (p *Pss) forward(msg *message.Message) error {
onlySendOnce = true
}

p.Kademlia.EachConnLB(to, addressLength*8, func(sp *network.Peer, po int) (continueIt bool, peerUsed bool) {
if po < broadcastThreshold && sent > 0 {
return // stop iterating, peer not used
p.kademliaLB.EachBin(to, func(bin network.LBBin) bool {
if bin.ProximityOrder < broadcastThreshold && sent > 0 {
// This bin is at the same distance as the node to the message. If already sent, we stop sending
return false
}
peerUsed = true
if sendFunc(p, sp, msg) {
sent++
if onlySendOnce {
continueIt = false
// stop iterating peer used
return
}
if po == addressLength*8 {
continueIt = false
// stop iterating if successfully sent to the exact recipient (perfect match of full address)
// peer used
return
for _, lbPeer := range bin.LBPeers {
if sendFunc(p, lbPeer.Peer, msg) {
lbPeer.Use()
sent++
if onlySendOnce {
return false
}
if bin.ProximityOrder == addressLength*8 {
// stop iterating if successfully sent to the exact recipient (perfect match of full address)
return false //stop iterating
}
}
}
continueIt = true
// continue iterating, peer used
return
return true
})
//if onlySendOnce {
// p.Kademlia.EachConnDescendingBinLB(to, 0, func(sp *network.Peer, _ int) (continueIt bool, peerUsed bool) {
// if sendFunc(p, sp, msg) {
// sent++
// // stop iterating peer used
// return false, true
// } else {
// log.Warn("Error sending only once")
// }
// // continue iterating, peer used
// return true, true
// })
//} else {
// p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int) bool {
// if po < broadcastThreshold && sent > 0 {
// return false// stop iterating
// }
// if sendFunc(p, sp, msg) {
// sent++
// if po == addressLength*8 {
// // stop iterating if successfully sent to the exact recipient (perfect match of full address)
// return false
// }
// } else {
// log.Warn("Error sending")
// }
// // continue iterating
// return true
// })
//}

// cache the message
p.addFwdCache(msg)
Expand Down

0 comments on commit e666714

Please sign in to comment.