Storage memory improvement (#713)
* add benchmark for storage queries
* improve iterator to load only on next
* fix memory retained by lazy chunks
* reverse backward lazy iterator
cyriltovena authored Jul 16, 2019
1 parent 8b06eb6 commit 3346ce1
Showing 25 changed files with 903 additions and 223 deletions.
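The central change, visible in the gzip.go diff below, swaps MemChunk's per-chunk compression constructor funcs (`cw`/`cr`) for a shared `CompressionPool`, plus pooled bufio readers (`BufReaderPool`) and scratch entry buffers (`BytesBufferPool`), so gzip writers, readers, and buffers are reused across queries instead of being reallocated each time. The pool type itself is presumably defined in one of the files not shown in this view; a minimal sketch of what a sync.Pool-backed implementation with the GetWriter/PutWriter/GetReader/PutReader methods used below could look like:

```go
package chunkenc

import (
	"compress/gzip"
	"io"
	"sync"
)

// GzipPool is a sketch of a sync.Pool-backed CompressionPool. The real
// implementation lives elsewhere in this commit; only the method set
// (GetWriter/PutWriter/GetReader/PutReader) is taken from the diff.
type GzipPool struct {
	readers sync.Pool
	writers sync.Pool
}

// GetReader returns a pooled gzip reader reset onto src, allocating a
// new one only when the pool is empty.
func (p *GzipPool) GetReader(src io.Reader) CompressionReader {
	if r := p.readers.Get(); r != nil {
		reader := r.(*gzip.Reader)
		if err := reader.Reset(src); err != nil {
			panic(err) // sketch: error handling simplified
		}
		return reader
	}
	reader, err := gzip.NewReader(src)
	if err != nil {
		panic(err) // sketch: error handling simplified
	}
	return reader
}

// PutReader hands the reader back to the pool for reuse.
func (p *GzipPool) PutReader(reader CompressionReader) {
	p.readers.Put(reader)
}

// GetWriter returns a pooled gzip writer reset onto dst.
func (p *GzipPool) GetWriter(dst io.Writer) CompressionWriter {
	if w := p.writers.Get(); w != nil {
		writer := w.(*gzip.Writer)
		writer.Reset(dst)
		return writer
	}
	return gzip.NewWriter(dst)
}

// PutWriter hands the writer back to the pool for reuse.
func (p *GzipPool) PutWriter(writer CompressionWriter) {
	p.writers.Put(writer)
}
```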
2 changes: 2 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions Makefile
@@ -312,3 +312,7 @@ push-plugin: build-plugin

enable-plugin:
docker plugin enable grafana/loki-docker-driver:$(PLUGIN_TAG)

benchmark-store:
go run ./pkg/storage/hack/main.go
go test ./pkg/storage/ -bench=. -benchmem -memprofile memprofile.out -cpuprofile cpuprofile.out
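The new benchmark-store target first generates fixture data with the hack script, then runs the storage benchmarks with allocation tracking (-benchmem) and writes CPU and heap profiles for inspection with pprof. The actual benchmarks added by this commit live in pkg/storage; a hypothetical benchmark in the same spirit, exercising a full chunk iteration, might look like:

```go
package chunkenc

import (
	"fmt"
	"testing"
	"time"

	"github.com/grafana/loki/pkg/logproto"
)

// BenchmarkMemChunkIterator is an illustrative sketch, not code from
// this commit: fill a chunk, then measure a complete iteration so that
// -benchmem surfaces per-query allocations.
func BenchmarkMemChunkIterator(b *testing.B) {
	c := NewMemChunk(EncGZIP)
	for i := 0; i < 10000; i++ {
		entry := &logproto.Entry{
			Timestamp: time.Unix(0, int64(i)),
			Line:      fmt.Sprintf("entry %d", i),
		}
		if err := c.Append(entry); err != nil {
			b.Fatal(err)
		}
	}
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		it, err := c.Iterator(time.Unix(0, 0), time.Unix(0, 10000), logproto.FORWARD, nil)
		if err != nil {
			b.Fatal(err)
		}
		for it.Next() {
			// Drain the iterator; entries are decoded lazily in Next.
		}
		if err := it.Close(); err != nil {
			b.Fatal(err)
		}
	}
}
```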
3 changes: 2 additions & 1 deletion cmd/loki/loki-local-config.yaml
@@ -10,7 +10,8 @@ ingester:
kvstore:
store: inmemory
replication_factor: 1
chunk_idle_period: 15m
chunk_idle_period: 5m
chunk_retain_period: 30s

schema_config:
configs:
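For context on the config change: chunk_idle_period controls how long a chunk may sit without receiving new entries before the ingester flushes it, and the newly added chunk_retain_period keeps flushed chunks in memory for a short grace period so queriers can still read them from the ingester. Lowering the idle period to 5m and retaining flushed chunks for only 30s shortens how long chunk data is held in ingester memory.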
3 changes: 2 additions & 1 deletion pkg/chunkenc/dumb_chunk.go
@@ -6,6 +6,7 @@ import (

"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
)

const (
@@ -51,7 +52,7 @@ func (c *dumbChunk) Size() int {

// Returns an iterator that goes from _most_ recent to _least_ recent (ie,
// backwards).
func (c *dumbChunk) Iterator(from, through time.Time, direction logproto.Direction) (iter.EntryIterator, error) {
func (c *dumbChunk) Iterator(from, through time.Time, direction logproto.Direction, _ logql.Filter) (iter.EntryIterator, error) {
i := sort.Search(len(c.entries), func(i int) bool {
return !from.After(c.entries[i].Timestamp)
})
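The Chunk Iterator signature now threads a logql.Filter down to the chunk level, so filtering happens before log lines are materialised as Go strings. The Filter type itself is not shown in this diff; judging from its call sites (`filter([]byte(e.s))` in the head block, `si.filter(line)` in the buffered iterator), it is presumably a predicate over raw line bytes, roughly:

```go
package logql

import "bytes"

// Filter, as inferred from its call sites in this commit, is a
// predicate over the raw bytes of a log line. This definition is an
// assumption, not copied from the diff.
type Filter func(line []byte) bool

// ContainsFilter is a hypothetical helper: keep lines containing substr.
func ContainsFilter(substr string) Filter {
	sub := []byte(substr)
	return func(line []byte) bool {
		return bytes.Contains(line, sub)
	}
}
```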
2 changes: 1 addition & 1 deletion pkg/chunkenc/facade.go
@@ -17,7 +17,7 @@ func init() {
})
}

// Facade for compatibility with cortex chunk type, so we can use it's chunk store.
// Facade for compatibility with cortex chunk type, so we can use its chunk store.
type Facade struct {
c Chunk
encoding.Chunk
165 changes: 95 additions & 70 deletions pkg/chunkenc/gzip.go
@@ -3,18 +3,15 @@ package chunkenc
import (
"bufio"
"bytes"
"compress/gzip"
"encoding/binary"
"hash"
"hash/crc32"
"io"
"math"
"time"

"github.com/grafana/loki/pkg/logproto"

"github.com/grafana/loki/pkg/iter"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/pkg/errors"
)

@@ -53,8 +50,7 @@ type MemChunk struct {
head *headBlock

encoding Encoding
cw func(w io.Writer) CompressionWriter
cr func(r io.Reader) (CompressionReader, error)
cPool CompressionPool
}

type block struct {
@@ -96,10 +92,10 @@ func (hb *headBlock) append(ts int64, line string) error {
return nil
}

func (hb *headBlock) serialise(cw func(w io.Writer) CompressionWriter) ([]byte, error) {
func (hb *headBlock) serialise(pool CompressionPool) ([]byte, error) {
buf := &bytes.Buffer{}
encBuf := make([]byte, binary.MaxVarintLen64)
compressedWriter := cw(buf)
compressedWriter := pool.GetWriter(buf)
for _, logEntry := range hb.entries {
n := binary.PutVarint(encBuf, logEntry.t)
_, err := compressedWriter.Write(encBuf[:n])
@@ -120,7 +116,7 @@ func (hb *headBlock) serialise(cw func(w io.Writer) CompressionWriter) ([]byte,
if err := compressedWriter.Close(); err != nil {
return nil, errors.Wrap(err, "flushing pending compress buffer")
}

pool.PutWriter(compressedWriter)
return buf.Bytes(), nil
}

@@ -136,18 +132,14 @@ func NewMemChunkSize(enc Encoding, blockSize int) *MemChunk {
blockSize: blockSize, // The blockSize in bytes.
blocks: []block{},

head: &headBlock{
mint: math.MaxInt64,
maxt: math.MinInt64,
},
head: &headBlock{},

encoding: enc,
}

switch enc {
case EncGZIP:
c.cw = func(w io.Writer) CompressionWriter { return gzip.NewWriter(w) }
c.cr = func(r io.Reader) (CompressionReader, error) { return gzip.NewReader(r) }
c.cPool = &Gzip
default:
panic("unknown encoding")
}
@@ -163,8 +155,8 @@ func NewMemChunk(enc Encoding) *MemChunk {
// NewByteChunk returns a MemChunk on the passed bytes.
func NewByteChunk(b []byte) (*MemChunk, error) {
bc := &MemChunk{
cr: func(r io.Reader) (CompressionReader, error) { return gzip.NewReader(r) },
head: &headBlock{}, // Dummy, empty headblock.
cPool: &Gzip,
head: &headBlock{}, // Dummy, empty headblock.
}

db := decbuf{b: b}
@@ -192,6 +184,7 @@ func NewByteChunk(b []byte) (*MemChunk, error) {

// Read the number of blocks.
num := db.uvarint()
bc.blocks = make([]block, 0, num)

for i := 0; i < num; i++ {
blk := block{}
@@ -343,7 +336,7 @@ func (c *MemChunk) cut() error {
return nil
}

b, err := c.head.serialise(c.cw)
b, err := c.head.serialise(c.cPool)
if err != nil {
return err
}
@@ -384,22 +377,19 @@ func (c *MemChunk) Bounds() (fromT, toT time.Time) {
}

// Iterator implements Chunk.
func (c *MemChunk) Iterator(mintT, maxtT time.Time, direction logproto.Direction) (iter.EntryIterator, error) {
func (c *MemChunk) Iterator(mintT, maxtT time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error) {
mint, maxt := mintT.UnixNano(), maxtT.UnixNano()
its := make([]iter.EntryIterator, 0, len(c.blocks))
its := make([]iter.EntryIterator, 0, len(c.blocks)+1)

for _, b := range c.blocks {
if maxt > b.mint && b.maxt > mint {
it, err := b.iterator(c.cr)
if err != nil {
return nil, err
}

its = append(its, it)
its = append(its, b.iterator(c.cPool, filter))
}
}

its = append(its, c.head.iterator(mint, maxt))
if !c.head.isEmpty() {
its = append(its, c.head.iterator(mint, maxt, filter))
}

iterForward := iter.NewTimeRangedIterator(
iter.NewNonOverlappingIterator(its, ""),
@@ -414,21 +404,14 @@ func (c *MemChunk) Iterator(mintT, maxtT time.Time, direction logproto.Direction
return iter.NewEntryIteratorBackward(iterForward)
}

func (b block) iterator(cr func(io.Reader) (CompressionReader, error)) (iter.EntryIterator, error) {
func (b block) iterator(pool CompressionPool, filter logql.Filter) iter.EntryIterator {
if len(b.b) == 0 {
return emptyIterator, nil
}

r, err := cr(bytes.NewBuffer(b.b))
if err != nil {
return nil, err
return emptyIterator
}

s := bufio.NewReader(r)
return newBufferedIterator(s), nil
return newBufferedIterator(pool, b.b, filter)
}

func (hb *headBlock) iterator(mint, maxt int64) iter.EntryIterator {
func (hb *headBlock) iterator(mint, maxt int64, filter logql.Filter) iter.EntryIterator {
if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) {
return emptyIterator
}
@@ -438,8 +421,16 @@ func (hb *headBlock) iterator(mint, maxt int64) iter.EntryIterator {
// but the tradeoff is that queries to near-realtime data would be much lower than
// cutting of blocks.

entries := make([]entry, len(hb.entries))
copy(entries, hb.entries)
entries := make([]entry, 0, len(hb.entries))
for _, e := range hb.entries {
if filter == nil || filter([]byte(e.s)) {
entries = append(entries, e)
}
}

if len(entries) == 0 {
return emptyIterator
}

return &listIterator{
entries: entries,
@@ -477,73 +468,107 @@ func (li *listIterator) Close() error { return nil }
func (li *listIterator) Labels() string { return "" }

type bufferedIterator struct {
s *bufio.Reader
s *bufio.Reader
reader CompressionReader
pool CompressionPool

curT int64
curLog string
cur logproto.Entry

err error

buf []byte // The buffer a single entry.
decBuf []byte // The buffer for decoding the lengths.
buf *bytes.Buffer // The buffer for a single entry.
decBuf []byte // The buffer for decoding the lengths.

closed bool

filter logql.Filter
}

func newBufferedIterator(s *bufio.Reader) *bufferedIterator {
func newBufferedIterator(pool CompressionPool, b []byte, filter logql.Filter) *bufferedIterator {
r := pool.GetReader(bytes.NewBuffer(b))
return &bufferedIterator{
s: s,
buf: make([]byte, 1024),
s: BufReaderPool.Get(r),
reader: r,
pool: pool,
filter: filter,
buf: BytesBufferPool.Get(),
decBuf: make([]byte, binary.MaxVarintLen64),
}
}

func (si *bufferedIterator) Next() bool {
for {
ts, line, ok := si.moveNext()
if !ok {
si.Close()
return false
}
if si.filter != nil && !si.filter(line) {
continue
}
si.cur.Line = string(line)
si.cur.Timestamp = time.Unix(0, ts)
return true
}
}

// moveNext moves the buffer to the next entry
func (si *bufferedIterator) moveNext() (int64, []byte, bool) {
ts, err := binary.ReadVarint(si.s)
if err != nil {
if err != io.EOF {
si.err = err
}
return false
return 0, nil, false
}

l, err := binary.ReadUvarint(si.s)
if err != nil {
if err != io.EOF {
si.err = err

return false
return 0, nil, false
}
}

for len(si.buf) < int(l) {
si.buf = append(si.buf, make([]byte, 1024)...)
if si.buf.Cap() < int(l) {
si.buf.Grow(int(l) - si.buf.Cap())
}

n, err := si.s.Read(si.buf[:l])
n, err := si.s.Read(si.buf.Bytes()[:l])
if err != nil && err != io.EOF {
si.err = err
return false
return 0, nil, false
}
if n < int(l) {
_, err = si.s.Read(si.buf[n:l])
for n < int(l) {
r, err := si.s.Read(si.buf.Bytes()[n:l])
if err != nil {
si.err = err
return false
return 0, nil, false
}
n += r
}

si.curT = ts
si.curLog = string(si.buf[:l])

return true
return ts, si.buf.Bytes()[:l], true
}

func (si *bufferedIterator) Entry() logproto.Entry {
return logproto.Entry{
Timestamp: time.Unix(0, si.curT),
Line: si.curLog,
}
return si.cur
}

func (si *bufferedIterator) Error() error { return si.err }

func (si *bufferedIterator) Close() error {
if !si.closed {
si.closed = true
si.pool.PutReader(si.reader)
BufReaderPool.Put(si.s)
BytesBufferPool.Put(si.buf)
si.s = nil
si.buf = nil
si.decBuf = nil
si.reader = nil
return si.err
}
return si.err
}

func (si *bufferedIterator) Error() error { return si.err }
func (si *bufferedIterator) Close() error { return si.err }
func (si *bufferedIterator) Labels() string { return "" }
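Taken together: newBufferedIterator no longer eagerly decompresses the block; Next/moveNext decode one entry at a time, apply the filter to the raw bytes before allocating a string, and Close hands the gzip reader, bufio reader, and entry buffer back to their pools. A hypothetical end-to-end use of the new filtered iterator API (illustrative, not part of the commit):

```go
package main

import (
	"bytes"
	"fmt"
	"log"
	"time"

	"github.com/grafana/loki/pkg/chunkenc"
	"github.com/grafana/loki/pkg/logproto"
)

func main() {
	c := chunkenc.NewMemChunk(chunkenc.EncGZIP)
	for i := 0; i < 100; i++ {
		level := "info"
		if i%10 == 0 {
			level = "error"
		}
		err := c.Append(&logproto.Entry{
			Timestamp: time.Unix(0, int64(i)),
			Line:      fmt.Sprintf("level=%s msg=%d", level, i),
		})
		if err != nil {
			log.Fatal(err)
		}
	}

	// The filter sees raw bytes, so non-matching lines are skipped
	// inside Next without ever being converted to a string.
	filter := func(line []byte) bool {
		return bytes.Contains(line, []byte("level=error"))
	}

	it, err := c.Iterator(time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, filter)
	if err != nil {
		log.Fatal(err)
	}
	defer it.Close() // returns the pooled reader and buffers

	for it.Next() {
		fmt.Println(it.Entry().Line)
	}
	if err := it.Error(); err != nil {
		log.Fatal(err)
	}
}
```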
