From 92f979f21a5bde9f43ae995b88b390fbb8c23cbf Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Wed, 11 Nov 2020 09:29:24 +0100 Subject: [PATCH] Fixes race condition in tailer since logql v2. Signed-off-by: Cyril Tovena --- pkg/ingester/tailer.go | 25 ++++++++++++++++++------- pkg/ingester/tailer_test.go | 28 ++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 7 deletions(-) diff --git a/pkg/ingester/tailer.go b/pkg/ingester/tailer.go index 714db64264b3..73ebb23b7190 100644 --- a/pkg/ingester/tailer.go +++ b/pkg/ingester/tailer.go @@ -10,6 +10,7 @@ import ( "github.com/go-kit/kit/log/level" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" + "golang.org/x/net/context" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" @@ -19,12 +20,18 @@ import ( const bufferSizeForTailResponse = 5 +type TailServer interface { + Send(*logproto.TailResponse) error + Context() context.Context +} + type tailer struct { - id uint32 - orgID string - matchers []*labels.Matcher - pipeline logql.Pipeline - expr logql.Expr + id uint32 + orgID string + matchers []*labels.Matcher + pipeline logql.Pipeline + expr logql.Expr + pipelineMtx sync.Mutex sendChan chan *logproto.Stream @@ -37,10 +44,10 @@ type tailer struct { blockedMtx sync.RWMutex droppedStreams []*logproto.DroppedStream - conn logproto.Querier_TailServer + conn TailServer } -func newTailer(orgID, query string, conn logproto.Querier_TailServer) (*tailer, error) { +func newTailer(orgID, query string, conn TailServer) (*tailer, error) { expr, err := logql.ParseLogSelector(query) if err != nil { return nil, err @@ -141,6 +148,10 @@ func (t *tailer) processStream(stream logproto.Stream) ([]logproto.Stream, error if log.IsNoopPipeline(t.pipeline) { return []logproto.Stream{stream}, nil } + // pipeline are not thread safe and tailer can process multiple stream at once. + t.pipelineMtx.Lock() + defer t.pipelineMtx.Unlock() + streams := map[uint64]*logproto.Stream{} lbs, err := util.ParseLabels(stream.Labels) if err != nil { diff --git a/pkg/ingester/tailer_test.go b/pkg/ingester/tailer_test.go index dff3cffdddcd..466f1948f515 100644 --- a/pkg/ingester/tailer_test.go +++ b/pkg/ingester/tailer_test.go @@ -1,6 +1,7 @@ package ingester import ( + "context" "math/rand" "sync" "testing" @@ -46,3 +47,30 @@ func TestTailer_sendRaceConditionOnSendWhileClosing(t *testing.T) { routines.Wait() } } + +type fakeTailServer struct{} + +func (f *fakeTailServer) Send(*logproto.TailResponse) error { return nil } +func (f *fakeTailServer) Context() context.Context { return context.Background() } + +func Test_TailerSendRace(t *testing.T) { + tail, err := newTailer("foo", `{app="foo"} |= "foo"`, &fakeTailServer{}) + require.NoError(t, err) + + var wg sync.WaitGroup + for i := 1; i <= 20; i++ { + wg.Add(1) + go func() { + _ = tail.send(logproto.Stream{ + Labels: makeRandomLabels(), + Entries: []logproto.Entry{ + {Timestamp: time.Unix(0, 1), Line: "1"}, + {Timestamp: time.Unix(0, 2), Line: "2"}, + {Timestamp: time.Unix(0, 3), Line: "3"}, + }, + }) + wg.Done() + }() + } + wg.Wait() +}