Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion backend/runtimestate/runtimestate.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/dapr/durabletask-go/api"
"github.com/dapr/durabletask-go/api/helpers"
"github.com/dapr/durabletask-go/api/protos"
"github.com/dapr/kit/ptr"
)

var ErrDuplicateEvent = errors.New("duplicate event")
Expand Down Expand Up @@ -140,8 +141,35 @@ func ApplyActions(s *protos.OrchestrationRuntimeState, customStatus *wrapperspb.
Router: action.Router,
})
if s.StartEvent.GetParentInstance() != nil {
// Create a router for the completion event that routes back to the parent
var completionRouter *protos.TaskRouter
if action.Router != nil {
var parentAppID *string

allEvents := append(s.OldEvents, s.NewEvents...)
for _, event := range allEvents {
if es := event.GetExecutionStarted(); es != nil && event.GetRouter() != nil {
parentAppID = ptr.Of(event.GetRouter().GetSource())
break
}
}

if parentAppID != nil {
completionRouter = &protos.TaskRouter{
Source: action.Router.Source,
Target: parentAppID,
}
} else {
completionRouter = action.Router
}
}

msg := &protos.OrchestrationRuntimeStateMessage{
HistoryEvent: &protos.HistoryEvent{EventId: -1, Timestamp: timestamppb.Now()},
HistoryEvent: &protos.HistoryEvent{
EventId: -1,
Timestamp: timestamppb.Now(),
Router: completionRouter,
},
TargetInstanceID: s.StartEvent.GetParentInstance().OrchestrationInstance.InstanceId,
}
if completedAction.OrchestrationStatus == protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED {
Expand Down
2 changes: 1 addition & 1 deletion task/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (policy *RetryPolicy) Validate() error {
return nil
}

func WithAppID(targetAppID string) callActivityOption {
func WithActivityAppID(targetAppID string) callActivityOption {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is necessary because the suborchestration and activity options live in the same task package.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be fine since we have not released an SDK version with the original func name. If that wasn't the case we should need to create an alias for backwards compat.

return func(opt *callActivityOptions) error {
opt.targetAppID = &targetAppID
return nil
Expand Down
39 changes: 35 additions & 4 deletions task/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ type OrchestrationContext struct {

// callSubOrchestratorOptions is a struct that holds the options for the CallSubOrchestrator orchestrator method.
type callSubOrchestratorOptions struct {
instanceID string
rawInput *wrapperspb.StringValue

instanceID string
rawInput *wrapperspb.StringValue
targetAppID *string
retryPolicy *RetryPolicy
}

Expand All @@ -64,6 +64,14 @@ type subOrchestratorOption func(*callSubOrchestratorOptions) error
// ContinueAsNewOption is a functional option type for the ContinueAsNew orchestrator method.
type ContinueAsNewOption func(*OrchestrationContext)

// WithSubOrchestratorAppID is a functional option type for the CallSubOrchestrator orchestrator method that specifies the app ID of the target activity.
func WithSubOrchestratorAppID(appID string) subOrchestratorOption {
return func(opts *callSubOrchestratorOptions) error {
opts.targetAppID = &appID
return nil
}
}

// WithKeepUnprocessedEvents returns a ContinueAsNewOptions struct that instructs the
// runtime to carry forward any unprocessed external events to the new instance.
func WithKeepUnprocessedEvents() ContinueAsNewOption {
Expand Down Expand Up @@ -205,7 +213,14 @@ func (ctx *OrchestrationContext) processEvent(e *backend.HistoryEvent) error {
} else if es := e.GetExecutionStarted(); es != nil {
// Extract source AppID from HistoryEvent Router if this is ExecutionStartedEvent
if e.GetRouter() != nil {
ctx.appID = ptr.Of(e.GetRouter().GetSource())
router := e.GetRouter()
// For cross-app suborchestrations, if we have a target, use that as our appID
// since that's where we're actually executing
if router.Target != nil {
ctx.appID = ptr.Of(router.GetTarget())
} else {
ctx.appID = ptr.Of(router.GetSource())
}
}
err = ctx.onExecutionStarted(es)
} else if ts := e.GetTaskScheduled(); ts != nil {
Expand Down Expand Up @@ -338,6 +353,15 @@ func (ctx *OrchestrationContext) internalCallSubOrchestrator(orchestratorName st
},
},
}
if ctx.appID != nil {
createSubOrchestrationAction.Router = &protos.TaskRouter{
Source: *ctx.appID,
}

if options.targetAppID != nil {
createSubOrchestrationAction.Router.Target = options.targetAppID
}
}
ctx.pendingActions[createSubOrchestrationAction.Id] = createSubOrchestrationAction

task := newTask(ctx)
Expand Down Expand Up @@ -758,6 +782,13 @@ func (ctx *OrchestrationContext) setCompleteInternal(
},
},
}

if ctx.appID != nil {
completedAction.Router = &protos.TaskRouter{
Source: *ctx.appID,
}
}

ctx.pendingActions[sequenceNumber] = completedAction
return nil
}
Expand Down