From d3093151f21184b94015dbb94bfd4174040b4f77 Mon Sep 17 00:00:00 2001
From: Christopher Schleiden
Date: Wed, 8 Nov 2023 18:31:49 -0800
Subject: [PATCH] Store event payloads in separate key

---
 backend/redis/events.go   | 99 ++++++++++++++++++++++++++++++++++-----
 backend/redis/instance.go | 40 ++++++++++++-----
 backend/redis/keys.go     |  5 ++
 backend/redis/redis.go    |  1 +
 backend/redis/workflow.go | 51 +++++++++++++++++---
 5 files changed, 167 insertions(+), 29 deletions(-)

diff --git a/backend/redis/events.go b/backend/redis/events.go
index c7acc4f6..d4fb21f8 100644
--- a/backend/redis/events.go
+++ b/backend/redis/events.go
@@ -3,6 +3,7 @@ package redis
 import (
 	"context"
 	"encoding/json"
+	"fmt"
 	"strconv"
 
 	"github.com/cschleiden/go-workflows/backend/history"
@@ -10,8 +11,67 @@ import (
 	"github.com/redis/go-redis/v9"
 )
 
+// eventWithoutAttributes wraps an event so that it is marshaled with a nil
+// "attr" field; the attributes themselves are stored under a separate key.
+type eventWithoutAttributes struct {
+	*history.Event
+}
+
+// MarshalJSON encodes the wrapped event through an anonymous struct whose own
+// Attributes field (JSON name "attr") is always nil.
+func (e *eventWithoutAttributes) MarshalJSON() ([]byte, error) {
+	return json.Marshal(&struct {
+		*history.Event
+		Attributes interface{} `json:"attr"`
+	}{
+		Event:      e.Event,
+		Attributes: nil,
+	})
+}
+
+// marshalEventWithoutAttributes returns the JSON encoding produced by
+// eventWithoutAttributes for the given event.
+func marshalEventWithoutAttributes(event *history.Event) (string, error) {
+	data, err := json.Marshal(&eventWithoutAttributes{event})
+	if err != nil {
+		return "", err
+	}
+
+	return string(data), nil
+}
+
+// Stores event payloads; keys that already exist are left untouched (SET NX).
+// KEYS[1..n] - payload keys
+// ARGV[1..n] - payload values
+var addPayloadsCmd = redis.NewScript(`
+	for i = 1, #ARGV do
+		redis.pcall("SET", KEYS[i], ARGV[i], "NX")
+	end
+
+	return 0
+`)
+
+// addEventPayloads runs addPayloadsCmd on p with one payload key and one
+// JSON-encoded attributes value per event.
+func addEventPayloads(ctx context.Context, p redis.Pipeliner, events []*history.Event) error {
+	keys := make([]string, 0, len(events))
+	values := make([]interface{}, 0, len(events))
+
+	for _, event := range events {
+		payload, err := json.Marshal(event.Attributes)
+		if err != nil {
+			return fmt.Errorf("marshaling event payload: %w", err)
+		}
+
+		keys = append(keys, payloadKey(event.ID))
+		values = append(values, string(payload))
+	}
+
+	return addPayloadsCmd.Run(ctx, p, keys, values...).Err()
+}
+
 func addEventToStreamP(ctx context.Context, p redis.Pipeliner, streamKey string, event *history.Event) error {
-	eventData, err := json.Marshal(event)
+	eventData, err := marshalEventWithoutAttributes(event)
 	if err != nil {
 		return err
 	}
@@ -37,10 +97,10 @@ var addEventsToStreamCmd = redis.NewScript(`
 	return msgID
 `)
 
-func addEventsToHistoryStreamP(ctx context.Context, p redis.Pipeliner, streamKey string, events []*history.Event) error {
+func addEventsToStreamP(ctx context.Context, p redis.Pipeliner, streamKey string, events []*history.Event) error {
 	eventsData := make([]string, 0)
 	for _, event := range events {
-		eventData, err := json.Marshal(event)
+		eventData, err := marshalEventWithoutAttributes(event)
 		if err != nil {
 			return err
 		}
@@ -56,37 +116,54 @@ func addEventsToHistoryStreamP(ctx context.Context, p redis.Pipeliner, streamKey
 	return nil
 }
 
+// Adds an event to be delivered in the future. Not cluster-safe.
 // KEYS[1] - future event zset key
 // KEYS[2] - future event key
+// KEYS[3] - future event payload key
 // ARGV[1] - timestamp
 // ARGV[2] - Instance segment
-// ARGV[3] - event payload
+// ARGV[3] - event data
+// ARGV[4] - event payload
 var addFutureEventCmd = redis.NewScript(`
 	redis.call("ZADD", KEYS[1], ARGV[1], KEYS[2])
-	return redis.call("HSET", KEYS[2], "instance", ARGV[2], "event", ARGV[3])
+	redis.call("HSET", KEYS[2], "instance", ARGV[2], "event", ARGV[3], "payload", KEYS[3])
+	redis.call("SET", KEYS[3], ARGV[4], "NX")
+	return 0
 `)
 
+// addFutureEventP runs addFutureEventCmd on p: the event, marshaled without
+// its attributes, is scored by event.VisibleAt and its attributes are stored
+// under payloadKey(event.ID).
 func addFutureEventP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, event *history.Event) error {
-	eventData, err := json.Marshal(event)
+	eventData, err := marshalEventWithoutAttributes(event)
 	if err != nil {
 		return err
 	}
 
-	addFutureEventCmd.Run(
+	payloadEventData, err := json.Marshal(event.Attributes)
+	if err != nil {
+		return fmt.Errorf("marshaling event payload: %w", err)
+	}
+
+	return addFutureEventCmd.Run(
 		ctx, p,
-		[]string{futureEventsKey(), futureEventKey(instance, event.ScheduleEventID)},
+		[]string{futureEventsKey(), futureEventKey(instance, event.ScheduleEventID), payloadKey(event.ID)},
 		strconv.FormatInt(event.VisibleAt.UnixMilli(), 10),
 		instanceSegment(instance),
 		string(eventData),
-	)
-
-	return nil
+		string(payloadEventData),
+	).Err()
 }
 
+// Remove a scheduled future event. Not cluster-safe.
 // KEYS[1] - future event zset key
 // KEYS[2] - future event key
 var removeFutureEventCmd = redis.NewScript(`
 	redis.call("ZREM", KEYS[1], KEYS[2])
+	local k = redis.call("HGET", KEYS[2], "payload")
+	if k then -- HGET yields false when the hash or its payload field is missing
+		redis.call("DEL", k)
+	end
 	return redis.call("DEL", KEYS[2])
 `)
 
diff --git a/backend/redis/instance.go b/backend/redis/instance.go
index f35970b8..43229384 100644
--- a/backend/redis/instance.go
+++ b/backend/redis/instance.go
@@ -30,19 +30,14 @@ func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, instance *wo
 		return err
 	}
 
-	// Create event stream
-	eventData, err := json.Marshal(event)
-	if err != nil {
-		return err
+	// Create event stream with initial event
+	if err := addEventPayloads(ctx, p, []*history.Event{event}); err != nil {
+		return fmt.Errorf("adding event payloads: %w", err)
 	}
 
-	p.XAdd(ctx, &redis.XAddArgs{
-		Stream: pendingEventsKey(instance),
-		ID:     "*",
-		Values: map[string]interface{}{
-			"event": string(eventData),
-		},
-	})
+	if err := addEventToStreamP(ctx, p, pendingEventsKey(instance), event); err != nil {
+		return fmt.Errorf("adding event to stream: %w", err)
+	}
 
 	// Queue workflow instance task
 	if err := rb.workflowQueue.Enqueue(ctx, p, instanceSegment(instance), nil); err != nil {
@@ -70,6 +65,7 @@ func (rb *redisBackend) GetWorkflowInstanceHistory(ctx context.Context, instance
 		return nil, err
 	}
 
+	payloadKeys := make([]string, 0, len(msgs))
 	var events []*history.Event
 	for _, msg := range msgs {
 		var event *history.Event
@@ -77,9 +73,31 @@ func (rb *redisBackend) GetWorkflowInstanceHistory(ctx context.Context, instance
 			return nil, fmt.Errorf("unmarshaling event: %w", err)
 		}
 
+		payloadKeys = append(payloadKeys, payloadKey(event.ID))
 		events = append(events, event)
 	}
 
+	// MGET rejects an empty key list, so only fetch payloads when there are events.
+	if len(payloadKeys) > 0 {
+		res, err := rb.rdb.MGet(ctx, payloadKeys...).Result()
+		if err != nil {
+			return nil, fmt.Errorf("reading payloads: %w", err)
+		}
+
+		for i, event := range events {
+			// MGET yields nil for a missing key; report it instead of panicking on the assertion.
+			payload, ok := res[i].(string)
+			if !ok {
+				return nil, fmt.Errorf("missing payload for event %v", event.ID)
+			}
+
+			event.Attributes, err = history.DeserializeAttributes(event.Type, []byte(payload))
+			if err != nil {
+				return nil, fmt.Errorf("deserializing attributes for event %v: %w", event.Type, err)
+			}
+		}
+	}
+
 	return events, nil
 }
 
diff --git a/backend/redis/keys.go b/backend/redis/keys.go
index 01002b85..492d32f6 100644
--- a/backend/redis/keys.go
+++ b/backend/redis/keys.go
@@ -56,3 +56,8 @@ func futureEventsKey() string {
 func futureEventKey(instance *core.WorkflowInstance, scheduleEventID int64) string {
 	return fmt.Sprintf("future-event:%v:%v:%v", instance.InstanceID, instance.ExecutionID, scheduleEventID)
 }
+
+// payloadKey returns the key under which the payload of the event with the given ID is stored.
+func payloadKey(eventID string) string {
+	return fmt.Sprintf("payload:%v", eventID)
+}
diff --git a/backend/redis/redis.go b/backend/redis/redis.go
index edaaf099..6ed8c27d 100644
--- a/backend/redis/redis.go
+++ b/backend/redis/redis.go
@@ -60,6 +60,7 @@ func NewRedisBackend(client redis.UniversalClient, opts ...RedisBackendOption) (
 		"requeueInstanceCmd": requeueInstanceCmd.Load(ctx, rb.rdb),
 		"deleteInstanceCmd":  deleteCmd.Load(ctx, rb.rdb),
 		"expireInstanceCmd":  expireCmd.Load(ctx, rb.rdb),
+		"addPayloadsCmd":     addPayloadsCmd.Load(ctx, rb.rdb),
 	}
 	for name, cmd := range cmds {
 		// fmt.Println(name, cmd.Val())
diff --git a/backend/redis/workflow.go b/backend/redis/workflow.go
index 5598aabb..d925ae04 100644
--- a/backend/redis/workflow.go
+++ b/backend/redis/workflow.go
@@ -90,6 +90,7 @@ func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*backend.WorkflowT
 		return nil, fmt.Errorf("reading event stream: %w", err)
 	}
 
+	payloadKeys := make([]string, 0, len(msgs))
 	newEvents := make([]*history.Event, 0, len(msgs))
 	for _, msg := range msgs {
 		var event *history.Event
@@ -98,9 +99,31 @@ func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*backend.WorkflowT
 			return nil, fmt.Errorf("unmarshaling event: %w", err)
 		}
 
+		payloadKeys = append(payloadKeys, payloadKey(event.ID))
 		newEvents = append(newEvents, event)
 	}
 
+	// Fetch event payloads
+	if len(payloadKeys) > 0 {
+		res, err := rb.rdb.MGet(ctx, payloadKeys...).Result()
+		if err != nil {
+			return nil, fmt.Errorf("reading payloads: %w", err)
+		}
+
+		for i, event := range newEvents {
+			// MGET yields nil for a missing key; report it instead of panicking on the assertion.
+			payload, ok := res[i].(string)
+			if !ok {
+				return nil, fmt.Errorf("missing payload for event %v", event.ID)
+			}
+
+			event.Attributes, err = history.DeserializeAttributes(event.Type, []byte(payload))
+			if err != nil {
+				return nil, fmt.Errorf("deserializing attributes for event %v: %w", event.Type, err)
+			}
+		}
+	}
+
 	return &backend.WorkflowTask{
 		ID:               instanceTask.TaskID,
 		WorkflowInstance: instanceState.Instance,
@@ -161,11 +184,15 @@ func (rb *redisBackend) CompleteWorkflowTask(
 
 	// Check-point the workflow. We guarantee that no other worker is working on this workflow instance at this point via the
 	// task queue, so we don't need to WATCH the keys, we just need to make sure all commands are executed atomically to prevent
-	// a worker crashing in the middle of this execution.
+	// bad state if a worker crashes in the middle of this execution.
 	p := rb.rdb.TxPipeline()
 
 	// Add executed events to the history
-	if err := addEventsToHistoryStreamP(ctx, p, historyKey(instance), executedEvents); err != nil {
-		return fmt.Errorf("serializing : %w", err)
+	if err := addEventPayloads(ctx, p, executedEvents); err != nil {
+		return fmt.Errorf("adding event payloads: %w", err)
+	}
+
+	if err := addEventsToStreamP(ctx, p, historyKey(instance), executedEvents); err != nil {
+		return fmt.Errorf("adding events to history stream: %w", err)
 	}
 
@@ -179,7 +206,7 @@ func (rb *redisBackend) CompleteWorkflowTask(
 	// Schedule timers
 	for _, timerEvent := range timerEvents {
 		if err := addFutureEventP(ctx, p, instance, timerEvent); err != nil {
-			return err
+			return fmt.Errorf("adding future event: %w", err)
 		}
 	}
 
@@ -199,8 +226,12 @@ func (rb *redisBackend) CompleteWorkflowTask(
 		}
 
 		// Add pending event to stream
+		if err := addEventPayloads(ctx, p, []*history.Event{m.HistoryEvent}); err != nil {
+			return fmt.Errorf("adding event payloads: %w", err)
+		}
+
 		if err := addEventToStreamP(ctx, p, pendingEventsKey(&targetInstance), m.HistoryEvent); err != nil {
-			return err
+			return fmt.Errorf("adding event to stream: %w", err)
 		}
 	}
 
@@ -243,7 +274,9 @@ func (rb *redisBackend) CompleteWorkflowTask(
 	// Remove executed pending events
 	if task.CustomData != nil {
 		lastPendingEventMessageID := task.CustomData.(string)
-		removePendingEventsCmd.Run(ctx, p, []string{pendingEventsKey(instance)}, lastPendingEventMessageID)
+		if err := removePendingEventsCmd.Run(ctx, p, []string{pendingEventsKey(instance)}, lastPendingEventMessageID).Err(); err != nil {
+			return fmt.Errorf("removing pending events: %w", err)
+		}
 	}
 
 	// Complete workflow task and unlock instance.
@@ -268,7 +301,7 @@ func (rb *redisBackend) CompleteWorkflowTask(
 
 	for _, cmd := range executedCmds {
 		if cmdErr := cmd.Err(); cmdErr != nil {
-			rb.Logger().Debug("redis command error", log.NamespaceKey+".redis.cmd", cmd.FullName(), log.NamespaceKey+".redis.cmdErr", cmdErr.Error())
+			rb.Logger().Debug("redis command error", log.NamespaceKey+".redis.cmd", cmd.Name(), "cmdString", cmd.String(), log.NamespaceKey+".redis.cmdErr", cmdErr.Error())
 		}
 	}
 
@@ -300,6 +333,10 @@ func (rb *redisBackend) CompleteWorkflowTask(
 
 func (rb *redisBackend) addWorkflowInstanceEventP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, event *history.Event) error {
 	// Add event to pending events for instance
+	if err := addEventPayloads(ctx, p, []*history.Event{event}); err != nil {
+		return err
+	}
+
 	if err := addEventToStreamP(ctx, p, pendingEventsKey(instance), event); err != nil {
 		return err
 	}