pkg/ingester: limit total number of errors a stream can return on push (#1071)

* pkg/ingester: limit total number of errors a stream can return on push

* pkg/ingester: s/max_ignored_stream_errors/max_returned_stream_errors
rfratto authored Jan 10, 2020
1 parent 0bd4620 commit 9227eed
Showing 6 changed files with 83 additions and 11 deletions.
4 changes: 4 additions & 0 deletions docs/configuration/README.md
@@ -296,6 +296,10 @@ The `ingester_config` block configures Ingesters.
# this chunk rollover doesn't happen.
[sync_period: <duration> | default = 0]
[sync_min_utilization: <float> | Default = 0]

# The maximum number of errors a stream will report to the user
# when a push fails. 0 to make unlimited.
[max_returned_stream_errors: <int> | default = 10]
```
### lifecycler_config
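For orientation (not part of the commit), here is a minimal sketch of how the new YAML key maps onto the config struct field through its struct tag; the trimmed-down `Config` type and the use of `gopkg.in/yaml.v2` are assumptions for illustration only:

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2" // assumed YAML library for this sketch
)

// Config is a trimmed-down stand-in for the ingester Config struct;
// only the field introduced by this commit is shown.
type Config struct {
	MaxReturnedErrors int `yaml:"max_returned_stream_errors"`
}

func main() {
	// The user sets the new key in the ingester block of their Loki config.
	raw := []byte("max_returned_stream_errors: 5\n")

	cfg := Config{MaxReturnedErrors: 10} // documented default
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.MaxReturnedErrors) // 5; a value of 0 means "return all errors"
}
```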
5 changes: 4 additions & 1 deletion pkg/ingester/ingester.go
@@ -56,6 +56,8 @@ type Config struct {
SyncPeriod time.Duration `yaml:"sync_period"`
SyncMinUtilization float64 `yaml:"sync_min_utilization"`

MaxReturnedErrors int `yaml:"max_returned_stream_errors"`

// For testing, you can override the address and ID of this ingester.
ingesterClientFactory func(cfg client.Config, addr string) (grpc_health_v1.HealthClient, error)
}
@@ -75,6 +77,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.ChunkEncoding, "ingester.chunk-encoding", chunkenc.EncGZIP.String(), fmt.Sprintf("The algorithm to use for compressing chunk. (%s)", chunkenc.SupportedEncoding()))
f.DurationVar(&cfg.SyncPeriod, "ingester.sync-period", 0, "How often to cut chunks to synchronize ingesters.")
f.Float64Var(&cfg.SyncMinUtilization, "ingester.sync-min-utilization", 0, "Minimum utilization of chunk when doing synchronization.")
f.IntVar(&cfg.MaxReturnedErrors, "ingester.max-ignored-stream-errors", 10, "Maximum number of ignored stream errors to return. 0 to return all errors.")
}

// Ingester builds chunks for incoming log streams.
@@ -211,7 +214,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) *instance {
defer i.instancesMtx.Unlock()
inst, ok = i.instances[instanceID]
if !ok {
inst = newInstance(instanceID, i.factory, i.limiter, i.cfg.SyncPeriod, i.cfg.SyncMinUtilization)
inst = newInstance(&i.cfg, instanceID, i.factory, i.limiter, i.cfg.SyncPeriod, i.cfg.SyncMinUtilization)
i.instances[instanceID] = inst
}
return inst
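A standalone sketch of the flag wiring added above: the CLI flag name (`ingester.max-ignored-stream-errors`) and the YAML key (`max_returned_stream_errors`) both fill the same `MaxReturnedErrors` field. The `main` harness and trimmed `Config` are illustrative only, not Loki code:

```go
package main

import (
	"flag"
	"fmt"
)

// Config keeps only the field relevant to this commit.
type Config struct {
	MaxReturnedErrors int `yaml:"max_returned_stream_errors"`
}

// RegisterFlags mirrors the registration pattern in pkg/ingester/ingester.go.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	f.IntVar(&cfg.MaxReturnedErrors, "ingester.max-ignored-stream-errors", 10,
		"Maximum number of ignored stream errors to return. 0 to return all errors.")
}

func main() {
	var cfg Config
	fs := flag.NewFlagSet("example", flag.ExitOnError)
	cfg.RegisterFlags(fs)

	// e.g. an operator overriding the default on the command line.
	_ = fs.Parse([]string{"-ingester.max-ignored-stream-errors=3"})
	fmt.Println(cfg.MaxReturnedErrors) // 3
}
```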
8 changes: 5 additions & 3 deletions pkg/ingester/instance.go
@@ -52,6 +52,7 @@ var (
)

type instance struct {
cfg *Config
streamsMtx sync.RWMutex
streams map[model.Fingerprint]*stream // we use 'mapped' fingerprints here.
index *index.InvertedIndex
@@ -73,8 +74,9 @@ type instance struct {
syncMinUtil float64
}

func newInstance(instanceID string, factory func() chunkenc.Chunk, limiter *Limiter, syncPeriod time.Duration, syncMinUtil float64) *instance {
func newInstance(cfg *Config, instanceID string, factory func() chunkenc.Chunk, limiter *Limiter, syncPeriod time.Duration, syncMinUtil float64) *instance {
i := &instance{
cfg: cfg,
streams: map[model.Fingerprint]*stream{},
index: index.New(),
instanceID: instanceID,
@@ -105,7 +107,7 @@ func (i *instance) consumeChunk(ctx context.Context, labels []client.LabelAdapte
stream, ok := i.streams[fp]
if !ok {
sortedLabels := i.index.Add(labels, fp)
stream = newStream(fp, sortedLabels, i.factory)
stream = newStream(i.cfg, fp, sortedLabels, i.factory)
i.streams[fp] = stream
i.streamsCreatedTotal.Inc()
memoryStreams.Inc()
@@ -165,7 +167,7 @@ func (i *instance) getOrCreateStream(labels []client.LabelAdapter) (*stream, err
}

sortedLabels := i.index.Add(labels, fp)
stream = newStream(fp, sortedLabels, i.factory)
stream = newStream(i.cfg, fp, sortedLabels, i.factory)
i.streams[fp] = stream
memoryStreams.Inc()
i.streamsCreatedTotal.Inc()
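The instance.go changes are pure plumbing: the ingester's `*Config` is handed to each instance, which passes it on to every stream it creates. A minimal sketch of that pattern with hypothetical stand-in types, not the real Loki structs:

```go
package main

import "fmt"

type Config struct {
	MaxReturnedErrors int
}

type stream struct{ cfg *Config }

type instance struct {
	cfg     *Config
	streams map[string]*stream
}

func newInstance(cfg *Config) *instance {
	return &instance{cfg: cfg, streams: map[string]*stream{}}
}

// getOrCreateStream hands the shared *Config to every new stream,
// so the limit is configured in exactly one place.
func (i *instance) getOrCreateStream(name string) *stream {
	s, ok := i.streams[name]
	if !ok {
		s = &stream{cfg: i.cfg}
		i.streams[name] = s
	}
	return s
}

func main() {
	inst := newInstance(&Config{MaxReturnedErrors: 10})
	fmt.Println(inst.getOrCreateStream("app").cfg.MaxReturnedErrors) // 10
}
```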
6 changes: 3 additions & 3 deletions pkg/ingester/instance_test.go
@@ -28,7 +28,7 @@ func TestLabelsCollisions(t *testing.T) {
require.NoError(t, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)

i := newInstance("test", defaultFactory, limiter, 0, 0)
i := newInstance(&Config{}, "test", defaultFactory, limiter, 0, 0)

// avoid entries from the future.
tt := time.Now().Add(-5 * time.Minute)
@@ -55,7 +55,7 @@ func TestConcurrentPushes(t *testing.T) {
require.NoError(t, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)

inst := newInstance("test", defaultFactory, limiter, 0, 0)
inst := newInstance(&Config{}, "test", defaultFactory, limiter, 0, 0)

const (
concurrent = 10
@@ -113,7 +113,7 @@ func TestSyncPeriod(t *testing.T) {
minUtil = 0.20
)

inst := newInstance("test", defaultFactory, limiter, syncPeriod, minUtil)
inst := newInstance(&Config{}, "test", defaultFactory, limiter, syncPeriod, minUtil)
lbls := makeRandomLabels()

tt := time.Now()
15 changes: 11 additions & 4 deletions pkg/ingester/stream.go
@@ -52,6 +52,7 @@ func init() {
}

type stream struct {
cfg *Config
// Newest chunk at chunks[n-1].
// Not thread-safe; assume accesses to this are locked by caller.
chunks []chunkDesc
@@ -76,8 +77,9 @@ type entryWithError struct {
e error
}

func newStream(fp model.Fingerprint, labels labels.Labels, factory func() chunkenc.Chunk) *stream {
func newStream(cfg *Config, fp model.Fingerprint, labels labels.Labels, factory func() chunkenc.Chunk) *stream {
return &stream{
cfg: cfg,
fp: fp,
labels: labels,
factory: factory,
@@ -182,13 +184,18 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry, synchronizePe
buf := bytes.Buffer{}
streamName := s.labels.String()

for _, entryWithError := range failedEntriesWithError {
_, _ = fmt.Fprintf(&buf,
limitedFailedEntries := failedEntriesWithError
if maxIgnore := s.cfg.MaxReturnedErrors; maxIgnore > 0 && len(limitedFailedEntries) > maxIgnore {
limitedFailedEntries = limitedFailedEntries[:maxIgnore]
}

for _, entryWithError := range limitedFailedEntries {
fmt.Fprintf(&buf,
"entry with timestamp %s ignored, reason: '%s' for stream: %s,\n",
entryWithError.entry.Timestamp.String(), entryWithError.e.Error(), streamName)
}

_, _ = fmt.Fprintf(&buf, "total ignored: %d out of %d", len(failedEntriesWithError), len(entries))
fmt.Fprintf(&buf, "total ignored: %d out of %d", len(failedEntriesWithError), len(entries))

return httpgrpc.Errorf(http.StatusBadRequest, buf.String())
}
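The core of the change is in `Push`: only the first `MaxReturnedErrors` per-entry failures are formatted into the returned error, while the summary line still counts every ignored entry. Below is a self-contained sketch of that truncation logic; `failedEntry` and `buildPushError` are illustrative names, not Loki code:

```go
package main

import (
	"bytes"
	"fmt"
)

type failedEntry struct {
	timestamp string
	reason    string
}

// buildPushError mirrors the error construction in stream.Push:
// cap the per-entry detail lines at maxReturned (0 means unlimited),
// but keep the full counts in the trailing summary.
func buildPushError(failed []failedEntry, totalEntries, maxReturned int) string {
	limited := failed
	if maxReturned > 0 && len(limited) > maxReturned {
		limited = limited[:maxReturned]
	}

	var buf bytes.Buffer
	for _, f := range limited {
		fmt.Fprintf(&buf, "entry with timestamp %s ignored, reason: '%s',\n", f.timestamp, f.reason)
	}
	fmt.Fprintf(&buf, "total ignored: %d out of %d", len(failed), totalEntries)
	return buf.String()
}

func main() {
	failed := make([]failedEntry, 100)
	for i := range failed {
		failed[i] = failedEntry{timestamp: fmt.Sprint(i), reason: "entry out of order"}
	}
	// Prints 10 detail lines, followed by "total ignored: 100 out of 100".
	fmt.Println(buildPushError(failed, 100, 10))
}
```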
56 changes: 56 additions & 0 deletions pkg/ingester/stream_test.go
@@ -1,17 +1,73 @@
package ingester

import (
"bytes"
"context"
"fmt"
"math/rand"
"net/http"
"testing"
"time"

"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/logproto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
)

func TestMaxReturnedStreamsErrors(t *testing.T) {
numLogs := 100

tt := []struct {
name string
limit int
expectErrs int
}{
{"10", 10, 10},
{"unlimited", 0, numLogs},
}

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
s := newStream(
&Config{MaxReturnedErrors: tc.limit},
model.Fingerprint(0),
labels.Labels{
{Name: "foo", Value: "bar"},
},
defaultFactory,
)

err := s.Push(context.Background(), []logproto.Entry{
{Timestamp: time.Unix(int64(numLogs), 0), Line: "log"},
}, 0, 0)
require.NoError(t, err)

newLines := make([]logproto.Entry, numLogs)
for i := 0; i < numLogs; i++ {
newLines[i] = logproto.Entry{Timestamp: time.Unix(int64(i), 0), Line: "log"}
}

var expected bytes.Buffer
for i := 0; i < tc.expectErrs; i++ {
fmt.Fprintf(&expected,
"entry with timestamp %s ignored, reason: 'entry out of order' for stream: {foo=\"bar\"},\n",
time.Unix(int64(i), 0).String(),
)
}

fmt.Fprintf(&expected, "total ignored: %d out of %d", numLogs, numLogs)
expectErr := httpgrpc.Errorf(http.StatusBadRequest, expected.String())

err = s.Push(context.Background(), newLines, 0, 0)
require.Error(t, err)
require.Equal(t, expectErr.Error(), err.Error())
})
}
}

func TestStreamIterator(t *testing.T) {
const chunks = 3
const entries = 100
