Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update nexus link converter #6460

Merged
merged 1 commit into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions components/nexusoperations/executors.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,6 @@ func (e taskExecutor) executeInvocationTask(ctx context.Context, env hsm.Environ
return fmt.Errorf("%w: %w", queues.NewUnprocessableTaskError("failed to generate a callback token"), err)
}

nexusLink, err := ConvertLinkWorkflowEventToNexusLink(args.workflowEventLink)
if err != nil {
return err
}

callCtx, cancel := context.WithTimeout(
ctx,
e.Config.RequestTimeout(ns.Name().String(), task.EndpointName),
Expand All @@ -214,7 +209,7 @@ func (e taskExecutor) executeInvocationTask(ctx context.Context, env hsm.Environ
CallbackHeader: nexus.Header{
commonnexus.CallbackTokenHeader: token,
},
Links: []nexus.Link{nexusLink},
Links: []nexus.Link{args.nexusLink},
})

methodTag := metrics.NexusMethodTag("StartOperation")
Expand Down Expand Up @@ -270,7 +265,7 @@ type startArgs struct {
endpointID string
header map[string]string
payload *commonpb.Payload
workflowEventLink *commonpb.Link_WorkflowEvent
nexusLink nexus.Link
namespaceFailoverVersion int64
}

Expand Down Expand Up @@ -302,7 +297,7 @@ func (e taskExecutor) loadOperationArgs(
}
args.payload = event.GetNexusOperationScheduledEventAttributes().GetInput()
args.header = event.GetNexusOperationScheduledEventAttributes().GetNexusHeader()
args.workflowEventLink = &commonpb.Link_WorkflowEvent{
args.nexusLink = ConvertLinkWorkflowEventToNexusLink(&commonpb.Link_WorkflowEvent{
Namespace: ns.Name().String(),
WorkflowId: ref.WorkflowKey.WorkflowID,
RunId: ref.WorkflowKey.RunID,
Expand All @@ -312,7 +307,7 @@ func (e taskExecutor) loadOperationArgs(
EventType: event.GetEventType(),
},
},
}
})
args.namespaceFailoverVersion = event.Version
return nil
})
Expand Down
3 changes: 1 addition & 2 deletions components/nexusoperations/executors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ func TestProcessInvocationTask(t *testing.T) {
},
},
}
handlerNexusLink, err := nexusoperations.ConvertLinkWorkflowEventToNexusLink(handlerLink)
require.NoError(t, err)
handlerNexusLink := nexusoperations.ConvertLinkWorkflowEventToNexusLink(handlerLink)

cases := []struct {
name string
Expand Down
116 changes: 82 additions & 34 deletions components/nexusoperations/link_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,34 +20,59 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

// This file is duplicated in temporalio/temporal/components/nexusoperations/link_converter.go
// Any changes here or there must be replicated. This is temporary until the
// temporal repo updates to the most recent SDK version.

package nexusoperations

import (
"fmt"
"net/url"
"regexp"
"strconv"
"strings"

"github.com/nexus-rpc/sdk-go/nexus"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
)

const (
urlSchemeTemporalKey = "temporal"
urlPathNamespaceKey = "namespace"
urlPathWorkflowIDKey = "workflowID"
urlPathRunIDKey = "runID"
urlPathTemplate = "/namespaces/%s/workflows/%s/%s/history"
urlTemplate = "temporal://" + urlPathTemplate

linkWorkflowEventReferenceTypeKey = "referenceType"
linkEventReferenceEventIDKey = "eventID"
linkEventReferenceEventTypeKey = "eventType"
)

func ConvertLinkWorkflowEventToNexusLink(we *commonpb.Link_WorkflowEvent) (nexus.Link, error) {
u, err := url.Parse(fmt.Sprintf(
"temporal:///namespaces/%s/workflows/%s/%s/history",
url.PathEscape(we.GetNamespace()),
url.PathEscape(we.GetWorkflowId()),
url.PathEscape(we.GetRunId()),
var (
rePatternNamespace = fmt.Sprintf(`(?P<%s>[^/]+)`, urlPathNamespaceKey)
rePatternWorkflowID = fmt.Sprintf(`(?P<%s>[^/]+)`, urlPathWorkflowIDKey)
rePatternRunID = fmt.Sprintf(`(?P<%s>[^/]+)`, urlPathRunIDKey)
urlPathRE = regexp.MustCompile(fmt.Sprintf(
`^/namespaces/%s/workflows/%s/%s/history$`,
rePatternNamespace,
rePatternWorkflowID,
rePatternRunID,
))
if err != nil {
return nexus.Link{}, err
)

// ConvertLinkWorkflowEventToNexusLink converts a Link_WorkflowEvent type to Nexus Link.
func ConvertLinkWorkflowEventToNexusLink(we *commonpb.Link_WorkflowEvent) nexus.Link {
u := &url.URL{
Scheme: urlSchemeTemporalKey,
Path: fmt.Sprintf(urlPathTemplate, we.GetNamespace(), we.GetWorkflowId(), we.GetRunId()),
RawPath: fmt.Sprintf(
urlPathTemplate,
url.PathEscape(we.GetNamespace()),
url.PathEscape(we.GetWorkflowId()),
url.PathEscape(we.GetRunId()),
),
}

switch ref := we.GetReference().(type) {
Expand All @@ -57,9 +82,10 @@
return nexus.Link{
URL: u,
Type: string(we.ProtoReflect().Descriptor().FullName()),
}, nil
}
}

// ConvertNexusLinkToLinkWorkflowEvent converts a Nexus Link to Link_WorkflowEvent.
func ConvertNexusLinkToLinkWorkflowEvent(link nexus.Link) (*commonpb.Link_WorkflowEvent, error) {
we := &commonpb.Link_WorkflowEvent{}
if link.Type != string(we.ProtoReflect().Descriptor().FullName()) {
Expand All @@ -70,54 +96,76 @@
)
}

if link.URL.Scheme != "temporal" {
return nil, fmt.Errorf("failed to parse link to Link_WorkflowEvent")
if link.URL.Scheme != urlSchemeTemporalKey {
return nil, fmt.Errorf(

Check failure on line 100 in components/nexusoperations/link_converter.go

View workflow job for this annotation

GitHub Actions / lint

do not define dynamic errors, use wrapped static errors instead: "fmt.Errorf(\n\t\"failed to parse link to Link_WorkflowEvent: invalid scheme: %s\",\n\tlink.URL.Scheme,\n)" (err113)
"failed to parse link to Link_WorkflowEvent: invalid scheme: %s",
link.URL.Scheme,
)
}

matches := urlPathRE.FindStringSubmatch(link.URL.EscapedPath())
if len(matches) != 4 {
return nil, fmt.Errorf("failed to parse link to Link_WorkflowEvent: malformed URL path")

Check failure on line 108 in components/nexusoperations/link_converter.go

View workflow job for this annotation

GitHub Actions / lint

do not define dynamic errors, use wrapped static errors instead: "fmt.Errorf(\"failed to parse link to Link_WorkflowEvent: malformed URL path\")" (err113)
}

pathParts := strings.Split(link.URL.Path, "/")
if len(pathParts) != 7 {
return nil, fmt.Errorf("failed to parse link to Link_WorkflowEvent")
var err error
we.Namespace, err = url.PathUnescape(matches[urlPathRE.SubexpIndex(urlPathNamespaceKey)])
if err != nil {
return nil, fmt.Errorf("failed to parse link to Link_WorkflowEvent: %w", err)
}
if pathParts[0] != "" || pathParts[1] != "namespaces" || pathParts[3] != "workflows" || pathParts[6] != "history" {
return nil, fmt.Errorf("failed to parse link to Link_WorkflowEvent")

we.WorkflowId, err = url.PathUnescape(matches[urlPathRE.SubexpIndex(urlPathWorkflowIDKey)])
if err != nil {
return nil, fmt.Errorf("failed to parse link to Link_WorkflowEvent: %w", err)
}
we.Namespace = pathParts[2]
we.WorkflowId = pathParts[4]
we.RunId = pathParts[5]
switch link.URL.Query().Get(linkWorkflowEventReferenceTypeKey) {

we.RunId, err = url.PathUnescape(matches[urlPathRE.SubexpIndex(urlPathRunIDKey)])
if err != nil {
return nil, fmt.Errorf("failed to parse link to Link_WorkflowEvent: %w", err)
}

switch refType := link.URL.Query().Get(linkWorkflowEventReferenceTypeKey); refType {
case string((&commonpb.Link_WorkflowEvent_EventReference{}).ProtoReflect().Descriptor().Name()):
eventRef, err := convertURLQueryToLinkWorkflowEventEventReference(link.URL.Query())
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to parse link to Link_WorkflowEvent: %w", err)
}
we.Reference = &commonpb.Link_WorkflowEvent_EventRef{
EventRef: eventRef,
}
default:
return nil, fmt.Errorf(

Check failure on line 137 in components/nexusoperations/link_converter.go

View workflow job for this annotation

GitHub Actions / lint

do not define dynamic errors, use wrapped static errors instead: "fmt.Errorf(\n\t\"failed to parse link to Link_WorkflowEvent: unknown reference type: %q\",\n\trefType,\n)" (err113)
"failed to parse link to Link_WorkflowEvent: unknown reference type: %q",
refType,
)
}

return we, nil
}

func convertLinkWorkflowEventEventReferenceToURLQuery(eventRef *commonpb.Link_WorkflowEvent_EventReference) string {
values := url.Values{
linkWorkflowEventReferenceTypeKey: []string{string(eventRef.ProtoReflect().Descriptor().Name())},
linkEventReferenceEventIDKey: []string{strconv.FormatInt(eventRef.GetEventId(), 10)},
linkEventReferenceEventTypeKey: []string{eventRef.GetEventType().String()},
values := url.Values{}
values.Set(linkWorkflowEventReferenceTypeKey, string(eventRef.ProtoReflect().Descriptor().Name()))
if eventRef.GetEventId() > 0 {
values.Set(linkEventReferenceEventIDKey, strconv.FormatInt(eventRef.GetEventId(), 10))
}
values.Set(linkEventReferenceEventTypeKey, eventRef.GetEventType().String())
return values.Encode()
}

func convertURLQueryToLinkWorkflowEventEventReference(queryValues url.Values) (*commonpb.Link_WorkflowEvent_EventReference, error) {
eventID, err := strconv.ParseInt(queryValues.Get(linkEventReferenceEventIDKey), 10, 64)
if err != nil {
return nil, err
var err error
eventRef := &commonpb.Link_WorkflowEvent_EventReference{}
eventIDValue := queryValues.Get(linkEventReferenceEventIDKey)
if eventIDValue != "" {
eventRef.EventId, err = strconv.ParseInt(queryValues.Get(linkEventReferenceEventIDKey), 10, 64)
if err != nil {
return nil, err
}
}
eventType, err := enumspb.EventTypeFromString(queryValues.Get(linkEventReferenceEventTypeKey))
eventRef.EventType, err = enumspb.EventTypeFromString(queryValues.Get(linkEventReferenceEventTypeKey))
if err != nil {
return nil, err
}
return &commonpb.Link_WorkflowEvent_EventReference{
EventId: eventID,
EventType: eventType,
}, nil
return eventRef, nil
}
Loading
Loading