Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug in query frontend diff handling #3315

Merged
merged 1 commit into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion pkg/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/grafana/pyroscope/pkg/frontend/frontendpb"
"github.com/grafana/pyroscope/pkg/querier/stats"
"github.com/grafana/pyroscope/pkg/scheduler/schedulerdiscovery"
"github.com/grafana/pyroscope/pkg/util/connectgrpc"
"github.com/grafana/pyroscope/pkg/util/httpgrpc"
"github.com/grafana/pyroscope/pkg/util/httpgrpcutil"
"github.com/grafana/pyroscope/pkg/validation"
Expand Down Expand Up @@ -78,6 +79,7 @@ func (cfg *Config) Validate() error {
// dispatches them to backends via gRPC, and handles retries for requests which failed.
type Frontend struct {
services.Service
connectgrpc.GRPCRoundTripper

cfg Config
log log.Logger
Expand Down Expand Up @@ -149,6 +151,7 @@ func NewFrontend(cfg Config, limits Limits, log log.Logger, reg prometheus.Regis
schedulerWorkersWatcher: services.NewFailureWatcher(),
requests: newRequestsInProgress(),
}
f.GRPCRoundTripper = &realFrontendRoundTripper{frontend: f}
// Randomize to avoid getting responses from queries sent before restart, which could lead to mixing results
// between different queries. Note that frontend verifies the user, so it cannot leak results between tenants.
// This isn't perfect, but better than nothing.
Expand Down Expand Up @@ -191,8 +194,15 @@ func (f *Frontend) stopping(_ error) error {
return errors.Wrap(services.StopAndAwaitTerminated(context.Background(), f.schedulerWorkers), "failed to stop frontend scheduler workers")
}

// allow to test the frontend without the need of a real roundertripper
type realFrontendRoundTripper struct {
frontend *Frontend
}

// RoundTripGRPC round trips a proto (instead of an HTTP request).
func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
func (rt *realFrontendRoundTripper) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
f := rt.frontend

if s := f.State(); s != services.Running {
return nil, fmt.Errorf("frontend not running: %v", s)
}
Expand Down
185 changes: 185 additions & 0 deletions pkg/frontend/frontend_diff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package frontend

import (
"context"
"testing"
"time"

"connectrpc.com/connect"
"github.com/grafana/dskit/user"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"

querierv1 "github.com/grafana/pyroscope/api/gen/proto/go/querier/v1"
"github.com/grafana/pyroscope/pkg/model"
"github.com/grafana/pyroscope/pkg/util/connectgrpc"
"github.com/grafana/pyroscope/pkg/util/httpgrpc"
)

type mockLimits struct{}

func (m *mockLimits) QuerySplitDuration(_ string) time.Duration {
return time.Hour
}

func (m *mockLimits) MaxQueryParallelism(_ string) int {
return 100
}

func (m *mockLimits) MaxQueryLength(_ string) time.Duration {
return time.Hour
}

func (m *mockLimits) MaxQueryLookback(_ string) time.Duration {
return time.Hour * 24
}

func (m *mockLimits) QueryAnalysisEnabled(_ string) bool {
return true
}

func (m *mockLimits) MaxFlameGraphNodesDefault(_ string) int {
return 10_000
}

func (m *mockLimits) MaxFlameGraphNodesMax(_ string) int {
return 100_000
}

type mockRoundTripper struct {
callback func(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error)
}

func (m *mockRoundTripper) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
if m.callback != nil {
return m.callback(ctx, req)
}
return &httpgrpc.HTTPResponse{}, errors.New("not implemented")
}

func Test_Frontend_Diff(t *testing.T) {
frontend := Frontend{
limits: &mockLimits{},
}

ctx := user.InjectOrgID(context.Background(), "test")
_, ctx = opentracing.StartSpanFromContext(ctx, "test")
now := time.Now().UnixMilli()

profileType := "memory:inuse_space:bytes:space:byte"

t.Run("Diff outside of the query window", func(t *testing.T) {
resp, err := frontend.Diff(
ctx,
connect.NewRequest(&querierv1.DiffRequest{
Left: &querierv1.SelectMergeStacktracesRequest{
ProfileTypeID: profileType,
LabelSelector: "{}",
Start: 0000,
End: 1000,
},
Right: &querierv1.SelectMergeStacktracesRequest{
ProfileTypeID: profileType,
LabelSelector: "{}",
Start: 2000,
End: 3000,
},
}),
)
require.NoError(t, err)
require.NotNil(t, resp)
})

t.Run("Failing left hand side", func(t *testing.T) {
frontend.GRPCRoundTripper = &mockRoundTripper{callback: func(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
return connectgrpc.HandleUnary[querierv1.SelectMergeStacktracesRequest, querierv1.SelectMergeStacktracesResponse](ctx, req, func(ctx context.Context, req *connect.Request[querierv1.SelectMergeStacktracesRequest]) (*connect.Response[querierv1.SelectMergeStacktracesResponse], error) {
if req.Msg.Start == now {
return nil, errors.New("left fails")
}

return connect.NewResponse(&querierv1.SelectMergeStacktracesResponse{
Flamegraph: &querierv1.FlameGraph{},
}), nil
})
}}

_, err := frontend.Diff(
ctx,
connect.NewRequest(&querierv1.DiffRequest{
Left: &querierv1.SelectMergeStacktracesRequest{
ProfileTypeID: profileType,
LabelSelector: "{}",
Start: now + 0000,
End: now + 1000,
},
Right: &querierv1.SelectMergeStacktracesRequest{
ProfileTypeID: profileType,
LabelSelector: "{}",
Start: now + 2000,
End: now + 3000,
},
}),
)
require.ErrorContains(t, err, "left fails")
})

t.Run("simple diff", func(t *testing.T) {
frontend.GRPCRoundTripper = &mockRoundTripper{callback: func(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
return connectgrpc.HandleUnary[querierv1.SelectMergeStacktracesRequest, querierv1.SelectMergeStacktracesResponse](ctx, req, func(ctx context.Context, req *connect.Request[querierv1.SelectMergeStacktracesRequest]) (*connect.Response[querierv1.SelectMergeStacktracesResponse], error) {

s := new(model.Tree)
s.InsertStack(1, "foo", "bar")

if req.Msg.Start == now {
//left
s.InsertStack(1, "foo", "bar", "baz")
} else {
//right
s.InsertStack(2, "foo", "bar", "buz")
}

return connect.NewResponse(&querierv1.SelectMergeStacktracesResponse{
Flamegraph: model.NewFlameGraph(s, -1),
}), nil
})
}}

resp, err := frontend.Diff(
ctx,
connect.NewRequest(&querierv1.DiffRequest{
Left: &querierv1.SelectMergeStacktracesRequest{
ProfileTypeID: profileType,
LabelSelector: "{}",
Start: now + 0000,
End: now + 1000,
},
Right: &querierv1.SelectMergeStacktracesRequest{
ProfileTypeID: profileType,
LabelSelector: "{}",
Start: now + 2000,
End: now + 3000,
},
}),
)
require.NoError(t, err)
require.Equal(
t,
&querierv1.FlameGraphDiff{
Names: []string{"total", "foo", "bar", "buz", "baz"},
Total: 5,
Levels: []*querierv1.Level{
{Values: []int64{0, 2, 0, 0, 3, 0, 0}},
{Values: []int64{0, 2, 0, 0, 3, 0, 1}},
{Values: []int64{0, 2, 1, 0, 3, 1, 2}},
{Values: []int64{1, 1, 1, 1, 0, 0, 4, 0, 0, 0, 0, 2, 2, 3}},
},
LeftTicks: 2,
RightTicks: 3,
MaxSelf: 2,
},
resp.Msg.Flamegraph,
)
})

}
4 changes: 3 additions & 1 deletion pkg/frontend/frontend_select_merge_stacktraces.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ func (f *Frontend) SelectMergeStacktraces(ctx context.Context,
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}
if validated.IsEmpty {
return connect.NewResponse(&querierv1.SelectMergeStacktracesResponse{}), nil
return connect.NewResponse(&querierv1.SelectMergeStacktracesResponse{
Flamegraph: &querierv1.FlameGraph{},
}), nil
simonswine marked this conversation as resolved.
Show resolved Hide resolved
}
maxNodes, err := validation.ValidateMaxNodes(f.limits, tenantIDs, c.Msg.GetMaxNodes())
if err != nil {
Expand Down
Loading