From cba814c6b3eb72ae6564d152794a57c63040a4e9 Mon Sep 17 00:00:00 2001 From: Christopher Schleiden Date: Tue, 28 Nov 2023 20:37:48 -0800 Subject: [PATCH 1/2] Track newly created instances correctly --- backend/redis/diagnostics_test.go | 40 +++++++++++++++++++ backend/redis/instance.go | 2 + backend/redis/keys.go | 4 +- .../redis/scripts/complete_workflow_task.lua | 3 ++ .../scripts/create_workflow_instance.lua | 4 ++ backend/redis/workflow.go | 14 ++++++- 6 files changed, 64 insertions(+), 3 deletions(-) create mode 100644 backend/redis/diagnostics_test.go diff --git a/backend/redis/diagnostics_test.go b/backend/redis/diagnostics_test.go new file mode 100644 index 00000000..4fdeaa1a --- /dev/null +++ b/backend/redis/diagnostics_test.go @@ -0,0 +1,40 @@ +package redis + +import ( + "context" + "testing" + + "github.com/cschleiden/go-workflows/client" + "github.com/cschleiden/go-workflows/diag" + "github.com/stretchr/testify/require" +) + +func Test_Diag_GetWorkflowInstances(t *testing.T) { + rclient := getClient() + setup := getCreateBackend(rclient) + + b := setup() + + t.Cleanup(func() { + b.Close() + }) + + bd := b.(diag.Backend) + + ctx := context.Background() + instances, err := bd.GetWorkflowInstances(ctx, "", "", 5) + require.NoError(t, err) + require.Len(t, instances, 0) + + c := client.New(b) + + _, err = c.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{ + InstanceID: "ex1", + }, "some-workflow") + require.NoError(t, err) + + instances, err = bd.GetWorkflowInstances(ctx, "", "", 5) + require.NoError(t, err) + require.Len(t, instances, 1) + require.Equal(t, "ex1", instances[0].Instance.InstanceID) +} diff --git a/backend/redis/instance.go b/backend/redis/instance.go index f56ece0c..4e0611cf 100644 --- a/backend/redis/instance.go +++ b/backend/redis/instance.go @@ -48,6 +48,7 @@ func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, instance *wo pendingEventsKey(instance), payloadKey(instance), instancesActive(), + instancesByCreation(), keyInfo.SetKey, keyInfo.StreamKey, }, @@ -57,6 +58,7 @@ func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, instance *wo event.ID, eventData, payloadData, + time.Now().UTC().Unix(), ).Result() if err != nil { diff --git a/backend/redis/keys.go b/backend/redis/keys.go index 24f6d2bf..0cfc5936 100644 --- a/backend/redis/keys.go +++ b/backend/redis/keys.go @@ -24,11 +24,13 @@ func instanceKeyFromSegment(segment string) string { } // instancesByCreation returns the key for the ZSET that contains all instances sorted by creation date. The score is the -// creation time. Used for listing all workflow instances in the diagnostics UI. +// creation time as a unix timestamp. Used for listing all workflow instances in the diagnostics UI. func instancesByCreation() string { return "instances-by-creation" } +// instancesActive returns the key for the SET that contains all active instances. Used for reporting active workflow +// instances in stats. func instancesActive() string { return "instances-active" } diff --git a/backend/redis/scripts/complete_workflow_task.lua b/backend/redis/scripts/complete_workflow_task.lua index 631241ec..cc5690fa 100644 --- a/backend/redis/scripts/complete_workflow_task.lua +++ b/backend/redis/scripts/complete_workflow_task.lua @@ -21,6 +21,7 @@ local pendingEventsKey = getKey() local payloadHashKey = getKey() local futureEventZSetKey = getKey() local activeInstancesKey = getKey() +local instancesByCreation = getKey() local workflowSetKey = getKey() local workflowStreamKey = getKey() @@ -59,6 +60,7 @@ redis.call("XDEL", pendingEventsKey, lastPendingEventMessageId) -- Update instance state local now = getArgv() +local nowUnix = tonumber(getArgv()) local state = tonumber(getArgv()) -- State constants @@ -164,6 +166,7 @@ for i = 1, otherWorkflowInstances do -- Track active instance redis.call("SADD", activeInstancesKey, targetInstanceSegment) + redis.call("ZADD", instancesByCreation, nowUnix, targetInstanceSegment) end end diff --git a/backend/redis/scripts/create_workflow_instance.lua b/backend/redis/scripts/create_workflow_instance.lua index 12bf9d09..ff53bc80 100644 --- a/backend/redis/scripts/create_workflow_instance.lua +++ b/backend/redis/scripts/create_workflow_instance.lua @@ -19,6 +19,7 @@ local pendingEventsKey = getKey() local payloadHashKey = getKey() local instancesActiveKey = getKey() +local instancesByCreation = getKey() local workflowSetKey = getKey() local workflowStreamKey = getKey() @@ -50,6 +51,9 @@ redis.call("XADD", pendingEventsKey, "*", "event", eventData) local payload = getArgv() redis.pcall("HSETNX", payloadHashKey, eventId, payload) +local creationTimestamp = tonumber(getArgv()) +redis.call("ZADD", instancesByCreation, creationTimestamp, instanceSegment) + -- queue workflow task local added = redis.call("SADD", workflowSetKey, instanceSegment) if added == 1 then diff --git a/backend/redis/workflow.go b/backend/redis/workflow.go index f2c0da98..28b0bae3 100644 --- a/backend/redis/workflow.go +++ b/backend/redis/workflow.go @@ -110,6 +110,7 @@ func (rb *redisBackend) CompleteWorkflowTask( payloadKey(instance), futureEventsKey(), instancesActive(), + instancesByCreation(), queueKeys.SetKey, queueKeys.StreamKey, ) @@ -137,8 +138,17 @@ func (rb *redisBackend) CompleteWorkflowTask( args = append(args, lastPendingEventMessageID) // Update instance state and update active execution - nowStr := time.Now().Format(time.RFC3339) - args = append(args, string(nowStr), int(state), int(core.WorkflowInstanceStateContinuedAsNew), int(core.WorkflowInstanceStateFinished)) + now := time.Now().UTC() + nowStr := now.Format(time.RFC3339) + nowUnix := now.Unix() + args = append( + args, + string(nowStr), + nowUnix, + int(state), + int(core.WorkflowInstanceStateContinuedAsNew), + int(core.WorkflowInstanceStateFinished), + ) keys = append(keys, activeInstanceExecutionKey(instance.InstanceID)) // Remove canceled timers From 85d008c8673499c2de5473dd7389e072bf4b0f37 Mon Sep 17 00:00:00 2001 From: Christopher Schleiden Date: Tue, 28 Nov 2023 20:45:06 -0800 Subject: [PATCH 2/2] Skip test when doing short run --- backend/redis/diagnostics_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/backend/redis/diagnostics_test.go b/backend/redis/diagnostics_test.go index 4fdeaa1a..be9e67fd 100644 --- a/backend/redis/diagnostics_test.go +++ b/backend/redis/diagnostics_test.go @@ -10,6 +10,10 @@ import ( ) func Test_Diag_GetWorkflowInstances(t *testing.T) { + if testing.Short() { + t.Skip() + } + rclient := getClient() setup := getCreateBackend(rclient)