package tsm1

import (
	"bytes"
	"fmt"
	"io"
	"math"
	"os"
	"path/filepath"
	"sort"
)

// NOTE(review): this change also adds `Digest() (io.ReadCloser, error)` to the
// tsdb.Engine interface (tsdb/engine.go), so every engine implementation must
// now provide a digest reader.

// DigestOptions limits what is included in a digest: only series keys within
// [MinKey, MaxKey] and only blocks whose time range overlaps
// [MinTime, MaxTime] are written. A zero-length MinKey/MaxKey disables the
// corresponding key bound.
type DigestOptions struct {
	MinTime, MaxTime int64
	MinKey, MaxKey   []byte
}
+func DigestWithOptions(dir string, opts DigestOptions, w io.WriteCloser) error { + if dir == "" { + return fmt.Errorf("dir is required") + } + + files, err := filepath.Glob(filepath.Join(dir, fmt.Sprintf("*.%s", TSMFileExtension))) + if err != nil { + return err + } + + readers := make([]*TSMReader, 0, len(files)) + + for _, fi := range files { + f, err := os.Open(fi) + if err != nil { + return err + } + + r, err := NewTSMReader(f) + if err != nil { + return err + } + readers = append(readers, r) + } + + ch := make([]chan seriesKey, 0, len(files)) + for _, fi := range files { + f, err := os.Open(fi) + if err != nil { + return err + } + + r, err := NewTSMReader(f) + if err != nil { + return err + } + defer r.Close() + + s := make(chan seriesKey) + ch = append(ch, s) + go func() { + for i := 0; i < r.KeyCount(); i++ { + key, typ := r.KeyAt(i) + if len(opts.MinKey) > 0 && bytes.Compare(key, opts.MinKey) < 0 { + continue + } + + if len(opts.MaxKey) > 0 && bytes.Compare(key, opts.MaxKey) > 0 { + continue + } + + s <- seriesKey{key: key, typ: typ} + } + close(s) + }() + + } + + dw, err := NewDigestWriter(w) + if err != nil { + return err + } + defer dw.Close() + + var n int + for key := range merge(ch...) { + + ts := &DigestTimeSpan{} + n++ + kstr := string(key.key) + + for _, r := range readers { + entries := r.Entries(key.key) + for _, entry := range entries { + crc, b, err := r.ReadBytes(&entry, nil) + if err != nil { + return err + } + + // Filter blocks that are outside the time filter. If they overlap, we + // still include them. + if entry.MaxTime < opts.MinTime || entry.MinTime > opts.MaxTime { + continue + } + + cnt := BlockCount(b) + ts.Add(entry.MinTime, entry.MaxTime, cnt, crc) + } + } + + sort.Sort(ts) + if err := dw.WriteTimeSpan(kstr, ts); err != nil { + return err + } + } + return dw.Close() +} + +// Digest writes a digest of dir to w of a full shard dir. 
+func Digest(dir string, w io.WriteCloser) error { + return DigestWithOptions(dir, DigestOptions{ + MinTime: math.MinInt64, + MaxTime: math.MaxInt64, + }, w) +} + +type rwPair struct { + r *TSMReader + w TSMWriter + outf *os.File +} + +func (rw *rwPair) close() { + rw.r.Close() + rw.w.Close() + rw.outf.Close() +} diff --git a/tsdb/engine/tsm1/digest_reader.go b/tsdb/engine/tsm1/digest_reader.go new file mode 100644 index 00000000000..215b3f60bb2 --- /dev/null +++ b/tsdb/engine/tsm1/digest_reader.go @@ -0,0 +1,66 @@ +package tsm1 + +import ( + "bufio" + "compress/gzip" + "encoding/binary" + "io" +) + +type DigestReader struct { + io.ReadCloser +} + +func NewDigestReader(r io.ReadCloser) (*DigestReader, error) { + gr, err := gzip.NewReader(bufio.NewReader(r)) + if err != nil { + return nil, err + } + return &DigestReader{ReadCloser: gr}, nil +} + +func (w *DigestReader) ReadTimeSpan() (string, *DigestTimeSpan, error) { + var n uint16 + if err := binary.Read(w.ReadCloser, binary.BigEndian, &n); err != nil { + return "", nil, err + } + + b := make([]byte, n) + if _, err := io.ReadFull(w.ReadCloser, b); err != nil { + return "", nil, err + } + + var cnt uint32 + if err := binary.Read(w.ReadCloser, binary.BigEndian, &cnt); err != nil { + return "", nil, err + } + + ts := &DigestTimeSpan{} + for i := 0; i < int(cnt); i++ { + var min, max int64 + var crc uint32 + + if err := binary.Read(w.ReadCloser, binary.BigEndian, &min); err != nil { + return "", nil, err + } + + if err := binary.Read(w.ReadCloser, binary.BigEndian, &max); err != nil { + return "", nil, err + } + + if err := binary.Read(w.ReadCloser, binary.BigEndian, &crc); err != nil { + return "", nil, err + } + + if err := binary.Read(w.ReadCloser, binary.BigEndian, &n); err != nil { + return "", nil, err + } + ts.Add(min, max, int(n), crc) + } + + return string(b), ts, nil +} + +func (w *DigestReader) Close() error { + return w.ReadCloser.Close() +} diff --git a/tsdb/engine/tsm1/digest_writer.go 
b/tsdb/engine/tsm1/digest_writer.go new file mode 100644 index 00000000000..bd523de5196 --- /dev/null +++ b/tsdb/engine/tsm1/digest_writer.go @@ -0,0 +1,95 @@ +package tsm1 + +import ( + "compress/gzip" + "encoding/binary" + "io" +) + +type writeFlushCloser interface { + Close() error + Write(b []byte) (int, error) + Flush() error +} + +// DigestWriter allows for writing a digest of a shard. A digest is a condensed +// representation of the contents of a shard. It can be scoped to one or more series +// keys, ranges of times or sets of files. +type DigestWriter struct { + F writeFlushCloser +} + +func NewDigestWriter(w io.WriteCloser) (*DigestWriter, error) { + gw := gzip.NewWriter(w) + return &DigestWriter{F: gw}, nil +} + +func (w *DigestWriter) WriteTimeSpan(key string, t *DigestTimeSpan) error { + if err := binary.Write(w.F, binary.BigEndian, uint16(len(key))); err != nil { + return err + } + + if _, err := w.F.Write([]byte(key)); err != nil { + return err + } + + if err := binary.Write(w.F, binary.BigEndian, uint32(t.Len())); err != nil { + return err + } + + for _, tr := range t.Ranges { + if err := binary.Write(w.F, binary.BigEndian, tr.Min); err != nil { + return err + } + + if err := binary.Write(w.F, binary.BigEndian, tr.Max); err != nil { + return err + } + + if err := binary.Write(w.F, binary.BigEndian, tr.CRC); err != nil { + return err + } + + if err := binary.Write(w.F, binary.BigEndian, uint16(tr.N)); err != nil { + return err + } + } + + return nil +} + +func (w *DigestWriter) Flush() error { + return w.F.Flush() +} + +func (w *DigestWriter) Close() error { + if err := w.Flush(); err != nil { + return err + } + return w.F.Close() +} + +type DigestTimeSpan struct { + Ranges []DigestTimeRange +} + +func (a DigestTimeSpan) Len() int { return len(a.Ranges) } +func (a DigestTimeSpan) Swap(i, j int) { a.Ranges[i], a.Ranges[j] = a.Ranges[j], a.Ranges[i] } +func (a DigestTimeSpan) Less(i, j int) bool { + return a.Ranges[i].Min < a.Ranges[j].Min +} + +func 
(t *DigestTimeSpan) Add(min, max int64, n int, crc uint32) { + for _, v := range t.Ranges { + if v.Min == min && v.Max == max && v.N == n && v.CRC == crc { + return + } + } + t.Ranges = append(t.Ranges, DigestTimeRange{Min: min, Max: max, N: n, CRC: crc}) +} + +type DigestTimeRange struct { + Min, Max int64 + N int + CRC uint32 +} diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 434d9e93ae3..f4a6c8a35a4 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -226,6 +226,48 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string, return e } +// Digest returns a reader for the shard's digest. +func (e *Engine) Digest() (io.ReadCloser, error) { + digestPath := filepath.Join(e.path, "digest.tsd") + + // See if there's an existing digest file on disk. + f, err := os.Open(digestPath) + if err == nil { + // There is an existing digest file. Now see if it is still fresh. + fi, err := f.Stat() + if err != nil { + return nil, err + } + + if !e.LastModified().After(fi.ModTime()) { + // Existing digest is still fresh so return a reader for it. + return f, nil + } + } + + // Either no digest existed or the existing one was stale + // so generate a new digest. + + // Create a tmp file to write the digest to. + tf, err := os.Create(digestPath + ".tmp") + if err != nil { + return nil, err + } + + // Write the new digest to the tmp file. + if err := Digest(e.path, tf); err != nil { + tf.Close() + os.Remove(tf.Name()) + return nil, err + } + + // Rename the temporary digest file to the actual digest file. + renameFile(tf.Name(), digestPath) + + // Create and return a reader for the new digest file. + return os.Open(digestPath) +} + // SetEnabled sets whether the engine is enabled. 
// Ensure that the engine can write & read shard digest files.
func TestEngine_Digest(t *testing.T) {
	// Create a tmp directory for test files.
	tmpDir, err := ioutil.TempDir("", "TestEngine_Digest")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(tmpDir)

	walPath := filepath.Join(tmpDir, "wal")
	os.MkdirAll(walPath, 0777)

	idxPath := filepath.Join(tmpDir, "index")

	// Create an engine to write a tsm file.
	dbName := "db0"
	opt := tsdb.NewEngineOptions()
	opt.InmemIndex = inmem.NewIndex(dbName)
	idx := tsdb.MustOpenIndex(1, dbName, idxPath, opt)
	defer idx.Close()

	e := tsm1.NewEngine(1, idx, dbName, tmpDir, walPath, opt).(*tsm1.Engine)

	if err := e.Open(); err != nil {
		t.Fatalf("failed to open tsm1 engine: %s", err.Error())
	}

	// Create a few points.
	points := []models.Point{
		MustParsePointString("cpu,host=A value=1.1 1000000000"),
		MustParsePointString("cpu,host=B value=1.2 2000000000"),
	}

	if err := e.WritePoints(points); err != nil {
		t.Fatalf("failed to write points: %s", err.Error())
	}

	// Force a compaction so the points land in a TSM file that the digest
	// can see. NOTE(review): the return value is ignored — presumably it is
	// either error-free here or errors surface via the digest comparison;
	// consider checking it.
	e.ScheduleFullCompaction()

	// digest reads the shard's current digest and returns its decoded spans.
	// It is called multiple times below to verify both generation and reuse
	// of the on-disk digest file.
	digest := func() ([]span, error) {
		// Get a reader for the shard's digest.
		r, err := e.Digest()
		if err != nil {
			return nil, err
		}

		// Make sure the digest can be read.
		dr, err := tsm1.NewDigestReader(r)
		if err != nil {
			return nil, err
		}

		got := []span{}

		// Drain every (key, time span) record until EOF.
		for {
			k, s, err := dr.ReadTimeSpan()
			if err == io.EOF {
				break
			} else if err != nil {
				return nil, err
			}

			got = append(got, span{
				key:   k,
				tspan: s,
			})
		}

		if err := dr.Close(); err != nil {
			return nil, err
		}

		return got, nil
	}

	// Expected digest contents for the two points written above. The CRC
	// values are the fixed checksums of the corresponding encoded blocks.
	exp := []span{
		span{
			key: "cpu,host=A#!~#value",
			tspan: &tsm1.DigestTimeSpan{
				Ranges: []tsm1.DigestTimeRange{
					tsm1.DigestTimeRange{
						Min: 1000000000,
						Max: 1000000000,
						N:   1,
						CRC: 1048747083,
					},
				},
			},
		},
		span{
			key: "cpu,host=B#!~#value",
			tspan: &tsm1.DigestTimeSpan{
				Ranges: []tsm1.DigestTimeRange{
					tsm1.DigestTimeRange{
						Min: 2000000000,
						Max: 2000000000,
						N:   1,
						CRC: 734984746,
					},
				},
			},
		},
	}

	// Run twice: first iteration generates the digest file, second must
	// return identical data from the cached file.
	for n := 0; n < 2; n++ {
		got, err := digest()
		if err != nil {
			t.Fatalf("n = %d: %s", n, err)
		}

		// Make sure the data in the digest was valid.
		if !reflect.DeepEqual(exp, got) {
			t.Fatalf("n = %d\nexp = %v\ngot = %v\n", n, exp, got)
		}
	}

	// Test that writing more points causes the digest to be updated.
	points = []models.Point{
		MustParsePointString("cpu,host=C value=1.1 3000000000"),
	}

	if err := e.WritePoints(points); err != nil {
		t.Fatalf("failed to write points: %s", err.Error())
	}

	// Force a compaction.
	e.ScheduleFullCompaction()

	// Get new digest.
	got, err := digest()
	if err != nil {
		t.Fatal(err)
	}

	exp = append(exp, span{
		key: "cpu,host=C#!~#value",
		tspan: &tsm1.DigestTimeSpan{
			Ranges: []tsm1.DigestTimeRange{
				tsm1.DigestTimeRange{
					Min: 3000000000,
					Max: 3000000000,
					N:   1,
					CRC: 2553233514,
				},
			},
		},
	})

	if !reflect.DeepEqual(exp, got) {
		t.Fatalf("\nexp = %v\ngot = %v\n", exp, got)
	}
}

// span pairs a series key with its decoded digest time span, for comparison
// in TestEngine_Digest.
type span struct {
	key   string
	tspan *tsm1.DigestTimeSpan
}

// ErrShardNotIdle is returned when an operation requring the shard to be idle/cold is
// attempted on a hot shard. (Added to the existing package-level var block in
// tsdb/shard.go.)
var ErrShardNotIdle = errors.New("shard not idle")

// Digest returns a digest of the shard.
func (s *Shard) Digest() (io.ReadCloser, error) {
	engine, err := s.engine()
	if err != nil {
		return nil, err
	}

	// Make sure the shard is idle/cold. (No use creating a digest of a
	// hot shard that is rapidly changing.)
	if !engine.IsIdle() {
		return nil, ErrShardNotIdle
	}

	return engine.Digest()
}
// diff --git a/tsdb/store.go b/tsdb/store.go index 6d3c6bdea29..a26e4e8fcf9 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -379,6 +379,16 @@ func (s *Store) ShardN() int { return len(s.shards) } +// ShardDigest returns a digest of the shard with the specified ID. +func (s *Store) ShardDigest(id uint64) (io.ReadCloser, error) { + sh := s.Shard(id) + if sh == nil { + return nil, ErrShardNotFound + } + + return sh.Digest() +} + // CreateShard creates a shard with the given id and retention policy on a database. func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error { s.mu.Lock()