From 34cfbb3fa24a56397a28991cef962987122c5537 Mon Sep 17 00:00:00 2001 From: Shijie Sheng Date: Wed, 26 Jun 2024 16:23:33 -0700 Subject: [PATCH] Fix encoding bug to index context header in search attributes (#6148) What changed? json marshal raw string bytes before store in search attributes Why? Context Header stores the raw string bytes; but search attributes should store json strings rather than raw string bytes. Otherwise, it will cause unmarshal error in creating visibility message. How did you test it? unit test --- .../task/transfer_active_task_executor.go | 7 ++++++- .../transfer_active_task_executor_test.go | 19 +++++++++++++++++-- .../task/transfer_standby_task_executor.go | 7 ++++++- .../transfer_standby_task_executor_test.go | 4 ++++ .../task/transfer_task_executor_base.go | 14 +++++++++----- 5 files changed, 42 insertions(+), 9 deletions(-) diff --git a/service/history/task/transfer_active_task_executor.go b/service/history/task/transfer_active_task_executor.go index 3bd38c67cd1..97875bd3cb6 100644 --- a/service/history/task/transfer_active_task_executor.go +++ b/service/history/task/transfer_active_task_executor.go @@ -1049,7 +1049,12 @@ func (t *transferActiveTaskExecutor) processRecordWorkflowStartedOrUpsertHelper( searchAttr := copySearchAttributes(executionInfo.SearchAttributes) if t.config.EnableContextHeaderInVisibility(domainEntry.GetInfo().Name) { if attributes := startEvent.GetWorkflowExecutionStartedEventAttributes(); attributes != nil && attributes.Header != nil { - searchAttr = appendContextHeaderToSearchAttributes(searchAttr, attributes.Header.Fields, t.config.ValidSearchAttributes()) + // fail open to avoid blocking the task processing + if newSearchAttr, err := appendContextHeaderToSearchAttributes(searchAttr, attributes.Header.Fields, t.config.ValidSearchAttributes()); err != nil { + t.logger.Error("failed to add headers to search attributes", tag.Error(err)) + } else { + searchAttr = newSearchAttr + } } } isCron := len(executionInfo.CronSchedule) > 0 diff --git a/service/history/task/transfer_active_task_executor_test.go b/service/history/task/transfer_active_task_executor_test.go index 9d544bca472..e173017b6da 100644 --- a/service/history/task/transfer_active_task_executor_test.go +++ b/service/history/task/transfer_active_task_executor_test.go @@ -22,6 +22,7 @@ package task import ( "context" + "encoding/json" "math/rand" "strconv" "testing" @@ -1735,6 +1736,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessRecordWorkflowStartedTask() "RecordWorkflowExecutionStarted", mock.Anything, createRecordWorkflowExecutionStartedRequest( + s.T(), s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(), false), ).Once().Return(nil) @@ -1784,6 +1786,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessRecordWorkflowStartedTaskWi "RecordWorkflowExecutionStarted", mock.Anything, createRecordWorkflowExecutionStartedRequest( + s.T(), s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(), true), ).Once().Return(nil) @@ -1816,6 +1819,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessUpsertWorkflowSearchAttribu "UpsertWorkflowExecution", mock.Anything, createUpsertWorkflowSearchAttributesRequest( + s.T(), s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(), false), ).Once().Return(nil) @@ -1855,6 +1859,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessUpsertWorkflowSearchAttribu "UpsertWorkflowExecution", mock.Anything, createUpsertWorkflowSearchAttributesRequest( + s.T(), s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(), true), ).Once().Return(nil) @@ -1957,6 +1962,7 @@ func createAddDecisionTaskRequest( } func createRecordWorkflowExecutionStartedRequest( + t *testing.T, domainName string, startEvent *types.HistoryEvent, transferTask Task, @@ -1978,8 +1984,12 @@ func createRecordWorkflowExecutionStartedRequest( } var searchAttributes map[string][]byte if enableContextHeaderInVisibility { + contextValueJSONString, err := json.Marshal("contextValue") + if err != nil { + t.Fatal(err) + } searchAttributes = map[string][]byte{ - "Header.contextKey": []byte("contextValue"), + "Header.contextKey": contextValueJSONString, } } return &persistence.RecordWorkflowExecutionStartedRequest{ @@ -2106,6 +2116,7 @@ func createTestChildWorkflowExecutionRequest( } func createUpsertWorkflowSearchAttributesRequest( + t *testing.T, domainName string, startEvent *types.HistoryEvent, transferTask Task, @@ -2128,8 +2139,12 @@ func createUpsertWorkflowSearchAttributesRequest( } var searchAttributes map[string][]byte if enableContextHeaderInVisibility { + contextValueJSONString, err := json.Marshal("contextValue") + if err != nil { + t.Fatal(err) + } searchAttributes = map[string][]byte{ - "Header.contextKey": []byte("contextValue"), + "Header.contextKey": contextValueJSONString, } } diff --git a/service/history/task/transfer_standby_task_executor.go b/service/history/task/transfer_standby_task_executor.go index 11488f375fd..8bc1015bd5b 100644 --- a/service/history/task/transfer_standby_task_executor.go +++ b/service/history/task/transfer_standby_task_executor.go @@ -495,7 +495,12 @@ func (t *transferStandbyTaskExecutor) processRecordWorkflowStartedOrUpsertHelper searchAttr := copySearchAttributes(executionInfo.SearchAttributes) if t.config.EnableContextHeaderInVisibility(domainEntry.GetInfo().Name) { if attributes := startEvent.GetWorkflowExecutionStartedEventAttributes(); attributes != nil && attributes.Header != nil { - searchAttr = appendContextHeaderToSearchAttributes(searchAttr, attributes.Header.Fields, t.config.ValidSearchAttributes()) + // fail open to avoid blocking the task processing + if newSearchAttr, err := appendContextHeaderToSearchAttributes(searchAttr, attributes.Header.Fields, t.config.ValidSearchAttributes()); err != nil { + t.logger.Error("failed to add headers to search attributes", tag.Error(err)) + } else { + searchAttr = newSearchAttr + } } } diff --git a/service/history/task/transfer_standby_task_executor_test.go b/service/history/task/transfer_standby_task_executor_test.go index 6a5fe067ae7..1c96ab85794 100644 --- a/service/history/task/transfer_standby_task_executor_test.go +++ b/service/history/task/transfer_standby_task_executor_test.go @@ -801,6 +801,7 @@ func (s *transferStandbyTaskExecutorSuite) TestProcessRecordWorkflowStartedTask( "RecordWorkflowExecutionStarted", mock.Anything, createRecordWorkflowExecutionStartedRequest( + s.T(), s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(), false), ).Return(nil).Once() @@ -855,6 +856,7 @@ func (s *transferStandbyTaskExecutorSuite) TestProcessRecordWorkflowStartedTaskW "RecordWorkflowExecutionStarted", mock.Anything, createRecordWorkflowExecutionStartedRequest( + s.T(), s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(), true), ).Return(nil).Once() @@ -890,6 +892,7 @@ func (s *transferStandbyTaskExecutorSuite) TestProcessUpsertWorkflowSearchAttrib "UpsertWorkflowExecution", mock.Anything, createUpsertWorkflowSearchAttributesRequest( + s.T(), s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(), false), ).Return(nil).Once() @@ -934,6 +937,7 @@ func (s *transferStandbyTaskExecutorSuite) TestProcessUpsertWorkflowSearchAttrib "UpsertWorkflowExecution", mock.Anything, createUpsertWorkflowSearchAttributesRequest( + s.T(), s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(), true), ).Return(nil).Once() diff --git a/service/history/task/transfer_task_executor_base.go b/service/history/task/transfer_task_executor_base.go index ae3ba9d00f1..002e7407a4b 100644 --- a/service/history/task/transfer_task_executor_base.go +++ b/service/history/task/transfer_task_executor_base.go @@ -22,6 +22,7 @@ package task import ( "context" + "encoding/json" "fmt" "time" @@ -399,7 +400,7 @@ func getWorkflowMemo( return &types.Memo{Fields: memo} } -func appendContextHeaderToSearchAttributes(attr, context map[string][]byte, allowedKeys map[string]interface{}) map[string][]byte { +func appendContextHeaderToSearchAttributes(attr, context map[string][]byte, allowedKeys map[string]interface{}) (map[string][]byte, error) { for k, v := range context { key := fmt.Sprintf(definition.HeaderFormat, k) if _, ok := attr[key]; ok { // skip if key already exists @@ -408,14 +409,17 @@ func appendContextHeaderToSearchAttributes(attr, context map[string][]byte, allo if _, allowed := allowedKeys[key]; !allowed { // skip if not allowed continue } - val := make([]byte, len(v)) - copy(val, v) if attr == nil { attr = make(map[string][]byte) } - attr[key] = val + // context header are raw string bytes, need to be json encoded to be stored in search attributes + data, err := json.Marshal(string(v)) + if err != nil { + return nil, fmt.Errorf("fail to json encoding context key %s, val %v: %w", k, v, err) + } + attr[key] = data } - return attr + return attr, nil } func copySearchAttributes(