Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tsm: ensure that snapshot writes are eventually retried/released, even if WriteSnapshot fails #5689

Closed
101 changes: 87 additions & 14 deletions tsdb/engine/tsm1/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (e *entry) add(values []Value) {
// deduplicate sorts and orders the entry's values. If values are already deduped and
// and sorted, the function does no work and simply returns.
func (e *entry) deduplicate() {
if !e.needSort || len(e.values) == 0 {
if !e.needSort || len(e.values) <= 1 {
return
}
e.values = e.values.Deduplicate()
Expand All @@ -79,8 +79,10 @@ const (

// Cache maintains an in-memory store of Values for a set of keys.
type Cache struct {
commit sync.Mutex
mu sync.RWMutex
store map[string]*entry
dirty map[string]*entry
size uint64
maxSize uint64

Expand All @@ -89,6 +91,7 @@ type Cache struct {
// they are read only and should never be modified
snapshots []*Cache
snapshotsSize uint64
files []string

statMap *expvar.Map // nil for snapshots.
lastSnapshot time.Time
Expand Down Expand Up @@ -164,15 +167,56 @@ func (c *Cache) WriteMulti(values map[string][]Value) error {
return nil
}

// Snapshot will take a snapshot of the current cache, add it to the slice of caches that
// are being flushed, and reset the current cache with new values
func (c *Cache) Snapshot() *Cache {
// Answers the names WAL segment files which are captured by the snapshot. The contents
// of the specified files and the receiving snapshot should be identical.
func (c *Cache) Files() []string {
return c.files
}

// Filter the specified list of files to exclude any file already referenced
// by an existing snapshot
func (c *Cache) newFiles(files []string) []string {
filtered := []string{}
existing := map[string]bool{}
for _, s := range c.snapshots {
for _, f := range s.files {
existing[f] = true
}
}
for _, f := range files {
if !existing[f] {
filtered = append(filtered, f)
}
}
return filtered
}

// PrepareSnapshots accepts a list of the closed files and prepares a new snapshot corresponding
// to the changes in newly closed files that were not captured by previous snapshots. It returns a slice
// containing references to every snapshot that has not yet been successfully committed.
//
// Every call to this method must be matched with exactly one corresponding call to either
// CommitSnapshots() or RollbackSnapshots().
func (c *Cache) PrepareSnapshots(files []string) []*Cache {

c.commit.Lock() // released by RollbackSnapshot() or CommitSnapshot()

c.mu.Lock()
defer c.mu.Unlock()

snapshot := &Cache{
store: c.store,
size: c.size,
dirty: make(map[string]*entry),
files: c.newFiles(files),
}

for k, e := range c.store {
if e.needSort {
snapshot.dirty[k] = &entry{needSort: true, values: e.values}
} else {
snapshot.dirty[k] = e
}
}

c.store = make(map[string]*entry)
Expand All @@ -186,30 +230,59 @@ func (c *Cache) Snapshot() *Cache {
c.updateCachedBytes(snapshot.size)
c.updateSnapshots()

return snapshot
clone := make([]*Cache, len(c.snapshots))
copy(clone, c.snapshots)

return clone
}

// Deduplicate sorts the snapshot before returning it. The compactor and any queries
// coming in while it writes will need the values sorted
func (c *Cache) Deduplicate() {
for _, e := range c.store {
for _, e := range c.dirty {
e.deduplicate()
}
}

// ClearSnapshot will remove the snapshot cache from the list of flushing caches and
// adjust the size
func (c *Cache) ClearSnapshot(snapshot *Cache) {
// This method must be called while holding the write lock of the cache that
// create this snapshot.
func (c *Cache) UpdateStore() {
c.store, c.dirty = c.dirty, nil
}

// RollbackSnapshot rolls back a previously prepared snapshot by releasing the commit lock.
//
// We leave the snapshots slice untouched because we need to use it to resolve
// queries that hit the WAL segments.
// RollbackSnapshot rolls back a previously prepared snapshot by resetting
// the

func (c *Cache) RollbackSnapshots(incomplete []*Cache) {
defer c.commit.Unlock()
c.mu.Lock()
defer c.mu.Unlock()

for i, cache := range c.snapshots {
if cache == snapshot {
c.snapshots = append(c.snapshots[:i], c.snapshots[i+1:]...)
c.snapshotsSize -= snapshot.size
break
c.snapshots = make([]*Cache, 0, len(incomplete)) // not strictly necessary since we expect incomplete[i] != nil for at least one i.
c.snapshotsSize = 0

// remove any snapshots that have been nil'd
for _, s := range incomplete {
if s != nil {
c.snapshots = append(c.snapshots, s)
c.snapshotsSize += s.Size()
}
}
}

// CommitSnapshot commits a previously prepared snapshot by reset the snapshots array
// and releasing the commit lock.
func (c *Cache) CommitSnapshots() {
defer c.commit.Unlock()
c.mu.Lock()
defer c.mu.Unlock()

c.snapshots = nil
c.snapshotsSize = 0

c.updateSnapshots()
}
Expand Down
55 changes: 55 additions & 0 deletions tsdb/engine/tsm1/cache_race_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// +build !race

package tsm1_test

import (
"fmt"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
"math/rand"
"sync"
"testing"
"time"
)

func TestCacheRace(t *testing.T) {
values := make(tsm1.Values, 1000)
timestamps := make([]time.Time, len(values))
series := make([]string, 100)
for i := range timestamps {
timestamps[i] = time.Unix(int64(rand.Int63n(int64(len(values)))), 0).UTC()
}

for i := range values {
values[i] = tsm1.NewValue(timestamps[i*len(timestamps)/len(values)], float64(i))
}

for i := range series {
series[i] = fmt.Sprintf("series%d", i)
}

wg := sync.WaitGroup{}
c := tsm1.NewCache(1000000, "")

ch := make(chan struct{})
for _, s := range series {
for _, v := range values {
c.Write(s, tsm1.Values{v})
}
wg.Add(1)
go func(s string) {
defer wg.Done()
<-ch
c.Values(s)
}(s)
}
wg.Add(1)
go func() {
wg.Done()
<-ch
s := c.PrepareSnapshots([]string{"wal"})
s[len(s)-1].Deduplicate()
c.CommitSnapshots()
}()
close(ch)
wg.Wait()
}
Loading