Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a startTime and endTime parameter to the trace by id query API to improve query performance #1388

Merged
merged 6 commits into from
Apr 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
* Includes a new metric to determine how often this range is exceeded: `tempo_warnings_total{reason="outside_ingestion_time_slack"}`
* [BUGFIX] Prevent data race / ingester crash during searching by trace id by using xxhash instance as a local variable. [#1387](https://github.com/grafana/tempo/pull/1387) (@bikashmishra100, @sagarwala, @ashwinidulams)
* [BUGFIX] Fix spurious "failed to mark block compacted during retention" errors [#1372](https://github.com/grafana/tempo/issues/1372) (@mdisibio)
* [ENHANCEMENT] Add a startTime and endTime parameter to the Trace by ID Tempo Query API to improve query performance [#1388](https://github.com/grafana/tempo/pull/1388) (@sagarwala, @bikashmishra100, @ashwinidulams)

## v1.3.2 / 2022-02-23
* [BUGFIX] Fixed an issue where the query-frontend would corrupt start/end time ranges on searches which included the ingesters [#1295] (@joe-elliott)
Expand Down
13 changes: 11 additions & 2 deletions docs/tempo/website/api_docs/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,18 @@ Tempo's Query API is simple. The following request is used to retrieve a trace f
a microservices deployment, or the Tempo endpoint in a monolithic mode deployment.

```
GET /api/traces/<traceid>
GET /api/traces/<traceid>?start=<start>&end=<end>
```
Parameters:
- `start = (unix epoch seconds)`
Optional. Along with `end` define a time range from which traces should be returned.
- `end = (unix epoch seconds)`
Optional. Along with `start` define a time range from which traces should be returned. Providing both `start` and `end` will include traces for the specified time range only. If the parameters are not provided then Tempo will check for the trace across all blocks in backend. If the parameters are provided, it will only check in the blocks within the specified time range, this can result in trace not being found or partial results if it does not fall in the specified time range.

The following query API is also provided on the querier service for _debugging_ purposes.

```
GET /querier/api/traces/<traceid>?mode=xxxx&blockStart=0000&blockEnd=FFFF
GET /querier/api/traces/<traceid>?mode=xxxx&blockStart=0000&blockEnd=FFFF&start=<start>&end=<end>
joe-elliott marked this conversation as resolved.
Show resolved Hide resolved
```
Parameters:
- `mode = (blocks|ingesters|all)`
Expand All @@ -111,6 +116,10 @@ Parameters:
Specifies the blockID finish boundary. If specified, the querier will only search blocks with IDs < blockEnd.
Default = `FFFFFFFF-FFFF-FFFF-FFFF-FFFFFFFFFFFF`
Example: `blockStart=FFFFFFFF-FFFF-FFFF-FFFF-456787652341`
- `start = (unix epoch seconds)`
Optional. Along with `end` define a time range from which traces should be returned.
- `end = (unix epoch seconds)`
Optional. Along with `start` define a time range from which traces should be returned. Providing both `start` and `end` will include blocks for the specified time range only.

Note that this API is not meant to be used directly unless for debugging the sharding functionality of the query
frontend.
Expand Down
7 changes: 5 additions & 2 deletions docs/tempo/website/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -769,10 +769,13 @@ storage:
[search_encoding: <string> | default = none]

# When a span is written to the WAL it adjusts the start and end times of the block it is written to.
# This block start and end time range is then used when choosing blocks for search. To prevent spans too far
# This block start and end time range is then used when choosing blocks for search.
# This is also used for querying traces by ID when the start and end parameters are specified. To prevent spans too far
# in the past or future from impacting the block start and end times we use this configuration option.
# This option only allows spans that occur within the configured duration to adjust the block start and
# end times.
# end times.
# This can result in trace not being found if the trace falls outside the slack configuration value as the
# start and end times of the block will not be updated in this case.
[ingestion_time_range_slack: <duration> | default = 2m]

# block configuration
Expand Down
10 changes: 10 additions & 0 deletions modules/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,16 @@ func newTraceByIDMiddleware(cfg Config, logger log.Logger) Middleware {
}, nil
}

//validate start and end parameter
_, _, _, _, _, reqErr := api.ValidateAndSanitizeRequest(r)
if reqErr != nil {
return &http.Response{
StatusCode: http.StatusBadRequest,
Body: io.NopCloser(strings.NewReader(reqErr.Error())),
Header: http.Header{},
}, nil
}

// check marshalling format
marshallingFormat := api.HeaderAcceptJSON
if r.Header.Get(api.HeaderAccept) == api.HeaderAcceptProtobuf {
Expand Down
2 changes: 1 addition & 1 deletion modules/frontend/searchsharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type mockReader struct {
metas []*backend.BlockMeta
}

func (m *mockReader) Find(ctx context.Context, tenantID string, id common.ID, blockStart string, blockEnd string) ([]*tempopb.Trace, []error, error) {
func (m *mockReader) Find(ctx context.Context, tenantID string, id common.ID, blockStart string, blockEnd string, timeStart int64, timeEnd int64) ([]*tempopb.Trace, []error, error) {
return nil, nil, nil
}

Expand Down
58 changes: 5 additions & 53 deletions modules/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,11 @@ import (

"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"github.com/google/uuid"
"github.com/gorilla/mux"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/tempodb"
"github.com/opentracing/opentracing-go"
ot_log "github.com/opentracing/opentracing-go/log"
"github.com/pkg/errors"
)

const (
Expand Down Expand Up @@ -44,7 +41,7 @@ func (q *Querier) TraceByIDHandler(w http.ResponseWriter, r *http.Request) {
}

// validate request
blockStart, blockEnd, queryMode, err := validateAndSanitizeRequest(r)
blockStart, blockEnd, queryMode, timeStart, timeEnd, err := api.ValidateAndSanitizeRequest(r)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
Expand All @@ -53,14 +50,16 @@ func (q *Querier) TraceByIDHandler(w http.ResponseWriter, r *http.Request) {
ot_log.String("msg", "validated request"),
ot_log.String("blockStart", blockStart),
ot_log.String("blockEnd", blockEnd),
ot_log.String("queryMode", queryMode))
ot_log.String("queryMode", queryMode),
ot_log.String("timeStart", fmt.Sprint(timeStart)),
ot_log.String("timeEnd", fmt.Sprint(timeEnd)))

resp, err := q.FindTraceByID(ctx, &tempopb.TraceByIDRequest{
TraceID: byteID,
BlockStart: blockStart,
BlockEnd: blockEnd,
QueryMode: queryMode,
})
}, timeStart, timeEnd)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
Expand Down Expand Up @@ -98,53 +97,6 @@ func (q *Querier) TraceByIDHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set(api.HeaderContentType, api.HeaderAcceptJSON)
}

// return values are (blockStart, blockEnd, queryMode, error)
func validateAndSanitizeRequest(r *http.Request) (string, string, string, error) {
q := r.URL.Query().Get(QueryModeKey)

// validate queryMode. it should either be empty or one of (QueryModeIngesters|QueryModeBlocks|QueryModeAll)
var queryMode string
if len(q) == 0 || q == QueryModeAll {
queryMode = QueryModeAll
} else if q == QueryModeIngesters {
queryMode = QueryModeIngesters
} else if q == QueryModeBlocks {
queryMode = QueryModeBlocks
} else {
return "", "", "", fmt.Errorf("invalid value for mode %s", q)
}

// no need to validate/sanitize other parameters if queryMode == QueryModeIngesters
if queryMode == QueryModeIngesters {
return "", "", queryMode, nil
}

start := r.URL.Query().Get(BlockStartKey)
end := r.URL.Query().Get(BlockEndKey)

// validate start. it should either be empty or a valid uuid
if len(start) == 0 {
start = tempodb.BlockIDMin
} else {
_, err := uuid.Parse(start)
if err != nil {
return "", "", "", errors.Wrap(err, "invalid value for blockStart")
}
}

// validate end. it should either be empty or a valid uuid
if len(end) == 0 {
end = tempodb.BlockIDMax
} else {
_, err := uuid.Parse(end)
if err != nil {
return "", "", "", errors.Wrap(err, "invalid value for blockEnd")
}
}

return start, end, queryMode, nil
}

func (q *Querier) SearchHandler(w http.ResponseWriter, r *http.Request) {
isSearchBlock := api.IsSearchBlock(r)

Expand Down
6 changes: 4 additions & 2 deletions modules/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (q *Querier) stopping(_ error) error {
}

// FindTraceByID implements tempopb.Querier.
func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDRequest) (*tempopb.TraceByIDResponse, error) {
func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDRequest, timeStart int64, timeEnd int64) (*tempopb.TraceByIDResponse, error) {
if !validation.ValidTraceID(req.TraceID) {
return nil, fmt.Errorf("invalid trace id")
}
Expand Down Expand Up @@ -217,7 +217,9 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque
var failedBlocks int
if req.QueryMode == QueryModeBlocks || req.QueryMode == QueryModeAll {
span.LogFields(ot_log.String("msg", "searching store"))
partialTraces, blockErrs, err := q.store.Find(opentracing.ContextWithSpan(ctx, span), userID, req.TraceID, req.BlockStart, req.BlockEnd)
span.LogFields(ot_log.String("timeStart", fmt.Sprint(timeStart)))
span.LogFields(ot_log.String("timeEnd", fmt.Sprint(timeEnd)))
partialTraces, blockErrs, err := q.store.Find(opentracing.ContextWithSpan(ctx, span), userID, req.TraceID, req.BlockStart, req.BlockEnd, timeStart, timeEnd)
if err != nil {
return nil, errors.Wrap(err, "error querying store in Querier.FindTraceByID")
}
Expand Down
80 changes: 80 additions & 0 deletions pkg/api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/gorilla/mux"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/tempodb"
"github.com/grafana/tempo/tempodb/backend"
)

Expand Down Expand Up @@ -53,6 +54,13 @@ const (
PathSearchTagValues = "/api/search/tag/{tagName}/values"
PathEcho = "/api/echo"

QueryModeKey = "mode"
QueryModeIngesters = "ingesters"
QueryModeBlocks = "blocks"
QueryModeAll = "all"
BlockStartKey = "blockStart"
BlockEndKey = "blockEnd"

defaultLimit = 20
)

Expand Down Expand Up @@ -372,3 +380,75 @@ func extractQueryParam(r *http.Request, param string) (string, bool) {
value := r.URL.Query().Get(param)
return value, value != ""
}

// ValidateAndSanitizeRequest validates params for trace by id api
// return values are (blockStart, blockEnd, queryMode, start, end, error)
func ValidateAndSanitizeRequest(r *http.Request) (string, string, string, int64, int64, error) {
q, _ := extractQueryParam(r, QueryModeKey)

// validate queryMode. it should either be empty or one of (QueryModeIngesters|QueryModeBlocks|QueryModeAll)
var queryMode string
var startTime int64
var endTime int64
var blockStart string
var blockEnd string
if len(q) == 0 || q == QueryModeAll {
queryMode = QueryModeAll
} else if q == QueryModeIngesters {
queryMode = QueryModeIngesters
} else if q == QueryModeBlocks {
queryMode = QueryModeBlocks
} else {
return "", "", "", 0, 0, fmt.Errorf("invalid value for mode %s", q)
}

// no need to validate/sanitize other parameters if queryMode == QueryModeIngesters
if queryMode == QueryModeIngesters {
return "", "", queryMode, 0, 0, nil
}

if start, ok := extractQueryParam(r, BlockStartKey); ok {
_, err := uuid.Parse(start)
if err != nil {
return "", "", "", 0, 0, fmt.Errorf("invalid value for blockstart: %w", err)
}
blockStart = start
} else {
blockStart = tempodb.BlockIDMin
}

if end, ok := extractQueryParam(r, BlockEndKey); ok {
_, err := uuid.Parse(end)
if err != nil {
return "", "", "", 0, 0, fmt.Errorf("invalid value for blockEnd: %w", err)
}
blockEnd = end
} else {
blockEnd = tempodb.BlockIDMax
}

if s, ok := extractQueryParam(r, urlParamStart); ok {
var err error
startTime, err = strconv.ParseInt(s, 10, 64)
if err != nil {
return "", "", "", 0, 0, fmt.Errorf("invalid start: %w", err)
}
} else {
startTime = 0
}

if s, ok := extractQueryParam(r, urlParamEnd); ok {
var err error
endTime, err = strconv.ParseInt(s, 10, 64)
if err != nil {
return "", "", "", 0, 0, fmt.Errorf("invalid end: %w", err)
}
} else {
endTime = 0
}

if startTime != 0 && endTime != 0 && endTime <= startTime {
return "", "", "", 0, 0, fmt.Errorf("http parameter start must be before end. received start=%d end=%d", startTime, endTime)
}
return blockStart, blockEnd, queryMode, startTime, endTime, nil
}
69 changes: 69 additions & 0 deletions pkg/api/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,75 @@ func TestBuildSearchBlockRequest(t *testing.T) {
}
}

func TestValidateAndSanitizeRequest(t *testing.T) {
tests := []struct {
httpReq *http.Request
queryMode string
startTime int64
endTime int64
blockStart string
blockEnd string
expectedError string
}{
{
httpReq: httptest.NewRequest("GET", "/api/traces/1234?blockEnd=ffffffffffffffffffffffffffffffff&blockStart=00000000000000000000000000000000&mode=blocks&start=1&end=2", nil),
queryMode: "blocks",
startTime: 1,
endTime: 2,
blockStart: "00000000000000000000000000000000",
blockEnd: "ffffffffffffffffffffffffffffffff",
},
{
httpReq: httptest.NewRequest("GET", "/api/traces/1234?blockEnd=ffffffffffffffffffffffffffffffff&blockStart=00000000000000000000000000000000&mode=blocks", nil),
queryMode: "blocks",
startTime: 0,
endTime: 0,
blockStart: "00000000000000000000000000000000",
blockEnd: "ffffffffffffffffffffffffffffffff",
},
{
httpReq: httptest.NewRequest("GET", "/api/traces/1234?mode=blocks", nil),
queryMode: "blocks",
startTime: 0,
endTime: 0,
blockStart: "00000000-0000-0000-0000-000000000000",
blockEnd: "FFFFFFFF-FFFF-FFFF-FFFF-FFFFFFFFFFFF",
},
{
httpReq: httptest.NewRequest("GET", "/api/traces/1234?mode=blocks&blockStart=12345678000000001235000001240000&blockEnd=ffffffffffffffffffffffffffffffff", nil),
queryMode: "blocks",
startTime: 0,
endTime: 0,
blockStart: "12345678000000001235000001240000",
blockEnd: "ffffffffffffffffffffffffffffffff",
},
{
httpReq: httptest.NewRequest("GET", "/api/traces/1234?mode=blocks&blockStart=12345678000000001235000001240000&blockEnd=ffffffffffffffffffffffffffffffff&start=1&end=1", nil),
queryMode: "blocks",
startTime: 0,
endTime: 0,
blockStart: "12345678000000001235000001240000",
blockEnd: "ffffffffffffffffffffffffffffffff",
expectedError: "http parameter start must be before end. received start=1 end=1",
},
}

for _, tc := range tests {
blockStart, blockEnd, queryMode, startTime, endTime, err := ValidateAndSanitizeRequest(tc.httpReq)
if len(tc.expectedError) != 0 {
assert.EqualError(t, err, tc.expectedError)
continue
}
assert.NoError(t, err)
assert.Equal(t, tc.queryMode, queryMode)
assert.Equal(t, tc.blockStart, blockStart)
assert.Equal(t, tc.blockEnd, blockEnd)
assert.Equal(t, tc.startTime, startTime)
assert.Equal(t, tc.endTime, endTime)
}

}

func TestBuildSearchRequest(t *testing.T) {
tests := []struct {
req *tempopb.SearchRequest
Expand Down
6 changes: 3 additions & 3 deletions tempodb/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func TestCompaction(t *testing.T) {

// now see if we can find our ids
for i, id := range allIds {
trs, failedBlocks, err := rw.Find(context.Background(), testTenantID, id, BlockIDMin, BlockIDMax)
trs, failedBlocks, err := rw.Find(context.Background(), testTenantID, id, BlockIDMin, BlockIDMax, 0, 0)
require.NoError(t, err)
require.Nil(t, failedBlocks)
require.NotNil(t, trs)
Expand Down Expand Up @@ -292,7 +292,7 @@ func TestSameIDCompaction(t *testing.T) {

// search for all ids
for i, id := range allIds {
trs, failedBlocks, err := rw.Find(context.Background(), testTenantID, id, BlockIDMin, BlockIDMax)
trs, failedBlocks, err := rw.Find(context.Background(), testTenantID, id, BlockIDMin, BlockIDMax, 0, 0)
assert.NoError(t, err)
assert.Nil(t, failedBlocks)

Expand Down Expand Up @@ -373,7 +373,7 @@ func TestCompactionUpdatesBlocklist(t *testing.T) {
// Make sure all expected traces are found.
for i := 0; i < blockCount; i++ {
for j := 0; j < recordCount; j++ {
trace, failedBlocks, err := rw.Find(context.TODO(), testTenantID, makeTraceID(i, j), BlockIDMin, BlockIDMax)
trace, failedBlocks, err := rw.Find(context.TODO(), testTenantID, makeTraceID(i, j), BlockIDMin, BlockIDMax, 0, 0)
require.NotNil(t, trace)
require.Greater(t, len(trace), 0)
require.NoError(t, err)
Expand Down
Loading