Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

swarm/pss: Deduplicate messages when processing #314

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 46 additions & 21 deletions swarm/pss/pss.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ var (
// will also be instrumental in flood guard mechanism
// and mailbox implementation
type pssCacheEntry struct {
forwarded bool
expiresAt time.Time
receivedFrom []byte
}
Expand Down Expand Up @@ -283,6 +284,11 @@ func (self *Pss) getHandlers(topic Topic) map[*Handler]bool {
// Passes error to pss protocol handler if payload is not valid pssmsg
func (self *Pss) handlePssMsg(msg interface{}) error {
pssmsg, ok := msg.(*PssMsg)
digest, err := self.storeMsg(pssmsg)
if err != nil {
log.Warn(fmt.Sprintf("could not store message %v to cache on pss handle: %v", msg, err))
}

if ok {
var err error
if !self.isSelfPossibleRecipient(pssmsg) {
Expand All @@ -294,12 +300,12 @@ func (self *Pss) handlePssMsg(msg interface{}) error {
return errors.New("Invalid TTL")
}
log.Trace("pss was for someone else :'( ... forwarding", "pss", common.ToHex(self.BaseAddr()))
return self.forward(pssmsg)
return self.forward(pssmsg, digest)
}
log.Trace("pss for us, yay! ... let's process!", "pss", common.ToHex(self.BaseAddr()))

if !self.process(pssmsg) {
err = self.forward(pssmsg)
if !self.process(pssmsg, digest) {
err = self.forward(pssmsg, digest)
}
return err
}
Expand All @@ -310,14 +316,21 @@ func (self *Pss) handlePssMsg(msg interface{}) error {
// Entry point to processing a message for which the current node can be the intended recipient.
// Attempts symmetric and asymmetric decryption with stored keys.
// Dispatches message to all handlers matching the message topic
func (self *Pss) process(pssmsg *PssMsg) bool {
func (self *Pss) process(pssmsg *PssMsg, digest pssDigest) bool {
var err error
var recvmsg *whisper.ReceivedMessage
var from *PssAddress
var asymmetric bool
var keyid string
var keyFunc func(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, *PssAddress, error)

// dont process if we've seen the message before
if self.checkFwdCache([]byte{}, digest, false) {
log.Warn("cache block in pss msg process", "pss", self)
return false
}
self.addFwdCache(digest, false)

envelope := pssmsg.Payload
psstopic := Topic(envelope.Topic)

Expand All @@ -333,14 +346,17 @@ func (self *Pss) process(pssmsg *PssMsg) bool {
return false
}

// if we forward the message, it will be put in the cache there
// if not, we put it in the cache here for deduplication
if len(pssmsg.To) < addressLength {
go func() {
err := self.forward(pssmsg)
err := self.forward(pssmsg, digest)
if err != nil {
log.Warn("Redundant forward fail: %v", err)
}
}()
}

handlers := self.getHandlers(psstopic)
nid, _ := discover.HexID("0x00") // this hack is needed to satisfy the p2p method
p := p2p.NewPeer(nid, fmt.Sprintf("%x", from), []p2p.Cap{})
Expand Down Expand Up @@ -644,25 +660,25 @@ func (self *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key [
Expire: uint32(time.Now().Add(self.msgTTL).Unix()),
Payload: envelope,
}
return self.forward(pssmsg)

digest, err := self.storeMsg(pssmsg)
if err != nil {
log.Warn(fmt.Sprintf("could not store message %v to cache on pss send: %v", msg, err))
}

return self.forward(pssmsg, digest)
}

// Forwards a pss message to the peer(s) closest to the to recipient address in the PssMsg struct
// The recipient address can be of any length, and the byte slice will be matched to the MSB slice
// of the peer address of the equivalent length.
func (self *Pss) forward(msg *PssMsg) error {
func (self *Pss) forward(msg *PssMsg, digest pssDigest) error {
to := make([]byte, addressLength)
copy(to[:len(msg.To)], msg.To)

// cache the message
digest, err := self.storeMsg(msg)
if err != nil {
log.Warn(fmt.Sprintf("could not store message %v to cache: %v", msg, err))
}

// flood guard:
// don't allow identical messages we saw shortly before
if self.checkFwdCache(nil, digest) {
if self.checkFwdCache(nil, digest, true) {
log.Trace(fmt.Sprintf("pss relay block-cache match: FROM %x TO %x", self.Overlay.BaseAddr(), common.ToHex(msg.To)))
return nil
}
Expand Down Expand Up @@ -697,7 +713,7 @@ func (self *Pss) forward(msg *PssMsg) error {
// get the protocol peer from the forwarding peer cache
sendMsg := fmt.Sprintf("MSG %x TO %x FROM %x VIA %x", digest, to, self.BaseAddr(), op.Address())
pp := self.fwdPool[sp.Info().ID]
if self.checkFwdCache(op.Address(), digest) {
if self.checkFwdCache(op.Address(), digest, true) {
log.Trace(fmt.Sprintf("%v: peer already forwarded to", sendMsg))
return true
}
Expand Down Expand Up @@ -733,7 +749,7 @@ func (self *Pss) forward(msg *PssMsg) error {
return nil
}

self.addFwdCache(digest)
self.addFwdCache(digest, true)
return nil
}

Expand All @@ -742,27 +758,36 @@ func (self *Pss) forward(msg *PssMsg) error {
/////////////////////////////////////////////////////////////////////

// add a message to the cache
func (self *Pss) addFwdCache(digest pssDigest) error {
// - on SENDING or RELAYING a message, after successful forwarding set the cache with forward TRUE
// - on RECEIVING a message, initially set cache with forward FALSE, which will subsequently set to TRUE if forwarded to proxpeers
func (self *Pss) addFwdCache(digest pssDigest, forwarded bool) error {
self.fwdCacheMu.Lock()
defer self.fwdCacheMu.Unlock()
var entry pssCacheEntry
var ok bool
if entry, ok = self.fwdCache[digest]; !ok {
entry = pssCacheEntry{}
entry.expiresAt = time.Now().Add(self.cacheTTL)
}
if forwarded {
entry.forwarded = true
}
entry.expiresAt = time.Now().Add(self.cacheTTL)
self.fwdCache[digest] = entry

return nil
}

// check if message is in the cache
func (self *Pss) checkFwdCache(addr []byte, digest pssDigest) bool {
// if forwarded is set to true, it will only match the cache entry if the forward value is true
// when receiving a message, we will check with forwarded = false, will always match if we've seen the message before
// when forwarding a message, we will check with forwarded = true, will match only if the message has been forwarded before
func (self *Pss) checkFwdCache(addr []byte, digest pssDigest, forwarded bool) bool {
self.fwdCacheMu.Lock()
defer self.fwdCacheMu.Unlock()
entry, ok := self.fwdCache[digest]
if ok {
if ok && ((forwarded && self.fwdCache[digest].forwarded) || !forwarded) {
if entry.expiresAt.After(time.Now()) {
log.Trace(fmt.Sprintf("unexpired cache for digest %x", digest))
log.Trace(fmt.Sprintf("unexpired cache for digest %x", digest), "pss", self)
return true
} else if entry.expiresAt.IsZero() && bytes.Equal(addr, entry.receivedFrom) {
log.Trace(fmt.Sprintf("sendermatch %x for digest %x", common.ToHex(addr), digest))
Expand Down
6 changes: 3 additions & 3 deletions swarm/pss/pss_go18plus_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// +build foo
// +build go1.8

package pss

Expand Down Expand Up @@ -626,7 +626,7 @@ func benchmarkSymkeyBruteforceChangeaddr(b *testing.B) {
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
if !ps.process(pssmsgs[len(pssmsgs)-(i%len(pssmsgs))-1]) {
if !ps.process(pssmsgs[len(pssmsgs)-(i%len(pssmsgs))-1], pssDigest{}) {
b.Fatalf("pss processing failed: %v", err)
}
}
Expand Down Expand Up @@ -708,7 +708,7 @@ func benchmarkSymkeyBruteforceSameaddr(b *testing.B) {
Payload: env,
}
for i := 0; i < b.N; i++ {
if !ps.process(pssmsg) {
if !ps.process(pssmsg, pssDigest{}) {
b.Fatalf("pss processing failed: %v", err)
}
}
Expand Down
Loading