Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

performance: turn cache misses during assembly into cache hits during eval #4617

Merged
merged 4 commits into from
Nov 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions ledger/acctupdates.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,17 @@ func (au *accountUpdates) close() {
au.baseKVs.prune(0)
}

// flushCaches flushes any pending data in caches so that it is fully available during future lookups.
func (au *accountUpdates) flushCaches() {
au.accountsMu.Lock()

au.baseAccounts.flushPendingWrites()
au.baseResources.flushPendingWrites()
au.baseKVs.flushPendingWrites()
Copy link
Contributor Author

@icorderi icorderi Nov 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rebased on master to get the boxes feature and added a flush on the baseKVs LRU.


au.accountsMu.Unlock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer defer?

}

func (au *accountUpdates) LookupResource(rnd basics.Round, addr basics.Address, aidx basics.CreatableIndex, ctype basics.CreatableType) (ledgercore.AccountResource, basics.Round, error) {
return au.lookupResource(rnd, addr, aidx, ctype, true /* take lock */)
}
Expand Down Expand Up @@ -816,6 +827,8 @@ type accountUpdatesLedgerEvaluator struct {
prevHeader bookkeeping.BlockHeader
}

func (aul *accountUpdatesLedgerEvaluator) FlushCaches() {}

// GenesisHash returns the genesis hash
func (aul *accountUpdatesLedgerEvaluator) GenesisHash() crypto.Digest {
return aul.au.ledger.GenesisHash()
Expand Down Expand Up @@ -1327,6 +1340,12 @@ func (au *accountUpdates) lookupResource(rnd basics.Round, addr basics.Address,
return macct.AccountResource(), rnd, nil
}

// check baseAccoiunts again to see if it does not exist
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"baseResources"

if au.baseResources.readNotFound(addr, aidx) {
// it seems the account doesnt exist
return ledgercore.AccountResource{}, rnd, nil
}

if synchronized {
au.accountsMu.RUnlock()
needUnlock = false
Expand All @@ -1346,6 +1365,7 @@ func (au *accountUpdates) lookupResource(rnd basics.Round, addr basics.Address,
au.baseResources.writePending(persistedData, addr)
return persistedData.AccountResource(), rnd, nil
}
au.baseResources.writeNotFoundPending(addr, aidx)
// otherwise return empty
return ledgercore.AccountResource{}, rnd, nil
}
Expand Down Expand Up @@ -1428,6 +1448,12 @@ func (au *accountUpdates) lookupWithoutRewards(rnd basics.Round, addr basics.Add
return macct.accountData.GetLedgerCoreAccountData(), rnd, rewardsVersion, rewardsLevel, nil
}

// check baseAccoiunts again to see if it does not exist
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"baseAccounts" (sp)

if au.baseAccounts.readNotFound(addr) {
// it seems the account doesnt exist
return ledgercore.AccountData{}, rnd, rewardsVersion, rewardsLevel, nil
}

if synchronized {
au.accountsMu.RUnlock()
needUnlock = false
Expand All @@ -1447,6 +1473,7 @@ func (au *accountUpdates) lookupWithoutRewards(rnd basics.Round, addr basics.Add
au.baseAccounts.writePending(persistedData)
return persistedData.accountData.GetLedgerCoreAccountData(), rnd, rewardsVersion, rewardsLevel, nil
}
au.baseAccounts.writeNotFoundPending(addr)
// otherwise return empty
return ledgercore.AccountData{}, rnd, rewardsVersion, rewardsLevel, nil
}
Expand Down
2 changes: 2 additions & 0 deletions ledger/evalindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ type indexerLedgerConnector struct {
roundResources EvalForIndexerResources
}

func (l indexerLedgerConnector) FlushCaches() {}

// BlockHdr is part of LedgerForEvaluator interface.
func (l indexerLedgerConnector) BlockHdr(round basics.Round) (bookkeeping.BlockHeader, error) {
if round != l.latestRound {
Expand Down
4 changes: 4 additions & 0 deletions ledger/internal/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,7 @@ type LedgerForEvaluator interface {
GenesisProto() config.ConsensusParams
LatestTotals() (basics.Round, ledgercore.AccountTotals, error)
VotersForStateProof(basics.Round) (*ledgercore.VotersForRound, error)
FlushCaches()
}

// EvaluatorOptions defines the evaluator creation options
Expand Down Expand Up @@ -1500,6 +1501,9 @@ func (validator *evalTxValidator) run() {
// AddBlock: Eval(context.Background(), l, blk, false, txcache, nil)
// tracker: Eval(context.Background(), l, blk, false, txcache, nil)
func Eval(ctx context.Context, l LedgerForEvaluator, blk bookkeeping.Block, validate bool, txcache verify.VerifiedTransactionCache, executionPool execpool.BacklogPool) (ledgercore.StateDelta, error) {
// flush the pending writes in the cache to make everything read so far available during eval
l.FlushCaches()

eval, err := StartEvaluator(l, blk.BlockHeader,
EvaluatorOptions{
PaysetHint: len(blk.Payset),
Expand Down
2 changes: 2 additions & 0 deletions ledger/internal/eval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,8 @@ func (ledger *evalTestLedger) StartEvaluator(hdr bookkeeping.BlockHeader, payset
})
}

func (ledger *evalTestLedger) FlushCaches() {}

// GetCreatorForRound takes a CreatableIndex and a CreatableType and tries to
// look up a creator address, setting ok to false if the query succeeded but no
// creator was found.
Expand Down
1 change: 1 addition & 0 deletions ledger/internal/prefetcher/prefetcher_alignment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func (l *prefetcherAlignmentTestLedger) LatestTotals() (basics.Round, ledgercore
func (l *prefetcherAlignmentTestLedger) VotersForStateProof(basics.Round) (*ledgercore.VotersForRound, error) {
return nil, nil
}
func (l *prefetcherAlignmentTestLedger) FlushCaches() {}

func parseLoadedAccountDataEntries(loadedAccountDataEntries []prefetcher.LoadedAccountDataEntry) map[basics.Address]struct{} {
if len(loadedAccountDataEntries) == 0 {
Expand Down
5 changes: 5 additions & 0 deletions ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,11 @@ func (l *Ledger) StartEvaluator(hdr bookkeeping.BlockHeader, paysetHint, maxTxnB
})
}

// FlushCaches flushes any pending data in caches so that it is fully available during future lookups.
func (l *Ledger) FlushCaches() {
l.accts.flushCaches()
}

// Validate uses the ledger to validate block blk as a candidate next block.
// It returns an error if blk is not the expected next block, or if blk is
// not a valid block (e.g., it has duplicate transactions, overspends some
Expand Down
40 changes: 39 additions & 1 deletion ledger/lruaccts.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ type lruAccounts struct {
log logging.Logger
// pendingWritesWarnThreshold is the threshold beyond we would write a warning for exceeding the number of pendingAccounts entries
pendingWritesWarnThreshold int

pendingNotFound chan basics.Address
notFound map[basics.Address]struct{}
}

// init initializes the lruAccounts for use.
Expand All @@ -45,6 +48,8 @@ func (m *lruAccounts) init(log logging.Logger, pendingWrites int, pendingWritesW
m.accountsList = newPersistedAccountList().allocateFreeNodes(pendingWrites)
m.accounts = make(map[basics.Address]*persistedAccountDataListNode, pendingWrites)
m.pendingAccounts = make(chan persistedAccountData, pendingWrites)
m.notFound = make(map[basics.Address]struct{}, pendingWrites)
m.pendingNotFound = make(chan basics.Address, pendingWrites)
m.log = log
m.pendingWritesWarnThreshold = pendingWritesWarnThreshold
}
Expand All @@ -58,19 +63,39 @@ func (m *lruAccounts) read(addr basics.Address) (data persistedAccountData, has
return persistedAccountData{}, false
}

// readNotFound returns whether we have attempted to read this address but it did not exist in the db.
// thread locking semantics : read lock
func (m *lruAccounts) readNotFound(addr basics.Address) bool {
_, ok := m.notFound[addr]
return ok
}

// flushPendingWrites flushes the pending writes to the main lruAccounts cache.
// thread locking semantics : write lock
func (m *lruAccounts) flushPendingWrites() {
pendingEntriesCount := len(m.pendingAccounts)
if pendingEntriesCount >= m.pendingWritesWarnThreshold {
m.log.Warnf("lruAccounts: number of entries in pendingAccounts(%d) exceed the warning threshold of %d", pendingEntriesCount, m.pendingWritesWarnThreshold)
}

outer:
for ; pendingEntriesCount > 0; pendingEntriesCount-- {
select {
case pendingAccountData := <-m.pendingAccounts:
m.write(pendingAccountData)
default:
return
break outer
}
}

pendingEntriesCount = len(m.pendingNotFound)
outer2:
for ; pendingEntriesCount > 0; pendingEntriesCount-- {
select {
case addr := <-m.pendingNotFound:
m.notFound[addr] = struct{}{}
default:
break outer2
}
}
}
Expand All @@ -85,6 +110,16 @@ func (m *lruAccounts) writePending(acct persistedAccountData) {
}
}

// writeNotFoundPending tags an address as not existing in the db.
// the function doesn't block, and in case of a buffer overflow the entry would not be added.
// thread locking semantics : no lock is required.
func (m *lruAccounts) writeNotFoundPending(addr basics.Address) {
select {
case m.pendingNotFound <- addr:
default:
}
}

// write a single persistedAccountData to the lruAccounts cache.
// when writing the entry, the round number would be used to determine if it's a newer
// version of what's already on the cache or not. In all cases, the entry is going
Expand Down Expand Up @@ -117,5 +152,8 @@ func (m *lruAccounts) prune(newSize int) (removed int) {
m.accountsList.remove(back)
removed++
}

// clear the notFound list
m.notFound = make(map[basics.Address]struct{}, len(m.notFound))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Supposedly, deleting in a loop over all keys is recognized by the compiler and optimized to be cheaper than reallocating. @cce posted something about it. https://go-review.googlesource.com/c/go/+/110055

return
}
40 changes: 39 additions & 1 deletion ledger/lruresources.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type lruResources struct {

// pendingWritesWarnThreshold is the threshold beyond we would write a warning for exceeding the number of pendingResources entries
pendingWritesWarnThreshold int

pendingNotFound chan accountCreatable
notFound map[accountCreatable]struct{}
}

// init initializes the lruResources for use.
Expand All @@ -56,6 +59,8 @@ func (m *lruResources) init(log logging.Logger, pendingWrites int, pendingWrites
m.resourcesList = newPersistedResourcesList().allocateFreeNodes(pendingWrites)
m.resources = make(map[accountCreatable]*persistedResourcesDataListNode, pendingWrites)
m.pendingResources = make(chan cachedResourceData, pendingWrites)
m.notFound = make(map[accountCreatable]struct{}, pendingWrites)
m.pendingNotFound = make(chan accountCreatable, pendingWrites)
m.log = log
m.pendingWritesWarnThreshold = pendingWritesWarnThreshold
}
Expand All @@ -69,6 +74,13 @@ func (m *lruResources) read(addr basics.Address, aidx basics.CreatableIndex) (da
return persistedResourcesData{}, false
}

// readNotFound returns whether we have attempted to read this address but it did not exist in the db.
// thread locking semantics : read lock
func (m *lruResources) readNotFound(addr basics.Address, idx basics.CreatableIndex) bool {
_, ok := m.notFound[accountCreatable{address: addr, index: idx}]
return ok
}

// read the persistedResourcesData object that the lruResources has for the given address.
// thread locking semantics : read lock
func (m *lruResources) readAll(addr basics.Address) (ret []persistedResourcesData) {
Expand All @@ -87,12 +99,25 @@ func (m *lruResources) flushPendingWrites() {
if pendingEntriesCount >= m.pendingWritesWarnThreshold {
m.log.Warnf("lruResources: number of entries in pendingResources(%d) exceed the warning threshold of %d", pendingEntriesCount, m.pendingWritesWarnThreshold)
}

outer:
for ; pendingEntriesCount > 0; pendingEntriesCount-- {
select {
case pendingResourceData := <-m.pendingResources:
m.write(pendingResourceData.persistedResourcesData, pendingResourceData.address)
default:
return
break outer
}
}

pendingEntriesCount = len(m.pendingNotFound)
outer2:
for ; pendingEntriesCount > 0; pendingEntriesCount-- {
select {
case key := <-m.pendingNotFound:
m.notFound[key] = struct{}{}
default:
break outer2
}
}
}
Expand All @@ -107,6 +132,16 @@ func (m *lruResources) writePending(acct persistedResourcesData, addr basics.Add
}
}

// writeNotFoundPending tags an address as not existing in the db.
// the function doesn't block, and in case of a buffer overflow the entry would not be added.
// thread locking semantics : no lock is required.
func (m *lruResources) writeNotFoundPending(addr basics.Address, idx basics.CreatableIndex) {
select {
case m.pendingNotFound <- accountCreatable{address: addr, index: idx}:
default:
}
}

// write a single persistedAccountData to the lruResources cache.
// when writing the entry, the round number would be used to determine if it's a newer
// version of what's already on the cache or not. In all cases, the entry is going
Expand Down Expand Up @@ -139,5 +174,8 @@ func (m *lruResources) prune(newSize int) (removed int) {
m.resourcesList.remove(back)
removed++
}

// clear the notFound list
m.notFound = make(map[accountCreatable]struct{}, len(m.notFound))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

return
}
2 changes: 2 additions & 0 deletions shared/pingpong/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type PpConfig struct {
RandomizeFee bool
RandomizeAmt bool
RandomizeDst bool
MaxRandomDst uint64
MaxFee uint64
MinFee uint64
MaxAmt uint64
Expand Down Expand Up @@ -98,6 +99,7 @@ var DefaultConfig = PpConfig{
RandomizeFee: false,
RandomizeAmt: false,
RandomizeDst: false,
MaxRandomDst: 200000,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this change default value might change nightly results. @egieseke @algobarb please be prepared

MaxFee: 10000,
MinFee: 1000,
MaxAmt: 1000,
Expand Down
39 changes: 29 additions & 10 deletions shared/pingpong/pingpong.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,10 @@ func (ppa *pingPongAccount) String() string {

// WorkerState object holds a running pingpong worker
type WorkerState struct {
cfg PpConfig
accounts map[string]*pingPongAccount
cinfo CreatablesInfo
cfg PpConfig
accounts map[string]*pingPongAccount
randomAccounts []string
cinfo CreatablesInfo

nftStartTime int64
localNftIndex uint64
Expand Down Expand Up @@ -633,7 +634,11 @@ func (pps *WorkerState) RunPingPong(ctx context.Context, ac *libgoal.Client) {

// NewPingpong creates a new pingpong WorkerState
func NewPingpong(cfg PpConfig) *WorkerState {
return &WorkerState{cfg: cfg, nftHolders: make(map[string]int)}
return &WorkerState{
cfg: cfg,
nftHolders: make(map[string]int),
randomAccounts: make([]string, 0, cfg.MaxRandomDst),
}
}

func (pps *WorkerState) randAssetID() (aidx uint64) {
Expand Down Expand Up @@ -703,18 +708,28 @@ func (pps *WorkerState) sendFromTo(
fee := pps.fee()

to := toList[i]
if pps.cfg.RandomizeDst {
var addr basics.Address
crypto.RandBytes(addr[:])
to = addr.String()
} else if len(belowMinBalanceAccounts) > 0 && (crypto.RandUint64()%100 < 50) {
if len(belowMinBalanceAccounts) > 0 && (crypto.RandUint64()%100 < 50) {
// make 50% of the calls attempt to refund low-balanced accounts.
// ( if there is any )
// pick the first low balance account
for acct := range belowMinBalanceAccounts {
to = acct
break
}
} else if pps.cfg.RandomizeDst {
// check if we need to create a new random account, or use an existing one
if uint64(len(pps.randomAccounts)) >= pps.cfg.MaxRandomDst {
// use pre-created random account
i := rand.Int63n(int64(len(pps.randomAccounts)))
to = pps.randomAccounts[i]
} else {
// create new random account
var addr basics.Address
crypto.RandBytes(addr[:])
to = addr.String()
// push new account
pps.randomAccounts = append(pps.randomAccounts, to)
}
}

// Broadcast transaction
Expand Down Expand Up @@ -970,7 +985,11 @@ type paymentUpdate struct {

func (au *paymentUpdate) apply(pps *WorkerState) {
pps.accounts[au.from].balance -= (au.fee + au.amt)
pps.accounts[au.to].balance += au.amt
// update account balance
to := pps.accounts[au.to]
if to != nil {
to.balance += au.amt
}
}

// return true with probability 1/i
Expand Down