[vt/discovery] Add region context in GetAggregateStats - v2 #4476

Merged
54 changes: 0 additions & 54 deletions doc/TabletRouting.md
@@ -93,36 +93,7 @@ There are two implementations of the Gateway interface:
discovery section, one per cell) as a source of tablets, a HealthCheck module
to watch their health, and a TabletStatsCache to collect all the health
information. Based on this data, it can find the best tablet to use.
* l2VTGateGateway: It keeps a map of l2vtgate processes to send queries to. See
next section for more details.

## l2vtgate

As we started increasing the number of tablets in a cell, it became clear that
a bottleneck of the system was going to be the number of tablets a single
vtgate connects to. Since vtgate maintains a streaming health-check connection
per tablet, these connections can grow very numerous. It is also common for
vtgate to watch tablets in other cells, to be able to find the master
tablet.

So l2vtgate came to exist, based on very similar concepts and interfaces:

* l2vtgate is an extra hop between a vtgate pool and tablets.
* A l2vtgate pool connects to a subset of tablets, therefore it can have a
reasonable number of streaming health connections. Externally, it exposes the
QueryService RPC interface (that has the Target for the query, keyspace /
shard / tablet type). Internally, it uses a discoveryGateway, as usual.
* vtgate connects to l2vtgate pools (using the l2VTGateGateway instead of the
discoveryGateway). It has a map of which keyspace / shard / tablet type needs
to go to which l2vtgate pool. At this point, vtgate doesn't maintain any health
information about the tablets; it lets l2vtgate handle it.

Note that l2vtgate, as it stands, is not an ideal solution. For instance, if
there are two cells, and the master for a shard can be in either one, l2vtgate
still has to watch the tablets in both cells to know where the master is.
Ideally, we'd want l2vtgate to be colocated with the tablets in a given cell,
and never go cross-cell.
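
The keyspace/shard/tablet-type to l2vtgate-pool map described above can be sketched as follows. This is purely illustrative: the types, field names, and addresses are assumptions for the sketch, not the actual vtgate implementation.

```go
package main

import "fmt"

// target identifies a routing destination, mirroring the
// keyspace / shard / tablet type triple described above.
type target struct {
	Keyspace   string
	Shard      string
	TabletType string
}

// l2VTGateMap routes each target to the address of an l2vtgate pool.
// Hypothetical type; the real l2VTGateGateway keeps a richer structure.
type l2VTGateMap map[target]string

// lookup returns the l2vtgate pool address for a target, if any.
func (m l2VTGateMap) lookup(keyspace, shard, tabletType string) (string, bool) {
	addr, ok := m[target{keyspace, shard, tabletType}]
	return addr, ok
}

func main() {
	m := l2VTGateMap{
		{"commerce", "-80", "MASTER"}: "l2vtgate-pool-1:15999",
		{"commerce", "80-", "MASTER"}: "l2vtgate-pool-2:15999",
	}
	addr, _ := m.lookup("commerce", "-80", "MASTER")
	fmt.Println(addr)
}
```

Each l2vtgate pool behind such a map then uses a discoveryGateway internally, as the text notes, so only the pool maintains streaming health connections to its tablet subset.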

# Extensions, work in progress

## Regions, cross-cell targeting
@@ -169,31 +140,6 @@ between vtgate and l2vtgate:
This would also be a good time to merge the vtgate code that uses the VSchema
with the code that doesn't, for SrvKeyspace access.

## Hybrid Gateway

It would be nice to re-organize the code a bit inside vtgate to allow for a
hybrid gateway, and get rid of l2vtgate altogether:

* vtgate would use the discoveryGateway to watch the tablets in the current cell
(and optionally in any other cells we still want to consider local).
* vtgate would use l2vtgateGateway to watch the tablets in a different cell.
* vtgate would expose the RPC APIs currently exposed by the l2vtgate process.

So vtgate would watch the tablets in the local cell only, but would also know
which healthy tablets exist in the other cells, and be able to send queries to
them through their vtgate. The extra hop through the other cell's vtgate should
be a small latency price to pay, given the query is going cross-cell already.

So queries would go one of two routes:

* client(cell1) -> vtgate(cell1) -> tablet(cell1)
* client(cell1) -> vtgate(cell1) -> vtgate(cell2) -> tablet(cell2)

If the number of tablets in a given cell is still too high for the local vtgate
pool, two or more pools can be created, each knowing about a subset of the
tablets. They would then forward queries to each other when addressing the
other tablet set.
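
The two routes above can be expressed as a simple route selector. This is a minimal sketch under assumed names, not vtgate code: it only illustrates the decision between the direct local route and the extra cross-cell vtgate hop.

```go
package main

import "fmt"

// routeFor picks one of the two query routes described above:
// same-cell queries go straight to a tablet, while cross-cell
// queries hop through the remote cell's vtgate first.
func routeFor(localCell, targetCell string) string {
	if targetCell == localCell {
		return fmt.Sprintf("client(%s) -> vtgate(%s) -> tablet(%s)",
			localCell, localCell, targetCell)
	}
	return fmt.Sprintf("client(%s) -> vtgate(%s) -> vtgate(%s) -> tablet(%s)",
		localCell, localCell, targetCell, targetCell)
}

func main() {
	fmt.Println(routeFor("cell1", "cell1"))
	fmt.Println(routeFor("cell1", "cell2"))
}
```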

## Config-based routing

Another possible extension would be to group all routing options for vtgate in a
34 changes: 0 additions & 34 deletions go/cmd/vtgate/plugin_grpcqueryservice.go

This file was deleted.

153 changes: 9 additions & 144 deletions go/vt/discovery/tablet_stats_cache.go
@@ -68,7 +68,7 @@ type tabletStatsCacheEntry struct {
all map[string]*TabletStats
// healthy only has the healthy ones.
healthy []*TabletStats
// aggregates has the per-cell aggregates.
// aggregates has the per-region aggregates.
aggregates map[string]*querypb.AggregateStats
}

@@ -141,7 +141,6 @@ func newTabletStatsCache(hc HealthCheck, ts *topo.Server, cell string, setListen
// upon type change.
hc.SetListener(tc, true /*sendDownEvents*/)
}
go tc.broadcastAggregateStats()
return tc
}

@@ -266,18 +265,18 @@ func (tc *TabletStatsCache) StatsUpdate(ts *TabletStats) {
tc.updateAggregateMap(ts.Target.Keyspace, ts.Target.Shard, ts.Target.TabletType, e, allArray)
}

// MakeAggregateMap takes a list of TabletStats and builds a per-cell
// makeAggregateMap takes a list of TabletStats and builds a per-region
// AggregateStats map.
func MakeAggregateMap(stats []*TabletStats) map[string]*querypb.AggregateStats {
func (tc *TabletStatsCache) makeAggregateMap(stats []*TabletStats) map[string]*querypb.AggregateStats {
result := make(map[string]*querypb.AggregateStats)
for _, ts := range stats {
cell := ts.Tablet.Alias.Cell
agg, ok := result[cell]
region := tc.getRegionByCell(ts.Tablet.Alias.Cell)
agg, ok := result[region]
if !ok {
agg = &querypb.AggregateStats{
SecondsBehindMasterMin: math.MaxUint32,
}
result[cell] = agg
result[region] = agg
}

if ts.Serving && ts.LastError == nil {
@@ -295,101 +294,12 @@ func MakeAggregateMap(stats []*TabletStats) map[string]*querypb.AggregateStats {
return result
}
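
The new makeAggregateMap keys aggregates by region via tc.getRegionByCell, which this hunk references but does not define. Below is a minimal sketch of how such a lookup might work, assuming a cell-to-region map loaded from topology configuration, with a fallback to the cell name itself when no region is configured; the type and field names are illustrative, not the PR's actual code.

```go
package main

import "fmt"

// statsCache stands in for TabletStatsCache; only the region
// lookup behavior is sketched here.
type statsCache struct {
	// cellRegions maps a cell name to its region, e.g. from topo config.
	cellRegions map[string]string
}

// getRegionByCell returns the region for a cell, falling back to the
// cell name itself when no region mapping exists. This fallback keeps
// the old per-cell behavior for deployments without region config.
func (tc *statsCache) getRegionByCell(cell string) string {
	if region, ok := tc.cellRegions[cell]; ok {
		return region
	}
	return cell
}

func main() {
	tc := &statsCache{cellRegions: map[string]string{
		"useast1a": "useast",
		"useast1b": "useast",
	}}
	fmt.Println(tc.getRegionByCell("useast1a"))
	fmt.Println(tc.getRegionByCell("euwest1a")) // no mapping: falls back to the cell
}
```

With such a lookup, tablets in useast1a and useast1b land in the same aggregate entry, which is the point of this PR: GetAggregateStats can then serve a region-wide view instead of a per-cell one.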

// MakeAggregateMapDiff computes the entries that need to be broadcast
// when the map goes from oldMap to newMap.
func MakeAggregateMapDiff(keyspace, shard string, tabletType topodatapb.TabletType, ter int64, oldMap map[string]*querypb.AggregateStats, newMap map[string]*querypb.AggregateStats) []*srvtopo.TargetStatsEntry {
var result []*srvtopo.TargetStatsEntry
for cell, oldValue := range oldMap {
newValue, ok := newMap[cell]
if ok {
// We have both an old and a new value. If equal,
// skip it.
if oldValue.HealthyTabletCount == newValue.HealthyTabletCount &&
oldValue.UnhealthyTabletCount == newValue.UnhealthyTabletCount &&
oldValue.SecondsBehindMasterMin == newValue.SecondsBehindMasterMin &&
oldValue.SecondsBehindMasterMax == newValue.SecondsBehindMasterMax {
continue
}
// The new value is different, send it.
result = append(result, &srvtopo.TargetStatsEntry{
Target: &querypb.Target{
Keyspace: keyspace,
Shard: shard,
TabletType: tabletType,
Cell: cell,
},
Stats: newValue,
TabletExternallyReparentedTimestamp: ter,
})
} else {
// We only have the old value, send an empty
// record to clear it.
result = append(result, &srvtopo.TargetStatsEntry{
Target: &querypb.Target{
Keyspace: keyspace,
Shard: shard,
TabletType: tabletType,
Cell: cell,
},
})
}
}

for cell, newValue := range newMap {
if _, ok := oldMap[cell]; ok {
continue
}
// New value, no old value, just send it.
result = append(result, &srvtopo.TargetStatsEntry{
Target: &querypb.Target{
Keyspace: keyspace,
Shard: shard,
TabletType: tabletType,
Cell: cell,
},
Stats: newValue,
TabletExternallyReparentedTimestamp: ter,
})
}
return result
}

// updateAggregateMap will update the aggregate map for the
// tabletStatsCacheEntry. It may broadcast the changes too if we have listeners.
// e.mu needs to be locked.
func (tc *TabletStatsCache) updateAggregateMap(keyspace, shard string, tabletType topodatapb.TabletType, e *tabletStatsCacheEntry, stats []*TabletStats) {
// Save the new value
oldAgg := e.aggregates
newAgg := MakeAggregateMap(stats)
e.aggregates = newAgg

// And broadcast the change in the background, if we need to.
tc.mu.RLock()
if !tc.tsm.HasSubscribers() {
// Shortcut: no subscriber, we can be done.
tc.mu.RUnlock()
return
}
tc.mu.RUnlock()

var ter int64
if len(stats) > 0 {
ter = stats[0].TabletExternallyReparentedTimestamp
}
diffs := MakeAggregateMapDiff(keyspace, shard, tabletType, ter, oldAgg, newAgg)
tc.aggregatesChan <- diffs
}

// broadcastAggregateStats is called in the background to send aggregate stats
// in the right order to our subscribers.
func (tc *TabletStatsCache) broadcastAggregateStats() {
for diffs := range tc.aggregatesChan {
tc.mu.RLock()
for _, d := range diffs {
tc.tsm.Broadcast(d)
}
tc.mu.RUnlock()
}
e.aggregates = tc.makeAggregateMap(stats)
}

// GetTabletStats returns the full list of available targets.
@@ -436,51 +346,6 @@ func (tc *TabletStatsCache) ResetForTesting() {
tc.entries = make(map[string]map[string]map[topodatapb.TabletType]*tabletStatsCacheEntry)
}

// Subscribe is part of the TargetStatsListener interface.
func (tc *TabletStatsCache) Subscribe() (int, []srvtopo.TargetStatsEntry, <-chan (*srvtopo.TargetStatsEntry), error) {
var allTS []srvtopo.TargetStatsEntry

// Make sure the map cannot change. Also blocks any update from
// propagating.
tc.mu.Lock()
defer tc.mu.Unlock()
for keyspace, shardMap := range tc.entries {
for shard, typeMap := range shardMap {
for tabletType, e := range typeMap {
e.mu.RLock()
var ter int64
if len(e.healthy) > 0 {
ter = e.healthy[0].TabletExternallyReparentedTimestamp
}
for cell, agg := range e.aggregates {
allTS = append(allTS, srvtopo.TargetStatsEntry{
Target: &querypb.Target{
Keyspace: keyspace,
Shard: shard,
TabletType: tabletType,
Cell: cell,
},
Stats: agg,
TabletExternallyReparentedTimestamp: ter,
})
}
e.mu.RUnlock()
}
}
}

// Now create the listener, add it to our list.
id, c := tc.tsm.Subscribe()
return id, allTS, c, nil
}

// Unsubscribe is part of the TargetStatsListener interface.
func (tc *TabletStatsCache) Unsubscribe(i int) error {
tc.mu.Lock()
defer tc.mu.Unlock()
return tc.tsm.Unsubscribe(i)
}

// GetAggregateStats is part of the TargetStatsListener interface.
func (tc *TabletStatsCache) GetAggregateStats(target *querypb.Target) (*querypb.AggregateStats, error) {
e := tc.getEntry(target.Keyspace, target.Shard, target.TabletType)
@@ -498,7 +363,8 @@ func (tc *TabletStatsCache) GetAggregateStats(target *querypb.Target) (*querypb.
return agg, nil
}
}
agg, ok := e.aggregates[target.Cell]
targetRegion := tc.getRegionByCell(target.Cell)
agg, ok := e.aggregates[targetRegion]
if !ok {
return nil, topo.NewError(topo.NoNode, topotools.TargetIdent(target))
}
@@ -530,4 +396,3 @@ func (tc *TabletStatsCache) GetMasterCell(keyspace, shard string) (cell string,

// Compile-time interface check.
var _ HealthCheckStatsListener = (*TabletStatsCache)(nil)
var _ srvtopo.TargetStatsListener = (*TabletStatsCache)(nil)
20 changes: 0 additions & 20 deletions go/vt/srvtopo/target_stats.go
@@ -27,9 +27,6 @@ import (
// routing of queries.
// - discovery.TabletStatsCache will implement the discovery part of the
// interface, and discoverygateway will have the QueryService.
// - hybridgateway will also implement this interface: for each l2vtgate pool,
// it will establish a StreamHealth connection, and store the returned
// health stats.
type TargetStats interface {
Contributor:

I believe this is also essentially obsolete. Let's look at getting rid of this at the next iteration.
Essentially, we should get rid of AggregateStats. We should only have a map[cell]*TabletStatsCache that vtgate directly pulls and populates from the tablets it's connected to.

Member Author:

That makes sense. Sounds good.

// GetAggregateStats returns the aggregate stats for the given Target.
// The srvtopo module will use that information to route queries
@@ -45,23 +42,6 @@
GetMasterCell(keyspace, shard string) (cell string, qs queryservice.QueryService, err error)
}

// TargetStatsListener is an interface used to propagate TargetStats changes.
// - discovery.TabletStatsCache will implement this interface.
// - the StreamHealth method in l2vtgate will use this interface to surface
// the health of its targets.
type TargetStatsListener interface {
// Subscribe will return the current full state of the TargetStats,
// and a channel that will receive subsequent updates. The int returned
// is the channel id, and can be sent to unsubscribe to stop
// notifications.
Subscribe() (int, []TargetStatsEntry, <-chan (*TargetStatsEntry), error)

// Unsubscribe stops sending updates to the channel returned
// by Subscribe. The channel still needs to be drained to
// avoid deadlocks.
Unsubscribe(int) error
}

// TargetStatsEntry has the updated information for a Target.
type TargetStatsEntry struct {
// Target is what this entry applies to.
6 changes: 0 additions & 6 deletions go/vt/vtgate/gateway/discoverygateway.go
@@ -210,12 +210,6 @@ func (dg *discoveryGateway) GetMasterCell(keyspace, shard string) (string, query
return cell, dg, err
}

// StreamHealth is not forwarded to any other tablet,
// but we handle it directly here.
func (dg *discoveryGateway) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error {
return StreamHealthFromTargetStatsListener(ctx, dg.tsc, callback)
}

// Close shuts down underlying connections.
// This function hides the inner implementation.
func (dg *discoveryGateway) Close(ctx context.Context) error {