From 2f6a1fc03b4558ef4a532fef49ac15e3035d0bf8 Mon Sep 17 00:00:00 2001 From: David Norton Date: Tue, 21 Aug 2018 13:18:21 -0400 Subject: [PATCH] switch digests to use snappy compression --- tsdb/engine/tsm1/digest.go | 1 + tsdb/engine/tsm1/digest_reader.go | 61 +++++++++++++------------------ tsdb/engine/tsm1/digest_test.go | 2 +- tsdb/engine/tsm1/digest_writer.go | 38 ++++++++----------- 4 files changed, 43 insertions(+), 59 deletions(-) diff --git a/tsdb/engine/tsm1/digest.go b/tsdb/engine/tsm1/digest.go index a2a9b7a4900..8034827aa45 100644 --- a/tsdb/engine/tsm1/digest.go +++ b/tsdb/engine/tsm1/digest.go @@ -142,6 +142,7 @@ func DigestFresh(dir string, files []string, shardLastMod time.Time) (bool, stri if err != nil { return false, fmt.Sprintf("Can't read digest: err=%s", err) } + defer dr.Close() mfest, err := dr.ReadManifest() if err != nil { diff --git a/tsdb/engine/tsm1/digest_reader.go b/tsdb/engine/tsm1/digest_reader.go index 87b895ad670..e7bc5b8ad23 100644 --- a/tsdb/engine/tsm1/digest_reader.go +++ b/tsdb/engine/tsm1/digest_reader.go @@ -1,84 +1,73 @@ package tsm1 import ( - "bufio" - "compress/gzip" "encoding/binary" "encoding/json" + "fmt" "io" + + "github.com/golang/snappy" ) type DigestReader struct { r io.ReadCloser - gr *gzip.Reader + sr *snappy.Reader } func NewDigestReader(r io.ReadCloser) (*DigestReader, error) { - gr, err := gzip.NewReader(bufio.NewReader(r)) - if err != nil { - return nil, err - } - return &DigestReader{r: r, gr: gr}, nil + return &DigestReader{r: r, sr: snappy.NewReader(r)}, nil } -func (w *DigestReader) ReadManifest() (*DigestManifest, error) { +func (r *DigestReader) ReadManifest() (*DigestManifest, error) { var n uint32 // Read manifest length. - if err := binary.Read(w.gr, binary.BigEndian, &n); err != nil { + if err := binary.Read(r.sr, binary.BigEndian, &n); err != nil { return nil, err } - r := io.LimitReader(w.gr, int64(n)) + lr := io.LimitReader(r.sr, int64(n)) m := &DigestManifest{} - return m, json.NewDecoder(r).Decode(m) + return m, json.NewDecoder(lr).Decode(m) } -func (w *DigestReader) ReadTimeSpan() (string, *DigestTimeSpan, error) { +func (r *DigestReader) ReadTimeSpan() (string, *DigestTimeSpan, error) { var n uint16 - if err := binary.Read(w.gr, binary.BigEndian, &n); err != nil { + if err := binary.Read(r.sr, binary.BigEndian, &n); err != nil { return "", nil, err } b := make([]byte, n) - if _, err := io.ReadFull(w.gr, b); err != nil { + if _, err := io.ReadFull(r.sr, b); err != nil { return "", nil, err } var cnt uint32 - if err := binary.Read(w.gr, binary.BigEndian, &cnt); err != nil { + if err := binary.Read(r.sr, binary.BigEndian, &cnt); err != nil { return "", nil, err } ts := &DigestTimeSpan{} + ts.Ranges = make([]DigestTimeRange, cnt) for i := 0; i < int(cnt); i++ { - var min, max int64 - var crc uint32 - - if err := binary.Read(w.gr, binary.BigEndian, &min); err != nil { - return "", nil, err - } + var buf [22]byte - if err := binary.Read(w.gr, binary.BigEndian, &max); err != nil { + n, err := io.ReadFull(r.sr, buf[:]) + if err != nil { return "", nil, err + } else if n != len(buf) { + return "", nil, fmt.Errorf("read %d bytes, expected %d, data %v", n, len(buf), buf[:n]) } - if err := binary.Read(w.gr, binary.BigEndian, &crc); err != nil { - return "", nil, err - } - - if err := binary.Read(w.gr, binary.BigEndian, &n); err != nil { - return "", nil, err - } - ts.Add(min, max, int(n), crc) + ts.Ranges[i].Min = int64(binary.BigEndian.Uint64(buf[0:])) + ts.Ranges[i].Max = int64(binary.BigEndian.Uint64(buf[8:])) + ts.Ranges[i].CRC = binary.BigEndian.Uint32(buf[16:]) + ts.Ranges[i].N = int(binary.BigEndian.Uint16(buf[20:])) } return string(b), ts, nil } -func (w *DigestReader) Close() error { - if err := w.gr.Close(); err != nil { - return err - } - return w.r.Close() +func (r *DigestReader) Close() error { + return r.r.Close() } diff --git a/tsdb/engine/tsm1/digest_test.go b/tsdb/engine/tsm1/digest_test.go index ef94f25788c..22dce4fefc7 100644 --- a/tsdb/engine/tsm1/digest_test.go +++ b/tsdb/engine/tsm1/digest_test.go @@ -347,7 +347,7 @@ func TestDigest_Manifest(t *testing.T) { t.Fatal(err) } - if err := df.Close(); err != nil { + if err := r.Close(); err != nil { t.Fatal(err) } diff --git a/tsdb/engine/tsm1/digest_writer.go b/tsdb/engine/tsm1/digest_writer.go index 2e3c5e7c7fc..29efd7450ff 100644 --- a/tsdb/engine/tsm1/digest_writer.go +++ b/tsdb/engine/tsm1/digest_writer.go @@ -1,29 +1,23 @@ package tsm1 import ( - "compress/gzip" "encoding/binary" "encoding/json" "io" -) -type writeFlushCloser interface { - Close() error - Write(b []byte) (int, error) - Flush() error -} + "github.com/golang/snappy" +) // DigestWriter allows for writing a digest of a shard. A digest is a condensed // representation of the contents of a shard. It can be scoped to one or more series // keys, ranges of times or sets of files. type DigestWriter struct { - w io.WriteCloser - F writeFlushCloser + w io.WriteCloser + sw *snappy.Writer } func NewDigestWriter(w io.WriteCloser) (*DigestWriter, error) { - gw := gzip.NewWriter(w) - return &DigestWriter{w: w, F: gw}, nil + return &DigestWriter{w: w, sw: snappy.NewBufferedWriter(w)}, nil } func (w *DigestWriter) WriteManifest(m *DigestManifest) error { @@ -33,42 +27,42 @@ func (w *DigestWriter) WriteManifest(m *DigestManifest) error { } // Write length of manifest. - if err := binary.Write(w.F, binary.BigEndian, uint32(len(b))); err != nil { + if err := binary.Write(w.sw, binary.BigEndian, uint32(len(b))); err != nil { return err } // Write manifest. - _, err = w.F.Write(b) + _, err = w.sw.Write(b) return err } func (w *DigestWriter) WriteTimeSpan(key string, t *DigestTimeSpan) error { - if err := binary.Write(w.F, binary.BigEndian, uint16(len(key))); err != nil { + if err := binary.Write(w.sw, binary.BigEndian, uint16(len(key))); err != nil { return err } - if _, err := w.F.Write([]byte(key)); err != nil { + if _, err := w.sw.Write([]byte(key)); err != nil { return err } - if err := binary.Write(w.F, binary.BigEndian, uint32(t.Len())); err != nil { + if err := binary.Write(w.sw, binary.BigEndian, uint32(t.Len())); err != nil { return err } for _, tr := range t.Ranges { - if err := binary.Write(w.F, binary.BigEndian, tr.Min); err != nil { + if err := binary.Write(w.sw, binary.BigEndian, tr.Min); err != nil { return err } - if err := binary.Write(w.F, binary.BigEndian, tr.Max); err != nil { + if err := binary.Write(w.sw, binary.BigEndian, tr.Max); err != nil { return err } - if err := binary.Write(w.F, binary.BigEndian, tr.CRC); err != nil { + if err := binary.Write(w.sw, binary.BigEndian, tr.CRC); err != nil { return err } - if err := binary.Write(w.F, binary.BigEndian, uint16(tr.N)); err != nil { + if err := binary.Write(w.sw, binary.BigEndian, uint16(tr.N)); err != nil { return err } } @@ -77,7 +71,7 @@ func (w *DigestWriter) WriteTimeSpan(key string, t *DigestTimeSpan) error { } func (w *DigestWriter) Flush() error { - return w.F.Flush() + return w.sw.Flush() } func (w *DigestWriter) Close() error { @@ -85,7 +79,7 @@ func (w *DigestWriter) Close() error { return err } - if err := w.F.Close(); err != nil { + if err := w.sw.Close(); err != nil { return err }