Skip to content

Commit

Permalink
test: added more transaction resolver test
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
harshit-gangal committed Jul 18, 2024
1 parent 66b528f commit 648c380
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 14 deletions.
6 changes: 5 additions & 1 deletion go/test/endtoend/transaction/twopc/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,10 @@ func runVStream(t *testing.T, ctx context.Context, ch chan *binlogdatapb.VEvent,
}

func retrieveTransitions(t *testing.T, ch chan *binlogdatapb.VEvent, tableMap map[string][]*querypb.Field, dtMap map[string]string) map[string][]string {
return retrieveTransitionsWithTimeout(t, ch, tableMap, dtMap, 1*time.Second)
}

func retrieveTransitionsWithTimeout(t *testing.T, ch chan *binlogdatapb.VEvent, tableMap map[string][]*querypb.Field, dtMap map[string]string, timeout time.Duration) map[string][]string {
logTable := make(map[string][]string)

keepWaiting := true
Expand All @@ -231,7 +235,7 @@ func retrieveTransitions(t *testing.T, ch chan *binlogdatapb.VEvent, tableMap ma
if re.FieldEvent != nil {
tableMap[re.FieldEvent.TableName] = re.FieldEvent.Fields
}
case <-time.After(2 * time.Second):
case <-time.After(timeout):
keepWaiting = false
}
}
Expand Down
63 changes: 54 additions & 9 deletions go/test/endtoend/transaction/twopc/twopc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sort"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -553,16 +554,15 @@ func TestDTResolveAfterMMCommit(t *testing.T) {
_, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(10,'apa')", nil)
require.NoError(t, err)

// This setting will return from VTGate after commit happens on metadata manager.
// The RMs will be left in PREPARE state to resolve.
// The caller ID is used to simulate the failure at the desired point.
newCtx := callerid.NewContext(qCtx, callerid.NewEffectiveCallerID("MMCommitted_FailNow", "", ""), nil)
_, err = conn.Execute(newCtx, "commit", nil)
require.NoError(t, err)
require.ErrorContains(t, err, "Fail After MM commit")

// Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM.
tableMap := make(map[string][]*querypb.Field)
dtMap := make(map[string]string)
logTable := retrieveTransitions(t, ch, tableMap, dtMap)
logTable := retrieveTransitionsWithTimeout(t, ch, tableMap, dtMap, 2*time.Second)
expectations := map[string][]string{
"ks.dt_state:80-": {
"insert:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]",
Expand Down Expand Up @@ -599,8 +599,6 @@ func TestDTResolveAfterMMCommit(t *testing.T) {
// TestDTResolveAfterPrepare tests that transaction is rolled back on recovery
// failure after RM prepare and before MM commit.
func TestDTResolveAfterPrepare(t *testing.T) {
defer cleanup(t)

vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "dt_user", "")
require.NoError(t, err)
defer vtgateConn.Close()
Expand All @@ -622,16 +620,15 @@ func TestDTResolveAfterPrepare(t *testing.T) {
_, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(8,'bar')", nil)
require.NoError(t, err)

// This setting will return from VTGate after commit happens on metadata manager.
// The RMs will be left in PREPARE state to resolve.
// The caller ID is used to simulate the failure at the desired point.
newCtx := callerid.NewContext(qCtx, callerid.NewEffectiveCallerID("RMPrepared_FailNow", "", ""), nil)
_, err = conn.Execute(newCtx, "commit", nil)
require.ErrorContains(t, err, "Fail After RM prepared")

// Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM.
tableMap := make(map[string][]*querypb.Field)
dtMap := make(map[string]string)
logTable := retrieveTransitions(t, ch, tableMap, dtMap)
logTable := retrieveTransitionsWithTimeout(t, ch, tableMap, dtMap, 2*time.Second)
expectations := map[string][]string{
"ks.dt_state:80-": {
"insert:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]",
Expand All @@ -654,3 +651,51 @@ func TestDTResolveAfterPrepare(t *testing.T) {
assert.Equal(t, expectations, logTable,
"mismatch expected: \n got: %s, want: %s", prettyPrint(logTable), prettyPrint(expectations))
}

// TestDTResolveAfterTransactionRecord tests that transaction is rolled back on recovery
// failure after TR created and before RM prepare.
func TestDTResolveAfterTransactionRecord(t *testing.T) {
vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "dt_user", "")
require.NoError(t, err)
defer vtgateConn.Close()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ch := make(chan *binlogdatapb.VEvent)
runVStream(t, ctx, ch, vtgateConn)

conn := vtgateConn.Session("", nil)
qCtx, cancel := context.WithCancel(context.Background())
defer cancel()
// Insert into multiple shards
_, err = conn.Execute(qCtx, "begin", nil)
require.NoError(t, err)
_, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(7,'foo')", nil)
require.NoError(t, err)
_, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(8,'bar')", nil)
require.NoError(t, err)

// The caller ID is used to simulate the failure at the desired point.
newCtx := callerid.NewContext(qCtx, callerid.NewEffectiveCallerID("TRCreated_FailNow", "", ""), nil)
_, err = conn.Execute(newCtx, "commit", nil)
require.ErrorContains(t, err, "Fail After TR created")

// Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM.
tableMap := make(map[string][]*querypb.Field)
dtMap := make(map[string]string)
logTable := retrieveTransitionsWithTimeout(t, ch, tableMap, dtMap, 2*time.Second)
expectations := map[string][]string{
"ks.dt_state:80-": {
"insert:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]",
"update:[VARCHAR(\"dtid-1\") VARCHAR(\"ROLLBACK\")]",
"delete:[VARCHAR(\"dtid-1\") VARCHAR(\"ROLLBACK\")]",
},
"ks.dt_participant:80-": {
"insert:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]",
"delete:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]",
},
}
assert.Equal(t, expectations, logTable,
"mismatch expected: \n got: %s, want: %s", prettyPrint(logTable), prettyPrint(expectations))
}
17 changes: 13 additions & 4 deletions go/vt/vtgate/tx_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,14 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) error {
return err
}

// retrieve the caller ID to execute the testing flow
callerID := callerid.EffectiveCallerIDFromContext(ctx)

// Test code to simulate a failure after RM prepare
if failNow, err := checkTestFailure(callerID, "TRCreated_FailNow"); failNow {
return err
}

err = txc.runSessions(ctx, session.ShardSessions[1:], session.logging, func(ctx context.Context, s *vtgatepb.Session_ShardSession, logging *executeLogger) error {
return txc.tabletGateway.Prepare(ctx, s.Target, s.TransactionId, dtid)
})
Expand All @@ -214,9 +222,6 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) error {
return err
}

// retrieve the caller ID to execute the testing flow
callerID := callerid.EffectiveCallerIDFromContext(ctx)

// Test code to simulate a failure after RM prepare
if failNow, err := checkTestFailure(callerID, "RMPrepared_FailNow"); failNow {
return err
Expand Down Expand Up @@ -247,14 +252,18 @@ func checkTestFailure(callerID *vtrpcpb.CallerID, expectCaller string) (bool, er
return false, nil
}
switch callerID.Principal {
case "TRCreated_FailNow":
log.Errorf("Fail After TR created")
// no commit decision is made. Transaction should be a rolled back.
return true, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail After TR created")
case "RMPrepared_FailNow":
log.Errorf("Fail After RM prepared")
// no commit decision is made. Transaction should be a rolled back.
return true, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail After RM prepared")
case "MMCommitted_FailNow":
log.Errorf("Fail After MM commit")
// commit decision is made. Transaction should be committed.
return true, nil
return true, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail After MM commit")
default:
return false, nil
}
Expand Down

0 comments on commit 648c380

Please sign in to comment.