Update the cut block size counter when creating a memchunk from byte slice (#1946)

* Update the cut block size counter when creating a memchunk from byte slice

* Refactor how we create memchunks to require specifying the block size and target size

slim-bean authored Apr 16, 2020
1 parent 15c4ebb commit b5b3e78
Showing 12 changed files with 52 additions and 45 deletions.
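
Background for the change: MemChunk tracks the compressed size of the blocks it has already cut in a cutBlockSize counter, and when a target size is configured this counter drives the check for whether the chunk can accept more data. A chunk rebuilt from a byte slice previously left the counter at zero, so it under-reported how full it was. A minimal sketch of that accounting, assuming a simplified version of the real SpaceFor check:

    // Sketch only: simplified stand-ins for MemChunk's size accounting.
    // Field names mirror the diff; Loki's actual SpaceFor differs in detail.
    type memChunk struct {
        targetSize   int // desired compressed chunk size in bytes (0 = disabled)
        cutBlockSize int // compressed bytes across blocks already cut
        headSize     int // uncompressed bytes in the open head block
    }

    // spaceFor approximates the size-based admission check. If cutBlockSize
    // is never restored when decoding a chunk from bytes, it always reports
    // free space, and the chunk can grow far past its target.
    func (c *memChunk) spaceFor(lineLen int) bool {
        if c.targetSize == 0 {
            return true // fall back to other limits, e.g. block count
        }
        return c.cutBlockSize+c.headSize+lineLen < c.targetSize
    }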
pkg/chunkenc/facade.go (8 additions, 4 deletions)

@@ -24,14 +24,18 @@ func init() {
 
 // Facade for compatibility with cortex chunk type, so we can use its chunk store.
 type Facade struct {
-    c Chunk
+    c          Chunk
+    blockSize  int
+    targetSize int
     encoding.Chunk
 }
 
 // NewFacade makes a new Facade.
-func NewFacade(c Chunk) encoding.Chunk {
+func NewFacade(c Chunk, blockSize, targetSize int) encoding.Chunk {
     return &Facade{
-        c: c,
+        c:          c,
+        blockSize:  blockSize,
+        targetSize: targetSize,
     }
 }
 
@@ -51,7 +55,7 @@ func (f Facade) Marshal(w io.Writer) error {
 // UnmarshalFromBuf implements encoding.Chunk.
 func (f *Facade) UnmarshalFromBuf(buf []byte) error {
     var err error
-    f.c, err = NewByteChunk(buf)
+    f.c, err = NewByteChunk(buf, f.blockSize, f.targetSize)
     return err
 }
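
The facade has to carry both sizes because UnmarshalFromBuf builds a brand-new MemChunk from bytes and would otherwise have nothing to pass to NewByteChunk. A hedged usage sketch (the nil inner chunk and the type assertion are illustrative, not how the chunk store actually wires this up):

    // Marshal through one facade, then decode into another that knows the
    // same sizes, so the rebuilt MemChunk keeps correct size accounting.
    blockSize, targetSize := 256*1024, 1500*1024
    f := chunkenc.NewFacade(mem, blockSize, targetSize) // mem: a filled *MemChunk

    var buf bytes.Buffer
    if err := f.Marshal(&buf); err != nil {
        return err
    }

    // UnmarshalFromBuf calls NewByteChunk(buf, f.blockSize, f.targetSize).
    fresh := chunkenc.NewFacade(nil, blockSize, targetSize).(*chunkenc.Facade)
    if err := fresh.UnmarshalFromBuf(buf.Bytes()); err != nil {
        return err
    }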
pkg/chunkenc/memchunk.go (9 additions, 10 deletions)

@@ -145,14 +145,8 @@ type entry struct {
     s string
 }
 
-// NewMemChunk returns a new in-mem chunk for query.
-func NewMemChunk(enc Encoding) *MemChunk {
-    return NewMemChunkSize(enc, 256*1024, 0)
-}
-
-// NewMemChunkSize returns a new in-mem chunk.
-// Mainly for config push size.
-func NewMemChunkSize(enc Encoding, blockSize, targetSize int) *MemChunk {
+// NewMemChunk returns a new in-mem chunk.
+func NewMemChunk(enc Encoding, blockSize, targetSize int) *MemChunk {
     c := &MemChunk{
         blockSize:  blockSize,  // The blockSize in bytes.
         targetSize: targetSize, // Desired chunk size in compressed bytes
@@ -170,9 +164,11 @@ func NewMemChunkSize(enc Encoding, blockSize, targetSize int) *MemChunk {
 }
 
 // NewByteChunk returns a MemChunk on the passed bytes.
-func NewByteChunk(b []byte) (*MemChunk, error) {
+func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error) {
     bc := &MemChunk{
-        head: &headBlock{}, // Dummy, empty headblock.
+        head:       &headBlock{}, // Dummy, empty headblock.
+        blockSize:  blockSize,
+        targetSize: targetSize,
     }
     db := decbuf{b: b}
 
@@ -235,6 +231,9 @@ func NewByteChunk(b []byte) (*MemChunk, error) {
 
     bc.blocks = append(bc.blocks, blk)
 
+    // Update the counter used to track the size of cut blocks.
+    bc.cutBlockSize += len(blk.b)
+
     if db.err() != nil {
         return nil, errors.Wrap(db.err(), "decoding block meta")
     }
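
The last hunk is the fix itself: as NewByteChunk replays each decoded block into bc.blocks, it now also adds the block's compressed length to bc.cutBlockSize, so a decoded chunk reports the same fill level as the chunk it was serialized from. In test-style pseudocode (in-package names; fillChunk stands in for any append loop):

    // Sketch: after a Bytes()/NewByteChunk round trip the two chunks should
    // agree on how full they are, so SpaceFor behaves identically on both.
    c := NewMemChunk(EncGZIP, 256*1024, 1500*1024)
    fillChunk(c)

    b, err := c.Bytes()
    if err != nil {
        t.Fatal(err)
    }
    r, err := NewByteChunk(b, 256*1024, 1500*1024)
    if err != nil {
        t.Fatal(err)
    }
    // Before this commit, r.cutBlockSize stayed 0, so r would keep
    // accepting entries well past the configured target size.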
pkg/chunkenc/memchunk_test.go (23 additions, 19 deletions)

@@ -32,10 +32,15 @@ var testEncoding = []Encoding{
     EncSnappy,
 }
 
+var (
+    testBlockSize  = 256 * 1024
+    testTargetSize = 1500 * 1024
+)
+
 func TestBlock(t *testing.T) {
     for _, enc := range testEncoding {
         t.Run(enc.String(), func(t *testing.T) {
-            chk := NewMemChunk(enc)
+            chk := NewMemChunk(enc, testBlockSize, testTargetSize)
             cases := []struct {
                 ts  int64
                 str string
@@ -128,7 +133,7 @@ func TestBlock(t *testing.T) {
 }
 
 func TestReadFormatV1(t *testing.T) {
-    c := NewMemChunk(EncGZIP)
+    c := NewMemChunk(EncGZIP, testBlockSize, testTargetSize)
     fillChunk(c)
     // overrides default v2 format
     c.format = chunkFormatV1
@@ -138,7 +143,7 @@ func TestReadFormatV1(t *testing.T) {
         t.Fatal(err)
     }
 
-    r, err := NewByteChunk(b)
+    r, err := NewByteChunk(b, testBlockSize, testTargetSize)
     if err != nil {
         t.Fatal(err)
     }
@@ -165,7 +170,7 @@ func TestReadFormatV1(t *testing.T) {
 func TestRoundtripV2(t *testing.T) {
     for _, enc := range testEncoding {
         t.Run(enc.String(), func(t *testing.T) {
-            c := NewMemChunk(enc)
+            c := NewMemChunk(enc, testBlockSize, testTargetSize)
             populated := fillChunk(c)
 
             assertLines := func(c *MemChunk) {
@@ -195,7 +200,7 @@ func TestRoundtripV2(t *testing.T) {
                 t.Fatal(err)
             }
 
-            r, err := NewByteChunk(b)
+            r, err := NewByteChunk(b, testBlockSize, testTargetSize)
             if err != nil {
                 t.Fatal(err)
             }
@@ -205,7 +210,7 @@ func TestRoundtripV2(t *testing.T) {
             rOut, err := r.Bytes()
             require.Nil(t, err)
 
-            loaded, err := NewByteChunk(rOut)
+            loaded, err := NewByteChunk(rOut, testBlockSize, testTargetSize)
             require.Nil(t, err)
 
             assertLines(loaded)
@@ -218,7 +223,7 @@ func TestRoundtripV2(t *testing.T) {
 func TestSerialization(t *testing.T) {
     for _, enc := range testEncoding {
         t.Run(enc.String(), func(t *testing.T) {
-            chk := NewMemChunk(enc)
+            chk := NewMemChunk(enc, testBlockSize, testTargetSize)
 
             numSamples := 500000
 
@@ -229,7 +234,7 @@ func TestSerialization(t *testing.T) {
             byt, err := chk.Bytes()
             require.NoError(t, err)
 
-            bc, err := NewByteChunk(byt)
+            bc, err := NewByteChunk(byt, testBlockSize, testTargetSize)
             require.NoError(t, err)
 
             it, err := bc.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil)
@@ -255,7 +260,7 @@ func TestSerialization(t *testing.T) {
 func TestChunkFilling(t *testing.T) {
     for _, enc := range testEncoding {
         t.Run(enc.String(), func(t *testing.T) {
-            chk := NewMemChunk(enc)
+            chk := NewMemChunk(enc, testBlockSize, 0)
             chk.blockSize = 1024
 
             // We should be able to append only 10KB of logs.
@@ -292,8 +297,7 @@ func TestChunkFilling(t *testing.T) {
 }
 
 func TestGZIPChunkTargetSize(t *testing.T) {
-    targetSize := 1024 * 1024
-    chk := NewMemChunkSize(EncGZIP, 1024, targetSize)
+    chk := NewMemChunk(EncGZIP, testBlockSize, testTargetSize)
 
     lineSize := 512
     entry := &logproto.Entry{
@@ -330,8 +334,8 @@ func TestGZIPChunkTargetSize(t *testing.T) {
 
     // Even though the seed is static above and results should be deterministic,
     // we will allow +/- 10% variance
-    minSize := int(float64(targetSize) * 0.9)
-    maxSize := int(float64(targetSize) * 1.1)
+    minSize := int(float64(testTargetSize) * 0.9)
+    maxSize := int(float64(testTargetSize) * 1.1)
     require.Greater(t, chk.CompressedSize(), minSize)
     require.Less(t, chk.CompressedSize(), maxSize)
 
@@ -378,15 +382,15 @@ func TestMemChunk_AppendOutOfOrder(t *testing.T) {
         t.Run(testName, func(t *testing.T) {
             t.Parallel()
 
-            tester(t, NewMemChunk(EncGZIP))
+            tester(t, NewMemChunk(EncGZIP, testBlockSize, testTargetSize))
         })
     }
 }
 
 func TestChunkSize(t *testing.T) {
     for _, enc := range testEncoding {
         t.Run(enc.String(), func(t *testing.T) {
-            c := NewMemChunk(enc)
+            c := NewMemChunk(enc, testBlockSize, testTargetSize)
             inserted := fillChunk(c)
             b, err := c.Bytes()
             if err != nil {
@@ -400,7 +404,7 @@ func TestChunkSize(t *testing.T) {
 }
 
 func TestChunkStats(t *testing.T) {
-    c := NewMemChunk(EncSnappy)
+    c := NewMemChunk(EncSnappy, testBlockSize, 0)
     first := time.Now()
     entry := &logproto.Entry{
         Timestamp: first,
@@ -445,7 +449,7 @@ func TestChunkStats(t *testing.T) {
     }
 
     // test on a new chunk.
-    cb, err := NewByteChunk(b)
+    cb, err := NewByteChunk(b, testBlockSize, testTargetSize)
     if err != nil {
         t.Fatal(err)
     }
@@ -496,7 +500,7 @@ func TestIteratorClose(t *testing.T) {
             }
         },
     } {
-        c := NewMemChunk(enc)
+        c := NewMemChunk(enc, testBlockSize, testTargetSize)
        inserted := fillChunk(c)
        iter, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, inserted), logproto.BACKWARD, nil)
        if err != nil {
@@ -523,7 +527,7 @@ func BenchmarkWrite(b *testing.B) {
     for _, enc := range testEncoding {
         b.Run(enc.String(), func(b *testing.B) {
             for n := 0; n < b.N; n++ {
-                c := NewMemChunk(enc)
+                c := NewMemChunk(enc, testBlockSize, testTargetSize)
                 // adds until full so we trigger cut which serialize using gzip
                 for c.SpaceFor(entry) {
                     _ = c.Append(entry)
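
Note that TestChunkFilling and TestChunkStats pass 0 as the target size, presumably on purpose: a non-zero target switches SpaceFor to the size-based check sketched above, which would change the fill behavior those tests assert.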
pkg/chunkenc/util_test.go (1 addition, 1 deletion)

@@ -21,7 +21,7 @@ func generateData(enc Encoding) ([]Chunk, uint64) {
 
     for n := 0; n < 50; n++ {
         entry := logprotoEntry(0, testdata.LogString(0))
-        c := NewMemChunk(enc)
+        c := NewMemChunk(enc, testBlockSize, testTargetSize)
         for c.SpaceFor(entry) {
             size += uint64(len(entry.Line))
             _ = c.Append(entry)
pkg/ingester/chunk_test.go (1 addition, 1 deletion)

@@ -47,7 +47,7 @@ func TestIterator(t *testing.T) {
         new func() chunkenc.Chunk
     }{
         {"dumbChunk", chunkenc.NewDumbChunk},
-        {"gzipChunk", func() chunkenc.Chunk { return chunkenc.NewMemChunk(chunkenc.EncGZIP) }},
+        {"gzipChunk", func() chunkenc.Chunk { return chunkenc.NewMemChunk(chunkenc.EncGZIP, 256*1024, 0) }},
     } {
         t.Run(chk.name, func(t *testing.T) {
             chunk := chk.new()
pkg/ingester/flush.go (1 addition, 1 deletion)

@@ -314,7 +314,7 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP
         firstTime, lastTime := loki_util.RoundToMilliseconds(c.chunk.Bounds())
         c := chunk.NewChunk(
             userID, fp, metric,
-            chunkenc.NewFacade(c.chunk),
+            chunkenc.NewFacade(c.chunk, i.cfg.BlockSize, i.cfg.TargetChunkSize),
             firstTime,
             lastTime,
         )
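
At flush time the ingester now wraps each chunk in a facade that carries the configured sizes, so that a later UnmarshalFromBuf on the read path can rebuild the MemChunk with the same accounting.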
pkg/ingester/ingester.go (1 addition, 1 deletion)

@@ -134,7 +134,7 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid
         flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
         quitting:    make(chan struct{}),
         factory: func() chunkenc.Chunk {
-            return chunkenc.NewMemChunkSize(enc, cfg.BlockSize, cfg.TargetChunkSize)
+            return chunkenc.NewMemChunk(enc, cfg.BlockSize, cfg.TargetChunkSize)
         },
     }
 
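
With NewMemChunkSize gone, every chunk the ingester creates goes through the single NewMemChunk constructor with the configured sizes. A sketch of the two config fields involved (values are illustrative, not defaults taken from this commit):

    // Hypothetical wiring: only the two fields from the diff are shown;
    // ingester.Config has many more.
    cfg := ingester.Config{
        BlockSize:       256 * 1024,  // uncompressed bytes per cut block
        TargetChunkSize: 1536 * 1024, // compressed target per chunk; 0 disables
    }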
pkg/ingester/instance_test.go (1 addition, 1 deletion)

@@ -21,7 +21,7 @@ import (
 )
 
 var defaultFactory = func() chunkenc.Chunk {
-    return chunkenc.NewMemChunkSize(chunkenc.EncGZIP, 512, 0)
+    return chunkenc.NewMemChunk(chunkenc.EncGZIP, 512, 0)
 }
 
 func TestLabelsCollisions(t *testing.T) {
pkg/ingester/stream.go (1 addition, 1 deletion)

@@ -97,7 +97,7 @@ func newStream(cfg *Config, fp model.Fingerprint, labels labels.Labels, factory
 // consumeChunk manually adds a chunk to the stream that was received during
 // ingester chunk transfer.
 func (s *stream) consumeChunk(_ context.Context, chunk *logproto.Chunk) error {
-    c, err := chunkenc.NewByteChunk(chunk.Data)
+    c, err := chunkenc.NewByteChunk(chunk.Data, s.cfg.BlockSize, s.cfg.TargetChunkSize)
     if err != nil {
         return err
     }
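
This is likely the path that motivated the fix: a chunk received during ingester transfer keeps accepting appends on the new ingester, so rebuilding it with cutBlockSize left at zero made it look nearly empty to the size check and let it grow well past the target size.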
pkg/ingester/stream_test.go (1 addition, 1 deletion)

@@ -99,7 +99,7 @@ func TestStreamIterator(t *testing.T) {
         new func() chunkenc.Chunk
     }{
         {"dumbChunk", chunkenc.NewDumbChunk},
-        {"gzipChunk", func() chunkenc.Chunk { return chunkenc.NewMemChunk(chunkenc.EncGZIP) }},
+        {"gzipChunk", func() chunkenc.Chunk { return chunkenc.NewMemChunk(chunkenc.EncGZIP, 256*1024, 0) }},
     } {
         t.Run(chk.name, func(t *testing.T) {
             var s stream
pkg/storage/hack/main.go (3 additions, 3 deletions)

@@ -96,7 +96,7 @@ func fillStore() error {
     labelsBuilder.Set(labels.MetricName, "logs")
     metric := labelsBuilder.Labels()
     fp := client.FastFingerprint(lbs)
-    chunkEnc := chunkenc.NewMemChunkSize(chunkenc.EncLZ4_64k, 262144, 1572864)
+    chunkEnc := chunkenc.NewMemChunk(chunkenc.EncLZ4_64k, 262144, 1572864)
     for ts := start.UnixNano(); ts < start.UnixNano()+time.Hour.Nanoseconds(); ts = ts + time.Millisecond.Nanoseconds() {
         entry := &logproto.Entry{
             Timestamp: time.Unix(0, ts),
@@ -106,7 +106,7 @@ func fillStore() error {
             _ = chunkEnc.Append(entry)
         } else {
             from, to := chunkEnc.Bounds()
-            c := chunk.NewChunk("fake", fp, metric, chunkenc.NewFacade(chunkEnc), model.TimeFromUnixNano(from.UnixNano()), model.TimeFromUnixNano(to.UnixNano()))
+            c := chunk.NewChunk("fake", fp, metric, chunkenc.NewFacade(chunkEnc, 0, 0), model.TimeFromUnixNano(from.UnixNano()), model.TimeFromUnixNano(to.UnixNano()))
             if err := c.Encode(); err != nil {
                 panic(err)
             }
@@ -119,7 +119,7 @@ func fillStore() error {
             if flushCount >= maxChunks {
                 return
             }
-            chunkEnc = chunkenc.NewMemChunkSize(chunkenc.EncLZ4_64k, 262144, 1572864)
+            chunkEnc = chunkenc.NewMemChunk(chunkenc.EncLZ4_64k, 262144, 1572864)
         }
     }
 
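
The literals here are 262144 bytes (256KB) per block and 1572864 bytes (1.5MB) per chunk. Passing (0, 0) to NewFacade appears harmless in this tool, since the facade is only ever marshaled; the sizes matter only when a facade has to unmarshal bytes back into a MemChunk.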
pkg/storage/util_test.go (2 additions, 2 deletions)

@@ -65,7 +65,7 @@ func newChunk(stream logproto.Stream) chunk.Chunk {
         l = builder.Labels()
     }
     from, through := model.TimeFromUnixNano(stream.Entries[0].Timestamp.UnixNano()), model.TimeFromUnixNano(stream.Entries[0].Timestamp.UnixNano())
-    chk := chunkenc.NewMemChunk(chunkenc.EncGZIP)
+    chk := chunkenc.NewMemChunk(chunkenc.EncGZIP, 256*1024, 0)
     for _, e := range stream.Entries {
         if e.Timestamp.UnixNano() < from.UnixNano() {
             from = model.TimeFromUnixNano(e.Timestamp.UnixNano())
@@ -76,7 +76,7 @@ func newChunk(stream logproto.Stream) chunk.Chunk {
         _ = chk.Append(&e)
     }
     chk.Close()
-    c := chunk.NewChunk("fake", client.Fingerprint(l), l, chunkenc.NewFacade(chk), from, through)
+    c := chunk.NewChunk("fake", client.Fingerprint(l), l, chunkenc.NewFacade(chk, 0, 0), from, through)
     // force the checksum creation
     if err := c.Encode(); err != nil {
         panic(err)
