Skip to content

Commit

Permalink
Fixes frontend panic
Browse files Browse the repository at this point in the history
  • Loading branch information
simonswine committed May 24, 2024
1 parent 616e81a commit 8aa7d5b
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 2 deletions.
12 changes: 11 additions & 1 deletion pkg/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/grafana/pyroscope/pkg/frontend/frontendpb"
"github.com/grafana/pyroscope/pkg/querier/stats"
"github.com/grafana/pyroscope/pkg/scheduler/schedulerdiscovery"
"github.com/grafana/pyroscope/pkg/util/connectgrpc"
"github.com/grafana/pyroscope/pkg/util/httpgrpc"
"github.com/grafana/pyroscope/pkg/util/httpgrpcutil"
"github.com/grafana/pyroscope/pkg/validation"
Expand Down Expand Up @@ -78,6 +79,7 @@ func (cfg *Config) Validate() error {
// dispatches them to backends via gRPC, and handles retries for requests which failed.
type Frontend struct {
services.Service
connectgrpc.GRPCRoundTripper

cfg Config
log log.Logger
Expand Down Expand Up @@ -149,6 +151,7 @@ func NewFrontend(cfg Config, limits Limits, log log.Logger, reg prometheus.Regis
schedulerWorkersWatcher: services.NewFailureWatcher(),
requests: newRequestsInProgress(),
}
f.GRPCRoundTripper = &realFrontendRoundTripper{frontend: f}
// Randomize to avoid getting responses from queries sent before restart, which could lead to mixing results
// between different queries. Note that frontend verifies the user, so it cannot leak results between tenants.
// This isn't perfect, but better than nothing.
Expand Down Expand Up @@ -191,8 +194,15 @@ func (f *Frontend) stopping(_ error) error {
return errors.Wrap(services.StopAndAwaitTerminated(context.Background(), f.schedulerWorkers), "failed to stop frontend scheduler workers")
}

// allow to test the frontend without the need of a real roundertripper
type realFrontendRoundTripper struct {
frontend *Frontend
}

// RoundTripGRPC round trips a proto (instead of an HTTP request).
func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
func (rt *realFrontendRoundTripper) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
f := rt.frontend

if s := f.State(); s != services.Running {
return nil, fmt.Errorf("frontend not running: %v", s)
}
Expand Down
118 changes: 118 additions & 0 deletions pkg/frontend/frontend_diff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package frontend

import (
"context"
"testing"
"time"

"connectrpc.com/connect"
"github.com/grafana/dskit/user"
querierv1 "github.com/grafana/pyroscope/api/gen/proto/go/querier/v1"
"github.com/grafana/pyroscope/pkg/util/httpgrpc"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
)

type mockLimits struct{}

func (m *mockLimits) QuerySplitDuration(_ string) time.Duration {
return time.Hour
}

func (m *mockLimits) MaxQueryParallelism(_ string) int {
return 100
}

func (m *mockLimits) MaxQueryLength(_ string) time.Duration {
return time.Hour
}

func (m *mockLimits) MaxQueryLookback(_ string) time.Duration {
return time.Hour * 24
}

func (m *mockLimits) QueryAnalysisEnabled(_ string) bool {
return true
}

func (m *mockLimits) MaxFlameGraphNodesDefault(_ string) int {
return 10_000
}

func (m *mockLimits) MaxFlameGraphNodesMax(_ string) int {
return 100_000
}

type mockRoundTripper struct {
callback func(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error)
}

func (m *mockRoundTripper) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
if m.callback != nil {
return m.callback(ctx, req)
}
return &httpgrpc.HTTPResponse{}, errors.New("not implemented")
}

func Test_Frontend_Diff(t *testing.T) {
frontend := Frontend{
limits: &mockLimits{},
}

ctx := user.InjectOrgID(context.Background(), "test")
_, ctx = opentracing.StartSpanFromContext(ctx, "test")
now := time.Now().UnixMilli()

profileType := "memory:inuse_space:bytes:space:byte"

t.Run("Diff outside of the query window", func(t *testing.T) {
resp, err := frontend.Diff(
ctx,
connect.NewRequest(&querierv1.DiffRequest{
Left: &querierv1.SelectMergeStacktracesRequest{
ProfileTypeID: profileType,
LabelSelector: "{}",
Start: 0000,
End: 1000,
},
Right: &querierv1.SelectMergeStacktracesRequest{
ProfileTypeID: profileType,
LabelSelector: "{}",
Start: 2000,
End: 3000,
},
}),
)
require.NoError(t, err)
require.NotNil(t, resp)
})

t.Run("Failing left hand side", func(t *testing.T) {
frontend.GRPCRoundTripper = &mockRoundTripper{callback: func(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
t.Logf("mockRoundTripper callback: %v", req)
return nil, errors.New("mockRoundTripper callback error")
}}

resp, err := frontend.Diff(
ctx,
connect.NewRequest(&querierv1.DiffRequest{
Left: &querierv1.SelectMergeStacktracesRequest{
ProfileTypeID: profileType,
LabelSelector: "{}",
Start: now + 0000,
End: now + 1000,
},
Right: &querierv1.SelectMergeStacktracesRequest{
ProfileTypeID: profileType,
LabelSelector: "{}",
Start: now + 2000,
End: now + 3000,
},
}),
)
require.NoError(t, err)
require.NotNil(t, resp)
})

}
4 changes: 3 additions & 1 deletion pkg/frontend/frontend_select_merge_stacktraces.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ func (f *Frontend) SelectMergeStacktraces(ctx context.Context,
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}
if validated.IsEmpty {
return connect.NewResponse(&querierv1.SelectMergeStacktracesResponse{}), nil
return connect.NewResponse(&querierv1.SelectMergeStacktracesResponse{
Flamegraph: &querierv1.FlameGraph{},
}), nil
}
maxNodes, err := validation.ValidateMaxNodes(f.limits, tenantIDs, c.Msg.GetMaxNodes())
if err != nil {
Expand Down

0 comments on commit 8aa7d5b

Please sign in to comment.