Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pagination and flatten customized search attributes #5234

Merged
merged 18 commits into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
502 changes: 354 additions & 148 deletions common/persistence/Pinot/pinotVisibilityStore.go

Large diffs are not rendered by default.

361 changes: 329 additions & 32 deletions common/persistence/Pinot/pinotVisibilityStore_test.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions common/pinot/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type (
IsOpen bool
Filter IsRecordValidFilter
MaxResultWindow int
ListRequest *p.InternalListWorkflowExecutionsRequest
}

// GenericMatch is a match struct
Expand Down
76 changes: 76 additions & 0 deletions common/pinot/page_token.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package pinot

import (
"bytes"
"encoding/json"
"fmt"

"github.com/uber/cadence/common/types"
)

type (
// PinotVisibilityPageToken holds the paging token for Pinot
PinotVisibilityPageToken struct {
From int
}
)

// DeserializePageToken return the structural token
func DeserializePageToken(data []byte) (*PinotVisibilityPageToken, error) {
var token PinotVisibilityPageToken
dec := json.NewDecoder(bytes.NewReader(data))
dec.UseNumber()
err := dec.Decode(&token)
if err != nil {
return nil, &types.BadRequestError{
Message: fmt.Sprintf("unable to deserialize page token. err: %v", err),
}
}
return &token, nil
}

// SerializePageToken return the token blob
func SerializePageToken(token *PinotVisibilityPageToken) ([]byte, error) {
data, err := json.Marshal(token)
if err != nil {
return nil, &types.BadRequestError{
Message: fmt.Sprintf("unable to serialize page token. err: %v", err),
}
}
return data, nil
}

// GetNextPageToken returns the structural token with nil handling
func GetNextPageToken(token []byte) (*PinotVisibilityPageToken, error) {
var result *PinotVisibilityPageToken
var err error
if len(token) > 0 {
result, err = DeserializePageToken(token)
if err != nil {
return nil, err
}
} else {
result = &PinotVisibilityPageToken{}
}
return result, nil
}
92 changes: 61 additions & 31 deletions common/pinot/pinotClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package pinot
import (
"encoding/json"
"fmt"
"reflect"
"time"

"github.com/startreedata/pinot-client-go/pinot"
Expand Down Expand Up @@ -60,18 +61,33 @@ func (c *PinotClient) Search(request *SearchRequest) (*SearchResponse, error) {
}
}

return c.getInternalListWorkflowExecutionsResponse(resp, request.Filter)
token, err := GetNextPageToken(request.ListRequest.NextPageToken)

if err != nil {
return nil, &types.InternalServiceError{
Message: fmt.Sprintf("Get NextPage token failed, %v", err),
}
}

return c.getInternalListWorkflowExecutionsResponse(resp, request.Filter, token, request.ListRequest.PageSize, request.MaxResultWindow)
}

func (c *PinotClient) CountByQuery(query string) (int64, error) {
resp, err := c.client.ExecuteSQL(c.tableName, query)
if err != nil {
return 0, &types.InternalServiceError{
Message: fmt.Sprintf("ListClosedWorkflowExecutions failed, %v", err),
Message: fmt.Sprintf("CountWorkflowExecutions ExecuteSQL failed, %v", err),
}
}

return int64(resp.ResultTable.GetRowCount()), nil
count, err := resp.ResultTable.Rows[0][0].(json.Number).Int64()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious why GetRowCount() is not working here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous query used Select (), so we could use getRowCount for the total number of results. But we added pagination, the Select () will only return a certain number of results according to the pagination. So we need to change the query to be Select count(*), which means there will only be one result.

if err == nil {
return count, nil
}

return -1, &types.InternalServiceError{
Message: fmt.Sprintf("can't convert result to integer!, query = %s, query result = %v, err = %v", query, resp.ResultTable.Rows[0][0], err),
}
}

func (c *PinotClient) GetTableName() string {
Expand All @@ -84,7 +100,11 @@ func buildMap(hit []interface{}, columnNames []string) map[string]interface{} {
resMap := make(map[string]interface{})

for i := 0; i < len(columnNames); i++ {
resMap[columnNames[i]] = hit[i]
key := columnNames[i]
// checks if it is system key, if yes, put it into the map
if ok, _ := isSystemKey(key); ok {
resMap[key] = hit[i]
}
}

return resMap
Expand Down Expand Up @@ -172,12 +192,14 @@ func (c *PinotClient) convertSearchResultToVisibilityRecord(hit []interface{}, c
func (c *PinotClient) getInternalListWorkflowExecutionsResponse(
resp *pinot.BrokerResponse,
isRecordValid func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool,
token *PinotVisibilityPageToken,
pageSize int,
maxResultWindow int,
) (*p.InternalListWorkflowExecutionsResponse, error) {
if resp == nil {
return nil, nil
}

response := &p.InternalListWorkflowExecutionsResponse{}
if resp == nil || resp.ResultTable == nil || resp.ResultTable.GetRowCount() == 0 {
return response, nil
}
schema := resp.ResultTable.DataSchema // get the schema to map results
//columnDataTypes := schema.ColumnDataTypes
columnNames := schema.ColumnNames
Expand All @@ -194,29 +216,23 @@ func (c *PinotClient) getInternalListWorkflowExecutionsResponse(
}
}

//if numOfActualHits == pageSize { // this means the response is not the last page
// var nextPageToken []byte
// var err error
//
// // ES Search API support pagination using From and PageSize, but has limit that From+PageSize cannot exceed a threshold
// // to retrieve deeper pages, use ES SearchAfter
// if searchHits.TotalHits <= int64(maxResultWindow-pageSize) { // use ES Search From+Size
// nextPageToken, err = SerializePageToken(&ElasticVisibilityPageToken{From: token.From + numOfActualHits})
// } else { // use ES Search After
// var sortVal interface{}
// sortVals := actualHits[len(response.Executions)-1].Sort
// sortVal = sortVals[0]
// tieBreaker := sortVals[1].(string)
//
// nextPageToken, err = SerializePageToken(&ElasticVisibilityPageToken{SortValue: sortVal, TieBreaker: tieBreaker})
// }
// if err != nil {
// return nil, err
// }
//
// response.NextPageToken = make([]byte, len(nextPageToken))
// copy(response.NextPageToken, nextPageToken)
//}
if numOfActualHits == pageSize { // this means the response is not the last page
var nextPageToken []byte
var err error

// ES Search API support pagination using From and PageSize, but has limit that From+PageSize cannot exceed a threshold
// TODO: need to confirm if pinot has similar settings
// don't need to retrieve deeper pages in pinot, and no functions like ES SearchAfter
if resp.NumDocsScanned <= int64(maxResultWindow-pageSize) { // use pinot Search From+Size
nextPageToken, err = SerializePageToken(&PinotVisibilityPageToken{From: token.From + numOfActualHits})
}
if err != nil {
return nil, err
}

response.NextPageToken = make([]byte, len(nextPageToken))
copy(response.NextPageToken, nextPageToken)
}

return response, nil
}
Expand All @@ -237,3 +253,17 @@ func (c *PinotClient) getInternalGetClosedWorkflowExecutionResponse(resp *pinot.

return response, nil
}

// checks if a string is system key
func isSystemKey(key string) (bool, string) {
msg := VisibilityRecord{}
values := reflect.ValueOf(msg)
typesOf := values.Type()
for i := 0; i < values.NumField(); i++ {
fieldName := typesOf.Field(i).Name
if fieldName == key {
return true, typesOf.Field(i).Type.String()
}
}
return false, "nil"
}
28 changes: 23 additions & 5 deletions common/pinot/pinotClient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
package pinot

import (
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -87,6 +88,9 @@ func TestGetInternalListWorkflowExecutionsResponse(t *testing.T) {
columnName := []string{"WorkflowID", "RunID", "WorkflowType", "DomainID", "StartTime", "ExecutionTime", "CloseTime", "CloseStatus", "HistoryLength", "Encoding", "TaskList", "IsCron", "NumClusters", "UpdateTime", "Attr"}
hit1 := []interface{}{"wfid1", "rid1", "wftype1", "domainid1", testEarliestTime, testEarliestTime, testLatestTime, 1, 1, "encode1", "tsklst1", true, 1, testEarliestTime, "null"}
hit2 := []interface{}{"wfid2", "rid2", "wftype2", "domainid2", testEarliestTime, testEarliestTime, testLatestTime, 1, 1, "encode2", "tsklst2", false, 1, testEarliestTime, "null"}
hit3 := []interface{}{"wfid3", "rid3", "wftype3", "domainid3", testEarliestTime, testEarliestTime, testLatestTime, 1, 1, "encode3", "tsklst3", false, 1, testEarliestTime, "null"}
hit4 := []interface{}{"wfid4", "rid4", "wftype4", "domainid4", testEarliestTime, testEarliestTime, testLatestTime, 1, 1, "encode4", "tsklst4", false, 1, testEarliestTime, "null"}
hit5 := []interface{}{"wfid5", "rid5", "wftype5", "domainid5", testEarliestTime, testEarliestTime, testLatestTime, 1, 1, "encode5", "tsklst5", false, 1, testEarliestTime, "null"}

brokerResponse := &pinot.BrokerResponse{
AggregationResults: nil,
Expand All @@ -99,6 +103,9 @@ func TestGetInternalListWorkflowExecutionsResponse(t *testing.T) {
Rows: [][]interface{}{
hit1,
hit2,
hit3,
hit4,
hit5,
},
},
Exceptions: nil,
Expand All @@ -109,7 +116,7 @@ func TestGetInternalListWorkflowExecutionsResponse(t *testing.T) {
NumSegmentsProcessed: 1,
NumSegmentsMatched: 1,
NumConsumingSegmentsQueried: 1,
NumDocsScanned: 1,
NumDocsScanned: 10,
NumEntriesScannedInFilter: 1,
NumEntriesScannedPostFilter: 1,
NumGroupsLimitReached: false,
Expand All @@ -118,8 +125,12 @@ func TestGetInternalListWorkflowExecutionsResponse(t *testing.T) {
MinConsumingFreshnessTimeMs: 1,
}

token := &PinotVisibilityPageToken{
From: 0,
}

// Cannot use a table test, because they are not checking the same fields
result, err := client.getInternalListWorkflowExecutionsResponse(brokerResponse, nil)
result, err := client.getInternalListWorkflowExecutionsResponse(brokerResponse, nil, token, 5, 33)

assert.Equal(t, "wfid1", result.Executions[0].WorkflowID)
assert.Equal(t, "rid1", result.Executions[0].RunID)
Expand Down Expand Up @@ -151,17 +162,24 @@ func TestGetInternalListWorkflowExecutionsResponse(t *testing.T) {

assert.Nil(t, err)

responseToken := result.NextPageToken
unmarshalResponseToken, err := GetNextPageToken(responseToken)
if err != nil {
panic(fmt.Sprintf("Unmarshal error in PinotClient test %s", err))
}
assert.Equal(t, 5, unmarshalResponseToken.From)

// check if record is not valid
isRecordValid := func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool {
return false
}
emptyResult, err := client.getInternalListWorkflowExecutionsResponse(brokerResponse, isRecordValid)
emptyResult, err := client.getInternalListWorkflowExecutionsResponse(brokerResponse, isRecordValid, nil, 10, 33)
assert.Equal(t, 0, len(emptyResult.Executions))
assert.Nil(t, err)

// check nil input
nilResult, err := client.getInternalListWorkflowExecutionsResponse(nil, isRecordValid)
assert.Nil(t, nilResult)
nilResult, err := client.getInternalListWorkflowExecutionsResponse(nil, isRecordValid, nil, 10, 33)
assert.Equal(t, &p.InternalListWorkflowExecutionsResponse{}, nilResult)
assert.Nil(t, err)
}

Expand Down
3 changes: 2 additions & 1 deletion schema/Pinot/cadence-visibility-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,5 @@
"indexTypes":["TEXT"]
}
]
}
}

70 changes: 69 additions & 1 deletion schema/Pinot/cadence-visibility-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,76 @@
"dataType": "INT"
},
{
"name": "Attr",
"name": "CustomStringField",
"dataType": "STRING"
},
{
"name": "CustomKeywordField",
"dataType": "STRING"
},
{
"name": "CustomIntField",
"dataType": "INT"
},
{
"name": "CustomDoubleField",
"dataType": "DOUBLE"
},
{
"name": "CustomBoolField",
"dataType": "BOOLEAN"
},
{
"name": "CustomDatetimeField",
"dataType": "LONG"
},
{
"name": "project",
"dataType": "STRING"
},
{
"name": "service",
"dataType": "STRING"
},
{
"name": "environment",
"dataType": "STRING"
},
{
"name": "addon",
"dataType": "STRING"
},
{
"name": "addon-type",
"dataType": "STRING"
},
{
"name": "user",
"dataType": "STRING"
},
{
"name": "CustomDomain",
"dataType": "STRING"
},
{
"name": "Operator",
"dataType": "STRING"
},
{
"name": "RolloutID",
"dataType": "STRING"
},
{
"name": "CadenceChangeVersion",
"dataType": "STRING"
},
{
"name": "BinaryChecksums",
"dataType": "STRING"
},
{
"name": "Passed",
"dataType": "BOOLEAN"
}
],
"dateTimeFieldSpecs": [{
Expand Down