Skip to content

Commit

Permalink
Fix encoding bug to index context header in search attributes (#6148)
Browse files Browse the repository at this point in the history
What changed?

json marshal raw string bytes before store in search attributes

Why?

Context Header stores the raw string bytes; but search attributes should store json strings rather than raw string bytes. Otherwise, it will cause unmarshal error in creating visibility message.

How did you test it?

unit test
  • Loading branch information
shijiesheng authored Jun 26, 2024
1 parent c7f6233 commit 34cfbb3
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 9 deletions.
7 changes: 6 additions & 1 deletion service/history/task/transfer_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1049,7 +1049,12 @@ func (t *transferActiveTaskExecutor) processRecordWorkflowStartedOrUpsertHelper(
searchAttr := copySearchAttributes(executionInfo.SearchAttributes)
if t.config.EnableContextHeaderInVisibility(domainEntry.GetInfo().Name) {
if attributes := startEvent.GetWorkflowExecutionStartedEventAttributes(); attributes != nil && attributes.Header != nil {
searchAttr = appendContextHeaderToSearchAttributes(searchAttr, attributes.Header.Fields, t.config.ValidSearchAttributes())
// fail open to avoid blocking the task processing
if newSearchAttr, err := appendContextHeaderToSearchAttributes(searchAttr, attributes.Header.Fields, t.config.ValidSearchAttributes()); err != nil {
t.logger.Error("failed to add headers to search attributes", tag.Error(err))
} else {
searchAttr = newSearchAttr
}
}
}
isCron := len(executionInfo.CronSchedule) > 0
Expand Down
19 changes: 17 additions & 2 deletions service/history/task/transfer_active_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package task

import (
"context"
"encoding/json"
"math/rand"
"strconv"
"testing"
Expand Down Expand Up @@ -1735,6 +1736,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessRecordWorkflowStartedTask()
"RecordWorkflowExecutionStarted",
mock.Anything,
createRecordWorkflowExecutionStartedRequest(
s.T(),
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(),
false),
).Once().Return(nil)
Expand Down Expand Up @@ -1784,6 +1786,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessRecordWorkflowStartedTaskWi
"RecordWorkflowExecutionStarted",
mock.Anything,
createRecordWorkflowExecutionStartedRequest(
s.T(),
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(),
true),
).Once().Return(nil)
Expand Down Expand Up @@ -1816,6 +1819,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessUpsertWorkflowSearchAttribu
"UpsertWorkflowExecution",
mock.Anything,
createUpsertWorkflowSearchAttributesRequest(
s.T(),
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(),
false),
).Once().Return(nil)
Expand Down Expand Up @@ -1855,6 +1859,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessUpsertWorkflowSearchAttribu
"UpsertWorkflowExecution",
mock.Anything,
createUpsertWorkflowSearchAttributesRequest(
s.T(),
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(),
true),
).Once().Return(nil)
Expand Down Expand Up @@ -1957,6 +1962,7 @@ func createAddDecisionTaskRequest(
}

func createRecordWorkflowExecutionStartedRequest(
t *testing.T,
domainName string,
startEvent *types.HistoryEvent,
transferTask Task,
Expand All @@ -1978,8 +1984,12 @@ func createRecordWorkflowExecutionStartedRequest(
}
var searchAttributes map[string][]byte
if enableContextHeaderInVisibility {
contextValueJSONString, err := json.Marshal("contextValue")
if err != nil {
t.Fatal(err)
}
searchAttributes = map[string][]byte{
"Header.contextKey": []byte("contextValue"),
"Header.contextKey": contextValueJSONString,
}
}
return &persistence.RecordWorkflowExecutionStartedRequest{
Expand Down Expand Up @@ -2106,6 +2116,7 @@ func createTestChildWorkflowExecutionRequest(
}

func createUpsertWorkflowSearchAttributesRequest(
t *testing.T,
domainName string,
startEvent *types.HistoryEvent,
transferTask Task,
Expand All @@ -2128,8 +2139,12 @@ func createUpsertWorkflowSearchAttributesRequest(
}
var searchAttributes map[string][]byte
if enableContextHeaderInVisibility {
contextValueJSONString, err := json.Marshal("contextValue")
if err != nil {
t.Fatal(err)
}
searchAttributes = map[string][]byte{
"Header.contextKey": []byte("contextValue"),
"Header.contextKey": contextValueJSONString,
}
}

Expand Down
7 changes: 6 additions & 1 deletion service/history/task/transfer_standby_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,12 @@ func (t *transferStandbyTaskExecutor) processRecordWorkflowStartedOrUpsertHelper
searchAttr := copySearchAttributes(executionInfo.SearchAttributes)
if t.config.EnableContextHeaderInVisibility(domainEntry.GetInfo().Name) {
if attributes := startEvent.GetWorkflowExecutionStartedEventAttributes(); attributes != nil && attributes.Header != nil {
searchAttr = appendContextHeaderToSearchAttributes(searchAttr, attributes.Header.Fields, t.config.ValidSearchAttributes())
// fail open to avoid blocking the task processing
if newSearchAttr, err := appendContextHeaderToSearchAttributes(searchAttr, attributes.Header.Fields, t.config.ValidSearchAttributes()); err != nil {
t.logger.Error("failed to add headers to search attributes", tag.Error(err))
} else {
searchAttr = newSearchAttr
}
}
}

Expand Down
4 changes: 4 additions & 0 deletions service/history/task/transfer_standby_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,7 @@ func (s *transferStandbyTaskExecutorSuite) TestProcessRecordWorkflowStartedTask(
"RecordWorkflowExecutionStarted",
mock.Anything,
createRecordWorkflowExecutionStartedRequest(
s.T(),
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(),
false),
).Return(nil).Once()
Expand Down Expand Up @@ -855,6 +856,7 @@ func (s *transferStandbyTaskExecutorSuite) TestProcessRecordWorkflowStartedTaskW
"RecordWorkflowExecutionStarted",
mock.Anything,
createRecordWorkflowExecutionStartedRequest(
s.T(),
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(),
true),
).Return(nil).Once()
Expand Down Expand Up @@ -890,6 +892,7 @@ func (s *transferStandbyTaskExecutorSuite) TestProcessUpsertWorkflowSearchAttrib
"UpsertWorkflowExecution",
mock.Anything,
createUpsertWorkflowSearchAttributesRequest(
s.T(),
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(),
false),
).Return(nil).Once()
Expand Down Expand Up @@ -934,6 +937,7 @@ func (s *transferStandbyTaskExecutorSuite) TestProcessUpsertWorkflowSearchAttrib
"UpsertWorkflowExecution",
mock.Anything,
createUpsertWorkflowSearchAttributesRequest(
s.T(),
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(),
true),
).Return(nil).Once()
Expand Down
14 changes: 9 additions & 5 deletions service/history/task/transfer_task_executor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package task

import (
"context"
"encoding/json"
"fmt"
"time"

Expand Down Expand Up @@ -399,7 +400,7 @@ func getWorkflowMemo(
return &types.Memo{Fields: memo}
}

func appendContextHeaderToSearchAttributes(attr, context map[string][]byte, allowedKeys map[string]interface{}) map[string][]byte {
func appendContextHeaderToSearchAttributes(attr, context map[string][]byte, allowedKeys map[string]interface{}) (map[string][]byte, error) {
for k, v := range context {
key := fmt.Sprintf(definition.HeaderFormat, k)
if _, ok := attr[key]; ok { // skip if key already exists
Expand All @@ -408,14 +409,17 @@ func appendContextHeaderToSearchAttributes(attr, context map[string][]byte, allo
if _, allowed := allowedKeys[key]; !allowed { // skip if not allowed
continue
}
val := make([]byte, len(v))
copy(val, v)
if attr == nil {
attr = make(map[string][]byte)
}
attr[key] = val
// context header are raw string bytes, need to be json encoded to be stored in search attributes
data, err := json.Marshal(string(v))
if err != nil {
return nil, fmt.Errorf("fail to json encoding context key %s, val %v: %w", k, v, err)
}
attr[key] = data
}
return attr
return attr, nil
}

func copySearchAttributes(
Expand Down

0 comments on commit 34cfbb3

Please sign in to comment.