Skip to content

Commit

Permalink
Store event payloads in separate key
Browse files Browse the repository at this point in the history
  • Loading branch information
cschleiden committed Nov 9, 2023
1 parent d5f0a19 commit 534a5c9
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 28 deletions.
85 changes: 74 additions & 11 deletions backend/redis/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,66 @@ package redis
import (
"context"
"encoding/json"
"fmt"
"strconv"

"github.com/cschleiden/go-workflows/backend/history"
"github.com/cschleiden/go-workflows/core"
"github.com/redis/go-redis/v9"
)

type eventWithoutAttributes struct {
*history.Event
}

func (e *eventWithoutAttributes) MarshalJSON() ([]byte, error) {
return json.Marshal(&struct {
*history.Event
Attributes interface{} `json:"attr"`
}{
Event: e.Event,
Attributes: nil,
})
}

func marshalEventWithoutAttributes(event *history.Event) (string, error) {
data, err := json.Marshal(&eventWithoutAttributes{event})
if err != nil {
return "", err
}

return string(data), nil
}

// KEYS[1..n] - payload keys
// ARGV[1..n] - payload values
var addPayloadsCmd = redis.NewScript(`
for i = 1, #ARGV do
redis.pcall("SET", KEYS[i], ARGV[i], "NX")
end
return 0
`)

func addEventPayloads(ctx context.Context, p redis.Pipeliner, events []*history.Event) error {
keys := make([]string, 0)
values := make([]interface{}, 0)

for _, event := range events {
payload, err := json.Marshal(event.Attributes)
if err != nil {
return fmt.Errorf("marshaling event payload: %w", err)
}

keys = append(keys, payloadKey(event.ID))
values = append(values, string(payload))
}

return addPayloadsCmd.Run(ctx, p, keys, values...).Err()
}

func addEventToStreamP(ctx context.Context, p redis.Pipeliner, streamKey string, event *history.Event) error {
eventData, err := json.Marshal(event)
eventData, err := marshalEventWithoutAttributes(event)
if err != nil {
return err
}
Expand All @@ -37,10 +88,10 @@ var addEventsToStreamCmd = redis.NewScript(`
return msgID
`)

func addEventsToHistoryStreamP(ctx context.Context, p redis.Pipeliner, streamKey string, events []*history.Event) error {
func addEventsToStreamP(ctx context.Context, p redis.Pipeliner, streamKey string, events []*history.Event) error {
eventsData := make([]string, 0)
for _, event := range events {
eventData, err := json.Marshal(event)
eventData, err := marshalEventWithoutAttributes(event)
if err != nil {
return err
}
Expand All @@ -56,37 +107,49 @@ func addEventsToHistoryStreamP(ctx context.Context, p redis.Pipeliner, streamKey
return nil
}

// Adds an event to be delivered in the future. Not cluster-safe.
// KEYS[1] - future event zset key
// KEYS[2] - future event key
// KEYS[3] - future event payload key
// ARGV[1] - timestamp
// ARGV[2] - Instance segment
// ARGV[3] - event payload
// ARGV[3] - event data
// ARGV[4] - event payload
var addFutureEventCmd = redis.NewScript(`
redis.call("ZADD", KEYS[1], ARGV[1], KEYS[2])
return redis.call("HSET", KEYS[2], "instance", ARGV[2], "event", ARGV[3])
redis.call("HSET", KEYS[2], "instance", ARGV[2], "event", ARGV[3], "payload", KEYS[3])
redis.call("SET", KEYS[3], ARGV[4], "NX")
return 0
`)

func addFutureEventP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, event *history.Event) error {
eventData, err := json.Marshal(event)
eventData, err := marshalEventWithoutAttributes(event)
if err != nil {
return err
}

addFutureEventCmd.Run(
payloadEventData, err := json.Marshal(event.Attributes)
if err != nil {
return err
}

return addFutureEventCmd.Run(
ctx, p,
[]string{futureEventsKey(), futureEventKey(instance, event.ScheduleEventID)},
[]string{futureEventsKey(), futureEventKey(instance, event.ScheduleEventID), payloadKey(event.ID)},
strconv.FormatInt(event.VisibleAt.UnixMilli(), 10),
instanceSegment(instance),
string(eventData),
)

return nil
string(payloadEventData),
).Err()
}

// Remove a scheduled future event. Not cluster-safe.
// KEYS[1] - future event zset key
// KEYS[2] - future event key
var removeFutureEventCmd = redis.NewScript(`
redis.call("ZREM", KEYS[1], KEYS[2])
local k = redis.call("HGET", KEYS[2], "payload")
redis.call("DEL", k)
return redis.call("DEL", KEYS[2])
`)

Expand Down
31 changes: 20 additions & 11 deletions backend/redis/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,14 @@ func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, instance *wo
return err
}

// Create event stream
eventData, err := json.Marshal(event)
if err != nil {
return err
// Create event stream with initial event
if err := addEventPayloads(ctx, p, []*history.Event{event}); err != nil {
return fmt.Errorf("adding event payloads: %w", err)
}

p.XAdd(ctx, &redis.XAddArgs{
Stream: pendingEventsKey(instance),
ID: "*",
Values: map[string]interface{}{
"event": string(eventData),
},
})
if err := addEventToStreamP(ctx, p, pendingEventsKey(instance), event); err != nil {
return fmt.Errorf("adding event to stream: %w", err)
}

// Queue workflow instance task
if err := rb.workflowQueue.Enqueue(ctx, p, instanceSegment(instance), nil); err != nil {
Expand Down Expand Up @@ -70,16 +65,30 @@ func (rb *redisBackend) GetWorkflowInstanceHistory(ctx context.Context, instance
return nil, err
}

payloadKeys := make([]string, 0, len(msgs))
var events []*history.Event
for _, msg := range msgs {
var event *history.Event
if err := json.Unmarshal([]byte(msg.Values["event"].(string)), &event); err != nil {
return nil, fmt.Errorf("unmarshaling event: %w", err)
}

payloadKeys = append(payloadKeys, payloadKey(event.ID))
events = append(events, event)
}

res, err := rb.rdb.MGet(ctx, payloadKeys...).Result()
if err != nil {
return nil, fmt.Errorf("reading payloads: %w", err)
}

for i, event := range events {
event.Attributes, err = history.DeserializeAttributes(event.Type, []byte(res[i].(string)))
if err != nil {
return nil, fmt.Errorf("deserializing attributes for event %v: %w", event.Type, err)
}
}

return events, nil
}

Expand Down
4 changes: 4 additions & 0 deletions backend/redis/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,7 @@ func futureEventsKey() string {
func futureEventKey(instance *core.WorkflowInstance, scheduleEventID int64) string {
return fmt.Sprintf("future-event:%v:%v:%v", instance.InstanceID, instance.ExecutionID, scheduleEventID)
}

func payloadKey(eventID string) string {
return fmt.Sprintf("payload:%v", eventID)
}
1 change: 1 addition & 0 deletions backend/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func NewRedisBackend(client redis.UniversalClient, opts ...RedisBackendOption) (
"requeueInstanceCmd": requeueInstanceCmd.Load(ctx, rb.rdb),
"deleteInstanceCmd": deleteCmd.Load(ctx, rb.rdb),
"expireInstanceCmd": expireCmd.Load(ctx, rb.rdb),
"addPayloadsCmd": addPayloadsCmd.Load(ctx, rb.rdb),
}
for name, cmd := range cmds {
// fmt.Println(name, cmd.Val())
Expand Down
43 changes: 37 additions & 6 deletions backend/redis/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*backend.WorkflowT
return nil, fmt.Errorf("reading event stream: %w", err)
}

payloadKeys := make([]string, 0, len(msgs))
newEvents := make([]*history.Event, 0, len(msgs))
for _, msg := range msgs {
var event *history.Event
Expand All @@ -98,9 +99,25 @@ func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*backend.WorkflowT
return nil, fmt.Errorf("unmarshaling event: %w", err)
}

payloadKeys = append(payloadKeys, payloadKey(event.ID))
newEvents = append(newEvents, event)
}

// Fetch event payloads
if len(payloadKeys) > 0 {
res, err := rb.rdb.MGet(ctx, payloadKeys...).Result()
if err != nil {
return nil, fmt.Errorf("reading payloads: %w", err)
}

for i, event := range newEvents {
event.Attributes, err = history.DeserializeAttributes(event.Type, []byte(res[i].(string)))
if err != nil {
return nil, fmt.Errorf("deserializing attributes for event %v: %w", event.Type, err)
}
}
}

return &backend.WorkflowTask{
ID: instanceTask.TaskID,
WorkflowInstance: instanceState.Instance,
Expand Down Expand Up @@ -161,11 +178,15 @@ func (rb *redisBackend) CompleteWorkflowTask(

// Check-point the workflow. We guarantee that no other worker is working on this workflow instance at this point via the
// task queue, so we don't need to WATCH the keys, we just need to make sure all commands are executed atomically to prevent
// a worker crashing in the middle of this execution.
// bad state if a worker crashes in the middle of this execution.
p := rb.rdb.TxPipeline()

// Add executed events to the history
if err := addEventsToHistoryStreamP(ctx, p, historyKey(instance), executedEvents); err != nil {
if err := addEventPayloads(ctx, p, executedEvents); err != nil {
return fmt.Errorf("adding event payloads: %w", err)
}

if err := addEventsToStreamP(ctx, p, historyKey(instance), executedEvents); err != nil {
return fmt.Errorf("serializing : %w", err)
}

Expand All @@ -179,7 +200,7 @@ func (rb *redisBackend) CompleteWorkflowTask(
// Schedule timers
for _, timerEvent := range timerEvents {
if err := addFutureEventP(ctx, p, instance, timerEvent); err != nil {
return err
return fmt.Errorf("adding future event: %w", err)
}
}

Expand All @@ -199,8 +220,12 @@ func (rb *redisBackend) CompleteWorkflowTask(
}

// Add pending event to stream
if err := addEventPayloads(ctx, p, []*history.Event{m.HistoryEvent}); err != nil {
return fmt.Errorf("adding event payloads: %w", err)
}

if err := addEventToStreamP(ctx, p, pendingEventsKey(&targetInstance), m.HistoryEvent); err != nil {
return err
return fmt.Errorf("adding event to stream: %w", err)
}
}

Expand Down Expand Up @@ -243,7 +268,9 @@ func (rb *redisBackend) CompleteWorkflowTask(
// Remove executed pending events
if task.CustomData != nil {
lastPendingEventMessageID := task.CustomData.(string)
removePendingEventsCmd.Run(ctx, p, []string{pendingEventsKey(instance)}, lastPendingEventMessageID)
if err := removePendingEventsCmd.Run(ctx, p, []string{pendingEventsKey(instance)}, lastPendingEventMessageID).Err(); err != nil {
return fmt.Errorf("removing pending events: %w", err)
}
}

// Complete workflow task and unlock instance.
Expand All @@ -268,7 +295,7 @@ func (rb *redisBackend) CompleteWorkflowTask(

for _, cmd := range executedCmds {
if cmdErr := cmd.Err(); cmdErr != nil {
rb.Logger().Debug("redis command error", log.NamespaceKey+".redis.cmd", cmd.FullName(), log.NamespaceKey+".redis.cmdErr", cmdErr.Error())
rb.Logger().Debug("redis command error", log.NamespaceKey+".redis.cmd", cmd.Name(), "cmdString", cmd.String(), log.NamespaceKey+".redis.cmdErr", cmdErr.Error())
}
}

Expand Down Expand Up @@ -300,6 +327,10 @@ func (rb *redisBackend) CompleteWorkflowTask(

func (rb *redisBackend) addWorkflowInstanceEventP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, event *history.Event) error {
// Add event to pending events for instance
if err := addEventPayloads(ctx, p, []*history.Event{event}); err != nil {
return err
}

if err := addEventToStreamP(ctx, p, pendingEventsKey(instance), event); err != nil {
return err
}
Expand Down

0 comments on commit 534a5c9

Please sign in to comment.