diff --git a/common/persistence/Pinot/pinotVisibilityStore.go b/common/persistence/Pinot/pinotVisibilityStore.go index a0e04633151..b30ec387a92 100644 --- a/common/persistence/Pinot/pinotVisibilityStore.go +++ b/common/persistence/Pinot/pinotVisibilityStore.go @@ -27,6 +27,7 @@ import ( "reflect" "regexp" "strings" + "time" "github.com/uber/cadence/.gen/go/indexer" @@ -44,30 +45,26 @@ import ( const ( pinotPersistenceName = "pinot" - - DescendingOrder = "DESC" - AcendingOrder = "ASC" - - DocID = "DocID" - DomainID = "DomainID" - WorkflowID = "WorkflowID" - RunID = "RunID" - WorkflowType = "WorkflowType" - CloseStatus = "CloseStatus" - HistoryLength = "HistoryLength" - TaskList = "TaskList" - IsCron = "IsCron" - NumClusters = "NumClusters" - ShardID = "ShardID" - Attr = "Attr" - StartTime = "StartTime" - CloseTime = "CloseTime" - UpdateTime = "UpdateTime" - ExecutionTime = "ExecutionTime" - - Encoding = "Encoding" - - VisibilityOperation = "VisibilityOperation" + DescendingOrder = "DESC" + AcendingOrder = "ASC" + DocID = "DocID" + DomainID = "DomainID" + WorkflowID = "WorkflowID" + RunID = "RunID" + WorkflowType = "WorkflowType" + CloseStatus = "CloseStatus" + HistoryLength = "HistoryLength" + TaskList = "TaskList" + IsCron = "IsCron" + NumClusters = "NumClusters" + ShardID = "ShardID" + Attr = "Attr" + StartTime = "StartTime" + CloseTime = "CloseTime" + UpdateTime = "UpdateTime" + ExecutionTime = "ExecutionTime" + Encoding = "Encoding" + LikeStatement = "%s LIKE '%%%s%%'\n" ) type ( @@ -90,7 +87,6 @@ type ( TaskID int64 `json:"TaskID,omitempty"` IsCron bool `json:"IsCron,omitempty"` NumClusters int64 `json:"NumClusters,omitempty"` - Attr string `json:"Attr,omitempty"` UpdateTime int64 `json:"UpdateTime,omitempty"` // update execution, ShardID int64 `json:"ShardID,omitempty"` // specific to certain status @@ -146,13 +142,8 @@ func (v *pinotVisibilityStore) RecordWorkflowExecutionStarted( request *p.InternalRecordWorkflowExecutionStartedRequest, ) error { v.checkProducer() - //attr, err := json.Marshal(request.SearchAttributes) - attr, err := decodeAttr(request.SearchAttributes) - if err != nil { - return err - } - msg := createVisibilityMessage( + msg, err := createVisibilityMessage( request.DomainUUID, request.WorkflowID, request.RunID, @@ -165,25 +156,25 @@ func (v *pinotVisibilityStore) RecordWorkflowExecutionStarted( request.Memo.GetEncoding(), request.IsCron, request.NumClusters, - string(attr), - common.RecordStarted, - 0, // will not be used - 0, // will not be used + -1, // represent invalid close time, means open workflow execution + -1, // represent invalid close status, means open workflow execution 0, // will not be used request.UpdateTimestamp.UnixMilli(), // will be updated when workflow execution updates int64(request.ShardID), + request.SearchAttributes, ) + + if err != nil { + return err + } + return v.producer.Publish(ctx, msg) } func (v *pinotVisibilityStore) RecordWorkflowExecutionClosed(ctx context.Context, request *p.InternalRecordWorkflowExecutionClosedRequest) error { v.checkProducer() - attr, err := decodeAttr(request.SearchAttributes) - if err != nil { - return err - } - msg := createVisibilityMessage( + msg, err := createVisibilityMessage( request.DomainUUID, request.WorkflowID, request.RunID, @@ -196,20 +187,24 @@ func (v *pinotVisibilityStore) RecordWorkflowExecutionClosed(ctx context.Context request.Memo.GetEncoding(), request.IsCron, request.NumClusters, - string(attr), - common.RecordClosed, request.CloseTimestamp.UnixMilli(), *thrift.FromWorkflowExecutionCloseStatus(&request.Status), request.HistoryLength, request.UpdateTimestamp.UnixMilli(), int64(request.ShardID), + request.SearchAttributes, ) + + if err != nil { + return err + } + return v.producer.Publish(ctx, msg) } func (v *pinotVisibilityStore) RecordWorkflowExecutionUninitialized(ctx context.Context, request *p.InternalRecordWorkflowExecutionUninitializedRequest) error { v.checkProducer() - msg := createVisibilityMessage( + msg, err := createVisibilityMessage( request.DomainUUID, request.WorkflowID, request.RunID, @@ -222,25 +217,24 @@ func (v *pinotVisibilityStore) RecordWorkflowExecutionUninitialized(ctx context. "", false, 0, - "", - "", - 0, - 0, + -1, // represent invalid close time, means open workflow execution + -1, // represent invalid close status, means open workflow execution 0, request.UpdateTimestamp.UnixMilli(), request.ShardID, + nil, ) - return v.producer.Publish(ctx, msg) -} -func (v *pinotVisibilityStore) UpsertWorkflowExecution(ctx context.Context, request *p.InternalUpsertWorkflowExecutionRequest) error { - v.checkProducer() - attr, err := decodeAttr(request.SearchAttributes) if err != nil { return err } - msg := createVisibilityMessage( + return v.producer.Publish(ctx, msg) +} + +func (v *pinotVisibilityStore) UpsertWorkflowExecution(ctx context.Context, request *p.InternalUpsertWorkflowExecutionRequest) error { + v.checkProducer() + msg, err := createVisibilityMessage( request.DomainUUID, request.WorkflowID, request.RunID, @@ -253,14 +247,18 @@ func (v *pinotVisibilityStore) UpsertWorkflowExecution(ctx context.Context, requ request.Memo.GetEncoding(), request.IsCron, request.NumClusters, - string(attr), - common.UpsertSearchAttributes, - 0, // will not be used - 0, // will not be used - 0, // will not be used + -1, // represent invalid close time, means open workflow execution + -1, // represent invalid close status, means open workflow execution + 0, // will not be used request.UpdateTimestamp.UnixMilli(), request.ShardID, + request.SearchAttributes, ) + + if err != nil { + return err + } + return v.producer.Publish(ctx, msg) } @@ -271,7 +269,6 @@ func (v *pinotVisibilityStore) ListOpenWorkflowExecutions( isRecordValid := func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool { return !request.EarliestTime.After(rec.CloseTime) && !rec.CloseTime.After(request.LatestTime) } - query := getListWorkflowExecutionsQuery(v.pinotClient.GetTableName(), request, false) req := &pnt.SearchRequest{ @@ -279,8 +276,8 @@ func (v *pinotVisibilityStore) ListOpenWorkflowExecutions( IsOpen: true, Filter: isRecordValid, MaxResultWindow: v.config.ESIndexMaxResultWindow(), + ListRequest: request, } - return v.pinotClient.Search(req) } @@ -299,6 +296,7 @@ func (v *pinotVisibilityStore) ListClosedWorkflowExecutions( IsOpen: true, Filter: isRecordValid, MaxResultWindow: v.config.ESIndexMaxResultWindow(), + ListRequest: request, } return v.pinotClient.Search(req) @@ -316,6 +314,7 @@ func (v *pinotVisibilityStore) ListOpenWorkflowExecutionsByType(ctx context.Cont IsOpen: true, Filter: isRecordValid, MaxResultWindow: v.config.ESIndexMaxResultWindow(), + ListRequest: &request.InternalListWorkflowExecutionsRequest, } return v.pinotClient.Search(req) @@ -333,6 +332,7 @@ func (v *pinotVisibilityStore) ListClosedWorkflowExecutionsByType(ctx context.Co IsOpen: true, Filter: isRecordValid, MaxResultWindow: v.config.ESIndexMaxResultWindow(), + ListRequest: &request.InternalListWorkflowExecutionsRequest, } return v.pinotClient.Search(req) @@ -350,6 +350,7 @@ func (v *pinotVisibilityStore) ListOpenWorkflowExecutionsByWorkflowID(ctx contex IsOpen: true, Filter: isRecordValid, MaxResultWindow: v.config.ESIndexMaxResultWindow(), + ListRequest: &request.InternalListWorkflowExecutionsRequest, } return v.pinotClient.Search(req) @@ -367,6 +368,7 @@ func (v *pinotVisibilityStore) ListClosedWorkflowExecutionsByWorkflowID(ctx cont IsOpen: true, Filter: isRecordValid, MaxResultWindow: v.config.ESIndexMaxResultWindow(), + ListRequest: &request.InternalListWorkflowExecutionsRequest, } return v.pinotClient.Search(req) @@ -384,6 +386,7 @@ func (v *pinotVisibilityStore) ListClosedWorkflowExecutionsByStatus(ctx context. IsOpen: true, Filter: isRecordValid, MaxResultWindow: v.config.ESIndexMaxResultWindow(), + ListRequest: &request.InternalListWorkflowExecutionsRequest, } return v.pinotClient.Search(req) @@ -397,6 +400,14 @@ func (v *pinotVisibilityStore) GetClosedWorkflowExecution(ctx context.Context, r IsOpen: true, Filter: nil, MaxResultWindow: v.config.ESIndexMaxResultWindow(), + ListRequest: &p.InternalListWorkflowExecutionsRequest{ // create a new request to avoid nil pointer exceptions + DomainUUID: request.DomainUUID, + Domain: request.Domain, + EarliestTime: time.Time{}, + LatestTime: time.Time{}, + PageSize: 1, + NextPageToken: nil, + }, } resp, err := v.pinotClient.Search(req) @@ -415,7 +426,6 @@ func (v *pinotVisibilityStore) GetClosedWorkflowExecution(ctx context.Context, r func (v *pinotVisibilityStore) ListWorkflowExecutions(ctx context.Context, request *p.ListWorkflowExecutionsByQueryRequest) (*p.InternalListWorkflowExecutionsResponse, error) { checkPageSize(request) - // TODO: need to check next page token in the future validMap := v.config.ValidSearchAttributes() query := getListWorkflowExecutionsByQueryQuery(v.pinotClient.GetTableName(), request, validMap) @@ -424,6 +434,14 @@ func (v *pinotVisibilityStore) ListWorkflowExecutions(ctx context.Context, reque IsOpen: true, Filter: nil, MaxResultWindow: v.config.ESIndexMaxResultWindow(), + ListRequest: &p.InternalListWorkflowExecutionsRequest{ + DomainUUID: request.DomainUUID, + Domain: request.Domain, + EarliestTime: time.Time{}, + LatestTime: time.Time{}, + NextPageToken: request.NextPageToken, + PageSize: request.PageSize, + }, } return v.pinotClient.Search(req) @@ -432,27 +450,33 @@ func (v *pinotVisibilityStore) ListWorkflowExecutions(ctx context.Context, reque func (v *pinotVisibilityStore) ScanWorkflowExecutions(ctx context.Context, request *p.ListWorkflowExecutionsByQueryRequest) (*p.InternalListWorkflowExecutionsResponse, error) { checkPageSize(request) - // TODO: need to check next page token in the future - - validMap := v.config.ValidSearchAttributes() - query := getListWorkflowExecutionsByQueryQuery(v.pinotClient.GetTableName(), request, validMap) + query := getListWorkflowExecutionsByQueryQuery(v.pinotClient.GetTableName(), request, v.config.ValidSearchAttributes()) req := &pnt.SearchRequest{ Query: query, IsOpen: true, Filter: nil, MaxResultWindow: v.config.ESIndexMaxResultWindow(), + ListRequest: &p.InternalListWorkflowExecutionsRequest{ + DomainUUID: request.DomainUUID, + Domain: request.Domain, + EarliestTime: time.Time{}, + LatestTime: time.Time{}, + NextPageToken: request.NextPageToken, + PageSize: request.PageSize, + }, } return v.pinotClient.Search(req) } func (v *pinotVisibilityStore) CountWorkflowExecutions(ctx context.Context, request *p.CountWorkflowExecutionsRequest) (*p.CountWorkflowExecutionsResponse, error) { - query := getCountWorkflowExecutionsQuery(v.pinotClient.GetTableName(), request) + query := getCountWorkflowExecutionsQuery(v.pinotClient.GetTableName(), request, v.config.ValidSearchAttributes()) + resp, err := v.pinotClient.CountByQuery(query) if err != nil { return nil, &types.InternalServiceError{ - Message: fmt.Sprintf("ListClosedWorkflowExecutions failed, %v", err), + Message: fmt.Sprintf("CountClosedWorkflowExecutions failed, %v", err), } } @@ -465,7 +489,7 @@ func (v *pinotVisibilityStore) DeleteWorkflowExecution( ctx context.Context, request *p.VisibilityDeleteWorkflowExecutionRequest, ) error { - return p.ErrVisibilityOperationNotSupported + return &types.BadRequestError{Message: "Operation is not supported. Pinot doesn't support this operation so far."} } func (v *pinotVisibilityStore) DeleteUninitializedWorkflowExecution( @@ -473,7 +497,7 @@ func (v *pinotVisibilityStore) DeleteUninitializedWorkflowExecution( request *p.VisibilityDeleteWorkflowExecutionRequest, ) error { // temporary: not implemented, only implemented for ES - return p.ErrVisibilityOperationNotSupported + return &types.BadRequestError{Message: "Operation is not supported. Pinot doesn't support this operation so far."} } func (v *pinotVisibilityStore) checkProducer() { @@ -496,47 +520,75 @@ func createVisibilityMessage( memo []byte, encoding common.EncodingType, isCron bool, - NumClusters int16, - searchAttributes string, - visibilityOperation common.VisibilityOperation, + numClusters int16, // specific to certain status closeTimeUnixNano int64, // close execution closeStatus workflow.WorkflowExecutionCloseStatus, // close execution historyLength int64, // close execution updateTimeUnixNano int64, // update execution, shardID int64, -) *indexer.PinotMessage { - status := int(closeStatus) - - rawMsg := visibilityMessage{ - DocID: wid + "-" + rid, - DomainID: domainID, - WorkflowID: wid, - RunID: rid, - WorkflowType: workflowTypeName, - TaskList: taskList, - StartTime: startTimeUnixNano, - ExecutionTime: executionTimeUnixNano, - IsCron: isCron, - NumClusters: int64(NumClusters), - Attr: searchAttributes, - CloseTime: closeTimeUnixNano, - CloseStatus: int64(status), - HistoryLength: historyLength, - UpdateTime: updateTimeUnixNano, - ShardID: shardID, - } - - serializedMsg, err := json.Marshal(rawMsg) + rawSearchAttributes map[string][]byte, +) (*indexer.PinotMessage, error) { + m := make(map[string]interface{}) + //loop through all input parameters + m[DocID] = wid + "-" + rid + m[DomainID] = domainID + m[WorkflowID] = wid + m[RunID] = rid + m[WorkflowType] = workflowTypeName + m[TaskList] = taskList + m[StartTime] = startTimeUnixNano + m[ExecutionTime] = executionTimeUnixNano + m[IsCron] = isCron + m[NumClusters] = numClusters + m[CloseTime] = closeTimeUnixNano + m[CloseStatus] = int(closeStatus) + m[HistoryLength] = historyLength + m[UpdateTime] = updateTimeUnixNano + m[ShardID] = shardID + + var err error + for key, value := range rawSearchAttributes { + value, err = isTimeStruct(value) + if err != nil { + return nil, err + } + + var val interface{} + err = json.Unmarshal(value, &val) + if err != nil { + return nil, err + } + m[key] = val + } + + serializedMsg, err := json.Marshal(m) if err != nil { - panic("serialize msg error!") + return nil, err } msg := &indexer.PinotMessage{ WorkflowID: common.StringPtr(wid), Payload: serializedMsg, } - return msg + return msg, nil + +} + +// check if value is time.Time type +// if it is, convert it to unixMilli +// if it isn't time, return the original value +func isTimeStruct(value []byte) ([]byte, error) { + var time time.Time + err := json.Unmarshal(value, &time) + if err == nil { + unixTime := time.UnixMilli() + value, err = json.Marshal(unixTime) + if err != nil { + return nil, err + } + } + return value, nil } /****************************** Request Translator ******************************/ @@ -587,6 +639,10 @@ func (q *PinotQuery) addLimits(limit int) { q.limits += fmt.Sprintf("LIMIT %d\n", limit) } +func (q *PinotQuery) addOffsetAndLimits(offset int, limit int) { + q.limits += fmt.Sprintf("LIMIT %d, %d\n", offset, limit) +} + type PinotQueryFilter struct { string } @@ -628,19 +684,16 @@ func (f *PinotQueryFilter) addTimeRange(obj string, earliest interface{}, latest f.string += fmt.Sprintf("%s BETWEEN %v AND %v\n", obj, earliest, latest) } -func (f *PinotQueryFilter) addExactMatch(key string, val string) { +func (f *PinotQueryFilter) addPartialMatch(key string, val string) { f.checkFirstFilter() - f.string += fmt.Sprintf("TEXT_MATCH(%s, '%s')\n", Attr, key) - f.string += fmt.Sprintf("AND TEXT_MATCH(%s, '%s')\n", Attr, val) + f.string += getPartialFormatString(key, val) } -func (f *PinotQueryFilter) addPartialMatch(key string, val string) { - f.checkFirstFilter() - f.string += fmt.Sprintf("TEXT_MATCH(%s, '%s')\n", Attr, key) - f.string += fmt.Sprintf("AND %s LIKE '%%%s%%'\n", Attr, val) +func getPartialFormatString(key string, val string) string { + return fmt.Sprintf(LikeStatement, key, val) } -func getCountWorkflowExecutionsQuery(tableName string, request *p.CountWorkflowExecutionsRequest) string { +func getCountWorkflowExecutionsQuery(tableName string, request *p.CountWorkflowExecutionsRequest, validMap map[string]interface{}) string { if request == nil { return "" } @@ -651,8 +704,19 @@ func getCountWorkflowExecutionsQuery(tableName string, request *p.CountWorkflowE query.filters.addEqual(DomainID, request.DomainUUID) requestQuery := strings.TrimSpace(request.Query) - if requestQuery != "" { - query.filters.addQuery(request.Query) + + // if customized query is empty, directly return + if requestQuery == "" { + return query.String() + } + + requestQuery = filterPrefix(requestQuery) + + // when customized query is not empty + if common.IsJustOrderByClause(requestQuery) { + query.concatSorter(requestQuery) + } else { // check if it has a complete customized query + query = constructQueryWithCustomizedQuery(requestQuery, validMap, query) } return query.String() @@ -663,73 +727,204 @@ func getListWorkflowExecutionsByQueryQuery(tableName string, request *p.ListWork return "" } - // TODO: switch to v.logger or something - queryQueryLogger := log.NewNoop() + token, err := pnt.GetNextPageToken(request.NextPageToken) + if err != nil { + panic(fmt.Sprintf("deserialize next page token error: %s", err)) + } + query := NewPinotQuery(tableName) // need to add Domain ID query.filters.addEqual(DomainID, request.DomainUUID) requestQuery := strings.TrimSpace(request.Query) + + // if customized query is empty, directly return if requestQuery == "" { - query.addLimits(request.PageSize) - } else if common.IsJustOrderByClause(requestQuery) { + query.addOffsetAndLimits(token.From, request.PageSize) + return query.String() + } + + requestQuery = filterPrefix(requestQuery) + + // when customized query is not empty + if common.IsJustOrderByClause(requestQuery) { query.concatSorter(requestQuery) - query.addLimits(request.PageSize) - } else { - // checks every case of 'and' - reg := regexp.MustCompile("(?i)(and)") - queryList := reg.Split(requestQuery, -1) - - for _, element := range queryList { - element := strings.TrimSpace(element) - pair := strings.Split(element, " ") - key := pair[0] - - // case: when order by query also passed in - if strings.ToLower(key) == "order" && strings.ToLower(pair[1]) == "by" { - query.concatSorter(element) - continue - } + } else { // check if it has a complete customized query + query = constructQueryWithCustomizedQuery(requestQuery, validMap, query) + } - if pinotContainsKey(key) { - query.filters.addQuery(element) - continue - } + query.addOffsetAndLimits(token.From, request.PageSize) + return query.String() +} + +func filterPrefix(query string) string { + prefix := fmt.Sprintf("`%s.", Attr) + postfix := "`" + + query = strings.ReplaceAll(query, prefix, "") + return strings.ReplaceAll(query, postfix, "") +} - if valType, ok := validMap[key]; ok { - val := pair[2] - indexValType := common.ConvertIndexedValueTypeToInternalType(valType, queryQueryLogger) - isKeyWord := indexValType == types.IndexedValueTypeKeyword - if isKeyWord { - query.filters.addExactMatch(key, val) - } else { - query.filters.addPartialMatch(key, val) - } +func constructQueryWithCustomizedQuery(requestQuery string, validMap map[string]interface{}, query PinotQuery) PinotQuery { + // TODO: switch to v.logger or something + queryQueryLogger := log.NewNoop() + + // checks every case of 'and' + reg := regexp.MustCompile("(?i)(and)") + queryList := reg.Split(requestQuery, -1) + var orderBy string + + for index, element := range queryList { + element := strings.TrimSpace(element) + // special case: when element is the last one + if index == len(queryList)-1 { + element, orderBy = parseLastElement(element) + } + if len(element) == 0 { + continue + } + + key, val, op := splitElement(element) + + // case 2: when key is a system key + if ok, _ := isSystemKey(key); ok { + query.filters.addQuery(element) + continue + } + + // case 3: when key is valid within validMap + if valType, ok := validMap[key]; ok { + indexValType := common.ConvertIndexedValueTypeToInternalType(valType, queryQueryLogger) + + if indexValType == types.IndexedValueTypeString { + val = removeQuote(val) + query.filters.addPartialMatch(key, val) } else { - queryQueryLogger.Error("Unregistered field!!") + query.filters.addQuery(fmt.Sprintf("%s %s %s", key, op, val)) } - + } else { + queryQueryLogger.Error("Unregistered field!!") } + } - query.addLimits(request.PageSize) + if orderBy != "" { + query.concatSorter(orderBy) } - return query.String() + return query +} + +/* +Can have cases: +1. A=B +2. A<=B +3. A>=B +4. A <= B +5. A >= B +*/ +func splitElement(element string) (string, string, string) { + if element == "" { + return "", "", "" + } + + listLE := strings.Split(element, "<=") + listGE := strings.Split(element, ">=") + listE := strings.Split(element, "=") + listL := strings.Split(element, "<") + listG := strings.Split(element, ">") + + if len(listLE) > 1 { + return strings.TrimSpace(listLE[0]), strings.TrimSpace(listLE[1]), "<=" + } + + if len(listGE) > 1 { + return strings.TrimSpace(listGE[0]), strings.TrimSpace(listGE[1]), ">=" + } + + if len(listE) > 1 { + return strings.TrimSpace(listE[0]), strings.TrimSpace(listE[1]), "=" + } + + if len(listL) > 1 { + return strings.TrimSpace(listL[0]), strings.TrimSpace(listL[1]), "<" + } + + if len(listG) > 1 { + return strings.TrimSpace(listG[0]), strings.TrimSpace(listG[1]), ">" + } + + return "", "", "" +} + +/* +Order by XXX DESC +-> if startWith("Order by") -> return "", element + +CustomizedString = 'cannot be used in order by' +-> if last character is ‘ or " -> return element, "" + +CustomizedInt = 1 (without order by clause) +-> if !contains("Order by") -> return element, "" + +CustomizedString = 'cannot be used in order by' Order by XXX DESC +-> Find the index x of last appearance of "order by" -> return element[0, x], element[x, len] + +CustomizedInt = 1 Order by XXX DESC +-> Find the index x of last appearance of "order by" -> return element[0, x], element[x, len] +*/ +func parseLastElement(element string) (string, string) { + // case 1: when order by query also passed in + if common.IsJustOrderByClause(element) { + return "", element + } + + // case 2: when last element is a string + if element[len(element)-1] == '\'' || element[len(element)-1] == '"' { + return element, "" + } + + // case 3: when last element doesn't contain "order by" + if !strings.Contains(strings.ToLower(element), "order by") { + return element, "" + } + + // case 4: general case + elementArray := strings.Split(element, " ") + orderByIndex := findLastOrderBy(elementArray) // find the last appearance of "order by" is the answer + return strings.Join(elementArray[:orderByIndex], " "), strings.Join(elementArray[orderByIndex:], " ") +} + +func findLastOrderBy(list []string) int { + for i := len(list) - 2; i >= 0; i-- { + if strings.ToLower(list[i]) == "order" && strings.ToLower(list[i+1]) == "by" { + return i + } + } + return 0 } -// hard coded for this PoC. Will use dynamic configs later. -func pinotContainsKey(key string) bool { +func removeQuote(val string) string { + if val[0] == '"' && val[len(val)-1] == '"' { + val = fmt.Sprintf("%s", val[1:len(val)-1]) + } else if val[0] == '\'' && val[len(val)-1] == '\'' { + val = fmt.Sprintf("%s", val[1:len(val)-1]) + } + return fmt.Sprintf("%s", val) +} + +// checks if a string is system key +func isSystemKey(key string) (bool, string) { msg := visibilityMessage{} values := reflect.ValueOf(msg) typesOf := values.Type() for i := 0; i < values.NumField(); i++ { fieldName := typesOf.Field(i).Name if fieldName == key { - return true + return true, typesOf.Field(i).Type.String() } } - return false + return false, "nil" } func getListWorkflowExecutionsQuery(tableName string, request *p.InternalListWorkflowExecutionsRequest, isClosed bool) string { @@ -737,6 +932,14 @@ func getListWorkflowExecutionsQuery(tableName string, request *p.InternalListWor return "" } + token, err := pnt.GetNextPageToken(request.NextPageToken) + if err != nil { + panic(fmt.Sprintf("deserialize next page token error: %s", err)) + } + + from := token.From + pageSize := request.PageSize + query := NewPinotQuery(tableName) query.filters.addEqual(DomainID, request.DomainUUID) @@ -749,6 +952,9 @@ func getListWorkflowExecutionsQuery(tableName string, request *p.InternalListWor query.addPinotSorter(CloseTime, DescendingOrder) query.addPinotSorter(RunID, DescendingOrder) + + query.addOffsetAndLimits(from, pageSize) + return query.String() } diff --git a/common/persistence/Pinot/pinotVisibilityStore_test.go b/common/persistence/Pinot/pinotVisibilityStore_test.go index a3081679ac5..1f6fe8e5ea3 100644 --- a/common/persistence/Pinot/pinotVisibilityStore_test.go +++ b/common/persistence/Pinot/pinotVisibilityStore_test.go @@ -23,10 +23,13 @@ package pinotVisibility import ( + "encoding/json" "fmt" "testing" "time" + pnt "github.com/uber/cadence/common/pinot" + "github.com/stretchr/testify/assert" p "github.com/uber/cadence/common/persistence" @@ -57,27 +60,40 @@ func TestGetCountWorkflowExecutionsQuery(t *testing.T) { request := &p.CountWorkflowExecutionsRequest{ DomainUUID: testDomainID, Domain: testDomain, - Query: "WorkflowId = 'wfid'", + Query: "WorkflowID = 'wfid'", } - result := getCountWorkflowExecutionsQuery(testTableName, request) - expectResult := fmt.Sprintf("SELECT COUNT(*)\nFROM %s\nWHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'\nAND WorkflowId = 'wfid'\n", testTableName) + result := getCountWorkflowExecutionsQuery(testTableName, request, nil) + expectResult := fmt.Sprintf(`SELECT COUNT(*) +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND WorkflowID = 'wfid' +`, testTableName) assert.Equal(t, result, expectResult) - nilResult := getCountWorkflowExecutionsQuery(testTableName, nil) + nilResult := getCountWorkflowExecutionsQuery(testTableName, nil, nil) assert.Equal(t, nilResult, "") } func TestGetListWorkflowExecutionQuery(t *testing.T) { testValidMap := make(map[string]interface{}) testValidMap["CustomizedKeyword"] = types.IndexedValueTypeKeyword - testValidMap["CustomizedString"] = types.IndexedValueTypeString - testValidMap["IndexedValueTypeInt"] = types.IndexedValueTypeInt - testValidMap["IndexedValueTypeDouble"] = types.IndexedValueTypeDouble + testValidMap["CustomStringField"] = types.IndexedValueTypeString + testValidMap["CustomIntField"] = types.IndexedValueTypeInt + testValidMap["CustomKeywordField"] = types.IndexedValueTypeDouble testValidMap["IndexedValueTypeBool"] = types.IndexedValueTypeBool testValidMap["IndexedValueTypeDatetime"] = types.IndexedValueTypeDatetime + token := pnt.PinotVisibilityPageToken{ + From: 11, + } + + serializedToken, err := json.Marshal(token) + if err != nil { + panic(fmt.Sprintf("Serialized error in PinotVisibilityStoreTest!!!, %s", err)) + } + tests := map[string]struct { input *p.ListWorkflowExecutionsByQueryRequest expectedOutput string @@ -88,9 +104,34 @@ func TestGetListWorkflowExecutionQuery(t *testing.T) { Domain: testDomain, PageSize: testPageSize, NextPageToken: nil, - Query: "CustomizedKeyword = keywordCustomized", + Query: "`Attr.CustomizedKeyword` = 'keywordCustomized'", + }, + expectedOutput: fmt.Sprintf( + `SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND CustomizedKeyword = 'keywordCustomized' +LIMIT 0, 10 +`, testTableName), + }, + + "complete request from search attribute worker": { + input: &p.ListWorkflowExecutionsByQueryRequest{ + DomainUUID: testDomainID, + Domain: testDomain, + PageSize: testPageSize, + NextPageToken: nil, + Query: "CustomIntField=2 and CustomKeywordField='Update2' order by `Attr.CustomDatetimeField` DESC", }, - expectedOutput: fmt.Sprintf("SELECT *\nFROM %s\nWHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'\nAND TEXT_MATCH(Attr, 'CustomizedKeyword')\nAND TEXT_MATCH(Attr, 'keywordCustomized')\nLIMIT 10\n", testTableName), + expectedOutput: fmt.Sprintf( + `SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND CustomIntField = 2 +AND CustomKeywordField = 'Update2' +order by CustomDatetimeField DESC +LIMIT 0, 10 +`, testTableName), }, "complete request with keyword query and other customized query": { @@ -99,9 +140,15 @@ func TestGetListWorkflowExecutionQuery(t *testing.T) { Domain: testDomain, PageSize: testPageSize, NextPageToken: nil, - Query: "CustomizedKeyword = keywordCustomized and CustomizedString = stringCustmized", + Query: "CustomizedKeyword = 'keywordCustomized' and CustomStringField = 'String field is for text'", }, - expectedOutput: fmt.Sprintf("SELECT *\nFROM %s\nWHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'\nAND TEXT_MATCH(Attr, 'CustomizedKeyword')\nAND TEXT_MATCH(Attr, 'keywordCustomized')\nAND TEXT_MATCH(Attr, 'CustomizedString')\nAND Attr LIKE '%%stringCustmized%%'\nLIMIT 10\n", testTableName), + expectedOutput: fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND CustomizedKeyword = 'keywordCustomized' +AND CustomStringField LIKE '%%String field is for text%%' +LIMIT 0, 10 +`, testTableName), }, "complete request with customized query with not registered attribute": { @@ -110,20 +157,55 @@ func TestGetListWorkflowExecutionQuery(t *testing.T) { Domain: testDomain, PageSize: testPageSize, NextPageToken: nil, - Query: "CustomizedKeyword = keywordCustomized and CustomizedString = stringCustmized and unregistered <= 100", + Query: "CustomizedKeyword = 'keywordCustomized' and CustomStringField = 'String field is for text' and unregistered <= 100", }, - expectedOutput: fmt.Sprintf("SELECT *\nFROM %s\nWHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'\nAND TEXT_MATCH(Attr, 'CustomizedKeyword')\nAND TEXT_MATCH(Attr, 'keywordCustomized')\nAND TEXT_MATCH(Attr, 'CustomizedString')\nAND Attr LIKE '%%stringCustmized%%'\nLIMIT 10\n", testTableName), + expectedOutput: fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND CustomizedKeyword = 'keywordCustomized' +AND CustomStringField LIKE '%%String field is for text%%' +LIMIT 0, 10 +`, testTableName), }, - "complete request with customized query with everything with all cases AND": { + "complete request with customized query with all customized attributes with all cases AND & a invalid string input": { input: &p.ListWorkflowExecutionsByQueryRequest{ DomainUUID: testDomainID, Domain: testDomain, PageSize: testPageSize, NextPageToken: nil, - Query: "CloseStatus < 0 and CustomizedKeyword = keywordCustomized AND CustomizedString = stringCustmized And unregistered <= 100 aNd Order by DomainId Desc", + Query: "CloseStatus < 0 anD WorkflowType = some-test-workflow and CustomizedKeyword = 'keywordCustomized' AND CustomStringField = 'String field is for text' And unregistered <= 100 aNd Order by DomainId Desc", }, - expectedOutput: fmt.Sprintf("SELECT *\nFROM %s\nWHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'\nAND CloseStatus < 0\nAND TEXT_MATCH(Attr, 'CustomizedKeyword')\nAND TEXT_MATCH(Attr, 'keywordCustomized')\nAND TEXT_MATCH(Attr, 'CustomizedString')\nAND Attr LIKE '%%stringCustmized%%'\nOrder by DomainId Desc\nLIMIT 10\n", testTableName), + expectedOutput: fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND CloseStatus < 0 +AND WorkflowType = some-test-workflow +AND CustomizedKeyword = 'keywordCustomized' +AND CustomStringField LIKE '%%String field is for text%%' +Order by DomainId Desc +LIMIT 0, 10 +`, testTableName), + }, + + "complete request with customized query with NextPageToken": { + input: &p.ListWorkflowExecutionsByQueryRequest{ + DomainUUID: testDomainID, + Domain: testDomain, + PageSize: testPageSize, + NextPageToken: serializedToken, + Query: "CloseStatus < 0 and CustomizedKeyword = 'keywordCustomized' AND CustomIntField<=10 and CustomStringField = 'String field is for text' And unregistered <= 100 aNd Order by DomainId Desc", + }, + expectedOutput: fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND CloseStatus < 0 +AND CustomizedKeyword = 'keywordCustomized' +AND CustomIntField <= 10 +AND CustomStringField LIKE '%%String field is for text%%' +Order by DomainId Desc +LIMIT 11, 10 +`, testTableName), }, "complete request with order by query": { @@ -134,7 +216,12 @@ func TestGetListWorkflowExecutionQuery(t *testing.T) { NextPageToken: nil, Query: "Order by DomainId Desc", }, - expectedOutput: fmt.Sprintf("SELECT *\nFROM %s\nWHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'\nOrder by DomainId Desc\nLIMIT 10\n", testTableName), + expectedOutput: fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +Order by DomainId Desc +LIMIT 0, 10 +`, testTableName), }, "complete request with filter query": { @@ -145,7 +232,12 @@ func TestGetListWorkflowExecutionQuery(t *testing.T) { NextPageToken: nil, Query: "CloseStatus < 0", }, - expectedOutput: fmt.Sprintf("SELECT *\nFROM %s\nWHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'\nAND CloseStatus < 0\nLIMIT 10\n", testTableName), + expectedOutput: fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND CloseStatus < 0 +LIMIT 0, 10 +`, testTableName), }, "complete request with empty query": { @@ -156,12 +248,20 @@ func TestGetListWorkflowExecutionQuery(t *testing.T) { NextPageToken: nil, Query: "", }, - expectedOutput: fmt.Sprintf("SELECT *\nFROM %s\nWHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'\nLIMIT 10\n", testTableName), + expectedOutput: fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +LIMIT 0, 10 +`, testTableName), }, "empty request": { - input: &p.ListWorkflowExecutionsByQueryRequest{}, - expectedOutput: fmt.Sprintf("SELECT *\nFROM %s\nWHERE DomainID = ''\nLIMIT 0\n", testTableName), + input: &p.ListWorkflowExecutionsByQueryRequest{}, + expectedOutput: fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = '' +LIMIT 0, 0 +`, testTableName), }, "nil request": { @@ -193,8 +293,24 @@ func TestGetListWorkflowExecutionsQuery(t *testing.T) { closeResult := getListWorkflowExecutionsQuery(testTableName, request, true) openResult := getListWorkflowExecutionsQuery(testTableName, request, false) nilResult := getListWorkflowExecutionsQuery(testTableName, nil, true) - expectCloseResult := fmt.Sprintf("SELECT *\nFROM %s\nWHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'\nAND CloseTime BETWEEN 1547596872371 AND 2547596872371\nAND CloseStatus >= 0\nOrder BY CloseTime DESC\n, RunID DESC\n", testTableName) - expectOpenResult := fmt.Sprintf("SELECT *\nFROM %s\nWHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'\nAND CloseTime BETWEEN 1547596872371 AND 2547596872371\nAND CloseStatus < 0\nOrder BY CloseTime DESC\n, RunID DESC\n", testTableName) + expectCloseResult := fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND CloseTime BETWEEN 1547596872371 AND 2547596872371 +AND CloseStatus >= 0 +Order BY CloseTime DESC +, RunID DESC +LIMIT 0, 10 +`, testTableName) + expectOpenResult := fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND CloseTime BETWEEN 1547596872371 AND 2547596872371 +AND CloseStatus < 0 +Order BY CloseTime DESC +, RunID DESC +LIMIT 0, 10 +`, testTableName) expectNilResult := "" assert.Equal(t, closeResult, expectCloseResult) @@ -218,8 +334,24 @@ func TestGetListWorkflowExecutionsByTypeQuery(t *testing.T) { closeResult := getListWorkflowExecutionsByTypeQuery(testTableName, request, true) openResult := getListWorkflowExecutionsByTypeQuery(testTableName, request, false) nilResult := getListWorkflowExecutionsByTypeQuery(testTableName, nil, true) - expectCloseResult := fmt.Sprintf("SELECT *\nFROM %s\nWHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'\nAND WorkflowType = 'test-wf-type'\nAND CloseTime BETWEEN 1547596872371 AND 2547596872371\nAND CloseStatus >= 0\nOrder BY CloseTime DESC\n, RunID DESC\n", testTableName) - expectOpenResult := fmt.Sprintf("SELECT *\nFROM %s\nWHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'\nAND WorkflowType = 'test-wf-type'\nAND CloseTime BETWEEN 1547596872371 AND 2547596872371\nAND CloseStatus < 0\nOrder BY CloseTime DESC\n, RunID DESC\n", testTableName) + expectCloseResult := fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND WorkflowType = 'test-wf-type' +AND CloseTime BETWEEN 1547596872371 AND 2547596872371 +AND CloseStatus >= 0 +Order BY CloseTime DESC +, RunID DESC +`, testTableName) + expectOpenResult := fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND WorkflowType = 'test-wf-type' +AND CloseTime BETWEEN 1547596872371 AND 2547596872371 +AND CloseStatus < 0 +Order BY CloseTime DESC +, RunID DESC +`, testTableName) expectNilResult := "" assert.Equal(t, closeResult, expectCloseResult) @@ -243,8 +375,24 @@ func TestGetListWorkflowExecutionsByWorkflowIDQuery(t *testing.T) { closeResult := getListWorkflowExecutionsByWorkflowIDQuery(testTableName, request, true) openResult := getListWorkflowExecutionsByWorkflowIDQuery(testTableName, request, false) nilResult := getListWorkflowExecutionsByWorkflowIDQuery(testTableName, nil, true) - expectCloseResult := fmt.Sprintf("SELECT *\nFROM %s\nWHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'\nAND WorkflowID = 'test-wid'\nAND CloseTime BETWEEN 1547596872371 AND 2547596872371\nAND CloseStatus >= 0\nOrder BY CloseTime DESC\n, RunID DESC\n", testTableName) - expectOpenResult := fmt.Sprintf("SELECT *\nFROM %s\nWHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'\nAND WorkflowID = 'test-wid'\nAND CloseTime BETWEEN 1547596872371 AND 2547596872371\nAND CloseStatus < 0\nOrder BY CloseTime DESC\n, RunID DESC\n", testTableName) + expectCloseResult := fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND WorkflowID = 'test-wid' +AND CloseTime BETWEEN 1547596872371 AND 2547596872371 +AND CloseStatus >= 0 +Order BY CloseTime DESC +, RunID DESC +`, testTableName) + expectOpenResult := fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND WorkflowID = 'test-wid' +AND CloseTime BETWEEN 1547596872371 AND 2547596872371 +AND CloseStatus < 0 +Order BY CloseTime DESC +, RunID DESC +`, testTableName) expectNilResult := "" assert.Equal(t, closeResult, expectCloseResult) @@ -267,7 +415,14 @@ func TestGetListWorkflowExecutionsByStatusQuery(t *testing.T) { closeResult := getListWorkflowExecutionsByStatusQuery(testTableName, request) nilResult := getListWorkflowExecutionsByStatusQuery(testTableName, nil) - expectCloseResult := fmt.Sprintf("SELECT *\nFROM %s\nWHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'\nAND CloseStatus = '0'\nAND CloseTime BETWEEN 1547596872371 AND 2547596872371\nOrder BY CloseTime DESC\n, RunID DESC\n", testTableName) + expectCloseResult := fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND CloseStatus = '0' +AND CloseTime BETWEEN 1547596872371 AND 2547596872371 +Order BY CloseTime DESC +, RunID DESC +`, testTableName) expectNilResult := "" assert.Equal(t, expectCloseResult, closeResult) @@ -288,7 +443,12 @@ func TestGetGetClosedWorkflowExecutionQuery(t *testing.T) { RunID: "", }, }, - expectedOutput: fmt.Sprintf("SELECT *\nFROM %s\nWHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'\nAND CloseStatus >= 0\nAND WorkflowID = 'test-wid'\n", testTableName), + expectedOutput: fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND CloseStatus >= 0 +AND WorkflowID = 'test-wid' +`, testTableName), }, "complete request with runId": { @@ -300,12 +460,23 @@ func TestGetGetClosedWorkflowExecutionQuery(t *testing.T) { RunID: "runid", }, }, - expectedOutput: fmt.Sprintf("SELECT *\nFROM %s\nWHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'\nAND CloseStatus >= 0\nAND WorkflowID = 'test-wid'\nAND RunID = 'runid'\n", testTableName), + expectedOutput: fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND CloseStatus >= 0 +AND WorkflowID = 'test-wid' +AND RunID = 'runid' +`, testTableName), }, "empty request": { - input: &p.InternalGetClosedWorkflowExecutionRequest{}, - expectedOutput: fmt.Sprintf("SELECT *\nFROM %s\nWHERE DomainID = ''\nAND CloseStatus >= 0\nAND WorkflowID = ''\n", testTableName), + input: &p.InternalGetClosedWorkflowExecutionRequest{}, + expectedOutput: fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = '' +AND CloseStatus >= 0 +AND WorkflowID = '' +`, testTableName), }, "nil request": { @@ -323,3 +494,129 @@ func TestGetGetClosedWorkflowExecutionQuery(t *testing.T) { }) } } + +func TestStringFormatting(t *testing.T) { + key := "CustomizedStringField" + val := "When query; select * from users_secret_table;" + + assert.Equal(t, `CustomizedStringField LIKE '%When query; select * from users_secret_table;%' +`, getPartialFormatString(key, val)) +} + +func TestParseLastElement(t *testing.T) { + tests := map[string]struct { + input string + expectedElement string + expectedOrderBy string + }{ + "Case1: only contains order by": { + input: "Order by TestInt DESC", + expectedElement: "", + expectedOrderBy: "Order by TestInt DESC", + }, + "Case2: only contains order by": { + input: "TestString = 'cannot be used in order by'", + expectedElement: "TestString = 'cannot be used in order by'", + expectedOrderBy: "", + }, + "Case3: not contains any order by": { + input: "TestInt = 1", + expectedElement: "TestInt = 1", + expectedOrderBy: "", + }, + "Case4-1: with order by in string & real order by": { + input: "TestString = 'cannot be used in order by' Order by TestInt DESC", + expectedElement: "TestString = 'cannot be used in order by'", + expectedOrderBy: "Order by TestInt DESC", + }, + "Case4-2: with non-string attribute & real order by": { + input: "TestDouble = 1.0 Order by TestInt DESC", + expectedElement: "TestDouble = 1.0", + expectedOrderBy: "Order by TestInt DESC", + }, + "Case5: with random case order by": { + input: "TestString = 'cannot be used in OrDer by' ORdeR by TestInt DESC", + expectedElement: "TestString = 'cannot be used in OrDer by'", + expectedOrderBy: "ORdeR by TestInt DESC", + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + assert.NotPanics(t, func() { + element, orderBy := parseLastElement(test.input) + assert.Equal(t, test.expectedElement, element) + assert.Equal(t, test.expectedOrderBy, orderBy) + }) + }) + } +} + +func TestSplitElement(t *testing.T) { + tests := map[string]struct { + input string + expectedKey string + expectedVal string + expectedOp string + }{ + "Case1-1: A=B": { + input: "CustomizedTestField=Test", + expectedKey: "CustomizedTestField", + expectedVal: "Test", + expectedOp: "=", + }, + "Case1-2: A=\"B\"": { + input: "CustomizedTestField=\"Test\"", + expectedKey: "CustomizedTestField", + expectedVal: "\"Test\"", + expectedOp: "=", + }, + "Case1-3: A='B'": { + input: "CustomizedTestField='Test'", + expectedKey: "CustomizedTestField", + expectedVal: "'Test'", + expectedOp: "=", + }, + "Case2: A<=B": { + input: "CustomizedTestField<=Test", + expectedKey: "CustomizedTestField", + expectedVal: "Test", + expectedOp: "<=", + }, + "Case3: A>=B": { + input: "CustomizedTestField>=Test", + expectedKey: "CustomizedTestField", + expectedVal: "Test", + expectedOp: ">=", + }, + "Case4: A = B": { + input: "CustomizedTestField = Test", + expectedKey: "CustomizedTestField", + expectedVal: "Test", + expectedOp: "=", + }, + "Case5: A <= B": { + input: "CustomizedTestField <= Test", + expectedKey: "CustomizedTestField", + expectedVal: "Test", + expectedOp: "<=", + }, + "Case6: A >= B": { + input: "CustomizedTestField >= Test", + expectedKey: "CustomizedTestField", + expectedVal: "Test", + expectedOp: ">=", + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + assert.NotPanics(t, func() { + key, val, op := splitElement(test.input) + assert.Equal(t, test.expectedKey, key) + assert.Equal(t, test.expectedVal, val) + assert.Equal(t, test.expectedOp, op) + }) + }) + } +} diff --git a/common/pinot/interfaces.go b/common/pinot/interfaces.go index 945220147cc..b91d85ab203 100644 --- a/common/pinot/interfaces.go +++ b/common/pinot/interfaces.go @@ -49,6 +49,7 @@ type ( IsOpen bool Filter IsRecordValidFilter MaxResultWindow int + ListRequest *p.InternalListWorkflowExecutionsRequest } // GenericMatch is a match struct diff --git a/common/pinot/page_token.go b/common/pinot/page_token.go new file mode 100644 index 00000000000..06eab5060c5 --- /dev/null +++ b/common/pinot/page_token.go @@ -0,0 +1,76 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package pinot + +import ( + "bytes" + "encoding/json" + "fmt" + + "github.com/uber/cadence/common/types" +) + +type ( + // PinotVisibilityPageToken holds the paging token for Pinot + PinotVisibilityPageToken struct { + From int + } +) + +// DeserializePageToken return the structural token +func DeserializePageToken(data []byte) (*PinotVisibilityPageToken, error) { + var token PinotVisibilityPageToken + dec := json.NewDecoder(bytes.NewReader(data)) + dec.UseNumber() + err := dec.Decode(&token) + if err != nil { + return nil, &types.BadRequestError{ + Message: fmt.Sprintf("unable to deserialize page token. err: %v", err), + } + } + return &token, nil +} + +// SerializePageToken return the token blob +func SerializePageToken(token *PinotVisibilityPageToken) ([]byte, error) { + data, err := json.Marshal(token) + if err != nil { + return nil, &types.BadRequestError{ + Message: fmt.Sprintf("unable to serialize page token. err: %v", err), + } + } + return data, nil +} + +// GetNextPageToken returns the structural token with nil handling +func GetNextPageToken(token []byte) (*PinotVisibilityPageToken, error) { + var result *PinotVisibilityPageToken + var err error + if len(token) > 0 { + result, err = DeserializePageToken(token) + if err != nil { + return nil, err + } + } else { + result = &PinotVisibilityPageToken{} + } + return result, nil +} diff --git a/common/pinot/pinotClient.go b/common/pinot/pinotClient.go index 2eda1e87910..c0b588f3cd9 100644 --- a/common/pinot/pinotClient.go +++ b/common/pinot/pinotClient.go @@ -25,6 +25,7 @@ package pinot import ( "encoding/json" "fmt" + "reflect" "time" "github.com/startreedata/pinot-client-go/pinot" @@ -60,18 +61,33 @@ func (c *PinotClient) Search(request *SearchRequest) (*SearchResponse, error) { } } - return c.getInternalListWorkflowExecutionsResponse(resp, request.Filter) + token, err := GetNextPageToken(request.ListRequest.NextPageToken) + + if err != nil { + return nil, &types.InternalServiceError{ + Message: fmt.Sprintf("Get NextPage token failed, %v", err), + } + } + + return c.getInternalListWorkflowExecutionsResponse(resp, request.Filter, token, request.ListRequest.PageSize, request.MaxResultWindow) } func (c *PinotClient) CountByQuery(query string) (int64, error) { resp, err := c.client.ExecuteSQL(c.tableName, query) if err != nil { return 0, &types.InternalServiceError{ - Message: fmt.Sprintf("ListClosedWorkflowExecutions failed, %v", err), + Message: fmt.Sprintf("CountWorkflowExecutions ExecuteSQL failed, %v", err), } } - return int64(resp.ResultTable.GetRowCount()), nil + count, err := resp.ResultTable.Rows[0][0].(json.Number).Int64() + if err == nil { + return count, nil + } + + return -1, &types.InternalServiceError{ + Message: fmt.Sprintf("can't convert result to integer!, query = %s, query result = %v, err = %v", query, resp.ResultTable.Rows[0][0], err), + } } func (c *PinotClient) GetTableName() string { @@ -84,7 +100,11 @@ func buildMap(hit []interface{}, columnNames []string) map[string]interface{} { resMap := make(map[string]interface{}) for i := 0; i < len(columnNames); i++ { - resMap[columnNames[i]] = hit[i] + key := columnNames[i] + // checks if it is system key, if yes, put it into the map + if ok, _ := isSystemKey(key); ok { + resMap[key] = hit[i] + } } return resMap @@ -172,12 +192,14 @@ func (c *PinotClient) convertSearchResultToVisibilityRecord(hit []interface{}, c func (c *PinotClient) getInternalListWorkflowExecutionsResponse( resp *pinot.BrokerResponse, isRecordValid func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool, + token *PinotVisibilityPageToken, + pageSize int, + maxResultWindow int, ) (*p.InternalListWorkflowExecutionsResponse, error) { - if resp == nil { - return nil, nil - } - response := &p.InternalListWorkflowExecutionsResponse{} + if resp == nil || resp.ResultTable == nil || resp.ResultTable.GetRowCount() == 0 { + return response, nil + } schema := resp.ResultTable.DataSchema // get the schema to map results //columnDataTypes := schema.ColumnDataTypes columnNames := schema.ColumnNames @@ -194,29 +216,23 @@ func (c *PinotClient) getInternalListWorkflowExecutionsResponse( } } - //if numOfActualHits == pageSize { // this means the response is not the last page - // var nextPageToken []byte - // var err error - // - // // ES Search API support pagination using From and PageSize, but has limit that From+PageSize cannot exceed a threshold - // // to retrieve deeper pages, use ES SearchAfter - // if searchHits.TotalHits <= int64(maxResultWindow-pageSize) { // use ES Search From+Size - // nextPageToken, err = SerializePageToken(&ElasticVisibilityPageToken{From: token.From + numOfActualHits}) - // } else { // use ES Search After - // var sortVal interface{} - // sortVals := actualHits[len(response.Executions)-1].Sort - // sortVal = sortVals[0] - // tieBreaker := sortVals[1].(string) - // - // nextPageToken, err = SerializePageToken(&ElasticVisibilityPageToken{SortValue: sortVal, TieBreaker: tieBreaker}) - // } - // if err != nil { - // return nil, err - // } - // - // response.NextPageToken = make([]byte, len(nextPageToken)) - // copy(response.NextPageToken, nextPageToken) - //} + if numOfActualHits == pageSize { // this means the response is not the last page + var nextPageToken []byte + var err error + + // ES Search API support pagination using From and PageSize, but has limit that From+PageSize cannot exceed a threshold + // TODO: need to confirm if pinot has similar settings + // don't need to retrieve deeper pages in pinot, and no functions like ES SearchAfter + if resp.NumDocsScanned <= int64(maxResultWindow-pageSize) { // use pinot Search From+Size + nextPageToken, err = SerializePageToken(&PinotVisibilityPageToken{From: token.From + numOfActualHits}) + } + if err != nil { + return nil, err + } + + response.NextPageToken = make([]byte, len(nextPageToken)) + copy(response.NextPageToken, nextPageToken) + } return response, nil } @@ -237,3 +253,17 @@ func (c *PinotClient) getInternalGetClosedWorkflowExecutionResponse(resp *pinot. return response, nil } + +// checks if a string is system key +func isSystemKey(key string) (bool, string) { + msg := VisibilityRecord{} + values := reflect.ValueOf(msg) + typesOf := values.Type() + for i := 0; i < values.NumField(); i++ { + fieldName := typesOf.Field(i).Name + if fieldName == key { + return true, typesOf.Field(i).Type.String() + } + } + return false, "nil" +} diff --git a/common/pinot/pinotClient_test.go b/common/pinot/pinotClient_test.go index 8af166d9c6b..28fe0754a72 100644 --- a/common/pinot/pinotClient_test.go +++ b/common/pinot/pinotClient_test.go @@ -23,6 +23,7 @@ package pinot import ( + "fmt" "testing" "time" @@ -87,6 +88,9 @@ func TestGetInternalListWorkflowExecutionsResponse(t *testing.T) { columnName := []string{"WorkflowID", "RunID", "WorkflowType", "DomainID", "StartTime", "ExecutionTime", "CloseTime", "CloseStatus", "HistoryLength", "Encoding", "TaskList", "IsCron", "NumClusters", "UpdateTime", "Attr"} hit1 := []interface{}{"wfid1", "rid1", "wftype1", "domainid1", testEarliestTime, testEarliestTime, testLatestTime, 1, 1, "encode1", "tsklst1", true, 1, testEarliestTime, "null"} hit2 := []interface{}{"wfid2", "rid2", "wftype2", "domainid2", testEarliestTime, testEarliestTime, testLatestTime, 1, 1, "encode2", "tsklst2", false, 1, testEarliestTime, "null"} + hit3 := []interface{}{"wfid3", "rid3", "wftype3", "domainid3", testEarliestTime, testEarliestTime, testLatestTime, 1, 1, "encode3", "tsklst3", false, 1, testEarliestTime, "null"} + hit4 := []interface{}{"wfid4", "rid4", "wftype4", "domainid4", testEarliestTime, testEarliestTime, testLatestTime, 1, 1, "encode4", "tsklst4", false, 1, testEarliestTime, "null"} + hit5 := []interface{}{"wfid5", "rid5", "wftype5", "domainid5", testEarliestTime, testEarliestTime, testLatestTime, 1, 1, "encode5", "tsklst5", false, 1, testEarliestTime, "null"} brokerResponse := &pinot.BrokerResponse{ AggregationResults: nil, @@ -99,6 +103,9 @@ func TestGetInternalListWorkflowExecutionsResponse(t *testing.T) { Rows: [][]interface{}{ hit1, hit2, + hit3, + hit4, + hit5, }, }, Exceptions: nil, @@ -109,7 +116,7 @@ func TestGetInternalListWorkflowExecutionsResponse(t *testing.T) { NumSegmentsProcessed: 1, NumSegmentsMatched: 1, NumConsumingSegmentsQueried: 1, - NumDocsScanned: 1, + NumDocsScanned: 10, NumEntriesScannedInFilter: 1, NumEntriesScannedPostFilter: 1, NumGroupsLimitReached: false, @@ -118,8 +125,12 @@ func TestGetInternalListWorkflowExecutionsResponse(t *testing.T) { MinConsumingFreshnessTimeMs: 1, } + token := &PinotVisibilityPageToken{ + From: 0, + } + // Cannot use a table test, because they are not checking the same fields - result, err := client.getInternalListWorkflowExecutionsResponse(brokerResponse, nil) + result, err := client.getInternalListWorkflowExecutionsResponse(brokerResponse, nil, token, 5, 33) assert.Equal(t, "wfid1", result.Executions[0].WorkflowID) assert.Equal(t, "rid1", result.Executions[0].RunID) @@ -151,17 +162,24 @@ func TestGetInternalListWorkflowExecutionsResponse(t *testing.T) { assert.Nil(t, err) + responseToken := result.NextPageToken + unmarshalResponseToken, err := GetNextPageToken(responseToken) + if err != nil { + panic(fmt.Sprintf("Unmarshal error in PinotClient test %s", err)) + } + assert.Equal(t, 5, unmarshalResponseToken.From) + // check if record is not valid isRecordValid := func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool { return false } - emptyResult, err := client.getInternalListWorkflowExecutionsResponse(brokerResponse, isRecordValid) + emptyResult, err := client.getInternalListWorkflowExecutionsResponse(brokerResponse, isRecordValid, nil, 10, 33) assert.Equal(t, 0, len(emptyResult.Executions)) assert.Nil(t, err) // check nil input - nilResult, err := client.getInternalListWorkflowExecutionsResponse(nil, isRecordValid) - assert.Nil(t, nilResult) + nilResult, err := client.getInternalListWorkflowExecutionsResponse(nil, isRecordValid, nil, 10, 33) + assert.Equal(t, &p.InternalListWorkflowExecutionsResponse{}, nilResult) assert.Nil(t, err) } diff --git a/schema/Pinot/cadence-visibility-config.json b/schema/Pinot/cadence-visibility-config.json index 3bf96b8df6f..c1b505f5074 100644 --- a/schema/Pinot/cadence-visibility-config.json +++ b/schema/Pinot/cadence-visibility-config.json @@ -42,4 +42,5 @@ "indexTypes":["TEXT"] } ] -} \ No newline at end of file +} + diff --git a/schema/Pinot/cadence-visibility-schema.json b/schema/Pinot/cadence-visibility-schema.json index 26af6f3f2ef..7e91e36796a 100644 --- a/schema/Pinot/cadence-visibility-schema.json +++ b/schema/Pinot/cadence-visibility-schema.json @@ -47,8 +47,76 @@ "dataType": "INT" }, { - "name": "Attr", + "name": "CustomStringField", "dataType": "STRING" + }, + { + "name": "CustomKeywordField", + "dataType": "STRING" + }, + { + "name": "CustomIntField", + "dataType": "INT" + }, + { + "name": "CustomDoubleField", + "dataType": "DOUBLE" + }, + { + "name": "CustomBoolField", + "dataType": "BOOLEAN" + }, + { + "name": "CustomDatetimeField", + "dataType": "LONG" + }, + { + "name": "project", + "dataType": "STRING" + }, + { + "name": "service", + "dataType": "STRING" + }, + { + "name": "environment", + "dataType": "STRING" + }, + { + "name": "addon", + "dataType": "STRING" + }, + { + "name": "addon-type", + "dataType": "STRING" + }, + { + "name": "user", + "dataType": "STRING" + }, + { + "name": "CustomDomain", + "dataType": "STRING" + }, + { + "name": "Operator", + "dataType": "STRING" + }, + { + "name": "RolloutID", + "dataType": "STRING" + }, + { + "name": "CadenceChangeVersion", + "dataType": "STRING" + }, + { + "name": "BinaryChecksums", + "dataType": "STRING" + }, + { + "name": "Passed", + "dataType": "BOOLEAN" } ], "dateTimeFieldSpecs": [{