From 2f42c84b439937e0cdbbc955093119f40f6b607a Mon Sep 17 00:00:00 2001 From: rodrigozhou Date: Wed, 28 Aug 2024 15:13:20 -0500 Subject: [PATCH] Update nexus link converter --- components/nexusoperations/executors.go | 13 +- components/nexusoperations/executors_test.go | 3 +- components/nexusoperations/link_converter.go | 116 ++++++++++---- .../nexusoperations/link_converter_test.go | 148 ++++++++++++++++-- tests/nexus_api_test.go | 9 +- tests/nexus_workflow_test.go | 3 +- 6 files changed, 227 insertions(+), 65 deletions(-) diff --git a/components/nexusoperations/executors.go b/components/nexusoperations/executors.go index f742c64f4ef..d2d1d2f9b02 100644 --- a/components/nexusoperations/executors.go +++ b/components/nexusoperations/executors.go @@ -194,11 +194,6 @@ func (e taskExecutor) executeInvocationTask(ctx context.Context, env hsm.Environ return fmt.Errorf("%w: %w", queues.NewUnprocessableTaskError("failed to generate a callback token"), err) } - nexusLink, err := ConvertLinkWorkflowEventToNexusLink(args.workflowEventLink) - if err != nil { - return err - } - callCtx, cancel := context.WithTimeout( ctx, e.Config.RequestTimeout(ns.Name().String(), task.EndpointName), @@ -214,7 +209,7 @@ func (e taskExecutor) executeInvocationTask(ctx context.Context, env hsm.Environ CallbackHeader: nexus.Header{ commonnexus.CallbackTokenHeader: token, }, - Links: []nexus.Link{nexusLink}, + Links: []nexus.Link{args.nexusLink}, }) methodTag := metrics.NexusMethodTag("StartOperation") @@ -270,7 +265,7 @@ type startArgs struct { endpointID string header map[string]string payload *commonpb.Payload - workflowEventLink *commonpb.Link_WorkflowEvent + nexusLink nexus.Link namespaceFailoverVersion int64 } @@ -302,7 +297,7 @@ func (e taskExecutor) loadOperationArgs( } args.payload = event.GetNexusOperationScheduledEventAttributes().GetInput() args.header = event.GetNexusOperationScheduledEventAttributes().GetNexusHeader() - args.workflowEventLink = &commonpb.Link_WorkflowEvent{ + args.nexusLink = ConvertLinkWorkflowEventToNexusLink(&commonpb.Link_WorkflowEvent{ Namespace: ns.Name().String(), WorkflowId: ref.WorkflowKey.WorkflowID, RunId: ref.WorkflowKey.RunID, @@ -312,7 +307,7 @@ func (e taskExecutor) loadOperationArgs( EventType: event.GetEventType(), }, }, - } + }) args.namespaceFailoverVersion = event.Version return nil }) diff --git a/components/nexusoperations/executors_test.go b/components/nexusoperations/executors_test.go index 61cba164559..727636cbc6d 100644 --- a/components/nexusoperations/executors_test.go +++ b/components/nexusoperations/executors_test.go @@ -98,8 +98,7 @@ func TestProcessInvocationTask(t *testing.T) { }, }, } - handlerNexusLink, err := nexusoperations.ConvertLinkWorkflowEventToNexusLink(handlerLink) - require.NoError(t, err) + handlerNexusLink := nexusoperations.ConvertLinkWorkflowEventToNexusLink(handlerLink) cases := []struct { name string diff --git a/components/nexusoperations/link_converter.go b/components/nexusoperations/link_converter.go index 36b67ead56e..2b8719d4825 100644 --- a/components/nexusoperations/link_converter.go +++ b/components/nexusoperations/link_converter.go @@ -20,13 +20,17 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +// This file is duplicated in temporalio/temporal/components/nexusoperations/link_converter.go +// Any changes here or there must be replicated. This is temporary until the +// temporal repo updates to the most recent SDK version. + package nexusoperations import ( "fmt" "net/url" + "regexp" "strconv" - "strings" "github.com/nexus-rpc/sdk-go/nexus" commonpb "go.temporal.io/api/common/v1" @@ -34,20 +38,41 @@ import ( ) const ( + urlSchemeTemporalKey = "temporal" + urlPathNamespaceKey = "namespace" + urlPathWorkflowIDKey = "workflowID" + urlPathRunIDKey = "runID" + urlPathTemplate = "/namespaces/%s/workflows/%s/%s/history" + urlTemplate = "temporal://" + urlPathTemplate + linkWorkflowEventReferenceTypeKey = "referenceType" linkEventReferenceEventIDKey = "eventID" linkEventReferenceEventTypeKey = "eventType" ) -func ConvertLinkWorkflowEventToNexusLink(we *commonpb.Link_WorkflowEvent) (nexus.Link, error) { - u, err := url.Parse(fmt.Sprintf( - "temporal:///namespaces/%s/workflows/%s/%s/history", - url.PathEscape(we.GetNamespace()), - url.PathEscape(we.GetWorkflowId()), - url.PathEscape(we.GetRunId()), +var ( + rePatternNamespace = fmt.Sprintf(`(?P<%s>[^/]+)`, urlPathNamespaceKey) + rePatternWorkflowID = fmt.Sprintf(`(?P<%s>[^/]+)`, urlPathWorkflowIDKey) + rePatternRunID = fmt.Sprintf(`(?P<%s>[^/]+)`, urlPathRunIDKey) + urlPathRE = regexp.MustCompile(fmt.Sprintf( + `^/namespaces/%s/workflows/%s/%s/history$`, + rePatternNamespace, + rePatternWorkflowID, + rePatternRunID, )) - if err != nil { - return nexus.Link{}, err +) + +// ConvertLinkWorkflowEventToNexusLink converts a Link_WorkflowEvent type to Nexus Link. +func ConvertLinkWorkflowEventToNexusLink(we *commonpb.Link_WorkflowEvent) nexus.Link { + u := &url.URL{ + Scheme: urlSchemeTemporalKey, + Path: fmt.Sprintf(urlPathTemplate, we.GetNamespace(), we.GetWorkflowId(), we.GetRunId()), + RawPath: fmt.Sprintf( + urlPathTemplate, + url.PathEscape(we.GetNamespace()), + url.PathEscape(we.GetWorkflowId()), + url.PathEscape(we.GetRunId()), + ), } switch ref := we.GetReference().(type) { @@ -57,9 +82,10 @@ func ConvertLinkWorkflowEventToNexusLink(we *commonpb.Link_WorkflowEvent) (nexus return nexus.Link{ URL: u, Type: string(we.ProtoReflect().Descriptor().FullName()), - }, nil + } } +// ConvertNexusLinkToLinkWorkflowEvent converts a Nexus Link to Link_WorkflowEvent. func ConvertNexusLinkToLinkWorkflowEvent(link nexus.Link) (*commonpb.Link_WorkflowEvent, error) { we := &commonpb.Link_WorkflowEvent{} if link.Type != string(we.ProtoReflect().Descriptor().FullName()) { @@ -70,54 +96,76 @@ func ConvertNexusLinkToLinkWorkflowEvent(link nexus.Link) (*commonpb.Link_Workfl ) } - if link.URL.Scheme != "temporal" { - return nil, fmt.Errorf("failed to parse link to Link_WorkflowEvent") + if link.URL.Scheme != urlSchemeTemporalKey { + return nil, fmt.Errorf( + "failed to parse link to Link_WorkflowEvent: invalid scheme: %s", + link.URL.Scheme, + ) + } + + matches := urlPathRE.FindStringSubmatch(link.URL.EscapedPath()) + if len(matches) != 4 { + return nil, fmt.Errorf("failed to parse link to Link_WorkflowEvent: malformed URL path") } - pathParts := strings.Split(link.URL.Path, "/") - if len(pathParts) != 7 { - return nil, fmt.Errorf("failed to parse link to Link_WorkflowEvent") + var err error + we.Namespace, err = url.PathUnescape(matches[urlPathRE.SubexpIndex(urlPathNamespaceKey)]) + if err != nil { + return nil, fmt.Errorf("failed to parse link to Link_WorkflowEvent: %w", err) } - if pathParts[0] != "" || pathParts[1] != "namespaces" || pathParts[3] != "workflows" || pathParts[6] != "history" { - return nil, fmt.Errorf("failed to parse link to Link_WorkflowEvent") + + we.WorkflowId, err = url.PathUnescape(matches[urlPathRE.SubexpIndex(urlPathWorkflowIDKey)]) + if err != nil { + return nil, fmt.Errorf("failed to parse link to Link_WorkflowEvent: %w", err) } - we.Namespace = pathParts[2] - we.WorkflowId = pathParts[4] - we.RunId = pathParts[5] - switch link.URL.Query().Get(linkWorkflowEventReferenceTypeKey) { + + we.RunId, err = url.PathUnescape(matches[urlPathRE.SubexpIndex(urlPathRunIDKey)]) + if err != nil { + return nil, fmt.Errorf("failed to parse link to Link_WorkflowEvent: %w", err) + } + + switch refType := link.URL.Query().Get(linkWorkflowEventReferenceTypeKey); refType { case string((&commonpb.Link_WorkflowEvent_EventReference{}).ProtoReflect().Descriptor().Name()): eventRef, err := convertURLQueryToLinkWorkflowEventEventReference(link.URL.Query()) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to parse link to Link_WorkflowEvent: %w", err) } we.Reference = &commonpb.Link_WorkflowEvent_EventRef{ EventRef: eventRef, } + default: + return nil, fmt.Errorf( + "failed to parse link to Link_WorkflowEvent: unknown reference type: %q", + refType, + ) } return we, nil } func convertLinkWorkflowEventEventReferenceToURLQuery(eventRef *commonpb.Link_WorkflowEvent_EventReference) string { - values := url.Values{ - linkWorkflowEventReferenceTypeKey: []string{string(eventRef.ProtoReflect().Descriptor().Name())}, - linkEventReferenceEventIDKey: []string{strconv.FormatInt(eventRef.GetEventId(), 10)}, - linkEventReferenceEventTypeKey: []string{eventRef.GetEventType().String()}, + values := url.Values{} + values.Set(linkWorkflowEventReferenceTypeKey, string(eventRef.ProtoReflect().Descriptor().Name())) + if eventRef.GetEventId() > 0 { + values.Set(linkEventReferenceEventIDKey, strconv.FormatInt(eventRef.GetEventId(), 10)) } + values.Set(linkEventReferenceEventTypeKey, eventRef.GetEventType().String()) return values.Encode() } func convertURLQueryToLinkWorkflowEventEventReference(queryValues url.Values) (*commonpb.Link_WorkflowEvent_EventReference, error) { - eventID, err := strconv.ParseInt(queryValues.Get(linkEventReferenceEventIDKey), 10, 64) - if err != nil { - return nil, err + var err error + eventRef := &commonpb.Link_WorkflowEvent_EventReference{} + eventIDValue := queryValues.Get(linkEventReferenceEventIDKey) + if eventIDValue != "" { + eventRef.EventId, err = strconv.ParseInt(queryValues.Get(linkEventReferenceEventIDKey), 10, 64) + if err != nil { + return nil, err + } } - eventType, err := enumspb.EventTypeFromString(queryValues.Get(linkEventReferenceEventTypeKey)) + eventRef.EventType, err = enumspb.EventTypeFromString(queryValues.Get(linkEventReferenceEventTypeKey)) if err != nil { return nil, err } - return &commonpb.Link_WorkflowEvent_EventReference{ - EventId: eventID, - EventType: eventType, - }, nil + return eventRef, nil } diff --git a/components/nexusoperations/link_converter_test.go b/components/nexusoperations/link_converter_test.go index 6326a130d11..72cda322b88 100644 --- a/components/nexusoperations/link_converter_test.go +++ b/components/nexusoperations/link_converter_test.go @@ -38,10 +38,10 @@ import ( func TestConvertLinkWorkflowEventToNexusLink(t *testing.T) { type testcase struct { - name string - input *commonpb.Link_WorkflowEvent - output nexus.Link - errMsg string + name string + input *commonpb.Link_WorkflowEvent + output nexus.Link + outputURL string } cases := []testcase{ @@ -62,13 +62,15 @@ func TestConvertLinkWorkflowEventToNexusLink(t *testing.T) { URL: &url.URL{ Scheme: "temporal", Path: "/namespaces/ns/workflows/wf-id/run-id/history", + RawPath: "/namespaces/ns/workflows/wf-id/run-id/history", RawQuery: "eventID=1&eventType=WorkflowExecutionStarted&referenceType=EventReference", }, Type: "temporal.api.common.v1.Link.WorkflowEvent", }, + outputURL: "temporal:///namespaces/ns/workflows/wf-id/run-id/history?eventID=1&eventType=WorkflowExecutionStarted&referenceType=EventReference", }, { - name: "valid with percent-encoding", + name: "valid with angle bracket", input: &commonpb.Link_WorkflowEvent{ Namespace: "ns", WorkflowId: "wf-id>", @@ -84,10 +86,12 @@ func TestConvertLinkWorkflowEventToNexusLink(t *testing.T) { URL: &url.URL{ Scheme: "temporal", Path: "/namespaces/ns/workflows/wf-id>/run-id/history", + RawPath: "/namespaces/ns/workflows/wf-id%3E/run-id/history", RawQuery: "eventID=1&eventType=WorkflowExecutionStarted&referenceType=EventReference", }, Type: "temporal.api.common.v1.Link.WorkflowEvent", }, + outputURL: "temporal:///namespaces/ns/workflows/wf-id%3E/run-id/history?eventID=1&eventType=WorkflowExecutionStarted&referenceType=EventReference", }, { name: "valid with slash", @@ -111,18 +115,38 @@ func TestConvertLinkWorkflowEventToNexusLink(t *testing.T) { }, Type: "temporal.api.common.v1.Link.WorkflowEvent", }, + outputURL: "temporal:///namespaces/ns/workflows/wf-id%2F/run-id/history?eventID=1&eventType=WorkflowExecutionStarted&referenceType=EventReference", + }, + { + name: "valid event id missing", + input: &commonpb.Link_WorkflowEvent{ + Namespace: "ns", + WorkflowId: "wf-id", + RunId: "run-id", + Reference: &commonpb.Link_WorkflowEvent_EventRef{ + EventRef: &commonpb.Link_WorkflowEvent_EventReference{ + EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, + }, + }, + }, + output: nexus.Link{ + URL: &url.URL{ + Scheme: "temporal", + Path: "/namespaces/ns/workflows/wf-id/run-id/history", + RawPath: "/namespaces/ns/workflows/wf-id/run-id/history", + RawQuery: "eventType=WorkflowExecutionStarted&referenceType=EventReference", + }, + Type: "temporal.api.common.v1.Link.WorkflowEvent", + }, + outputURL: "temporal:///namespaces/ns/workflows/wf-id/run-id/history?eventType=WorkflowExecutionStarted&referenceType=EventReference", }, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - output, err := nexusoperations.ConvertLinkWorkflowEventToNexusLink(tc.input) - if tc.errMsg != "" { - require.ErrorContains(t, err, tc.errMsg) - } else { - require.NoError(t, err) - require.Equal(t, tc.output, output) - } + output := nexusoperations.ConvertLinkWorkflowEventToNexusLink(tc.input) + require.Equal(t, tc.output, output) + require.Equal(t, tc.outputURL, output.URL.String()) }) } } @@ -159,11 +183,12 @@ func TestConvertNexusLinkToLinkWorkflowEvent(t *testing.T) { }, }, { - name: "valid with percent-encoding", + name: "valid with angle bracket", input: nexus.Link{ URL: &url.URL{ Scheme: "temporal", Path: "/namespaces/ns/workflows/wf-id>/run-id/history", + RawPath: "/namespaces/ns/workflows/wf-id%2E/run-id/history", RawQuery: "referenceType=EventReference&eventID=1&eventType=WorkflowExecutionStarted", }, Type: "temporal.api.common.v1.Link.WorkflowEvent", @@ -180,6 +205,103 @@ func TestConvertNexusLinkToLinkWorkflowEvent(t *testing.T) { }, }, }, + { + name: "valid with slash", + input: nexus.Link{ + URL: &url.URL{ + Scheme: "temporal", + Path: "/namespaces/ns/workflows/wf-id//run-id/history", + RawPath: "/namespaces/ns/workflows/wf-id%2F/run-id/history", + RawQuery: "referenceType=EventReference&eventID=1&eventType=WorkflowExecutionStarted", + }, + Type: "temporal.api.common.v1.Link.WorkflowEvent", + }, + output: &commonpb.Link_WorkflowEvent{ + Namespace: "ns", + WorkflowId: "wf-id/", + RunId: "run-id", + Reference: &commonpb.Link_WorkflowEvent_EventRef{ + EventRef: &commonpb.Link_WorkflowEvent_EventReference{ + EventId: 1, + EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, + }, + }, + }, + }, + { + name: "valid event id missing", + input: nexus.Link{ + URL: &url.URL{ + Scheme: "temporal", + Path: "/namespaces/ns/workflows/wf-id/run-id/history", + RawPath: "/namespaces/ns/workflows/wf-id/run-id/history", + RawQuery: "referenceType=EventReference&eventID=&eventType=WorkflowExecutionStarted", + }, + Type: "temporal.api.common.v1.Link.WorkflowEvent", + }, + output: &commonpb.Link_WorkflowEvent{ + Namespace: "ns", + WorkflowId: "wf-id", + RunId: "run-id", + Reference: &commonpb.Link_WorkflowEvent_EventRef{ + EventRef: &commonpb.Link_WorkflowEvent_EventReference{ + EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, + }, + }, + }, + }, + { + name: "invalid scheme", + input: nexus.Link{ + URL: &url.URL{ + Scheme: "random", + Path: "/namespaces/ns/workflows/wf-id/run-id/history", + RawPath: "/namespaces/ns/workflows/wf-id/run-id/history", + RawQuery: "referenceType=EventReference&eventID=1&eventType=WorkflowExecutionStarted", + }, + Type: "temporal.api.common.v1.Link.WorkflowEvent", + }, + errMsg: "failed to parse link to Link_WorkflowEvent", + }, + { + name: "invalid path missing history", + input: nexus.Link{ + URL: &url.URL{ + Scheme: "temporal", + Path: "/namespaces/ns/workflows/wf-id/run-id/", + RawPath: "/namespaces/ns/workflows/wf-id/run-id/", + RawQuery: "referenceType=EventReference&eventID=1&eventType=WorkflowExecutionStarted", + }, + Type: "temporal.api.common.v1.Link.WorkflowEvent", + }, + errMsg: "failed to parse link to Link_WorkflowEvent", + }, + { + name: "invalid path missing namespace", + input: nexus.Link{ + URL: &url.URL{ + Scheme: "temporal", + Path: "/namespaces//workflows/wf-id/run-id/history", + RawPath: "/namespaces//workflows/wf-id/run-id/history", + RawQuery: "referenceType=EventReference&eventID=1&eventType=WorkflowExecutionStarted", + }, + Type: "temporal.api.common.v1.Link.WorkflowEvent", + }, + errMsg: "failed to parse link to Link_WorkflowEvent", + }, + { + name: "invalid event type", + input: nexus.Link{ + URL: &url.URL{ + Scheme: "temporal", + Path: "/namespaces/ns/workflows/wf-id/run-id/history", + RawPath: "/namespaces/ns/workflows/wf-id/run-id/history", + RawQuery: "referenceType=EventReference&eventID=1&eventType=WorkflowExecution", + }, + Type: "temporal.api.common.v1.Link.WorkflowEvent", + }, + errMsg: "failed to parse link to Link_WorkflowEvent", + }, } for _, tc := range cases { diff --git a/tests/nexus_api_test.go b/tests/nexus_api_test.go index 5d655997b3b..e4942d7790b 100644 --- a/tests/nexus_api_test.go +++ b/tests/nexus_api_test.go @@ -73,8 +73,7 @@ func (s *ClientFunctionalSuite) TestNexusStartOperation_Outcomes() { }, }, } - callerNexusLink, err := nexusoperations.ConvertLinkWorkflowEventToNexusLink(callerLink) - s.NoError(err) + callerNexusLink := nexusoperations.ConvertLinkWorkflowEventToNexusLink(callerLink) handlerLink := &commonpb.Link_WorkflowEvent{ Namespace: "handler-ns", @@ -87,8 +86,7 @@ func (s *ClientFunctionalSuite) TestNexusStartOperation_Outcomes() { }, }, } - handlerNexusLink, err := nexusoperations.ConvertLinkWorkflowEventToNexusLink(handlerLink) - s.NoError(err) + handlerNexusLink := nexusoperations.ConvertLinkWorkflowEventToNexusLink(handlerLink) type testcase struct { outcome string @@ -142,7 +140,8 @@ func (s *ClientFunctionalSuite) TestNexusStartOperation_Outcomes() { require.NoError(t, err) require.Equal(t, "test-id", res.Pending.ID) require.Len(t, res.Links, 1) - require.Equal(t, handlerNexusLink, res.Links[0]) + require.Equal(t, handlerNexusLink.URL.String(), res.Links[0].URL.String()) + require.Equal(t, handlerNexusLink.Type, res.Links[0].Type) }, }, { diff --git a/tests/nexus_workflow_test.go b/tests/nexus_workflow_test.go index 8dadd3c89f9..d8e036a99ad 100644 --- a/tests/nexus_workflow_test.go +++ b/tests/nexus_workflow_test.go @@ -410,8 +410,7 @@ func (s *ClientFunctionalSuite) TestNexusOperationAsyncCompletion() { }, }, } - handlerNexusLink, err := nexusoperations.ConvertLinkWorkflowEventToNexusLink(handlerLink) - s.NoError(err) + handlerNexusLink := nexusoperations.ConvertLinkWorkflowEventToNexusLink(handlerLink) h := nexustest.Handler{ OnStartOperation: func(