Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding a test to ensure that tx commit order is properly tested #522

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 90 additions & 17 deletions tasks/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ func TestProcessAll(t *testing.T) {
http.ListenAndServe("localhost:6060", nil)
}()

var testTxTracerCommitIndexes = new([]int)

tests := []struct {
name string
workers int
Expand Down Expand Up @@ -400,7 +402,7 @@ func TestProcessAll(t *testing.T) {
workers: 10,
runs: 1,
addStores: true,
requests: addTxTracerToTxEntries(requestList(250)),
requests: addTxTracerToTxEntries(requestList(250), newTestResetTxTracer),
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) {
defer abortRecoveryFunc(&res)
wait := rand.Intn(10)
Expand All @@ -413,7 +415,7 @@ func TestProcessAll(t *testing.T) {
newVal := val + fmt.Sprintf("%d", ctx.TxIndex())
kv.Set(itemKey, []byte(newVal))

if v, ok := ctx.Context().Value("test_tracer").(*testTxTracer); ok {
if v, ok := ctx.Context().Value("test_tracer").(*testResetTxTracer); ok {
v.OnTxExecute()
}

Expand All @@ -434,6 +436,39 @@ func TestProcessAll(t *testing.T) {
},
expectedErr: nil,
},
{
name: "Test tx Commit done in requests order",
workers: 10,
runs: 1,
addStores: true,
requests: addTxTracerToTxEntries(requestList(250), func(txIndex int) sdk.TxTracer { return newTestCommitTxTracer(txIndex, testTxTracerCommitIndexes) }),
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) {
defer abortRecoveryFunc(&res)
wait := rand.Intn(10)
time.Sleep(time.Duration(wait) * time.Millisecond)
// all txs read and write to the same key to maximize conflicts
kv := ctx.MultiStore().GetKVStore(testStoreKey)
val := string(kv.Get(itemKey))
time.Sleep(time.Duration(wait) * time.Millisecond)
// write to the store with this tx's index
newVal := val + fmt.Sprintf("%d", ctx.TxIndex())
kv.Set(itemKey, []byte(newVal))

// return what was read from the store (final attempt should be index-1)
return types.ResponseDeliverTx{
Info: newVal,
}
},
assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) {
expectedCommitTxIndexes := make([]int, 250)
for i := 0; i < 250; i++ {
expectedCommitTxIndexes[i] = i
}

require.Equal(t, expectedCommitTxIndexes, *testTxTracerCommitIndexes)
},
expectedErr: nil,
},
}

for _, tt := range tests {
Expand Down Expand Up @@ -468,41 +503,79 @@ func TestProcessAll(t *testing.T) {
}
}

func addTxTracerToTxEntries(txEntries []*sdk.DeliverTxEntry) []*sdk.DeliverTxEntry {
func addTxTracerToTxEntries(txEntries []*sdk.DeliverTxEntry, factory func(txIndex int) sdk.TxTracer) []*sdk.DeliverTxEntry {
for _, txEntry := range txEntries {
txEntry.TxTracer = newTestTxTracer(txEntry.AbsoluteIndex)
txEntry.TxTracer = factory(txEntry.AbsoluteIndex)
}

return txEntries
}

var _ sdk.TxTracer = &testTxTracer{}

func newTestTxTracer(txIndex int) *testTxTracer {
return &testTxTracer{txIndex: txIndex, canExecute: true}
}
var _ sdk.TxTracer = testTxTracer{}

type testTxTracer struct {
txIndex int
canExecute bool
onCommit func()
onInjectInContext func(ctx sdk.Context) sdk.Context
onReset func()
}

func (t *testTxTracer) Commit() {
t.canExecute = false
func (t testTxTracer) Commit() {
if t.onCommit != nil {
t.onCommit()
}
}

func (t *testTxTracer) InjectInContext(ctx sdk.Context) sdk.Context {
func (t testTxTracer) InjectInContext(ctx sdk.Context) sdk.Context {
if t.onInjectInContext != nil {
return t.onInjectInContext(ctx)
}

return ctx.WithContext(context.WithValue(ctx.Context(), "test_tracer", t))
}

func (t *testTxTracer) Reset() {
t.canExecute = true
func (t testTxTracer) Reset() {
if t.onReset != nil {
t.onReset()
}
}

func newTestResetTxTracer(txIndex int) sdk.TxTracer {
var tracer *testResetTxTracer
tracer = &testResetTxTracer{
testTxTracer: testTxTracer{
onCommit: func() {
tracer.canExecute = false
},
onReset: func() {
tracer.canExecute = true
},
},

txIndex: txIndex,
canExecute: true,
}
return tracer
}

type testResetTxTracer struct {
testTxTracer

txIndex int
canExecute bool
}

func (t *testTxTracer) OnTxExecute() {
func (t *testResetTxTracer) OnTxExecute() {
if !t.canExecute {
panic(fmt.Errorf("task #%d was asked to execute but the tracer is not in the correct state, most probably due to missing Reset call or over execution", t.txIndex))
}

t.canExecute = false
}

func newTestCommitTxTracer(txIndex int, committedTxIndexes *[]int) sdk.TxTracer {
return &testTxTracer{
onCommit: func() {
*committedTxIndexes = append(*committedTxIndexes, txIndex)
},
}
}
Loading