Skip to content

Commit

Permalink
Fix btcsuite#303 and btcsuite#346 and change addrindex sort order
Browse files Browse the repository at this point in the history
Fix btcsuite#303 by changing the addrindex key prefix to 3 characters so that
it's easy to check length when dropping the index. To drop the old
index, check to make sure we aren't dropping any entries that end in
"sx" or "tx" as those aren't part of the addrindex. Update test to
deal with the new prefix length.

Fix btcsuite#346 by changing the pointers in the mempool's addrindex map to
wire.ShaHash 32-byte values. This lets them be deleted even if the
transaction data changes places in memory upon expanding the maps.

Change the way addrindex uint32s are stored to big-endian in order to
sort the transactions on disk in chronological/dependency order.

Change the "searchrawtransactions" RPC call to return transactions
from the database before the memory pool so that they're returned in
order. This commit DOES NOT do topological sorting of the memory pool
transactions to ensure they're returned in dependency order. This may
be a good idea for a future enhancement.

Add addrindex versioning to automatically drop the old/incompatible
version of the index and rebuild with the new sort method and key
prefix.
  • Loading branch information
aakselrod committed Mar 20, 2015
1 parent ccc3a9b commit cce8596
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 47 deletions.
2 changes: 1 addition & 1 deletion database/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

// Errors that the various database functions may return.
var (
ErrAddrIndexDoesNotExist = errors.New("address index hasn't been built up yet")
ErrAddrIndexDoesNotExist = errors.New("address index hasn't been built up yet or is old version")
ErrUnsupportedAddressType = errors.New("address type is not supported " +
"by the address-index")
ErrPrevShaMissing = errors.New("previous sha missing from database")
Expand Down
22 changes: 22 additions & 0 deletions database/ldb/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,28 @@ func (db *LevelDb) NewestSha() (rsha *wire.ShaHash, rblkid int64, err error) {
return &sha, db.lastBlkIdx, nil
}

// checkAddrIndexVersion returns an error if the address index version stored
// in the database is less than the current version, or if it doesn't exist.
// This function is used on startup to signal OpenDB to drop the address index
// if it's in an old, incompatible format.
func (db *LevelDb) checkAddrIndexVersion() error {
db.dbLock.Lock()
defer db.dbLock.Unlock()

data, err := db.lDb.Get(addrIndexVersionKey, db.ro)
if err != nil {
return database.ErrAddrIndexDoesNotExist
}

indexVersion := binary.LittleEndian.Uint16(data[0:])

if indexVersion != uint16(addrIndexCurrentVersion) {
return database.ErrAddrIndexDoesNotExist
}

return nil
}

// fetchAddrIndexTip returns the last block height and block sha to be indexed.
// Meta-data about the address tip is currently cached in memory, and will be
// updated accordingly by functions that modify the state. This function is
Expand Down
2 changes: 1 addition & 1 deletion database/ldb/internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestAddrIndexKeySerialization(t *testing.T) {
}

serializedKey := addrIndexToKey(&fakeIndex)
copy(packedIndex[:], serializedKey[22:34])
copy(packedIndex[:], serializedKey[23:35])
unpackedIndex := unpackTxIndex(packedIndex)

if unpackedIndex.blkHeight != fakeIndex.blkHeight {
Expand Down
9 changes: 7 additions & 2 deletions database/ldb/leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,13 @@ blocknarrow:

// Load the last block whose transactions have been indexed by address.
if sha, idx, err := ldb.fetchAddrIndexTip(); err == nil {
ldb.lastAddrIndexBlkSha = *sha
ldb.lastAddrIndexBlkIdx = idx
if err = ldb.checkAddrIndexVersion(); err == nil {
ldb.lastAddrIndexBlkSha = *sha
ldb.lastAddrIndexBlkIdx = idx
} else {
ldb.deleteOldAddrIndex()
ldb.DeleteAddrIndex()
}
} else {
ldb.lastAddrIndexBlkIdx = -1
}
Expand Down
111 changes: 94 additions & 17 deletions database/ldb/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,27 @@ const (
// --------------------------------------------------------
// | Prefix | Hash160 | BlkHeight | Tx Offset | Tx Size |
// --------------------------------------------------------
// | 2 bytes | 20 bytes | 4 bytes | 4 bytes | 4 bytes |
// | 3 bytes | 20 bytes | 4 bytes | 4 bytes | 4 bytes |
// --------------------------------------------------------
addrIndexKeyLength = 2 + ripemd160.Size + 4 + 4 + 4
addrIndexKeyLength = 3 + ripemd160.Size + 4 + 4 + 4

batchDeleteThreshold = 10000

addrIndexCurrentVersion = 1
)

var addrIndexMetaDataKey = []byte("addrindex")

// All address index entries share this prefix to facilitate the use of
// iterators.
var addrIndexKeyPrefix = []byte("a-")
var addrIndexKeyPrefix = []byte("a+-")

// Address index version is required to drop/rebuild address index if version
// is older than current as the format of the index may have changed. This is
// true when going from no version to version 1 as the address index is stored
// as big endian in version 1 and little endian in the original code. Version
// is stored as two bytes, little endian (to match all the code but the index).
var addrIndexVersionKey = []byte("addrindexversion")

type txUpdateObj struct {
txSha *wire.ShaHash
Expand Down Expand Up @@ -372,25 +381,29 @@ func (db *LevelDb) FetchTxBySha(txsha *wire.ShaHash) ([]*database.TxListReply, e
}

// addrIndexToKey serializes the passed txAddrIndex for storage within the DB.
// We want to use BigEndian to store at least block height and TX offset
// in order to ensure that the transactions are sorted in the index.
// This gives us the ability to use the index in more client-side
// applications that are order-dependent (specifically by dependency).
func addrIndexToKey(index *txAddrIndex) []byte {
record := make([]byte, addrIndexKeyLength, addrIndexKeyLength)
copy(record[0:2], addrIndexKeyPrefix)
copy(record[2:22], index.hash160[:])
copy(record[0:3], addrIndexKeyPrefix)
copy(record[3:23], index.hash160[:])

// The index itself.
binary.LittleEndian.PutUint32(record[22:26], uint32(index.blkHeight))
binary.LittleEndian.PutUint32(record[26:30], uint32(index.txoffset))
binary.LittleEndian.PutUint32(record[30:34], uint32(index.txlen))
binary.BigEndian.PutUint32(record[23:27], uint32(index.blkHeight))
binary.BigEndian.PutUint32(record[27:31], uint32(index.txoffset))
binary.BigEndian.PutUint32(record[31:35], uint32(index.txlen))

return record
}

// unpackTxIndex deserializes the raw bytes of a address tx index.
func unpackTxIndex(rawIndex [12]byte) *txAddrIndex {
return &txAddrIndex{
blkHeight: int64(binary.LittleEndian.Uint32(rawIndex[0:4])),
txoffset: int(binary.LittleEndian.Uint32(rawIndex[4:8])),
txlen: int(binary.LittleEndian.Uint32(rawIndex[8:12])),
blkHeight: int64(binary.BigEndian.Uint32(rawIndex[0:4])),
txoffset: int(binary.BigEndian.Uint32(rawIndex[4:8])),
txlen: int(binary.BigEndian.Uint32(rawIndex[8:12])),
}
}

Expand Down Expand Up @@ -446,9 +459,9 @@ func (db *LevelDb) FetchTxsForAddr(addr btcutil.Address, skip int,
}

// Create the prefix for our search.
addrPrefix := make([]byte, 22, 22)
copy(addrPrefix[0:2], addrIndexKeyPrefix)
copy(addrPrefix[2:22], addrKey)
addrPrefix := make([]byte, 23, 23)
copy(addrPrefix[0:3], addrIndexKeyPrefix)
copy(addrPrefix[3:23], addrKey)

iter := db.lDb.NewIterator(bytesPrefix(addrPrefix), nil)
for skip != 0 && iter.Next() {
Expand All @@ -459,7 +472,7 @@ func (db *LevelDb) FetchTxsForAddr(addr btcutil.Address, skip int,
var replies []*database.TxListReply
var rawIndex [12]byte
for iter.Next() && limit != 0 {
copy(rawIndex[:], iter.Key()[22:34])
copy(rawIndex[:], iter.Key()[23:35])
addrIndex := unpackTxIndex(rawIndex)

tx, blkSha, blkHeight, _, err := db.fetchTxDataByLoc(addrIndex.blkHeight,
Expand Down Expand Up @@ -528,6 +541,12 @@ func (db *LevelDb) UpdateAddrIndexForBlock(blkSha *wire.ShaHash, blkHeight int64
binary.LittleEndian.PutUint64(newIndexTip[32:40], uint64(blkHeight))
batch.Put(addrIndexMetaDataKey, newIndexTip)

// Ensure we're writing an address index version
newIndexVersion := make([]byte, 2, 2)
binary.LittleEndian.PutUint16(newIndexVersion[0:2],
uint16(addrIndexCurrentVersion))
batch.Put(addrIndexVersionKey, newIndexVersion)

if err := db.lDb.Write(batch, db.wo); err != nil {
return err
}
Expand All @@ -552,9 +571,65 @@ func (db *LevelDb) DeleteAddrIndex() error {
numInBatch := 0
for iter.Next() {
key := iter.Key()
batch.Delete(key)
// With a 24-bit index key prefix, 1 in every 2^24 keys is a collision.
// We check the length to make sure we only delete address index keys.
if len(key) == addrIndexKeyLength {
batch.Delete(key)
numInBatch++
}

// Delete in chunks to potentially avoid very large batches.
if numInBatch >= batchDeleteThreshold {
if err := db.lDb.Write(batch, db.wo); err != nil {
iter.Release()
return err
}
batch.Reset()
numInBatch = 0
}
}
iter.Release()
if err := iter.Error(); err != nil {
return err
}

numInBatch++
batch.Delete(addrIndexMetaDataKey)
batch.Delete(addrIndexVersionKey)

if err := db.lDb.Write(batch, db.wo); err != nil {
return err
}

db.lastAddrIndexBlkIdx = -1
db.lastAddrIndexBlkSha = wire.ShaHash{}

return nil
}

// deleteOldAddrIndex deletes the entire addrindex stored within the DB for a
// 2-byte addrIndexKeyPrefix. It also resets the cached in-memory metadata about
// the addr index.
func (db *LevelDb) deleteOldAddrIndex() error {
db.dbLock.Lock()
defer db.dbLock.Unlock()

batch := db.lBatch()
defer batch.Reset()

// Delete the entire index along with any metadata about it.
iter := db.lDb.NewIterator(bytesPrefix([]byte("a-")), db.ro)
numInBatch := 0
for iter.Next() {
key := iter.Key()
// With a 24-bit index key prefix, 1 in every 2^24 keys is a collision.
// We check the length to make sure we only delete address index keys.
// We also check the last two bytes to make sure the suffix doesn't
// match other types of index that are 34 bytes long.
if len(key) == 34 && (key[33] != byte(120) ||
(key[32] != byte(116) && key[32] != byte(115))) {
batch.Delete(key)
numInBatch++
}

// Delete in chunks to potentially avoid very large batches.
if numInBatch >= batchDeleteThreshold {
Expand All @@ -572,6 +647,8 @@ func (db *LevelDb) DeleteAddrIndex() error {
}

batch.Delete(addrIndexMetaDataKey)
batch.Delete(addrIndexVersionKey)

if err := db.lDb.Write(batch, db.wo); err != nil {
return err
}
Expand Down
16 changes: 9 additions & 7 deletions mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ type txMemPool struct {
pool map[wire.ShaHash]*TxDesc
orphans map[wire.ShaHash]*btcutil.Tx
orphansByPrev map[wire.ShaHash]*list.List
addrindex map[string]map[*btcutil.Tx]struct{} // maps address to txs
addrindex map[string]map[wire.ShaHash]struct{} // maps address to txs
outpoints map[wire.OutPoint]*btcutil.Tx
lastUpdated time.Time // last time pool was updated
pennyTotal float64 // exponentially decaying total for penny spends.
Expand Down Expand Up @@ -653,7 +653,7 @@ func (mp *txMemPool) removeScriptFromAddrIndex(pkScript []byte, tx *btcutil.Tx)
return err
}
for _, addr := range addresses {
delete(mp.addrindex[addr.EncodeAddress()], tx)
delete(mp.addrindex[addr.EncodeAddress()], *tx.Sha())
}

return nil
Expand Down Expand Up @@ -777,9 +777,9 @@ func (mp *txMemPool) indexScriptAddressToTx(pkScript []byte, tx *btcutil.Tx) err

for _, addr := range addresses {
if mp.addrindex[addr.EncodeAddress()] == nil {
mp.addrindex[addr.EncodeAddress()] = make(map[*btcutil.Tx]struct{})
mp.addrindex[addr.EncodeAddress()] = make(map[wire.ShaHash]struct{})
}
mp.addrindex[addr.EncodeAddress()][tx] = struct{}{}
mp.addrindex[addr.EncodeAddress()][*tx.Sha()] = struct{}{}
}

return nil
Expand Down Expand Up @@ -965,8 +965,10 @@ func (mp *txMemPool) FilterTransactionsByAddress(addr btcutil.Address) ([]*btcut

if txs, exists := mp.addrindex[addr.EncodeAddress()]; exists {
addressTxs := make([]*btcutil.Tx, 0, len(txs))
for tx := range txs {
addressTxs = append(addressTxs, tx)
for txHash := range txs {
if tx, exists := mp.pool[txHash]; exists {
addressTxs = append(addressTxs, tx.Tx)
}
}
return addressTxs, nil
}
Expand Down Expand Up @@ -1494,7 +1496,7 @@ func newTxMemPool(server *server) *txMemPool {
outpoints: make(map[wire.OutPoint]*btcutil.Tx),
}
if cfg.AddrIndex {
memPool.addrindex = make(map[string]map[*btcutil.Tx]struct{})
memPool.addrindex = make(map[string]map[wire.ShaHash]struct{})
}
return memPool
}
43 changes: 24 additions & 19 deletions rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2598,15 +2598,6 @@ func handleSearchRawTransactions(s *rpcServer, cmd interface{}, closeChan <-chan

var addressTxs []*database.TxListReply

// First check the mempool for relevent transactions.
memPoolTxs, err := s.server.txMemPool.FilterTransactionsByAddress(addr)
if err == nil && len(memPoolTxs) != 0 {
for _, tx := range memPoolTxs {
txReply := &database.TxListReply{Tx: tx.MsgTx(), Sha: tx.Sha()}
addressTxs = append(addressTxs, txReply)
}
}

var numRequested, numToSkip int
if c.Count != nil {
numRequested = *c.Count
Expand All @@ -2620,17 +2611,31 @@ func handleSearchRawTransactions(s *rpcServer, cmd interface{}, closeChan <-chan
numToSkip = 0
}
}
if len(addressTxs) >= numRequested {
// Tx's in the mempool exceed the requested number of tx's.
// Slice off any possible overflow.
addressTxs = addressTxs[:numRequested]
} else {
// Otherwise, we'll also take a look into the database.
dbTxs, err := s.server.db.FetchTxsForAddr(addr, numToSkip,
numRequested-len(addressTxs))
if err == nil && len(dbTxs) != 0 {
for _, txReply := range dbTxs {

// While it's more efficient to check the mempool for relevant transactions
// first, we want to return results in order of occurrence/dependency so
// we'll check the mempool only if there aren't enough results returned
// by the database.
dbTxs, err := s.server.db.FetchTxsForAddr(addr, numToSkip,
numRequested-len(addressTxs))
if err == nil {
for _, txReply := range dbTxs {
addressTxs = append(addressTxs, txReply)
}
}

// This code (and txMemPool.FilterTransactionsByAddress()) doesn't sort by
// dependency. This might be something we want to do in the future when we
// return results for the client's convenience, or leave it to the client.
if len(addressTxs) < numRequested {
memPoolTxs, err := s.server.txMemPool.FilterTransactionsByAddress(addr)
if err == nil {
for _, tx := range memPoolTxs {
txReply := &database.TxListReply{Tx: tx.MsgTx(), Sha: tx.Sha()}
addressTxs = append(addressTxs, txReply)
if len(addressTxs) == numRequested {
break
}
}
}
}
Expand Down

0 comments on commit cce8596

Please sign in to comment.