diff --git a/aggregatedpool/handler.go b/aggregatedpool/handler.go index c1662ae4..f4724a96 100644 --- a/aggregatedpool/handler.go +++ b/aggregatedpool/handler.go @@ -187,6 +187,12 @@ func (wp *Workflow) handleMessage(msg *internal.Message) error { WorkflowTaskTimeout: command.Options.WorkflowTaskTimeout, }) + case *internal.UpsertWorkflowSearchAttributes: + err := wp.env.UpsertSearchAttributes(command.SearchAttributes) + if err != nil { + return errors.E(op, err) + } + case *internal.SignalExternalWorkflow: wp.env.SignalExternalWorkflow( command.Namespace, diff --git a/internal/protocol.go b/internal/protocol.go index dfa90789..deb7c4ed 100644 --- a/internal/protocol.go +++ b/internal/protocol.go @@ -29,11 +29,12 @@ const ( executeChildWorkflowCommand = "ExecuteChildWorkflow" getChildWorkflowExecutionCommand = "GetChildWorkflowExecution" - newTimerCommand = "NewTimer" - sideEffectCommand = "SideEffect" - getVersionCommand = "GetVersion" - completeWorkflowCommand = "CompleteWorkflow" - continueAsNewCommand = "ContinueAsNew" + newTimerCommand = "NewTimer" + sideEffectCommand = "SideEffect" + getVersionCommand = "GetVersion" + completeWorkflowCommand = "CompleteWorkflow" + continueAsNewCommand = "ContinueAsNew" + upsertWorkflowSearchAttributesCommand = "UpsertWorkflowSearchAttributes" signalExternalWorkflowCommand = "SignalExternalWorkflow" cancelExternalWorkflowCommand = "CancelExternalWorkflow" @@ -230,6 +231,11 @@ type ContinueAsNew struct { } `json:"options"` } +// UpsertWorkflowSearchAttributes allows to upsert search attributes +type UpsertWorkflowSearchAttributes struct { + SearchAttributes map[string]any `json:"searchAttributes"` +} + // SignalExternalWorkflow sends signal to external workflow. type SignalExternalWorkflow struct { Namespace string `json:"namespace"` @@ -379,6 +385,8 @@ func CommandName(cmd any) (string, error) { return completeWorkflowCommand, nil case ContinueAsNew, *ContinueAsNew: return continueAsNewCommand, nil + case UpsertWorkflowSearchAttributes, *UpsertWorkflowSearchAttributes: + return upsertWorkflowSearchAttributesCommand, nil case SignalExternalWorkflow, *SignalExternalWorkflow: return signalExternalWorkflowCommand, nil case CancelExternalWorkflow, *CancelExternalWorkflow: @@ -447,6 +455,9 @@ func InitCommand(name string) (any, error) { case continueAsNewCommand: return &ContinueAsNew{}, nil + case upsertWorkflowSearchAttributesCommand: + return &UpsertWorkflowSearchAttributes{}, nil + case signalExternalWorkflowCommand: return &SignalExternalWorkflow{}, nil