diff --git a/ledger/acctupdates.go b/ledger/acctupdates.go index 8f1a572d1b..836a7c670d 100644 --- a/ledger/acctupdates.go +++ b/ledger/acctupdates.go @@ -328,6 +328,17 @@ func (au *accountUpdates) close() { au.baseKVs.prune(0) } +// flushCaches flushes any pending data in caches so that it is fully available during future lookups. +func (au *accountUpdates) flushCaches() { + au.accountsMu.Lock() + + au.baseAccounts.flushPendingWrites() + au.baseResources.flushPendingWrites() + au.baseKVs.flushPendingWrites() + + au.accountsMu.Unlock() +} + func (au *accountUpdates) LookupResource(rnd basics.Round, addr basics.Address, aidx basics.CreatableIndex, ctype basics.CreatableType) (ledgercore.AccountResource, basics.Round, error) { return au.lookupResource(rnd, addr, aidx, ctype, true /* take lock */) } @@ -816,6 +827,8 @@ type accountUpdatesLedgerEvaluator struct { prevHeader bookkeeping.BlockHeader } +func (aul *accountUpdatesLedgerEvaluator) FlushCaches() {} + // GenesisHash returns the genesis hash func (aul *accountUpdatesLedgerEvaluator) GenesisHash() crypto.Digest { return aul.au.ledger.GenesisHash() @@ -1327,6 +1340,12 @@ func (au *accountUpdates) lookupResource(rnd basics.Round, addr basics.Address, return macct.AccountResource(), rnd, nil } + // check baseAccoiunts again to see if it does not exist + if au.baseResources.readNotFound(addr, aidx) { + // it seems the account doesnt exist + return ledgercore.AccountResource{}, rnd, nil + } + if synchronized { au.accountsMu.RUnlock() needUnlock = false @@ -1346,6 +1365,7 @@ func (au *accountUpdates) lookupResource(rnd basics.Round, addr basics.Address, au.baseResources.writePending(persistedData, addr) return persistedData.AccountResource(), rnd, nil } + au.baseResources.writeNotFoundPending(addr, aidx) // otherwise return empty return ledgercore.AccountResource{}, rnd, nil } @@ -1428,6 +1448,12 @@ func (au *accountUpdates) lookupWithoutRewards(rnd basics.Round, addr basics.Add return macct.accountData.GetLedgerCoreAccountData(), rnd, rewardsVersion, rewardsLevel, nil } + // check baseAccoiunts again to see if it does not exist + if au.baseAccounts.readNotFound(addr) { + // it seems the account doesnt exist + return ledgercore.AccountData{}, rnd, rewardsVersion, rewardsLevel, nil + } + if synchronized { au.accountsMu.RUnlock() needUnlock = false @@ -1447,6 +1473,7 @@ func (au *accountUpdates) lookupWithoutRewards(rnd basics.Round, addr basics.Add au.baseAccounts.writePending(persistedData) return persistedData.accountData.GetLedgerCoreAccountData(), rnd, rewardsVersion, rewardsLevel, nil } + au.baseAccounts.writeNotFoundPending(addr) // otherwise return empty return ledgercore.AccountData{}, rnd, rewardsVersion, rewardsLevel, nil } diff --git a/ledger/evalindexer.go b/ledger/evalindexer.go index daefd3035c..3e2d8ca34b 100644 --- a/ledger/evalindexer.go +++ b/ledger/evalindexer.go @@ -79,6 +79,8 @@ type indexerLedgerConnector struct { roundResources EvalForIndexerResources } +func (l indexerLedgerConnector) FlushCaches() {} + // BlockHdr is part of LedgerForEvaluator interface. func (l indexerLedgerConnector) BlockHdr(round basics.Round) (bookkeeping.BlockHeader, error) { if round != l.latestRound { diff --git a/ledger/internal/eval.go b/ledger/internal/eval.go index a149c7d018..b42d24f6b5 100644 --- a/ledger/internal/eval.go +++ b/ledger/internal/eval.go @@ -603,6 +603,7 @@ type LedgerForEvaluator interface { GenesisProto() config.ConsensusParams LatestTotals() (basics.Round, ledgercore.AccountTotals, error) VotersForStateProof(basics.Round) (*ledgercore.VotersForRound, error) + FlushCaches() } // EvaluatorOptions defines the evaluator creation options @@ -1500,6 +1501,9 @@ func (validator *evalTxValidator) run() { // AddBlock: Eval(context.Background(), l, blk, false, txcache, nil) // tracker: Eval(context.Background(), l, blk, false, txcache, nil) func Eval(ctx context.Context, l LedgerForEvaluator, blk bookkeeping.Block, validate bool, txcache verify.VerifiedTransactionCache, executionPool execpool.BacklogPool) (ledgercore.StateDelta, error) { + // flush the pending writes in the cache to make everything read so far available during eval + l.FlushCaches() + eval, err := StartEvaluator(l, blk.BlockHeader, EvaluatorOptions{ PaysetHint: len(blk.Payset), diff --git a/ledger/internal/eval_test.go b/ledger/internal/eval_test.go index da73bc60ad..495ff60970 100644 --- a/ledger/internal/eval_test.go +++ b/ledger/internal/eval_test.go @@ -519,6 +519,8 @@ func (ledger *evalTestLedger) StartEvaluator(hdr bookkeeping.BlockHeader, payset }) } +func (ledger *evalTestLedger) FlushCaches() {} + // GetCreatorForRound takes a CreatableIndex and a CreatableType and tries to // look up a creator address, setting ok to false if the query succeeded but no // creator was found. diff --git a/ledger/internal/prefetcher/prefetcher_alignment_test.go b/ledger/internal/prefetcher/prefetcher_alignment_test.go index 5d78ce286d..2b553c974a 100644 --- a/ledger/internal/prefetcher/prefetcher_alignment_test.go +++ b/ledger/internal/prefetcher/prefetcher_alignment_test.go @@ -173,6 +173,7 @@ func (l *prefetcherAlignmentTestLedger) LatestTotals() (basics.Round, ledgercore func (l *prefetcherAlignmentTestLedger) VotersForStateProof(basics.Round) (*ledgercore.VotersForRound, error) { return nil, nil } +func (l *prefetcherAlignmentTestLedger) FlushCaches() {} func parseLoadedAccountDataEntries(loadedAccountDataEntries []prefetcher.LoadedAccountDataEntry) map[basics.Address]struct{} { if len(loadedAccountDataEntries) == 0 { diff --git a/ledger/ledger.go b/ledger/ledger.go index 99fe759668..09ec1b3cd3 100644 --- a/ledger/ledger.go +++ b/ledger/ledger.go @@ -812,6 +812,11 @@ func (l *Ledger) StartEvaluator(hdr bookkeeping.BlockHeader, paysetHint, maxTxnB }) } +// FlushCaches flushes any pending data in caches so that it is fully available during future lookups. +func (l *Ledger) FlushCaches() { + l.accts.flushCaches() +} + // Validate uses the ledger to validate block blk as a candidate next block. // It returns an error if blk is not the expected next block, or if blk is // not a valid block (e.g., it has duplicate transactions, overspends some diff --git a/ledger/lruaccts.go b/ledger/lruaccts.go index 2c8752c4b2..f698f9de76 100644 --- a/ledger/lruaccts.go +++ b/ledger/lruaccts.go @@ -37,6 +37,9 @@ type lruAccounts struct { log logging.Logger // pendingWritesWarnThreshold is the threshold beyond we would write a warning for exceeding the number of pendingAccounts entries pendingWritesWarnThreshold int + + pendingNotFound chan basics.Address + notFound map[basics.Address]struct{} } // init initializes the lruAccounts for use. @@ -45,6 +48,8 @@ func (m *lruAccounts) init(log logging.Logger, pendingWrites int, pendingWritesW m.accountsList = newPersistedAccountList().allocateFreeNodes(pendingWrites) m.accounts = make(map[basics.Address]*persistedAccountDataListNode, pendingWrites) m.pendingAccounts = make(chan persistedAccountData, pendingWrites) + m.notFound = make(map[basics.Address]struct{}, pendingWrites) + m.pendingNotFound = make(chan basics.Address, pendingWrites) m.log = log m.pendingWritesWarnThreshold = pendingWritesWarnThreshold } @@ -58,6 +63,13 @@ func (m *lruAccounts) read(addr basics.Address) (data persistedAccountData, has return persistedAccountData{}, false } +// readNotFound returns whether we have attempted to read this address but it did not exist in the db. +// thread locking semantics : read lock +func (m *lruAccounts) readNotFound(addr basics.Address) bool { + _, ok := m.notFound[addr] + return ok +} + // flushPendingWrites flushes the pending writes to the main lruAccounts cache. // thread locking semantics : write lock func (m *lruAccounts) flushPendingWrites() { @@ -65,12 +77,25 @@ func (m *lruAccounts) flushPendingWrites() { if pendingEntriesCount >= m.pendingWritesWarnThreshold { m.log.Warnf("lruAccounts: number of entries in pendingAccounts(%d) exceed the warning threshold of %d", pendingEntriesCount, m.pendingWritesWarnThreshold) } + +outer: for ; pendingEntriesCount > 0; pendingEntriesCount-- { select { case pendingAccountData := <-m.pendingAccounts: m.write(pendingAccountData) default: - return + break outer + } + } + + pendingEntriesCount = len(m.pendingNotFound) +outer2: + for ; pendingEntriesCount > 0; pendingEntriesCount-- { + select { + case addr := <-m.pendingNotFound: + m.notFound[addr] = struct{}{} + default: + break outer2 } } } @@ -85,6 +110,16 @@ func (m *lruAccounts) writePending(acct persistedAccountData) { } } +// writeNotFoundPending tags an address as not existing in the db. +// the function doesn't block, and in case of a buffer overflow the entry would not be added. +// thread locking semantics : no lock is required. +func (m *lruAccounts) writeNotFoundPending(addr basics.Address) { + select { + case m.pendingNotFound <- addr: + default: + } +} + // write a single persistedAccountData to the lruAccounts cache. // when writing the entry, the round number would be used to determine if it's a newer // version of what's already on the cache or not. In all cases, the entry is going @@ -117,5 +152,8 @@ func (m *lruAccounts) prune(newSize int) (removed int) { m.accountsList.remove(back) removed++ } + + // clear the notFound list + m.notFound = make(map[basics.Address]struct{}, len(m.notFound)) return } diff --git a/ledger/lruresources.go b/ledger/lruresources.go index 8ab62f0ff6..70a2a4c14e 100644 --- a/ledger/lruresources.go +++ b/ledger/lruresources.go @@ -48,6 +48,9 @@ type lruResources struct { // pendingWritesWarnThreshold is the threshold beyond we would write a warning for exceeding the number of pendingResources entries pendingWritesWarnThreshold int + + pendingNotFound chan accountCreatable + notFound map[accountCreatable]struct{} } // init initializes the lruResources for use. @@ -56,6 +59,8 @@ func (m *lruResources) init(log logging.Logger, pendingWrites int, pendingWrites m.resourcesList = newPersistedResourcesList().allocateFreeNodes(pendingWrites) m.resources = make(map[accountCreatable]*persistedResourcesDataListNode, pendingWrites) m.pendingResources = make(chan cachedResourceData, pendingWrites) + m.notFound = make(map[accountCreatable]struct{}, pendingWrites) + m.pendingNotFound = make(chan accountCreatable, pendingWrites) m.log = log m.pendingWritesWarnThreshold = pendingWritesWarnThreshold } @@ -69,6 +74,13 @@ func (m *lruResources) read(addr basics.Address, aidx basics.CreatableIndex) (da return persistedResourcesData{}, false } +// readNotFound returns whether we have attempted to read this address but it did not exist in the db. +// thread locking semantics : read lock +func (m *lruResources) readNotFound(addr basics.Address, idx basics.CreatableIndex) bool { + _, ok := m.notFound[accountCreatable{address: addr, index: idx}] + return ok +} + // read the persistedResourcesData object that the lruResources has for the given address. // thread locking semantics : read lock func (m *lruResources) readAll(addr basics.Address) (ret []persistedResourcesData) { @@ -87,12 +99,25 @@ func (m *lruResources) flushPendingWrites() { if pendingEntriesCount >= m.pendingWritesWarnThreshold { m.log.Warnf("lruResources: number of entries in pendingResources(%d) exceed the warning threshold of %d", pendingEntriesCount, m.pendingWritesWarnThreshold) } + +outer: for ; pendingEntriesCount > 0; pendingEntriesCount-- { select { case pendingResourceData := <-m.pendingResources: m.write(pendingResourceData.persistedResourcesData, pendingResourceData.address) default: - return + break outer + } + } + + pendingEntriesCount = len(m.pendingNotFound) +outer2: + for ; pendingEntriesCount > 0; pendingEntriesCount-- { + select { + case key := <-m.pendingNotFound: + m.notFound[key] = struct{}{} + default: + break outer2 } } } @@ -107,6 +132,16 @@ func (m *lruResources) writePending(acct persistedResourcesData, addr basics.Add } } +// writeNotFoundPending tags an address as not existing in the db. +// the function doesn't block, and in case of a buffer overflow the entry would not be added. +// thread locking semantics : no lock is required. +func (m *lruResources) writeNotFoundPending(addr basics.Address, idx basics.CreatableIndex) { + select { + case m.pendingNotFound <- accountCreatable{address: addr, index: idx}: + default: + } +} + // write a single persistedAccountData to the lruResources cache. // when writing the entry, the round number would be used to determine if it's a newer // version of what's already on the cache or not. In all cases, the entry is going @@ -139,5 +174,8 @@ func (m *lruResources) prune(newSize int) (removed int) { m.resourcesList.remove(back) removed++ } + + // clear the notFound list + m.notFound = make(map[accountCreatable]struct{}, len(m.notFound)) return } diff --git a/shared/pingpong/config.go b/shared/pingpong/config.go index f595b05f47..b5dd17bc5f 100644 --- a/shared/pingpong/config.go +++ b/shared/pingpong/config.go @@ -36,6 +36,7 @@ type PpConfig struct { RandomizeFee bool RandomizeAmt bool RandomizeDst bool + MaxRandomDst uint64 MaxFee uint64 MinFee uint64 MaxAmt uint64 @@ -98,6 +99,7 @@ var DefaultConfig = PpConfig{ RandomizeFee: false, RandomizeAmt: false, RandomizeDst: false, + MaxRandomDst: 200000, MaxFee: 10000, MinFee: 1000, MaxAmt: 1000, diff --git a/shared/pingpong/pingpong.go b/shared/pingpong/pingpong.go index 2983fc8c6e..30d25d6e9b 100644 --- a/shared/pingpong/pingpong.go +++ b/shared/pingpong/pingpong.go @@ -130,9 +130,10 @@ func (ppa *pingPongAccount) String() string { // WorkerState object holds a running pingpong worker type WorkerState struct { - cfg PpConfig - accounts map[string]*pingPongAccount - cinfo CreatablesInfo + cfg PpConfig + accounts map[string]*pingPongAccount + randomAccounts []string + cinfo CreatablesInfo nftStartTime int64 localNftIndex uint64 @@ -633,7 +634,11 @@ func (pps *WorkerState) RunPingPong(ctx context.Context, ac *libgoal.Client) { // NewPingpong creates a new pingpong WorkerState func NewPingpong(cfg PpConfig) *WorkerState { - return &WorkerState{cfg: cfg, nftHolders: make(map[string]int)} + return &WorkerState{ + cfg: cfg, + nftHolders: make(map[string]int), + randomAccounts: make([]string, 0, cfg.MaxRandomDst), + } } func (pps *WorkerState) randAssetID() (aidx uint64) { @@ -703,11 +708,7 @@ func (pps *WorkerState) sendFromTo( fee := pps.fee() to := toList[i] - if pps.cfg.RandomizeDst { - var addr basics.Address - crypto.RandBytes(addr[:]) - to = addr.String() - } else if len(belowMinBalanceAccounts) > 0 && (crypto.RandUint64()%100 < 50) { + if len(belowMinBalanceAccounts) > 0 && (crypto.RandUint64()%100 < 50) { // make 50% of the calls attempt to refund low-balanced accounts. // ( if there is any ) // pick the first low balance account @@ -715,6 +716,20 @@ func (pps *WorkerState) sendFromTo( to = acct break } + } else if pps.cfg.RandomizeDst { + // check if we need to create a new random account, or use an existing one + if uint64(len(pps.randomAccounts)) >= pps.cfg.MaxRandomDst { + // use pre-created random account + i := rand.Int63n(int64(len(pps.randomAccounts))) + to = pps.randomAccounts[i] + } else { + // create new random account + var addr basics.Address + crypto.RandBytes(addr[:]) + to = addr.String() + // push new account + pps.randomAccounts = append(pps.randomAccounts, to) + } } // Broadcast transaction @@ -970,7 +985,11 @@ type paymentUpdate struct { func (au *paymentUpdate) apply(pps *WorkerState) { pps.accounts[au.from].balance -= (au.fee + au.amt) - pps.accounts[au.to].balance += au.amt + // update account balance + to := pps.accounts[au.to] + if to != nil { + to.balance += au.amt + } } // return true with probability 1/i