Skip to content

Commit

Permalink
Add update time to ES visibility requests and records (#CDNC-2567)
Browse files Browse the repository at this point in the history
  • Loading branch information
neil-xie committed Aug 29, 2022
1 parent 854fc59 commit faeb784
Show file tree
Hide file tree
Showing 20 changed files with 62 additions and 0 deletions.
2 changes: 2 additions & 0 deletions common/definition/indexedKeys.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
TaskList = "TaskList"
IsCron = "IsCron"
NumClusters = "NumClusters"
UpdateTime = "UpdateTime"
CustomDomain = "CustomDomain" // to support batch workflow
Operator = "Operator" // to support batch workflow

Expand Down Expand Up @@ -100,6 +101,7 @@ var systemIndexedKeys = map[string]interface{}{
TaskList: shared.IndexedValueTypeKeyword,
IsCron: shared.IndexedValueTypeBool,
NumClusters: shared.IndexedValueTypeInt,
UpdateTime: shared.IndexedValueTypeInt,
}

// IsSystemIndexedKey return true is key is system added
Expand Down
3 changes: 3 additions & 0 deletions common/elasticsearch/client_v6.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,9 @@ func (c *elasticV6) convertSearchResultToVisibilityRecord(hit *elastic.SearchHit
NumClusters: source.NumClusters,
SearchAttributes: source.Attr,
}
if source.UpdateTime != 0 {
record.UpdateTime = time.Unix(0, source.UpdateTime)
}
if source.CloseTime != 0 {
record.CloseTime = time.Unix(0, source.CloseTime)
record.Status = thrift.ToWorkflowExecutionCloseStatus(&source.CloseStatus)
Expand Down
3 changes: 3 additions & 0 deletions common/elasticsearch/client_v7.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,9 @@ func (c *elasticV7) convertSearchResultToVisibilityRecord(hit *elastic.SearchHit
NumClusters: source.NumClusters,
SearchAttributes: source.Attr,
}
if source.UpdateTime != 0 {
record.UpdateTime = time.Unix(0, source.UpdateTime)
}
if source.CloseTime != 0 {
record.CloseTime = time.Unix(0, source.CloseTime)
record.Status = thrift.ToWorkflowExecutionCloseStatus(&source.CloseStatus)
Expand Down
1 change: 1 addition & 0 deletions common/elasticsearch/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
IsCron = "IsCron"
NumClusters = "NumClusters"
VisibilityOperation = "VisibilityOperation"
UpdateTime = "UpdateTime"

KafkaKey = "KafkaKey"
)
Expand Down
1 change: 1 addition & 0 deletions common/elasticsearch/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ type (
TaskList string
IsCron bool
NumClusters int16
UpdateTime int64
Attr map[string]interface{}
}

Expand Down
3 changes: 3 additions & 0 deletions common/persistence/dataStoreInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,7 @@ type (
TaskList string
IsCron bool
NumClusters int16
UpdateTime time.Time
SearchAttributes map[string]interface{}
}

Expand Down Expand Up @@ -720,6 +721,7 @@ type (
RetentionPeriod time.Duration
IsCron bool
NumClusters int16
UpdateTimestamp time.Time
}

// InternalRecordWorkflowExecutionUninitializedRequest is used to add a record of a newly uninitialized execution
Expand All @@ -744,6 +746,7 @@ type (
TaskList string
IsCron bool
NumClusters int16
UpdateTimestamp time.Time
SearchAttributes map[string][]byte
}

Expand Down
2 changes: 2 additions & 0 deletions common/persistence/dataVisibilityManagerInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type (
TaskList string
IsCron bool
NumClusters int16
UpdateTimestamp int64
SearchAttributes map[string][]byte
}

Expand All @@ -97,6 +98,7 @@ type (
TaskList string
IsCron bool
NumClusters int16
UpdateTimestamp int64
SearchAttributes map[string][]byte
}

Expand Down
5 changes: 5 additions & 0 deletions common/persistence/elasticsearch/esVisibilityStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func (v *esVisibilityStore) RecordWorkflowExecutionStarted(
0, // will not be used
0, // will not be used
0, // will not be used
0, // will be updated when workflow execution updates
)
return v.producer.Publish(ctx, msg)
}
Expand Down Expand Up @@ -137,6 +138,7 @@ func (v *esVisibilityStore) RecordWorkflowExecutionClosed(
request.CloseTimestamp.UnixNano(),
*thrift.FromWorkflowExecutionCloseStatus(&request.Status),
request.HistoryLength,
request.UpdateTimestamp.UnixNano(),
)
return v.producer.Publish(ctx, msg)
}
Expand Down Expand Up @@ -178,6 +180,7 @@ func (v *esVisibilityStore) UpsertWorkflowExecution(
0, // will not be used
0, // will not be used
0, // will not be used
request.UpdateTimestamp.UnixNano(),
)
return v.producer.Publish(ctx, msg)
}
Expand Down Expand Up @@ -749,6 +752,7 @@ func createVisibilityMessage(
endTimeUnixNano int64, // close execution
closeStatus workflow.WorkflowExecutionCloseStatus, // close execution
historyLength int64, // close execution
updateTimeUnixNano int64, // update execution
) *indexer.Message {
msgType := indexer.MessageTypeIndex

Expand All @@ -759,6 +763,7 @@ func createVisibilityMessage(
es.TaskList: {Type: &es.FieldTypeString, StringData: common.StringPtr(taskList)},
es.IsCron: {Type: &es.FieldTypeBool, BoolData: common.BoolPtr(isCron)},
es.NumClusters: {Type: &es.FieldTypeInt, IntData: common.Int64Ptr(int64(NumClusters))},
es.UpdateTime: {Type: &es.FieldTypeInt, IntData: common.Int64Ptr(updateTimeUnixNano)},
}

if len(memo) != 0 {
Expand Down
2 changes: 2 additions & 0 deletions common/persistence/elasticsearch/esVisibilityStore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ func (s *ESVisibilitySuite) TestRecordWorkflowExecutionClosed() {
request.HistoryLength = int64(20)
request.IsCron = false
request.NumClusters = 2
request.UpdateTimestamp = time.Unix(0, int64(213))
s.mockProducer.On("Publish", mock.Anything, mock.MatchedBy(func(input *indexer.Message) bool {
fields := input.Fields
s.Equal(request.DomainUUID, input.GetDomainID())
Expand All @@ -204,6 +205,7 @@ func (s *ESVisibilitySuite) TestRecordWorkflowExecutionClosed() {
s.Equal(request.IsCron, fields[es.IsCron].GetBoolData())
s.Equal((int64)(request.NumClusters), fields[es.NumClusters].GetIntData())
s.Equal(indexer.VisibilityOperationRecordClosed, *input.VisibilityOperation)
s.Equal(request.UpdateTimestamp.UnixNano(), fields[es.UpdateTime].GetIntData())
return true
})).Return(nil).Once()

Expand Down
2 changes: 2 additions & 0 deletions common/persistence/visibilitySingleManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func (v *visibilityManagerImpl) RecordWorkflowExecutionClosed(
RetentionPeriod: common.SecondsToDuration(request.RetentionSeconds),
IsCron: request.IsCron,
NumClusters: request.NumClusters,
UpdateTimestamp: time.Unix(0, request.UpdateTimestamp),
}
return v.persistence.RecordWorkflowExecutionClosed(ctx, req)
}
Expand Down Expand Up @@ -137,6 +138,7 @@ func (v *visibilityManagerImpl) UpsertWorkflowExecution(
TaskList: request.TaskList,
IsCron: request.IsCron,
NumClusters: request.NumClusters,
UpdateTimestamp: time.Unix(0, request.UpdateTimestamp),
SearchAttributes: request.SearchAttributes,
}
return v.persistence.UpsertWorkflowExecution(ctx, req)
Expand Down
2 changes: 2 additions & 0 deletions common/types/mapper/thrift/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -6265,6 +6265,7 @@ func FromWorkflowExecutionInfo(t *types.WorkflowExecutionInfo) *shared.WorkflowE
AutoResetPoints: FromResetPoints(t.AutoResetPoints),
TaskList: &t.TaskList,
IsCron: &t.IsCron,
UpdateTime: t.UpdateTime,
}
}

Expand All @@ -6288,6 +6289,7 @@ func ToWorkflowExecutionInfo(t *shared.WorkflowExecutionInfo) *types.WorkflowExe
AutoResetPoints: ToResetPoints(t.AutoResetPoints),
TaskList: t.GetTaskList(),
IsCron: t.GetIsCron(),
UpdateTime: t.UpdateTime,
}
}

Expand Down
9 changes: 9 additions & 0 deletions common/types/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -6892,6 +6892,7 @@ type WorkflowExecutionInfo struct {
AutoResetPoints *ResetPoints `json:"autoResetPoints,omitempty"`
TaskList string `json:"taskList,omitempty"`
IsCron bool `json:"isCron,omitempty"`
UpdateTime *int64 `json:"updateTime,omitempty"`
}

// GetExecution is an internal getter (TBD...)
Expand Down Expand Up @@ -6942,6 +6943,14 @@ func (v *WorkflowExecutionInfo) GetExecutionTime() (o int64) {
return
}

// GetUpdateTime is an internal getter (TBD...)
func (v *WorkflowExecutionInfo) GetUpdateTime() (o int64) {
if v != nil && v.UpdateTime != nil {
return *v.UpdateTime
}
return
}

// GetSearchAttributes is an internal getter (TBD...)
func (v *WorkflowExecutionInfo) GetSearchAttributes() (o *SearchAttributes) {
if v != nil && v.SearchAttributes != nil {
Expand Down
1 change: 1 addition & 0 deletions config/dynamicconfig/development_es.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ frontend.validSearchAttributes:
TaskList: 1
IsCron: 1
NumClusters: 2
UpdateTime: 2
CustomStringField: 0
CustomKeywordField: 1
CustomIntField: 2
Expand Down
3 changes: 3 additions & 0 deletions schema/elasticsearch/v6/visibility/index_template.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@
"NumClusters": {
"type": "integer"
},
"UpdateTime": {
"type": "long"
},
"Attr": {
"properties": {
"CadenceChangeVersion": { "type": "keyword" },
Expand Down
3 changes: 3 additions & 0 deletions schema/elasticsearch/v7/visibility/index_template.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@
"NumClusters": {
"type": "integer"
},
"UpdateTime": {
"type": "long"
},
"Attr": {
"properties": {
"CadenceChangeVersion": { "type": "keyword" },
Expand Down
1 change: 1 addition & 0 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1589,6 +1589,7 @@ func (e *historyEngineImpl) DescribeWorkflowExecution(
AutoResetPoints: executionInfo.AutoResetPoints,
Memo: &types.Memo{Fields: executionInfo.Memo},
IsCron: len(executionInfo.CronSchedule) > 0,
UpdateTime: common.Int64Ptr(executionInfo.LastUpdatedTimestamp.UnixNano()),
SearchAttributes: &types.SearchAttributes{IndexedFields: executionInfo.SearchAttributes},
},
}
Expand Down
4 changes: 4 additions & 0 deletions service/history/task/transfer_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ func (t *transferActiveTaskExecutor) processCloseExecutionTaskHelper(
workflowHistoryLength := mutableState.GetNextEventID() - 1
isCron := len(executionInfo.CronSchedule) > 0
numClusters := (int16)(len(domainEntry.GetReplicationConfig().Clusters))
updateTimestamp := executionInfo.LastUpdatedTimestamp

startEvent, err := mutableState.GetStartEvent(ctx)
if err != nil {
Expand Down Expand Up @@ -473,6 +474,7 @@ func (t *transferActiveTaskExecutor) processCloseExecutionTaskHelper(
executionInfo.TaskList,
isCron,
numClusters,
updateTimestamp.UnixNano(),
searchAttr,
); err != nil {
return err
Expand Down Expand Up @@ -972,6 +974,7 @@ func (t *transferActiveTaskExecutor) processRecordWorkflowStartedOrUpsertHelper(
searchAttr := copySearchAttributes(executionInfo.SearchAttributes)
isCron := len(executionInfo.CronSchedule) > 0
numClusters := (int16)(len(domainEntry.GetReplicationConfig().Clusters))
updateTimestamp := getWorkflowLastUpdatedTimestamp(mutableState)

// release the context lock since we no longer need mutable state builder and
// the rest of logic is making RPC call, which takes time.
Expand Down Expand Up @@ -1009,6 +1012,7 @@ func (t *transferActiveTaskExecutor) processRecordWorkflowStartedOrUpsertHelper(
visibilityMemo,
isCron,
numClusters,
updateTimestamp.UnixNano(),
searchAttr,
)
}
Expand Down
1 change: 1 addition & 0 deletions service/history/task/transfer_active_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1823,6 +1823,7 @@ func createUpsertWorkflowSearchAttributesRequest(
TaskList: taskInfo.TaskList,
IsCron: len(executionInfo.CronSchedule) > 0,
NumClusters: numClusters,
UpdateTimestamp: executionInfo.LastUpdatedTimestamp.UnixNano(),
}
}

Expand Down
4 changes: 4 additions & 0 deletions service/history/task/transfer_standby_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ func (t *transferStandbyTaskExecutor) processCloseExecution(
visibilityMemo := getWorkflowMemo(executionInfo.Memo)
searchAttr := executionInfo.SearchAttributes
isCron := len(executionInfo.CronSchedule) > 0
updateTimestamp := getWorkflowLastUpdatedTimestamp(mutableState)

lastWriteVersion, err := mutableState.GetLastWriteVersion()
if err != nil {
Expand Down Expand Up @@ -282,6 +283,7 @@ func (t *transferStandbyTaskExecutor) processCloseExecution(
executionInfo.TaskList,
isCron,
numClusters,
updateTimestamp.UnixNano(),
searchAttr,
)
}
Expand Down Expand Up @@ -476,6 +478,7 @@ func (t *transferStandbyTaskExecutor) processRecordWorkflowStartedOrUpsertHelper
visibilityMemo := getWorkflowMemo(executionInfo.Memo)
searchAttr := copySearchAttributes(executionInfo.SearchAttributes)
isCron := len(executionInfo.CronSchedule) > 0
updateTimestamp := getWorkflowLastUpdatedTimestamp(mutableState)

domainEntry, err := t.shard.GetDomainCache().GetDomainByID(transferTask.DomainID)
if err != nil {
Expand Down Expand Up @@ -515,6 +518,7 @@ func (t *transferStandbyTaskExecutor) processRecordWorkflowStartedOrUpsertHelper
visibilityMemo,
isCron,
numClusters,
updateTimestamp.UnixNano(),
searchAttr,
)

Expand Down
10 changes: 10 additions & 0 deletions service/history/task/transfer_task_executor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ func (t *transferTaskExecutorBase) upsertWorkflowExecution(
visibilityMemo *types.Memo,
isCron bool,
numClusters int16,
updateTimeUnixNano int64,
searchAttributes map[string][]byte,
) error {

Expand Down Expand Up @@ -248,6 +249,7 @@ func (t *transferTaskExecutorBase) upsertWorkflowExecution(
IsCron: isCron,
NumClusters: numClusters,
SearchAttributes: searchAttributes,
UpdateTimestamp: updateTimeUnixNano,
}

return t.visibilityMgr.UpsertWorkflowExecution(ctx, request)
Expand All @@ -269,6 +271,7 @@ func (t *transferTaskExecutorBase) recordWorkflowClosed(
taskList string,
isCron bool,
numClusters int16,
updateTimeUnixNano int64,
searchAttributes map[string][]byte,
) error {

Expand Down Expand Up @@ -318,6 +321,7 @@ func (t *transferTaskExecutorBase) recordWorkflowClosed(
TaskList: taskList,
SearchAttributes: searchAttributes,
IsCron: isCron,
UpdateTimestamp: updateTimeUnixNano,
NumClusters: numClusters,
}); err != nil {
return err
Expand Down Expand Up @@ -373,6 +377,12 @@ func getWorkflowExecutionTimestamp(
return executionTimestamp
}

func getWorkflowLastUpdatedTimestamp(
msBuilder execution.MutableState,
) time.Time {
return msBuilder.GetExecutionInfo().LastUpdatedTimestamp
}

func getWorkflowMemo(
memo map[string][]byte,
) *types.Memo {
Expand Down

0 comments on commit faeb784

Please sign in to comment.