Skip to content

Commit

Permalink
Batch rescan database updates
Browse files Browse the repository at this point in the history
RPC rescans were performing batching of 2000 blocks at a time, but separate
database updates would occur for each block that contained any matching
transactions.  To improve rescan performance, batch the updates using a
background goroutine worker whenever the total number of transactions exceeds a
threshold (currently untuned, but I observe significantly improved rescan times
with a first-try guess of 256).
  • Loading branch information
jrick committed Apr 17, 2024
1 parent fe3c447 commit 14c704a
Showing 1 changed file with 104 additions and 33 deletions.
137 changes: 104 additions & 33 deletions wallet/rescan.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,43 +168,39 @@ func (w *Wallet) logRescannedTx(txmgrNs walletdb.ReadBucket, height int32, tx *w
// does not update the network backend with data to watch for future
// relevant transactions as the rescanner is assumed to handle this
// task.
func (w *Wallet) saveRescanned(ctx context.Context, hash *chainhash.Hash,
txs []*wire.MsgTx, logTxs bool) error {
func (w *Wallet) saveRescanned(ctx context.Context, dbtx walletdb.ReadWriteTx,
hash *chainhash.Hash, txs []*wire.MsgTx, logTxs bool) (err error) {

const op errors.Op = "wallet.saveRescanned"
defer func() {
if err != nil {
err = errors.E(op, err)
}
}()

txmgrNs := dbtx.ReadWriteBucket(wtxmgrNamespaceKey)
blockMeta, err := w.txStore.GetBlockMetaForHash(txmgrNs, hash)
if err != nil {
return err
}
header, err := w.txStore.GetBlockHeader(dbtx, hash)
if err != nil {
return err
}

defer w.lockedOutpointMu.Unlock()
w.lockedOutpointMu.Lock()
for _, tx := range txs {
if logTxs {
w.logRescannedTx(txmgrNs, blockMeta.Height, tx)
}

err := walletdb.Update(ctx, w.db, func(dbtx walletdb.ReadWriteTx) error {
txmgrNs := dbtx.ReadWriteBucket(wtxmgrNamespaceKey)
blockMeta, err := w.txStore.GetBlockMetaForHash(txmgrNs, hash)
rec, err := udb.NewTxRecordFromMsgTx(tx, time.Now())
if err != nil {
return err
}
header, err := w.txStore.GetBlockHeader(dbtx, hash)
_, err = w.processTransactionRecord(ctx, dbtx, rec, header, &blockMeta)
if err != nil {
return err
}

for _, tx := range txs {
if logTxs {
w.logRescannedTx(txmgrNs, blockMeta.Height, tx)
}

rec, err := udb.NewTxRecordFromMsgTx(tx, time.Now())
if err != nil {
return err
}
_, err = w.processTransactionRecord(ctx, dbtx, rec, header, &blockMeta)
if err != nil {
return err
}
}
return w.txStore.UpdateProcessedTxsBlockMarker(dbtx, hash)
})
if err != nil {
return errors.E(op, err)
}
return nil
}
Expand Down Expand Up @@ -256,19 +252,94 @@ func (w *Wallet) rescan(ctx context.Context, n NetworkBackend,
}
}
log.Infof("Rescanning block range [%v, %v]...", height, through)
saveRescanned := func(block *chainhash.Hash, txs []*wire.MsgTx) error {
return w.saveRescanned(ctx, block, txs, logTxs)

// Helper func to save batches of matching transactions.
saveRescanned := func(blocks []*chainhash.Hash, txs [][]*wire.MsgTx) error {
if len(blocks) != len(txs) {
return errors.E(errors.Bug, "len(blocks) must match len(txs)")
}
if len(blocks) == 0 {
return nil
}

w.lockedOutpointMu.Lock()
defer w.lockedOutpointMu.Unlock()

return walletdb.Update(ctx, w.db, func(dbtx walletdb.ReadWriteTx) error {
for i := range blocks {
block := blocks[i]
txs := txs[i]

err := w.saveRescanned(ctx, dbtx, block, txs, logTxs)
if err != nil {
return err
}
}

return nil
})
}

// Use a background goroutine reading a channel of
// transactions to process from the rescan. This allows
// grouping the updates instead of potentially performing a
// single db update for every block in the rescanned range.
type rescannedBlock struct {
blockHash *chainhash.Hash
txs []*wire.MsgTx
errc chan error
}
err = n.Rescan(ctx, rescanBlocks, saveRescanned)
ch := make(chan rescannedBlock, 1)
blockHashes := make([]*chainhash.Hash, 0, maxBlocksPerRescan)
txs := make([][]*wire.MsgTx, 0, maxBlocksPerRescan)
lastBatchErr := make(chan error)
go func() {
numTxs := 0
for item := range ch {
errc := item.errc

blockHashes = append(blockHashes, item.blockHash)
txs = append(txs, item.txs)
numTxs += len(item.txs)

if numTxs >= 256 { // XXX: tune this
err := saveRescanned(blockHashes, txs)
if err != nil {
errc <- err
return
}

blockHashes = blockHashes[:0]
txs = txs[:0]
numTxs = 0
}

errc <- nil
}

lastBatchErr <- saveRescanned(blockHashes, txs)
}()

err = n.Rescan(ctx, rescanBlocks, func(blockHash *chainhash.Hash, txs []*wire.MsgTx) error {
errc := make(chan error)
ch <- rescannedBlock{
blockHash: blockHash,
txs: txs,
errc: errc,
}
return <-errc
})
close(ch) // End the background worker
if err != nil {
return err
}
err = walletdb.Update(ctx, w.db, func(dbtx walletdb.ReadWriteTx) error {
return w.txStore.UpdateProcessedTxsBlockMarker(dbtx, &rescanBlocks[len(rescanBlocks)-1])
})
err = <-lastBatchErr
if err != nil {
return err
}
err = walletdb.Update(ctx, w.db, func(dbtx walletdb.ReadWriteTx) error {
return w.txStore.UpdateProcessedTxsBlockMarker(dbtx, &rescanBlocks[len(rescanBlocks)-1])
})
if p != nil {
p <- RescanProgress{ScannedThrough: through}
}
Expand Down

0 comments on commit 14c704a

Please sign in to comment.