Makes tailer.droppedStreams slice bounded. #5334

Merged (3 commits) on Feb 7, 2022
pkg/ingester/ingester.go (4 additions, 1 deletion)
@@ -90,6 +90,8 @@ type Config struct {
Wrapper Wrapper `yaml:"-"`

IndexShards int `yaml:"index_shards"`

MaxDroppedStreams int `yaml:"max_dropped_streams"`
}

// RegisterFlags registers the flags.
@@ -113,6 +115,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.QueryStoreMaxLookBackPeriod, "ingester.query-store-max-look-back-period", 0, "How far back should an ingester be allowed to query the store for data, for use only with boltdb-shipper index and filesystem object store. -1 for infinite.")
f.BoolVar(&cfg.AutoForgetUnhealthy, "ingester.autoforget-unhealthy", false, "Enable to remove unhealthy ingesters from the ring after `ring.kvstore.heartbeat_timeout`")
f.IntVar(&cfg.IndexShards, "ingester.index-shards", index.DefaultIndexShards, "Shard factor used in the ingesters for the in process reverse index. This MUST be evenly divisible by ALL schema shard factors or Loki will not start.")
f.IntVar(&cfg.MaxDroppedStreams, "ingester.tailer.max-dropped-streams", 10, "Maximum number of dropped streams to keep in memory during tailing")
}

func (cfg *Config) Validate() error {
@@ -811,7 +814,7 @@ func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_
}

instance := i.GetOrCreateInstance(instanceID)
tailer, err := newTailer(instanceID, req.Query, queryServer)
tailer, err := newTailer(instanceID, req.Query, queryServer, i.cfg.MaxDroppedStreams)
if err != nil {
return err
}
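The new limit defaults to 10 through the `ingester.tailer.max-dropped-streams` flag registered above. Given the `yaml:"max_dropped_streams"` struct tag, it should also be settable in YAML; a minimal sketch, assuming the option sits directly under the ingester config block:

```yaml
# Sketch only: caps how many dropped-stream records a tailer keeps in memory
# (the flag default is 10).
ingester:
  max_dropped_streams: 100
```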
pkg/ingester/instance_test.go (1 addition, 1 deletion)
@@ -420,7 +420,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) {
ctx := context.Background()

inst := newInstance(&Config{}, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil)
t, err := newTailer("foo", `{namespace="foo",pod="bar",instance=~"10.*"}`, nil)
t, err := newTailer("foo", `{namespace="foo",pod="bar",instance=~"10.*"}`, nil, 10)
require.NoError(b, err)
for i := 0; i < 10000; i++ {
require.NoError(b, inst.Push(ctx, &logproto.PushRequest{
pkg/ingester/stream_test.go (1 addition, 1 deletion)
@@ -407,7 +407,7 @@ func Benchmark_PushStream(b *testing.B) {
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

s := newStream(&Config{MaxChunkAge: 24 * time.Hour}, limiter, "fake", model.Fingerprint(0), ls, true, NilMetrics)
t, err := newTailer("foo", `{namespace="loki-dev"}`, &fakeTailServer{})
t, err := newTailer("foo", `{namespace="loki-dev"}`, &fakeTailServer{}, 10)
require.NoError(b, err)

go t.loop()
pkg/ingester/tailer.go (20 additions, 13 deletions)
@@ -39,14 +39,15 @@ type tailer struct {
closeChan chan struct{}
closeOnce sync.Once

blockedAt *time.Time
blockedMtx sync.RWMutex
droppedStreams []*logproto.DroppedStream
blockedAt *time.Time
blockedMtx sync.RWMutex
droppedStreams []*logproto.DroppedStream
maxDroppedStreams int

conn TailServer
}

func newTailer(orgID, query string, conn TailServer) (*tailer, error) {
func newTailer(orgID, query string, conn TailServer, maxDroppedStreams int) (*tailer, error) {
expr, err := logql.ParseLogSelector(query, true)
if err != nil {
return nil, err
@@ -58,15 +59,16 @@ func newTailer(orgID, query string, conn TailServer) (*tailer, error) {
matchers := expr.Matchers()

return &tailer{
orgID: orgID,
matchers: matchers,
pipeline: pipeline,
sendChan: make(chan *logproto.Stream, bufferSizeForTailResponse),
conn: conn,
droppedStreams: []*logproto.DroppedStream{},
id: generateUniqueID(orgID, query),
closeChan: make(chan struct{}),
expr: expr,
orgID: orgID,
matchers: matchers,
pipeline: pipeline,
sendChan: make(chan *logproto.Stream, bufferSizeForTailResponse),
conn: conn,
droppedStreams: make([]*logproto.DroppedStream, 0, maxDroppedStreams),
maxDroppedStreams: maxDroppedStreams,
id: generateUniqueID(orgID, query),
closeChan: make(chan struct{}),
expr: expr,
}, nil
}

@@ -219,6 +221,11 @@ func (t *tailer) dropStream(stream logproto.Stream) {
t.blockedAt = &blockedAt
}

if len(t.droppedStreams) >= t.maxDroppedStreams {
level.Info(util_log.Logger).Log("msg", "tailer dropped streams is reset", "length", len(t.droppedStreams))
t.droppedStreams = nil
}

t.droppedStreams = append(t.droppedStreams, &logproto.DroppedStream{
From: stream.Entries[0].Timestamp,
To: stream.Entries[len(stream.Entries)-1].Timestamp,
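With the guard added to `dropStream`, the slice is cleared once it reaches `maxDroppedStreams`, so it never grows beyond the configured cap and only the entries dropped after the most recent reset are retained. A standalone sketch of that bookkeeping (illustrative only, not the Loki code), matching the `maxDroppedStreams + 2` case in the test below:

```go
package main

import "fmt"

func main() {
	const maxDroppedStreams = 10
	var dropped []int

	// Mirror the bound in dropStream: reset the slice once it reaches the
	// cap, then continue appending.
	for i := 0; i < maxDroppedStreams+2; i++ {
		if len(dropped) >= maxDroppedStreams {
			dropped = nil
		}
		dropped = append(dropped, i)
	}

	fmt.Println(len(dropped)) // 2: only the entries added after the reset remain
}
```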
pkg/ingester/tailer_test.go (46 additions, 2 deletions)
@@ -26,7 +26,7 @@ func TestTailer_sendRaceConditionOnSendWhileClosing(t *testing.T) {
}

for run := 0; run < runs; run++ {
tailer, err := newTailer("org-id", stream.Labels, nil)
tailer, err := newTailer("org-id", stream.Labels, nil, 10)
require.NoError(t, err)
require.NotNil(t, tailer)

@@ -49,13 +49,57 @@ }
}
}

func Test_dropstream(t *testing.T) {
maxDroppedStreams := 10

entry := logproto.Entry{Timestamp: time.Now(), Line: "foo"}

cases := []struct {
name string
drop int
expected int
}{
{
name: "less than maxDroppedStreams",
drop: maxDroppedStreams - 2,
expected: maxDroppedStreams - 2,
},
{
name: "equal to maxDroppedStreams",
drop: maxDroppedStreams,
expected: maxDroppedStreams,
},
{
name: "greater than maxDroppedStreams",
drop: maxDroppedStreams + 2,
expected: 2, // slice resets once it reaches maxDroppedStreams, so only the 2 overflow entries remain
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
tail, err := newTailer("foo", `{app="foo"} |= "foo"`, &fakeTailServer{}, maxDroppedStreams)
require.NoError(t, err)

for i := 0; i < c.drop; i++ {
tail.dropStream(logproto.Stream{
Entries: []logproto.Entry{
entry,
},
})
}
assert.Equal(t, c.expected, len(tail.droppedStreams))
})
}
}

type fakeTailServer struct{}

func (f *fakeTailServer) Send(*logproto.TailResponse) error { return nil }
func (f *fakeTailServer) Context() context.Context { return context.Background() }

func Test_TailerSendRace(t *testing.T) {
tail, err := newTailer("foo", `{app="foo"} |= "foo"`, &fakeTailServer{})
tail, err := newTailer("foo", `{app="foo"} |= "foo"`, &fakeTailServer{}, 10)
require.NoError(t, err)

var wg sync.WaitGroup