-
Notifications
You must be signed in to change notification settings - Fork 7
/
await_test.go
44 lines (36 loc) · 1001 Bytes
/
await_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package workflow_test
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/luno/workflow"
"github.com/luno/workflow/adapters/memrecordstore"
"github.com/luno/workflow/adapters/memrolescheduler"
"github.com/luno/workflow/adapters/memstreamer"
)
// TestAwait checks that Await hands back a run once it has reached the awaited
// status. It builds a single-step workflow (StatusStart -> StatusEnd) whose step
// sets the run's object to "hello world", triggers a run under the identifier
// "1", and then asserts on the status and object of the value Await returns.
func TestAwait(t *testing.T) {
	b := workflow.NewBuilder[string, status]("consumer lag")
	b.AddStep(
		StatusStart,
		func(ctx context.Context, r *workflow.Run[string, status]) (status, error) {
			*r.Object = "hello world"
			return StatusEnd, nil
		},
		StatusEnd,
	)

	// All three adapters are the in-memory implementations.
	wf := b.Build(
		memstreamer.New(),
		memrecordstore.New(),
		memrolescheduler.New(),
	)

	ctx := context.Background()
	wf.Run(ctx)
	t.Cleanup(wf.Stop)

	runID, err := wf.Trigger(ctx, "1", StatusStart)
	require.NoError(t, err)

	// Bound only the Await call so that a regression fails this test quickly
	// instead of blocking until the global `go test` timeout. The workflow
	// itself keeps running on the unbounded ctx until wf.Stop is called.
	// NOTE(review): relies on Await honouring context cancellation - confirm.
	awaitCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	res, err := wf.Await(awaitCtx, "1", runID, StatusEnd, workflow.WithAwaitPollingFrequency(10*time.Nanosecond))
	require.NoError(t, err)
	require.Equal(t, StatusEnd, res.Status)
	require.Equal(t, "hello world", *res.Object)
}