diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go
index f113f83acb9..a613e15dbf5 100644
--- a/tsdb/engine/tsm1/cache.go
+++ b/tsdb/engine/tsm1/cache.go
@@ -55,7 +55,7 @@ func (e *entry) add(values []Value) {
 // deduplicate sorts and orders the entry's values. If values are already deduped
 // and sorted, the function does no work and simply returns.
 func (e *entry) deduplicate() {
-	if !e.needSort || len(e.values) == 0 {
+	if !e.needSort || len(e.values) <= 1 {
 		return
 	}
 	e.values = e.values.Deduplicate()
@@ -79,8 +79,10 @@ const (
 // Cache maintains an in-memory store of Values for a set of keys.
 type Cache struct {
+	commit  sync.Mutex
 	mu      sync.RWMutex
 	store   map[string]*entry
+	dirty   map[string]*entry
 	size    uint64
 	maxSize uint64
@@ -89,6 +91,7 @@ type Cache struct {
 	// they are read only and should never be modified
 	snapshots     []*Cache
 	snapshotsSize uint64
+	files         []string
 
 	statMap      *expvar.Map // nil for snapshots.
 	lastSnapshot time.Time
@@ -164,15 +167,56 @@ func (c *Cache) WriteMulti(values map[string][]Value) error {
 	return nil
 }
 
-// Snapshot will take a snapshot of the current cache, add it to the slice of caches that
-// are being flushed, and reset the current cache with new values
-func (c *Cache) Snapshot() *Cache {
+// Files returns the names of the WAL segment files captured by the snapshot.
+// The contents of the named files and of the receiving snapshot should be
+// identical.
+func (c *Cache) Files() []string {
+	return c.files
+}
+
+// newFiles filters the specified list of files, excluding any file already
+// referenced by an existing snapshot.
+func (c *Cache) newFiles(files []string) []string {
+	filtered := []string{}
+	existing := map[string]bool{}
+	for _, s := range c.snapshots {
+		for _, f := range s.files {
+			existing[f] = true
+		}
+	}
+	for _, f := range files {
+		if !existing[f] {
+			filtered = append(filtered, f)
+		}
+	}
+	return filtered
+}
+
+// PrepareSnapshots accepts a list of closed WAL segment files and prepares a new
+// snapshot covering the changes in any newly closed files that were not captured
+// by previous snapshots. It returns a slice containing references to every
+// snapshot that has not yet been successfully committed.
+//
+// Every call to this method must be matched with exactly one corresponding call
+// to either CommitSnapshots() or RollbackSnapshots().
+func (c *Cache) PrepareSnapshots(files []string) []*Cache {
+	c.commit.Lock() // released by RollbackSnapshots() or CommitSnapshots()
+
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
 	snapshot := &Cache{
 		store: c.store,
 		size:  c.size,
+		dirty: make(map[string]*entry),
+		files: c.newFiles(files),
+	}
+
+	for k, e := range c.store {
+		if e.needSort {
+			snapshot.dirty[k] = &entry{needSort: true, values: e.values}
+		} else {
+			snapshot.dirty[k] = e
+		}
 	}
 
 	c.store = make(map[string]*entry)
@@ -186,30 +230,59 @@ func (c *Cache) Snapshot() *Cache {
 	c.updateCachedBytes(snapshot.size)
 	c.updateSnapshots()
 
-	return snapshot
+	clone := make([]*Cache, len(c.snapshots))
+	copy(clone, c.snapshots)
+
+	return clone
 }
 
 // Deduplicate sorts the snapshot before returning it. The compactor and any queries
 // coming in while it writes will need the values sorted
 func (c *Cache) Deduplicate() {
-	for _, e := range c.store {
+	for _, e := range c.dirty {
 		e.deduplicate()
 	}
 }
 
-// ClearSnapshot will remove the snapshot cache from the list of flushing caches and
-// adjust the size
-func (c *Cache) ClearSnapshot(snapshot *Cache) {
+// UpdateStore swaps the snapshot's deduplicated dirty map in as its store.
+// It must be called while holding the write lock of the cache that created
+// this snapshot.
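+//
+// A sketch of the intended lifecycle, mirroring Engine.WriteSnapshot in this
+// patch (closedSegments is illustrative, standing in for the result of
+// WAL.ClosedSegments()):
+//
+//	snapshots := c.PrepareSnapshots(closedSegments)
+//	snapshot := snapshots[len(snapshots)-1]
+//	snapshot.Deduplicate()  // sort/dedupe the dirty entries outside any lock
+//	c.mu.Lock()
+//	snapshot.UpdateStore()  // publish the now-clean dirty map as the store
+//	c.mu.Unlock()
+//	// ... write the snapshots to TSM files ...
+//	c.CommitSnapshots()     // or c.RollbackSnapshots(snapshots) on failure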
+func (c *Cache) UpdateStore() {
+	c.store, c.dirty = c.dirty, nil
+}
+
+// RollbackSnapshots rolls back a previous call to PrepareSnapshots by rebuilding
+// the snapshots slice from the entries of incomplete that are still non-nil,
+// then releasing the commit lock.
+//
+// The retained snapshots are kept because they are still needed to resolve
+// queries that would otherwise hit the unflushed WAL segments.
+func (c *Cache) RollbackSnapshots(incomplete []*Cache) {
+	defer c.commit.Unlock()
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
-	for i, cache := range c.snapshots {
-		if cache == snapshot {
-			c.snapshots = append(c.snapshots[:i], c.snapshots[i+1:]...)
-			c.snapshotsSize -= snapshot.size
-			break
+	c.snapshots = make([]*Cache, 0, len(incomplete)) // len(incomplete) is an upper bound; some entries may be nil
+	c.snapshotsSize = 0
+
+	// retain only the snapshots that have not been nil'd by the caller
+	for _, s := range incomplete {
+		if s != nil {
+			c.snapshots = append(c.snapshots, s)
+			c.snapshotsSize += s.Size()
 		}
 	}
+}
+
+// CommitSnapshots commits the previously prepared snapshots by resetting the
+// snapshots slice and releasing the commit lock.
+func (c *Cache) CommitSnapshots() {
+	defer c.commit.Unlock()
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	c.snapshots = nil
+	c.snapshotsSize = 0
 
 	c.updateSnapshots()
 }
diff --git a/tsdb/engine/tsm1/cache_race_test.go b/tsdb/engine/tsm1/cache_race_test.go
new file mode 100644
index 00000000000..11d456d4e6f
--- /dev/null
+++ b/tsdb/engine/tsm1/cache_race_test.go
@@ -0,0 +1,55 @@
+// +build !race
+
+package tsm1_test
+
+import (
+	"fmt"
+	"math/rand"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/influxdata/influxdb/tsdb/engine/tsm1"
+)
+
+func TestCacheRace(t *testing.T) {
+	values := make(tsm1.Values, 1000)
+	timestamps := make([]time.Time, len(values))
+	series := make([]string, 100)
+	for i := range timestamps {
+		timestamps[i] = time.Unix(rand.Int63n(int64(len(values))), 0).UTC()
+	}
+
+	for i := range values {
+		values[i] = tsm1.NewValue(timestamps[i*len(timestamps)/len(values)], float64(i))
+	}
+
+	for i := range series {
+		series[i] = fmt.Sprintf("series%d", i)
+	}
+
+	wg := sync.WaitGroup{}
+	c := tsm1.NewCache(1000000, "")
+
+	ch := make(chan struct{})
+	for _, s := range series {
+		for _, v := range values {
+			c.Write(s, tsm1.Values{v})
+		}
+		wg.Add(1)
+		go func(s string) {
+			defer wg.Done()
+			<-ch
+			c.Values(s)
+		}(s)
+	}
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		<-ch
+		s := c.PrepareSnapshots([]string{"wal"})
+		s[len(s)-1].Deduplicate()
+		c.CommitSnapshots()
+	}()
+	close(ch)
+	wg.Wait()
+}
diff --git a/tsdb/engine/tsm1/cache_test.go b/tsdb/engine/tsm1/cache_test.go
index bb28f7d69ca..8ae19c100a9 100644
--- a/tsdb/engine/tsm1/cache_test.go
+++ b/tsdb/engine/tsm1/cache_test.go
@@ -134,6 +134,7 @@ func TestCache_CacheValues(t *testing.T) {
 }
 
 func TestCache_CacheSnapshot(t *testing.T) {
+	segment1 := "segment1.wal"
 	v0 := NewValue(time.Unix(2, 0).UTC(), 0.0)
 	v1 := NewValue(time.Unix(3, 0).UTC(), 2.0)
 	v2 := NewValue(time.Unix(4, 0).UTC(), 3.0)
@@ -147,7 +148,23 @@ func TestCache_CacheSnapshot(t *testing.T) {
 	}
 
 	// Grab snapshot, and ensure it's as expected.
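+	// Note: each PrepareSnapshots call below must be paired with exactly one
+	// CommitSnapshots or RollbackSnapshots call, since PrepareSnapshots takes
+	// the cache's commit lock.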
-	snapshot := c.Snapshot()
+	snapshots := c.PrepareSnapshots([]string{segment1})
+	if length := len(snapshots); length != 1 {
+		t.Fatalf("invalid snapshots length, exp %d, got %d", 1, length)
+	}
+
+	// check that the snapshot we got is not nil
+	snapshot := snapshots[0]
+	if snapshot == nil {
+		t.Fatalf("snapshot is nil")
+	}
+
+	// check that the snapshot has captured the segment files
+	expFiles := []string{segment1}
+	if files := snapshot.Files(); !reflect.DeepEqual(expFiles, files) {
+		t.Fatalf("files are incorrect, exp: %v, got %v", expFiles, files)
+	}
+
 	expValues := Values{v0, v1, v2, v3}
 	if deduped := snapshot.values("foo"); !reflect.DeepEqual(expValues, deduped) {
 		t.Fatalf("snapshotted values for foo incorrect, exp: %v, got %v", expValues, deduped)
@@ -177,19 +194,180 @@ func TestCache_CacheSnapshot(t *testing.T) {
 	}
 
 	// Clear snapshot, ensuring non-snapshot data untouched.
-	c.ClearSnapshot(snapshot)
+	c.CommitSnapshots()
+
 	expValues = Values{v5, v4}
 	if deduped := c.Values("foo"); !reflect.DeepEqual(expValues, deduped) {
-		t.Fatalf("post-clear values for foo incorrect, exp: %v, got %v", expValues, deduped)
+		t.Fatalf("post-commit values for foo incorrect, exp: %v, got %v", expValues, deduped)
+	}
+}
+
+func TestCache_CacheRollbackSnapshots(t *testing.T) {
+	segment2 := "segment2.wal"
+	segment3 := "segment3.wal"
+	segment4 := "segment4.wal"
+	segment5 := "segment5.wal"
+	v4 := NewValue(time.Unix(6, 0).UTC(), 5.0)
+	v5 := NewValue(time.Unix(1, 0).UTC(), 5.0)
+	v6 := NewValue(time.Unix(7, 0).UTC(), 5.0)
+
+	c := NewCache(512, "")
+
+	// Write new values to the cache.
+	if err := c.Write("foo", Values{v4, v5}); err != nil {
+		t.Fatalf("failed to write values for key foo to cache: %s", err.Error())
+	}
+
+	expValues := Values{v5, v4}
+	if deduped := c.Values("foo"); !reflect.DeepEqual(expValues, deduped) {
+		t.Fatalf("pre-snapshot write values for foo incorrect, exp: %v, got %v", expValues, deduped)
 	}
+
+	snapshots := c.PrepareSnapshots([]string{segment2})
+
+	if err := c.Write("foo", Values{v6}); err != nil {
+		t.Fatalf("failed to write post-prepare-snapshots key foo to cache: %s", err.Error())
+	}
+
+	expValues = Values{v5, v4, v6}
+	if deduped := c.Values("foo"); !reflect.DeepEqual(expValues, deduped) {
+		t.Fatalf("post-prepare-snapshots values for foo incorrect, exp: %v, got %v", expValues, deduped)
+	}
+
+	c.RollbackSnapshots(snapshots)
+
+	expValues = Values{v5, v4, v6}
+	if deduped := c.Values("foo"); !reflect.DeepEqual(expValues, deduped) {
+		t.Fatalf("post-rollback values for foo incorrect, exp: %v, got %v", expValues, deduped)
+	}
+
+	snapshots = c.PrepareSnapshots([]string{segment2, segment3, segment4})
+	expLength := 2
+	if length := len(snapshots); length != expLength {
+		t.Fatalf("post-prepare-snapshots: length of snapshots incorrect, exp: %v, got %v", expLength, length)
+	}
+
+	expFiles := [][]string{[]string{segment2}, []string{segment3, segment4}}
+	if files := [][]string{snapshots[0].Files(), snapshots[1].Files()}; !reflect.DeepEqual(expFiles, files) {
+		t.Fatalf("files are incorrect, exp: %v, got %v", expFiles, files)
+	}
+
+	expFooValues := []Values{Values{v5, v4}, Values{v6}}
+	if fooValues := []Values{snapshots[0].Values("foo"), snapshots[1].Values("foo")}; !reflect.DeepEqual(expFooValues, fooValues) {
+		t.Fatalf("snapshot values are incorrect, exp: %v, got %v", expFooValues, fooValues)
+	}
+
+	snapshots[0] = nil
+	c.RollbackSnapshots(snapshots)
+
+	expValues = Values{v6}
+	if deduped := c.Values("foo"); !reflect.DeepEqual(expValues, deduped) {
+		t.Fatalf("post-rollback values for foo incorrect, exp: %v, got %v", expValues, deduped)
+	}
+
+	snapshots = c.PrepareSnapshots([]string{segment3, segment4, segment5})
+	expLength = 2
+	if length := len(snapshots); length != expLength {
+		t.Fatalf("post-prepare-snapshots: length of snapshots incorrect, exp: %v, got %v", expLength, length)
+	}
+
+	expFiles = [][]string{[]string{segment3, segment4}, []string{segment5}}
+	if files := [][]string{snapshots[0].Files(), snapshots[1].Files()}; !reflect.DeepEqual(expFiles, files) {
+		t.Fatalf("snapshot files are incorrect, exp: %v, got %v", expFiles, files)
+	}
+
+	expFooValues = []Values{Values{v6}, nil}
+	if fooValues := []Values{snapshots[0].Values("foo"), snapshots[1].Values("foo")}; !reflect.DeepEqual(expFooValues, fooValues) {
+		t.Fatalf("snapshot values are incorrect, exp: %v, got %v", expFooValues, fooValues)
+	}
+
+	c.CommitSnapshots()
+
+	expValues = nil
+	if deduped := c.Values("foo"); !reflect.DeepEqual(expValues, deduped) {
+		t.Fatalf("post-commit values for foo incorrect, exp: %v, got %v", expValues, deduped)
+	}
+}
+
+// TestCache_CacheTestCommitLock tests that the commit lock prevents two
+// goroutines from preparing snapshots of the same cache concurrently.
+func TestCache_CacheTestCommitLock(t *testing.T) {
+	v0 := NewValue(time.Unix(2, 0).UTC(), 0.0)
+	in := make(chan int)
+
+	go func() {
+		c := NewCache(512, "")
+		out := make(chan int, 1)
+		lockTaken := make(chan struct{}, 1)
+		valueWritten := make(chan error)
+
+		out <- 0
+
+		_ = c.PrepareSnapshots([]string{"sync"})
+
+		lockTaken <- struct{}{}
+
+		go func() {
+			<-lockTaken
+
+			valueWritten <- c.Write("foo", Values{v0})
+
+			c.PrepareSnapshots([]string{"async"}) // blocks until the first snapshot is committed
+			observedState := <-out
+			c.CommitSnapshots()
+			in <- observedState
+		}()
+
+		// note: t.Errorf rather than t.Fatalf, since t.Fatalf must not be
+		// called from a goroutine other than the test goroutine
+		if err := <-valueWritten; err != nil {
+			t.Errorf("concurrent write value failed: %s", err)
+			return
+		}
+
+		expValues := Values{v0}
+		if deduped := c.Values("foo"); !reflect.DeepEqual(expValues, deduped) {
+			t.Errorf("unexpected values in cache after goroutine has written exp: %v, got %v", expValues, deduped)
+			return
+		}
+
+		select {
+		case <-out:
+		default:
+			t.Errorf("locking failed in state 0")
+			return
+		}
+		out <- 1
+
+		// give the async goroutine a small chance to see state 1
+		time.Sleep(time.Millisecond * 100)
+
+		select {
+		case <-out:
+		default:
+			t.Errorf("locking failed in state 1")
+			return
+		}
+		out <- 2
+
+		c.CommitSnapshots()
+	}()
+
+	expectedState := 2
+	select {
+	case observedState := <-in:
+		if observedState != expectedState {
+			t.Fatalf("locking failed: expected observed state: %d, got: %d", expectedState, observedState)
+		}
+	case <-time.After(time.Second):
+		// this should never happen except on really slow machines; consider bumping the value higher.
+		t.Fatalf("unexpected deadlock encountered during locking test")
+	}
+}
 
 func TestCache_CacheEmptySnapshot(t *testing.T) {
 	c := NewCache(512, "")
 
 	// Grab snapshot, and ensure it's as expected.
-	snapshot := c.Snapshot()
-	if deduped := snapshot.values("foo"); !reflect.DeepEqual(Values(nil), deduped) {
+	snapshots := c.PrepareSnapshots([]string{"foo.wal"})
+	if deduped := snapshots[0].Values("foo"); !reflect.DeepEqual(Values(nil), deduped) {
 		t.Fatalf("snapshotted values for foo incorrect, exp: %v, got %v", nil, deduped)
 	}
@@ -199,7 +377,7 @@ func TestCache_CacheEmptySnapshot(t *testing.T) {
 	}
 
 	// Clear snapshot.
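+	// (CommitSnapshots drops every outstanding snapshot and releases the
+	// commit lock taken by PrepareSnapshots.)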
-	c.ClearSnapshot(snapshot)
+	c.CommitSnapshots()
 	if deduped := c.Values("foo"); !reflect.DeepEqual(Values(nil), deduped) {
 		t.Fatalf("post-snapshot-clear values for foo incorrect, exp: %v, got %v", Values(nil), deduped)
 	}
@@ -222,13 +400,13 @@ func TestCache_CacheWriteMemoryExceeded(t *testing.T) {
 	}
 
 	// Grab snapshot, write should still fail since we're still using the memory.
-	snapshot := c.Snapshot()
+	_ = c.PrepareSnapshots([]string{"foobar.wal"})
 	if err := c.Write("bar", Values{v1}); err != ErrCacheMemoryExceeded {
 		t.Fatalf("wrong error writing key bar to cache")
 	}
 
 	// Clear the snapshot and the write should now succeed.
-	c.ClearSnapshot(snapshot)
+	c.CommitSnapshots()
 	if err := c.Write("bar", Values{v1}); err != nil {
 		t.Fatalf("failed to write key bar to cache: %s", err.Error())
 	}
diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go
index 1632f620279..768a3afa8f2 100644
--- a/tsdb/engine/tsm1/engine.go
+++ b/tsdb/engine/tsm1/engine.go
@@ -410,7 +410,6 @@ func (e *Engine) WriteTo(w io.Writer) (n int64, err error) { panic("not implemen
 func (e *Engine) WriteSnapshot() error {
 	// Lock and grab the cache snapshot along with all the closed WAL
 	// filenames associated with the snapshot
-
 	var started *time.Time
 
 	defer func() {
@@ -419,7 +418,7 @@ func (e *Engine) WriteSnapshot() error {
 		}
 	}()
 
-	closedFiles, snapshot, compactor, err := func() ([]string, *Cache, *Compactor, error) {
+	snapshots, compactor, err := func() ([]*Cache, *Compactor, error) {
 		e.mu.Lock()
 		defer e.mu.Unlock()
 
@@ -427,17 +426,17 @@ func (e *Engine) WriteSnapshot() error {
 		started = &now
 
 		if err := e.WAL.CloseSegment(); err != nil {
-			return nil, nil, nil, err
+			return nil, nil, err
 		}
 
 		segments, err := e.WAL.ClosedSegments()
 		if err != nil {
-			return nil, nil, nil, err
+			return nil, nil, err
 		}
 
-		snapshot := e.Cache.Snapshot()
+		snapshots := e.Cache.PrepareSnapshots(segments)
 
-		return segments, snapshot, e.Compactor.Clone(), nil
+		return snapshots, e.Compactor.Clone(), nil
 	}()
 
 	if err != nil {
@@ -447,36 +446,77 @@ func (e *Engine) WriteSnapshot() error {
 	// The snapshotted cache may have duplicate points and unsorted data. We need to deduplicate
 	// it before writing the snapshot. This can be very expensive so it's done while we are not
 	// holding the engine write lock.
	+	//
+	// We only need to deduplicate the last snapshot, since any earlier snapshots would have
+	// been deduplicated when they were taken.
+	snapshot := snapshots[len(snapshots)-1]
 	snapshot.Deduplicate()
 
-	return e.writeSnapshotAndCommit(closedFiles, snapshot, compactor)
+	// Once deduplication is done, quickly swap the snapshot's store over to the dirty
+	// map (which is now clean). We need to hold the cache write lock (rather than the
+	// snapshot write lock) since active users of the cache hold a read lock on the
+	// same object.
+	e.Cache.mu.Lock()
+	snapshot.UpdateStore()
+	e.Cache.mu.Unlock()
+
+	return e.writeSnapshotsAndCommit(snapshots, compactor)
 }
 
-// writeSnapshotAndCommit will write the passed cache to a new TSM file and remove the closed WAL segments
-func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache, compactor *Compactor) error {
-	// write the new snapshot files
-	newFiles, err := compactor.WriteSnapshot(snapshot)
-	if err != nil {
-		e.logger.Printf("error writing snapshot from compactor: %v", err)
-		return err
-	}
+// writeSnapshotsAndCommit will write the passed snapshots to new TSM files,
+// remove the corresponding closed WAL segments and commit the snapshots to the cache.
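+// Snapshots are written oldest first; each snapshot that is fully written is
+// nil'd out of the slice so that a rollback triggered by a later failure
+// retains only the snapshots that were not written.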
+//
+// Any snapshot not successfully written will be returned to the cache with RollbackSnapshots.
+func (e *Engine) writeSnapshotsAndCommit(snapshots []*Cache, compactor *Compactor) (err error) {
 
-	e.mu.RLock()
-	defer e.mu.RUnlock()
+	// ensure that any snapshot that was not completely written is returned
+	// to the cache
+	defer func() {
+		if err != nil {
 
-	// update the file store with these new files
-	if err := e.FileStore.Replace(nil, newFiles); err != nil {
-		e.logger.Printf("error adding new TSM files from snapshot: %v", err)
-		return err
-	}
+			// at least one snapshot failed to write: roll it back
+			// along with all remaining snapshots
 
-	// clear the snapshot from the in-memory cache, then the old WAL files
-	e.Cache.ClearSnapshot(snapshot)
+			e.Cache.RollbackSnapshots(snapshots)
+		}
+	}()
 
-	if err := e.WAL.Remove(closedFiles); err != nil {
-		e.logger.Printf("error removing closed wal segments: %v", err)
+	for i, snapshot := range snapshots {
+		if closedFiles, err := func(snapshot *Cache) ([]string, error) {
+			// write the new snapshot files
+			newFiles, err := compactor.WriteSnapshot(snapshot)
+			if err != nil {
+				e.logger.Printf("error writing snapshot from compactor: %v", err)
+				return nil, err
+			}
+			e.mu.RLock()
+			defer e.mu.RUnlock()
+
+			// update the file store with these new files
+			if err := e.FileStore.Replace(nil, newFiles); err != nil {
+				e.logger.Printf("error adding new TSM files from snapshot: %v", err)
+				return nil, err
+			}
+
+			return snapshot.Files(), nil
+		}(snapshot); err != nil {
+			return err
+		} else if err := e.WAL.Remove(closedFiles); err != nil {
+			// TBD: isn't this really an error - don't we risk causing out of order
+			// writes if we leave WAL segments hanging around?
+			e.logger.Printf("error removing closed wal segments: %v", err)
+		}
+
+		// success! mark the snapshot as completely written so that a
+		// subsequent rollback will drop it
+		snapshots[i] = nil
 	}
 
+	// all snapshots have been written: clear them from the in-memory cache
+	e.Cache.CommitSnapshots()
+
 	return nil
 }