diff --git a/pkg/chunkenc/facade.go b/pkg/chunkenc/facade.go index d603920e3e65..d26be8947c54 100644 --- a/pkg/chunkenc/facade.go +++ b/pkg/chunkenc/facade.go @@ -24,14 +24,18 @@ func init() { // Facade for compatibility with cortex chunk type, so we can use its chunk store. type Facade struct { - c Chunk + c Chunk + blockSize int + targetSize int encoding.Chunk } // NewFacade makes a new Facade. -func NewFacade(c Chunk) encoding.Chunk { +func NewFacade(c Chunk, blockSize, targetSize int) encoding.Chunk { return &Facade{ - c: c, + c: c, + blockSize: blockSize, + targetSize: targetSize, } } @@ -51,7 +55,7 @@ func (f Facade) Marshal(w io.Writer) error { // UnmarshalFromBuf implements encoding.Chunk. func (f *Facade) UnmarshalFromBuf(buf []byte) error { var err error - f.c, err = NewByteChunk(buf) + f.c, err = NewByteChunk(buf, f.blockSize, f.targetSize) return err } diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index d9ad7a5c4cc3..b3f450aa1636 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -145,14 +145,8 @@ type entry struct { s string } -// NewMemChunk returns a new in-mem chunk for query. -func NewMemChunk(enc Encoding) *MemChunk { - return NewMemChunkSize(enc, 256*1024, 0) -} - -// NewMemChunkSize returns a new in-mem chunk. -// Mainly for config push size. -func NewMemChunkSize(enc Encoding, blockSize, targetSize int) *MemChunk { +// NewMemChunk returns a new in-mem chunk. +func NewMemChunk(enc Encoding, blockSize, targetSize int) *MemChunk { c := &MemChunk{ blockSize: blockSize, // The blockSize in bytes. targetSize: targetSize, // Desired chunk size in compressed bytes @@ -170,9 +164,11 @@ func NewMemChunkSize(enc Encoding, blockSize, targetSize int) *MemChunk { } // NewByteChunk returns a MemChunk on the passed bytes. -func NewByteChunk(b []byte) (*MemChunk, error) { +func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error) { bc := &MemChunk{ - head: &headBlock{}, // Dummy, empty headblock. + head: &headBlock{}, // Dummy, empty headblock. + blockSize: blockSize, + targetSize: targetSize, } db := decbuf{b: b} @@ -235,6 +231,9 @@ func NewByteChunk(b []byte) (*MemChunk, error) { bc.blocks = append(bc.blocks, blk) + // Update the counter used to track the size of cut blocks. + bc.cutBlockSize += len(blk.b) + if db.err() != nil { return nil, errors.Wrap(db.err(), "decoding block meta") } diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index d3bccd97d349..50f7d27e4678 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -32,10 +32,15 @@ var testEncoding = []Encoding{ EncSnappy, } +var ( + testBlockSize = 256 * 1024 + testTargetSize = 1500 * 1024 +) + func TestBlock(t *testing.T) { for _, enc := range testEncoding { t.Run(enc.String(), func(t *testing.T) { - chk := NewMemChunk(enc) + chk := NewMemChunk(enc, testBlockSize, testTargetSize) cases := []struct { ts int64 str string @@ -128,7 +133,7 @@ func TestBlock(t *testing.T) { } func TestReadFormatV1(t *testing.T) { - c := NewMemChunk(EncGZIP) + c := NewMemChunk(EncGZIP, testBlockSize, testTargetSize) fillChunk(c) // overrides default v2 format c.format = chunkFormatV1 @@ -138,7 +143,7 @@ func TestReadFormatV1(t *testing.T) { t.Fatal(err) } - r, err := NewByteChunk(b) + r, err := NewByteChunk(b, testBlockSize, testTargetSize) if err != nil { t.Fatal(err) } @@ -165,7 +170,7 @@ func TestReadFormatV1(t *testing.T) { func TestRoundtripV2(t *testing.T) { for _, enc := range testEncoding { t.Run(enc.String(), func(t *testing.T) { - c := NewMemChunk(enc) + c := NewMemChunk(enc, testBlockSize, testTargetSize) populated := fillChunk(c) assertLines := func(c *MemChunk) { @@ -195,7 +200,7 @@ func TestRoundtripV2(t *testing.T) { t.Fatal(err) } - r, err := NewByteChunk(b) + r, err := NewByteChunk(b, testBlockSize, testTargetSize) if err != nil { t.Fatal(err) } @@ -205,7 +210,7 @@ func TestRoundtripV2(t *testing.T) { rOut, err := r.Bytes() require.Nil(t, err) - loaded, err := NewByteChunk(rOut) + loaded, err := NewByteChunk(rOut, testBlockSize, testTargetSize) require.Nil(t, err) assertLines(loaded) @@ -218,7 +223,7 @@ func TestRoundtripV2(t *testing.T) { func TestSerialization(t *testing.T) { for _, enc := range testEncoding { t.Run(enc.String(), func(t *testing.T) { - chk := NewMemChunk(enc) + chk := NewMemChunk(enc, testBlockSize, testTargetSize) numSamples := 500000 @@ -229,7 +234,7 @@ func TestSerialization(t *testing.T) { byt, err := chk.Bytes() require.NoError(t, err) - bc, err := NewByteChunk(byt) + bc, err := NewByteChunk(byt, testBlockSize, testTargetSize) require.NoError(t, err) it, err := bc.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil) @@ -255,7 +260,7 @@ func TestSerialization(t *testing.T) { func TestChunkFilling(t *testing.T) { for _, enc := range testEncoding { t.Run(enc.String(), func(t *testing.T) { - chk := NewMemChunk(enc) + chk := NewMemChunk(enc, testBlockSize, 0) chk.blockSize = 1024 // We should be able to append only 10KB of logs. @@ -292,8 +297,7 @@ func TestChunkFilling(t *testing.T) { } func TestGZIPChunkTargetSize(t *testing.T) { - targetSize := 1024 * 1024 - chk := NewMemChunkSize(EncGZIP, 1024, targetSize) + chk := NewMemChunk(EncGZIP, testBlockSize, testTargetSize) lineSize := 512 entry := &logproto.Entry{ @@ -330,8 +334,8 @@ func TestGZIPChunkTargetSize(t *testing.T) { // Even though the seed is static above and results should be deterministic, // we will allow +/- 10% variance - minSize := int(float64(targetSize) * 0.9) - maxSize := int(float64(targetSize) * 1.1) + minSize := int(float64(testTargetSize) * 0.9) + maxSize := int(float64(testTargetSize) * 1.1) require.Greater(t, chk.CompressedSize(), minSize) require.Less(t, chk.CompressedSize(), maxSize) @@ -378,7 +382,7 @@ func TestMemChunk_AppendOutOfOrder(t *testing.T) { t.Run(testName, func(t *testing.T) { t.Parallel() - tester(t, NewMemChunk(EncGZIP)) + tester(t, NewMemChunk(EncGZIP, testBlockSize, testTargetSize)) }) } } @@ -386,7 +390,7 @@ func TestMemChunk_AppendOutOfOrder(t *testing.T) { func TestChunkSize(t *testing.T) { for _, enc := range testEncoding { t.Run(enc.String(), func(t *testing.T) { - c := NewMemChunk(enc) + c := NewMemChunk(enc, testBlockSize, testTargetSize) inserted := fillChunk(c) b, err := c.Bytes() if err != nil { @@ -400,7 +404,7 @@ func TestChunkSize(t *testing.T) { } func TestChunkStats(t *testing.T) { - c := NewMemChunk(EncSnappy) + c := NewMemChunk(EncSnappy, testBlockSize, 0) first := time.Now() entry := &logproto.Entry{ Timestamp: first, @@ -445,7 +449,7 @@ func TestChunkStats(t *testing.T) { } // test on a new chunk. - cb, err := NewByteChunk(b) + cb, err := NewByteChunk(b, testBlockSize, testTargetSize) if err != nil { t.Fatal(err) } @@ -496,7 +500,7 @@ func TestIteratorClose(t *testing.T) { } }, } { - c := NewMemChunk(enc) + c := NewMemChunk(enc, testBlockSize, testTargetSize) inserted := fillChunk(c) iter, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, inserted), logproto.BACKWARD, nil) if err != nil { @@ -523,7 +527,7 @@ func BenchmarkWrite(b *testing.B) { for _, enc := range testEncoding { b.Run(enc.String(), func(b *testing.B) { for n := 0; n < b.N; n++ { - c := NewMemChunk(enc) + c := NewMemChunk(enc, testBlockSize, testTargetSize) // adds until full so we trigger cut which serialize using gzip for c.SpaceFor(entry) { _ = c.Append(entry) diff --git a/pkg/chunkenc/util_test.go b/pkg/chunkenc/util_test.go index 55289a5004b8..1bfebe21b005 100644 --- a/pkg/chunkenc/util_test.go +++ b/pkg/chunkenc/util_test.go @@ -21,7 +21,7 @@ func generateData(enc Encoding) ([]Chunk, uint64) { for n := 0; n < 50; n++ { entry := logprotoEntry(0, testdata.LogString(0)) - c := NewMemChunk(enc) + c := NewMemChunk(enc, testBlockSize, testTargetSize) for c.SpaceFor(entry) { size += uint64(len(entry.Line)) _ = c.Append(entry) diff --git a/pkg/ingester/chunk_test.go b/pkg/ingester/chunk_test.go index f24533960d8c..8edd8b0ac404 100644 --- a/pkg/ingester/chunk_test.go +++ b/pkg/ingester/chunk_test.go @@ -47,7 +47,7 @@ func TestIterator(t *testing.T) { new func() chunkenc.Chunk }{ {"dumbChunk", chunkenc.NewDumbChunk}, - {"gzipChunk", func() chunkenc.Chunk { return chunkenc.NewMemChunk(chunkenc.EncGZIP) }}, + {"gzipChunk", func() chunkenc.Chunk { return chunkenc.NewMemChunk(chunkenc.EncGZIP, 256*1024, 0) }}, } { t.Run(chk.name, func(t *testing.T) { chunk := chk.new() diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index 4dfdd9972c92..e964879dc279 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -314,7 +314,7 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP firstTime, lastTime := loki_util.RoundToMilliseconds(c.chunk.Bounds()) c := chunk.NewChunk( userID, fp, metric, - chunkenc.NewFacade(c.chunk), + chunkenc.NewFacade(c.chunk, i.cfg.BlockSize, i.cfg.TargetChunkSize), firstTime, lastTime, ) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index a752cc8cbde6..fd42629fb610 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -134,7 +134,7 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes), quitting: make(chan struct{}), factory: func() chunkenc.Chunk { - return chunkenc.NewMemChunkSize(enc, cfg.BlockSize, cfg.TargetChunkSize) + return chunkenc.NewMemChunk(enc, cfg.BlockSize, cfg.TargetChunkSize) }, } diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index 9a768d82aa3b..50bfed547388 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -21,7 +21,7 @@ import ( ) var defaultFactory = func() chunkenc.Chunk { - return chunkenc.NewMemChunkSize(chunkenc.EncGZIP, 512, 0) + return chunkenc.NewMemChunk(chunkenc.EncGZIP, 512, 0) } func TestLabelsCollisions(t *testing.T) { diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 45d00a50a8eb..a5fd390acec3 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -97,7 +97,7 @@ func newStream(cfg *Config, fp model.Fingerprint, labels labels.Labels, factory // consumeChunk manually adds a chunk to the stream that was received during // ingester chunk transfer. func (s *stream) consumeChunk(_ context.Context, chunk *logproto.Chunk) error { - c, err := chunkenc.NewByteChunk(chunk.Data) + c, err := chunkenc.NewByteChunk(chunk.Data, s.cfg.BlockSize, s.cfg.TargetChunkSize) if err != nil { return err } diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index 6f5146d74d35..9c4f84914bfc 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -99,7 +99,7 @@ func TestStreamIterator(t *testing.T) { new func() chunkenc.Chunk }{ {"dumbChunk", chunkenc.NewDumbChunk}, - {"gzipChunk", func() chunkenc.Chunk { return chunkenc.NewMemChunk(chunkenc.EncGZIP) }}, + {"gzipChunk", func() chunkenc.Chunk { return chunkenc.NewMemChunk(chunkenc.EncGZIP, 256*1024, 0) }}, } { t.Run(chk.name, func(t *testing.T) { var s stream diff --git a/pkg/storage/hack/main.go b/pkg/storage/hack/main.go index 9075f6ff21c4..c92924675bc8 100644 --- a/pkg/storage/hack/main.go +++ b/pkg/storage/hack/main.go @@ -96,7 +96,7 @@ func fillStore() error { labelsBuilder.Set(labels.MetricName, "logs") metric := labelsBuilder.Labels() fp := client.FastFingerprint(lbs) - chunkEnc := chunkenc.NewMemChunkSize(chunkenc.EncLZ4_64k, 262144, 1572864) + chunkEnc := chunkenc.NewMemChunk(chunkenc.EncLZ4_64k, 262144, 1572864) for ts := start.UnixNano(); ts < start.UnixNano()+time.Hour.Nanoseconds(); ts = ts + time.Millisecond.Nanoseconds() { entry := &logproto.Entry{ Timestamp: time.Unix(0, ts), @@ -106,7 +106,7 @@ func fillStore() error { _ = chunkEnc.Append(entry) } else { from, to := chunkEnc.Bounds() - c := chunk.NewChunk("fake", fp, metric, chunkenc.NewFacade(chunkEnc), model.TimeFromUnixNano(from.UnixNano()), model.TimeFromUnixNano(to.UnixNano())) + c := chunk.NewChunk("fake", fp, metric, chunkenc.NewFacade(chunkEnc, 0, 0), model.TimeFromUnixNano(from.UnixNano()), model.TimeFromUnixNano(to.UnixNano())) if err := c.Encode(); err != nil { panic(err) } @@ -119,7 +119,7 @@ func fillStore() error { if flushCount >= maxChunks { return } - chunkEnc = chunkenc.NewMemChunkSize(chunkenc.EncLZ4_64k, 262144, 1572864) + chunkEnc = chunkenc.NewMemChunk(chunkenc.EncLZ4_64k, 262144, 1572864) } } diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go index 3f9e42259bfe..4370b744dff2 100644 --- a/pkg/storage/util_test.go +++ b/pkg/storage/util_test.go @@ -65,7 +65,7 @@ func newChunk(stream logproto.Stream) chunk.Chunk { l = builder.Labels() } from, through := model.TimeFromUnixNano(stream.Entries[0].Timestamp.UnixNano()), model.TimeFromUnixNano(stream.Entries[0].Timestamp.UnixNano()) - chk := chunkenc.NewMemChunk(chunkenc.EncGZIP) + chk := chunkenc.NewMemChunk(chunkenc.EncGZIP, 256*1024, 0) for _, e := range stream.Entries { if e.Timestamp.UnixNano() < from.UnixNano() { from = model.TimeFromUnixNano(e.Timestamp.UnixNano()) @@ -76,7 +76,7 @@ func newChunk(stream logproto.Stream) chunk.Chunk { _ = chk.Append(&e) } chk.Close() - c := chunk.NewChunk("fake", client.Fingerprint(l), l, chunkenc.NewFacade(chk), from, through) + c := chunk.NewChunk("fake", client.Fingerprint(l), l, chunkenc.NewFacade(chk, 0, 0), from, through) // force the checksum creation if err := c.Encode(); err != nil { panic(err)