diff --git a/backend/mysql/mysql.go b/backend/mysql/mysql.go index a8035b79..453d6e44 100644 --- a/backend/mysql/mysql.go +++ b/backend/mysql/mysql.go @@ -312,7 +312,8 @@ func (b *mysqlBackend) GetWorkflowInstanceState(ctx context.Context, instance *w func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, metadata *workflow.Metadata) error { // Check for existing instance - if err := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE instance_id = ? LIMIT 1", wfi.InstanceID).Scan(new(int)); err != sql.ErrNoRows { + if err := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE instance_id = ? AND state = ? LIMIT 1", wfi.InstanceID, core.WorkflowInstanceStateActive). + Scan(new(int)); err != sql.ErrNoRows { return backend.ErrInstanceAlreadyExists } diff --git a/backend/redis/delete.go b/backend/redis/delete.go index ba2575b6..31d84f43 100644 --- a/backend/redis/delete.go +++ b/backend/redis/delete.go @@ -12,12 +12,9 @@ import ( // KEYS[2] - pending events key // KEYS[3] - history key // KEYS[4] - instances-by-creation key -// KEYS[5] - instances // ARGV[1] - instance segment -// ARGV[2] - instance id var deleteCmd = redis.NewScript( `redis.call("DEL", KEYS[1], KEYS[2], KEYS[3]) - redis.call("HDEL", KEYS[5], ARGV[1]) return redis.call("ZREM", KEYS[4], ARGV[1])`) // deleteInstance deletes an instance from Redis. It does not attempt to remove any future events or pending @@ -30,8 +27,7 @@ func deleteInstance(ctx context.Context, rdb redis.UniversalClient, instance *co pendingEventsKey(instance), historyKey(instance), instancesByCreation(), - instanceIDs(), - }, instanceSegment(instance), instance.InstanceID).Err(); err != nil { + }, instanceSegment(instance)).Err(); err != nil { return fmt.Errorf("failed to delete instance: %w", err) } diff --git a/backend/redis/expire.go b/backend/redis/expire.go index 52c8db8e..37dd65fe 100644 --- a/backend/redis/expire.go +++ b/backend/redis/expire.go @@ -17,12 +17,10 @@ import ( // KEYS[3] - instance key // KEYS[4] - pending events key // KEYS[5] - history key -// KEYS[6] - instances key // ARGV[1] - current timestamp // ARGV[2] - expiration time in seconds // ARGV[3] - expiration timestamp in unix milliseconds // ARGV[4] - instance segment -// ARGV[5] - instance id var expireCmd = redis.NewScript( `-- Find instances which have already expired and remove from the index set local expiredInstances = redis.call("ZRANGE", KEYS[2], "-inf", ARGV[1], "BYSCORE") @@ -30,7 +28,6 @@ var expireCmd = redis.NewScript( local instanceSegment = expiredInstances[i] redis.call("ZREM", KEYS[1], instanceSegment) -- index set redis.call("ZREM", KEYS[2], instanceSegment) -- expiration set - redis.call("HDEL", KEYS[6], ARGV[5]) end -- Add expiration time for future cleanup @@ -58,12 +55,10 @@ func setWorkflowInstanceExpiration(ctx context.Context, rdb redis.UniversalClien instanceKey(instance), pendingEventsKey(instance), historyKey(instance), - instanceIDs(), }, nowStr, expiration.Seconds(), expStr, instanceSegment(instance), - instance.InstanceID, ).Err() } diff --git a/backend/redis/instance.go b/backend/redis/instance.go index 685ef156..3526aab5 100644 --- a/backend/redis/instance.go +++ b/backend/redis/instance.go @@ -165,9 +165,6 @@ func createInstanceP(ctx context.Context, p redis.Pipeliner, instance *core.Work // The newly created instance is going to be the active execution setActiveInstanceExecutionP(ctx, p, instance) - // Record instance id - p.HSet(ctx, instanceIDs(), instance.InstanceID, 1) - p.ZAdd(ctx, instancesByCreation(), redis.Z{ Member: instanceSegment(instance), Score: float64(createdAt.UnixMilli()), diff --git a/backend/redis/keys.go b/backend/redis/keys.go index 8ebfa873..24f6d2bf 100644 --- a/backend/redis/keys.go +++ b/backend/redis/keys.go @@ -37,10 +37,6 @@ func instancesExpiring() string { return "instances-expiring" } -func instanceIDs() string { - return "instances" -} - func pendingEventsKey(instance *core.WorkflowInstance) string { return fmt.Sprintf("pending-events:%v", instanceSegment(instance)) } diff --git a/backend/redis/scripts/complete_workflow_task.lua b/backend/redis/scripts/complete_workflow_task.lua index 36e3bb4c..93de72aa 100644 --- a/backend/redis/scripts/complete_workflow_task.lua +++ b/backend/redis/scripts/complete_workflow_task.lua @@ -21,7 +21,6 @@ local pendingEventsKey = getKey() local payloadHashKey = getKey() local futureEventZSetKey = getKey() local activeInstancesKey = getKey() -local instancesByIdKey = getKey() local workflowSetKey = getKey() local workflowStreamKey = getKey() @@ -103,7 +102,7 @@ for i = 1, otherWorkflowInstances do local conflictEventPayloadData = getArgv() -- Does the instance exist already? - local instanceExists = redis.call("HEXISTS", instancesByIdKey, targetInstanceId) + local instanceExists = redis.call("EXISTS", targetActiveInstanceExecutionState) if instanceExists == 1 then redis.call("XADD", pendingEventsKey, "*", "event", conflictEventData) storePayload(conflictEventId, conflictEventPayloadData) @@ -119,8 +118,6 @@ for i = 1, otherWorkflowInstances do -- Track active instance redis.call("SADD", activeInstancesKey, targetInstanceSegment) - - redis.call("HSET", instancesByIdKey, targetInstanceId, 1) end end diff --git a/backend/redis/workflow.go b/backend/redis/workflow.go index 402e5e13..fb68fa95 100644 --- a/backend/redis/workflow.go +++ b/backend/redis/workflow.go @@ -106,7 +106,6 @@ func (rb *redisBackend) CompleteWorkflowTask( payloadKey(instance), futureEventsKey(), instancesActive(), - instanceIDs(), queueKeys.SetKey, queueKeys.StreamKey, ) diff --git a/backend/sqlite/sqlite.go b/backend/sqlite/sqlite.go index 791f02c6..35c06174 100644 --- a/backend/sqlite/sqlite.go +++ b/backend/sqlite/sqlite.go @@ -187,7 +187,8 @@ func (sb *sqliteBackend) CreateWorkflowInstance(ctx context.Context, instance *w func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, metadata *workflow.Metadata) error { // Check for existing instance - if err := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE id = ? LIMIT 1", wfi.InstanceID).Scan(new(int)); err != sql.ErrNoRows { + if err := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE id = ? AND state = ? LIMIT 1", wfi.InstanceID, core.WorkflowInstanceStateActive). + Scan(new(int)); err != sql.ErrNoRows { return backend.ErrInstanceAlreadyExists } diff --git a/backend/test/e2e.go b/backend/test/e2e.go index fd1a3490..9794681e 100644 --- a/backend/test/e2e.go +++ b/backend/test/e2e.go @@ -206,18 +206,25 @@ func EndToEndBackendTest(t *testing.T, setup func(options ...backend.BackendOpti name: "SubWorkflow_DuplicateInstanceID", f: func(t *testing.T, ctx context.Context, c *client.Client, w *worker.Worker, b TestBackend) { swf := func(ctx workflow.Context, i int) (int, error) { + workflow.NewSignalChannel[any](ctx, "signal").Receive(ctx) + return i * 2, nil } wf := func(ctx workflow.Context) (int, error) { - r, err := workflow.CreateSubWorkflowInstance[int](ctx, workflow.SubWorkflowOptions{ + swf1 := workflow.CreateSubWorkflowInstance[int](ctx, workflow.SubWorkflowOptions{ InstanceID: "subworkflow", - }, swf, 1).Get(ctx) - if err != nil { - return 0, err - } + }, swf, 1) + + defer func() { + rctx := workflow.NewDisconnectedContext(ctx) + + // Unblock waiting sub workflow + workflow.SignalWorkflow[any](rctx, "subworkflow", "signal", 1).Get(rctx) + swf1.Get(rctx) + }() // Run another subworkflow with the same ID - r, err = workflow.CreateSubWorkflowInstance[int](ctx, workflow.SubWorkflowOptions{ + r, err := workflow.CreateSubWorkflowInstance[int](ctx, workflow.SubWorkflowOptions{ InstanceID: "subworkflow", }, swf, 1).Get(ctx) if err != nil {