Skip to content

Commit

Permalink
Add pagination and flatten customized search attributes (#5234)
Browse files Browse the repository at this point in the history
* implement pagination

* add a test case for nextPageToken

* add more test cases for pagination

* change elastic search token into pinot token

* clean up

* fix issue where count didn't work

* try to implement flatten schema, add attributes into schema

* add attributes into schema

* add some code to convert time to unix time in customized search attributes

* refactor previously added code to a function

* customized search attributes and unit tests

* Remove attr column and minor clean up

* edge case for ORDER BY in customized search attribute

* Fix typo in name

* add one more condition for parseLastElement

* split one element of customized search attribute by operator instead of space; add a filter for customized attributes prefix

* update unit test

* update unit test

---------

Co-authored-by: Neil Xie <neil.xie@uber.com>
  • Loading branch information
bowenxia and neil-xie committed May 19, 2023
1 parent ecef340 commit 46f5d84
Show file tree
Hide file tree
Showing 8 changed files with 915 additions and 218 deletions.
502 changes: 354 additions & 148 deletions common/persistence/Pinot/pinotVisibilityStore.go

Large diffs are not rendered by default.

361 changes: 329 additions & 32 deletions common/persistence/Pinot/pinotVisibilityStore_test.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions common/pinot/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type (
IsOpen bool
Filter IsRecordValidFilter
MaxResultWindow int
ListRequest *p.InternalListWorkflowExecutionsRequest
}

// GenericMatch is a match struct
Expand Down
76 changes: 76 additions & 0 deletions common/pinot/page_token.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package pinot

import (
"bytes"
"encoding/json"
"fmt"

"github.com/uber/cadence/common/types"
)

// PinotVisibilityPageToken holds the paging token for Pinot.
type PinotVisibilityPageToken struct {
	// From is the offset carried between pages; the zero value denotes
	// the first page.
	From int
}

// DeserializePageToken decodes a JSON-encoded page token blob into a
// PinotVisibilityPageToken.
//
// Any decoding failure is reported as a *types.BadRequestError whose message
// embeds the underlying error; in that case the returned token is nil.
func DeserializePageToken(data []byte) (*PinotVisibilityPageToken, error) {
	decoder := json.NewDecoder(bytes.NewReader(data))
	decoder.UseNumber()

	token := new(PinotVisibilityPageToken)
	if err := decoder.Decode(token); err != nil {
		return nil, &types.BadRequestError{
			Message: fmt.Sprintf("unable to deserialize page token. err: %v", err),
		}
	}
	return token, nil
}

// SerializePageToken encodes the given page token as a JSON blob.
//
// A marshalling failure is reported as a *types.BadRequestError whose message
// embeds the underlying error; in that case the returned blob is nil.
func SerializePageToken(token *PinotVisibilityPageToken) ([]byte, error) {
	encoded, marshalErr := json.Marshal(token)
	if marshalErr == nil {
		return encoded, nil
	}
	return nil, &types.BadRequestError{
		Message: fmt.Sprintf("unable to serialize page token. err: %v", marshalErr),
	}
}

// GetNextPageToken converts a serialized page token into its structured form.
//
// A nil or empty token is treated as "no token supplied" and yields a
// zero-valued PinotVisibilityPageToken. Otherwise the blob is handed to
// DeserializePageToken, and any error from it is returned unchanged together
// with a nil token.
func GetNextPageToken(token []byte) (*PinotVisibilityPageToken, error) {
	if len(token) == 0 {
		return &PinotVisibilityPageToken{}, nil
	}

	parsed, err := DeserializePageToken(token)
	if err != nil {
		return nil, err
	}
	return parsed, nil
}
92 changes: 61 additions & 31 deletions common/pinot/pinotClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package pinot
import (
"encoding/json"
"fmt"
"reflect"
"time"

"github.com/startreedata/pinot-client-go/pinot"
Expand Down Expand Up @@ -60,18 +61,33 @@ func (c *PinotClient) Search(request *SearchRequest) (*SearchResponse, error) {
}
}

return c.getInternalListWorkflowExecutionsResponse(resp, request.Filter)
token, err := GetNextPageToken(request.ListRequest.NextPageToken)

if err != nil {
return nil, &types.InternalServiceError{
Message: fmt.Sprintf("Get NextPage token failed, %v", err),
}
}

return c.getInternalListWorkflowExecutionsResponse(resp, request.Filter, token, request.ListRequest.PageSize, request.MaxResultWindow)
}

func (c *PinotClient) CountByQuery(query string) (int64, error) {
resp, err := c.client.ExecuteSQL(c.tableName, query)
if err != nil {
return 0, &types.InternalServiceError{
Message: fmt.Sprintf("ListClosedWorkflowExecutions failed, %v", err),
Message: fmt.Sprintf("CountWorkflowExecutions ExecuteSQL failed, %v", err),
}
}

return int64(resp.ResultTable.GetRowCount()), nil
count, err := resp.ResultTable.Rows[0][0].(json.Number).Int64()
if err == nil {
return count, nil
}

return -1, &types.InternalServiceError{
Message: fmt.Sprintf("can't convert result to integer!, query = %s, query result = %v, err = %v", query, resp.ResultTable.Rows[0][0], err),
}
}

func (c *PinotClient) GetTableName() string {
Expand All @@ -84,7 +100,11 @@ func buildMap(hit []interface{}, columnNames []string) map[string]interface{} {
resMap := make(map[string]interface{})

for i := 0; i < len(columnNames); i++ {
resMap[columnNames[i]] = hit[i]
key := columnNames[i]
// checks if it is system key, if yes, put it into the map
if ok, _ := isSystemKey(key); ok {
resMap[key] = hit[i]
}
}

return resMap
Expand Down Expand Up @@ -172,12 +192,14 @@ func (c *PinotClient) convertSearchResultToVisibilityRecord(hit []interface{}, c
func (c *PinotClient) getInternalListWorkflowExecutionsResponse(
resp *pinot.BrokerResponse,
isRecordValid func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool,
token *PinotVisibilityPageToken,
pageSize int,
maxResultWindow int,
) (*p.InternalListWorkflowExecutionsResponse, error) {
if resp == nil {
return nil, nil
}

response := &p.InternalListWorkflowExecutionsResponse{}
if resp == nil || resp.ResultTable == nil || resp.ResultTable.GetRowCount() == 0 {
return response, nil
}
schema := resp.ResultTable.DataSchema // get the schema to map results
//columnDataTypes := schema.ColumnDataTypes
columnNames := schema.ColumnNames
Expand All @@ -194,29 +216,23 @@ func (c *PinotClient) getInternalListWorkflowExecutionsResponse(
}
}

//if numOfActualHits == pageSize { // this means the response is not the last page
// var nextPageToken []byte
// var err error
//
// // ES Search API support pagination using From and PageSize, but has limit that From+PageSize cannot exceed a threshold
// // to retrieve deeper pages, use ES SearchAfter
// if searchHits.TotalHits <= int64(maxResultWindow-pageSize) { // use ES Search From+Size
// nextPageToken, err = SerializePageToken(&ElasticVisibilityPageToken{From: token.From + numOfActualHits})
// } else { // use ES Search After
// var sortVal interface{}
// sortVals := actualHits[len(response.Executions)-1].Sort
// sortVal = sortVals[0]
// tieBreaker := sortVals[1].(string)
//
// nextPageToken, err = SerializePageToken(&ElasticVisibilityPageToken{SortValue: sortVal, TieBreaker: tieBreaker})
// }
// if err != nil {
// return nil, err
// }
//
// response.NextPageToken = make([]byte, len(nextPageToken))
// copy(response.NextPageToken, nextPageToken)
//}
if numOfActualHits == pageSize { // this means the response is not the last page
var nextPageToken []byte
var err error

// ES Search API support pagination using From and PageSize, but has limit that From+PageSize cannot exceed a threshold
// TODO: need to confirm if pinot has similar settings
// don't need to retrieve deeper pages in pinot, and no functions like ES SearchAfter
if resp.NumDocsScanned <= int64(maxResultWindow-pageSize) { // use pinot Search From+Size
nextPageToken, err = SerializePageToken(&PinotVisibilityPageToken{From: token.From + numOfActualHits})
}
if err != nil {
return nil, err
}

response.NextPageToken = make([]byte, len(nextPageToken))
copy(response.NextPageToken, nextPageToken)
}

return response, nil
}
Expand All @@ -237,3 +253,17 @@ func (c *PinotClient) getInternalGetClosedWorkflowExecutionResponse(resp *pinot.

return response, nil
}

// isSystemKey reports whether key exactly (case-sensitively) matches the name
// of a field declared directly on VisibilityRecord.
//
// On a match the second result is the Go type of that field rendered as a
// string; when nothing matches it is the literal string "nil".
func isSystemKey(key string) (bool, string) {
	recordType := reflect.TypeOf(VisibilityRecord{})
	for idx, total := 0, recordType.NumField(); idx < total; idx++ {
		field := recordType.Field(idx)
		if field.Name != key {
			continue
		}
		return true, field.Type.String()
	}
	return false, "nil"
}
28 changes: 23 additions & 5 deletions common/pinot/pinotClient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
package pinot

import (
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -87,6 +88,9 @@ func TestGetInternalListWorkflowExecutionsResponse(t *testing.T) {
columnName := []string{"WorkflowID", "RunID", "WorkflowType", "DomainID", "StartTime", "ExecutionTime", "CloseTime", "CloseStatus", "HistoryLength", "Encoding", "TaskList", "IsCron", "NumClusters", "UpdateTime", "Attr"}
hit1 := []interface{}{"wfid1", "rid1", "wftype1", "domainid1", testEarliestTime, testEarliestTime, testLatestTime, 1, 1, "encode1", "tsklst1", true, 1, testEarliestTime, "null"}
hit2 := []interface{}{"wfid2", "rid2", "wftype2", "domainid2", testEarliestTime, testEarliestTime, testLatestTime, 1, 1, "encode2", "tsklst2", false, 1, testEarliestTime, "null"}
hit3 := []interface{}{"wfid3", "rid3", "wftype3", "domainid3", testEarliestTime, testEarliestTime, testLatestTime, 1, 1, "encode3", "tsklst3", false, 1, testEarliestTime, "null"}
hit4 := []interface{}{"wfid4", "rid4", "wftype4", "domainid4", testEarliestTime, testEarliestTime, testLatestTime, 1, 1, "encode4", "tsklst4", false, 1, testEarliestTime, "null"}
hit5 := []interface{}{"wfid5", "rid5", "wftype5", "domainid5", testEarliestTime, testEarliestTime, testLatestTime, 1, 1, "encode5", "tsklst5", false, 1, testEarliestTime, "null"}

brokerResponse := &pinot.BrokerResponse{
AggregationResults: nil,
Expand All @@ -99,6 +103,9 @@ func TestGetInternalListWorkflowExecutionsResponse(t *testing.T) {
Rows: [][]interface{}{
hit1,
hit2,
hit3,
hit4,
hit5,
},
},
Exceptions: nil,
Expand All @@ -109,7 +116,7 @@ func TestGetInternalListWorkflowExecutionsResponse(t *testing.T) {
NumSegmentsProcessed: 1,
NumSegmentsMatched: 1,
NumConsumingSegmentsQueried: 1,
NumDocsScanned: 1,
NumDocsScanned: 10,
NumEntriesScannedInFilter: 1,
NumEntriesScannedPostFilter: 1,
NumGroupsLimitReached: false,
Expand All @@ -118,8 +125,12 @@ func TestGetInternalListWorkflowExecutionsResponse(t *testing.T) {
MinConsumingFreshnessTimeMs: 1,
}

token := &PinotVisibilityPageToken{
From: 0,
}

// Cannot use a table test, because they are not checking the same fields
result, err := client.getInternalListWorkflowExecutionsResponse(brokerResponse, nil)
result, err := client.getInternalListWorkflowExecutionsResponse(brokerResponse, nil, token, 5, 33)

assert.Equal(t, "wfid1", result.Executions[0].WorkflowID)
assert.Equal(t, "rid1", result.Executions[0].RunID)
Expand Down Expand Up @@ -151,17 +162,24 @@ func TestGetInternalListWorkflowExecutionsResponse(t *testing.T) {

assert.Nil(t, err)

responseToken := result.NextPageToken
unmarshalResponseToken, err := GetNextPageToken(responseToken)
if err != nil {
panic(fmt.Sprintf("Unmarshal error in PinotClient test %s", err))
}
assert.Equal(t, 5, unmarshalResponseToken.From)

// check if record is not valid
isRecordValid := func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool {
return false
}
emptyResult, err := client.getInternalListWorkflowExecutionsResponse(brokerResponse, isRecordValid)
emptyResult, err := client.getInternalListWorkflowExecutionsResponse(brokerResponse, isRecordValid, nil, 10, 33)
assert.Equal(t, 0, len(emptyResult.Executions))
assert.Nil(t, err)

// check nil input
nilResult, err := client.getInternalListWorkflowExecutionsResponse(nil, isRecordValid)
assert.Nil(t, nilResult)
nilResult, err := client.getInternalListWorkflowExecutionsResponse(nil, isRecordValid, nil, 10, 33)
assert.Equal(t, &p.InternalListWorkflowExecutionsResponse{}, nilResult)
assert.Nil(t, err)
}

Expand Down
3 changes: 2 additions & 1 deletion schema/Pinot/cadence-visibility-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,5 @@
"indexTypes":["TEXT"]
}
]
}
}

70 changes: 69 additions & 1 deletion schema/Pinot/cadence-visibility-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,76 @@
"dataType": "INT"
},
{
"name": "Attr",
"name": "CustomStringField",
"dataType": "STRING"
},
{
"name": "CustomKeywordField",
"dataType": "STRING"
},
{
"name": "CustomIntField",
"dataType": "INT"
},
{
"name": "CustomDoubleField",
"dataType": "DOUBLE"
},
{
"name": "CustomBoolField",
"dataType": "BOOLEAN"
},
{
"name": "CustomDatetimeField",
"dataType": "LONG"
},
{
"name": "project",
"dataType": "STRING"
},
{
"name": "service",
"dataType": "STRING"
},
{
"name": "environment",
"dataType": "STRING"
},
{
"name": "addon",
"dataType": "STRING"
},
{
"name": "addon-type",
"dataType": "STRING"
},
{
"name": "user",
"dataType": "STRING"
},
{
"name": "CustomDomain",
"dataType": "STRING"
},
{
"name": "Operator",
"dataType": "STRING"
},
{
"name": "RolloutID",
"dataType": "STRING"
},
{
"name": "CadenceChangeVersion",
"dataType": "STRING"
},
{
"name": "BinaryChecksums",
"dataType": "STRING"
},
{
"name": "Passed",
"dataType": "BOOLEAN"
}
],
"dateTimeFieldSpecs": [{
Expand Down

0 comments on commit 46f5d84

Please sign in to comment.