From 8c1fe88409fe832b45da1f8f3cdd8116d81f4048 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Fri, 27 Nov 2020 11:22:23 -0500 Subject: [PATCH] Introduces per stream chunks mutex (#3000) * marshalable chunks * wal record types custom serialization * proto types for wal checkpoints * byteswith output unaffected by buffer * wal & record pool ifcs * wal record can hold entries from multiple series * entry pool * ingester uses noopWal * removes duplicate argument passing in ingester code. adds ingester config validation & derives chunk encoding. * segment writing * [WIP] wal recovery from segments * replay uses sync.Maps & preserves WAL fingerprints * in memory wal recovery * wal segment recovery * ingester metrics struct * wal replay locks streamsMtx in instances, adds checkpoint codec * ingester metrics * checkpointer * WAL checkpoint writer * checkpointwriter can write multiple checkpoints * reorgs checkpointing * wires up checkpointwriter to wal * ingester SeriesIter impl * wires up ingesterRecoverer to consume checkpoints * generic recovery fn * generic recovery fn * recover from both wal types * cleans up old tmp checkpoints & allows aborting in flight checkpoints * wires up wal checkpointing * more granular wal logging * fixes off by 1 wal truncation & removes double logging * adds userID to wal records correctly * wire chunk encoding tests * more granular wal metrics * checkpoint encoding test * ignores debug bins * segment replay ignores out of orders * fixes bug between WAL reading []byte validity and proto unmarshalling refs * conf validations, removes comments * flush on shutdown config * POST /ingester/shutdown * renames flush on shutdown * wal & checkpoint use same segment size * writes entries to wal regardless of tailers * makes wal checkpoing duration default to 5m * recovery metrics * encodes headchunks separately for wal purposes * merge upstream * linting * adds chunkMtx to ingester streams * all flush locking locks streams -> chunks in order to prevent deadlocks * wal integration tests * addresses pr feedback uses entry pool in stream push/tailer removes unnecessary pool interaction checkpointbytes comment fillchunk helper, record resetting in tests via pool redundant comment defers wg done in recovery s/num/count/ checkpoint wal uses a logger encodeWithTypeHeader now creates its own []byte removes pool from decodeEntries wal stop can error * cleans up merge * lint * inline functions to simplify unlocking --- pkg/ingester/checkpoint.go | 11 +++ pkg/ingester/checkpoint_test.go | 143 ++++++++++++++++++++++++++++++++ pkg/ingester/flush.go | 88 ++++++++++++-------- pkg/ingester/flush_test.go | 1 + pkg/ingester/instance.go | 14 +--- pkg/ingester/stream.go | 44 ++++++++-- pkg/ingester/stream_test.go | 4 +- pkg/ingester/transfer.go | 78 +++++++++-------- pkg/ingester/transfer_test.go | 1 + 9 files changed, 298 insertions(+), 86 deletions(-) create mode 100644 pkg/ingester/checkpoint_test.go diff --git a/pkg/ingester/checkpoint.go b/pkg/ingester/checkpoint.go index 9108f0858579c..126318957406a 100644 --- a/pkg/ingester/checkpoint.go +++ b/pkg/ingester/checkpoint.go @@ -157,8 +157,19 @@ func (i *ingesterSeriesIter) Iter() <-chan *SeriesWithErr { }) for _, stream := range streams { + stream.chunkMtx.RLock() + if len(stream.chunks) < 1 { + stream.chunkMtx.RUnlock() + // it's possible the stream has been flushed to storage + // in between starting the checkpointing process and + // checkpointing this stream. 
+ continue + } + // TODO(owen-d): use a pool chunks, err := toWireChunks(stream.chunks, nil) + stream.chunkMtx.RUnlock() + var s *Series if err == nil { s = &Series{ diff --git a/pkg/ingester/checkpoint_test.go b/pkg/ingester/checkpoint_test.go new file mode 100644 index 0000000000000..85a99b147b810 --- /dev/null +++ b/pkg/ingester/checkpoint_test.go @@ -0,0 +1,143 @@ +package ingester + +import ( + "context" + fmt "fmt" + "io/ioutil" + "os" + "testing" + "time" + + "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/util/services" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/user" + + "github.com/grafana/loki/pkg/ingester/client" + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/util/validation" +) + +// small util for ensuring data exists as we expect +func ensureIngesterData(ctx context.Context, t *testing.T, start, end time.Time, i *Ingester) { + result := mockQuerierServer{ + ctx: ctx, + } + err := i.Query(&logproto.QueryRequest{ + Selector: `{foo="bar"}`, + Limit: 100, + Start: start, + End: end, + }, &result) + + ln := int(end.Sub(start) / time.Second) + require.NoError(t, err) + require.Len(t, result.resps, 1) + require.Len(t, result.resps[0].Streams, 2) + require.Len(t, result.resps[0].Streams[0].Entries, ln) + require.Len(t, result.resps[0].Streams[1].Entries, ln) +} + +func TestIngesterWAL(t *testing.T) { + + walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal") + require.Nil(t, err) + defer os.RemoveAll(walDir) + + ingesterConfig := defaultIngesterTestConfig(t) + ingesterConfig.MaxTransferRetries = 0 + ingesterConfig.WAL = WALConfig{ + Enabled: true, + Dir: walDir, + Recover: true, + CheckpointDuration: time.Second, + } + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + + newStore := func() *mockStore { + return &mockStore{ + chunks: map[string][]chunk.Chunk{}, + } + } + + i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil) + require.NoError(t, err) + require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + req := logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: `{foo="bar",bar="baz1"}`, + }, + { + Labels: `{foo="bar",bar="baz2"}`, + }, + }, + } + + start := time.Now() + steps := 10 + end := start.Add(time.Second * time.Duration(steps)) + + for i := 0; i < steps; i++ { + req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{ + Timestamp: start.Add(time.Duration(i) * time.Second), + Line: fmt.Sprintf("line %d", i), + }) + req.Streams[1].Entries = append(req.Streams[1].Entries, logproto.Entry{ + Timestamp: start.Add(time.Duration(i) * time.Second), + Line: fmt.Sprintf("line %d", i), + }) + } + + ctx := user.InjectOrgID(context.Background(), "test") + _, err = i.Push(ctx, &req) + require.NoError(t, err) + + ensureIngesterData(ctx, t, start, end, i) + + require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i)) + + // ensure we haven't checkpointed yet + expectCheckpoint(t, walDir, false) + + // restart the ingester + i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil) + require.NoError(t, err) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) + + // ensure we've recovered data from wal segments + ensureIngesterData(ctx, t, start, end, i) + + 
time.Sleep(ingesterConfig.WAL.CheckpointDuration + time.Second) // give a bit of buffer + // ensure we have checkpointed now + expectCheckpoint(t, walDir, true) + + require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i)) + + // restart the ingester + i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil) + require.NoError(t, err) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) + + // ensure we've recovered data from checkpoint+wal segments + ensureIngesterData(ctx, t, start, end, i) + +} + +func expectCheckpoint(t *testing.T, walDir string, shouldExist bool) { + fs, err := ioutil.ReadDir(walDir) + require.Nil(t, err) + var found bool + for _, f := range fs { + if _, err := checkpointIndex(f.Name(), false); err == nil { + found = true + } + } + + require.True(t, found == shouldExist) +} diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index 32428b78f05d4..e3f840d4f5bdd 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -160,7 +160,10 @@ func (i *Ingester) sweepInstance(instance *instance, immediate bool) { } } +// must hold streamsMtx func (i *Ingester) sweepStream(instance *instance, stream *stream, immediate bool) { + stream.chunkMtx.RLock() + defer stream.chunkMtx.RUnlock() if len(stream.chunks) == 0 { return } @@ -214,7 +217,7 @@ func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediat return nil } - chunks, labels := i.collectChunksToFlush(instance, fp, immediate) + chunks, labels, chunkMtx := i.collectChunksToFlush(instance, fp, immediate) if len(chunks) < 1 { return nil } @@ -222,29 +225,26 @@ func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediat ctx := user.InjectOrgID(context.Background(), userID) ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout) defer cancel() - err := i.flushChunks(ctx, fp, labels, chunks, &instance.streamsMtx) + err := i.flushChunks(ctx, fp, labels, chunks, chunkMtx) if err != nil { return err } - instance.streamsMtx.Lock() - for _, chunk := range chunks { - chunk.flushed = time.Now() - } - instance.streamsMtx.Unlock() return nil } -func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint, immediate bool) ([]*chunkDesc, labels.Labels) { +func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint, immediate bool) ([]*chunkDesc, labels.Labels, *sync.RWMutex) { instance.streamsMtx.Lock() - defer instance.streamsMtx.Unlock() - stream, ok := instance.streamsByFP[fp] + instance.streamsMtx.Unlock() + if !ok { - return nil, nil + return nil, nil, nil } var result []*chunkDesc + stream.chunkMtx.Lock() + defer stream.chunkMtx.Unlock() for j := range stream.chunks { shouldFlush, reason := i.shouldFlushChunk(&stream.chunks[j]) if immediate || shouldFlush { @@ -262,7 +262,7 @@ func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint } } } - return result, stream.labels + return result, stream.labels, &stream.chunkMtx } func (i *Ingester) shouldFlushChunk(chunk *chunkDesc) (bool, string) { @@ -285,9 +285,12 @@ func (i *Ingester) shouldFlushChunk(chunk *chunkDesc) (bool, string) { return false, "" } +// must hold streamsMtx func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream) { now := time.Now() + stream.chunkMtx.Lock() + defer stream.chunkMtx.Unlock() prevNumChunks := len(stream.chunks) for len(stream.chunks) > 0 { if stream.chunks[0].flushed.IsZero() || 
now.Sub(stream.chunks[0].flushed) < i.cfg.RetainPeriod { @@ -308,7 +311,7 @@ func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream) { } } -func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelPairs labels.Labels, cs []*chunkDesc, streamsMtx sync.Locker) error { +func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelPairs labels.Labels, cs []*chunkDesc, chunkMtx sync.Locker) error { userID, err := user.ExtractOrgID(ctx) if err != nil { return err @@ -319,28 +322,37 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP metric := labelsBuilder.Labels() wireChunks := make([]chunk.Chunk, 0, len(cs)) - for _, c := range cs { - // Ensure that new blocks are cut before flushing as data in the head block is not included otherwise. - if err = c.chunk.Close(); err != nil { - return err - } - firstTime, lastTime := loki_util.RoundToMilliseconds(c.chunk.Bounds()) - c := chunk.NewChunk( - userID, fp, metric, - chunkenc.NewFacade(c.chunk, i.cfg.BlockSize, i.cfg.TargetChunkSize), - firstTime, - lastTime, - ) - - start := time.Now() - streamsMtx.Lock() - err := c.Encode() - streamsMtx.Unlock() - if err != nil { - return err + + // use anonymous function to make lock releasing simpler. + err = func() error { + chunkMtx.Lock() + defer chunkMtx.Unlock() + + for _, c := range cs { + // Ensure that new blocks are cut before flushing as data in the head block is not included otherwise. + if err := c.chunk.Close(); err != nil { + return err + } + firstTime, lastTime := loki_util.RoundToMilliseconds(c.chunk.Bounds()) + c := chunk.NewChunk( + userID, fp, metric, + chunkenc.NewFacade(c.chunk, i.cfg.BlockSize, i.cfg.TargetChunkSize), + firstTime, + lastTime, + ) + + start := time.Now() + if err := c.Encode(); err != nil { + return err + } + chunkEncodeTime.Observe(time.Since(start).Seconds()) + wireChunks = append(wireChunks, c) } - chunkEncodeTime.Observe(time.Since(start).Seconds()) - wireChunks = append(wireChunks, c) + return nil + }() + + if err != nil { + return err } if err := i.store.Put(ctx, wireChunks); err != nil { @@ -350,7 +362,15 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP // Record statistics only when actual put request did not return error. 
sizePerTenant := chunkSizePerTenant.WithLabelValues(userID) countPerTenant := chunksPerTenant.WithLabelValues(userID) + + chunkMtx.Lock() + defer chunkMtx.Unlock() + for i, wc := range wireChunks { + + // flush successful, write while we have lock + cs[i].flushed = time.Now() + numEntries := cs[i].chunk.Size() byt, err := wc.Encoded() if err != nil { diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index db3b15d6a7c78..9a92502a774cf 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -197,6 +197,7 @@ func defaultIngesterTestConfig(t *testing.T) Config { cfg.LifecyclerConfig.Addr = "localhost" cfg.LifecyclerConfig.ID = "localhost" cfg.LifecyclerConfig.FinalSleep = 0 + cfg.LifecyclerConfig.MinReadyDuration = 0 return cfg } diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index c3b2718ec7ccf..6ab540ba593d8 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -153,13 +153,10 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error { continue } - prevNumChunks := len(stream.chunks) if err := stream.Push(ctx, s.Entries, record); err != nil { appendErr = err continue } - - memoryChunks.Add(float64(len(stream.chunks) - prevNumChunks)) } if !record.IsEmpty() { @@ -259,8 +256,7 @@ func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) ([]iter err = i.forMatchingStreams( expr.Matchers(), func(stream *stream) error { - ingStats.TotalChunksMatched += int64(len(stream.chunks)) - iter, err := stream.Iterator(ctx, req.Start, req.End, req.Direction, pipeline.ForStream(stream.labels)) + iter, err := stream.Iterator(ctx, ingStats, req.Start, req.End, req.Direction, pipeline.ForStream(stream.labels)) if err != nil { return err } @@ -290,8 +286,7 @@ func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams err = i.forMatchingStreams( expr.Selector().Matchers(), func(stream *stream) error { - ingStats.TotalChunksMatched += int64(len(stream.chunks)) - iter, err := stream.SampleIterator(ctx, req.Start, req.End, extractor.ForStream(stream.labels)) + iter, err := stream.SampleIterator(ctx, ingStats, req.Start, req.End, extractor.ForStream(stream.labels)) if err != nil { return err } @@ -577,10 +572,9 @@ func sendSampleBatches(ctx context.Context, it iter.SampleIterator, queryServer } func shouldConsiderStream(stream *stream, req *logproto.SeriesRequest) bool { - firstchunkFrom, _ := stream.chunks[0].chunk.Bounds() - _, lastChunkTo := stream.chunks[len(stream.chunks)-1].chunk.Bounds() + from, to := stream.Bounds() - if req.End.UnixNano() > firstchunkFrom.UnixNano() && req.Start.UnixNano() <= lastChunkTo.UnixNano() { + if req.End.UnixNano() > from.UnixNano() && req.Start.UnixNano() <= to.UnixNano() { return true } return false diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 71568ca56356f..47319b20b3074 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -19,6 +19,7 @@ import ( "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/log" + "github.com/grafana/loki/pkg/logql/stats" ) var ( @@ -60,8 +61,10 @@ type stream struct { cfg *Config // Newest chunk at chunks[n-1]. // Not thread-safe; assume accesses to this are locked by caller. 
- chunks []chunkDesc - fp model.Fingerprint // possibly remapped fingerprint, used in the streams map + chunks []chunkDesc + fp model.Fingerprint // possibly remapped fingerprint, used in the streams map + chunkMtx sync.RWMutex + labels labels.Labels labelsString string lastLine line @@ -104,6 +107,8 @@ func (s *stream) consumeChunk(_ context.Context, chunk *logproto.Chunk) error { return err } + s.chunkMtx.Lock() + defer s.chunkMtx.Unlock() s.chunks = append(s.chunks, chunkDesc{ chunk: c, }) @@ -113,6 +118,8 @@ func (s *stream) consumeChunk(_ context.Context, chunk *logproto.Chunk) error { // setChunks is used during checkpoint recovery func (s *stream) setChunks(chunks []Chunk) (entriesAdded int, err error) { + s.chunkMtx.Lock() + defer s.chunkMtx.Unlock() chks, err := fromWireChunks(s.cfg, chunks) if err != nil { return 0, err @@ -133,8 +140,11 @@ func (s *stream) Push( entries []logproto.Entry, record *WALRecord, ) error { + s.chunkMtx.Lock() + defer s.chunkMtx.Unlock() + prevNumChunks := len(s.chunks) var lastChunkTimestamp time.Time - if len(s.chunks) == 0 { + if prevNumChunks == 0 { s.chunks = append(s.chunks, chunkDesc{ chunk: s.NewChunk(), }) @@ -259,6 +269,9 @@ func (s *stream) Push( return lastEntryWithErr.e } + if len(s.chunks) != prevNumChunks { + memoryChunks.Add(float64(len(s.chunks) - prevNumChunks)) + } return nil } @@ -290,8 +303,21 @@ func (s *stream) cutChunkForSynchronization(entryTimestamp, prevEntryTimestamp t return false } +func (s *stream) Bounds() (from, to time.Time) { + s.chunkMtx.RLock() + defer s.chunkMtx.RUnlock() + if len(s.chunks) > 0 { + from, _ = s.chunks[0].chunk.Bounds() + _, to = s.chunks[len(s.chunks)-1].chunk.Bounds() + } + return from, to + +} + // Returns an iterator. -func (s *stream) Iterator(ctx context.Context, from, through time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error) { +func (s *stream) Iterator(ctx context.Context, ingStats *stats.IngesterData, from, through time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error) { + s.chunkMtx.RLock() + defer s.chunkMtx.RUnlock() iterators := make([]iter.EntryIterator, 0, len(s.chunks)) for _, c := range s.chunks { itr, err := c.chunk.Iterator(ctx, from, through, direction, pipeline) @@ -309,11 +335,16 @@ func (s *stream) Iterator(ctx context.Context, from, through time.Time, directio } } + if ingStats != nil { + ingStats.TotalChunksMatched += int64(len(s.chunks)) + } return iter.NewNonOverlappingIterator(iterators, ""), nil } // Returns an SampleIterator. 
-func (s *stream) SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) (iter.SampleIterator, error) { +func (s *stream) SampleIterator(ctx context.Context, ingStats *stats.IngesterData, from, through time.Time, extractor log.StreamSampleExtractor) (iter.SampleIterator, error) { + s.chunkMtx.RLock() + defer s.chunkMtx.RUnlock() iterators := make([]iter.SampleIterator, 0, len(s.chunks)) for _, c := range s.chunks { if itr := c.chunk.SampleIterator(ctx, from, through, extractor); itr != nil { @@ -321,6 +352,9 @@ func (s *stream) SampleIterator(ctx context.Context, from, through time.Time, ex } } + if ingStats != nil { + ingStats.TotalChunksMatched += int64(len(s.chunks)) + } return iter.NewNonOverlappingSampleIterator(iterators, ""), nil } diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index a6f500e2ac37e..a7a405e5764c5 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -121,7 +121,7 @@ func TestStreamIterator(t *testing.T) { for i := 0; i < 100; i++ { from := rand.Intn(chunks*entries - 1) len := rand.Intn(chunks*entries-from) + 1 - iter, err := s.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD, log.NewNoopPipeline().ForStream(s.labels)) + iter, err := s.Iterator(context.TODO(), nil, time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD, log.NewNoopPipeline().ForStream(s.labels)) require.NotNil(t, iter) require.NoError(t, err) testIteratorForward(t, iter, int64(from), int64(from+len)) @@ -131,7 +131,7 @@ func TestStreamIterator(t *testing.T) { for i := 0; i < 100; i++ { from := rand.Intn(entries - 1) len := rand.Intn(chunks*entries-from) + 1 - iter, err := s.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD, log.NewNoopPipeline().ForStream(s.labels)) + iter, err := s.Iterator(context.TODO(), nil, time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD, log.NewNoopPipeline().ForStream(s.labels)) require.NotNil(t, iter) require.NoError(t, err) testIteratorBackward(t, iter, int64(from), int64(from+len)) diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go index 610453fd64158..e4fdbe37df088 100644 --- a/pkg/ingester/transfer.go +++ b/pkg/ingester/transfer.go @@ -229,44 +229,52 @@ func (i *Ingester) transferOut(ctx context.Context) error { for instanceID, inst := range i.instances { for _, istream := range inst.streams { - lbls := []*logproto.LabelPair{} - for _, lbl := range istream.labels { - lbls = append(lbls, &logproto.LabelPair{Name: lbl.Name, Value: lbl.Value}) - } - - // We moved to sending one chunk at a time in a stream instead of sending all chunks for a stream - // as large chunks can create large payloads of >16MB which can hit GRPC limits, - // typically streams won't have many chunks in memory so sending one at a time - // shouldn't add too much overhead. - for _, c := range istream.chunks { - // Close the chunk first, writing any data in the headblock to a new block. 
- err := c.chunk.Close() - if err != nil { - return err + err = func() error { + istream.chunkMtx.Lock() + defer istream.chunkMtx.Unlock() + lbls := []*logproto.LabelPair{} + for _, lbl := range istream.labels { + lbls = append(lbls, &logproto.LabelPair{Name: lbl.Name, Value: lbl.Value}) } - bb, err := c.chunk.Bytes() - if err != nil { - return err + // We moved to sending one chunk at a time in a stream instead of sending all chunks for a stream + // as large chunks can create large payloads of >16MB which can hit GRPC limits, + // typically streams won't have many chunks in memory so sending one at a time + // shouldn't add too much overhead. + for _, c := range istream.chunks { + // Close the chunk first, writing any data in the headblock to a new block. + err := c.chunk.Close() + if err != nil { + return err + } + + bb, err := c.chunk.Bytes() + if err != nil { + return err + } + + chunks := make([]*logproto.Chunk, 1) + chunks[0] = &logproto.Chunk{ + Data: bb, + } + + err = stream.Send(&logproto.TimeSeriesChunk{ + Chunks: chunks, + UserId: instanceID, + Labels: lbls, + FromIngesterId: i.lifecycler.ID, + }) + if err != nil { + level.Error(logger).Log("msg", "failed sending stream's chunks to ingester", "to_ingester", targetIngester.Addr, "err", err) + return err + } + + sentChunks.Add(float64(len(chunks))) } - - chunks := make([]*logproto.Chunk, 1) - chunks[0] = &logproto.Chunk{ - Data: bb, - } - - err = stream.Send(&logproto.TimeSeriesChunk{ - Chunks: chunks, - UserId: instanceID, - Labels: lbls, - FromIngesterId: i.lifecycler.ID, - }) - if err != nil { - level.Error(logger).Log("msg", "failed sending stream's chunks to ingester", "to_ingester", targetIngester.Addr, "err", err) - return err - } - - sentChunks.Add(float64(len(chunks))) + return nil + }() + if err != nil { + return err } } } diff --git a/pkg/ingester/transfer_test.go b/pkg/ingester/transfer_test.go index 0653b18b7fb98..7e0e1b7271d05 100644 --- a/pkg/ingester/transfer_test.go +++ b/pkg/ingester/transfer_test.go @@ -92,6 +92,7 @@ func TestTransferOut(t *testing.T) { for _, stream := range ing2.instances["test"].streams { it, err := stream.Iterator( context.TODO(), + nil, time.Unix(0, 0), time.Unix(10, 0), logproto.FORWARD,
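
A minimal sketch of the locking pattern this patch introduces, appended for illustration only and not part of the diff itself. The types below (instance, stream, chunkDesc) are simplified stand-ins rather than the real Loki definitions: each stream guards its own chunk slice with a sync.RWMutex, lookups in the per-instance stream map hold streamsMtx only long enough to find the stream, and any path that needs both locks acquires streamsMtx before chunkMtx so push, flush, and query paths cannot deadlock. The anonymous-function-with-defer idiom mirrors how flushChunks and transferOut keep each unlock next to its lock.

package main

import (
	"fmt"
	"sync"
)

// chunkDesc and stream are simplified stand-ins for the Loki types touched by
// this patch; only the locking structure is meant to be illustrative.
type chunkDesc struct {
	data    []string
	flushed bool
}

type stream struct {
	chunkMtx sync.RWMutex // guards chunks, mirroring the new per-stream mutex
	chunks   []chunkDesc
}

type instance struct {
	streamsMtx sync.RWMutex // guards the streams map only, not the chunks
	streams    map[string]*stream
}

// push appends entries under the stream's own chunk lock; the instance-level
// streamsMtx is held only long enough to look the stream up (or create it).
func (i *instance) push(name string, entries ...string) {
	i.streamsMtx.Lock()
	s, ok := i.streams[name]
	if !ok {
		s = &stream{}
		i.streams[name] = s
	}
	// release the stream-map lock before taking the chunk lock
	// (streams -> chunks ordering, never the reverse)
	i.streamsMtx.Unlock()

	s.chunkMtx.Lock()
	defer s.chunkMtx.Unlock()
	if len(s.chunks) == 0 {
		s.chunks = append(s.chunks, chunkDesc{})
	}
	last := &s.chunks[len(s.chunks)-1]
	last.data = append(last.data, entries...)
}

// flush marks every chunk of one stream flushed, using an anonymous function
// so the unlock stays adjacent to the lock even as the body grows, in the same
// spirit as flushChunks in the patch.
func (i *instance) flush(name string) error {
	i.streamsMtx.RLock()
	s, ok := i.streams[name]
	i.streamsMtx.RUnlock()
	if !ok {
		return nil
	}

	return func() error {
		s.chunkMtx.Lock()
		defer s.chunkMtx.Unlock()
		for j := range s.chunks {
			s.chunks[j].flushed = true
		}
		return nil
	}()
}

// entryCount only reads chunk data, so the cheaper RLock suffices, just as the
// Iterator and SampleIterator read paths take chunkMtx.RLock.
func (i *instance) entryCount(name string) int {
	i.streamsMtx.RLock()
	s, ok := i.streams[name]
	i.streamsMtx.RUnlock()
	if !ok {
		return 0
	}

	s.chunkMtx.RLock()
	defer s.chunkMtx.RUnlock()
	n := 0
	for _, c := range s.chunks {
		n += len(c.data)
	}
	return n
}

func main() {
	inst := &instance{streams: map[string]*stream{}}
	inst.push(`{foo="bar"}`, "line 1", "line 2")
	_ = inst.flush(`{foo="bar"}`)
	fmt.Println(inst.entryCount(`{foo="bar"}`)) // prints 2
}

Keeping the chunk mutex per stream means a slow flush or transfer of one stream no longer serializes pushes and queries on every other stream in the instance; only readers and writers of the same stream's chunks contend, and read-only paths can share that lock via RLock.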