diff --git a/doc/TabletRouting.md b/doc/TabletRouting.md index 6d32f5f8ae1..167975e6469 100644 --- a/doc/TabletRouting.md +++ b/doc/TabletRouting.md @@ -93,36 +93,7 @@ There are two implementations of the Gateway interface: discovery section, one per cell) as a source of tablets, a HealthCheck module to watch their health, and a TabletStatsCache to collect all the health information. Based on this data, it can find the best tablet to use. -* l2VTGateGateway: It keeps a map of l2vtgate processes to send queries to. See - next section for more details. -## l2vtgate - -As we started increasing the number of tablets in a cell, it became clear that a -bottleneck of the system was going to be how many tablets a single vtgate is -connecting to. Since vtgate maintains a streaming health check connection per -tablet, the number of these connections can grow to large numbers. It is common -for vtgate to watch tablets in other cells, to be able to find the master -tablet. - -So l2vtgate came to exist, based on very similar concepts and interfaces: - -* l2vtgate is an extra hop between a vtgate pool and tablets. -* A l2vtgate pool connects to a subset of tablets, therefore it can have a - reasonable number of streaming health connections. Externally, it exposes the - QueryService RPC interface (that has the Target for the query, keyspace / - shard / tablet type). Internally, it uses a discoveryGateway, as usual. -* vtgate connects to l2vtgate pools (using the l2VTGateGateway instead of the - discoveryGateway). It has a map of which keyspace / shard / tablet type needs - to go to wich l2vtgate pool. At this point, vtgate doesn't maintain any health - information about the tablets, it lets l2vtgate handle it. - -Note l2vtgate is not an ideal solution as it is now. For instance, if there are -two cells, and the master for a shard can be in either, l2vtgate still has to -watch the tablets in both cells, to know where the master is. Ideally, we'd want -l2vtgate to be collocated with the tablets in a given cell, and not go -cross-cell. - # Extensions, work in progress ## Regions, cross-cell targeting @@ -169,31 +140,6 @@ between vtgate and l2vtgate: This would also be a good time to merge the vtgate code that uses the VSchema with the code that doesn't for SrvKeyspace access. -## Hybrid Gateway - -It would be nice to re-organize the code a bit inside vtgate to allow for an -hybrid gateway, and get rid of l2vtgate alltogether: - -* vtgate would use the discoveryGateway to watch the tablets in the current cell - (and optionally to any other cell we still want to consider local). -* vtgate would use l2vtgateGateway to watch the tablets in a different cell. -* vtgate would expose the RPC APIs currently exposed by the l2vtgate process. - -So vtgate would watch the tablets in the local cell only, but also know what -healthy tablets are in the other cells, and be able to send query to them -through their vtgate. The extra hop to the other cell vtgate should be a small -latency price to pay, compared to going cross-cell already. - -So queries would go one of two routes: - -* client(cell1) -> vtgate(cell1) -> tablet(cell1) -* client(cell1) -> vtgate(cell1) -> vtgate(cell2) -> tablet(cell2) - -If the number of tablets in a given cell is still too high for the local vtgate -pool, two or more pools can still be created, each of them knowing about a -subset of the tablets. And they would just forward queries to each others when -addressing the other tablet set. - ## Config-based routing Another possible extension would be to group all routing options for vtgate in a diff --git a/go/cmd/vtgate/plugin_grpcqueryservice.go b/go/cmd/vtgate/plugin_grpcqueryservice.go deleted file mode 100644 index 16c163d095c..00000000000 --- a/go/cmd/vtgate/plugin_grpcqueryservice.go +++ /dev/null @@ -1,34 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package main - -// Imports and register the gRPC queryservice server - -import ( - "vitess.io/vitess/go/vt/servenv" - "vitess.io/vitess/go/vt/vtgate" - "vitess.io/vitess/go/vt/vttablet/grpcqueryservice" - "vitess.io/vitess/go/vt/vttablet/queryservice" -) - -func init() { - vtgate.RegisterL2VTGates = append(vtgate.RegisterL2VTGates, func(qs queryservice.QueryService) { - if servenv.GRPCCheckServiceMap("queryservice") { - grpcqueryservice.Register(servenv.GRPCServer, qs) - } - }) -} diff --git a/go/vt/discovery/tablet_stats_cache.go b/go/vt/discovery/tablet_stats_cache.go index 13b60a9225f..7fa1809e5ce 100644 --- a/go/vt/discovery/tablet_stats_cache.go +++ b/go/vt/discovery/tablet_stats_cache.go @@ -68,7 +68,7 @@ type tabletStatsCacheEntry struct { all map[string]*TabletStats // healthy only has the healthy ones. healthy []*TabletStats - // aggregates has the per-cell aggregates. + // aggregates has the per-region aggregates. aggregates map[string]*querypb.AggregateStats } @@ -141,7 +141,6 @@ func newTabletStatsCache(hc HealthCheck, ts *topo.Server, cell string, setListen // upon type change. hc.SetListener(tc, true /*sendDownEvents*/) } - go tc.broadcastAggregateStats() return tc } @@ -266,18 +265,18 @@ func (tc *TabletStatsCache) StatsUpdate(ts *TabletStats) { tc.updateAggregateMap(ts.Target.Keyspace, ts.Target.Shard, ts.Target.TabletType, e, allArray) } -// MakeAggregateMap takes a list of TabletStats and builds a per-cell +// makeAggregateMap takes a list of TabletStats and builds a per-region // AggregateStats map. -func MakeAggregateMap(stats []*TabletStats) map[string]*querypb.AggregateStats { +func (tc *TabletStatsCache) makeAggregateMap(stats []*TabletStats) map[string]*querypb.AggregateStats { result := make(map[string]*querypb.AggregateStats) for _, ts := range stats { - cell := ts.Tablet.Alias.Cell - agg, ok := result[cell] + region := tc.getRegionByCell(ts.Tablet.Alias.Cell) + agg, ok := result[region] if !ok { agg = &querypb.AggregateStats{ SecondsBehindMasterMin: math.MaxUint32, } - result[cell] = agg + result[region] = agg } if ts.Serving && ts.LastError == nil { @@ -295,101 +294,12 @@ func MakeAggregateMap(stats []*TabletStats) map[string]*querypb.AggregateStats { return result } -// MakeAggregateMapDiff computes the entries that need to be broadcast -// when the map goes from oldMap to newMap. -func MakeAggregateMapDiff(keyspace, shard string, tabletType topodatapb.TabletType, ter int64, oldMap map[string]*querypb.AggregateStats, newMap map[string]*querypb.AggregateStats) []*srvtopo.TargetStatsEntry { - var result []*srvtopo.TargetStatsEntry - for cell, oldValue := range oldMap { - newValue, ok := newMap[cell] - if ok { - // We have both an old and a new value. If equal, - // skip it. - if oldValue.HealthyTabletCount == newValue.HealthyTabletCount && - oldValue.UnhealthyTabletCount == newValue.UnhealthyTabletCount && - oldValue.SecondsBehindMasterMin == newValue.SecondsBehindMasterMin && - oldValue.SecondsBehindMasterMax == newValue.SecondsBehindMasterMax { - continue - } - // The new value is different, send it. - result = append(result, &srvtopo.TargetStatsEntry{ - Target: &querypb.Target{ - Keyspace: keyspace, - Shard: shard, - TabletType: tabletType, - Cell: cell, - }, - Stats: newValue, - TabletExternallyReparentedTimestamp: ter, - }) - } else { - // We only have the old value, send an empty - // record to clear it. - result = append(result, &srvtopo.TargetStatsEntry{ - Target: &querypb.Target{ - Keyspace: keyspace, - Shard: shard, - TabletType: tabletType, - Cell: cell, - }, - }) - } - } - - for cell, newValue := range newMap { - if _, ok := oldMap[cell]; ok { - continue - } - // New value, no old value, just send it. - result = append(result, &srvtopo.TargetStatsEntry{ - Target: &querypb.Target{ - Keyspace: keyspace, - Shard: shard, - TabletType: tabletType, - Cell: cell, - }, - Stats: newValue, - TabletExternallyReparentedTimestamp: ter, - }) - } - return result -} - // updateAggregateMap will update the aggregate map for the // tabletStatsCacheEntry. It may broadcast the changes too if we have listeners. // e.mu needs to be locked. func (tc *TabletStatsCache) updateAggregateMap(keyspace, shard string, tabletType topodatapb.TabletType, e *tabletStatsCacheEntry, stats []*TabletStats) { // Save the new value - oldAgg := e.aggregates - newAgg := MakeAggregateMap(stats) - e.aggregates = newAgg - - // And broadcast the change in the background, if we need to. - tc.mu.RLock() - if !tc.tsm.HasSubscribers() { - // Shortcut: no subscriber, we can be done. - tc.mu.RUnlock() - return - } - tc.mu.RUnlock() - - var ter int64 - if len(stats) > 0 { - ter = stats[0].TabletExternallyReparentedTimestamp - } - diffs := MakeAggregateMapDiff(keyspace, shard, tabletType, ter, oldAgg, newAgg) - tc.aggregatesChan <- diffs -} - -// broadcastAggregateStats is called in the background to send aggregate stats -// in the right order to our subscribers. -func (tc *TabletStatsCache) broadcastAggregateStats() { - for diffs := range tc.aggregatesChan { - tc.mu.RLock() - for _, d := range diffs { - tc.tsm.Broadcast(d) - } - tc.mu.RUnlock() - } + e.aggregates = tc.makeAggregateMap(stats) } // GetTabletStats returns the full list of available targets. @@ -436,51 +346,6 @@ func (tc *TabletStatsCache) ResetForTesting() { tc.entries = make(map[string]map[string]map[topodatapb.TabletType]*tabletStatsCacheEntry) } -// Subscribe is part of the TargetStatsListener interface. -func (tc *TabletStatsCache) Subscribe() (int, []srvtopo.TargetStatsEntry, <-chan (*srvtopo.TargetStatsEntry), error) { - var allTS []srvtopo.TargetStatsEntry - - // Make sure the map cannot change. Also blocks any update from - // propagating. - tc.mu.Lock() - defer tc.mu.Unlock() - for keyspace, shardMap := range tc.entries { - for shard, typeMap := range shardMap { - for tabletType, e := range typeMap { - e.mu.RLock() - var ter int64 - if len(e.healthy) > 0 { - ter = e.healthy[0].TabletExternallyReparentedTimestamp - } - for cell, agg := range e.aggregates { - allTS = append(allTS, srvtopo.TargetStatsEntry{ - Target: &querypb.Target{ - Keyspace: keyspace, - Shard: shard, - TabletType: tabletType, - Cell: cell, - }, - Stats: agg, - TabletExternallyReparentedTimestamp: ter, - }) - } - e.mu.RUnlock() - } - } - } - - // Now create the listener, add it to our list. - id, c := tc.tsm.Subscribe() - return id, allTS, c, nil -} - -// Unsubscribe is part of the TargetStatsListener interface. -func (tc *TabletStatsCache) Unsubscribe(i int) error { - tc.mu.Lock() - defer tc.mu.Unlock() - return tc.tsm.Unsubscribe(i) -} - // GetAggregateStats is part of the TargetStatsListener interface. func (tc *TabletStatsCache) GetAggregateStats(target *querypb.Target) (*querypb.AggregateStats, error) { e := tc.getEntry(target.Keyspace, target.Shard, target.TabletType) @@ -498,7 +363,8 @@ func (tc *TabletStatsCache) GetAggregateStats(target *querypb.Target) (*querypb. return agg, nil } } - agg, ok := e.aggregates[target.Cell] + targetRegion := tc.getRegionByCell(target.Cell) + agg, ok := e.aggregates[targetRegion] if !ok { return nil, topo.NewError(topo.NoNode, topotools.TargetIdent(target)) } @@ -530,4 +396,3 @@ func (tc *TabletStatsCache) GetMasterCell(keyspace, shard string) (cell string, // Compile-time interface check. var _ HealthCheckStatsListener = (*TabletStatsCache)(nil) -var _ srvtopo.TargetStatsListener = (*TabletStatsCache)(nil) diff --git a/go/vt/srvtopo/target_stats.go b/go/vt/srvtopo/target_stats.go index ed49e41f02f..6af44eedb81 100644 --- a/go/vt/srvtopo/target_stats.go +++ b/go/vt/srvtopo/target_stats.go @@ -27,9 +27,6 @@ import ( // routing of queries. // - discovery.TabletStatsCache will implement the discovery part of the // interface, and discoverygateway will have the QueryService. -// - hybridgateway will also implement this interface: for each l2vtgate pool, -// it will establish a StreamHealth connection, and store the returned -// health stats. type TargetStats interface { // GetAggregateStats returns the aggregate stats for the given Target. // The srvtopo module will use that information to route queries @@ -45,23 +42,6 @@ type TargetStats interface { GetMasterCell(keyspace, shard string) (cell string, qs queryservice.QueryService, err error) } -// TargetStatsListener is an interface used to propagate TargetStats changes. -// - discovery.TabletStatsCache will implement this interface. -// - the StreamHealth method in l2vtgate will use this interface to surface -// the health of its targets. -type TargetStatsListener interface { - // Subscribe will return the current full state of the TargetStats, - // and a channel that will receive subsequent updates. The int returned - // is the channel id, and can be sent to unsubscribe to stop - // notifications. - Subscribe() (int, []TargetStatsEntry, <-chan (*TargetStatsEntry), error) - - // Unsubscribe stops sending updates to the channel returned - // by Subscribe. The channel still needs to be drained to - // avoid deadlocks. - Unsubscribe(int) error -} - // TargetStatsEntry has the updated information for a Target. type TargetStatsEntry struct { // Target is what this entry applies to. diff --git a/go/vt/vtgate/gateway/discoverygateway.go b/go/vt/vtgate/gateway/discoverygateway.go index 9b075d2f593..26731f72e69 100644 --- a/go/vt/vtgate/gateway/discoverygateway.go +++ b/go/vt/vtgate/gateway/discoverygateway.go @@ -210,12 +210,6 @@ func (dg *discoveryGateway) GetMasterCell(keyspace, shard string) (string, query return cell, dg, err } -// StreamHealth is not forwarded to any other tablet, -// but we handle it directly here. -func (dg *discoveryGateway) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error { - return StreamHealthFromTargetStatsListener(ctx, dg.tsc, callback) -} - // Close shuts down underlying connections. // This function hides the inner implementation. func (dg *discoveryGateway) Close(ctx context.Context) error { diff --git a/go/vt/vtgate/gateway/discoverygateway_test.go b/go/vt/vtgate/gateway/discoverygateway_test.go index 4f0680f5763..510a86d459c 100644 --- a/go/vt/vtgate/gateway/discoverygateway_test.go +++ b/go/vt/vtgate/gateway/discoverygateway_test.go @@ -206,6 +206,107 @@ func TestShuffleTablets(t *testing.T) { } } +func TestDiscoveryGatewayGetAggregateStats(t *testing.T) { + keyspace := "ks" + shard := "0" + hc := discovery.NewFakeHealthCheck() + dg := createDiscoveryGateway(hc, nil, "cell1", 2).(*discoveryGateway) + + // replica should only use local ones + hc.Reset() + dg.tsc.ResetForTesting() + hc.AddTestTablet("cell1", "1.1.1.1", 1001, keyspace, shard, topodatapb.TabletType_REPLICA, true, 10, nil) + hc.AddTestTablet("cell1", "2.2.2.2", 1001, keyspace, shard, topodatapb.TabletType_REPLICA, true, 10, nil) + target := &querypb.Target{ + Keyspace: keyspace, + Shard: shard, + TabletType: topodatapb.TabletType_REPLICA, + Cell: "cell1", + } + tsl, err := dg.tsc.GetAggregateStats(target) + if err != nil { + t.Error(err) + } + if tsl.HealthyTabletCount != 2 { + t.Errorf("Expected 2 healthy replica tablets, got: %v", tsl.HealthyTabletCount) + } +} + +func TestDiscoveryGatewayGetAggregateStatsRegion(t *testing.T) { + keyspace := "ks" + shard := "0" + hc := discovery.NewFakeHealthCheck() + dg := createDiscoveryGateway(hc, nil, "local-east", 2).(*discoveryGateway) + + topo.UpdateCellsToRegionsForTests(map[string]string{ + "local-west": "local", + "local-east": "local", + "remote": "remote", + }) + + hc.Reset() + dg.tsc.ResetForTesting() + hc.AddTestTablet("remote", "1.1.1.1", 1001, keyspace, shard, topodatapb.TabletType_REPLICA, true, 10, nil) + hc.AddTestTablet("local-west", "2.2.2.2", 1001, keyspace, shard, topodatapb.TabletType_REPLICA, true, 10, nil) + hc.AddTestTablet("local-east", "3.3.3.3", 1001, keyspace, shard, topodatapb.TabletType_REPLICA, true, 10, nil) + + // Non master targets in the same region as the gateway should be discoverable + target := &querypb.Target{ + Keyspace: keyspace, + Shard: shard, + TabletType: topodatapb.TabletType_REPLICA, + Cell: "local-west", + } + tsl, err := dg.tsc.GetAggregateStats(target) + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + if tsl.HealthyTabletCount != 2 { + t.Errorf("Expected 2 healthy replica tablets, got: %v", tsl.HealthyTabletCount) + } +} + +func TestDiscoveryGatewayGetAggregateStatsMaster(t *testing.T) { + keyspace := "ks" + shard := "0" + hc := discovery.NewFakeHealthCheck() + dg := createDiscoveryGateway(hc, nil, "cell1", 2).(*discoveryGateway) + + // replica should only use local ones + hc.Reset() + dg.tsc.ResetForTesting() + hc.AddTestTablet("cell1", "1.1.1.1", 1001, keyspace, shard, topodatapb.TabletType_MASTER, true, 10, nil) + target := &querypb.Target{ + Keyspace: keyspace, + Shard: shard, + TabletType: topodatapb.TabletType_MASTER, + Cell: "cell1", + } + tsl, err := dg.tsc.GetAggregateStats(target) + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + if tsl.HealthyTabletCount != 1 { + t.Errorf("Expected one healthy master, got: %v", tsl.HealthyTabletCount) + } + + // You can get aggregate regardless of the cell when requesting a master + target = &querypb.Target{ + Keyspace: keyspace, + Shard: shard, + TabletType: topodatapb.TabletType_MASTER, + Cell: "cell2", + } + + tsl, err = dg.tsc.GetAggregateStats(target) + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + if tsl.HealthyTabletCount != 1 { + t.Errorf("Expected one healthy master, got: %v", tsl.HealthyTabletCount) + } +} + func TestDiscoveryGatewayGetTabletsWithRegion(t *testing.T) { keyspace := "ks" shard := "0" @@ -230,6 +331,50 @@ func TestDiscoveryGatewayGetTabletsWithRegion(t *testing.T) { } } +func BenchmarkOneCellGetAggregateStats(b *testing.B) { benchmarkCellsGetAggregateStats(1, b) } + +func BenchmarkTenCellGetAggregateStats(b *testing.B) { benchmarkCellsGetAggregateStats(10, b) } + +func Benchmark100CellGetAggregateStats(b *testing.B) { benchmarkCellsGetAggregateStats(100, b) } + +func Benchmark1000CellGetAggregateStats(b *testing.B) { benchmarkCellsGetAggregateStats(1000, b) } + +func benchmarkCellsGetAggregateStats(i int, b *testing.B) { + keyspace := "ks" + shard := "0" + hc := discovery.NewFakeHealthCheck() + dg := createDiscoveryGateway(hc, nil, "cell0", 2).(*discoveryGateway) + cellsToregions := make(map[string]string) + for j := 0; j < i; j++ { + cell := fmt.Sprintf("cell%v", j) + cellsToregions[cell] = "local" + } + + topo.UpdateCellsToRegionsForTests(cellsToregions) + hc.Reset() + dg.tsc.ResetForTesting() + + for j := 0; j < i; j++ { + cell := fmt.Sprintf("cell%v", j) + ip := fmt.Sprintf("%v.%v.%v,%v", j, j, j, j) + hc.AddTestTablet(cell, ip, 1001, keyspace, shard, topodatapb.TabletType_REPLICA, true, 10, nil) + } + + target := &querypb.Target{ + Keyspace: keyspace, + Shard: shard, + TabletType: topodatapb.TabletType_REPLICA, + Cell: "cell0", + } + + for n := 0; n < b.N; n++ { + _, err := dg.tsc.GetAggregateStats(target) + if err != nil { + b.Fatalf("Expected no error, got %v", err) + } + } +} + func testDiscoveryGatewayGeneric(t *testing.T, streaming bool, f func(dg Gateway, target *querypb.Target) error) { keyspace := "ks" shard := "0" diff --git a/go/vt/vtgate/gateway/gateway.go b/go/vt/vtgate/gateway/gateway.go index 6dc71db18c3..d9f5166ea4f 100644 --- a/go/vt/vtgate/gateway/gateway.go +++ b/go/vt/vtgate/gateway/gateway.go @@ -20,7 +20,6 @@ package gateway import ( "flag" - "fmt" "time" "golang.org/x/net/context" @@ -31,7 +30,6 @@ import ( "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/vttablet/queryservice" - querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) @@ -128,53 +126,3 @@ func WaitForTablets(gw Gateway, tabletTypesToWait []topodatapb.TabletType) error } return err } - -// StreamHealthFromTargetStatsListener responds to a StreamHealth -// streaming RPC using a srvtopo.TargetStatsListener implementation. -func StreamHealthFromTargetStatsListener(ctx context.Context, l srvtopo.TargetStatsListener, callback func(*querypb.StreamHealthResponse) error) error { - // Subscribe to the TargetStatsListener aggregate stats. - id, entries, c, err := l.Subscribe() - if err != nil { - return err - } - defer func() { - // Unsubscribe so we don't receive more updates, and - // drain the channel. - l.Unsubscribe(id) - for range c { - } - }() - - // Send all current entries. - for _, e := range entries { - shr := &querypb.StreamHealthResponse{ - Target: e.Target, - TabletExternallyReparentedTimestamp: e.TabletExternallyReparentedTimestamp, - AggregateStats: e.Stats, - } - if err := callback(shr); err != nil { - return err - } - } - - // Now listen for updates, or the end of the connection. - for { - select { - case <-ctx.Done(): - return ctx.Err() - case e, ok := <-c: - if !ok { - // Channel is closed, should never happen. - return fmt.Errorf("channel closed") - } - shr := &querypb.StreamHealthResponse{ - Target: e.Target, - TabletExternallyReparentedTimestamp: e.TabletExternallyReparentedTimestamp, - AggregateStats: e.Stats, - } - if err := callback(shr); err != nil { - return err - } - } - } -} diff --git a/go/vt/vtgate/gateway/hybridgateway.go b/go/vt/vtgate/gateway/hybridgateway.go deleted file mode 100644 index a2a1b0f3f7c..00000000000 --- a/go/vt/vtgate/gateway/hybridgateway.go +++ /dev/null @@ -1,202 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package gateway - -import ( - "fmt" - - "golang.org/x/net/context" - - "vitess.io/vitess/go/stats" - querypb "vitess.io/vitess/go/vt/proto/query" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" - "vitess.io/vitess/go/vt/srvtopo" - "vitess.io/vitess/go/vt/topo" - "vitess.io/vitess/go/vt/vttablet/queryservice" -) - -// HybridGateway implements the gateway.Gateway interface by forwarding -// the queries to the right underlying implementation: -// - it has one gateway that watches for tablets. Usually a DiscoveryGateway. -// Useful for local tablets, or remote tablets that can be accessed. -// - it has a list of remote vtgate connections to talk to l2 vtgate processes. -// Useful for remote tablets that are far away, or if the number of local -// tablets grows too big. -// -// Note the WaitForTablets method for now only waits on the local gateway. -type HybridGateway struct { - queryservice.QueryService - - // gw is the local gateway that has the local connections. - gw Gateway - - // l2vtgates is the list of remote connections to other vtgate pools. - l2vtgates []*L2VTGateConn -} - -// NewHybridGateway returns a new HybridGateway based on the provided -// parameters. gw can be nil, in which case it is assumed there is no -// local tablets. -func NewHybridGateway(gw Gateway, addrs []string, retryCount int) (*HybridGateway, error) { - h := &HybridGateway{ - gw: gw, - } - - for i, addr := range addrs { - conn, err := NewL2VTGateConn(fmt.Sprintf("%v", i), addr, retryCount) - if err != nil { - h.Close(context.Background()) - return nil, fmt.Errorf("dialing %v failed: %v", addr, err) - } - h.l2vtgates = append(h.l2vtgates, conn) - } - - h.QueryService = queryservice.Wrap(nil, h.route) - return h, nil -} - -// Close is part of the queryservice.QueryService interface. -func (h *HybridGateway) Close(ctx context.Context) error { - for _, l := range h.l2vtgates { - l.Close(ctx) - } - return nil -} - -// WaitForTablets is part of the Gateway interface. -// We just forward to the local Gateway, if any. -func (h *HybridGateway) WaitForTablets(ctx context.Context, tabletTypesToWait []topodatapb.TabletType) error { - if h.gw != nil { - return h.gw.WaitForTablets(ctx, tabletTypesToWait) - } - - // No local tablets, we don't wait for anything here. - return nil -} - -// RegisterStats registers the l2vtgate connection counts stats. -func (h *HybridGateway) RegisterStats() { - stats.NewCountersFuncWithMultiLabels( - "L2VtgateConnections", - "number of l2vtgate connection", - []string{"Keyspace", "ShardName", "TabletType"}, - h.servingConnStats) -} - -func (h *HybridGateway) servingConnStats() map[string]int64 { - res := make(map[string]int64) - for _, l := range h.l2vtgates { - l.servingConnStats(res) - } - return res -} - -// CacheStatus is part of the Gateway interface. It just concatenates -// all statuses from all underlying parts. -func (h *HybridGateway) CacheStatus() TabletCacheStatusList { - var result TabletCacheStatusList - - // Start with the local Gateway part. - if h.gw != nil { - result = h.gw.CacheStatus() - } - - // Then add each gateway one at a time. - for _, l := range h.l2vtgates { - partial := l.CacheStatus() - result = append(result, partial...) - } - - return result -} - -// route sends the action to the right underlying implementation. -// This doesn't retry, and doesn't collect stats, as these two are -// done by the underlying gw or l2VTGateConn. -// -// FIXME(alainjobart) now we only use gw, or the one l2vtgates we have. -// Need to deprecate this code in favor of using GetAggregateStats. -func (h *HybridGateway) route(ctx context.Context, target *querypb.Target, conn queryservice.QueryService, name string, inTransaction bool, inner func(context.Context, *querypb.Target, queryservice.QueryService) (error, bool)) error { - if h.gw != nil { - err, _ := inner(ctx, target, h.gw) - return NewShardError(err, target, nil, inTransaction) - } - if len(h.l2vtgates) == 1 { - err, _ := inner(ctx, target, h.l2vtgates[0]) - return NewShardError(err, target, nil, inTransaction) - } - return NewShardError(topo.NewError(topo.NoNode, ""), target, nil, inTransaction) -} - -// GetAggregateStats is part of the srvtopo.TargetStats interface, included -// in the gateway.Gateway interface. -func (h *HybridGateway) GetAggregateStats(target *querypb.Target) (*querypb.AggregateStats, queryservice.QueryService, error) { - // Start with the local Gateway part. - if h.gw != nil { - stats, qs, err := h.gw.GetAggregateStats(target) - if !topo.IsErrType(err, topo.NoNode) { - // The local gateway either worked, or returned an - // error. But it knows about this target. - return stats, qs, err - } - } - - // The local gateway doesn't know about this target, - // try the remote ones. - for _, l := range h.l2vtgates { - stats, err := l.GetAggregateStats(target) - if !topo.IsErrType(err, topo.NoNode) { - // This remote gateway either worked, or returned an - // error. But it knows about this target. - return stats, l, err - } - } - - // We couldn't find a way to resolve this. - return nil, nil, topo.NewError(topo.NoNode, target.String()) -} - -// GetMasterCell is part of the srvtopo.TargetStats interface, included -// in the gateway.Gateway interface. -func (h *HybridGateway) GetMasterCell(keyspace, shard string) (cell string, qs queryservice.QueryService, err error) { - // Start with the local Gateway part. - if h.gw != nil { - cell, qs, err := h.gw.GetMasterCell(keyspace, shard) - if !topo.IsErrType(err, topo.NoNode) { - // The local gateway either worked, or returned an - // error. But it knows about this target. - return cell, qs, err - } - // The local gateway doesn't know about this target, - // try the remote ones. - } - - for _, l := range h.l2vtgates { - cell, err := l.GetMasterCell(keyspace, shard) - if !topo.IsErrType(err, topo.NoNode) { - // This remote gateway either worked, or returned an - // error. But it knows about this target. - return cell, l, err - } - } - - // We couldn't find a way to resolve this. - return "", nil, topo.NewError(topo.NoNode, keyspace+"/"+shard) -} - -var _ Gateway = (*HybridGateway)(nil) -var _ srvtopo.TargetStats = (*HybridGateway)(nil) diff --git a/go/vt/vtgate/gateway/l2vtgateconn.go b/go/vt/vtgate/gateway/l2vtgateconn.go deleted file mode 100644 index d4264eea828..00000000000 --- a/go/vt/vtgate/gateway/l2vtgateconn.go +++ /dev/null @@ -1,271 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package gateway - -import ( - "fmt" - "sort" - "sync" - "time" - - "golang.org/x/net/context" - "vitess.io/vitess/go/vt/grpcclient" - "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/topo" - "vitess.io/vitess/go/vt/topo/topoproto" - "vitess.io/vitess/go/vt/vttablet/queryservice" - "vitess.io/vitess/go/vt/vttablet/tabletconn" - - querypb "vitess.io/vitess/go/vt/proto/query" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" -) - -// L2VTGateConn keeps a single connection to a vtgate backend. The -// underlying vtgate backend must have been started with the -// '-enable_forwarding' flag. -// -// It will keep a healthcheck connection going to the target, to get -// the list of available Targets. It remembers them, and exposes a -// srvtopo.TargetStats interface to query them. -type L2VTGateConn struct { - queryservice.QueryService - - // addr is the destination address. Immutable. - addr string - - // name is the name to display for stats. Immutable. - name string - - // retryCount is the number of times to retry an action. Immutable. - retryCount int - - // cancel is associated with the life cycle of this L2VTGateConn. - // It is called when Close is called. - cancel context.CancelFunc - - // mu protects the following fields. - mu sync.RWMutex - // stats has all the stats we received from the other side. - stats map[l2VTGateConnKey]*l2VTGateConnValue - // statusAggregators is a map indexed by the key - // name:keyspace/shard/tablet type - statusAggregators map[string]*TabletStatusAggregator -} - -type l2VTGateConnKey struct { - keyspace string - shard string - tabletType topodatapb.TabletType -} - -type l2VTGateConnValue struct { - tabletExternallyReparentedTimestamp int64 - - // aggregates has the per-cell aggregates. - aggregates map[string]*querypb.AggregateStats -} - -// NewL2VTGateConn creates a new L2VTGateConn object. It also starts -// the background go routine to monitor its health. -func NewL2VTGateConn(name, addr string, retryCount int) (*L2VTGateConn, error) { - conn, err := tabletconn.GetDialer()(&topodatapb.Tablet{ - Hostname: addr, - }, grpcclient.FailFast(true)) - if err != nil { - return nil, err - } - - ctx, cancel := context.WithCancel(context.Background()) - c := &L2VTGateConn{ - addr: addr, - name: name, - cancel: cancel, - stats: make(map[l2VTGateConnKey]*l2VTGateConnValue), - statusAggregators: make(map[string]*TabletStatusAggregator), - } - c.QueryService = queryservice.Wrap(conn, c.withRetry) - go c.checkConn(ctx) - return c, nil -} - -// Close is part of the queryservice.QueryService interface. -func (c *L2VTGateConn) Close(ctx context.Context) error { - c.cancel() - return nil -} - -func (c *L2VTGateConn) servingConnStats(res map[string]int64) { - c.mu.Lock() - defer c.mu.Unlock() - for k, s := range c.stats { - key := fmt.Sprintf("%s.%s.%s", k.keyspace, k.shard, topoproto.TabletTypeLString(k.tabletType)) - var htc int32 - for _, stats := range s.aggregates { - htc += stats.HealthyTabletCount - } - res[key] += int64(htc) - } -} - -func (c *L2VTGateConn) checkConn(ctx context.Context) { - for { - err := c.StreamHealth(ctx, c.streamHealthCallback) - log.Warningf("StreamHealth to %v failed, will retry after 30s: %v", c.addr, err) - time.Sleep(30 * time.Second) - } -} - -func (c *L2VTGateConn) streamHealthCallback(shr *querypb.StreamHealthResponse) error { - key := l2VTGateConnKey{ - keyspace: shr.Target.Keyspace, - shard: shr.Target.Shard, - tabletType: shr.Target.TabletType, - } - c.mu.Lock() - defer c.mu.Unlock() - e, ok := c.stats[key] - if !ok { - // No current value for this keyspace/shard/tablet type. - // Check if we received a delete, drop it. - if shr.AggregateStats == nil || (shr.AggregateStats.HealthyTabletCount == 0 && shr.AggregateStats.UnhealthyTabletCount == 0) { - return nil - } - - // It's a record for a keyspace/shard/tablet type we - // don't know yet, just create our new record with one - // entry in the map for the cell. - c.stats[key] = &l2VTGateConnValue{ - tabletExternallyReparentedTimestamp: shr.TabletExternallyReparentedTimestamp, - aggregates: map[string]*querypb.AggregateStats{ - shr.Target.Cell: shr.AggregateStats, - }, - } - return nil - } - - // Save our new value. - e.tabletExternallyReparentedTimestamp = shr.TabletExternallyReparentedTimestamp - e.aggregates[shr.Target.Cell] = shr.AggregateStats - return nil -} - -// GetAggregateStats is the discovery part of srvtopo.TargetStats interface. -func (c *L2VTGateConn) GetAggregateStats(target *querypb.Target) (*querypb.AggregateStats, error) { - key := l2VTGateConnKey{ - keyspace: target.Keyspace, - shard: target.Shard, - tabletType: target.TabletType, - } - c.mu.RLock() - defer c.mu.RUnlock() - e, ok := c.stats[key] - if !ok { - return nil, topo.NewError(topo.NoNode, target.String()) - } - - a, ok := e.aggregates[target.Cell] - if !ok { - return nil, topo.NewError(topo.NoNode, target.String()) - } - return a, nil -} - -// GetMasterCell is the discovery part of the srvtopo.TargetStats interface. -func (c *L2VTGateConn) GetMasterCell(keyspace, shard string) (cell string, err error) { - key := l2VTGateConnKey{ - keyspace: keyspace, - shard: shard, - tabletType: topodatapb.TabletType_MASTER, - } - c.mu.RLock() - defer c.mu.RUnlock() - e, ok := c.stats[key] - if !ok { - return "", topo.NewError(topo.NoNode, keyspace+"/"+shard) - } - - for cell := range e.aggregates { - return cell, nil - } - return "", topo.NewError(topo.NoNode, keyspace+"/"+shard) -} - -// CacheStatus returns a list of TabletCacheStatus per -// name:keyspace/shard/tablet type. -func (c *L2VTGateConn) CacheStatus() TabletCacheStatusList { - c.mu.RLock() - res := make(TabletCacheStatusList, 0, len(c.statusAggregators)) - for _, aggr := range c.statusAggregators { - res = append(res, aggr.GetCacheStatus()) - } - c.mu.RUnlock() - sort.Sort(res) - return res -} - -func (c *L2VTGateConn) updateStats(target *querypb.Target, startTime time.Time, err error) { - elapsed := time.Now().Sub(startTime) - aggr := c.getStatsAggregator(target) - aggr.UpdateQueryInfo("", target.TabletType, elapsed, err != nil) -} - -func (c *L2VTGateConn) getStatsAggregator(target *querypb.Target) *TabletStatusAggregator { - key := fmt.Sprintf("%v:%v/%v/%v", c.name, target.Keyspace, target.Shard, target.TabletType.String()) - - // get existing aggregator - c.mu.RLock() - aggr, ok := c.statusAggregators[key] - c.mu.RUnlock() - if ok { - return aggr - } - - // create a new one, but check again before the creation - c.mu.Lock() - defer c.mu.Unlock() - aggr, ok = c.statusAggregators[key] - if ok { - return aggr - } - aggr = NewTabletStatusAggregator(target.Keyspace, target.Shard, target.TabletType, key) - c.statusAggregators[key] = aggr - return aggr -} - -// withRetry uses the connection to execute the action. If there are -// retryable errors, it retries retryCount times before failing. It -// does not retry if the connection is in the middle of a -// transaction. While returning the error check if it maybe a result -// of a resharding event, and set the re-resolve bit and let the upper -// layers re-resolve and retry. -func (c *L2VTGateConn) withRetry(ctx context.Context, target *querypb.Target, conn queryservice.QueryService, name string, inTransaction bool, inner func(context.Context, *querypb.Target, queryservice.QueryService) (error, bool)) error { - var err error - for i := 0; i < c.retryCount+1; i++ { - startTime := time.Now() - var canRetry bool - err, canRetry = inner(ctx, target, conn) - if target != nil { - // target can be nil for StreamHealth calls. - c.updateStats(target, startTime, err) - } - if canRetry { - continue - } - break - } - return NewShardError(err, target, nil, inTransaction) -} diff --git a/go/vt/vtgate/gatewaytest/grpc_discovery_test.go b/go/vt/vtgate/gatewaytest/grpc_discovery_test.go index b14c5d5af1e..6bcdec6e7cc 100644 --- a/go/vt/vtgate/gatewaytest/grpc_discovery_test.go +++ b/go/vt/vtgate/gatewaytest/grpc_discovery_test.go @@ -28,7 +28,6 @@ import ( "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/srvtopo" - "vitess.io/vitess/go/vt/vtgate" "vitess.io/vitess/go/vt/vtgate/gateway" "vitess.io/vitess/go/vt/vttablet/grpcqueryservice" "vitess.io/vitess/go/vt/vttablet/tabletconntest" @@ -90,77 +89,3 @@ func TestGRPCDiscovery(t *testing.T) { // run the test suite. TestSuite(t, "discovery-grpc", dg, service) } - -// TestL2VTGateDiscovery tests the hybrid gateway with a gRPC -// connection from the gateway to a l2vtgate in-process object. -func TestL2VTGateDiscovery(t *testing.T) { - flag.Set("tablet_protocol", "grpc") - flag.Set("gateway_implementation", "discoverygateway") - flag.Set("enable_forwarding", "true") - - // Fake services for the tablet, topo server. - service, ts, cell := CreateFakeServers(t) - - // Tablet: listen on a random port. - listener, err := net.Listen("tcp", ":0") - if err != nil { - t.Fatalf("Cannot listen: %v", err) - } - host := listener.Addr().(*net.TCPAddr).IP.String() - port := listener.Addr().(*net.TCPAddr).Port - defer listener.Close() - - // Tablet: create a gRPC server and listen on the port. - server := grpc.NewServer() - grpcqueryservice.Register(server, service) - go server.Serve(listener) - defer server.Stop() - - // L2VTGate: Create the discovery healthcheck, and the gateway. - // Wait for the right tablets to be present. - hc := discovery.NewHealthCheck(10*time.Second, 2*time.Minute) - rs := srvtopo.NewResilientServer(ts, "TestL2VTGateDiscovery") - l2vtgate := vtgate.Init(context.Background(), hc, rs, cell, 2, nil) - hc.AddTablet(&topodatapb.Tablet{ - Alias: &topodatapb.TabletAlias{ - Cell: cell, - Uid: 44, - }, - Keyspace: tabletconntest.TestTarget.Keyspace, - Shard: tabletconntest.TestTarget.Shard, - Type: tabletconntest.TestTarget.TabletType, - Hostname: host, - PortMap: map[string]int32{ - "grpc": int32(port), - }, - }, "test_tablet") - ctx := context.Background() - err = l2vtgate.Gateway().WaitForTablets(ctx, []topodatapb.TabletType{tabletconntest.TestTarget.TabletType}) - if err != nil { - t.Fatalf("WaitForTablets failed: %v", err) - } - - // L2VTGate: listen on a random port. - listener, err = net.Listen("tcp", ":0") - if err != nil { - t.Fatalf("Cannot listen: %v", err) - } - defer listener.Close() - - // L2VTGate: create a gRPC server and listen on the port. - server = grpc.NewServer() - grpcqueryservice.Register(server, l2vtgate.L2VTGate()) - go server.Serve(listener) - defer server.Stop() - - // VTGate: create the HybridGateway, with no local gateway, - // and just the remote address in the l2vtgate pool. - hg, err := gateway.NewHybridGateway(nil, []string{listener.Addr().String()}, 2) - if err != nil { - t.Fatalf("gateway.NewHybridGateway() failed: %v", err) - } - defer hg.Close(ctx) - - // and run the test suite. - TestSuite(t, "l2vtgate-grpc", hg, service) -} diff --git a/go/vt/vtgate/l2vtgate.go b/go/vt/vtgate/l2vtgate.go deleted file mode 100644 index b33fd5f8613..00000000000 --- a/go/vt/vtgate/l2vtgate.go +++ /dev/null @@ -1,108 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package vtgate - -import ( - "time" - - "golang.org/x/net/context" - - "vitess.io/vitess/go/stats" - "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/servenv" - "vitess.io/vitess/go/vt/topo/topoproto" - "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/vt/vtgate/gateway" - "vitess.io/vitess/go/vt/vttablet/queryservice" - - querypb "vitess.io/vitess/go/vt/proto/query" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" -) - -var ( - l2VTGate *L2VTGate -) - -// L2VTGate implements queryservice.QueryService and forwards queries to -// the underlying gateway. -type L2VTGate struct { - queryservice.QueryService - timings *stats.MultiTimings - errorCounts *stats.CountersWithMultiLabels - gateway gateway.Gateway -} - -// RegisterL2VTGate defines the type of registration mechanism. -type RegisterL2VTGate func(queryservice.QueryService) - -// RegisterL2VTGates stores register funcs for L2VTGate server. -var RegisterL2VTGates []RegisterL2VTGate - -// initL2VTGate creates the single L2VTGate with the provided parameters. -func initL2VTGate(gw gateway.Gateway) *L2VTGate { - if l2VTGate != nil { - log.Fatalf("L2VTGate already initialized") - } - - l2VTGate = &L2VTGate{ - timings: stats.NewMultiTimings( - "QueryServiceCall", - "l2VTGate query service call timings", - []string{"Operation", "Keyspace", "ShardName", "DbType"}), - errorCounts: stats.NewCountersWithMultiLabels( - "QueryServiceCallErrorCount", - "Error count from calls to the query service", - []string{"Operation", "Keyspace", "ShardName", "DbType"}), - gateway: gw, - } - l2VTGate.QueryService = queryservice.Wrap( - gw, - func(ctx context.Context, target *querypb.Target, conn queryservice.QueryService, name string, inTransaction bool, inner func(context.Context, *querypb.Target, queryservice.QueryService) (error, bool)) (err error) { - if target != nil { - startTime, statsKey := l2VTGate.startAction(name, target) - defer l2VTGate.endAction(startTime, statsKey, &err) - } - err, _ = inner(ctx, target, conn) - return err - }, - ) - servenv.OnRun(func() { - for _, f := range RegisterL2VTGates { - f(l2VTGate) - } - }) - return l2VTGate -} - -func (l *L2VTGate) startAction(name string, target *querypb.Target) (time.Time, []string) { - statsKey := []string{name, target.Keyspace, target.Shard, topoproto.TabletTypeLString(target.TabletType)} - startTime := time.Now() - return startTime, statsKey -} - -func (l *L2VTGate) endAction(startTime time.Time, statsKey []string, err *error) { - if *err != nil { - // Don't increment the error counter for duplicate - // keys or bad queries, as those errors are caused by - // client queries and are not VTGate's fault. - ec := vterrors.Code(*err) - if ec != vtrpcpb.Code_ALREADY_EXISTS && ec != vtrpcpb.Code_INVALID_ARGUMENT { - l.errorCounts.Add(statsKey, 1) - } - } - l.timings.Record(statsKey, startTime) -} diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index 75305867e18..2b2e7b81d8c 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -28,7 +28,6 @@ import ( "golang.org/x/net/context" "vitess.io/vitess/go/acl" - "vitess.io/vitess/go/flagutil" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/tb" @@ -61,7 +60,6 @@ var ( queryPlanCacheSize = flag.Int64("gate_query_cache_size", 10000, "gate server query cache size, maximum number of queries to be cached. vtgate analyzes every incoming query and generate a query plan, these plans are being cached in a lru cache. This config controls the capacity of the lru cache.") legacyAutocommit = flag.Bool("legacy_autocommit", false, "DEPRECATED: set this flag to true to get the legacy behavior: all transactions will need an explicit begin, and DMLs outside transactions will return an error.") enableForwarding = flag.Bool("enable_forwarding", false, "if specified, this process will also expose a QueryService interface that allows other vtgates to talk through this vtgate to the underlying tablets.") - l2vtgateAddrs flagutil.StringListValue disableLocalGateway = flag.Bool("disable_local_gateway", false, "if specified, this process will not route any queries to local tablets in the local cell") ) @@ -118,7 +116,6 @@ type VTGate struct { resolver *Resolver txConn *TxConn gw gateway.Gateway - l2vtgate *L2VTGate // stats objects. // TODO(sougou): This needs to be cleaned up. There @@ -162,30 +159,12 @@ func Init(ctx context.Context, hc discovery.HealthCheck, serv srvtopo.Server, ce // Start with the gateway. If we can't reach the topology service, // we can't go on much further, so we log.Fatal out. var gw gateway.Gateway - var l2vtgate *L2VTGate if !*disableLocalGateway { gw = gateway.GetCreator()(hc, serv, cell, retryCount) gw.RegisterStats() if err := gateway.WaitForTablets(gw, tabletTypesToWait); err != nil { log.Fatalf("gateway.WaitForTablets failed: %v", err) } - - // l2vtgate gives access to the underlying Gateway - // from an exported QueryService interface. - if *enableForwarding { - l2vtgate = initL2VTGate(gw) - } - } - - // If we have other vtgate pools to connect to, create a - // HybridGateway to perform the routing. - if len(l2vtgateAddrs) > 0 { - hgw, err := gateway.NewHybridGateway(gw, l2vtgateAddrs, retryCount) - if err != nil { - log.Fatalf("gateway.NewHybridGateway failed: %v", err) - } - hgw.RegisterStats() - gw = hgw } // Check we have something to do. @@ -215,7 +194,6 @@ func Init(ctx context.Context, hc discovery.HealthCheck, serv srvtopo.Server, ce resolver: resolver, txConn: tc, gw: gw, - l2vtgate: l2vtgate, timings: stats.NewMultiTimings( "VtgateApi", "VtgateApi timings", @@ -295,11 +273,6 @@ func (vtg *VTGate) Gateway() gateway.Gateway { return vtg.gw } -// L2VTGate returns the L2VTGate object. Mostly used for tests. -func (vtg *VTGate) L2VTGate() *L2VTGate { - return vtg.l2vtgate -} - // Execute executes a non-streaming query. This is a V3 function. func (vtg *VTGate) Execute(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable) (newSession *vtgatepb.Session, qr *sqltypes.Result, err error) { // In this context, we don't care if we can't fully parse destination @@ -1195,7 +1168,3 @@ func unambiguousKeyspaceBSQ(queries []*vtgatepb.BoundShardQuery) string { return keyspace } } - -func init() { - flag.Var(&l2vtgateAddrs, "l2vtgate_addrs", "Specifies a comma-separated list of other l2 vtgate pools to connect to. These other vtgates must run with the --enable_forwarding flag") -} diff --git a/test/config.json b/test/config.json index 2222f062857..d629184ec8e 100644 --- a/test/config.json +++ b/test/config.json @@ -90,17 +90,6 @@ "worker_test" ] }, - "initial_sharding_l2vtgate": { - "File": "initial_sharding_l2vtgate.py", - "Args": [], - "Command": [], - "Manual": false, - "Shard": 2, - "RetryMax": 0, - "Tags": [ - "worker_test" - ] - }, "legacy_resharding": { "File": "legacy_resharding.py", "Args": [], @@ -426,17 +415,6 @@ "site_test" ] }, - "vtgatev2_l2vtgate": { - "File": "vtgatev2_l2vtgate_test.py", - "Args": [], - "Command": [], - "Manual": false, - "Shard": 1, - "RetryMax": 0, - "Tags": [ - "site_test" - ] - }, "vtgatev3": { "File": "vtgatev3_test.py", "Args": [], diff --git a/test/initial_sharding.py b/test/initial_sharding.py index e6aa8728253..cb4fb3428b7 100755 --- a/test/initial_sharding.py +++ b/test/initial_sharding.py @@ -35,16 +35,6 @@ import tablet import utils -# use_l2vtgate is set if we want to use l2vtgate processes. -# We'll set them up to have: -# l2vtgate1: covers the initial shard, and -80 -# l2vtgate2: covers 80- -use_l2vtgate = False - -# the l2vtgate processes, if applicable -l2vtgate1 = None -l2vtgate2 = None - # initial shard, covers everything shard_master = tablet.Tablet() shard_replica = tablet.Tablet() @@ -217,8 +207,6 @@ def _check_lots_not_present(self, count, base=0): should_be_here=False) def test_resharding(self): - global l2vtgate1, l2vtgate2 - # create the keyspace with just one shard shard_master.init_tablet( 'replica', @@ -280,33 +268,12 @@ def test_resharding(self): # We must start vtgate after tablets are up, or else wait until 1min refresh # (that is the tablet_refresh_interval parameter for discovery gateway) # we want cache_ttl at zero so we re-read the topology for every test query. - if use_l2vtgate: - l2vtgate1 = utils.VtGate() - l2vtgate1.start(extra_args=['--enable_forwarding'], tablets= - [shard_master, shard_replica, shard_rdonly1]) - l2vtgate1.wait_for_endpoints('test_keyspace.0.master', 1) - l2vtgate1.wait_for_endpoints('test_keyspace.0.replica', 1) - l2vtgate1.wait_for_endpoints('test_keyspace.0.rdonly', 1) - - _, l2vtgate1_addr = l2vtgate1.rpc_endpoint() - - # Clear utils.vtgate, so it doesn't point to the previous l2vtgate1. - utils.vtgate = None - utils.VtGate().start(cache_ttl='0', l2vtgates=[l2vtgate1_addr,], - extra_args=['-disable_local_gateway']) - utils.vtgate.wait_for_endpoints('test_keyspace.0.master', 1, - var='L2VtgateConnections') - utils.vtgate.wait_for_endpoints('test_keyspace.0.replica', 1, - var='L2VtgateConnections') - utils.vtgate.wait_for_endpoints('test_keyspace.0.rdonly', 1, - var='L2VtgateConnections') - else: - utils.VtGate().start(cache_ttl='0', tablets=[ - shard_master, shard_replica, shard_rdonly1]) - utils.vtgate.wait_for_endpoints('test_keyspace.0.master', 1) - utils.vtgate.wait_for_endpoints('test_keyspace.0.replica', 1) - utils.vtgate.wait_for_endpoints('test_keyspace.0.rdonly', 1) + utils.VtGate().start(cache_ttl='0', tablets=[ + shard_master, shard_replica, shard_rdonly1]) + utils.vtgate.wait_for_endpoints('test_keyspace.0.master', 1) + utils.vtgate.wait_for_endpoints('test_keyspace.0.replica', 1) + utils.vtgate.wait_for_endpoints('test_keyspace.0.rdonly', 1) # check the Map Reduce API works correctly, should use ExecuteShards, # as we're not sharded yet. @@ -391,62 +358,13 @@ def test_resharding(self): # must restart vtgate after tablets are up, or else wait until 1min refresh # we want cache_ttl at zero so we re-read the topology for every test query. utils.vtgate.kill() - if use_l2vtgate: - l2vtgate1.kill() - - l2vtgate1 = utils.VtGate() - l2vtgate1.start(extra_args=['--enable_forwarding', - '-tablet_filters', - 'test_keyspace|0,test_keyspace|-80'], - tablets=[shard_master, shard_replica, shard_rdonly1, - shard_0_master, shard_0_replica, - shard_0_rdonly1]) - l2vtgate1.wait_for_endpoints('test_keyspace.0.master', 1) - l2vtgate1.wait_for_endpoints('test_keyspace.0.replica', 1) - l2vtgate1.wait_for_endpoints('test_keyspace.0.rdonly', 1) - l2vtgate1.wait_for_endpoints('test_keyspace.-80.master', 1) - l2vtgate1.wait_for_endpoints('test_keyspace.-80.replica', 1) - l2vtgate1.wait_for_endpoints('test_keyspace.-80.rdonly', 1) - l2vtgate1.verify_no_endpoint('test_keyspace.80-.master') - l2vtgate1.verify_no_endpoint('test_keyspace.80-.replica') - l2vtgate1.verify_no_endpoint('test_keyspace.80-.rdonly') - - # FIXME(alainjobart) we clear tablet_types_to_wait, as this - # l2vtgate2 doesn't serve the current test_keyspace shard, which - # is test_keyspace.0. This is not ideal, we should re-work - # which keyspace/shard a l2vtgate can wait for, as the ones - # filtered by tablet_filters. - l2vtgate2 = utils.VtGate() - l2vtgate2.start(extra_args=['--enable_forwarding', - '-tablet_filters', - 'test_keyspace|80-'], tablets= - [shard_1_master, shard_1_replica, shard_1_rdonly1], - tablet_types_to_wait='') - l2vtgate2.wait_for_endpoints('test_keyspace.80-.master', 1) - l2vtgate2.wait_for_endpoints('test_keyspace.80-.replica', 1) - l2vtgate2.wait_for_endpoints('test_keyspace.80-.rdonly', 1) - l2vtgate2.verify_no_endpoint('test_keyspace.0.master') - l2vtgate2.verify_no_endpoint('test_keyspace.0.replica') - l2vtgate2.verify_no_endpoint('test_keyspace.0.rdonly') - l2vtgate2.verify_no_endpoint('test_keyspace.-80.master') - l2vtgate2.verify_no_endpoint('test_keyspace.-80.replica') - l2vtgate2.verify_no_endpoint('test_keyspace.-80.rdonly') - - _, l2vtgate1_addr = l2vtgate1.rpc_endpoint() - _, l2vtgate2_addr = l2vtgate2.rpc_endpoint() - utils.vtgate = None - utils.VtGate().start(cache_ttl='0', l2vtgates=[l2vtgate1_addr, - l2vtgate2_addr,], - extra_args=['-disable_local_gateway']) - var = 'L2VtgateConnections' - else: - utils.vtgate = None - utils.VtGate().start(cache_ttl='0', tablets=[ - shard_master, shard_replica, shard_rdonly1, - shard_0_master, shard_0_replica, shard_0_rdonly1, - shard_1_master, shard_1_replica, shard_1_rdonly1]) - var = None + utils.vtgate = None + utils.VtGate().start(cache_ttl='0', tablets=[ + shard_master, shard_replica, shard_rdonly1, + shard_0_master, shard_0_replica, shard_0_rdonly1, + shard_1_master, shard_1_replica, shard_1_rdonly1]) + var = None # Wait for the endpoints, either local or remote. utils.vtgate.wait_for_endpoints('test_keyspace.0.master', 1, var=var) @@ -626,12 +544,9 @@ def test_resharding(self): # make sure rdonly tablets are back to serving before hitting vtgate. for t in [shard_0_rdonly1, shard_1_rdonly1]: t.wait_for_vttablet_state('SERVING') - if use_l2vtgate: - l2vtgate1.wait_for_endpoints('test_keyspace.-80.rdonly', 1) - l2vtgate2.wait_for_endpoints('test_keyspace.80-.rdonly', 1) - else: - utils.vtgate.wait_for_endpoints('test_keyspace.-80.rdonly', 1) - utils.vtgate.wait_for_endpoints('test_keyspace.80-.rdonly', 1) + + utils.vtgate.wait_for_endpoints('test_keyspace.-80.rdonly', 1) + utils.vtgate.wait_for_endpoints('test_keyspace.80-.rdonly', 1) # check the Map Reduce API works correctly, should use ExecuteKeyRanges # on both destination shards now. diff --git a/test/initial_sharding_l2vtgate.py b/test/initial_sharding_l2vtgate.py deleted file mode 100755 index 5a2a3df68a8..00000000000 --- a/test/initial_sharding_l2vtgate.py +++ /dev/null @@ -1,24 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2017 Google Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Re-runs initial_sharding.py with a l2vtgate process.""" - -import initial_sharding -import utils - -if __name__ == '__main__': - initial_sharding.use_l2vtgate = True - utils.main(initial_sharding) diff --git a/test/vtgatev2_l2vtgate_test.py b/test/vtgatev2_l2vtgate_test.py deleted file mode 100755 index cf869a2701e..00000000000 --- a/test/vtgatev2_l2vtgate_test.py +++ /dev/null @@ -1,26 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2017 Google Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Re-runs vtgatev2_test.py with a l2vtgate process.""" - -import utils -import vtgatev2_test - -# This test is just re-running an entire vtgatev2_test.py with a -# l2vtgate process in the middle. -if __name__ == '__main__': - vtgatev2_test.use_l2vtgate = True - utils.main(vtgatev2_test) diff --git a/test/vtgatev2_test.py b/test/vtgatev2_test.py index a95d640202f..4d45f44d307 100755 --- a/test/vtgatev2_test.py +++ b/test/vtgatev2_test.py @@ -35,16 +35,6 @@ from vtdb import vtgate_client from vtdb import vtgate_cursor -# use_l2vtgate controls if we're adding a l2vtgate process in between -# vtgate and the tablets. -use_l2vtgate = False - -# l2vtgate is the L2VTGate object, if any -l2vtgate = None - -# l2vtgate_addr is the address of the l2vtgate to send to vtgate -l2vtgate_addr = None - shard_0_master = tablet.Tablet() shard_0_replica1 = tablet.Tablet() shard_0_replica2 = tablet.Tablet() @@ -154,8 +144,6 @@ def tearDownModule(): logging.debug('Tearing down the servers and setup') if utils.vtgate: utils.vtgate.kill() - if l2vtgate: - l2vtgate.kill() tablet.kill_tablets([shard_0_master, shard_0_replica1, shard_0_replica2, shard_1_master, @@ -184,7 +172,6 @@ def tearDownModule(): def setup_tablets(): """Start up a master mysql and vttablet.""" - global l2vtgate, l2vtgate_addr logging.debug('Setting up tablets') utils.run_vtctl(['CreateKeyspace', KEYSPACE_NAME]) @@ -252,46 +239,22 @@ def setup_tablets(): 'Partitions(rdonly): -80 80-\n' 'Partitions(replica): -80 80-\n') - if use_l2vtgate: - l2vtgate = utils.VtGate() - l2vtgate.start(extra_args=['--enable_forwarding'], tablets= - [shard_0_master, shard_0_replica1, shard_0_replica2, - shard_1_master, shard_1_replica1, shard_1_replica2]) - _, l2vtgate_addr = l2vtgate.rpc_endpoint() - - # Clear utils.vtgate, so it doesn't point to the previous l2vtgate. - utils.vtgate = None - - # This vgate doesn't watch any local tablets, so we disable_local_gateway. - utils.VtGate().start(l2vtgates=[l2vtgate_addr,], - extra_args=['-disable_local_gateway']) - else: - utils.VtGate().start(tablets= - [shard_0_master, shard_0_replica1, shard_0_replica2, - shard_1_master, shard_1_replica1, shard_1_replica2]) + utils.VtGate().start(tablets= + [shard_0_master, shard_0_replica1, shard_0_replica2, + shard_1_master, shard_1_replica1, shard_1_replica2]) wait_for_all_tablets() def restart_vtgate(port): - if use_l2vtgate: - utils.VtGate(port=port).start(l2vtgates=[l2vtgate_addr,], - extra_args=['-disable_local_gateway']) - else: - utils.VtGate(port=port).start( - tablets=[shard_0_master, shard_0_replica1, shard_0_replica2, - shard_1_master, shard_1_replica1, shard_1_replica2]) + utils.VtGate(port=port).start( + tablets=[shard_0_master, shard_0_replica1, shard_0_replica2, + shard_1_master, shard_1_replica1, shard_1_replica2]) def wait_for_endpoints(name, count): - if use_l2vtgate: - # Wait for the l2vtgate to have a healthy connection. - l2vtgate.wait_for_endpoints(name, count) - # Also wait for vtgate to have received the remote healthy connection. - utils.vtgate.wait_for_endpoints(name, count, var='L2VtgateConnections') - else: - utils.vtgate.wait_for_endpoints(name, count) + utils.vtgate.wait_for_endpoints(name, count) def wait_for_all_tablets(): @@ -411,17 +374,12 @@ def test_query_routing(self): self.assertIn(kid, SHARD_KID_MAP[SHARD_NAMES[shard_index]]) # Do a cross shard range query and assert all rows are fetched. - # Use this test to also test the vtgate vars (and l2vtgate vars if - # applicable) are correctly updated. + # Use this test to also test the vtgate vars are correctly updated. v = utils.vtgate.get_vars() key0 = 'Execute.' + KEYSPACE_NAME + '.' + SHARD_NAMES[0] + '.master' key1 = 'Execute.' + KEYSPACE_NAME + '.' + SHARD_NAMES[1] + '.master' before0 = v['VttabletCall']['Histograms'][key0]['Count'] before1 = v['VttabletCall']['Histograms'][key1]['Count'] - if use_l2vtgate: - lv = l2vtgate.get_vars() - lbefore0 = lv['QueryServiceCall']['Histograms'][key0]['Count'] - lbefore1 = lv['QueryServiceCall']['Histograms'][key1]['Count'] cursor = vtgate_conn.cursor( tablet_type='master', keyspace=KEYSPACE_NAME, @@ -435,12 +393,6 @@ def test_query_routing(self): after1 = v['VttabletCall']['Histograms'][key1]['Count'] self.assertEqual(after0 - before0, 1) self.assertEqual(after1 - before1, 1) - if use_l2vtgate: - lv = l2vtgate.get_vars() - lafter0 = lv['QueryServiceCall']['Histograms'][key0]['Count'] - lafter1 = lv['QueryServiceCall']['Histograms'][key1]['Count'] - self.assertEqual(lafter0 - lbefore0, 1) - self.assertEqual(lafter1 - lbefore1, 1) def test_rollback(self): vtgate_conn = get_connection()