From 529b4843666f257bcb1d08d9766a60f42c358b77 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 8 Jan 2020 15:10:42 -0500 Subject: [PATCH 1/4] buffers splitby interval channels to prevent deadlocks --- pkg/querier/queryrange/roundtrip_test.go | 11 +++- pkg/querier/queryrange/split_by_interval.go | 7 ++- .../queryrange/split_by_interval_test.go | 60 +++++++++++++++++++ 3 files changed, 73 insertions(+), 5 deletions(-) diff --git a/pkg/querier/queryrange/roundtrip_test.go b/pkg/querier/queryrange/roundtrip_test.go index 7138703cd886c..fe8d900225135 100644 --- a/pkg/querier/queryrange/roundtrip_test.go +++ b/pkg/querier/queryrange/roundtrip_test.go @@ -243,14 +243,19 @@ func TestUnhandledPath(t *testing.T) { require.NoError(t, err) } -type fakeLimits struct{} +type fakeLimits struct { + maxQueryParallelism int +} func (fakeLimits) MaxQueryLength(string) time.Duration { return time.Hour * 7 } -func (fakeLimits) MaxQueryParallelism(string) int { - return 1 +func (f fakeLimits) MaxQueryParallelism(string) int { + if f.maxQueryParallelism == 0 { + return 1 + } + return f.maxQueryParallelism } func counter() (*int, http.Handler) { diff --git a/pkg/querier/queryrange/split_by_interval.go b/pkg/querier/queryrange/split_by_interval.go index 394b6b0c12b89..e1f91adad1d85 100644 --- a/pkg/querier/queryrange/split_by_interval.go +++ b/pkg/querier/queryrange/split_by_interval.go @@ -58,6 +58,9 @@ func (h *splitByInterval) Process( threshold int64, input []*lokiResult, ) (responses []queryrange.Response, err error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + ch := h.Feed(ctx, input) for i := 0; i < parallelism; i++ { @@ -118,8 +121,8 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrange.Request) (queryra for _, interval := range intervals { input = append(input, &lokiResult{ req: interval, - resp: make(chan queryrange.Response), - err: make(chan error), + resp: make(chan queryrange.Response, 1), + err: make(chan error, 1), }) } diff --git a/pkg/querier/queryrange/split_by_interval_test.go b/pkg/querier/queryrange/split_by_interval_test.go index 51160c9d26d3c..768428ad550dd 100644 --- a/pkg/querier/queryrange/split_by_interval_test.go +++ b/pkg/querier/queryrange/split_by_interval_test.go @@ -3,6 +3,7 @@ package queryrange import ( "context" "fmt" + "runtime" "sync" "testing" "time" @@ -327,3 +328,62 @@ func Test_ExitEarly(t *testing.T) { require.NoError(t, err) require.Equal(t, expected, res) } + +func Test_DoesntDeadlock(t *testing.T) { + n := 10 + + split := splitByInterval{ + next: queryrange.HandlerFunc(func(_ context.Context, r queryrange.Request) (queryrange.Response, error) { + + return &LokiResponse{ + Status: loghttp.QueryStatusSuccess, + Direction: r.(*LokiRequest).Direction, + Limit: r.(*LokiRequest).Limit, + Version: uint32(loghttp.VersionV1), + Data: LokiData{ + ResultType: loghttp.ResultTypeStream, + Result: []logproto.Stream{ + { + Labels: `{foo="bar", level="debug"}`, + Entries: []logproto.Entry{ + + { + Timestamp: time.Unix(0, r.(*LokiRequest).StartTs.UnixNano()), + Line: fmt.Sprintf("%d", r.(*LokiRequest).StartTs.UnixNano()), + }, + }, + }, + }, + }, + }, nil + }), + limits: fakeLimits{ + maxQueryParallelism: n, + }, + merger: lokiCodec, + interval: time.Hour, + } + + // split into n requests w/ n/2 limit, ensuring unused responses are cleaned up properly + req := &LokiRequest{ + StartTs: time.Unix(0, 0), + EndTs: time.Unix(0, (time.Duration(n) * time.Hour).Nanoseconds()), + Query: "", + Limit: uint32(n / 2), + Step: 1, + Direction: logproto.FORWARD, + Path: "/api/prom/query_range", + } + + ctx := user.InjectOrgID(context.Background(), "1") + + startingGoroutines := runtime.NumGoroutine() + res, err := split.Do(ctx, req) + time.Sleep(time.Millisecond) // allow for runtime scheduling to catch up before we check n_goroutines + endingGoroutines := runtime.NumGoroutine() + require.NoError(t, err) + require.Equal(t, 1, len(res.(*LokiResponse).Data.Result)) + require.Equal(t, n/2, len(res.(*LokiResponse).Data.Result[0].Entries)) + require.Equal(t, startingGoroutines, endingGoroutines) + +} From 48e96052f70cc71cd6ea8837b1314436f92f29d7 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 8 Jan 2020 15:15:36 -0500 Subject: [PATCH 2/4] improves codec merge test requiring merging from multiple intervals --- pkg/querier/queryrange/codec_test.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/querier/queryrange/codec_test.go b/pkg/querier/queryrange/codec_test.go index 0d0a3bf9c3c81..6e8fd6cf3104c 100644 --- a/pkg/querier/queryrange/codec_test.go +++ b/pkg/querier/queryrange/codec_test.go @@ -337,7 +337,7 @@ func Test_codec_MergeResponse(t *testing.T) { &LokiResponse{ Status: loghttp.QueryStatusSuccess, Direction: logproto.BACKWARD, - Limit: 4, + Limit: 6, Version: 1, Data: LokiData{ ResultType: loghttp.ResultTypeStream, @@ -363,7 +363,7 @@ func Test_codec_MergeResponse(t *testing.T) { &LokiResponse{ Status: loghttp.QueryStatusSuccess, Direction: logproto.BACKWARD, - Limit: 4, + Limit: 6, Version: 1, Data: LokiData{ ResultType: loghttp.ResultTypeStream, @@ -389,7 +389,7 @@ func Test_codec_MergeResponse(t *testing.T) { &LokiResponse{ Status: loghttp.QueryStatusSuccess, Direction: logproto.BACKWARD, - Limit: 4, + Limit: 6, Version: 1, Data: LokiData{ ResultType: loghttp.ResultTypeStream, @@ -399,6 +399,7 @@ func Test_codec_MergeResponse(t *testing.T) { Entries: []logproto.Entry{ {Timestamp: time.Unix(0, 10), Line: "10"}, {Timestamp: time.Unix(0, 9), Line: "9"}, + {Timestamp: time.Unix(0, 9), Line: "9"}, }, }, { @@ -406,6 +407,7 @@ func Test_codec_MergeResponse(t *testing.T) { Entries: []logproto.Entry{ {Timestamp: time.Unix(0, 16), Line: "16"}, {Timestamp: time.Unix(0, 15), Line: "15"}, + {Timestamp: time.Unix(0, 6), Line: "6"}, }, }, }, From 14d97f7c89e8423a7176d2b090465b09535b8b1c Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 8 Jan 2020 16:22:54 -0500 Subject: [PATCH 3/4] bound concurrency by min(parallelism,jobs) --- pkg/querier/queryrange/split_by_interval.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/querier/queryrange/split_by_interval.go b/pkg/querier/queryrange/split_by_interval.go index e1f91adad1d85..2c16b95fbd5a7 100644 --- a/pkg/querier/queryrange/split_by_interval.go +++ b/pkg/querier/queryrange/split_by_interval.go @@ -63,7 +63,13 @@ func (h *splitByInterval) Process( ch := h.Feed(ctx, input) - for i := 0; i < parallelism; i++ { + // don't spawn unnecessary goroutines + var p int = parallelism + if len(input) < parallelism { + p = len(input) + } + + for i := 0; i < p; i++ { go h.loop(ctx, ch) } From 3cd157c27d235275279ce1381f71a9833a6329dc Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 8 Jan 2020 16:23:17 -0500 Subject: [PATCH 4/4] approximates goroutine test --- .../queryrange/split_by_interval_test.go | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/pkg/querier/queryrange/split_by_interval_test.go b/pkg/querier/queryrange/split_by_interval_test.go index 768428ad550dd..78758a98fc8c9 100644 --- a/pkg/querier/queryrange/split_by_interval_test.go +++ b/pkg/querier/queryrange/split_by_interval_test.go @@ -378,12 +378,20 @@ func Test_DoesntDeadlock(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "1") startingGoroutines := runtime.NumGoroutine() - res, err := split.Do(ctx, req) - time.Sleep(time.Millisecond) // allow for runtime scheduling to catch up before we check n_goroutines + + // goroutines shouldn't blow up across 100 rounds + for i := 0; i < 100; i++ { + res, err := split.Do(ctx, req) + require.NoError(t, err) + require.Equal(t, 1, len(res.(*LokiResponse).Data.Result)) + require.Equal(t, n/2, len(res.(*LokiResponse).Data.Result[0].Entries)) + + } + runtime.GC() endingGoroutines := runtime.NumGoroutine() - require.NoError(t, err) - require.Equal(t, 1, len(res.(*LokiResponse).Data.Result)) - require.Equal(t, n/2, len(res.(*LokiResponse).Data.Result[0].Entries)) - require.Equal(t, startingGoroutines, endingGoroutines) + + // give runtime a bit of slack when catching up -- this isn't an exact science :( + // Allow for 1% increase in goroutines + require.LessOrEqual(t, endingGoroutines, startingGoroutines*101/100) }