Skip to content

Commit

Permalink
test: added more e2e test on commit flow failure and handling
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
harshit-gangal committed Jul 18, 2024
1 parent 6f83313 commit 66b528f
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 21 deletions.
23 changes: 12 additions & 11 deletions go/test/endtoend/transaction/twopc/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,22 +111,23 @@ func start(t *testing.T) (*mysql.Conn, func()) {
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)

deleteAll := func() {
tables := []string{"twopc_user"}
for _, table := range tables {
_, _ = utils.ExecAllowError(t, conn, "delete from "+table)
}
}

deleteAll()

return conn, func() {
deleteAll()
conn.Close()
cluster.PanicHandler(t)
cleanup(t)
}
}

func cleanup(t *testing.T) {
cluster.PanicHandler(t)

ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn.Close()

_, _ = utils.ExecAllowError(t, conn, "delete from twopc_user")
}

type extractInterestingValues func(dtidMap map[string]string, vals []sqltypes.Value) []sqltypes.Value

var tables = map[string]extractInterestingValues{
Expand Down
68 changes: 65 additions & 3 deletions go/test/endtoend/transaction/twopc/twopc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,8 +523,11 @@ func compareMaps(t *testing.T, expected, actual map[string][]string, flexibleExp
}
}

// TestDTResolver tests transaction resolver for distributed transaction on a failure after commit decision is made.
func TestDTResolver(t *testing.T) {
// TestDTResolveAfterMMCommit tests that transaction is committed on recovery
// failure after MM commit.
func TestDTResolveAfterMMCommit(t *testing.T) {
defer cleanup(t)

vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "dt_user", "")
require.NoError(t, err)
defer vtgateConn.Close()
Expand Down Expand Up @@ -552,7 +555,7 @@ func TestDTResolver(t *testing.T) {

// This setting will return from VTGate after commit happens on metadata manager.
// The RMs will be left in PREPARE state to resolve.
newCtx := callerid.NewContext(qCtx, callerid.NewEffectiveCallerID("MMCommit_FailNow", "", ""), nil)
newCtx := callerid.NewContext(qCtx, callerid.NewEffectiveCallerID("MMCommitted_FailNow", "", ""), nil)
_, err = conn.Execute(newCtx, "commit", nil)
require.NoError(t, err)

Expand Down Expand Up @@ -592,3 +595,62 @@ func TestDTResolver(t *testing.T) {
assert.Equal(t, expectations, logTable,
"mismatch expected: \n got: %s, want: %s", prettyPrint(logTable), prettyPrint(expectations))
}

// TestDTResolveAfterPrepare tests that transaction is rolled back on recovery
// failure after RM prepare and before MM commit.
func TestDTResolveAfterPrepare(t *testing.T) {
defer cleanup(t)

vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "dt_user", "")
require.NoError(t, err)
defer vtgateConn.Close()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ch := make(chan *binlogdatapb.VEvent)
runVStream(t, ctx, ch, vtgateConn)

conn := vtgateConn.Session("", nil)
qCtx, cancel := context.WithCancel(context.Background())
defer cancel()
// Insert into multiple shards
_, err = conn.Execute(qCtx, "begin", nil)
require.NoError(t, err)
_, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(7,'foo')", nil)
require.NoError(t, err)
_, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(8,'bar')", nil)
require.NoError(t, err)

// This setting will return from VTGate after commit happens on metadata manager.
// The RMs will be left in PREPARE state to resolve.
newCtx := callerid.NewContext(qCtx, callerid.NewEffectiveCallerID("RMPrepared_FailNow", "", ""), nil)
_, err = conn.Execute(newCtx, "commit", nil)
require.ErrorContains(t, err, "Fail After RM prepared")

// Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM.
tableMap := make(map[string][]*querypb.Field)
dtMap := make(map[string]string)
logTable := retrieveTransitions(t, ch, tableMap, dtMap)
expectations := map[string][]string{
"ks.dt_state:80-": {
"insert:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]",
"update:[VARCHAR(\"dtid-1\") VARCHAR(\"ROLLBACK\")]",
"delete:[VARCHAR(\"dtid-1\") VARCHAR(\"ROLLBACK\")]",
},
"ks.dt_participant:80-": {
"insert:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]",
"delete:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]",
},
"ks.redo_state:-80": {
"insert:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]",
"delete:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]",
},
"ks.redo_statement:-80": {
"insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (8, 'bar')\")]",
"delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (8, 'bar')\")]",
},
}
assert.Equal(t, expectations, logTable,
"mismatch expected: \n got: %s, want: %s", prettyPrint(logTable), prettyPrint(expectations))
}
37 changes: 30 additions & 7 deletions go/vt/vtgate/tx_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,19 +214,24 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) error {
return err
}

// retrieve the caller ID to execute the testing flow
callerID := callerid.EffectiveCallerIDFromContext(ctx)

// Test code to simulate a failure after RM prepare
if failNow, err := checkTestFailure(callerID, "RMPrepared_FailNow"); failNow {
return err
}

err = txc.tabletGateway.StartCommit(ctx, mmShard.Target, mmShard.TransactionId, dtid)
if err != nil {
return err
}

// Test code to simulate a failure in the middle of a 2PC commit.
callerID := callerid.EffectiveCallerIDFromContext(ctx)
if callerID != nil && callerID.String() == "MMCommit_FailNow" {
log.Errorf("Fail After MM commit")
// as commit decision is made. Transaction is a commit.
return nil

// Test code to simulate a failure after MM commit
if failNow, err := checkTestFailure(callerID, "MMCommitted_FailNow"); failNow {
return err
}

err = txc.runSessions(ctx, session.ShardSessions[1:], session.logging, func(ctx context.Context, s *vtgatepb.Session_ShardSession, logging *executeLogger) error {
return txc.tabletGateway.CommitPrepared(ctx, s.Target, dtid)
})
Expand All @@ -237,6 +242,24 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) error {
return txc.tabletGateway.ConcludeTransaction(ctx, mmShard.Target, dtid)
}

func checkTestFailure(callerID *vtrpcpb.CallerID, expectCaller string) (bool, error) {
if callerID == nil || callerID.GetPrincipal() != expectCaller {
return false, nil
}
switch callerID.Principal {
case "RMPrepared_FailNow":
log.Errorf("Fail After RM prepared")
// no commit decision is made. Transaction should be a rolled back.
return true, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail After RM prepared")
case "MMCommitted_FailNow":
log.Errorf("Fail After MM commit")
// commit decision is made. Transaction should be committed.
return true, nil
default:
return false, nil
}
}

// Rollback rolls back the current transaction. There are no retries on this operation.
func (txc *TxConn) Rollback(ctx context.Context, session *SafeSession) error {
if !session.InTransaction() {
Expand Down

0 comments on commit 66b528f

Please sign in to comment.