From a69303004a0b38ea0e803d9bc037f4e9f1c57f8f Mon Sep 17 00:00:00 2001 From: Liang Guo Date: Thu, 10 Dec 2015 15:07:38 -0800 Subject: [PATCH 1/2] Update Gateway to support tracking the status of its endpoints. --- go/cmd/vtgate/status.go | 81 +++++++++++++++------ go/cmd/vtgate/vtgate.go | 6 +- go/vt/vtgate/discoverygateway.go | 5 ++ go/vt/vtgate/gateway.go | 118 +++++++++++++++++++++++++++++++ go/vt/vtgate/resolver.go | 5 ++ go/vt/vtgate/scatter_conn.go | 5 ++ go/vt/vtgate/shardgateway.go | 5 ++ go/vt/vtgate/vtgate.go | 8 ++- 8 files changed, 211 insertions(+), 22 deletions(-) diff --git a/go/cmd/vtgate/status.go b/go/cmd/vtgate/status.go index 50b272be99b..70ddc90edbf 100644 --- a/go/cmd/vtgate/status.go +++ b/go/cmd/vtgate/status.go @@ -3,6 +3,7 @@ package main import ( "github.com/youtube/vitess/go/vt/servenv" _ "github.com/youtube/vitess/go/vt/status" + "github.com/youtube/vitess/go/vt/vtgate" ) var ( @@ -13,7 +14,7 @@ var ( } td, th { border: 1px solid #999; - padding: 0.5rem; + padding: 0.2rem; } @@ -192,6 +193,48 @@ google.setOnLoadCallback(function() { }); +` + + gatewayStatusTemplate = ` + +
+ + + + + + + + + + + {{range $i, $status := .}} + + + + + + + + + + + {{end}} +
KeyspaceShardTabletTypeAddressQuery SentQuery ErrorQPSLatency (ms)
{{$status.Keyspace}}{{$status.Shard}}{{$status.TabletType}}{{$status.Name}}{{$status.QueryCount}}{{$status.QueryError}}{{$status.QPS}}{{$status.AvgLatency}}
` healthCheckTemplate = ` @@ -201,7 +244,7 @@ google.setOnLoadCallback(function() { } td, th { border: 1px solid #999; - padding: 0.5rem; + padding: 0.2rem; } @@ -217,10 +260,10 @@ google.setOnLoadCallback(function() { {{range $i, $eps := .}} - - - - + + + + {{end}} @@ -231,19 +274,17 @@ google.setOnLoadCallback(function() { // For use by plugins which wish to avoid racing when registering status page parts. var onStatusRegistered func() -func init() { - servenv.OnRun(func() { - servenv.AddStatusPart("Topology Cache", topoTemplate, func() interface{} { - return resilientSrvTopoServer.CacheStatus() - }) - servenv.AddStatusPart("Health Check Cache (NOT FOR QUERY ROUTING)", healthCheckTemplate, func() interface{} { - return healthCheck.CacheStatus() - }) - servenv.AddStatusPart("Stats", statsTemplate, func() interface{} { - return nil - }) - if onStatusRegistered != nil { - onStatusRegistered() - } +func addStatusParts(vtgate *vtgate.VTGate) { + servenv.AddStatusPart("Topology Cache", topoTemplate, func() interface{} { + return resilientSrvTopoServer.CacheStatus() + }) + servenv.AddStatusPart("Gateway Status", gatewayStatusTemplate, func() interface{} { + return vtgate.GetGatewayCacheStatus() + }) + servenv.AddStatusPart("Health Check Cache (NOT FOR QUERY ROUTING)", healthCheckTemplate, func() interface{} { + return healthCheck.CacheStatus() }) + if onStatusRegistered != nil { + onStatusRegistered() + } } diff --git a/go/cmd/vtgate/vtgate.go b/go/cmd/vtgate/vtgate.go index 35663a5f263..db819bc36fd 100644 --- a/go/cmd/vtgate/vtgate.go +++ b/go/cmd/vtgate/vtgate.go @@ -99,6 +99,10 @@ startServer: tabletTypes = append(tabletTypes, tt) } } - vtgate.Init(healthCheck, ts, resilientSrvTopoServer, schema, *cell, *retryDelay, *retryCount, *connTimeoutTotal, *connTimeoutPerConn, *connLife, tabletTypes, *maxInFlight, *testGateway) + vtg := vtgate.Init(healthCheck, ts, resilientSrvTopoServer, schema, *cell, *retryDelay, *retryCount, *connTimeoutTotal, *connTimeoutPerConn, *connLife, tabletTypes, *maxInFlight, *testGateway) + + servenv.OnRun(func() { + addStatusParts(vtg) + }) servenv.RunDefault() } diff --git a/go/vt/vtgate/discoverygateway.go b/go/vt/vtgate/discoverygateway.go index 105e586e785..bd6e79a7381 100644 --- a/go/vt/vtgate/discoverygateway.go +++ b/go/vt/vtgate/discoverygateway.go @@ -183,6 +183,11 @@ func (dg *discoveryGateway) Close(ctx context.Context) error { return nil } +// CacheStatus returns a list of GatewayEndPointCacheStatus per endpoint. +func (dg *discoveryGateway) CacheStatus() GatewayEndPointCacheStatusList { + return nil +} + // StatsUpdate receives updates about target and realtime stats changes. func (dg *discoveryGateway) StatsUpdate(*discovery.EndPointStats) { } diff --git a/go/vt/vtgate/gateway.go b/go/vt/vtgate/gateway.go index 52b4871714f..d7661690d25 100644 --- a/go/vt/vtgate/gateway.go +++ b/go/vt/vtgate/gateway.go @@ -6,6 +6,8 @@ package vtgate import ( "flag" + "strings" + "sync" "time" log "github.com/golang/glog" @@ -56,6 +58,9 @@ type Gateway interface { // Close shuts down underlying connections. Close(ctx context.Context) error + + // CacheStatus returns a list of GatewayEndPointCacheStatus per endpoint. + CacheStatus() GatewayEndPointCacheStatusList } // GatewayCreator is the func which can create the actual gateway object. @@ -89,3 +94,116 @@ func GetGatewayCreatorByName(name string) GatewayCreator { } return gc } + +// GatewayEndPointCacheStatusList is a slice of GatewayEndPointCacheStatus. +type GatewayEndPointCacheStatusList []*GatewayEndPointCacheStatus + +// Len is part of sort.Interface. +func (gepcsl GatewayEndPointCacheStatusList) Len() int { + return len(gepcsl) +} + +// Less is part of sort.Interface. +func (gepcsl GatewayEndPointCacheStatusList) Less(i, j int) bool { + iKey := strings.Join([]string{gepcsl[i].Keyspace, gepcsl[i].Shard, string(gepcsl[i].TabletType), gepcsl[i].Name}, ".") + jKey := strings.Join([]string{gepcsl[j].Keyspace, gepcsl[j].Shard, string(gepcsl[j].TabletType), gepcsl[j].Name}, ".") + return iKey < jKey +} + +// Swap is part of sort.Interface. +func (gepcsl GatewayEndPointCacheStatusList) Swap(i, j int) { + gepcsl[i], gepcsl[j] = gepcsl[j], gepcsl[i] +} + +// GatewayEndPointCacheStatus contains the status per endpoint for a gateway. +type GatewayEndPointCacheStatus struct { + Keyspace string + Shard string + TabletType topodatapb.TabletType + Name string + Addr string + + QueryCount uint64 + QueryError uint64 + QPS uint64 + AvgLatency uint64 // in milliseconds +} + +// NewGatewayEndPointStatusAggregator creates a GatewayEndPointStatusAggregator. +func NewGatewayEndPointStatusAggregator() *GatewayEndPointStatusAggregator { + gepsa := &GatewayEndPointStatusAggregator{} + go func() { + ticker := time.NewTicker(time.Second) + for _ = range ticker.C { + gepsa.resetNextSlot() + } + }() + return gepsa +} + +// GatewayEndPointStatusAggregator tracks endpoint status for a gateway. +type GatewayEndPointStatusAggregator struct { + Keyspace string + Shard string + TabletType topodatapb.TabletType + Name string // the alternative name of an endpoint + Addr string // the host:port of an endpoint + + // mu protects below fields. + mu sync.RWMutex + QueryCount uint64 + QueryError uint64 + // for QPS and latency (avg value over a minute) + queryCountInMinute [60]uint64 + latencyInMinute [60]time.Duration +} + +// UpdateQueryInfo updates the aggregator with the given information about a query. +func (gepsa *GatewayEndPointStatusAggregator) UpdateQueryInfo(elapsed time.Duration, hasError bool) { + gepsa.mu.Lock() + defer gepsa.mu.Unlock() + idx := time.Now().Second() % 60 + gepsa.QueryCount++ + gepsa.queryCountInMinute[idx]++ + gepsa.latencyInMinute[idx] += elapsed + if hasError { + gepsa.QueryError++ + } +} + +// GetCacheStatus returns a GatewayEndPointCacheStatus representing the current gateway status. +func (gepsa *GatewayEndPointStatusAggregator) GetCacheStatus() *GatewayEndPointCacheStatus { + status := &GatewayEndPointCacheStatus{ + Keyspace: gepsa.Keyspace, + Shard: gepsa.Shard, + TabletType: gepsa.TabletType, + Name: gepsa.Name, + Addr: gepsa.Addr, + } + gepsa.mu.RLock() + defer gepsa.mu.RUnlock() + status.QueryCount = gepsa.QueryCount + status.QueryError = gepsa.QueryError + var totalQuery uint64 + for _, c := range gepsa.queryCountInMinute { + totalQuery += c + } + var totalLatency time.Duration + for _, d := range gepsa.latencyInMinute { + totalLatency += d + } + status.QPS = totalQuery / 60 + if totalQuery > 0 { + status.AvgLatency = uint64(totalLatency.Nanoseconds()) / totalQuery / 100000 + } + return status +} + +// resetNextSlot resets the next tracking slot. +func (gepsa *GatewayEndPointStatusAggregator) resetNextSlot() { + gepsa.mu.Lock() + defer gepsa.mu.Unlock() + idx := (time.Now().Second() + 1) % 60 + gepsa.queryCountInMinute[idx] = 0 + gepsa.latencyInMinute[idx] = 0 +} diff --git a/go/vt/vtgate/resolver.go b/go/vt/vtgate/resolver.go index df82595d63a..4c80f87c507 100644 --- a/go/vt/vtgate/resolver.go +++ b/go/vt/vtgate/resolver.go @@ -373,6 +373,11 @@ func (res *Resolver) Rollback(ctx context.Context, inSession *vtgatepb.Session) return res.scatterConn.Rollback(ctx, NewSafeSession(inSession)) } +// GetGatewayCacheStatus returns a displayable version of the Gateway cache. +func (res *Resolver) GetGatewayCacheStatus() GatewayEndPointCacheStatusList { + return res.scatterConn.GetGatewayCacheStatus() +} + // StrsEquals compares contents of two string slices. func StrsEquals(a, b []string) bool { if len(a) != len(b) { diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go index c18fccb9b92..0e28b448481 100644 --- a/go/vt/vtgate/scatter_conn.go +++ b/go/vt/vtgate/scatter_conn.go @@ -542,6 +542,11 @@ func (stc *ScatterConn) Close() error { return stc.gateway.Close(context.Background()) } +// GetGatewayCacheStatus returns a displayable version of the Gateway cache. +func (stc *ScatterConn) GetGatewayCacheStatus() GatewayEndPointCacheStatusList { + return stc.gateway.CacheStatus() +} + // ScatterConnError is the ScatterConn specific error. type ScatterConnError struct { Code int diff --git a/go/vt/vtgate/shardgateway.go b/go/vt/vtgate/shardgateway.go index 0a7d263cce9..03f4f5b3d94 100644 --- a/go/vt/vtgate/shardgateway.go +++ b/go/vt/vtgate/shardgateway.go @@ -152,6 +152,11 @@ func (sg *shardGateway) Close(ctx context.Context) error { return nil } +// CacheStatus returns a list of GatewayEndPointCacheStatus per endpoint. +func (sg *shardGateway) CacheStatus() GatewayEndPointCacheStatusList { + return nil +} + func (sg *shardGateway) getConnection(ctx context.Context, keyspace, shard string, tabletType topodatapb.TabletType) *ShardConn { sg.mu.Lock() defer sg.mu.Unlock() diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index b97e1e7db9a..458398714b2 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -96,7 +96,7 @@ type RegisterVTGate func(vtgateservice.VTGateService) var RegisterVTGates []RegisterVTGate // Init initializes VTGate server. -func Init(hc discovery.HealthCheck, topoServer topo.Server, serv topo.SrvTopoServer, schema *planbuilder.Schema, cell string, retryDelay time.Duration, retryCount int, connTimeoutTotal, connTimeoutPerConn, connLife time.Duration, tabletTypesToWait []topodatapb.TabletType, maxInFlight int, testGateway string) { +func Init(hc discovery.HealthCheck, topoServer topo.Server, serv topo.SrvTopoServer, schema *planbuilder.Schema, cell string, retryDelay time.Duration, retryCount int, connTimeoutTotal, connTimeoutPerConn, connLife time.Duration, tabletTypesToWait []topodatapb.TabletType, maxInFlight int, testGateway string) *VTGate { if rpcVTGate != nil { log.Fatalf("VTGate already initialized") } @@ -137,6 +137,7 @@ func Init(hc discovery.HealthCheck, topoServer topo.Server, serv topo.SrvTopoSer for _, f := range RegisterVTGates { f(rpcVTGate) } + return rpcVTGate } // InitializeConnections pre-initializes VTGate by connecting to vttablets of all keyspace/shard/type. @@ -638,6 +639,11 @@ func (vtg *VTGate) GetSrvShard(ctx context.Context, keyspace, shard string) (*to return vtg.resolver.toposerv.GetSrvShard(ctx, vtg.resolver.cell, keyspace, shard) } +// GetGatewayCacheStatus returns a displayable version of the Gateway cache. +func (vtg *VTGate) GetGatewayCacheStatus() GatewayEndPointCacheStatusList { + return vtg.resolver.GetGatewayCacheStatus() +} + // Any errors that are caused by VTGate dependencies (e.g, VtTablet) should be logged // as errors in those components, but logged to Info in VTGate itself. func logError(err error, query map[string]interface{}, logger *logutil.ThrottledLogger) { From 46ee07cdf96b3a79b627162659ff0bd0e19537ef Mon Sep 17 00:00:00 2001 From: Liang Guo Date: Fri, 11 Dec 2015 13:46:02 -0800 Subject: [PATCH 2/2] Update gateway status aggregator. --- go/vt/vtgate/gateway.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/go/vt/vtgate/gateway.go b/go/vt/vtgate/gateway.go index d7661690d25..b825d357479 100644 --- a/go/vt/vtgate/gateway.go +++ b/go/vt/vtgate/gateway.go @@ -159,9 +159,10 @@ type GatewayEndPointStatusAggregator struct { } // UpdateQueryInfo updates the aggregator with the given information about a query. -func (gepsa *GatewayEndPointStatusAggregator) UpdateQueryInfo(elapsed time.Duration, hasError bool) { +func (gepsa *GatewayEndPointStatusAggregator) UpdateQueryInfo(tabletType topodatapb.TabletType, elapsed time.Duration, hasError bool) { gepsa.mu.Lock() defer gepsa.mu.Unlock() + gepsa.TabletType = tabletType idx := time.Now().Second() % 60 gepsa.QueryCount++ gepsa.queryCountInMinute[idx]++
{{google3_third_party_golang_vitess_vtctld_srv_cell $eps.Cell}}{{google3_third_party_golang_vitess_vtctld_srv_keyspace $eps.Cell $eps.Target.Keyspace}}{{google3_third_party_golang_vitess_vtctld_srv_shard $eps.Cell $eps.Target.Keyspace $eps.Target.Shard}}{{google3_third_party_golang_vitess_vtctld_srv_type $eps.Cell $eps.Target.Keyspace $eps.Target.Shard $eps.Target.TabletType}}{{github_com_youtube_vitess_vtctld_srv_cell $eps.Cell}}{{github_com_youtube_vitess_vtctld_srv_keyspace $eps.Cell $eps.Target.Keyspace}}{{github_com_youtube_vitess_vtctld_srv_shard $eps.Cell $eps.Target.Keyspace $eps.Target.Shard}}{{github_com_youtube_vitess_vtctld_srv_type $eps.Cell $eps.Target.Keyspace $eps.Target.Shard $eps.Target.TabletType}} {{$eps.StatusAsHTML}}