Skip to content

Commit

Permalink
Fix bug in query frontend diff handling (#3315)
Browse files Browse the repository at this point in the history
This also adds some test coverage to the query-frontend diff handler.
  • Loading branch information
simonswine committed May 28, 2024
1 parent cf55c09 commit 12b0543
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 2 deletions.
12 changes: 11 additions & 1 deletion pkg/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/grafana/pyroscope/pkg/frontend/frontendpb"
"github.com/grafana/pyroscope/pkg/querier/stats"
"github.com/grafana/pyroscope/pkg/scheduler/schedulerdiscovery"
"github.com/grafana/pyroscope/pkg/util/connectgrpc"
"github.com/grafana/pyroscope/pkg/util/httpgrpc"
"github.com/grafana/pyroscope/pkg/util/httpgrpcutil"
"github.com/grafana/pyroscope/pkg/validation"
Expand Down Expand Up @@ -78,6 +79,7 @@ func (cfg *Config) Validate() error {
// dispatches them to backends via gRPC, and handles retries for requests which failed.
type Frontend struct {
services.Service
connectgrpc.GRPCRoundTripper

cfg Config
log log.Logger
Expand Down Expand Up @@ -149,6 +151,7 @@ func NewFrontend(cfg Config, limits Limits, log log.Logger, reg prometheus.Regis
schedulerWorkersWatcher: services.NewFailureWatcher(),
requests: newRequestsInProgress(),
}
f.GRPCRoundTripper = &realFrontendRoundTripper{frontend: f}
// Randomize to avoid getting responses from queries sent before restart, which could lead to mixing results
// between different queries. Note that frontend verifies the user, so it cannot leak results between tenants.
// This isn't perfect, but better than nothing.
Expand Down Expand Up @@ -191,8 +194,15 @@ func (f *Frontend) stopping(_ error) error {
return errors.Wrap(services.StopAndAwaitTerminated(context.Background(), f.schedulerWorkers), "failed to stop frontend scheduler workers")
}

// allow to test the frontend without the need of a real roundertripper
type realFrontendRoundTripper struct {
frontend *Frontend
}

// RoundTripGRPC round trips a proto (instead of an HTTP request).
func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
func (rt *realFrontendRoundTripper) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
f := rt.frontend

if s := f.State(); s != services.Running {
return nil, fmt.Errorf("frontend not running: %v", s)
}
Expand Down
185 changes: 185 additions & 0 deletions pkg/frontend/frontend_diff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package frontend

import (
"context"
"testing"
"time"

"connectrpc.com/connect"
"github.com/grafana/dskit/user"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"

querierv1 "github.com/grafana/pyroscope/api/gen/proto/go/querier/v1"
"github.com/grafana/pyroscope/pkg/model"
"github.com/grafana/pyroscope/pkg/util/connectgrpc"
"github.com/grafana/pyroscope/pkg/util/httpgrpc"
)

type mockLimits struct{}

func (m *mockLimits) QuerySplitDuration(_ string) time.Duration {
return time.Hour
}

func (m *mockLimits) MaxQueryParallelism(_ string) int {
return 100
}

func (m *mockLimits) MaxQueryLength(_ string) time.Duration {
return time.Hour
}

func (m *mockLimits) MaxQueryLookback(_ string) time.Duration {
return time.Hour * 24
}

func (m *mockLimits) QueryAnalysisEnabled(_ string) bool {
return true
}

func (m *mockLimits) MaxFlameGraphNodesDefault(_ string) int {
return 10_000
}

func (m *mockLimits) MaxFlameGraphNodesMax(_ string) int {
return 100_000
}

type mockRoundTripper struct {
callback func(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error)
}

func (m *mockRoundTripper) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
if m.callback != nil {
return m.callback(ctx, req)
}
return &httpgrpc.HTTPResponse{}, errors.New("not implemented")
}

func Test_Frontend_Diff(t *testing.T) {
frontend := Frontend{
limits: &mockLimits{},
}

ctx := user.InjectOrgID(context.Background(), "test")
_, ctx = opentracing.StartSpanFromContext(ctx, "test")
now := time.Now().UnixMilli()

profileType := "memory:inuse_space:bytes:space:byte"

t.Run("Diff outside of the query window", func(t *testing.T) {
resp, err := frontend.Diff(
ctx,
connect.NewRequest(&querierv1.DiffRequest{
Left: &querierv1.SelectMergeStacktracesRequest{
ProfileTypeID: profileType,
LabelSelector: "{}",
Start: 0000,
End: 1000,
},
Right: &querierv1.SelectMergeStacktracesRequest{
ProfileTypeID: profileType,
LabelSelector: "{}",
Start: 2000,
End: 3000,
},
}),
)
require.NoError(t, err)
require.NotNil(t, resp)
})

t.Run("Failing left hand side", func(t *testing.T) {
frontend.GRPCRoundTripper = &mockRoundTripper{callback: func(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
return connectgrpc.HandleUnary[querierv1.SelectMergeStacktracesRequest, querierv1.SelectMergeStacktracesResponse](ctx, req, func(ctx context.Context, req *connect.Request[querierv1.SelectMergeStacktracesRequest]) (*connect.Response[querierv1.SelectMergeStacktracesResponse], error) {
if req.Msg.Start == now {
return nil, errors.New("left fails")
}

return connect.NewResponse(&querierv1.SelectMergeStacktracesResponse{
Flamegraph: &querierv1.FlameGraph{},
}), nil
})
}}

_, err := frontend.Diff(
ctx,
connect.NewRequest(&querierv1.DiffRequest{
Left: &querierv1.SelectMergeStacktracesRequest{
ProfileTypeID: profileType,
LabelSelector: "{}",
Start: now + 0000,
End: now + 1000,
},
Right: &querierv1.SelectMergeStacktracesRequest{
ProfileTypeID: profileType,
LabelSelector: "{}",
Start: now + 2000,
End: now + 3000,
},
}),
)
require.ErrorContains(t, err, "left fails")
})

t.Run("simple diff", func(t *testing.T) {
frontend.GRPCRoundTripper = &mockRoundTripper{callback: func(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
return connectgrpc.HandleUnary[querierv1.SelectMergeStacktracesRequest, querierv1.SelectMergeStacktracesResponse](ctx, req, func(ctx context.Context, req *connect.Request[querierv1.SelectMergeStacktracesRequest]) (*connect.Response[querierv1.SelectMergeStacktracesResponse], error) {

s := new(model.Tree)
s.InsertStack(1, "foo", "bar")

if req.Msg.Start == now {
//left
s.InsertStack(1, "foo", "bar", "baz")
} else {
//right
s.InsertStack(2, "foo", "bar", "buz")
}

return connect.NewResponse(&querierv1.SelectMergeStacktracesResponse{
Flamegraph: model.NewFlameGraph(s, -1),
}), nil
})
}}

resp, err := frontend.Diff(
ctx,
connect.NewRequest(&querierv1.DiffRequest{
Left: &querierv1.SelectMergeStacktracesRequest{
ProfileTypeID: profileType,
LabelSelector: "{}",
Start: now + 0000,
End: now + 1000,
},
Right: &querierv1.SelectMergeStacktracesRequest{
ProfileTypeID: profileType,
LabelSelector: "{}",
Start: now + 2000,
End: now + 3000,
},
}),
)
require.NoError(t, err)
require.Equal(
t,
&querierv1.FlameGraphDiff{
Names: []string{"total", "foo", "bar", "buz", "baz"},
Total: 5,
Levels: []*querierv1.Level{
{Values: []int64{0, 2, 0, 0, 3, 0, 0}},
{Values: []int64{0, 2, 0, 0, 3, 0, 1}},
{Values: []int64{0, 2, 1, 0, 3, 1, 2}},
{Values: []int64{1, 1, 1, 1, 0, 0, 4, 0, 0, 0, 0, 2, 2, 3}},
},
LeftTicks: 2,
RightTicks: 3,
MaxSelf: 2,
},
resp.Msg.Flamegraph,
)
})

}
4 changes: 3 additions & 1 deletion pkg/frontend/frontend_select_merge_stacktraces.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ func (f *Frontend) SelectMergeStacktraces(ctx context.Context,
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}
if validated.IsEmpty {
return connect.NewResponse(&querierv1.SelectMergeStacktracesResponse{}), nil
return connect.NewResponse(&querierv1.SelectMergeStacktracesResponse{
Flamegraph: &querierv1.FlameGraph{},
}), nil
}
maxNodes, err := validation.ValidateMaxNodes(f.limits, tenantIDs, c.Msg.GetMaxNodes())
if err != nil {
Expand Down

0 comments on commit 12b0543

Please sign in to comment.