Skip to content

Commit

Permalink
Catchpoints: Add the kv hashes into the trie (#4812)
Browse files Browse the repository at this point in the history
* Add the kv hashes into the trie

* Periodically commit, and report proper stats
  • Loading branch information
jannotti authored Nov 18, 2022
1 parent 6acbd18 commit fe580fd
Showing 1 changed file with 59 additions and 16 deletions.
75 changes: 59 additions & 16 deletions ledger/catchpointtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func (ct *catchpointTracker) loadFromDisk(l ledgerForTracker, dbRound basics.Rou
close(ct.catchpointDataSlowWriting)

err = ct.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
return ct.accountsInitializeHashes(ctx, tx, dbRound)
return ct.initializeHashes(ctx, tx, dbRound)
})
if err != nil {
return err
Expand Down Expand Up @@ -1487,9 +1487,9 @@ func (ct *catchpointTracker) catchpointEnabled() bool {
return ct.catchpointInterval != 0
}

// accountsInitializeHashes initializes account hashes.
// initializeHashes initializes account/resource/kv hashes.
// as part of the initialization, it tests if a hash table matches to account base and updates the former.
func (ct *catchpointTracker) accountsInitializeHashes(ctx context.Context, tx *sql.Tx, rnd basics.Round) error {
func (ct *catchpointTracker) initializeHashes(ctx context.Context, tx *sql.Tx, rnd basics.Round) error {
hashRound, err := accountsHashRound(ctx, tx)
if err != nil {
return err
Expand All @@ -1511,23 +1511,23 @@ func (ct *catchpointTracker) accountsInitializeHashes(ctx context.Context, tx *s
// create the merkle trie for the balances
committer, err := MakeMerkleCommitter(tx, false)
if err != nil {
return fmt.Errorf("accountsInitialize was unable to makeMerkleCommitter: %v", err)
return fmt.Errorf("initializeHashes was unable to makeMerkleCommitter: %v", err)
}

trie, err := merkletrie.MakeTrie(committer, TrieMemoryConfig)
if err != nil {
return fmt.Errorf("accountsInitialize was unable to MakeTrie: %v", err)
return fmt.Errorf("initializeHashes was unable to MakeTrie: %v", err)
}

// we might have a database that was previously initialized, and now we're adding the balances trie. In that case, we need to add all the existing balances to this trie.
// we can figure this out by examining the hash of the root:
rootHash, err := trie.RootHash()
if err != nil {
return fmt.Errorf("accountsInitialize was unable to retrieve trie root hash: %v", err)
return fmt.Errorf("initializeHashes was unable to retrieve trie root hash: %v", err)
}

if rootHash.IsZero() {
ct.log.Infof("accountsInitialize rebuilding merkle trie for round %d", rnd)
ct.log.Infof("initializeHashes rebuilding merkle trie for round %d", rnd)
accountBuilderIt := makeOrderedAccountsIter(tx, trieRebuildAccountChunkSize)
defer accountBuilderIt.Close(ctx)
startTrieBuildTime := time.Now()
Expand All @@ -1550,16 +1550,16 @@ func (ct *catchpointTracker) accountsInitializeHashes(ctx context.Context, tx *s
for _, acct := range accts {
added, err := trie.Add(acct.digest)
if err != nil {
return fmt.Errorf("accountsInitialize was unable to add changes to trie: %v", err)
return fmt.Errorf("initializeHashes was unable to add acct to trie: %v", err)
}
if !added {
// we need to translate the "addrid" into actual account address so that
// we can report the failure.
addr, err := lookupAccountAddressFromAddressID(ctx, tx, acct.addrid)
if err != nil {
ct.log.Warnf("accountsInitialize attempted to add duplicate hash '%s' to merkle trie for account id %d : %v", hex.EncodeToString(acct.digest), acct.addrid, err)
ct.log.Warnf("initializeHashes attempted to add duplicate acct hash '%s' to merkle trie for account id %d : %v", hex.EncodeToString(acct.digest), acct.addrid, err)
} else {
ct.log.Warnf("accountsInitialize attempted to add duplicate hash '%s' to merkle trie for account %v", hex.EncodeToString(acct.digest), addr)
ct.log.Warnf("initializeHashes attempted to add duplicate acct hash '%s' to merkle trie for account %v", hex.EncodeToString(acct.digest), addr)
}
}
}
Expand All @@ -1569,22 +1569,22 @@ func (ct *catchpointTracker) accountsInitializeHashes(ctx context.Context, tx *s
// if anything goes wrong, it will still get rolled back.
_, err = trie.Evict(true)
if err != nil {
return fmt.Errorf("accountsInitialize was unable to commit changes to trie: %v", err)
return fmt.Errorf("initializeHashes was unable to commit changes to trie: %v", err)
}
pendingTrieHashes = 0
}

if time.Since(lastRebuildTime) > 5*time.Second {
// let the user know that the trie is still being rebuilt.
ct.log.Infof("accountsInitialize still building the trie, and processed so far %d trie entries", trieHashCount)
ct.log.Infof("initializeHashes still building the trie, and processed so far %d trie entries", trieHashCount)
lastRebuildTime = time.Now()
}
} else if processedRows > 0 {
totalOrderedAccounts += processedRows
// if it's not ordered, we can ignore it for now; we'll just increase the counters and emit logs periodically.
if time.Since(lastRebuildTime) > 5*time.Second {
// let the user know that the trie is still being rebuilt.
ct.log.Infof("accountsInitialize still building the trie, and hashed so far %d accounts", totalOrderedAccounts)
ct.log.Infof("initializeHashes still building the trie, and hashed so far %d accounts", totalOrderedAccounts)
lastRebuildTime = time.Now()
}
}
Expand All @@ -1594,16 +1594,59 @@ func (ct *catchpointTracker) accountsInitializeHashes(ctx context.Context, tx *s
// if anything goes wrong, it will still get rolled back.
_, err = trie.Evict(true)
if err != nil {
return fmt.Errorf("accountsInitialize was unable to commit changes to trie: %v", err)
return fmt.Errorf("initializeHashes was unable to commit changes to trie: %v", err)
}

// Now add the kvstore hashes
pendingTrieHashes = 0
kvs, err := tx.QueryContext(ctx, "SELECT key, value FROM kvstore")
if err != nil {
return err
}
defer kvs.Close()
for kvs.Next() {
var k []byte
var v []byte
err := kvs.Scan(&k, &v)
if err != nil {
return err
}
hash := kvHashBuilderV6(string(k), v)
trieHashCount++
pendingTrieHashes++
added, err := trie.Add(hash)
if err != nil {
return fmt.Errorf("initializeHashes was unable to add kv (key=%s) to trie: %v", hex.EncodeToString(k), err)
}
if !added {
ct.log.Warnf("initializeHashes attempted to add duplicate kv hash '%s' to merkle trie for key %s", hex.EncodeToString(hash), k)
}
if pendingTrieHashes >= trieRebuildCommitFrequency {
// this trie Evict will commit using the current transaction.
// if anything goes wrong, it will still get rolled back.
_, err = trie.Evict(true)
if err != nil {
return fmt.Errorf("initializeHashes was unable to commit changes to trie: %v", err)
}
pendingTrieHashes = 0
}
// We could insert code to report things every 5 seconds, like was done for accounts.
}

// this trie Evict will commit using the current transaction.
// if anything goes wrong, it will still get rolled back.
_, err = trie.Evict(true)
if err != nil {
return fmt.Errorf("initializeHashes was unable to commit changes to trie: %v", err)
}

// we've just updated the merkle trie, update the hashRound to reflect that.
err = updateAccountsHashRound(ctx, tx, rnd)
if err != nil {
return fmt.Errorf("accountsInitialize was unable to update the account hash round to %d: %v", rnd, err)
return fmt.Errorf("initializeHashes was unable to update the account hash round to %d: %v", rnd, err)
}

ct.log.Infof("accountsInitialize rebuilt the merkle trie with %d entries in %v", trieHashCount, time.Since(startTrieBuildTime))
ct.log.Infof("initializeHashes rebuilt the merkle trie with %d entries in %v", trieHashCount, time.Since(startTrieBuildTime))
}
ct.balancesTrie = trie
return nil
Expand Down

0 comments on commit fe580fd

Please sign in to comment.