Add raw_data query support to backend #203

Merged · 23 commits · Jul 5, 2023

Changes from all commits
8 changes: 8 additions & 0 deletions pkg/opensearch/client/search_request.go
@@ -101,6 +101,14 @@ func (b *SearchRequestBuilder) AddDocValueField(field string) *SearchRequestBuilder
return b
}

// AddTimeFieldWithStandardizedFormat adds timeField to the request's fields with a standardized time format, so that
// we don't receive formats that Elasticsearch/OpenSearch can parse but our frontend can't (e.g. yyyy_MM_dd_HH_mm_ss)
// https://opensearch.org/docs/latest/api-reference/search/#request-body
// https://opensearch.org/docs/latest/field-types/supported-field-types/date/#full-date-formats
func (b *SearchRequestBuilder) AddTimeFieldWithStandardizedFormat(timeField string) {
b.customProps["fields"] = []map[string]string{{"field": timeField, "format": "strict_date_optional_time_nanos"}}
}
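As a quick illustration, this is what the builder ends up adding to the serialized request body (a minimal standalone sketch that mirrors only the customProps["fields"] entry set above; the field name is illustrative):

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Mirrors what AddTimeFieldWithStandardizedFormat stores under customProps["fields"].
	fields := []map[string]string{
		{"field": "@timestamp", "format": "strict_date_optional_time_nanos"},
	}
	out, err := json.Marshal(map[string]interface{}{"fields": fields})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
	// {"fields":[{"field":"@timestamp","format":"strict_date_optional_time_nanos"}]}
}
```

OpenSearch returns values requested via "fields" as an array per hit (e.g. "fields": {"@timestamp": ["2023-07-05T12:34:56.123456789Z"]}), which is why the response parser below expects an array.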

// Query creates and return a query builder
func (b *SearchRequestBuilder) Query() *QueryBuilder {
if b.queryBuilder == nil {
36 changes: 26 additions & 10 deletions pkg/opensearch/lucene_handler.go
@@ -1,6 +1,7 @@
package opensearch

import (
"fmt"
"strconv"
"time"

@@ -50,16 +51,33 @@ func (h *luceneHandler) processQuery(q *Query) error {
}

if len(q.BucketAggs) == 0 {
if len(q.Metrics) == 0 || q.Metrics[0].Type != "raw_document" {
return nil
// If no aggregations, only document and logs queries are valid
if len(q.Metrics) == 0 || !(q.Metrics[0].Type == rawDataType || q.Metrics[0].Type == rawDocumentType) {
return fmt.Errorf("invalid query, missing metrics and aggregations")
}
metric := q.Metrics[0]
b.Size(metric.Settings.Get("size").MustInt(500))
b.SortDesc("@timestamp", "boolean")
b.AddDocValueField("@timestamp")
return nil
}

switch {
case q.Metrics[0].Type == rawDataType:
processRawDataQuery(q, b, h.client.GetTimeField())
default:
processTimeSeriesQuery(q, b, fromMs, toMs)
}

return nil
}

func processRawDataQuery(q *Query, b *es.SearchRequestBuilder, defaultTimeField string) {
metric := q.Metrics[0]
b.SortDesc(defaultTimeField, "boolean")
b.SortDesc("_doc", "")
b.AddTimeFieldWithStandardizedFormat(defaultTimeField)
b.Size(metric.Settings.Get("size").MustInt(500))
}
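For reference, a raw_data request built this way serializes to roughly the following body (a sketch with the default size of 500; the "unmapped_type" rendering of SortDesc's "boolean" argument follows the usual OpenSearch sort syntax and is an assumption here):

```go
package main

import (
	"encoding/json"
	"os"
)

func main() {
	// Approximate search body produced by processRawDataQuery for "@timestamp".
	// "unmapped_type": "boolean" is an assumed rendering of SortDesc(field, "boolean");
	// SortDesc("_doc", "") adds a plain descending sort with no unmapped_type.
	body := map[string]interface{}{
		"size": 500,
		"sort": []interface{}{
			map[string]interface{}{"@timestamp": map[string]string{"order": "desc", "unmapped_type": "boolean"}},
			map[string]interface{}{"_doc": map[string]string{"order": "desc"}},
		},
		"fields": []map[string]string{
			{"field": "@timestamp", "format": "strict_date_optional_time_nanos"},
		},
	}
	enc := json.NewEncoder(os.Stdout)
	enc.SetIndent("", "  ")
	_ = enc.Encode(body)
}
```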

func processTimeSeriesQuery(q *Query, b *es.SearchRequestBuilder, fromMs int64, toMs int64) {
metric := q.Metrics[0]
b.Size(metric.Settings.Get("size").MustInt(500))
aggBuilder := b.Agg()

// iterate backwards to create aggregations bottom-up
@@ -143,8 +161,6 @@ func (h *luceneHandler) processQuery(q *Query) error {
})
}
}

return nil
}

func getPipelineAggField(m *MetricAgg) string {
@@ -177,7 +193,7 @@ func (h *luceneHandler) executeQueries() (*backend.QueryDataResponse, error) {
}

rp := newResponseParser(res.Responses, h.queries, res.DebugInfo)
return rp.getTimeSeries()
return rp.getTimeSeries(h.client.GetTimeField())
}

func addDateHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, timeFrom, timeTo int64) es.AggBuilder {
215 changes: 208 additions & 7 deletions pkg/opensearch/response_parser.go
@@ -1,6 +1,7 @@
package opensearch

import (
"encoding/json"
"errors"
"regexp"
"sort"
@@ -26,6 +27,8 @@ const (
filtersType = "filters"
termsType = "terms"
geohashGridType = "geohash_grid"
rawDataType = "raw_data"
rawDocumentType = "raw_document"
)

type responseParser struct {
@@ -42,7 +45,7 @@ var newResponseParser = func(responses []*es.SearchResponse, targets []*Query, d
}
}

func (rp *responseParser) getTimeSeries() (*backend.QueryDataResponse, error) {
func (rp *responseParser) getTimeSeries(timeField string) (*backend.QueryDataResponse, error) {
result := backend.NewQueryDataResponse()

if rp.Responses == nil {
@@ -74,19 +77,217 @@ func (rp *responseParser) getTimeSeries() (*backend.QueryDataResponse, error) {
queryRes := backend.DataResponse{
Frames: data.Frames{},
}
props := make(map[string]string)
err := rp.processBuckets(res.Aggregations, target, &queryRes, props, 0)
if err != nil {
return nil, err

switch {
case target.Metrics[0].Type == rawDataType:
queryRes = processRawDataResponse(res, timeField, queryRes)
default:
props := make(map[string]string)
err := rp.processBuckets(res.Aggregations, target, &queryRes, props, 0)
if err != nil {
return nil, err
}
rp.nameFields(&queryRes.Frames, target)
rp.trimDatapoints(&queryRes.Frames, target)
}
rp.nameFields(&queryRes.Frames, target)
rp.trimDatapoints(&queryRes.Frames, target)

result.Responses[target.RefID] = queryRes
}
return result, nil
}

func processRawDataResponse(res *es.SearchResponse, timeField string, queryRes backend.DataResponse) backend.DataResponse {
propNames := make(map[string]bool)
docs := make([]map[string]interface{}, len(res.Hits.Hits))

for hitIdx, hit := range res.Hits.Hits {
flattenedSource := make(map[string]interface{})
if hit["_source"] != nil {
// On the frontend maxDepth wasn't used, but as we are processing on the backend,
// we put a limit on recursion depth to avoid runaway nesting. 10 was chosen arbitrarily.
flattenedSource = flatten(hit["_source"].(map[string]interface{}), 10)
}

flattenedSource["_id"] = hit["_id"]
flattenedSource["_type"] = hit["_type"]
flattenedSource["_index"] = hit["_index"]
if timestamp, ok := getTimestamp(hit, flattenedSource, timeField); ok {
flattenedSource[timeField] = timestamp
}

for key := range flattenedSource {
propNames[key] = true
}

docs[hitIdx] = flattenedSource
}
fields := processDocsToDataFrameFields(docs, propNames)

frames := data.Frames{}
frame := data.NewFrame("", fields...)
frames = append(frames, frame)

queryRes.Frames = frames
return queryRes
}

func getTimestamp(hit, source map[string]interface{}, timeField string) (*time.Time, bool) {
// "fields" is requested in the query with a specific format in AddTimeFieldWithStandardizedFormat
timeString, ok := lookForTimeFieldInFields(hit, timeField)
if !ok {
// When "fields" is absent, then getTimestamp tries to find a timestamp in _source
timeString, ok = lookForTimeFieldInSource(source, timeField)
if !ok {
// When both "fields" and "_source" timestamps are not present in the expected JSON structure, nil time.Time is returned
return nil, false
}
}

timeValue, err := time.Parse(time.RFC3339Nano, timeString)
if err != nil {
// If the value doesn't parse as a timestamp, no timestamp is returned
return nil, false
}

return &timeValue, true
}
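A note on the layout choice: values requested with strict_date_optional_time_nanos are RFC 3339 timestamps with an optional fractional-seconds part, which is exactly what Go's time.RFC3339Nano layout accepts, with or without nanoseconds:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// time.RFC3339Nano parses both whole-second and nanosecond-precision values.
	for _, s := range []string{"2023-07-05T12:34:56Z", "2023-07-05T12:34:56.123456789Z"} {
		ts, err := time.Parse(time.RFC3339Nano, s)
		fmt.Println(ts.UnixNano(), err)
	}
}
```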

func lookForTimeFieldInFields(hit map[string]interface{}, timeField string) (string, bool) {
// "fields" should be present with an array of timestamps
if hit["fields"] != nil {
if fieldsMap, ok := hit["fields"].(map[string]interface{}); ok {
timesArray, ok := fieldsMap[timeField].([]interface{})
if !ok {
return "", false
}
if len(timesArray) == 1 {
if timeString, ok := timesArray[0].(string); ok {
return timeString, true
}
}
}
}
return "", false
}

func lookForTimeFieldInSource(source map[string]interface{}, timeField string) (string, bool) {
if source[timeField] != nil {
if timeString, ok := source[timeField].(string); ok {
return timeString, true
}
}

return "", false
}

func flatten(target map[string]interface{}, maxDepth int) map[string]interface{} {
// On the frontend maxDepth wasn't used, but as we are processing on the backend,
// we put a limit on recursion depth to avoid runaway nesting. 10 was chosen arbitrarily.
output := make(map[string]interface{})
step(0, maxDepth, target, "", output)
return output
}

func step(currentDepth, maxDepth int, target map[string]interface{}, prev string, output map[string]interface{}) {
nextDepth := currentDepth + 1
for key, value := range target {
newKey := strings.Trim(prev+"."+key, ".")

v, ok := value.(map[string]interface{})
if ok && len(v) > 0 && currentDepth < maxDepth {
step(nextDepth, maxDepth, v, newKey, output)
} else {
output[newKey] = value
}
}
}
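As a worked example of the flattening, here is a standalone copy of the flatten/step helpers above applied to a nested _source-style document (the sample document is made up):

```go
package main

import (
	"fmt"
	"strings"
)

// Standalone copies of the PR's flatten/step helpers, for illustration only.
func flatten(target map[string]interface{}, maxDepth int) map[string]interface{} {
	output := make(map[string]interface{})
	step(0, maxDepth, target, "", output)
	return output
}

func step(currentDepth, maxDepth int, target map[string]interface{}, prev string, output map[string]interface{}) {
	nextDepth := currentDepth + 1
	for key, value := range target {
		newKey := strings.Trim(prev+"."+key, ".")

		v, ok := value.(map[string]interface{})
		if ok && len(v) > 0 && currentDepth < maxDepth {
			step(nextDepth, maxDepth, v, newKey, output)
		} else {
			output[newKey] = value
		}
	}
}

func main() {
	source := map[string]interface{}{
		"level": "info",
		"kubernetes": map[string]interface{}{
			"pod": map[string]interface{}{"name": "opensearch-0"},
		},
	}
	fmt.Println(flatten(source, 10))
	// map[kubernetes.pod.name:opensearch-0 level:info]
}
```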

func processDocsToDataFrameFields(docs []map[string]interface{}, propNames map[string]bool) []*data.Field {
allFields := make([]*data.Field, 0, len(propNames))
var timeDataField *data.Field
for propName := range propNames {
propNameValue := findTheFirstNonNilDocValueForPropName(docs, propName)
switch propNameValue.(type) {
// We check for the default data type values (float64, int, bool, string) plus the
// *time.Time set by getTimestamp, and fall back to json.RawMessage for anything else
case *time.Time:
timeDataField = createTimeField(docs, propName)
case float64:
allFields = append(allFields, createFieldOfType[float64](docs, propName))
case int:
allFields = append(allFields, createFieldOfType[int](docs, propName))
case string:
allFields = append(allFields, createFieldOfType[string](docs, propName))
case bool:
allFields = append(allFields, createFieldOfType[bool](docs, propName))
default:
fieldVector := make([]*json.RawMessage, len(docs))
for i, doc := range docs {
bytes, err := json.Marshal(doc[propName])
if err != nil {
// We skip values that cannot be marshalled
continue
}
value := json.RawMessage(bytes)
fieldVector[i] = &value
}
field := data.NewField(propName, nil, fieldVector)
isFilterable := true
field.Config = &data.FieldConfig{Filterable: &isFilterable}
allFields = append(allFields, field)
}
}

sort.Slice(allFields, func(i, j int) bool {
return allFields[i].Name < allFields[j].Name
})

if timeDataField != nil {
allFields = append([]*data.Field{timeDataField}, allFields...)
}

return allFields
}
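To see the resulting frame shape, a minimal sketch using the plugin SDK's data package (field names and values are made up; the SDK calls are the standard grafana-plugin-sdk-go API):

```go
package main

import (
	"fmt"
	"time"

	"github.com/grafana/grafana-plugin-sdk-go/data"
)

func main() {
	filterable := true
	now := time.Now()

	// Nullable (pointer) vectors let docs that lack a property keep a nil cell.
	timestamps := data.NewField("@timestamp", nil, []*time.Time{&now, nil})
	level := data.NewField("level", nil, []*string{strPtr("info"), strPtr("warn")})
	for _, f := range []*data.Field{timestamps, level} {
		f.Config = &data.FieldConfig{Filterable: &filterable}
	}

	// As in processDocsToDataFrameFields: time field first, remaining fields sorted by name.
	frame := data.NewFrame("", timestamps, level)
	fmt.Println(frame.Rows()) // 2
}

func strPtr(s string) *string { return &s }
```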

func findTheFirstNonNilDocValueForPropName(docs []map[string]interface{}, propName string) interface{} {
for _, doc := range docs {
if doc[propName] != nil {
return doc[propName]
}
}
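// No doc has a non-nil value for propName, so this falls back to the first doc's (nil) value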
return docs[0][propName]
}

func createTimeField(docs []map[string]interface{}, timeField string) *data.Field {
isFilterable := true
fieldVector := make([]*time.Time, len(docs))
for i, doc := range docs {
value, ok := doc[timeField].(*time.Time) // cannot use generic function below because the type is already a pointer
if !ok {
continue
}
fieldVector[i] = value
}
field := data.NewField(timeField, nil, fieldVector)
field.Config = &data.FieldConfig{Filterable: &isFilterable}
return field
}

func createFieldOfType[T int | float64 | bool | string](docs []map[string]interface{}, propName string) *data.Field {
isFilterable := true
fieldVector := make([]*T, len(docs))
for i, doc := range docs {
value, ok := doc[propName].(T)
if !ok {
continue
}
fieldVector[i] = &value
}
field := data.NewField(propName, nil, fieldVector)
field.Config = &data.FieldConfig{Filterable: &isFilterable}
return field
}
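Lastly, a standalone sketch of the generic helper's behavior, showing why pointer vectors are used (makeField is a hypothetical name mirroring createFieldOfType; the sample docs are made up):

```go
package main

import (
	"fmt"

	"github.com/grafana/grafana-plugin-sdk-go/data"
)

// makeField mirrors createFieldOfType: missing values or values of the wrong
// type leave a nil cell rather than failing the whole column.
func makeField[T int | float64 | bool | string](docs []map[string]interface{}, propName string) *data.Field {
	vec := make([]*T, len(docs))
	for i, doc := range docs {
		if v, ok := doc[propName].(T); ok {
			vec[i] = &v
		}
	}
	return data.NewField(propName, nil, vec)
}

func main() {
	docs := []map[string]interface{}{
		{"bytes": float64(1024)},
		{},                        // missing -> nil cell
		{"bytes": "not-a-number"}, // wrong type -> nil cell
	}
	field := makeField[float64](docs, "bytes")
	fmt.Println(field.Len()) // 3
}
```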

func (rp *responseParser) processBuckets(aggs map[string]interface{}, target *Query, queryResult *backend.DataResponse, props map[string]string, depth int) error {
var err error
maxDepth := len(target.BucketAggs) - 1