Skip to content

Commit

Permalink
balancer/rls: Add cache metrics (#7495)
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq authored Aug 14, 2024
1 parent c8951ab commit 9706bf8
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 25 deletions.
27 changes: 20 additions & 7 deletions balancer/rls/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,20 @@ var (
dataCachePurgeHook = func() {}
resetBackoffHook = func() {}

cacheEntriesMetric = estats.RegisterInt64Gauge(estats.MetricDescriptor{
Name: "grpc.lb.rls.cache_entries",
Description: "EXPERIMENTAL. Number of entries in the RLS cache.",
Unit: "entry",
Labels: []string{"grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"},
Default: false,
})
cacheSizeMetric = estats.RegisterInt64Gauge(estats.MetricDescriptor{
Name: "grpc.lb.rls.cache_size",
Description: "EXPERIMENTAL. The current size of the RLS cache.",
Unit: "By",
Labels: []string{"grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"},
Default: false,
})
defaultTargetPicksMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
Name: "grpc.lb.rls.default_target_picks",
Description: "EXPERIMENTAL. Number of LB picks sent to the default target.",
Expand Down Expand Up @@ -126,7 +140,7 @@ func (rlsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.
updateCh: buffer.NewUnbounded(),
}
lb.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-experimental-lb %p] ", lb))
lb.dataCache = newDataCache(maxCacheSize, lb.logger)
lb.dataCache = newDataCache(maxCacheSize, lb.logger, opts.MetricsRecorder, opts.Target.String())
lb.bg = balancergroup.New(balancergroup.Options{
CC: cc,
BuildOpts: opts,
Expand Down Expand Up @@ -317,18 +331,17 @@ func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
b.stateMu.Unlock()
<-done

// We cannot do cache operations above because `cacheMu` needs to be grabbed
// before `stateMu` if we are to hold both locks at the same time.
b.cacheMu.Lock()
b.dataCache.updateRLSServerTarget(newCfg.lookupService)
if resizeCache {
// If the new config changes reduces the size of the data cache, we
// might have to evict entries to get the cache size down to the newly
// specified size.
//
// And we cannot do this operation above (where we compute the
// `resizeCache` boolean) because `cacheMu` needs to be grabbed before
// `stateMu` if we are to hold both locks at the same time.
b.cacheMu.Lock()
b.dataCache.resize(newCfg.cacheSizeBytes)
b.cacheMu.Unlock()
}
b.cacheMu.Unlock()
return nil
}

Expand Down
49 changes: 36 additions & 13 deletions balancer/rls/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"container/list"
"time"

"github.com/google/uuid"
estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/internal/backoff"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
Expand Down Expand Up @@ -163,22 +165,40 @@ func (l *lru) getLeastRecentlyUsed() cacheKey {
//
// It is not safe for concurrent access.
type dataCache struct {
maxSize int64 // Maximum allowed size.
currentSize int64 // Current size.
keys *lru // Cache keys maintained in lru order.
entries map[cacheKey]*cacheEntry
logger *internalgrpclog.PrefixLogger
shutdown *grpcsync.Event
maxSize int64 // Maximum allowed size.
currentSize int64 // Current size.
keys *lru // Cache keys maintained in lru order.
entries map[cacheKey]*cacheEntry
logger *internalgrpclog.PrefixLogger
shutdown *grpcsync.Event
rlsServerTarget string

// Read only after initialization.
grpcTarget string
uuid string
metricsRecorder estats.MetricsRecorder
}

func newDataCache(size int64, logger *internalgrpclog.PrefixLogger) *dataCache {
return &dataCache{
maxSize: size,
keys: newLRU(),
entries: make(map[cacheKey]*cacheEntry),
logger: logger,
shutdown: grpcsync.NewEvent(),
func newDataCache(size int64, logger *internalgrpclog.PrefixLogger, metricsRecorder estats.MetricsRecorder, grpcTarget string) *dataCache {
dc := &dataCache{
maxSize: size,
keys: newLRU(),
entries: make(map[cacheKey]*cacheEntry),
logger: logger,
shutdown: grpcsync.NewEvent(),
grpcTarget: grpcTarget,
uuid: uuid.New().String(),
metricsRecorder: metricsRecorder,
}
cacheSizeMetric.Record(dc.metricsRecorder, 0, grpcTarget, "", dc.uuid)
cacheEntriesMetric.Record(dc.metricsRecorder, 0, grpcTarget, "", dc.uuid)
return dc
}

// updateRLSServerTarget updates the RLS Server Target the RLS Balancer is
// configured with.
func (dc *dataCache) updateRLSServerTarget(rlsServerTarget string) {
dc.rlsServerTarget = rlsServerTarget
}

// resize changes the maximum allowed size of the data cache.
Expand Down Expand Up @@ -319,6 +339,7 @@ func (dc *dataCache) updateEntrySize(entry *cacheEntry, newSize int64) {
dc.currentSize -= entry.size
entry.size = newSize
dc.currentSize += entry.size
cacheSizeMetric.Record(dc.metricsRecorder, dc.currentSize, dc.grpcTarget, dc.rlsServerTarget, dc.uuid)
}

func (dc *dataCache) getEntry(key cacheKey) *cacheEntry {
Expand Down Expand Up @@ -351,6 +372,8 @@ func (dc *dataCache) deleteAndCleanup(key cacheKey, entry *cacheEntry) {
delete(dc.entries, key)
dc.currentSize -= entry.size
dc.keys.removeEntry(key)
cacheSizeMetric.Record(dc.metricsRecorder, dc.currentSize, dc.grpcTarget, dc.rlsServerTarget, dc.uuid)
cacheEntriesMetric.Record(dc.metricsRecorder, int64(len(dc.entries)), dc.grpcTarget, dc.rlsServerTarget, dc.uuid)
}

func (dc *dataCache) stop() {
Expand Down
11 changes: 6 additions & 5 deletions balancer/rls/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/testutils/stats"
)

var (
Expand Down Expand Up @@ -119,7 +120,7 @@ func (s) TestLRU_BasicOperations(t *testing.T) {

func (s) TestDataCache_BasicOperations(t *testing.T) {
initCacheEntries()
dc := newDataCache(5, nil)
dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "")
for i, k := range cacheKeys {
dc.addEntry(k, cacheEntries[i])
}
Expand All @@ -133,7 +134,7 @@ func (s) TestDataCache_BasicOperations(t *testing.T) {

func (s) TestDataCache_AddForcesResize(t *testing.T) {
initCacheEntries()
dc := newDataCache(1, nil)
dc := newDataCache(1, nil, &stats.NoopMetricsRecorder{}, "")

// The first entry in cacheEntries has a minimum expiry time in the future.
// This entry would stop the resize operation since we do not evict entries
Expand Down Expand Up @@ -162,7 +163,7 @@ func (s) TestDataCache_AddForcesResize(t *testing.T) {

func (s) TestDataCache_Resize(t *testing.T) {
initCacheEntries()
dc := newDataCache(5, nil)
dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "")
for i, k := range cacheKeys {
dc.addEntry(k, cacheEntries[i])
}
Expand Down Expand Up @@ -193,7 +194,7 @@ func (s) TestDataCache_Resize(t *testing.T) {

func (s) TestDataCache_EvictExpiredEntries(t *testing.T) {
initCacheEntries()
dc := newDataCache(5, nil)
dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "")
for i, k := range cacheKeys {
dc.addEntry(k, cacheEntries[i])
}
Expand All @@ -220,7 +221,7 @@ func (s) TestDataCache_ResetBackoffState(t *testing.T) {
}

initCacheEntries()
dc := newDataCache(5, nil)
dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "")
for i, k := range cacheKeys {
dc.addEntry(k, cacheEntries[i])
}
Expand Down
14 changes: 14 additions & 0 deletions internal/testutils/stats/test_metrics_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,3 +242,17 @@ func (r *TestMetricsRecorder) TagConn(ctx context.Context, _ *stats.ConnTagInfo)
}

func (r *TestMetricsRecorder) HandleConn(context.Context, stats.ConnStats) {}

// NoopMetricsRecorder is a noop MetricsRecorder to be used in tests to prevent
// nil panics.
type NoopMetricsRecorder struct{}

func (r *NoopMetricsRecorder) RecordInt64Count(*estats.Int64CountHandle, int64, ...string) {}

func (r *NoopMetricsRecorder) RecordFloat64Count(*estats.Float64CountHandle, float64, ...string) {}

func (r *NoopMetricsRecorder) RecordInt64Histo(*estats.Int64HistoHandle, int64, ...string) {}

func (r *NoopMetricsRecorder) RecordFloat64Histo(*estats.Float64HistoHandle, float64, ...string) {}

func (r *NoopMetricsRecorder) RecordInt64Gauge(*estats.Int64GaugeHandle, int64, ...string) {}

0 comments on commit 9706bf8

Please sign in to comment.