Skip to content

Commit

Permalink
Add a saga pattern sample (#204)
Browse files Browse the repository at this point in the history
  • Loading branch information
pavlo-v-chernykh authored Jun 27, 2022
1 parent f334c48 commit 014be0f
Show file tree
Hide file tree
Showing 10 changed files with 262 additions and 2 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ These samples demonstrate some common control flow patterns using Temporal's Go

- [**Synchronous Proxy Workflow pattern**](https://github.com/temporalio/samples-go/tree/master/synchronous-proxy): This sample demonstrates a synchronous interaction with a "main" Workflow Execution from a "proxy" Workflow Execution. The proxy Workflow Execution sends a Signal to the "main" Workflow Execution, then blocks, waiting for a Signal in response.

- [**Saga pattern**](https://github.com/temporalio/samples-go/tree/main/saga): This sample demonstrates how to implement a saga pattern using golang defer feature.

### Scenario based examples

- [**DSL Workflow**](https://github.com/temporalio/samples-go/tree/master/dsl): Demonstrates how to implement a DSL-based Workflow. This sample contains 2 yaml files that each define a custom "workflow" which instructs the Temporal Workflow. This is useful if you want to build in a "low code" layer.
Expand All @@ -110,7 +112,6 @@ Mostly examples we haven't yet ported from https://github.com/temporalio/samples
- Periodic Workflow: Workflow that executes some logic periodically. *Example to be completed*
- Exception propagation and wrapping: *Example to be completed*
- Polymorphic activity: *Example to be completed*
- SAGA pattern: *Example to be completed*
- Side Effect: *Example to be completed* - [Docs](https://docs.temporal.io/docs/go/side-effect)

### Fixtures
Expand Down
2 changes: 1 addition & 1 deletion child-workflow/starter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func main() {
if err != nil {
log.Fatalln("Failure getting workflow result", err)
}
log.Println("Workflow result: %v", "result", result)
log.Printf("Workflow result: %v", result)
}

// @@@SNIPEND
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
go.temporal.io/sdk/contrib/tally v0.1.0
go.temporal.io/sdk/contrib/tools/workflowcheck v0.0.0-20220331154559-fd0d1eb548eb
go.temporal.io/server v1.15.2
go.uber.org/multierr v1.7.0
go.uber.org/zap v1.20.0
google.golang.org/grpc v1.47.0
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect
Expand Down
12 changes: 12 additions & 0 deletions saga/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
### Steps to run this sample:
1) You need a Temporal service running. See details in README.md
2) Run the following command to start the worker
```
go run saga/worker/main.go
```
3) Run the following command to start the example
```
go run saga/start/main.go
```

Based on https://github.com/temporalio/money-transfer-project-template-go
60 changes: 60 additions & 0 deletions saga/activity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package saga

import (
"context"
"errors"
"fmt"
)

func Withdraw(ctx context.Context, transferDetails TransferDetails) error {
fmt.Printf(
"\nWithdrawing $%f from account %s. ReferenceId: %s\n",
transferDetails.Amount,
transferDetails.FromAccount,
transferDetails.ReferenceID,
)

return nil
}

func WithdrawCompensation(ctx context.Context, transferDetails TransferDetails) error {
fmt.Printf(
"\nWithdrawing compensation $%f from account %s. ReferenceId: %s\n",
transferDetails.Amount,
transferDetails.FromAccount,
transferDetails.ReferenceID,
)

return nil
}

func Deposit(ctx context.Context, transferDetails TransferDetails) error {
fmt.Printf(
"\nDepositing $%f into account %s. ReferenceId: %s\n",
transferDetails.Amount,
transferDetails.ToAccount,
transferDetails.ReferenceID,
)

return nil
}

func DepositCompensation(ctx context.Context, transferDetails TransferDetails) error {
fmt.Printf(
"\nDepositing compensation $%f into account %s. ReferenceId: %s\n",
transferDetails.Amount,
transferDetails.ToAccount,
transferDetails.ReferenceID,
)

return nil
}

func StepWithError(ctx context.Context, transferDetails TransferDetails) error {
fmt.Printf(
"\nSimulate failure to trigger compensation. ReferenceId: %s\n",
transferDetails.ReferenceID,
)

return errors.New("some error")
}
10 changes: 10 additions & 0 deletions saga/shared.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package saga

const TransferMoneyTaskQueue = "TRANSFER_MONEY_TASK_QUEUE"

type TransferDetails struct {
Amount float32
FromAccount string
ToAccount string
ReferenceID string
}
50 changes: 50 additions & 0 deletions saga/start/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package main

import (
"context"
"log"

"github.com/google/uuid"
"go.temporal.io/sdk/client"

"github.com/temporalio/samples-go/saga"
)

func main() {
// Create the client object just once per process
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("unable to create Temporal client", err)
}
defer c.Close()
options := client.StartWorkflowOptions{
ID: "transfer-money-workflow",
TaskQueue: saga.TransferMoneyTaskQueue,
}
transferDetails := saga.TransferDetails{
Amount: 54.99,
FromAccount: "001-001",
ToAccount: "002-002",
ReferenceID: uuid.New().String(),
}
we, err := c.ExecuteWorkflow(context.Background(), options, saga.TransferMoney, transferDetails)
if err != nil {
log.Fatalln("error starting TransferMoney workflow", err)
}
printResults(transferDetails, we.GetID(), we.GetRunID())
}

func printResults(transferDetails saga.TransferDetails, workflowID, runID string) {
log.Printf(
"\nTransfer of $%f from account %s to account %s is processing. ReferenceID: %s\n",
transferDetails.Amount,
transferDetails.FromAccount,
transferDetails.ToAccount,
transferDetails.ReferenceID,
)
log.Printf(
"\nWorkflowID: %s RunID: %s\n",
workflowID,
runID,
)
}
32 changes: 32 additions & 0 deletions saga/worker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package main

import (
"log"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"

"github.com/temporalio/samples-go/saga"
)

func main() {
// Create the client object just once per process
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("unable to create Temporal client", err)
}
defer c.Close()
// This worker hosts both Workflow and Activity functions
w := worker.New(c, saga.TransferMoneyTaskQueue, worker.Options{})
w.RegisterWorkflow(saga.TransferMoney)
w.RegisterActivity(saga.Withdraw)
w.RegisterActivity(saga.WithdrawCompensation)
w.RegisterActivity(saga.Deposit)
w.RegisterActivity(saga.DepositCompensation)
w.RegisterActivity(saga.StepWithError)
// Start listening to the Task Queue
err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("unable to start Worker", err)
}
}
63 changes: 63 additions & 0 deletions saga/workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package saga

import (
"time"

"go.uber.org/multierr"

"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
)

func TransferMoney(ctx workflow.Context, transferDetails TransferDetails) (err error) {
retryPolicy := &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Minute,
MaximumAttempts: 3,
}

options := workflow.ActivityOptions{
// Timeout options specify when to automatically timeout Activity functions.
StartToCloseTimeout: time.Minute,
// Optionally provide a customized RetryPolicy.
// Temporal retries failures by default, this is just an example.
RetryPolicy: retryPolicy,
}

ctx = workflow.WithActivityOptions(ctx, options)

err = workflow.ExecuteActivity(ctx, Withdraw, transferDetails).Get(ctx, nil)
if err != nil {
return err
}

defer func() {
if err != nil {
errCompensation := workflow.ExecuteActivity(ctx, WithdrawCompensation, transferDetails).Get(ctx, nil)
err = multierr.Append(err, errCompensation)
}
}()

err = workflow.ExecuteActivity(ctx, Deposit, transferDetails).Get(ctx, nil)
if err != nil {
return err
}

defer func() {
if err != nil {
errCompensation := workflow.ExecuteActivity(ctx, DepositCompensation, transferDetails).Get(ctx, nil)
err = multierr.Append(err, errCompensation)
}

// uncomment to have time to shut down worker to simulate worker rolling update and ensure that compensation sequence preserves after restart
// workflow.Sleep(ctx, 10*time.Second)
}()

err = workflow.ExecuteActivity(ctx, StepWithError, transferDetails).Get(ctx, nil)
if err != nil {
return err
}

return nil
}
31 changes: 31 additions & 0 deletions saga/workflow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package saga

import (
"errors"
"testing"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"go.temporal.io/sdk/testsuite"
)

func Test_Workflow(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()
// Mock activity implementation
testDetails := TransferDetails{
Amount: 1.00,
FromAccount: "001-001",
ToAccount: "002-002",
ReferenceID: "1234",
}
env.OnActivity(Withdraw, mock.Anything, testDetails).Return(nil)
env.OnActivity(WithdrawCompensation, mock.Anything, testDetails).Return(nil)
env.OnActivity(Deposit, mock.Anything, testDetails).Return(nil)
env.OnActivity(DepositCompensation, mock.Anything, testDetails).Return(nil)
env.OnActivity(StepWithError, mock.Anything, testDetails).Return(errors.New("some error"))
env.ExecuteWorkflow(TransferMoney, testDetails)
require.True(t, env.IsWorkflowCompleted())
require.Error(t, env.GetWorkflowError())
}

0 comments on commit 014be0f

Please sign in to comment.