Skip to content

Commit

Permalink
Add request failure code label to metrics (#1472)
Browse files Browse the repository at this point in the history
Co-authored-by: Chad Retz <chad.retz@gmail.com>
  • Loading branch information
Sushisource and cretz authored May 14, 2024
1 parent c69831e commit 3b68c6d
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 27 deletions.
13 changes: 5 additions & 8 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,9 @@ type (
// worker options, the ones here wrap the ones in worker options. The same
// interceptor should not be set here and in worker options.
Interceptors []ClientInterceptor

// If set true, error code labels will not be included on request failure metrics.
DisableErrorCodeMetricTags bool
}

// HeadersProvider returns a map of gRPC headers that should be used on every request.
Expand Down Expand Up @@ -813,14 +816,8 @@ func newDialParameters(options *ClientOptions, excludeInternalFromRetry *atomic.
return dialParameters{
UserConnectionOptions: options.ConnectionOptions,
HostPort: options.HostPort,
RequiredInterceptors: requiredInterceptors(
options.MetricsHandler,
options.HeadersProvider,
options.TrafficController,
excludeInternalFromRetry,
options.Credentials,
),
DefaultServiceConfig: defaultServiceConfig,
RequiredInterceptors: requiredInterceptors(options, excludeInternalFromRetry),
DefaultServiceConfig: defaultServiceConfig,
}
}

Expand Down
1 change: 1 addition & 0 deletions internal/common/metrics/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ const (
OperationTagName = "operation"
CauseTagName = "cause"
WorkflowTaskFailureReason = "failure_reason"
RequestFailureCode = "status_code"
)

// Metric tag values
Expand Down
10 changes: 7 additions & 3 deletions internal/common/metrics/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type HandlerContextKey struct{}
type LongPollContextKey struct{}

// NewGRPCInterceptor creates a new gRPC unary interceptor to record metrics.
func NewGRPCInterceptor(defaultHandler Handler, suffix string) grpc.UnaryClientInterceptor {
func NewGRPCInterceptor(defaultHandler Handler, suffix string, disableRequestFailCodes bool) grpc.UnaryClientInterceptor {
return func(
ctx context.Context,
method string,
Expand Down Expand Up @@ -78,7 +78,7 @@ func NewGRPCInterceptor(defaultHandler Handler, suffix string) grpc.UnaryClientI
start := time.Now()
recordRequestStart(handler, longPoll, suffix)
err := invoker(ctx, method, req, reply, cc, opts...)
recordRequestEnd(handler, longPoll, suffix, start, err)
recordRequestEnd(handler, longPoll, suffix, start, err, disableRequestFailCodes)
return err
}
}
Expand All @@ -93,7 +93,7 @@ func recordRequestStart(handler Handler, longPoll bool, suffix string) {
handler.Counter(metric).Inc(1)
}

func recordRequestEnd(handler Handler, longPoll bool, suffix string, start time.Time, err error) {
func recordRequestEnd(handler Handler, longPoll bool, suffix string, start time.Time, err error, disableRequestFailCodes bool) {
// Record latency
timerMetric := TemporalRequestLatency
if longPoll {
Expand All @@ -109,6 +109,10 @@ func recordRequestEnd(handler Handler, longPoll bool, suffix string, start time.
failureMetric = TemporalLongRequestFailure
}
failureMetric += suffix
errStatus, _ := status.FromError(err)
if !disableRequestFailCodes {
handler = handler.WithTags(RequestFailureCodeTags(errStatus.Code()))
}
handler.Counter(failureMetric).Inc(1)

// If it's a resource exhausted, extract cause if present and increment
Expand Down
2 changes: 1 addition & 1 deletion internal/common/metrics/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestGRPCInterceptor(t *testing.T) {
handler := metrics.NewCapturingHandler()
cc, err := grpc.Dial(l.Addr().String(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(metrics.NewGRPCInterceptor(handler, "_my_suffix")))
grpc.WithUnaryInterceptor(metrics.NewGRPCInterceptor(handler, "_my_suffix", true)))
require.NoError(t, err)
defer func() { _ = cc.Close() }()
client := grpc_health_v1.NewHealthClient(cc)
Expand Down
56 changes: 56 additions & 0 deletions internal/common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@

package metrics

import (
"strconv"

"google.golang.org/grpc/codes"
)

// RootTags returns a set of base tags for all metrics.
func RootTags(namespace string) map[string]string {
return map[string]string{
Expand Down Expand Up @@ -94,3 +100,53 @@ func WorkflowTaskFailedTags(reason string) map[string]string {
WorkflowTaskFailureReason: reason,
}
}

// RequestFailureCodeTags returns a set of tags for a request failure.
func RequestFailureCodeTags(statusCode codes.Code) map[string]string {
asStr := canonicalString(statusCode)
return map[string]string{
RequestFailureCode: asStr,
}
}

// Annoyingly gRPC defines this, but does not expose it publicly.
func canonicalString(c codes.Code) string {
switch c {
case codes.OK:
return "OK"
case codes.Canceled:
return "CANCELLED"
case codes.Unknown:
return "UNKNOWN"
case codes.InvalidArgument:
return "INVALID_ARGUMENT"
case codes.DeadlineExceeded:
return "DEADLINE_EXCEEDED"
case codes.NotFound:
return "NOT_FOUND"
case codes.AlreadyExists:
return "ALREADY_EXISTS"
case codes.PermissionDenied:
return "PERMISSION_DENIED"
case codes.ResourceExhausted:
return "RESOURCE_EXHAUSTED"
case codes.FailedPrecondition:
return "FAILED_PRECONDITION"
case codes.Aborted:
return "ABORTED"
case codes.OutOfRange:
return "OUT_OF_RANGE"
case codes.Unimplemented:
return "UNIMPLEMENTED"
case codes.Internal:
return "INTERNAL"
case codes.Unavailable:
return "UNAVAILABLE"
case codes.DataLoss:
return "DATA_LOSS"
case codes.Unauthenticated:
return "UNAUTHENTICATED"
default:
return "CODE(" + strconv.FormatInt(int64(c), 10) + ")"
}
}
21 changes: 9 additions & 12 deletions internal/grpc_dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,34 +145,31 @@ func dial(params dialParameters) (*grpc.ClientConn, error) {
}

func requiredInterceptors(
metricsHandler metrics.Handler,
headersProvider HeadersProvider,
controller TrafficController,
clientOptions *ClientOptions,
excludeInternalFromRetry *atomic.Bool,
credentials Credentials,
) []grpc.UnaryClientInterceptor {
interceptors := []grpc.UnaryClientInterceptor{
errorInterceptor,
// Report aggregated metrics for the call, this is done outside of the retry loop.
metrics.NewGRPCInterceptor(metricsHandler, ""),
metrics.NewGRPCInterceptor(clientOptions.MetricsHandler, "", clientOptions.DisableErrorCodeMetricTags),
// By default the grpc retry interceptor *is disabled*, preventing accidental use of retries.
// We add call options for retry configuration based on the values present in the context.
retry.NewRetryOptionsInterceptor(excludeInternalFromRetry),
// Performs retries *IF* retry options are set for the call.
grpc_retry.UnaryClientInterceptor(),
// Report metrics for every call made to the server.
metrics.NewGRPCInterceptor(metricsHandler, attemptSuffix),
metrics.NewGRPCInterceptor(clientOptions.MetricsHandler, attemptSuffix, clientOptions.DisableErrorCodeMetricTags),
}
if headersProvider != nil {
interceptors = append(interceptors, headersProviderInterceptor(headersProvider))
if clientOptions.HeadersProvider != nil {
interceptors = append(interceptors, headersProviderInterceptor(clientOptions.HeadersProvider))
}
if controller != nil {
interceptors = append(interceptors, trafficControllerInterceptor(controller))
if clientOptions.TrafficController != nil {
interceptors = append(interceptors, trafficControllerInterceptor(clientOptions.TrafficController))
}
// Add credentials interceptor. This is intentionally added after headers
// provider to overwrite anything set there.
if credentials != nil {
if interceptor := credentials.gRPCInterceptor(); interceptor != nil {
if clientOptions.Credentials != nil {
if interceptor := clientOptions.Credentials.gRPCInterceptor(); interceptor != nil {
interceptors = append(interceptors, interceptor)
}
}
Expand Down
6 changes: 3 additions & 3 deletions internal/grpc_dialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,13 @@ func TestHeadersProvider_Error(t *testing.T) {
}

func TestHeadersProvider_NotIncludedWhenNil(t *testing.T) {
interceptors := requiredInterceptors(nil, nil, nil, nil, nil)
interceptors := requiredInterceptors(&ClientOptions{}, nil)
require.Equal(t, 5, len(interceptors))
}

func TestHeadersProvider_IncludedWithHeadersProvider(t *testing.T) {
interceptors := requiredInterceptors(nil,
authHeadersProvider{token: "test-auth-token"}, nil, nil, nil)
opts := &ClientOptions{HeadersProvider: authHeadersProvider{token: "test-auth-token"}}
interceptors := requiredInterceptors(opts, nil)
require.Equal(t, 6, len(interceptors))
}

Expand Down
12 changes: 12 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4828,6 +4828,18 @@ func (ts *IntegrationTestSuite) TestNondeterministicUpdateRegistertion() {
ts.EqualValues(expected, ts.activities.invoked())
}

func (ts *IntegrationTestSuite) TestRequestFailureMetric() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// Unset namespace field will cause an invalid argument error
_, _ = ts.client.WorkflowService().DescribeNamespace(ctx, &workflowservice.DescribeNamespaceRequest{})

ts.assertMetricCount(metrics.TemporalRequestFailure, 1,
metrics.OperationTagName, "DescribeNamespace",
metrics.RequestFailureCode, "INVALID_ARGUMENT")
}

// executeWorkflow executes a given workflow and waits for the result
func (ts *IntegrationTestSuite) executeWorkflow(
wfID string, wfFunc interface{}, retValPtr interface{}, args ...interface{},
Expand Down

0 comments on commit 3b68c6d

Please sign in to comment.