From 2ca1da7c7e29a6bf3bfdc1e652458a3b9cf27134 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Fri, 4 Sep 2020 23:23:54 +0530 Subject: [PATCH 1/3] reset session on connection close for reserved connection when not in transaction Signed-off-by: Harshit Gangal --- go/vt/vtgate/scatter_conn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go index 5e7387b6818..033a94741da 100644 --- a/go/vt/vtgate/scatter_conn.go +++ b/go/vt/vtgate/scatter_conn.go @@ -244,7 +244,7 @@ func (stc *ScatterConn) ExecuteMultiShard( func checkAndResetShardSession(info *shardActionInfo, err error, session *SafeSession) { if info.reservedID != 0 && info.transactionID == 0 { sqlErr := mysql.NewSQLErrorFromError(err).(*mysql.SQLError) - if sqlErr.Number() == mysql.CRServerGone || sqlErr.Number() == mysql.CRServerLost { + if sqlErr.Number() == mysql.CRServerGone || sqlErr.Number() == mysql.CRServerLost || sqlErr.Number() == mysql.ERQueryInterrupted { session.ResetShard(info.alias) } } From 5517ae86b5419c5300102559d79f06de97bf8bdb Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Fri, 4 Sep 2020 23:24:33 +0530 Subject: [PATCH 2/3] add e2e test Signed-off-by: Harshit Gangal --- .../endtoend/vtgate/setstatement/main_test.go | 1 + .../endtoend/vtgate/setstatement/sysvar_test.go | 17 ++++++++++------- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/go/test/endtoend/vtgate/setstatement/main_test.go b/go/test/endtoend/vtgate/setstatement/main_test.go index 0cfe9c0281e..e9c910cd0f1 100644 --- a/go/test/endtoend/vtgate/setstatement/main_test.go +++ b/go/test/endtoend/vtgate/setstatement/main_test.go @@ -115,6 +115,7 @@ func TestMain(m *testing.M) { SchemaSQL: sqlSchema, VSchema: vSchema, } + clusterInstance.VtTabletExtraArgs = []string{"-queryserver-config-transaction-timeout", "5"} if err := clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, false); err != nil { return 1 } diff --git a/go/test/endtoend/vtgate/setstatement/sysvar_test.go b/go/test/endtoend/vtgate/setstatement/sysvar_test.go index 75b482ec6ca..93037cf1076 100644 --- a/go/test/endtoend/vtgate/setstatement/sysvar_test.go +++ b/go/test/endtoend/vtgate/setstatement/sysvar_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" @@ -164,8 +165,7 @@ func TestSetSystemVarWithTxFailure(t *testing.T) { assertMatches(t, conn, `select @@sql_safe_updates`, `[[INT64(1)]]`) } -func TestSetSystemVarWithConnectionFailure(t *testing.T) { - t.Skip("failing at the moment") +func TestSetSystemVarWithConnectionTimeout(t *testing.T) { vtParams := mysql.ConnParams{ Host: "localhost", Port: clusterInstance.VtgateMySQLPort, @@ -180,13 +180,16 @@ func TestSetSystemVarWithConnectionFailure(t *testing.T) { checkedExec(t, conn, "set sql_safe_updates = 1") qr := checkedExec(t, conn, "select connection_id() from test where id = 80") - // kill the mysql connection shard which has transaction open. - vttablet1 := clusterInstance.Keyspaces[0].Shards[0].MasterTablet() // 80- - vttablet1.VttabletProcess.QueryTablet(fmt.Sprintf("kill %s", qr.Rows[0][0].ToString()), keyspaceName, false) + // Connection timeout. + time.Sleep(10 * time.Second) - // we still want to have our system setting applied + // first query will fail _, err = exec(t, conn, `select @@sql_safe_updates from test where id = 80`) - require.NoError(t, err) + require.Error(t, err) + + // we still want to have our system setting applied + newQR := checkedExec(t, conn, `select connection_id() from test where id = 80`) + require.NotEqual(t, qr.Rows[0][0], newQR.Rows[0][0]) } func TestSetSystemVariableAndThenSuccessfulTx(t *testing.T) { From 1d64f21a742c82abd41d8b886874ef5f6779778d Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 7 Sep 2020 13:14:11 +0530 Subject: [PATCH 3/3] check transaction ended error message on exec failure on reserved connection Signed-off-by: Harshit Gangal --- go/test/endtoend/vtgate/setstatement/sysvar_test.go | 10 ++++++---- go/vt/vtgate/scatter_conn.go | 5 ++++- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/go/test/endtoend/vtgate/setstatement/sysvar_test.go b/go/test/endtoend/vtgate/setstatement/sysvar_test.go index 93037cf1076..3bb237c9de3 100644 --- a/go/test/endtoend/vtgate/setstatement/sysvar_test.go +++ b/go/test/endtoend/vtgate/setstatement/sysvar_test.go @@ -178,7 +178,8 @@ func TestSetSystemVarWithConnectionTimeout(t *testing.T) { checkedExec(t, conn, "insert into test (id, val1) values (80, null)") checkedExec(t, conn, "set sql_safe_updates = 1") - qr := checkedExec(t, conn, "select connection_id() from test where id = 80") + qr := checkedExec(t, conn, "select @@sql_safe_updates, connection_id() from test where id = 80") + require.Equal(t, "1", qr.Rows[0][0].ToString()) // Connection timeout. time.Sleep(10 * time.Second) @@ -187,9 +188,10 @@ func TestSetSystemVarWithConnectionTimeout(t *testing.T) { _, err = exec(t, conn, `select @@sql_safe_updates from test where id = 80`) require.Error(t, err) - // we still want to have our system setting applied - newQR := checkedExec(t, conn, `select connection_id() from test where id = 80`) - require.NotEqual(t, qr.Rows[0][0], newQR.Rows[0][0]) + // this query is able to succeed. + newQR := checkedExec(t, conn, `select @@sql_safe_updates, connection_id() from test where id = 80`) + require.Equal(t, qr.Rows[0][0], newQR.Rows[0][0]) + require.NotEqual(t, qr.Rows[0][1], newQR.Rows[0][1]) } func TestSetSystemVariableAndThenSuccessfulTx(t *testing.T) { diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go index 033a94741da..e590e5e5f52 100644 --- a/go/vt/vtgate/scatter_conn.go +++ b/go/vt/vtgate/scatter_conn.go @@ -19,6 +19,7 @@ package vtgate import ( "flag" "io" + "regexp" "sync" "time" @@ -241,10 +242,12 @@ func (stc *ScatterConn) ExecuteMultiShard( return qr, allErrors.GetErrors() } +var errRegx = regexp.MustCompile("transaction ([a-z0-9:]+) ended") + func checkAndResetShardSession(info *shardActionInfo, err error, session *SafeSession) { if info.reservedID != 0 && info.transactionID == 0 { sqlErr := mysql.NewSQLErrorFromError(err).(*mysql.SQLError) - if sqlErr.Number() == mysql.CRServerGone || sqlErr.Number() == mysql.CRServerLost || sqlErr.Number() == mysql.ERQueryInterrupted { + if sqlErr.Number() == mysql.CRServerGone || sqlErr.Number() == mysql.CRServerLost || (sqlErr.Number() == mysql.ERQueryInterrupted && errRegx.Match([]byte(sqlErr.Error()))) { session.ResetShard(info.alias) } }