Skip to content

Commit

Permalink
Merge pull request #6483 from influxdata/jw-delete-series
Browse files Browse the repository at this point in the history
Delete series support for TSM
  • Loading branch information
jwilder committed Apr 27, 2016
2 parents 1d9919a + e7891cb commit 51b1af4
Show file tree
Hide file tree
Showing 23 changed files with 2,264 additions and 656 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
- [#3558](https://github.com/influxdata/influxdb/issues/3558): Support field math inside a WHERE clause.
- [#6429](https://github.com/influxdata/influxdb/issues/6429): Log slow queries if they pass a configurable threshold.
- [#4675](https://github.com/influxdata/influxdb/issues/4675): Allow derivative() function to be used with ORDER BY desc.
- [#6483](https://github.com/influxdata/influxdb/pull/6483): Delete series support for TSM

### Bugfixes

Expand All @@ -41,6 +42,7 @@
- [#6361](https://github.com/influxdata/influxdb/pull/6361): Fix cluster/pool release of connection
- [#6470](https://github.com/influxdata/influxdb/pull/6470): Remove SHOW SERVERS & DROP SERVER support
- [#6477](https://github.com/influxdata/influxdb/pull/6477): Don't catch SIGQUIT or SIGHUP signals.
- [#6468](https://github.com/influxdata/influxdb/issues/6468): Panic with truncated wal segments

## v0.12.2 [2016-04-20]

Expand Down
15 changes: 8 additions & 7 deletions cmd/influx_inspect/tsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,16 +470,15 @@ func cmdDumpTsm1dev(opts *tsdmDumpOpts) {
}
b := make([]byte, 8)

r, err := tsm1.NewTSMReaderWithOptions(tsm1.TSMReaderOptions{
MMAPFile: f,
})
r, err := tsm1.NewTSMReader(f)
if err != nil {
println("Error opening TSM files: ", err.Error())
os.Exit(1)
}
defer r.Close()

minTime, maxTime := r.TimeRange()
keys := r.Keys()
keyCount := r.KeyCount()

blockStats := &blockStats{}

Expand All @@ -490,14 +489,15 @@ func cmdDumpTsm1dev(opts *tsdmDumpOpts) {
time.Unix(0, maxTime).UTC().Format(time.RFC3339Nano),
)
fmt.Printf(" Duration: %s ", time.Unix(0, maxTime).Sub(time.Unix(0, minTime)))
fmt.Printf(" Series: %d ", len(keys))
fmt.Printf(" Series: %d ", keyCount)
fmt.Printf(" File Size: %d\n", stat.Size())
println()

tw := tabwriter.NewWriter(os.Stdout, 8, 8, 1, '\t', 0)
fmt.Fprintln(tw, " "+strings.Join([]string{"Pos", "Min Time", "Max Time", "Ofs", "Size", "Key", "Field"}, "\t"))
var pos int
for _, key := range keys {
for i := 0; i < keyCount; i++ {
key, _ := r.KeyAt(i)
for _, e := range r.Entries(key) {
pos++
split := strings.Split(key, "#!~#")
Expand Down Expand Up @@ -539,7 +539,8 @@ func cmdDumpTsm1dev(opts *tsdmDumpOpts) {
indexSize := r.IndexSize()

// Start at the beginning and read every block
for _, key := range keys {
for j := 0; j < keyCount; j++ {
key, _ := r.KeyAt(j)
for _, e := range r.Entries(key) {

f.Seek(int64(e.Offset), 0)
Expand Down
6 changes: 2 additions & 4 deletions cmd/influx_inspect/verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,7 @@ func cmdVerify(path string) {
os.Exit(1)
}

reader, err := tsm1.NewTSMReaderWithOptions(tsm1.TSMReaderOptions{
MMAPFile: file,
})

reader, err := tsm1.NewTSMReader(file)
if err != nil {
fmt.Printf("%v", err)
os.Exit(1)
Expand All @@ -73,6 +70,7 @@ func cmdVerify(path string) {
if brokenFileBlocks == 0 {
fmt.Fprintf(tw, "%s: healthy\n", f)
}
reader.Close()
}

fmt.Fprintf(tw, "Broken Blocks: %d / %d, in %vs\n", brokenBlocks, totalBlocks, time.Since(start).Seconds())
Expand Down
1 change: 1 addition & 0 deletions tsdb/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Engine interface {
SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error)
WritePoints(points []models.Point) error
DeleteSeries(keys []string) error
DeleteSeriesRange(keys []string, min, max int64) error
DeleteMeasurement(name string, seriesKeys []string) error
SeriesCount() (n int, err error)
MeasurementFields(measurement string) *MeasurementFields
Expand Down
44 changes: 41 additions & 3 deletions tsdb/engine/tsm1/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"log"
"math"
"os"
"sort"
"sync"
Expand Down Expand Up @@ -80,10 +81,27 @@ func (e *entry) deduplicate() {
e.needSort = false
}

// count returns the number of values stored for this entry.
//
// NOTE(review): the scraped diff had merged the old (defer-based) and new
// (manual-unlock) bodies, leaving unreachable code after the first return;
// this is the intended post-change body with a single locked read.
func (e *entry) count() int {
	e.mu.RLock()
	n := len(e.values)
	e.mu.RUnlock()
	return n
}

// filter discards every value in this entry whose timestamp falls within
// [min, max], inclusive on both ends.
func (e *entry) filter(min, max int64) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.values = e.values.Filter(min, max)
}

// size reports the total byte footprint of this entry's values.
func (e *entry) size() int {
	e.mu.RLock()
	defer e.mu.RUnlock()
	return e.values.Size()
}

// Statistics gathered by the Cache.
Expand Down Expand Up @@ -306,11 +324,29 @@ func (c *Cache) Values(key string) Values {

// Delete removes all cached values for each of the given keys, irrespective
// of timestamp. It is equivalent to DeleteRange over the full int64 time
// range, which drops each key's entry from the store entirely.
func (c *Cache) Delete(keys []string) {
	c.DeleteRange(keys, math.MinInt64, math.MaxInt64)
}

// DeleteRange will remove the values for all keys containing points
// with timestamps between min and max (inclusive) from the cache.
// Keys not present in the cache are ignored; keys left with no values
// are removed entirely. The cache's size accounting is kept consistent.
func (c *Cache) DeleteRange(keys []string, min, max int64) {
	c.mu.Lock()
	defer c.mu.Unlock()

	for _, k := range keys {
		// Skip keys that are not in the cache: calling a method on the
		// nil *entry returned for a missing key would panic.
		e, ok := c.store[k]
		if !ok {
			continue
		}

		origSize := e.size()

		// Full-range delete: drop the whole entry without filtering.
		if min == math.MinInt64 && max == math.MaxInt64 {
			c.size -= uint64(origSize)
			delete(c.store, k)
			continue
		}

		e.filter(min, max)
		if e.count() == 0 {
			// All values fell inside the range; remove the key and
			// account for it before the entry becomes unreachable.
			delete(c.store, k)
			c.size -= uint64(origSize)
			continue
		}

		c.size -= uint64(origSize - e.size())
	}
}

Expand Down Expand Up @@ -482,6 +518,8 @@ func (cl *CacheLoader) Load(cache *Cache) error {
if err := cache.WriteMulti(t.Values); err != nil {
return err
}
case *DeleteRangeWALEntry:
cache.DeleteRange(t.Keys, t.Min, t.Max)
case *DeleteWALEntry:
cache.Delete(t.Keys)
}
Expand Down
172 changes: 172 additions & 0 deletions tsdb/engine/tsm1/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tsm1
import (
"fmt"
"io/ioutil"
"math"
"os"
"reflect"
"testing"
Expand Down Expand Up @@ -72,6 +73,118 @@ func TestCache_CacheWriteMulti(t *testing.T) {
}
}

// TestCache_Cache_DeleteRange verifies that DeleteRange trims only the
// values inside [min, max] for the named keys, leaves other keys intact,
// and keeps the cache's size accounting consistent.
func TestCache_Cache_DeleteRange(t *testing.T) {
	v0 := NewValue(1, 1.0)
	v1 := NewValue(2, 2.0)
	v2 := NewValue(3, 3.0)
	values := Values{v0, v1, v2}
	valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())

	c := NewCache(3*valuesSize, "")

	if err := c.WriteMulti(map[string][]Value{"foo": values, "bar": values}); err != nil {
		t.Fatalf("failed to write key foo to cache: %s", err.Error())
	}
	if n := c.Size(); n != 2*valuesSize {
		t.Fatalf("cache size incorrect after 2 writes, exp %d, got %d", 2*valuesSize, n)
	}

	if exp, keys := []string{"bar", "foo"}, c.Keys(); !reflect.DeepEqual(keys, exp) {
		t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys)
	}

	// Delete bar's values with timestamps in [2, max]; only v0 survives.
	c.DeleteRange([]string{"bar"}, 2, math.MaxInt64)

	// Both keys remain: bar still holds one value.
	if exp, keys := []string{"bar", "foo"}, c.Keys(); !reflect.DeepEqual(keys, exp) {
		t.Fatalf("cache keys incorrect after delete, exp %v, got %v", exp, keys)
	}

	if got, exp := c.Size(), valuesSize+uint64(v0.Size()); exp != got {
		t.Fatalf("cache size incorrect after delete, exp %d, got %d", exp, got)
	}

	if got, exp := len(c.Values("bar")), 1; got != exp {
		t.Fatalf("cache values for bar mismatch: got %v, exp %v", got, exp)
	}

	if got, exp := len(c.Values("foo")), 3; got != exp {
		t.Fatalf("cache values for foo mismatch: got %v, exp %v", got, exp)
	}
}

// TestCache_DeleteRange_NoValues verifies that deleting a key's entire
// time range removes the key from the cache and zeroes its size.
func TestCache_DeleteRange_NoValues(t *testing.T) {
	v0 := NewValue(1, 1.0)
	v1 := NewValue(2, 2.0)
	v2 := NewValue(3, 3.0)
	values := Values{v0, v1, v2}
	valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())

	c := NewCache(3*valuesSize, "")

	if err := c.WriteMulti(map[string][]Value{"foo": values}); err != nil {
		t.Fatalf("failed to write key foo to cache: %s", err.Error())
	}
	// The original message printed 2*valuesSize as the expected size even
	// though the assertion checks valuesSize; report the value actually
	// expected.
	if n := c.Size(); n != valuesSize {
		t.Fatalf("cache size incorrect after write, exp %d, got %d", valuesSize, n)
	}

	if exp, keys := []string{"foo"}, c.Keys(); !reflect.DeepEqual(keys, exp) {
		t.Fatalf("cache keys incorrect after write, exp %v, got %v", exp, keys)
	}

	// Deleting the full int64 range must drop the key entirely.
	c.DeleteRange([]string{"foo"}, math.MinInt64, math.MaxInt64)

	if got, exp := len(c.Keys()), 0; got != exp {
		t.Fatalf("cache keys incorrect after delete, exp %v, got %v", exp, got)
	}

	if got, exp := c.Size(), uint64(0); exp != got {
		t.Fatalf("cache size incorrect after delete, exp %d, got %d", exp, got)
	}

	if got, exp := len(c.Values("foo")), 0; got != exp {
		t.Fatalf("cache values mismatch: got %v, exp %v", got, exp)
	}
}
// TestCache_Cache_Delete verifies that Delete removes all values for the
// named keys while leaving other keys and the size accounting intact.
func TestCache_Cache_Delete(t *testing.T) {
	v0 := NewValue(1, 1.0)
	v1 := NewValue(2, 2.0)
	v2 := NewValue(3, 3.0)
	values := Values{v0, v1, v2}
	valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())

	c := NewCache(3*valuesSize, "")

	if err := c.WriteMulti(map[string][]Value{"foo": values, "bar": values}); err != nil {
		t.Fatalf("failed to write key foo to cache: %s", err.Error())
	}
	if n := c.Size(); n != 2*valuesSize {
		t.Fatalf("cache size incorrect after 2 writes, exp %d, got %d", 2*valuesSize, n)
	}

	if exp, keys := []string{"bar", "foo"}, c.Keys(); !reflect.DeepEqual(keys, exp) {
		t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys)
	}

	// Delete all of bar; foo must be untouched.
	c.Delete([]string{"bar"})

	if exp, keys := []string{"foo"}, c.Keys(); !reflect.DeepEqual(keys, exp) {
		t.Fatalf("cache keys incorrect after delete, exp %v, got %v", exp, keys)
	}

	if got, exp := c.Size(), valuesSize; exp != got {
		t.Fatalf("cache size incorrect after delete, exp %d, got %d", exp, got)
	}

	if got, exp := len(c.Values("bar")), 0; got != exp {
		t.Fatalf("cache values for bar mismatch: got %v, exp %v", got, exp)
	}

	if got, exp := len(c.Values("foo")), 3; got != exp {
		t.Fatalf("cache values for foo mismatch: got %v, exp %v", got, exp)
	}
}

// This tests writing two batches to the same series. The first batch
// is sorted. The second batch is also sorted but contains duplicates.
func TestCache_CacheWriteMulti_Duplicates(t *testing.T) {
Expand Down Expand Up @@ -406,6 +519,65 @@ func TestCacheLoader_LoadDouble(t *testing.T) {
}
}

// Ensure the CacheLoader can load deleted series
func TestCacheLoader_LoadDeleted(t *testing.T) {
	// Create a WAL segment holding a write followed by a range delete.
	dir := mustTempDir()
	defer os.RemoveAll(dir)
	f := mustTempFile(dir)
	w := NewWALSegmentWriter(f)

	p1 := NewValue(1, 1.0)
	p2 := NewValue(2, 2.0)
	p3 := NewValue(3, 3.0)

	// Write all three points for "foo".
	if err := w.Write(mustMarshalEntry(&WriteWALEntry{
		Values: map[string][]Value{"foo": []Value{p1, p2, p3}},
	})); err != nil {
		t.Fatal("write points", err)
	}

	// Then delete the timestamps [2, 3], which should leave only p1.
	if err := w.Write(mustMarshalEntry(&DeleteRangeWALEntry{
		Keys: []string{"foo"},
		Min:  2,
		Max:  3,
	})); err != nil {
		t.Fatal("write points", err)
	}

	// Load the segment twice into fresh caches to confirm the replayed
	// state is stable across reloads.
	for pass := 0; pass < 2; pass++ {
		cache := NewCache(1024, "")
		loader := NewCacheLoader([]string{f.Name()})
		if err := loader.Load(cache); err != nil {
			t.Fatalf("failed to load cache: %s", err.Error())
		}

		if values := cache.Values("foo"); !reflect.DeepEqual(values, Values{p1}) {
			t.Fatalf("cache key foo not as expected, got %v, exp %v", values, Values{p1})
		}
	}
}

func mustTempDir() string {
dir, err := ioutil.TempDir("", "tsm1-test")
if err != nil {
Expand Down
Loading

0 comments on commit 51b1af4

Please sign in to comment.