diff --git a/go/test/endtoend/vtgate/setstatement/main_test.go b/go/test/endtoend/vtgate/setstatement/main_test.go index 0cfe9c0281e..e9c910cd0f1 100644 --- a/go/test/endtoend/vtgate/setstatement/main_test.go +++ b/go/test/endtoend/vtgate/setstatement/main_test.go @@ -115,6 +115,7 @@ func TestMain(m *testing.M) { SchemaSQL: sqlSchema, VSchema: vSchema, } + clusterInstance.VtTabletExtraArgs = []string{"-queryserver-config-transaction-timeout", "5"} if err := clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, false); err != nil { return 1 } diff --git a/go/test/endtoend/vtgate/setstatement/sysvar_test.go b/go/test/endtoend/vtgate/setstatement/sysvar_test.go index 75b482ec6ca..3bb237c9de3 100644 --- a/go/test/endtoend/vtgate/setstatement/sysvar_test.go +++ b/go/test/endtoend/vtgate/setstatement/sysvar_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" @@ -164,8 +165,7 @@ func TestSetSystemVarWithTxFailure(t *testing.T) { assertMatches(t, conn, `select @@sql_safe_updates`, `[[INT64(1)]]`) } -func TestSetSystemVarWithConnectionFailure(t *testing.T) { - t.Skip("failing at the moment") +func TestSetSystemVarWithConnectionTimeout(t *testing.T) { vtParams := mysql.ConnParams{ Host: "localhost", Port: clusterInstance.VtgateMySQLPort, @@ -178,15 +178,20 @@ func TestSetSystemVarWithConnectionFailure(t *testing.T) { checkedExec(t, conn, "insert into test (id, val1) values (80, null)") checkedExec(t, conn, "set sql_safe_updates = 1") - qr := checkedExec(t, conn, "select connection_id() from test where id = 80") + qr := checkedExec(t, conn, "select @@sql_safe_updates, connection_id() from test where id = 80") + require.Equal(t, "1", qr.Rows[0][0].ToString()) - // kill the mysql connection shard which has transaction open. - vttablet1 := clusterInstance.Keyspaces[0].Shards[0].MasterTablet() // 80- - vttablet1.VttabletProcess.QueryTablet(fmt.Sprintf("kill %s", qr.Rows[0][0].ToString()), keyspaceName, false) + // Connection timeout. + time.Sleep(10 * time.Second) - // we still want to have our system setting applied + // first query will fail _, err = exec(t, conn, `select @@sql_safe_updates from test where id = 80`) - require.NoError(t, err) + require.Error(t, err) + + // this query is able to succeed. + newQR := checkedExec(t, conn, `select @@sql_safe_updates, connection_id() from test where id = 80`) + require.Equal(t, qr.Rows[0][0], newQR.Rows[0][0]) + require.NotEqual(t, qr.Rows[0][1], newQR.Rows[0][1]) } func TestSetSystemVariableAndThenSuccessfulTx(t *testing.T) { diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go index 5e7387b6818..e590e5e5f52 100644 --- a/go/vt/vtgate/scatter_conn.go +++ b/go/vt/vtgate/scatter_conn.go @@ -19,6 +19,7 @@ package vtgate import ( "flag" "io" + "regexp" "sync" "time" @@ -241,10 +242,12 @@ func (stc *ScatterConn) ExecuteMultiShard( return qr, allErrors.GetErrors() } +var errRegx = regexp.MustCompile("transaction ([a-z0-9:]+) ended") + func checkAndResetShardSession(info *shardActionInfo, err error, session *SafeSession) { if info.reservedID != 0 && info.transactionID == 0 { sqlErr := mysql.NewSQLErrorFromError(err).(*mysql.SQLError) - if sqlErr.Number() == mysql.CRServerGone || sqlErr.Number() == mysql.CRServerLost { + if sqlErr.Number() == mysql.CRServerGone || sqlErr.Number() == mysql.CRServerLost || (sqlErr.Number() == mysql.ERQueryInterrupted && errRegx.Match([]byte(sqlErr.Error()))) { session.ResetShard(info.alias) } }