Skip to content

Commit

Permalink
Traceql instant query (#3859) (#3884)
Browse files Browse the repository at this point in the history
* first working draft

* Cleanup request time manipulation code

* comment

* Update after merge, oops restore accidentally deleted query metrics

* Fix request clone for logging

* tweak method signature

* changelog

* Make TrimToOverlap aware of instant query, fix alignment issue on generator complete blocks

* Fix test

* Add streaming version of metrics query instant, add to cli, fix timestamp handling in metrics diff detection

* Fix typo in QueryInstantResponse name

* lint

* docs

* lint, rename

(cherry picked from commit 7f788b3)

Co-authored-by: Martin Disibio <martin.disibio@grafana.com>
  • Loading branch information
github-actions[bot] and mdisibio authored Jul 19, 2024
1 parent be6e14a commit df4f83f
Show file tree
Hide file tree
Showing 16 changed files with 1,732 additions and 407 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* [FEATURE] TraceQL support for event attributes [#3708](https://github.com/grafana/tempo/pull/3748) (@ie-pham)
* [FEATURE] Flush and query RF1 blocks for TraceQL metric queries [#3628](https://github.com/grafana/tempo/pull/3628) [#3691](https://github.com/grafana/tempo/pull/3691) [#3723](https://github.com/grafana/tempo/pull/3723) (@mapno)
* [FEATURE] Add new compare() metrics function [#3695](https://github.com/grafana/tempo/pull/3695) (@mdisibio)
* [FEATURE] Add new api `/api/metrics/query` for instant metrics queries [#3859](https://github.com/grafana/tempo/pull/3859) (@mdisibio)
* [FEATURE] Add a `q` parameter to `/api/v2/search/tags` for tag name filtering [#3822](https://github.com/grafana/tempo/pull/3822) (@joe-elliott)
* [ENHANCEMENT] Tag value lookup use protobuf internally for improved latency [#3731](https://github.com/grafana/tempo/pull/3731) (@mdisibio)
* [ENHANCEMENT] TraceQL metrics queries use protobuf internally for improved latency [#3745](https://github.com/grafana/tempo/pull/3745) (@mdisibio)
Expand Down
107 changes: 100 additions & 7 deletions cmd/tempo-cli/cmd-query-metrics-query-range.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@ import (
"github.com/grafana/tempo/pkg/tempopb"
)

type metricsQueryRangeCmd struct {
type metricsQueryCmd struct {
HostPort string `arg:"" help:"tempo host and port. scheme and path will be provided based on query type. e.g. localhost:3200"`
TraceQL string `arg:"" optional:"" help:"traceql query"`
Start string `arg:"" optional:"" help:"start time in ISO8601 format"`
End string `arg:"" optional:"" help:"end time in ISO8601 format"`

OrgID string `help:"optional orgID"`
UseGRPC bool `help:"stream search results over GRPC"`
Instant bool `help:"perform an instant query instead of a range query"`
PathPrefix string `help:"string to prefix all http paths with"`
}

func (cmd *metricsQueryRangeCmd) Run(_ *globalOptions) error {
func (cmd *metricsQueryCmd) Run(_ *globalOptions) error {
startDate, err := time.Parse(time.RFC3339, cmd.Start)
if err != nil {
return err
Expand All @@ -42,6 +43,20 @@ func (cmd *metricsQueryRangeCmd) Run(_ *globalOptions) error {
}
end := endDate.UnixNano()

if cmd.Instant {
req := &tempopb.QueryInstantRequest{
Query: cmd.TraceQL,
Start: uint64(start),
End: uint64(end),
}

if cmd.UseGRPC {
return cmd.queryInstantGRPC(req)
}

return cmd.queryInstantHTTP(req)
}

req := &tempopb.QueryRangeRequest{
Query: cmd.TraceQL,
Start: uint64(start),
Expand All @@ -50,20 +65,20 @@ func (cmd *metricsQueryRangeCmd) Run(_ *globalOptions) error {
}

if cmd.UseGRPC {
return cmd.searchGRPC(req)
return cmd.queryRangeGRPC(req)
}

return cmd.searchHTTP(req)
return cmd.queryRangeHTTP(req)
}

func (cmd *metricsQueryRangeCmd) searchGRPC(req *tempopb.QueryRangeRequest) error {
func (cmd *metricsQueryCmd) queryRangeGRPC(req *tempopb.QueryRangeRequest) error {
ctx := user.InjectOrgID(context.Background(), cmd.OrgID)
ctx, err := user.InjectIntoGRPCRequest(ctx)
if err != nil {
return err
}

clientConn, err := grpc.DialContext(ctx, cmd.HostPort, grpc.WithTransportCredentials(insecure.NewCredentials()))
clientConn, err := grpc.NewClient(cmd.HostPort, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return err
}
Expand Down Expand Up @@ -93,7 +108,7 @@ func (cmd *metricsQueryRangeCmd) searchGRPC(req *tempopb.QueryRangeRequest) erro
}

// nolint: goconst // goconst wants us to make http:// a const
func (cmd *metricsQueryRangeCmd) searchHTTP(req *tempopb.QueryRangeRequest) error {
func (cmd *metricsQueryCmd) queryRangeHTTP(req *tempopb.QueryRangeRequest) error {
httpReq, err := http.NewRequest("GET", "http://"+path.Join(cmd.HostPort, cmd.PathPrefix, api.PathMetricsQueryRange), nil)
if err != nil {
return err
Expand Down Expand Up @@ -133,3 +148,81 @@ func (cmd *metricsQueryRangeCmd) searchHTTP(req *tempopb.QueryRangeRequest) erro

return nil
}

// queryInstantGRPC runs an instant metrics query over the streaming gRPC API
// at cmd.HostPort and prints every message received on the stream as JSON.
//
// The org ID from cmd.OrgID is injected into the outgoing request context.
// It returns nil once the stream ends with io.EOF; any other receive error,
// or an error from printing a message, is returned to the caller.
func (cmd *metricsQueryCmd) queryInstantGRPC(req *tempopb.QueryInstantRequest) error {
	ctx := user.InjectOrgID(context.Background(), cmd.OrgID)
	ctx, err := user.InjectIntoGRPCRequest(ctx)
	if err != nil {
		return err
	}

	clientConn, err := grpc.NewClient(cmd.HostPort, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return err
	}
	// Release the connection on every return path; previously it was never closed.
	defer clientConn.Close()

	client := tempopb.NewStreamingQuerierClient(clientConn)

	stream, err := client.MetricsQueryInstant(ctx, req)
	if err != nil {
		return err
	}

	for {
		// Keep the receive error separate from the print error so that a
		// stream error delivered together with a message is never overwritten.
		msg, recvErr := stream.Recv()
		if msg != nil {
			if printErr := printAsJSON(msg); printErr != nil {
				return printErr
			}
		}
		if errors.Is(recvErr, io.EOF) {
			return nil
		}
		if recvErr != nil {
			return recvErr
		}
	}
}

// queryInstantHTTP runs an instant metrics query over HTTP and prints the
// decoded response as JSON.
//
// The request URL is built from cmd.HostPort, cmd.PathPrefix and
// api.PathMetricsQueryInstant; the query parameters come from req via
// api.BuildQueryInstantRequest. A non-200 status, a transport failure, or a
// response body that cannot be decoded is reported as a returned error.
//
// nolint: goconst // goconst wants us to make http:// a const
func (cmd *metricsQueryCmd) queryInstantHTTP(req *tempopb.QueryInstantRequest) error {
	httpReq, err := http.NewRequest("GET", "http://"+path.Join(cmd.HostPort, cmd.PathPrefix, api.PathMetricsQueryInstant), nil)
	if err != nil {
		return err
	}

	httpReq = api.BuildQueryInstantRequest(httpReq, req)
	// Replace the header map with an empty one before injecting the org ID.
	httpReq.Header = http.Header{}
	err = user.InjectOrgIDIntoHTTPRequest(user.InjectOrgID(context.Background(), cmd.OrgID), httpReq)
	if err != nil {
		return err
	}

	httpResp, err := http.DefaultClient.Do(httpReq)
	if err != nil {
		return err
	}
	defer httpResp.Body.Close()

	body, err := io.ReadAll(httpResp.Body)
	if err != nil {
		return err
	}

	if httpResp.StatusCode != http.StatusOK {
		return errors.New("failed to query. body: " + string(body) + " status: " + httpResp.Status)
	}

	resp := &tempopb.QueryInstantResponse{}
	if err := jsonpb.Unmarshal(bytes.NewReader(body), resp); err != nil {
		// A malformed response is an ordinary runtime failure, not a
		// programmer error: report it like every other failure in this
		// function instead of panicking.
		return errors.New("failed to parse resp: " + err.Error())
	}

	return printAsJSON(resp)
}
2 changes: 1 addition & 1 deletion cmd/tempo-cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ var cli struct {
SearchTags querySearchTagsCmd `cmd:"" help:"query Tempo search tags"`
SearchTagValues querySearchTagValuesCmd `cmd:"" help:"query Tempo search tag values"`
Search querySearchCmd `cmd:"" help:"query Tempo search"`
Metrics metricsQueryRangeCmd `cmd:"" help:"query Tempo metrics query range"`
Metrics metricsQueryCmd `cmd:"" help:"query Tempo metrics query range"`
} `cmd:""`
TraceID queryBlocksCmd `cmd:"" help:"query for a traceid directly from backend blocks"`
TraceSummary queryTraceSummaryCmd `cmd:"" help:"query summary for a traceid directly from backend blocks"`
Expand Down
1 change: 1 addition & 0 deletions cmd/tempo/app/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ func (t *App) initQueryFrontend() (services.Service, error) {

// http metrics endpoints
t.Server.HTTPRouter().Handle(addHTTPAPIPrefix(&t.cfg, api.PathSpanMetricsSummary), base.Wrap(queryFrontend.MetricsSummaryHandler))
t.Server.HTTPRouter().Handle(addHTTPAPIPrefix(&t.cfg, api.PathMetricsQueryInstant), base.Wrap(queryFrontend.MetricsQueryInstantHandler))
t.Server.HTTPRouter().Handle(addHTTPAPIPrefix(&t.cfg, api.PathMetricsQueryRange), base.Wrap(queryFrontend.MetricsQueryRangeHandler))

// the query frontend needs to have knowledge of the blocks so it can shard search jobs
Expand Down
62 changes: 62 additions & 0 deletions docs/sources/tempo/api_docs/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ For externally supported GRPC API, [see below](#tempo-grpc-api).
| [Search tag names V2](#search-tags-v2) | Query-frontend | HTTP | `GET /api/v2/search/tags` |
| [Search tag values](#search-tag-values) | Query-frontend | HTTP | `GET /api/search/tag/<tag>/values` |
| [Search tag values V2](#search-tag-values-v2) | Query-frontend | HTTP | `GET /api/v2/search/tag/<tag>/values` |
| [TraceQL Metrics](#traceql-metrics) | Query-frontend | HTTP | `GET /api/metrics/query_range` |
| [TraceQL Metrics (instant)](#instant) | Query-frontend | HTTP | `GET /api/metrics/query` |
| [Query Echo Endpoint](#query-echo-endpoint) | Query-frontend | HTTP | `GET /api/echo` |
| [Overrides API](#overrides-api) | Query-frontend | HTTP | `GET,POST,PATCH,DELETE /api/overrides` |
| Memberlist | Distributor, Ingester, Querier, Compactor | HTTP | `GET /memberlist` |
Expand Down Expand Up @@ -468,6 +470,66 @@ GET /api/v2/search/tag/.service.name/values?q="{span.http.method='GET'}"
If a particular service name (for example, `shopping-cart`) is only present on spans with `span.http.method=POST`, it won't be included in the list of values returned.
### TraceQL Metrics
The TraceQL Metrics API returns Prometheus-like time-series for a given metrics query. Metrics queries are those using metrics functions like `rate()` and `quantile_over_time()`. See the [documentation]({{< relref "../traceql/metrics-queries" >}}) for the complete list.
Parameters:
- `q = (traceql query)`
The TraceQL metrics query to process.
- `start = (unix epoch seconds | unix epoch nanoseconds | RFC3339 string)`
Optional. Along with `end` defines the time range.
- `end = (unix epoch seconds | unix epoch nanoseconds | RFC3339 string)`
Optional. Along with `start` defines the time range. Providing both `start` and `end` includes blocks for the specified time range only.
- `since = (duration string)`
Optional. Can be used instead of `start` and `end` to define the time range in relative values. For example `since=15m` will query the last 15 minutes. Default is last 1 hour.
- `step = (duration string)`
Optional. Defines the granularity of the returned time-series. For example `step=15s` will return a data point every 15s within the time range. If not specified then the default behavior will choose a dynamic step based on the time range.
The API is available in the query frontend service in
a microservices deployment, or the Tempo endpoint in a monolithic mode deployment.
For example the following request computes the rate of spans received for `myservice` over the last three hours, at 1 minute intervals.
{{< admonition type="note" >}}
Actual API parameters must be url-encoded. This example is left unencoded for readability.
{{< /admonition >}}
```
GET /api/metrics/query_range?q={resource.service.name="myservice"}|rate()&since=3h&step=1m
```
#### Instant
The instant version of the metrics API is similar to the range version, but instead returns a single value for the query. This version is useful when you don't need the granularity of a full time-series, but instead want a total sum, or single value computed across the whole time range.
The parameters are identical to the range version except there is no `step`.
Parameters:
- `q = (traceql query)`
The TraceQL metrics query to process.
- `start = (unix epoch seconds | unix epoch nanoseconds | RFC3339 string)`
Optional. Along with `end` defines the time range.
- `end = (unix epoch seconds | unix epoch nanoseconds | RFC3339 string)`
Optional. Along with `start` defines the time range. Providing both `start` and `end` includes blocks for the specified time range only.
- `since = (duration string)`
Optional. Can be used instead of `start` and `end` to define the time range in relative values. For example `since=15m` will query the last 15 minutes. Default is last 1 hour.
The API is available in the query frontend service in
a microservices deployment, or the Tempo endpoint in a monolithic mode deployment.
For example the following request computes the total number of failed spans over the last hour per service.
{{< admonition type="note" >}}
Actual API parameters must be url-encoded. This example is left unencoded for readability.
{{< /admonition >}}
```
GET /api/metrics/query?q={status=error}|count_over_time()by(resource.service.name)
```
### Query Echo endpoint
```
Expand Down
69 changes: 39 additions & 30 deletions modules/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,27 @@ import (
// these handler funcs could likely be removed and the code written directly into the respective
// gRPC functions
type (
streamingSearchHandler func(req *tempopb.SearchRequest, srv tempopb.StreamingQuerier_SearchServer) error
streamingTagsHandler func(req *tempopb.SearchTagsRequest, srv tempopb.StreamingQuerier_SearchTagsServer) error
streamingTagsV2Handler func(req *tempopb.SearchTagsRequest, srv tempopb.StreamingQuerier_SearchTagsV2Server) error
streamingTagValuesHandler func(req *tempopb.SearchTagValuesRequest, srv tempopb.StreamingQuerier_SearchTagValuesServer) error
streamingTagValuesV2Handler func(req *tempopb.SearchTagValuesRequest, srv tempopb.StreamingQuerier_SearchTagValuesV2Server) error
streamingQueryRangeHandler func(req *tempopb.QueryRangeRequest, srv tempopb.StreamingQuerier_MetricsQueryRangeServer) error
streamingSearchHandler func(req *tempopb.SearchRequest, srv tempopb.StreamingQuerier_SearchServer) error
streamingTagsHandler func(req *tempopb.SearchTagsRequest, srv tempopb.StreamingQuerier_SearchTagsServer) error
streamingTagsV2Handler func(req *tempopb.SearchTagsRequest, srv tempopb.StreamingQuerier_SearchTagsV2Server) error
streamingTagValuesHandler func(req *tempopb.SearchTagValuesRequest, srv tempopb.StreamingQuerier_SearchTagValuesServer) error
streamingTagValuesV2Handler func(req *tempopb.SearchTagValuesRequest, srv tempopb.StreamingQuerier_SearchTagValuesV2Server) error
streamingQueryRangeHandler func(req *tempopb.QueryRangeRequest, srv tempopb.StreamingQuerier_MetricsQueryRangeServer) error
streamingQueryInstantHandler func(req *tempopb.QueryInstantRequest, srv tempopb.StreamingQuerier_MetricsQueryInstantServer) error
)

type QueryFrontend struct {
TraceByIDHandler, SearchHandler, MetricsSummaryHandler, MetricsQueryRangeHandler http.Handler
SearchTagsHandler, SearchTagsV2Handler, SearchTagsValuesHandler, SearchTagsValuesV2Handler http.Handler
cacheProvider cache.Provider
streamingSearch streamingSearchHandler
streamingTags streamingTagsHandler
streamingTagsV2 streamingTagsV2Handler
streamingTagValues streamingTagValuesHandler
streamingTagValuesV2 streamingTagValuesV2Handler
streamingQueryRange streamingQueryRangeHandler
logger log.Logger
TraceByIDHandler, SearchHandler, MetricsSummaryHandler, MetricsQueryInstantHandler, MetricsQueryRangeHandler http.Handler
SearchTagsHandler, SearchTagsV2Handler, SearchTagsValuesHandler, SearchTagsValuesV2Handler http.Handler
cacheProvider cache.Provider
streamingSearch streamingSearchHandler
streamingTags streamingTagsHandler
streamingTagsV2 streamingTagsV2Handler
streamingTagValues streamingTagValuesHandler
streamingTagValuesV2 streamingTagValuesV2Handler
streamingQueryRange streamingQueryRangeHandler
streamingQueryInstant streamingQueryInstantHandler
logger log.Logger
}

// New returns a new QueryFrontend
Expand Down Expand Up @@ -140,26 +142,29 @@ func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempo
searchTagValues := newTagHTTPHandler(cfg, searchTagValuesPipeline, o, combiner.NewSearchTagValues, logger)
searchTagValuesV2 := newTagHTTPHandler(cfg, searchTagValuesPipeline, o, combiner.NewSearchTagValuesV2, logger)
metrics := newMetricsSummaryHandler(metricsPipeline, logger)
queryInstant := newMetricsQueryInstantHTTPHandler(cfg, queryRangePipeline, logger) // Reuses the same pipeline
queryrange := newMetricsQueryRangeHTTPHandler(cfg, queryRangePipeline, logger)

return &QueryFrontend{
// http/discrete
TraceByIDHandler: newHandler(cfg.Config.LogQueryRequestHeaders, traces, logger),
SearchHandler: newHandler(cfg.Config.LogQueryRequestHeaders, search, logger),
SearchTagsHandler: newHandler(cfg.Config.LogQueryRequestHeaders, searchTags, logger),
SearchTagsV2Handler: newHandler(cfg.Config.LogQueryRequestHeaders, searchTagsV2, logger),
SearchTagsValuesHandler: newHandler(cfg.Config.LogQueryRequestHeaders, searchTagValues, logger),
SearchTagsValuesV2Handler: newHandler(cfg.Config.LogQueryRequestHeaders, searchTagValuesV2, logger),
MetricsSummaryHandler: newHandler(cfg.Config.LogQueryRequestHeaders, metrics, logger),
MetricsQueryRangeHandler: newHandler(cfg.Config.LogQueryRequestHeaders, queryrange, logger),
TraceByIDHandler: newHandler(cfg.Config.LogQueryRequestHeaders, traces, logger),
SearchHandler: newHandler(cfg.Config.LogQueryRequestHeaders, search, logger),
SearchTagsHandler: newHandler(cfg.Config.LogQueryRequestHeaders, searchTags, logger),
SearchTagsV2Handler: newHandler(cfg.Config.LogQueryRequestHeaders, searchTagsV2, logger),
SearchTagsValuesHandler: newHandler(cfg.Config.LogQueryRequestHeaders, searchTagValues, logger),
SearchTagsValuesV2Handler: newHandler(cfg.Config.LogQueryRequestHeaders, searchTagValuesV2, logger),
MetricsSummaryHandler: newHandler(cfg.Config.LogQueryRequestHeaders, metrics, logger),
MetricsQueryInstantHandler: newHandler(cfg.Config.LogQueryRequestHeaders, queryInstant, logger),
MetricsQueryRangeHandler: newHandler(cfg.Config.LogQueryRequestHeaders, queryrange, logger),

// grpc/streaming
streamingSearch: newSearchStreamingGRPCHandler(cfg, searchPipeline, apiPrefix, logger),
streamingTags: newTagStreamingGRPCHandler(cfg, searchTagsPipeline, apiPrefix, o, logger),
streamingTagsV2: newTagV2StreamingGRPCHandler(cfg, searchTagsPipeline, apiPrefix, o, logger),
streamingTagValues: newTagValuesStreamingGRPCHandler(cfg, searchTagValuesPipeline, apiPrefix, o, logger),
streamingTagValuesV2: newTagValuesV2StreamingGRPCHandler(cfg, searchTagValuesPipeline, apiPrefix, o, logger),
streamingQueryRange: newQueryRangeStreamingGRPCHandler(cfg, queryRangePipeline, apiPrefix, logger),
streamingSearch: newSearchStreamingGRPCHandler(cfg, searchPipeline, apiPrefix, logger),
streamingTags: newTagStreamingGRPCHandler(cfg, searchTagsPipeline, apiPrefix, o, logger),
streamingTagsV2: newTagV2StreamingGRPCHandler(cfg, searchTagsPipeline, apiPrefix, o, logger),
streamingTagValues: newTagValuesStreamingGRPCHandler(cfg, searchTagValuesPipeline, apiPrefix, o, logger),
streamingTagValuesV2: newTagValuesV2StreamingGRPCHandler(cfg, searchTagValuesPipeline, apiPrefix, o, logger),
streamingQueryRange: newQueryRangeStreamingGRPCHandler(cfg, queryRangePipeline, apiPrefix, logger),
streamingQueryInstant: newQueryInstantStreamingGRPCHandler(cfg, queryRangePipeline, apiPrefix, logger), // Reuses the same pipeline

cacheProvider: cacheProvider,
logger: logger,
Expand Down Expand Up @@ -191,6 +196,10 @@ func (q *QueryFrontend) MetricsQueryRange(req *tempopb.QueryRangeRequest, srv te
return q.streamingQueryRange(req, srv)
}

// MetricsQueryInstant forwards the instant metrics query request and its
// response stream to the frontend's streamingQueryInstant handler and returns
// whatever error that handler reports.
func (q *QueryFrontend) MetricsQueryInstant(req *tempopb.QueryInstantRequest, srv tempopb.StreamingQuerier_MetricsQueryInstantServer) error {
	handle := q.streamingQueryInstant
	return handle(req, srv)
}

// newMetricsSummaryHandler creates a new frontend middleware to handle metrics-generator requests.
func newMetricsSummaryHandler(next pipeline.AsyncRoundTripper[combiner.PipelineResponse], logger log.Logger) http.RoundTripper {
return pipeline.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
Expand Down
Loading

0 comments on commit df4f83f

Please sign in to comment.