balancer/rls: Add picker metrics #7484

Merged: 5 commits, Aug 9, 2024
43 changes: 35 additions & 8 deletions balancer/rls/balancer.go
@@ -30,6 +30,7 @@ import (

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/backoff"
@@ -77,6 +78,28 @@ var (
clientConnUpdateHook = func() {}
dataCachePurgeHook = func() {}
resetBackoffHook = func() {}

defaultTargetPicksMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
Name: "grpc.lb.rls.default_target_picks",
Description: "EXPERIMENTAL. Number of LB picks sent to the default target.",
Unit: "pick",
Labels: []string{"grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"},
Default: false,
})
targetPicksMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
Name: "grpc.lb.rls.target_picks",
Description: "EXPERIMENTAL. Number of LB picks sent to each RLS target. Note that if the default target is also returned by the RLS server, RPCs sent to that target from the cache will be counted in this metric, not in grpc.rls.default_target_picks.",
Unit: "pick",
Labels: []string{"grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"},
Default: false,
})
failedPicksMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
Name: "grpc.lb.rls.failed_picks",
Description: "EXPERIMENTAL. Number of LB picks failed due to either a failed RLS request or the RLS channel being throttled.",
Unit: "pick",
Labels: []string{"grpc.target", "grpc.lb.rls.server_target"},
Default: false,
})
)

func init() {
@@ -490,15 +513,19 @@ func (b *rlsBalancer) sendNewPickerLocked() {
if b.defaultPolicy != nil {
b.defaultPolicy.acquireRef()
}

picker := &rlsPicker{
kbm: b.lbCfg.kbMap,
origEndpoint: b.bopts.Target.Endpoint(),
lb: b,
defaultPolicy: b.defaultPolicy,
ctrlCh: b.ctrlCh,
maxAge: b.lbCfg.maxAge,
staleAge: b.lbCfg.staleAge,
bg: b.bg,
kbm: b.lbCfg.kbMap,
origEndpoint: b.bopts.Target.Endpoint(),
lb: b,
defaultPolicy: b.defaultPolicy,
ctrlCh: b.ctrlCh,
maxAge: b.lbCfg.maxAge,
staleAge: b.lbCfg.staleAge,
bg: b.bg,
rlsServerTarget: b.lbCfg.lookupService,
grpcTarget: b.bopts.Target.String(),
metricsRecorder: b.bopts.MetricsRecorder,
}
picker.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-picker %p] ", picker))
state := balancer.State{
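The three counters above are registered once at package-init time and recorded later through the channel's MetricsRecorder (plumbed in from balancer.BuildOptions further down in this diff). Below is a rough, self-contained sketch of that call shape; the metric name, labels, and the recordExamplePick helper are hypothetical and not part of this PR.

// Sketch only: a hypothetical counter registered and recorded the same way the
// three picker metrics in this PR are. The name and labels are made up.
package main

import (
	estats "google.golang.org/grpc/experimental/stats"
)

var examplePicksMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
	Name:        "example.rls.picks", // hypothetical metric name
	Description: "Illustrative counter for this sketch.",
	Unit:        "pick",
	Labels:      []string{"grpc.target", "grpc.lb.rls.server_target"},
	Default:     false, // off by default; must be enabled by the stats plugin
})

// recordExamplePick shows the call shape used by the picker: label values are
// passed positionally and must line up with the descriptor's Labels slice. The
// recorder itself would come from balancer.BuildOptions.MetricsRecorder.
func recordExamplePick(recorder estats.MetricsRecorder, grpcTarget, rlsServerTarget string) {
	examplePicksMetric.Record(recorder, 1, grpcTarget, rlsServerTarget)
}

func main() {}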
102 changes: 79 additions & 23 deletions balancer/rls/picker.go
@@ -29,6 +29,7 @@
"google.golang.org/grpc/balancer/rls/internal/keys"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
estats "google.golang.org/grpc/experimental/stats"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
"google.golang.org/grpc/metadata"
@@ -61,12 +62,15 @@

// The picker is given its own copy of the below fields from the RLS LB policy
// to avoid having to grab the mutex on the latter.
defaultPolicy *childPolicyWrapper // Child policy for the default target.
ctrlCh *controlChannel // Control channel to the RLS server.
maxAge time.Duration // Cache max age from LB config.
staleAge time.Duration // Cache stale age from LB config.
bg exitIdler
logger *internalgrpclog.PrefixLogger
rlsServerTarget string
grpcTarget string
metricsRecorder estats.MetricsRecorder
defaultPolicy *childPolicyWrapper // Child policy for the default target.
ctrlCh *controlChannel // Control channel to the RLS server.
maxAge time.Duration // Cache max age from LB config.
staleAge time.Duration // Cache stale age from LB config.
bg exitIdler
logger *internalgrpclog.PrefixLogger
}

// isFullMethodNameValid returns true if name is of the form `/service/method`.
@@ -85,7 +89,17 @@
reqKeys := p.kbm.RLSKey(md, p.origEndpoint, info.FullMethodName)

p.lb.cacheMu.Lock()
defer p.lb.cacheMu.Unlock()
var pr balancer.PickResult
var err error

// Record metrics without the cache mutex held, to prevent lock contention
// between concurrent RPCs and their Pick calls. Metrics recording can
// potentially be expensive.
metricsCallback := func() {}
defer func() {
p.lb.cacheMu.Unlock()
metricsCallback()
}()

// Lookup data cache and pending request map using request path and keys.
cacheKey := cacheKey{path: info.FullMethodName, keys: reqKeys.Str}
@@ -98,7 +112,8 @@
case dcEntry == nil && pendingEntry == nil:
throttled := p.sendRouteLookupRequestLocked(cacheKey, &backoffState{bs: defaultBackoffStrategy}, reqKeys.Map, rlspb.RouteLookupRequest_REASON_MISS, "")
if throttled {
return p.useDefaultPickIfPossible(info, errRLSThrottled)
pr, metricsCallback, err = p.useDefaultPickIfPossible(info, errRLSThrottled)
return pr, err
}
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable

@@ -113,8 +128,8 @@
p.sendRouteLookupRequestLocked(cacheKey, dcEntry.backoffState, reqKeys.Map, rlspb.RouteLookupRequest_REASON_STALE, dcEntry.headerData)
}
// Delegate to child policies.
res, err := p.delegateToChildPoliciesLocked(dcEntry, info)
return res, err
pr, metricsCallback, err = p.delegateToChildPoliciesLocked(dcEntry, info)
return pr, err
}

// We get here only if the data cache entry has expired. If entry is in
@@ -126,32 +141,50 @@
// message received from the control plane is still fine, as it could be
// useful for debugging purposes.
st := dcEntry.status
return p.useDefaultPickIfPossible(info, status.Error(codes.Unavailable, fmt.Sprintf("most recent error from RLS server: %v", st.Error())))
pr, metricsCallback, err = p.useDefaultPickIfPossible(info, status.Error(codes.Unavailable, fmt.Sprintf("most recent error from RLS server: %v", st.Error())))
return pr, err
}

// We get here only if the entry has expired and is not in backoff.
throttled := p.sendRouteLookupRequestLocked(cacheKey, dcEntry.backoffState, reqKeys.Map, rlspb.RouteLookupRequest_REASON_MISS, "")
if throttled {
return p.useDefaultPickIfPossible(info, errRLSThrottled)
pr, metricsCallback, err = p.useDefaultPickIfPossible(info, errRLSThrottled)
return pr, err
}
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable

// Data cache hit. Pending request exists.
default:
if dcEntry.expiryTime.After(now) {
res, err := p.delegateToChildPoliciesLocked(dcEntry, info)
return res, err
pr, metricsCallback, err = p.delegateToChildPoliciesLocked(dcEntry, info)
return pr, err

[Codecov / codecov/patch] Added lines #L159 - L160 were not covered by tests
}
// Data cache entry has expired and pending request exists. Queue pick.
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}
}

// errToPickResult is a helper function which converts the error value returned
// by Pick() to a string that represents the pick result.
func errToPickResult(err error) string {
if err == nil {
return "complete"
}
if errors.Is(err, balancer.ErrNoSubConnAvailable) {
return "queue"
}
if _, ok := status.FromError(err); ok {
return "drop"

[Codecov / codecov/patch] Added line #L177 was not covered by tests
}
return "fail"
}

// delegateToChildPoliciesLocked is a helper function which iterates through the
// list of child policy wrappers in a cache entry and attempts to find a child
// policy to which this RPC can be routed to. If all child policies are in
// TRANSIENT_FAILURE, we delegate to the last child policy arbitrarily.
func (p *rlsPicker) delegateToChildPoliciesLocked(dcEntry *cacheEntry, info balancer.PickInfo) (balancer.PickResult, error) {
// TRANSIENT_FAILURE, we delegate to the last child policy arbitrarily. Returns
// a function to be invoked to record metrics.
func (p *rlsPicker) delegateToChildPoliciesLocked(dcEntry *cacheEntry, info balancer.PickInfo) (balancer.PickResult, func(), error) {
const rlsDataHeaderName = "x-google-rls-data"
for i, cpw := range dcEntry.childPolicyWrappers {
state := (*balancer.State)(atomic.LoadPointer(&cpw.state))
@@ -164,29 +197,52 @@
// X-Google-RLS-Data header.
res, err := state.Picker.Pick(info)
if err != nil {
return res, err
pr := errToPickResult(err)
return res, func() {
if pr == "queue" {
// Don't record metrics for queued Picks.
return
}
targetPicksMetric.Record(p.metricsRecorder, 1, p.grpcTarget, p.rlsServerTarget, cpw.target, pr)
}, err
}

if res.Metadata == nil {
res.Metadata = metadata.Pairs(rlsDataHeaderName, dcEntry.headerData)
} else {
res.Metadata.Append(rlsDataHeaderName, dcEntry.headerData)
}
return res, nil
return res, func() {
targetPicksMetric.Record(p.metricsRecorder, 1, p.grpcTarget, p.rlsServerTarget, cpw.target, "complete")
}, nil
}
}

// In the unlikely event that we have a cache entry with no targets, we end up
// queueing the RPC.
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
return balancer.PickResult{}, func() {}, balancer.ErrNoSubConnAvailable

[Codecov / codecov/patch] Added line #L223 was not covered by tests
}

// useDefaultPickIfPossible is a helper method which delegates to the default
// target if one is configured, or fails the pick with the given error.
func (p *rlsPicker) useDefaultPickIfPossible(info balancer.PickInfo, errOnNoDefault error) (balancer.PickResult, error) {
// target if one is configured, or fails the pick with the given error. Returns
// a function to be invoked to record metrics.
func (p *rlsPicker) useDefaultPickIfPossible(info balancer.PickInfo, errOnNoDefault error) (balancer.PickResult, func(), error) {
if p.defaultPolicy != nil {
state := (*balancer.State)(atomic.LoadPointer(&p.defaultPolicy.state))
return state.Picker.Pick(info)
res, err := state.Picker.Pick(info)
pr := errToPickResult(err)
return res, func() {
if pr == "queue" {
// Don't record metrics for queued Picks.
return
}
defaultTargetPicksMetric.Record(p.metricsRecorder, 1, p.grpcTarget, p.rlsServerTarget, p.defaultPolicy.target, pr)
}, err
}
return balancer.PickResult{}, errOnNoDefault

return balancer.PickResult{}, func() {
failedPicksMetric.Record(p.metricsRecorder, 1, p.grpcTarget, p.rlsServerTarget)
}, errOnNoDefault
}

// sendRouteLookupRequestLocked adds an entry to the pending request map and
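The Pick changes above all follow one pattern: a metrics callback is chosen while the cache mutex is held, but only invoked after the mutex is released, so that recording (which can be expensive) never extends the critical section. Below is a stripped-down, self-contained sketch of that ordering; the cache type, names, and print statements are stand-ins for the real picker and metric handles, not code from this PR.

package main

import (
	"fmt"
	"sync"
)

type cache struct {
	mu      sync.Mutex
	entries map[string]string
}

// pick mirrors the shape of rlsPicker.Pick: the metrics callback is chosen
// under the lock, and the deferred function first unlocks, then records.
func (c *cache) pick(key string) (target string, err error) {
	metricsCallback := func() {}
	c.mu.Lock()
	defer func() {
		c.mu.Unlock()
		metricsCallback() // runs outside the critical section
	}()

	if t, ok := c.entries[key]; ok {
		// Stand-in for targetPicksMetric.Record(...).
		metricsCallback = func() { fmt.Printf("target_picks{target=%q} += 1\n", t) }
		return t, nil
	}
	// Stand-in for failedPicksMetric.Record(...).
	metricsCallback = func() { fmt.Println("failed_picks += 1") }
	return "", fmt.Errorf("no target for %q", key)
}

func main() {
	c := &cache{entries: map[string]string{"/svc/method": "backend-1"}}
	c.pick("/svc/method") // records a target pick
	c.pick("/svc/other")  // records a failed pick
}

In the actual diff, errToPickResult classifies the pick error into "complete", "queue", "drop", or "fail" for the grpc.lb.pick_result label, and the recording callbacks deliberately skip queued picks.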