pkg/ingester: added sync period flags #1438

Merged 1 commit on Dec 18, 2019
7 changes: 7 additions & 0 deletions docs/configuration/README.md
@@ -284,6 +284,13 @@ The `ingester_config` block configures Ingesters.
# - `lz4` fastest compression speed (188 kB per chunk)
# - `snappy` fast and popular compression algorithm (272 kB per chunk)
[chunk_encoding: <string> | default = gzip]

# Parameters used to synchronize ingesters to cut chunks at the same moment.
# The sync period is used to roll over an incoming entry to a new chunk. If the chunk's utilization
# isn't high enough (e.g. less than 50% when sync_min_utilization is set to 0.5), then
# this chunk rollover doesn't happen.
[sync_period: <duration> | default = 0]
[sync_min_utilization: <float> | default = 0]
```

### lifecycler_config
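To make the rollover rule above concrete, here is a minimal, self-contained Go sketch of the decision the README describes: an entry that crosses a sync-period boundary triggers a chunk cut only when the current chunk is utilized beyond `sync_min_utilization`. The helper name `shouldCutForSync` and the standalone program are illustrative assumptions, not Loki's API; the actual check lives in `pkg/ingester/stream.go` further down in this diff and also folds in a per-stream fingerprint jitter, which is omitted here.

```go
package main

import (
	"fmt"
	"time"
)

// shouldCutForSync sketches the rule described above: cut when the incoming entry's
// timestamp crosses a sync-period boundary, but only if the chunk is utilized enough.
// Illustrative only; the real check also adds a per-stream fingerprint offset as jitter.
func shouldCutForSync(prev, cur time.Time, syncPeriod time.Duration, utilization, minUtilization float64) bool {
	if syncPeriod <= 0 || prev.IsZero() {
		return false
	}
	p := prev.UnixNano() % syncPeriod.Nanoseconds()
	c := cur.UnixNano() % syncPeriod.Nanoseconds()
	if c >= p { // no sync-period boundary crossed between the two entries
		return false
	}
	return minUtilization <= 0 || utilization > minUtilization
}

func main() {
	prev := time.Date(2019, 12, 18, 10, 0, 55, 0, time.UTC)
	cur := prev.Add(10 * time.Second) // crosses the 10:01:00 boundary for a 1m sync period

	fmt.Println(shouldCutForSync(prev, cur, time.Minute, 0.6, 0.5))                   // true: boundary crossed and 60% > 50%
	fmt.Println(shouldCutForSync(prev, cur, time.Minute, 0.3, 0.5))                   // false: chunk not utilized enough yet
	fmt.Println(shouldCutForSync(prev, prev.Add(time.Second), time.Minute, 0.6, 0.5)) // false: no boundary crossed
}
```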
8 changes: 7 additions & 1 deletion pkg/ingester/ingester.go
@@ -52,6 +52,10 @@ type Config struct {
TargetChunkSize int `yaml:"chunk_target_size"`
ChunkEncoding string `yaml:"chunk_encoding"`

// Synchronization settings. Used to make sure that ingesters cut their chunks at the same moments.
SyncPeriod time.Duration `yaml:"sync_period"`
SyncMinUtilization float64 `yaml:"sync_min_utilization"`

// For testing, you can override the address and ID of this ingester.
ingesterClientFactory func(cfg client.Config, addr string) (grpc_health_v1.HealthClient, error)
}
@@ -69,6 +73,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.BlockSize, "ingester.chunks-block-size", 256*1024, "")
f.IntVar(&cfg.TargetChunkSize, "ingester.chunk-target-size", 0, "")
f.StringVar(&cfg.ChunkEncoding, "ingester.chunk-encoding", chunkenc.EncGZIP.String(), fmt.Sprintf("The algorithm to use for compressing chunks. (%s)", chunkenc.SupportedEncoding()))
f.DurationVar(&cfg.SyncPeriod, "ingester.sync-period", 0, "How often to cut chunks to synchronize ingesters.")
f.Float64Var(&cfg.SyncMinUtilization, "ingester.sync-min-utilization", 0, "Minimum utilization of chunk when doing synchronization.")
}

// Ingester builds chunks for incoming log streams.
@@ -202,7 +208,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) *instance {
defer i.instancesMtx.Unlock()
inst, ok = i.instances[instanceID]
if !ok {
inst = newInstance(instanceID, i.factory, i.limits)
inst = newInstance(instanceID, i.factory, i.limits, i.cfg.SyncPeriod, i.cfg.SyncMinUtilization)
i.instances[instanceID] = inst
}
return inst
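Both new flags default to 0, which disables synchronization entirely. As a quick sanity check of how they are meant to be supplied, here is a hedged sketch using only the standard `flag` package; the flag names and defaults match the PR, but the surrounding harness is illustrative and not part of Loki.

```go
package main

import (
	"flag"
	"fmt"
)

func main() {
	// Register flags with the same names and defaults as the PR (illustrative harness only).
	fs := flag.NewFlagSet("ingester", flag.ExitOnError)
	syncPeriod := fs.Duration("ingester.sync-period", 0, "How often to cut chunks to synchronize ingesters.")
	syncMinUtil := fs.Float64("ingester.sync-min-utilization", 0, "Minimum utilization of chunk when doing synchronization.")

	// Equivalent to passing -ingester.sync-period=15m -ingester.sync-min-utilization=0.2 on the command line.
	if err := fs.Parse([]string{"-ingester.sync-period=15m", "-ingester.sync-min-utilization=0.2"}); err != nil {
		panic(err)
	}

	fmt.Printf("sync period: %s, min utilization: %.2f\n", *syncPeriod, *syncMinUtil)
}
```

With both flags left at their defaults of 0, the synchronization check is skipped and chunk cutting behaves exactly as before this change.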
12 changes: 10 additions & 2 deletions pkg/ingester/instance.go
@@ -4,6 +4,7 @@ import (
"context"
"net/http"
"sync"
"time"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
@@ -66,9 +67,13 @@ type instance struct {

limits *validation.Overrides
factory func() chunkenc.Chunk

// sync
syncPeriod time.Duration
syncMinUtil float64
}

func newInstance(instanceID string, factory func() chunkenc.Chunk, limits *validation.Overrides) *instance {
func newInstance(instanceID string, factory func() chunkenc.Chunk, limits *validation.Overrides, syncPeriod time.Duration, syncMinUtil float64) *instance {
i := &instance{
streams: map[model.Fingerprint]*stream{},
index: index.New(),
@@ -80,6 +85,9 @@ func newInstance(instanceID string, factory func() chunkenc.Chunk, limits *valid
factory: factory,
tailers: map[uint32]*tailer{},
limits: limits,

syncPeriod: syncPeriod,
syncMinUtil: syncMinUtil,
}
i.mapper = newFPMapper(i.getLabelsFromFingerprint)
return i
@@ -131,7 +139,7 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
}

prevNumChunks := len(stream.chunks)
if err := stream.Push(ctx, s.Entries); err != nil {
if err := stream.Push(ctx, s.Entries, i.syncPeriod, i.syncMinUtil); err != nil {
appendErr = err
continue
}
49 changes: 47 additions & 2 deletions pkg/ingester/instance_test.go
@@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/grafana/loki/pkg/util"
"github.com/prometheus/prometheus/pkg/labels"

"github.com/grafana/loki/pkg/chunkenc"
@@ -26,7 +27,7 @@ func TestLabelsCollisions(t *testing.T) {
o, err := validation.NewOverrides(validation.Limits{MaxStreamsPerUser: 1000})
require.NoError(t, err)

i := newInstance("test", defaultFactory, o)
i := newInstance("test", defaultFactory, o, 0, 0)

// avoid entries from the future.
tt := time.Now().Add(-5 * time.Minute)
@@ -52,7 +53,7 @@ func TestConcurrentPushes(t *testing.T) {
o, err := validation.NewOverrides(validation.Limits{MaxStreamsPerUser: 1000})
require.NoError(t, err)

inst := newInstance("test", defaultFactory, o)
inst := newInstance("test", defaultFactory, o, 0, 0)

const (
concurrent = 10
@@ -98,6 +99,50 @@ func TestConcurrentPushes(t *testing.T) {
// test passes if no goroutine reports error
}

func TestSyncPeriod(t *testing.T) {
o, err := validation.NewOverrides(validation.Limits{MaxStreamsPerUser: 1000})
require.NoError(t, err)

const (
syncPeriod = 1 * time.Minute
randomStep = time.Second
entries = 1000
minUtil = 0.20
)

inst := newInstance("test", defaultFactory, o, syncPeriod, minUtil)
lbls := makeRandomLabels()

tt := time.Now()

var result []logproto.Entry
for i := 0; i < entries; i++ {
result = append(result, logproto.Entry{Timestamp: tt, Line: fmt.Sprintf("hello %d", i)})
tt = tt.Add(time.Duration(1 + rand.Int63n(randomStep.Nanoseconds())))
}

err = inst.Push(context.Background(), &logproto.PushRequest{Streams: []*logproto.Stream{{Labels: lbls, Entries: result}}})
require.NoError(t, err)

// let's verify results.
ls, err := util.ToClientLabels(lbls)
require.NoError(t, err)

s, err := inst.getOrCreateStream(ls)
require.NoError(t, err)

// make sure that each chunk spans at most the 'sync period'
for _, c := range s.chunks {
start, end := c.chunk.Bounds()
span := end.Sub(start)

const format = "15:04:05.000"
t.Log(start.Format(format), "--", end.Format(format), span, c.chunk.Utilization())

require.True(t, span < syncPeriod || c.chunk.Utilization() >= minUtil)
}
}

func entries(n int, t time.Time) []logproto.Entry {
var result []logproto.Entry
for i := 0; i < n; i++ {
35 changes: 33 additions & 2 deletions pkg/ingester/stream.go
@@ -100,12 +100,15 @@ func (s *stream) consumeChunk(_ context.Context, chunk *logproto.Chunk) error {
return nil
}

func (s *stream) Push(_ context.Context, entries []logproto.Entry) error {
func (s *stream) Push(_ context.Context, entries []logproto.Entry, synchronizePeriod time.Duration, minUtilization float64) error {
var lastChunkTimestamp time.Time
if len(s.chunks) == 0 {
s.chunks = append(s.chunks, chunkDesc{
chunk: s.factory(),
})
chunksCreatedTotal.Inc()
} else {
_, lastChunkTimestamp = s.chunks[len(s.chunks)-1].chunk.Bounds()
}

storedEntries := []logproto.Entry{}
@@ -115,7 +118,7 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry) error {
// we still want to append the later ones.
for i := range entries {
chunk := &s.chunks[len(s.chunks)-1]
if chunk.closed || !chunk.chunk.SpaceFor(&entries[i]) {
if chunk.closed || !chunk.chunk.SpaceFor(&entries[i]) || s.cutChunkForSynchronization(entries[i].Timestamp, lastChunkTimestamp, chunk.chunk, synchronizePeriod, minUtilization) {
// If the chunk has no more space call Close to make sure anything in the head block is cut and compressed
err := chunk.chunk.Close()
if err != nil {
@@ -133,12 +136,14 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry) error {
chunk: s.factory(),
})
chunk = &s.chunks[len(s.chunks)-1]
lastChunkTimestamp = time.Time{}
}
if err := chunk.chunk.Append(&entries[i]); err != nil {
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], err})
} else {
// send only stored entries to tailers
storedEntries = append(storedEntries, entries[i])
lastChunkTimestamp = entries[i].Timestamp
}
chunk.lastUpdated = time.Now()
}
@@ -193,6 +198,32 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry) error {
return nil
}

// Returns true if the chunk should be cut before adding the new entry. This is done to make ingesters
// cut the chunk for this stream at the same moment, so that the new chunks on different ingesters contain exactly the same entries.
func (s *stream) cutChunkForSynchronization(entryTimestamp, prevEntryTimestamp time.Time, chunk chunkenc.Chunk, synchronizePeriod time.Duration, minUtilization float64) bool {
if synchronizePeriod <= 0 || prevEntryTimestamp.IsZero() {
return false
}

// we use fingerprint as a jitter here, basically offsetting stream synchronization points to different times per stream;
// this breaks if streams are mapped to different fingerprints on different ingesters, which is too bad.
cts := (uint64(entryTimestamp.UnixNano()) + uint64(s.fp)) % uint64(synchronizePeriod.Nanoseconds())
pts := (uint64(prevEntryTimestamp.UnixNano()) + uint64(s.fp)) % uint64(synchronizePeriod.Nanoseconds())

// if current entry timestamp has rolled over synchronization period
if cts < pts {
if minUtilization <= 0 {
return true
}

if chunk.Utilization() > minUtilization {
return true
}
}

return false
}

// Returns an iterator.
func (s *stream) Iterator(from, through time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error) {
iterators := make([]iter.EntryIterator, 0, len(s.chunks))
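A short worked example of the jitter: because the cut point depends only on the entry timestamp, the stream fingerprint, and the sync period, every ingester holding replicas of the same stream derives the same boundary, while different streams are offset from one another. The helper `nextSyncBoundary` below is an assumption made for illustration and is not part of this PR.

```go
package main

import (
	"fmt"
	"time"
)

// nextSyncBoundary returns the next instant at which (timestamp + fingerprint) is a
// multiple of the sync period, i.e. the point where cutChunkForSynchronization would
// see the phase roll over. Illustrative only.
func nextSyncBoundary(ts time.Time, fp uint64, period time.Duration) time.Time {
	p := uint64(period.Nanoseconds())
	phase := (uint64(ts.UnixNano()) + fp) % p
	return ts.Add(time.Duration(p - phase))
}

func main() {
	period := time.Minute
	ts := time.Date(2019, 12, 18, 10, 0, 20, 0, time.UTC)

	// Two ingesters holding the same stream (same fingerprint) agree on the boundary,
	// so, given the same entries, they cut chunks containing identical data.
	fmt.Println(nextSyncBoundary(ts, 0xabcdef1234567890, period))
	fmt.Println(nextSyncBoundary(ts, 0xabcdef1234567890, period))

	// A different stream gets a different, jittered boundary, spreading cuts over time.
	fmt.Println(nextSyncBoundary(ts, 0x1234567890abcdef, period))
}
```

In the PR itself the cut is triggered when the phase of the incoming entry is smaller than that of the previous entry (i.e. the boundary computed above has just been crossed), and only if the chunk's utilization exceeds `sync_min_utilization`.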