Skip to content

Commit

Permalink
Merge pull request #1388 from youtube/vtgate
Browse files Browse the repository at this point in the history
Update Gateway to support tracking the status of its endpoints.
  • Loading branch information
guoliang100 committed Dec 11, 2015
2 parents 1222f24 + 46ee07c commit 6869c27
Show file tree
Hide file tree
Showing 8 changed files with 212 additions and 22 deletions.
81 changes: 61 additions & 20 deletions go/cmd/vtgate/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"github.com/youtube/vitess/go/vt/servenv"
_ "github.com/youtube/vitess/go/vt/status"
"github.com/youtube/vitess/go/vt/vtgate"
)

var (
Expand All @@ -13,7 +14,7 @@ var (
}
td, th {
border: 1px solid #999;
padding: 0.5rem;
padding: 0.2rem;
}
</style>
<table>
Expand Down Expand Up @@ -192,6 +193,48 @@ google.setOnLoadCallback(function() {
});
</script>
`

// gatewayStatusTemplate is the HTML template for the "Gateway Status" part of
// the status page. It ranges over the value it is executed with (the result of
// VTGate.GetGatewayCacheStatus, as registered in addStatusParts) and emits one
// table row per element showing its Keyspace, Shard, TabletType, Name (linked
// to http://Addr), QueryCount, QueryError, QPS and AvgLatency fields.
gatewayStatusTemplate = `
<style>
table {
border-collapse: collapse;
}
td, th {
border: 1px solid #999;
padding: 0.2rem;
}
table tr:nth-child(even) {
background-color: #eee;
}
table tr:nth-child(odd) {
background-color: #fff;
}
</style>
<table>
<tr>
<th>Keyspace</th>
<th>Shard</th>
<th>TabletType</th>
<th>Address</th>
<th>Query Sent</th>
<th>Query Error</th>
<th>QPS</th>
<th>Latency (ms)</th>
</tr>
{{range $i, $status := .}}
<tr>
<td>{{$status.Keyspace}}</td>
<td>{{$status.Shard}}</td>
<td>{{$status.TabletType}}</td>
<td><a href="http://{{$status.Addr}}">{{$status.Name}}</a></td>
<td>{{$status.QueryCount}}</td>
<td>{{$status.QueryError}}</td>
<td>{{$status.QPS}}</td>
<td>{{$status.AvgLatency}}</td>
</tr>
{{end}}
</table>
`

healthCheckTemplate = `
Expand All @@ -201,7 +244,7 @@ google.setOnLoadCallback(function() {
}
td, th {
border: 1px solid #999;
padding: 0.5rem;
padding: 0.2rem;
}
</style>
<table>
Expand All @@ -217,10 +260,10 @@ google.setOnLoadCallback(function() {
</tr>
{{range $i, $eps := .}}
<tr>
<td>{{google3_third_party_golang_vitess_vtctld_srv_cell $eps.Cell}}</td>
<td>{{google3_third_party_golang_vitess_vtctld_srv_keyspace $eps.Cell $eps.Target.Keyspace}}</td>
<td>{{google3_third_party_golang_vitess_vtctld_srv_shard $eps.Cell $eps.Target.Keyspace $eps.Target.Shard}}</td>
<td>{{google3_third_party_golang_vitess_vtctld_srv_type $eps.Cell $eps.Target.Keyspace $eps.Target.Shard $eps.Target.TabletType}}</td>
<td>{{github_com_youtube_vitess_vtctld_srv_cell $eps.Cell}}</td>
<td>{{github_com_youtube_vitess_vtctld_srv_keyspace $eps.Cell $eps.Target.Keyspace}}</td>
<td>{{github_com_youtube_vitess_vtctld_srv_shard $eps.Cell $eps.Target.Keyspace $eps.Target.Shard}}</td>
<td>{{github_com_youtube_vitess_vtctld_srv_type $eps.Cell $eps.Target.Keyspace $eps.Target.Shard $eps.Target.TabletType}}</td>
<td>{{$eps.StatusAsHTML}}</td>
</tr>
{{end}}
Expand All @@ -231,19 +274,17 @@ google.setOnLoadCallback(function() {
// onStatusRegistered, when non-nil, is called by addStatusParts right after
// the built-in status page parts have been added.
// For use by plugins which wish to avoid racing when registering status page parts.
var onStatusRegistered func()

func init() {
servenv.OnRun(func() {
servenv.AddStatusPart("Topology Cache", topoTemplate, func() interface{} {
return resilientSrvTopoServer.CacheStatus()
})
servenv.AddStatusPart("Health Check Cache (NOT FOR QUERY ROUTING)", healthCheckTemplate, func() interface{} {
return healthCheck.CacheStatus()
})
servenv.AddStatusPart("Stats", statsTemplate, func() interface{} {
return nil
})
if onStatusRegistered != nil {
onStatusRegistered()
}
// addStatusParts registers vtgate's sections of the status page with servenv:
// the topology cache, the per-endpoint gateway status of vtg, and the health
// check cache. After they are added it calls onStatusRegistered, if set, so
// that plugins can register their own parts afterwards.
//
// The parameter is named vtg (not vtgate) so it does not shadow the imported
// vtgate package inside the function body.
func addStatusParts(vtg *vtgate.VTGate) {
	servenv.AddStatusPart("Topology Cache", topoTemplate, func() interface{} {
		return resilientSrvTopoServer.CacheStatus()
	})
	servenv.AddStatusPart("Gateway Status", gatewayStatusTemplate, func() interface{} {
		return vtg.GetGatewayCacheStatus()
	})
	servenv.AddStatusPart("Health Check Cache (NOT FOR QUERY ROUTING)", healthCheckTemplate, func() interface{} {
		return healthCheck.CacheStatus()
	})
	// Let plugins know the built-in parts are in place.
	if onStatusRegistered != nil {
		onStatusRegistered()
	}
}
6 changes: 5 additions & 1 deletion go/cmd/vtgate/vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ startServer:
tabletTypes = append(tabletTypes, tt)
}
}
vtgate.Init(healthCheck, ts, resilientSrvTopoServer, schema, *cell, *retryDelay, *retryCount, *connTimeoutTotal, *connTimeoutPerConn, *connLife, tabletTypes, *maxInFlight, *testGateway)
vtg := vtgate.Init(healthCheck, ts, resilientSrvTopoServer, schema, *cell, *retryDelay, *retryCount, *connTimeoutTotal, *connTimeoutPerConn, *connLife, tabletTypes, *maxInFlight, *testGateway)

servenv.OnRun(func() {
addStatusParts(vtg)
})
servenv.RunDefault()
}
5 changes: 5 additions & 0 deletions go/vt/vtgate/discoverygateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ func (dg *discoveryGateway) Close(ctx context.Context) error {
return nil
}

// CacheStatus returns a list of GatewayEndPointCacheStatus per endpoint.
// This implementation currently always returns nil.
func (dg *discoveryGateway) CacheStatus() GatewayEndPointCacheStatusList {
return nil
}

// StatsUpdate receives updates about target and realtime stats changes.
// The body is empty, so the update is currently ignored.
func (dg *discoveryGateway) StatsUpdate(*discovery.EndPointStats) {
}
Expand Down
119 changes: 119 additions & 0 deletions go/vt/vtgate/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package vtgate

import (
"flag"
"strings"
"sync"
"time"

log "github.com/golang/glog"
Expand Down Expand Up @@ -56,6 +58,9 @@ type Gateway interface {

// Close shuts down underlying connections.
Close(ctx context.Context) error

// CacheStatus returns a list of GatewayEndPointCacheStatus per endpoint.
CacheStatus() GatewayEndPointCacheStatusList
}

// GatewayCreator is the func which can create the actual gateway object.
Expand Down Expand Up @@ -89,3 +94,117 @@ func GetGatewayCreatorByName(name string) GatewayCreator {
}
return gc
}

// GatewayEndPointCacheStatusList is a slice of GatewayEndPointCacheStatus.
// It implements sort.Interface, ordering entries by a key built from their
// Keyspace, Shard, TabletType and Name.
type GatewayEndPointCacheStatusList []*GatewayEndPointCacheStatus

// Len is part of sort.Interface.
func (gepcsl GatewayEndPointCacheStatusList) Len() int {
	return len(gepcsl)
}

// Less is part of sort.Interface. It reports whether the sort key of entry i
// is lexically smaller than the sort key of entry j.
func (gepcsl GatewayEndPointCacheStatusList) Less(i, j int) bool {
	return gatewayEndPointSortKey(gepcsl[i]) < gatewayEndPointSortKey(gepcsl[j])
}

// Swap is part of sort.Interface.
func (gepcsl GatewayEndPointCacheStatusList) Swap(i, j int) {
	gepcsl[i], gepcsl[j] = gepcsl[j], gepcsl[i]
}

// gatewayEndPointSortKey builds the "."-joined key used to order entries.
//
// The tablet type is encoded as the single rune whose code point is the
// enum's numeric value. The conversion goes through rune explicitly: a bare
// string(integer) conversion produces the same bytes but is rejected by
// go vet's stringintconv check because it is easily mistaken for a decimal
// formatting of the number.
func gatewayEndPointSortKey(status *GatewayEndPointCacheStatus) string {
	return strings.Join([]string{
		status.Keyspace,
		status.Shard,
		string(rune(status.TabletType)),
		status.Name,
	}, ".")
}

// GatewayEndPointCacheStatus contains the status per endpoint for a gateway.
// It is the snapshot produced by GatewayEndPointStatusAggregator.GetCacheStatus;
// the identity fields are copied from the aggregator and the counters are
// derived from its per-second slots.
type GatewayEndPointCacheStatus struct {
// Identity of the endpoint, copied from the aggregator's fields of the same name.
Keyspace string
Shard string
TabletType topodatapb.TabletType
Name string
Addr string

// QueryCount and QueryError are copies of the aggregator's running totals.
QueryCount uint64
QueryError uint64
// QPS is the sum of the aggregator's 60 per-second query counts divided by 60
// (integer division).
QPS uint64
AvgLatency uint64 // in milliseconds
}

// NewGatewayEndPointStatusAggregator creates a GatewayEndPointStatusAggregator
// with all fields zero-valued and starts a background goroutine that calls
// resetNextSlot once per second. The ticker is never stopped and its channel
// is never closed, so that goroutine keeps running after the aggregator is
// no longer used.
func NewGatewayEndPointStatusAggregator() *GatewayEndPointStatusAggregator {
	aggr := new(GatewayEndPointStatusAggregator)
	go func() {
		tick := time.NewTicker(time.Second)
		for {
			// Block until the next one-second tick, then clear the slot
			// that is about to be written.
			<-tick.C
			aggr.resetNextSlot()
		}
	}()
	return aggr
}

// GatewayEndPointStatusAggregator tracks endpoint status for a gateway.
// It keeps running totals plus 60 one-second slots of query counts and
// latencies; UpdateQueryInfo records into the slot for the current second,
// resetNextSlot clears the following slot, and GetCacheStatus sums all slots.
type GatewayEndPointStatusAggregator struct {
Keyspace string
Shard string
// TabletType is overwritten on every UpdateQueryInfo call (while mu is held).
TabletType topodatapb.TabletType
Name string // the alternative name of an endpoint
Addr string // the host:port of an endpoint

// mu protects below fields.
mu sync.RWMutex
// QueryCount is the total number of queries recorded via UpdateQueryInfo.
QueryCount uint64
// QueryError is the number of those queries recorded with hasError set.
QueryError uint64
// for QPS and latency (avg value over a minute)
// Both arrays are indexed by second-of-minute.
queryCountInMinute [60]uint64
latencyInMinute [60]time.Duration
}

// UpdateQueryInfo records one finished query in the aggregator: it stores the
// tablet type, bumps the running query total (and the error total when
// hasError is true), and adds the query and its elapsed time to the slot for
// the current second of the minute. All updates happen under the write lock.
func (gepsa *GatewayEndPointStatusAggregator) UpdateQueryInfo(tabletType topodatapb.TabletType, elapsed time.Duration, hasError bool) {
	gepsa.mu.Lock()
	defer gepsa.mu.Unlock()

	// Second() is already within [0, 59], so it indexes the slots directly.
	slot := time.Now().Second()

	gepsa.TabletType = tabletType
	gepsa.QueryCount++
	if hasError {
		gepsa.QueryError++
	}
	gepsa.queryCountInMinute[slot]++
	gepsa.latencyInMinute[slot] += elapsed
}

// GetCacheStatus returns a GatewayEndPointCacheStatus representing the current gateway status.
//
// The returned snapshot carries the endpoint identity fields, the running
// query and error totals, and two values derived from the 60 per-second slots:
// QPS (total queries in the slots divided by the number of slots, integer
// division) and AvgLatency (total latency divided by total queries, in whole
// milliseconds; left at 0 when no query is recorded).
func (gepsa *GatewayEndPointStatusAggregator) GetCacheStatus() *GatewayEndPointCacheStatus {
	// Acquire the read lock before reading any field: TabletType is written
	// by UpdateQueryInfo while holding mu, so reading it unlocked would race.
	gepsa.mu.RLock()
	defer gepsa.mu.RUnlock()

	status := &GatewayEndPointCacheStatus{
		Keyspace:   gepsa.Keyspace,
		Shard:      gepsa.Shard,
		TabletType: gepsa.TabletType,
		Name:       gepsa.Name,
		Addr:       gepsa.Addr,
		QueryCount: gepsa.QueryCount,
		QueryError: gepsa.QueryError,
	}

	var totalQuery uint64
	var totalLatency time.Duration
	for i, c := range gepsa.queryCountInMinute {
		totalQuery += c
		totalLatency += gepsa.latencyInMinute[i]
	}

	status.QPS = totalQuery / uint64(len(gepsa.queryCountInMinute))
	if totalQuery > 0 {
		// Nanoseconds per query, then nanoseconds -> milliseconds (1e6).
		status.AvgLatency = uint64(totalLatency.Nanoseconds()) / totalQuery / uint64(time.Millisecond)
	}
	return status
}

// resetNextSlot zeroes the query-count and latency slots for the second that
// follows the current one (wrapping from 59 back to 0), so that stale data
// from the previous minute is cleared before that slot is written again.
// It holds the write lock while doing so.
func (gepsa *GatewayEndPointStatusAggregator) resetNextSlot() {
	gepsa.mu.Lock()
	defer gepsa.mu.Unlock()

	next := (time.Now().Second() + 1) % len(gepsa.queryCountInMinute)
	gepsa.queryCountInMinute[next], gepsa.latencyInMinute[next] = 0, 0
}
5 changes: 5 additions & 0 deletions go/vt/vtgate/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,11 @@ func (res *Resolver) Rollback(ctx context.Context, inSession *vtgatepb.Session)
return res.scatterConn.Rollback(ctx, NewSafeSession(inSession))
}

// GetGatewayCacheStatus returns a displayable version of the Gateway cache.
// It simply forwards to the resolver's ScatterConn.
func (res *Resolver) GetGatewayCacheStatus() GatewayEndPointCacheStatusList {
return res.scatterConn.GetGatewayCacheStatus()
}

// StrsEquals compares contents of two string slices.
func StrsEquals(a, b []string) bool {
if len(a) != len(b) {
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vtgate/scatter_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,11 @@ func (stc *ScatterConn) Close() error {
return stc.gateway.Close(context.Background())
}

// GetGatewayCacheStatus returns a displayable version of the Gateway cache.
// It returns whatever the underlying gateway's CacheStatus reports.
func (stc *ScatterConn) GetGatewayCacheStatus() GatewayEndPointCacheStatusList {
return stc.gateway.CacheStatus()
}

// ScatterConnError is the ScatterConn specific error.
type ScatterConnError struct {
Code int
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vtgate/shardgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ func (sg *shardGateway) Close(ctx context.Context) error {
return nil
}

// CacheStatus returns a list of GatewayEndPointCacheStatus per endpoint.
// This implementation currently always returns nil.
func (sg *shardGateway) CacheStatus() GatewayEndPointCacheStatusList {
return nil
}

func (sg *shardGateway) getConnection(ctx context.Context, keyspace, shard string, tabletType topodatapb.TabletType) *ShardConn {
sg.mu.Lock()
defer sg.mu.Unlock()
Expand Down
8 changes: 7 additions & 1 deletion go/vt/vtgate/vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type RegisterVTGate func(vtgateservice.VTGateService)
var RegisterVTGates []RegisterVTGate

// Init initializes VTGate server.
func Init(hc discovery.HealthCheck, topoServer topo.Server, serv topo.SrvTopoServer, schema *planbuilder.Schema, cell string, retryDelay time.Duration, retryCount int, connTimeoutTotal, connTimeoutPerConn, connLife time.Duration, tabletTypesToWait []topodatapb.TabletType, maxInFlight int, testGateway string) {
func Init(hc discovery.HealthCheck, topoServer topo.Server, serv topo.SrvTopoServer, schema *planbuilder.Schema, cell string, retryDelay time.Duration, retryCount int, connTimeoutTotal, connTimeoutPerConn, connLife time.Duration, tabletTypesToWait []topodatapb.TabletType, maxInFlight int, testGateway string) *VTGate {
if rpcVTGate != nil {
log.Fatalf("VTGate already initialized")
}
Expand Down Expand Up @@ -137,6 +137,7 @@ func Init(hc discovery.HealthCheck, topoServer topo.Server, serv topo.SrvTopoSer
for _, f := range RegisterVTGates {
f(rpcVTGate)
}
return rpcVTGate
}

// InitializeConnections pre-initializes VTGate by connecting to vttablets of all keyspace/shard/type.
Expand Down Expand Up @@ -638,6 +639,11 @@ func (vtg *VTGate) GetSrvShard(ctx context.Context, keyspace, shard string) (*to
return vtg.resolver.toposerv.GetSrvShard(ctx, vtg.resolver.cell, keyspace, shard)
}

// GetGatewayCacheStatus returns a displayable version of the Gateway cache.
// It simply forwards to the VTGate's resolver.
func (vtg *VTGate) GetGatewayCacheStatus() GatewayEndPointCacheStatusList {
return vtg.resolver.GetGatewayCacheStatus()
}

// Any errors that are caused by VTGate dependencies (e.g., VtTablet) should be logged
// as errors in those components, but logged to Info in VTGate itself.
func logError(err error, query map[string]interface{}, logger *logutil.ThrottledLogger) {
Expand Down

0 comments on commit 6869c27

Please sign in to comment.