Skip to content

Commit

Permalink
mixpool: Cache recently removed msgs.
Browse files Browse the repository at this point in the history
This adds a cache to house mix messages that have recently been removed
from the mixpool.  It makes use of the new container/lru module to
handle automatic expiration of entries and maximum entry limiting.

The rationale for this change is that it is considered misbehavior to
advertise a mix message and then claim it is not found when the
corresponding request arrives.  Maintaining a separate cache of mix
messages recently removed from the mixpool for a short period of time
significantly increases the probability they are available to serve when
a request for the advertisement arrives independent of the current
status of the mixpool.
  • Loading branch information
davecgh committed Jun 15, 2024
1 parent 80b9f7b commit 2f79a9c
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 12 deletions.
1 change: 1 addition & 0 deletions mixing/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/davecgh/go-spew v1.1.1
github.com/decred/dcrd/chaincfg/chainhash v1.0.4
github.com/decred/dcrd/chaincfg/v3 v3.2.1
github.com/decred/dcrd/container/lru v1.0.0
github.com/decred/dcrd/crypto/blake256 v1.0.1
github.com/decred/dcrd/crypto/rand v1.0.0
github.com/decred/dcrd/dcrec v1.0.1
Expand Down
2 changes: 2 additions & 0 deletions mixing/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ github.com/decred/dcrd/chaincfg/chainhash v1.0.4 h1:zRCv6tdncLfLTKYqu7hrXvs7hW+8
github.com/decred/dcrd/chaincfg/chainhash v1.0.4/go.mod h1:hA86XxlBWwHivMvxzXTSD0ZCG/LoYsFdWnCekkTMCqY=
github.com/decred/dcrd/chaincfg/v3 v3.2.1 h1:x9zKJaU24WAKbxAR1UyFKHlM3oJgP0H9LodokM4X5lM=
github.com/decred/dcrd/chaincfg/v3 v3.2.1/go.mod h1:SDCWDtY7BLj0leXc9FuoA1YjSVKyCIBVAyxwZn6+sXc=
github.com/decred/dcrd/container/lru v1.0.0 h1:7foQymtbu18aQWYiY9RnNIeE+kvpiN+fiBQ3+viyJjI=
github.com/decred/dcrd/container/lru v1.0.0/go.mod h1:vlPwj0l+IzAHhQSsbgQnJgO5Cte78+yI065V+Mc5PRQ=
github.com/decred/dcrd/crypto/blake256 v1.0.1 h1:7PltbUIQB7u/FfZ39+DGa/ShuMyJ5ilcvdfma9wOH6Y=
github.com/decred/dcrd/crypto/blake256 v1.0.1/go.mod h1:2OfgNZ5wDpcsFmHmCK5gZTPcCXqlm2ArzUIkw9czNJo=
github.com/decred/dcrd/crypto/rand v1.0.0 h1:Ah9Asl36OZt09sGSMbJZuL1HfwGdlC38q/ZUeLDVKRg=
Expand Down
9 changes: 9 additions & 0 deletions mixing/mixpool/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,12 @@ var log = slog.Disabled
func UseLogger(logger slog.Logger) {
log = logger
}

// pickNoun returns the singular or plural form of a noun depending on the count
// n.
func pickNoun[T ~uint32 | ~uint64](n T, singular, plural string) string {
if n == 1 {
return singular
}
return plural
}
139 changes: 132 additions & 7 deletions mixing/mixpool/mixpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/decred/dcrd/chaincfg/chainhash"
"github.com/decred/dcrd/chaincfg/v3"
"github.com/decred/dcrd/container/lru"
"github.com/decred/dcrd/mixing"
"github.com/decred/dcrd/mixing/utxoproof"
"github.com/decred/dcrd/txscript/v4"
Expand Down Expand Up @@ -47,6 +48,21 @@ const (
nmsgtypes = msgtypeRS
)

const (
// These fields are used when caching recently removed mix messages.
//
// maxRecentlyRemovedMixMsgs specifies the maximum numbr to cache and is
// set to target about 100 participants per denomination up to typical
// ticket prices plus an additional 20%.
//
// maxRecentMixMsgsTTL is the time to keep items in the recently removed mix
// messages cache before they are expired.
//
// These values result in about 676 KiB memory usage including overhead.
maxRecentlyRemovedMixMsgs = 8500
maxRecentMixMsgsTTL = time.Minute
)

func (m msgtype) String() string {
switch m {
case msgtypeKE:
Expand Down Expand Up @@ -138,6 +154,27 @@ type Pool struct {
expireHeight uint32
expireSem chan struct{}

// recentMixMsgs caches mix messages that have recently been removed from
// the pool. The cache handles automatic expiration and maximum entry
// limiting.
//
// Maintaining a separate cache of recently removed mix messages increases
// the probability they are available to serve regardless of whether or not
// they are still in the pool when a request for the advertisement arrives.
recentMixMsgs *lru.Map[chainhash.Hash, mixing.Message]

// The following fields are used to periodically log the total number
// evicted items from the cache of recently removed mix messages. They are
// protected by the pool mutex.
//
// totalRecentsEvicted is the total number of items that have been evicted
// from the cache since the previous report.
//
// lastRecentsLogged is the last time the total number of items that have
// been evicted from the cache was reported.
totalRecentsEvicted uint64
lastRecentsLogged time.Time

blockchain BlockChain
utxoFetcher UtxoFetcher
feeRate int64
Expand Down Expand Up @@ -174,6 +211,13 @@ type BlockChain interface {
CurrentTip() (chainhash.Hash, int64)
}

// newRecentMixMsgsCache returns a new LRU cache for tracking mix messages that
// have recently been removed from the pool.
func newRecentMixMsgsCache() *lru.Map[chainhash.Hash, mixing.Message] {
return lru.NewMapWithDefaultTTL[chainhash.Hash, mixing.Message](
maxRecentlyRemovedMixMsgs, maxRecentMixMsgsTTL)
}

// NewPool returns a new mixing pool that accepts and validates mixing messages
// required for distributed transaction mixing.
func NewPool(blockchain BlockChain) *Pool {
Expand All @@ -190,6 +234,8 @@ func NewPool(blockchain BlockChain) *Pool {
epoch: 10 * time.Minute, // XXX: mainnet epoch: add to chainparams
expireHeight: 0,
expireSem: make(chan struct{}, 1),
recentMixMsgs: newRecentMixMsgsCache(),
lastRecentsLogged: time.Now(),
blockchain: blockchain,
feeRate: feeRate,
params: blockchain.ChainParams(),
Expand Down Expand Up @@ -236,6 +282,22 @@ func (p *Pool) HaveMessage(query *chainhash.Hash) bool {
return ok
}

// RecentMessages attempts to find a message by its hash in both the mixing pool
// that contains accepted messages as well as the cache of recently removed
// messages.
func (p *Pool) RecentMessage(query *chainhash.Hash) (mixing.Message, bool) {
defer p.mtx.RUnlock()
p.mtx.RLock()

if pr := p.prs[*query]; pr != nil {
return pr, true
}
if e, ok := p.pool[*query]; ok && e.msg != nil {
return e.msg, true
}
return p.recentMixMsgs.Get(*query)
}

// MixPRs returns all MixPR messages.
//
// Any expired PRs that are still internally tracked by the mixpool for
Expand Down Expand Up @@ -351,6 +413,57 @@ func (p *Pool) expireMessages() {
p.expireMessagesNow(height)
}

// maybeLogRecentMixMsgsNumEvicted periodically logs the total number of evicted
// items from the recently removed mix messages cache.
//
// This function MUST be called with the pool mutex held (for writes).
func (p *Pool) maybeLogRecentMixMsgsNumEvicted() {
totalEvicted := p.totalRecentsEvicted
if totalEvicted == 0 {
return
}

const logInterval = time.Hour
sinceLastLogged := time.Since(p.lastRecentsLogged)
if sinceLastLogged < logInterval {
return
}

log.Debugf("Evicted %d recent %s in the last %v (%d remaining, %.2f%% "+
"hit ratio)", totalEvicted,
pickNoun(totalEvicted, "mix message", "mix messages"),
sinceLastLogged.Truncate(time.Second), p.recentMixMsgs.Len(),
p.recentMixMsgs.HitRatio())

p.totalRecentsEvicted = 0
p.lastRecentsLogged = time.Now()
}

// removeMessage removes the message associated with the passed hash (when it
// exists) from the pool and adds it to the cache of recently removed mix
// messages.
//
// This MUST be called with the pool mutex held (for writes).
func (p *Pool) removeMessage(hash chainhash.Hash) {
e, ok := p.pool[hash]
if !ok {
return
}
delete(p.pool, hash)
if e.msg == nil {
return
}

// Track recently removed mix messsages for a period of time in order to
// increase the probability they are still available to serve for a while
// even though they are no longer in the mixpool. This helps provide a
// buffer against the inability to serve any advertisements that take place
// just prior to removal of the message.
numEvicted := p.recentMixMsgs.Put(hash, e.msg)
p.totalRecentsEvicted += uint64(numEvicted)
p.maybeLogRecentMixMsgsNumEvicted()
}

// ExpireMessages immediately expires all pair requests and sessions built
// from them that indicate expiry at or after a block height.
func (p *Pool) ExpireMessages(height uint32) {
Expand All @@ -370,7 +483,7 @@ func (p *Pool) expireMessagesNow(height uint32) {

delete(p.sessions, sid)
for hash := range ses.hashes {
delete(p.pool, hash)
p.removeMessage(hash)
}
}

Expand Down Expand Up @@ -406,7 +519,7 @@ func (p *Pool) RemoveMessage(msg mixing.Message) {
defer p.mtx.Unlock()

msgHash := msg.Hash()
delete(p.pool, msgHash)
p.removeMessage(msgHash)
if pr, ok := msg.(*wire.MsgMixPairReq); ok {
p.removePR(pr, "rejected")
}
Expand Down Expand Up @@ -460,12 +573,12 @@ func (p *Pool) removeSession(sid [32]byte, txHash *chainhash.Hash, success bool)
if ok {
log.Debugf("Removing session %x %T %v by %x",
sid[:], e.msg, hash, e.msg.Pub())
delete(p.pool, hash)
p.removeMessage(hash)
}
}

for _, prHash := range removePRs {
delete(p.pool, prHash)
p.removeMessage(prHash)
if pr := p.prs[prHash]; pr != nil {
p.removePR(pr, "mixed")
}
Expand All @@ -490,11 +603,11 @@ func (p *Pool) removeConfirmedSessions() {

delete(p.sessions, sid)
for hash := range ses.hashes {
delete(p.pool, hash)
p.removeMessage(hash)
}

for _, hash := range ses.prs {
delete(p.pool, hash)
p.removeMessage(hash)
pr := p.prs[hash]
if pr != nil {
p.removePR(pr, "confirmed")
Expand Down Expand Up @@ -962,12 +1075,24 @@ func (p *Pool) AcceptMessage(msg mixing.Message) (accepted []mixing.Message, err

// removePR removes a pair request message and all other messages and sessions
// that the peer sent and was involved in.
//
// This MUST be called with the pool mutex held (for writes).
func (p *Pool) removePR(pr *wire.MsgMixPairReq, reason string) {
prHash := pr.Hash()

log.Debugf("Removing %s PR %s by %x", reason, prHash, pr.Identity[:])

delete(p.prs, prHash)

// Track recently removed pair requests for a period of time in order to
// increase the probability they are still available to serve for a while
// even though they are no longer in the mixpool. This helps provide a
// buffer against the inability to serve any advertisements that take place
// just prior to removal of the message.
numEvicted := p.recentMixMsgs.Put(prHash, pr)
p.totalRecentsEvicted += uint64(numEvicted)
p.maybeLogRecentMixMsgsNumEvicted()

for _, hash := range p.messagesByIdentity[pr.Identity] {
e, ok := p.pool[hash]
if !ok {
Expand All @@ -977,7 +1102,7 @@ func (p *Pool) removePR(pr *wire.MsgMixPairReq, reason string) {
if ok {
p.removeSession(ke.SessionID, nil, false)
}
delete(p.pool, hash)
p.removeMessage(hash)
}
delete(p.messagesByIdentity, pr.Identity)
delete(p.latestKE, pr.Identity)
Expand Down
9 changes: 4 additions & 5 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,11 +788,10 @@ func (sp *serverPeer) handleServeGetData(invVects []*wire.InvVect,

case wire.InvTypeMix:
mixHash := &iv.Hash
msg, err := sp.server.mixMsgPool.Message(mixHash)
if err != nil {
peerLog.Debugf("Unable to fetch mix message %v ",
"from the mix pool for %v: %v", mixHash,
sp, err)
msg, ok := sp.server.mixMsgPool.RecentMessage(mixHash)
if !ok {
peerLog.Debugf("Unable to fetch mix message %v from the mix "+
"pool for peer %s", mixHash, sp)
break
}
dataMsg = msg
Expand Down

0 comments on commit 2f79a9c

Please sign in to comment.