Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

balancer/rls: Add picker metrics #7484

Merged
merged 5 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 35 additions & 8 deletions balancer/rls/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/backoff"
Expand Down Expand Up @@ -77,6 +78,28 @@ var (
clientConnUpdateHook = func() {}
dataCachePurgeHook = func() {}
resetBackoffHook = func() {}

defaultTargetPicksMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
Name: "grpc.lb.rls.default_target_picks",
Description: "EXPERIMENTAL. Number of LB picks sent to the default target.",
Unit: "pick",
Labels: []string{"grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"},
Default: false,
easwars marked this conversation as resolved.
Show resolved Hide resolved
})
// targetPicksMetric counts LB picks that were delegated to a child policy
// taken from an RLS cache entry. Each recording is labelled with the channel
// target, the RLS server target, the data plane target and the pick result.
//
// The description cross-references the default-target counter by its
// registered name, "grpc.lb.rls.default_target_picks".
targetPicksMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
Name: "grpc.lb.rls.target_picks",
Description: "EXPERIMENTAL. Number of LB picks sent to each RLS target. Note that if the default target is also returned by the RLS server, RPCs sent to that target from the cache will be counted in this metric, not in grpc.lb.rls.default_target_picks.",
Unit: "pick",
Labels: []string{"grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"},
Default: false,
})
// failedPicksMetric counts LB picks that are failed outright: the RLS request
// failed or the RLS channel was throttled, and no default target is configured
// to fall back on. It is labelled only with the channel target and the RLS
// server target, since no data plane target is involved in a failed pick.
failedPicksMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
Name: "grpc.lb.rls.failed_picks",
Description: "EXPERIMENTAL. Number of LB picks failed due to either a failed RLS request or the RLS channel being throttled.",
Unit: "pick",
Labels: []string{"grpc.target", "grpc.lb.rls.server_target"},
Default: false,
})
)

func init() {
Expand Down Expand Up @@ -490,15 +513,19 @@ func (b *rlsBalancer) sendNewPickerLocked() {
if b.defaultPolicy != nil {
b.defaultPolicy.acquireRef()
}

picker := &rlsPicker{
kbm: b.lbCfg.kbMap,
origEndpoint: b.bopts.Target.Endpoint(),
lb: b,
defaultPolicy: b.defaultPolicy,
ctrlCh: b.ctrlCh,
maxAge: b.lbCfg.maxAge,
staleAge: b.lbCfg.staleAge,
bg: b.bg,
kbm: b.lbCfg.kbMap,
origEndpoint: b.bopts.Target.Endpoint(),
lb: b,
defaultPolicy: b.defaultPolicy,
ctrlCh: b.ctrlCh,
maxAge: b.lbCfg.maxAge,
staleAge: b.lbCfg.staleAge,
bg: b.bg,
rlsServerTarget: b.lbCfg.lookupService,
grpcTarget: b.bopts.Target.String(),
metricsRecorder: b.bopts.MetricsRecorder,
}
picker.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-picker %p] ", picker))
state := balancer.State{
Expand Down
84 changes: 61 additions & 23 deletions balancer/rls/picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"google.golang.org/grpc/balancer/rls/internal/keys"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
estats "google.golang.org/grpc/experimental/stats"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -61,12 +62,15 @@

// The picker is given its own copy of the below fields from the RLS LB policy
// to avoid having to grab the mutex on the latter.
defaultPolicy *childPolicyWrapper // Child policy for the default target.
ctrlCh *controlChannel // Control channel to the RLS server.
maxAge time.Duration // Cache max age from LB config.
staleAge time.Duration // Cache stale age from LB config.
bg exitIdler
logger *internalgrpclog.PrefixLogger
rlsServerTarget string
easwars marked this conversation as resolved.
Show resolved Hide resolved
grpcTarget string
metricsRecorder estats.MetricsRecorder
defaultPolicy *childPolicyWrapper // Child policy for the default target.
ctrlCh *controlChannel // Control channel to the RLS server.
maxAge time.Duration // Cache max age from LB config.
staleAge time.Duration // Cache stale age from LB config.
bg exitIdler
logger *internalgrpclog.PrefixLogger
}

// isFullMethodNameValid returns true if name is of the form `/service/method`.
Expand All @@ -85,7 +89,13 @@
reqKeys := p.kbm.RLSKey(md, p.origEndpoint, info.FullMethodName)

p.lb.cacheMu.Lock()
defer p.lb.cacheMu.Unlock()
var pr balancer.PickResult
var err error
metricsCallback := func() {}
defer func() {
p.lb.cacheMu.Unlock()
metricsCallback()
}()
easwars marked this conversation as resolved.
Show resolved Hide resolved

// Lookup data cache and pending request map using request path and keys.
cacheKey := cacheKey{path: info.FullMethodName, keys: reqKeys.Str}
Expand All @@ -98,7 +108,8 @@
case dcEntry == nil && pendingEntry == nil:
throttled := p.sendRouteLookupRequestLocked(cacheKey, &backoffState{bs: defaultBackoffStrategy}, reqKeys.Map, rlspb.RouteLookupRequest_REASON_MISS, "")
if throttled {
return p.useDefaultPickIfPossible(info, errRLSThrottled)
pr, metricsCallback, err = p.useDefaultPickIfPossible(info, errRLSThrottled)
return pr, err
}
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable

Expand All @@ -113,8 +124,8 @@
p.sendRouteLookupRequestLocked(cacheKey, dcEntry.backoffState, reqKeys.Map, rlspb.RouteLookupRequest_REASON_STALE, dcEntry.headerData)
}
// Delegate to child policies.
res, err := p.delegateToChildPoliciesLocked(dcEntry, info)
return res, err
pr, metricsCallback, err = p.delegateToChildPoliciesLocked(dcEntry, info)
return pr, err
}

// We get here only if the data cache entry has expired. If entry is in
Expand All @@ -126,21 +137,23 @@
// message received from the control plane is still fine, as it could be
// useful for debugging purposes.
st := dcEntry.status
return p.useDefaultPickIfPossible(info, status.Error(codes.Unavailable, fmt.Sprintf("most recent error from RLS server: %v", st.Error())))
pr, metricsCallback, err = p.useDefaultPickIfPossible(info, status.Error(codes.Unavailable, fmt.Sprintf("most recent error from RLS server: %v", st.Error())))
return pr, err
}

// We get here only if the entry has expired and is not in backoff.
throttled := p.sendRouteLookupRequestLocked(cacheKey, dcEntry.backoffState, reqKeys.Map, rlspb.RouteLookupRequest_REASON_MISS, "")
if throttled {
return p.useDefaultPickIfPossible(info, errRLSThrottled)
pr, metricsCallback, err = p.useDefaultPickIfPossible(info, errRLSThrottled)
return pr, err
}
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable

// Data cache hit. Pending request exists.
default:
if dcEntry.expiryTime.After(now) {
res, err := p.delegateToChildPoliciesLocked(dcEntry, info)
return res, err
pr, metricsCallback, err = p.delegateToChildPoliciesLocked(dcEntry, info)
return pr, err

Check warning on line 156 in balancer/rls/picker.go

View check run for this annotation

Codecov / codecov/patch

balancer/rls/picker.go#L155-L156

Added lines #L155 - L156 were not covered by tests
}
// Data cache entry has expired and pending request exists. Queue pick.
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
Expand All @@ -150,8 +163,9 @@
// delegateToChildPoliciesLocked is a helper function which iterates through the
// list of child policy wrappers in a cache entry and attempts to find a child
// policy to which this RPC can be routed. If all child policies are in
// TRANSIENT_FAILURE, we delegate to the last child policy arbitrarily.
func (p *rlsPicker) delegateToChildPoliciesLocked(dcEntry *cacheEntry, info balancer.PickInfo) (balancer.PickResult, error) {
// TRANSIENT_FAILURE, we delegate to the last child policy arbitrarily. Returns
// a function to be invoked to record metrics.
func (p *rlsPicker) delegateToChildPoliciesLocked(dcEntry *cacheEntry, info balancer.PickInfo) (balancer.PickResult, func(), error) {
const rlsDataHeaderName = "x-google-rls-data"
for i, cpw := range dcEntry.childPolicyWrappers {
state := (*balancer.State)(atomic.LoadPointer(&cpw.state))
Expand All @@ -164,29 +178,53 @@
// X-Google-RLS-Data header.
res, err := state.Picker.Pick(info)
if err != nil {
return res, err
pr := "fail"
if _, ok := status.FromError(err); ok {
pr = "drop"

Check warning on line 183 in balancer/rls/picker.go

View check run for this annotation

Codecov / codecov/patch

balancer/rls/picker.go#L183

Added line #L183 was not covered by tests
}
return res, func() {
targetPicksMetric.Record(p.metricsRecorder, 1, p.grpcTarget, p.rlsServerTarget, cpw.target, pr)
}, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about making a helper since this logic is used in two places now:

// errToPickResult translates the error returned by a picker's Pick() call
// into the string used as the pick-result label:
//   - nil error                       -> "complete"
//   - balancer.ErrNoSubConnAvailable  -> "queue"
//   - error carrying a gRPC status    -> "drop"
//   - anything else                   -> "fail"
func errToPickResult(err error) string {
	switch {
	case err == nil:
		return "complete"
	case errors.Is(err, balancer.ErrNoSubConnAvailable):
		// The child policy asked for the pick to be queued; this must be
		// checked before the status test below.
		return "queue"
	}
	if _, isStatusErr := status.FromError(err); isStatusErr {
		return "drop"
	}
	return "fail"
}

And we need to handle the balancer.ErrNoSubConnAvailable case in your code, don't we? Otherwise, we will wrongly mark picks that a child policy wanted to be queued as "fail".

And if the decision is to not record a metric for queued picks, we need to make sure that we ignore that explicitly after we handle the balancer.ErrNoSubConnAvailable case, as in the helper I described above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, for some reason I thought balancer.ErrNoSubConnAvailable could only be emitted from this RLS layer. I forgot that it can also be emitted by the child policy in these error cases. I will try your helper out.

}

if res.Metadata == nil {
res.Metadata = metadata.Pairs(rlsDataHeaderName, dcEntry.headerData)
} else {
res.Metadata.Append(rlsDataHeaderName, dcEntry.headerData)
}
return res, nil
return res, func() {
targetPicksMetric.Record(p.metricsRecorder, 1, p.grpcTarget, p.rlsServerTarget, cpw.target, "complete")
}, nil
}
}

// In the unlikely event that we have a cache entry with no targets, we end up
// queueing the RPC.
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
return balancer.PickResult{}, func() {}, balancer.ErrNoSubConnAvailable

Check warning on line 203 in balancer/rls/picker.go

View check run for this annotation

Codecov / codecov/patch

balancer/rls/picker.go#L203

Added line #L203 was not covered by tests
}

// useDefaultPickIfPossible is a helper method which delegates to the default
// target if one is configured, or fails the pick with the given error.
func (p *rlsPicker) useDefaultPickIfPossible(info balancer.PickInfo, errOnNoDefault error) (balancer.PickResult, error) {
// target if one is configured, or fails the pick with the given error. Returns
// a function to be invoked to record metrics.
func (p *rlsPicker) useDefaultPickIfPossible(info balancer.PickInfo, errOnNoDefault error) (balancer.PickResult, func(), error) {
if p.defaultPolicy != nil {
state := (*balancer.State)(atomic.LoadPointer(&p.defaultPolicy.state))
return state.Picker.Pick(info)
res, err := state.Picker.Pick(info)
pr := "complete"
if err != nil {
pr = "fail"
if _, ok := status.FromError(err); ok {
pr = "drop"

Check warning on line 217 in balancer/rls/picker.go

View check run for this annotation

Codecov / codecov/patch

balancer/rls/picker.go#L217

Added line #L217 was not covered by tests
}
}
return res, func() {
defaultTargetPicksMetric.Record(p.metricsRecorder, 1, p.grpcTarget, p.rlsServerTarget, p.defaultPolicy.target, pr)
}, err
}
return balancer.PickResult{}, errOnNoDefault

return balancer.PickResult{}, func() {
failedPicksMetric.Record(p.metricsRecorder, 1, p.grpcTarget, p.rlsServerTarget)
}, errOnNoDefault
}

// sendRouteLookupRequestLocked adds an entry to the pending request map and
Expand Down
Loading