allow configuring a target chunk size in compressed bytes #1406

Merged
merged 2 commits on Dec 12, 2019
10 changes: 9 additions & 1 deletion docs/configuration/README.md
@@ -267,8 +267,16 @@ The `ingester_config` block configures Ingesters.
# period as long as they receive no further activity.
[chunk_idle_period: <duration> | default = 30m]

# The maximum size in bytes a chunk can be before it should be flushed.
# The targeted _uncompressed_ size in bytes of a chunk block.
# When this threshold is exceeded, the head block will be cut and compressed inside the chunk.
[chunk_block_size: <int> | default = 262144]

# A target _compressed_ size in bytes for chunks.
# This is a desired size, not an exact size; chunks may be slightly bigger
# or significantly smaller if they are flushed for other reasons (e.g. chunk_idle_period).
# The default value of 0 creates chunks with a fixed number of blocks (10).
# A non-zero value creates chunks with a variable number of blocks to meet the target size.
[chunk_target_size: <int> | default = 0]
```
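To make the two knobs concrete, here is a minimal sketch of how they might be set together in a Loki config file; the 1572864 (1.5 MiB) target is an assumed example value, not a recommendation from this PR:

```yaml
ingester:
  # Cut and compress the head block after ~256 KB of uncompressed data (the default).
  chunk_block_size: 262144
  # Assumed example: target ~1.5 MiB of compressed data per chunk.
  # Leaving this at 0 keeps the original fixed 10-blocks-per-chunk behavior.
  chunk_target_size: 1572864
```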

### lifecycler_config
13 changes: 13 additions & 0 deletions pkg/chunkenc/dumb_chunk.go
@@ -55,6 +55,11 @@ func (c *dumbChunk) UncompressedSize() int {
return c.Size()
}

// CompressedSize implements Chunk.
func (c *dumbChunk) CompressedSize() int {
return 0
}

// Utilization implements Chunk
func (c *dumbChunk) Utilization() float64 {
return float64(len(c.entries)) / float64(tmpNumEntries)
@@ -91,6 +96,14 @@ func (c *dumbChunk) Bytes() ([]byte, error) {
return nil, nil
}

func (c *dumbChunk) Blocks() int {
return 0
}

func (c *dumbChunk) Close() error {
return nil
}

type dumbChunkIterator struct {
direction logproto.Direction
i int
50 changes: 42 additions & 8 deletions pkg/chunkenc/gzip.go
@@ -10,10 +10,11 @@ import (
"io"
"time"

"github.com/pkg/errors"

"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/pkg/errors"
)

const blocksPerChunk = 10
@@ -43,9 +44,13 @@ func newCRC32() hash.Hash32 {
type MemChunk struct {
// The number of uncompressed bytes per block.
blockSize int
// Target size in compressed bytes
targetSize int

// The finished blocks.
blocks []block
// The compressed size of all the blocks
cutBlockSize int

// Current in-mem block being appended to.
head *headBlock
@@ -129,10 +134,11 @@ type entry struct {

// NewMemChunkSize returns a new in-mem chunk.
// Mainly for config push size.
func NewMemChunkSize(enc Encoding, blockSize int) *MemChunk {
func NewMemChunkSize(enc Encoding, blockSize, targetSize int) *MemChunk {
c := &MemChunk{
blockSize: blockSize, // The blockSize in bytes.
blocks: []block{},
blockSize: blockSize, // The blockSize in bytes.
targetSize: targetSize, // Desired chunk size in compressed bytes
blocks: []block{},

head: &headBlock{},

@@ -151,7 +157,7 @@ func NewMemChunkSize(enc Encoding, blockSize int) *MemChunk {

// NewMemChunk returns a new in-mem chunk for query.
func NewMemChunk(enc Encoding) *MemChunk {
return NewMemChunkSize(enc, 256*1024)
return NewMemChunkSize(enc, 256*1024, 0)
}

// NewByteChunk returns a MemChunk on the passed bytes.
@@ -309,8 +315,20 @@ func (c *MemChunk) Size() int {
return ne
}

// Blocks implements Chunk.
func (c *MemChunk) Blocks() int {
return len(c.blocks)
}

// SpaceFor implements Chunk.
func (c *MemChunk) SpaceFor(*logproto.Entry) bool {
func (c *MemChunk) SpaceFor(e *logproto.Entry) bool {
if c.targetSize > 0 {
// This is looking to see if the uncompressed lines will fit, which is not
// a great check, but it guarantees we always stay under the target size
newHBSize := c.head.size + len(e.Line)
return (c.cutBlockSize + newHBSize) < c.targetSize
}
// if targetSize is not defined, default to the original behavior of fixed blocks per chunk
return len(c.blocks) < blocksPerChunk
}
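To make the arithmetic above concrete, a standalone sketch with hypothetical numbers; the spaceFor helper and its inputs are illustrative, not code from this PR:

```go
package main

import "fmt"

// spaceFor mirrors MemChunk.SpaceFor: the open head block is counted at its
// uncompressed size, so the check stays conservatively under the target.
func spaceFor(targetSize, cutBlockSize, headSize, lineLen, blocks, blocksPerChunk int) bool {
	if targetSize > 0 {
		newHBSize := headSize + lineLen
		return (cutBlockSize + newHBSize) < targetSize
	}
	// targetSize == 0 falls back to a fixed number of blocks per chunk.
	return blocks < blocksPerChunk
}

func main() {
	// 1 MiB target: 990 KB already cut, 50 KB head, 20 KB incoming line.
	fmt.Println(spaceFor(1<<20, 990_000, 50_000, 20_000, 3, 10)) // false: 1_060_000 >= 1_048_576
	// No target set: only the block count matters.
	fmt.Println(spaceFor(0, 0, 0, 0, 9, 10)) // true: 9 of 10 blocks used
}
```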

@@ -329,11 +347,25 @@ func (c *MemChunk) UncompressedSize() int {
return size
}

// Utilization implements Chunk. It is the bytes used as a percentage of the
// CompressedSize implements Chunk
func (c *MemChunk) CompressedSize() int {
size := 0
// Better to account for any uncompressed data than to ignore it, even though this isn't accurate.
if !c.head.isEmpty() {
size += c.head.size
}
size += c.cutBlockSize
return size
}

// Utilization implements Chunk.
func (c *MemChunk) Utilization() float64 {
if c.targetSize != 0 {
return float64(c.CompressedSize()) / float64(c.targetSize)
}
size := c.UncompressedSize()
return float64(size) / float64(blocksPerChunk*c.blockSize)
}
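A worked example of the target-size branch, using assumed numbers:

```go
package main

import "fmt"

func main() {
	// Hypothetical chunk state: 900 KB of compressed cut blocks plus a
	// 100 KB uncompressed head block, measured against a 1 MiB target.
	cutBlockSize, headSize, targetSize := 900_000, 100_000, 1<<20
	compressed := cutBlockSize + headSize // what CompressedSize() would report
	fmt.Printf("utilization = %.2f\n", float64(compressed)/float64(targetSize)) // ≈ 0.95
}
```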

// Append implements Chunk.
@@ -382,6 +414,8 @@ func (c *MemChunk) cut() error {
uncompressedSize: c.head.size,
})

c.cutBlockSize += len(b)

c.head.entries = c.head.entries[:0]
c.head.mint = 0 // Will be set on first append.
c.head.size = 0
54 changes: 53 additions & 1 deletion pkg/chunkenc/gzip_test.go
@@ -11,8 +11,9 @@ import (

"github.com/stretchr/testify/assert"

"github.com/grafana/loki/pkg/logproto"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/logproto"
)

func TestGZIPBlock(t *testing.T) {
@@ -175,6 +176,57 @@ func TestGZIPChunkFilling(t *testing.T) {
require.Equal(t, int64(lines), i)
}

func TestGZIPChunkTargetSize(t *testing.T) {
targetSize := 1024 * 1024
chk := NewMemChunkSize(EncGZIP, 1024, targetSize)

lineSize := 512
entry := &logproto.Entry{
Timestamp: time.Unix(0, 0),
Line: "",
}

// Use a random number to generate random log data, otherwise the gzip compression is way too good
// and the following loop has to run waaayyyyy too many times
// Using the same seed should guarantee the same random numbers and same test data.
r := rand.New(rand.NewSource(99))

i := int64(0)

for ; chk.SpaceFor(entry) && i < 5000; i++ {
logLine := make([]byte, lineSize)
for j := range logLine {
logLine[j] = byte(r.Int())
}
entry = &logproto.Entry{
Timestamp: time.Unix(0, 0),
Line: string(logLine),
}
entry.Timestamp = time.Unix(0, i)
require.NoError(t, chk.Append(entry))
}

// 5000 is a limit to make sure the test doesn't run away; we shouldn't need this many log lines to make a 1MB chunk
require.NotEqual(t, 5000, i)

require.NoError(t, chk.Close())

require.Equal(t, 0, chk.head.size)

// Even though the seed is static above and results should be deterministic,
// we will allow +/- 10% variance
minSize := int(float64(targetSize) * 0.9)
maxSize := int(float64(targetSize) * 1.1)
require.Greater(t, chk.CompressedSize(), minSize)
require.Less(t, chk.CompressedSize(), maxSize)

// Also verify our utilization is close to 1.0
ut := chk.Utilization()
require.Greater(t, ut, 0.99)
require.Less(t, ut, 1.01)
}

func TestMemChunk_AppendOutOfOrder(t *testing.T) {
t.Parallel()

3 changes: 3 additions & 0 deletions pkg/chunkenc/interface.go
@@ -50,8 +50,11 @@ type Chunk interface {
Iterator(from, through time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error)
Size() int
Bytes() ([]byte, error)
Blocks() int
Utilization() float64
UncompressedSize() int
CompressedSize() int
Close() error
}
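For context, a minimal sketch of how a caller might consume the new methods; the shouldFlush helper and its fixed fallback of 10 blocks are assumptions for illustration, not code from this PR:

```go
// shouldFlush reports whether a chunk has reached its size budget.
func shouldFlush(c Chunk, targetSize int) bool {
	if targetSize > 0 {
		// Compare the compressed bytes accumulated so far to the target.
		return c.CompressedSize() >= targetSize
	}
	// No target configured: fall back to counting finished blocks.
	return c.Blocks() >= 10
}
```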

// CompressionWriter is the writer that compresses the data passed to it.
4 changes: 3 additions & 1 deletion pkg/ingester/ingester.go
@@ -47,6 +47,7 @@ type Config struct {
RetainPeriod time.Duration `yaml:"chunk_retain_period"`
MaxChunkIdle time.Duration `yaml:"chunk_idle_period"`
BlockSize int `yaml:"chunk_block_size"`
TargetChunkSize int `yaml:"chunk_target_size"`

// For testing, you can override the address and ID of this ingester.
ingesterClientFactory func(cfg client.Config, addr string) (grpc_health_v1.HealthClient, error)
@@ -63,6 +64,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.RetainPeriod, "ingester.chunks-retain-period", 15*time.Minute, "")
f.DurationVar(&cfg.MaxChunkIdle, "ingester.chunks-idle-period", 30*time.Minute, "")
f.IntVar(&cfg.BlockSize, "ingester.chunks-block-size", 256*1024, "")
f.IntVar(&cfg.TargetChunkSize, "ingester.chunk-target-size", 0, "")
}
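For reference, the equivalent command-line form; the target value is the same assumed example as in the docs sketch above:

```
loki -ingester.chunks-block-size=262144 -ingester.chunk-target-size=1572864
```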

// Ingester builds chunks for incoming log streams.
@@ -189,7 +191,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) *instance {
defer i.instancesMtx.Unlock()
inst, ok = i.instances[instanceID]
if !ok {
inst = newInstance(instanceID, i.cfg.BlockSize, i.limits)
inst = newInstance(instanceID, i.cfg.BlockSize, i.cfg.TargetChunkSize, i.limits)
i.instances[instanceID] = inst
}
return inst
20 changes: 11 additions & 9 deletions pkg/ingester/instance.go
@@ -60,14 +60,15 @@ type instance struct {
streamsCreatedTotal prometheus.Counter
streamsRemovedTotal prometheus.Counter

blockSize int
tailers map[uint32]*tailer
tailerMtx sync.RWMutex
blockSize int
targetChunkSize int // Compressed bytes
tailers map[uint32]*tailer
tailerMtx sync.RWMutex

limits *validation.Overrides
}

func newInstance(instanceID string, blockSize int, limits *validation.Overrides) *instance {
func newInstance(instanceID string, blockSize, targetChunkSize int, limits *validation.Overrides) *instance {
i := &instance{
streams: map[model.Fingerprint]*stream{},
index: index.New(),
@@ -76,9 +77,10 @@ func newInstance(instanceID string, blockSize int, limits *validation.Overrides)
streamsCreatedTotal: streamsCreatedTotal.WithLabelValues(instanceID),
streamsRemovedTotal: streamsRemovedTotal.WithLabelValues(instanceID),

blockSize: blockSize,
tailers: map[uint32]*tailer{},
limits: limits,
blockSize: blockSize,
targetChunkSize: targetChunkSize,
tailers: map[uint32]*tailer{},
limits: limits,
}
i.mapper = newFPMapper(i.getLabelsFromFingerprint)
return i
@@ -96,7 +98,7 @@ func (i *instance) consumeChunk(ctx context.Context, labels []client.LabelAdapte
stream, ok := i.streams[fp]
if !ok {
sortedLabels := i.index.Add(labels, fp)
stream = newStream(fp, sortedLabels, i.blockSize)
stream = newStream(fp, sortedLabels, i.blockSize, i.targetChunkSize)
i.streams[fp] = stream
i.streamsCreatedTotal.Inc()
memoryStreams.Inc()
@@ -154,7 +156,7 @@ func (i *instance) getOrCreateStream(labels []client.LabelAdapter) (*stream, err
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "per-user streams limit (%d) exceeded", i.limits.MaxStreamsPerUser(i.instanceID))
}
sortedLabels := i.index.Add(labels, fp)
stream = newStream(fp, sortedLabels, i.blockSize)
stream = newStream(fp, sortedLabels, i.blockSize, i.targetChunkSize)
i.streams[fp] = stream
memoryStreams.Inc()
i.streamsCreatedTotal.Inc()
10 changes: 6 additions & 4 deletions pkg/ingester/instance_test.go
@@ -8,18 +8,20 @@ import (
"testing"
"time"

"github.com/grafana/loki/pkg/logproto"
"github.com/prometheus/prometheus/pkg/labels"

"github.com/grafana/loki/pkg/util/validation"
"github.com/grafana/loki/pkg/logproto"

"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/util/validation"
)

func TestLabelsCollisions(t *testing.T) {
o, err := validation.NewOverrides(validation.Limits{MaxStreamsPerUser: 1000})
require.NoError(t, err)

i := newInstance("test", 512, o)
i := newInstance("test", 512, 0, o)

// avoid entries from the future.
tt := time.Now().Add(-5 * time.Minute)
@@ -45,7 +47,7 @@ func TestConcurrentPushes(t *testing.T) {
o, err := validation.NewOverrides(validation.Limits{MaxStreamsPerUser: 1000})
require.NoError(t, err)

inst := newInstance("test", 512, o)
inst := newInstance("test", 512, 0, o)

const (
concurrent = 10