Skip to content

Commit

Permalink
Service Map: Build Stats request and process responses to dataframes (#…
Browse files Browse the repository at this point in the history
…366)

---------

Co-authored-by: Nathan Verzemnieks <njvrzm@gmail.com>
Co-authored-by: Isabella Siu <isabella.siu@grafana.com>
  • Loading branch information
3 people authored May 9, 2024
1 parent 134a667 commit 20cd334
Show file tree
Hide file tree
Showing 12 changed files with 302 additions and 65 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ e2e-results/
.idea
.vscode/launch.json

.DS_Store
.DS_Store
__debug_bin*
5 changes: 4 additions & 1 deletion cspell.config.json
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@
"loru",
"nosql",
"Equalf",
"unmarshaled"
"unmarshaled",
"Throughputs",
"mainstat",
"secondarystat",
]
}
6 changes: 3 additions & 3 deletions pkg/opensearch/client/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,14 +276,14 @@ func (a *AggContainer) MarshalJSON() ([]byte, error) {
return json.Marshal(root)
}

type aggDef struct {
type aggDefinition struct {
key string
aggregation *AggContainer
builders []AggBuilder
}

func newAggDef(key string, aggregation *AggContainer) *aggDef {
return &aggDef{
func newAggDefinition(key string, aggregation *AggContainer) *aggDefinition {
return &aggDefinition{
key: key,
aggregation: aggregation,
builders: make([]AggBuilder, 0),
Expand Down
130 changes: 113 additions & 17 deletions pkg/opensearch/client/search_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"github.com/grafana/opensearch-datasource/pkg/tsdb"
)

// nodeGraphSize is used for setting node graph query sizes. Arbitrarily chosen.
const nodeGraphSize = 1000

// SearchRequestBuilder represents a builder which can build a search request
type SearchRequestBuilder struct {
flavor Flavor
Expand Down Expand Up @@ -142,6 +145,71 @@ func NewMultiSearchRequestBuilder(flavor Flavor, version *semver.Version) *Multi
}
}

// StatsParameters holds the filter inputs for a Service Map stats query.
type StatsParameters struct {
	// ServiceNames restricts the query to spans whose serviceName matches
	// one of these values.
	ServiceNames []string
	// Operations restricts child spans (spans with a parent) to these span
	// names; root spans are not filtered by operation (see SetStatsFilters).
	Operations []string
}

// SetStatsFilters sets the filters for the stats query.
//
// A span is included when it matches the given list of services and either:
//   - it has a parent span and its name matches the given operations, or
//   - it has no parent span (a root span), which is accepted unconditionally.
func (b *SearchRequestBuilder) SetStatsFilters(to, from int64, traceId string, parameters StatsParameters) {
	filterBuilder := FilterQueryBuilder{}
	filterBuilder.AddTermsFilter("serviceName", parameters.ServiceNames)
	if traceId != "" {
		filterBuilder.AddTermsFilter("traceId", []string{traceId})
	}

	// Matches spans whose parentSpanId is the empty string, i.e. root spans.
	emptyParent := TermsFilter{
		Key:    "parentSpanId",
		Values: []string{""},
	}

	// Child spans: must have a non-empty parent AND match one of the
	// requested operation names.
	childSpanQuery := Query{
		&BoolQuery{
			Filters: []Filter{
				Query{
					&BoolQuery{
						MustNotFilters: []Filter{emptyParent},
					},
				},
				TermsFilter{
					Key:    "name",
					Values: parameters.Operations,
				},
			},
		},
	}

	// Root spans: no parent, no operation restriction.
	rootSpanQuery := Query{
		&BoolQuery{
			MustFilters: []Filter{emptyParent},
		},
	}

	filterBuilder.AddFilterQuery(Query{
		&BoolQuery{
			ShouldFilters: []Filter{childSpanQuery, rootSpanQuery},
		},
	})

	b.queryBuilder = &QueryBuilder{
		boolQueryBuilder: &BoolQueryBuilder{
			mustFilterList: &FilterList{
				filters: []Filter{
					// Restrict results to the dashboard's time range.
					&RangeFilter{
						Key: "startTime",
						Lte: to,
						Gte: from,
					},
				},
			},
			filterQueryBuilder: &filterBuilder,
		},
	}

	b.Size(nodeGraphSize)
}

// SetTraceListFilters sets the "query" object of the query to OpenSearch for the trace list
func (b *SearchRequestBuilder) SetTraceListFilters(to, from int64, query string) {
b.queryBuilder = &QueryBuilder{
Expand Down Expand Up @@ -170,17 +238,17 @@ func (b *SearchRequestBuilder) SetTraceListFilters(to, from int64, query string)

func (b *aggBuilderImpl) ServiceMap() AggBuilder {
b.Terms("service_name", "serviceName", func(a *TermsAggregation, b AggBuilder) {
a.Size = 500
a.Size = nodeGraphSize
b.Terms("destination_domain", "destination.domain", func(a *TermsAggregation, b AggBuilder) {
a.Size = 500
a.Size = nodeGraphSize
b.Terms("destination_resource", "destination.resource", func(a *TermsAggregation, b AggBuilder) {
a.Size = 500
a.Size = nodeGraphSize
})
})
b.Terms("target_domain", "target.domain", func(a *TermsAggregation, b AggBuilder) {
a.Size = 500
a.Size = nodeGraphSize
b.Terms("target_resource", "target.resource", func(a *TermsAggregation, b AggBuilder) {
a.Size = 500
a.Size = nodeGraphSize
})
})
})
Expand Down Expand Up @@ -348,28 +416,29 @@ type AggBuilder interface {
Filters(key string, fn func(a *FiltersAggregation, b AggBuilder)) AggBuilder
TraceList() AggBuilder
ServiceMap() AggBuilder
Stats() AggBuilder
GeoHashGrid(key, field string, fn func(a *GeoHashGridAggregation, b AggBuilder)) AggBuilder
Metric(key, metricType, field string, fn func(a *MetricAggregation)) AggBuilder
Pipeline(key, pipelineType string, bucketPath interface{}, fn func(a *PipelineAggregation)) AggBuilder
Build() (AggArray, error)
AddAggDef(*aggDef)
AddAggDef(*aggDefinition)
}

type aggBuilderImpl struct {
aggDefs []*aggDef
aggDefs []*aggDefinition
flavor Flavor
version *semver.Version
}

func newAggBuilder(version *semver.Version, flavor Flavor) AggBuilder {
return &aggBuilderImpl{
aggDefs: make([]*aggDef, 0),
aggDefs: make([]*aggDefinition, 0),
version: version,
flavor: flavor,
}
}

func (b *aggBuilderImpl) AddAggDef(ad *aggDef) {
func (b *aggBuilderImpl) AddAggDef(ad *aggDefinition) {
b.aggDefs = append(b.aggDefs, ad)
}

Expand Down Expand Up @@ -401,7 +470,7 @@ func (b *aggBuilderImpl) Histogram(key, field string, fn func(a *HistogramAgg, b
innerAgg := &HistogramAgg{
Field: field,
}
aggDef := newAggDef(key, &AggContainer{
aggDef := newAggDefinition(key, &AggContainer{
Type: "histogram",
Aggregation: innerAgg,
})
Expand All @@ -421,7 +490,7 @@ func (b *aggBuilderImpl) DateHistogram(key, field string, fn func(a *DateHistogr
innerAgg := &DateHistogramAgg{
Field: field,
}
aggDef := newAggDef(key, &AggContainer{
aggDef := newAggDefinition(key, &AggContainer{
Type: "date_histogram",
Aggregation: innerAgg,
})
Expand All @@ -443,7 +512,7 @@ func (b *aggBuilderImpl) Terms(key, field string, fn func(a *TermsAggregation, b
innerAgg := &TermsAggregation{
Field: field,
}
aggDef := newAggDef(key, &AggContainer{
aggDef := newAggDefinition(key, &AggContainer{
Type: "terms",
Aggregation: innerAgg,
})
Expand All @@ -470,7 +539,7 @@ func (b *aggBuilderImpl) Filters(key string, fn func(a *FiltersAggregation, b Ag
innerAgg := &FiltersAggregation{
Filters: make(map[string]interface{}),
}
aggDef := newAggDef(key, &AggContainer{
aggDef := newAggDefinition(key, &AggContainer{
Type: "filters",
Aggregation: innerAgg,
})
Expand Down Expand Up @@ -506,9 +575,36 @@ func (b *SearchRequestBuilder) SetTraceSpansFilters(to, from int64, traceId stri

}

// Stats adds the aggregations needed for the Stats request, which is used
// to display latency and throughput per service.
func (b *aggBuilderImpl) Stats() AggBuilder {
	b.Terms("service_name", "serviceName", func(termsAgg *TermsAggregation, inner AggBuilder) {
		termsAgg.Size = nodeGraphSize

		// Average span duration per service bucket.
		inner.Metric("avg_latency_nanos", "avg", "durationInNanos", nil)

		// Count of spans with status.code "2" — presumably the error status
		// code; confirm against the span schema.
		errorCount := &aggDefinition{
			key: "error_count",
			aggregation: &AggContainer{
				Type:        "filter",
				Aggregation: FilterAggregation{Key: "status.code", Value: "2"},
			},
		}
		inner.AddAggDef(errorCount)

		// error_count divided by the bucket's total span count.
		errorRate := &aggDefinition{
			key: "error_rate",
			aggregation: &AggContainer{
				Type: "bucket_script",
				Aggregation: BucketScriptAggregation{
					Path:   map[string]string{"total": "_count", "errors": "error_count._count"},
					Script: "params.errors / params.total",
				},
			},
		}
		inner.AddAggDef(errorRate)
	})
	return b
}

// TraceList sets the "aggs" object of the query to OpenSearch for the trace list
func (b *aggBuilderImpl) TraceList() AggBuilder {
aggDef := &aggDef{
aggDef := &aggDefinition{
key: "traces",
aggregation: &AggContainer{
Type: "terms",
Expand Down Expand Up @@ -587,7 +683,7 @@ func (b *aggBuilderImpl) GeoHashGrid(key, field string, fn func(a *GeoHashGridAg
Field: field,
Precision: 5,
}
aggDef := newAggDef(key, &AggContainer{
aggDef := newAggDefinition(key, &AggContainer{
Type: "geohash_grid",
Aggregation: innerAgg,
})
Expand All @@ -608,7 +704,7 @@ func (b *aggBuilderImpl) Metric(key, metricType, field string, fn func(a *Metric
Field: field,
Settings: make(map[string]interface{}),
}
aggDef := newAggDef(key, &AggContainer{
aggDef := newAggDefinition(key, &AggContainer{
Type: metricType,
Aggregation: innerAgg,
})
Expand All @@ -627,7 +723,7 @@ func (b *aggBuilderImpl) Pipeline(key, pipelineType string, bucketPath interface
BucketPath: bucketPath,
Settings: make(map[string]interface{}),
}
aggDef := newAggDef(key, &AggContainer{
aggDef := newAggDefinition(key, &AggContainer{
Type: pipelineType,
Aggregation: innerAgg,
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/opensearch/client/search_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func Test_Given_new_search_request_builder_for_es_OpenSearch_1_0_0(t *testing.T)
b := NewSearchRequestBuilder(OpenSearch, version, tsdb.Interval{Value: 15 * time.Second, Text: "15s"})
aggBuilder := b.Agg()
aggBuilder.Terms("service_name", "fieldServiceName", func(a *TermsAggregation, innerBuilder AggBuilder) {
innerBuilder.AddAggDef(&aggDef{
innerBuilder.AddAggDef(&aggDefinition{
key: "error_count",
aggregation: &AggContainer{
Type: "filter",
Expand Down
6 changes: 5 additions & 1 deletion pkg/opensearch/lucene_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,14 @@ func (h *luceneHandler) processQuery(q *Query) error {
if q.luceneQueryType == luceneQueryTypeTraces {
traceId := getTraceId(q.RawQuery)
switch q.serviceMapInfo.Type {
case Prefetch:
case ServiceMap, Prefetch:
b.Size(0)
aggBuilder := b.Agg()
aggBuilder.ServiceMap()
case Stats:
b.SetStatsFilters(toMs, fromMs, traceId, q.serviceMapInfo.Parameters)
aggBuilder := b.Agg()
aggBuilder.Stats()
default:
if traceId != "" {
b.Size(1000)
Expand Down
8 changes: 3 additions & 5 deletions pkg/opensearch/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/bitly/go-simplejson"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/opensearch-datasource/pkg/opensearch/client"
)

// Query represents the time series query model of the datasource
Expand All @@ -19,6 +20,7 @@ type Query struct {
Interval time.Duration
RefID string
Format string
TimeRange backend.TimeRange

// serviceMapInfo is used on the backend to pass information for service map queries
serviceMapInfo serviceMapInfo
Expand Down Expand Up @@ -52,7 +54,7 @@ type MetricAgg struct {

type serviceMapInfo struct {
Type ServiceMapQueryType
Parameters StatsParameters
Parameters client.StatsParameters
}

type ServiceMapQueryType int
Expand All @@ -63,10 +65,6 @@ const (
Stats
Prefetch
)
type StatsParameters struct {
ServiceNames []string
Operations []string
}

var metricAggType = map[string]string{
"count": "Count",
Expand Down
Loading

0 comments on commit 20cd334

Please sign in to comment.