Skip to content

Commit

Permalink
performance: turn cache misses during assembly into cache hits during…
Browse files Browse the repository at this point in the history
… eval (#4617)
  • Loading branch information
icorderi authored Nov 4, 2022
1 parent 0af97b3 commit 207f964
Show file tree
Hide file tree
Showing 10 changed files with 150 additions and 12 deletions.
27 changes: 27 additions & 0 deletions ledger/acctupdates.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,17 @@ func (au *accountUpdates) close() {
au.baseKVs.prune(0)
}

// flushCaches flushes any pending data in caches so that it is fully available during future lookups.
func (au *accountUpdates) flushCaches() {
au.accountsMu.Lock()

au.baseAccounts.flushPendingWrites()
au.baseResources.flushPendingWrites()
au.baseKVs.flushPendingWrites()

au.accountsMu.Unlock()
}

func (au *accountUpdates) LookupResource(rnd basics.Round, addr basics.Address, aidx basics.CreatableIndex, ctype basics.CreatableType) (ledgercore.AccountResource, basics.Round, error) {
return au.lookupResource(rnd, addr, aidx, ctype, true /* take lock */)
}
Expand Down Expand Up @@ -816,6 +827,8 @@ type accountUpdatesLedgerEvaluator struct {
prevHeader bookkeeping.BlockHeader
}

func (aul *accountUpdatesLedgerEvaluator) FlushCaches() {}

// GenesisHash returns the genesis hash
func (aul *accountUpdatesLedgerEvaluator) GenesisHash() crypto.Digest {
return aul.au.ledger.GenesisHash()
Expand Down Expand Up @@ -1327,6 +1340,12 @@ func (au *accountUpdates) lookupResource(rnd basics.Round, addr basics.Address,
return macct.AccountResource(), rnd, nil
}

// check baseAccoiunts again to see if it does not exist
if au.baseResources.readNotFound(addr, aidx) {
// it seems the account doesnt exist
return ledgercore.AccountResource{}, rnd, nil
}

if synchronized {
au.accountsMu.RUnlock()
needUnlock = false
Expand All @@ -1346,6 +1365,7 @@ func (au *accountUpdates) lookupResource(rnd basics.Round, addr basics.Address,
au.baseResources.writePending(persistedData, addr)
return persistedData.AccountResource(), rnd, nil
}
au.baseResources.writeNotFoundPending(addr, aidx)
// otherwise return empty
return ledgercore.AccountResource{}, rnd, nil
}
Expand Down Expand Up @@ -1428,6 +1448,12 @@ func (au *accountUpdates) lookupWithoutRewards(rnd basics.Round, addr basics.Add
return macct.accountData.GetLedgerCoreAccountData(), rnd, rewardsVersion, rewardsLevel, nil
}

// check baseAccoiunts again to see if it does not exist
if au.baseAccounts.readNotFound(addr) {
// it seems the account doesnt exist
return ledgercore.AccountData{}, rnd, rewardsVersion, rewardsLevel, nil
}

if synchronized {
au.accountsMu.RUnlock()
needUnlock = false
Expand All @@ -1447,6 +1473,7 @@ func (au *accountUpdates) lookupWithoutRewards(rnd basics.Round, addr basics.Add
au.baseAccounts.writePending(persistedData)
return persistedData.accountData.GetLedgerCoreAccountData(), rnd, rewardsVersion, rewardsLevel, nil
}
au.baseAccounts.writeNotFoundPending(addr)
// otherwise return empty
return ledgercore.AccountData{}, rnd, rewardsVersion, rewardsLevel, nil
}
Expand Down
2 changes: 2 additions & 0 deletions ledger/evalindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ type indexerLedgerConnector struct {
roundResources EvalForIndexerResources
}

func (l indexerLedgerConnector) FlushCaches() {}

// BlockHdr is part of LedgerForEvaluator interface.
func (l indexerLedgerConnector) BlockHdr(round basics.Round) (bookkeeping.BlockHeader, error) {
if round != l.latestRound {
Expand Down
4 changes: 4 additions & 0 deletions ledger/internal/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,7 @@ type LedgerForEvaluator interface {
GenesisProto() config.ConsensusParams
LatestTotals() (basics.Round, ledgercore.AccountTotals, error)
VotersForStateProof(basics.Round) (*ledgercore.VotersForRound, error)
FlushCaches()
}

// EvaluatorOptions defines the evaluator creation options
Expand Down Expand Up @@ -1500,6 +1501,9 @@ func (validator *evalTxValidator) run() {
// AddBlock: Eval(context.Background(), l, blk, false, txcache, nil)
// tracker: Eval(context.Background(), l, blk, false, txcache, nil)
func Eval(ctx context.Context, l LedgerForEvaluator, blk bookkeeping.Block, validate bool, txcache verify.VerifiedTransactionCache, executionPool execpool.BacklogPool) (ledgercore.StateDelta, error) {
// flush the pending writes in the cache to make everything read so far available during eval
l.FlushCaches()

eval, err := StartEvaluator(l, blk.BlockHeader,
EvaluatorOptions{
PaysetHint: len(blk.Payset),
Expand Down
2 changes: 2 additions & 0 deletions ledger/internal/eval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,8 @@ func (ledger *evalTestLedger) StartEvaluator(hdr bookkeeping.BlockHeader, payset
})
}

func (ledger *evalTestLedger) FlushCaches() {}

// GetCreatorForRound takes a CreatableIndex and a CreatableType and tries to
// look up a creator address, setting ok to false if the query succeeded but no
// creator was found.
Expand Down
1 change: 1 addition & 0 deletions ledger/internal/prefetcher/prefetcher_alignment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func (l *prefetcherAlignmentTestLedger) LatestTotals() (basics.Round, ledgercore
func (l *prefetcherAlignmentTestLedger) VotersForStateProof(basics.Round) (*ledgercore.VotersForRound, error) {
return nil, nil
}
func (l *prefetcherAlignmentTestLedger) FlushCaches() {}

func parseLoadedAccountDataEntries(loadedAccountDataEntries []prefetcher.LoadedAccountDataEntry) map[basics.Address]struct{} {
if len(loadedAccountDataEntries) == 0 {
Expand Down
5 changes: 5 additions & 0 deletions ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,11 @@ func (l *Ledger) StartEvaluator(hdr bookkeeping.BlockHeader, paysetHint, maxTxnB
})
}

// FlushCaches flushes any pending data in caches so that it is fully available during future lookups.
func (l *Ledger) FlushCaches() {
l.accts.flushCaches()
}

// Validate uses the ledger to validate block blk as a candidate next block.
// It returns an error if blk is not the expected next block, or if blk is
// not a valid block (e.g., it has duplicate transactions, overspends some
Expand Down
40 changes: 39 additions & 1 deletion ledger/lruaccts.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ type lruAccounts struct {
log logging.Logger
// pendingWritesWarnThreshold is the threshold beyond we would write a warning for exceeding the number of pendingAccounts entries
pendingWritesWarnThreshold int

pendingNotFound chan basics.Address
notFound map[basics.Address]struct{}
}

// init initializes the lruAccounts for use.
Expand All @@ -45,6 +48,8 @@ func (m *lruAccounts) init(log logging.Logger, pendingWrites int, pendingWritesW
m.accountsList = newPersistedAccountList().allocateFreeNodes(pendingWrites)
m.accounts = make(map[basics.Address]*persistedAccountDataListNode, pendingWrites)
m.pendingAccounts = make(chan persistedAccountData, pendingWrites)
m.notFound = make(map[basics.Address]struct{}, pendingWrites)
m.pendingNotFound = make(chan basics.Address, pendingWrites)
m.log = log
m.pendingWritesWarnThreshold = pendingWritesWarnThreshold
}
Expand All @@ -58,19 +63,39 @@ func (m *lruAccounts) read(addr basics.Address) (data persistedAccountData, has
return persistedAccountData{}, false
}

// readNotFound returns whether we have attempted to read this address but it did not exist in the db.
// thread locking semantics : read lock
func (m *lruAccounts) readNotFound(addr basics.Address) bool {
_, ok := m.notFound[addr]
return ok
}

// flushPendingWrites flushes the pending writes to the main lruAccounts cache.
// thread locking semantics : write lock
func (m *lruAccounts) flushPendingWrites() {
pendingEntriesCount := len(m.pendingAccounts)
if pendingEntriesCount >= m.pendingWritesWarnThreshold {
m.log.Warnf("lruAccounts: number of entries in pendingAccounts(%d) exceed the warning threshold of %d", pendingEntriesCount, m.pendingWritesWarnThreshold)
}

outer:
for ; pendingEntriesCount > 0; pendingEntriesCount-- {
select {
case pendingAccountData := <-m.pendingAccounts:
m.write(pendingAccountData)
default:
return
break outer
}
}

pendingEntriesCount = len(m.pendingNotFound)
outer2:
for ; pendingEntriesCount > 0; pendingEntriesCount-- {
select {
case addr := <-m.pendingNotFound:
m.notFound[addr] = struct{}{}
default:
break outer2
}
}
}
Expand All @@ -85,6 +110,16 @@ func (m *lruAccounts) writePending(acct persistedAccountData) {
}
}

// writeNotFoundPending tags an address as not existing in the db.
// the function doesn't block, and in case of a buffer overflow the entry would not be added.
// thread locking semantics : no lock is required.
func (m *lruAccounts) writeNotFoundPending(addr basics.Address) {
select {
case m.pendingNotFound <- addr:
default:
}
}

// write a single persistedAccountData to the lruAccounts cache.
// when writing the entry, the round number would be used to determine if it's a newer
// version of what's already on the cache or not. In all cases, the entry is going
Expand Down Expand Up @@ -117,5 +152,8 @@ func (m *lruAccounts) prune(newSize int) (removed int) {
m.accountsList.remove(back)
removed++
}

// clear the notFound list
m.notFound = make(map[basics.Address]struct{}, len(m.notFound))
return
}
40 changes: 39 additions & 1 deletion ledger/lruresources.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type lruResources struct {

// pendingWritesWarnThreshold is the threshold beyond we would write a warning for exceeding the number of pendingResources entries
pendingWritesWarnThreshold int

pendingNotFound chan accountCreatable
notFound map[accountCreatable]struct{}
}

// init initializes the lruResources for use.
Expand All @@ -56,6 +59,8 @@ func (m *lruResources) init(log logging.Logger, pendingWrites int, pendingWrites
m.resourcesList = newPersistedResourcesList().allocateFreeNodes(pendingWrites)
m.resources = make(map[accountCreatable]*persistedResourcesDataListNode, pendingWrites)
m.pendingResources = make(chan cachedResourceData, pendingWrites)
m.notFound = make(map[accountCreatable]struct{}, pendingWrites)
m.pendingNotFound = make(chan accountCreatable, pendingWrites)
m.log = log
m.pendingWritesWarnThreshold = pendingWritesWarnThreshold
}
Expand All @@ -69,6 +74,13 @@ func (m *lruResources) read(addr basics.Address, aidx basics.CreatableIndex) (da
return persistedResourcesData{}, false
}

// readNotFound returns whether we have attempted to read this address but it did not exist in the db.
// thread locking semantics : read lock
func (m *lruResources) readNotFound(addr basics.Address, idx basics.CreatableIndex) bool {
_, ok := m.notFound[accountCreatable{address: addr, index: idx}]
return ok
}

// read the persistedResourcesData object that the lruResources has for the given address.
// thread locking semantics : read lock
func (m *lruResources) readAll(addr basics.Address) (ret []persistedResourcesData) {
Expand All @@ -87,12 +99,25 @@ func (m *lruResources) flushPendingWrites() {
if pendingEntriesCount >= m.pendingWritesWarnThreshold {
m.log.Warnf("lruResources: number of entries in pendingResources(%d) exceed the warning threshold of %d", pendingEntriesCount, m.pendingWritesWarnThreshold)
}

outer:
for ; pendingEntriesCount > 0; pendingEntriesCount-- {
select {
case pendingResourceData := <-m.pendingResources:
m.write(pendingResourceData.persistedResourcesData, pendingResourceData.address)
default:
return
break outer
}
}

pendingEntriesCount = len(m.pendingNotFound)
outer2:
for ; pendingEntriesCount > 0; pendingEntriesCount-- {
select {
case key := <-m.pendingNotFound:
m.notFound[key] = struct{}{}
default:
break outer2
}
}
}
Expand All @@ -107,6 +132,16 @@ func (m *lruResources) writePending(acct persistedResourcesData, addr basics.Add
}
}

// writeNotFoundPending tags an address as not existing in the db.
// the function doesn't block, and in case of a buffer overflow the entry would not be added.
// thread locking semantics : no lock is required.
func (m *lruResources) writeNotFoundPending(addr basics.Address, idx basics.CreatableIndex) {
select {
case m.pendingNotFound <- accountCreatable{address: addr, index: idx}:
default:
}
}

// write a single persistedAccountData to the lruResources cache.
// when writing the entry, the round number would be used to determine if it's a newer
// version of what's already on the cache or not. In all cases, the entry is going
Expand Down Expand Up @@ -139,5 +174,8 @@ func (m *lruResources) prune(newSize int) (removed int) {
m.resourcesList.remove(back)
removed++
}

// clear the notFound list
m.notFound = make(map[accountCreatable]struct{}, len(m.notFound))
return
}
2 changes: 2 additions & 0 deletions shared/pingpong/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type PpConfig struct {
RandomizeFee bool
RandomizeAmt bool
RandomizeDst bool
MaxRandomDst uint64
MaxFee uint64
MinFee uint64
MaxAmt uint64
Expand Down Expand Up @@ -98,6 +99,7 @@ var DefaultConfig = PpConfig{
RandomizeFee: false,
RandomizeAmt: false,
RandomizeDst: false,
MaxRandomDst: 200000,
MaxFee: 10000,
MinFee: 1000,
MaxAmt: 1000,
Expand Down
39 changes: 29 additions & 10 deletions shared/pingpong/pingpong.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,10 @@ func (ppa *pingPongAccount) String() string {

// WorkerState object holds a running pingpong worker
type WorkerState struct {
cfg PpConfig
accounts map[string]*pingPongAccount
cinfo CreatablesInfo
cfg PpConfig
accounts map[string]*pingPongAccount
randomAccounts []string
cinfo CreatablesInfo

nftStartTime int64
localNftIndex uint64
Expand Down Expand Up @@ -633,7 +634,11 @@ func (pps *WorkerState) RunPingPong(ctx context.Context, ac *libgoal.Client) {

// NewPingpong creates a new pingpong WorkerState
func NewPingpong(cfg PpConfig) *WorkerState {
return &WorkerState{cfg: cfg, nftHolders: make(map[string]int)}
return &WorkerState{
cfg: cfg,
nftHolders: make(map[string]int),
randomAccounts: make([]string, 0, cfg.MaxRandomDst),
}
}

func (pps *WorkerState) randAssetID() (aidx uint64) {
Expand Down Expand Up @@ -703,18 +708,28 @@ func (pps *WorkerState) sendFromTo(
fee := pps.fee()

to := toList[i]
if pps.cfg.RandomizeDst {
var addr basics.Address
crypto.RandBytes(addr[:])
to = addr.String()
} else if len(belowMinBalanceAccounts) > 0 && (crypto.RandUint64()%100 < 50) {
if len(belowMinBalanceAccounts) > 0 && (crypto.RandUint64()%100 < 50) {
// make 50% of the calls attempt to refund low-balanced accounts.
// ( if there is any )
// pick the first low balance account
for acct := range belowMinBalanceAccounts {
to = acct
break
}
} else if pps.cfg.RandomizeDst {
// check if we need to create a new random account, or use an existing one
if uint64(len(pps.randomAccounts)) >= pps.cfg.MaxRandomDst {
// use pre-created random account
i := rand.Int63n(int64(len(pps.randomAccounts)))
to = pps.randomAccounts[i]
} else {
// create new random account
var addr basics.Address
crypto.RandBytes(addr[:])
to = addr.String()
// push new account
pps.randomAccounts = append(pps.randomAccounts, to)
}
}

// Broadcast transaction
Expand Down Expand Up @@ -970,7 +985,11 @@ type paymentUpdate struct {

func (au *paymentUpdate) apply(pps *WorkerState) {
pps.accounts[au.from].balance -= (au.fee + au.amt)
pps.accounts[au.to].balance += au.amt
// update account balance
to := pps.accounts[au.to]
if to != nil {
to.balance += au.amt
}
}

// return true with probability 1/i
Expand Down

0 comments on commit 207f964

Please sign in to comment.