Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch digests to use snappy compression #10215

Merged
merged 1 commit into from
Aug 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tsdb/engine/tsm1/digest.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ func DigestFresh(dir string, files []string, shardLastMod time.Time) (bool, stri
if err != nil {
return false, fmt.Sprintf("Can't read digest: err=%s", err)
}
defer dr.Close()

mfest, err := dr.ReadManifest()
if err != nil {
Expand Down
61 changes: 25 additions & 36 deletions tsdb/engine/tsm1/digest_reader.go
Original file line number Diff line number Diff line change
@@ -1,84 +1,73 @@
package tsm1

import (
"bufio"
"compress/gzip"
"encoding/binary"
"encoding/json"
"fmt"
"io"

"github.com/golang/snappy"
)

type DigestReader struct {
r io.ReadCloser
gr *gzip.Reader
sr *snappy.Reader
}

func NewDigestReader(r io.ReadCloser) (*DigestReader, error) {
gr, err := gzip.NewReader(bufio.NewReader(r))
if err != nil {
return nil, err
}
return &DigestReader{r: r, gr: gr}, nil
return &DigestReader{r: r, sr: snappy.NewReader(r)}, nil
}

func (w *DigestReader) ReadManifest() (*DigestManifest, error) {
func (r *DigestReader) ReadManifest() (*DigestManifest, error) {
var n uint32
// Read manifest length.
if err := binary.Read(w.gr, binary.BigEndian, &n); err != nil {
if err := binary.Read(r.sr, binary.BigEndian, &n); err != nil {
return nil, err
}

r := io.LimitReader(w.gr, int64(n))
lr := io.LimitReader(r.sr, int64(n))

m := &DigestManifest{}
return m, json.NewDecoder(r).Decode(m)
return m, json.NewDecoder(lr).Decode(m)
}

func (w *DigestReader) ReadTimeSpan() (string, *DigestTimeSpan, error) {
func (r *DigestReader) ReadTimeSpan() (string, *DigestTimeSpan, error) {
var n uint16
if err := binary.Read(w.gr, binary.BigEndian, &n); err != nil {
if err := binary.Read(r.sr, binary.BigEndian, &n); err != nil {
return "", nil, err
}

b := make([]byte, n)
if _, err := io.ReadFull(w.gr, b); err != nil {
if _, err := io.ReadFull(r.sr, b); err != nil {
return "", nil, err
}

var cnt uint32
if err := binary.Read(w.gr, binary.BigEndian, &cnt); err != nil {
if err := binary.Read(r.sr, binary.BigEndian, &cnt); err != nil {
return "", nil, err
}

ts := &DigestTimeSpan{}
ts.Ranges = make([]DigestTimeRange, cnt)
for i := 0; i < int(cnt); i++ {
var min, max int64
var crc uint32

if err := binary.Read(w.gr, binary.BigEndian, &min); err != nil {
return "", nil, err
}
var buf [22]byte

if err := binary.Read(w.gr, binary.BigEndian, &max); err != nil {
n, err := io.ReadFull(r.sr, buf[:])
if err != nil {
return "", nil, err
} else if n != len(buf) {
return "", nil, fmt.Errorf("read %d bytes, expected %d, data %v", n, len(buf), buf[:n])
}

if err := binary.Read(w.gr, binary.BigEndian, &crc); err != nil {
return "", nil, err
}

if err := binary.Read(w.gr, binary.BigEndian, &n); err != nil {
return "", nil, err
}
ts.Add(min, max, int(n), crc)
ts.Ranges[i].Min = int64(binary.BigEndian.Uint64(buf[0:]))
ts.Ranges[i].Max = int64(binary.BigEndian.Uint64(buf[8:]))
ts.Ranges[i].CRC = binary.BigEndian.Uint32(buf[16:])
ts.Ranges[i].N = int(binary.BigEndian.Uint16(buf[20:]))
}

return string(b), ts, nil
}

func (w *DigestReader) Close() error {
if err := w.gr.Close(); err != nil {
return err
}
return w.r.Close()
func (r *DigestReader) Close() error {
return r.r.Close()
}
2 changes: 1 addition & 1 deletion tsdb/engine/tsm1/digest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func TestDigest_Manifest(t *testing.T) {
t.Fatal(err)
}

if err := df.Close(); err != nil {
if err := r.Close(); err != nil {
t.Fatal(err)
}

Expand Down
38 changes: 16 additions & 22 deletions tsdb/engine/tsm1/digest_writer.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,23 @@
package tsm1

import (
"compress/gzip"
"encoding/binary"
"encoding/json"
"io"
)

type writeFlushCloser interface {
Close() error
Write(b []byte) (int, error)
Flush() error
}
"github.com/golang/snappy"
)

// DigestWriter allows for writing a digest of a shard. A digest is a condensed
// representation of the contents of a shard. It can be scoped to one or more series
// keys, ranges of times or sets of files.
type DigestWriter struct {
w io.WriteCloser
F writeFlushCloser
w io.WriteCloser
sw *snappy.Writer
}

func NewDigestWriter(w io.WriteCloser) (*DigestWriter, error) {
gw := gzip.NewWriter(w)
return &DigestWriter{w: w, F: gw}, nil
return &DigestWriter{w: w, sw: snappy.NewBufferedWriter(w)}, nil
}

func (w *DigestWriter) WriteManifest(m *DigestManifest) error {
Expand All @@ -33,42 +27,42 @@ func (w *DigestWriter) WriteManifest(m *DigestManifest) error {
}

// Write length of manifest.
if err := binary.Write(w.F, binary.BigEndian, uint32(len(b))); err != nil {
if err := binary.Write(w.sw, binary.BigEndian, uint32(len(b))); err != nil {
return err
}

// Write manifest.
_, err = w.F.Write(b)
_, err = w.sw.Write(b)
return err
}

func (w *DigestWriter) WriteTimeSpan(key string, t *DigestTimeSpan) error {
if err := binary.Write(w.F, binary.BigEndian, uint16(len(key))); err != nil {
if err := binary.Write(w.sw, binary.BigEndian, uint16(len(key))); err != nil {
return err
}

if _, err := w.F.Write([]byte(key)); err != nil {
if _, err := w.sw.Write([]byte(key)); err != nil {
return err
}

if err := binary.Write(w.F, binary.BigEndian, uint32(t.Len())); err != nil {
if err := binary.Write(w.sw, binary.BigEndian, uint32(t.Len())); err != nil {
return err
}

for _, tr := range t.Ranges {
if err := binary.Write(w.F, binary.BigEndian, tr.Min); err != nil {
if err := binary.Write(w.sw, binary.BigEndian, tr.Min); err != nil {
return err
}

if err := binary.Write(w.F, binary.BigEndian, tr.Max); err != nil {
if err := binary.Write(w.sw, binary.BigEndian, tr.Max); err != nil {
return err
}

if err := binary.Write(w.F, binary.BigEndian, tr.CRC); err != nil {
if err := binary.Write(w.sw, binary.BigEndian, tr.CRC); err != nil {
return err
}

if err := binary.Write(w.F, binary.BigEndian, uint16(tr.N)); err != nil {
if err := binary.Write(w.sw, binary.BigEndian, uint16(tr.N)); err != nil {
return err
}
}
Expand All @@ -77,15 +71,15 @@ func (w *DigestWriter) WriteTimeSpan(key string, t *DigestTimeSpan) error {
}

func (w *DigestWriter) Flush() error {
return w.F.Flush()
return w.sw.Flush()
}

func (w *DigestWriter) Close() error {
if err := w.Flush(); err != nil {
return err
}

if err := w.F.Close(); err != nil {
if err := w.sw.Close(); err != nil {
return err
}

Expand Down