Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add update time to ES visibility requests and records #4962

Merged
merged 1 commit into from
Aug 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions common/definition/indexedKeys.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
TaskList = "TaskList"
IsCron = "IsCron"
NumClusters = "NumClusters"
UpdateTime = "UpdateTime"
CustomDomain = "CustomDomain" // to support batch workflow
Operator = "Operator" // to support batch workflow

Expand Down Expand Up @@ -100,6 +101,7 @@ var systemIndexedKeys = map[string]interface{}{
TaskList: shared.IndexedValueTypeKeyword,
IsCron: shared.IndexedValueTypeBool,
NumClusters: shared.IndexedValueTypeInt,
UpdateTime: shared.IndexedValueTypeInt,
}

// IsSystemIndexedKey return true is key is system added
Expand Down
3 changes: 3 additions & 0 deletions common/elasticsearch/client_v6.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,9 @@ func (c *elasticV6) convertSearchResultToVisibilityRecord(hit *elastic.SearchHit
NumClusters: source.NumClusters,
SearchAttributes: source.Attr,
}
if source.UpdateTime != 0 {
record.UpdateTime = time.Unix(0, source.UpdateTime)
}
if source.CloseTime != 0 {
record.CloseTime = time.Unix(0, source.CloseTime)
record.Status = thrift.ToWorkflowExecutionCloseStatus(&source.CloseStatus)
Expand Down
3 changes: 3 additions & 0 deletions common/elasticsearch/client_v7.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,9 @@ func (c *elasticV7) convertSearchResultToVisibilityRecord(hit *elastic.SearchHit
NumClusters: source.NumClusters,
SearchAttributes: source.Attr,
}
if source.UpdateTime != 0 {
record.UpdateTime = time.Unix(0, source.UpdateTime)
}
if source.CloseTime != 0 {
record.CloseTime = time.Unix(0, source.CloseTime)
record.Status = thrift.ToWorkflowExecutionCloseStatus(&source.CloseStatus)
Expand Down
1 change: 1 addition & 0 deletions common/elasticsearch/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
IsCron = "IsCron"
NumClusters = "NumClusters"
VisibilityOperation = "VisibilityOperation"
UpdateTime = "UpdateTime"

KafkaKey = "KafkaKey"
)
Expand Down
1 change: 1 addition & 0 deletions common/elasticsearch/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ type (
TaskList string
IsCron bool
NumClusters int16
UpdateTime int64
Attr map[string]interface{}
}

Expand Down
3 changes: 3 additions & 0 deletions common/persistence/dataStoreInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,7 @@ type (
TaskList string
IsCron bool
NumClusters int16
UpdateTime time.Time
SearchAttributes map[string]interface{}
}

Expand Down Expand Up @@ -720,6 +721,7 @@ type (
RetentionPeriod time.Duration
IsCron bool
NumClusters int16
UpdateTimestamp time.Time
}

// InternalRecordWorkflowExecutionUninitializedRequest is used to add a record of a newly uninitialized execution
Expand All @@ -744,6 +746,7 @@ type (
TaskList string
IsCron bool
NumClusters int16
UpdateTimestamp time.Time
SearchAttributes map[string][]byte
}

Expand Down
2 changes: 2 additions & 0 deletions common/persistence/dataVisibilityManagerInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type (
TaskList string
IsCron bool
NumClusters int16
UpdateTimestamp int64
SearchAttributes map[string][]byte
}

Expand All @@ -97,6 +98,7 @@ type (
TaskList string
IsCron bool
NumClusters int16
UpdateTimestamp int64
SearchAttributes map[string][]byte
}

Expand Down
5 changes: 5 additions & 0 deletions common/persistence/elasticsearch/esVisibilityStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func (v *esVisibilityStore) RecordWorkflowExecutionStarted(
0, // will not be used
0, // will not be used
0, // will not be used
0, // will be updated when workflow execution updates
)
return v.producer.Publish(ctx, msg)
}
Expand Down Expand Up @@ -137,6 +138,7 @@ func (v *esVisibilityStore) RecordWorkflowExecutionClosed(
request.CloseTimestamp.UnixNano(),
*thrift.FromWorkflowExecutionCloseStatus(&request.Status),
request.HistoryLength,
request.UpdateTimestamp.UnixNano(),
)
return v.producer.Publish(ctx, msg)
}
Expand Down Expand Up @@ -178,6 +180,7 @@ func (v *esVisibilityStore) UpsertWorkflowExecution(
0, // will not be used
0, // will not be used
0, // will not be used
request.UpdateTimestamp.UnixNano(),
)
return v.producer.Publish(ctx, msg)
}
Expand Down Expand Up @@ -749,6 +752,7 @@ func createVisibilityMessage(
endTimeUnixNano int64, // close execution
closeStatus workflow.WorkflowExecutionCloseStatus, // close execution
historyLength int64, // close execution
updateTimeUnixNano int64, // update execution
) *indexer.Message {
msgType := indexer.MessageTypeIndex

Expand All @@ -759,6 +763,7 @@ func createVisibilityMessage(
es.TaskList: {Type: &es.FieldTypeString, StringData: common.StringPtr(taskList)},
es.IsCron: {Type: &es.FieldTypeBool, BoolData: common.BoolPtr(isCron)},
es.NumClusters: {Type: &es.FieldTypeInt, IntData: common.Int64Ptr(int64(NumClusters))},
es.UpdateTime: {Type: &es.FieldTypeInt, IntData: common.Int64Ptr(updateTimeUnixNano)},
}

if len(memo) != 0 {
Expand Down
2 changes: 2 additions & 0 deletions common/persistence/elasticsearch/esVisibilityStore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ func (s *ESVisibilitySuite) TestRecordWorkflowExecutionClosed() {
request.HistoryLength = int64(20)
request.IsCron = false
request.NumClusters = 2
request.UpdateTimestamp = time.Unix(0, int64(213))
s.mockProducer.On("Publish", mock.Anything, mock.MatchedBy(func(input *indexer.Message) bool {
fields := input.Fields
s.Equal(request.DomainUUID, input.GetDomainID())
Expand All @@ -204,6 +205,7 @@ func (s *ESVisibilitySuite) TestRecordWorkflowExecutionClosed() {
s.Equal(request.IsCron, fields[es.IsCron].GetBoolData())
s.Equal((int64)(request.NumClusters), fields[es.NumClusters].GetIntData())
s.Equal(indexer.VisibilityOperationRecordClosed, *input.VisibilityOperation)
s.Equal(request.UpdateTimestamp.UnixNano(), fields[es.UpdateTime].GetIntData())
return true
})).Return(nil).Once()

Expand Down
2 changes: 2 additions & 0 deletions common/persistence/visibilitySingleManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func (v *visibilityManagerImpl) RecordWorkflowExecutionClosed(
RetentionPeriod: common.SecondsToDuration(request.RetentionSeconds),
IsCron: request.IsCron,
NumClusters: request.NumClusters,
UpdateTimestamp: time.Unix(0, request.UpdateTimestamp),
}
return v.persistence.RecordWorkflowExecutionClosed(ctx, req)
}
Expand Down Expand Up @@ -137,6 +138,7 @@ func (v *visibilityManagerImpl) UpsertWorkflowExecution(
TaskList: request.TaskList,
IsCron: request.IsCron,
NumClusters: request.NumClusters,
UpdateTimestamp: time.Unix(0, request.UpdateTimestamp),
SearchAttributes: request.SearchAttributes,
}
return v.persistence.UpsertWorkflowExecution(ctx, req)
Expand Down
2 changes: 2 additions & 0 deletions common/types/mapper/thrift/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -6265,6 +6265,7 @@ func FromWorkflowExecutionInfo(t *types.WorkflowExecutionInfo) *shared.WorkflowE
AutoResetPoints: FromResetPoints(t.AutoResetPoints),
TaskList: &t.TaskList,
IsCron: &t.IsCron,
UpdateTime: t.UpdateTime,
}
}

Expand All @@ -6288,6 +6289,7 @@ func ToWorkflowExecutionInfo(t *shared.WorkflowExecutionInfo) *types.WorkflowExe
AutoResetPoints: ToResetPoints(t.AutoResetPoints),
TaskList: t.GetTaskList(),
IsCron: t.GetIsCron(),
UpdateTime: t.UpdateTime,
}
}

Expand Down
9 changes: 9 additions & 0 deletions common/types/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -6892,6 +6892,7 @@ type WorkflowExecutionInfo struct {
AutoResetPoints *ResetPoints `json:"autoResetPoints,omitempty"`
TaskList string `json:"taskList,omitempty"`
IsCron bool `json:"isCron,omitempty"`
UpdateTime *int64 `json:"updateTime,omitempty"`
}

// GetExecution is an internal getter (TBD...)
Expand Down Expand Up @@ -6942,6 +6943,14 @@ func (v *WorkflowExecutionInfo) GetExecutionTime() (o int64) {
return
}

// GetUpdateTime is an internal getter (TBD...)
func (v *WorkflowExecutionInfo) GetUpdateTime() (o int64) {
if v != nil && v.UpdateTime != nil {
return *v.UpdateTime
}
return
}

// GetSearchAttributes is an internal getter (TBD...)
func (v *WorkflowExecutionInfo) GetSearchAttributes() (o *SearchAttributes) {
if v != nil && v.SearchAttributes != nil {
Expand Down
1 change: 1 addition & 0 deletions config/dynamicconfig/development_es.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ frontend.validSearchAttributes:
TaskList: 1
IsCron: 1
NumClusters: 2
UpdateTime: 2
CustomStringField: 0
CustomKeywordField: 1
CustomIntField: 2
Expand Down
3 changes: 3 additions & 0 deletions schema/elasticsearch/v6/visibility/index_template.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@
"NumClusters": {
"type": "integer"
},
"UpdateTime": {
"type": "long"
},
"Attr": {
"properties": {
"CadenceChangeVersion": { "type": "keyword" },
Expand Down
3 changes: 3 additions & 0 deletions schema/elasticsearch/v7/visibility/index_template.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@
"NumClusters": {
"type": "integer"
},
"UpdateTime": {
"type": "long"
},
"Attr": {
"properties": {
"CadenceChangeVersion": { "type": "keyword" },
Expand Down
1 change: 1 addition & 0 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1589,6 +1589,7 @@ func (e *historyEngineImpl) DescribeWorkflowExecution(
AutoResetPoints: executionInfo.AutoResetPoints,
Memo: &types.Memo{Fields: executionInfo.Memo},
IsCron: len(executionInfo.CronSchedule) > 0,
UpdateTime: common.Int64Ptr(executionInfo.LastUpdatedTimestamp.UnixNano()),
SearchAttributes: &types.SearchAttributes{IndexedFields: executionInfo.SearchAttributes},
},
}
Expand Down
4 changes: 4 additions & 0 deletions service/history/task/transfer_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ func (t *transferActiveTaskExecutor) processCloseExecutionTaskHelper(
workflowHistoryLength := mutableState.GetNextEventID() - 1
isCron := len(executionInfo.CronSchedule) > 0
numClusters := (int16)(len(domainEntry.GetReplicationConfig().Clusters))
updateTimestamp := executionInfo.LastUpdatedTimestamp

startEvent, err := mutableState.GetStartEvent(ctx)
if err != nil {
Expand Down Expand Up @@ -473,6 +474,7 @@ func (t *transferActiveTaskExecutor) processCloseExecutionTaskHelper(
executionInfo.TaskList,
isCron,
numClusters,
updateTimestamp.UnixNano(),
searchAttr,
); err != nil {
return err
Expand Down Expand Up @@ -972,6 +974,7 @@ func (t *transferActiveTaskExecutor) processRecordWorkflowStartedOrUpsertHelper(
searchAttr := copySearchAttributes(executionInfo.SearchAttributes)
isCron := len(executionInfo.CronSchedule) > 0
numClusters := (int16)(len(domainEntry.GetReplicationConfig().Clusters))
updateTimestamp := getWorkflowLastUpdatedTimestamp(mutableState)

// release the context lock since we no longer need mutable state builder and
// the rest of logic is making RPC call, which takes time.
Expand Down Expand Up @@ -1009,6 +1012,7 @@ func (t *transferActiveTaskExecutor) processRecordWorkflowStartedOrUpsertHelper(
visibilityMemo,
isCron,
numClusters,
updateTimestamp.UnixNano(),
searchAttr,
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1823,6 +1823,7 @@ func createUpsertWorkflowSearchAttributesRequest(
TaskList: taskInfo.TaskList,
IsCron: len(executionInfo.CronSchedule) > 0,
NumClusters: numClusters,
UpdateTimestamp: executionInfo.LastUpdatedTimestamp.UnixNano(),
}
}

Expand Down
4 changes: 4 additions & 0 deletions service/history/task/transfer_standby_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ func (t *transferStandbyTaskExecutor) processCloseExecution(
visibilityMemo := getWorkflowMemo(executionInfo.Memo)
searchAttr := executionInfo.SearchAttributes
isCron := len(executionInfo.CronSchedule) > 0
updateTimestamp := getWorkflowLastUpdatedTimestamp(mutableState)

lastWriteVersion, err := mutableState.GetLastWriteVersion()
if err != nil {
Expand Down Expand Up @@ -282,6 +283,7 @@ func (t *transferStandbyTaskExecutor) processCloseExecution(
executionInfo.TaskList,
isCron,
numClusters,
updateTimestamp.UnixNano(),
searchAttr,
)
}
Expand Down Expand Up @@ -476,6 +478,7 @@ func (t *transferStandbyTaskExecutor) processRecordWorkflowStartedOrUpsertHelper
visibilityMemo := getWorkflowMemo(executionInfo.Memo)
searchAttr := copySearchAttributes(executionInfo.SearchAttributes)
isCron := len(executionInfo.CronSchedule) > 0
updateTimestamp := getWorkflowLastUpdatedTimestamp(mutableState)

domainEntry, err := t.shard.GetDomainCache().GetDomainByID(transferTask.DomainID)
if err != nil {
Expand Down Expand Up @@ -515,6 +518,7 @@ func (t *transferStandbyTaskExecutor) processRecordWorkflowStartedOrUpsertHelper
visibilityMemo,
isCron,
numClusters,
updateTimestamp.UnixNano(),
searchAttr,
)

Expand Down
10 changes: 10 additions & 0 deletions service/history/task/transfer_task_executor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ func (t *transferTaskExecutorBase) upsertWorkflowExecution(
visibilityMemo *types.Memo,
isCron bool,
numClusters int16,
updateTimeUnixNano int64,
searchAttributes map[string][]byte,
) error {

Expand Down Expand Up @@ -248,6 +249,7 @@ func (t *transferTaskExecutorBase) upsertWorkflowExecution(
IsCron: isCron,
NumClusters: numClusters,
SearchAttributes: searchAttributes,
UpdateTimestamp: updateTimeUnixNano,
}

return t.visibilityMgr.UpsertWorkflowExecution(ctx, request)
Expand All @@ -269,6 +271,7 @@ func (t *transferTaskExecutorBase) recordWorkflowClosed(
taskList string,
isCron bool,
numClusters int16,
updateTimeUnixNano int64,
searchAttributes map[string][]byte,
) error {

Expand Down Expand Up @@ -318,6 +321,7 @@ func (t *transferTaskExecutorBase) recordWorkflowClosed(
TaskList: taskList,
SearchAttributes: searchAttributes,
IsCron: isCron,
UpdateTimestamp: updateTimeUnixNano,
NumClusters: numClusters,
}); err != nil {
return err
Expand Down Expand Up @@ -373,6 +377,12 @@ func getWorkflowExecutionTimestamp(
return executionTimestamp
}

func getWorkflowLastUpdatedTimestamp(
msBuilder execution.MutableState,
) time.Time {
return msBuilder.GetExecutionInfo().LastUpdatedTimestamp
}

func getWorkflowMemo(
memo map[string][]byte,
) *types.Memo {
Expand Down