Skip to content

Commit

Permalink
ingester.max-chunk-age (#1558)
Browse files Browse the repository at this point in the history
* ingester.max-chunk-age

* bumps helm & changelog

* adjusts max-chunk-age test

* bumps wait period to let ci catch up

* updates changelog
  • Loading branch information
owen-d authored and slim-bean committed Jan 23, 2020
1 parent bde0389 commit 5026dfe
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 3 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## master / unreleased

### Features

* [1558](https://github.com/grafana/loki/pull/1558) **owen-d**: Introduces `ingester.max-chunk-age` which specifies the maximum chunk age before it's cut.

## 1.3.0 (2019-01-16)

### What's New?? ###
Expand Down Expand Up @@ -156,7 +162,6 @@ Once again we can't thank our community and contributors enough for the signific
#### New Members!
* [1415](https://github.com/grafana/loki/pull/1415) **cyriltovena**: Add Joe as member of the team.


# 1.2.0 (2019-12-09)

One week has passed since the last Loki release, and it's time for a new one!
Expand Down
4 changes: 4 additions & 0 deletions docs/configuration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,10 @@ The `ingester_config` block configures Ingesters.
# The maximum number of errors a stream will report to the user
# when a push fails. 0 to make unlimited.
[max_returned_stream_errors: <int> | default = 10]

# The maximum duration of a timeseries chunk in memory. If a timeseries runs for longer than this the current chunk will be flushed to the store and a new chunk created.
[max_chunk_age: <duration> | default = 1h]

```
### lifecycler_config
Expand Down
7 changes: 5 additions & 2 deletions pkg/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (i *Ingester) sweepStream(instance *instance, stream *stream, immediate boo
}

lastChunk := stream.chunks[len(stream.chunks)-1]
if len(stream.chunks) == 1 && time.Since(lastChunk.lastUpdated) < i.cfg.MaxChunkIdle && !immediate {
if len(stream.chunks) == 1 && !immediate && !i.shouldFlushChunk(&lastChunk) {
return
}

Expand Down Expand Up @@ -246,7 +246,10 @@ func (i *Ingester) shouldFlushChunk(chunk *chunkDesc) bool {
}

if time.Since(chunk.lastUpdated) > i.cfg.MaxChunkIdle {
chunk.closed = true
return true
}

if from, to := chunk.chunk.Bounds(); to.Sub(from) > i.cfg.MaxChunkAge {
return true
}

Expand Down
53 changes: 53 additions & 0 deletions pkg/ingester/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,59 @@ func TestFlushingCollidingLabels(t *testing.T) {
}
}

func TestFlushMaxAge(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.FlushCheckPeriod = time.Millisecond * 100
cfg.MaxChunkAge = time.Minute
cfg.MaxChunkIdle = time.Hour

store, ing := newTestStore(t, cfg)
defer store.Stop()

now := time.Unix(0, 0)

firstEntries := []logproto.Entry{
{Timestamp: now.Add(time.Nanosecond), Line: "1"},
{Timestamp: now.Add(time.Minute), Line: "2"},
}

secondEntries := []logproto.Entry{
{Timestamp: now.Add(time.Second * 61), Line: "3"},
}

req := &logproto.PushRequest{Streams: []*logproto.Stream{
{Labels: model.LabelSet{"app": "l"}.String(), Entries: firstEntries},
}}

const userID = "testUser"
ctx := user.InjectOrgID(context.Background(), userID)

_, err := ing.Push(ctx, req)
require.NoError(t, err)

time.Sleep(2 * cfg.FlushCheckPeriod)

// ensure chunk is not flushed after flush period elapses
store.checkData(t, map[string][]*logproto.Stream{})

req2 := &logproto.PushRequest{Streams: []*logproto.Stream{
{Labels: model.LabelSet{"app": "l"}.String(), Entries: secondEntries},
}}

_, err = ing.Push(ctx, req2)
require.NoError(t, err)

time.Sleep(2 * cfg.FlushCheckPeriod)

// assert stream is now both batches
store.checkData(t, map[string][]*logproto.Stream{
userID: []*logproto.Stream{
{Labels: model.LabelSet{"app": "l"}.String(), Entries: append(firstEntries, secondEntries...)},
},
})

}

type testStore struct {
mtx sync.Mutex
// Chunks keyed by userID.
Expand Down
2 changes: 2 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type Config struct {
BlockSize int `yaml:"chunk_block_size"`
TargetChunkSize int `yaml:"chunk_target_size"`
ChunkEncoding string `yaml:"chunk_encoding"`
MaxChunkAge time.Duration `yaml:"max_chunk_age"`

// Synchronization settings. Used to make sure that ingesters cut their chunks at the same moments.
SyncPeriod time.Duration `yaml:"sync_period"`
Expand Down Expand Up @@ -78,6 +79,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.SyncPeriod, "ingester.sync-period", 0, "How often to cut chunks to synchronize ingesters.")
f.Float64Var(&cfg.SyncMinUtilization, "ingester.sync-min-utilization", 0, "Minimum utilization of chunk when doing synchronization.")
f.IntVar(&cfg.MaxReturnedErrors, "ingester.max-ignored-stream-errors", 10, "Maximum number of ignored stream errors to return. 0 to return all errors.")
f.DurationVar(&cfg.MaxChunkAge, "ingester.max-chunk-age", time.Hour, "Maximum chunk age before flushing.")
}

// Ingester builds chunks for incoming log streams.
Expand Down

0 comments on commit 5026dfe

Please sign in to comment.