diff --git a/backend/runtimestate/runtimestate.go b/backend/runtimestate/runtimestate.go index 2490177..8ee58c8 100644 --- a/backend/runtimestate/runtimestate.go +++ b/backend/runtimestate/runtimestate.go @@ -12,6 +12,7 @@ import ( "github.com/dapr/durabletask-go/api" "github.com/dapr/durabletask-go/api/helpers" "github.com/dapr/durabletask-go/api/protos" + "github.com/dapr/kit/ptr" ) var ErrDuplicateEvent = errors.New("duplicate event") @@ -140,8 +141,35 @@ func ApplyActions(s *protos.OrchestrationRuntimeState, customStatus *wrapperspb. Router: action.Router, }) if s.StartEvent.GetParentInstance() != nil { + // Create a router for the completion event that routes back to the parent + var completionRouter *protos.TaskRouter + if action.Router != nil { + var parentAppID *string + + allEvents := append(s.OldEvents, s.NewEvents...) + for _, event := range allEvents { + if es := event.GetExecutionStarted(); es != nil && event.GetRouter() != nil { + parentAppID = ptr.Of(event.GetRouter().GetSource()) + break + } + } + + if parentAppID != nil { + completionRouter = &protos.TaskRouter{ + Source: action.Router.Source, + Target: parentAppID, + } + } else { + completionRouter = action.Router + } + } + msg := &protos.OrchestrationRuntimeStateMessage{ - HistoryEvent: &protos.HistoryEvent{EventId: -1, Timestamp: timestamppb.Now()}, + HistoryEvent: &protos.HistoryEvent{ + EventId: -1, + Timestamp: timestamppb.Now(), + Router: completionRouter, + }, TargetInstanceID: s.StartEvent.GetParentInstance().OrchestrationInstance.InstanceId, } if completedAction.OrchestrationStatus == protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED { diff --git a/task/activity.go b/task/activity.go index f351925..56a2bac 100644 --- a/task/activity.go +++ b/task/activity.go @@ -59,7 +59,7 @@ func (policy *RetryPolicy) Validate() error { return nil } -func WithAppID(targetAppID string) callActivityOption { +func WithActivityAppID(targetAppID string) callActivityOption { return func(opt *callActivityOptions) error { opt.targetAppID = &targetAppID return nil diff --git a/task/orchestrator.go b/task/orchestrator.go index ae91aa2..c1ba014 100644 --- a/task/orchestrator.go +++ b/task/orchestrator.go @@ -52,9 +52,9 @@ type OrchestrationContext struct { // callSubOrchestratorOptions is a struct that holds the options for the CallSubOrchestrator orchestrator method. type callSubOrchestratorOptions struct { - instanceID string - rawInput *wrapperspb.StringValue - + instanceID string + rawInput *wrapperspb.StringValue + targetAppID *string retryPolicy *RetryPolicy } @@ -64,6 +64,14 @@ type subOrchestratorOption func(*callSubOrchestratorOptions) error // ContinueAsNewOption is a functional option type for the ContinueAsNew orchestrator method. type ContinueAsNewOption func(*OrchestrationContext) +// WithSubOrchestratorAppID is a functional option type for the CallSubOrchestrator orchestrator method that specifies the app ID of the target activity. +func WithSubOrchestratorAppID(appID string) subOrchestratorOption { + return func(opts *callSubOrchestratorOptions) error { + opts.targetAppID = &appID + return nil + } +} + // WithKeepUnprocessedEvents returns a ContinueAsNewOptions struct that instructs the // runtime to carry forward any unprocessed external events to the new instance. func WithKeepUnprocessedEvents() ContinueAsNewOption { @@ -205,7 +213,14 @@ func (ctx *OrchestrationContext) processEvent(e *backend.HistoryEvent) error { } else if es := e.GetExecutionStarted(); es != nil { // Extract source AppID from HistoryEvent Router if this is ExecutionStartedEvent if e.GetRouter() != nil { - ctx.appID = ptr.Of(e.GetRouter().GetSource()) + router := e.GetRouter() + // For cross-app suborchestrations, if we have a target, use that as our appID + // since that's where we're actually executing + if router.Target != nil { + ctx.appID = ptr.Of(router.GetTarget()) + } else { + ctx.appID = ptr.Of(router.GetSource()) + } } err = ctx.onExecutionStarted(es) } else if ts := e.GetTaskScheduled(); ts != nil { @@ -338,6 +353,15 @@ func (ctx *OrchestrationContext) internalCallSubOrchestrator(orchestratorName st }, }, } + if ctx.appID != nil { + createSubOrchestrationAction.Router = &protos.TaskRouter{ + Source: *ctx.appID, + } + + if options.targetAppID != nil { + createSubOrchestrationAction.Router.Target = options.targetAppID + } + } ctx.pendingActions[createSubOrchestrationAction.Id] = createSubOrchestrationAction task := newTask(ctx) @@ -758,6 +782,13 @@ func (ctx *OrchestrationContext) setCompleteInternal( }, }, } + + if ctx.appID != nil { + completedAction.Router = &protos.TaskRouter{ + Source: *ctx.appID, + } + } + ctx.pendingActions[sequenceNumber] = completedAction return nil }