diff --git a/mixing/go.mod b/mixing/go.mod index 0a5e7139e..e1d903269 100644 --- a/mixing/go.mod +++ b/mixing/go.mod @@ -8,6 +8,7 @@ require ( github.com/davecgh/go-spew v1.1.1 github.com/decred/dcrd/chaincfg/chainhash v1.0.4 github.com/decred/dcrd/chaincfg/v3 v3.2.1 + github.com/decred/dcrd/container/lru v1.0.0 github.com/decred/dcrd/crypto/blake256 v1.0.1 github.com/decred/dcrd/crypto/rand v1.0.0 github.com/decred/dcrd/dcrec v1.0.1 diff --git a/mixing/go.sum b/mixing/go.sum index 68c9f7b83..cbcf18f9f 100644 --- a/mixing/go.sum +++ b/mixing/go.sum @@ -14,6 +14,8 @@ github.com/decred/dcrd/chaincfg/chainhash v1.0.4 h1:zRCv6tdncLfLTKYqu7hrXvs7hW+8 github.com/decred/dcrd/chaincfg/chainhash v1.0.4/go.mod h1:hA86XxlBWwHivMvxzXTSD0ZCG/LoYsFdWnCekkTMCqY= github.com/decred/dcrd/chaincfg/v3 v3.2.1 h1:x9zKJaU24WAKbxAR1UyFKHlM3oJgP0H9LodokM4X5lM= github.com/decred/dcrd/chaincfg/v3 v3.2.1/go.mod h1:SDCWDtY7BLj0leXc9FuoA1YjSVKyCIBVAyxwZn6+sXc= +github.com/decred/dcrd/container/lru v1.0.0 h1:7foQymtbu18aQWYiY9RnNIeE+kvpiN+fiBQ3+viyJjI= +github.com/decred/dcrd/container/lru v1.0.0/go.mod h1:vlPwj0l+IzAHhQSsbgQnJgO5Cte78+yI065V+Mc5PRQ= github.com/decred/dcrd/crypto/blake256 v1.0.1 h1:7PltbUIQB7u/FfZ39+DGa/ShuMyJ5ilcvdfma9wOH6Y= github.com/decred/dcrd/crypto/blake256 v1.0.1/go.mod h1:2OfgNZ5wDpcsFmHmCK5gZTPcCXqlm2ArzUIkw9czNJo= github.com/decred/dcrd/crypto/rand v1.0.0 h1:Ah9Asl36OZt09sGSMbJZuL1HfwGdlC38q/ZUeLDVKRg= diff --git a/mixing/mixpool/log.go b/mixing/mixpool/log.go index 22d648284..9f2ee50f3 100644 --- a/mixing/mixpool/log.go +++ b/mixing/mixpool/log.go @@ -20,3 +20,12 @@ var log = slog.Disabled func UseLogger(logger slog.Logger) { log = logger } + +// pickNoun returns the singular or plural form of a noun depending on the count +// n. +func pickNoun[T ~uint32 | ~uint64](n T, singular, plural string) string { + if n == 1 { + return singular + } + return plural +} diff --git a/mixing/mixpool/mixpool.go b/mixing/mixpool/mixpool.go index 972a28684..a1c4dd985 100644 --- a/mixing/mixpool/mixpool.go +++ b/mixing/mixpool/mixpool.go @@ -16,6 +16,7 @@ import ( "github.com/decred/dcrd/chaincfg/chainhash" "github.com/decred/dcrd/chaincfg/v3" + "github.com/decred/dcrd/container/lru" "github.com/decred/dcrd/mixing" "github.com/decred/dcrd/mixing/utxoproof" "github.com/decred/dcrd/txscript/v4" @@ -47,6 +48,21 @@ const ( nmsgtypes = msgtypeRS ) +const ( + // These fields are used when caching recently removed mix messages. + // + // maxRecentlyRemovedMixMsgs specifies the maximum numbr to cache and is + // set to target about 100 participants per denomination up to typical + // ticket prices plus an additional 20%. + // + // maxRecentMixMsgsTTL is the time to keep items in the recently removed mix + // messages cache before they are expired. + // + // These values result in about 676 KiB memory usage including overhead. + maxRecentlyRemovedMixMsgs = 8500 + maxRecentMixMsgsTTL = time.Minute +) + func (m msgtype) String() string { switch m { case msgtypeKE: @@ -138,6 +154,27 @@ type Pool struct { expireHeight uint32 expireSem chan struct{} + // recentMixMsgs caches mix messages that have recently been removed from + // the pool. The cache handles automatic expiration and maximum entry + // limiting. + // + // Maintaining a separate cache of recently removed mix messages increases + // the probability they are available to serve regardless of whether or not + // they are still in the pool when a request for the advertisement arrives. + recentMixMsgs *lru.Map[chainhash.Hash, mixing.Message] + + // The following fields are used to periodically log the total number + // evicted items from the cache of recently removed mix messages. They are + // protected by the pool mutex. + // + // totalRecentsEvicted is the total number of items that have been evicted + // from the cache since the previous report. + // + // lastRecentsLogged is the last time the total number of items that have + // been evicted from the cache was reported. + totalRecentsEvicted uint64 + lastRecentsLogged time.Time + blockchain BlockChain utxoFetcher UtxoFetcher feeRate int64 @@ -174,6 +211,13 @@ type BlockChain interface { CurrentTip() (chainhash.Hash, int64) } +// newRecentMixMsgsCache returns a new LRU cache for tracking mix messages that +// have recently been removed from the pool. +func newRecentMixMsgsCache() *lru.Map[chainhash.Hash, mixing.Message] { + return lru.NewMapWithDefaultTTL[chainhash.Hash, mixing.Message]( + maxRecentlyRemovedMixMsgs, maxRecentMixMsgsTTL) +} + // NewPool returns a new mixing pool that accepts and validates mixing messages // required for distributed transaction mixing. func NewPool(blockchain BlockChain) *Pool { @@ -190,6 +234,8 @@ func NewPool(blockchain BlockChain) *Pool { epoch: 10 * time.Minute, // XXX: mainnet epoch: add to chainparams expireHeight: 0, expireSem: make(chan struct{}, 1), + recentMixMsgs: newRecentMixMsgsCache(), + lastRecentsLogged: time.Now(), blockchain: blockchain, feeRate: feeRate, params: blockchain.ChainParams(), @@ -236,6 +282,22 @@ func (p *Pool) HaveMessage(query *chainhash.Hash) bool { return ok } +// RecentMessages attempts to find a message by its hash in both the mixing pool +// that contains accepted messages as well as the cache of recently removed +// messages. +func (p *Pool) RecentMessage(query *chainhash.Hash) (mixing.Message, bool) { + defer p.mtx.RUnlock() + p.mtx.RLock() + + if pr := p.prs[*query]; pr != nil { + return pr, true + } + if e, ok := p.pool[*query]; ok && e.msg != nil { + return e.msg, true + } + return p.recentMixMsgs.Get(*query) +} + // MixPRs returns all MixPR messages. // // Any expired PRs that are still internally tracked by the mixpool for @@ -351,6 +413,57 @@ func (p *Pool) expireMessages() { p.expireMessagesNow(height) } +// maybeLogRecentMixMsgsNumEvicted periodically logs the total number of evicted +// items from the recently removed mix messages cache. +// +// This function MUST be called with the pool mutex held (for writes). +func (p *Pool) maybeLogRecentMixMsgsNumEvicted() { + totalEvicted := p.totalRecentsEvicted + if totalEvicted == 0 { + return + } + + const logInterval = time.Hour + sinceLastLogged := time.Since(p.lastRecentsLogged) + if sinceLastLogged < logInterval { + return + } + + log.Debugf("Evicted %d recent %s in the last %v (%d remaining, %.2f%% "+ + "hit ratio)", totalEvicted, + pickNoun(totalEvicted, "mix message", "mix messages"), + sinceLastLogged.Truncate(time.Second), p.recentMixMsgs.Len(), + p.recentMixMsgs.HitRatio()) + + p.totalRecentsEvicted = 0 + p.lastRecentsLogged = time.Now() +} + +// removeMessage removes the message associated with the passed hash (when it +// exists) from the pool and adds it to the cache of recently removed mix +// messages. +// +// This MUST be called with the pool mutex held (for writes). +func (p *Pool) removeMessage(hash chainhash.Hash) { + e, ok := p.pool[hash] + if !ok { + return + } + delete(p.pool, hash) + if e.msg == nil { + return + } + + // Track recently removed mix messsages for a period of time in order to + // increase the probability they are still available to serve for a while + // even though they are no longer in the mixpool. This helps provide a + // buffer against the inability to serve any advertisements that take place + // just prior to removal of the message. + numEvicted := p.recentMixMsgs.Put(hash, e.msg) + p.totalRecentsEvicted += uint64(numEvicted) + p.maybeLogRecentMixMsgsNumEvicted() +} + // ExpireMessages immediately expires all pair requests and sessions built // from them that indicate expiry at or after a block height. func (p *Pool) ExpireMessages(height uint32) { @@ -370,7 +483,7 @@ func (p *Pool) expireMessagesNow(height uint32) { delete(p.sessions, sid) for hash := range ses.hashes { - delete(p.pool, hash) + p.removeMessage(hash) } } @@ -406,7 +519,7 @@ func (p *Pool) RemoveMessage(msg mixing.Message) { defer p.mtx.Unlock() msgHash := msg.Hash() - delete(p.pool, msgHash) + p.removeMessage(msgHash) if pr, ok := msg.(*wire.MsgMixPairReq); ok { p.removePR(pr, "rejected") } @@ -460,12 +573,12 @@ func (p *Pool) removeSession(sid [32]byte, txHash *chainhash.Hash, success bool) if ok { log.Debugf("Removing session %x %T %v by %x", sid[:], e.msg, hash, e.msg.Pub()) - delete(p.pool, hash) + p.removeMessage(hash) } } for _, prHash := range removePRs { - delete(p.pool, prHash) + p.removeMessage(prHash) if pr := p.prs[prHash]; pr != nil { p.removePR(pr, "mixed") } @@ -490,11 +603,11 @@ func (p *Pool) removeConfirmedSessions() { delete(p.sessions, sid) for hash := range ses.hashes { - delete(p.pool, hash) + p.removeMessage(hash) } for _, hash := range ses.prs { - delete(p.pool, hash) + p.removeMessage(hash) pr := p.prs[hash] if pr != nil { p.removePR(pr, "confirmed") @@ -962,12 +1075,24 @@ func (p *Pool) AcceptMessage(msg mixing.Message) (accepted []mixing.Message, err // removePR removes a pair request message and all other messages and sessions // that the peer sent and was involved in. +// +// This MUST be called with the pool mutex held (for writes). func (p *Pool) removePR(pr *wire.MsgMixPairReq, reason string) { prHash := pr.Hash() log.Debugf("Removing %s PR %s by %x", reason, prHash, pr.Identity[:]) delete(p.prs, prHash) + + // Track recently removed pair requests for a period of time in order to + // increase the probability they are still available to serve for a while + // even though they are no longer in the mixpool. This helps provide a + // buffer against the inability to serve any advertisements that take place + // just prior to removal of the message. + numEvicted := p.recentMixMsgs.Put(prHash, pr) + p.totalRecentsEvicted += uint64(numEvicted) + p.maybeLogRecentMixMsgsNumEvicted() + for _, hash := range p.messagesByIdentity[pr.Identity] { e, ok := p.pool[hash] if !ok { @@ -977,7 +1102,7 @@ func (p *Pool) removePR(pr *wire.MsgMixPairReq, reason string) { if ok { p.removeSession(ke.SessionID, nil, false) } - delete(p.pool, hash) + p.removeMessage(hash) } delete(p.messagesByIdentity, pr.Identity) delete(p.latestKE, pr.Identity) diff --git a/server.go b/server.go index 96b045364..676edb2e6 100644 --- a/server.go +++ b/server.go @@ -788,11 +788,10 @@ func (sp *serverPeer) handleServeGetData(invVects []*wire.InvVect, case wire.InvTypeMix: mixHash := &iv.Hash - msg, err := sp.server.mixMsgPool.Message(mixHash) - if err != nil { - peerLog.Debugf("Unable to fetch mix message %v ", - "from the mix pool for %v: %v", mixHash, - sp, err) + msg, ok := sp.server.mixMsgPool.RecentMessage(mixHash) + if !ok { + peerLog.Debugf("Unable to fetch mix message %v from the mix "+ + "pool for peer %s", mixHash, sp) break } dataMsg = msg