From 60e05fab49e1a1e7f4c72071aab7bfc49d01c9a4 Mon Sep 17 00:00:00 2001 From: Christopher Schleiden Date: Sun, 12 Nov 2023 10:52:23 -0800 Subject: [PATCH 01/17] Add test for duplicate instances with the same id --- backend/test/backendtest.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/backend/test/backendtest.go b/backend/test/backendtest.go index ab1691da..5d6e3b15 100644 --- a/backend/test/backendtest.go +++ b/backend/test/backendtest.go @@ -40,17 +40,18 @@ func BackendTest(t *testing.T, setup func(options ...backend.BackendOption) Test name: "CreateWorkflowInstance_SameInstanceIDErrors", f: func(t *testing.T, ctx context.Context, b backend.Backend) { instanceID := uuid.NewString() - executionID := uuid.NewString() + executionID1 := uuid.NewString() + executionID2 := uuid.NewString() err := b.CreateWorkflowInstance(ctx, - core.NewWorkflowInstance(instanceID, executionID), + core.NewWorkflowInstance(instanceID, executionID1), history.NewHistoryEvent(1, time.Now(), history.EventType_WorkflowExecutionStarted, &history.ExecutionStartedAttributes{}), ) require.NoError(t, err) err = b.CreateWorkflowInstance( ctx, - core.NewWorkflowInstance(instanceID, executionID), + core.NewWorkflowInstance(instanceID, executionID2), history.NewHistoryEvent(1, time.Now(), history.EventType_WorkflowExecutionStarted, &history.ExecutionStartedAttributes{}), ) require.Error(t, err) From c537e13027b32a8dcb05e0723830df07a91e952a Mon Sep 17 00:00:00 2001 From: Christopher Schleiden Date: Sun, 12 Nov 2023 10:52:41 -0800 Subject: [PATCH 02/17] Return errors if creating instance with existing id --- backend/mysql/mysql.go | 26 ++++++++++---------------- backend/redis/instance.go | 4 ++-- backend/sqlite/sqlite.go | 5 +++++ 3 files changed, 17 insertions(+), 18 deletions(-) diff --git a/backend/mysql/mysql.go b/backend/mysql/mysql.go index 3e27aaf6..0b26a32f 100644 --- a/backend/mysql/mysql.go +++ b/backend/mysql/mysql.go @@ -145,8 +145,13 @@ func (b *mysqlBackend) CreateWorkflowInstance(ctx context.Context, instance *wor } defer tx.Rollback() + // Check for existing instance + if err := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE instance_id = ? 
LIMIT 1", instance.InstanceID).Scan(new(int)); err != sql.ErrNoRows { + return backend.ErrInstanceAlreadyExists + } + // Create workflow instance - if err := createInstance(ctx, tx, instance, event.Attributes.(*history.ExecutionStartedAttributes).Metadata, false); err != nil { + if err := createInstance(ctx, tx, instance, event.Attributes.(*history.ExecutionStartedAttributes).Metadata); err != nil { return err } @@ -304,7 +309,7 @@ func (b *mysqlBackend) GetWorkflowInstanceState(ctx context.Context, instance *w return state, nil } -func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, metadata *workflow.Metadata, ignoreDuplicate bool) error { +func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, metadata *workflow.Metadata) error { var parentInstanceID, parentExecutionID *string var parentEventID *int64 if wfi.SubWorkflow() { @@ -318,9 +323,9 @@ func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, met return fmt.Errorf("marshaling metadata: %w", err) } - res, err := tx.ExecContext( + _, err = tx.ExecContext( ctx, - "INSERT IGNORE INTO `instances` (instance_id, execution_id, parent_instance_id, parent_execution_id, parent_schedule_event_id, metadata, state) VALUES (?, ?, ?, ?, ?, ?, ?)", + "INSERT INTO `instances` (instance_id, execution_id, parent_instance_id, parent_execution_id, parent_schedule_event_id, metadata, state) VALUES (?, ?, ?, ?, ?, ?, ?)", wfi.InstanceID, wfi.ExecutionID, parentInstanceID, @@ -333,17 +338,6 @@ func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, met return fmt.Errorf("inserting workflow instance: %w", err) } - if !ignoreDuplicate { - rows, err := res.RowsAffected() - if err != nil { - return err - } - - if rows != 1 { - return backend.ErrInstanceAlreadyExists - } - } - return nil } @@ -628,7 +622,7 @@ func (b *mysqlBackend) CompleteWorkflowTask( if m.HistoryEvent.Type == history.EventType_WorkflowExecutionStarted { a := m.HistoryEvent.Attributes.(*history.ExecutionStartedAttributes) // Create new instance - if err := createInstance(ctx, tx, m.WorkflowInstance, a.Metadata, true); err != nil { + if err := createInstance(ctx, tx, m.WorkflowInstance, a.Metadata); err != nil { return err } diff --git a/backend/redis/instance.go b/backend/redis/instance.go index 6eddfe2a..cca68ce6 100644 --- a/backend/redis/instance.go +++ b/backend/redis/instance.go @@ -15,12 +15,12 @@ import ( ) func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error { - state, err := readInstance(ctx, rb.rdb, instanceKey(instance)) + activeInstance, err := readActiveInstanceExecution(ctx, rb.rdb, instance.InstanceID) if err != nil && err != backend.ErrInstanceNotFound { return err } - if state != nil { + if activeInstance != nil { return backend.ErrInstanceAlreadyExists } diff --git a/backend/sqlite/sqlite.go b/backend/sqlite/sqlite.go index 122329f3..44472458 100644 --- a/backend/sqlite/sqlite.go +++ b/backend/sqlite/sqlite.go @@ -168,6 +168,11 @@ func (sb *sqliteBackend) CreateWorkflowInstance(ctx context.Context, instance *w } defer tx.Rollback() + // Check for existing instance + if err := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE id = ? 
LIMIT 1", instance.InstanceID).Scan(new(int)); err != sql.ErrNoRows { + return backend.ErrInstanceAlreadyExists + } + // Create workflow instance if err := createInstance(ctx, tx, instance, event.Attributes.(*history.ExecutionStartedAttributes).Metadata, false); err != nil { return err From b0ee70c14a25f7cd0b157b318be5d6f061a01deb Mon Sep 17 00:00:00 2001 From: Christopher Schleiden Date: Sun, 12 Nov 2023 10:53:08 -0800 Subject: [PATCH 03/17] Add test for subworkflows with duplicate id --- backend/test/e2e.go | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/backend/test/e2e.go b/backend/test/e2e.go index 1871d5c0..41e28378 100644 --- a/backend/test/e2e.go +++ b/backend/test/e2e.go @@ -202,6 +202,39 @@ func EndToEndBackendTest(t *testing.T, setup func(options ...backend.BackendOpti require.Equal(t, 2, r) }, }, + { + name: "SubWorkflow_DuplicateInstanceID", + f: func(t *testing.T, ctx context.Context, c *client.Client, w *worker.Worker, b TestBackend) { + swf := func(ctx workflow.Context, i int) (int, error) { + return i * 2, nil + } + wf := func(ctx workflow.Context) (int, error) { + r, err := workflow.CreateSubWorkflowInstance[int](ctx, workflow.SubWorkflowOptions{ + InstanceID: "subworkflow", + }, swf, 1).Get(ctx) + if err != nil { + return 0, err + } + + // Run another subworkflow with the same ID + r, err = workflow.CreateSubWorkflowInstance[int](ctx, workflow.SubWorkflowOptions{ + InstanceID: "subworkflow", + }, swf, 1).Get(ctx) + if err != nil { + return 0, err + } + + return r, nil + } + register(t, ctx, w, []interface{}{wf, swf}, nil) + + instance := runWorkflow(t, ctx, c, wf) + + _, err := client.GetWorkflowResult[int](ctx, c, instance, time.Second*20) + require.Error(t, err) + require.ErrorIs(t, err, backend.ErrInstanceAlreadyExists) + }, + }, { name: "SubWorkflow_PropagateCancellation", f: func(t *testing.T, ctx context.Context, c *client.Client, w *worker.Worker, b TestBackend) { From 8e5ba3cd48009953ce24989a749c74a3e66b7f64 Mon Sep 17 00:00:00 2001 From: Christopher Schleiden Date: Thu, 16 Nov 2023 10:27:09 -0800 Subject: [PATCH 04/17] Move future event handling to separate file --- backend/redis/events.go | 56 --------------- backend/redis/events_future.go | 122 +++++++++++++++++++++++++++++++++ backend/redis/keys.go | 2 +- backend/redis/workflow.go | 52 +------------- 4 files changed, 125 insertions(+), 107 deletions(-) create mode 100644 backend/redis/events_future.go diff --git a/backend/redis/events.go b/backend/redis/events.go index 230c8834..1fedbc03 100644 --- a/backend/redis/events.go +++ b/backend/redis/events.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "strconv" "github.com/cschleiden/go-workflows/backend/history" "github.com/cschleiden/go-workflows/core" @@ -104,58 +103,3 @@ func addEventsToStreamP(ctx context.Context, p redis.Pipeliner, streamKey string return nil } - -// Adds an event to be delivered in the future. Not cluster-safe. 
-// KEYS[1] - future event zset key -// KEYS[2] - future event key -// KEYS[3] - instance payload key -// ARGV[1] - timestamp -// ARGV[2] - Instance segment -// ARGV[3] - event id -// ARGV[4] - event data -// ARGV[5] - event payload -var addFutureEventCmd = redis.NewScript(` - redis.call("ZADD", KEYS[1], ARGV[1], KEYS[2]) - redis.call("HSET", KEYS[2], "instance", ARGV[2], "id", ARGV[3], "event", ARGV[4]) - redis.call("HSETNX", KEYS[3], ARGV[3], ARGV[5]) - return 0 -`) - -func addFutureEventP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, event *history.Event) error { - eventData, err := marshalEventWithoutAttributes(event) - if err != nil { - return err - } - - payloadEventData, err := json.Marshal(event.Attributes) - if err != nil { - return err - } - - return addFutureEventCmd.Run( - ctx, p, - []string{futureEventsKey(), futureEventKey(instance, event.ScheduleEventID), payloadKey(instance)}, - strconv.FormatInt(event.VisibleAt.UnixMilli(), 10), - instanceSegment(instance), - event.ID, - string(eventData), - string(payloadEventData), - ).Err() -} - -// Remove a scheduled future event. Not cluster-safe. -// KEYS[1] - future event zset key -// KEYS[2] - future event key -// KEYS[3] - instance payload key -var removeFutureEventCmd = redis.NewScript(` - redis.call("ZREM", KEYS[1], KEYS[2]) - local eventID = redis.call("HGET", KEYS[2], "id") - redis.call("HDEL", KEYS[3], eventID) - return redis.call("DEL", KEYS[2]) -`) - -// removeFutureEvent removes a scheduled future event for the given event. Events are associated via their ScheduleEventID -func removeFutureEventP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, event *history.Event) { - key := futureEventKey(instance, event.ScheduleEventID) - removeFutureEventCmd.Run(ctx, p, []string{futureEventsKey(), key, payloadKey(instance)}) -} diff --git a/backend/redis/events_future.go b/backend/redis/events_future.go new file mode 100644 index 00000000..98a7c252 --- /dev/null +++ b/backend/redis/events_future.go @@ -0,0 +1,122 @@ +package redis + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + "time" + + "github.com/cschleiden/go-workflows/backend/history" + "github.com/cschleiden/go-workflows/core" + redis "github.com/redis/go-redis/v9" +) + +// Adds an event to be delivered in the future. Not cluster-safe. +// KEYS[1] - future event zset key +// KEYS[2] - future event key +// KEYS[3] - instance payload key +// ARGV[1] - timestamp/score for set +// ARGV[2] - Instance segment +// ARGV[3] - event id +// ARGV[4] - event data +// ARGV[5] - event payload +var addFutureEventCmd = redis.NewScript(` + redis.call("ZADD", KEYS[1], ARGV[1], KEYS[2]) + redis.call("HSET", KEYS[2], "instance", ARGV[2], "id", ARGV[3], "event", ARGV[4]) + redis.call("HSETNX", KEYS[3], ARGV[3], ARGV[5]) + return 0 +`) + +func addFutureEventP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, event *history.Event) error { + eventData, err := marshalEventWithoutAttributes(event) + if err != nil { + return err + } + + payloadEventData, err := json.Marshal(event.Attributes) + if err != nil { + return err + } + + return addFutureEventCmd.Run( + ctx, p, + []string{futureEventsKey(), futureEventKey(instance, event.ScheduleEventID), payloadKey(instance)}, + strconv.FormatInt(event.VisibleAt.UnixMilli(), 10), + instanceSegment(instance), + event.ID, + string(eventData), + string(payloadEventData), + ).Err() +} + +// Remove a scheduled future event. Not cluster-safe. 
+// KEYS[1] - future event zset key +// KEYS[2] - future event key +// KEYS[3] - instance payload key +var removeFutureEventCmd = redis.NewScript(` + redis.call("ZREM", KEYS[1], KEYS[2]) + local eventID = redis.call("HGET", KEYS[2], "id") + redis.call("HDEL", KEYS[3], eventID) + return redis.call("DEL", KEYS[2]) +`) + +// removeFutureEvent removes a scheduled future event for the given event. Events are associated via their ScheduleEventID +func removeFutureEventP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, event *history.Event) { + key := futureEventKey(instance, event.ScheduleEventID) + removeFutureEventCmd.Run(ctx, p, []string{futureEventsKey(), key, payloadKey(instance)}) +} + +// Find all due future events. For each event: +// - Look up event data +// - Add to pending event stream for workflow instance +// - Try to queue workflow task for workflow instance +// - Remove event from future event set and delete event data +// +// KEYS[1] - future event set key +// KEYS[2] - workflow task queue stream +// KEYS[3] - workflow task queue set +// ARGV[1] - current timestamp for zrange +// +// Note: this does not work with Redis Cluster since not all keys are passed into the script. +var futureEventsCmd = redis.NewScript(` + -- Find events which should become visible now + local events = redis.call("ZRANGE", KEYS[1], "-inf", ARGV[1], "BYSCORE") + for i = 1, #events do + local instanceSegment = redis.call("HGET", events[i], "instance") + + -- Add event to pending event stream + local eventData = redis.call("HGET", events[i], "event") + local pending_events_key = "pending-events:" .. instanceSegment + redis.call("XADD", pending_events_key, "*", "event", eventData) + + -- Try to queue workflow task + local already_queued = redis.call("SADD", KEYS[3], instanceSegment) + if already_queued ~= 0 then + redis.call("XADD", KEYS[2], "*", "id", instanceSegment, "data", "") + end + + -- Delete event hash data + redis.call("DEL", events[i]) + redis.call("ZREM", KEYS[1], events[i]) + end + + return #events +`) + +func scheduleFutureEvents(ctx context.Context, rb *redisBackend) error { + now := time.Now().UnixMilli() + nowStr := strconv.FormatInt(now, 10) + + queueKeys := rb.workflowQueue.Keys() + + if _, err := futureEventsCmd.Run(ctx, rb.rdb, []string{ + futureEventsKey(), + queueKeys.StreamKey, + queueKeys.SetKey, + }, nowStr).Result(); err != nil && err != redis.Nil { + return fmt.Errorf("checking future events: %w", err) + } + + return nil +} diff --git a/backend/redis/keys.go b/backend/redis/keys.go index 9a3652e0..24f6d2bf 100644 --- a/backend/redis/keys.go +++ b/backend/redis/keys.go @@ -54,7 +54,7 @@ func futureEventsKey() string { } func futureEventKey(instance *core.WorkflowInstance, scheduleEventID int64) string { - return fmt.Sprintf("future-event:%v:%v:%v", instance.InstanceID, instance.ExecutionID, scheduleEventID) + return fmt.Sprintf("future-event:%v:%v", instanceSegment(instance), scheduleEventID) } func payloadKey(instance *core.WorkflowInstance) string { diff --git a/backend/redis/workflow.go b/backend/redis/workflow.go index dcaf8b09..a7655ed7 100644 --- a/backend/redis/workflow.go +++ b/backend/redis/workflow.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "strconv" "time" "github.com/cschleiden/go-workflows/backend" @@ -17,56 +16,9 @@ import ( "go.opentelemetry.io/otel/trace" ) -// Find all due future events. 
For each event: -// - Look up event data -// - Add to pending event stream for workflow instance -// - Try to queue workflow task for workflow instance -// - Remove event from future event set and delete event data -// -// KEYS[1] - future event set key -// KEYS[2] - workflow task queue stream -// KEYS[3] - workflow task queue set -// ARGV[1] - current timestamp for zrange -// -// Note: this does not work with Redis Cluster since not all keys are passed into the script. -var futureEventsCmd = redis.NewScript(` - -- Find events which should become visible now - local events = redis.call("ZRANGE", KEYS[1], "-inf", ARGV[1], "BYSCORE") - for i = 1, #events do - local instanceSegment = redis.call("HGET", events[i], "instance") - - -- Add event to pending event stream - local eventData = redis.call("HGET", events[i], "event") - local pending_events_key = "pending-events:" .. instanceSegment - redis.call("XADD", pending_events_key, "*", "event", eventData) - - -- Try to queue workflow task - local already_queued = redis.call("SADD", KEYS[3], instanceSegment) - if already_queued ~= 0 then - redis.call("XADD", KEYS[2], "*", "id", instanceSegment, "data", "") - end - - -- Delete event hash data - redis.call("DEL", events[i]) - redis.call("ZREM", KEYS[1], events[i]) - end - - return #events -`) - func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*backend.WorkflowTask, error) { - // Check for future events - now := time.Now().UnixMilli() - nowStr := strconv.FormatInt(now, 10) - - queueKeys := rb.workflowQueue.Keys() - - if _, err := futureEventsCmd.Run(ctx, rb.rdb, []string{ - futureEventsKey(), - queueKeys.StreamKey, - queueKeys.SetKey, - }, nowStr).Result(); err != nil && err != redis.Nil { - return nil, fmt.Errorf("checking future events: %w", err) + if err := scheduleFutureEvents(ctx, rb); err != nil { + return nil, fmt.Errorf("scheduling future events: %w", err) } // Try to get a workflow task, this locks the instance when it dequeues one From 639328e61618d634c82232954f39f40661dba954 Mon Sep 17 00:00:00 2001 From: Christopher Schleiden Date: Thu, 16 Nov 2023 11:02:07 -0800 Subject: [PATCH 05/17] Return error when creating duplicate sub workflow instance using mysql backend --- backend/mysql/mysql.go | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/backend/mysql/mysql.go b/backend/mysql/mysql.go index 0b26a32f..a8035b79 100644 --- a/backend/mysql/mysql.go +++ b/backend/mysql/mysql.go @@ -19,6 +19,7 @@ import ( "github.com/cschleiden/go-workflows/backend/metrics" "github.com/cschleiden/go-workflows/core" "github.com/cschleiden/go-workflows/internal/metrickeys" + "github.com/cschleiden/go-workflows/internal/workflowerrors" "github.com/cschleiden/go-workflows/workflow" _ "github.com/go-sql-driver/mysql" "github.com/google/uuid" @@ -310,6 +311,11 @@ func (b *mysqlBackend) GetWorkflowInstanceState(ctx context.Context, instance *w } func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, metadata *workflow.Metadata) error { + // Check for existing instance + if err := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE instance_id = ? 
LIMIT 1", wfi.InstanceID).Scan(new(int)); err != sql.ErrNoRows { + return backend.ErrInstanceAlreadyExists + } + var parentInstanceID, parentExecutionID *string var parentEventID *int64 if wfi.SubWorkflow() { @@ -618,23 +624,33 @@ func (b *mysqlBackend) CompleteWorkflowTask( groupedEvents := history.EventsByWorkflowInstance(workflowEvents) for targetInstance, events := range groupedEvents { - for _, m := range events { - if m.HistoryEvent.Type == history.EventType_WorkflowExecutionStarted { - a := m.HistoryEvent.Attributes.(*history.ExecutionStartedAttributes) - // Create new instance - if err := createInstance(ctx, tx, m.WorkflowInstance, a.Metadata); err != nil { - return err + // Are we creating a new sub-workflow instance? + m := events[0] + if m.HistoryEvent.Type == history.EventType_WorkflowExecutionStarted { + a := m.HistoryEvent.Attributes.(*history.ExecutionStartedAttributes) + // Create new instance + if err := createInstance(ctx, tx, m.WorkflowInstance, a.Metadata); err != nil { + if err == backend.ErrInstanceAlreadyExists { + if err := insertPendingEvents(ctx, tx, instance, []*history.Event{ + history.NewPendingEvent(time.Now(), history.EventType_SubWorkflowFailed, &history.SubWorkflowFailedAttributes{ + Error: workflowerrors.FromError(backend.ErrInstanceAlreadyExists), + }, history.ScheduleEventID(m.WorkflowInstance.ParentEventID)), + }); err != nil { + return fmt.Errorf("inserting sub-workflow failed event: %w", err) + } + + continue } - break + return fmt.Errorf("creating sub-workflow instance: %w", err) } } + // Insert pending events for target instance historyEvents := []*history.Event{} for _, m := range events { historyEvents = append(historyEvents, m.HistoryEvent) } - if err := insertPendingEvents(ctx, tx, &targetInstance, historyEvents); err != nil { return fmt.Errorf("inserting messages: %w", err) } From 7f03877ef0eaadb96ac72e6fd1eec1be1498878f Mon Sep 17 00:00:00 2001 From: Christopher Schleiden Date: Thu, 16 Nov 2023 11:02:33 -0800 Subject: [PATCH 06/17] Add error handling for duplicate sub workflow instance in SQLite backend --- backend/sqlite/sqlite.go | 56 +++++++++++++++++++++------------------- 1 file changed, 29 insertions(+), 27 deletions(-) diff --git a/backend/sqlite/sqlite.go b/backend/sqlite/sqlite.go index 44472458..c4e47ab6 100644 --- a/backend/sqlite/sqlite.go +++ b/backend/sqlite/sqlite.go @@ -19,6 +19,7 @@ import ( "github.com/cschleiden/go-workflows/backend/metrics" "github.com/cschleiden/go-workflows/core" "github.com/cschleiden/go-workflows/internal/metrickeys" + "github.com/cschleiden/go-workflows/internal/workflowerrors" "github.com/cschleiden/go-workflows/workflow" "github.com/google/uuid" "go.opentelemetry.io/otel/trace" @@ -168,13 +169,8 @@ func (sb *sqliteBackend) CreateWorkflowInstance(ctx context.Context, instance *w } defer tx.Rollback() - // Check for existing instance - if err := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE id = ? 
LIMIT 1", instance.InstanceID).Scan(new(int)); err != sql.ErrNoRows { - return backend.ErrInstanceAlreadyExists - } - // Create workflow instance - if err := createInstance(ctx, tx, instance, event.Attributes.(*history.ExecutionStartedAttributes).Metadata, false); err != nil { + if err := createInstance(ctx, tx, instance, event.Attributes.(*history.ExecutionStartedAttributes).Metadata); err != nil { return err } @@ -189,7 +185,12 @@ func (sb *sqliteBackend) CreateWorkflowInstance(ctx context.Context, instance *w return nil } -func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, metadata *workflow.Metadata, ignoreDuplicate bool) error { +func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, metadata *workflow.Metadata) error { + // Check for existing instance + if err := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE id = ? LIMIT 1", wfi.InstanceID).Scan(new(int)); err != sql.ErrNoRows { + return backend.ErrInstanceAlreadyExists + } + var parentInstanceID, parentExecutionID *string var parentEventID *int64 if wfi.SubWorkflow() { @@ -203,9 +204,9 @@ func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, met return fmt.Errorf("marshaling metadata: %w", err) } - res, err := tx.ExecContext( + _, err = tx.ExecContext( ctx, - "INSERT OR IGNORE INTO `instances` (id, execution_id, parent_instance_id, parent_execution_id, parent_schedule_event_id, metadata, state) VALUES (?, ?, ?, ?, ?, ?, ?)", + "INSERT INTO `instances` (id, execution_id, parent_instance_id, parent_execution_id, parent_schedule_event_id, metadata, state) VALUES (?, ?, ?, ?, ?, ?, ?)", wfi.InstanceID, wfi.ExecutionID, parentInstanceID, @@ -218,17 +219,6 @@ func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, met return fmt.Errorf("inserting workflow instance: %w", err) } - if !ignoreDuplicate { - rows, err := res.RowsAffected() - if err != nil { - return err - } - - if rows != 1 { - return backend.ErrInstanceAlreadyExists - } - } - return nil } @@ -547,16 +537,28 @@ func (sb *sqliteBackend) CompleteWorkflowTask( groupedEvents := history.EventsByWorkflowInstance(workflowEvents) for targetInstance, events := range groupedEvents { - for _, m := range events { - if m.HistoryEvent.Type == history.EventType_WorkflowExecutionStarted { - a := m.HistoryEvent.Attributes.(*history.ExecutionStartedAttributes) - // Create new instance - if err := createInstance(ctx, tx, m.WorkflowInstance, a.Metadata, true); err != nil { - return err + // Are we creating a new sub-workflow instance? 
+ m := events[0] + if m.HistoryEvent.Type == history.EventType_WorkflowExecutionStarted { + a := m.HistoryEvent.Attributes.(*history.ExecutionStartedAttributes) + // Create new instance + if err := createInstance(ctx, tx, m.WorkflowInstance, a.Metadata); err != nil { + if err == backend.ErrInstanceAlreadyExists { + if err := insertPendingEvents(ctx, tx, instance, []*history.Event{ + history.NewPendingEvent(time.Now(), history.EventType_SubWorkflowFailed, &history.SubWorkflowFailedAttributes{ + Error: workflowerrors.FromError(backend.ErrInstanceAlreadyExists), + }, history.ScheduleEventID(m.WorkflowInstance.ParentEventID)), + }); err != nil { + return fmt.Errorf("inserting sub-workflow failed event: %w", err) + } + + continue } - break + return fmt.Errorf("creating sub-workflow instance: %w", err) } + + continue } // Insert pending events for target instance From 34a5c6746439637dd3f2086f3ca529bb7d10cea1 Mon Sep 17 00:00:00 2001 From: Christopher Schleiden Date: Thu, 16 Nov 2023 11:02:45 -0800 Subject: [PATCH 07/17] Formatting --- internal/command/schedule_subworkflow.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/command/schedule_subworkflow.go b/internal/command/schedule_subworkflow.go index b453cdf4..7be2d512 100644 --- a/internal/command/schedule_subworkflow.go +++ b/internal/command/schedule_subworkflow.go @@ -80,6 +80,7 @@ func (c *ScheduleSubWorkflowCommand) Execute(clock clock.Clock) *CommandResult { }, }, } + case CommandState_CancelPending: c.state = CommandState_Canceled From 9c3ea0a8615da5771c8fc9a4a3eefe27c7f8b548 Mon Sep 17 00:00:00 2001 From: Christopher Schleiden Date: Thu, 16 Nov 2023 21:10:15 -0800 Subject: [PATCH 08/17] Move redis task completion to script --- backend/redis/delete.go | 6 +- backend/redis/events.go | 31 --- backend/redis/expire.go | 5 + backend/redis/instance.go | 7 +- backend/redis/keys.go | 4 + backend/redis/queue.go | 3 +- backend/redis/redis.go | 42 ++- .../redis/scripts/complete_workflow_task.lua | 222 +++++++++++++++ backend/redis/workflow.go | 257 +++++++++--------- backend/test/backendtest.go | 1 + backend/test/e2e.go | 3 +- internal/workflowerrors/error.go | 4 + 12 files changed, 417 insertions(+), 168 deletions(-) create mode 100644 backend/redis/scripts/complete_workflow_task.lua diff --git a/backend/redis/delete.go b/backend/redis/delete.go index 6ae9bcd6..f637ed8b 100644 --- a/backend/redis/delete.go +++ b/backend/redis/delete.go @@ -14,9 +14,12 @@ import ( // KEYS[4] - payload key // KEYS[5] - active-instance-execution key // KEYS[6] - instances-by-creation key +// KEYS[7] - instances // ARGV[1] - instance segment +// ARGV[2] - instance id var deleteCmd = redis.NewScript( `redis.call("DEL", KEYS[1], KEYS[2], KEYS[3], KEYS[4], KEYS[5]) + redis.call("HDEL", KEYS[7], ARGV[1]) return redis.call("ZREM", KEYS[6], ARGV[1])`) // deleteInstance deletes an instance from Redis. 
It does not attempt to remove any future events or pending @@ -31,7 +34,8 @@ func deleteInstance(ctx context.Context, rdb redis.UniversalClient, instance *co payloadKey(instance), activeInstanceExecutionKey(instance.InstanceID), instancesByCreation(), - }, instanceSegment(instance)).Err(); err != nil { + instanceIDs(), + }, instanceSegment(instance), instance.InstanceID).Err(); err != nil { return fmt.Errorf("failed to delete instance: %w", err) } diff --git a/backend/redis/events.go b/backend/redis/events.go index 1fedbc03..c601cd21 100644 --- a/backend/redis/events.go +++ b/backend/redis/events.go @@ -72,34 +72,3 @@ func addEventToStreamP(ctx context.Context, p redis.Pipeliner, streamKey string, }, }).Err() } - -// addEventsToStream adds the given events to the given event stream. If successful, the message id of the last event added -// is returned -// KEYS[1] - stream key -// ARGV[1] - event data as serialized strings -var addEventsToStreamCmd = redis.NewScript(` - local msgID = "" - for i = 1, #ARGV, 2 do - msgID = redis.call("XADD", KEYS[1], ARGV[i], "event", ARGV[i + 1]) - end - return msgID -`) - -func addEventsToStreamP(ctx context.Context, p redis.Pipeliner, streamKey string, events []*history.Event) error { - eventsData := make([]string, 0) - for _, event := range events { - eventData, err := marshalEventWithoutAttributes(event) - if err != nil { - return err - } - - // log.Println("addEventsToHistoryStreamP:", event.SequenceID, string(eventData)) - - eventsData = append(eventsData, historyID(event.SequenceID)) - eventsData = append(eventsData, string(eventData)) - } - - addEventsToStreamCmd.Run(ctx, p, []string{streamKey}, eventsData) - - return nil -} diff --git a/backend/redis/expire.go b/backend/redis/expire.go index a0fc7d5a..b4b81f58 100644 --- a/backend/redis/expire.go +++ b/backend/redis/expire.go @@ -18,10 +18,12 @@ import ( // KEYS[4] - pending events key // KEYS[5] - history key // KEYS[6] - payload key +// KEYS[7] - instances key // ARGV[1] - current timestamp // ARGV[2] - expiration time in seconds // ARGV[3] - expiration timestamp in unix milliseconds // ARGV[4] - instance segment +// ARGV[5] - instance id var expireCmd = redis.NewScript( `-- Find instances which have already expired and remove from the index set local expiredInstances = redis.call("ZRANGE", KEYS[2], "-inf", ARGV[1], "BYSCORE") @@ -29,6 +31,7 @@ var expireCmd = redis.NewScript( local instanceSegment = expiredInstances[i] redis.call("ZREM", KEYS[1], instanceSegment) -- index set redis.call("ZREM", KEYS[2], instanceSegment) -- expiration set + redis.call("HDEL", KEYS[7], ARGV[5]) end -- Add expiration time for future cleanup @@ -57,10 +60,12 @@ func setWorkflowInstanceExpiration(ctx context.Context, rdb redis.UniversalClien pendingEventsKey(instance), historyKey(instance), payloadKey(instance), + instanceIDs(), }, nowStr, expiration.Seconds(), expStr, instanceSegment(instance), + instance.InstanceID, ).Err() } diff --git a/backend/redis/instance.go b/backend/redis/instance.go index cca68ce6..685ef156 100644 --- a/backend/redis/instance.go +++ b/backend/redis/instance.go @@ -26,7 +26,7 @@ func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, instance *wo p := rb.rdb.TxPipeline() - if err := createInstanceP(ctx, p, instance, event.Attributes.(*history.ExecutionStartedAttributes).Metadata, false); err != nil { + if err := createInstanceP(ctx, p, instance, event.Attributes.(*history.ExecutionStartedAttributes).Metadata); err != nil { return err } @@ -145,7 +145,7 @@ type instanceState 
struct { LastSequenceID int64 `json:"last_sequence_id,omitempty"` } -func createInstanceP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, metadata *metadata.WorkflowMetadata, ignoreDuplicate bool) error { +func createInstanceP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, metadata *metadata.WorkflowMetadata) error { key := instanceKey(instance) createdAt := time.Now() @@ -165,6 +165,9 @@ func createInstanceP(ctx context.Context, p redis.Pipeliner, instance *core.Work // The newly created instance is going to be the active execution setActiveInstanceExecutionP(ctx, p, instance) + // Record instance id + p.HSet(ctx, instanceIDs(), instance.InstanceID, 1) + p.ZAdd(ctx, instancesByCreation(), redis.Z{ Member: instanceSegment(instance), Score: float64(createdAt.UnixMilli()), diff --git a/backend/redis/keys.go b/backend/redis/keys.go index 24f6d2bf..8ebfa873 100644 --- a/backend/redis/keys.go +++ b/backend/redis/keys.go @@ -37,6 +37,10 @@ func instancesExpiring() string { return "instances-expiring" } +func instanceIDs() string { + return "instances" +} + func pendingEventsKey(instance *core.WorkflowInstance) string { return fmt.Sprintf("pending-events:%v", instanceSegment(instance)) } diff --git a/backend/redis/queue.go b/backend/redis/queue.go index dcaa73a2..2d0317a5 100644 --- a/backend/redis/queue.go +++ b/backend/redis/queue.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "log" "time" "github.com/google/uuid" @@ -119,7 +120,6 @@ var createGroupCmd = redis.NewScript(` return true `) - func (q *taskQueue[T]) Enqueue(ctx context.Context, p redis.Pipeliner, id string, data *T) error { ds, err := json.Marshal(data) if err != nil { @@ -139,6 +139,7 @@ func (q *taskQueue[T]) Dequeue(ctx context.Context, rdb redis.UniversalClient, l } if task != nil { + log.Println("Recovered task", task.ID) return task, nil } diff --git a/backend/redis/redis.go b/backend/redis/redis.go index 6ed8c27d..c9125086 100644 --- a/backend/redis/redis.go +++ b/backend/redis/redis.go @@ -2,7 +2,9 @@ package redis import ( "context" + "embed" "fmt" + "io/fs" "log/slog" "time" @@ -19,6 +21,11 @@ import ( var _ backend.Backend = (*redisBackend)(nil) +//go:embed scripts/*.lua +var luaScripts embed.FS + +var completeWorkflowTaskCmd *redis.Script + func NewRedisBackend(client redis.UniversalClient, opts ...RedisBackendOption) (*redisBackend, error) { workflowQueue, err := newTaskQueue[any](client, "workflows") if err != nil { @@ -52,15 +59,13 @@ func NewRedisBackend(client redis.UniversalClient, opts ...RedisBackendOption) ( // them, loads them. This doesn't work when using (transactional) pipelines, so eagerly load them on startup. 
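	// Script.Run issues EVALSHA and only falls back to EVAL when Redis answers with a NOSCRIPT
	// error. Inside a transactional pipeline all commands are queued and errors only surface at
	// Exec time, so that fallback cannot happen; loading the scripts once here makes sure their
	// SHA1 digests are known before any pipeline runs them.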
ctx := context.Background() cmds := map[string]*redis.StringCmd{ - "addEventsToStreamCmd": addEventsToStreamCmd.Load(ctx, rb.rdb), - "addFutureEventCmd": addFutureEventCmd.Load(ctx, rb.rdb), - "futureEventsCmd": futureEventsCmd.Load(ctx, rb.rdb), - "removeFutureEventCmd": removeFutureEventCmd.Load(ctx, rb.rdb), - "removePendingEventsCmd": removePendingEventsCmd.Load(ctx, rb.rdb), - "requeueInstanceCmd": requeueInstanceCmd.Load(ctx, rb.rdb), - "deleteInstanceCmd": deleteCmd.Load(ctx, rb.rdb), - "expireInstanceCmd": expireCmd.Load(ctx, rb.rdb), - "addPayloadsCmd": addPayloadsCmd.Load(ctx, rb.rdb), + "addEventsToStreamCmd": addEventsToStreamCmd.Load(ctx, rb.rdb), + "addFutureEventCmd": addFutureEventCmd.Load(ctx, rb.rdb), + "futureEventsCmd": futureEventsCmd.Load(ctx, rb.rdb), + "removeFutureEventCmd": removeFutureEventCmd.Load(ctx, rb.rdb), + "deleteInstanceCmd": deleteCmd.Load(ctx, rb.rdb), + "expireInstanceCmd": expireCmd.Load(ctx, rb.rdb), + "addPayloadsCmd": addPayloadsCmd.Load(ctx, rb.rdb), } for name, cmd := range cmds { // fmt.Println(name, cmd.Val()) @@ -70,6 +75,25 @@ func NewRedisBackend(client redis.UniversalClient, opts ...RedisBackendOption) ( } } + // Load all Lua scripts + + cmdMapping := map[string]**redis.Script{ + "complete_workflow_task.lua": &completeWorkflowTaskCmd, + } + + for scriptFile, cmd := range cmdMapping { + scriptContent, err := fs.ReadFile(luaScripts, "scripts/"+scriptFile) + if err != nil { + return nil, fmt.Errorf("reading Lua script %s: %w", scriptFile, err) + } + + *cmd = redis.NewScript(string(scriptContent)) + + if c := (*cmd).Load(ctx, rb.rdb); c.Err() != nil { + return nil, fmt.Errorf("loading Lua script %s: %w", scriptFile, c.Err()) + } + } + return rb, nil } diff --git a/backend/redis/scripts/complete_workflow_task.lua b/backend/redis/scripts/complete_workflow_task.lua new file mode 100644 index 00000000..36e3bb4c --- /dev/null +++ b/backend/redis/scripts/complete_workflow_task.lua @@ -0,0 +1,222 @@ +local keyIdx = 1 +local argvIdx = 1 + +local getKey = function() + local key = KEYS[keyIdx] + keyIdx = keyIdx + 1 + return key +end + +local getArgv = function() + local argv = ARGV[argvIdx] + argvIdx = argvIdx + 1 + -- redis.call("ECHO", argv) + return argv +end + +-- Shared keys +local instanceKey = getKey() +local historyStreamKey = getKey() +local pendingEventsKey = getKey() +local payloadHashKey = getKey() +local futureEventZSetKey = getKey() +local activeInstancesKey = getKey() +local instancesByIdKey = getKey() + +local workflowSetKey = getKey() +local workflowStreamKey = getKey() + +local instanceSegment = getArgv() + +local storePayload = function(eventId, payload) + redis.pcall("HSETNX", payloadHashKey, eventId, payload) +end + +-- Read instance +local instance = cjson.decode(redis.call("GET", instanceKey)) + +-- Add executed events to history +local executedEvents = tonumber(getArgv()) +local lastSequenceId = 0 +for i = 1, executedEvents do + local eventId = getArgv() + local historyId = getArgv() + local eventData = getArgv() + local payloadData = getArgv() + local sequenceId = getArgv() + + -- Add event to history + redis.call("XADD", historyStreamKey, historyId, "event", eventData) + + storePayload(eventId, payloadData) + + lastSequenceId = tonumber(sequenceId) +end + +-- Remove canceled timers +local timersToCancel = tonumber(getArgv()) +for i = 1, timersToCancel do + local futureEventKey = getKey() + + redis.call("ZREM", futureEventZSetKey, futureEventKey) + -- remove payload + local eventId = redis.call("HGET", futureEventKey, "id") + 
redis.call("HDEL", payloadHashKey, eventId) + -- remove event hash + redis.call("DEL", futureEventKey) +end + +-- Schedule timers +local timersToSchedule = tonumber(getArgv()) +for i = 1, timersToSchedule do + local eventId = getArgv() + local timestamp = getArgv() + local eventData = getArgv() + local payloadData = getArgv() + + local futureEventKey = getKey() + + redis.call("ZADD", futureEventZSetKey, timestamp, futureEventKey) + redis.call("HSET", futureEventKey, "instance", instanceSegment, "id", eventId, "event", eventData) + storePayload(eventId, payloadData) +end + +-- Send events to other workflow instances +local otherWorkflowInstances = tonumber(getArgv()) +for i = 1, otherWorkflowInstances do + local targetInstanceKey = getKey() + local targetActiveInstanceExecutionKey = getKey() + + local targetInstanceSegment = getArgv() + local targetInstanceId = getArgv() + local createNewInstance = tonumber(getArgv()) + local eventsToDeliver = tonumber(getArgv()) + local skipEvents = false + + -- Creating a new instance? + if createNewInstance == 1 then + local targetInstanceState = getArgv() + local targetActiveInstanceExecutionState = getArgv() + + local conflictEventId = getArgv() + local conflictEventData = getArgv() + local conflictEventPayloadData = getArgv() + + -- Does the instance exist already? + local instanceExists = redis.call("HEXISTS", instancesByIdKey, targetInstanceId) + if instanceExists == 1 then + redis.call("XADD", pendingEventsKey, "*", "event", conflictEventData) + storePayload(conflictEventId, conflictEventPayloadData) + redis.call("ECHO", "Conflict detected, event " .. conflictEventId .. " was not delivered to instance " .. targetInstanceSegment .. ".") + + skipEvents = true + else + -- Create new instance + redis.call("SETNX", targetInstanceKey, targetInstanceState) + + -- Set active execution + redis.call("SET", targetActiveInstanceExecutionKey, targetActiveInstanceExecutionState) + + -- Track active instance + redis.call("SADD", activeInstancesKey, targetInstanceSegment) + + redis.call("HSET", instancesByIdKey, targetInstanceId, 1) + end + end + + local instancePendingEventsKey = getKey() + local instancePayloadHashKey = getKey() + for j = 1, eventsToDeliver do + local eventId = getArgv() + local eventData = getArgv() + local payloadData = getArgv() + + if not skipEvents then + -- Add event to pending events + redis.call("XADD", instancePendingEventsKey, "*", "event", eventData) + + -- Store payload + redis.pcall("HSETNX", instancePayloadHashKey, eventId, payloadData) + end + end + + -- If events were delivered, try to queue a workflow task + if eventsToDeliver > 0 and not skipEvents then + -- Enqueue workflow task + local added = redis.call("SADD", workflowSetKey, targetInstanceSegment) + if added == 1 then + redis.call("XADD", workflowStreamKey, "*", "id", targetInstanceSegment, "data", "") + end + end +end + +-- Update instance state +local now = getArgv() +local state = tonumber(getArgv()) + +-- State constants +local ContinuedAsNew = tonumber(getArgv()) +local Finished = tonumber(getArgv()) + +instance["state"] = state + +-- If workflow instance finished, remove active execution +local activeInstanceExecutionKey = getKey() +if state == ContinuedAsNew or state == Finished then + -- Remove active execution + redis.call("DEL", activeInstanceExecutionKey) + + instance["completed_at"] = now + + -- TODO: Set auto expiration + + redis.call("SREM", activeInstancesKey, instanceSegment) +end + +if lastSequenceId > 0 then + instance["last_sequence_id"] = lastSequenceId 
+end + +redis.call("SET", instanceKey, cjson.encode(instance)) + +-- Schedule activities +local activities = tonumber(getArgv()) +local activitySetKey = getKey() +local activityStreamKey = getKey() +for i = 1, activities do + local activityId = getArgv() + local activityData = getArgv() + + local added = redis.call("SADD", activitySetKey, activityId) + if added == 1 then + redis.call("XADD", activityStreamKey, "*", "id", activityId, "data", activityData) + end +end + +-- Remove executed pending events +local lastPendingEventMessageId = getArgv() +redis.call("XTRIM", pendingEventsKey, "MINID", lastPendingEventMessageId) +redis.call("XDEL", pendingEventsKey, lastPendingEventMessageId) + +-- Complete workflow task and unlock instance +local taskId = getArgv() +local groupName = getArgv() +local task = redis.call("XRANGE", workflowStreamKey, taskId, taskId) +if #task ~= 0 then + local id = task[1][2][2] + redis.call("SREM", workflowSetKey, id) + redis.call("XACK", workflowStreamKey, groupName, taskId) + + redis.call("XDEL", workflowStreamKey, taskId) +end + +-- If there are pending events, queue the instance again +local pending_events = redis.call("XLEN", pendingEventsKey) +if pending_events > 0 then + local added = redis.call("SADD", workflowSetKey, instanceSegment) + if added == 1 then + redis.call("XADD", workflowStreamKey, "*", "id", instanceSegment, "data", "") + end +end + +return true \ No newline at end of file diff --git a/backend/redis/workflow.go b/backend/redis/workflow.go index a7655ed7..402e5e13 100644 --- a/backend/redis/workflow.go +++ b/backend/redis/workflow.go @@ -4,16 +4,14 @@ import ( "context" "encoding/json" "fmt" + "strconv" "time" "github.com/cschleiden/go-workflows/backend" "github.com/cschleiden/go-workflows/backend/history" "github.com/cschleiden/go-workflows/core" - "github.com/cschleiden/go-workflows/internal/log" - "github.com/cschleiden/go-workflows/internal/tracing" + "github.com/cschleiden/go-workflows/internal/workflowerrors" "github.com/redis/go-redis/v9" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*backend.WorkflowTask, error) { @@ -89,32 +87,6 @@ func (rb *redisBackend) ExtendWorkflowTask(ctx context.Context, taskID string, i return err } -// Remove all pending events before (and including) a given message id -// KEYS[1] - pending events stream key -// ARGV[1] - message id -var removePendingEventsCmd = redis.NewScript(` - local trimmed = redis.call("XTRIM", KEYS[1], "MINID", ARGV[1]) - local deleted = redis.call("XDEL", KEYS[1], ARGV[1]) - local removed = trimmed + deleted - return removed -`) - -// KEYS[1] - pending events -// KEYS[2] - task queue stream -// KEYS[3] - task queue set -// ARGV[1] - Instance segment -var requeueInstanceCmd = redis.NewScript(` - local pending_events = redis.call("XLEN", KEYS[1]) - if pending_events > 0 then - local added = redis.call("SADD", KEYS[3], ARGV[1]) - if added == 1 then - redis.call("XADD", KEYS[2], "*", "id", ARGV[1], "data", "") - end - end - - return true -`) - func (rb *redisBackend) CompleteWorkflowTask( ctx context.Context, task *backend.WorkflowTask, @@ -123,150 +95,178 @@ func (rb *redisBackend) CompleteWorkflowTask( executedEvents, activityEvents, timerEvents []*history.Event, workflowEvents []history.WorkflowEvent, ) error { - instanceState, err := readInstance(ctx, rb.rdb, instanceKey(instance)) - if err != nil { - return err - } - - // Check-point the workflow. 
We guarantee that no other worker is working on this workflow instance at this point via the - // task queue, so we don't need to WATCH the keys, we just need to make sure all commands are executed atomically to prevent - // bad state if a worker crashes in the middle of this execution. - p := rb.rdb.TxPipeline() + keys := make([]string, 0) + args := make([]interface{}, 0) + + queueKeys := rb.workflowQueue.Keys() + keys = append(keys, + instanceKey(instance), + historyKey(instance), + pendingEventsKey(instance), + payloadKey(instance), + futureEventsKey(), + instancesActive(), + instanceIDs(), + queueKeys.SetKey, + queueKeys.StreamKey, + ) + args = append(args, instanceSegment(instance)) // Add executed events to the history - if err := addEventPayloadsP(ctx, p, instance, executedEvents); err != nil { - return fmt.Errorf("adding event payloads: %w", err) - } + args = append(args, len(executedEvents)) + + for _, event := range executedEvents { + eventData, err := marshalEventWithoutAttributes(event) + if err != nil { + return fmt.Errorf("marshaling event: %w", err) + } + + payloadData, err := json.Marshal(event.Attributes) + if err != nil { + return fmt.Errorf("marshaling event payload: %w", err) + } - if err := addEventsToStreamP(ctx, p, historyKey(instance), executedEvents); err != nil { - return fmt.Errorf("serializing : %w", err) + args = append(args, event.ID, historyID(event.SequenceID), eventData, payloadData, event.SequenceID) } + // Remove canceled timers + timersToCancel := make([]*history.Event, 0) for _, event := range executedEvents { switch event.Type { case history.EventType_TimerCanceled: - removeFutureEventP(ctx, p, instance, event) + timersToCancel = append(timersToCancel, event) } } + args = append(args, len(timersToCancel)) + for _, event := range timersToCancel { + keys = append(keys, futureEventKey(instance, event.ScheduleEventID)) + } + // Schedule timers + args = append(args, len(timerEvents)) for _, timerEvent := range timerEvents { - if err := addFutureEventP(ctx, p, instance, timerEvent); err != nil { - return fmt.Errorf("adding future event: %w", err) + eventData, err := marshalEventWithoutAttributes(timerEvent) + if err != nil { + return fmt.Errorf("marshaling event: %w", err) + } + + payloadEventData, err := json.Marshal(timerEvent.Attributes) + if err != nil { + return fmt.Errorf("marshaling event payload: %w", err) } + + args = append(args, timerEvent.ID, strconv.FormatInt(timerEvent.VisibleAt.UnixMilli(), 10), eventData, payloadEventData) + keys = append(keys, futureEventKey(instance, timerEvent.ScheduleEventID)) } // Send new workflow events to the respective streams groupedEvents := history.EventsByWorkflowInstance(workflowEvents) + args = append(args, len(groupedEvents)) for targetInstance, events := range groupedEvents { - // Insert pending events for target instance - for _, m := range events { - m := m - - if m.HistoryEvent.Type == history.EventType_WorkflowExecutionStarted { - // Create new instance - a := m.HistoryEvent.Attributes.(*history.ExecutionStartedAttributes) - if err := createInstanceP(ctx, p, m.WorkflowInstance, a.Metadata, true); err != nil { - return err - } + keys = append(keys, instanceKey(&targetInstance), activeInstanceExecutionKey(targetInstance.InstanceID)) + args = append(args, instanceSegment(&targetInstance), targetInstance.InstanceID) + + // Are we creating a new sub-workflow instance? 
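	// For the Redis backend the duplicate check itself happens inside the Lua script: both the
	// new instance state and a SubWorkflowFailed "conflict" event carrying
	// backend.ErrInstanceAlreadyExists are marshaled here, and the script atomically either
	// creates the instance and delivers its events, or reports the conflict back to the parent.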
+ m := events[0] + createNewInstance := m.HistoryEvent.Type == history.EventType_WorkflowExecutionStarted + args = append(args, createNewInstance) + args = append(args, len(events)) + + if createNewInstance { + a := m.HistoryEvent.Attributes.(*history.ExecutionStartedAttributes) + isb, err := json.Marshal(&instanceState{ + Instance: &targetInstance, + State: core.WorkflowInstanceStateActive, + Metadata: a.Metadata, + CreatedAt: time.Now(), + }) + if err != nil { + return fmt.Errorf("marshaling new instance state: %w", err) } - // Add pending event to stream - if err := addEventPayloadsP(ctx, p, &targetInstance, []*history.Event{m.HistoryEvent}); err != nil { - return fmt.Errorf("adding event payloads: %w", err) + ib, err := json.Marshal(targetInstance) + if err != nil { + return fmt.Errorf("marshaling instance: %w", err) } - if err := addEventToStreamP(ctx, p, pendingEventsKey(&targetInstance), m.HistoryEvent); err != nil { - return fmt.Errorf("adding event to stream: %w", err) - } - } + args = append(args, isb, ib) - // Try to enqueue workflow task - if targetInstance.InstanceID != instance.InstanceID || targetInstance.ExecutionID != instance.ExecutionID { - if err := rb.workflowQueue.Enqueue(ctx, p, instanceSegment(&targetInstance), nil); err != nil { - return fmt.Errorf("enqueuing workflow task: %w", err) + // Create pending event for conflicts + pfe := history.NewPendingEvent(time.Now(), history.EventType_SubWorkflowFailed, &history.SubWorkflowFailedAttributes{ + Error: workflowerrors.FromError(backend.ErrInstanceAlreadyExists), + }, history.ScheduleEventID(m.WorkflowInstance.ParentEventID)) + eventData, payloadEventData, err := marshalEvent(pfe) + if err != nil { + return fmt.Errorf("marshaling event: %w", err) } - } - } - - instanceState.State = state - if state == core.WorkflowInstanceStateFinished || state == core.WorkflowInstanceStateContinuedAsNew { - t := time.Now() - instanceState.CompletedAt = &t + args = append(args, pfe.ID, eventData, payloadEventData) + } - removeActiveInstanceExecutionP(ctx, p, instance) - } + keys = append(keys, pendingEventsKey(&targetInstance), payloadKey(&targetInstance)) + for _, m := range events { + eventData, payloadEventData, err := marshalEvent(m.HistoryEvent) + if err != nil { + return fmt.Errorf("marshaling event: %w", err) + } - if len(executedEvents) > 0 { - instanceState.LastSequenceID = executedEvents[len(executedEvents)-1].SequenceID + args = append(args, m.HistoryEvent.ID, eventData, payloadEventData) + } } - if err := updateInstanceP(ctx, p, instance, instanceState); err != nil { - return fmt.Errorf("updating workflow instance: %w", err) - } + // Update instance state and update active execution + nowStr := time.Now().Format(time.RFC3339) + args = append(args, string(nowStr), int(state), int(core.WorkflowInstanceStateContinuedAsNew), int(core.WorkflowInstanceStateFinished)) + keys = append(keys, activeInstanceExecutionKey(instance.InstanceID)) // Store activity data + args = append(args, len(activityEvents)) + activityQueueKeys := rb.activityQueue.Keys() + keys = append(keys, activityQueueKeys.SetKey, activityQueueKeys.StreamKey) for _, activityEvent := range activityEvents { - if err := rb.activityQueue.Enqueue(ctx, p, activityEvent.ID, &activityData{ + activityData, err := json.Marshal(&activityData{ Instance: instance, ID: activityEvent.ID, Event: activityEvent, - }); err != nil { - return fmt.Errorf("queueing activity task: %w", err) + }) + if err != nil { + return fmt.Errorf("marshaling activity data: %w", err) } + args = 
append(args, activityEvent.ID, activityData) } // Remove executed pending events - if task.CustomData != nil { - lastPendingEventMessageID := task.CustomData.(string) - if err := removePendingEventsCmd.Run(ctx, p, []string{pendingEventsKey(instance)}, lastPendingEventMessageID).Err(); err != nil { - return fmt.Errorf("removing pending events: %w", err) - } - } + lastPendingEventMessageID := task.CustomData.(string) + args = append(args, lastPendingEventMessageID) // Complete workflow task and unlock instance. - completeCmd, err := rb.workflowQueue.Complete(ctx, p, task.ID) - if err != nil { - return fmt.Errorf("completing workflow task: %w", err) - } + args = append(args, task.ID, rb.workflowQueue.groupName) // If there are pending events, queue the instance again - keyInfo := rb.workflowQueue.Keys() - requeueInstanceCmd.Run(ctx, p, - []string{pendingEventsKey(instance), keyInfo.StreamKey, keyInfo.SetKey}, - instanceSegment(instance), - ) + // No args/keys needed // Commit transaction - executedCmds, err := p.Exec(ctx) + _, err := completeWorkflowTaskCmd.Run(ctx, rb.rdb, keys, args...).Result() if err != nil { - if err := completeCmd.Err(); err != nil && err == redis.Nil { - return fmt.Errorf("could not complete workflow task: %w", err) - } - - for _, cmd := range executedCmds { - if cmdErr := cmd.Err(); cmdErr != nil { - rb.Logger().Debug("redis command error", log.NamespaceKey+".redis.cmd", cmd.Name(), "cmdString", cmd.String(), log.NamespaceKey+".redis.cmdErr", cmdErr.Error()) - } - } - return fmt.Errorf("completing workflow task: %w", err) } if state == core.WorkflowInstanceStateFinished || state == core.WorkflowInstanceStateContinuedAsNew { // Trace workflow completion - ctx, err = (&tracing.TracingContextPropagator{}).Extract(ctx, instanceState.Metadata) - if err != nil { - rb.Logger().Error("extracting tracing context", log.ErrorKey, err) - } - - _, span := rb.Tracer().Start(ctx, "WorkflowComplete", - trace.WithAttributes( - attribute.String(log.NamespaceKey+log.InstanceIDKey, instanceState.Instance.InstanceID), - )) - span.End() - + // TODO: Return metadata from script + // ctx, err = (&tracing.TracingContextPropagator{}).Extract(ctx, instanceState.Metadata) + // if err != nil { + // rb.Logger().Error("extracting tracing context", log.ErrorKey, err) + // } + + // _, span := rb.Tracer().Start(ctx, "WorkflowComplete", + // trace.WithAttributes( + // attribute.String(log.NamespaceKey+log.InstanceIDKey, instanceState.Instance.InstanceID), + // )) + // span.End() + + // TODO: Move to script if rb.options.AutoExpiration > 0 { if err := setWorkflowInstanceExpiration(ctx, rb.rdb, instance, rb.options.AutoExpiration); err != nil { return fmt.Errorf("setting workflow instance expiration: %w", err) @@ -277,6 +277,19 @@ func (rb *redisBackend) CompleteWorkflowTask( return nil } +func marshalEvent(event *history.Event) (string, string, error) { + eventData, err := marshalEventWithoutAttributes(event) + if err != nil { + return "", "", fmt.Errorf("marshaling event payload: %w", err) + } + + payloadEventData, err := json.Marshal(event.Attributes) + if err != nil { + return "", "", fmt.Errorf("marshaling event payload: %w", err) + } + return eventData, string(payloadEventData), nil +} + func (rb *redisBackend) addWorkflowInstanceEventP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, event *history.Event) error { // Add event to pending events for instance if err := addEventPayloadsP(ctx, p, instance, []*history.Event{event}); err != nil { diff --git 
a/backend/test/backendtest.go b/backend/test/backendtest.go index 5d6e3b15..c4d84cdb 100644 --- a/backend/test/backendtest.go +++ b/backend/test/backendtest.go @@ -297,6 +297,7 @@ func BackendTest(t *testing.T, setup func(options ...backend.BackendOption) Test task, err = b.GetWorkflowTask(ctx) require.NoError(t, err) + require.NotNil(t, task) require.Equal(t, subInstance1, task.WorkflowInstance) require.Equal(t, history.EventType_WorkflowExecutionCanceled, task.NewEvents[len(task.NewEvents)-1].Type) }, diff --git a/backend/test/e2e.go b/backend/test/e2e.go index 41e28378..fd1a3490 100644 --- a/backend/test/e2e.go +++ b/backend/test/e2e.go @@ -231,8 +231,7 @@ func EndToEndBackendTest(t *testing.T, setup func(options ...backend.BackendOpti instance := runWorkflow(t, ctx, c, wf) _, err := client.GetWorkflowResult[int](ctx, c, instance, time.Second*20) - require.Error(t, err) - require.ErrorIs(t, err, backend.ErrInstanceAlreadyExists) + require.Error(t, err, backend.ErrInstanceAlreadyExists.Error()) }, }, { diff --git a/internal/workflowerrors/error.go b/internal/workflowerrors/error.go index 60c13d4c..33118268 100644 --- a/internal/workflowerrors/error.go +++ b/internal/workflowerrors/error.go @@ -36,6 +36,10 @@ func (we *Error) Error() string { } func (we *Error) Unwrap() error { + if we == nil || we.Cause == (*Error)(nil) { + return nil + } + return we.Cause } From 40b921353e91c08ab25c425d70e8021ac5c5a92f Mon Sep 17 00:00:00 2001 From: Christopher Schleiden Date: Thu, 16 Nov 2023 21:19:40 -0800 Subject: [PATCH 09/17] Remove cmd --- backend/redis/redis.go | 1 - 1 file changed, 1 deletion(-) diff --git a/backend/redis/redis.go b/backend/redis/redis.go index c9125086..35b9a473 100644 --- a/backend/redis/redis.go +++ b/backend/redis/redis.go @@ -59,7 +59,6 @@ func NewRedisBackend(client redis.UniversalClient, opts ...RedisBackendOption) ( // them, loads them. This doesn't work when using (transactional) pipelines, so eagerly load them on startup. 
ctx := context.Background() cmds := map[string]*redis.StringCmd{ - "addEventsToStreamCmd": addEventsToStreamCmd.Load(ctx, rb.rdb), "addFutureEventCmd": addFutureEventCmd.Load(ctx, rb.rdb), "futureEventsCmd": futureEventsCmd.Load(ctx, rb.rdb), "removeFutureEventCmd": removeFutureEventCmd.Load(ctx, rb.rdb), From 90737c7b423f3535a3be567580b7906e56ff8301 Mon Sep 17 00:00:00 2001 From: Christopher Schleiden Date: Thu, 16 Nov 2023 22:04:32 -0800 Subject: [PATCH 10/17] Do not skip adding subworkflow events for sqlite --- backend/sqlite/sqlite.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/backend/sqlite/sqlite.go b/backend/sqlite/sqlite.go index c4e47ab6..791f02c6 100644 --- a/backend/sqlite/sqlite.go +++ b/backend/sqlite/sqlite.go @@ -557,8 +557,6 @@ func (sb *sqliteBackend) CompleteWorkflowTask( return fmt.Errorf("creating sub-workflow instance: %w", err) } - - continue } // Insert pending events for target instance From f2db228589e2f0f734e3419c0b660385417a2cb9 Mon Sep 17 00:00:00 2001 From: Christopher Schleiden Date: Thu, 16 Nov 2023 22:19:24 -0800 Subject: [PATCH 11/17] Simplify check --- backend/mysql/mysql.go | 3 ++- backend/redis/delete.go | 6 +----- backend/redis/expire.go | 5 ----- backend/redis/instance.go | 3 --- backend/redis/keys.go | 4 ---- .../redis/scripts/complete_workflow_task.lua | 5 +---- backend/redis/workflow.go | 1 - backend/sqlite/sqlite.go | 3 ++- backend/test/e2e.go | 19 +++++++++++++------ 9 files changed, 19 insertions(+), 30 deletions(-) diff --git a/backend/mysql/mysql.go b/backend/mysql/mysql.go index a8035b79..453d6e44 100644 --- a/backend/mysql/mysql.go +++ b/backend/mysql/mysql.go @@ -312,7 +312,8 @@ func (b *mysqlBackend) GetWorkflowInstanceState(ctx context.Context, instance *w func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, metadata *workflow.Metadata) error { // Check for existing instance - if err := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE instance_id = ? LIMIT 1", wfi.InstanceID).Scan(new(int)); err != sql.ErrNoRows { + if err := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE instance_id = ? AND state = ? LIMIT 1", wfi.InstanceID, core.WorkflowInstanceStateActive). + Scan(new(int)); err != sql.ErrNoRows { return backend.ErrInstanceAlreadyExists } diff --git a/backend/redis/delete.go b/backend/redis/delete.go index f637ed8b..6ae9bcd6 100644 --- a/backend/redis/delete.go +++ b/backend/redis/delete.go @@ -14,12 +14,9 @@ import ( // KEYS[4] - payload key // KEYS[5] - active-instance-execution key // KEYS[6] - instances-by-creation key -// KEYS[7] - instances // ARGV[1] - instance segment -// ARGV[2] - instance id var deleteCmd = redis.NewScript( `redis.call("DEL", KEYS[1], KEYS[2], KEYS[3], KEYS[4], KEYS[5]) - redis.call("HDEL", KEYS[7], ARGV[1]) return redis.call("ZREM", KEYS[6], ARGV[1])`) // deleteInstance deletes an instance from Redis. 
It does not attempt to remove any future events or pending @@ -34,8 +31,7 @@ func deleteInstance(ctx context.Context, rdb redis.UniversalClient, instance *co payloadKey(instance), activeInstanceExecutionKey(instance.InstanceID), instancesByCreation(), - instanceIDs(), - }, instanceSegment(instance), instance.InstanceID).Err(); err != nil { + }, instanceSegment(instance)).Err(); err != nil { return fmt.Errorf("failed to delete instance: %w", err) } diff --git a/backend/redis/expire.go b/backend/redis/expire.go index b4b81f58..a0fc7d5a 100644 --- a/backend/redis/expire.go +++ b/backend/redis/expire.go @@ -18,12 +18,10 @@ import ( // KEYS[4] - pending events key // KEYS[5] - history key // KEYS[6] - payload key -// KEYS[7] - instances key // ARGV[1] - current timestamp // ARGV[2] - expiration time in seconds // ARGV[3] - expiration timestamp in unix milliseconds // ARGV[4] - instance segment -// ARGV[5] - instance id var expireCmd = redis.NewScript( `-- Find instances which have already expired and remove from the index set local expiredInstances = redis.call("ZRANGE", KEYS[2], "-inf", ARGV[1], "BYSCORE") @@ -31,7 +29,6 @@ var expireCmd = redis.NewScript( local instanceSegment = expiredInstances[i] redis.call("ZREM", KEYS[1], instanceSegment) -- index set redis.call("ZREM", KEYS[2], instanceSegment) -- expiration set - redis.call("HDEL", KEYS[7], ARGV[5]) end -- Add expiration time for future cleanup @@ -60,12 +57,10 @@ func setWorkflowInstanceExpiration(ctx context.Context, rdb redis.UniversalClien pendingEventsKey(instance), historyKey(instance), payloadKey(instance), - instanceIDs(), }, nowStr, expiration.Seconds(), expStr, instanceSegment(instance), - instance.InstanceID, ).Err() } diff --git a/backend/redis/instance.go b/backend/redis/instance.go index 685ef156..3526aab5 100644 --- a/backend/redis/instance.go +++ b/backend/redis/instance.go @@ -165,9 +165,6 @@ func createInstanceP(ctx context.Context, p redis.Pipeliner, instance *core.Work // The newly created instance is going to be the active execution setActiveInstanceExecutionP(ctx, p, instance) - // Record instance id - p.HSet(ctx, instanceIDs(), instance.InstanceID, 1) - p.ZAdd(ctx, instancesByCreation(), redis.Z{ Member: instanceSegment(instance), Score: float64(createdAt.UnixMilli()), diff --git a/backend/redis/keys.go b/backend/redis/keys.go index 8ebfa873..24f6d2bf 100644 --- a/backend/redis/keys.go +++ b/backend/redis/keys.go @@ -37,10 +37,6 @@ func instancesExpiring() string { return "instances-expiring" } -func instanceIDs() string { - return "instances" -} - func pendingEventsKey(instance *core.WorkflowInstance) string { return fmt.Sprintf("pending-events:%v", instanceSegment(instance)) } diff --git a/backend/redis/scripts/complete_workflow_task.lua b/backend/redis/scripts/complete_workflow_task.lua index 36e3bb4c..93de72aa 100644 --- a/backend/redis/scripts/complete_workflow_task.lua +++ b/backend/redis/scripts/complete_workflow_task.lua @@ -21,7 +21,6 @@ local pendingEventsKey = getKey() local payloadHashKey = getKey() local futureEventZSetKey = getKey() local activeInstancesKey = getKey() -local instancesByIdKey = getKey() local workflowSetKey = getKey() local workflowStreamKey = getKey() @@ -103,7 +102,7 @@ for i = 1, otherWorkflowInstances do local conflictEventPayloadData = getArgv() -- Does the instance exist already? 
- local instanceExists = redis.call("HEXISTS", instancesByIdKey, targetInstanceId) + local instanceExists = redis.call("EXISTS", targetActiveInstanceExecutionState) if instanceExists == 1 then redis.call("XADD", pendingEventsKey, "*", "event", conflictEventData) storePayload(conflictEventId, conflictEventPayloadData) @@ -119,8 +118,6 @@ for i = 1, otherWorkflowInstances do -- Track active instance redis.call("SADD", activeInstancesKey, targetInstanceSegment) - - redis.call("HSET", instancesByIdKey, targetInstanceId, 1) end end diff --git a/backend/redis/workflow.go b/backend/redis/workflow.go index 402e5e13..fb68fa95 100644 --- a/backend/redis/workflow.go +++ b/backend/redis/workflow.go @@ -106,7 +106,6 @@ func (rb *redisBackend) CompleteWorkflowTask( payloadKey(instance), futureEventsKey(), instancesActive(), - instanceIDs(), queueKeys.SetKey, queueKeys.StreamKey, ) diff --git a/backend/sqlite/sqlite.go b/backend/sqlite/sqlite.go index 791f02c6..35c06174 100644 --- a/backend/sqlite/sqlite.go +++ b/backend/sqlite/sqlite.go @@ -187,7 +187,8 @@ func (sb *sqliteBackend) CreateWorkflowInstance(ctx context.Context, instance *w func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, metadata *workflow.Metadata) error { // Check for existing instance - if err := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE id = ? LIMIT 1", wfi.InstanceID).Scan(new(int)); err != sql.ErrNoRows { + if err := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE id = ? AND state = ? LIMIT 1", wfi.InstanceID, core.WorkflowInstanceStateActive). + Scan(new(int)); err != sql.ErrNoRows { return backend.ErrInstanceAlreadyExists } diff --git a/backend/test/e2e.go b/backend/test/e2e.go index fd1a3490..9794681e 100644 --- a/backend/test/e2e.go +++ b/backend/test/e2e.go @@ -206,18 +206,25 @@ func EndToEndBackendTest(t *testing.T, setup func(options ...backend.BackendOpti name: "SubWorkflow_DuplicateInstanceID", f: func(t *testing.T, ctx context.Context, c *client.Client, w *worker.Worker, b TestBackend) { swf := func(ctx workflow.Context, i int) (int, error) { + workflow.NewSignalChannel[any](ctx, "signal").Receive(ctx) + return i * 2, nil } wf := func(ctx workflow.Context) (int, error) { - r, err := workflow.CreateSubWorkflowInstance[int](ctx, workflow.SubWorkflowOptions{ + swf1 := workflow.CreateSubWorkflowInstance[int](ctx, workflow.SubWorkflowOptions{ InstanceID: "subworkflow", - }, swf, 1).Get(ctx) - if err != nil { - return 0, err - } + }, swf, 1) + + defer func() { + rctx := workflow.NewDisconnectedContext(ctx) + + // Unblock waiting sub workflow + workflow.SignalWorkflow[any](rctx, "subworkflow", "signal", 1).Get(rctx) + swf1.Get(rctx) + }() // Run another subworkflow with the same ID - r, err = workflow.CreateSubWorkflowInstance[int](ctx, workflow.SubWorkflowOptions{ + r, err := workflow.CreateSubWorkflowInstance[int](ctx, workflow.SubWorkflowOptions{ InstanceID: "subworkflow", }, swf, 1).Get(ctx) if err != nil { From ac207ce786f91dd7d80400b8cac0fb32a1266c6e Mon Sep 17 00:00:00 2001 From: Christopher Schleiden Date: Fri, 17 Nov 2023 08:12:33 -0800 Subject: [PATCH 12/17] Remove extraneous query for mysql backend --- backend/mysql/mysql.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/backend/mysql/mysql.go b/backend/mysql/mysql.go index 453d6e44..57e84370 100644 --- a/backend/mysql/mysql.go +++ b/backend/mysql/mysql.go @@ -146,11 +146,6 @@ func (b *mysqlBackend) CreateWorkflowInstance(ctx context.Context, instance *wor } defer tx.Rollback() - // Check for existing instance - 
if err := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE instance_id = ? LIMIT 1", instance.InstanceID).Scan(new(int)); err != sql.ErrNoRows { - return backend.ErrInstanceAlreadyExists - } - // Create workflow instance if err := createInstance(ctx, tx, instance, event.Attributes.(*history.ExecutionStartedAttributes).Metadata); err != nil { return err From 2cebf16ddf3594a2873338c5e9faae9a0bd12c80 Mon Sep 17 00:00:00 2001 From: Christopher Schleiden Date: Fri, 17 Nov 2023 08:24:02 -0800 Subject: [PATCH 13/17] Remove unused code --- backend/redis/instance.go | 25 ------------------------- 1 file changed, 25 deletions(-) diff --git a/backend/redis/instance.go b/backend/redis/instance.go index 3526aab5..dc997d09 100644 --- a/backend/redis/instance.go +++ b/backend/redis/instance.go @@ -175,25 +175,6 @@ func createInstanceP(ctx context.Context, p redis.Pipeliner, instance *core.Work return nil } -func updateInstanceP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, state *instanceState) error { - key := instanceKey(instance) - - b, err := json.Marshal(state) - if err != nil { - return fmt.Errorf("marshaling instance state: %w", err) - } - - p.Set(ctx, key, string(b), 0) - - if state.State != core.WorkflowInstanceStateActive { - p.SRem(ctx, instancesActive(), instanceSegment(instance)) - } - - // CreatedAt does not change, so skip updating the instancesByCreation() ZSET - - return nil -} - func readInstance(ctx context.Context, rdb redis.UniversalClient, instanceKey string) (*instanceState, error) { p := rdb.Pipeline() @@ -255,9 +236,3 @@ func setActiveInstanceExecutionP(ctx context.Context, p redis.Pipeliner, instanc return p.Set(ctx, key, string(b), 0).Err() } - -func removeActiveInstanceExecutionP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance) error { - key := activeInstanceExecutionKey(instance.InstanceID) - - return p.Del(ctx, key).Err() -} From 34fd5ada5f11c723b06cd51b661a649c9186c4da Mon Sep 17 00:00:00 2001 From: Christopher Schleiden Date: Fri, 17 Nov 2023 10:01:35 -0800 Subject: [PATCH 14/17] Resolve todo and add tracing --- .../redis/scripts/complete_workflow_task.lua | 2 -- backend/redis/workflow.go | 31 ++++++++++--------- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/backend/redis/scripts/complete_workflow_task.lua b/backend/redis/scripts/complete_workflow_task.lua index 93de72aa..fd9ec2b1 100644 --- a/backend/redis/scripts/complete_workflow_task.lua +++ b/backend/redis/scripts/complete_workflow_task.lua @@ -165,8 +165,6 @@ if state == ContinuedAsNew or state == Finished then instance["completed_at"] = now - -- TODO: Set auto expiration - redis.call("SREM", activeInstancesKey, instanceSegment) end diff --git a/backend/redis/workflow.go b/backend/redis/workflow.go index fb68fa95..244f5b83 100644 --- a/backend/redis/workflow.go +++ b/backend/redis/workflow.go @@ -10,8 +10,12 @@ import ( "github.com/cschleiden/go-workflows/backend" "github.com/cschleiden/go-workflows/backend/history" "github.com/cschleiden/go-workflows/core" + "github.com/cschleiden/go-workflows/internal/log" + "github.com/cschleiden/go-workflows/internal/tracing" "github.com/cschleiden/go-workflows/internal/workflowerrors" "github.com/redis/go-redis/v9" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*backend.WorkflowTask, error) { @@ -245,7 +249,7 @@ func (rb *redisBackend) CompleteWorkflowTask( // If there are pending events, queue the instance again 
// No args/keys needed - // Commit transaction + // Run script _, err := completeWorkflowTaskCmd.Run(ctx, rb.rdb, keys, args...).Result() if err != nil { return fmt.Errorf("completing workflow task: %w", err) @@ -253,19 +257,18 @@ func (rb *redisBackend) CompleteWorkflowTask( if state == core.WorkflowInstanceStateFinished || state == core.WorkflowInstanceStateContinuedAsNew { // Trace workflow completion - // TODO: Return metadata from script - // ctx, err = (&tracing.TracingContextPropagator{}).Extract(ctx, instanceState.Metadata) - // if err != nil { - // rb.Logger().Error("extracting tracing context", log.ErrorKey, err) - // } - - // _, span := rb.Tracer().Start(ctx, "WorkflowComplete", - // trace.WithAttributes( - // attribute.String(log.NamespaceKey+log.InstanceIDKey, instanceState.Instance.InstanceID), - // )) - // span.End() - - // TODO: Move to script + ctx, err = (&tracing.TracingContextPropagator{}).Extract(ctx, task.Metadata) + if err != nil { + rb.Logger().Error("extracting tracing context", log.ErrorKey, err) + } + + _, span := rb.Tracer().Start(ctx, "WorkflowComplete", + trace.WithAttributes( + attribute.String(log.NamespaceKey+log.InstanceIDKey, task.WorkflowInstance.InstanceID), + )) + span.End() + + // Auto expiration if rb.options.AutoExpiration > 0 { if err := setWorkflowInstanceExpiration(ctx, rb.rdb, instance, rb.options.AutoExpiration); err != nil { return fmt.Errorf("setting workflow instance expiration: %w", err) From 5120f1ddc1ab8633bd97736479817729fbc048f5 Mon Sep 17 00:00:00 2001 From: Christopher Schleiden Date: Fri, 17 Nov 2023 19:39:16 -0800 Subject: [PATCH 15/17] Remove log message --- backend/redis/queue.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/backend/redis/queue.go b/backend/redis/queue.go index 2d0317a5..2a58e861 100644 --- a/backend/redis/queue.go +++ b/backend/redis/queue.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "log" "time" "github.com/google/uuid" @@ -139,7 +138,6 @@ func (q *taskQueue[T]) Dequeue(ctx context.Context, rdb redis.UniversalClient, l } if task != nil { - log.Println("Recovered task", task.ID) return task, nil } From baf44d5a7e1cd53c5fd301138c736a9dbdbee2ac Mon Sep 17 00:00:00 2001 From: Christopher Schleiden Date: Fri, 17 Nov 2023 19:54:38 -0800 Subject: [PATCH 16/17] Add test to ensure instance id can be re-used after workflow is finished --- backend/test/e2e.go | 36 +++++++++++++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/backend/test/e2e.go b/backend/test/e2e.go index 9794681e..ce8b058e 100644 --- a/backend/test/e2e.go +++ b/backend/test/e2e.go @@ -203,7 +203,7 @@ func EndToEndBackendTest(t *testing.T, setup func(options ...backend.BackendOpti }, }, { - name: "SubWorkflow_DuplicateInstanceID", + name: "SubWorkflow_DuplicateActiveInstanceID", f: func(t *testing.T, ctx context.Context, c *client.Client, w *worker.Worker, b TestBackend) { swf := func(ctx workflow.Context, i int) (int, error) { workflow.NewSignalChannel[any](ctx, "signal").Receive(ctx) @@ -241,6 +241,40 @@ func EndToEndBackendTest(t *testing.T, setup func(options ...backend.BackendOpti require.Error(t, err, backend.ErrInstanceAlreadyExists.Error()) }, }, + { + name: "SubWorkflow_DuplicateInactiveInstanceID", + f: func(t *testing.T, ctx context.Context, c *client.Client, w *worker.Worker, b TestBackend) { + swf := func(ctx workflow.Context, i int) (int, error) { + return i * 2, nil + } + wf := func(ctx workflow.Context) (int, error) { + // Let sub-workflow run to completion + r, err := 
workflow.CreateSubWorkflowInstance[int](ctx, workflow.SubWorkflowOptions{ + InstanceID: "subworkflow", + }, swf, 1).Get(ctx) + if err != nil { + return 0, err + } + + // Run another subworkflow with the same ID + r, err = workflow.CreateSubWorkflowInstance[int](ctx, workflow.SubWorkflowOptions{ + InstanceID: "subworkflow", + }, swf, 2).Get(ctx) + if err != nil { + return 0, err + } + + return r, nil + } + register(t, ctx, w, []interface{}{wf, swf}, nil) + + instance := runWorkflow(t, ctx, c, wf) + + r, err := client.GetWorkflowResult[int](ctx, c, instance, time.Second*20) + require.NoError(t, err) + require.Equal(t, 4, r) + }, + }, { name: "SubWorkflow_PropagateCancellation", f: func(t *testing.T, ctx context.Context, c *client.Client, w *worker.Worker, b TestBackend) { From 7bd93bac76c9482654bac12ac017b30b34a7a705 Mon Sep 17 00:00:00 2001 From: Christopher Schleiden Date: Fri, 17 Nov 2023 21:25:06 -0800 Subject: [PATCH 17/17] Add tests scheduling large number of activities --- backend/test/e2e.go | 43 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/backend/test/e2e.go b/backend/test/e2e.go index ce8b058e..a2ba600c 100644 --- a/backend/test/e2e.go +++ b/backend/test/e2e.go @@ -12,6 +12,7 @@ import ( "github.com/cschleiden/go-workflows/backend/history" "github.com/cschleiden/go-workflows/client" "github.com/cschleiden/go-workflows/core" + "github.com/cschleiden/go-workflows/internal/sync" internalwf "github.com/cschleiden/go-workflows/internal/workflow" "github.com/cschleiden/go-workflows/worker" "github.com/cschleiden/go-workflows/workflow" @@ -43,6 +44,48 @@ func EndToEndBackendTest(t *testing.T, setup func(options ...backend.BackendOpti require.Equal(t, "hello world", output) }, }, + { + name: "Workflow_LotsOfActivities", + f: func(t *testing.T, ctx context.Context, c *client.Client, w *worker.Worker, b TestBackend) { + a := func(ctx context.Context) (int, error) { + return 42, nil + } + + wf := func(ctx workflow.Context, msg string) (string, error) { + done := 0 + + y := make([]workflow.SelectCase, 0) + for i := 0; i < 100; i++ { + var sc workflow.SelectCase + sc = workflow.Await[int](workflow.ExecuteActivity[int](ctx, workflow.ActivityOptions{}, a), + func(ctx sync.Context, f workflow.Future[int]) { + done++ + + // Remove sc from y + for i, v := range y { + if v == sc { + y = append(y[:i], y[i+1:]...) + break + } + } + }) + y = append(y, sc) + } + + for done < 100 { + workflow.Select(ctx, y...) + } + + return msg + " world", nil + } + register(t, ctx, w, []interface{}{wf}, []interface{}{a}) + + output, err := runWorkflowWithResult[string](t, ctx, c, wf, "hello") + + require.NoError(t, err) + require.Equal(t, "hello world", output) + }, + }, { name: "SimpleWorkflow_ExpectedHistory", f: func(t *testing.T, ctx context.Context, c *client.Client, w *worker.Worker, b TestBackend) {