Skip to content

Commit 595fa5c

Browse files
authored
[1.16] Go Workflow durabletask (#4854)
The Go SDK workflow wrapper over durabletask-go is to be deprecated, in favour of using the durabletask-go client directly. The Go SDK wrapper is unnecessary indirection, and prevents some features from being used by the user. Signed-off-by: joshvanl <me@joshvanl.dev>
1 parent 9fa3b5a commit 595fa5c

File tree

2 files changed

+84
-258
lines changed

2 files changed

+84
-258
lines changed

daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md

Lines changed: 36 additions & 207 deletions
Original file line numberDiff line numberDiff line change
@@ -867,7 +867,8 @@ public class DemoWorkflow extends Workflow {
867867
- The `TestWorkflow` method
868868
- Creating the workflow with input and output.
869869
- API calls. In the example below, these calls start and call the workflow activities.
870-
870+
871+
871872
```go
872873
package main
873874

@@ -877,8 +878,11 @@ import (
877878
"log"
878879
"time"
879880

880-
"github.com/dapr/go-sdk/client"
881-
"github.com/dapr/go-sdk/workflow"
881+
"github.com/dapr/durabletask-go/api"
882+
"github.com/dapr/durabletask-go/backend"
883+
"github.com/dapr/durabletask-go/client"
884+
"github.com/dapr/durabletask-go/task"
885+
dapr "github.com/dapr/go-sdk/client"
882886
)
883887

884888
var stage = 0
@@ -888,110 +892,68 @@ const (
888892
)
889893

890894
func main() {
891-
w, err := workflow.NewWorker()
892-
if err != nil {
893-
log.Fatal(err)
894-
}
895+
registry := task.NewTaskRegistry()
895896

896-
fmt.Println("Worker initialized")
897-
898-
if err := w.RegisterWorkflow(TestWorkflow); err != nil {
897+
if err := registry.AddOrchestrator(TestWorkflow); err != nil {
899898
log.Fatal(err)
900899
}
901900
fmt.Println("TestWorkflow registered")
902901

903-
if err := w.RegisterActivity(TestActivity); err != nil {
902+
if err := registry.AddActivity(TestActivity); err != nil {
904903
log.Fatal(err)
905904
}
906905
fmt.Println("TestActivity registered")
907906

908-
// Start workflow runner
909-
if err := w.Start(); err != nil {
910-
log.Fatal(err)
907+
daprClient, err := dapr.NewClient()
908+
if err != nil {
909+
log.Fatalf("failed to create Dapr client: %v", err)
911910
}
912-
fmt.Println("runner started")
913911

914-
daprClient, err := client.NewClient()
915-
if err != nil {
916-
log.Fatalf("failed to intialise client: %v", err)
912+
client := client.NewTaskHubGrpcClient(daprClient.GrpcClientConn(), backend.DefaultLogger())
913+
if err := client.StartWorkItemListener(context.TODO(), registry); err != nil {
914+
log.Fatalf("failed to start work item listener: %v", err)
917915
}
918-
defer daprClient.Close()
916+
917+
fmt.Println("runner started")
918+
919919
ctx := context.Background()
920920

921921
// Start workflow test
922-
respStart, err := daprClient.StartWorkflow(ctx, &client.StartWorkflowRequest{
923-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
924-
WorkflowComponent: workflowComponent,
925-
WorkflowName: "TestWorkflow",
926-
Options: nil,
927-
Input: 1,
928-
SendRawInput: false,
929-
})
922+
id, err := client.ScheduleNewOrchestration(ctx, "TestWorkflow", api.WithInput(1))
930923
if err != nil {
931924
log.Fatalf("failed to start workflow: %v", err)
932925
}
933-
fmt.Printf("workflow started with id: %v\n", respStart.InstanceID)
926+
fmt.Printf("workflow started with id: %v\n", id)
934927

935928
// Pause workflow test
936-
err = daprClient.PauseWorkflow(ctx, &client.PauseWorkflowRequest{
937-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
938-
WorkflowComponent: workflowComponent,
939-
})
940-
929+
err = client.PurgeOrchestrationState(ctx, id)
941930
if err != nil {
942931
log.Fatalf("failed to pause workflow: %v", err)
943932
}
944933

945-
respGet, err := daprClient.GetWorkflow(ctx, &client.GetWorkflowRequest{
946-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
947-
WorkflowComponent: workflowComponent,
948-
})
934+
respGet, err := client.FetchOrchestrationMetadata(ctx, id)
949935
if err != nil {
950936
log.Fatalf("failed to get workflow: %v", err)
951937
}
952-
953-
if respGet.RuntimeStatus != workflow.StatusSuspended.String() {
954-
log.Fatalf("workflow not paused: %v", respGet.RuntimeStatus)
955-
}
956-
957-
fmt.Printf("workflow paused\n")
938+
fmt.Printf("workflow paused: %s\n", respGet.RuntimeStatus)
958939

959940
// Resume workflow test
960-
err = daprClient.ResumeWorkflow(ctx, &client.ResumeWorkflowRequest{
961-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
962-
WorkflowComponent: workflowComponent,
963-
})
964-
941+
err = client.ResumeOrchestration(ctx, id, "")
965942
if err != nil {
966943
log.Fatalf("failed to resume workflow: %v", err)
967944
}
945+
fmt.Printf("workflow running: %s\n", respGet.RuntimeStatus)
968946

969-
respGet, err = daprClient.GetWorkflow(ctx, &client.GetWorkflowRequest{
970-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
971-
WorkflowComponent: workflowComponent,
972-
})
947+
respGet, err = client.FetchOrchestrationMetadata(ctx, id)
973948
if err != nil {
974949
log.Fatalf("failed to get workflow: %v", err)
975950
}
976-
977-
if respGet.RuntimeStatus != workflow.StatusRunning.String() {
978-
log.Fatalf("workflow not running")
979-
}
980-
981-
fmt.Println("workflow resumed")
951+
fmt.Printf("workflow resumed: %s\n", respGet.RuntimeStatus)
982952

983953
fmt.Printf("stage: %d\n", stage)
984954

985955
// Raise Event Test
986-
987-
err = daprClient.RaiseEventWorkflow(ctx, &client.RaiseEventWorkflowRequest{
988-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
989-
WorkflowComponent: workflowComponent,
990-
EventName: "testEvent",
991-
EventData: "testData",
992-
SendRawData: false,
993-
})
994-
956+
err = client.RaiseEvent(ctx, id, "testEvent", api.WithEventPayload("testData"))
995957
if err != nil {
996958
fmt.Printf("failed to raise event: %v", err)
997959
}
@@ -1002,177 +964,44 @@ func main() {
1002964

1003965
fmt.Printf("stage: %d\n", stage)
1004966

1005-
respGet, err = daprClient.GetWorkflow(ctx, &client.GetWorkflowRequest{
1006-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
1007-
WorkflowComponent: workflowComponent,
1008-
})
967+
respGet, err = client.FetchOrchestrationMetadata(ctx, id)
1009968
if err != nil {
1010969
log.Fatalf("failed to get workflow: %v", err)
1011970
}
1012971

1013972
fmt.Printf("workflow status: %v\n", respGet.RuntimeStatus)
1014973

1015974
// Purge workflow test
1016-
err = daprClient.PurgeWorkflow(ctx, &client.PurgeWorkflowRequest{
1017-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
1018-
WorkflowComponent: workflowComponent,
1019-
})
975+
err = client.PurgeOrchestrationState(ctx, id)
1020976
if err != nil {
1021977
log.Fatalf("failed to purge workflow: %v", err)
1022978
}
1023-
1024-
respGet, err = daprClient.GetWorkflow(ctx, &client.GetWorkflowRequest{
1025-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
1026-
WorkflowComponent: workflowComponent,
1027-
})
1028-
if err != nil && respGet != nil {
1029-
log.Fatal("failed to purge workflow")
1030-
}
1031-
1032979
fmt.Println("workflow purged")
1033-
1034-
fmt.Printf("stage: %d\n", stage)
1035-
1036-
// Terminate workflow test
1037-
respStart, err = daprClient.StartWorkflow(ctx, &client.StartWorkflowRequest{
1038-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
1039-
WorkflowComponent: workflowComponent,
1040-
WorkflowName: "TestWorkflow",
1041-
Options: nil,
1042-
Input: 1,
1043-
SendRawInput: false,
1044-
})
1045-
if err != nil {
1046-
log.Fatalf("failed to start workflow: %v", err)
1047-
}
1048-
1049-
fmt.Printf("workflow started with id: %s\n", respStart.InstanceID)
1050-
1051-
err = daprClient.TerminateWorkflow(ctx, &client.TerminateWorkflowRequest{
1052-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
1053-
WorkflowComponent: workflowComponent,
1054-
})
1055-
if err != nil {
1056-
log.Fatalf("failed to terminate workflow: %v", err)
1057-
}
1058-
1059-
respGet, err = daprClient.GetWorkflow(ctx, &client.GetWorkflowRequest{
1060-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
1061-
WorkflowComponent: workflowComponent,
1062-
})
1063-
if err != nil {
1064-
log.Fatalf("failed to get workflow: %v", err)
1065-
}
1066-
if respGet.RuntimeStatus != workflow.StatusTerminated.String() {
1067-
log.Fatal("failed to terminate workflow")
1068-
}
1069-
1070-
fmt.Println("workflow terminated")
1071-
1072-
err = daprClient.PurgeWorkflow(ctx, &client.PurgeWorkflowRequest{
1073-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
1074-
WorkflowComponent: workflowComponent,
1075-
})
1076-
1077-
respGet, err = daprClient.GetWorkflow(ctx, &client.GetWorkflowRequest{
1078-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
1079-
WorkflowComponent: workflowComponent,
1080-
})
1081-
if err == nil || respGet != nil {
1082-
log.Fatalf("failed to purge workflow: %v", err)
1083-
}
1084-
1085-
fmt.Println("workflow purged")
1086-
1087-
stage = 0
1088-
fmt.Println("workflow client test")
1089-
1090-
wfClient, err := workflow.NewClient()
1091-
if err != nil {
1092-
log.Fatalf("[wfclient] faield to initialize: %v", err)
1093-
}
1094-
1095-
id, err := wfClient.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
1096-
if err != nil {
1097-
log.Fatalf("[wfclient] failed to start workflow: %v", err)
1098-
}
1099-
1100-
fmt.Printf("[wfclient] started workflow with id: %s\n", id)
1101-
1102-
metadata, err := wfClient.FetchWorkflowMetadata(ctx, id)
1103-
if err != nil {
1104-
log.Fatalf("[wfclient] failed to get worfklow: %v", err)
1105-
}
1106-
1107-
fmt.Printf("[wfclient] workflow status: %v\n", metadata.RuntimeStatus.String())
1108-
1109-
if stage != 1 {
1110-
log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 1 expected, current: %d", stage)
1111-
}
1112-
1113-
fmt.Printf("[wfclient] stage: %d\n", stage)
1114-
1115-
// raise event
1116-
1117-
if err := wfClient.RaiseEvent(ctx, id, "testEvent", workflow.WithEventPayload("testData")); err != nil {
1118-
log.Fatalf("[wfclient] failed to raise event: %v", err)
1119-
}
1120-
1121-
fmt.Println("[wfclient] event raised")
1122-
1123-
// Sleep to allow the workflow to advance
1124-
time.Sleep(time.Second)
1125-
1126-
if stage != 2 {
1127-
log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 2 expected, current: %d", stage)
1128-
}
1129-
1130-
fmt.Printf("[wfclient] stage: %d\n", stage)
1131-
1132-
// stop workflow
1133-
if err := wfClient.TerminateWorkflow(ctx, id); err != nil {
1134-
log.Fatalf("[wfclient] failed to terminate workflow: %v", err)
1135-
}
1136-
1137-
fmt.Println("[wfclient] workflow terminated")
1138-
1139-
if err := wfClient.PurgeWorkflow(ctx, id); err != nil {
1140-
log.Fatalf("[wfclient] failed to purge workflow: %v", err)
1141-
}
1142-
1143-
fmt.Println("[wfclient] workflow purged")
1144-
1145-
// stop workflow runtime
1146-
if err := w.Shutdown(); err != nil {
1147-
log.Fatalf("failed to shutdown runtime: %v", err)
1148-
}
1149-
1150-
fmt.Println("workflow worker successfully shutdown")
1151980
}
1152981

1153-
func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
982+
func TestWorkflow(ctx *task.OrchestrationContext) (any, error) {
1154983
var input int
1155984
if err := ctx.GetInput(&input); err != nil {
1156985
return nil, err
1157986
}
1158987
var output string
1159-
if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
988+
if err := ctx.CallActivity(TestActivity, task.WithActivityInput(input)).Await(&output); err != nil {
1160989
return nil, err
1161990
}
1162991

1163-
err := ctx.WaitForExternalEvent("testEvent", time.Second*60).Await(&output)
992+
err := ctx.WaitForSingleEvent("testEvent", time.Second*60).Await(&output)
1164993
if err != nil {
1165994
return nil, err
1166995
}
1167996

1168-
if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
997+
if err := ctx.CallActivity(TestActivity, task.WithActivityInput(input)).Await(&output); err != nil {
1169998
return nil, err
1170999
}
11711000

11721001
return output, nil
11731002
}
11741003

1175-
func TestActivity(ctx workflow.ActivityContext) (any, error) {
1004+
func TestActivity(ctx task.ActivityContext) (any, error) {
11761005
var input int
11771006
if err := ctx.GetInput(&input); err != nil {
11781007
return "", err

0 commit comments

Comments
 (0)