Add raw_document query support to backend (#214)
fridgepoet authored Jul 17, 2023
1 parent 49d045c commit cf5462c
Showing 6 changed files with 357 additions and 52 deletions.
8 changes: 4 additions & 4 deletions pkg/opensearch/lucene_handler.go
@@ -58,9 +58,9 @@ func (h *luceneHandler) processQuery(q *Query) error {
		}
	}

-	switch {
-	case q.Metrics[0].Type == rawDataType:
-		processRawDataQuery(q, b, defaultTimeField)
+	switch q.Metrics[0].Type {
+	case rawDocumentType, rawDataType:
+		processDocumentQuery(q, b, h.client.GetTimeField())
	default:
		processTimeSeriesQuery(q, b, fromMs, toMs, defaultTimeField)
	}
@@ -70,7 +70,7 @@ func (h *luceneHandler) processQuery(q *Query) error {

const defaultSize = 500

-func processRawDataQuery(q *Query, b *es.SearchRequestBuilder, defaultTimeField string) {
+func processDocumentQuery(q *Query, b *es.SearchRequestBuilder, defaultTimeField string) {
	metric := q.Metrics[0]
	order := metric.Settings.Get("order").MustString()
	b.Sort(order, defaultTimeField, "boolean")
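For context, processDocumentQuery is driven by a raw_document (or raw_data) metric in the query target. Below is a minimal sketch of such a target, modeled on the targets in the tests further down; the "settings" block is an assumption inferred from metric.Settings.Get("order") above, and "size" is assumed to fall back to defaultSize (500) when omitted:

package main

import "fmt"

func main() {
	// Illustrative only: a raw_document target shaped like those in
	// response_parser_test.go below. "order" is read by processDocumentQuery;
	// the "settings" key names are an assumption, not confirmed by this diff.
	target := `{
		"timeField": "@timestamp",
		"refId": "A",
		"metrics": [{ "type": "raw_document", "id": "1", "settings": { "order": "desc" } }],
		"bucketAggs": []
	}`
	fmt.Println(target)
}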
56 changes: 54 additions & 2 deletions pkg/opensearch/response_parser.go
@@ -78,9 +78,11 @@ func (rp *responseParser) getTimeSeries(timeField string) (*backend.QueryDataRes
		Frames: data.Frames{},
	}

-	switch {
-	case target.Metrics[0].Type == rawDataType:
+	switch target.Metrics[0].Type {
+	case rawDataType:
		queryRes = processRawDataResponse(res, timeField, queryRes)
+	case rawDocumentType:
+		queryRes = processRawDocumentResponse(res, timeField, target.RefID, queryRes)
	default:
		props := make(map[string]string)
		err := rp.processBuckets(res.Aggregations, target, &queryRes, props, 0)
@@ -131,6 +133,56 @@ func processRawDataResponse(res *es.SearchResponse, timeField string, queryRes b
return queryRes
}

func processRawDocumentResponse(res *es.SearchResponse, timeField, refID string, queryRes backend.DataResponse) backend.DataResponse {
documents := make([]map[string]interface{}, len(res.Hits.Hits))
for hitIdx, hit := range res.Hits.Hits {
doc := map[string]interface{}{
"_id": hit["_id"],
"_type": hit["_type"],
"_index": hit["_index"],
}

if hit["_source"] != nil {
source, ok := hit["_source"].(map[string]interface{})
if ok {
for k, v := range source {
doc[k] = v
}
}
}

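		// Values from "fields" overwrite any same-named keys copied from "_source".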
if hit["fields"] != nil {
source, ok := hit["fields"].(map[string]interface{})
if ok {
for k, v := range source {
doc[k] = v
}
}
}

documents[hitIdx] = doc
}

fieldVector := make([]*json.RawMessage, len(res.Hits.Hits))
for i, doc := range documents {
bytes, err := json.Marshal(doc)
if err != nil {
			// Skip documents that fail to marshal; this should not happen,
			// and a nil entry simply becomes a null value in the nullable JSON field.
continue
}
value := json.RawMessage(bytes)
fieldVector[i] = &value
}

isFilterable := true
field := data.NewField(refID, nil, fieldVector)
field.Config = &data.FieldConfig{Filterable: &isFilterable}

queryRes.Frames = data.Frames{data.NewFrame(refID, field)}
return queryRes
}

func getTimestamp(hit, source map[string]interface{}, timeField string) (*time.Time, bool) {
// "fields" is requested in the query with a specific format in AddTimeFieldWithStandardizedFormat
timeString, ok := lookForTimeFieldInFields(hit, timeField)
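Note why the tests below expect the timestamp from "fields" when both are present: processRawDocumentResponse copies "_source" keys into the document first and "fields" keys second, so "fields" values win on collisions. A minimal standalone sketch of that merge order (mergeHit is a hypothetical helper, not part of the plugin):

package main

import (
	"encoding/json"
	"fmt"
)

// mergeHit mirrors the merge order in processRawDocumentResponse:
// "_source" keys are copied first, then "fields", so "fields" wins.
func mergeHit(source, fields map[string]interface{}) map[string]interface{} {
	doc := map[string]interface{}{}
	for k, v := range source {
		doc[k] = v
	}
	for k, v := range fields {
		doc[k] = v
	}
	return doc
}

func main() {
	doc := mergeHit(
		map[string]interface{}{"@timestamp": "1999-01-01T12:12:12.111Z"},
		map[string]interface{}{"@timestamp": []string{"2023-02-08T15:10:55.830Z"}},
	)
	out, _ := json.Marshal(doc)
	fmt.Println(string(out)) // {"@timestamp":["2023-02-08T15:10:55.830Z"]}
}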
278 changes: 278 additions & 0 deletions pkg/opensearch/response_parser_test.go
@@ -1694,3 +1694,281 @@ func Test_flatten(t *testing.T) {
assert.True(t, ok)
})
}

func TestProcessRawDocumentResponse(t *testing.T) {
t.Run("Simple raw document query", func(t *testing.T) {
targets := map[string]string{
"A": `{
"timeField": "@timestamp",
"refId": "A",
"metrics": [{ "type": "raw_document", "id": "1" }],
"bucketAggs": []
}`,
}

response := `
{
"responses": [
{
"hits": {
"total": 100,
"hits": [
{
"_id": "1",
"_type": "type",
"_index": "index",
"_source": { "sourceProp": "asd" },
"fields": { "fieldProp": "field" }
},
{
"_source": { "sourceProp": "asd2" },
"fields": { "fieldProp": "field2" }
}
]
}
}
]
}`

rp, err := newResponseParserForTest(targets, response)
assert.Nil(t, err)
result, err := rp.getTimeSeries("@timestamp")
require.NoError(t, err)
require.Len(t, result.Responses, 1)

queryRes := result.Responses["A"]
require.NotNil(t, queryRes)
dataframes := queryRes.Frames
require.Len(t, dataframes, 1)
require.Len(t, dataframes[0].Fields, 1)
require.Equal(t, data.FieldTypeNullableJSON, dataframes[0].Fields[0].Type())
require.Equal(t, 2, dataframes[0].Fields[0].Len())

doc1 := dataframes[0].Fields[0].At(0).(*json.RawMessage)
assert.JSONEq(t, `{"_id":"1","_index":"index","_type":"type","fieldProp":"field","sourceProp":"asd"}`, string(*doc1))
doc2 := dataframes[0].Fields[0].At(1).(*json.RawMessage)
assert.JSONEq(t, `{"_id":null,"_index":null,"_type":null,"fieldProp":"field2","sourceProp":"asd2"}`, string(*doc2))
})

t.Run("More complex raw document query", func(t *testing.T) {
targets := map[string]string{
"A": `{
"timeField": "@timestamp",
"metrics": [{ "type": "raw_document" }]
}`,
}

// cSpell:disable
response := `{
"responses":[
{
"hits":{
"total":{
"value":109,
"relation":"eq"
},
"max_score":null,
"hits":[
{
"_index":"logs-2023.02.08",
"_id":"GB2UMYYBfCQ-FCMjayJa",
"_score":null,
"fields":{
"test_field":"A",
"@timestamp":[
"2023-02-08T15:10:55.830Z"
]
},
"_source":{
"line":"log text [479231733]",
"counter":"109",
"float":58.253758485091,
"label":"val1",
"level":"info",
"location":"17.089705232090438, 41.62861966340297",
"nested":{
"field":{
"double_nested":"value"
}
}
}
},
{
"_index":"logs-2023.02.08",
"_id":"Fx2UMYYBfCQ-FCMjZyJ_",
"_score":null,
"fields":{
"test_field":"A"
},
"_source":{
"@timestamp":"2023-02-08T15:10:54.835Z",
"line":"log text with ANSI \u001b[31mpart of the text\u001b[0m [493139080]",
"counter":"108",
"float":54.5977098233944,
"label":"val1",
"level":"info",
"location":"19.766305918490463, 40.42639175509792",
"nested":{
"field":{
"double_nested":"value1"
}
}
}
}
]
},
"status":200
}
]
}`
// cSpell:enable

rp, err := newResponseParserForTest(targets, response)
assert.Nil(t, err)
result, err := rp.getTimeSeries("@timestamp")
require.NoError(t, err)
require.Len(t, result.Responses, 1)

queryRes := result.Responses["A"]
require.NotNil(t, queryRes)
dataframes := queryRes.Frames
require.Len(t, dataframes, 1)
require.Len(t, dataframes[0].Fields, 1)
require.Equal(t, data.FieldTypeNullableJSON, dataframes[0].Fields[0].Type())
require.Equal(t, 2, dataframes[0].Fields[0].Len())

// cSpell:disable
doc1 := dataframes[0].Fields[0].At(0).(*json.RawMessage)
assert.JSONEq(t, `{
"@timestamp":["2023-02-08T15:10:55.830Z"],
"_id":"GB2UMYYBfCQ-FCMjayJa",
"_index":"logs-2023.02.08",
"_type":null,
"counter":"109",
"float":58.253758485091,
"label":"val1",
"level":"info",
"line":"log text [479231733]",
"location":"17.089705232090438, 41.62861966340297",
"nested":{
"field":{
"double_nested":"value"
}
},
"test_field":"A"
}`, string(*doc1))
doc2 := dataframes[0].Fields[0].At(1).(*json.RawMessage)
assert.JSONEq(t, `{
"@timestamp":"2023-02-08T15:10:54.835Z",
"_id":"Fx2UMYYBfCQ-FCMjZyJ_",
"_index":"logs-2023.02.08",
"_type":null,
"counter":"108",
"float":54.5977098233944,
"label":"val1",
"level":"info",
"line":"log text with ANSI \u001b[31mpart of the text\u001b[0m [493139080]",
"location":"19.766305918490463, 40.42639175509792",
"nested":{
"field":{
"double_nested":"value1"
}
},
"test_field":"A"
}`, string(*doc2))
})
// cSpell:enable

t.Run("doc returns timeField preferentially from fields", func(t *testing.T) {
		// Documents that the timeField is taken preferentially from "fields", to ensure it is in the format requested by AddTimeFieldWithStandardizedFormat.
targets := map[string]string{
"A": `{
"timeField": "@timestamp",
"metrics": [{ "type": "raw_document", "id": "1" }]
}`,
}

response := `
{
"responses":[
{
"hits":{
"hits":[
{
"_source":{
"@timestamp":"1999-01-01T12:12:12.111Z"
},
"fields":{
"@timestamp":[
"2023-02-08T15:10:55.830Z"
]
}
}
]
}
}
]
}`

rp, err := newResponseParserForTest(targets, response)
assert.Nil(t, err)
result, err := rp.getTimeSeries("@timestamp")
require.NoError(t, err)
require.Len(t, result.Responses, 1)

queryRes := result.Responses["A"]
require.NotNil(t, queryRes)
dataframes := queryRes.Frames
require.Len(t, dataframes, 1)
require.Len(t, dataframes[0].Fields, 1)
require.Equal(t, data.FieldTypeNullableJSON, dataframes[0].Fields[0].Type())
require.Equal(t, 1, dataframes[0].Fields[0].Len())

doc1 := dataframes[0].Fields[0].At(0).(*json.RawMessage)
assert.JSONEq(t, `{"_id":null,"_index":null,"_type":null,"@timestamp":["2023-02-08T15:10:55.830Z"]}`, string(*doc1))
})

t.Run("doc returns timeField from _source if fields does not have timeField", func(t *testing.T) {
		// Documents that the timeField from "_source" is returned when "fields" does not contain it.
targets := map[string]string{
"A": `{
"timeField": "@timestamp",
"metrics": [{ "type": "raw_document", "id": "1" }]
}`,
}

response := `
{
"responses":[
{
"hits":{
"hits":[
{
"_source":{
"@timestamp":"1999-01-01T12:12:12.111Z"
}
}
]
}
}
]
}`

rp, err := newResponseParserForTest(targets, response)
assert.Nil(t, err)
result, err := rp.getTimeSeries("@timestamp")
require.NoError(t, err)
require.Len(t, result.Responses, 1)

queryRes := result.Responses["A"]
require.NotNil(t, queryRes)
dataframes := queryRes.Frames
require.Len(t, dataframes, 1)
require.Len(t, dataframes[0].Fields, 1)
require.Equal(t, data.FieldTypeNullableJSON, dataframes[0].Fields[0].Type())
require.Equal(t, 1, dataframes[0].Fields[0].Len())

doc1 := dataframes[0].Fields[0].At(0).(*json.RawMessage)
assert.JSONEq(t, `{"_id":null,"_index":null,"_type":null,"@timestamp":"1999-01-01T12:12:12.111Z"}`, string(*doc1))
})
}