Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

balancer/rls: Add picker metrics #7484

Merged
merged 5 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 33 additions & 8 deletions balancer/rls/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/backoff"
Expand Down Expand Up @@ -77,6 +78,28 @@ var (
clientConnUpdateHook = func() {}
dataCachePurgeHook = func() {}
resetBackoffHook = func() {}

defaultTargetPicksMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
Name: "grpc.lb.rls.default_target_picks",
Description: "EXPERIMENTAL. Number of LB picks sent to the default target.",
Unit: "pick",
Labels: []string{"grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"},
Default: false,
easwars marked this conversation as resolved.
Show resolved Hide resolved
})
targetPicksMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
Name: "grpc.lb.rls.target_picks",
Description: "EXPERIMENTAL. Number of LB picks sent to each RLS target. Note that if the default target is also returned by the RLS server, RPCs sent to that target from the cache will be counted in this metric, not in grpc.rls.default_target_picks.",
Unit: "pick",
Labels: []string{"grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"},
Default: false,
})
failedPicksMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
Name: "grpc.lb.rls.failed_picks",
Description: "EXPERIMENTAL. Number of LB picks failed due to either a failed RLS request or the RLS channel being throttled.",
Unit: "pick",
Labels: []string{"grpc.target", "grpc.lb.rls.server_target"},
Default: false,
})
)

func init() {
Expand Down Expand Up @@ -490,15 +513,17 @@ func (b *rlsBalancer) sendNewPickerLocked() {
if b.defaultPolicy != nil {
b.defaultPolicy.acquireRef()
}

picker := &rlsPicker{
kbm: b.lbCfg.kbMap,
origEndpoint: b.bopts.Target.Endpoint(),
lb: b,
defaultPolicy: b.defaultPolicy,
ctrlCh: b.ctrlCh,
maxAge: b.lbCfg.maxAge,
staleAge: b.lbCfg.staleAge,
bg: b.bg,
kbm: b.lbCfg.kbMap,
origEndpoint: b.bopts.Target.Endpoint(),
lb: b,
defaultPolicy: b.defaultPolicy,
ctrlCh: b.ctrlCh,
maxAge: b.lbCfg.maxAge,
staleAge: b.lbCfg.staleAge,
bg: b.bg,
rlsServerTarget: b.lbCfg.lookupService,
}
picker.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-picker %p] ", picker))
state := balancer.State{
Expand Down
74 changes: 51 additions & 23 deletions balancer/rls/picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,13 @@

// The picker is given its own copy of the below fields from the RLS LB policy
// to avoid having to grab the mutex on the latter.
defaultPolicy *childPolicyWrapper // Child policy for the default target.
ctrlCh *controlChannel // Control channel to the RLS server.
maxAge time.Duration // Cache max age from LB config.
staleAge time.Duration // Cache stale age from LB config.
bg exitIdler
logger *internalgrpclog.PrefixLogger
rlsServerTarget string
easwars marked this conversation as resolved.
Show resolved Hide resolved
defaultPolicy *childPolicyWrapper // Child policy for the default target.
ctrlCh *controlChannel // Control channel to the RLS server.
maxAge time.Duration // Cache max age from LB config.
staleAge time.Duration // Cache stale age from LB config.
bg exitIdler
logger *internalgrpclog.PrefixLogger
}

// isFullMethodNameValid return true if name is of the form `/service/method`.
Expand All @@ -85,7 +86,13 @@
reqKeys := p.kbm.RLSKey(md, p.origEndpoint, info.FullMethodName)

p.lb.cacheMu.Lock()
defer p.lb.cacheMu.Unlock()
var pr balancer.PickResult
var err error
metricsCallback := func() {}
defer func() {
p.lb.cacheMu.Unlock()
metricsCallback()
}()
easwars marked this conversation as resolved.
Show resolved Hide resolved

// Lookup data cache and pending request map using request path and keys.
cacheKey := cacheKey{path: info.FullMethodName, keys: reqKeys.Str}
Expand All @@ -98,7 +105,8 @@
case dcEntry == nil && pendingEntry == nil:
throttled := p.sendRouteLookupRequestLocked(cacheKey, &backoffState{bs: defaultBackoffStrategy}, reqKeys.Map, rlspb.RouteLookupRequest_REASON_MISS, "")
if throttled {
return p.useDefaultPickIfPossible(info, errRLSThrottled)
pr, metricsCallback, err = p.useDefaultPickIfPossible(info, errRLSThrottled)
return pr, err
}
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable

Expand All @@ -113,8 +121,8 @@
p.sendRouteLookupRequestLocked(cacheKey, dcEntry.backoffState, reqKeys.Map, rlspb.RouteLookupRequest_REASON_STALE, dcEntry.headerData)
}
// Delegate to child policies.
res, err := p.delegateToChildPoliciesLocked(dcEntry, info)
return res, err
pr, metricsCallback, err = p.delegateToChildPoliciesLocked(dcEntry, info)
return pr, err
}

// We get here only if the data cache entry has expired. If entry is in
Expand All @@ -126,21 +134,23 @@
// message received from the control plane is still fine, as it could be
// useful for debugging purposes.
st := dcEntry.status
return p.useDefaultPickIfPossible(info, status.Error(codes.Unavailable, fmt.Sprintf("most recent error from RLS server: %v", st.Error())))
pr, metricsCallback, err = p.useDefaultPickIfPossible(info, status.Error(codes.Unavailable, fmt.Sprintf("most recent error from RLS server: %v", st.Error())))
return pr, err
}

// We get here only if the entry has expired and is not in backoff.
throttled := p.sendRouteLookupRequestLocked(cacheKey, dcEntry.backoffState, reqKeys.Map, rlspb.RouteLookupRequest_REASON_MISS, "")
if throttled {
return p.useDefaultPickIfPossible(info, errRLSThrottled)
pr, metricsCallback, err = p.useDefaultPickIfPossible(info, errRLSThrottled)
return pr, err
}
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable

// Data cache hit. Pending request exists.
default:
if dcEntry.expiryTime.After(now) {
res, err := p.delegateToChildPoliciesLocked(dcEntry, info)
return res, err
pr, metricsCallback, err = p.delegateToChildPoliciesLocked(dcEntry, info)
return pr, err

Check warning on line 153 in balancer/rls/picker.go

View check run for this annotation

Codecov / codecov/patch

balancer/rls/picker.go#L152-L153

Added lines #L152 - L153 were not covered by tests
}
// Data cache entry has expired and pending request exists. Queue pick.
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
Expand All @@ -150,8 +160,9 @@
// delegateToChildPoliciesLocked is a helper function which iterates through the
// list of child policy wrappers in a cache entry and attempts to find a child
// policy to which this RPC can be routed to. If all child policies are in
// TRANSIENT_FAILURE, we delegate to the last child policy arbitrarily.
func (p *rlsPicker) delegateToChildPoliciesLocked(dcEntry *cacheEntry, info balancer.PickInfo) (balancer.PickResult, error) {
// TRANSIENT_FAILURE, we delegate to the last child policy arbitrarily. Returns
// a function to be invoked to record metrics.
func (p *rlsPicker) delegateToChildPoliciesLocked(dcEntry *cacheEntry, info balancer.PickInfo) (balancer.PickResult, func(), error) {
const rlsDataHeaderName = "x-google-rls-data"
for i, cpw := range dcEntry.childPolicyWrappers {
state := (*balancer.State)(atomic.LoadPointer(&cpw.state))
Expand All @@ -164,29 +175,46 @@
// X-Google-RLS-Data header.
res, err := state.Picker.Pick(info)
if err != nil {
return res, err
return res, func() {
targetPicksMetric.Record(p.lb.bopts.MetricsRecorder, 1, p.lb.bopts.Target.String(), p.rlsServerTarget, cpw.target, "fail")
}, err
}

if res.Metadata == nil {
res.Metadata = metadata.Pairs(rlsDataHeaderName, dcEntry.headerData)
} else {
res.Metadata.Append(rlsDataHeaderName, dcEntry.headerData)
}
return res, nil
return res, func() {
targetPicksMetric.Record(p.lb.bopts.MetricsRecorder, 1, p.lb.bopts.Target.String(), p.rlsServerTarget, cpw.target, "complete")
}, nil
}
}

// In the unlikely event that we have a cache entry with no targets, we end up
// queueing the RPC.
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
return balancer.PickResult{}, func() {}, balancer.ErrNoSubConnAvailable

Check warning on line 196 in balancer/rls/picker.go

View check run for this annotation

Codecov / codecov/patch

balancer/rls/picker.go#L196

Added line #L196 was not covered by tests
}

// useDefaultPickIfPossible is a helper method which delegates to the default
// target if one is configured, or fails the pick with the given error.
func (p *rlsPicker) useDefaultPickIfPossible(info balancer.PickInfo, errOnNoDefault error) (balancer.PickResult, error) {
// target if one is configured, or fails the pick with the given error. Returns
// a function to be invoked to record metrics.
func (p *rlsPicker) useDefaultPickIfPossible(info balancer.PickInfo, errOnNoDefault error) (balancer.PickResult, func(), error) {
if p.defaultPolicy != nil {
state := (*balancer.State)(atomic.LoadPointer(&p.defaultPolicy.state))
return state.Picker.Pick(info)
res, err := state.Picker.Pick(info)
pr := "complete"
if err != nil {
pr = "fail"
}
easwars marked this conversation as resolved.
Show resolved Hide resolved
return res, func() {
defaultTargetPicksMetric.Record(p.lb.bopts.MetricsRecorder, 1, p.lb.bopts.Target.String(), p.rlsServerTarget, p.defaultPolicy.target, pr)
}, err
}
return balancer.PickResult{}, errOnNoDefault

return balancer.PickResult{}, func() {
failedPicksMetric.Record(p.lb.bopts.MetricsRecorder, 1, p.lb.bopts.Target.String(), p.rlsServerTarget)
}, errOnNoDefault
}

// sendRouteLookupRequestLocked adds an entry to the pending request map and
Expand Down
Loading