Skip to content

Commit

Permalink
OTLP HTTP Exporter: Propagate HTTP 429s (#9905)
Browse files Browse the repository at this point in the history
Changes otlphttp status code handling to propagate the error code as a
grpc status code. This follows the logic that was implemented for the
http receiver
[here](https://github.com/open-telemetry/opentelemetry-collector/pull/9893/files).

Fixes #9892

**Testing:** local tests were updated, going to test a local build.
  • Loading branch information
jaronoff97 authored Apr 18, 2024
1 parent fcdfdaa commit dc48d0e
Show file tree
Hide file tree
Showing 9 changed files with 161 additions and 106 deletions.
25 changes: 25 additions & 0 deletions .chloggen/fix-429-logic.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: otlphttpexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fixes a bug that was preventing the otlp http exporter from propagating status.

# One or more tracking issues or pull requests related to the change
issues: [9892]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
7 changes: 5 additions & 2 deletions exporter/otlphttpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/internal/httphelper"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
"go.opentelemetry.io/collector/pdata/pmetric"
Expand Down Expand Up @@ -184,16 +185,18 @@ func (e *baseExporter) export(ctx context.Context, url string, request []byte, p
respStatus := readResponseStatus(resp)

// Format the error message. Use the status if it is present in the response.
var errString string
var formattedErr error
if respStatus != nil {
formattedErr = fmt.Errorf(
errString = fmt.Sprintf(
"error exporting items, request to %s responded with HTTP Status Code %d, Message=%s, Details=%v",
url, resp.StatusCode, respStatus.Message, respStatus.Details)
} else {
formattedErr = fmt.Errorf(
errString = fmt.Sprintf(
"error exporting items, request to %s responded with HTTP Status Code %d",
url, resp.StatusCode)
}
formattedErr = httphelper.NewStatusFromMsgAndHTTPCode(errString, resp.StatusCode).Err()

if isRetryableStatusCode(resp.StatusCode) {
// A retry duration of 0 seconds will trigger the default backoff policy
Expand Down
16 changes: 8 additions & 8 deletions exporter/otlphttpexporter/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,12 @@ func TestErrorResponses(t *testing.T) {
isPermErr: true,
},
{
name: "419",
name: "429",
responseStatus: http.StatusTooManyRequests,
responseBody: status.New(codes.InvalidArgument, "Quota exceeded"),
responseBody: status.New(codes.ResourceExhausted, "Quota exceeded"),
err: func(srv *httptest.Server) error {
return exporterhelper.NewThrottleRetry(
errors.New(errMsgPrefix(srv)+"429, Message=Quota exceeded, Details=[]"),
status.New(codes.ResourceExhausted, errMsgPrefix(srv)+"429, Message=Quota exceeded, Details=[]").Err(),
time.Duration(0)*time.Second)
},
},
Expand All @@ -149,7 +149,7 @@ func TestErrorResponses(t *testing.T) {
responseBody: status.New(codes.InvalidArgument, "Bad gateway"),
err: func(srv *httptest.Server) error {
return exporterhelper.NewThrottleRetry(
errors.New(errMsgPrefix(srv)+"502, Message=Bad gateway, Details=[]"),
status.New(codes.Unavailable, errMsgPrefix(srv)+"502, Message=Bad gateway, Details=[]").Err(),
time.Duration(0)*time.Second)
},
},
Expand All @@ -159,7 +159,7 @@ func TestErrorResponses(t *testing.T) {
responseBody: status.New(codes.InvalidArgument, "Server overloaded"),
err: func(srv *httptest.Server) error {
return exporterhelper.NewThrottleRetry(
errors.New(errMsgPrefix(srv)+"503, Message=Server overloaded, Details=[]"),
status.New(codes.Unavailable, errMsgPrefix(srv)+"503, Message=Server overloaded, Details=[]").Err(),
time.Duration(0)*time.Second)
},
},
Expand All @@ -170,7 +170,7 @@ func TestErrorResponses(t *testing.T) {
headers: map[string]string{"Retry-After": "30"},
err: func(srv *httptest.Server) error {
return exporterhelper.NewThrottleRetry(
errors.New(errMsgPrefix(srv)+"503, Message=Server overloaded, Details=[]"),
status.New(codes.Unavailable, errMsgPrefix(srv)+"503, Message=Server overloaded, Details=[]").Err(),
time.Duration(30)*time.Second)
},
},
Expand All @@ -180,7 +180,7 @@ func TestErrorResponses(t *testing.T) {
responseBody: status.New(codes.InvalidArgument, "Gateway timeout"),
err: func(srv *httptest.Server) error {
return exporterhelper.NewThrottleRetry(
errors.New(errMsgPrefix(srv)+"504, Message=Gateway timeout, Details=[]"),
status.New(codes.Unavailable, errMsgPrefix(srv)+"504, Message=Gateway timeout, Details=[]").Err(),
time.Duration(0)*time.Second)
},
},
Expand All @@ -190,7 +190,7 @@ func TestErrorResponses(t *testing.T) {
responseBody: status.New(codes.InvalidArgument, strings.Repeat("a", maxHTTPResponseReadBytes+1)),
err: func(srv *httptest.Server) error {
return exporterhelper.NewThrottleRetry(
errors.New(errMsgPrefix(srv)+"503"),
status.New(codes.Unavailable, errMsgPrefix(srv)+"503").Err(),
time.Duration(0)*time.Second)
},
},
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
go.uber.org/goleak v1.3.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
google.golang.org/grpc v1.63.2
)

require (
Expand Down Expand Up @@ -64,7 +65,6 @@ require (
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect
google.golang.org/grpc v1.63.2 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Expand Down
36 changes: 36 additions & 0 deletions internal/httphelper/helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package httphelper // import "go.opentelemetry.io/collector/internal/httphelper"

import (
"net/http"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// NewStatusFromMsgAndHTTPCode returns a gRPC status based on an error message string and a http status code.
// This function is shared between the http receiver and http exporter for error propagation.
func NewStatusFromMsgAndHTTPCode(errMsg string, statusCode int) *status.Status {
var c codes.Code
// Mapping based on https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md
// 429 mapping to ResourceExhausted and 400 mapping to StatusBadRequest are exceptions.
switch statusCode {
case http.StatusBadRequest:
c = codes.InvalidArgument
case http.StatusUnauthorized:
c = codes.Unauthenticated
case http.StatusForbidden:
c = codes.PermissionDenied
case http.StatusNotFound:
c = codes.Unimplemented
case http.StatusTooManyRequests:
c = codes.ResourceExhausted
case http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout:
c = codes.Unavailable
default:
c = codes.Unknown
}
return status.New(c, errMsg)
}
83 changes: 83 additions & 0 deletions internal/httphelper/helper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package httphelper

import (
"net/http"
"testing"

"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func Test_ErrorMsgAndHTTPCodeToStatus(t *testing.T) {
tests := []struct {
name string
errMsg string
statusCode int
expected *status.Status
}{
{
name: "Bad Request",
errMsg: "test",
statusCode: http.StatusBadRequest,
expected: status.New(codes.InvalidArgument, "test"),
},
{
name: "Unauthorized",
errMsg: "test",
statusCode: http.StatusUnauthorized,
expected: status.New(codes.Unauthenticated, "test"),
},
{
name: "Forbidden",
errMsg: "test",
statusCode: http.StatusForbidden,
expected: status.New(codes.PermissionDenied, "test"),
},
{
name: "Not Found",
errMsg: "test",
statusCode: http.StatusNotFound,
expected: status.New(codes.Unimplemented, "test"),
},
{
name: "Too Many Requests",
errMsg: "test",
statusCode: http.StatusTooManyRequests,
expected: status.New(codes.ResourceExhausted, "test"),
},
{
name: "Bad Gateway",
errMsg: "test",
statusCode: http.StatusBadGateway,
expected: status.New(codes.Unavailable, "test"),
},
{
name: "Service Unavailable",
errMsg: "test",
statusCode: http.StatusServiceUnavailable,
expected: status.New(codes.Unavailable, "test"),
},
{
name: "Gateway Timeout",
errMsg: "test",
statusCode: http.StatusGatewayTimeout,
expected: status.New(codes.Unavailable, "test"),
},
{
name: "Unsupported Media Type",
errMsg: "test",
statusCode: http.StatusUnsupportedMediaType,
expected: status.New(codes.Unknown, "test"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := NewStatusFromMsgAndHTTPCode(tt.errMsg, tt.statusCode)
assert.Equal(t, tt.expected, result)
})
}
}
23 changes: 0 additions & 23 deletions receiver/otlpreceiver/internal/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,26 +44,3 @@ func GetHTTPStatusCodeFromStatus(s *status.Status) int {
return http.StatusInternalServerError
}
}

func NewStatusFromMsgAndHTTPCode(errMsg string, statusCode int) *status.Status {
var c codes.Code
// Mapping based on https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md
// 429 mapping to ResourceExhausted and 400 mapping to StatusBadRequest are exceptions.
switch statusCode {
case http.StatusBadRequest:
c = codes.InvalidArgument
case http.StatusUnauthorized:
c = codes.Unauthenticated
case http.StatusForbidden:
c = codes.PermissionDenied
case http.StatusNotFound:
c = codes.Unimplemented
case http.StatusTooManyRequests:
c = codes.ResourceExhausted
case http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout:
c = codes.Unavailable
default:
c = codes.Unknown
}
return status.New(c, errMsg)
}
70 changes: 0 additions & 70 deletions receiver/otlpreceiver/internal/errors/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,73 +74,3 @@ func Test_GetHTTPStatusCodeFromStatus(t *testing.T) {
})
}
}

func Test_ErrorMsgAndHTTPCodeToStatus(t *testing.T) {
tests := []struct {
name string
errMsg string
statusCode int
expected *status.Status
}{
{
name: "Bad Request",
errMsg: "test",
statusCode: http.StatusBadRequest,
expected: status.New(codes.InvalidArgument, "test"),
},
{
name: "Unauthorized",
errMsg: "test",
statusCode: http.StatusUnauthorized,
expected: status.New(codes.Unauthenticated, "test"),
},
{
name: "Forbidden",
errMsg: "test",
statusCode: http.StatusForbidden,
expected: status.New(codes.PermissionDenied, "test"),
},
{
name: "Not Found",
errMsg: "test",
statusCode: http.StatusNotFound,
expected: status.New(codes.Unimplemented, "test"),
},
{
name: "Too Many Requests",
errMsg: "test",
statusCode: http.StatusTooManyRequests,
expected: status.New(codes.ResourceExhausted, "test"),
},
{
name: "Bad Gateway",
errMsg: "test",
statusCode: http.StatusBadGateway,
expected: status.New(codes.Unavailable, "test"),
},
{
name: "Service Unavailable",
errMsg: "test",
statusCode: http.StatusServiceUnavailable,
expected: status.New(codes.Unavailable, "test"),
},
{
name: "Gateway Timeout",
errMsg: "test",
statusCode: http.StatusGatewayTimeout,
expected: status.New(codes.Unavailable, "test"),
},
{
name: "Unsupported Media Type",
errMsg: "test",
statusCode: http.StatusUnsupportedMediaType,
expected: status.New(codes.Unknown, "test"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := NewStatusFromMsgAndHTTPCode(tt.errMsg, tt.statusCode)
assert.Equal(t, tt.expected, result)
})
}
}
5 changes: 3 additions & 2 deletions receiver/otlpreceiver/otlphttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/status"

"go.opentelemetry.io/collector/internal/httphelper"
"go.opentelemetry.io/collector/receiver/otlpreceiver/internal/errors"
"go.opentelemetry.io/collector/receiver/otlpreceiver/internal/logs"
"go.opentelemetry.io/collector/receiver/otlpreceiver/internal/metrics"
Expand Down Expand Up @@ -152,15 +153,15 @@ func writeError(w http.ResponseWriter, encoder encoder, err error, statusCode in
if ok {
statusCode = errors.GetHTTPStatusCodeFromStatus(s)
} else {
s = errors.NewStatusFromMsgAndHTTPCode(err.Error(), statusCode)
s = httphelper.NewStatusFromMsgAndHTTPCode(err.Error(), statusCode)
}
writeStatusResponse(w, encoder, statusCode, s.Proto())
}

// errorHandler encodes the HTTP error message inside a rpc.Status message as required
// by the OTLP protocol.
func errorHandler(w http.ResponseWriter, r *http.Request, errMsg string, statusCode int) {
s := errors.NewStatusFromMsgAndHTTPCode(errMsg, statusCode)
s := httphelper.NewStatusFromMsgAndHTTPCode(errMsg, statusCode)
switch getMimeTypeFromContentType(r.Header.Get("Content-Type")) {
case pbContentType:
writeStatusResponse(w, pbEncoder, statusCode, s.Proto())
Expand Down

0 comments on commit dc48d0e

Please sign in to comment.