Skip to content

Commit

Permalink
introduces "entry too far behind" instrumentation for unordered writes (
Browse files Browse the repository at this point in the history
  • Loading branch information
owen-d authored Oct 29, 2021
1 parent de7ab3f commit cd80bc5
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 8 deletions.
5 changes: 5 additions & 0 deletions pkg/chunkenc/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@ import (
var (
ErrChunkFull = errors.New("chunk full")
ErrOutOfOrder = errors.New("entry out of order")
ErrTooFarBehind = errors.New("entry too far behind")
ErrInvalidSize = errors.New("invalid size")
ErrInvalidFlag = errors.New("invalid flag")
ErrInvalidChecksum = errors.New("invalid chunk checksum")
)

func IsOutOfOrderErr(err error) bool {
return err == ErrOutOfOrder || err == ErrTooFarBehind
}

// Encoding is the identifier for a chunk encoding.
type Encoding byte

Expand Down
12 changes: 11 additions & 1 deletion pkg/chunkenc/interface_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package chunkenc

import "testing"
import (
"testing"

"github.com/stretchr/testify/require"
)

func TestParseEncoding(t *testing.T) {
tests := []struct {
Expand All @@ -24,3 +28,9 @@ func TestParseEncoding(t *testing.T) {
})
}
}

func TestIsOutOfOrderErr(t *testing.T) {
for _, err := range []error{ErrOutOfOrder, ErrTooFarBehind} {
require.Equal(t, true, IsOutOfOrderErr(err))
}
}
17 changes: 11 additions & 6 deletions pkg/ingester/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,12 @@ func (s *stream) Push(
var rateLimitedSamples, rateLimitedBytes int
defer func() {
if outOfOrderSamples > 0 {
validation.DiscardedSamples.WithLabelValues(validation.OutOfOrder, s.tenant).Add(float64(outOfOrderSamples))
validation.DiscardedBytes.WithLabelValues(validation.OutOfOrder, s.tenant).Add(float64(outOfOrderBytes))
name := validation.OutOfOrder
if s.unorderedWrites {
name = validation.TooFarBehind
}
validation.DiscardedSamples.WithLabelValues(name, s.tenant).Add(float64(outOfOrderSamples))
validation.DiscardedBytes.WithLabelValues(name, s.tenant).Add(float64(outOfOrderBytes))
}
if rateLimitedSamples > 0 {
validation.DiscardedSamples.WithLabelValues(validation.StreamRateLimit, s.tenant).Add(float64(rateLimitedSamples))
Expand Down Expand Up @@ -253,12 +257,12 @@ func (s *stream) Push(

// The validity window for unordered writes is the highest timestamp present minus 1/2 * max-chunk-age.
if s.unorderedWrites && !s.highestTs.IsZero() && s.highestTs.Add(-s.cfg.MaxChunkAge/2).After(entries[i].Timestamp) {
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], chunkenc.ErrOutOfOrder})
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], chunkenc.ErrTooFarBehind})
outOfOrderSamples++
outOfOrderBytes += len(entries[i].Line)
} else if err := chunk.chunk.Append(&entries[i]); err != nil {
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], err})
if err == chunkenc.ErrOutOfOrder {
if chunkenc.IsOutOfOrderErr(err) {
outOfOrderSamples++
outOfOrderBytes += len(entries[i].Line)
}
Expand Down Expand Up @@ -324,11 +328,12 @@ func (s *stream) Push(
if len(failedEntriesWithError) > 0 {
lastEntryWithErr := failedEntriesWithError[len(failedEntriesWithError)-1]
_, ok := lastEntryWithErr.e.(*validation.ErrStreamRateLimit)
if lastEntryWithErr.e != chunkenc.ErrOutOfOrder && !ok {
outOfOrder := chunkenc.IsOutOfOrderErr(lastEntryWithErr.e)
if !outOfOrder && !ok {
return bytesAdded, lastEntryWithErr.e
}
var statusCode int
if lastEntryWithErr.e == chunkenc.ErrOutOfOrder {
if outOfOrder {
statusCode = http.StatusBadRequest
}
if ok {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) {
var expected bytes.Buffer
for i := 0; i < tc.expectErrs; i++ {
fmt.Fprintf(&expected,
"entry with timestamp %s ignored, reason: 'entry out of order' for stream: {foo=\"bar\"},\n",
"entry with timestamp %s ignored, reason: 'entry too far behind' for stream: {foo=\"bar\"},\n",
time.Unix(int64(i), 0).String(),
)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/validation/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
// rather than the overall ingestion rate limit.
StreamRateLimit = "per_stream_rate_limit"
OutOfOrder = "out_of_order"
TooFarBehind = "too_far_behind"
// GreaterThanMaxSampleAge is a reason for discarding log lines which are older than the current time - `reject_old_samples_max_age`
GreaterThanMaxSampleAge = "greater_than_max_sample_age"
GreaterThanMaxSampleAgeErrorMsg = "entry for stream '%s' has timestamp too old: %v"
Expand Down

0 comments on commit cd80bc5

Please sign in to comment.