Skip to content

Commit

Permalink
Ctx management suggestion
Browse files Browse the repository at this point in the history
  • Loading branch information
algonautshant authored and algorandskiy committed Dec 8, 2022
1 parent 6f485c3 commit 8ebc577
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 54 deletions.
10 changes: 0 additions & 10 deletions data/accountManager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,23 +84,13 @@ func registryCloseTest(t testing.TB, registry account.ParticipationRegistry, dbf
}

func TestAccountManagerKeysRegistry(t *testing.T) {
if testing.Short() {
t.Log("this is a long test and skipping for -short")
return
}

partitiontest.PartitionTest(t)
registry, dbName := getRegistryImpl(t, false, true)
defer registryCloseTest(t, registry, dbName)
testAccountManagerKeys(t, registry, true)
}

func testAccountManagerKeys(t *testing.T, registry account.ParticipationRegistry, flushRegistry bool) {
if testing.Short() {
t.Log("this is a long test and skipping for -short")
return
}

log := logging.TestingLog(t)
log.SetLevel(logging.Error)

Expand Down
8 changes: 2 additions & 6 deletions data/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func TestLedgerCirculation(t *testing.T) {
cfg := config.GetDefaultLocal()
cfg.Archival = true
log := logging.TestingLog(t)
log.SetLevel(logging.Error)
log.SetLevel(logging.Warn)
realLedger, err := ledger.OpenLedger(log, t.Name(), inMem, genesisInitState, cfg)
require.NoError(t, err, "could not open ledger")
defer realLedger.Close()
Expand Down Expand Up @@ -325,10 +325,6 @@ func TestLedgerSeed(t *testing.T) {
}

func TestConsensusVersion(t *testing.T) {
if testing.Short() {
t.Log("this is a long test and skipping for -short")
return
}
partitiontest.PartitionTest(t)

// find a consensus protocol that leads to ConsensusCurrentVersion
Expand Down Expand Up @@ -517,7 +513,7 @@ func TestLedgerErrorValidate(t *testing.T) {
defer realLedger.Close()

l := Ledger{Ledger: realLedger, log: log}
l.log.SetLevel(logging.Warn)
l.log.SetLevel(logging.Debug)
require.NotNil(t, &l)

totalsRound, _, err := realLedger.LatestTotals()
Expand Down
15 changes: 6 additions & 9 deletions data/txDupCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,24 +111,21 @@ type txSaltedCache struct {
ctx context.Context
}

func makeSaltedCache(ctx context.Context, size int, refreshInterval time.Duration) *txSaltedCache {
c := &txSaltedCache{
digestCache: digestCache{
cur: map[crypto.Digest]struct{}{},
maxSize: size,
},
ctx: ctx,
func makeSaltedCache(size int) *txSaltedCache {
return &txSaltedCache{
digestCache: *makeDigestCache(size),
}
}

func (c *txSaltedCache) start(ctx context.Context, refreshInterval time.Duration) {
c.ctx = ctx
if refreshInterval != 0 {
go c.salter(refreshInterval)
}

c.mu.Lock()
defer c.mu.Unlock()
c.moreSalt()

return c
}

// salter is a goroutine refreshing the cache by schedule
Expand Down
22 changes: 13 additions & 9 deletions data/txDupCache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (

func TestTxHandlerDigestCache(t *testing.T) {
partitiontest.PartitionTest(t)
//t.Parallel()
t.Parallel()

const size = 20
cache := makeDigestCache(size)
Expand Down Expand Up @@ -116,10 +116,11 @@ func (c *txSaltedCache) check(msg []byte) bool {
// TestTxHandlerSaltedCacheBasic is the same as TestTxHandlerDigestCache but for the salted cache
func TestTxHandlerSaltedCacheBasic(t *testing.T) {
partitiontest.PartitionTest(t)
//t.Parallel()
t.Parallel()

const size = 20
cache := makeSaltedCache(context.Background(), size, 0)
cache := makeSaltedCache(size)
cache.start(context.Background(), 0)
require.Zero(t, cache.Len())

// add some unique random
Expand Down Expand Up @@ -197,13 +198,13 @@ func TestTxHandlerSaltedCacheBasic(t *testing.T) {
}

func TestTxHandlerSaltedCacheScheduled(t *testing.T) {
return
partitiontest.PartitionTest(t)
//t.Parallel()
t.Parallel()

const size = 20
updateInterval := 1000 * time.Microsecond
cache := makeSaltedCache(context.Background(), size, updateInterval)
cache := makeSaltedCache(size)
cache.start(context.Background(), updateInterval)
require.Zero(t, cache.Len())

// add some unique random
Expand All @@ -224,10 +225,11 @@ func TestTxHandlerSaltedCacheScheduled(t *testing.T) {

func TestTxHandlerSaltedCacheManual(t *testing.T) {
partitiontest.PartitionTest(t)
//t.Parallel()
t.Parallel()

const size = 20
cache := makeSaltedCache(context.Background(), 2*size, 0)
cache := makeSaltedCache(2 * size)
cache.start(context.Background(), 0)
require.Zero(t, cache.Len())

// add some unique random
Expand Down Expand Up @@ -291,7 +293,9 @@ func (m digestCacheMaker) make(size int) cachePusher {
return &digestCachePusher{c: makeDigestCache(size)}
}
func (m saltedCacheMaker) make(size int) cachePusher {
return &saltedCachePusher{c: makeSaltedCache(context.Background(), size, 0)}
scp := &saltedCachePusher{c: makeSaltedCache(size)}
scp.c.start(context.Background(), 0)
return scp
}

type digestCachePusher struct {
Expand Down
12 changes: 3 additions & 9 deletions data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ func MakeTxHandler(opts TxHandlerOpts) *TxHandler {
return nil
}

ctx, ctxCancel := context.WithCancel(context.Background())
handler := &TxHandler{
txPool: opts.TxPool,
genesisID: opts.GenesisID,
Expand All @@ -133,22 +132,17 @@ func MakeTxHandler(opts TxHandlerOpts) *TxHandler {
backlogQueue: make(chan *txBacklogMsg, txBacklogSize),
postVerificationQueue: make(chan *txBacklogMsg, txBacklogSize),
net: opts.Net,
msgCache: makeSaltedCache(ctx, 2*txBacklogSize, 60*time.Second),
msgCache: makeSaltedCache(2 * txBacklogSize),
txCanonicalCache: makeDigestCache(2 * txBacklogSize),
cacheConfig: txHandlerConfig{opts.Config.TxFilterRawMsgEnabled(), opts.Config.TxFilterCanonicalEnabled()},
ctx: ctx,
ctxCancel: ctxCancel,
}
return handler
}

// Start enables the processing of incoming messages at the transaction handler
func (handler *TxHandler) Start() {
if handler.ctx.Err() != nil {
// existing context has gone, create a new one
handler.ctx, handler.ctxCancel = context.WithCancel(context.Background())
handler.msgCache = makeSaltedCache(handler.ctx, 2*txBacklogSize, 60*time.Second)
}
handler.ctx, handler.ctxCancel = context.WithCancel(context.Background())
handler.msgCache.start(handler.ctx, 60*time.Second)
handler.net.RegisterHandlers([]network.TaggedMessageHandler{
{Tag: protocol.TxnTag, MessageHandler: network.HandlerFunc(handler.processIncomingTxn)},
})
Expand Down
24 changes: 13 additions & 11 deletions data/txHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func makeRandomTransactions(num int) ([]transactions.SignedTxn, []byte) {

func TestTxHandlerProcessIncomingTxn(t *testing.T) {
partitiontest.PartitionTest(t)
//t.Parallel()
t.Parallel()

const numTxns = 11
handler := makeTestTxHandlerOrphaned(1)
Expand Down Expand Up @@ -282,7 +282,7 @@ func ipow(x, n int) int {

func TestPow(t *testing.T) {
partitiontest.PartitionTest(t)
//t.Parallel()
t.Parallel()

require.Equal(t, 1, ipow(10, 0))
require.Equal(t, 10, ipow(10, 1))
Expand Down Expand Up @@ -529,7 +529,7 @@ func BenchmarkTxHandlerIncDeDup(b *testing.B) {

func TestTxHandlerProcessIncomingGroup(t *testing.T) {
partitiontest.PartitionTest(t)
//t.Parallel()
t.Parallel()

type T struct {
inputSize int
Expand Down Expand Up @@ -567,7 +567,7 @@ func TestTxHandlerProcessIncomingGroup(t *testing.T) {

func TestTxHandlerProcessIncomingCensoring(t *testing.T) {
partitiontest.PartitionTest(t)
//t.Parallel()
t.Parallel()

craftNonCanonical := func(t *testing.T, stxn *transactions.SignedTxn, blobStxn []byte) []byte {
// make non-canonical encoding and ensure it is not accepted
Expand Down Expand Up @@ -725,12 +725,14 @@ func makeTestTxHandlerOrphanedWithContext(ctx context.Context, backlogSize int,
if cacheSize <= 0 {
cacheSize = txBacklogSize
}
return &TxHandler{
handler := &TxHandler{
backlogQueue: make(chan *txBacklogMsg, backlogSize),
msgCache: makeSaltedCache(ctx, cacheSize, refreshInterval),
msgCache: makeSaltedCache(cacheSize),
txCanonicalCache: makeDigestCache(cacheSize),
cacheConfig: txHandlerConfig{true, true},
}
handler.msgCache.start(ctx, refreshInterval)
return handler
}

func makeTestTxHandler(dl *Ledger, cfg config.Local) *TxHandler {
Expand All @@ -744,7 +746,7 @@ func makeTestTxHandler(dl *Ledger, cfg config.Local) *TxHandler {

func TestTxHandlerProcessIncomingCache(t *testing.T) {
partitiontest.PartitionTest(t)
//t.Parallel()
t.Parallel()

handler := makeTestTxHandlerOrphaned(20)

Expand Down Expand Up @@ -820,7 +822,7 @@ func TestTxHandlerProcessIncomingCache(t *testing.T) {

func TestTxHandlerProcessIncomingCacheRotation(t *testing.T) {
partitiontest.PartitionTest(t)
//t.Parallel()
t.Parallel()

stxns1, blob1 := makeRandomTransactions(1)
require.Equal(t, 1, len(stxns1))
Expand Down Expand Up @@ -894,7 +896,7 @@ func TestTxHandlerProcessIncomingCacheRotation(t *testing.T) {
// TestTxHandlerProcessIncomingCacheBacklogDrop checks if dropped messages are also removed from caches
func TestTxHandlerProcessIncomingCacheBacklogDrop(t *testing.T) {
partitiontest.PartitionTest(t)
//t.Parallel()
t.Parallel()

handler := makeTestTxHandlerOrphanedWithContext(context.Background(), 1, 20, 0)

Expand Down Expand Up @@ -1491,7 +1493,7 @@ func runHandlerBenchmarkWithBacklog(maxGroupSize, tps int, invalidRate float32,

func TestTxHandlerPostProcessError(t *testing.T) {
partitiontest.PartitionTest(t)
//t.Parallel()
t.Parallel()

collect := func() map[string]float64 {
// collect all specific error reason metrics except TxGroupErrorReasonNotWellFormed,
Expand Down Expand Up @@ -1548,7 +1550,7 @@ func TestTxHandlerPostProcessError(t *testing.T) {

func TestTxHandlerPostProcessErrorWithVerify(t *testing.T) {
partitiontest.PartitionTest(t)
//t.Parallel()
t.Parallel()

txn := transactions.Transaction{}
stxn := transactions.SignedTxn{Txn: txn}
Expand Down

0 comments on commit 8ebc577

Please sign in to comment.