Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add saga interceptor #222

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions sagainterceptor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
### Steps to run this sample:
1) You need a Temporal service running. See details in README.md
2) Run the following command to start the worker
```
go run sagainterceptor/worker/main.go
```
3) Run the following command to start the example
```
go run sagainterceptor/start/main.go
```


Workflow definition has no compensations, it is another style to implement saga pattern compared to saga/workflow.go.

Based on https://github.com/temporalio/money-transfer-project-template-go
60 changes: 60 additions & 0 deletions sagainterceptor/activity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package sagainterceptor

import (
"context"
"errors"
"fmt"
)

func Withdraw(ctx context.Context, transferDetails TransferDetails) error {
fmt.Printf(
"\nWithdrawing $%f from account %s. ReferenceId: %s\n",
transferDetails.Amount,
transferDetails.FromAccount,
transferDetails.ReferenceID,
)

return nil
}

func WithdrawCompensation(ctx context.Context, transferDetails TransferDetails) error {
fmt.Printf(
"\nWithdrawing compensation $%f from account %s. ReferenceId: %s\n",
transferDetails.Amount,
transferDetails.FromAccount,
transferDetails.ReferenceID,
)

return nil
}

func Deposit(ctx context.Context, transferDetails TransferDetails) error {
fmt.Printf(
"\nDepositing $%f into account %s. ReferenceId: %s\n",
transferDetails.Amount,
transferDetails.ToAccount,
transferDetails.ReferenceID,
)

return nil
}

func DepositCompensation(ctx context.Context, transferDetails TransferDetails) error {
fmt.Printf(
"\nDepositing compensation $%f into account %s. ReferenceId: %s\n",
transferDetails.Amount,
transferDetails.ToAccount,
transferDetails.ReferenceID,
)

return nil
}

func StepWithError(ctx context.Context, transferDetails TransferDetails) error {
fmt.Printf(
"\nSimulate failure to trigger compensation. ReferenceId: %s\n",
transferDetails.ReferenceID,
)

return errors.New("some error")
}
178 changes: 178 additions & 0 deletions sagainterceptor/interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package sagainterceptor

import (
"time"

"go.temporal.io/sdk/interceptor"
"go.temporal.io/sdk/workflow"
"go.uber.org/multierr"
)

var (
defaultActivityOpts = workflow.ActivityOptions{
ScheduleToStartTimeout: 1 * time.Minute,
StartToCloseTimeout: 5 * time.Minute,
}
)

type (
// SagaOptions is options for a saga transactional workflow
SagaOptions struct{}

// CompensationOptions is options for compensate.
CompensationOptions struct {
// ActivityType is the name of compensate activity.
ActivityType string
// ActivityOptions is the activity execute options, local activity is not supported.
ActivityOptions *workflow.ActivityOptions
// Converter optional. Convert req & response to request for compensate activity.
// currently, activity func is not available for worker, so decode futures should be done by developer.
Converter func(ctx workflow.Context, f workflow.Future, args ...interface{}) ([]interface{}, error)
}

//InterceptorOptions is options for saga interceptor.
InterceptorOptions struct {
// WorkflowRegistry names for workflow to be treated as Saga transaction.
WorkflowRegistry map[string]SagaOptions
// ActivityRegistry Action -> CompensateAction, key is activity type for action.
ActivityRegistry map[string]CompensationOptions
}

sagaInterceptor struct {
interceptor.WorkerInterceptorBase
options InterceptorOptions
}

workflowInboundInterceptor struct {
interceptor.WorkflowInboundInterceptorBase
root *sagaInterceptor
ctx sagaContext
}

workflowOutboundInterceptor struct {
interceptor.WorkflowOutboundInterceptorBase
root *sagaInterceptor
ctx *sagaContext
}

compensation struct {
Options *CompensationOptions
ActionFuture workflow.Future
ActionArgs []interface{}
}

sagaContext struct {
Compensations []*compensation
}
)

// NewInterceptor creates an interceptor for execute in Saga patterns.
// when workflow fails, registered compensate activities will be executed automatically.
func NewInterceptor(options InterceptorOptions) (interceptor.WorkerInterceptor, error) {
return &sagaInterceptor{options: options}, nil
}

func (s *sagaInterceptor) InterceptWorkflow(
ctx workflow.Context,
next interceptor.WorkflowInboundInterceptor,
) interceptor.WorkflowInboundInterceptor {
if _, ok := s.options.WorkflowRegistry[workflow.GetInfo(ctx).WorkflowType.Name]; !ok {
return next
}

workflow.GetLogger(ctx).Debug("intercept saga workflow")
i := &workflowInboundInterceptor{root: s}
i.Next = next
return i
}

func (w *workflowInboundInterceptor) Init(outbound interceptor.WorkflowOutboundInterceptor) error {
i := &workflowOutboundInterceptor{root: w.root, ctx: &w.ctx}
i.Next = outbound
return w.Next.Init(i)
}

func (w *workflowInboundInterceptor) ExecuteWorkflow(
ctx workflow.Context,
in *interceptor.ExecuteWorkflowInput,
) (ret interface{}, err error) {
workflow.GetLogger(ctx).Debug("intercept ExecuteWorkflow")
ret, wferr := w.Next.ExecuteWorkflow(ctx, in)
if wferr == nil || len(w.ctx.Compensations) == 0 {
return ret, wferr
}

ctx, cancel := workflow.NewDisconnectedContext(ctx)
defer cancel()
for i := len(w.ctx.Compensations) - 1; i >= 0; i-- {
c := w.ctx.Compensations[i]
// only compensate action with success
if err := c.ActionFuture.Get(ctx, nil); err != nil {
continue
}

// add opts if not config
activityOpts := c.Options.ActivityOptions
if activityOpts == nil {
activityOpts = &defaultActivityOpts
}
ctx = workflow.WithActivityOptions(ctx, *activityOpts)

// use arg in action as default for compensate
args := c.ActionArgs
if c.Options.Converter != nil {
args, err = c.Options.Converter(ctx, c.ActionFuture, c.ActionArgs...)
if err != nil {
workflow.GetLogger(ctx).Error("failed to convert to compensate req", "error", err)
return ret, multierr.Append(wferr, err)
}
}

if err := workflow.ExecuteActivity(ctx, c.Options.ActivityType, args...).Get(ctx, nil); err != nil {
// best effort, save error and continue
//wferr = multierr.Append(wferr, err)

// fail fast, one compensation fails, it will not continue
return ret, multierr.Append(wferr, err)
}
}
return ret, wferr
}

func (w *workflowOutboundInterceptor) ExecuteActivity(
ctx workflow.Context,
activityType string,
args ...interface{},
) workflow.Future {
workflow.GetLogger(ctx).Debug("intercept ExecuteActivity", "activity_type", activityType)
f := w.Next.ExecuteActivity(ctx, activityType, args...)
if opts, ok := w.root.options.ActivityRegistry[activityType]; ok {
workflow.GetLogger(ctx).Debug("save action future", "activity_type", activityType)
w.ctx.Compensations = append(w.ctx.Compensations, &compensation{
Options: &opts,
ActionArgs: args,
ActionFuture: f,
})
}

return f
}

func (w *workflowOutboundInterceptor) ExecuteLocalActivity(
ctx workflow.Context,
activityType string,
args ...interface{},
) workflow.Future {
workflow.GetLogger(ctx).Debug("intercept ExecuteLocalActivity", "activity_type", activityType)
f := w.Next.ExecuteLocalActivity(ctx, activityType, args...)
if opts, ok := w.root.options.ActivityRegistry[activityType]; ok {
workflow.GetLogger(ctx).Debug("save action future", "activity_type", activityType)
w.ctx.Compensations = append(w.ctx.Compensations, &compensation{
Options: &opts,
ActionArgs: args,
ActionFuture: f,
})
}

return f
}
134 changes: 134 additions & 0 deletions sagainterceptor/interceptor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package sagainterceptor

import (
"context"
"errors"
"log"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/interceptor"
"go.temporal.io/sdk/testsuite"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
)

type orderInfo struct {
ID string
IsDelete bool
}

var (
orders = map[string]*orderInfo{}
amount = 1
suite testsuite.WorkflowTestSuite
)

func createOrder(ctx context.Context, amount int) (string, error) {
log.Println("enter createOrder")
id := "abc"
orders[id] = &orderInfo{
ID: id,
}
return id, nil
}

func deleteOrder(ctx context.Context, id string) error {
log.Println("enter deleteOrder, id:", id)
orders[id].IsDelete = true
return nil
}

func stockDeduct(ctx context.Context, in int) error {
log.Println("enter stockDeduct")
amount -= in
return nil
}

func stockInc(ctx context.Context, in int) error {
log.Println("enter stockInc")
amount += in
return nil
}

func createPay(ctx context.Context, in int) error {
return errors.New("must fail")
}

func testConvertor(ctx workflow.Context, f workflow.Future, req ...interface{}) (rsp []interface{}, err error) {
log.Println("convert req:", req[0].(int))
var id string
if err := f.Get(ctx, &id); err != nil {
return nil, err
}
return []interface{}{id}, nil
}

func testWorkflow(ctx workflow.Context, a int) error {
log.Println("enter workflow")
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: time.Minute,
})
var id string
if err := workflow.ExecuteActivity(ctx, "createOrder", a).Get(ctx, &id); err != nil {
return err
}
log.Println("create order, id:", id)
if err := workflow.ExecuteActivity(ctx, "stockDeduct", a).Get(ctx, nil); err != nil {
return err
}
if err := workflow.ExecuteActivity(ctx, "createPay", a).Get(ctx, nil); err != nil {
return err
}

return nil
}

func TestWorkflow(t *testing.T) {
env := suite.NewTestWorkflowEnvironment()
intercept, _ := NewInterceptor(InterceptorOptions{
WorkflowRegistry: map[string]SagaOptions{
"testWorkflow": {},
},
ActivityRegistry: map[string]CompensationOptions{
"createOrder": {
ActivityType: "deleteOrder",
Converter: testConvertor,
},
"stockDeduct": {
ActivityType: "stockInc",
},
},
})
env.SetWorkerOptions(worker.Options{Interceptors: []interceptor.WorkerInterceptor{intercept}})
env.RegisterWorkflowWithOptions(testWorkflow, workflow.RegisterOptions{
Name: "testWorkflow",
})
env.RegisterActivityWithOptions(createOrder, activity.RegisterOptions{
Name: "createOrder",
})
env.RegisterActivityWithOptions(deleteOrder, activity.RegisterOptions{
Name: "deleteOrder",
})
env.RegisterActivityWithOptions(stockDeduct, activity.RegisterOptions{
Name: "stockDeduct",
})
env.RegisterActivityWithOptions(stockInc, activity.RegisterOptions{
Name: "stockInc",
})
env.RegisterActivityWithOptions(createPay, activity.RegisterOptions{
Name: "createPay",
})

env.ExecuteWorkflow(testWorkflow, 1)
require.True(t, env.IsWorkflowCompleted())
require.Error(t, env.GetWorkflowError())
require.Equal(t, 1, len(orders))
for _, order := range orders {
require.True(t, order.IsDelete)
}
require.Equal(t, 1, amount)
env.AssertExpectations(t)
}
Loading