Skip to content

Commit

Permalink
Nexus handler error translation (#1626)
Browse files Browse the repository at this point in the history
* Nexus handler error translation

* Address review comments
  • Loading branch information
bergundy authored Sep 11, 2024
1 parent 0fc83e9 commit 5f46ca8
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 5 deletions.
68 changes: 67 additions & 1 deletion internal/internal_nexus_task_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import (
"go.temporal.io/api/common/v1"
nexuspb "go.temporal.io/api/nexus/v1"
"go.temporal.io/api/workflowservice/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/internal/common/metrics"
Expand Down Expand Up @@ -231,6 +233,7 @@ func (h *nexusTaskHandler) handleStartOperation(
},
}, nil, nil
}
err = convertKnownErrors(err)
var handlerErr *nexus.HandlerError
if errors.As(err, &handlerErr) {
return nil, nexusHandlerErrorToProto(handlerErr), nil
Expand Down Expand Up @@ -302,6 +305,7 @@ func (h *nexusTaskHandler) handleCancelOperation(ctx context.Context, nctx *Nexu
return nil, nil, ctx.Err()
}
if err != nil {
err = convertKnownErrors(err)
var handlerErr *nexus.HandlerError
if errors.As(err, &handlerErr) {
return nil, nexusHandlerErrorToProto(handlerErr), nil
Expand All @@ -319,7 +323,7 @@ func (h *nexusTaskHandler) handleCancelOperation(ctx context.Context, nctx *Nexu

func (h *nexusTaskHandler) internalError(err error) *nexuspb.HandlerError {
h.logger.Error("error processing nexus task", "error", err)
return nexusHandlerError(nexus.HandlerErrorTypeInternal, "internal error")
return nexusHandlerError(nexus.HandlerErrorTypeInternal, err.Error())
}

func (h *nexusTaskHandler) goContextForTask(nctx *NexusOperationContext, header nexus.Header) (context.Context, context.CancelFunc, *nexuspb.HandlerError) {
Expand Down Expand Up @@ -416,3 +420,65 @@ func (p *payloadSerializer) Serialize(v any) (*nexus.Content, error) {
}

var emptyReaderNopCloser = io.NopCloser(bytes.NewReader([]byte{}))

// convertKnownErrors converts known errors to corresponding Nexus HandlerError.
func convertKnownErrors(err error) error {
// Handle common errors returned from various client methods.
if workflowErr, ok := err.(*WorkflowExecutionError); ok {
return nexus.HandlerErrorf(nexus.HandlerErrorTypeBadRequest, workflowErr.Error())
}
if queryRejectedErr, ok := err.(*QueryRejectedError); ok {
return nexus.HandlerErrorf(nexus.HandlerErrorTypeBadRequest, queryRejectedErr.Error())
}

// Not using errors.As to be consistent ApplicationError checking with the rest of the SDK.
if appErr, ok := err.(*ApplicationError); ok {
if appErr.NonRetryable() {
return nexus.HandlerErrorf(nexus.HandlerErrorTypeBadRequest, appErr.Error())
}
return nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, appErr.Error())
}
return convertServiceError(err)
}

// convertServiceError converts a serviceerror into a Nexus HandlerError if possible.
// If exposeDetails is true, the error message from the given error is exposed in the converted HandlerError, otherwise,
// a default message with minimal information is attached to the returned error.
// Roughly taken from https://github.com/googleapis/googleapis/blob/master/google/rpc/code.proto
// and
// https://github.com/grpc-ecosystem/grpc-gateway/blob/a7cf811e6ffabeaddcfb4ff65602c12671ff326e/runtime/errors.go#L56.
func convertServiceError(err error) error {
var st *status.Status

// Temporal serviceerrors have a Status() method.
stGetter, ok := err.(interface{ Status() *status.Status })
if !ok {
// Not a serviceerror, passthrough.
return err
}

st = stGetter.Status()
errMessage := err.Error()

switch st.Code() {
case codes.AlreadyExists, codes.Canceled, codes.InvalidArgument, codes.FailedPrecondition, codes.OutOfRange:
return nexus.HandlerErrorf(nexus.HandlerErrorTypeBadRequest, errMessage)
case codes.Aborted, codes.Unavailable:
return nexus.HandlerErrorf(nexus.HandlerErrorTypeUnavailable, errMessage)
case codes.DataLoss, codes.Internal, codes.Unknown, codes.Unauthenticated, codes.PermissionDenied:
// Note that codes.Unauthenticated, codes.PermissionDenied have Nexus error types but we convert to internal
// because this is not a client auth error and happens when the handler fails to auth with Temporal and should
// be considered retryable.
return nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, errMessage)
case codes.NotFound:
return nexus.HandlerErrorf(nexus.HandlerErrorTypeNotFound, errMessage)
case codes.ResourceExhausted:
return nexus.HandlerErrorf(nexus.HandlerErrorTypeResourceExhausted, errMessage)
case codes.Unimplemented:
return nexus.HandlerErrorf(nexus.HandlerErrorTypeNotImplemented, errMessage)
case codes.DeadlineExceeded:
return nexus.HandlerErrorf(nexus.HandlerErrorTypeDownstreamTimeout, errMessage)
}

return err
}
71 changes: 67 additions & 4 deletions test/nexus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
historypb "go.temporal.io/api/history/v1"
nexuspb "go.temporal.io/api/nexus/v1"
"go.temporal.io/api/operatorservice/v1"
"go.temporal.io/api/serviceerror"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/interceptor"
Expand Down Expand Up @@ -182,10 +183,20 @@ var syncOp = temporalnexus.NewSyncOperation("sync-op", func(ctx context.Context,
Message: "fail",
},
}
case "fmt-errorf":
return "", fmt.Errorf("arbitrary error message")
case "handlererror":
return "", nexus.HandlerErrorf(nexus.HandlerErrorTypeBadRequest, s)
case "already-started":
return "", serviceerror.NewWorkflowExecutionAlreadyStarted("faking workflow already started", "dont-care", "dont-care")
case "retryable-application-error":
return "", temporal.NewApplicationError("fake app error for test", "FakeTestError")
case "non-retryable-application-error":
return "", temporal.NewApplicationErrorWithOptions("fake app error for test", "FakeTestError", temporal.ApplicationErrorOptions{
NonRetryable: true,
})
case "panic":
panic("panic")
panic("panic requested")
}
return "", nil
})
Expand Down Expand Up @@ -213,9 +224,8 @@ func TestNexusSyncOperation(t *testing.T) {

w := worker.New(tc.client, tc.taskQueue, worker.Options{})
service := nexus.NewService("test")
require.NoError(t, service.Register(syncOp, workflowOp))
require.NoError(t, service.Register(syncOp))
w.RegisterNexusService(service)
w.RegisterWorkflow(waitForCancelWorkflow)
require.NoError(t, w.Start())
t.Cleanup(w.Stop)

Expand Down Expand Up @@ -248,6 +258,14 @@ func TestNexusSyncOperation(t *testing.T) {
require.Equal(t, "fail", unsuccessfulOperationErr.Failure.Message)
})

t.Run("fmt-errorf", func(t *testing.T) {
tc.metricsHandler.Clear()
_, err := nexus.ExecuteOperation(ctx, nc, syncOp, "fmt-errorf", nexus.ExecuteOperationOptions{})
var unexpectedResponseErr *nexus.UnexpectedResponseError
require.ErrorAs(t, err, &unexpectedResponseErr)
require.Contains(t, unexpectedResponseErr.Message, `"500 Internal Server Error": arbitrary error message`)
})

t.Run("handlererror", func(t *testing.T) {
_, err := nexus.ExecuteOperation(ctx, nc, syncOp, "handlererror", nexus.ExecuteOperationOptions{})
var unexpectedResponseErr *nexus.UnexpectedResponseError
Expand All @@ -263,12 +281,57 @@ func TestNexusSyncOperation(t *testing.T) {
}, time.Second*3, time.Millisecond*100)
})

t.Run("already-started", func(t *testing.T) {
_, err := nexus.ExecuteOperation(ctx, nc, syncOp, "already-started", nexus.ExecuteOperationOptions{})
var unexpectedResponseErr *nexus.UnexpectedResponseError
require.ErrorAs(t, err, &unexpectedResponseErr)
require.Equal(t, http.StatusBadRequest, unexpectedResponseErr.Response.StatusCode)
require.Contains(t, unexpectedResponseErr.Message, `"400 Bad Request": faking workflow already started`)

require.EventuallyWithT(t, func(t *assert.CollectT) {
tc.requireTimer(t, metrics.NexusTaskEndToEndLatency, service.Name, syncOp.Name())
tc.requireTimer(t, metrics.NexusTaskScheduleToStartLatency, service.Name, syncOp.Name())
tc.requireTimer(t, metrics.NexusTaskExecutionLatency, service.Name, syncOp.Name())
tc.requireCounter(t, metrics.NexusTaskExecutionFailedCounter, service.Name, syncOp.Name())
}, time.Second*3, time.Millisecond*100)
})

t.Run("retryable-application-error", func(t *testing.T) {
_, err := nexus.ExecuteOperation(ctx, nc, syncOp, "retryable-application-error", nexus.ExecuteOperationOptions{})
var unexpectedResponseErr *nexus.UnexpectedResponseError
require.ErrorAs(t, err, &unexpectedResponseErr)
require.Equal(t, http.StatusInternalServerError, unexpectedResponseErr.Response.StatusCode)
require.Contains(t, unexpectedResponseErr.Message, `"500 Internal Server Error": fake app error for test`)

require.EventuallyWithT(t, func(t *assert.CollectT) {
tc.requireTimer(t, metrics.NexusTaskEndToEndLatency, service.Name, syncOp.Name())
tc.requireTimer(t, metrics.NexusTaskScheduleToStartLatency, service.Name, syncOp.Name())
tc.requireTimer(t, metrics.NexusTaskExecutionLatency, service.Name, syncOp.Name())
tc.requireCounter(t, metrics.NexusTaskExecutionFailedCounter, service.Name, syncOp.Name())
}, time.Second*3, time.Millisecond*100)
})

t.Run("non-retryable-application-error", func(t *testing.T) {
_, err := nexus.ExecuteOperation(ctx, nc, syncOp, "non-retryable-application-error", nexus.ExecuteOperationOptions{})
var unexpectedResponseErr *nexus.UnexpectedResponseError
require.ErrorAs(t, err, &unexpectedResponseErr)
require.Equal(t, http.StatusBadRequest, unexpectedResponseErr.Response.StatusCode)
require.Contains(t, unexpectedResponseErr.Message, `"400 Bad Request": fake app error for test`)

require.EventuallyWithT(t, func(t *assert.CollectT) {
tc.requireTimer(t, metrics.NexusTaskEndToEndLatency, service.Name, syncOp.Name())
tc.requireTimer(t, metrics.NexusTaskScheduleToStartLatency, service.Name, syncOp.Name())
tc.requireTimer(t, metrics.NexusTaskExecutionLatency, service.Name, syncOp.Name())
tc.requireCounter(t, metrics.NexusTaskExecutionFailedCounter, service.Name, syncOp.Name())
}, time.Second*3, time.Millisecond*100)
})

t.Run("panic", func(t *testing.T) {
_, err := nexus.ExecuteOperation(ctx, nc, syncOp, "panic", nexus.ExecuteOperationOptions{})
var unexpectedResponseErr *nexus.UnexpectedResponseError
require.ErrorAs(t, err, &unexpectedResponseErr)
require.Equal(t, 500, unexpectedResponseErr.Response.StatusCode)
require.Contains(t, unexpectedResponseErr.Message, "internal error")
require.Contains(t, unexpectedResponseErr.Message, "panic: panic requested")
})
}

Expand Down

0 comments on commit 5f46ca8

Please sign in to comment.