Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

swarm/pss: Remove spurious forwarding #1008

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion accounts/keystore/account_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,10 @@ func (ac *accountCache) scanAccounts() error {
case (addr == common.Address{}):
log.Debug("Failed to decode keystore key", "path", path, "err", "missing or zero address")
default:
return &accounts.Account{Address: addr, URL: accounts.URL{Scheme: KeyStoreScheme, Path: path}}
return &accounts.Account{
Address: addr,
URL: accounts.URL{Scheme: KeyStoreScheme, Path: path},
}
}
return nil
}
Expand Down
8 changes: 6 additions & 2 deletions accounts/keystore/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,10 @@ func storeNewKey(ks keyStore, rand io.Reader, auth string) (*Key, accounts.Accou
if err != nil {
return nil, accounts.Account{}, err
}
a := accounts.Account{Address: key.Address, URL: accounts.URL{Scheme: KeyStoreScheme, Path: ks.JoinPath(keyFileName(key.Address))}}
a := accounts.Account{
Address: key.Address,
URL: accounts.URL{Scheme: KeyStoreScheme, Path: ks.JoinPath(keyFileName(key.Address))},
}
if err := ks.StoreKey(a.URL.Path, key, auth); err != nil {
zeroKey(key.PrivateKey)
return nil, a, err
Expand Down Expand Up @@ -224,5 +227,6 @@ func toISO8601(t time.Time) string {
} else {
tz = fmt.Sprintf("%03d00", offset/3600)
}
return fmt.Sprintf("%04d-%02d-%02dT%02d-%02d-%02d.%09d%s", t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), tz)
return fmt.Sprintf("%04d-%02d-%02dT%02d-%02d-%02d.%09d%s",
t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), tz)
}
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ func DecryptKey(keyjson []byte, auth string) (*Key, error) {
PrivateKey: key,
}, nil
}

func DecryptDataV3(cryptoJson CryptoJSON, auth string) ([]byte, error) {
if cryptoJson.Cipher != "aes-128-ctr" {
return nil, fmt.Errorf("Cipher not supported: %v", cryptoJson.Cipher)
Expand Down
8 changes: 7 additions & 1 deletion accounts/keystore/presale.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,13 @@ func importPreSaleKey(keyStore keyStore, keyJSON []byte, password string) (accou
return accounts.Account{}, nil, err
}
key.Id = uuid.NewRandom()
a := accounts.Account{Address: key.Address, URL: accounts.URL{Scheme: KeyStoreScheme, Path: keyStore.JoinPath(keyFileName(key.Address))}}
a := accounts.Account{
Address: key.Address,
URL: accounts.URL{
Scheme: KeyStoreScheme,
Path: keyStore.JoinPath(keyFileName(key.Address)),
},
}
err = keyStore.StoreKey(a.URL.Path, key, password)
return a, key, err
}
Expand Down
1 change: 0 additions & 1 deletion les/flowcontrol/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ func (peer *ClientNode) RequestProcessed(cost uint64) (bv, realCost uint64) {
time := mclock.Now()
peer.recalcBV(time)
peer.bufValue -= cost
peer.recalcBV(time)
rcValue, rcost := peer.cm.processed(peer.cmNode, time)
if rcValue < peer.params.BufLimit {
bv := peer.params.BufLimit - rcValue
Expand Down
7 changes: 7 additions & 0 deletions mobile/big.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ func (bi *BigInt) SetString(x string, base int) {
// BigInts represents a slice of big ints.
type BigInts struct{ bigints []*big.Int }

// NewBigInts creates a slice of uninitialized big numbers.
func NewBigInts(size int) *BigInts {
return &BigInts{
bigints: make([]*big.Int, size),
}
}

// Size returns the number of big ints in the slice.
func (bi *BigInts) Size() int {
return len(bi.bigints)
Expand Down
69 changes: 50 additions & 19 deletions swarm/pss/pss.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,10 +897,38 @@ func (p *Pss) forward(msg *PssMsg) error {

// send with kademlia
// find the closest peer to the recipient and attempt to send

// number of sends performed. enables us to evaluate whether send was at all successful
sent := 0
p.Kademlia.EachConn(to, 256, func(sp *network.Peer, po int, isproxbin bool) bool {

// TODO: debug, remove in production
// calculate proximity to recipient address
ponow, _ := p.Kademlia.Pof(p.BaseAddr(), to, 0)

// The effective depth is the same as nearest neighbor depth OR
// the amount of address bytes in the neighbor, whichever is shallower
// this term aliasing has the effect of considering ALL connected peers
// who match the address prefix as nearest neighbors, and we will forward
// to all of them.
effectiveDepth := p.Kademlia.NeighbourhoodDepth()
darkRadius := len(msg.To) * 8
if darkRadius < addressLength*8 && effectiveDepth > darkRadius {
effectiveDepth = darkRadius
}

// Set to depth on the first successful send
cutoffDepth := 0

p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int, isproxbin bool) bool {
info := sp.Info()

// the cutoffDepth will be set after the first successful send.
// that means that before a send has been made OR the peer returned
// is still within the effective depth, we will pass through this check
if po < cutoffDepth {
return false
}

// check if the peer is running pss
var ispss bool
for _, cap := range info.Caps {
Expand All @@ -915,39 +943,42 @@ func (p *Pss) forward(msg *PssMsg) error {
}

// get the protocol peer from the forwarding peer cache
sendMsg := fmt.Sprintf("MSG TO %x FROM %x VIA %x", to, p.BaseAddr(), sp.Address())
p.fwdPoolMu.RLock()
pp := p.fwdPool[sp.Info().ID]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use already assigned local var sp.Info() -> info

p.fwdPoolMu.RUnlock()

// TODO: debug, remove in production
// calculate proximity from returned kademlia peer to destination and log it
powill, _ := p.Kademlia.Pof(sp.Address(), to, 0)
log.Debug("forward", "topic", label(msg.Payload.Topic[:]), "self", label(p.BaseAddr()), "to", label(sp.Address()), "dest", label(to), "po", ponow, "advance", powill-ponow)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wonder if we need debug lines still

println(p.Kademlia.String())

// attempt to send the message
// short circuit to next iteration pass when it fails
err := pp.Send(context.TODO(), msg)
if err != nil {
metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1)
log.Error(err.Error())
return true
}
sent++
log.Trace(fmt.Sprintf("%v: successfully forwarded", sendMsg))

// continue forwarding if:
// - if the peer is end recipient but the full address has not been disclosed
// - if the peer address matches the partial address fully
// - if the peer is in proxbin
if len(msg.To) < addressLength && bytes.Equal(msg.To, sp.Address()[:len(msg.To)]) {
log.Trace(fmt.Sprintf("Pss keep forwarding: Partial address + full partial match"))
return true
} else if isproxbin {
log.Trace(fmt.Sprintf("%x is in proxbin, keep forwarding", common.ToHex(sp.Address())))
return true

// If the po is at addresslength (TODO: how can it be greater?)
// it means that the peer address is identical to the message address
// and that peer must be the final recipient
// further forwarding is thus not needed
if po >= addressLength*8 {
return false
}
// at this point we stop forwarding, and the state is as follows:
// - the peer is end recipient and we have full address
// - we are not in proxbin (directed routing)
// - partial addresses don't fully match
return false

// activate the cutoff when we have a successful send
if sent == 1 {
cutoffDepth = effectiveDepth
}
return true
})

// if we failed to send to anyone, re-insert message in the send-queue
if sent == 0 {
log.Debug("unable to forward to any peers")
if err := p.enqueue(msg); err != nil {
Expand Down
1 change: 0 additions & 1 deletion vendor/github.com/syndtr/goleveldb/leveldb/cache/cache.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/github.com/syndtr/goleveldb/leveldb/db.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 9 additions & 15 deletions vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/github.com/syndtr/goleveldb/leveldb/db_util.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions vendor/github.com/syndtr/goleveldb/leveldb/iterator/iter.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions vendor/github.com/syndtr/goleveldb/leveldb/opt/options.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions vendor/github.com/syndtr/goleveldb/leveldb/session_util.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading