diff --git a/go/test/endtoend/vtgate/reservedconn/main_test.go b/go/test/endtoend/vtgate/reservedconn/main_test.go index e3b5ec78567..ebe133a2c81 100644 --- a/go/test/endtoend/vtgate/reservedconn/main_test.go +++ b/go/test/endtoend/vtgate/reservedconn/main_test.go @@ -126,10 +126,8 @@ func TestMain(m *testing.M) { } // Start vtgate - clusterInstance.VtGateExtraArgs = []string{"-lock_heartbeat_time", "2s"} - vtgateProcess := clusterInstance.NewVtgateInstance() - vtgateProcess.SysVarSetEnabled = true - if err := vtgateProcess.Setup(); err != nil { + clusterInstance.VtGateExtraArgs = []string{"-lock_heartbeat_time", "2s", "-enable_system_settings=true"} // enable reserved connection. + if err := clusterInstance.StartVtgate(); err != nil { return 1 } diff --git a/go/test/endtoend/vtgate/reservedconn/reconnect1/main_test.go b/go/test/endtoend/vtgate/reservedconn/reconnect1/main_test.go new file mode 100644 index 00000000000..598d23345fe --- /dev/null +++ b/go/test/endtoend/vtgate/reservedconn/reconnect1/main_test.go @@ -0,0 +1,150 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package reservedconn + +import ( + "context" + "flag" + "os" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/test/endtoend/cluster" +) + +var ( + clusterInstance *cluster.LocalProcessCluster + vtParams mysql.ConnParams + keyspaceName = "ks" + cell = "zone1" + hostname = "localhost" + sqlSchema = `create table test(id bigint primary key)Engine=InnoDB;` + + vSchema = ` + { + "sharded":true, + "vindexes": { + "hash_index": { + "type": "hash" + } + }, + "tables": { + "test":{ + "column_vindexes": [ + { + "column": "id", + "name": "hash_index" + } + ] + } + } + } + ` +) + +func TestMain(m *testing.M) { + defer cluster.PanicHandler(nil) + flag.Parse() + + exitCode := func() int { + clusterInstance = cluster.NewCluster(cell, hostname) + defer clusterInstance.Teardown() + + // Start topo server + if err := clusterInstance.StartTopo(); err != nil { + return 1 + } + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: keyspaceName, + SchemaSQL: sqlSchema, + VSchema: vSchema, + } + if err := clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, true); err != nil { + return 1 + } + + // Start vtgate + clusterInstance.VtGateExtraArgs = []string{"-lock_heartbeat_time", "2s", "-enable_system_settings=true"} // enable reserved connection. + if err := clusterInstance.StartVtgate(); err != nil { + return 1 + } + + vtParams = mysql.ConnParams{ + Host: clusterInstance.Hostname, + Port: clusterInstance.VtgateMySQLPort, + } + return m.Run() + }() + os.Exit(exitCode) +} + +func TestServingChange(t *testing.T) { + conn, err := mysql.Connect(context.Background(), &vtParams) + require.NoError(t, err) + defer conn.Close() + + checkedExec(t, conn, "use @rdonly") + checkedExec(t, conn, "set sql_mode = ''") + + // to see rdonly is available and + // also this will create reserved connection on rdonly on -80 and 80- shards. + _, err = exec(t, conn, "select * from test") + for err != nil { + _, err = exec(t, conn, "select * from test") + } + + // changing rdonly tablet to spare (non serving). + rdonlyTablet := clusterInstance.Keyspaces[0].Shards[0].Rdonly() + err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", rdonlyTablet.Alias, "spare") + require.NoError(t, err) + + // this should fail as there is no rdonly present + _, err = exec(t, conn, "select * from test") + require.Error(t, err) + + // changing replica tablet to rdonly to make rdonly available for serving. + replicaTablet := clusterInstance.Keyspaces[0].Shards[0].Replica() + err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", replicaTablet.Alias, "rdonly") + require.NoError(t, err) + + // to see/make the new rdonly available + err = clusterInstance.VtctlclientProcess.ExecuteCommand("Ping", replicaTablet.Alias) + require.NoError(t, err) + + // this should pass now as there is rdonly present + _, err = exec(t, conn, "select * from test") + assert.NoError(t, err) +} + +func exec(t *testing.T, conn *mysql.Conn, query string) (*sqltypes.Result, error) { + t.Helper() + return conn.ExecuteFetch(query, 1000, true) +} + +func checkedExec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result { + t.Helper() + qr, err := conn.ExecuteFetch(query, 1000, true) + require.NoError(t, err) + return qr +} diff --git a/go/test/endtoend/vtgate/reservedconn/reconnect2/main_test.go b/go/test/endtoend/vtgate/reservedconn/reconnect2/main_test.go new file mode 100644 index 00000000000..627f203647e --- /dev/null +++ b/go/test/endtoend/vtgate/reservedconn/reconnect2/main_test.go @@ -0,0 +1,134 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package reservedconn + +import ( + "context" + "flag" + "fmt" + "os" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/test/endtoend/cluster" +) + +var ( + clusterInstance *cluster.LocalProcessCluster + vtParams mysql.ConnParams + keyspaceName = "ks" + cell = "zone1" + hostname = "localhost" + sqlSchema = `create table test(id bigint primary key)Engine=InnoDB;` + + vSchema = ` + { + "sharded":true, + "vindexes": { + "hash_index": { + "type": "hash" + } + }, + "tables": { + "test":{ + "column_vindexes": [ + { + "column": "id", + "name": "hash_index" + } + ] + } + } + } + ` +) + +func TestMain(m *testing.M) { + defer cluster.PanicHandler(nil) + flag.Parse() + + exitCode := func() int { + clusterInstance = cluster.NewCluster(cell, hostname) + defer clusterInstance.Teardown() + + // Start topo server + if err := clusterInstance.StartTopo(); err != nil { + return 1 + } + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: keyspaceName, + SchemaSQL: sqlSchema, + VSchema: vSchema, + } + clusterInstance.VtTabletExtraArgs = []string{"-queryserver-config-transaction-timeout", "5"} + if err := clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, true); err != nil { + return 1 + } + + // Start vtgate + clusterInstance.VtGateExtraArgs = []string{"-lock_heartbeat_time", "2s", "-enable_system_settings=true"} // enable reserved connection. + if err := clusterInstance.StartVtgate(); err != nil { + return 1 + } + + vtParams = mysql.ConnParams{ + Host: clusterInstance.Hostname, + Port: clusterInstance.VtgateMySQLPort, + } + return m.Run() + }() + os.Exit(exitCode) +} + +func TestTabletChange(t *testing.T) { + conn, err := mysql.Connect(context.Background(), &vtParams) + require.NoError(t, err) + defer conn.Close() + + checkedExec(t, conn, "use @master") + checkedExec(t, conn, "set sql_mode = ''") + + // this will create reserved connection on master on -80 and 80- shards. + checkedExec(t, conn, "select * from test") + + // Change Master + err = clusterInstance.VtctlclientProcess.ExecuteCommand("PlannedReparentShard", "-keyspace_shard", fmt.Sprintf("%s/%s", keyspaceName, "-80")) + require.NoError(t, err) + + // this should pass as there is new master tablet and is serving. + _, err = exec(t, conn, "select * from test") + assert.NoError(t, err) +} + +func exec(t *testing.T, conn *mysql.Conn, query string) (*sqltypes.Result, error) { + t.Helper() + return conn.ExecuteFetch(query, 1000, true) +} + +func checkedExec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result { + t.Helper() + qr, err := conn.ExecuteFetch(query, 1000, true) + require.NoError(t, err) + return qr +} diff --git a/go/vt/discovery/fake_healthcheck.go b/go/vt/discovery/fake_healthcheck.go index c762eb48bec..bbb1ec98baa 100644 --- a/go/vt/discovery/fake_healthcheck.go +++ b/go/vt/discovery/fake_healthcheck.go @@ -21,20 +21,18 @@ import ( "sort" "sync" - "vitess.io/vitess/go/sync2" - "github.com/golang/protobuf/proto" - "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/vterrors" - + "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/queryservice" "vitess.io/vitess/go/vt/vttablet/sandboxconn" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) var ( @@ -146,16 +144,23 @@ func (fhc *FakeHealthCheck) ReplaceTablet(old, new *topodatapb.Tablet) { } // TabletConnection returns the TabletConn of the given tablet. -func (fhc *FakeHealthCheck) TabletConnection(alias *topodatapb.TabletAlias) (queryservice.QueryService, error) { +func (fhc *FakeHealthCheck) TabletConnection(alias *topodatapb.TabletAlias, target *querypb.Target) (queryservice.QueryService, error) { aliasStr := topoproto.TabletAliasString(alias) fhc.mu.RLock() defer fhc.mu.RUnlock() for _, item := range fhc.items { if proto.Equal(alias, item.ts.Tablet.Alias) { + if !item.ts.Serving { + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, vterrors.NotServing) + } + if target != nil && !proto.Equal(item.ts.Target, target) { + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "%s: target mismatch %v vs %v", vterrors.WrongTablet, item.ts.Target, target) + } + return item.ts.Conn, nil } } - return nil, vterrors.Errorf(vtrpc.Code_NOT_FOUND, "tablet %v not found", aliasStr) + return nil, vterrors.Errorf(vtrpcpb.Code_NOT_FOUND, "tablet %v not found", aliasStr) } // CacheStatus returns the status for each tablet diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 4af9dfd7202..90a64a788aa 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -45,19 +45,18 @@ import ( "sync" "time" - "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/vt/vttablet/queryservice" + "github.com/golang/protobuf/proto" "vitess.io/vitess/go/flagutil" - - "vitess.io/vitess/go/vt/topo" - "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/queryservice" ) var ( @@ -177,7 +176,7 @@ type HealthCheck interface { WaitForAllServingTablets(ctx context.Context, targets []*query.Target) error // TabletConnection returns the TabletConn of the given tablet. - TabletConnection(alias *topodata.TabletAlias) (queryservice.QueryService, error) + TabletConnection(alias *topodata.TabletAlias, target *query.Target) (queryservice.QueryService, error) // RegisterStats registers the connection counts stats RegisterStats() @@ -693,7 +692,7 @@ func (hc *HealthCheckImpl) waitForTablets(ctx context.Context, targets []*query. } // TabletConnection returns the Connection to a given tablet. -func (hc *HealthCheckImpl) TabletConnection(alias *topodata.TabletAlias) (queryservice.QueryService, error) { +func (hc *HealthCheckImpl) TabletConnection(alias *topodata.TabletAlias, target *query.Target) (queryservice.QueryService, error) { hc.mu.Lock() thc := hc.healthByAlias[tabletAliasString(topoproto.TabletAliasString(alias))] hc.mu.Unlock() @@ -701,6 +700,12 @@ func (hc *HealthCheckImpl) TabletConnection(alias *topodata.TabletAlias) (querys //TODO: test that throws this error return nil, vterrors.Errorf(vtrpc.Code_NOT_FOUND, "tablet: %v is either down or nonexistent", alias) } + if !thc.Serving { + return nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, vterrors.NotServing) + } + if target != nil && !proto.Equal(thc.Target, target) { + return nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "%s: target mismatch %v vs %v", vterrors.WrongTablet, thc.Target, target) + } return thc.Connection(), nil } diff --git a/go/vt/srvtopo/resolver.go b/go/vt/srvtopo/resolver.go index 7134b0b9852..e668e0e248c 100644 --- a/go/vt/srvtopo/resolver.go +++ b/go/vt/srvtopo/resolver.go @@ -40,7 +40,7 @@ type Gateway interface { queryservice.QueryService // QueryServiceByAlias returns a QueryService - QueryServiceByAlias(alias *topodatapb.TabletAlias) (queryservice.QueryService, error) + QueryServiceByAlias(alias *topodatapb.TabletAlias, target *querypb.Target) (queryservice.QueryService, error) } // A Resolver can resolve keyspace ids and key ranges into ResolvedShard* diff --git a/go/vt/vterrors/constants.go b/go/vt/vterrors/constants.go new file mode 100644 index 00000000000..d66a97464d7 --- /dev/null +++ b/go/vt/vterrors/constants.go @@ -0,0 +1,34 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vterrors + +import "regexp" + +// Operation not allowed error +const ( + NotServing = "operation not allowed in state NOT_SERVING" + ShuttingDown = "operation not allowed in state SHUTTING_DOWN" +) + +// RxOp regex for operation not allowed error +var RxOp = regexp.MustCompile("operation not allowed in state (NOT_SERVING|SHUTTING_DOWN)") + +// WrongTablet for invalid tablet type error +const WrongTablet = "wrong tablet type" + +// RxWrongTablet regex for invalid tablet type error +var RxWrongTablet = regexp.MustCompile("(wrong|invalid) tablet type") diff --git a/go/vt/vtgate/discoverygateway.go b/go/vt/vtgate/discoverygateway.go index e324e998efd..b5d24c28829 100644 --- a/go/vt/vtgate/discoverygateway.go +++ b/go/vt/vtgate/discoverygateway.go @@ -405,6 +405,6 @@ func (dg *DiscoveryGateway) getStatsAggregator(target *querypb.Target) *TabletSt } // QueryServiceByAlias satisfies the Gateway interface -func (dg *DiscoveryGateway) QueryServiceByAlias(_ *topodatapb.TabletAlias) (queryservice.QueryService, error) { +func (dg *DiscoveryGateway) QueryServiceByAlias(_ *topodatapb.TabletAlias, _ *querypb.Target) (queryservice.QueryService, error) { return nil, vterrors.New(vtrpcpb.Code_UNIMPLEMENTED, "DiscoveryGateway does not implement QueryServiceByAlias") } diff --git a/go/vt/vtgate/gateway.go b/go/vt/vtgate/gateway.go index 1398ad5419e..86a015ab77a 100644 --- a/go/vt/vtgate/gateway.go +++ b/go/vt/vtgate/gateway.go @@ -17,6 +17,8 @@ import ( "flag" "time" + querypb "vitess.io/vitess/go/vt/proto/query" + "context" "vitess.io/vitess/go/vt/log" @@ -68,7 +70,7 @@ type Gateway interface { TabletsCacheStatus() discovery.TabletsCacheStatusList // TabletByAlias returns a QueryService - QueryServiceByAlias(alias *topodatapb.TabletAlias) (queryservice.QueryService, error) + QueryServiceByAlias(alias *topodatapb.TabletAlias, target *querypb.Target) (queryservice.QueryService, error) } // Creator is the factory method which can create the actual gateway object. diff --git a/go/vt/vtgate/safe_session.go b/go/vt/vtgate/safe_session.go index 05db3b7ffa4..dd63c2cd301 100644 --- a/go/vt/vtgate/safe_session.go +++ b/go/vt/vtgate/safe_session.go @@ -533,7 +533,7 @@ func removeShard(tabletAlias *topodatapb.TabletAlias, sessions []*vtgatepb.Sessi } } if idx == -1 { - return sessions, nil + return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "[BUG] tried to remove missing shard") } return append(sessions[:idx], sessions[idx+1:]...), nil } diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go index ad8df6b6a0c..ea8e27b26c9 100644 --- a/go/vt/vtgate/scatter_conn.go +++ b/go/vt/vtgate/scatter_conn.go @@ -145,6 +145,14 @@ func (stc *ScatterConn) endAction(startTime time.Time, allErrors *concurrency.Al stc.timings.Record(statsKey, startTime) } +type reset int + +const ( + none reset = iota + shard + newQS +) + // ExecuteMultiShard is like Execute, // but each shard gets its own Sql Queries and BindVariables. // @@ -206,60 +214,82 @@ func (stc *ScatterConn) ExecuteMultiShard( if autocommit { // As this is auto-commit, the transactionID is supposed to be zero. - if info.transactionID != int64(0) { - return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "in autocommit mode, transactionID should be zero but was: %d", info.transactionID) + if transactionID != int64(0) { + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "in autocommit mode, transactionID should be zero but was: %d", transactionID) } } qs, err = getQueryService(rs, info) if err != nil { - return nil, err + // an error here could mean that the tablet we were targeting earlier has changed type. + // if we have a transaction, we'll have to fail, but if we only had a reserved connection, + // we can create a new reserved connection to a new tablet that is on the right shard + // and has the right type + switch info.actionNeeded { + case nothing: + info.actionNeeded = reserve + case begin: + info.actionNeeded = reserveBegin + default: + return nil, err + } + retry := checkAndResetShardSession(info, err, session) + if retry != newQS { + return nil, err + } + qs = rs.Gateway + } + + retryRequest := func(exec func()) { + retry := checkAndResetShardSession(info, err, session) + switch retry { + case newQS: + // Current tablet is not available, try querying new tablet using gateway. + qs = rs.Gateway + fallthrough + case shard: + // if we need to reset a reserved connection, here is our chance to try executing again, + // against a new connection + exec() + } } switch info.actionNeeded { case nothing: innerqr, err = qs.Execute(ctx, rs.Target, queries[i].Sql, queries[i].BindVariables, info.transactionID, info.reservedID, opts) if err != nil { - shouldRetry := checkAndResetShardSession(info, err, session) - if shouldRetry { - // we seem to have lost our connection. if it was a reserved connection, let's try to recreate it + retryRequest(func() { + // we seem to have lost our connection. it was a reserved connection, let's try to recreate it info.actionNeeded = reserve innerqr, reservedID, alias, err = qs.ReserveExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, 0 /*transactionId*/, opts) - } - if err != nil { - return info.updateReservedID(reservedID, alias), err - } + }) } case begin: - innerqr, transactionID, alias, err = qs.BeginExecute(ctx, rs.Target, session.Savepoints, queries[i].Sql, queries[i].BindVariables, info.reservedID, opts) + innerqr, transactionID, alias, err = qs.BeginExecute(ctx, rs.Target, session.Savepoints, queries[i].Sql, queries[i].BindVariables, reservedID, opts) if err != nil { if transactionID != 0 { - return info.updateTransactionID(transactionID, alias), err + // if we had an open transaction, we can't repair anything and have to exit here. + // we still keep the transaction open - an error doesn't immediately close the transaction + break } - shouldRetry := checkAndResetShardSession(info, err, session) - if shouldRetry { - // we seem to have lost our connection. if it was a reserved connection, let's try to recreate it + retryRequest(func() { + // we seem to have lost our connection. it was a reserved connection, let's try to recreate it info.actionNeeded = reserveBegin innerqr, transactionID, reservedID, alias, err = qs.ReserveBeginExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, opts) - } - if err != nil { - return info.updateTransactionAndReservedID(transactionID, reservedID, alias), err - } - + }) } case reserve: - innerqr, reservedID, alias, err = qs.ReserveExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, info.transactionID, opts) - if err != nil { - return info.updateReservedID(reservedID, alias), err - } + innerqr, reservedID, alias, err = qs.ReserveExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, transactionID, opts) case reserveBegin: innerqr, transactionID, reservedID, alias, err = qs.ReserveBeginExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, opts) - if err != nil { - return info.updateTransactionAndReservedID(transactionID, reservedID, alias), err - } default: return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] unexpected actionNeeded on query execution: %v", info.actionNeeded) } + // We need to new shard info irrespective of the error. + newInfo := info.updateTransactionAndReservedID(transactionID, reservedID, alias) + if err != nil { + return newInfo, err + } mu.Lock() defer mu.Unlock() @@ -267,7 +297,7 @@ func (stc *ScatterConn) ExecuteMultiShard( if ignoreMaxMemoryRows || len(qr.Rows) <= *maxMemoryRows { qr.AppendResult(innerqr) } - return info.updateTransactionAndReservedID(transactionID, reservedID, alias), nil + return newInfo, nil }, ) @@ -278,28 +308,32 @@ func (stc *ScatterConn) ExecuteMultiShard( return qr, allErrors.GetErrors() } -var errRegx = regexp.MustCompile("transaction ([a-z0-9:]+) (?:ended|not found)") - -func checkAndResetShardSession(info *shardActionInfo, err error, session *SafeSession) bool { - if info.reservedID != 0 && info.transactionID == 0 && wasConnectionClosed(err) { - session.ResetShard(info.alias) - return true +func checkAndResetShardSession(info *shardActionInfo, err error, session *SafeSession) reset { + retry := none + if info.reservedID != 0 && info.transactionID == 0 { + if wasConnectionClosed(err) { + retry = shard + } + if requireNewQS(err) { + retry = newQS + } } - return false + if retry != none { + _ = session.ResetShard(info.alias) + } + return retry } func getQueryService(rs *srvtopo.ResolvedShard, info *shardActionInfo) (queryservice.QueryService, error) { _, usingLegacyGw := rs.Gateway.(*DiscoveryGateway) - if usingLegacyGw { - switch info.actionNeeded { - case reserve, reserveBegin: - return nil, vterrors.New(vtrpcpb.Code_UNIMPLEMENTED, "reserved connections are not supported on old gen gateway") - } + if usingLegacyGw && + (info.actionNeeded == reserve || info.actionNeeded == reserveBegin) { + return nil, vterrors.New(vtrpcpb.Code_UNIMPLEMENTED, "reserved connections are not supported on old gen gateway") } if usingLegacyGw || info.alias == nil { return rs.Gateway, nil } - return rs.Gateway.QueryServiceByAlias(info.alias) + return rs.Gateway.QueryServiceByAlias(info.alias, rs.Target) } func (stc *ScatterConn) processOneStreamingResult(mu *sync.Mutex, fieldSent *bool, qr *sqltypes.Result, callback func(*sqltypes.Result) error) error { @@ -675,12 +709,21 @@ func (stc *ScatterConn) ExecuteLock( return qr, err } +var txClosed = regexp.MustCompile("transaction ([a-z0-9:]+) (?:ended|not found)") + func wasConnectionClosed(err error) bool { sqlErr := mysql.NewSQLErrorFromError(err).(*mysql.SQLError) + message := sqlErr.Error() return sqlErr.Number() == mysql.CRServerGone || sqlErr.Number() == mysql.CRServerLost || - (sqlErr.Number() == mysql.ERQueryInterrupted && errRegx.MatchString(sqlErr.Error())) + (sqlErr.Number() == mysql.ERQueryInterrupted && txClosed.MatchString(message)) +} + +func requireNewQS(err error) bool { + code := vterrors.Code(err) + msg := err.Error() + return code == vtrpcpb.Code_FAILED_PRECONDITION && (vterrors.RxOp.MatchString(msg) || vterrors.RxWrongTablet.MatchString(msg)) } // actionInfo looks at the current session, and returns information about what needs to be done for this tablet @@ -738,25 +781,9 @@ type shardActionInfo struct { alias *topodatapb.TabletAlias } -func (sai *shardActionInfo) updateTransactionID(txID int64, alias *topodatapb.TabletAlias) *shardActionInfo { - if txID == 0 { - // As transaction id is ZERO, there is nothing to update in session shard sessions. - return nil - } - return sai.updateTransactionAndReservedID(txID, sai.reservedID, alias) -} - -func (sai *shardActionInfo) updateReservedID(rID int64, alias *topodatapb.TabletAlias) *shardActionInfo { - if rID == 0 { - // As reserved id is ZERO, there is nothing to update in session shard sessions. - return nil - } - return sai.updateTransactionAndReservedID(sai.transactionID, rID, alias) -} - func (sai *shardActionInfo) updateTransactionAndReservedID(txID int64, rID int64, alias *topodatapb.TabletAlias) *shardActionInfo { - if txID == 0 && rID == 0 { - // As transaction id and reserved id is ZERO, there is nothing to update in session shard sessions. + if txID == sai.transactionID && rID == sai.reservedID { + // As transaction id and reserved id have not changed, there is nothing to update in session shard sessions. return nil } newInfo := *sai diff --git a/go/vt/vtgate/scatter_conn_test.go b/go/vt/vtgate/scatter_conn_test.go index 5f84a56246d..4a5a8dcd8d5 100644 --- a/go/vt/vtgate/scatter_conn_test.go +++ b/go/vt/vtgate/scatter_conn_test.go @@ -19,6 +19,8 @@ package vtgate import ( "testing" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "github.com/stretchr/testify/assert" "vitess.io/vitess/go/mysql" @@ -321,6 +323,68 @@ func TestReservedConnFail(t *testing.T) { assert.Equal(t, 2, len(sbc0.Queries), "one for the failed attempt, and one for the retry") require.Equal(t, 1, len(session.ShardSessions)) assert.NotEqual(t, oldRId, session.Session.ShardSessions[0].ReservedId, "should have recreated a reserved connection since the last connection was lost") + oldRId = session.Session.ShardSessions[0].ReservedId + + sbc0.Queries = nil + sbc0.EphemeralShardErr = vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "operation not allowed in state NOT_SERVING during query: query1") + _ = executeOnShardsReturnsErr(t, res, keyspace, sc, session, destinations) + assert.Equal(t, 2, len(sbc0.Queries), "one for the failed attempt, and one for the retry") + require.Equal(t, 1, len(session.ShardSessions)) + assert.NotEqual(t, oldRId, session.Session.ShardSessions[0].ReservedId, "should have recreated a reserved connection since the last connection was lost") + oldRId = session.Session.ShardSessions[0].ReservedId + + sbc0.Queries = nil + sbc0.EphemeralShardErr = vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "invalid tablet type: REPLICA, want: MASTER or MASTER") + _ = executeOnShardsReturnsErr(t, res, keyspace, sc, session, destinations) + assert.Equal(t, 2, len(sbc0.Queries), "one for the failed attempt, and one for the retry") + require.Equal(t, 1, len(session.ShardSessions)) + assert.NotEqual(t, oldRId, session.Session.ShardSessions[0].ReservedId, "should have recreated a reserved connection since the last connection was lost") + oldRId = session.Session.ShardSessions[0].ReservedId + oldAlias := session.Session.ShardSessions[0].TabletAlias + + // Test Setup + tablet0 := sbc0.Tablet() + ths := hc.GetHealthyTabletStats(&querypb.Target{ + Keyspace: tablet0.GetKeyspace(), + Shard: tablet0.GetShard(), + TabletType: tablet0.GetType(), + }) + sbc0Th := ths[0] + sbc0Th.Serving = false + sbc0Rep := hc.AddTestTablet("aa", "0", 2, keyspace, "0", topodatapb.TabletType_REPLICA, true, 1, nil) + + sbc0.Queries = nil + _ = executeOnShardsReturnsErr(t, res, keyspace, sc, session, destinations) + assert.Equal(t, 0, len(sbc0.Queries), "no attempt should be made as the tablet is not serving") + assert.Equal(t, 1, len(sbc0Rep.Queries), "first attempt should pass as it is healthy") + require.Equal(t, 1, len(session.ShardSessions)) + assert.NotEqual(t, oldRId, session.Session.ShardSessions[0].ReservedId, "should have recreated a reserved connection since the last connection was lost") + assert.NotEqual(t, oldAlias, session.Session.ShardSessions[0].TabletAlias, "tablet alias should have changed as this is a different tablet") + oldRId = session.Session.ShardSessions[0].ReservedId + oldAlias = session.Session.ShardSessions[0].TabletAlias + + // Test Setup + tablet0Rep := sbc0Rep.Tablet() + newThs := hc.GetHealthyTabletStats(&querypb.Target{ + Keyspace: tablet0Rep.GetKeyspace(), + Shard: tablet0Rep.GetShard(), + TabletType: tablet0Rep.GetType(), + }) + sbc0RepTh := newThs[0] + sbc0RepTh.Target = &querypb.Target{ + Keyspace: tablet0Rep.GetKeyspace(), + Shard: tablet0Rep.GetShard(), + TabletType: topodatapb.TabletType_SPARE, + } + sbc0Th.Serving = true + + sbc0Rep.Queries = nil + _ = executeOnShardsReturnsErr(t, res, keyspace, sc, session, destinations) + assert.Equal(t, 1, len(sbc0.Queries), "first attempt should pass as it is healthy and matches the target") + assert.Equal(t, 0, len(sbc0Rep.Queries), " no attempt should be made as the tablet target is changed") + require.Equal(t, 1, len(session.ShardSessions)) + assert.NotEqual(t, oldRId, session.Session.ShardSessions[0].ReservedId, "should have recreated a reserved connection since the last connection was lost") + assert.NotEqual(t, oldAlias, session.Session.ShardSessions[0].TabletAlias, "tablet alias should have changed as this is a different tablet") } func TestIsConnClosed(t *testing.T) { diff --git a/go/vt/vtgate/tabletgateway.go b/go/vt/vtgate/tabletgateway.go index 5556cafdb80..504945bd70a 100644 --- a/go/vt/vtgate/tabletgateway.go +++ b/go/vt/vtgate/tabletgateway.go @@ -132,8 +132,8 @@ func NewTabletGateway(ctx context.Context, hc discovery.HealthCheck, serv srvtop } // QueryServiceByAlias satisfies the Gateway interface -func (gw *TabletGateway) QueryServiceByAlias(alias *topodatapb.TabletAlias) (queryservice.QueryService, error) { - return gw.hc.TabletConnection(alias) +func (gw *TabletGateway) QueryServiceByAlias(alias *topodatapb.TabletAlias, target *querypb.Target) (queryservice.QueryService, error) { + return gw.hc.TabletConnection(alias, target) } // RegisterStats registers the stats to export the lag since the last refresh diff --git a/go/vt/vtgate/tx_conn.go b/go/vt/vtgate/tx_conn.go index 4747dcf2664..b056b8db508 100644 --- a/go/vt/vtgate/tx_conn.go +++ b/go/vt/vtgate/tx_conn.go @@ -87,7 +87,7 @@ func (txc *TxConn) queryService(alias *topodatapb.TabletAlias) (queryservice.Que if qs != nil { return qs, nil } - return txc.gateway.QueryServiceByAlias(alias) + return txc.gateway.QueryServiceByAlias(alias, nil) } func (txc *TxConn) commitShard(ctx context.Context, s *vtgatepb.Session_ShardSession) error { diff --git a/go/vt/vttablet/sandboxconn/sandboxconn.go b/go/vt/vttablet/sandboxconn/sandboxconn.go index 63d8819d516..c99e826b0dd 100644 --- a/go/vt/vttablet/sandboxconn/sandboxconn.go +++ b/go/vt/vttablet/sandboxconn/sandboxconn.go @@ -481,7 +481,7 @@ func (sbc *SandboxConn) VStreamResults(ctx context.Context, target *querypb.Targ } // QueryServiceByAlias is part of the Gateway interface. -func (sbc *SandboxConn) QueryServiceByAlias(_ *topodatapb.TabletAlias) (queryservice.QueryService, error) { +func (sbc *SandboxConn) QueryServiceByAlias(_ *topodatapb.TabletAlias, _ *querypb.Target) (queryservice.QueryService, error) { return sbc, nil } diff --git a/go/vt/vttablet/tabletconntest/fakequeryservice.go b/go/vt/vttablet/tabletconntest/fakequeryservice.go index 84849ea06ea..9fb15d053d5 100644 --- a/go/vt/vttablet/tabletconntest/fakequeryservice.go +++ b/go/vt/vttablet/tabletconntest/fakequeryservice.go @@ -720,7 +720,7 @@ func (f *FakeQueryService) VStreamResults(ctx context.Context, target *querypb.T } // QueryServiceByAlias satisfies the Gateway interface -func (f *FakeQueryService) QueryServiceByAlias(_ *topodatapb.TabletAlias) (queryservice.QueryService, error) { +func (f *FakeQueryService) QueryServiceByAlias(_ *topodatapb.TabletAlias, _ *querypb.Target) (queryservice.QueryService, error) { panic("not implemented") } diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index c1498e43f44..39cab911fd2 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -348,36 +348,19 @@ func (sm *stateManager) StartRequest(ctx context.Context, target *querypb.Target if sm.state != StateServing || !sm.replHealthy { // This specific error string needs to be returned for vtgate buffering to work. - return vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "operation not allowed in state NOT_SERVING") + return vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, vterrors.NotServing) } shuttingDown := sm.wantState != StateServing if shuttingDown && !allowOnShutdown { // This specific error string needs to be returned for vtgate buffering to work. - return vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "operation not allowed in state SHUTTING_DOWN") + return vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, vterrors.ShuttingDown) } - if target != nil { - switch { - case target.Keyspace != sm.target.Keyspace: - return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid keyspace %v does not match expected %v", target.Keyspace, sm.target.Keyspace) - case target.Shard != sm.target.Shard: - return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid shard %v does not match expected %v", target.Shard, sm.target.Shard) - case target.TabletType != sm.target.TabletType: - for _, otherType := range sm.alsoAllow { - if target.TabletType == otherType { - goto ok - } - } - return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "invalid tablet type: %v, want: %v or %v", target.TabletType, sm.target.TabletType, sm.alsoAllow) - } - } else { - if !tabletenv.IsLocalContext(ctx) { - return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "No target") - } + err = sm.verifyTargetLocked(ctx, target) + if err != nil { + return err } - -ok: sm.requests.Add(1) return nil } @@ -392,6 +375,10 @@ func (sm *stateManager) EndRequest() { func (sm *stateManager) VerifyTarget(ctx context.Context, target *querypb.Target) error { sm.mu.Lock() defer sm.mu.Unlock() + return sm.verifyTargetLocked(ctx, target) +} + +func (sm *stateManager) verifyTargetLocked(ctx context.Context, target *querypb.Target) error { if target != nil { switch { case target.Keyspace != sm.target.Keyspace: @@ -404,7 +391,7 @@ func (sm *stateManager) VerifyTarget(ctx context.Context, target *querypb.Target return nil } } - return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "invalid tablet type: %v, want: %v or %v", target.TabletType, sm.target.TabletType, sm.alsoAllow) + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "%s: %v, want: %v or %v", vterrors.WrongTablet, sm.target.TabletType, sm.target.TabletType, sm.alsoAllow) } } else { if !tabletenv.IsLocalContext(ctx) { diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go index 577dbc8f6ad..53396e052ba 100644 --- a/go/vt/vttablet/tabletserver/state_manager_test.go +++ b/go/vt/vttablet/tabletserver/state_manager_test.go @@ -531,9 +531,9 @@ func TestStateManagerValidations(t *testing.T) { target.Shard = "" target.TabletType = topodatapb.TabletType_REPLICA err = sm.StartRequest(ctx, target, false) - assert.Contains(t, err.Error(), "invalid tablet type") + assert.Contains(t, err.Error(), "wrong tablet type") err = sm.VerifyTarget(ctx, target) - assert.Contains(t, err.Error(), "invalid tablet type") + assert.Contains(t, err.Error(), "wrong tablet type") sm.alsoAllow = []topodatapb.TabletType{topodatapb.TabletType_REPLICA} err = sm.StartRequest(ctx, target, false) diff --git a/test/config.json b/test/config.json index e111dfe4832..b33c0dca1ca 100644 --- a/test/config.json +++ b/test/config.json @@ -562,6 +562,24 @@ "RetryMax": 0, "Tags": [] }, + "vtgate_reserved_conn1": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/reservedconn/reconnect1"], + "Command": [], + "Manual": false, + "Shard": "17", + "RetryMax": 0, + "Tags": [] + }, + "vtgate_reserved_conn2": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/reservedconn/reconnect2"], + "Command": [], + "Manual": false, + "Shard": "17", + "RetryMax": 0, + "Tags": [] + }, "vtgate_transaction": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/transaction"],