Skip to content

Commit

Permalink
Merge pull request grafana#1044 from cortexproject/shrunk-chunks
Browse files Browse the repository at this point in the history
 Write only as many chunk bytes as needed
  • Loading branch information
bboreham authored Nov 30, 2018
2 parents ff29d37 + 8c782fb commit 189bb9c
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 48 deletions.
4 changes: 4 additions & 0 deletions chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ import (

const userID = "userID"

// init overrides the encoding package's default chunk encoding with Varbit
// when this test file's package is initialised, so code that relies on
// encoding.DefaultEncoding sees Varbit.
func init() {
encoding.DefaultEncoding = encoding.Varbit
}

func dummyChunk(now model.Time) Chunk {
return dummyChunkFor(now, model.Metric{
model.MetricNameLabel: "foo",
Expand Down
9 changes: 0 additions & 9 deletions encoding/bigchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/binary"
"errors"
"io"
"io/ioutil"

"github.com/prometheus/common/model"
"github.com/prometheus/tsdb/chunkenc"
Expand Down Expand Up @@ -95,14 +94,6 @@ func (b *bigchunk) MarshalToBuf(buf []byte) error {
return b.Marshal(writer)
}

// Unmarshal drains r completely and decodes the resulting bytes with
// UnmarshalFromBuf. A failure while reading is returned unchanged and no
// decoding is attempted.
func (b *bigchunk) Unmarshal(r io.Reader) error {
	data, readErr := ioutil.ReadAll(r)
	if readErr == nil {
		return b.UnmarshalFromBuf(data)
	}
	return readErr
}

func (b *bigchunk) UnmarshalFromBuf(buf []byte) error {
r := reader{buf: buf}
numChunks, err := r.ReadUint16()
Expand Down
1 change: 0 additions & 1 deletion encoding/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ type Chunk interface {
Add(sample model.SamplePair) ([]Chunk, error)
NewIterator() Iterator
Marshal(io.Writer) error
Unmarshal(io.Reader) error
UnmarshalFromBuf([]byte) error
Encoding() Encoding
Utilization() float64
Expand Down
6 changes: 4 additions & 2 deletions encoding/chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func TestLen(t *testing.T) {
var step = int(15 * time.Second / time.Millisecond)

func TestChunk(t *testing.T) {
alwaysMarshalFullsizeChunks = false
for _, tc := range []struct {
encoding Encoding
maxSamples int
Expand Down Expand Up @@ -112,7 +113,7 @@ func testChunkEncoding(t *testing.T, encoding Encoding, samples int) {

bs1 := buf.Bytes()
chunk, err = NewForEncoding(encoding)
err = chunk.Unmarshal(&buf)
err = chunk.UnmarshalFromBuf(bs1)
require.NoError(t, err)

// Check all the samples are in there.
Expand All @@ -127,11 +128,12 @@ func testChunkEncoding(t *testing.T, encoding Encoding, samples int) {
require.NoError(t, iter.Err())

// Check the byte representation after another Marshal is the same.
buf = bytes.Buffer{}
err = chunk.Marshal(&buf)
require.NoError(t, err)
bs2 := buf.Bytes()

require.True(t, bytes.Equal(bs1, bs2))
require.Equal(t, bs1, bs2)
}

// testChunkSeek checks seek works as expected.
Expand Down
9 changes: 0 additions & 9 deletions encoding/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,15 +213,6 @@ func (c deltaEncodedChunk) Marshal(w io.Writer) error {
return nil
}

// Unmarshal implements chunk.
//
// The chunk is first re-sliced to its full capacity and then filled entirely
// from r; io.ReadFull reports an error if r cannot supply that many bytes.
// On a successful read the result of setLen is returned.
func (c *deltaEncodedChunk) Unmarshal(r io.Reader) error {
	full := (*c)[:cap(*c)]
	*c = full
	_, err := io.ReadFull(r, full)
	if err != nil {
		return err
	}
	return c.setLen()
}

// UnmarshalFromBuf implements chunk.
func (c *deltaEncodedChunk) UnmarshalFromBuf(buf []byte) error {
*c = (*c)[:cap(*c)]
Expand Down
6 changes: 0 additions & 6 deletions encoding/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,6 @@ func TestUnmarshalingCorruptedDeltaReturnsAnError(t *testing.T) {
err = cs[0].UnmarshalFromBuf(buf)
verifyUnmarshallingError(err, c.chunkTypeName, "buf", "invalid number of time bytes")

err = cs[0].Unmarshal(bytes.NewBuffer(buf))
verifyUnmarshallingError(err, c.chunkTypeName, "Reader", "invalid number of time bytes")

// Fix the corruption to go on.
buf[c.timeBytesPos] = byte(d1)

Expand All @@ -111,9 +108,6 @@ func TestUnmarshalingCorruptedDeltaReturnsAnError(t *testing.T) {

err = cs[0].UnmarshalFromBuf(buf)
verifyUnmarshallingError(err, c.chunkTypeName, "buf", "header size")

err = cs[0].Unmarshal(bytes.NewBuffer(buf))
verifyUnmarshallingError(err, c.chunkTypeName, "Reader", "header size")
}
}
}
9 changes: 0 additions & 9 deletions encoding/doubledelta.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,15 +241,6 @@ func (c doubleDeltaEncodedChunk) MarshalToBuf(buf []byte) error {
return nil
}

// Unmarshal implements chunk.
//
// The chunk is first re-sliced to its full capacity and then filled entirely
// from r; io.ReadFull reports an error if r cannot supply that many bytes.
// On a successful read the result of setLen is returned.
func (c *doubleDeltaEncodedChunk) Unmarshal(r io.Reader) error {
	full := (*c)[:cap(*c)]
	*c = full
	_, err := io.ReadFull(r, full)
	if err != nil {
		return err
	}
	return c.setLen()
}

// UnmarshalFromBuf implements chunk.
func (c *doubleDeltaEncodedChunk) UnmarshalFromBuf(buf []byte) error {
*c = (*c)[:cap(*c)]
Expand Down
46 changes: 34 additions & 12 deletions encoding/varbit.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package encoding

import (
"encoding/binary"
"flag"
"fmt"
"io"
"math"
Expand Down Expand Up @@ -286,26 +287,21 @@ func (c *varbitChunk) Slice(_, _ model.Time) Chunk {

// Marshal implements chunk.
func (c varbitChunk) Marshal(w io.Writer) error {
n, err := w.Write(c)
size := c.marshalLen()
n, err := w.Write(c[:size])
if err != nil {
return err
}
if n != cap(c) {
return fmt.Errorf("wanted to write %d bytes, wrote %d", cap(c), n)
if n != size {
return fmt.Errorf("wanted to write %d bytes, wrote %d", size, n)
}
return nil
}

// Unmarshal implements chunk.
//
// It overwrites the chunk's existing bytes with data read from r, requiring
// r to supply exactly len(c) bytes; any io.ReadFull error is returned as is.
func (c varbitChunk) Unmarshal(r io.Reader) error {
	if _, err := io.ReadFull(r, c); err != nil {
		return err
	}
	return nil
}

// UnmarshalFromBuf implements chunk.
func (c varbitChunk) UnmarshalFromBuf(buf []byte) error {
if copied := copy(c, buf); copied != cap(c) {
return fmt.Errorf("insufficient bytes copied from buffer during unmarshaling, want %d, got %d", cap(c), copied)
if copied := copy(c, buf); copied != cap(c) && copied != c.marshalLen() {
return fmt.Errorf("incorrect byte count copied from buffer during unmarshaling, want %d or %d, got %d", c.marshalLen(), ChunkLen, copied)
}
return nil
}
Expand All @@ -319,6 +315,32 @@ func (c varbitChunk) Utilization() float64 {
return math.Min(float64(c.nextSampleOffset()/8+15)/float64(cap(c)), 1)
}

// MarshalConfig configures the behaviour of marshalling.
type MarshalConfig struct{}

// alwaysMarshalFullsizeChunks is the package-level switch bound to the
// -store.fullsize-chunks flag. It defaults to true (per the flag's help text:
// pad varbit chunks to 1024 bytes when saving).
var alwaysMarshalFullsizeChunks = true

// RegisterFlags registers configuration settings on the supplied FlagSet.
//
// The flag must be registered on f rather than on the process-wide
// flag.CommandLine: callers that parse their own FlagSet would otherwise never
// see -store.fullsize-chunks, and the option would leak into the global set.
func (MarshalConfig) RegisterFlags(f *flag.FlagSet) {
	f.BoolVar(&alwaysMarshalFullsizeChunks, "store.fullsize-chunks", alwaysMarshalFullsizeChunks, "When saving varbit chunks, pad to 1024 bytes")
}

// marshalLen returns the number of bytes that should be marshalled for this
// chunk.
//
// When alwaysMarshalFullsizeChunks is set the full capacity of the chunk is
// used. Otherwise the length is derived from the bit offset of the next
// sample, never going below varbitThirdSampleBitOffset (presumably so the
// fixed-position leading part of the chunk is always written; confirm against
// the chunk layout) and never exceeding len(c).
func (c varbitChunk) marshalLen() int {
	if alwaysMarshalFullsizeChunks {
		return cap(c)
	}
	usedBits := c.nextSampleOffset()
	if usedBits < varbitThirdSampleBitOffset {
		usedBits = varbitThirdSampleBitOffset
	}
	// Whole bytes covered by usedBits, plus one for the trailing partial byte.
	if n := int(usedBits)/8 + 1; n <= len(c) {
		return n
	}
	return len(c)
}

// Len implements chunk. Runs in O(n).
func (c varbitChunk) Len() int {
it := c.NewIterator()
Expand All @@ -329,7 +351,7 @@ func (c varbitChunk) Len() int {
}

func (c varbitChunk) Size() int {
return len(c)
return c.marshalLen()
}

func (c varbitChunk) firstTime() model.Time {
Expand Down

0 comments on commit 189bb9c

Please sign in to comment.