Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Loki Update the cut block size counter when creating a memchunk from byte slice #1946

Merged
merged 3 commits
Apr 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions pkg/chunkenc/facade.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@ func init() {

// Facade for compatibility with cortex chunk type, so we can use its chunk store.
type Facade struct {
c Chunk
c Chunk
blockSize int
targetSize int
encoding.Chunk
}

// NewFacade makes a new Facade.
func NewFacade(c Chunk) encoding.Chunk {
func NewFacade(c Chunk, blockSize, targetSize int) encoding.Chunk {
return &Facade{
c: c,
c: c,
blockSize: blockSize,
targetSize: targetSize,
}
}

Expand All @@ -51,7 +55,7 @@ func (f Facade) Marshal(w io.Writer) error {
// UnmarshalFromBuf implements encoding.Chunk.
func (f *Facade) UnmarshalFromBuf(buf []byte) error {
var err error
f.c, err = NewByteChunk(buf)
f.c, err = NewByteChunk(buf, f.blockSize, f.targetSize)
return err
}

Expand Down
19 changes: 9 additions & 10 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,8 @@ type entry struct {
s string
}

// NewMemChunk returns a new in-mem chunk for query.
func NewMemChunk(enc Encoding) *MemChunk {
return NewMemChunkSize(enc, 256*1024, 0)
}

// NewMemChunkSize returns a new in-mem chunk.
// Mainly for config push size.
func NewMemChunkSize(enc Encoding, blockSize, targetSize int) *MemChunk {
// NewMemChunk returns a new in-mem chunk.
func NewMemChunk(enc Encoding, blockSize, targetSize int) *MemChunk {
c := &MemChunk{
blockSize: blockSize, // The blockSize in bytes.
targetSize: targetSize, // Desired chunk size in compressed bytes
Expand All @@ -170,9 +164,11 @@ func NewMemChunkSize(enc Encoding, blockSize, targetSize int) *MemChunk {
}

// NewByteChunk returns a MemChunk on the passed bytes.
func NewByteChunk(b []byte) (*MemChunk, error) {
func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error) {
bc := &MemChunk{
head: &headBlock{}, // Dummy, empty headblock.
head: &headBlock{}, // Dummy, empty headblock.
blockSize: blockSize,
targetSize: targetSize,
}
db := decbuf{b: b}

Expand Down Expand Up @@ -235,6 +231,9 @@ func NewByteChunk(b []byte) (*MemChunk, error) {

bc.blocks = append(bc.blocks, blk)

// Update the counter used to track the size of cut blocks.
bc.cutBlockSize += len(blk.b)

if db.err() != nil {
return nil, errors.Wrap(db.err(), "decoding block meta")
}
Expand Down
42 changes: 23 additions & 19 deletions pkg/chunkenc/memchunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,15 @@ var testEncoding = []Encoding{
EncSnappy,
}

var (
testBlockSize = 256 * 1024
testTargetSize = 1500 * 1024
)

func TestBlock(t *testing.T) {
for _, enc := range testEncoding {
t.Run(enc.String(), func(t *testing.T) {
chk := NewMemChunk(enc)
chk := NewMemChunk(enc, testBlockSize, testTargetSize)
cases := []struct {
ts int64
str string
Expand Down Expand Up @@ -128,7 +133,7 @@ func TestBlock(t *testing.T) {
}

func TestReadFormatV1(t *testing.T) {
c := NewMemChunk(EncGZIP)
c := NewMemChunk(EncGZIP, testBlockSize, testTargetSize)
fillChunk(c)
// overrides default v2 format
c.format = chunkFormatV1
Expand All @@ -138,7 +143,7 @@ func TestReadFormatV1(t *testing.T) {
t.Fatal(err)
}

r, err := NewByteChunk(b)
r, err := NewByteChunk(b, testBlockSize, testTargetSize)
if err != nil {
t.Fatal(err)
}
Expand All @@ -165,7 +170,7 @@ func TestReadFormatV1(t *testing.T) {
func TestRoundtripV2(t *testing.T) {
for _, enc := range testEncoding {
t.Run(enc.String(), func(t *testing.T) {
c := NewMemChunk(enc)
c := NewMemChunk(enc, testBlockSize, testTargetSize)
populated := fillChunk(c)

assertLines := func(c *MemChunk) {
Expand Down Expand Up @@ -195,7 +200,7 @@ func TestRoundtripV2(t *testing.T) {
t.Fatal(err)
}

r, err := NewByteChunk(b)
r, err := NewByteChunk(b, testBlockSize, testTargetSize)
if err != nil {
t.Fatal(err)
}
Expand All @@ -205,7 +210,7 @@ func TestRoundtripV2(t *testing.T) {
rOut, err := r.Bytes()
require.Nil(t, err)

loaded, err := NewByteChunk(rOut)
loaded, err := NewByteChunk(rOut, testBlockSize, testTargetSize)
require.Nil(t, err)

assertLines(loaded)
Expand All @@ -218,7 +223,7 @@ func TestRoundtripV2(t *testing.T) {
func TestSerialization(t *testing.T) {
for _, enc := range testEncoding {
t.Run(enc.String(), func(t *testing.T) {
chk := NewMemChunk(enc)
chk := NewMemChunk(enc, testBlockSize, testTargetSize)

numSamples := 500000

Expand All @@ -229,7 +234,7 @@ func TestSerialization(t *testing.T) {
byt, err := chk.Bytes()
require.NoError(t, err)

bc, err := NewByteChunk(byt)
bc, err := NewByteChunk(byt, testBlockSize, testTargetSize)
require.NoError(t, err)

it, err := bc.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil)
Expand All @@ -255,7 +260,7 @@ func TestSerialization(t *testing.T) {
func TestChunkFilling(t *testing.T) {
for _, enc := range testEncoding {
t.Run(enc.String(), func(t *testing.T) {
chk := NewMemChunk(enc)
chk := NewMemChunk(enc, testBlockSize, 0)
chk.blockSize = 1024

// We should be able to append only 10KB of logs.
Expand Down Expand Up @@ -292,8 +297,7 @@ func TestChunkFilling(t *testing.T) {
}

func TestGZIPChunkTargetSize(t *testing.T) {
targetSize := 1024 * 1024
chk := NewMemChunkSize(EncGZIP, 1024, targetSize)
chk := NewMemChunk(EncGZIP, testBlockSize, testTargetSize)

lineSize := 512
entry := &logproto.Entry{
Expand Down Expand Up @@ -330,8 +334,8 @@ func TestGZIPChunkTargetSize(t *testing.T) {

// Even though the seed is static above and results should be deterministic,
// we will allow +/- 10% variance
minSize := int(float64(targetSize) * 0.9)
maxSize := int(float64(targetSize) * 1.1)
minSize := int(float64(testTargetSize) * 0.9)
maxSize := int(float64(testTargetSize) * 1.1)
require.Greater(t, chk.CompressedSize(), minSize)
require.Less(t, chk.CompressedSize(), maxSize)

Expand Down Expand Up @@ -378,15 +382,15 @@ func TestMemChunk_AppendOutOfOrder(t *testing.T) {
t.Run(testName, func(t *testing.T) {
t.Parallel()

tester(t, NewMemChunk(EncGZIP))
tester(t, NewMemChunk(EncGZIP, testBlockSize, testTargetSize))
})
}
}

func TestChunkSize(t *testing.T) {
for _, enc := range testEncoding {
t.Run(enc.String(), func(t *testing.T) {
c := NewMemChunk(enc)
c := NewMemChunk(enc, testBlockSize, testTargetSize)
inserted := fillChunk(c)
b, err := c.Bytes()
if err != nil {
Expand All @@ -400,7 +404,7 @@ func TestChunkSize(t *testing.T) {
}

func TestChunkStats(t *testing.T) {
c := NewMemChunk(EncSnappy)
c := NewMemChunk(EncSnappy, testBlockSize, 0)
first := time.Now()
entry := &logproto.Entry{
Timestamp: first,
Expand Down Expand Up @@ -445,7 +449,7 @@ func TestChunkStats(t *testing.T) {
}

// test on a new chunk.
cb, err := NewByteChunk(b)
cb, err := NewByteChunk(b, testBlockSize, testTargetSize)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -496,7 +500,7 @@ func TestIteratorClose(t *testing.T) {
}
},
} {
c := NewMemChunk(enc)
c := NewMemChunk(enc, testBlockSize, testTargetSize)
inserted := fillChunk(c)
iter, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, inserted), logproto.BACKWARD, nil)
if err != nil {
Expand All @@ -523,7 +527,7 @@ func BenchmarkWrite(b *testing.B) {
for _, enc := range testEncoding {
b.Run(enc.String(), func(b *testing.B) {
for n := 0; n < b.N; n++ {
c := NewMemChunk(enc)
c := NewMemChunk(enc, testBlockSize, testTargetSize)
// adds until full so we trigger cut which serialize using gzip
for c.SpaceFor(entry) {
_ = c.Append(entry)
Expand Down
2 changes: 1 addition & 1 deletion pkg/chunkenc/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func generateData(enc Encoding) ([]Chunk, uint64) {

for n := 0; n < 50; n++ {
entry := logprotoEntry(0, testdata.LogString(0))
c := NewMemChunk(enc)
c := NewMemChunk(enc, testBlockSize, testTargetSize)
for c.SpaceFor(entry) {
size += uint64(len(entry.Line))
_ = c.Append(entry)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestIterator(t *testing.T) {
new func() chunkenc.Chunk
}{
{"dumbChunk", chunkenc.NewDumbChunk},
{"gzipChunk", func() chunkenc.Chunk { return chunkenc.NewMemChunk(chunkenc.EncGZIP) }},
{"gzipChunk", func() chunkenc.Chunk { return chunkenc.NewMemChunk(chunkenc.EncGZIP, 256*1024, 0) }},
} {
t.Run(chk.name, func(t *testing.T) {
chunk := chk.new()
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP
firstTime, lastTime := loki_util.RoundToMilliseconds(c.chunk.Bounds())
c := chunk.NewChunk(
userID, fp, metric,
chunkenc.NewFacade(c.chunk),
chunkenc.NewFacade(c.chunk, i.cfg.BlockSize, i.cfg.TargetChunkSize),
firstTime,
lastTime,
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
quitting: make(chan struct{}),
factory: func() chunkenc.Chunk {
return chunkenc.NewMemChunkSize(enc, cfg.BlockSize, cfg.TargetChunkSize)
return chunkenc.NewMemChunk(enc, cfg.BlockSize, cfg.TargetChunkSize)
},
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
)

var defaultFactory = func() chunkenc.Chunk {
return chunkenc.NewMemChunkSize(chunkenc.EncGZIP, 512, 0)
return chunkenc.NewMemChunk(chunkenc.EncGZIP, 512, 0)
}

func TestLabelsCollisions(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func newStream(cfg *Config, fp model.Fingerprint, labels labels.Labels, factory
// consumeChunk manually adds a chunk to the stream that was received during
// ingester chunk transfer.
func (s *stream) consumeChunk(_ context.Context, chunk *logproto.Chunk) error {
c, err := chunkenc.NewByteChunk(chunk.Data)
c, err := chunkenc.NewByteChunk(chunk.Data, s.cfg.BlockSize, s.cfg.TargetChunkSize)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestStreamIterator(t *testing.T) {
new func() chunkenc.Chunk
}{
{"dumbChunk", chunkenc.NewDumbChunk},
{"gzipChunk", func() chunkenc.Chunk { return chunkenc.NewMemChunk(chunkenc.EncGZIP) }},
{"gzipChunk", func() chunkenc.Chunk { return chunkenc.NewMemChunk(chunkenc.EncGZIP, 256*1024, 0) }},
} {
t.Run(chk.name, func(t *testing.T) {
var s stream
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/hack/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func fillStore() error {
labelsBuilder.Set(labels.MetricName, "logs")
metric := labelsBuilder.Labels()
fp := client.FastFingerprint(lbs)
chunkEnc := chunkenc.NewMemChunkSize(chunkenc.EncLZ4_64k, 262144, 1572864)
chunkEnc := chunkenc.NewMemChunk(chunkenc.EncLZ4_64k, 262144, 1572864)
for ts := start.UnixNano(); ts < start.UnixNano()+time.Hour.Nanoseconds(); ts = ts + time.Millisecond.Nanoseconds() {
entry := &logproto.Entry{
Timestamp: time.Unix(0, ts),
Expand All @@ -106,7 +106,7 @@ func fillStore() error {
_ = chunkEnc.Append(entry)
} else {
from, to := chunkEnc.Bounds()
c := chunk.NewChunk("fake", fp, metric, chunkenc.NewFacade(chunkEnc), model.TimeFromUnixNano(from.UnixNano()), model.TimeFromUnixNano(to.UnixNano()))
c := chunk.NewChunk("fake", fp, metric, chunkenc.NewFacade(chunkEnc, 0, 0), model.TimeFromUnixNano(from.UnixNano()), model.TimeFromUnixNano(to.UnixNano()))
if err := c.Encode(); err != nil {
panic(err)
}
Expand All @@ -119,7 +119,7 @@ func fillStore() error {
if flushCount >= maxChunks {
return
}
chunkEnc = chunkenc.NewMemChunkSize(chunkenc.EncLZ4_64k, 262144, 1572864)
chunkEnc = chunkenc.NewMemChunk(chunkenc.EncLZ4_64k, 262144, 1572864)
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func newChunk(stream logproto.Stream) chunk.Chunk {
l = builder.Labels()
}
from, through := model.TimeFromUnixNano(stream.Entries[0].Timestamp.UnixNano()), model.TimeFromUnixNano(stream.Entries[0].Timestamp.UnixNano())
chk := chunkenc.NewMemChunk(chunkenc.EncGZIP)
chk := chunkenc.NewMemChunk(chunkenc.EncGZIP, 256*1024, 0)
for _, e := range stream.Entries {
if e.Timestamp.UnixNano() < from.UnixNano() {
from = model.TimeFromUnixNano(e.Timestamp.UnixNano())
Expand All @@ -76,7 +76,7 @@ func newChunk(stream logproto.Stream) chunk.Chunk {
_ = chk.Append(&e)
}
chk.Close()
c := chunk.NewChunk("fake", client.Fingerprint(l), l, chunkenc.NewFacade(chk), from, through)
c := chunk.NewChunk("fake", client.Fingerprint(l), l, chunkenc.NewFacade(chk, 0, 0), from, through)
// force the checksum creation
if err := c.Encode(); err != nil {
panic(err)
Expand Down