Skip to content

Commit

Permalink
Add utility to get metrics handler for a Nexus operation (#1559)
Browse files Browse the repository at this point in the history
This is work done by @pdoerner in #1544 that was lost in the nexus feature branch merge.

Co-authored-by: pdoerner <122412190+pdoerner@users.noreply.github.com>
  • Loading branch information
bergundy and pdoerner authored Jul 24, 2024
1 parent 0732f3d commit 9c40461
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 7 deletions.
2 changes: 1 addition & 1 deletion internal/cmd/build/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func (b *builder) unitTest() error {
log.Printf("Running unit tests in dirs: %v", testDirs)
for _, testDir := range testDirs {
// Run unit test
args := []string{"go", "test", "-tags", "protolegacy", "-count", "1", "-race", "-v", "-timeout", "10m"}
args := []string{"go", "test", "-tags", "protolegacy", "-count", "1", "-race", "-v", "-timeout", "15m"}
if *runFlag != "" {
args = append(args, "-run", *runFlag)
}
Expand Down
13 changes: 10 additions & 3 deletions internal/internal_nexus_task_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"go.temporal.io/api/common/v1"
nexuspb "go.temporal.io/api/nexus/v1"
"go.temporal.io/api/workflowservice/v1"

"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/internal/common/metrics"
"go.temporal.io/sdk/log"
Expand Down Expand Up @@ -109,14 +110,19 @@ func (h *nexusTaskHandler) Execute(task *workflowservice.PollNexusTaskQueueRespo
}

func (h *nexusTaskHandler) execute(task *workflowservice.PollNexusTaskQueueResponse) (*nexuspb.Response, *nexuspb.HandlerError, error) {
metricsHandler, handlerErr := h.metricsHandlerForTask(task)
if handlerErr != nil {
return nil, handlerErr, nil
}
log, handlerErr := h.loggerForTask(task)
if handlerErr != nil {
return nil, handlerErr, nil
}
nctx := &NexusOperationContext{
Client: h.client,
TaskQueue: h.taskQueueName,
Log: log,
Client: h.client,
TaskQueue: h.taskQueueName,
MetricsHandler: metricsHandler,
Log: log,
}
header := nexus.Header(task.GetRequest().GetHeader())
if header == nil {
Expand Down Expand Up @@ -317,6 +323,7 @@ func (h *nexusTaskHandler) loggerForTask(response *workflowservice.PollNexusTask
return log.With(h.logger,
tagNexusService, service,
tagNexusOperation, operation,
tagTaskQueue, h.taskQueueName,
), nil
}

Expand Down
9 changes: 6 additions & 3 deletions internal/nexus_operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,18 @@ import (
nexuspb "go.temporal.io/api/nexus/v1"
"go.temporal.io/api/operatorservice/v1"
"go.temporal.io/api/workflowservice/v1"

"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/internal/common/metrics"
"go.temporal.io/sdk/log"
)

// NexusOperationContext is an internal only struct that holds fields used by the temporalnexus functions.
type NexusOperationContext struct {
Client Client
TaskQueue string
Log log.Logger
Client Client
TaskQueue string
MetricsHandler metrics.Handler
Log log.Logger
}

type nexusOperationContextKeyType struct{}
Expand Down
21 changes: 21 additions & 0 deletions temporalnexus/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,32 @@ import (

"github.com/nexus-rpc/sdk-go/nexus"
"go.temporal.io/api/common/v1"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/internal"
"go.temporal.io/sdk/internal/common/metrics"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/workflow"
)

// GetMetricsHandler returns a metrics handler to be used in a Nexus operation's context.
func GetMetricsHandler(ctx context.Context) metrics.Handler {
nctx, ok := internal.NexusOperationContextFromGoContext(ctx)
if !ok {
panic("temporalnexus GetMetricsHandler: Not a valid Nexus context")
}
return nctx.MetricsHandler
}

// GetLogger returns a logger to be used in a Nexus operation's context.
func GetLogger(ctx context.Context) log.Logger {
nctx, ok := internal.NexusOperationContextFromGoContext(ctx)
if !ok {
panic("temporalnexus GetLogger: Not a valid Nexus context")
}
return nctx.Log
}

type syncOperation[I, O any] struct {
nexus.UnimplementedOperation[I, O]

Expand Down
21 changes: 21 additions & 0 deletions test/nexus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
historypb "go.temporal.io/api/history/v1"
nexuspb "go.temporal.io/api/nexus/v1"
"go.temporal.io/api/operatorservice/v1"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/internal/common/metrics"
ilog "go.temporal.io/sdk/internal/log"
Expand Down Expand Up @@ -320,6 +321,11 @@ func TestSyncOperationFromWorkflow(t *testing.T) {
tc := newTestContext(t, ctx)

op := temporalnexus.NewSyncOperation("op", func(ctx context.Context, c client.Client, outcome string, o nexus.StartOperationOptions) (string, error) {
require.NotPanicsf(t, func() {
temporalnexus.GetMetricsHandler(ctx)
temporalnexus.GetLogger(ctx)
}, "Failed to get metrics handler or logger from operation context.")

switch outcome {
case "successful":
return outcome, nil
Expand Down Expand Up @@ -445,6 +451,11 @@ func TestAsyncOperationFromWorkflow(t *testing.T) {
}
}
op := temporalnexus.NewWorkflowRunOperation("op", handlerWorkflow, func(ctx context.Context, action string, soo nexus.StartOperationOptions) (client.StartWorkflowOptions, error) {
require.NotPanicsf(t, func() {
temporalnexus.GetMetricsHandler(ctx)
temporalnexus.GetLogger(ctx)
}, "Failed to get metrics handler or logger from operation context.")

if action == "fail-to-start" {
return client.StartWorkflowOptions{}, nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "fake internal error")
}
Expand Down Expand Up @@ -677,6 +688,11 @@ func TestReplay(t *testing.T) {

func TestWorkflowTestSuite_NexusSyncOperation(t *testing.T) {
op := nexus.NewSyncOperation("op", func(ctx context.Context, outcome string, opts nexus.StartOperationOptions) (string, error) {
require.NotPanicsf(t, func() {
temporalnexus.GetMetricsHandler(ctx)
temporalnexus.GetLogger(ctx)
}, "Failed to get metrics handler or logger from operation context.")

switch outcome {
case "ok":
return outcome, nil
Expand Down Expand Up @@ -765,6 +781,11 @@ func TestWorkflowTestSuite_WorkflowRunOperation(t *testing.T) {
"op",
handlerWF,
func(ctx context.Context, id string, opts nexus.StartOperationOptions) (client.StartWorkflowOptions, error) {
require.NotPanicsf(t, func() {
temporalnexus.GetMetricsHandler(ctx)
temporalnexus.GetLogger(ctx)
}, "Failed to get metrics handler or logger from operation context.")

return client.StartWorkflowOptions{ID: opts.RequestID}, nil
})

Expand Down

0 comments on commit 9c40461

Please sign in to comment.