Skip to content

Commit

Permalink
Add RLS Metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq committed Aug 6, 2024
1 parent c8716e5 commit 439a6ad
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 18 deletions.
72 changes: 64 additions & 8 deletions balancer/rls/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ import (
"time"
"unsafe"

"github.com/google/uuid"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/backoff"
Expand Down Expand Up @@ -77,6 +79,42 @@ var (
clientConnUpdateHook = func() {}
dataCachePurgeHook = func() {}
resetBackoffHook = func() {}

cacheEntriesMetric = estats.RegisterInt64Gauge(estats.MetricDescriptor{
Name: "grpc.lb.rls.cache_entries",
Description: "EXPERIMENTAL. Number of entries in the RLS cache.",
Unit: "entry",
Labels: []string{"grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"},
Default: false,
})
cacheSizeMetric = estats.RegisterInt64Gauge(estats.MetricDescriptor{
Name: "grpc.lb.rls.cache_size",
Description: "EXPERIMENTAL. The current size of the RLS cache.",
Unit: "By",
Labels: []string{"grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"},
Default: false,
})
defaultTargetPicksMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
Name: "grpc.lb.rls.default_target_picks",
Description: "EXPERIMENTAL. Number of LB picks sent to the default target.",
Unit: "pick",
Labels: []string{"grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid", "grpc.lb.pick_result"},
Default: false,
})
targetPicksMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
Name: "grpc.lb.rls.target_picks",
Description: "EXPERIMENTAL. Number of LB picks sent to each RLS target. Note that if the default target is also returned by the RLS server, RPCs sent to that target from the cache will be counted in this metric, not in grpc.rls.default_target_picks.",
Unit: "pick",
Labels: []string{"grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid", "grpc.lb.pick_result"},
Default: false,
})
failedPicksMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
Name: "grpc.lb.rls.failed_picks",
Description: "EXPERIMENTAL. Number of LB picks failed due to either a failed RLS request or the RLS channel being throttled.",
Unit: "pick",
Labels: []string{"grpc.target", "grpc.lb.rls.server_target"},
Default: false,
})
)

func init() {
Expand All @@ -95,6 +133,7 @@ func (rlsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.
done: grpcsync.NewEvent(),
cc: cc,
bopts: opts,
uuid: uuid.New().String(),
purgeTicker: dataCachePurgeTicker(),
dataCachePurgeHook: dataCachePurgeHook,
lbCfg: &lbConfig{},
Expand Down Expand Up @@ -122,6 +161,7 @@ type rlsBalancer struct {
done *grpcsync.Event // Fires when Close() is done.
cc balancer.ClientConn
bopts balancer.BuildOptions
uuid string
purgeTicker *time.Ticker
dataCachePurgeHook func()
logger *internalgrpclog.PrefixLogger
Expand Down Expand Up @@ -240,7 +280,16 @@ func (b *rlsBalancer) purgeDataCache(doneCh chan struct{}) {
case <-b.purgeTicker.C:
b.cacheMu.Lock()
updatePicker := b.dataCache.evictExpiredEntries()

b.stateMu.Lock()
rlsLookupService := b.lbCfg.lookupService
b.stateMu.Unlock()
cacheSize := b.dataCache.currentSize
cacheEntries := int64(len(b.dataCache.entries))
b.cacheMu.Unlock()
grpcTarget := b.bopts.Target.String()
cacheSizeMetric.Record(b.bopts.MetricsRecorder, cacheSize, grpcTarget, rlsLookupService, b.uuid)
cacheEntriesMetric.Record(b.bopts.MetricsRecorder, cacheEntries, grpcTarget, rlsLookupService, b.uuid)
if updatePicker {
b.sendNewPicker()
}
Expand Down Expand Up @@ -304,7 +353,12 @@ func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
// `stateMu` if we are to hold both locks at the same time.
b.cacheMu.Lock()
b.dataCache.resize(newCfg.cacheSizeBytes)
dataCacheSize := b.dataCache.currentSize
dataCacheEntries := int64(len(b.dataCache.entries))
b.cacheMu.Unlock()
grpcTarget := b.bopts.Target.String()
cacheSizeMetric.Record(b.bopts.MetricsRecorder, dataCacheSize, grpcTarget, newCfg.lookupService, b.uuid)
cacheEntriesMetric.Record(b.bopts.MetricsRecorder, dataCacheEntries, grpcTarget, newCfg.lookupService, b.uuid)
}
return nil
}
Expand Down Expand Up @@ -490,15 +544,17 @@ func (b *rlsBalancer) sendNewPickerLocked() {
if b.defaultPolicy != nil {
b.defaultPolicy.acquireRef()
}

picker := &rlsPicker{
kbm: b.lbCfg.kbMap,
origEndpoint: b.bopts.Target.Endpoint(),
lb: b,
defaultPolicy: b.defaultPolicy,
ctrlCh: b.ctrlCh,
maxAge: b.lbCfg.maxAge,
staleAge: b.lbCfg.staleAge,
bg: b.bg,
kbm: b.lbCfg.kbMap,
origEndpoint: b.bopts.Target.Endpoint(),
lb: b,
defaultPolicy: b.defaultPolicy,
ctrlCh: b.ctrlCh,
maxAge: b.lbCfg.maxAge,
staleAge: b.lbCfg.staleAge,
bg: b.bg,
rlsServerTarget: b.lbCfg.lookupService,
}
picker.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-picker %p] ", picker))
state := balancer.State{
Expand Down
3 changes: 2 additions & 1 deletion balancer/rls/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ func (l *lru) getLeastRecentlyUsed() cacheKey {
//
// It is not safe for concurrent access.
type dataCache struct {
maxSize int64 // Maximum allowed size.
maxSize int64 // Maximum allowed size.

currentSize int64 // Current size.
keys *lru // Cache keys maintained in lru order.
entries map[cacheKey]*cacheEntry
Expand Down
33 changes: 24 additions & 9 deletions balancer/rls/picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,13 @@ type rlsPicker struct {

// The picker is given its own copy of the below fields from the RLS LB policy
// to avoid having to grab the mutex on the latter.
defaultPolicy *childPolicyWrapper // Child policy for the default target.
ctrlCh *controlChannel // Control channel to the RLS server.
maxAge time.Duration // Cache max age from LB config.
staleAge time.Duration // Cache stale age from LB config.
bg exitIdler
logger *internalgrpclog.PrefixLogger
rlsServerTarget string
defaultPolicy *childPolicyWrapper // Child policy for the default target.
ctrlCh *controlChannel // Control channel to the RLS server.
maxAge time.Duration // Cache max age from LB config.
staleAge time.Duration // Cache stale age from LB config.
bg exitIdler
logger *internalgrpclog.PrefixLogger
}

// isFullMethodNameValid return true if name is of the form `/service/method`.
Expand Down Expand Up @@ -164,8 +165,11 @@ func (p *rlsPicker) delegateToChildPoliciesLocked(dcEntry *cacheEntry, info bala
// X-Google-RLS-Data header.
res, err := state.Picker.Pick(info)
if err != nil {
targetPicksMetric.Record(p.lb.bopts.MetricsRecorder, 1, p.lb.bopts.Target.String(), p.rlsServerTarget, cpw.target, "fail")
return res, err
}
targetPicksMetric.Record(p.lb.bopts.MetricsRecorder, 1, p.lb.bopts.Target.String(), p.rlsServerTarget, cpw.target, "complete")

if res.Metadata == nil {
res.Metadata = metadata.Pairs(rlsDataHeaderName, dcEntry.headerData)
} else {
Expand All @@ -174,6 +178,8 @@ func (p *rlsPicker) delegateToChildPoliciesLocked(dcEntry *cacheEntry, info bala
return res, nil
}
}

failedPicksMetric.Record(p.lb.bopts.MetricsRecorder, 1, p.lb.bopts.Target.String(), p.rlsServerTarget)

Check warning on line 182 in balancer/rls/picker.go

View check run for this annotation

Codecov / codecov/patch

balancer/rls/picker.go#L182

Added line #L182 was not covered by tests
// In the unlikely event that we have a cache entry with no targets, we end up
// queueing the RPC.
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
Expand All @@ -184,8 +190,16 @@ func (p *rlsPicker) delegateToChildPoliciesLocked(dcEntry *cacheEntry, info bala
func (p *rlsPicker) useDefaultPickIfPossible(info balancer.PickInfo, errOnNoDefault error) (balancer.PickResult, error) {
if p.defaultPolicy != nil {
state := (*balancer.State)(atomic.LoadPointer(&p.defaultPolicy.state))
return state.Picker.Pick(info)
res, err := state.Picker.Pick(info)
pr := "complete"
if err != nil {
pr = "fail"
}
defaultTargetPicksMetric.Record(p.lb.bopts.MetricsRecorder, 1, p.lb.bopts.Target.String(), p.rlsServerTarget, p.defaultPolicy.target, pr)
return res, err
}

failedPicksMetric.Record(p.lb.bopts.MetricsRecorder, 1, p.lb.bopts.Target.String(), p.rlsServerTarget)
return balancer.PickResult{}, errOnNoDefault
}

Expand Down Expand Up @@ -218,8 +232,9 @@ func (p *rlsPicker) handleRouteLookupResponse(cacheKey cacheKey, targets []strin

p.lb.cacheMu.Lock()
defer func() {
// Pending request map entry is unconditionally deleted since the request is
// no longer pending.
grpcTarget := p.lb.bopts.Target.String()
cacheSizeMetric.Record(p.lb.bopts.MetricsRecorder, p.lb.dataCache.currentSize, grpcTarget, p.rlsServerTarget, p.lb.uuid)
cacheEntriesMetric.Record(p.lb.bopts.MetricsRecorder, int64(len(p.lb.dataCache.entries)), grpcTarget, p.rlsServerTarget, p.lb.uuid)
p.logger.Infof("Removing pending request entry for key %+v", cacheKey)
delete(p.lb.pendingMap, cacheKey)
p.lb.sendNewPicker()
Expand Down

0 comments on commit 439a6ad

Please sign in to comment.