diff --git a/dcrjson/chainsvrwscmds.go b/dcrjson/chainsvrwscmds.go index fde994706fe..1ab0220d2e7 100644 --- a/dcrjson/chainsvrwscmds.go +++ b/dcrjson/chainsvrwscmds.go @@ -23,6 +23,32 @@ func NewAuthenticateCmd(username, passphrase string) *AuthenticateCmd { } } +// OutPoint describes a transaction outpoint that will be marshalled to and +// from JSON. Contains Decred addition. +type OutPoint struct { + Hash string `json:"hash"` + Tree int8 `json:"tree"` + Index uint32 `json:"index"` +} + +// LoadTxFilterCmd defines the loadtxfilter request parameters to load or +// reload a transaction filter. +type LoadTxFilterCmd struct { + Reload bool + Addresses []string + OutPoints []OutPoint +} + +// NewLoadTxFilterCmd returns a new instance which can be used to issue a +// loadtxfilter JSON-RPC command. +func NewLoadTxFilterCmd(reload bool, addresses []string, outPoints []OutPoint) *LoadTxFilterCmd { + return &LoadTxFilterCmd{ + Reload: reload, + Addresses: addresses, + OutPoints: outPoints, + } +} + // NotifyBlocksCmd defines the notifyblocks JSON-RPC command. type NotifyBlocksCmd struct{} @@ -78,86 +104,17 @@ func NewStopNotifyNewTransactionsCmd() *StopNotifyNewTransactionsCmd { return &StopNotifyNewTransactionsCmd{} } -// NotifyReceivedCmd defines the notifyreceived JSON-RPC command. -type NotifyReceivedCmd struct { - Addresses []string -} - -// NewNotifyReceivedCmd returns a new instance which can be used to issue a -// notifyreceived JSON-RPC command. -func NewNotifyReceivedCmd(addresses []string) *NotifyReceivedCmd { - return &NotifyReceivedCmd{ - Addresses: addresses, - } -} - -// OutPoint describes a transaction outpoint that will be marshalled to and -// from JSON. Contains Decred addition. -type OutPoint struct { - Hash string `json:"hash"` - Tree int8 `json:"tree"` - Index uint32 `json:"index"` -} - -// NotifySpentCmd defines the notifyspent JSON-RPC command. -type NotifySpentCmd struct { - OutPoints []OutPoint -} - -// NewNotifySpentCmd returns a new instance which can be used to issue a -// notifyspent JSON-RPC command. -func NewNotifySpentCmd(outPoints []OutPoint) *NotifySpentCmd { - return &NotifySpentCmd{ - OutPoints: outPoints, - } -} - -// StopNotifyReceivedCmd defines the stopnotifyreceived JSON-RPC command. -type StopNotifyReceivedCmd struct { - Addresses []string -} - -// NewStopNotifyReceivedCmd returns a new instance which can be used to issue a -// stopnotifyreceived JSON-RPC command. -func NewStopNotifyReceivedCmd(addresses []string) *StopNotifyReceivedCmd { - return &StopNotifyReceivedCmd{ - Addresses: addresses, - } -} - -// StopNotifySpentCmd defines the stopnotifyspent JSON-RPC command. -type StopNotifySpentCmd struct { - OutPoints []OutPoint -} - -// NewStopNotifySpentCmd returns a new instance which can be used to issue a -// stopnotifyspent JSON-RPC command. -func NewStopNotifySpentCmd(outPoints []OutPoint) *StopNotifySpentCmd { - return &StopNotifySpentCmd{ - OutPoints: outPoints, - } -} - // RescanCmd defines the rescan JSON-RPC command. type RescanCmd struct { - BeginBlock string - Addresses []string - OutPoints []OutPoint - EndBlock *string + // Concatinated block hashes in non-byte-reversed hex encoding. Must + // have length evenly divisible by 2*chainhash.HashSize. + BlockHashes string } // NewRescanCmd returns a new instance which can be used to issue a rescan // JSON-RPC command. -// -// The parameters which are pointers indicate they are optional. Passing nil -// for optional parameters will use the default value. -func NewRescanCmd(beginBlock string, addresses []string, outPoints []OutPoint, endBlock *string) *RescanCmd { - return &RescanCmd{ - BeginBlock: beginBlock, - Addresses: addresses, - OutPoints: outPoints, - EndBlock: endBlock, - } +func NewRescanCmd(blockHashes string) *RescanCmd { + return &RescanCmd{BlockHashes: blockHashes} } func init() { @@ -165,14 +122,11 @@ func init() { flags := UFWebsocketOnly MustRegisterCmd("authenticate", (*AuthenticateCmd)(nil), flags) + MustRegisterCmd("loadtxfilter", (*LoadTxFilterCmd)(nil), flags) MustRegisterCmd("notifyblocks", (*NotifyBlocksCmd)(nil), flags) MustRegisterCmd("notifynewtransactions", (*NotifyNewTransactionsCmd)(nil), flags) - MustRegisterCmd("notifyreceived", (*NotifyReceivedCmd)(nil), flags) - MustRegisterCmd("notifyspent", (*NotifySpentCmd)(nil), flags) MustRegisterCmd("session", (*SessionCmd)(nil), flags) MustRegisterCmd("stopnotifyblocks", (*StopNotifyBlocksCmd)(nil), flags) MustRegisterCmd("stopnotifynewtransactions", (*StopNotifyNewTransactionsCmd)(nil), flags) - MustRegisterCmd("stopnotifyspent", (*StopNotifySpentCmd)(nil), flags) - MustRegisterCmd("stopnotifyreceived", (*StopNotifyReceivedCmd)(nil), flags) MustRegisterCmd("rescan", (*RescanCmd)(nil), flags) } diff --git a/dcrjson/chainsvrwscmds_test.go b/dcrjson/chainsvrwscmds_test.go index 9ca414b83d1..a0951e069ba 100644 --- a/dcrjson/chainsvrwscmds_test.go +++ b/dcrjson/chainsvrwscmds_test.go @@ -100,97 +100,17 @@ func TestChainSvrWsCmds(t *testing.T) { marshalled: `{"jsonrpc":"1.0","method":"stopnotifynewtransactions","params":[],"id":1}`, unmarshalled: &dcrjson.StopNotifyNewTransactionsCmd{}, }, - { - name: "notifyreceived", - newCmd: func() (interface{}, error) { - return dcrjson.NewCmd("notifyreceived", []string{"1Address"}) - }, - staticCmd: func() interface{} { - return dcrjson.NewNotifyReceivedCmd([]string{"1Address"}) - }, - marshalled: `{"jsonrpc":"1.0","method":"notifyreceived","params":[["1Address"]],"id":1}`, - unmarshalled: &dcrjson.NotifyReceivedCmd{ - Addresses: []string{"1Address"}, - }, - }, - { - name: "stopnotifyreceived", - newCmd: func() (interface{}, error) { - return dcrjson.NewCmd("stopnotifyreceived", []string{"1Address"}) - }, - staticCmd: func() interface{} { - return dcrjson.NewStopNotifyReceivedCmd([]string{"1Address"}) - }, - marshalled: `{"jsonrpc":"1.0","method":"stopnotifyreceived","params":[["1Address"]],"id":1}`, - unmarshalled: &dcrjson.StopNotifyReceivedCmd{ - Addresses: []string{"1Address"}, - }, - }, - { - name: "notifyspent", - newCmd: func() (interface{}, error) { - return dcrjson.NewCmd("notifyspent", `[{"hash":"123","index":0}]`) - }, - staticCmd: func() interface{} { - ops := []dcrjson.OutPoint{{Hash: "123", Index: 0}} - return dcrjson.NewNotifySpentCmd(ops) - }, - marshalled: `{"jsonrpc":"1.0","method":"notifyspent","params":[[{"hash":"123","tree":0,"index":0}]],"id":1}`, - unmarshalled: &dcrjson.NotifySpentCmd{ - OutPoints: []dcrjson.OutPoint{{Hash: "123", Index: 0}}, - }, - }, - { - name: "stopnotifyspent", - newCmd: func() (interface{}, error) { - return dcrjson.NewCmd("stopnotifyspent", `[{"hash":"123","index":0}]`) - }, - staticCmd: func() interface{} { - ops := []dcrjson.OutPoint{{Hash: "123", Index: 0}} - return dcrjson.NewStopNotifySpentCmd(ops) - }, - marshalled: `{"jsonrpc":"1.0","method":"stopnotifyspent","params":[[{"hash":"123","tree":0,"index":0}]],"id":1}`, - unmarshalled: &dcrjson.StopNotifySpentCmd{ - OutPoints: []dcrjson.OutPoint{{Hash: "123", Index: 0}}, - }, - }, { name: "rescan", newCmd: func() (interface{}, error) { - return dcrjson.NewCmd("rescan", "123", `["1Address"]`, `[{"hash":"0000000000000000000000000000000000000000000000000000000000000123","tree":0,"index":0}]`) - }, - staticCmd: func() interface{} { - addrs := []string{"1Address"} - ops := []dcrjson.OutPoint{{ - Hash: "0000000000000000000000000000000000000000000000000000000000000123", - Index: 0, - }} - return dcrjson.NewRescanCmd("123", addrs, ops, nil) - }, - marshalled: `{"jsonrpc":"1.0","method":"rescan","params":["123",["1Address"],[{"hash":"0000000000000000000000000000000000000000000000000000000000000123","tree":0,"index":0}]],"id":1}`, - unmarshalled: &dcrjson.RescanCmd{ - BeginBlock: "123", - Addresses: []string{"1Address"}, - OutPoints: []dcrjson.OutPoint{{Hash: "0000000000000000000000000000000000000000000000000000000000000123", Index: 0}}, - EndBlock: nil, - }, - }, - { - name: "rescan optional", - newCmd: func() (interface{}, error) { - return dcrjson.NewCmd("rescan", "123", `["1Address"]`, `[{"hash":"123","tree":0,"index":0}]`, "456") + return dcrjson.NewCmd("rescan", "0000000000000000000000000000000000000000000000000000000000000123") }, staticCmd: func() interface{} { - addrs := []string{"1Address"} - ops := []dcrjson.OutPoint{{Hash: "123", Index: 0}} - return dcrjson.NewRescanCmd("123", addrs, ops, dcrjson.String("456")) + return dcrjson.NewRescanCmd("0000000000000000000000000000000000000000000000000000000000000123") }, - marshalled: `{"jsonrpc":"1.0","method":"rescan","params":["123",["1Address"],[{"hash":"123","tree":0,"index":0}],"456"],"id":1}`, + marshalled: `{"jsonrpc":"1.0","method":"rescan","params":["0000000000000000000000000000000000000000000000000000000000000123"],"id":1}`, unmarshalled: &dcrjson.RescanCmd{ - BeginBlock: "123", - Addresses: []string{"1Address"}, - OutPoints: []dcrjson.OutPoint{{Hash: "123", Index: 0}}, - EndBlock: dcrjson.String("456"), + BlockHashes: "0000000000000000000000000000000000000000000000000000000000000123", }, }, } diff --git a/dcrjson/chainsvrwsntfns.go b/dcrjson/chainsvrwsntfns.go index 31ed6412f66..dcab78177aa 100644 --- a/dcrjson/chainsvrwsntfns.go +++ b/dcrjson/chainsvrwsntfns.go @@ -21,25 +21,6 @@ const ( // block chain is in the process of a reorganization. ReorganizationNtfnMethod = "reorganization" - // RecvTxNtfnMethod is the method used for notifications from the chain - // server that a transaction which pays to a registered address has been - // processed. - RecvTxNtfnMethod = "recvtx" - - // RedeemingTxNtfnMethod is the method used for notifications from the - // chain server that a transaction which spends a registered outpoint - // has been processed. - RedeemingTxNtfnMethod = "redeemingtx" - - // RescanFinishedNtfnMethod is the method used for notifications from - // the chain server that a rescan operation has finished. - RescanFinishedNtfnMethod = "rescanfinished" - - // RescanProgressNtfnMethod is the method used for notifications from - // the chain server that a rescan operation this is underway has made - // progress. - RescanProgressNtfnMethod = "rescanprogress" - // TxAcceptedNtfnMethod is the method used for notifications from the // chain server that a transaction has been accepted into the mempool. TxAcceptedNtfnMethod = "txaccepted" @@ -49,52 +30,47 @@ const ( // mempool. This differs from TxAcceptedNtfnMethod in that it provides // more details in the notification. TxAcceptedVerboseNtfnMethod = "txacceptedverbose" + + // RelevantTxAcceptedNtfnMethod is the method used for notifications + // from the chain server that inform a client that a relevant + // transaction was accepted by the mempool. + RelevantTxAcceptedNtfnMethod = "relevanttxaccepted" ) // BlockConnectedNtfn defines the blockconnected JSON-RPC notification. type BlockConnectedNtfn struct { - Hash string - Height int32 - Time int64 - VoteBits uint16 + Header string `json:"header"` + SubscribedTxs []string `json:"subscribedtxs"` } // NewBlockConnectedNtfn returns a new instance which can be used to issue a // blockconnected JSON-RPC notification. -func NewBlockConnectedNtfn(hash string, height int32, time int64, voteBits uint16) *BlockConnectedNtfn { +func NewBlockConnectedNtfn(header string, subscribedTxs []string) *BlockConnectedNtfn { return &BlockConnectedNtfn{ - Hash: hash, - Height: height, - Time: time, - VoteBits: voteBits, + Header: header, + SubscribedTxs: subscribedTxs, } } // BlockDisconnectedNtfn defines the blockdisconnected JSON-RPC notification. type BlockDisconnectedNtfn struct { - Hash string - Height int32 - Time int64 - VoteBits uint16 + Header string `json:"header"` } // NewBlockDisconnectedNtfn returns a new instance which can be used to issue a // blockdisconnected JSON-RPC notification. -func NewBlockDisconnectedNtfn(hash string, height int32, time int64, voteBits uint16) *BlockDisconnectedNtfn { +func NewBlockDisconnectedNtfn(header string) *BlockDisconnectedNtfn { return &BlockDisconnectedNtfn{ - Hash: hash, - Height: height, - Time: time, - VoteBits: voteBits, + Header: header, } } // ReorganizationNtfn defines the reorganization JSON-RPC notification. type ReorganizationNtfn struct { - OldHash string - OldHeight int32 - NewHash string - NewHeight int32 + OldHash string `json:"oldhash"` + OldHeight int32 `json:"oldheight"` + NewHash string `json:"newhash"` + NewHeight int32 `json:"newheight"` } // NewReorganizationNtfn returns a new instance which can be used to issue a @@ -109,84 +85,10 @@ func NewReorganizationNtfn(oldHash string, oldHeight int32, newHash string, } } -// BlockDetails describes details of a tx in a block. -type BlockDetails struct { - Height int32 `json:"height"` - Tree int8 `json:"tree"` - Hash string `json:"hash"` - Index int `json:"index"` - Time int64 `json:"time"` - VoteBits uint16 `json:"votebits"` -} - -// RecvTxNtfn defines the recvtx JSON-RPC notification. -type RecvTxNtfn struct { - HexTx string - Block *BlockDetails -} - -// NewRecvTxNtfn returns a new instance which can be used to issue a recvtx -// JSON-RPC notification. -func NewRecvTxNtfn(hexTx string, block *BlockDetails) *RecvTxNtfn { - return &RecvTxNtfn{ - HexTx: hexTx, - Block: block, - } -} - -// RedeemingTxNtfn defines the redeemingtx JSON-RPC notification. -type RedeemingTxNtfn struct { - HexTx string - Block *BlockDetails -} - -// NewRedeemingTxNtfn returns a new instance which can be used to issue a -// redeemingtx JSON-RPC notification. -func NewRedeemingTxNtfn(hexTx string, block *BlockDetails) *RedeemingTxNtfn { - return &RedeemingTxNtfn{ - HexTx: hexTx, - Block: block, - } -} - -// RescanFinishedNtfn defines the rescanfinished JSON-RPC notification. -type RescanFinishedNtfn struct { - Hash string - Height int64 - Time int64 -} - -// NewRescanFinishedNtfn returns a new instance which can be used to issue a -// rescanfinished JSON-RPC notification. -func NewRescanFinishedNtfn(hash string, height int64, time int64) *RescanFinishedNtfn { - return &RescanFinishedNtfn{ - Hash: hash, - Height: height, - Time: time, - } -} - -// RescanProgressNtfn defines the rescanprogress JSON-RPC notification. -type RescanProgressNtfn struct { - Hash string - Height int32 - Time int64 -} - -// NewRescanProgressNtfn returns a new instance which can be used to issue a -// rescanprogress JSON-RPC notification. -func NewRescanProgressNtfn(hash string, height int32, time int64) *RescanProgressNtfn { - return &RescanProgressNtfn{ - Hash: hash, - Height: height, - Time: time, - } -} - // TxAcceptedNtfn defines the txaccepted JSON-RPC notification. type TxAcceptedNtfn struct { - TxID string - Amount float64 + TxID string `json:"txid"` + Amount float64 `json:"amount"` } // NewTxAcceptedNtfn returns a new instance which can be used to issue a @@ -200,7 +102,7 @@ func NewTxAcceptedNtfn(txHash string, amount float64) *TxAcceptedNtfn { // TxAcceptedVerboseNtfn defines the txacceptedverbose JSON-RPC notification. type TxAcceptedVerboseNtfn struct { - RawTx TxRawResult + RawTx TxRawResult `json:"rawtx"` } // NewTxAcceptedVerboseNtfn returns a new instance which can be used to issue a @@ -211,6 +113,18 @@ func NewTxAcceptedVerboseNtfn(rawTx TxRawResult) *TxAcceptedVerboseNtfn { } } +// RelevantTxAcceptedNtfn defines the parameters to the relevanttxaccepted +// JSON-RPC notification. +type RelevantTxAcceptedNtfn struct { + Transaction string `json:"transaction"` +} + +// NewRelevantTxAcceptedNtfn returns a new instance which can be used to issue a +// relevantxaccepted JSON-RPC notification. +func NewRelevantTxAcceptedNtfn(txHex string) *RelevantTxAcceptedNtfn { + return &RelevantTxAcceptedNtfn{Transaction: txHex} +} + func init() { // The commands in this file are only usable by websockets and are // notifications. @@ -219,10 +133,7 @@ func init() { MustRegisterCmd(BlockConnectedNtfnMethod, (*BlockConnectedNtfn)(nil), flags) MustRegisterCmd(BlockDisconnectedNtfnMethod, (*BlockDisconnectedNtfn)(nil), flags) MustRegisterCmd(ReorganizationNtfnMethod, (*ReorganizationNtfn)(nil), flags) - MustRegisterCmd(RecvTxNtfnMethod, (*RecvTxNtfn)(nil), flags) - MustRegisterCmd(RedeemingTxNtfnMethod, (*RedeemingTxNtfn)(nil), flags) - MustRegisterCmd(RescanFinishedNtfnMethod, (*RescanFinishedNtfn)(nil), flags) - MustRegisterCmd(RescanProgressNtfnMethod, (*RescanProgressNtfn)(nil), flags) MustRegisterCmd(TxAcceptedNtfnMethod, (*TxAcceptedNtfn)(nil), flags) MustRegisterCmd(TxAcceptedVerboseNtfnMethod, (*TxAcceptedVerboseNtfn)(nil), flags) + MustRegisterCmd(RelevantTxAcceptedNtfnMethod, (*RelevantTxAcceptedNtfn)(nil), flags) } diff --git a/dcrjson/chainsvrwsntfns_test.go b/dcrjson/chainsvrwsntfns_test.go index e075e63b612..d2f54dd2b08 100644 --- a/dcrjson/chainsvrwsntfns_test.go +++ b/dcrjson/chainsvrwsntfns_test.go @@ -32,119 +32,41 @@ func TestChainSvrWsNtfns(t *testing.T) { { name: "blockconnected", newNtfn: func() (interface{}, error) { - return dcrjson.NewCmd("blockconnected", "123", 100000, 123456789, 0) + return dcrjson.NewCmd("blockconnected", "header", []string{"tx0", "tx1"}) }, staticNtfn: func() interface{} { - return dcrjson.NewBlockConnectedNtfn("123", 100000, 123456789, 0) + return dcrjson.NewBlockConnectedNtfn("header", []string{"tx0", "tx1"}) }, - marshalled: `{"jsonrpc":"1.0","method":"blockconnected","params":["123",100000,123456789,0],"id":null}`, + marshalled: `{"jsonrpc":"1.0","method":"blockconnected","params":["header",["tx0","tx1"]],"id":null}`, unmarshalled: &dcrjson.BlockConnectedNtfn{ - Hash: "123", - Height: 100000, - Time: 123456789, - VoteBits: 0, + Header: "header", + SubscribedTxs: []string{"tx0", "tx1"}, }, }, { name: "blockdisconnected", newNtfn: func() (interface{}, error) { - return dcrjson.NewCmd("blockdisconnected", "123", 100000, 123456789, 0) + return dcrjson.NewCmd("blockdisconnected", "header") }, staticNtfn: func() interface{} { - return dcrjson.NewBlockDisconnectedNtfn("123", 100000, 123456789, 0) + return dcrjson.NewBlockDisconnectedNtfn("header") }, - marshalled: `{"jsonrpc":"1.0","method":"blockdisconnected","params":["123",100000,123456789,0],"id":null}`, + marshalled: `{"jsonrpc":"1.0","method":"blockdisconnected","params":["header"],"id":null}`, unmarshalled: &dcrjson.BlockDisconnectedNtfn{ - Hash: "123", - Height: 100000, - Time: 123456789, - VoteBits: 0, + Header: "header", }, }, { - name: "recvtx", + name: "relevanttxaccepted", newNtfn: func() (interface{}, error) { - return dcrjson.NewCmd("recvtx", "001122", `{"height":100000,"tree":0,"hash":"123","index":0,"time":12345678,"votebits":0}`) + return dcrjson.NewCmd("relevanttxaccepted", "001122") }, staticNtfn: func() interface{} { - blockDetails := dcrjson.BlockDetails{ - Height: 100000, - Tree: 0, - Hash: "123", - Index: 0, - Time: 12345678, - VoteBits: 0, - } - return dcrjson.NewRecvTxNtfn("001122", &blockDetails) - }, - marshalled: `{"jsonrpc":"1.0","method":"recvtx","params":["001122",{"height":100000,"tree":0,"hash":"123","index":0,"time":12345678,"votebits":0}],"id":null}`, - unmarshalled: &dcrjson.RecvTxNtfn{ - HexTx: "001122", - Block: &dcrjson.BlockDetails{ - Height: 100000, - Tree: 0, - Hash: "123", - Index: 0, - Time: 12345678, - VoteBits: 0, - }, - }, - }, - { - name: "redeemingtx", - newNtfn: func() (interface{}, error) { - return dcrjson.NewCmd("redeemingtx", "001122", `{"height":100000,"tree":0,"hash":"123","index":0,"time":12345678,"votebits":0}`) - }, - staticNtfn: func() interface{} { - blockDetails := dcrjson.BlockDetails{ - Height: 100000, - Hash: "123", - Index: 0, - Time: 12345678, - VoteBits: 0, - } - return dcrjson.NewRedeemingTxNtfn("001122", &blockDetails) - }, - marshalled: `{"jsonrpc":"1.0","method":"redeemingtx","params":["001122",{"height":100000,"tree":0,"hash":"123","index":0,"time":12345678,"votebits":0}],"id":null}`, - unmarshalled: &dcrjson.RedeemingTxNtfn{ - HexTx: "001122", - Block: &dcrjson.BlockDetails{ - Height: 100000, - Hash: "123", - Index: 0, - Time: 12345678, - VoteBits: 0, - }, - }, - }, - { - name: "rescanfinished", - newNtfn: func() (interface{}, error) { - return dcrjson.NewCmd("rescanfinished", "123", 100000, 12345678) - }, - staticNtfn: func() interface{} { - return dcrjson.NewRescanFinishedNtfn("123", 100000, 12345678) - }, - marshalled: `{"jsonrpc":"1.0","method":"rescanfinished","params":["123",100000,12345678],"id":null}`, - unmarshalled: &dcrjson.RescanFinishedNtfn{ - Hash: "123", - Height: 100000, - Time: 12345678, - }, - }, - { - name: "rescanprogress", - newNtfn: func() (interface{}, error) { - return dcrjson.NewCmd("rescanprogress", "123", 100000, 12345678) - }, - staticNtfn: func() interface{} { - return dcrjson.NewRescanProgressNtfn("123", 100000, 12345678) + return dcrjson.NewRelevantTxAcceptedNtfn("001122") }, - marshalled: `{"jsonrpc":"1.0","method":"rescanprogress","params":["123",100000,12345678],"id":null}`, - unmarshalled: &dcrjson.RescanProgressNtfn{ - Hash: "123", - Height: 100000, - Time: 12345678, + marshalled: `{"jsonrpc":"1.0","method":"relevanttxaccepted","params":["001122"],"id":null}`, + unmarshalled: &dcrjson.RelevantTxAcceptedNtfn{ + Transaction: "001122", }, }, { diff --git a/dcrjson/chainsvrwsresults.go b/dcrjson/chainsvrwsresults.go index 58c6a7820f5..53e76805d9b 100644 --- a/dcrjson/chainsvrwsresults.go +++ b/dcrjson/chainsvrwsresults.go @@ -9,3 +9,15 @@ package dcrjson type SessionResult struct { SessionID uint64 `json:"sessionid"` } + +// RescanResult models the result object returned by the rescan RPC. +type RescanResult struct { + DiscoveredData []RescannedBlock `json:"discovereddata"` +} + +// RescannedBlock contains the hash and all discovered transactions of a single +// rescanned block. +type RescannedBlock struct { + Hash string `json:"hash"` + Transactions []string `json:"transactions"` +} diff --git a/glide.lock b/glide.lock index 156299ed40b..cd9e68e5fc4 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: f31381b9d7836c3acfccda809b8b279691b38d97517933b4f10e97f8ec468d2b -updated: 2016-10-05T20:46:59.837714578Z +hash: bd6d1cd7c6a97f133a021754dcaba3c202f155e2f4bc079e20d2da7639819eb3 +updated: 2016-11-01T10:13:55.6450189-04:00 imports: - name: github.com/btcsuite/btclog version: 73889fb79bd687870312b6e40effcecffbd57d30 @@ -51,7 +51,8 @@ imports: - name: github.com/decred/blake256 version: a840e32d7c31fe2e0218607334cb120a683951a4 - name: github.com/decred/dcrrpcclient - version: 0f562bb540de12dfd5248aee87063eaa4792705f + version: a12e28b78870f352ba63aceca0a438b12fe6ac64 + repo: https://github.com/jrick/btcrpcclient.git - name: github.com/decred/dcrutil version: 0484582bf5503574d824f110e836a8c48aa60c8c subpackages: diff --git a/glide.yaml b/glide.yaml index 1865b399ec2..8d117e17fa6 100644 --- a/glide.yaml +++ b/glide.yaml @@ -30,6 +30,8 @@ import: - package: github.com/decred/bitset - package: github.com/decred/blake256 - package: github.com/decred/dcrrpcclient + repo: https://github.com/jrick/btcrpcclient.git + version: ntfnapi - package: github.com/decred/dcrutil subpackages: - bloom diff --git a/rpcserver.go b/rpcserver.go index 9c4b78a0264..816ba7e5151 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -51,9 +51,9 @@ import ( // API version constants const ( - jsonrpcSemverString = "1.2.0" - jsonrpcSemverMajor = 1 - jsonrpcSemverMinor = 2 + jsonrpcSemverString = "2.0.0" + jsonrpcSemverMajor = 2 + jsonrpcSemverMinor = 0 jsonrpcSemverPatch = 0 ) diff --git a/rpcserverhelp.go b/rpcserverhelp.go index 86d3f3d6eb8..93405ff3d26 100644 --- a/rpcserverhelp.go +++ b/rpcserverhelp.go @@ -688,37 +688,20 @@ var helpDescsEnUS = map[string]string{ // StopNotifyNewTransactionsCmd help. "stopnotifynewtransactions--synopsis": "Stop sending either a txaccepted or a txacceptedverbose notification when a new transaction is accepted into the mempool.", - // NotifyReceivedCmd help. - "notifyreceived--synopsis": "Send a recvtx notification when a transaction added to mempool or appears in a newly-attached block contains a txout pkScript sending to any of the passed addresses.\n" + - "Matching outpoints are automatically registered for redeemingtx notifications.", - "notifyreceived-addresses": "List of address to receive notifications about", - - // StopNotifyReceivedCmd help. - "stopnotifyreceived--synopsis": "Cancel registered receive notifications for each passed address.", - "stopnotifyreceived-addresses": "List of address to cancel receive notifications for", - // OutPoint help. "outpoint-hash": "The hex-encoded bytes of the outpoint hash", "outpoint-index": "The index of the outpoint", "outpoint-tree": "The tree of the outpoint", - // NotifySpentCmd help. - "notifyspent--synopsis": "Send a redeemingtx notification when a transaction spending an outpoint appears in mempool (if relayed to this dcrd instance) and when such a transaction first appears in a newly-attached block.", - "notifyspent-outpoints": "List of transaction outpoints to monitor.", - - // StopNotifySpentCmd help. - "stopnotifyspent--synopsis": "Cancel registered spending notifications for each passed outpoint.", - "stopnotifyspent-outpoints": "List of transaction outpoints to stop monitoring.", + // LoadTxFilterCmd help. + "loadtxfilter--synopsis": "Load, add to, or reload a websocket client's transaction filter for mempool transactions, new blocks and rescans.", + "loadtxfilter-reload": "Load a new filter instead of adding data to an existing one", + "loadtxfilter-addresses": "Array of addresses to add to the transaction filter", + "loadtxfilter-outpoints": "Array of outpoints to add to the transaction filter", // Rescan help. - "rescan--synopsis": "Rescan block chain for transactions to addresses.\n" + - "When the endblock parameter is omitted, the rescan continues through the best block in the main chain.\n" + - "Rescan results are sent as recvtx and redeemingtx notifications.\n" + - "This call returns once the rescan completes.", - "rescan-beginblock": "Hash of the first block to begin rescanning", - "rescan-addresses": "List of addresses to include in the rescan", - "rescan-outpoints": "List of transaction outpoints to include in the rescan", - "rescan-endblock": "Hash of final block to rescan", + "rescan--synopsis": "Rescan blocks for transactions matching the loaded transaction filter.", + "rescan-blockhashes": "Concatenated block hashes to rescan. Each next block must be a child of the previous.", // -------- Decred-specific help -------- @@ -895,6 +878,7 @@ var rpcResultTypes = map[string][]interface{}{ "version": {(*map[string]dcrjson.VersionResult)(nil)}, // Websocket commands. + "loadtxfilter": nil, "session": {(*dcrjson.SessionResult)(nil)}, "notifywinningtickets": nil, "notifyspentandmissedtickets": nil, diff --git a/rpcwebsocket.go b/rpcwebsocket.go index 0661c89167f..373c2b0118e 100644 --- a/rpcwebsocket.go +++ b/rpcwebsocket.go @@ -16,7 +16,6 @@ import ( "errors" "fmt" "io" - "math" "strconv" "sync" "time" @@ -27,7 +26,6 @@ import ( "github.com/decred/dcrd/blockchain" "github.com/decred/dcrd/blockchain/stake" "github.com/decred/dcrd/chaincfg/chainhash" - "github.com/decred/dcrd/database" "github.com/decred/dcrd/dcrjson" "github.com/decred/dcrd/txscript" "github.com/decred/dcrd/wire" @@ -56,21 +54,18 @@ type wsCommandHandler func(*wsClient, interface{}) (interface{}, error) // causes a dependency loop. var wsHandlers map[string]wsCommandHandler var wsHandlersBeforeInit = map[string]wsCommandHandler{ + "loadtxfilter": handleLoadTxFilter, "notifyblocks": handleNotifyBlocks, "notifywinningtickets": handleWinningTickets, "notifyspentandmissedtickets": handleSpentAndMissedTickets, "notifynewtickets": handleNewTickets, "notifystakedifficulty": handleStakeDifficulty, "notifynewtransactions": handleNotifyNewTransactions, - "notifyreceived": handleNotifyReceived, - "notifyspent": handleNotifySpent, "session": handleSession, "help": handleWebsocketHelp, "rescan": handleRescan, "stopnotifyblocks": handleStopNotifyBlocks, "stopnotifynewtransactions": handleStopNotifyNewTransactions, - "stopnotifyspent": handleStopNotifySpent, - "stopnotifyreceived": handleStopNotifyReceived, } // wsAsyncHandlers holds the websocket commands which should be run @@ -336,6 +331,165 @@ type StakeDifficultyNtfnData struct { StakeDifficulty int64 } +type wsClientFilter struct { + mu sync.Mutex + + // Implemented fast paths for address lookup. + pubKeyHashes map[[ripemd160.Size]byte]struct{} + scriptHashes map[[ripemd160.Size]byte]struct{} + compressedPubKeys map[[33]byte]struct{} + uncompressedPubKeys map[[65]byte]struct{} + + // A fallback address lookup map in case a fast path doesn't exist. + // Only exists for completeness. If using this shows up in a profile, + // there's a good chance a fast path should be added. + otherAddresses map[string]struct{} + + // Outpoints of unspent outputs. + unspent map[wire.OutPoint]struct{} +} + +func makeWSClientFilter(addresses []string, unspentOutPoints []*wire.OutPoint) *wsClientFilter { + filter := &wsClientFilter{ + pubKeyHashes: map[[ripemd160.Size]byte]struct{}{}, + scriptHashes: map[[ripemd160.Size]byte]struct{}{}, + compressedPubKeys: map[[33]byte]struct{}{}, + uncompressedPubKeys: map[[65]byte]struct{}{}, + otherAddresses: map[string]struct{}{}, + unspent: make(map[wire.OutPoint]struct{}, len(unspentOutPoints)), + } + + for _, s := range addresses { + filter.addAddressStr(s) + } + for _, op := range unspentOutPoints { + filter.addUnspentOutPoint(op) + } + + return filter +} + +func (f *wsClientFilter) addAddress(a dcrutil.Address) { + switch a := a.(type) { + case *dcrutil.AddressPubKeyHash: + f.pubKeyHashes[*a.Hash160()] = struct{}{} + return + case *dcrutil.AddressScriptHash: + f.scriptHashes[*a.Hash160()] = struct{}{} + return + case *dcrutil.AddressSecpPubKey: + serializedPubKey := a.ScriptAddress() + switch len(serializedPubKey) { + case 33: // compressed + var compressedPubKey [33]byte + copy(compressedPubKey[:], serializedPubKey) + f.compressedPubKeys[compressedPubKey] = struct{}{} + return + case 65: // uncompressed + var uncompressedPubKey [65]byte + copy(uncompressedPubKey[:], serializedPubKey) + f.uncompressedPubKeys[uncompressedPubKey] = struct{}{} + return + } + } + + f.otherAddresses[a.EncodeAddress()] = struct{}{} +} + +func (f *wsClientFilter) addAddressStr(s string) { + a, err := dcrutil.DecodeAddress(s, activeNetParams.Params) + // If address can't be decoded, no point in saving it since it should also + // impossible to create the address from an inspected transaction output + // script. + if err != nil { + return + } + f.addAddress(a) +} + +func (f *wsClientFilter) existsAddress(a dcrutil.Address) bool { + switch a := a.(type) { + case *dcrutil.AddressPubKeyHash: + _, ok := f.pubKeyHashes[*a.Hash160()] + return ok + case *dcrutil.AddressScriptHash: + _, ok := f.scriptHashes[*a.Hash160()] + return ok + case *dcrutil.AddressSecpPubKey: + serializedPubKey := a.ScriptAddress() + switch len(serializedPubKey) { + case 33: // compressed + var compressedPubKey [33]byte + copy(compressedPubKey[:], serializedPubKey) + _, ok := f.compressedPubKeys[compressedPubKey] + if !ok { + _, ok = f.pubKeyHashes[*a.AddressPubKeyHash().Hash160()] + } + return ok + case 65: // uncompressed + var uncompressedPubKey [65]byte + copy(uncompressedPubKey[:], serializedPubKey) + _, ok := f.uncompressedPubKeys[uncompressedPubKey] + if !ok { + _, ok = f.pubKeyHashes[*a.AddressPubKeyHash().Hash160()] + } + return ok + } + } + + _, ok := f.otherAddresses[a.EncodeAddress()] + return ok +} + +func (f *wsClientFilter) removeAddress(a dcrutil.Address) { + switch a := a.(type) { + case *dcrutil.AddressPubKeyHash: + delete(f.pubKeyHashes, *a.Hash160()) + return + case *dcrutil.AddressScriptHash: + delete(f.scriptHashes, *a.Hash160()) + return + case *dcrutil.AddressSecpPubKey: + serializedPubKey := a.ScriptAddress() + switch len(serializedPubKey) { + case 33: // compressed + var compressedPubKey [33]byte + copy(compressedPubKey[:], serializedPubKey) + delete(f.compressedPubKeys, compressedPubKey) + return + case 65: // uncompressed + var uncompressedPubKey [65]byte + copy(uncompressedPubKey[:], serializedPubKey) + delete(f.uncompressedPubKeys, uncompressedPubKey) + return + } + } + + delete(f.otherAddresses, a.EncodeAddress()) +} + +func (f *wsClientFilter) removeAddressStr(s string) { + a, err := dcrutil.DecodeAddress(s, activeNetParams.Params) + if err == nil { + f.removeAddress(a) + } else { + delete(f.otherAddresses, s) + } +} + +func (f *wsClientFilter) addUnspentOutPoint(op *wire.OutPoint) { + f.unspent[*op] = struct{}{} +} + +func (f *wsClientFilter) existsUnspentOutPoint(op *wire.OutPoint) bool { + _, ok := f.unspent[*op] + return ok +} + +func (f *wsClientFilter) removeUnspentOutPoint(op *wire.OutPoint) { + delete(f.unspent, *op) +} + // Notification types type notificationBlockConnected dcrutil.Block type notificationBlockDisconnected dcrutil.Block @@ -364,22 +518,6 @@ type notificationRegisterStakeDifficulty wsClient type notificationUnregisterStakeDifficulty wsClient type notificationRegisterNewMempoolTxs wsClient type notificationUnregisterNewMempoolTxs wsClient -type notificationRegisterSpent struct { - wsc *wsClient - ops []*wire.OutPoint -} -type notificationUnregisterSpent struct { - wsc *wsClient - op *wire.OutPoint -} -type notificationRegisterAddr struct { - wsc *wsClient - addrs []string -} -type notificationUnregisterAddr struct { - wsc *wsClient - addr string -} // notificationHandler reads notifications and control messages from the queue // handler and processes one at a time. @@ -400,8 +538,6 @@ func (m *wsNotificationManager) notificationHandler() { ticketNewNotifications := make(map[chan struct{}]*wsClient) stakeDifficultyNotifications := make(map[chan struct{}]*wsClient) txNotifications := make(map[chan struct{}]*wsClient) - watchedOutPoints := make(map[wire.OutPoint]map[chan struct{}]*wsClient) - watchedAddrs := make(map[string]map[chan struct{}]*wsClient) out: for { @@ -415,41 +551,13 @@ out: case *notificationBlockConnected: block := (*dcrutil.Block)(n) - // If the block was voted for by the stakeholders, announce the - // transactions to the notifications watcher. - msgblock := block.MsgBlock() - votebits := msgblock.Header.VoteBits - - // Skip iterating through all txs if no - // tx notification requests exist. - if len(watchedOutPoints) != 0 || len(watchedAddrs) != 0 { - if dcrutil.IsFlagSet16(votebits, dcrutil.BlockValid) { - prevblock, err := m.server.chain.BlockByHash( - &msgblock.Header.PrevBlock) - if err != nil { - rpcsLog.Error("Previous block could not be loaded "+ - "from database!", err) - break // Correct behaviour? This should never happen - } - - for _, tx := range prevblock.Transactions() { - m.notifyForTx(watchedOutPoints, - watchedAddrs, - tx, - prevblock) - } - } - - // Stake tx are included regardless of voting. - for _, tx := range block.STransactions() { - m.notifyForTx(watchedOutPoints, watchedAddrs, tx, block) - } + // Skip iterating through all txs if no tx + // notification requests exist. + if len(blockNotifications) == 0 { + continue } - if len(blockNotifications) != 0 { - m.notifyBlockConnected(blockNotifications, - block) - } + m.notifyBlockConnected(blockNotifications, block) case *notificationBlockDisconnected: m.notifyBlockDisconnected(blockNotifications, @@ -479,7 +587,7 @@ out: if n.isNew && len(txNotifications) != 0 { m.notifyForNewTx(txNotifications, n.tx) } - m.notifyForTx(watchedOutPoints, watchedAddrs, n.tx, nil) + m.notifyRelevantTxAccepted(n.tx, clients) case *notificationRegisterBlocks: wsc := (*wsClient)(n) @@ -531,27 +639,8 @@ out: // the client itself. delete(blockNotifications, wsc.quit) delete(txNotifications, wsc.quit) - for k := range wsc.spentRequests { - op := k - m.removeSpentRequest(watchedOutPoints, wsc, &op) - } - for addr := range wsc.addrRequests { - m.removeAddrRequest(watchedAddrs, wsc, addr) - } delete(clients, wsc.quit) - case *notificationRegisterSpent: - m.addSpentRequests(watchedOutPoints, n.wsc, n.ops) - - case *notificationUnregisterSpent: - m.removeSpentRequest(watchedOutPoints, n.wsc, n.op) - - case *notificationRegisterAddr: - m.addAddrRequests(watchedAddrs, n.wsc, n.addrs) - - case *notificationUnregisterAddr: - m.removeAddrRequest(watchedAddrs, n.wsc, n.addr) - case *notificationRegisterNewMempoolTxs: wsc := (*wsClient)(n) txNotifications[wsc.quit] = wsc @@ -599,23 +688,124 @@ func (m *wsNotificationManager) UnregisterBlockUpdates(wsc *wsClient) { m.queueNotification <- (*notificationUnregisterBlocks)(wsc) } +// subscribedClients returns the set of all websocket client quit channels that +// are registered to receive notifications regarding tx, either due to tx +// spending a watched output or outputting to a watched address. Matching +// client's filters are updated based on this transaction's outputs and output +// addresses that may be relevant for a client. +func (m *wsNotificationManager) subscribedClients(tx *dcrutil.Tx, + clients map[chan struct{}]*wsClient) map[chan struct{}]struct{} { + + // Use a map of client quit channels as keys to prevent duplicates when + // multiple inputs and/or outputs are relevant to the client. + subscribed := make(map[chan struct{}]struct{}) + + msgTx := tx.MsgTx() + for q, c := range clients { + c.Lock() + f := c.filterData + c.Unlock() + if f == nil { + continue + } + f.mu.Lock() + + for _, input := range msgTx.TxIn { + if f.existsUnspentOutPoint(&input.PreviousOutPoint) { + subscribed[q] = struct{}{} + } + } + + for i, output := range msgTx.TxOut { + _, addrs, _, err := txscript.ExtractPkScriptAddrs( + txscript.DefaultScriptVersion, + output.PkScript, m.server.server.chainParams) + if err != nil { + // Clients are not able to subscribe to + // nonstandard or non-address outputs. + continue + } + for _, a := range addrs { + if f.existsAddress(a) { + subscribed[q] = struct{}{} + op := wire.OutPoint{ + Hash: *tx.Sha(), + Index: uint32(i), + Tree: tx.Tree(), + } + f.addUnspentOutPoint(&op) + } + } + } + + f.mu.Unlock() + } + + return subscribed +} + // notifyBlockConnected notifies websocket clients that have registered for // block updates when a block is connected to the main chain. -func (*wsNotificationManager) notifyBlockConnected(clients map[chan struct{}]*wsClient, +func (m *wsNotificationManager) notifyBlockConnected(clients map[chan struct{}]*wsClient, block *dcrutil.Block) { - // Notify interested websocket clients about the connected block. - ntfn := dcrjson.NewBlockConnectedNtfn(block.Sha().String(), - int32(block.Height()), block.MsgBlock().Header.Timestamp.Unix(), - block.MsgBlock().Header.VoteBits) - marshalledJSON, err := dcrjson.MarshalCmd(nil, ntfn) + // Create the common portion of the notification that is the same for + // every client. Stake transactions are always included for every + // client regardless of what outpoints and addresses they watch. + headerBytes, err := block.MsgBlock().Header.Bytes() if err != nil { - rpcsLog.Error("Failed to marshal block connected notification: "+ - "%v", err) - return + // This should never error. The header is written to an + // in-memory expandable buffer, and given that the block was + // just accepted, there should be no issues serializing it. + panic(err) + } + stakeTransactions := block.STransactions() + hexStakeTransactions := make([]string, 0, len(stakeTransactions)) + buf := new(bytes.Buffer) + for _, tx := range stakeTransactions { + err := tx.MsgTx().Serialize(buf) + if err != nil { + // This should never error. The transaction is written + // to an in-memory expandable buffer, and given that the + // block was just accepted, there should be no issues + // serializing it. + panic(err) + } + stakeTxHex := hex.EncodeToString(buf.Bytes()) + buf.Reset() + hexStakeTransactions = append(hexStakeTransactions, stakeTxHex) } - for _, wsc := range clients { - wsc.QueueNotification(marshalledJSON) + ntfn := dcrjson.BlockConnectedNtfn{ + Header: hex.EncodeToString(headerBytes), + SubscribedTxs: nil, // Set individually for each client + } + + // Search for relevant transactions for each client and save them + // serialized in hex encoding for the notification. + subscribedTxs := make(map[chan struct{}][]string) + for _, tx := range block.Transactions() { + var txHex string + for quitChan := range m.subscribedClients(tx, clients) { + if txHex == "" { + txHex = txHexString(tx.MsgTx()) + } + subscribedTxs[quitChan] = append(subscribedTxs[quitChan], txHex) + } + } + + for quitChan, client := range clients { + // Add all stake transactions and the previously discovered + // relevant transactions for this client, if any. + ntfn.SubscribedTxs = append(hexStakeTransactions, subscribedTxs[quitChan]...) + + // Marshal and queue notification. + marshalledJSON, err := dcrjson.MarshalCmd(nil, &ntfn) + if err != nil { + rpcsLog.Error("Failed to marshal block connected "+ + "notification: %v", err) + continue + } + client.QueueNotification(marshalledJSON) } } @@ -630,10 +820,18 @@ func (*wsNotificationManager) notifyBlockDisconnected(clients map[chan struct{}] } // Notify interested websocket clients about the disconnected block. - ntfn := dcrjson.NewBlockDisconnectedNtfn(block.Sha().String(), - int32(block.Height()), block.MsgBlock().Header.Timestamp.Unix(), - block.MsgBlock().Header.VoteBits) - marshalledJSON, err := dcrjson.MarshalCmd(nil, ntfn) + headerBytes, err := block.MsgBlock().Header.Bytes() + if err != nil { + // This should never error. The header is written to an + // in-memory expandable buffer, and given that the block was + // previously accepted, there should be no issues serializing + // it. + panic(err) + } + ntfn := dcrjson.BlockDisconnectedNtfn{ + Header: hex.EncodeToString(headerBytes), + } + marshalledJSON, err := dcrjson.MarshalCmd(nil, &ntfn) if err != nil { rpcsLog.Error("Failed to marshal block disconnected "+ "notification: %v", err) @@ -885,288 +1083,80 @@ func (m *wsNotificationManager) notifyForNewTx(clients map[chan struct{}]*wsClie } } -// RegisterSpentRequests requests a notification when each of the passed -// outpoints is confirmed spent (contained in a block connected to the main -// chain) for the passed websocket client. The request is automatically -// removed once the notification has been sent. -func (m *wsNotificationManager) RegisterSpentRequests(wsc *wsClient, ops []*wire.OutPoint) { - m.queueNotification <- ¬ificationRegisterSpent{ - wsc: wsc, - ops: ops, - } -} - -// addSpentRequests modifies a map of watched outpoints to sets of websocket -// clients to add a new request watch all of the outpoints in ops and create -// and send a notification when spent to the websocket client wsc. -func (*wsNotificationManager) addSpentRequests(opMap map[wire.OutPoint]map[chan struct{}]*wsClient, - wsc *wsClient, ops []*wire.OutPoint) { - - for _, op := range ops { - // Track the request in the client as well so it can be quickly - // be removed on disconnect. - wsc.spentRequests[*op] = struct{}{} - - // Add the client to the list to notify when the outpoint is seen. - // Create the list as needed. - cmap, ok := opMap[*op] - if !ok { - cmap = make(map[chan struct{}]*wsClient) - opMap[*op] = cmap - } - cmap[wsc.quit] = wsc - } -} - -// UnregisterSpentRequest removes a request from the passed websocket client -// to be notified when the passed outpoint is confirmed spent (contained in a -// block connected to the main chain). -func (m *wsNotificationManager) UnregisterSpentRequest(wsc *wsClient, op *wire.OutPoint) { - m.queueNotification <- ¬ificationUnregisterSpent{ - wsc: wsc, - op: op, - } -} - -// removeSpentRequest modifies a map of watched outpoints to remove the -// websocket client wsc from the set of clients to be notified when a -// watched outpoint is spent. If wsc is the last client, the outpoint -// key is removed from the map. -func (*wsNotificationManager) removeSpentRequest(ops map[wire.OutPoint]map[chan struct{}]*wsClient, - wsc *wsClient, op *wire.OutPoint) { - - // Remove the request tracking from the client. - delete(wsc.spentRequests, *op) - - // Remove the client from the list to notify. - notifyMap, ok := ops[*op] - if !ok { - rpcsLog.Warnf("Attempt to remove nonexistent spent request "+ - "for websocket client %s", wsc.addr) - return - } - delete(notifyMap, wsc.quit) - - // Remove the map entry altogether if there are - // no more clients interested in it. - if len(notifyMap) == 0 { - delete(ops, *op) - } -} - // txHexString returns the serialized transaction encoded in hexadecimal. -func txHexString(tx *dcrutil.Tx) string { - buf := bytes.NewBuffer(make([]byte, 0, tx.MsgTx().SerializeSize())) +func txHexString(tx *wire.MsgTx) string { + buf := bytes.NewBuffer(make([]byte, 0, tx.SerializeSize())) // Ignore Serialize's error, as writing to a bytes.buffer cannot fail. - tx.MsgTx().Serialize(buf) + tx.Serialize(buf) return hex.EncodeToString(buf.Bytes()) } -// blockDetails creates a BlockDetails struct to include in dcrws notifications -// from a block and a transaction's block index. -func blockDetails(block *dcrutil.Block, txTree int8, txIndex int) *dcrjson.BlockDetails { - if block == nil { - return nil - } - return &dcrjson.BlockDetails{ - Height: int32(block.Height()), - Hash: block.Sha().String(), - Index: txIndex, - Time: block.MsgBlock().Header.Timestamp.Unix(), - Tree: txTree, - VoteBits: block.MsgBlock().Header.VoteBits, - } -} - -// newRedeemingTxNotification returns a new marshalled redeemingtx notification -// with the passed parameters. -func newRedeemingTxNotification(txHex string, tree int8, index int, block *dcrutil.Block) ([]byte, error) { - // Create and marshal the notification. - ntfn := dcrjson.NewRedeemingTxNtfn(txHex, blockDetails(block, tree, index)) - return dcrjson.MarshalCmd(nil, ntfn) -} - -// notifyForTxOuts examines each transaction output, notifying interested -// websocket clients of the transaction if an output spends to a watched -// address. A spent notification request is automatically registered for -// the client for each matching output. -func (m *wsNotificationManager) notifyForTxOuts( - ops map[wire.OutPoint]map[chan struct{}]*wsClient, - addrs map[string]map[chan struct{}]*wsClient, tx *dcrutil.Tx, - block *dcrutil.Block) { +// notifyRelevantTxAccepted examines the inputs and outputs of the passed +// transaction, notifying websocket clients of outputs spending to a watched +// address and inputs spending a watched outpoint. Any outputs paying to a +// watched address result in the output being watched as well for future +// notifications. +func (m *wsNotificationManager) notifyRelevantTxAccepted(tx *dcrutil.Tx, + clients map[chan struct{}]*wsClient) { - // Nothing to do if nobody is listening for address notifications. - if len(addrs) == 0 { - return - } + var clientsToNotify map[chan struct{}]*wsClient - txHex := "" - wscNotified := make(map[chan struct{}]struct{}) - for i, txOut := range tx.MsgTx().TxOut { - _, txAddrs, _, err := txscript.ExtractPkScriptAddrs(txOut.Version, - txOut.PkScript, m.server.server.chainParams) - if err != nil { + msgTx := tx.MsgTx() + for q, c := range clients { + c.Lock() + f := c.filterData + c.Unlock() + if f == nil { continue } + f.mu.Lock() - for _, txAddr := range txAddrs { - cmap, ok := addrs[txAddr.EncodeAddress()] - if !ok { - continue - } - - if txHex == "" { - txHex = txHexString(tx) - } - ntfn := dcrjson.NewRecvTxNtfn(txHex, blockDetails(block, - tx.Tree(), tx.Index())) - - marshalledJSON, err := dcrjson.MarshalCmd(nil, ntfn) - if err != nil { - rpcsLog.Errorf("Failed to marshal processedtx notification: %v", - err) - continue - } - - op := []*wire.OutPoint{wire.NewOutPoint( - tx.Sha(), - uint32(i), - tx.Tree())} - for wscQuit, wsc := range cmap { - m.addSpentRequests(ops, wsc, op) - - if _, ok := wscNotified[wscQuit]; !ok { - wscNotified[wscQuit] = struct{}{} - wsc.QueueNotification(marshalledJSON) + for _, input := range msgTx.TxIn { + if f.existsUnspentOutPoint(&input.PreviousOutPoint) { + if clientsToNotify == nil { + clientsToNotify = make(map[chan struct{}]*wsClient) } + clientsToNotify[q] = c } } - } -} -// notifyForTx examines the inputs and outputs of the passed transaction, -// notifying websocket clients of outputs spending to a watched address -// and inputs spending a watched outpoint. -func (m *wsNotificationManager) notifyForTx( - ops map[wire.OutPoint]map[chan struct{}]*wsClient, - addrs map[string]map[chan struct{}]*wsClient, - tx *dcrutil.Tx, block *dcrutil.Block) { - - if len(ops) != 0 { - m.notifyForTxIns(ops, tx, block) - } - if len(addrs) != 0 { - m.notifyForTxOuts(ops, addrs, tx, block) - } -} - -// notifyForTxIns examines the inputs of the passed transaction and sends -// interested websocket clients a redeemingtx notification if any inputs -// spend a watched output. If block is non-nil, any matching spent -// requests are removed. -func (m *wsNotificationManager) notifyForTxIns( - ops map[wire.OutPoint]map[chan struct{}]*wsClient, tx *dcrutil.Tx, - block *dcrutil.Block) { - - // Nothing to do if nobody is watching outpoints. - if len(ops) == 0 { - return - } - - txHex := "" - wscNotified := make(map[chan struct{}]struct{}) - for _, txIn := range tx.MsgTx().TxIn { - prevOut := &txIn.PreviousOutPoint - if cmap, ok := ops[*prevOut]; ok { - if txHex == "" { - txHex = txHexString(tx) - } - marshalledJSON, err := newRedeemingTxNotification(txHex, tx.Tree(), - tx.Index(), block) + for i, output := range msgTx.TxOut { + _, addrs, _, err := txscript.ExtractPkScriptAddrs( + output.Version, output.PkScript, + m.server.server.chainParams) if err != nil { - rpcsLog.Warnf("Failed to marshal redeemingtx notification: %v", - err) continue } - for wscQuit, wsc := range cmap { - if _, ok := wscNotified[wscQuit]; !ok { - wscNotified[wscQuit] = struct{}{} - wsc.QueueNotification(marshalledJSON) + for _, a := range addrs { + if f.existsAddress(a) { + if clientsToNotify == nil { + clientsToNotify = make(map[chan struct{}]*wsClient) + } + clientsToNotify[q] = c + + op := wire.OutPoint{ + Hash: *tx.Sha(), + Index: uint32(i), + Tree: tx.Tree(), + } + f.addUnspentOutPoint(&op) } } } - } -} -// RegisterTxOutAddressRequests requests notifications to the passed websocket -// client when a transaction output spends to the passed address. -func (m *wsNotificationManager) RegisterTxOutAddressRequests(wsc *wsClient, - addrs []string) { - m.queueNotification <- ¬ificationRegisterAddr{ - wsc: wsc, - addrs: addrs, + f.mu.Unlock() } -} - -// addAddrRequests adds the websocket client wsc to the address to client set -// addrMap so wsc will be notified for any mempool or block transaction outputs -// spending to any of the addresses in addrs. -func (*wsNotificationManager) addAddrRequests( - addrMap map[string]map[chan struct{}]*wsClient, - wsc *wsClient, addrs []string) { - - for _, addr := range addrs { - rpcsLog.Tracef("Adding address %v for address notifications (client "+ - "session %v)", addr, wsc.sessionID) - - // Track the request in the client as well so it can be quickly be - // removed on disconnect. - wsc.addrRequests[addr] = struct{}{} - // Add the client to the set of clients to notify when the - // outpoint is seen. Create map as needed. - cmap, ok := addrMap[addr] - if !ok { - cmap = make(map[chan struct{}]*wsClient) - addrMap[addr] = cmap + if len(clientsToNotify) != 0 { + n := dcrjson.NewRelevantTxAcceptedNtfn(txHexString(msgTx)) + marshalled, err := dcrjson.MarshalCmd(nil, n) + if err != nil { + rpcsLog.Errorf("Failed to marshal notification: %v", err) + return + } + for _, c := range clientsToNotify { + c.QueueNotification(marshalled) } - cmap[wsc.quit] = wsc - } -} - -// UnregisterTxOutAddressRequest removes a request from the passed websocket -// client to be notified when a transaction spends to the passed address. -func (m *wsNotificationManager) UnregisterTxOutAddressRequest(wsc *wsClient, - addr string) { - m.queueNotification <- ¬ificationUnregisterAddr{ - wsc: wsc, - addr: addr, - } -} - -// removeAddrRequest removes the websocket client wsc from the address to -// client set addrs so it will no longer receive notification updates for -// any transaction outputs send to addr. -func (*wsNotificationManager) removeAddrRequest( - addrs map[string]map[chan struct{}]*wsClient, - wsc *wsClient, addr string) { - - // Remove the request tracking from the client. - delete(wsc.addrRequests, addr) - - // Remove the client from the list to notify. - cmap, ok := addrs[addr] - if !ok { - rpcsLog.Warnf("Attempt to remove nonexistent addr request "+ - "<%s> for websocket client %s", addr, wsc.addr) - return - } - delete(cmap, wsc.quit) - - // Remove the map entry altogether if there are no more clients - // interested in it. - if len(cmap) == 0 { - delete(addrs, addr) } } @@ -1271,15 +1261,7 @@ type wsClient struct { // information about all new transactions. verboseTxUpdates bool - // addrRequests is a set of addresses the caller has requested to be - // notified about. It is maintained here so all requests can be removed - // when a wallet disconnects. Owned by the notification manager. - addrRequests map[string]struct{} - - // spentRequests is a set of unspent Outpoints a wallet has requested - // notifications for when they are spent by a processed transaction. - // Owned by the notification manager. - spentRequests map[wire.OutPoint]struct{} + filterData *wsClientFilter // Networking infrastructure. asyncStarted bool @@ -1795,8 +1777,6 @@ func newWebsocketClient(server *rpcServer, conn *websocket.Conn, isAdmin: isAdmin, sessionID: sessionID, server: server, - addrRequests: make(map[string]struct{}), - spentRequests: make(map[wire.OutPoint]struct{}), ntfnChan: make(chan []byte, 1), // nonblocking sync asyncChan: make(chan *parsedRPCCmd, 1), // nonblocking sync sendChan: make(chan wsResponse, websocketSendBufferSize), @@ -1852,6 +1832,47 @@ func handleWebsocketHelp(wsc *wsClient, icmd interface{}) (interface{}, error) { return help, nil } +// handleLoadTxFilter implements the loadtxfilter command extension for +// websocket connections. +func handleLoadTxFilter(wsc *wsClient, icmd interface{}) (interface{}, error) { + cmd := icmd.(*dcrjson.LoadTxFilterCmd) + + outPoints := make([]*wire.OutPoint, len(cmd.OutPoints)) + for i := range cmd.OutPoints { + hash, err := chainhash.NewHashFromStr(cmd.OutPoints[i].Hash) + if err != nil { + return nil, &dcrjson.RPCError{ + Code: dcrjson.ErrRPCInvalidParameter, + Message: err.Error(), + } + } + outPoints[i] = &wire.OutPoint{ + Hash: *hash, + Index: cmd.OutPoints[i].Index, + Tree: cmd.OutPoints[i].Tree, + } + } + + wsc.Lock() + if cmd.Reload || wsc.filterData == nil { + wsc.filterData = makeWSClientFilter(cmd.Addresses, outPoints) + wsc.Unlock() + } else { + wsc.Unlock() + + wsc.filterData.mu.Lock() + for _, a := range cmd.Addresses { + wsc.filterData.addAddressStr(a) + } + for _, op := range outPoints { + wsc.filterData.addUnspentOutPoint(op) + } + wsc.filterData.mu.Unlock() + } + + return nil, nil +} + // handleNotifyBlocks implements the notifyblocks command extension for // websocket connections. func handleNotifyBlocks(wsc *wsClient, icmd interface{}) (interface{}, error) { @@ -1904,22 +1925,6 @@ func handleStopNotifyBlocks(wsc *wsClient, icmd interface{}) (interface{}, error return nil, nil } -// handleNotifySpent implements the notifyspent command extension for -// websocket connections. -func handleNotifySpent(wsc *wsClient, icmd interface{}) (interface{}, error) { - cmd, ok := icmd.(*dcrjson.NotifySpentCmd) - if !ok { - return nil, dcrjson.ErrRPCInternal - } - outpoints, err := deserializeOutpoints(cmd.OutPoints) - if err != nil { - return nil, err - } - - wsc.server.ntfnMgr.RegisterSpentRequests(wsc, outpoints) - return nil, nil -} - // handleNotifyNewTransations implements the notifynewtransactions command // extension for websocket connections. func handleNotifyNewTransactions(wsc *wsClient, icmd interface{}) (interface{}, error) { @@ -1940,67 +1945,6 @@ func handleStopNotifyNewTransactions(wsc *wsClient, icmd interface{}) (interface return nil, nil } -// handleNotifyReceived implements the notifyreceived command extension for -// websocket connections. -func handleNotifyReceived(wsc *wsClient, icmd interface{}) (interface{}, error) { - cmd, ok := icmd.(*dcrjson.NotifyReceivedCmd) - if !ok { - return nil, dcrjson.ErrRPCInternal - } - - // Decode addresses to validate input, but the strings slice is used - // directly if these are all ok. - err := checkAddressValidity(cmd.Addresses) - if err != nil { - return nil, err - } - - wsc.server.ntfnMgr.RegisterTxOutAddressRequests(wsc, cmd.Addresses) - return nil, nil -} - -// handleStopNotifySpent implements the stopnotifyspent command extension for -// websocket connections. -func handleStopNotifySpent(wsc *wsClient, icmd interface{}) (interface{}, error) { - cmd, ok := icmd.(*dcrjson.StopNotifySpentCmd) - if !ok { - return nil, dcrjson.ErrRPCInternal - } - - outpoints, err := deserializeOutpoints(cmd.OutPoints) - if err != nil { - return nil, err - } - - for _, outpoint := range outpoints { - wsc.server.ntfnMgr.UnregisterSpentRequest(wsc, outpoint) - } - - return nil, nil -} - -// handleStopNotifyReceived implements the stopnotifyreceived command extension -// for websocket connections. -func handleStopNotifyReceived(wsc *wsClient, icmd interface{}) (interface{}, error) { - cmd, ok := icmd.(*dcrjson.StopNotifyReceivedCmd) - if !ok { - return nil, dcrjson.ErrRPCInternal - } - - // Decode addresses to validate input, but the strings slice is used - // directly if these are all ok. - err := checkAddressValidity(cmd.Addresses) - if err != nil { - return nil, err - } - - for _, addr := range cmd.Addresses { - wsc.server.ntfnMgr.UnregisterTxOutAddressRequest(wsc, addr) - } - - return nil, nil -} - // checkAddressValidity checks the validity of each address in the passed // string slice. It does this by attempting to decode each address using the // current active network parameters. If any single address fails to decode @@ -2035,802 +1979,146 @@ func deserializeOutpoints(serializedOuts []dcrjson.OutPoint) ([]*wire.OutPoint, return outpoints, nil } -type rescanKeys struct { - fallbacks map[string]struct{} - pubKeyHashes map[[ripemd160.Size]byte]struct{} - scriptHashes map[[ripemd160.Size]byte]struct{} - compressedPubKeys map[[33]byte]struct{} - uncompressedPubKeys map[[65]byte]struct{} - unspent map[wire.OutPoint]struct{} -} +// rescanBlock rescans a block for any relevant transactions for the passed +// lookup keys. Any discovered transactions are returned hex encoded as a +// string slice. +func rescanBlock(filter *wsClientFilter, block *dcrutil.Block) []string { + var transactions []string -// unspentSlice returns a slice of currently-unspent outpoints for the rescan -// lookup keys. This is primarily intended to be used to register outpoints -// for continuous notifications after a rescan has completed. -func (r *rescanKeys) unspentSlice() []*wire.OutPoint { - ops := make([]*wire.OutPoint, 0, len(r.unspent)) - for op := range r.unspent { - opCopy := op - ops = append(ops, &opCopy) - } - return ops -} - -// ErrRescanReorg defines the error that is returned when an unrecoverable -// reorganize is detected during a rescan. -var ErrRescanReorg = dcrjson.RPCError{ - Code: dcrjson.ErrRPCDatabase, - Message: "Reorganize", -} - -// Decred - TODO: This function needs to scan addresses/pks in both tx trees; right -// now it only looks at the regular tx tree -// rescanBlock rescans all transactions in a single block. This is a helper -// function for handleRescan. -func rescanBlock(wsc *wsClient, lookups *rescanKeys, blk *dcrutil.Block, - parent *dcrutil.Block) { - txTreeRegularValid := dcrutil.IsFlagSet16(blk.MsgBlock().Header.VoteBits, - dcrutil.BlockValid) - - // No need to rescan tx from genesis block. - if parent == nil { - return - } - - var allTransactions []*dcrutil.Tx - - if txTreeRegularValid { - allTransactions = append(allTransactions, parent.Transactions()...) - } - allTransactions = append(allTransactions, blk.STransactions()...) - - for _, tx := range allTransactions { - // Hexadecimal representation of this tx. Only created if - // needed, and reused for later notifications if already made. - var txHex string - - // All inputs and outputs must be iterated through to correctly - // modify the unspent map, however, just a single notification - // for any matching transaction inputs or outputs should be - // created and sent. - spentNotified := false - recvNotified := false - - // Get the stake tx type. - txType := stake.DetermineTxType(tx.MsgTx()) - - for i, txin := range tx.MsgTx().TxIn { - // Skip stakebase. - if txType == stake.TxTypeSSGen && i == 0 { - continue + // Need to iterate over both the stake and regular transactions in a + // block, but these are two different slices in the MsgTx. To avoid + // another allocation to create a single slice to range over, the loop + // body logic is run from a closure. + // + // This makes unsynchronized calls to the filter and thus must only be + // called with the filter mutex held. + checkTransaction := func(tx *wire.MsgTx, tree int8) { + // Keep track of whether the transaction has already been added + // to the result. It shouldn't be added twice. + added := false + + inputs := tx.TxIn + if tree == dcrutil.TxTreeRegular { + // Skip previous output checks for coinbase inputs. These do + // not reference a previous output. + if blockchain.IsCoinBaseTx(tx) { + goto LoopOutputs } - - if _, ok := lookups.unspent[txin.PreviousOutPoint]; ok { - delete(lookups.unspent, txin.PreviousOutPoint) - - if spentNotified { - continue - } - - if txHex == "" { - txHex = txHexString(tx) - } - - var marshalledJSON []byte - var err error - if tx.Tree() == dcrutil.TxTreeRegular { - marshalledJSON, err = newRedeemingTxNotification(txHex, - tx.Tree(), tx.Index(), parent) - } else if tx.Tree() == dcrutil.TxTreeStake { - marshalledJSON, err = newRedeemingTxNotification(txHex, - tx.Tree(), tx.Index(), blk) - } - - if err != nil { - rpcsLog.Errorf("Failed to marshal redeemingtx "+ - "notification: %v", err) - continue - } - - err = wsc.QueueNotification(marshalledJSON) - // Stop the rescan early if the websocket client - // disconnected. - if err == ErrClientQuit { - return - } - spentNotified = true + } else { + if stake.DetermineTxType(tx) == stake.TxTypeSSGen { + // Skip the first stakebase input. These do not + // reference a previous output. + inputs = inputs[1:] } } - - for txOutIdx, txout := range tx.MsgTx().TxOut { - _, addrs, _, _ := txscript.ExtractPkScriptAddrs(txout.Version, - txout.PkScript, wsc.server.server.chainParams) - - for _, addr := range addrs { - switch a := addr.(type) { - case *dcrutil.AddressPubKeyHash: - if _, ok := lookups.pubKeyHashes[*a.Hash160()]; !ok { - continue - } - - case *dcrutil.AddressScriptHash: - if _, ok := lookups.scriptHashes[*a.Hash160()]; !ok { - continue - } - - case *dcrutil.AddressSecpPubKey: - found := false - switch sa := a.ScriptAddress(); len(sa) { - case 33: // Compressed - var key [33]byte - copy(key[:], sa) - if _, ok := lookups.compressedPubKeys[key]; ok { - found = true - } - - case 65: // Uncompressed - var key [65]byte - copy(key[:], sa) - if _, ok := lookups.uncompressedPubKeys[key]; ok { - found = true - } - - default: - rpcsLog.Warnf("Skipping rescanned pubkey of unknown "+ - "serialized length %d", len(sa)) - continue - } - - // If the transaction output pays to the pubkey of - // a rescanned P2PKH address, include it as well. - if !found { - pkh := a.AddressPubKeyHash() - if _, - ok := lookups.pubKeyHashes[*pkh.Hash160()]; !ok { - continue - } - } - - default: - // A new address type must have been added. Encode as a - // payment address string and check the fallback map. - addrStr := addr.EncodeAddress() - _, ok := lookups.fallbacks[addrStr] - if !ok { - continue - } - } - - var outpoint wire.OutPoint - if tx.Tree() == dcrutil.TxTreeRegular { - outpoint = wire.OutPoint{ - Hash: *tx.Sha(), - Index: uint32(txOutIdx), - Tree: dcrutil.TxTreeRegular, // decred - } - } else if tx.Tree() == dcrutil.TxTreeStake { - outpoint = wire.OutPoint{ - Hash: *tx.Sha(), - Index: uint32(txOutIdx), - Tree: dcrutil.TxTreeStake, // decred - } - } - lookups.unspent[outpoint] = struct{}{} - - if recvNotified { - continue - } - - if txHex == "" { - txHex = txHexString(tx) - } - - var ntfn *dcrjson.RecvTxNtfn - if tx.Tree() == dcrutil.TxTreeRegular { - ntfn = dcrjson.NewRecvTxNtfn(txHex, blockDetails( - parent, tx.Tree(), tx.Index())) - } else if tx.Tree() == dcrutil.TxTreeStake { - ntfn = dcrjson.NewRecvTxNtfn(txHex, blockDetails( - blk, tx.Tree(), tx.Index())) - } - - marshalledJSON, err := dcrjson.MarshalCmd(nil, ntfn) - if err != nil { - rpcsLog.Errorf("Failed to marshal recvtx "+ - "notification: %v", err) - return - } - - err = wsc.QueueNotification(marshalledJSON) - // Stop the rescan early if the websocket client - // disconnected. - if err == ErrClientQuit { - return - } - recvNotified = true + for _, input := range inputs { + if !filter.existsUnspentOutPoint(&input.PreviousOutPoint) { + continue + } + if !added { + transactions = append(transactions, txHexString(tx)) + added = true } } - } -} - -// recoverFromReorg attempts to recover from a detected reorganize during a -// rescan. It fetches a new range of block shas from the database and -// verifies that the new range of blocks is on the same fork as a previous -// range of blocks. If this condition does not hold true, the JSON-RPC error -// for an unrecoverable reorganize is returned. -func recoverFromReorg(chain *blockchain.BlockChain, minBlock, maxBlock int64, - lastBlock *dcrutil.Block) ([]chainhash.Hash, error) { - - hashList, err := chain.HeightRange(minBlock, maxBlock) - if err != nil { - rpcsLog.Errorf("Error looking up block range: %v", err) - return nil, &dcrjson.RPCError{ - Code: dcrjson.ErrRPCDatabase, - Message: "Database error: " + err.Error(), - } - } - if lastBlock == nil || len(hashList) == 0 { - return hashList, nil - } - - blk, err := chain.BlockByHash(&hashList[0]) - if err != nil { - rpcsLog.Errorf("Error looking up possibly reorged block: %v", - err) - return nil, &dcrjson.RPCError{ - Code: dcrjson.ErrRPCDatabase, - Message: "Database error: " + err.Error(), - } - } - jsonErr := descendantBlock(lastBlock, blk) - if jsonErr != nil { - return nil, jsonErr - } - return hashList, nil -} -// descendantBlock returns the appropriate JSON-RPC error if a current block -// fetched during a reorganize is not a direct child of the parent block hash. -func descendantBlock(prev, cur *dcrutil.Block) error { - if prev == nil || cur == nil { - return fmt.Errorf("descendantBlock passed nil block pointer") - } - curSha := &cur.MsgBlock().Header.PrevBlock - prevSha := prev.Sha() - if !prevSha.IsEqual(curSha) { - rpcsLog.Errorf("Stopping rescan for reorged block %v "+ - "(replaced by block %v)", prevSha, curSha) - return &ErrRescanReorg - } - return nil -} - -// scanMempool scans the tx mempool for all requested outpoints/addresses in -// lookups, then issues websocket notifications for relevant transactions. -func scanMempool(wsc *wsClient, lookups *rescanKeys) { - // TODO use optimized structures within mempool, such as outpoints - // and addrindex, to do the work more efficiently. This is very - // expensive to do if the mempool is large. cj - mp := wsc.server.server.txMemPool - mp.RLock() - defer mp.RUnlock() - - for _, txDesc := range mp.pool { - tx := txDesc.Tx - - // Hexadecimal representation of this tx. Only created if - // needed, and reused for later notifications if already made. - var txHex string - - // All inputs and outputs must be iterated through to correctly - // modify the unspent map, however, just a single notification - // for any matching transaction inputs or outputs should be - // created and sent. - spentNotified := false - recvNotified := false - - // Get the stake tx type. - txType := txDesc.Type - - for i, txin := range tx.MsgTx().TxIn { - // Skip stakebase. - if txType == stake.TxTypeSSGen && i == 0 { + LoopOutputs: + for i, output := range tx.TxOut { + _, addrs, _, err := txscript.ExtractPkScriptAddrs( + output.Version, output.PkScript, + activeNetParams.Params) + if err != nil { continue } - - if _, ok := lookups.unspent[txin.PreviousOutPoint]; ok { - delete(lookups.unspent, txin.PreviousOutPoint) - - if spentNotified { + for _, a := range addrs { + if !filter.existsAddress(a) { continue } - if txHex == "" { - txHex = txHexString(tx) + op := wire.OutPoint{ + Hash: tx.TxSha(), + Index: uint32(i), + Tree: tree, } + filter.addUnspentOutPoint(&op) - var marshalledJSON []byte - var err error - marshalledJSON, err = newRedeemingTxNotification(txHex, tx.Tree(), - tx.Index(), nil) - if err != nil { - rpcsLog.Errorf("Failed to marshal redeemingtx "+ - "notification: %v", err) - continue + if !added { + transactions = append(transactions, txHexString(tx)) + added = true } - - err = wsc.QueueNotification(marshalledJSON) - // Stop the rescan early if the websocket client - // disconnected. - if err == ErrClientQuit { - return - } - spentNotified = true } } + } - for txOutIdx, txout := range tx.MsgTx().TxOut { - _, addrs, _, _ := txscript.ExtractPkScriptAddrs(txout.Version, - txout.PkScript, wsc.server.server.chainParams) - - for _, addr := range addrs { - switch a := addr.(type) { - case *dcrutil.AddressPubKeyHash: - if _, ok := lookups.pubKeyHashes[*a.Hash160()]; !ok { - continue - } - - case *dcrutil.AddressScriptHash: - if _, ok := lookups.scriptHashes[*a.Hash160()]; !ok { - continue - } - - case *dcrutil.AddressSecpPubKey: - found := false - switch sa := a.ScriptAddress(); len(sa) { - case 33: // Compressed - var key [33]byte - copy(key[:], sa) - if _, ok := lookups.compressedPubKeys[key]; ok { - found = true - } - - case 65: // Uncompressed - var key [65]byte - copy(key[:], sa) - if _, ok := lookups.uncompressedPubKeys[key]; ok { - found = true - } - - default: - rpcsLog.Warnf("Skipping rescanned pubkey of unknown "+ - "serialized length %d", len(sa)) - continue - } - - // If the transaction output pays to the pubkey of - // a rescanned P2PKH address, include it as well. - if !found { - pkh := a.AddressPubKeyHash() - if _, - ok := lookups.pubKeyHashes[*pkh.Hash160()]; !ok { - continue - } - } - - default: - // A new address type must have been added. Encode as a - // payment address string and check the fallback map. - addrStr := addr.EncodeAddress() - _, ok := lookups.fallbacks[addrStr] - if !ok { - continue - } - } - - var outpoint wire.OutPoint - if tx.Tree() == dcrutil.TxTreeRegular { - outpoint = wire.OutPoint{ - Hash: *tx.Sha(), - Index: uint32(txOutIdx), - Tree: dcrutil.TxTreeRegular, // decred - } - } else if tx.Tree() == dcrutil.TxTreeStake { - outpoint = wire.OutPoint{ - Hash: *tx.Sha(), - Index: uint32(txOutIdx), - Tree: dcrutil.TxTreeStake, // decred - } - } - lookups.unspent[outpoint] = struct{}{} - - if recvNotified { - continue - } - - if txHex == "" { - txHex = txHexString(tx) - } - - var ntfn *dcrjson.RecvTxNtfn - ntfn = dcrjson.NewRecvTxNtfn(txHex, blockDetails(nil, tx.Tree(), - tx.Index())) - - marshalledJSON, err := dcrjson.MarshalCmd(nil, ntfn) - if err != nil { - rpcsLog.Errorf("Failed to marshal recvtx "+ - "notification: %v", err) - return - } - - err = wsc.QueueNotification(marshalledJSON) - // Stop the rescan early if the websocket client - // disconnected. - if err == ErrClientQuit { - return - } - recvNotified = true - } - } + msgBlock := block.MsgBlock() + filter.mu.Lock() + for _, tx := range msgBlock.STransactions { + checkTransaction(tx, dcrutil.TxTreeStake) } + for _, tx := range msgBlock.Transactions { + checkTransaction(tx, dcrutil.TxTreeRegular) + } + filter.mu.Unlock() + + return transactions } // handleRescan implements the rescan command extension for websocket // connections. -// -// NOTE: This does not smartly handle reorgs, and fixing requires database -// changes (for safe, concurrent access to full block ranges, and support -// for other chains than the best chain). It will, however, detect whether -// a reorg removed a block that was previously processed, and result in the -// handler erroring. Clients must handle this by finding a block still in -// the chain (perhaps from a rescanprogress notification) to resume their -// rescan. func handleRescan(wsc *wsClient, icmd interface{}) (interface{}, error) { cmd, ok := icmd.(*dcrjson.RescanCmd) if !ok { return nil, dcrjson.ErrRPCInternal } - outpoints := make([]*wire.OutPoint, 0, len(cmd.OutPoints)) - for i := range cmd.OutPoints { - cmdOutpoint := &cmd.OutPoints[i] - blockHash, err := chainhash.NewHashFromStr(cmdOutpoint.Hash) - if err != nil { - return nil, rpcDecodeHexError(cmdOutpoint.Hash) - } - outpoint := wire.NewOutPoint(blockHash, cmdOutpoint.Index, - cmdOutpoint.Tree) - outpoints = append(outpoints, outpoint) - } - - numAddrs := len(cmd.Addresses) - if numAddrs == 1 { - rpcsLog.Info("Beginning rescan for 1 address") - } else { - rpcsLog.Infof("Beginning rescan for %d addresses", numAddrs) - } - - // Build lookup maps. - lookups := rescanKeys{ - fallbacks: map[string]struct{}{}, - pubKeyHashes: map[[ripemd160.Size]byte]struct{}{}, - scriptHashes: map[[ripemd160.Size]byte]struct{}{}, - compressedPubKeys: map[[33]byte]struct{}{}, - uncompressedPubKeys: map[[65]byte]struct{}{}, - unspent: map[wire.OutPoint]struct{}{}, - } - var compressedPubkey [33]byte - var uncompressedPubkey [65]byte - for _, addrStr := range cmd.Addresses { - addr, err := dcrutil.DecodeAddress(addrStr, activeNetParams.Params) - if err != nil { - jsonErr := dcrjson.RPCError{ - Code: dcrjson.ErrRPCInvalidAddressOrKey, - Message: "Rescan address " + addrStr + ": " + - err.Error(), - } - return nil, &jsonErr - } - switch a := addr.(type) { - case *dcrutil.AddressPubKeyHash: - lookups.pubKeyHashes[*a.Hash160()] = struct{}{} - - case *dcrutil.AddressScriptHash: - lookups.scriptHashes[*a.Hash160()] = struct{}{} - - case *dcrutil.AddressSecpPubKey: - pubkeyBytes := a.ScriptAddress() - switch len(pubkeyBytes) { - case 33: // Compressed - copy(compressedPubkey[:], pubkeyBytes) - lookups.compressedPubKeys[compressedPubkey] = struct{}{} - - case 65: // Uncompressed - copy(uncompressedPubkey[:], pubkeyBytes) - lookups.uncompressedPubKeys[uncompressedPubkey] = struct{}{} - - default: - jsonErr := dcrjson.RPCError{ - Code: dcrjson.ErrRPCInvalidAddressOrKey, - Message: "Pubkey " + addrStr + " is of unknown length", - } - return nil, &jsonErr - } - - default: - // A new address type must have been added. Use encoded - // payment address string as a fallback until a fast path - // is added. - lookups.fallbacks[addrStr] = struct{}{} + // Load client's transaction filter. Must exist in order to continue. + wsc.Lock() + filter := wsc.filterData + wsc.Unlock() + if filter == nil { + return nil, &dcrjson.RPCError{ + Code: dcrjson.ErrRPCMisc, + Message: "Transaction filter must be loaded before rescanning", } } - for _, outpoint := range outpoints { - lookups.unspent[*outpoint] = struct{}{} - } - chain := wsc.server.chain - - minBlockHash, err := chainhash.NewHashFromStr(cmd.BeginBlock) - if err != nil { - return nil, rpcDecodeHexError(cmd.BeginBlock) - } - minBlock, err := chain.BlockHeightByHash(minBlockHash) + blockHashes, err := dcrjson.DecodeConcatenatedHashes(cmd.BlockHashes) if err != nil { - return nil, &dcrjson.RPCError{ - Code: dcrjson.ErrRPCBlockNotFound, - Message: "Error getting block: " + err.Error(), - } + return nil, err } - maxBlock := int64(math.MaxInt64) - if cmd.EndBlock != nil { - maxBlockHash, err := chainhash.NewHashFromStr(*cmd.EndBlock) - if err != nil { - return nil, rpcDecodeHexError(*cmd.EndBlock) - } - maxBlock, err = chain.BlockHeightByHash(maxBlockHash) + discoveredData := make([]dcrjson.RescannedBlock, 0, len(blockHashes)) + + // Iterate over each block in the request and rescan. When a block + // contains relevant transactions, add it to the response. + bc := wsc.server.server.blockManager.chain + var lastBlockHash *chainhash.Hash + for i := range blockHashes { + block, err := bc.BlockByHash(&blockHashes[i]) if err != nil { return nil, &dcrjson.RPCError{ Code: dcrjson.ErrRPCBlockNotFound, - Message: "Error getting block: " + err.Error(), + Message: "Failed to fetch block: " + err.Error(), } } - } - - // lastBlock tracks the previously-rescanned block. - // It equals nil when no previous blocks have been rescanned. - var lastBlock *dcrutil.Block - - // Instead of fetching all block hashes at once, fetch in smaller chunks - // to ensure large rescans consume a limited amount of memory. -fetchRange: - for minBlock < maxBlock { - // Limit the max number of hashes to fetch at once to the - // maximum number of items allowed in a single inventory. - // This value could be higher since it's not creating inventory - // messages, but this mirrors the limiting logic used in the - // peer-to-peer protocol. - maxLoopBlock := maxBlock - if maxLoopBlock-minBlock > wire.MaxInvPerMsg { - maxLoopBlock = minBlock + wire.MaxInvPerMsg - } - hashList, err := chain.HeightRange(minBlock, maxLoopBlock) - if err != nil { - rpcsLog.Errorf("Error looking up block range: %v", err) + if lastBlockHash != nil && block.MsgBlock().Header.PrevBlock != *lastBlockHash { return nil, &dcrjson.RPCError{ - Code: dcrjson.ErrRPCDatabase, - Message: "Database error: " + err.Error(), + Code: dcrjson.ErrRPCInvalidParameter, + Message: fmt.Sprintf("Block %v is not a child of %v", + &blockHashes[i], lastBlockHash), } } - - if len(hashList) == 0 { - // The rescan is finished if no blocks hashes for this - // range were successfully fetched and a stop block - // was provided. - if maxBlock != math.MaxInt64 { - break - } - - // If the rescan is through the current block, set up - // the client to continue to receive notifications - // regarding all rescanned addresses and the current set - // of unspent outputs. - // - // This is done safely by temporarily grabbing exclusive - // access of the block manager. If no more blocks have - // been attached between this pause and the fetch above, - // then it is safe to register the websocket client for - // continuous notifications if necessary. Otherwise, - // continue the fetch loop again to rescan the new - // blocks (or error due to an irrecoverable reorganize). - blockManager := wsc.server.server.blockManager - pauseGuard := blockManager.Pause() - best := blockManager.chain.BestSnapshot() - curHash := best.Hash - again := true - if lastBlock == nil || *lastBlock.Sha() == *curHash { - again = false - n := wsc.server.ntfnMgr - n.RegisterSpentRequests(wsc, lookups.unspentSlice()) - n.RegisterTxOutAddressRequests(wsc, cmd.Addresses) - } - close(pauseGuard) - if err != nil { - rpcsLog.Errorf("Error fetching best block "+ - "hash: %v", err) - return nil, &dcrjson.RPCError{ - Code: dcrjson.ErrRPCDatabase, - Message: "Database error: " + - err.Error(), - } - } - if again { - continue - } - break - } - - loopHashList: - for i := range hashList { - blk, err := chain.BlockByHash(&hashList[i]) - if err != nil { - // Only handle reorgs if a block could not be - // found for the hash. - if dbErr, ok := err.(database.Error); !ok || - dbErr.ErrorCode != database.ErrBlockNotFound { - - rpcsLog.Errorf("Error looking up "+ - "block: %v", err) - return nil, &dcrjson.RPCError{ - Code: dcrjson.ErrRPCDatabase, - Message: "Database error: " + - err.Error(), - } - } - - // If an absolute max block was specified, don't - // attempt to handle the reorg. - if maxBlock != math.MaxInt64 { - rpcsLog.Errorf("Stopping rescan for "+ - "reorged block %v", - cmd.EndBlock) - return nil, &ErrRescanReorg - } - - // If the lookup for the previously valid block - // hash failed, there may have been a reorg. - // Fetch a new range of block hashes and verify - // that the previously processed block (if there - // was any) still exists in the database. If it - // doesn't, we error. - // - // A goto is used to branch executation back to - // before the range was evaluated, as it must be - // reevaluated for the new hashList. - minBlock += int64(i) - hashList, err = recoverFromReorg(chain, - minBlock, maxBlock, lastBlock) - if err != nil { - return nil, err - } - if len(hashList) == 0 { - break fetchRange - } - goto loopHashList - } - - if i == 0 && lastBlock != nil { - // Ensure the new hashList is on the same fork - // as the last block from the old hashList. - jsonErr := descendantBlock(lastBlock, blk) - if jsonErr != nil { - return nil, jsonErr - } - } - - // Fetch the parent too, using the same code as - // described above. - var parent *dcrutil.Block - - // No need to get a parent for the genesis block. - if !hashList[i].IsEqual(activeNetParams.GenesisHash) { - parent, err = wsc.server.chain.BlockByHash( - &blk.MsgBlock().Header.PrevBlock) - } else { - parent = nil - err = nil - } - if err != nil { - if maxBlock != math.MaxInt64 { - rpcsLog.Errorf("Stopping rescan for "+ - "reorged block %v", - cmd.EndBlock) - return nil, &ErrRescanReorg - } - - minBlock += int64(i) - hashList, err = recoverFromReorg(wsc.server.chain, minBlock, - maxBlock, lastBlock) - if err != nil { - return nil, err - } - if len(hashList) == 0 { - break fetchRange - } - goto loopHashList - } - if i == 0 && parent != nil { - // Ensure the new hashList is on the same fork - // as the last block from the old hashList. - jsonErr := descendantBlock(parent, blk) - if jsonErr != nil { - return nil, jsonErr - } - } - - // A select statement is used to stop rescans if the - // client requesting the rescan has disconnected. - select { - case <-wsc.quit: - rpcsLog.Debugf("Stopped rescan at height %v "+ - "for disconnected client", blk.Height()) - return nil, nil - default: - rescanBlock(wsc, &lookups, blk, parent) - lastBlock = blk - } - - // Periodically notify the client of the progress - // completed. Continue with next block if no progress - // notification is needed yet. - if blk.Height()%100 == 0 { - n := dcrjson.NewRescanProgressNtfn(hashList[i].String(), - int32(blk.Height()), - blk.MsgBlock().Header.Timestamp.Unix()) - mn, err := dcrjson.MarshalCmd(nil, n) - if err != nil { - rpcsLog.Errorf("Failed to marshal rescan "+ - "progress notification: %v", err) - continue - } - - if err = wsc.QueueNotification(mn); err == ErrClientQuit { - // Finished if the client disconnected. - rpcsLog.Debugf("Stopped rescan at height %v "+ - "for disconnected client", blk.Height()) - return nil, nil - } - } + lastBlockHash = &blockHashes[i] + + transactions := rescanBlock(filter, block) + if len(transactions) != 0 { + discoveredData = append(discoveredData, dcrjson.RescannedBlock{ + Hash: blockHashes[i].String(), + Transactions: transactions, + }) } - - minBlock += int64(len(hashList)) - } - - // Scan the mempool for addresses. - scanMempool(wsc, &lookups) - - // Notify websocket client of the finished rescan. Due to how dcrd - // asynchronously queues notifications to not block calling code, - // there is no guarantee that any of the notifications created during - // rescan (such as rescanprogress, recvtx and redeemingtx) will be - // received before the rescan RPC returns. Therefore, another method - // is needed to safely inform clients that all rescan notifications have - // been sent. - lastBlockHash := lastBlock.Sha() - n := dcrjson.NewRescanFinishedNtfn(lastBlockHash.String(), - lastBlock.Height(), - lastBlock.MsgBlock().Header.Timestamp.Unix()) - if mn, err := dcrjson.MarshalCmd(nil, n); err != nil { - rpcsLog.Errorf("Failed to marshal rescan finished "+ - "notification: %v", err) - } else { - // The rescan is finished, so we don't care whether the client - // has disconnected at this point, so discard error. - _ = wsc.QueueNotification(mn) } - rpcsLog.Info("Finished rescan") - return nil, nil + return &dcrjson.RescanResult{DiscoveredData: discoveredData}, nil } func init() {