From a379d9680724299901cdb086885d2baed9f7bc33 Mon Sep 17 00:00:00 2001 From: Josh Rickmar Date: Wed, 8 Nov 2023 17:35:17 +0000 Subject: [PATCH] peer,main: propagate p2p mixing messages This commit adds the mixing message pool to the server and listens for all peer-to-peer mixing messages broadcast on the network. If these messages are able to be accepted to the mixpool, they are relayed to other peers through the inventory system, and notified to RPC clients. At startup, all current pair request messages are requested from peers, but the entire mixing pools are not synchronized. --- go.mod | 8 +- go.sum | 18 +- internal/blockchain/chain.go | 5 + internal/netsync/interface.go | 5 + internal/netsync/manager.go | 173 ++++++++++++++++--- internal/rpcserver/interface.go | 21 +++ internal/rpcserver/rpcserver.go | 11 +- internal/rpcserver/rpcserverhandlers_test.go | 22 +++ internal/rpcserver/rpcserverhelp.go | 11 ++ internal/rpcserver/rpcwebsocket.go | 142 +++++++++++++++ log.go | 4 + peer/log.go | 8 +- peer/peer.go | 86 ++++++++- rpc/jsonrpc/types/chainsvrwscmds.go | 36 ++++ rpc/jsonrpc/types/chainsvrwsntfns.go | 18 ++ rpcadaptors.go | 16 ++ server.go | 167 ++++++++++++++++-- 17 files changed, 703 insertions(+), 48 deletions(-) diff --git a/go.mod b/go.mod index 51c56fd3cb..2de095cc84 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/decred/dcrd/gcs/v4 v4.0.0 github.com/decred/dcrd/lru v1.1.2 github.com/decred/dcrd/math/uint256 v1.0.1 + github.com/decred/dcrd/mixing v0.0.0 github.com/decred/dcrd/peer/v3 v3.0.2 github.com/decred/dcrd/rpc/jsonrpc/types/v4 v4.0.0 github.com/decred/dcrd/rpcclient/v8 v8.0.0 @@ -38,18 +39,20 @@ require ( github.com/jrick/bitset v1.0.0 github.com/jrick/logrotate v1.0.0 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 - golang.org/x/sys v0.8.0 - golang.org/x/term v0.5.0 + golang.org/x/sys v0.13.0 + golang.org/x/term v0.13.0 lukechampine.com/blake3 v1.2.1 ) require ( github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412 // indirect + github.com/companyzero/sntrup4591761 v0.0.0-20220309191932-9e0f3af2f07a // indirect github.com/dchest/siphash v1.2.3 // indirect github.com/decred/dcrd/dcrec/edwards/v2 v2.0.3 // indirect github.com/decred/dcrd/hdkeychain/v3 v3.1.1 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/klauspost/cpuid/v2 v2.0.9 // indirect + golang.org/x/crypto v0.7.0 // indirect ) replace ( @@ -75,6 +78,7 @@ replace ( github.com/decred/dcrd/limits => ./limits github.com/decred/dcrd/lru => ./lru github.com/decred/dcrd/math/uint256 => ./math/uint256 + github.com/decred/dcrd/mixing => ./mixing github.com/decred/dcrd/peer/v3 => ./peer github.com/decred/dcrd/rpc/jsonrpc/types/v4 => ./rpc/jsonrpc/types github.com/decred/dcrd/rpcclient/v8 => ./rpcclient diff --git a/go.sum b/go.sum index 88cb1baeb2..613df41e9d 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,9 @@ +decred.org/cspp/v2 v2.0.1-0.20230307024253-8a22691aa376 h1:739v8a7LMXuCTFNodcKYVpfj70CKWvJeE3NKFDn/65I= +decred.org/cspp/v2 v2.0.1-0.20230307024253-8a22691aa376/go.mod h1:+/9jr1RhVshWnc0U/eXxMlxfiu9/f7ia6TTyS0Oh5n0= github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412 h1:w1UutsfOrms1J05zt7ISrnJIXKzwaspym5BTKGx93EI= github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412/go.mod h1:WPjqKcmVOxf0XSf3YxCJs6N6AOSrOx3obionmG7T0y0= +github.com/companyzero/sntrup4591761 v0.0.0-20220309191932-9e0f3af2f07a h1:clYxJ3Os0EQUKDDVU8M0oipllX0EkuFNBfhVQuIfyF0= +github.com/companyzero/sntrup4591761 v0.0.0-20220309191932-9e0f3af2f07a/go.mod h1:z/9Ck1EDixEbBbZ2KH2qNHekEmDLTOZ+FyoIPWWSVOI= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dchest/siphash v1.2.3 h1:QXwFc8cFOR2dSa/gE6o/HokBMWtLUaNDVd+22aKHeEA= @@ -53,11 +57,13 @@ github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A= +golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= -golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc h1:zK/HqS5bZxDptfPJNq8v7vJfXtkU7r9TLIoSr1bXaP4= golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -69,14 +75,14 @@ golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200814200057-3d37ad5750ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= -golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY= -golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= +golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= -golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= diff --git a/internal/blockchain/chain.go b/internal/blockchain/chain.go index 1c897e78ef..8a36c8763e 100644 --- a/internal/blockchain/chain.go +++ b/internal/blockchain/chain.go @@ -297,6 +297,11 @@ type BlockChain struct { bulkImportMode bool } +// ChainParams returns the chain parameters. +func (b *BlockChain) ChainParams() *chaincfg.Params { + return b.chainParams +} + const ( // stakeMajorityCacheKeySize is comprised of the stake version and the // hash size. The stake version is a little endian uint32, hence we diff --git a/internal/netsync/interface.go b/internal/netsync/interface.go index 7a930550a8..cad185c62c 100644 --- a/internal/netsync/interface.go +++ b/internal/netsync/interface.go @@ -6,6 +6,7 @@ package netsync import ( "github.com/decred/dcrd/dcrutil/v4" + "github.com/decred/dcrd/mixing" ) // PeerNotifier provides an interface to notify peers of status changes related @@ -14,4 +15,8 @@ type PeerNotifier interface { // AnnounceNewTransactions generates and relays inventory vectors and // notifies websocket clients of the passed transactions. AnnounceNewTransactions(txns []*dcrutil.Tx) + + // AnnounceMixMessage generates and relays inventory vectors of the + // passed messages. + AnnounceMixMessages(msgs []mixing.Message) } diff --git a/internal/netsync/manager.go b/internal/netsync/manager.go index 975ee32edf..a62ba4c74d 100644 --- a/internal/netsync/manager.go +++ b/internal/netsync/manager.go @@ -23,6 +23,8 @@ import ( "github.com/decred/dcrd/internal/mempool" "github.com/decred/dcrd/internal/progresslog" "github.com/decred/dcrd/math/uint256" + "github.com/decred/dcrd/mixing" + "github.com/decred/dcrd/mixing/mixpool" peerpkg "github.com/decred/dcrd/peer/v3" "github.com/decred/dcrd/wire" ) @@ -64,6 +66,10 @@ const ( maxRejectedTxns = 62500 rejectedTxnsFPRate = 0.0000001 + // XXX these numbers were copied from rejected txns + maxRejectedMixMsgs = 625000 + rejectedMixMsgsFPRate = 0.0000001 + // maxRequestedBlocks is the maximum number of requested block // hashes to store in memory. maxRequestedBlocks = wire.MaxInvPerMsg @@ -72,6 +78,10 @@ const ( // hashes to store in memory. maxRequestedTxns = wire.MaxInvPerMsg + // maxRequestedMixMsgs is the maximum number of hashes of in-flight + // mixing messages. + maxRequestedMixMsgs = wire.MaxInvPerMsg + // maxExpectedHeaderAnnouncementsPerMsg is the maximum number of headers in // a single message that is expected when determining when the message // appears to be announcing new blocks. @@ -179,15 +189,24 @@ type processBlockMsg struct { reply chan processBlockResponse } +// mixMsg is a message type to be sent across the message channel for requesting +// a message's acceptence to the mixing pool. +type mixMsg struct { + msg mixing.Message + peer *Peer + reply chan struct{} +} + // Peer extends a common peer to maintain additional state needed by the sync // manager. The internals are intentionally unexported to create an opaque // type. type Peer struct { *peerpkg.Peer - syncCandidate bool - requestedTxns map[chainhash.Hash]struct{} - requestedBlocks map[chainhash.Hash]struct{} + syncCandidate bool + requestedTxns map[chainhash.Hash]struct{} + requestedBlocks map[chainhash.Hash]struct{} + requestedMixMsgs map[chainhash.Hash]struct{} // initialStateRequested tracks whether or not the initial state data has // been requested from the peer. @@ -207,10 +226,11 @@ type Peer struct { func NewPeer(peer *peerpkg.Peer) *Peer { isSyncCandidate := peer.Services()&wire.SFNodeNetwork == wire.SFNodeNetwork return &Peer{ - Peer: peer, - syncCandidate: isSyncCandidate, - requestedTxns: make(map[chainhash.Hash]struct{}), - requestedBlocks: make(map[chainhash.Hash]struct{}), + Peer: peer, + syncCandidate: isSyncCandidate, + requestedTxns: make(map[chainhash.Hash]struct{}), + requestedBlocks: make(map[chainhash.Hash]struct{}), + requestedMixMsgs: make(map[chainhash.Hash]struct{}), } } @@ -291,13 +311,15 @@ type SyncManager struct { // time. minKnownWork *uint256.Uint256 - rejectedTxns *apbf.Filter - requestedTxns map[chainhash.Hash]struct{} - requestedBlocks map[chainhash.Hash]struct{} - progressLogger *progresslog.Logger - syncPeer *Peer - msgChan chan interface{} - peers map[*Peer]struct{} + rejectedTxns *apbf.Filter + rejectedMixMsgs *apbf.Filter + requestedTxns map[chainhash.Hash]struct{} + requestedBlocks map[chainhash.Hash]struct{} + requestedMixMsgs map[chainhash.Hash]struct{} + progressLogger *progresslog.Logger + syncPeer *Peer + msgChan chan interface{} + peers map[*Peer]struct{} // hdrSyncState houses the state used to track the initial header sync // process and related stall handling. @@ -567,6 +589,9 @@ func maybeRequestInitialState(peer *Peer) { err := m.AddTypes(wire.InitStateHeadBlocks, wire.InitStateHeadBlockVotes, wire.InitStateTSpends) + if err == nil && peer.ProtocolVersion() >= wire.MixVersion { + err = m.AddType(wire.InitStateMixPRs) + } if err != nil { log.Errorf("Unexpected error building getinitstate msg: %v", err) return @@ -666,6 +691,22 @@ BlockHashes: // No peers found that have announced this data. delete(m.requestedBlocks, blockHash) } + inv.Type = wire.InvTypeMix +MixHashes: + for mixHash := range peer.requestedMixMsgs { + inv.Hash = mixHash + for pp := range m.peers { + if !pp.IsKnownInventory(&inv) { + continue + } + invs := append(requestQueues[pp], inv) + requestQueues[pp] = invs + pp.requestedMixMsgs[mixHash] = struct{}{} + continue MixHashes + } + // No peers found that have announced this data. + delete(m.requestedMixMsgs, mixHash) + } for pp, requestQueue := range requestQueues { var numRequested int32 gdmsg := wire.NewMsgGetData() @@ -754,6 +795,28 @@ func (m *SyncManager) handleTxMsg(tmsg *txMsg) { m.cfg.PeerNotifier.AnnounceNewTransactions(acceptedTxs) } +// handleMixMsg handles mixing messages from all peers. +func (m *SyncManager) handleMixMsg(mmsg *mixMsg) { + peer := mmsg.peer + + mixHash := mmsg.msg.Hash() + + accepted, err := m.cfg.MixPool.AcceptMessage(mmsg.msg) + if err != nil { + log.Errorf("Failed to process %T mixing message %v: %v", + mmsg.msg, &mixHash, err) + return + } + if accepted == nil { + return + } + + delete(peer.requestedMixMsgs, mixHash) + delete(m.requestedMixMsgs, mixHash) + + m.cfg.PeerNotifier.AnnounceMixMessages([]mixing.Message{accepted}) +} + // maybeUpdateIsCurrent potentially updates the manager to signal it believes // the chain is considered synced. // @@ -1275,6 +1338,19 @@ func (m *SyncManager) needTx(hash *chainhash.Hash) bool { return true } +// needMixMsg returns whether or not the mixing message needs to be downloaded. +func (m *SyncManager) needMixMsg(hash *chainhash.Hash) bool { + if m.rejectedMixMsgs.Contains(hash[:]) { + return false + } + + if m.cfg.MixPool.HaveMessage(hash) { + return false + } + + return true +} + // handleInvMsg handles inv messages from all peers. This entails examining the // inventory advertised by the remote peer for block and transaction // announcements and acting accordingly. @@ -1332,6 +1408,29 @@ func (m *SyncManager) handleInvMsg(imsg *invMsg) { limitAdd(peer.requestedTxns, iv.Hash, maxRequestedTxns) requestQueue = append(requestQueue, iv) } + + case wire.InvTypeMix: + // Add the mix message to the cache of known inventory + // for the peer. This helps avoid sending mix messages + // to the peer that it is already known to have. + peer.AddKnownInventory(iv) + + // Ignore mixing messages before the chain is current or + // if the messages are not needed. Pair request (PR) + // messages reference unspent outputs that must be + // checked to exist and be unspent before they are + // accepted, and all later messages must reference an + // existing PR recorded in the mixing pool. + if !isCurrent || !m.needMixMsg(&iv.Hash) { + continue + } + + // Request the mixing message if it is not already pending. + if _, exists := m.requestedMixMsgs[iv.Hash]; !exists { + limitAdd(m.requestedMixMsgs, iv.Hash, maxRequestedMixMsgs) + limitAdd(peer.requestedMixMsgs, iv.Hash, maxRequestedMixMsgs) + requestQueue = append(requestQueue, iv) + } } } @@ -1420,6 +1519,13 @@ out: case <-ctx.Done(): } + case *mixMsg: + m.handleMixMsg(msg) + select { + case msg.reply <- struct{}{}: + case <-ctx.Done(): + } + case *invMsg: m.handleInvMsg(msg) @@ -1538,6 +1644,15 @@ func (m *SyncManager) QueueHeaders(headers *wire.MsgHeaders, peer *Peer) { } } +// QueueMixMsg adds the passed mixing message and peer to the event handling +// queue. +func (m *SyncManager) QueueMixMsg(msg mixing.Message, peer *Peer, done chan struct{}) { + select { + case m.msgChan <- &mixMsg{msg: msg, peer: peer, reply: done}: + case <-m.quit: + } +} + // QueueNotFound adds the passed notfound message and peer to the event handling // queue. func (m *SyncManager) QueueNotFound(notFound *wire.MsgNotFound, peer *Peer) { @@ -1803,6 +1918,10 @@ type Config struct { // and querying the most recently confirmed transactions. It is useful for // preventing duplicate requests. RecentlyConfirmedTxns *apbf.Filter + + // MixPool specifies the mixing pool to use for transient mixing + // messages broadcast across the network. + MixPool *mixpool.Pool } // New returns a new network chain synchronization manager. Use Run to begin @@ -1819,17 +1938,19 @@ func New(config *Config) *SyncManager { } return &SyncManager{ - cfg: *config, - rejectedTxns: apbf.NewFilter(maxRejectedTxns, rejectedTxnsFPRate), - requestedTxns: make(map[chainhash.Hash]struct{}), - requestedBlocks: make(map[chainhash.Hash]struct{}), - peers: make(map[*Peer]struct{}), - minKnownWork: minKnownWork, - hdrSyncState: makeHeaderSyncState(), - progressLogger: progresslog.New("Processed", log), - msgChan: make(chan interface{}, config.MaxPeers*3), - quit: make(chan struct{}), - syncHeight: config.Chain.BestSnapshot().Height, - isCurrent: config.Chain.IsCurrent(), + cfg: *config, + rejectedTxns: apbf.NewFilter(maxRejectedTxns, rejectedTxnsFPRate), + rejectedMixMsgs: apbf.NewFilter(maxRejectedMixMsgs, rejectedMixMsgsFPRate), + requestedTxns: make(map[chainhash.Hash]struct{}), + requestedBlocks: make(map[chainhash.Hash]struct{}), + requestedMixMsgs: make(map[chainhash.Hash]struct{}), + peers: make(map[*Peer]struct{}), + minKnownWork: minKnownWork, + hdrSyncState: makeHeaderSyncState(), + progressLogger: progresslog.New("Processed", log), + msgChan: make(chan interface{}, config.MaxPeers*3), + quit: make(chan struct{}), + syncHeight: config.Chain.BestSnapshot().Height, + isCurrent: config.Chain.IsCurrent(), } } diff --git a/internal/rpcserver/interface.go b/internal/rpcserver/interface.go index 17bafa419b..a4273fe0ac 100644 --- a/internal/rpcserver/interface.go +++ b/internal/rpcserver/interface.go @@ -19,6 +19,7 @@ import ( "github.com/decred/dcrd/internal/mempool" "github.com/decred/dcrd/internal/mining" "github.com/decred/dcrd/math/uint256" + "github.com/decred/dcrd/mixing" "github.com/decred/dcrd/peer/v3" "github.com/decred/dcrd/rpc/jsonrpc/types/v4" "github.com/decred/dcrd/txscript/v4/stdaddr" @@ -130,6 +131,10 @@ type ConnManager interface { // the passed transactions to all connected peers. RelayTransactions(txns []*dcrutil.Tx) + // RelayMixMessages generates and relays inventory vectors for all of + // the passed mixing messages to all connected peers. + RelayMixMessages(msgs []mixing.Message) + // AddedNodeInfo returns information describing persistent (added) nodes. AddedNodeInfo() []Peer @@ -166,6 +171,10 @@ type SyncManager interface { // // This method may report a false positive, but never a false negative. RecentlyConfirmedTxn(hash *chainhash.Hash) bool + + // SubmitMixMessage submits the mixing message to the network after + // processing it locally. + SubmitMixMessage(msg mixing.Message) error } // UtxoEntry represents a utxo entry for use with the RPC server. @@ -675,6 +684,10 @@ type NtfnManager interface { // manager for processing. NotifyMempoolTx(tx *dcrutil.Tx, isNew bool) + // NotifyMixMessage passes a mixing message accepted by the mixpool to the + // notification manager for message broadcasting. + NotifyMixMessage(msg mixing.Message) + // NumClients returns the number of clients actively being served. NumClients() int @@ -726,6 +739,14 @@ type NtfnManager interface { // client when new transaction are added to the memory pool. UnregisterNewMempoolTxsUpdates(wsc *wsClient) + // RegisterMixMessages requests notifications to the passed websocket + // client when new mixing messages are accepted by the mixpool. + RegisterMixMessages(wsc *wsClient) + + // UnregisterMixMessages stops notifications to the websocket client + // client of any newly-accepted mixing messages. + UnregisterMixMessages(wsc *wsClient) + // AddClient adds the passed websocket client to the notification manager. AddClient(wsc *wsClient) diff --git a/internal/rpcserver/rpcserver.go b/internal/rpcserver/rpcserver.go index 37b4c4fe05..1867dc9b72 100644 --- a/internal/rpcserver/rpcserver.go +++ b/internal/rpcserver/rpcserver.go @@ -51,6 +51,7 @@ import ( "github.com/decred/dcrd/internal/mempool" "github.com/decred/dcrd/internal/mining" "github.com/decred/dcrd/internal/version" + "github.com/decred/dcrd/mixing" "github.com/decred/dcrd/rpc/jsonrpc/types/v4" "github.com/decred/dcrd/txscript/v4" "github.com/decred/dcrd/txscript/v4/stdaddr" @@ -61,7 +62,7 @@ import ( // API version constants const ( jsonrpcSemverMajor = 8 - jsonrpcSemverMinor = 1 + jsonrpcSemverMinor = 2 jsonrpcSemverPatch = 0 ) @@ -5163,6 +5164,14 @@ func (s *Server) NotifyTSpend(tx *dcrutil.Tx) { s.ntfnMgr.NotifyTSpend(tx) } +// NotifyMixMessages notifies websocket clients that have registered to +// receive mixing message notifications of newly accepted mix messages. +func (s *Server) NotifyMixMessages(msgs []mixing.Message) { + for _, msg := range msgs { + s.ntfnMgr.NotifyMixMessage(msg) + } +} + // NotifyNewTickets notifies websocket clients that have registered for maturing // ticket updates. func (s *Server) NotifyNewTickets(tnd *blockchain.TicketNotificationsData) { diff --git a/internal/rpcserver/rpcserverhandlers_test.go b/internal/rpcserver/rpcserverhandlers_test.go index d228a3ede3..c936f20f7a 100644 --- a/internal/rpcserver/rpcserverhandlers_test.go +++ b/internal/rpcserver/rpcserverhandlers_test.go @@ -41,6 +41,7 @@ import ( "github.com/decred/dcrd/internal/mining" "github.com/decred/dcrd/internal/version" "github.com/decred/dcrd/math/uint256" + "github.com/decred/dcrd/mixing" "github.com/decred/dcrd/peer/v3" "github.com/decred/dcrd/rpc/jsonrpc/types/v4" "github.com/decred/dcrd/txscript/v4" @@ -531,6 +532,7 @@ func (c *testAddrManager) LocalAddresses() []addrmgr.LocalAddr { type testSyncManager struct { isCurrent bool submitBlockErr error + submitMixErr error syncPeerID int32 syncHeight int64 processTransaction []*dcrutil.Tx @@ -550,6 +552,10 @@ func (s *testSyncManager) SubmitBlock(block *dcrutil.Block) error { return s.submitBlockErr } +func (s *testSyncManager) SubmitMixMessage(msg mixing.Message) error { + return s.submitMixErr +} + // SyncPeer returns a mocked id of the current peer being synced with. func (s *testSyncManager) SyncPeerID() int32 { return s.syncPeerID @@ -870,6 +876,10 @@ func (c *testConnManager) AddRebroadcastInventory(iv *wire.InvVect, data interfa // inventory vectors for all of the passed transactions to all connected peers. func (c *testConnManager) RelayTransactions(txns []*dcrutil.Tx) {} +// RelayMixMessages generates and relays inventory vectors for all of +// the passed mixing messages to all connected peers. +func (c *testConnManager) RelayMixMessages(msgs []mixing.Message) {} + // AddedNodeInfo returns a mocked slice of persistent (added) peers. func (c *testConnManager) AddedNodeInfo() []Peer { return c.addedNodeInfo @@ -1167,6 +1177,10 @@ func (mgr *testNtfnManager) NotifyNewTickets(tnd *blockchain.TicketNotifications // manager for processing. func (mgr *testNtfnManager) NotifyMempoolTx(tx *dcrutil.Tx, isNew bool) {} +// NotifyMixMessage passes a mixing message accepted by the mixpool to the +// notification manager for message broadcasting. +func (mgr *testNtfnManager) NotifyMixMessage(msg mixing.Message) {} + // NumClients returns the number of clients actively being served. func (mgr *testNtfnManager) NumClients() int { return mgr.clients @@ -1228,6 +1242,14 @@ func (mgr *testNtfnManager) RegisterNewMempoolTxsUpdates(wsc *wsClient) {} // client when new transaction are added to the memory pool. func (mgr *testNtfnManager) UnregisterNewMempoolTxsUpdates(wsc *wsClient) {} +// RegisterMixMessages requests notifications to the passed websocket +// client when new mixing messages are accepted by the mixpool. +func (mgr *testNtfnManager) RegisterMixMessages(wsc *wsClient) {} + +// UnregisterMixMessages stops notifications to the websocket client +// client of any newly-accepted mixing messages. +func (mgr *testNtfnManager) UnregisterMixMessages(wsc *wsClient) {} + // AddClient adds the passed websocket client to the notification manager. func (mgr *testNtfnManager) AddClient(wsc *wsClient) {} diff --git a/internal/rpcserver/rpcserverhelp.go b/internal/rpcserver/rpcserverhelp.go index 24c6dc7f12..0e2094a03d 100644 --- a/internal/rpcserver/rpcserverhelp.go +++ b/internal/rpcserver/rpcserverhelp.go @@ -789,6 +789,14 @@ var helpDescsEnUS = map[string]string{ // StopNotifyNewTransactionsCmd help. "stopnotifynewtransactions--synopsis": "Stop sending either a txaccepted or a txacceptedverbose notification when a new transaction is accepted into the mempool.", + "notifymixmessages--synopsis": "Request notifications for whenever mixing messages are accepted to the mixpool.", + + "stopnotifymixmessages--synopsis": "Cancel registered notifications for whenever mixing messages are accepted to the mixpool.", + + "sendmixmessage--synopsis": "Submit a mixing message to the mixpool and broadcast it to the network and all peers", + "sendmixmessage-message": "Mixing message serialized and encoded as hex", + "sendmixmessage-command": "The wire command name of the message type", + // OutPoint help. "outpoint-hash": "The hex-encoded bytes of the outpoint hash", "outpoint-index": "The index of the outpoint", @@ -1002,13 +1010,16 @@ var rpcResultTypes = map[types.Method][]interface{}{ "notifywork": nil, "notifytspend": nil, "notifynewtransactions": nil, + "notifymixmessages": nil, "rebroadcastwinners": nil, "rescan": {(*types.RescanResult)(nil)}, + "sendmixmessage": nil, "session": {(*types.SessionResult)(nil)}, "stopnotifyblocks": nil, "stopnotifywork": nil, "stopnotifytspend": nil, "stopnotifynewtransactions": nil, + "stopnotifymixmessages": nil, } // helpCacher provides a concurrent safe type that provides help and usage for diff --git a/internal/rpcserver/rpcwebsocket.go b/internal/rpcserver/rpcwebsocket.go index d36926c8ec..c650b62010 100644 --- a/internal/rpcserver/rpcwebsocket.go +++ b/internal/rpcserver/rpcwebsocket.go @@ -14,6 +14,7 @@ import ( "fmt" "io" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -29,6 +30,7 @@ import ( "github.com/decred/dcrd/dcrutil/v4" "github.com/decred/dcrd/internal/blockchain" "github.com/decred/dcrd/internal/mining" + "github.com/decred/dcrd/mixing" "github.com/decred/dcrd/rpc/jsonrpc/types/v4" "github.com/decred/dcrd/txscript/v4/stdaddr" "github.com/decred/dcrd/txscript/v4/stdscript" @@ -86,6 +88,7 @@ var wsHandlersBeforeInit = map[types.Method]wsCommandHandler{ "notifywinningtickets": handleWinningTickets, "notifynewtickets": handleNewTickets, "notifynewtransactions": handleNotifyNewTransactions, + "notifymixmessages": handleNotifyMixMessages, "rebroadcastwinners": handleRebroadcastWinners, "rescan": handleRescan, "session": handleSession, @@ -93,6 +96,8 @@ var wsHandlersBeforeInit = map[types.Method]wsCommandHandler{ "stopnotifywork": handleStopNotifyWork, "stopnotifytspend": handleStopNotifyTSpend, "stopnotifynewtransactions": handleStopNotifyNewTransactions, + "stopnotifymixmessages": handleStopNotifyMixMessages, + "sendmixmessage": handleSendMixMessage, } // WebsocketHandler handles a new websocket client by creating a new wsClient, @@ -278,6 +283,15 @@ func (m *wsNotificationManager) NotifyMempoolTx(tx *dcrutil.Tx, isNew bool) { } } +// NotifyMixMessage passes a mixing message accepted by the mixpool to the +// notification manager for message broadcasting. +func (m *wsNotificationManager) NotifyMixMessage(msg mixing.Message) { + select { + case m.queueNotification <- (notificationMixMessage)(msg): + case <-m.quit: + } +} + // WinningTicketsNtfnData is the data that is used to generate // winning ticket notifications (which indicate a block and // the tickets eligible to vote on it). @@ -408,6 +422,7 @@ type notificationTxAcceptedByMempool struct { isNew bool tx *dcrutil.Tx } +type notificationMixMessage mixing.Message // Notification control requests type notificationRegisterClient wsClient @@ -424,6 +439,8 @@ type notificationRegisterNewTickets wsClient type notificationUnregisterNewTickets wsClient type notificationRegisterNewMempoolTxs wsClient type notificationUnregisterNewMempoolTxs wsClient +type notificationRegisterMixMessages wsClient +type notificationUnregisterMixMessages wsClient // notificationHandler reads notifications and control messages from the queue // handler and processes one at a time. @@ -444,6 +461,7 @@ func (m *wsNotificationManager) notificationHandler(ctx context.Context) { winningTicketNotifications := make(map[chan struct{}]*wsClient) ticketNewNotifications := make(map[chan struct{}]*wsClient) txNotifications := make(map[chan struct{}]*wsClient) + mixNotifications := make(map[chan struct{}]*wsClient) out: for { @@ -489,6 +507,17 @@ out: } m.notifyRelevantTxAccepted(n.tx, clients) + case notificationMixMessage: + m.notifyMixMessage(mixNotifications, (mixing.Message)(n)) + + case *notificationRegisterMixMessages: + wsc := (*wsClient)(n) + mixNotifications[wsc.quit] = wsc + + case *notificationUnregisterMixMessages: + wsc := (*wsClient)(n) + delete(mixNotifications, wsc.quit) + case *notificationRegisterBlocks: wsc := (*wsClient)(n) blockNotifications[wsc.quit] = wsc @@ -1179,6 +1208,56 @@ func (m *wsNotificationManager) notifyRelevantTxAccepted(tx *dcrutil.Tx, } } +// RegisterMixMessages requests notifications to the passed websocket +// client when mixing messages are accepted to the mixpool. +func (m *wsNotificationManager) RegisterMixMessages(wsc *wsClient) { + select { + case m.queueNotification <- (*notificationRegisterMixMessages)(wsc): + case <-m.quit: + } +} + +// UnregisterMixMessages stops notifications to the websocket client of any +// newly-accepted mixing messages. +func (m *wsNotificationManager) UnregisterMixMessages(wsc *wsClient) { + select { + case m.queueNotification <- (*notificationUnregisterMixMessages)(wsc): + case <-m.quit: + } +} + +// notifyMixMessage notifies all clients subscribed to mixing messages with +// the accepted mixing message. +func (m *wsNotificationManager) notifyMixMessage(clients map[chan struct{}]*wsClient, + msg mixing.Message) { + + // Skip notification creation if no clients have requested mix + // notifications. + if len(clients) == 0 { + return + } + + // Write write message payload in hex encoding. + buf := new(bytes.Buffer) + err := msg.BtcEncode(hex.NewEncoder(buf), wire.MixVersion) + if err != nil { + // Should never error; the message has already been processed + // and accepted. + panic(err) + } + + ntfn := types.NewMixMessageNtfn(msg.Command(), buf.String()) + marshaledJSON, err := dcrjson.MarshalCmd("1.0", nil, ntfn) + if err != nil { + log.Errorf("Failed to marshal mix message notification: %v", + err) + return + } + for _, client := range clients { + client.QueueNotification(marshaledJSON) + } +} + // AddClient adds the passed websocket client to the notification manager. func (m *wsNotificationManager) AddClient(wsc *wsClient) { select { @@ -2173,6 +2252,69 @@ func handleStopNotifyNewTransactions(_ context.Context, wsc *wsClient, _ interfa return nil, nil } +// handleNotifyMixMessages implements the notifymixmessages command extension +// for websocket connections. +func handleNotifyMixMessages(_ context.Context, wsc *wsClient, _ interface{}) (interface{}, error) { + wsc.rpcServer.ntfnMgr.RegisterMixMessages(wsc) + return nil, nil +} + +// handleStopNotifyMixMessages implements the stopnotifymixmessages command +// extension for websocket connections. +func handleStopNotifyMixMessages(_ context.Context, wsc *wsClient, _ interface{}) (interface{}, error) { + wsc.rpcServer.ntfnMgr.UnregisterMixMessages(wsc) + return nil, nil +} + +func handleSendMixMessage(_ context.Context, wsc *wsClient, icmd interface{}) (interface{}, error) { + c := icmd.(*types.SendMixMessageCmd) + + // Allocate a message of the appropriate type based on the wire + // command string. + var msg mixing.Message + switch c.Command { + case wire.CmdMixPR: + msg = new(wire.MsgMixPR) + case wire.CmdMixKE: + msg = new(wire.MsgMixKE) + case wire.CmdMixCT: + msg = new(wire.MsgMixCT) + case wire.CmdMixSR: + msg = new(wire.MsgMixSR) + case wire.CmdMixDC: + msg = new(wire.MsgMixDC) + case wire.CmdMixCM: + msg = new(wire.MsgMixCM) + case wire.CmdMixRS: + msg = new(wire.MsgMixRS) + default: + err := rpcDeserializationError("Unrecognized mixing message "+ + "wire command string %q", c.Command) + return nil, err + } + + // Deserialize message. + err := msg.BtcDecode(hex.NewDecoder(strings.NewReader(c.Message)), + wire.MixVersion) + if err != nil { + return nil, rpcDeserializationError("Could not decode mix "+ + "message: %v", err) + } + + err = wsc.rpcServer.cfg.SyncMgr.SubmitMixMessage(msg) + if err != nil { + // XXX: consider a better error code/function + str := fmt.Sprintf("Rejected mix message: %s", err) + return nil, rpcMiscError(str) + } + + wsc.rpcServer.cfg.ConnMgr.RelayMixMessages([]mixing.Message{msg}) + + wsc.rpcServer.ntfnMgr.NotifyMixMessage(msg) + + return nil, nil +} + // rescanBlock rescans a block for any relevant transactions for the passed // lookup keys. Any discovered transactions are returned hex encoded as a // string slice. diff --git a/log.go b/log.go index eb4be6dacc..cddaa2e3b7 100644 --- a/log.go +++ b/log.go @@ -22,6 +22,7 @@ import ( "github.com/decred/dcrd/internal/mining/cpuminer" "github.com/decred/dcrd/internal/netsync" "github.com/decred/dcrd/internal/rpcserver" + "github.com/decred/dcrd/mixing/mixpool" "github.com/decred/dcrd/peer/v3" "github.com/decred/dcrd/txscript/v4" "github.com/decred/slog" @@ -68,6 +69,7 @@ var ( feesLog = backendLog.Logger("FEES") indxLog = backendLog.Logger("INDX") minrLog = backendLog.Logger("MINR") + mixpLog = backendLog.Logger("MIXP") peerLog = backendLog.Logger("PEER") rpcsLog = backendLog.Logger("RPCS") scrpLog = backendLog.Logger("SCRP") @@ -89,6 +91,7 @@ func init() { indexers.UseLogger(indxLog) mempool.UseLogger(txmpLog) mining.UseLogger(minrLog) + mixpool.UseLogger(mixpLog) cpuminer.UseLogger(minrLog) peer.UseLogger(peerLog) rpcserver.UseLogger(rpcsLog) @@ -109,6 +112,7 @@ var subsystemLoggers = map[string]slog.Logger{ "FEES": feesLog, "INDX": indxLog, "MINR": minrLog, + "MIXP": mixpLog, "PEER": peerLog, "RPCS": rpcsLog, "SCRP": scrpLog, diff --git a/peer/log.go b/peer/log.go index 90f80e615f..354f5e4322 100644 --- a/peer/log.go +++ b/peer/log.go @@ -68,6 +68,8 @@ func invSummary(invList []*wire.InvVect) string { return fmt.Sprintf("tx %s", iv.Hash) case wire.InvTypeFilteredBlock: return fmt.Sprintf("filtered block %s", iv.Hash) + case wire.InvTypeMix: + return fmt.Sprintf("mix %s", iv.Hash) } return fmt.Sprintf("unknown (%d) %s", uint32(iv.Type), iv.Hash) @@ -161,8 +163,10 @@ func messageSummary(msg wire.Message) string { return fmt.Sprintf("types %v", msg.Types) case *wire.MsgInitState: - return fmt.Sprintf("blocks %d, votes %d, treasury spends %d", - len(msg.BlockHashes), len(msg.VoteHashes), len(msg.TSpendHashes)) + return fmt.Sprintf("blocks %d, votes %d, treasury spends %d, "+ + "mix PRs %d", + len(msg.BlockHashes), len(msg.VoteHashes), + len(msg.TSpendHashes), len(msg.MixPRHashes)) } // No summary for other messages. diff --git a/peer/peer.go b/peer/peer.go index e88f6f428e..0b35cb1696 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -196,6 +196,27 @@ type MessageListeners struct { // OnInitState is invoked when a peer receives an initstate message. OnInitState func(p *Peer, msg *wire.MsgInitState) + // OnMixPR is invoked when a peer receives a mixpr message. + OnMixPR func(p *Peer, msg *wire.MsgMixPR) + + // OnMixKE is invoked when a peer receives a mixke message. + OnMixKE func(p *Peer, msg *wire.MsgMixKE) + + // OnMixCT is invoked when a peer receives a mixct message. + OnMixCT func(p *Peer, msg *wire.MsgMixCT) + + // OnMixSR is invoked when a peer receives a mixsr message. + OnMixSR func(p *Peer, msg *wire.MsgMixSR) + + // OnMixDC is invoked when a peer receives a mixdc message. + OnMixDC func(p *Peer, msg *wire.MsgMixDC) + + // OnMixCM is invoked when a peer receives a mixcm message. + OnMixCM func(p *Peer, msg *wire.MsgMixCM) + + // OnMixRS is invoked when a peer receives a mixrs message. + OnMixRS func(p *Peer, msg *wire.MsgMixRS) + // OnRead is invoked when a peer receives a wire message. It consists // of the number of bytes read, the message, and whether or not an error // in the read occurred. Typically, callers will opt to use the @@ -1059,9 +1080,16 @@ func (p *Peer) maybeAddDeadline(pendingResponses map[string]time.Time, msgCmd st addedDeadline = true case wire.CmdGetData: - // Expects a block, tx, or notfound message. + // Expects a block, tx, mix, or notfound message. pendingResponses[wire.CmdBlock] = deadline pendingResponses[wire.CmdTx] = deadline + pendingResponses[wire.CmdMixPR] = deadline + pendingResponses[wire.CmdMixKE] = deadline + pendingResponses[wire.CmdMixCT] = deadline + pendingResponses[wire.CmdMixSR] = deadline + pendingResponses[wire.CmdMixDC] = deadline + pendingResponses[wire.CmdMixCM] = deadline + pendingResponses[wire.CmdMixRS] = deadline pendingResponses[wire.CmdNotFound] = deadline addedDeadline = true @@ -1125,9 +1153,30 @@ out: fallthrough case wire.CmdTx: fallthrough + case wire.CmdMixPR: + fallthrough + case wire.CmdMixKE: + fallthrough + case wire.CmdMixCT: + fallthrough + case wire.CmdMixSR: + fallthrough + case wire.CmdMixDC: + fallthrough + case wire.CmdMixCM: + fallthrough + case wire.CmdMixRS: + fallthrough case wire.CmdNotFound: delete(pendingResponses, wire.CmdBlock) delete(pendingResponses, wire.CmdTx) + delete(pendingResponses, wire.CmdMixPR) + delete(pendingResponses, wire.CmdMixKE) + delete(pendingResponses, wire.CmdMixCT) + delete(pendingResponses, wire.CmdMixSR) + delete(pendingResponses, wire.CmdMixDC) + delete(pendingResponses, wire.CmdMixCM) + delete(pendingResponses, wire.CmdMixRS) delete(pendingResponses, wire.CmdNotFound) default: @@ -1436,6 +1485,41 @@ out: p.cfg.Listeners.OnInitState(p, msg) } + case *wire.MsgMixPR: + if p.cfg.Listeners.OnMixPR != nil { + p.cfg.Listeners.OnMixPR(p, msg) + } + + case *wire.MsgMixKE: + if p.cfg.Listeners.OnMixKE != nil { + p.cfg.Listeners.OnMixKE(p, msg) + } + + case *wire.MsgMixCT: + if p.cfg.Listeners.OnMixCT != nil { + p.cfg.Listeners.OnMixCT(p, msg) + } + + case *wire.MsgMixSR: + if p.cfg.Listeners.OnMixSR != nil { + p.cfg.Listeners.OnMixSR(p, msg) + } + + case *wire.MsgMixDC: + if p.cfg.Listeners.OnMixDC != nil { + p.cfg.Listeners.OnMixDC(p, msg) + } + + case *wire.MsgMixCM: + if p.cfg.Listeners.OnMixCM != nil { + p.cfg.Listeners.OnMixCM(p, msg) + } + + case *wire.MsgMixRS: + if p.cfg.Listeners.OnMixRS != nil { + p.cfg.Listeners.OnMixRS(p, msg) + } + default: log.Debugf("Received unhandled message of type %v "+ "from %v", rmsg.Command(), p) diff --git a/rpc/jsonrpc/types/chainsvrwscmds.go b/rpc/jsonrpc/types/chainsvrwscmds.go index c1998337f7..31919e606c 100644 --- a/rpc/jsonrpc/types/chainsvrwscmds.go +++ b/rpc/jsonrpc/types/chainsvrwscmds.go @@ -153,6 +153,24 @@ func NewNotifyNewTransactionsCmd(verbose *bool) *NotifyNewTransactionsCmd { } } +// NotifyMixMessagesCmd defines the notifymixmessages JSON-RPC command. +type NotifyMixMessagesCmd struct{} + +// NewNotifyMixMessagesCmd returns a new instance which can be used to issue a +// notifymixmessages JSON-RPC command. +func NewNotifyMixMessagesCmd() *NotifyMixMessagesCmd { + return &NotifyMixMessagesCmd{} +} + +// StopNotifyMixMessagesCmd defines the stopnotifymixmessages JSON-RPC command. +type StopNotifyMixMessagesCmd struct{} + +// StopNewNotifyMixMessagesCmd returns a new instance which can be used to issue a +// stopnotifymixmessages JSON-RPC command. +func StopNewNotifyMixMessagesCmd() *StopNotifyMixMessagesCmd { + return &StopNotifyMixMessagesCmd{} +} + // SessionCmd defines the session JSON-RPC command. type SessionCmd struct{} @@ -185,6 +203,21 @@ func NewRescanCmd(blockHashes []string) *RescanCmd { return &RescanCmd{BlockHashes: blockHashes} } +// SendMixMessage defines the sendmixmessage JSON-RPC command. +type SendMixMessageCmd struct { + Command string + Message string +} + +// NewSendMixMessageCmd returns a new instance which can be used to issue a +// sendmixmessage JSON-RPC command. +func NewSendMixMessageCmd(command, message string) *SendMixMessageCmd { + return &SendMixMessageCmd{ + Command: command, + Message: message, + } +} + func init() { // The commands in this file are only usable by websockets. flags := dcrjson.UFWebsocketOnly @@ -197,11 +230,14 @@ func init() { dcrjson.MustRegister(Method("notifynewtransactions"), (*NotifyNewTransactionsCmd)(nil), flags) dcrjson.MustRegister(Method("notifynewtickets"), (*NotifyNewTicketsCmd)(nil), flags) dcrjson.MustRegister(Method("notifywinningtickets"), (*NotifyWinningTicketsCmd)(nil), flags) + dcrjson.MustRegister(Method("notifymixmessages"), (*NotifyMixMessagesCmd)(nil), flags) dcrjson.MustRegister(Method("rebroadcastwinners"), (*RebroadcastWinnersCmd)(nil), flags) dcrjson.MustRegister(Method("session"), (*SessionCmd)(nil), flags) dcrjson.MustRegister(Method("stopnotifyblocks"), (*StopNotifyBlocksCmd)(nil), flags) dcrjson.MustRegister(Method("stopnotifywork"), (*StopNotifyWorkCmd)(nil), flags) dcrjson.MustRegister(Method("stopnotifytspend"), (*StopNotifyTSpendCmd)(nil), flags) dcrjson.MustRegister(Method("stopnotifynewtransactions"), (*StopNotifyNewTransactionsCmd)(nil), flags) + dcrjson.MustRegister(Method("stopnotifymixmessages"), (*StopNotifyMixMessagesCmd)(nil), flags) dcrjson.MustRegister(Method("rescan"), (*RescanCmd)(nil), flags) + dcrjson.MustRegister(Method("sendmixmessage"), (*SendMixMessageCmd)(nil), flags) } diff --git a/rpc/jsonrpc/types/chainsvrwsntfns.go b/rpc/jsonrpc/types/chainsvrwsntfns.go index 5942a86b54..2f563229f5 100644 --- a/rpc/jsonrpc/types/chainsvrwsntfns.go +++ b/rpc/jsonrpc/types/chainsvrwsntfns.go @@ -52,6 +52,9 @@ const ( // WinningTicketsNtfnMethod is the method of the daemon winningtickets // notification. WinningTicketsNtfnMethod Method = "winningtickets" + + // MixMessageNtfnMethod is the method of the mixmessage notification. + MixMessageNtfnMethod Method = "mixmessage" ) // BlockConnectedNtfn defines the blockconnected JSON-RPC notification. @@ -207,6 +210,20 @@ func NewWinningTicketsNtfn(hash string, height int32, tickets map[string]string) } } +// MixMessageNtfn defines the mixmessage JSON-RPC notification. +type MixMessageNtfn struct { + Command string `json:"command"` + Payload string `json:"payload"` +} + +// NewMixMessageNtfn returns a new instance which can be used to issue a +// mixmessage JSON-RPC notification. +func NewMixMessageNtfn(command, payload string) *MixMessageNtfn { + return &MixMessageNtfn{ + Command: command, + Payload: payload, + } +} func init() { // The commands in this file are only usable by websockets and are // notifications. @@ -222,4 +239,5 @@ func init() { dcrjson.MustRegister(TxAcceptedVerboseNtfnMethod, (*TxAcceptedVerboseNtfn)(nil), flags) dcrjson.MustRegister(RelevantTxAcceptedNtfnMethod, (*RelevantTxAcceptedNtfn)(nil), flags) dcrjson.MustRegister(WinningTicketsNtfnMethod, (*WinningTicketsNtfn)(nil), flags) + dcrjson.MustRegister(MixMessageNtfnMethod, (*MixMessageNtfn)(nil), flags) } diff --git a/rpcadaptors.go b/rpcadaptors.go index 643f9f802e..d45f57fdbb 100644 --- a/rpcadaptors.go +++ b/rpcadaptors.go @@ -20,6 +20,7 @@ import ( "github.com/decred/dcrd/internal/mining/cpuminer" "github.com/decred/dcrd/internal/netsync" "github.com/decred/dcrd/internal/rpcserver" + "github.com/decred/dcrd/mixing" "github.com/decred/dcrd/peer/v3" "github.com/decred/dcrd/wire" ) @@ -282,6 +283,15 @@ func (cm *rpcConnManager) RelayTransactions(txns []*dcrutil.Tx) { cm.server.relayTransactions(txns) } +// RelayMixMessages generates and relays inventory vectors for all of the +// passed mixing messages to all connected peers. +// +// This function is safe for concurrent access and is part of the +// rpcserver.ConnManager interface implementation. +func (cm *rpcConnManager) RelayMixMessages(msgs []mixing.Message) { + cm.server.relayMixMessages(msgs) +} + // AddedNodeInfo returns information describing persistent (added) nodes. // // This function is safe for concurrent access and is part of the @@ -364,6 +374,12 @@ func (b *rpcSyncMgr) RecentlyConfirmedTxn(hash *chainhash.Hash) bool { return b.server.recentlyConfirmedTxns.Contains(hash[:]) } +// SubmitMixMessage locally processes the mixing message. +func (b *rpcSyncMgr) SubmitMixMessage(msg mixing.Message) error { + _, err := b.server.mixMsgPool.AcceptMessage(msg) + return err +} + // rpcUtxoEntry represents a utxo entry for use with the RPC server and // implements the rpcserver.UtxoEntry interface. type rpcUtxoEntry struct { diff --git a/server.go b/server.go index af8c05079f..c4914ed2fd 100644 --- a/server.go +++ b/server.go @@ -45,6 +45,8 @@ import ( "github.com/decred/dcrd/internal/rpcserver" "github.com/decred/dcrd/internal/version" "github.com/decred/dcrd/math/uint256" + "github.com/decred/dcrd/mixing" + "github.com/decred/dcrd/mixing/mixpool" "github.com/decred/dcrd/peer/v3" "github.com/decred/dcrd/txscript/v4" "github.com/decred/dcrd/wire" @@ -74,7 +76,7 @@ const ( connectionRetryInterval = time.Second * 5 // maxProtocolVersion is the max protocol version the server supports. - maxProtocolVersion = wire.RemoveRejectVersion + maxProtocolVersion = wire.MixVersion // These fields are used to track known addresses on a per-peer basis. // @@ -512,6 +514,7 @@ type server struct { txMemPool *mempool.TxPool feeEstimator *fees.Estimator cpuMiner *cpuminer.CPUMiner + mixMsgPool *mixpool.Pool modifyRebroadcastInv chan interface{} newPeers chan *serverPeer donePeers chan *serverPeer @@ -571,8 +574,9 @@ type serverPeer struct { // The following fields are used to synchronize the net sync manager and // server. - txProcessed chan struct{} - blockProcessed chan struct{} + txProcessed chan struct{} + blockProcessed chan struct{} + mixMsgProcessed chan struct{} // peerNa is network address of the peer connected to. peerNa *wire.NetAddress @@ -598,13 +602,14 @@ type serverPeer struct { // the caller. func newServerPeer(s *server, isPersistent bool) *serverPeer { return &serverPeer{ - server: s, - persistent: isPersistent, - knownAddresses: apbf.NewFilter(maxKnownAddrsPerPeer, knownAddrsFPRate), - quit: make(chan struct{}), - txProcessed: make(chan struct{}, 1), - blockProcessed: make(chan struct{}, 1), - getDataQueue: make(chan []*wire.InvVect, maxConcurrentGetDataReqs), + server: s, + persistent: isPersistent, + knownAddresses: apbf.NewFilter(maxKnownAddrsPerPeer, knownAddrsFPRate), + quit: make(chan struct{}), + txProcessed: make(chan struct{}, 1), + blockProcessed: make(chan struct{}, 1), + mixMsgProcessed: make(chan struct{}, 1), + getDataQueue: make(chan []*wire.InvVect, maxConcurrentGetDataReqs), } } @@ -668,6 +673,16 @@ func (sp *serverPeer) handleServeGetData(invVects []*wire.InvVect, continueHash := sp.continueHash.Load() sendInv = continueHash != nil && *continueHash == *blockHash + case wire.InvTypeMix: + mixHash := &iv.Hash + msg, err := sp.server.mixMsgPool.Message(mixHash) + if err != nil { + peerLog.Tracef("Unable to fetch requested mix message %v: %v", + mixHash, err) + break + } + dataMsg = msg + default: peerLog.Warnf("Unknown type '%d' in inventory request from %s", iv.Type, sp) @@ -1199,6 +1214,7 @@ func (sp *serverPeer) OnGetInitState(_ *peer.Peer, msg *wire.MsgGetInitState) { // Response data. var blockHashes, voteHashes, tspendHashes []chainhash.Hash + var mixPRHashes []chainhash.Hash // Map from the types slice into a map for easier checking. types := make(map[string]struct{}, len(msg.Types)) @@ -1208,6 +1224,7 @@ func (sp *serverPeer) OnGetInitState(_ *peer.Peer, msg *wire.MsgGetInitState) { _, wantBlocks := types[wire.InitStateHeadBlocks] _, wantVotes := types[wire.InitStateHeadBlockVotes] _, wantTSpends := types[wire.InitStateTSpends] + _, wantMixPRs := types[wire.InitStateMixPRs] // Fetch head block hashes if we need to send either them or their // votes. @@ -1243,6 +1260,12 @@ func (sp *serverPeer) OnGetInitState(_ *peer.Peer, msg *wire.MsgGetInitState) { tspendHashes = mp.TSpendHashes() } + // Construct mixprs to send. + if wantMixPRs { + mixpool := sp.server.mixMsgPool + mixPRHashes = mixpool.MixPRHashes() + } + // Clear out block hashes to be sent if they weren't requested. if !wantBlocks { blockHashes = nil @@ -1254,6 +1277,7 @@ func (sp *serverPeer) OnGetInitState(_ *peer.Peer, msg *wire.MsgGetInitState) { peerLog.Warnf("Unexpected error while building initstate msg: %v", err) return } + initMsg.MixPRHashes = mixPRHashes sp.QueueMessage(initMsg, nil) } @@ -1341,6 +1365,12 @@ func (sp *serverPeer) OnInv(_ *peer.Peer, msg *wire.MsgInv) { sp.Disconnect() return } + if invVect.Type == wire.InvTypeMix { + peerLog.Infof("Peer %v is announcing mix messages -- disconnecting", + sp) + sp.Disconnect() + return + } err := newInv.AddInvVect(invVect) if err != nil { peerLog.Errorf("Failed to add inventory vector: %v", err) @@ -1628,6 +1658,61 @@ func (sp *serverPeer) OnAddr(_ *peer.Peer, msg *wire.MsgAddr) { sp.server.addrManager.AddAddresses(addrList, remoteAddr) } +// onMixMessage is the generic handler for all mix messages handler callbacks. +func (sp *serverPeer) onMixMessage(msg mixing.Message) { + if cfg.BlocksOnly { + peerLog.Tracef("Ignoring mix message %v from %v - blocksonly "+ + "enabled", msg.Hash(), sp) + return + } + + // TODO - add ban score increases + + // Add the message to the known inventory for the peer. + hash := msg.Hash() + iv := wire.NewInvVect(wire.InvTypeMix, &hash) + sp.AddKnownInventory(iv) + + // Queue the message to be handled by the net sync manager + sp.server.syncManager.QueueMixMsg(msg, sp.syncMgrPeer, sp.mixMsgProcessed) + <-sp.mixMsgProcessed +} + +// OnMixPR submits a received mixing pair request message to the mixpool. +func (sp *serverPeer) OnMixPR(_ *peer.Peer, msg *wire.MsgMixPR) { + sp.onMixMessage(msg) +} + +// OnMixK submits a received mixing key exchange message to the mixpool. +func (sp *serverPeer) OnMixKE(_ *peer.Peer, msg *wire.MsgMixKE) { + sp.onMixMessage(msg) +} + +// OnMixCT submits a received mixing ciphertext exchange message to the mixpool. +func (sp *serverPeer) OnMixCT(_ *peer.Peer, msg *wire.MsgMixCT) { + sp.onMixMessage(msg) +} + +// OnMixSR submits a received mixing slot reservation message to the mixpool. +func (sp *serverPeer) OnMixSR(_ *peer.Peer, msg *wire.MsgMixSR) { + sp.onMixMessage(msg) +} + +// OnMixDC submits a received mixing XOR DC-net message to the mixpool. +func (sp *serverPeer) OnMixDC(_ *peer.Peer, msg *wire.MsgMixDC) { + sp.onMixMessage(msg) +} + +// OnMixCM submits a received mixing confirmation message to the mixpool. +func (sp *serverPeer) OnMixCM(_ *peer.Peer, msg *wire.MsgMixCM) { + sp.onMixMessage(msg) +} + +// OnMixRS submits a received mixing reveal secrets message to the mixpool. +func (sp *serverPeer) OnMixRS(_ *peer.Peer, msg *wire.MsgMixRS) { + sp.onMixMessage(msg) +} + // OnRead is invoked when a peer receives a message and it is used to update // the bytes received by the server. func (sp *serverPeer) OnRead(_ *peer.Peer, bytesRead int, msg wire.Message, err error) { @@ -1767,6 +1852,16 @@ func (s *server) relayTransactions(txns []*dcrutil.Tx) { } } +// relayMixMessages generates and relays inventory vectors for all of the +// passed mixing messages to all connected peers. +func (s *server) relayMixMessages(msgs []mixing.Message) { + for _, m := range msgs { + hash := m.Hash() + iv := wire.NewInvVect(wire.InvTypeMix, &hash) + s.RelayInventory(iv, m, false) + } +} + // AnnounceNewTransactions generates and relays inventory vectors and notifies // websocket clients of the passed transactions. This function should be // called whenever new transactions are added to the mempool. @@ -1781,6 +1876,19 @@ func (s *server) AnnounceNewTransactions(txns []*dcrutil.Tx) { } } +// AnnounceMixMessages generates and relays inventory vectors of the passed +// mixing messages. This function should be called whenever new messages are +// accepted to the mixpool. +func (s *server) AnnounceMixMessages(msgs []mixing.Message) { + // Generate and relay inventory vectors for all newly accepted mixing + // messages. + s.relayMixMessages(msgs) + + if s.rpcServer != nil { + s.rpcServer.NotifyMixMessages(msgs) + } +} + // TransactionConfirmed marks the provided single confirmation transaction as // no longer needing rebroadcasting and keeps track of it for use when avoiding // requests for recently confirmed transactions. @@ -2054,6 +2162,14 @@ func (s *server) handleRelayInvMsg(state *peerState, msg relayMsg) { } } + if iv.Type == wire.InvTypeMix { + // Don't relay mix message inventory when unsupported + // by the negotiated protocol version. + if sp.ProtocolVersion() < wire.MixVersion { + return + } + } + // Either queue the inventory to be relayed immediately or with // the next batch depending on the immediate flag. // @@ -2311,6 +2427,13 @@ func newPeerConfig(sp *serverPeer) *peer.Config { OnInitState: sp.OnInitState, OnTx: sp.OnTx, OnBlock: sp.OnBlock, + OnMixPR: sp.OnMixPR, + OnMixKE: sp.OnMixKE, + OnMixCT: sp.OnMixCT, + OnMixSR: sp.OnMixSR, + OnMixDC: sp.OnMixDC, + OnMixCM: sp.OnMixCM, + OnMixRS: sp.OnMixRS, OnInv: sp.OnInv, OnHeaders: sp.OnHeaders, OnGetData: sp.OnGetData, @@ -3504,6 +3627,26 @@ func (c *reloadableTLSConfig) configFileClient(_ *tls.ClientHelloInfo) (*tls.Con return c.cachedConfig, nil } +// mixpoolChain adapts the internal blockchain type with a FetchUtxoEntry +// method that is compatible with the mixpool package. +type mixpoolChain struct { + *blockchain.BlockChain +} + +var _ mixpool.BlockChain = (*mixpoolChain)(nil) +var _ mixpool.UtxoEntry = (*blockchain.UtxoEntry)(nil) + +func (m *mixpoolChain) FetchUtxoEntry(op wire.OutPoint) (mixpool.UtxoEntry, error) { + entry, err := m.BlockChain.FetchUtxoEntry(op) + if err != nil { + return nil, err + } + if entry == nil { + return nil, err + } + return entry, nil +} + // makeReloadableTLSConfig returns a TLS configuration that will dynamically // reload the server certificate, server key, and client CAs from the configured // paths when the files are updated. @@ -3850,6 +3993,9 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, } s.txMemPool = mempool.New(&txC) + mixchain := &mixpoolChain{s.chain} + s.mixMsgPool = mixpool.NewPool(mixchain) + s.syncManager = netsync.New(&netsync.Config{ PeerNotifier: &s, Chain: s.chain, @@ -3860,6 +4006,7 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, MaxPeers: cfg.MaxPeers, MaxOrphanTxs: cfg.MaxOrphanTxs, RecentlyConfirmedTxns: s.recentlyConfirmedTxns, + MixPool: s.mixMsgPool, }) // Dump the blockchain and quit if requested.