Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Block partial results #1007

Merged
merged 19 commits into from
Oct 8, 2021
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@
* [ENHANCEMENT] Jsonnet: add `$._config.memcached.memory_limit_mb` [#987](https://github.com/grafana/tempo/pull/987) (@kvrhdn)
* [ENHANCEMENT] Upgrade jsonnet-libs to 1.19 and update tk examples [#1001](https://github.com/grafana/tempo/pull/1001) (@mapno)
* [ENHANCEMENT] Shard tenant index creation by tenant and add functionality to handle stale indexes. [#1005](https://github.com/grafana/tempo/pull/1005) (@joe-elliott)
* [ENHANCEMENT] **BREAKING CHANGE** Support partial results from failed block queries [#1007](https://github.com/grafana/tempo/pull/1007) (@mapno)
Querier [`GET /querier/api/traces/<traceid>`](https://grafana.com/docs/tempo/latest/api_docs/#query) response's body has been modified
to return `tempopb.TraceByIDResponse` instead of simply `tempopb.Trace`. This can cause a disruption of the read path during rollout of the change.
mapno marked this conversation as resolved.
Show resolved Hide resolved
* [BUGFIX] Update port spec for GCS docker-compose example [#869](https://github.com/grafana/tempo/pull/869) (@zalegrala)
* [BUGFIX] Fix "magic number" errors and other block mishandling when an ingester forcefully shuts down [#937](https://github.com/grafana/tempo/issues/937) (@mdisibio)
* [BUGFIX] Fix compactor memory leak [#806](https://github.com/grafana/tempo/pull/806) (@mdisibio)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ require (
go.opentelemetry.io/otel/trace v1.0.0-RC2
go.uber.org/atomic v1.9.0
go.uber.org/goleak v1.1.10
go.uber.org/multierr v1.6.0
go.uber.org/zap v1.17.0
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
golang.org/x/time v0.0.0-20210611083556-38a9dc6acbc6
Expand Down Expand Up @@ -239,7 +240,6 @@ require (
go.mongodb.org/mongo-driver v1.5.1 // indirect
go.opentelemetry.io/otel/metric v0.21.0 // indirect
go.opentelemetry.io/otel/sdk/export/metric v0.21.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e // indirect
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
golang.org/x/mod v0.4.2 // indirect
Expand Down
8 changes: 5 additions & 3 deletions modules/frontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import (
)

type Config struct {
Config frontend.CombinedFrontendConfig `yaml:",inline"`
MaxRetries int `yaml:"max_retries,omitempty"`
QueryShards int `yaml:"query_shards,omitempty"`
Config frontend.CombinedFrontendConfig `yaml:",inline"`
MaxRetries int `yaml:"max_retries,omitempty"`
QueryShards int `yaml:"query_shards,omitempty"`
tolerateFailedBlocks int `yaml:"tolerate_failed_blocks,omitempty"`
mapno marked this conversation as resolved.
Show resolved Hide resolved
}

func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
Expand All @@ -19,6 +20,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.Config.FrontendV1.MaxOutstandingPerTenant = 100
cfg.MaxRetries = 2
cfg.QueryShards = 20
cfg.tolerateFailedBlocks = 0
}

type CortexNoQuerierLimits struct{}
Expand Down
11 changes: 6 additions & 5 deletions modules/frontend/deduper.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,23 +63,24 @@ func (s spanIDDeduper) RoundTrip(req *http.Request) (*http.Response, error) {
return nil, err
}

traceObject := &tempopb.Trace{}
err = proto.Unmarshal(body, traceObject)
responseObject := &tempopb.TraceByIDResponse{}
err = proto.Unmarshal(body, responseObject)
if err != nil {
return nil, err
}

s.trace = traceObject
s.trace = responseObject.Trace
s.dedupe()

traceBytes, err := proto.Marshal(s.trace)
responseObject.Trace = s.trace
responseBytes, err := proto.Marshal(responseObject)
if err != nil {
return nil, err
}

return &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader(traceBytes)),
Body: io.NopCloser(bytes.NewReader(responseBytes)),
Header: http.Header{},
ContentLength: resp.ContentLength,
}, nil
Expand Down
12 changes: 8 additions & 4 deletions modules/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func NewTracesMiddleware(cfg Config, logger log.Logger, registerer prometheus.Re
// - the Deduper dedupes Span IDs for Zipkin support
// - the ShardingWare shards queries by splitting the block ID space
// - the RetryWare retries requests that have failed (error or http status 500)
rt := NewRoundTripper(next, Deduper(logger), ShardingWare(cfg.QueryShards, logger), RetryWare(cfg.MaxRetries, registerer))
rt := NewRoundTripper(next, Deduper(logger), ShardingWare(cfg.QueryShards, cfg.tolerateFailedBlocks, logger), RetryWare(cfg.MaxRetries, registerer))

return RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
// validate traceID
Expand Down Expand Up @@ -156,15 +156,19 @@ func NewTracesMiddleware(cfg Config, logger log.Logger, registerer prometheus.Re
if err != nil {
return nil, errors.Wrap(err, "error reading response body at query frontend")
}
traceObject := &tempopb.Trace{}
err = proto.Unmarshal(body, traceObject)
responseObject := &tempopb.TraceByIDResponse{}
err = proto.Unmarshal(body, responseObject)
if err != nil {
return nil, err
}

if responseObject.Metrics.FailedBlocks > 0 {
resp.StatusCode = http.StatusPartialContent
}

var jsonTrace bytes.Buffer
marshaller := &jsonpb.Marshaler{}
err = marshaller.Marshal(&jsonTrace, traceObject)
err = marshaller.Marshal(&jsonTrace, responseObject.Trace)
if err != nil {
return nil, err
}
Expand Down
35 changes: 26 additions & 9 deletions modules/frontend/querysharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/binary"
"encoding/hex"
"fmt"
"io"
"net/http"
"strings"
Expand All @@ -13,12 +14,11 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/log/level"
"github.com/golang/protobuf/proto"
"github.com/opentracing/opentracing-go"
"github.com/weaveworks/common/user"

"github.com/grafana/tempo/modules/querier"
"github.com/grafana/tempo/pkg/model"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/opentracing/opentracing-go"
"github.com/weaveworks/common/user"
)

const (
Expand All @@ -29,13 +29,14 @@ const (
queryDelimiter = "?"
)

func ShardingWare(queryShards int, logger log.Logger) Middleware {
func ShardingWare(queryShards, maxFailedBlocks int, logger log.Logger) Middleware {
return MiddlewareFunc(func(next http.RoundTripper) http.RoundTripper {
return shardQuery{
next: next,
queryShards: queryShards,
logger: logger,
blockBoundaries: createBlockBoundaries(queryShards - 1), // one shard will be used to query ingesters
maxFailedBlocks: uint32(maxFailedBlocks),
}
})
}
Expand All @@ -45,6 +46,7 @@ type shardQuery struct {
queryShards int
logger log.Logger
blockBoundaries [][]byte
maxFailedBlocks uint32
}

// RoundTrip implements http.RoundTripper
Expand All @@ -64,8 +66,9 @@ func (s shardQuery) RoundTrip(r *http.Request) (*http.Response, error) {
wg := sync.WaitGroup{}
mtx := sync.Mutex{}

var overallTrace *tempopb.Trace
var overallError error
var totalFailedBlocks uint32
overallTrace := &tempopb.Trace{}
statusCode := http.StatusNotFound
statusMsg := "trace not found"

Expand Down Expand Up @@ -121,17 +124,25 @@ func (s shardQuery) RoundTrip(r *http.Request) (*http.Response, error) {
// marshal into a trace to combine.
// todo: better define responsibilities between middleware. the parent middleware in frontend.go actually sets the header
// which forces the body here to be a proto encoded tempopb.Trace{}
trace := &tempopb.Trace{}
err = proto.Unmarshal(buff, trace)
traceResp := &tempopb.TraceByIDResponse{}
err = proto.Unmarshal(buff, traceResp)
if err != nil {
_ = level.Error(s.logger).Log("msg", "error unmarshalling response", "url", innerR.RequestURI, "err", err, "body", string(buff))
overallError = err
return
}

if traceResp.Metrics != nil {
totalFailedBlocks += traceResp.Metrics.FailedBlocks
if totalFailedBlocks > s.maxFailedBlocks {
overallError = fmt.Errorf("too many failed block queries %d (max %d)", totalFailedBlocks, s.maxFailedBlocks)
return
}
}

// happy path
statusCode = http.StatusOK
overallTrace, _, _, _ = model.CombineTraceProtos(overallTrace, trace)
overallTrace, _, _, _ = model.CombineTraceProtos(overallTrace, traceResp.Trace)
}(req)
}
wg.Wait()
Expand All @@ -155,7 +166,12 @@ func (s shardQuery) RoundTrip(r *http.Request) (*http.Response, error) {
}, nil
}

buff, err := proto.Marshal(overallTrace)
buff, err := proto.Marshal(&tempopb.TraceByIDResponse{
Trace: overallTrace,
Metrics: &tempopb.TraceByIDMetrics{
FailedBlocks: totalFailedBlocks,
},
})
if err != nil {
_ = level.Error(s.logger).Log("msg", "error marshalling response to proto", "err", err)
return nil, err
Expand Down Expand Up @@ -238,5 +254,6 @@ func shouldQuit(ctx context.Context, statusCode int, err error) bool {
if statusCode/100 == 5 { // bail on any 5xx's
return true
}

return false
}
62 changes: 43 additions & 19 deletions modules/frontend/querysharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"errors"
"io"
"io/ioutil"
"math/rand"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -93,16 +92,17 @@ func TestShardingWareDoRequest(t *testing.T) {
}

tests := []struct {
name string
status1 int
status2 int
trace1 *tempopb.Trace
trace2 *tempopb.Trace
err1 error
err2 error
expectedStatus int
expectedTrace *tempopb.Trace
expectedError error
name string
status1 int
status2 int
trace1 *tempopb.Trace
trace2 *tempopb.Trace
err1 error
err2 error
failedBlockQueries int
expectedStatus int
expectedTrace *tempopb.Trace
expectedError error
}{
{
name: "empty returns",
Expand Down Expand Up @@ -210,11 +210,30 @@ func TestShardingWareDoRequest(t *testing.T) {
err2: errors.New("booo"),
expectedError: errors.New("booo"),
},
{
name: "failed block queries <= max",
status1: 200,
trace1: trace1,
status2: 200,
trace2: trace2,
failedBlockQueries: 1,
expectedStatus: 200,
expectedTrace: trace,
},
{
name: "too many failed block queries",
status1: 200,
trace1: trace1,
status2: 200,
trace2: trace2,
failedBlockQueries: 10,
expectedError: errors.New("too many failed block queries 10 (max 2)"),
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
sharder := ShardingWare(2, log.NewNopLogger())
sharder := ShardingWare(2, 2, log.NewNopLogger())

next := RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
var trace *tempopb.Trace
Expand All @@ -234,14 +253,19 @@ func TestShardingWareDoRequest(t *testing.T) {
return nil, err
}

var traceBytes []byte
var resBytes []byte
if trace != nil {
traceBytes, err = proto.Marshal(trace)
resBytes, err = proto.Marshal(&tempopb.TraceByIDResponse{
Trace: trace,
Metrics: &tempopb.TraceByIDMetrics{
FailedBlocks: uint32(tc.failedBlockQueries),
},
})
require.NoError(t, err)
}

return &http.Response{
Body: ioutil.NopCloser(bytes.NewReader(traceBytes)),
Body: io.NopCloser(bytes.NewReader(resBytes)),
StatusCode: statusCode,
}, nil
})
Expand All @@ -261,15 +285,15 @@ func TestShardingWareDoRequest(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, tc.expectedStatus, resp.StatusCode)
if tc.expectedTrace != nil {
actualTrace := &tempopb.Trace{}
actualResp := &tempopb.TraceByIDResponse{}
bytesTrace, err := io.ReadAll(resp.Body)
require.NoError(t, err)
err = proto.Unmarshal(bytesTrace, actualTrace)
err = proto.Unmarshal(bytesTrace, actualResp)
require.NoError(t, err)

model.SortTrace(tc.expectedTrace)
model.SortTrace(actualTrace)
assert.True(t, proto.Equal(tc.expectedTrace, actualTrace))
model.SortTrace(actualResp.Trace)
assert.True(t, proto.Equal(tc.expectedTrace, actualResp.Trace))
}
})
}
Expand Down
4 changes: 2 additions & 2 deletions modules/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (q *Querier) TraceByIDHandler(w http.ResponseWriter, r *http.Request) {

if r.Header.Get(util.AcceptHeaderKey) == util.ProtobufTypeHeaderValue {
span.SetTag("contentType", util.ProtobufTypeHeaderValue)
b, err := proto.Marshal(resp.Trace)
b, err := proto.Marshal(resp)
mapno marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
Expand All @@ -94,7 +94,7 @@ func (q *Querier) TraceByIDHandler(w http.ResponseWriter, r *http.Request) {

span.SetTag("contentType", util.JSONTypeHeaderValue)
marshaller := &jsonpb.Marshaler{}
err = marshaller.Marshal(w, resp.Trace)
err = marshaller.Marshal(w, resp)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
Expand Down
Loading