Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Contributing

## Cloning this repository

This repository contains submodules. Be sure to clone it with the option to include submodules. Otherwise, you will not be able to generate the protobuf code.

```bash
git clone --recurse-submodules https://github.com/dapr/durabletask-go
```

If you already cloned the repository without `--recurse-submodules`, you can initialize and update the submodules with:

```bash
git submodule update --init --recursive
```

To grab latest, do some variation of the following nuke your existing submodule folder then run:
```bash
rm -rf submodules/durabletask-protobuf
git submodule add --force https://github.com/dapr/durabletask-protobuf.git submodules/durabletask-protobuf
git submodule update --remote submodules/durabletask-protobuf
```

This will initialize and update the submodules.

## Building the project

This project requires go v1.19.x or greater. You can build a standalone executable by simply running `go build` at the project root.

### Generating protobuf

Use the following command to regenerate the protobuf from the submodule. Use this whenever updating the submodule reference.

```bash
# Run from the repo root and specify the output directory
# This will place the generated files directly in api/protos/, matching the go_package and your repo structure.
protoc --go_out=. --go-grpc_out=. \
-I submodules/durabletask-protobuf/protos \
submodules/durabletask-protobuf/protos/orchestrator_service.proto \
submodules/durabletask-protobuf/protos/backend_service.proto \
submodules/durabletask-protobuf/protos/runtime_state.proto
```

For local development with protobuf changes:

1. If you have local changes to the proto files in a neighboring durabletask-protobuf directory:
```bash
# Point go.mod to your local durabletask-protobuf repo
replace github.com/dapr/durabletask-protobuf => ../durabletask-protobuf

# Regenerate protobuf files using your local proto definitions
protoc --go_out=. --go-grpc_out=. \
-I ../durabletask-protobuf/protos \
../durabletask-protobuf/protos/orchestrator_service.proto \
../durabletask-protobuf/protos/backend_service.proto \
../durabletask-protobuf/protos/runtime_state.proto
```

This will use your local proto files instead of the ones in the submodule, which is useful when testing protobuf changes before submitting them upstream.

### Generating mocks for testing

Test mocks were generated using [mockery](https://github.com/vektra/mockery). Use the following command at the project root to regenerate the mocks.

```bash
mockery --dir ./backend --name="^Backend|^Executor|^TaskWorker" --output ./tests/mocks --with-expecter
```

## Running tests

All automated tests are under `./tests`. A separate test package hierarchy was chosen intentionally to prioritize [black box testing](https://en.wikipedia.org/wiki/Black-box_testing). This strategy also makes it easier to catch accidental breaking API changes.

Run tests with the following command.

```bash
go test ./tests/... -coverpkg ./api,./task,./client,./backend/...,./api/helpers
```
40 changes: 0 additions & 40 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -243,46 +243,6 @@ You can find this code in the [distributedtracing](./samples/distributedtracing)

Note that each orchestration is represented as a single span with activities, timers, and sub-orchestrations as child spans. The generated spans contain a variety of attributes that include information such as orchestration instance IDs, task names, task IDs, etc.

## Cloning this repository

This repository contains submodules. Be sure to clone it with the option to include submodules. Otherwise you will not be able to generate the protobuf code.

```bash
git clone --recurse-submodules https://github.com/dapr/durabletask-go
```

## Building the project

This project requires go v1.19.x or greater. You can build a standalone executable by simply running `go build` at the project root.

### Generating protobuf

Use the following command to regenerate the protobuf from the submodule. Use this whenever updating the submodule reference.

```bash
# NOTE: assumes the .proto file defines: option go_package = "/api/protos"
# NOTE: currently the .proto file actually defines: option go_package = "/internal/protos"; , we are manually changing that to be /api/protos
protoc --go_out=. --go-grpc_out=. -I submodules/durabletask-protobuf/protos orchestrator_service.proto
```

### Generating mocks for testing

Test mocks were generated using [mockery](https://github.com/vektra/mockery). Use the following command at the project root to regenerate the mocks.

```bash
mockery --dir ./backend --name="^Backend|^Executor|^TaskWorker" --output ./tests/mocks --with-expecter
```

## Running tests

All automated tests are under `./tests`. A separate test package hierarchy was chosen intentionally to prioritize [black box testing](https://en.wikipedia.org/wiki/Black-box_testing). This strategy also makes it easier to catch accidental breaking API changes.

Run tests with the following command.

```bash
go test ./tests/... -coverpkg ./api,./task,./client,./backend/...,./api/helpers
```

## Running integration tests

You can run pre-built container images to run full integration tests against the durable task host over gRPC.
Expand Down
15 changes: 7 additions & 8 deletions api/protos/backend_service.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 10 additions & 8 deletions api/protos/orchestrator_service.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion api/protos/orchestrator_service_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 7 additions & 8 deletions api/protos/runtime_state.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions backend/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func (executor *grpcExecutor) ExecuteActivity(ctx context.Context, iid api.Insta
FailureDetails: failureDetails,
},
},
Router: e.Router,
}
} else {
responseEvent = &protos.HistoryEvent{
Expand All @@ -229,6 +230,7 @@ func (executor *grpcExecutor) ExecuteActivity(ctx context.Context, iid api.Insta
TaskExecutionId: task.TaskExecutionId,
},
},
Router: e.Router,
}
}

Expand Down
9 changes: 9 additions & 0 deletions backend/runtimestate/runtimestate.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func ApplyActions(s *protos.OrchestrationRuntimeState, customStatus *wrapperspb.
EventType: &protos.HistoryEvent_OrchestratorStarted{
OrchestratorStarted: &protos.OrchestratorStartedEvent{},
},
Router: action.Router,
})

// Duplicate the start event info, updating just the input
Expand All @@ -111,6 +112,7 @@ func ApplyActions(s *protos.OrchestrationRuntimeState, customStatus *wrapperspb.
ParentTraceContext: s.StartEvent.ParentTraceContext,
},
},
Router: action.Router,
},
)

Expand All @@ -135,6 +137,7 @@ func ApplyActions(s *protos.OrchestrationRuntimeState, customStatus *wrapperspb.
FailureDetails: completedAction.FailureDetails,
},
},
Router: action.Router,
})
if s.StartEvent.GetParentInstance() != nil {
msg := &protos.OrchestrationRuntimeStateMessage{
Expand Down Expand Up @@ -170,6 +173,7 @@ func ApplyActions(s *protos.OrchestrationRuntimeState, customStatus *wrapperspb.
Name: createtimer.Name,
},
},
Router: action.Router,
})
// TODO cant pass trace context
s.PendingTimers = append(s.PendingTimers, &protos.HistoryEvent{
Expand All @@ -195,6 +199,7 @@ func ApplyActions(s *protos.OrchestrationRuntimeState, customStatus *wrapperspb.
ParentTraceContext: currentTraceContext,
},
},
Router: action.Router,
}
AddEvent(s, scheduledEvent)
s.PendingTasks = append(s.PendingTasks, scheduledEvent)
Expand All @@ -216,6 +221,7 @@ func ApplyActions(s *protos.OrchestrationRuntimeState, customStatus *wrapperspb.
ParentTraceContext: currentTraceContext,
},
},
Router: action.Router,
})
startEvent := &protos.HistoryEvent{
EventId: -1,
Expand All @@ -236,6 +242,7 @@ func ApplyActions(s *protos.OrchestrationRuntimeState, customStatus *wrapperspb.
ParentTraceContext: currentTraceContext,
},
},
Router: action.Router,
}

s.PendingMessages = append(s.PendingMessages, &protos.OrchestrationRuntimeStateMessage{HistoryEvent: startEvent, TargetInstanceID: createSO.InstanceId})
Expand All @@ -250,6 +257,7 @@ func ApplyActions(s *protos.OrchestrationRuntimeState, customStatus *wrapperspb.
Input: sendEvent.Data,
},
},
Router: action.Router,
}
AddEvent(s, e)
s.PendingMessages = append(s.PendingMessages, &protos.OrchestrationRuntimeStateMessage{HistoryEvent: e, TargetInstanceID: sendEvent.Instance.InstanceId})
Expand All @@ -266,6 +274,7 @@ func ApplyActions(s *protos.OrchestrationRuntimeState, customStatus *wrapperspb.
Recurse: terminate.Recurse,
},
},
Router: action.Router,
},
}
s.PendingMessages = append(s.PendingMessages, msg)
Expand Down
2 changes: 1 addition & 1 deletion submodules/durabletask-protobuf
8 changes: 8 additions & 0 deletions task/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type callActivityOption func(*callActivityOptions) error
type callActivityOptions struct {
rawInput *wrapperspb.StringValue
retryPolicy *RetryPolicy
targetAppID *string
}

type RetryPolicy struct {
Expand Down Expand Up @@ -58,6 +59,13 @@ func (policy *RetryPolicy) Validate() error {
return nil
}

func WithAppID(targetAppID string) callActivityOption {
return func(opt *callActivityOptions) error {
opt.targetAppID = &targetAppID
return nil
}
}

// WithActivityInput configures an input for an activity invocation.
// The specified input must be JSON serializable.
func WithActivityInput(input any) callActivityOption {
Expand Down
18 changes: 18 additions & 0 deletions task/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/dapr/durabletask-go/api/helpers"
"github.com/dapr/durabletask-go/api/protos"
"github.com/dapr/durabletask-go/backend"
"github.com/dapr/kit/ptr"
)

// Orchestrator is the functional interface for orchestrator functions.
Expand All @@ -28,6 +29,7 @@ type OrchestrationContext struct {
Name string
IsReplaying bool
CurrentTimeUtc time.Time
appID *string

registry *TaskRegistry
rawInput []byte
Expand Down Expand Up @@ -201,6 +203,10 @@ func (ctx *OrchestrationContext) processEvent(e *backend.HistoryEvent) error {
// OrchestratorStarted is only used to update the current orchestration time
ctx.CurrentTimeUtc = e.Timestamp.AsTime()
} else if es := e.GetExecutionStarted(); es != nil {
// Extract source AppID from HistoryEvent Router if this is ExecutionStartedEvent
if e.GetRouter() != nil {
ctx.appID = ptr.Of(e.GetRouter().GetSource())
}
err = ctx.onExecutionStarted(es)
} else if ts := e.GetTaskScheduled(); ts != nil {
err = ctx.onTaskScheduled(e.EventId, ts)
Expand Down Expand Up @@ -278,13 +284,25 @@ func (ctx *OrchestrationContext) internalScheduleActivity(activityName, taskExec
},
}

// Add TaskRouter support for cross-app activities
if ctx.appID != nil {
scheduleTaskAction.Router = &protos.TaskRouter{
Source: *ctx.appID, // Current orchestrator app ID
}

if options.targetAppID != nil {
scheduleTaskAction.Router.Target = options.targetAppID // Target activity app ID
}
}

ctx.pendingActions[scheduleTaskAction.Id] = scheduleTaskAction

task := newTask(ctx)
ctx.pendingTasks[scheduleTaskAction.Id] = task
return task
}

// TODO: cassie wire appID into suborchestration options too for cross app wf
func (ctx *OrchestrationContext) CallSubOrchestrator(orchestrator interface{}, opts ...subOrchestratorOption) Task {
options := new(callSubOrchestratorOptions)
for _, configure := range opts {
Expand Down