diff --git a/backend/redis/events.go b/backend/redis/events.go index d4fb21f8..230c8834 100644 --- a/backend/redis/events.go +++ b/backend/redis/events.go @@ -34,19 +34,18 @@ func marshalEventWithoutAttributes(event *history.Event) (string, error) { return string(data), nil } -// KEYS[1..n] - payload keys +// KEYS[1 - payload key // ARGV[1..n] - payload values var addPayloadsCmd = redis.NewScript(` - for i = 1, #ARGV do - redis.pcall("SET", KEYS[i], ARGV[i], "NX") + for i = 1, #ARGV, 2 do + redis.pcall("HSETNX", KEYS[1], ARGV[i], ARGV[i+1]) end return 0 `) -func addEventPayloads(ctx context.Context, p redis.Pipeliner, events []*history.Event) error { - keys := make([]string, 0) - values := make([]interface{}, 0) +func addEventPayloadsP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, events []*history.Event) error { + args := make([]interface{}, 0) for _, event := range events { payload, err := json.Marshal(event.Attributes) @@ -54,11 +53,10 @@ func addEventPayloads(ctx context.Context, p redis.Pipeliner, events []*history. return fmt.Errorf("marshaling event payload: %w", err) } - keys = append(keys, payloadKey(event.ID)) - values = append(values, string(payload)) + args = append(args, event.ID, string(payload)) } - return addPayloadsCmd.Run(ctx, p, keys, values...).Err() + return addPayloadsCmd.Run(ctx, p, []string{payloadKey(instance)}, args...).Err() } func addEventToStreamP(ctx context.Context, p redis.Pipeliner, streamKey string, event *history.Event) error { @@ -110,15 +108,16 @@ func addEventsToStreamP(ctx context.Context, p redis.Pipeliner, streamKey string // Adds an event to be delivered in the future. Not cluster-safe. // KEYS[1] - future event zset key // KEYS[2] - future event key -// KEYS[3] - future event payload key +// KEYS[3] - instance payload key // ARGV[1] - timestamp // ARGV[2] - Instance segment -// ARGV[3] - event data -// ARGV[4] - event payload +// ARGV[3] - event id +// ARGV[4] - event data +// ARGV[5] - event payload var addFutureEventCmd = redis.NewScript(` redis.call("ZADD", KEYS[1], ARGV[1], KEYS[2]) - redis.call("HSET", KEYS[2], "instance", ARGV[2], "event", ARGV[3], "payload", KEYS[3]) - redis.call("SET", KEYS[3], ARGV[4], "NX") + redis.call("HSET", KEYS[2], "instance", ARGV[2], "id", ARGV[3], "event", ARGV[4]) + redis.call("HSETNX", KEYS[3], ARGV[3], ARGV[5]) return 0 `) @@ -135,9 +134,10 @@ func addFutureEventP(ctx context.Context, p redis.Pipeliner, instance *core.Work return addFutureEventCmd.Run( ctx, p, - []string{futureEventsKey(), futureEventKey(instance, event.ScheduleEventID), payloadKey(event.ID)}, + []string{futureEventsKey(), futureEventKey(instance, event.ScheduleEventID), payloadKey(instance)}, strconv.FormatInt(event.VisibleAt.UnixMilli(), 10), instanceSegment(instance), + event.ID, string(eventData), string(payloadEventData), ).Err() @@ -146,15 +146,16 @@ func addFutureEventP(ctx context.Context, p redis.Pipeliner, instance *core.Work // Remove a scheduled future event. Not cluster-safe. // KEYS[1] - future event zset key // KEYS[2] - future event key +// KEYS[3] - instance payload key var removeFutureEventCmd = redis.NewScript(` redis.call("ZREM", KEYS[1], KEYS[2]) - local k = redis.call("HGET", KEYS[2], "payload") - redis.call("DEL", k) + local eventID = redis.call("HGET", KEYS[2], "id") + redis.call("HDEL", KEYS[3], eventID) return redis.call("DEL", KEYS[2]) `) // removeFutureEvent removes a scheduled future event for the given event. Events are associated via their ScheduleEventID func removeFutureEventP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, event *history.Event) { key := futureEventKey(instance, event.ScheduleEventID) - removeFutureEventCmd.Run(ctx, p, []string{futureEventsKey(), key}) + removeFutureEventCmd.Run(ctx, p, []string{futureEventsKey(), key, payloadKey(instance)}) } diff --git a/backend/redis/instance.go b/backend/redis/instance.go index 43229384..6eddfe2a 100644 --- a/backend/redis/instance.go +++ b/backend/redis/instance.go @@ -31,7 +31,7 @@ func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, instance *wo } // Create event stream with initial event - if err := addEventPayloads(ctx, p, []*history.Event{event}); err != nil { + if err := addEventPayloadsP(ctx, p, instance, []*history.Event{event}); err != nil { return fmt.Errorf("adding event payloads: %w", err) } @@ -73,11 +73,11 @@ func (rb *redisBackend) GetWorkflowInstanceHistory(ctx context.Context, instance return nil, fmt.Errorf("unmarshaling event: %w", err) } - payloadKeys = append(payloadKeys, payloadKey(event.ID)) + payloadKeys = append(payloadKeys, event.ID) events = append(events, event) } - res, err := rb.rdb.MGet(ctx, payloadKeys...).Result() + res, err := rb.rdb.HMGet(ctx, payloadKey(instance), payloadKeys...).Result() if err != nil { return nil, fmt.Errorf("reading payloads: %w", err) } diff --git a/backend/redis/keys.go b/backend/redis/keys.go index 492d32f6..9a3652e0 100644 --- a/backend/redis/keys.go +++ b/backend/redis/keys.go @@ -57,6 +57,6 @@ func futureEventKey(instance *core.WorkflowInstance, scheduleEventID int64) stri return fmt.Sprintf("future-event:%v:%v:%v", instance.InstanceID, instance.ExecutionID, scheduleEventID) } -func payloadKey(eventID string) string { - return fmt.Sprintf("payload:%v", eventID) +func payloadKey(instance *core.WorkflowInstance) string { + return fmt.Sprintf("payload:%v", instanceSegment(instance)) } diff --git a/backend/redis/workflow.go b/backend/redis/workflow.go index d925ae04..dcaf8b09 100644 --- a/backend/redis/workflow.go +++ b/backend/redis/workflow.go @@ -99,13 +99,13 @@ func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*backend.WorkflowT return nil, fmt.Errorf("unmarshaling event: %w", err) } - payloadKeys = append(payloadKeys, payloadKey(event.ID)) + payloadKeys = append(payloadKeys, event.ID) newEvents = append(newEvents, event) } // Fetch event payloads if len(payloadKeys) > 0 { - res, err := rb.rdb.MGet(ctx, payloadKeys...).Result() + res, err := rb.rdb.HMGet(ctx, payloadKey(instanceState.Instance), payloadKeys...).Result() if err != nil { return nil, fmt.Errorf("reading payloads: %w", err) } @@ -182,7 +182,7 @@ func (rb *redisBackend) CompleteWorkflowTask( p := rb.rdb.TxPipeline() // Add executed events to the history - if err := addEventPayloads(ctx, p, executedEvents); err != nil { + if err := addEventPayloadsP(ctx, p, instance, executedEvents); err != nil { return fmt.Errorf("adding event payloads: %w", err) } @@ -220,7 +220,7 @@ func (rb *redisBackend) CompleteWorkflowTask( } // Add pending event to stream - if err := addEventPayloads(ctx, p, []*history.Event{m.HistoryEvent}); err != nil { + if err := addEventPayloadsP(ctx, p, &targetInstance, []*history.Event{m.HistoryEvent}); err != nil { return fmt.Errorf("adding event payloads: %w", err) } @@ -327,7 +327,7 @@ func (rb *redisBackend) CompleteWorkflowTask( func (rb *redisBackend) addWorkflowInstanceEventP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, event *history.Event) error { // Add event to pending events for instance - if err := addEventPayloads(ctx, p, []*history.Event{event}); err != nil { + if err := addEventPayloadsP(ctx, p, instance, []*history.Event{event}); err != nil { return err }