From ae66bb0a41eaebb13491b6f7f615d54615f8e97a Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Thu, 12 Dec 2019 11:21:35 -0500 Subject: [PATCH 1/2] allow configuring a target chunk size in compressed bytes Signed-off-by: Edward Welch --- docs/configuration/README.md | 10 +++++- pkg/chunkenc/dumb_chunk.go | 13 ++++++++ pkg/chunkenc/gzip.go | 57 ++++++++++++++++++++++++++++------- pkg/chunkenc/gzip_test.go | 54 ++++++++++++++++++++++++++++++++- pkg/chunkenc/interface.go | 3 ++ pkg/ingester/ingester.go | 4 ++- pkg/ingester/instance.go | 20 ++++++------ pkg/ingester/instance_test.go | 10 +++--- pkg/ingester/stream.go | 37 ++++++++++++++++------- pkg/storage/hack/main.go | 4 +-- 10 files changed, 172 insertions(+), 40 deletions(-) diff --git a/docs/configuration/README.md b/docs/configuration/README.md index e05087aba9b45..e80bbdab4f144 100644 --- a/docs/configuration/README.md +++ b/docs/configuration/README.md @@ -267,8 +267,16 @@ The `ingester_config` block configures Ingesters. # period as long as they receieve no further activity. [chunk_idle_period: | default = 30m] -# The maximum size in bytes a chunk can be before it should be flushed. +# The maximum _uncompressed_ size in bytes of a chunk block, +# if chunk_target_size == 0 a chunk will have a fixed 10 blocks. +# If chunk_target_size != 0 the chunk will have a variable number of blocks +# to try to meet the target size [chunk_block_size: | default = 262144] + +# A target _compressed_ size in bytes for chunks. +# This is a desired size not an exact size, chunks may be slightly bigger +# or significantly smaller if they get flushed for other reasons (e.g. chunk_idle_period) +[chunk_target_size: | default = 0] ``` ### lifecycler_config diff --git a/pkg/chunkenc/dumb_chunk.go b/pkg/chunkenc/dumb_chunk.go index edb154b050b8e..f956ac63b8b82 100644 --- a/pkg/chunkenc/dumb_chunk.go +++ b/pkg/chunkenc/dumb_chunk.go @@ -55,6 +55,11 @@ func (c *dumbChunk) UncompressedSize() int { return c.Size() } +// CompressedSize implements Chunk. +func (c *dumbChunk) CompressedSize() int { + return 0 +} + // Utilization implements Chunk func (c *dumbChunk) Utilization() float64 { return float64(len(c.entries)) / float64(tmpNumEntries) @@ -91,6 +96,14 @@ func (c *dumbChunk) Bytes() ([]byte, error) { return nil, nil } +func (c *dumbChunk) Blocks() int { + return 0 +} + +func (c *dumbChunk) Close() error { + return nil +} + type dumbChunkIterator struct { direction logproto.Direction i int diff --git a/pkg/chunkenc/gzip.go b/pkg/chunkenc/gzip.go index c3bbc9239513a..d9c13c01967bb 100644 --- a/pkg/chunkenc/gzip.go +++ b/pkg/chunkenc/gzip.go @@ -10,10 +10,11 @@ import ( "io" "time" + "github.com/pkg/errors" + "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" - "github.com/pkg/errors" ) const blocksPerChunk = 10 @@ -43,9 +44,13 @@ func newCRC32() hash.Hash32 { type MemChunk struct { // The number of uncompressed bytes per block. blockSize int + // Target size in compressed bytes + targetSize int // The finished blocks. blocks []block + // The compressed size of all the blocks + cutBlockSize int // Current in-mem block being appended to. head *headBlock @@ -129,10 +134,11 @@ type entry struct { // NewMemChunkSize returns a new in-mem chunk. // Mainly for config push size. -func NewMemChunkSize(enc Encoding, blockSize int) *MemChunk { +func NewMemChunkSize(enc Encoding, blockSize, targetSize int) *MemChunk { c := &MemChunk{ - blockSize: blockSize, // The blockSize in bytes. - blocks: []block{}, + blockSize: blockSize, // The blockSize in bytes. + targetSize: targetSize, // Desired chunk size in compressed bytes + blocks: []block{}, head: &headBlock{}, @@ -151,7 +157,7 @@ func NewMemChunkSize(enc Encoding, blockSize int) *MemChunk { // NewMemChunk returns a new in-mem chunk for query. func NewMemChunk(enc Encoding) *MemChunk { - return NewMemChunkSize(enc, 256*1024) + return NewMemChunkSize(enc, 256*1024, 0) } // NewByteChunk returns a MemChunk on the passed bytes. @@ -309,9 +315,22 @@ func (c *MemChunk) Size() int { return ne } +// Blocks implements Chunk. +func (c *MemChunk) Blocks() int { + return len(c.blocks) +} + // SpaceFor implements Chunk. -func (c *MemChunk) SpaceFor(*logproto.Entry) bool { - return len(c.blocks) < blocksPerChunk +func (c *MemChunk) SpaceFor(e *logproto.Entry) bool { + if c.targetSize > 0 { + // This is looking to see if the uncompressed lines will fit which is not + // a great check, but it will guarantee we are always under the target size + newHBSize := c.head.size + len(e.Line) + return (c.cutBlockSize + newHBSize) < c.targetSize + } else { + // if targetSize is not defined, default to the original behavior of fixed blocks per chunk + return len(c.blocks) < blocksPerChunk + } } // UncompressedSize implements Chunk. @@ -329,11 +348,25 @@ func (c *MemChunk) UncompressedSize() int { return size } -// Utilization implements Chunk. It is the bytes used as a percentage of the -func (c *MemChunk) Utilization() float64 { - size := c.UncompressedSize() +// CompressedSize implements Chunk +func (c *MemChunk) CompressedSize() int { + size := 0 + // Better to account for any uncompressed data than ignore it even though this isn't accurate. + if !c.head.isEmpty() { + size += c.head.size + } + size += c.cutBlockSize + return size +} - return float64(size) / float64(blocksPerChunk*c.blockSize) +// Utilization implements Chunk. +func (c *MemChunk) Utilization() float64 { + if c.targetSize != 0 { + return float64(c.targetSize) / float64(c.CompressedSize()) + } else { + size := c.UncompressedSize() + return float64(size) / float64(blocksPerChunk*c.blockSize) + } } // Append implements Chunk. @@ -382,6 +415,8 @@ func (c *MemChunk) cut() error { uncompressedSize: c.head.size, }) + c.cutBlockSize += len(b) + c.head.entries = c.head.entries[:0] c.head.mint = 0 // Will be set on first append. c.head.size = 0 diff --git a/pkg/chunkenc/gzip_test.go b/pkg/chunkenc/gzip_test.go index b89ce8e4879f3..7cebc3d1373be 100644 --- a/pkg/chunkenc/gzip_test.go +++ b/pkg/chunkenc/gzip_test.go @@ -11,8 +11,9 @@ import ( "github.com/stretchr/testify/assert" - "github.com/grafana/loki/pkg/logproto" "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/logproto" ) func TestGZIPBlock(t *testing.T) { @@ -175,6 +176,57 @@ func TestGZIPChunkFilling(t *testing.T) { require.Equal(t, int64(lines), i) } +func TestGZIPChunkTargetSize(t *testing.T) { + targetSize := 1024 * 1024 + chk := NewMemChunkSize(EncGZIP, 1024, targetSize) + + lineSize := 512 + entry := &logproto.Entry{ + Timestamp: time.Unix(0, 0), + Line: "", + } + + // Use a random number to generate random log data, otherwise the gzip compression is way too good + // and the following loop has to run waaayyyyy to many times + // Using the same seed should guarantee the same random numbers and same test data. + r := rand.New(rand.NewSource(99)) + + i := int64(0) + + for ; chk.SpaceFor(entry) && i < 5000; i++ { + logLine := make([]byte, lineSize) + for j := range logLine { + logLine[j] = byte(r.Int()) + } + entry = &logproto.Entry{ + Timestamp: time.Unix(0, 0), + Line: string(logLine), + } + entry.Timestamp = time.Unix(0, i) + require.NoError(t, chk.Append(entry)) + } + + // 5000 is a limit ot make sure the test doesn't run away, we shouldn't need this many log lines to make 1MB chunk + require.NotEqual(t, 5000, i) + + require.NoError(t, chk.Close()) + + require.Equal(t, 0, chk.head.size) + + // Even though the seed is static above and results should be deterministic, + // we will allow +/- 10% variance + minSize := int(float64(targetSize) * 0.9) + maxSize := int(float64(targetSize) * 1.1) + require.Greater(t, chk.CompressedSize(), minSize) + require.Less(t, chk.CompressedSize(), maxSize) + + // Also verify our utilization is close to 1.0 + ut := chk.Utilization() + require.Greater(t, ut, 0.99) + require.Less(t, ut, 1.01) + +} + func TestMemChunk_AppendOutOfOrder(t *testing.T) { t.Parallel() diff --git a/pkg/chunkenc/interface.go b/pkg/chunkenc/interface.go index 1f8d405e8b4b3..b9446b45a95e1 100644 --- a/pkg/chunkenc/interface.go +++ b/pkg/chunkenc/interface.go @@ -50,8 +50,11 @@ type Chunk interface { Iterator(from, through time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error) Size() int Bytes() ([]byte, error) + Blocks() int Utilization() float64 UncompressedSize() int + CompressedSize() int + Close() error } // CompressionWriter is the writer that compresses the data passed to it. diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index ba9462205c2e5..ccf2aee0f25ac 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -47,6 +47,7 @@ type Config struct { RetainPeriod time.Duration `yaml:"chunk_retain_period"` MaxChunkIdle time.Duration `yaml:"chunk_idle_period"` BlockSize int `yaml:"chunk_block_size"` + TargetChunkSize int `yaml:"chunk_target_size"` // For testing, you can override the address and ID of this ingester. ingesterClientFactory func(cfg client.Config, addr string) (grpc_health_v1.HealthClient, error) @@ -63,6 +64,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.RetainPeriod, "ingester.chunks-retain-period", 15*time.Minute, "") f.DurationVar(&cfg.MaxChunkIdle, "ingester.chunks-idle-period", 30*time.Minute, "") f.IntVar(&cfg.BlockSize, "ingester.chunks-block-size", 256*1024, "") + f.IntVar(&cfg.TargetChunkSize, "ingester.chunk-target-size", 0, "") } // Ingester builds chunks for incoming log streams. @@ -189,7 +191,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) *instance { defer i.instancesMtx.Unlock() inst, ok = i.instances[instanceID] if !ok { - inst = newInstance(instanceID, i.cfg.BlockSize, i.limits) + inst = newInstance(instanceID, i.cfg.BlockSize, i.cfg.TargetChunkSize, i.limits) i.instances[instanceID] = inst } return inst diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 4375bcc18f338..f7a295e515189 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -60,14 +60,15 @@ type instance struct { streamsCreatedTotal prometheus.Counter streamsRemovedTotal prometheus.Counter - blockSize int - tailers map[uint32]*tailer - tailerMtx sync.RWMutex + blockSize int + targetChunkSize int // Compressed bytes + tailers map[uint32]*tailer + tailerMtx sync.RWMutex limits *validation.Overrides } -func newInstance(instanceID string, blockSize int, limits *validation.Overrides) *instance { +func newInstance(instanceID string, blockSize, targetChunkSize int, limits *validation.Overrides) *instance { i := &instance{ streams: map[model.Fingerprint]*stream{}, index: index.New(), @@ -76,9 +77,10 @@ func newInstance(instanceID string, blockSize int, limits *validation.Overrides) streamsCreatedTotal: streamsCreatedTotal.WithLabelValues(instanceID), streamsRemovedTotal: streamsRemovedTotal.WithLabelValues(instanceID), - blockSize: blockSize, - tailers: map[uint32]*tailer{}, - limits: limits, + blockSize: blockSize, + targetChunkSize: targetChunkSize, + tailers: map[uint32]*tailer{}, + limits: limits, } i.mapper = newFPMapper(i.getLabelsFromFingerprint) return i @@ -96,7 +98,7 @@ func (i *instance) consumeChunk(ctx context.Context, labels []client.LabelAdapte stream, ok := i.streams[fp] if !ok { sortedLabels := i.index.Add(labels, fp) - stream = newStream(fp, sortedLabels, i.blockSize) + stream = newStream(fp, sortedLabels, i.blockSize, i.targetChunkSize) i.streams[fp] = stream i.streamsCreatedTotal.Inc() memoryStreams.Inc() @@ -154,7 +156,7 @@ func (i *instance) getOrCreateStream(labels []client.LabelAdapter) (*stream, err return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "per-user streams limit (%d) exceeded", i.limits.MaxStreamsPerUser(i.instanceID)) } sortedLabels := i.index.Add(labels, fp) - stream = newStream(fp, sortedLabels, i.blockSize) + stream = newStream(fp, sortedLabels, i.blockSize, i.targetChunkSize) i.streams[fp] = stream memoryStreams.Inc() i.streamsCreatedTotal.Inc() diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index 7b2452bf23dbe..c25b841efe6c5 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -8,18 +8,20 @@ import ( "testing" "time" - "github.com/grafana/loki/pkg/logproto" "github.com/prometheus/prometheus/pkg/labels" - "github.com/grafana/loki/pkg/util/validation" + "github.com/grafana/loki/pkg/logproto" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/util/validation" ) func TestLabelsCollisions(t *testing.T) { o, err := validation.NewOverrides(validation.Limits{MaxStreamsPerUser: 1000}) require.NoError(t, err) - i := newInstance("test", 512, o) + i := newInstance("test", 512, 0, o) // avoid entries from the future. tt := time.Now().Add(-5 * time.Minute) @@ -45,7 +47,7 @@ func TestConcurrentPushes(t *testing.T) { o, err := validation.NewOverrides(validation.Limits{MaxStreamsPerUser: 1000}) require.NoError(t, err) - inst := newInstance("test", 512, o) + inst := newInstance("test", 512, 0, o) const ( concurrent = 10 diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index b16d42a76d420..cf30176fc96e9 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -34,6 +34,14 @@ var ( Buckets: prometheus.LinearBuckets(4096, 2048, 6), }) + blocksPerChunk = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: "loki", + Subsystem: "ingester", + Name: "blocks_per_chunk", + Help: "The number of blocks in a chunk.", + + Buckets: prometheus.ExponentialBuckets(5, 2, 6), + }) ) func init() { @@ -44,10 +52,11 @@ func init() { type stream struct { // Newest chunk at chunks[n-1]. // Not thread-safe; assume accesses to this are locked by caller. - chunks []chunkDesc - fp model.Fingerprint // possibly remapped fingerprint, used in the streams map - labels labels.Labels - blockSize int + chunks []chunkDesc + fp model.Fingerprint // possibly remapped fingerprint, used in the streams map + labels labels.Labels + blockSize int + targetChunkSize int // Compressed bytes tailers map[uint32]*tailer tailerMtx sync.RWMutex @@ -66,12 +75,13 @@ type entryWithError struct { e error } -func newStream(fp model.Fingerprint, labels labels.Labels, blockSize int) *stream { +func newStream(fp model.Fingerprint, labels labels.Labels, blockSize, targetChunkSize int) *stream { return &stream{ - fp: fp, - labels: labels, - blockSize: blockSize, - tailers: map[uint32]*tailer{}, + fp: fp, + labels: labels, + blockSize: blockSize, + targetChunkSize: targetChunkSize, + tailers: map[uint32]*tailer{}, } } @@ -93,7 +103,7 @@ func (s *stream) consumeChunk(_ context.Context, chunk *logproto.Chunk) error { func (s *stream) Push(_ context.Context, entries []logproto.Entry) error { if len(s.chunks) == 0 { s.chunks = append(s.chunks, chunkDesc{ - chunk: chunkenc.NewMemChunkSize(chunkenc.EncGZIP, s.blockSize), + chunk: chunkenc.NewMemChunkSize(chunkenc.EncGZIP, s.blockSize, s.targetChunkSize), }) chunksCreatedTotal.Inc() } @@ -106,13 +116,18 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry) error { for i := range entries { chunk := &s.chunks[len(s.chunks)-1] if chunk.closed || !chunk.chunk.SpaceFor(&entries[i]) { + err := chunk.chunk.Close() + if err != nil { + //TODO what the heck do we do with this error? + } chunk.closed = true samplesPerChunk.Observe(float64(chunk.chunk.Size())) + blocksPerChunk.Observe(float64(chunk.chunk.Blocks())) chunksCreatedTotal.Inc() s.chunks = append(s.chunks, chunkDesc{ - chunk: chunkenc.NewMemChunkSize(chunkenc.EncGZIP, s.blockSize), + chunk: chunkenc.NewMemChunkSize(chunkenc.EncGZIP, s.blockSize, s.targetChunkSize), }) chunk = &s.chunks[len(s.chunks)-1] } diff --git a/pkg/storage/hack/main.go b/pkg/storage/hack/main.go index b2db84a32e7cf..db2909bc06f10 100644 --- a/pkg/storage/hack/main.go +++ b/pkg/storage/hack/main.go @@ -96,7 +96,7 @@ func fillStore() error { labelsBuilder.Set(labels.MetricName, "logs") metric := labelsBuilder.Labels() fp := client.FastFingerprint(lbs) - chunkEnc := chunkenc.NewMemChunkSize(chunkenc.EncGZIP, 262144) + chunkEnc := chunkenc.NewMemChunkSize(chunkenc.EncGZIP, 262144, 0) for ts := start.UnixNano(); ts < start.UnixNano()+time.Hour.Nanoseconds(); ts = ts + time.Millisecond.Nanoseconds() { entry := &logproto.Entry{ Timestamp: time.Unix(0, ts), @@ -119,7 +119,7 @@ func fillStore() error { if flushCount >= maxChunks { return } - chunkEnc = chunkenc.NewMemChunkSize(chunkenc.EncGZIP, 262144) + chunkEnc = chunkenc.NewMemChunkSize(chunkenc.EncGZIP, 262144, 0) } } From 32e17df64d12b41d5ebca7fe00c36ec76cc7b5fb Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Thu, 12 Dec 2019 13:15:37 -0500 Subject: [PATCH 2/2] addressing feedback Signed-off-by: Edward Welch --- docs/configuration/README.md | 8 ++++---- pkg/chunkenc/gzip.go | 13 ++++++------- pkg/ingester/stream.go | 6 +++++- 3 files changed, 15 insertions(+), 12 deletions(-) diff --git a/docs/configuration/README.md b/docs/configuration/README.md index e80bbdab4f144..737873bf88b3e 100644 --- a/docs/configuration/README.md +++ b/docs/configuration/README.md @@ -267,15 +267,15 @@ The `ingester_config` block configures Ingesters. # period as long as they receieve no further activity. [chunk_idle_period: | default = 30m] -# The maximum _uncompressed_ size in bytes of a chunk block, -# if chunk_target_size == 0 a chunk will have a fixed 10 blocks. -# If chunk_target_size != 0 the chunk will have a variable number of blocks -# to try to meet the target size +# The targeted _uncompressed_ size in bytes of a chunk block +# When this threshold is exceeded the head block will be cut and compressed inside the chunk [chunk_block_size: | default = 262144] # A target _compressed_ size in bytes for chunks. # This is a desired size not an exact size, chunks may be slightly bigger # or significantly smaller if they get flushed for other reasons (e.g. chunk_idle_period) +# The default value of 0 for this will create chunks with a fixed 10 blocks, +# A non zero value will create chunks with a variable number of blocks to meet the target size. [chunk_target_size: | default = 0] ``` diff --git a/pkg/chunkenc/gzip.go b/pkg/chunkenc/gzip.go index d9c13c01967bb..2132205c523ff 100644 --- a/pkg/chunkenc/gzip.go +++ b/pkg/chunkenc/gzip.go @@ -327,10 +327,9 @@ func (c *MemChunk) SpaceFor(e *logproto.Entry) bool { // a great check, but it will guarantee we are always under the target size newHBSize := c.head.size + len(e.Line) return (c.cutBlockSize + newHBSize) < c.targetSize - } else { - // if targetSize is not defined, default to the original behavior of fixed blocks per chunk - return len(c.blocks) < blocksPerChunk } + // if targetSize is not defined, default to the original behavior of fixed blocks per chunk + return len(c.blocks) < blocksPerChunk } // UncompressedSize implements Chunk. @@ -362,11 +361,11 @@ func (c *MemChunk) CompressedSize() int { // Utilization implements Chunk. func (c *MemChunk) Utilization() float64 { if c.targetSize != 0 { - return float64(c.targetSize) / float64(c.CompressedSize()) - } else { - size := c.UncompressedSize() - return float64(size) / float64(blocksPerChunk*c.blockSize) + return float64(c.CompressedSize()) / float64(c.targetSize) } + size := c.UncompressedSize() + return float64(size) / float64(blocksPerChunk*c.blockSize) + } // Append implements Chunk. diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index cf30176fc96e9..0288c0ca47296 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -9,6 +9,7 @@ import ( "time" "github.com/cortexproject/cortex/pkg/util" + "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" @@ -116,9 +117,12 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry) error { for i := range entries { chunk := &s.chunks[len(s.chunks)-1] if chunk.closed || !chunk.chunk.SpaceFor(&entries[i]) { + // If the chunk has no more space call Close to make sure anything in the head block is cut and compressed err := chunk.chunk.Close() if err != nil { - //TODO what the heck do we do with this error? + // This should be an unlikely situation, returning an error up the stack doesn't help much here + // so instead log this to help debug the issue if it ever arises. + level.Error(util.Logger).Log("msg", "failed to Close chunk", "err", err) } chunk.closed = true