Skip to content

Commit

Permalink
Simplify check
Browse files Browse the repository at this point in the history
  • Loading branch information
cschleiden committed Nov 17, 2023
1 parent b9508af commit 1a19993
Show file tree
Hide file tree
Showing 9 changed files with 19 additions and 30 deletions.
3 changes: 2 additions & 1 deletion backend/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,8 @@ func (b *mysqlBackend) GetWorkflowInstanceState(ctx context.Context, instance *w

func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, metadata *workflow.Metadata) error {
// Check for existing instance
if err := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE instance_id = ? LIMIT 1", wfi.InstanceID).Scan(new(int)); err != sql.ErrNoRows {
if err := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE instance_id = ? AND state = ? LIMIT 1", wfi.InstanceID, core.WorkflowInstanceStateActive).
Scan(new(int)); err != sql.ErrNoRows {
return backend.ErrInstanceAlreadyExists
}

Expand Down
6 changes: 1 addition & 5 deletions backend/redis/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,9 @@ import (
// KEYS[2] - pending events key
// KEYS[3] - history key
// KEYS[4] - instances-by-creation key
// KEYS[5] - instances
// ARGV[1] - instance segment
// ARGV[2] - instance id
var deleteCmd = redis.NewScript(
`redis.call("DEL", KEYS[1], KEYS[2], KEYS[3])
redis.call("HDEL", KEYS[5], ARGV[1])
return redis.call("ZREM", KEYS[4], ARGV[1])`)

// deleteInstance deletes an instance from Redis. It does not attempt to remove any future events or pending
Expand All @@ -30,8 +27,7 @@ func deleteInstance(ctx context.Context, rdb redis.UniversalClient, instance *co
pendingEventsKey(instance),
historyKey(instance),
instancesByCreation(),
instanceIDs(),
}, instanceSegment(instance), instance.InstanceID).Err(); err != nil {
}, instanceSegment(instance)).Err(); err != nil {
return fmt.Errorf("failed to delete instance: %w", err)
}

Expand Down
5 changes: 0 additions & 5 deletions backend/redis/expire.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,17 @@ import (
// KEYS[3] - instance key
// KEYS[4] - pending events key
// KEYS[5] - history key
// KEYS[6] - instances key
// ARGV[1] - current timestamp
// ARGV[2] - expiration time in seconds
// ARGV[3] - expiration timestamp in unix milliseconds
// ARGV[4] - instance segment
// ARGV[5] - instance id
var expireCmd = redis.NewScript(
`-- Find instances which have already expired and remove from the index set
local expiredInstances = redis.call("ZRANGE", KEYS[2], "-inf", ARGV[1], "BYSCORE")
for i = 1, #expiredInstances do
local instanceSegment = expiredInstances[i]
redis.call("ZREM", KEYS[1], instanceSegment) -- index set
redis.call("ZREM", KEYS[2], instanceSegment) -- expiration set
redis.call("HDEL", KEYS[6], ARGV[5])
end
-- Add expiration time for future cleanup
Expand Down Expand Up @@ -58,12 +55,10 @@ func setWorkflowInstanceExpiration(ctx context.Context, rdb redis.UniversalClien
instanceKey(instance),
pendingEventsKey(instance),
historyKey(instance),
instanceIDs(),
},
nowStr,
expiration.Seconds(),
expStr,
instanceSegment(instance),
instance.InstanceID,
).Err()
}
3 changes: 0 additions & 3 deletions backend/redis/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,6 @@ func createInstanceP(ctx context.Context, p redis.Pipeliner, instance *core.Work
// The newly created instance is going to be the active execution
setActiveInstanceExecutionP(ctx, p, instance)

// Record instance id
p.HSet(ctx, instanceIDs(), instance.InstanceID, 1)

p.ZAdd(ctx, instancesByCreation(), redis.Z{
Member: instanceSegment(instance),
Score: float64(createdAt.UnixMilli()),
Expand Down
4 changes: 0 additions & 4 deletions backend/redis/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@ func instancesExpiring() string {
return "instances-expiring"
}

func instanceIDs() string {
return "instances"
}

func pendingEventsKey(instance *core.WorkflowInstance) string {
return fmt.Sprintf("pending-events:%v", instanceSegment(instance))
}
Expand Down
5 changes: 1 addition & 4 deletions backend/redis/scripts/complete_workflow_task.lua
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ local pendingEventsKey = getKey()
local payloadHashKey = getKey()
local futureEventZSetKey = getKey()
local activeInstancesKey = getKey()
local instancesByIdKey = getKey()

local workflowSetKey = getKey()
local workflowStreamKey = getKey()
Expand Down Expand Up @@ -103,7 +102,7 @@ for i = 1, otherWorkflowInstances do
local conflictEventPayloadData = getArgv()

-- Does the instance exist already?
local instanceExists = redis.call("HEXISTS", instancesByIdKey, targetInstanceId)
local instanceExists = redis.call("EXISTS", targetActiveInstanceExecutionState)
if instanceExists == 1 then
redis.call("XADD", pendingEventsKey, "*", "event", conflictEventData)
storePayload(conflictEventId, conflictEventPayloadData)
Expand All @@ -119,8 +118,6 @@ for i = 1, otherWorkflowInstances do

-- Track active instance
redis.call("SADD", activeInstancesKey, targetInstanceSegment)

redis.call("HSET", instancesByIdKey, targetInstanceId, 1)
end
end

Expand Down
1 change: 0 additions & 1 deletion backend/redis/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ func (rb *redisBackend) CompleteWorkflowTask(
payloadKey(instance),
futureEventsKey(),
instancesActive(),
instanceIDs(),
queueKeys.SetKey,
queueKeys.StreamKey,
)
Expand Down
3 changes: 2 additions & 1 deletion backend/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ func (sb *sqliteBackend) CreateWorkflowInstance(ctx context.Context, instance *w

func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, metadata *workflow.Metadata) error {
// Check for existing instance
if err := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE id = ? LIMIT 1", wfi.InstanceID).Scan(new(int)); err != sql.ErrNoRows {
if err := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE id = ? AND state = ? LIMIT 1", wfi.InstanceID, core.WorkflowInstanceStateActive).
Scan(new(int)); err != sql.ErrNoRows {
return backend.ErrInstanceAlreadyExists
}

Expand Down
19 changes: 13 additions & 6 deletions backend/test/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,18 +206,25 @@ func EndToEndBackendTest(t *testing.T, setup func(options ...backend.BackendOpti
name: "SubWorkflow_DuplicateInstanceID",
f: func(t *testing.T, ctx context.Context, c *client.Client, w *worker.Worker, b TestBackend) {
swf := func(ctx workflow.Context, i int) (int, error) {
workflow.NewSignalChannel[any](ctx, "signal").Receive(ctx)

return i * 2, nil
}
wf := func(ctx workflow.Context) (int, error) {
r, err := workflow.CreateSubWorkflowInstance[int](ctx, workflow.SubWorkflowOptions{
swf1 := workflow.CreateSubWorkflowInstance[int](ctx, workflow.SubWorkflowOptions{
InstanceID: "subworkflow",
}, swf, 1).Get(ctx)
if err != nil {
return 0, err
}
}, swf, 1)

defer func() {
rctx := workflow.NewDisconnectedContext(ctx)

// Unblock waiting sub workflow
workflow.SignalWorkflow[any](rctx, "subworkflow", "signal", 1).Get(rctx)
swf1.Get(rctx)
}()

// Run another subworkflow with the same ID
r, err = workflow.CreateSubWorkflowInstance[int](ctx, workflow.SubWorkflowOptions{
r, err := workflow.CreateSubWorkflowInstance[int](ctx, workflow.SubWorkflowOptions{
InstanceID: "subworkflow",
}, swf, 1).Get(ctx)
if err != nil {
Expand Down

0 comments on commit 1a19993

Please sign in to comment.