From 13c8ffbc9fdb56bc78cd7b1f9e42a70e8ad0ccd4 Mon Sep 17 00:00:00 2001 From: sankari gopalakrishnan Date: Wed, 8 May 2024 14:56:27 +0200 Subject: [PATCH 1/8] try --- .../dataVisibilityManagerInterfaces.go | 1 + common/persistence/data_store_interfaces.go | 9 ++ .../pinot/pinot_visibility_store.go | 91 ++++++++++++++++++- 3 files changed, 100 insertions(+), 1 deletion(-) diff --git a/common/persistence/dataVisibilityManagerInterfaces.go b/common/persistence/dataVisibilityManagerInterfaces.go index fd63c0357cd..50c4846f88a 100644 --- a/common/persistence/dataVisibilityManagerInterfaces.go +++ b/common/persistence/dataVisibilityManagerInterfaces.go @@ -142,6 +142,7 @@ type ( // Pass in empty slice for first page. NextPageToken []byte Query string + PartialMatch bool } // ListWorkflowExecutionsResponse is the response to ListWorkflowExecutionsRequest diff --git a/common/persistence/data_store_interfaces.go b/common/persistence/data_store_interfaces.go index bea94c9b25d..878890708cf 100644 --- a/common/persistence/data_store_interfaces.go +++ b/common/persistence/data_store_interfaces.go @@ -180,6 +180,7 @@ type ( GetClosedWorkflowExecution(ctx context.Context, request *InternalGetClosedWorkflowExecutionRequest) (*InternalGetClosedWorkflowExecutionResponse, error) DeleteWorkflowExecution(ctx context.Context, request *VisibilityDeleteWorkflowExecutionRequest) error ListWorkflowExecutions(ctx context.Context, request *ListWorkflowExecutionsByQueryRequest) (*InternalListWorkflowExecutionsResponse, error) + ListAllWorkflowExecutions(ctx context.Context, request *InternalListAllWorkflowExecutionsByTypeRequest) (*InternalListWorkflowExecutionsResponse, error) ScanWorkflowExecutions(ctx context.Context, request *ListWorkflowExecutionsByQueryRequest) (*InternalListWorkflowExecutionsResponse, error) CountWorkflowExecutions(ctx context.Context, request *CountWorkflowExecutionsRequest) (*CountWorkflowExecutionsResponse, error) DeleteUninitializedWorkflowExecution(ctx context.Context, request *VisibilityDeleteWorkflowExecutionRequest) error @@ -706,6 +707,14 @@ type ( WorkflowTypeName string } + // InternalListAllWorkflowExecutionsByTypeRequest is used to list all open and closed executions with specific filters in a domain + InternalListAllWorkflowExecutionsByTypeRequest struct { + InternalListWorkflowExecutionsRequest + Status types.WorkflowExecutionCloseStatus + PartialMatch bool + WorkflowSearchValue string // This value will be searched across workflow type, workflow ID and runID + } + // InternalGetClosedWorkflowExecutionResponse is response from GetWorkflowExecution InternalGetClosedWorkflowExecutionResponse struct { Execution *InternalVisibilityWorkflowExecutionInfo diff --git a/common/persistence/pinot/pinot_visibility_store.go b/common/persistence/pinot/pinot_visibility_store.go index fe4da913596..7d01ec1eeb0 100644 --- a/common/persistence/pinot/pinot_visibility_store.go +++ b/common/persistence/pinot/pinot_visibility_store.go @@ -363,6 +363,28 @@ func (v *pinotVisibilityStore) ListClosedWorkflowExecutionsByType(ctx context.Co return v.pinotClient.Search(req) } +func (v *pinotVisibilityStore) ListAllWorkflowExecutions(ctx context.Context, request *p.InternalListAllWorkflowExecutionsByTypeRequest) (*p.InternalListWorkflowExecutionsResponse, error) { + isRecordValid := func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool { + return !request.EarliestTime.After(rec.StartTime) && !rec.StartTime.After(request.LatestTime) + } + + query, err := getListAllWorkflowExecutionsQuery(v.pinotClient.GetTableName(), request) + if err != nil { + v.logger.Error(fmt.Sprintf("failed to build list workflow executions by workflowID query %v", err)) + return nil, err + } + + req := &pnt.SearchRequest{ + Query: query, + IsOpen: true, + Filter: isRecordValid, + MaxResultWindow: v.config.ESIndexMaxResultWindow(), + ListRequest: &request.InternalListWorkflowExecutionsRequest, + } + + return v.pinotClient.Search(req) +} + func (v *pinotVisibilityStore) ListOpenWorkflowExecutionsByWorkflowID(ctx context.Context, request *p.InternalListWorkflowExecutionsByWorkflowIDRequest) (*p.InternalListWorkflowExecutionsResponse, error) { isRecordValid := func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool { return !request.EarliestTime.After(rec.StartTime) && !rec.StartTime.After(request.LatestTime) @@ -659,6 +681,7 @@ func isTimeStruct(value []byte) ([]byte, error) { type PinotQuery struct { query string filters PinotQueryFilter + search PinotQuerySearchField sorters string limits string } @@ -667,10 +690,39 @@ type PinotQueryFilter struct { string } +type PinotQuerySearchField struct { + string +} + +func (s *PinotQuerySearchField) addEqual(obj string, val interface{}) { + s.string += "OR " + if _, ok := val.(string); ok { + val = fmt.Sprintf("'%s'", val) + } else { + val = fmt.Sprintf("%v", val) + } + + quotedVal := fmt.Sprintf("%s", val) + s.string += fmt.Sprintf("%s = %s\n", obj, quotedVal) +} + +func (s *PinotQuerySearchField) addLike(obj string, val interface{}) { + s.string += "OR " + if _, ok := val.(string); ok { + val = fmt.Sprintf("'%s'", val) + } else { + val = fmt.Sprintf("%v", val) + } + + quotedVal := fmt.Sprintf("%%%s%%", val) + s.string += fmt.Sprintf("%s like %s\n", obj, quotedVal) +} + func NewPinotQuery(tableName string) PinotQuery { return PinotQuery{ query: fmt.Sprintf("SELECT *\nFROM %s\n", tableName), filters: PinotQueryFilter{}, + search: PinotQuerySearchField{}, sorters: "", limits: "", } @@ -686,7 +738,7 @@ func NewPinotCountQuery(tableName string) PinotQuery { } func (q *PinotQuery) String() string { - return fmt.Sprintf("%s%s%s%s", q.query, q.filters.string, q.sorters, q.limits) + return fmt.Sprintf("%s%s%s%s%s", q.query, q.filters.string, q.search, q.sorters, q.limits) } func (q *PinotQuery) concatSorter(sorter string) { @@ -976,6 +1028,43 @@ func getListWorkflowExecutionsQuery(tableName string, request *p.InternalListWor return query.String(), nil } +func getListAllWorkflowExecutionsQuery(tableName string, request *p.InternalListAllWorkflowExecutionsByTypeRequest) (string, error) { + if request == nil { + return "", nil + } + + query := NewPinotQuery(tableName) + + query.filters.addEqual(DomainID, request.DomainUUID) + query.filters.addEqual(IsDeleted, false) + + earliest := request.EarliestTime.UnixMilli() - oneMicroSecondInNano + latest := request.LatestTime.UnixMilli() + oneMicroSecondInNano + + query.filters.addTimeRange(StartTime, earliest, latest) // convert Unix Time to miliseconds + query.addPinotSorter(StartTime, DescendingOrder) + + token, err := pnt.GetNextPageToken(request.NextPageToken) + if err != nil { + return "", fmt.Errorf("next page token: %w", err) + } + + from := token.From + pageSize := request.PageSize + query.addOffsetAndLimits(from, pageSize) + + if request.PartialMatch { + query.search.addLike(WorkflowID, request.WorkflowSearchValue) + query.search.addLike(WorkflowType, request.WorkflowSearchValue) + query.search.addLike(RunID, request.WorkflowSearchValue) + } else { + query.search.addEqual(WorkflowID, request.WorkflowSearchValue) + query.search.addEqual(WorkflowType, request.WorkflowSearchValue) + query.search.addEqual(RunID, request.WorkflowSearchValue) + } + return query.String(), nil +} + func getListWorkflowExecutionsByTypeQuery(tableName string, request *p.InternalListWorkflowExecutionsByTypeRequest, isClosed bool) (string, error) { if request == nil { return "", nil From 06635a21e83ee10fdbd38508868c594e3bf06e81 Mon Sep 17 00:00:00 2001 From: sankari gopalakrishnan Date: Mon, 13 May 2024 13:03:30 +0200 Subject: [PATCH 2/8] update --- common/persistence/data_store_interfaces.go | 1 - .../pinot/pinot_visibility_store.go | 57 ++++-- .../pinot/pinot_visibility_store_test.go | 184 +++++++++++++++++- 3 files changed, 211 insertions(+), 31 deletions(-) diff --git a/common/persistence/data_store_interfaces.go b/common/persistence/data_store_interfaces.go index 878890708cf..85cd20cc912 100644 --- a/common/persistence/data_store_interfaces.go +++ b/common/persistence/data_store_interfaces.go @@ -710,7 +710,6 @@ type ( // InternalListAllWorkflowExecutionsByTypeRequest is used to list all open and closed executions with specific filters in a domain InternalListAllWorkflowExecutionsByTypeRequest struct { InternalListWorkflowExecutionsRequest - Status types.WorkflowExecutionCloseStatus PartialMatch bool WorkflowSearchValue string // This value will be searched across workflow type, workflow ID and runID } diff --git a/common/persistence/pinot/pinot_visibility_store.go b/common/persistence/pinot/pinot_visibility_store.go index 7d01ec1eeb0..02fd1d3de37 100644 --- a/common/persistence/pinot/pinot_visibility_store.go +++ b/common/persistence/pinot/pinot_visibility_store.go @@ -694,8 +694,22 @@ type PinotQuerySearchField struct { string } +func (s *PinotQuerySearchField) checkFirstSearchField() { + if s.string == "" { + s.string = "( " + } else { + s.string += "OR " + } +} + +func (s *PinotQuerySearchField) lastSearchField() { + if s.string != "" { + s.string += " )" + } +} + func (s *PinotQuerySearchField) addEqual(obj string, val interface{}) { - s.string += "OR " + s.checkFirstSearchField() if _, ok := val.(string); ok { val = fmt.Sprintf("'%s'", val) } else { @@ -707,15 +721,10 @@ func (s *PinotQuerySearchField) addEqual(obj string, val interface{}) { } func (s *PinotQuerySearchField) addLike(obj string, val interface{}) { - s.string += "OR " - if _, ok := val.(string); ok { - val = fmt.Sprintf("'%s'", val) - } else { - val = fmt.Sprintf("%v", val) - } + s.checkFirstSearchField() quotedVal := fmt.Sprintf("%%%s%%", val) - s.string += fmt.Sprintf("%s like %s\n", obj, quotedVal) + s.string += fmt.Sprintf("%s LIKE \"%s\"\n", obj, quotedVal) } func NewPinotQuery(tableName string) PinotQuery { @@ -738,7 +747,7 @@ func NewPinotCountQuery(tableName string) PinotQuery { } func (q *PinotQuery) String() string { - return fmt.Sprintf("%s%s%s%s%s", q.query, q.filters.string, q.search, q.sorters, q.limits) + return fmt.Sprintf("%s%s%s%s", q.query, q.filters.string, q.sorters, q.limits) } func (q *PinotQuery) concatSorter(sorter string) { @@ -1038,10 +1047,12 @@ func getListAllWorkflowExecutionsQuery(tableName string, request *p.InternalList query.filters.addEqual(DomainID, request.DomainUUID) query.filters.addEqual(IsDeleted, false) - earliest := request.EarliestTime.UnixMilli() - oneMicroSecondInNano - latest := request.LatestTime.UnixMilli() + oneMicroSecondInNano + if !request.EarliestTime.IsZero() && !request.LatestTime.IsZero() { + earliest := request.EarliestTime.UnixMilli() - oneMicroSecondInNano + latest := request.LatestTime.UnixMilli() + oneMicroSecondInNano + query.filters.addTimeRange(StartTime, earliest, latest) // convert Unix Time to miliseconds + } - query.filters.addTimeRange(StartTime, earliest, latest) // convert Unix Time to miliseconds query.addPinotSorter(StartTime, DescendingOrder) token, err := pnt.GetNextPageToken(request.NextPageToken) @@ -1053,15 +1064,21 @@ func getListAllWorkflowExecutionsQuery(tableName string, request *p.InternalList pageSize := request.PageSize query.addOffsetAndLimits(from, pageSize) - if request.PartialMatch { - query.search.addLike(WorkflowID, request.WorkflowSearchValue) - query.search.addLike(WorkflowType, request.WorkflowSearchValue) - query.search.addLike(RunID, request.WorkflowSearchValue) - } else { - query.search.addEqual(WorkflowID, request.WorkflowSearchValue) - query.search.addEqual(WorkflowType, request.WorkflowSearchValue) - query.search.addEqual(RunID, request.WorkflowSearchValue) + if request.WorkflowSearchValue != "" { + if request.PartialMatch { + query.search.addLike(WorkflowID, request.WorkflowSearchValue) + query.search.addLike(WorkflowType, request.WorkflowSearchValue) + query.search.addLike(RunID, request.WorkflowSearchValue) + } else { + query.search.addEqual(WorkflowID, request.WorkflowSearchValue) + query.search.addEqual(WorkflowType, request.WorkflowSearchValue) + query.search.addEqual(RunID, request.WorkflowSearchValue) + } + + query.search.lastSearchField() + query.filters.addQuery(query.search.string) } + return query.String(), nil } diff --git a/common/persistence/pinot/pinot_visibility_store_test.go b/common/persistence/pinot/pinot_visibility_store_test.go index c1a1102b51a..cf707e9c4c3 100644 --- a/common/persistence/pinot/pinot_visibility_store_test.go +++ b/common/persistence/pinot/pinot_visibility_store_test.go @@ -59,7 +59,8 @@ var ( testContextTimeout = 5 * time.Second - validSearchAttr = definition.GetDefaultIndexedKeys() + validSearchAttr = definition.GetDefaultIndexedKeys() + nextPageTokenErr = fmt.Errorf("next page token: unable to deserialize page token. err: invalid character 'e' looking for beginning of value") ) func TestRecordWorkflowExecutionStarted(t *testing.T) { @@ -428,7 +429,7 @@ func TestListOpenWorkflowExecutions(t *testing.T) { pinotClientMockAffordance: func(mockPinotClient *pnt.MockGenericClient) { mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) }, - expectedError: fmt.Errorf("next page token: unable to deserialize page token. err: invalid character 'e' looking for beginning of value"), + expectedError: nextPageTokenErr, }, "Case2: normal case with nil response": { request: request, @@ -487,7 +488,7 @@ func TestListClosedWorkflowExecutions(t *testing.T) { pinotClientMockAffordance: func(mockPinotClient *pnt.MockGenericClient) { mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) }, - expectedError: fmt.Errorf("next page token: unable to deserialize page token. err: invalid character 'e' looking for beginning of value"), + expectedError: nextPageTokenErr, }, "Case2: normal case with nil response": { request: request, @@ -547,7 +548,7 @@ func TestListOpenWorkflowExecutionsByType(t *testing.T) { pinotClientMockAffordance: func(mockPinotClient *pnt.MockGenericClient) { mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) }, - expectedError: fmt.Errorf("next page token: unable to deserialize page token. err: invalid character 'e' looking for beginning of value"), + expectedError: nextPageTokenErr, }, "Case2: normal case with nil response": { request: request, @@ -606,7 +607,7 @@ func TestListClosedWorkflowExecutionsByType(t *testing.T) { pinotClientMockAffordance: func(mockPinotClient *pnt.MockGenericClient) { mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) }, - expectedError: fmt.Errorf("next page token: unable to deserialize page token. err: invalid character 'e' looking for beginning of value"), + expectedError: nextPageTokenErr, }, "Case2: normal case with nil response": { request: request, @@ -664,7 +665,7 @@ func TestListOpenWorkflowExecutionsByWorkflowID(t *testing.T) { pinotClientMockAffordance: func(mockPinotClient *pnt.MockGenericClient) { mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) }, - expectedError: fmt.Errorf("next page token: unable to deserialize page token. err: invalid character 'e' looking for beginning of value"), + expectedError: nextPageTokenErr, }, "Case2: normal case with nil response": { request: request, @@ -722,7 +723,7 @@ func TestListClosedWorkflowExecutionsByWorkflowID(t *testing.T) { pinotClientMockAffordance: func(mockPinotClient *pnt.MockGenericClient) { mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) }, - expectedError: fmt.Errorf("next page token: unable to deserialize page token. err: invalid character 'e' looking for beginning of value"), + expectedError: nextPageTokenErr, }, "Case2: normal case with nil response": { request: request, @@ -780,7 +781,7 @@ func TestListClosedWorkflowExecutionsByStatus(t *testing.T) { pinotClientMockAffordance: func(mockPinotClient *pnt.MockGenericClient) { mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) }, - expectedError: fmt.Errorf("next page token: unable to deserialize page token. err: invalid character 'e' looking for beginning of value"), + expectedError: nextPageTokenErr, }, "Case2: normal case with nil response": { request: request, @@ -900,7 +901,7 @@ func TestListWorkflowExecutions(t *testing.T) { pinotClientMockAffordance: func(mockPinotClient *pnt.MockGenericClient) { mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) }, - expectedError: fmt.Errorf("next page token: unable to deserialize page token. err: invalid character 'e' looking for beginning of value"), + expectedError: nextPageTokenErr, }, "Case2: normal case with nil response": { request: request, @@ -937,6 +938,89 @@ func TestListWorkflowExecutions(t *testing.T) { } } +func TestListAllWorkflowExecutions(t *testing.T) { + tests := map[string]struct { + request *p.InternalListAllWorkflowExecutionsByTypeRequest + expectedResp *p.InternalListWorkflowExecutionsResponse + pinotClientMockAffordance func(mockPinotClient *pnt.MockGenericClient) + expectedError error + }{ + "successful request": { + request: &p.InternalListAllWorkflowExecutionsByTypeRequest{ + InternalListWorkflowExecutionsRequest: p.InternalListWorkflowExecutionsRequest{ + DomainUUID: testDomainID, + Domain: testDomain, + EarliestTime: time.Unix(0, testEarliestTime), + LatestTime: time.Unix(0, testLatestTime), + PageSize: 10, + NextPageToken: nil, + }, + PartialMatch: false, + WorkflowSearchValue: "", + }, + expectedResp: &p.InternalListWorkflowExecutionsResponse{ + Executions: []*p.InternalVisibilityWorkflowExecutionInfo{ + { + DomainID: DomainID, + }, + }, + }, + pinotClientMockAffordance: func(mockPinotClient *pnt.MockGenericClient) { + mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) + mockPinotClient.EXPECT().Search(gomock.Any()).Return(&pnt.SearchResponse{ + Executions: []*p.InternalVisibilityWorkflowExecutionInfo{ + { + DomainID: DomainID, + }, + }, + }, nil).Times(1) + }, + expectedError: nil, + }, + "error request": { + request: &p.InternalListAllWorkflowExecutionsByTypeRequest{ + InternalListWorkflowExecutionsRequest: p.InternalListWorkflowExecutionsRequest{ + DomainUUID: testDomainID, + Domain: testDomain, + EarliestTime: time.Unix(0, testEarliestTime), + LatestTime: time.Unix(0, testLatestTime), + PageSize: 10, + NextPageToken: []byte("error"), + }, + PartialMatch: false, + WorkflowSearchValue: "", + }, + expectedResp: nil, + pinotClientMockAffordance: func(mockPinotClient *pnt.MockGenericClient) { + mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) + }, + expectedError: nextPageTokenErr, + }, + } + for name, test := range tests { + t.Run(name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockPinotClient := pnt.NewMockGenericClient(ctrl) + mockProducer := &mocks.KafkaProducer{} + mgr := NewPinotVisibilityStore(mockPinotClient, &service.Config{ + ValidSearchAttributes: dynamicconfig.GetMapPropertyFn(definition.GetDefaultIndexedKeys()), + ESIndexMaxResultWindow: dynamicconfig.GetIntPropertyFn(3), + }, mockProducer, log.NewNoop()) + visibilityStore := mgr.(*pinotVisibilityStore) + + test.pinotClientMockAffordance(mockPinotClient) + + resp, err := visibilityStore.ListAllWorkflowExecutions(context.Background(), test.request) + assert.Equal(t, test.expectedResp, resp) + if test.expectedError != nil { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + func TestScanWorkflowExecutions(t *testing.T) { errorRequest := &p.ListWorkflowExecutionsByQueryRequest{ NextPageToken: []byte("error"), @@ -955,7 +1039,7 @@ func TestScanWorkflowExecutions(t *testing.T) { pinotClientMockAffordance: func(mockPinotClient *pnt.MockGenericClient) { mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) }, - expectedError: fmt.Errorf("next page token: unable to deserialize page token. err: invalid character 'e' looking for beginning of value"), + expectedError: nextPageTokenErr, }, "Case2: normal case with nil response": { request: request, @@ -1667,6 +1751,86 @@ LIMIT 0, 10 }) } } +func TestGetListAllWorkflowExecutionsQuery(t *testing.T) { + tests := map[string]struct { + inputRequest *p.InternalListAllWorkflowExecutionsByTypeRequest + expectResult string + expectError error + }{ + "complete request with exact match": { + inputRequest: &p.InternalListAllWorkflowExecutionsByTypeRequest{ + InternalListWorkflowExecutionsRequest: p.InternalListWorkflowExecutionsRequest{ + DomainUUID: testDomainID, + Domain: testDomain, + EarliestTime: time.Unix(0, testEarliestTime), + LatestTime: time.Unix(0, testLatestTime), + PageSize: testPageSize, + NextPageToken: nil, + }, + PartialMatch: false, + WorkflowSearchValue: "123", + }, + expectResult: fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND IsDeleted = false +AND StartTime BETWEEN 1547596871371 AND 2547596873371 +AND ( WorkflowID = '123' +OR WorkflowType = '123' +OR RunID = '123' + ) +Order BY StartTime DESC +LIMIT 0, 10 +`, testTableName), + expectError: nil, + }, + "complete request with partial match": { + inputRequest: &p.InternalListAllWorkflowExecutionsByTypeRequest{ + InternalListWorkflowExecutionsRequest: p.InternalListWorkflowExecutionsRequest{ + DomainUUID: testDomainID, + Domain: testDomain, + EarliestTime: time.Unix(0, testEarliestTime), + LatestTime: time.Unix(0, testLatestTime), + PageSize: testPageSize, + NextPageToken: nil, + }, + PartialMatch: true, + WorkflowSearchValue: "123", + }, + expectResult: fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND IsDeleted = false +AND StartTime BETWEEN 1547596871371 AND 2547596873371 +AND ( WorkflowID LIKE "%%123%%" +OR WorkflowType LIKE "%%123%%" +OR RunID LIKE "%%123%%" + ) +Order BY StartTime DESC +LIMIT 0, 10 +`, testTableName), + expectError: nil, + }, + "empty request": { + inputRequest: &p.InternalListAllWorkflowExecutionsByTypeRequest{}, + expectResult: fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = '' +AND IsDeleted = false +Order BY StartTime DESC +LIMIT 0, 0 +`, testTableName), + expectError: nil, + }, + } + for name, test := range tests { + t.Run(name, func(t *testing.T) { + actualResult, actualError := getListAllWorkflowExecutionsQuery(testTableName, test.inputRequest) + assert.Equal(t, test.expectResult, actualResult) + assert.NoError(t, actualError) + }) + } +} func TestGetGetClosedWorkflowExecutionQuery(t *testing.T) { tests := map[string]struct { From 9d1b8d5c275e4aeeac4da3863f18e426ad5a1680 Mon Sep 17 00:00:00 2001 From: sankari gopalakrishnan Date: Mon, 13 May 2024 13:07:39 +0200 Subject: [PATCH 3/8] Update dataVisibilityManagerInterfaces.go --- common/persistence/dataVisibilityManagerInterfaces.go | 1 - 1 file changed, 1 deletion(-) diff --git a/common/persistence/dataVisibilityManagerInterfaces.go b/common/persistence/dataVisibilityManagerInterfaces.go index 50c4846f88a..fd63c0357cd 100644 --- a/common/persistence/dataVisibilityManagerInterfaces.go +++ b/common/persistence/dataVisibilityManagerInterfaces.go @@ -142,7 +142,6 @@ type ( // Pass in empty slice for first page. NextPageToken []byte Query string - PartialMatch bool } // ListWorkflowExecutionsResponse is the response to ListWorkflowExecutionsRequest From fbabc59f7f91b00e94917bbb8a380f807c6cdf03 Mon Sep 17 00:00:00 2001 From: sankari gopalakrishnan Date: Wed, 15 May 2024 11:49:39 +0200 Subject: [PATCH 4/8] updated --- common/persistence/dataVisibilityManagerInterfaces.go | 2 +- .../persistence/elasticsearch/es_visibility_store.go | 6 ++++++ common/persistence/nosql/nosql_visibility_store.go | 7 +++++++ common/persistence/pinot/pinot_visibility_store.go | 11 +++++------ .../persistence/pinot/pinot_visibility_store_test.go | 6 +++--- common/persistence/sql/sql_visibility_store.go | 7 +++++++ 6 files changed, 29 insertions(+), 10 deletions(-) diff --git a/common/persistence/dataVisibilityManagerInterfaces.go b/common/persistence/dataVisibilityManagerInterfaces.go index fd63c0357cd..0e15451ecbe 100644 --- a/common/persistence/dataVisibilityManagerInterfaces.go +++ b/common/persistence/dataVisibilityManagerInterfaces.go @@ -43,7 +43,7 @@ import ( // purposes. // ErrVisibilityOperationNotSupported is an error which indicates that operation is not supported in selected persistence -var ErrVisibilityOperationNotSupported = &types.BadRequestError{Message: "Operation is not supported. Please use ElasticSearch"} +var ErrVisibilityOperationNotSupported = &types.BadRequestError{Message: "Operation is not supported"} type ( // RecordWorkflowExecutionStartedRequest is used to add a record of a newly diff --git a/common/persistence/elasticsearch/es_visibility_store.go b/common/persistence/elasticsearch/es_visibility_store.go index 0b9a99c05c0..672eb5f9b84 100644 --- a/common/persistence/elasticsearch/es_visibility_store.go +++ b/common/persistence/elasticsearch/es_visibility_store.go @@ -51,6 +51,8 @@ import ( const ( esPersistenceName = "elasticsearch" + DomainID = "DomainID" + StartTime = "StartTime" ) type ( @@ -442,6 +444,10 @@ func (v *esVisibilityStore) ListWorkflowExecutions( return resp, nil } +func (v *esVisibilityStore) ListAllWorkflowExecutions(ctx context.Context, request *p.InternalListAllWorkflowExecutionsByTypeRequest) (*p.InternalListWorkflowExecutionsResponse, error) { + return nil, p.ErrVisibilityOperationNotSupported +} + func (v *esVisibilityStore) ScanWorkflowExecutions( ctx context.Context, request *p.ListWorkflowExecutionsByQueryRequest, diff --git a/common/persistence/nosql/nosql_visibility_store.go b/common/persistence/nosql/nosql_visibility_store.go index 78df3bc85de..13e5d8e1043 100644 --- a/common/persistence/nosql/nosql_visibility_store.go +++ b/common/persistence/nosql/nosql_visibility_store.go @@ -372,6 +372,13 @@ func (v *nosqlVisibilityStore) ListWorkflowExecutions( return nil, persistence.ErrVisibilityOperationNotSupported } +func (v *nosqlVisibilityStore) ListAllWorkflowExecutions( + ctx context.Context, + request *persistence.InternalListAllWorkflowExecutionsByTypeRequest, +) (*persistence.InternalListWorkflowExecutionsResponse, error) { + return nil, persistence.ErrVisibilityOperationNotSupported +} + func (v *nosqlVisibilityStore) ScanWorkflowExecutions( _ context.Context, _ *persistence.ListWorkflowExecutionsByQueryRequest) (*persistence.InternalListWorkflowExecutionsResponse, error) { diff --git a/common/persistence/pinot/pinot_visibility_store.go b/common/persistence/pinot/pinot_visibility_store.go index 02fd1d3de37..3290b2344e8 100644 --- a/common/persistence/pinot/pinot_visibility_store.go +++ b/common/persistence/pinot/pinot_visibility_store.go @@ -720,11 +720,10 @@ func (s *PinotQuerySearchField) addEqual(obj string, val interface{}) { s.string += fmt.Sprintf("%s = %s\n", obj, quotedVal) } -func (s *PinotQuerySearchField) addLike(obj string, val interface{}) { +func (s *PinotQuerySearchField) addMatch(obj string, val interface{}) { s.checkFirstSearchField() - quotedVal := fmt.Sprintf("%%%s%%", val) - s.string += fmt.Sprintf("%s LIKE \"%s\"\n", obj, quotedVal) + s.string += fmt.Sprintf("text_match(%s, '\"%s\"')\n", obj, val) } func NewPinotQuery(tableName string) PinotQuery { @@ -1066,9 +1065,9 @@ func getListAllWorkflowExecutionsQuery(tableName string, request *p.InternalList if request.WorkflowSearchValue != "" { if request.PartialMatch { - query.search.addLike(WorkflowID, request.WorkflowSearchValue) - query.search.addLike(WorkflowType, request.WorkflowSearchValue) - query.search.addLike(RunID, request.WorkflowSearchValue) + query.search.addMatch(WorkflowID, request.WorkflowSearchValue) + query.search.addMatch(WorkflowType, request.WorkflowSearchValue) + query.search.addMatch(RunID, request.WorkflowSearchValue) } else { query.search.addEqual(WorkflowID, request.WorkflowSearchValue) query.search.addEqual(WorkflowType, request.WorkflowSearchValue) diff --git a/common/persistence/pinot/pinot_visibility_store_test.go b/common/persistence/pinot/pinot_visibility_store_test.go index cf707e9c4c3..f23952696e3 100644 --- a/common/persistence/pinot/pinot_visibility_store_test.go +++ b/common/persistence/pinot/pinot_visibility_store_test.go @@ -1802,9 +1802,9 @@ FROM %s WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' AND IsDeleted = false AND StartTime BETWEEN 1547596871371 AND 2547596873371 -AND ( WorkflowID LIKE "%%123%%" -OR WorkflowType LIKE "%%123%%" -OR RunID LIKE "%%123%%" +AND ( text_match(WorkflowID, '"123"') +OR text_match(WorkflowType, '"123"') +OR text_match(RunID, '"123"') ) Order BY StartTime DESC LIMIT 0, 10 diff --git a/common/persistence/sql/sql_visibility_store.go b/common/persistence/sql/sql_visibility_store.go index 678d80c929a..89e7073b3cf 100644 --- a/common/persistence/sql/sql_visibility_store.go +++ b/common/persistence/sql/sql_visibility_store.go @@ -319,6 +319,13 @@ func (s *sqlVisibilityStore) ListWorkflowExecutions( return nil, p.ErrVisibilityOperationNotSupported } +func (s *sqlVisibilityStore) ListAllWorkflowExecutions( + _ context.Context, + _ *p.InternalListAllWorkflowExecutionsByTypeRequest, +) (*p.InternalListWorkflowExecutionsResponse, error) { + return nil, p.ErrVisibilityOperationNotSupported +} + func (s *sqlVisibilityStore) ScanWorkflowExecutions( _ context.Context, _ *p.ListWorkflowExecutionsByQueryRequest, From 6803762d994b797048835d98a57d4b6744d3cba3 Mon Sep 17 00:00:00 2001 From: sankari gopalakrishnan Date: Wed, 15 May 2024 12:21:32 +0200 Subject: [PATCH 5/8] Update pinot_visibility_store_test.go --- .../pinot/pinot_visibility_store_test.go | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/common/persistence/pinot/pinot_visibility_store_test.go b/common/persistence/pinot/pinot_visibility_store_test.go index f23952696e3..351122d92dd 100644 --- a/common/persistence/pinot/pinot_visibility_store_test.go +++ b/common/persistence/pinot/pinot_visibility_store_test.go @@ -60,7 +60,7 @@ var ( testContextTimeout = 5 * time.Second validSearchAttr = definition.GetDefaultIndexedKeys() - nextPageTokenErr = fmt.Errorf("next page token: unable to deserialize page token. err: invalid character 'e' looking for beginning of value") + errNextPageToken = fmt.Errorf("next page token: unable to deserialize page token. err: invalid character 'e' looking for beginning of value") ) func TestRecordWorkflowExecutionStarted(t *testing.T) { @@ -429,7 +429,7 @@ func TestListOpenWorkflowExecutions(t *testing.T) { pinotClientMockAffordance: func(mockPinotClient *pnt.MockGenericClient) { mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) }, - expectedError: nextPageTokenErr, + expectedError: errNextPageToken, }, "Case2: normal case with nil response": { request: request, @@ -488,7 +488,7 @@ func TestListClosedWorkflowExecutions(t *testing.T) { pinotClientMockAffordance: func(mockPinotClient *pnt.MockGenericClient) { mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) }, - expectedError: nextPageTokenErr, + expectedError: errNextPageToken, }, "Case2: normal case with nil response": { request: request, @@ -548,7 +548,7 @@ func TestListOpenWorkflowExecutionsByType(t *testing.T) { pinotClientMockAffordance: func(mockPinotClient *pnt.MockGenericClient) { mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) }, - expectedError: nextPageTokenErr, + expectedError: errNextPageToken, }, "Case2: normal case with nil response": { request: request, @@ -607,7 +607,7 @@ func TestListClosedWorkflowExecutionsByType(t *testing.T) { pinotClientMockAffordance: func(mockPinotClient *pnt.MockGenericClient) { mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) }, - expectedError: nextPageTokenErr, + expectedError: errNextPageToken, }, "Case2: normal case with nil response": { request: request, @@ -665,7 +665,7 @@ func TestListOpenWorkflowExecutionsByWorkflowID(t *testing.T) { pinotClientMockAffordance: func(mockPinotClient *pnt.MockGenericClient) { mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) }, - expectedError: nextPageTokenErr, + expectedError: errNextPageToken, }, "Case2: normal case with nil response": { request: request, @@ -723,7 +723,7 @@ func TestListClosedWorkflowExecutionsByWorkflowID(t *testing.T) { pinotClientMockAffordance: func(mockPinotClient *pnt.MockGenericClient) { mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) }, - expectedError: nextPageTokenErr, + expectedError: errNextPageToken, }, "Case2: normal case with nil response": { request: request, @@ -781,7 +781,7 @@ func TestListClosedWorkflowExecutionsByStatus(t *testing.T) { pinotClientMockAffordance: func(mockPinotClient *pnt.MockGenericClient) { mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) }, - expectedError: nextPageTokenErr, + expectedError: errNextPageToken, }, "Case2: normal case with nil response": { request: request, @@ -901,7 +901,7 @@ func TestListWorkflowExecutions(t *testing.T) { pinotClientMockAffordance: func(mockPinotClient *pnt.MockGenericClient) { mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) }, - expectedError: nextPageTokenErr, + expectedError: errNextPageToken, }, "Case2: normal case with nil response": { request: request, @@ -994,7 +994,7 @@ func TestListAllWorkflowExecutions(t *testing.T) { pinotClientMockAffordance: func(mockPinotClient *pnt.MockGenericClient) { mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) }, - expectedError: nextPageTokenErr, + expectedError: errNextPageToken, }, } for name, test := range tests { @@ -1039,7 +1039,7 @@ func TestScanWorkflowExecutions(t *testing.T) { pinotClientMockAffordance: func(mockPinotClient *pnt.MockGenericClient) { mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) }, - expectedError: nextPageTokenErr, + expectedError: errNextPageToken, }, "Case2: normal case with nil response": { request: request, From a4635a6f3cb4be037567673608d7d563cdbb6228 Mon Sep 17 00:00:00 2001 From: sankari gopalakrishnan Date: Wed, 15 May 2024 12:46:45 +0200 Subject: [PATCH 6/8] Update visibility_store_mock.go --- common/persistence/visibility_store_mock.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/common/persistence/visibility_store_mock.go b/common/persistence/visibility_store_mock.go index 699590a31d6..6ac08582301 100644 --- a/common/persistence/visibility_store_mock.go +++ b/common/persistence/visibility_store_mock.go @@ -140,6 +140,21 @@ func (mr *MockVisibilityStoreMockRecorder) GetName() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetName", reflect.TypeOf((*MockVisibilityStore)(nil).GetName)) } +// ListAllWorkflowExecutions mocks base method. +func (m *MockVisibilityStore) ListAllWorkflowExecutions(arg0 context.Context, arg1 *InternalListAllWorkflowExecutionsByTypeRequest) (*InternalListWorkflowExecutionsResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListAllWorkflowExecutions", arg0, arg1) + ret0, _ := ret[0].(*InternalListWorkflowExecutionsResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListAllWorkflowExecutions indicates an expected call of ListAllWorkflowExecutions. +func (mr *MockVisibilityStoreMockRecorder) ListAllWorkflowExecutions(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListAllWorkflowExecutions", reflect.TypeOf((*MockVisibilityStore)(nil).ListAllWorkflowExecutions), arg0, arg1) +} + // ListClosedWorkflowExecutions mocks base method. func (m *MockVisibilityStore) ListClosedWorkflowExecutions(arg0 context.Context, arg1 *InternalListWorkflowExecutionsRequest) (*InternalListWorkflowExecutionsResponse, error) { m.ctrl.T.Helper() From 7638b9552242cb9e2aa3d5b73f08bc496bd10629 Mon Sep 17 00:00:00 2001 From: sankari gopalakrishnan Date: Wed, 15 May 2024 15:00:20 +0200 Subject: [PATCH 7/8] Update es_visibility_store_test.go --- .../persistence/elasticsearch/es_visibility_store_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/common/persistence/elasticsearch/es_visibility_store_test.go b/common/persistence/elasticsearch/es_visibility_store_test.go index 6fd5150aaf0..9f5ce09f595 100644 --- a/common/persistence/elasticsearch/es_visibility_store_test.go +++ b/common/persistence/elasticsearch/es_visibility_store_test.go @@ -359,6 +359,12 @@ func (s *ESVisibilitySuite) TestListOpenWorkflowExecutionsByType() { s.True(strings.Contains(err.Error(), "ListOpenWorkflowExecutionsByType failed")) } +func (s *ESVisibilitySuite) TestListAllWorkflowExecutions() { + _, err := s.visibilityStore.ListAllWorkflowExecutions(context.Background(), &p.InternalListAllWorkflowExecutionsByTypeRequest{}) + s.Error(err) + s.Equal(p.ErrVisibilityOperationNotSupported, err) +} + func (s *ESVisibilitySuite) TestListClosedWorkflowExecutionsByType() { s.mockESClient.On("Search", mock.Anything, mock.MatchedBy(func(input *es.SearchRequest) bool { s.False(input.IsOpen) From 1b7a194f488208c4ba31edd67d495b9364e55d6d Mon Sep 17 00:00:00 2001 From: sankari gopalakrishnan Date: Tue, 21 May 2024 09:54:54 +0200 Subject: [PATCH 8/8] Update pinot_visibility_store.go --- common/persistence/pinot/pinot_visibility_store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/persistence/pinot/pinot_visibility_store.go b/common/persistence/pinot/pinot_visibility_store.go index 3290b2344e8..1ba9f63c650 100644 --- a/common/persistence/pinot/pinot_visibility_store.go +++ b/common/persistence/pinot/pinot_visibility_store.go @@ -370,7 +370,7 @@ func (v *pinotVisibilityStore) ListAllWorkflowExecutions(ctx context.Context, re query, err := getListAllWorkflowExecutionsQuery(v.pinotClient.GetTableName(), request) if err != nil { - v.logger.Error(fmt.Sprintf("failed to build list workflow executions by workflowID query %v", err)) + v.logger.Error(fmt.Sprintf("failed to build list all workflow executions query %v", err)) return nil, err }