From 3ae03a459688934c0ba36b4fc95195901ac84957 Mon Sep 17 00:00:00 2001 From: mikeee Date: Sun, 17 Dec 2023 15:19:50 +0000 Subject: [PATCH] feat: initial workflow Signed-off-by: mikeee --- go.mod | 6 ++ go.sum | 13 ++++ workflow/activity_context.go | 13 ++++ workflow/context.go | 60 ++++++++++++++++ workflow/runtime.go | 131 +++++++++++++++++++++++++++++++++++ workflow/runtime_test.go | 28 ++++++++ 6 files changed, 251 insertions(+) create mode 100644 workflow/activity_context.go create mode 100644 workflow/context.go create mode 100644 workflow/runtime.go create mode 100644 workflow/runtime_test.go diff --git a/go.mod b/go.mod index 79bfe789..04e4cb52 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/golang/mock v1.6.0 github.com/golang/protobuf v1.5.3 github.com/google/uuid v1.3.1 + github.com/microsoft/durabletask-go v0.3.1 github.com/stretchr/testify v1.8.4 google.golang.org/grpc v1.57.0 google.golang.org/protobuf v1.31.0 @@ -15,10 +16,15 @@ require ( ) require ( + github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/go-logr/logr v1.2.4 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/kr/text v0.2.0 // indirect + github.com/marusama/semaphore/v2 v2.5.0 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect go.opentelemetry.io/otel v1.16.0 // indirect + go.opentelemetry.io/otel/metric v1.16.0 // indirect go.opentelemetry.io/otel/trace v1.16.0 // indirect golang.org/x/net v0.15.0 // indirect golang.org/x/sys v0.12.0 // indirect diff --git a/go.sum b/go.sum index 05f510a7..53e70b89 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= +github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/dapr/dapr v1.12.1-0.20231030205344-441017b888c5 h1:IlC2/2TemJw3dC1P8DsFZ4/ANl6IojDr50B7B8dIGIk= github.com/dapr/dapr v1.12.1-0.20231030205344-441017b888c5/go.mod h1:zHcMel+UwYnMWfvJwpaDr43p95JteXyvBsSjXNnPU+c= @@ -5,6 +7,11 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-chi/chi/v5 v5.0.10 h1:rLz5avzKpjqxrYwXNfmjkrYYXOyLJd37pz53UFHC6vk= github.com/go-chi/chi/v5 v5.0.10/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= +github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= @@ -20,6 +27,10 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM= +github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ= +github.com/microsoft/durabletask-go v0.3.1 h1:Y7RrPefd4cz5GMxjMx/Zvf9r5INombNlzI0DaQd994k= +github.com/microsoft/durabletask-go v0.3.1/go.mod h1:t3u0iRvIadT1y4MD5cUG0mbTOqgANT6IFcLogv7o0M0= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= @@ -27,6 +38,8 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s= go.opentelemetry.io/otel v1.16.0/go.mod h1:vl0h9NUa1D5s1nv3A5vZOYWn8av4K8Ml6JDeHrT/bx4= +go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo= +go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4= go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs= go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/workflow/activity_context.go b/workflow/activity_context.go new file mode 100644 index 00000000..72b2bd7f --- /dev/null +++ b/workflow/activity_context.go @@ -0,0 +1,13 @@ +package workflow + +import ( + "github.com/microsoft/durabletask-go/task" +) + +type ActivityContext struct { + ctx task.ActivityContext +} + +func (wfac *ActivityContext) GetInput(v interface{}) error { + return wfac.ctx.GetInput(&v) +} diff --git a/workflow/context.go b/workflow/context.go new file mode 100644 index 00000000..439f16c7 --- /dev/null +++ b/workflow/context.go @@ -0,0 +1,60 @@ +package workflow + +import ( + "fmt" + "log" + "time" + + "github.com/microsoft/durabletask-go/task" +) + +type Context struct { + orchestrationContext *task.OrchestrationContext +} + +func (wfc *Context) GetInput(v interface{}) error { + return wfc.orchestrationContext.GetInput(&v) +} + +func (wfc *Context) Name() string { + return wfc.orchestrationContext.Name +} + +func (wfc *Context) InstanceID() string { + return fmt.Sprintf("%v", wfc.orchestrationContext.ID) +} + +func (wfc *Context) CurrentUTCDateTime() time.Time { + return wfc.orchestrationContext.CurrentTimeUtc +} + +func (wfc *Context) IsReplaying() bool { + return wfc.orchestrationContext.IsReplaying +} + +func (wfc *Context) CallActivity(activity interface{}) task.Task { + var inp string + if err := wfc.GetInput(&inp); err != nil { + log.Printf("unable to get activity input: %v", err) + } + // the call should continue despite being unable to obtain an input + + return wfc.orchestrationContext.CallActivity(activity, task.WithActivityInput(inp)) +} + +func (wfc *Context) CallChildWorkflow() { + // TODO: implement + // call suborchestrator +} + +func (wfc *Context) CreateTimer() { + // TODO: implement +} + +func (wfc *Context) WaitForExternalEvent() { + // TODO: implement +} + +func (wfc *Context) ContinueAsNew() { + // TODO: implement +} diff --git a/workflow/runtime.go b/workflow/runtime.go new file mode 100644 index 00000000..892dc85e --- /dev/null +++ b/workflow/runtime.go @@ -0,0 +1,131 @@ +package workflow + +import ( + "context" + "errors" + "fmt" + "log" + "reflect" + "runtime" + "strings" + "sync" + "time" + + "github.com/microsoft/durabletask-go/backend" + "github.com/microsoft/durabletask-go/client" + "github.com/microsoft/durabletask-go/task" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +type WorkflowRuntime struct { + tasks *task.TaskRegistry + client *client.TaskHubGrpcClient + + mutex sync.Mutex // TODO: implement + quit chan bool + cancel context.CancelFunc +} + +type Workflow func(ctx *Context) (any, error) + +type Activity func(ctx ActivityContext) (any, error) + +func NewRuntime(host string, port string) (*WorkflowRuntime, error) { + ctx, canc := context.WithTimeout(context.Background(), time.Second*10) + defer canc() + + address := fmt.Sprintf("%s:%s", host, port) + + clientConn, err := grpc.DialContext( + ctx, + address, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), // TODO: config + ) + if err != nil { + return nil, fmt.Errorf("failed to create runtime - grpc connection failed: %v", err) + } + + return &WorkflowRuntime{ + tasks: task.NewTaskRegistry(), + client: client.NewTaskHubGrpcClient(clientConn, backend.DefaultLogger()), + quit: make(chan bool), + cancel: canc, + }, nil +} + +func getDecorator(f interface{}) (string, error) { + if f == nil { + return "", errors.New("nil function name") + } + + callSplit := strings.Split(runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name(), ".") + + funcName := callSplit[len(callSplit)-1] + + return funcName, nil +} + +func (wr *WorkflowRuntime) RegisterWorkflow(w Workflow) error { + wrappedOrchestration := func(ctx *task.OrchestrationContext) (any, error) { + wfCtx := &Context{orchestrationContext: ctx} + + return w(wfCtx) + } + + // getdecorator for workflow + name, err := getDecorator(w) + if err != nil { + return fmt.Errorf("failed to get workflow decorator: %v", err) + } + + err = wr.tasks.AddOrchestratorN(name, wrappedOrchestration) + return err +} + +func (wr *WorkflowRuntime) RegisterActivity(a Activity) error { + wrappedActivity := func(ctx task.ActivityContext) (any, error) { + ac := ActivityContext{ctx: ctx} + + return a(ac) + } + + // getdecorator for activity + name, err := getDecorator(a) + if err != nil { + return fmt.Errorf("failed to get activity decorator: %v", err) + } + + err = wr.tasks.AddActivityN(name, wrappedActivity) + return err +} + +func (wr *WorkflowRuntime) Start() error { + // go func start + go func() { + err := wr.client.StartWorkItemListener(context.Background(), wr.tasks) + if err != nil { + log.Fatalf("failed to start work stream: %v", err) + } + for { + select { + case <-wr.quit: + return + default: + // continue serving + } + } + }() + + return nil +} + +func (wr *WorkflowRuntime) Shutdown() error { + // cancel grpc context + wr.cancel() + // send close signal + wr.quit <- true + + return nil +} diff --git a/workflow/runtime_test.go b/workflow/runtime_test.go new file mode 100644 index 00000000..eae8a86a --- /dev/null +++ b/workflow/runtime_test.go @@ -0,0 +1,28 @@ +package workflow + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestWorkflowRuntime(t *testing.T) { + // TODO: Mock grpc conn - currently requires dapr to be available + t.Run("test workflow name is correct", func(t *testing.T) { + wr, err := NewRuntime("localhost", "50001") + require.NoError(t, err) + err = wr.RegisterWorkflow(testOrchestrator) + require.NoError(t, err) + }) +} + +func TestGetDecorator(t *testing.T) { + name, err := getDecorator(testOrchestrator) + require.NoError(t, err) + assert.Equal(t, "testOrchestrator", name) +} + +func testOrchestrator(ctx *Context) (any, error) { + return nil, nil +}