Skip to content

Commit

Permalink
feat #9212: add ability to generate shard digests
Browse files Browse the repository at this point in the history
  • Loading branch information
dgnorton committed Dec 13, 2017
1 parent cfc7428 commit 4e13248
Show file tree
Hide file tree
Showing 8 changed files with 529 additions and 0 deletions.
1 change: 1 addition & 0 deletions tsdb/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type Engine interface {
Export(w io.Writer, basePath string, start time.Time, end time.Time) error
Restore(r io.Reader, basePath string) error
Import(r io.Reader, basePath string) error
Digest() (io.ReadCloser, error)

CreateIterator(ctx context.Context, measurement string, opt query.IteratorOptions) (query.Iterator, error)
CreateCursor(ctx context.Context, r *CursorRequest) (Cursor, error)
Expand Down
136 changes: 136 additions & 0 deletions tsdb/engine/tsm1/digest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package tsm1

import (
"bytes"
"fmt"
"io"
"math"
"os"
"path/filepath"
"sort"
)

// DigestOptions narrows what DigestWithOptions includes in a digest.
type DigestOptions struct {
// MinTime and MaxTime bound the block time range. Blocks that lie entirely
// outside [MinTime, MaxTime] are skipped; blocks that overlap it are kept.
MinTime, MaxTime int64
// MinKey and MaxKey bound the series keys by bytewise comparison, both ends
// inclusive. A bound of length zero is not applied.
MinKey, MaxKey []byte
}

// DigestWithOptions writes a digest of the TSM files in dir to w, using opts
// to filter by time and key range.
//
// One time span is written for every series key found in the files
// (restricted to [opts.MinKey, opts.MaxKey] when those bounds are non-empty).
// The span records the min time, max time, value count and checksum of each
// block of that key which overlaps [opts.MinTime, opts.MaxTime].
//
// The digest stream is finished before returning. w itself is not closed by
// this function; closing it remains the caller's responsibility.
func DigestWithOptions(dir string, opts DigestOptions, w io.WriteCloser) error {
	if dir == "" {
		return fmt.Errorf("dir is required")
	}

	files, err := filepath.Glob(filepath.Join(dir, fmt.Sprintf("*.%s", TSMFileExtension)))
	if err != nil {
		return err
	}

	// Every reader opened below is closed when the function returns, on
	// success and on failure alike. This defer is registered first so that it
	// runs last, after the key producers have finished.
	var opened []*TSMReader
	defer func() {
		for _, r := range opened {
			r.Close()
		}
	}()

	// open creates a reader for one TSM file and releases the file handle
	// again if the reader cannot be built.
	open := func(path string) (*TSMReader, error) {
		f, err := os.Open(path)
		if err != nil {
			return nil, err
		}
		r, err := NewTSMReader(f)
		if err != nil {
			f.Close()
			return nil, err
		}
		opened = append(opened, r)
		return r, nil
	}

	// Two readers are kept per file: one serves block reads in the main loop,
	// the other is owned by the goroutine that iterates the file's keys.
	readers := make([]*TSMReader, 0, len(files))
	for _, fi := range files {
		r, err := open(fi)
		if err != nil {
			return err
		}
		readers = append(readers, r)
	}

	keyReaders := make([]*TSMReader, 0, len(files))
	for _, fi := range files {
		r, err := open(fi)
		if err != nil {
			return err
		}
		keyReaders = append(keyReaders, r)
	}

	dw, err := NewDigestWriter(w)
	if err != nil {
		return err
	}
	finished := false
	defer func() {
		// On an error path the digest stream is still terminated, but the
		// error that caused the return takes precedence over Close's.
		if !finished {
			dw.Close()
		}
	}()

	// Producers are started only after every fallible setup step has
	// succeeded, so no goroutine is left blocked by an early return above.
	ch := make([]chan seriesKey, 0, len(keyReaders))
	for _, kr := range keyReaders {
		s := make(chan seriesKey)
		ch = append(ch, s)
		go func(r *TSMReader, out chan<- seriesKey) {
			defer close(out)
			for i := 0; i < r.KeyCount(); i++ {
				key, typ := r.KeyAt(i)
				if len(opts.MinKey) > 0 && bytes.Compare(key, opts.MinKey) < 0 {
					continue
				}

				if len(opts.MaxKey) > 0 && bytes.Compare(key, opts.MaxKey) > 0 {
					continue
				}

				out <- seriesKey{key: key, typ: typ}
			}
		}(kr, s)
	}

	merged := merge(ch...)
	// If the loop below returns early, drain the remaining keys so the
	// producers run to completion instead of blocking forever on a send.
	// After a normal finish the channel is already closed and this is a no-op.
	defer func() {
		for range merged {
		}
	}()

	for key := range merged {
		ts := &DigestTimeSpan{}

		for _, r := range readers {
			for _, entry := range r.Entries(key.key) {
				// Filter blocks that are outside the time filter before
				// reading them. If they overlap, we still include them.
				if entry.MaxTime < opts.MinTime || entry.MinTime > opts.MaxTime {
					continue
				}

				crc, b, err := r.ReadBytes(&entry, nil)
				if err != nil {
					return err
				}

				ts.Add(entry.MinTime, entry.MaxTime, BlockCount(b), crc)
			}
		}

		sort.Sort(ts)
		if err := dw.WriteTimeSpan(string(key.key), ts); err != nil {
			return err
		}
	}

	finished = true
	return dw.Close()
}

// Digest writes a digest of dir to w of a full shard dir.
func Digest(dir string, w io.WriteCloser) error {
return DigestWithOptions(dir, DigestOptions{
MinTime: math.MinInt64,
MaxTime: math.MaxInt64,
}, w)
}

// rwPair groups a TSM reader, a TSM writer and an output file so they can be
// released together.
// NOTE(review): outf is presumably the file backing w — confirm against the
// code that constructs rwPair, which is not visible here.
type rwPair struct {
r *TSMReader
w TSMWriter
outf *os.File
}

// close closes the reader, then the writer, then the output file. Whatever
// the individual Close calls return is discarded.
func (rw *rwPair) close() {
rw.r.Close()
rw.w.Close()
rw.outf.Close()
}
66 changes: 66 additions & 0 deletions tsdb/engine/tsm1/digest_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package tsm1

import (
"bufio"
"compress/gzip"
"encoding/binary"
"io"
)

// DigestReader reads a digest stream. It embeds the io.ReadCloser that
// ReadTimeSpan decodes records from.
type DigestReader struct {
	io.ReadCloser
}

// NewDigestReader returns a DigestReader that decompresses the gzip stream
// read from r through a buffered reader. It fails when a gzip header cannot
// be read from r.
func NewDigestReader(r io.ReadCloser) (*DigestReader, error) {
	buffered := bufio.NewReader(r)

	zr, err := gzip.NewReader(buffered)
	if err != nil {
		return nil, err
	}

	dr := new(DigestReader)
	dr.ReadCloser = zr
	return dr, nil
}

// ReadTimeSpan reads the next record from the digest and returns its series
// key together with the time span recorded for that key.
//
// A record is laid out big-endian as: key length (uint16), key bytes, range
// count (uint32), then per range min (int64), max (int64), CRC (uint32) and
// value count (uint16).
//
// io.EOF is returned only when the stream ends cleanly before a new record
// starts. A stream that ends anywhere inside a record yields
// io.ErrUnexpectedEOF, so a truncated digest cannot be mistaken for a
// complete one.
func (r *DigestReader) ReadTimeSpan() (string, *DigestTimeSpan, error) {
	// Size in bytes of one encoded range: min + max + crc + count.
	const rangeSize = 8 + 8 + 4 + 2

	// midRecord turns a bare EOF into an unexpected one; it is applied to
	// every read after the first field of a record.
	midRecord := func(err error) error {
		if err == io.EOF {
			return io.ErrUnexpectedEOF
		}
		return err
	}

	// EOF on the very first field means there are no more records, so the
	// error is passed through untouched.
	var keyLen uint16
	if err := binary.Read(r.ReadCloser, binary.BigEndian, &keyLen); err != nil {
		return "", nil, err
	}

	key := make([]byte, keyLen)
	if _, err := io.ReadFull(r.ReadCloser, key); err != nil {
		return "", nil, midRecord(err)
	}

	var cnt uint32
	if err := binary.Read(r.ReadCloser, binary.BigEndian, &cnt); err != nil {
		return "", nil, midRecord(err)
	}

	ts := &DigestTimeSpan{}
	var rec [rangeSize]byte
	for i := uint32(0); i < cnt; i++ {
		// Each range has a fixed size, so it is fetched with a single read
		// and decoded in place.
		if _, err := io.ReadFull(r.ReadCloser, rec[:]); err != nil {
			return "", nil, midRecord(err)
		}

		min := int64(binary.BigEndian.Uint64(rec[0:8]))
		max := int64(binary.BigEndian.Uint64(rec[8:16]))
		crc := binary.BigEndian.Uint32(rec[16:20])
		n := binary.BigEndian.Uint16(rec[20:22])

		ts.Add(min, max, int(n), crc)
	}

	return string(key), ts, nil
}

// Close closes the embedded ReadCloser. For a reader built by NewDigestReader
// that is the gzip reader, and closing a gzip reader does not close the
// stream it decompresses, so the caller still has to close the reader that
// was handed to NewDigestReader.
func (r *DigestReader) Close() error {
	err := r.ReadCloser.Close()
	return err
}
95 changes: 95 additions & 0 deletions tsdb/engine/tsm1/digest_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package tsm1

import (
"compress/gzip"
"encoding/binary"
"io"
)

// writeFlushCloser is an io.WriteCloser whose buffered output can also be
// flushed on demand.
type writeFlushCloser interface {
	io.WriteCloser
	Flush() error
}

// DigestWriter writes the digest of a shard, a condensed representation of
// the shard's contents. A digest can be scoped to one or more series keys,
// ranges of times or sets of files.
type DigestWriter struct {
	// F receives the encoded digest records.
	F writeFlushCloser
}

// NewDigestWriter returns a DigestWriter that gzip-compresses everything it
// writes into w. The returned error is always nil.
func NewDigestWriter(w io.WriteCloser) (*DigestWriter, error) {
	dw := new(DigestWriter)
	dw.F = gzip.NewWriter(w)
	return dw, nil
}

// digestEncodeError reports a value that cannot be represented in the digest
// record format.
type digestEncodeError string

// Error implements the error interface.
func (e digestEncodeError) Error() string { return string(e) }

const (
	errDigestKeyTooLong = digestEncodeError("digest: series key is longer than 65535 bytes")
	errDigestCountRange = digestEncodeError("digest: block value count does not fit in 16 bits")
)

// WriteTimeSpan writes one digest record: the series key followed by every
// range in t.
//
// The record is laid out big-endian as: key length (uint16), key bytes, range
// count (uint32), then per range Min (int64), Max (int64), CRC (uint32) and
// N (uint16).
//
// The key length and each N are stored in 16 bits. A key longer than 65535
// bytes or an N outside [0, 65535] is rejected with an error before anything
// is written, instead of being truncated into a record that no longer
// describes the data. The whole record is handed to F in a single Write.
func (w *DigestWriter) WriteTimeSpan(key string, t *DigestTimeSpan) error {
	const (
		maxUint16 = 1<<16 - 1
		// Size in bytes of one encoded range: min + max + crc + count.
		rangeSize = 8 + 8 + 4 + 2
	)

	if len(key) > maxUint16 {
		return errDigestKeyTooLong
	}
	for i := range t.Ranges {
		if n := t.Ranges[i].N; n < 0 || n > maxUint16 {
			return errDigestCountRange
		}
	}

	buf := make([]byte, 2+len(key)+4+len(t.Ranges)*rangeSize)

	binary.BigEndian.PutUint16(buf, uint16(len(key)))
	off := 2 + copy(buf[2:], key)

	binary.BigEndian.PutUint32(buf[off:], uint32(len(t.Ranges)))
	off += 4

	for i := range t.Ranges {
		tr := &t.Ranges[i]
		binary.BigEndian.PutUint64(buf[off:], uint64(tr.Min))
		binary.BigEndian.PutUint64(buf[off+8:], uint64(tr.Max))
		binary.BigEndian.PutUint32(buf[off+16:], tr.CRC)
		binary.BigEndian.PutUint16(buf[off+20:], uint16(tr.N))
		off += rangeSize
	}

	_, err := w.F.Write(buf)
	return err
}

// Flush flushes F and returns the error reported by F.Flush.
func (w *DigestWriter) Flush() error {
return w.F.Flush()
}

// Close flushes pending output and then closes F. If the flush fails, its
// error is returned and F is not closed.
//
// For a writer built by NewDigestWriter, F is a gzip writer; closing it ends
// the gzip stream but does not close the stream it writes into.
func (w *DigestWriter) Close() error {
	err := w.Flush()
	if err == nil {
		err = w.F.Close()
	}
	return err
}

// DigestTimeSpan is the set of block ranges recorded for one series key. It
// implements sort.Interface.
type DigestTimeSpan struct {
	Ranges []DigestTimeRange
}

// Len returns the number of ranges in the span.
func (t DigestTimeSpan) Len() int { return len(t.Ranges) }

// Swap exchanges the ranges at positions i and j.
func (t DigestTimeSpan) Swap(i, j int) { t.Ranges[i], t.Ranges[j] = t.Ranges[j], t.Ranges[i] }

// Less orders ranges by Min, breaking ties by Max, then N, then CRC.
//
// The tie-breakers make the order total. Without them, ranges sharing a Min
// would stay in whatever order they were added, so the same set of blocks
// could serialize differently depending on the order files were visited.
func (t DigestTimeSpan) Less(i, j int) bool {
	x, y := t.Ranges[i], t.Ranges[j]
	switch {
	case x.Min != y.Min:
		return x.Min < y.Min
	case x.Max != y.Max:
		return x.Max < y.Max
	case x.N != y.N:
		return x.N < y.N
	default:
		return x.CRC < y.CRC
	}
}

// Add appends the range described by min, max, n and crc to the span unless
// an identical range (all four values equal) is already present.
func (t *DigestTimeSpan) Add(min, max int64, n int, crc uint32) {
	tr := DigestTimeRange{Min: min, Max: max, N: n, CRC: crc}
	for _, v := range t.Ranges {
		if v == tr {
			return
		}
	}
	t.Ranges = append(t.Ranges, tr)
}

// DigestTimeRange describes one block: the time range it covers (Min, Max),
// its value count (N) and its checksum (CRC).
type DigestTimeRange struct {
	Min, Max int64
	N        int
	CRC      uint32
}
42 changes: 42 additions & 0 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,48 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string,
return e
}

// Digest returns a reader for the shard's digest.
//
// The digest is cached in the file "digest.tsd" inside the engine's path. If
// that file exists and the engine has not been modified after the file's
// modification time, the cached file is returned as is. Otherwise a new
// digest is generated into "digest.tsd.tmp", moved over the cached file and
// opened.
//
// The caller is responsible for closing the returned reader.
func (e *Engine) Digest() (io.ReadCloser, error) {
	digestPath := filepath.Join(e.path, "digest.tsd")

	// See if there's an existing digest file on disk.
	if f, err := os.Open(digestPath); err == nil {
		// There is an existing digest file. Now see if it is still fresh.
		fi, err := f.Stat()
		if err != nil {
			f.Close()
			return nil, err
		}

		if !e.LastModified().After(fi.ModTime()) {
			// Existing digest is still fresh so return a reader for it.
			return f, nil
		}

		// The digest is stale. Release the handle before the file is
		// replaced instead of leaking it.
		f.Close()
	}

	// Either no digest existed or the existing one was stale
	// so generate a new digest.

	// Create a tmp file to write the digest to.
	tf, err := os.Create(digestPath + ".tmp")
	if err != nil {
		return nil, err
	}

	// Write the new digest to the tmp file.
	if err := Digest(e.path, tf); err != nil {
		tf.Close()
		os.Remove(tf.Name())
		return nil, err
	}

	// Digest finishes the digest stream but leaves tf open, so close it here
	// before renaming; a failed close means the file may be incomplete.
	if err := tf.Close(); err != nil {
		os.Remove(tf.Name())
		return nil, err
	}

	// Rename the temporary digest file to the actual digest file.
	if err := renameFile(tf.Name(), digestPath); err != nil {
		os.Remove(tf.Name())
		return nil, err
	}

	// Create and return a reader for the new digest file.
	return os.Open(digestPath)
}

// SetEnabled sets whether the engine is enabled.
func (e *Engine) SetEnabled(enabled bool) {
e.enableCompactionsOnOpen = enabled
Expand Down
Loading

0 comments on commit 4e13248

Please sign in to comment.