Merge pull request #9213 from influxdata/dn-generate-digests
feat #9212: add ability to generate shard digests
dgnorton authored Dec 13, 2017
2 parents cfc7428 + 253ea7c commit 44a06e2
Showing 11 changed files with 834 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -14,6 +14,7 @@
- [#8491](https://github.com/influxdata/influxdb/pull/8491): Add further tsi support for streaming/copying shards.
- [#9181](https://github.com/influxdata/influxdb/pull/9181): Schedule a full compaction after a successful import
- [#9218](https://github.com/influxdata/influxdb/pull/9218): Add Prometheus `/metrics` endpoint.
- [#9213](https://github.com/influxdata/influxdb/pull/9213): Add ability to generate shard digests.

### Bugfixes

1 change: 1 addition & 0 deletions tsdb/engine.go
@@ -45,6 +45,7 @@ type Engine interface {
	Export(w io.Writer, basePath string, start time.Time, end time.Time) error
	Restore(r io.Reader, basePath string) error
	Import(r io.Reader, basePath string) error
	Digest() (io.ReadCloser, error)

	CreateIterator(ctx context.Context, measurement string, opt query.IteratorOptions) (query.Iterator, error)
	CreateCursor(ctx context.Context, r *CursorRequest) (Cursor, error)
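The new Digest() method lets callers stream a per-shard digest without knowing how the engine produces it. Below is a minimal sketch of how calling code might consume the method; the writeShardDigest helper, its parameters, and the output path are illustrative assumptions, not part of this commit.

package digestexample

import (
	"io"
	"os"

	"github.com/influxdata/influxdb/tsdb"
)

// writeShardDigest is illustrative only: it streams an engine's digest to a file.
func writeShardDigest(e tsdb.Engine, path string) error {
	r, err := e.Digest()
	if err != nil {
		return err
	}
	defer r.Close()

	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer f.Close()

	// The digest is an opaque stream; copy it verbatim to the file.
	_, err = io.Copy(f, r)
	return err
}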
136 changes: 136 additions & 0 deletions tsdb/engine/tsm1/digest.go
@@ -0,0 +1,136 @@
package tsm1

import (
"bytes"
"fmt"
"io"
"math"
"os"
"path/filepath"
"sort"
)

// DigestOptions allows filtering the keys and time range included in a digest.
type DigestOptions struct {
	MinTime, MaxTime int64
	MinKey, MaxKey   []byte
}

// DigestWithOptions writes a digest of dir to w using options to filter by
// time and key range.
func DigestWithOptions(dir string, opts DigestOptions, w io.WriteCloser) error {
	if dir == "" {
		return fmt.Errorf("dir is required")
	}

	files, err := filepath.Glob(filepath.Join(dir, fmt.Sprintf("*.%s", TSMFileExtension)))
	if err != nil {
		return err
	}

	readers := make([]*TSMReader, 0, len(files))

	for _, fi := range files {
		f, err := os.Open(fi)
		if err != nil {
			return err
		}

		r, err := NewTSMReader(f)
		if err != nil {
			return err
		}
		readers = append(readers, r)

		// Close the reader before returning so the underlying file is released.
		defer r.Close()
	}

	ch := make([]chan seriesKey, 0, len(files))
	for _, fi := range files {
		f, err := os.Open(fi)
		if err != nil {
			return err
		}

		r, err := NewTSMReader(f)
		if err != nil {
			return err
		}
		defer r.Close()

		s := make(chan seriesKey)
		ch = append(ch, s)
		go func() {
			for i := 0; i < r.KeyCount(); i++ {
				key, typ := r.KeyAt(i)
				if len(opts.MinKey) > 0 && bytes.Compare(key, opts.MinKey) < 0 {
					continue
				}

				if len(opts.MaxKey) > 0 && bytes.Compare(key, opts.MaxKey) > 0 {
					continue
				}

				s <- seriesKey{key: key, typ: typ}
			}
			close(s)
		}()
	}

	dw, err := NewDigestWriter(w)
	if err != nil {
		return err
	}
	defer dw.Close()

	var n int
	for key := range merge(ch...) {
		ts := &DigestTimeSpan{}
		n++
		kstr := string(key.key)

		for _, r := range readers {
			entries := r.Entries(key.key)
			for _, entry := range entries {
				crc, b, err := r.ReadBytes(&entry, nil)
				if err != nil {
					return err
				}

				// Filter blocks that are outside the time filter. If they overlap, we
				// still include them.
				if entry.MaxTime < opts.MinTime || entry.MinTime > opts.MaxTime {
					continue
				}

				cnt := BlockCount(b)
				ts.Add(entry.MinTime, entry.MaxTime, cnt, crc)
			}
		}

		sort.Sort(ts)
		if err := dw.WriteTimeSpan(kstr, ts); err != nil {
			return err
		}
	}
	return dw.Close()
}

// Digest writes a digest of the full shard directory dir to w.
func Digest(dir string, w io.WriteCloser) error {
	return DigestWithOptions(dir, DigestOptions{
		MinTime: math.MinInt64,
		MaxTime: math.MaxInt64,
	}, w)
}

type rwPair struct {
	r    *TSMReader
	w    TSMWriter
	outf *os.File
}

func (rw *rwPair) close() {
	rw.r.Close()
	rw.w.Close()
	rw.outf.Close()
}
70 changes: 70 additions & 0 deletions tsdb/engine/tsm1/digest_reader.go
@@ -0,0 +1,70 @@
package tsm1

import (
"bufio"
"compress/gzip"
"encoding/binary"
"io"
)

// DigestReader reads a gzip-compressed digest stream written by a DigestWriter.
type DigestReader struct {
	r  io.ReadCloser
	gr *gzip.Reader
}

// NewDigestReader returns a DigestReader that reads a digest from r.
func NewDigestReader(r io.ReadCloser) (*DigestReader, error) {
	gr, err := gzip.NewReader(bufio.NewReader(r))
	if err != nil {
		return nil, err
	}
	return &DigestReader{r: r, gr: gr}, nil
}

// ReadTimeSpan reads the next series key and its time spans from the digest.
func (w *DigestReader) ReadTimeSpan() (string, *DigestTimeSpan, error) {
	var n uint16
	if err := binary.Read(w.gr, binary.BigEndian, &n); err != nil {
		return "", nil, err
	}

	b := make([]byte, n)
	if _, err := io.ReadFull(w.gr, b); err != nil {
		return "", nil, err
	}

	var cnt uint32
	if err := binary.Read(w.gr, binary.BigEndian, &cnt); err != nil {
		return "", nil, err
	}

	ts := &DigestTimeSpan{}
	for i := 0; i < int(cnt); i++ {
		var min, max int64
		var crc uint32

		if err := binary.Read(w.gr, binary.BigEndian, &min); err != nil {
			return "", nil, err
		}

		if err := binary.Read(w.gr, binary.BigEndian, &max); err != nil {
			return "", nil, err
		}

		if err := binary.Read(w.gr, binary.BigEndian, &crc); err != nil {
			return "", nil, err
		}

		if err := binary.Read(w.gr, binary.BigEndian, &n); err != nil {
			return "", nil, err
		}
		ts.Add(min, max, int(n), crc)
	}

	return string(b), ts, nil
}

// Close closes the digest reader and the underlying reader it wraps.
func (w *DigestReader) Close() error {
	if err := w.gr.Close(); err != nil {
		return err
	}
	return w.r.Close()
}
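For completeness, here is a sketch of reading a digest back with DigestReader. It assumes the file was produced by the digest writer above and that ReadTimeSpan surfaces io.EOF once the stream is exhausted (binary.Read reports io.EOF when no bytes remain); the dumpDigest name and the printed format are illustrative only.

package digestexample

import (
	"fmt"
	"io"
	"os"

	"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)

// dumpDigest is illustrative only: print each series key in a digest file
// along with the number of time ranges recorded for it.
func dumpDigest(path string) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}

	dr, err := tsm1.NewDigestReader(f)
	if err != nil {
		f.Close()
		return err
	}
	defer dr.Close() // also closes f

	for {
		key, ts, err := dr.ReadTimeSpan()
		if err == io.EOF {
			return nil // end of digest
		} else if err != nil {
			return err
		}
		fmt.Printf("%s: %d time range(s)\n", key, ts.Len())
	}
}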
