diff --git a/pkg/querier/queryrange/codec_test.go b/pkg/querier/queryrange/codec_test.go index 0d0a3bf9c3c81..6e8fd6cf3104c 100644 --- a/pkg/querier/queryrange/codec_test.go +++ b/pkg/querier/queryrange/codec_test.go @@ -337,7 +337,7 @@ func Test_codec_MergeResponse(t *testing.T) { &LokiResponse{ Status: loghttp.QueryStatusSuccess, Direction: logproto.BACKWARD, - Limit: 4, + Limit: 6, Version: 1, Data: LokiData{ ResultType: loghttp.ResultTypeStream, @@ -363,7 +363,7 @@ func Test_codec_MergeResponse(t *testing.T) { &LokiResponse{ Status: loghttp.QueryStatusSuccess, Direction: logproto.BACKWARD, - Limit: 4, + Limit: 6, Version: 1, Data: LokiData{ ResultType: loghttp.ResultTypeStream, @@ -389,7 +389,7 @@ func Test_codec_MergeResponse(t *testing.T) { &LokiResponse{ Status: loghttp.QueryStatusSuccess, Direction: logproto.BACKWARD, - Limit: 4, + Limit: 6, Version: 1, Data: LokiData{ ResultType: loghttp.ResultTypeStream, @@ -399,6 +399,7 @@ func Test_codec_MergeResponse(t *testing.T) { Entries: []logproto.Entry{ {Timestamp: time.Unix(0, 10), Line: "10"}, {Timestamp: time.Unix(0, 9), Line: "9"}, + {Timestamp: time.Unix(0, 9), Line: "9"}, }, }, { @@ -406,6 +407,7 @@ func Test_codec_MergeResponse(t *testing.T) { Entries: []logproto.Entry{ {Timestamp: time.Unix(0, 16), Line: "16"}, {Timestamp: time.Unix(0, 15), Line: "15"}, + {Timestamp: time.Unix(0, 6), Line: "6"}, }, }, }, diff --git a/pkg/querier/queryrange/roundtrip_test.go b/pkg/querier/queryrange/roundtrip_test.go index 7138703cd886c..fe8d900225135 100644 --- a/pkg/querier/queryrange/roundtrip_test.go +++ b/pkg/querier/queryrange/roundtrip_test.go @@ -243,14 +243,19 @@ func TestUnhandledPath(t *testing.T) { require.NoError(t, err) } -type fakeLimits struct{} +type fakeLimits struct { + maxQueryParallelism int +} func (fakeLimits) MaxQueryLength(string) time.Duration { return time.Hour * 7 } -func (fakeLimits) MaxQueryParallelism(string) int { - return 1 +func (f fakeLimits) MaxQueryParallelism(string) int { + if f.maxQueryParallelism == 0 { + return 1 + } + return f.maxQueryParallelism } func counter() (*int, http.Handler) { diff --git a/pkg/querier/queryrange/split_by_interval.go b/pkg/querier/queryrange/split_by_interval.go index 394b6b0c12b89..2c16b95fbd5a7 100644 --- a/pkg/querier/queryrange/split_by_interval.go +++ b/pkg/querier/queryrange/split_by_interval.go @@ -58,9 +58,18 @@ func (h *splitByInterval) Process( threshold int64, input []*lokiResult, ) (responses []queryrange.Response, err error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + ch := h.Feed(ctx, input) - for i := 0; i < parallelism; i++ { + // don't spawn unnecessary goroutines + var p int = parallelism + if len(input) < parallelism { + p = len(input) + } + + for i := 0; i < p; i++ { go h.loop(ctx, ch) } @@ -118,8 +127,8 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrange.Request) (queryra for _, interval := range intervals { input = append(input, &lokiResult{ req: interval, - resp: make(chan queryrange.Response), - err: make(chan error), + resp: make(chan queryrange.Response, 1), + err: make(chan error, 1), }) } diff --git a/pkg/querier/queryrange/split_by_interval_test.go b/pkg/querier/queryrange/split_by_interval_test.go index 51160c9d26d3c..78758a98fc8c9 100644 --- a/pkg/querier/queryrange/split_by_interval_test.go +++ b/pkg/querier/queryrange/split_by_interval_test.go @@ -3,6 +3,7 @@ package queryrange import ( "context" "fmt" + "runtime" "sync" "testing" "time" @@ -327,3 +328,70 @@ func Test_ExitEarly(t *testing.T) { require.NoError(t, err) require.Equal(t, expected, res) } + +func Test_DoesntDeadlock(t *testing.T) { + n := 10 + + split := splitByInterval{ + next: queryrange.HandlerFunc(func(_ context.Context, r queryrange.Request) (queryrange.Response, error) { + + return &LokiResponse{ + Status: loghttp.QueryStatusSuccess, + Direction: r.(*LokiRequest).Direction, + Limit: r.(*LokiRequest).Limit, + Version: uint32(loghttp.VersionV1), + Data: LokiData{ + ResultType: loghttp.ResultTypeStream, + Result: []logproto.Stream{ + { + Labels: `{foo="bar", level="debug"}`, + Entries: []logproto.Entry{ + + { + Timestamp: time.Unix(0, r.(*LokiRequest).StartTs.UnixNano()), + Line: fmt.Sprintf("%d", r.(*LokiRequest).StartTs.UnixNano()), + }, + }, + }, + }, + }, + }, nil + }), + limits: fakeLimits{ + maxQueryParallelism: n, + }, + merger: lokiCodec, + interval: time.Hour, + } + + // split into n requests w/ n/2 limit, ensuring unused responses are cleaned up properly + req := &LokiRequest{ + StartTs: time.Unix(0, 0), + EndTs: time.Unix(0, (time.Duration(n) * time.Hour).Nanoseconds()), + Query: "", + Limit: uint32(n / 2), + Step: 1, + Direction: logproto.FORWARD, + Path: "/api/prom/query_range", + } + + ctx := user.InjectOrgID(context.Background(), "1") + + startingGoroutines := runtime.NumGoroutine() + + // goroutines shouldn't blow up across 100 rounds + for i := 0; i < 100; i++ { + res, err := split.Do(ctx, req) + require.NoError(t, err) + require.Equal(t, 1, len(res.(*LokiResponse).Data.Result)) + require.Equal(t, n/2, len(res.(*LokiResponse).Data.Result[0].Entries)) + + } + runtime.GC() + endingGoroutines := runtime.NumGoroutine() + + // give runtime a bit of slack when catching up -- this isn't an exact science :( + // Allow for 1% increase in goroutines + require.LessOrEqual(t, endingGoroutines, startingGoroutines*101/100) + +}