Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track newly created instances correctly #297

Merged
merged 2 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions backend/redis/diagnostics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package redis

import (
"context"
"testing"

"github.com/cschleiden/go-workflows/client"
"github.com/cschleiden/go-workflows/diag"
"github.com/stretchr/testify/require"
)

func Test_Diag_GetWorkflowInstances(t *testing.T) {
if testing.Short() {
t.Skip()
}

rclient := getClient()
setup := getCreateBackend(rclient)

b := setup()

t.Cleanup(func() {
b.Close()
})

bd := b.(diag.Backend)

ctx := context.Background()
instances, err := bd.GetWorkflowInstances(ctx, "", "", 5)
require.NoError(t, err)
require.Len(t, instances, 0)

c := client.New(b)

_, err = c.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
InstanceID: "ex1",
}, "some-workflow")
require.NoError(t, err)

instances, err = bd.GetWorkflowInstances(ctx, "", "", 5)
require.NoError(t, err)
require.Len(t, instances, 1)
require.Equal(t, "ex1", instances[0].Instance.InstanceID)
}
2 changes: 2 additions & 0 deletions backend/redis/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, instance *wo
pendingEventsKey(instance),
payloadKey(instance),
instancesActive(),
instancesByCreation(),
keyInfo.SetKey,
keyInfo.StreamKey,
},
Expand All @@ -57,6 +58,7 @@ func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, instance *wo
event.ID,
eventData,
payloadData,
time.Now().UTC().Unix(),
).Result()

if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion backend/redis/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ func instanceKeyFromSegment(segment string) string {
}

// instancesByCreation returns the key for the ZSET that contains all instances sorted by creation date. The score is the
// creation time. Used for listing all workflow instances in the diagnostics UI.
// creation time as a unix timestamp. Used for listing all workflow instances in the diagnostics UI.
func instancesByCreation() string {
return "instances-by-creation"
}

// instancesActive returns the key for the SET that contains all active instances. Used for reporting active workflow
// instances in stats.
func instancesActive() string {
return "instances-active"
}
Expand Down
3 changes: 3 additions & 0 deletions backend/redis/scripts/complete_workflow_task.lua
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ local pendingEventsKey = getKey()
local payloadHashKey = getKey()
local futureEventZSetKey = getKey()
local activeInstancesKey = getKey()
local instancesByCreation = getKey()

local workflowSetKey = getKey()
local workflowStreamKey = getKey()
Expand Down Expand Up @@ -59,6 +60,7 @@ redis.call("XDEL", pendingEventsKey, lastPendingEventMessageId)

-- Update instance state
local now = getArgv()
local nowUnix = tonumber(getArgv())
local state = tonumber(getArgv())

-- State constants
Expand Down Expand Up @@ -164,6 +166,7 @@ for i = 1, otherWorkflowInstances do

-- Track active instance
redis.call("SADD", activeInstancesKey, targetInstanceSegment)
redis.call("ZADD", instancesByCreation, nowUnix, targetInstanceSegment)
end
end

Expand Down
4 changes: 4 additions & 0 deletions backend/redis/scripts/create_workflow_instance.lua
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ local pendingEventsKey = getKey()
local payloadHashKey = getKey()

local instancesActiveKey = getKey()
local instancesByCreation = getKey()

local workflowSetKey = getKey()
local workflowStreamKey = getKey()
Expand Down Expand Up @@ -50,6 +51,9 @@ redis.call("XADD", pendingEventsKey, "*", "event", eventData)
local payload = getArgv()
redis.pcall("HSETNX", payloadHashKey, eventId, payload)

local creationTimestamp = tonumber(getArgv())
redis.call("ZADD", instancesByCreation, creationTimestamp, instanceSegment)

-- queue workflow task
local added = redis.call("SADD", workflowSetKey, instanceSegment)
if added == 1 then
Expand Down
14 changes: 12 additions & 2 deletions backend/redis/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func (rb *redisBackend) CompleteWorkflowTask(
payloadKey(instance),
futureEventsKey(),
instancesActive(),
instancesByCreation(),
queueKeys.SetKey,
queueKeys.StreamKey,
)
Expand Down Expand Up @@ -137,8 +138,17 @@ func (rb *redisBackend) CompleteWorkflowTask(
args = append(args, lastPendingEventMessageID)

// Update instance state and update active execution
nowStr := time.Now().Format(time.RFC3339)
args = append(args, string(nowStr), int(state), int(core.WorkflowInstanceStateContinuedAsNew), int(core.WorkflowInstanceStateFinished))
now := time.Now().UTC()
nowStr := now.Format(time.RFC3339)
nowUnix := now.Unix()
args = append(
args,
string(nowStr),
nowUnix,
int(state),
int(core.WorkflowInstanceStateContinuedAsNew),
int(core.WorkflowInstanceStateFinished),
)
keys = append(keys, activeInstanceExecutionKey(instance.InstanceID))

// Remove canceled timers
Expand Down