Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent multiple active instances with the same id #288

Merged
merged 17 commits into the base branch
Nov 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 29 additions & 23 deletions backend/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cschleiden/go-workflows/backend/metrics"
"github.com/cschleiden/go-workflows/core"
"github.com/cschleiden/go-workflows/internal/metrickeys"
"github.com/cschleiden/go-workflows/internal/workflowerrors"
"github.com/cschleiden/go-workflows/workflow"
_ "github.com/go-sql-driver/mysql"
"github.com/google/uuid"
Expand Down Expand Up @@ -146,7 +147,7 @@ func (b *mysqlBackend) CreateWorkflowInstance(ctx context.Context, instance *wor
defer tx.Rollback()

// Create workflow instance
if err := createInstance(ctx, tx, instance, event.Attributes.(*history.ExecutionStartedAttributes).Metadata, false); err != nil {
if err := createInstance(ctx, tx, instance, event.Attributes.(*history.ExecutionStartedAttributes).Metadata); err != nil {
return err
}

Expand Down Expand Up @@ -304,7 +305,13 @@ func (b *mysqlBackend) GetWorkflowInstanceState(ctx context.Context, instance *w
return state, nil
}

func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, metadata *workflow.Metadata, ignoreDuplicate bool) error {
func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, metadata *workflow.Metadata) error {
// Check for existing instance
if err := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE instance_id = ? AND state = ? LIMIT 1", wfi.InstanceID, core.WorkflowInstanceStateActive).
Scan(new(int)); err != sql.ErrNoRows {
return backend.ErrInstanceAlreadyExists
}

var parentInstanceID, parentExecutionID *string
var parentEventID *int64
if wfi.SubWorkflow() {
Expand All @@ -318,9 +325,9 @@ func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, met
return fmt.Errorf("marshaling metadata: %w", err)
}

res, err := tx.ExecContext(
_, err = tx.ExecContext(
ctx,
"INSERT IGNORE INTO `instances` (instance_id, execution_id, parent_instance_id, parent_execution_id, parent_schedule_event_id, metadata, state) VALUES (?, ?, ?, ?, ?, ?, ?)",
"INSERT INTO `instances` (instance_id, execution_id, parent_instance_id, parent_execution_id, parent_schedule_event_id, metadata, state) VALUES (?, ?, ?, ?, ?, ?, ?)",
wfi.InstanceID,
wfi.ExecutionID,
parentInstanceID,
Expand All @@ -333,17 +340,6 @@ func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, met
return fmt.Errorf("inserting workflow instance: %w", err)
}

if !ignoreDuplicate {
rows, err := res.RowsAffected()
if err != nil {
return err
}

if rows != 1 {
return backend.ErrInstanceAlreadyExists
}
}

return nil
}

Expand Down Expand Up @@ -624,23 +620,33 @@ func (b *mysqlBackend) CompleteWorkflowTask(
groupedEvents := history.EventsByWorkflowInstance(workflowEvents)

for targetInstance, events := range groupedEvents {
for _, m := range events {
if m.HistoryEvent.Type == history.EventType_WorkflowExecutionStarted {
a := m.HistoryEvent.Attributes.(*history.ExecutionStartedAttributes)
// Create new instance
if err := createInstance(ctx, tx, m.WorkflowInstance, a.Metadata, true); err != nil {
return err
// Are we creating a new sub-workflow instance?
m := events[0]
if m.HistoryEvent.Type == history.EventType_WorkflowExecutionStarted {
a := m.HistoryEvent.Attributes.(*history.ExecutionStartedAttributes)
// Create new instance
if err := createInstance(ctx, tx, m.WorkflowInstance, a.Metadata); err != nil {
if err == backend.ErrInstanceAlreadyExists {
if err := insertPendingEvents(ctx, tx, instance, []*history.Event{
history.NewPendingEvent(time.Now(), history.EventType_SubWorkflowFailed, &history.SubWorkflowFailedAttributes{
Error: workflowerrors.FromError(backend.ErrInstanceAlreadyExists),
}, history.ScheduleEventID(m.WorkflowInstance.ParentEventID)),
}); err != nil {
return fmt.Errorf("inserting sub-workflow failed event: %w", err)
}

continue
}

break
return fmt.Errorf("creating sub-workflow instance: %w", err)
}
}

// Insert pending events for target instance
historyEvents := []*history.Event{}
for _, m := range events {
historyEvents = append(historyEvents, m.HistoryEvent)
}

if err := insertPendingEvents(ctx, tx, &targetInstance, historyEvents); err != nil {
return fmt.Errorf("inserting messages: %w", err)
}
Expand Down
87 changes: 0 additions & 87 deletions backend/redis/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"strconv"

"github.com/cschleiden/go-workflows/backend/history"
"github.com/cschleiden/go-workflows/core"
Expand Down Expand Up @@ -73,89 +72,3 @@ func addEventToStreamP(ctx context.Context, p redis.Pipeliner, streamKey string,
},
}).Err()
}

// addEventsToStreamCmd adds the given events to the given event stream. If successful, the message id of the last event
// added is returned.
// KEYS[1] - stream key
// ARGV    - alternating pairs: ARGV[i] is the stream message id, ARGV[i+1] the serialized event data
var addEventsToStreamCmd = redis.NewScript(`
local msgID = ""
for i = 1, #ARGV, 2 do
msgID = redis.call("XADD", KEYS[1], ARGV[i], "event", ARGV[i + 1])
end
return msgID
`)

// addEventsToStreamP queues the given events onto the event stream at
// streamKey via the pipeliner p. Each event contributes a message-id /
// serialized-event pair; the message id is derived from the event's
// SequenceID so events are appended in history order.
//
// Fix: the original discarded the result of queuing the script and returned
// nil unconditionally; propagate it like the sibling addEventToStreamP does.
// (NOTE(review): with a pipeliner the error is presumably only surfaced at
// Exec time — confirm against go-redis semantics.)
func addEventsToStreamP(ctx context.Context, p redis.Pipeliner, streamKey string, events []*history.Event) error {
	eventsData := make([]string, 0, len(events)*2)
	for _, event := range events {
		eventData, err := marshalEventWithoutAttributes(event)
		if err != nil {
			return err
		}

		eventsData = append(eventsData, historyID(event.SequenceID))
		eventsData = append(eventsData, string(eventData))
	}

	return addEventsToStreamCmd.Run(ctx, p, []string{streamKey}, eventsData).Err()
}

// Adds an event to be delivered in the future. Not cluster-safe.
// KEYS[1] - future event zset key
// KEYS[2] - future event key
// KEYS[3] - instance payload key
// ARGV[1] - timestamp
// ARGV[2] - Instance segment
// ARGV[3] - event id
// ARGV[4] - event data
// ARGV[5] - event payload
var addFutureEventCmd = redis.NewScript(`
redis.call("ZADD", KEYS[1], ARGV[1], KEYS[2])
redis.call("HSET", KEYS[2], "instance", ARGV[2], "id", ARGV[3], "event", ARGV[4])
redis.call("HSETNX", KEYS[3], ARGV[3], ARGV[5])
return 0
`)

// addFutureEventP queues (on pipeliner p) the scheduling of an event to be
// delivered in the future. The event's VisibleAt timestamp (milliseconds) is
// used as the ZSET score, and the event's attributes are stored as a separate
// payload entry keyed by event id. Not cluster-safe.
func addFutureEventP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, event *history.Event) error {
	// Serialize the event without its attributes; attributes travel separately.
	eventData, err := marshalEventWithoutAttributes(event)
	if err != nil {
		return err
	}

	payloadEventData, err := json.Marshal(event.Attributes)
	if err != nil {
		return err
	}

	return addFutureEventCmd.Run(
		ctx, p,
		[]string{futureEventsKey(), futureEventKey(instance, event.ScheduleEventID), payloadKey(instance)},
		strconv.FormatInt(event.VisibleAt.UnixMilli(), 10),
		instanceSegment(instance),
		event.ID,
		string(eventData),
		string(payloadEventData),
	).Err()
}

// Remove a scheduled future event. Not cluster-safe.
// KEYS[1] - future event zset key
// KEYS[2] - future event key
// KEYS[3] - instance payload key
var removeFutureEventCmd = redis.NewScript(`
redis.call("ZREM", KEYS[1], KEYS[2])
local eventID = redis.call("HGET", KEYS[2], "id")
redis.call("HDEL", KEYS[3], eventID)
return redis.call("DEL", KEYS[2])
`)

// removeFutureEvent removes a scheduled future event for the given event. Events are associated via their ScheduleEventID.
// NOTE(review): the command's result is ignored here; presumably any error
// surfaces when the pipeline is executed — confirm.
func removeFutureEventP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, event *history.Event) {
	key := futureEventKey(instance, event.ScheduleEventID)
	removeFutureEventCmd.Run(ctx, p, []string{futureEventsKey(), key, payloadKey(instance)})
}
122 changes: 122 additions & 0 deletions backend/redis/events_future.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package redis

import (
"context"
"encoding/json"
"fmt"
"strconv"
"time"

"github.com/cschleiden/go-workflows/backend/history"
"github.com/cschleiden/go-workflows/core"
redis "github.com/redis/go-redis/v9"
)

// addFutureEventCmd schedules an event to be delivered in the future. Not cluster-safe.
// The event key is scored into the future-event ZSET by its timestamp, the
// event data is stored in a hash at the event key, and the payload is stored
// (only if not already present) in the instance payload hash.
// KEYS[1] - future event zset key
// KEYS[2] - future event key
// KEYS[3] - instance payload key
// ARGV[1] - timestamp/score for set
// ARGV[2] - Instance segment
// ARGV[3] - event id
// ARGV[4] - event data
// ARGV[5] - event payload
var addFutureEventCmd = redis.NewScript(`
redis.call("ZADD", KEYS[1], ARGV[1], KEYS[2])
redis.call("HSET", KEYS[2], "instance", ARGV[2], "id", ARGV[3], "event", ARGV[4])
redis.call("HSETNX", KEYS[3], ARGV[3], ARGV[5])
return 0
`)

// addFutureEventP queues (on pipeliner p) the scheduling of an event for
// future delivery. The serialized event and its attributes payload are handed
// to the addFutureEventCmd script, scored by the event's VisibleAt timestamp
// in milliseconds. Not cluster-safe.
func addFutureEventP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, event *history.Event) error {
	data, err := marshalEventWithoutAttributes(event)
	if err != nil {
		return err
	}

	// Attributes are stored separately, keyed by event id.
	payload, err := json.Marshal(event.Attributes)
	if err != nil {
		return err
	}

	keys := []string{
		futureEventsKey(),
		futureEventKey(instance, event.ScheduleEventID),
		payloadKey(instance),
	}
	score := strconv.FormatInt(event.VisibleAt.UnixMilli(), 10)

	return addFutureEventCmd.Run(ctx, p, keys, score, instanceSegment(instance), event.ID, string(data), string(payload)).Err()
}

// removeFutureEventCmd removes a scheduled future event. Not cluster-safe.
// Drops the event key from the ZSET, deletes the associated payload entry
// (looked up via the event id stored in the event hash), then deletes the
// event hash itself.
// KEYS[1] - future event zset key
// KEYS[2] - future event key
// KEYS[3] - instance payload key
var removeFutureEventCmd = redis.NewScript(`
redis.call("ZREM", KEYS[1], KEYS[2])
local eventID = redis.call("HGET", KEYS[2], "id")
redis.call("HDEL", KEYS[3], eventID)
return redis.call("DEL", KEYS[2])
`)

// removeFutureEventP queues (on pipeliner p) removal of the future event
// scheduled for the given event. Events are associated via their
// ScheduleEventID.
func removeFutureEventP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, event *history.Event) {
	keys := []string{
		futureEventsKey(),
		futureEventKey(instance, event.ScheduleEventID),
		payloadKey(instance),
	}
	removeFutureEventCmd.Run(ctx, p, keys)
}

// futureEventsCmd finds all due future events (score <= now). For each event:
// - Look up event data
// - Add to pending event stream for workflow instance
// - Try to queue workflow task for workflow instance (only if the instance
//   is not already in the task queue set)
// - Remove event from future event set and delete event data
//
// KEYS[1] - future event set key
// KEYS[2] - workflow task queue stream
// KEYS[3] - workflow task queue set
// ARGV[1] - current timestamp for zrange
//
// Note: this does not work with Redis Cluster since not all keys are passed into the script
// (the per-instance pending-events stream keys are computed inside the script).
var futureEventsCmd = redis.NewScript(`
-- Find events which should become visible now
local events = redis.call("ZRANGE", KEYS[1], "-inf", ARGV[1], "BYSCORE")
for i = 1, #events do
local instanceSegment = redis.call("HGET", events[i], "instance")

-- Add event to pending event stream
local eventData = redis.call("HGET", events[i], "event")
local pending_events_key = "pending-events:" .. instanceSegment
redis.call("XADD", pending_events_key, "*", "event", eventData)

-- Try to queue workflow task
local already_queued = redis.call("SADD", KEYS[3], instanceSegment)
if already_queued ~= 0 then
redis.call("XADD", KEYS[2], "*", "id", instanceSegment, "data", "")
end

-- Delete event hash data
redis.call("DEL", events[i])
redis.call("ZREM", KEYS[1], events[i])
end

return #events
`)

// scheduleFutureEvents promotes all future events that have become due (their
// timestamp is <= now) into their instances' pending-event streams and queues
// workflow tasks for the affected instances, via the futureEventsCmd script.
func scheduleFutureEvents(ctx context.Context, rb *redisBackend) error {
	nowStr := strconv.FormatInt(time.Now().UnixMilli(), 10)
	keys := rb.workflowQueue.Keys()

	scriptKeys := []string{
		futureEventsKey(),
		keys.StreamKey,
		keys.SetKey,
	}

	_, err := futureEventsCmd.Run(ctx, rb.rdb, scriptKeys, nowStr).Result()
	if err != nil && err != redis.Nil {
		return fmt.Errorf("checking future events: %w", err)
	}

	return nil
}
33 changes: 4 additions & 29 deletions backend/redis/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@ import (
)

func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error {
state, err := readInstance(ctx, rb.rdb, instanceKey(instance))
activeInstance, err := readActiveInstanceExecution(ctx, rb.rdb, instance.InstanceID)
if err != nil && err != backend.ErrInstanceNotFound {
return err
}

if state != nil {
if activeInstance != nil {
return backend.ErrInstanceAlreadyExists
}

p := rb.rdb.TxPipeline()

if err := createInstanceP(ctx, p, instance, event.Attributes.(*history.ExecutionStartedAttributes).Metadata, false); err != nil {
if err := createInstanceP(ctx, p, instance, event.Attributes.(*history.ExecutionStartedAttributes).Metadata); err != nil {
return err
}

Expand Down Expand Up @@ -145,7 +145,7 @@ type instanceState struct {
LastSequenceID int64 `json:"last_sequence_id,omitempty"`
}

func createInstanceP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, metadata *metadata.WorkflowMetadata, ignoreDuplicate bool) error {
func createInstanceP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, metadata *metadata.WorkflowMetadata) error {
key := instanceKey(instance)

createdAt := time.Now()
Expand Down Expand Up @@ -175,25 +175,6 @@ func createInstanceP(ctx context.Context, p redis.Pipeliner, instance *core.Work
return nil
}

// updateInstanceP queues (on pipeliner p) persisting the given instance
// state under the instance key, and removes the instance from the active set
// once it is no longer in the active state.
func updateInstanceP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, state *instanceState) error {
	data, err := json.Marshal(state)
	if err != nil {
		return fmt.Errorf("marshaling instance state: %w", err)
	}

	p.Set(ctx, instanceKey(instance), string(data), 0)

	// Drop finished/suspended instances from the active-instance set.
	if state.State != core.WorkflowInstanceStateActive {
		p.SRem(ctx, instancesActive(), instanceSegment(instance))
	}

	// CreatedAt does not change, so skip updating the instancesByCreation() ZSET

	return nil
}

func readInstance(ctx context.Context, rdb redis.UniversalClient, instanceKey string) (*instanceState, error) {
p := rdb.Pipeline()

Expand Down Expand Up @@ -255,9 +236,3 @@ func setActiveInstanceExecutionP(ctx context.Context, p redis.Pipeliner, instanc

return p.Set(ctx, key, string(b), 0).Err()
}

// removeActiveInstanceExecutionP queues (on pipeliner p) deletion of the
// pointer from the instance id to its currently active execution.
func removeActiveInstanceExecutionP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance) error {
	return p.Del(ctx, activeInstanceExecutionKey(instance.InstanceID)).Err()
}
Loading