From 18a02d50d71fe55f14c27e5911eba10aaf7e4e7d Mon Sep 17 00:00:00 2001
From: Jason Wilder
Date: Tue, 25 Jul 2017 13:35:01 -0600
Subject: [PATCH] Interrupt in progress TSM compactions

When snapshots and compactions are disabled, the check to see if a
compaction should be aborted occurs only in between writing TSM files.
If a large compaction is running, it can take a long time for the
current file to finish writing, causing long delays.

This now interrupts compactions while iterating over the blocks to
write, which allows them to abort immediately.
---
 CHANGELOG.md                     |  6 +++
 tsdb/engine/tsm1/compact.go      | 79 ++++++++++++++++++++++++------
 tsdb/engine/tsm1/compact_test.go | 82 ++++++++++++++++++++++++++++++--
 tsdb/engine/tsm1/engine.go       |  2 +
 4 files changed, 150 insertions(+), 19 deletions(-)
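Everything in this patch builds on one standard Go idiom: closing a chan struct{} is a one-to-many broadcast (a closed channel is always ready to receive), and a select with an empty default case lets a hot loop poll that signal without blocking. A minimal standalone sketch of the idiom (the names are illustrative, not code from this patch):

package main

import (
    "errors"
    "fmt"
)

var errAborted = errors.New("aborted")

// processAll does one unit of work per iteration and polls the
// interrupt channel in between. Once close(interrupt) is called,
// every receive on it succeeds immediately, so the loop exits on
// its very next check.
func processAll(items []int, interrupt chan struct{}) error {
    for range items {
        select {
        case <-interrupt:
            return errAborted // signalled: stop right away
        default:
            // not signalled: fall through and keep working
        }
        // ... do one unit of work ...
    }
    return nil
}

func main() {
    interrupt := make(chan struct{})
    close(interrupt) // broadcast the abort to all listeners
    fmt.Println(processAll([]int{1, 2, 3}, interrupt)) // prints "aborted"
}

The diff below applies this pattern per block written, instead of only checking between output files.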
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 362aee25f5a..5ce6d43f720 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -23,6 +23,12 @@
 - [#8601](https://github.com/influxdata/influxdb/pull/8601): Fixed time boundaries for continuous queries with time zones.
 - [#8097](https://github.com/influxdata/influxdb/pull/8097): Return query parsing errors in CSV formats.
 
+## v1.3.2 [unreleased]
+
+### Bugfixes
+
+- [#8629](https://github.com/influxdata/influxdb/pull/8629): Interrupt in progress TSM compactions
+
 ## v1.3.1 [unreleased]
 
 ### Bugfixes
diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go
index 83263f78943..5d7c5b01d59 100644
--- a/tsdb/engine/tsm1/compact.go
+++ b/tsdb/engine/tsm1/compact.go
@@ -591,6 +591,11 @@ type Compactor struct {
 	snapshotsEnabled   bool
 	compactionsEnabled bool
 
+	// The channel to signal that any in progress snapshots should be aborted.
+	snapshotsInterrupt chan struct{}
+	// The channel to signal that any in progress level compactions should be aborted.
+	compactionsInterrupt chan struct{}
+
 	files map[string]struct{}
 }
 
@@ -604,6 +609,9 @@ func (c *Compactor) Open() {
 	c.snapshotsEnabled = true
 	c.compactionsEnabled = true
 
+	c.snapshotsInterrupt = make(chan struct{})
+	c.compactionsInterrupt = make(chan struct{})
+
 	c.files = make(map[string]struct{})
 }
 
@@ -616,12 +624,22 @@ func (c *Compactor) Close() {
 	}
 	c.snapshotsEnabled = false
 	c.compactionsEnabled = false
+	if c.compactionsInterrupt != nil {
+		close(c.compactionsInterrupt)
+	}
+	if c.snapshotsInterrupt != nil {
+		close(c.snapshotsInterrupt)
+	}
 }
 
 // DisableSnapshots disables the compactor from performing snapshots.
 func (c *Compactor) DisableSnapshots() {
 	c.mu.Lock()
 	c.snapshotsEnabled = false
+	if c.snapshotsInterrupt != nil {
+		close(c.snapshotsInterrupt)
+		c.snapshotsInterrupt = nil
+	}
 	c.mu.Unlock()
 }
 
@@ -629,6 +647,9 @@ func (c *Compactor) DisableSnapshots() {
 func (c *Compactor) EnableSnapshots() {
 	c.mu.Lock()
 	c.snapshotsEnabled = true
+	if c.snapshotsInterrupt == nil {
+		c.snapshotsInterrupt = make(chan struct{})
+	}
 	c.mu.Unlock()
 }
 
@@ -636,6 +657,10 @@ func (c *Compactor) DisableCompactions() {
 	c.mu.Lock()
 	c.compactionsEnabled = false
+	if c.compactionsInterrupt != nil {
+		close(c.compactionsInterrupt)
+		c.compactionsInterrupt = nil
+	}
 	c.mu.Unlock()
 }
 
@@ -643,6 +668,9 @@ func (c *Compactor) EnableCompactions() {
 	c.mu.Lock()
 	c.compactionsEnabled = true
+	if c.compactionsInterrupt == nil {
+		c.compactionsInterrupt = make(chan struct{})
+	}
 	c.mu.Unlock()
 }
 
@@ -650,13 +678,14 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
 	c.mu.RLock()
 	enabled := c.snapshotsEnabled
+	intC := c.snapshotsInterrupt
 	c.mu.RUnlock()
 
 	if !enabled {
 		return nil, errSnapshotsDisabled
 	}
 
-	iter := NewCacheKeyIterator(cache, tsdb.DefaultMaxPointsPerBlock)
+	iter := NewCacheKeyIterator(cache, tsdb.DefaultMaxPointsPerBlock, intC)
 	files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, iter)
 
 	// See if we were disabled while writing a snapshot
@@ -717,7 +746,11 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
 		return nil, nil
 	}
 
-	tsm, err := NewTSMKeyIterator(size, fast, trs...)
+	c.mu.RLock()
+	intC := c.compactionsInterrupt
+	c.mu.RUnlock()
+
+	tsm, err := NewTSMKeyIterator(size, fast, intC, trs...)
 	if err != nil {
 		return nil, err
 	}
@@ -994,7 +1027,8 @@ type tsmKeyIterator struct {
 
 	// merged are encoded blocks that have been combined or used as is
 	// without decode
-	merged blocks
+	merged    blocks
+	interrupt chan struct{}
 }
 
 type block struct {
@@ -1046,7 +1080,7 @@ func (a blocks) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
 
 // NewTSMKeyIterator returns a new TSM key iterator from readers.
 // size indicates the maximum number of values to encode in a single block.
-func NewTSMKeyIterator(size int, fast bool, readers ...*TSMReader) (KeyIterator, error) {
+func NewTSMKeyIterator(size int, fast bool, interrupt chan struct{}, readers ...*TSMReader) (KeyIterator, error) {
 	var iter []*BlockIterator
 	for _, r := range readers {
 		iter = append(iter, r.BlockIterator())
@@ -1060,6 +1094,7 @@ func NewTSMKeyIterator(size int, fast bool, readers ...*TSMReader) (KeyIterator,
 		iterators: iter,
 		fast:      fast,
 		buf:       make([]blocks, len(iter)),
+		interrupt: interrupt,
 	}, nil
 }
 
@@ -1203,6 +1238,13 @@ func (k *tsmKeyIterator) merge() {
 }
 
 func (k *tsmKeyIterator) Read() (string, int64, int64, []byte, error) {
+	// See if compactions were disabled while we were running.
+	select {
+	case <-k.interrupt:
+		return "", 0, 0, nil, errCompactionAborted
+	default:
+	}
+
 	if len(k.merged) == 0 {
 		return "", 0, 0, nil, k.err
 	}
@@ -1228,9 +1270,10 @@ type cacheKeyIterator struct {
 	size  int
 	order []string
 
-	i      int
-	blocks [][]cacheBlock
-	ready  []chan struct{}
+	i         int
+	blocks    [][]cacheBlock
+	ready     []chan struct{}
+	interrupt chan struct{}
 }
 
 type cacheBlock struct {
@@ -1241,7 +1284,7 @@ type cacheBlock struct {
 }
 
 // NewCacheKeyIterator returns a new KeyIterator from a Cache.
-func NewCacheKeyIterator(cache *Cache, size int) KeyIterator {
+func NewCacheKeyIterator(cache *Cache, size int, interrupt chan struct{}) KeyIterator {
 	keys := cache.Keys()
 
 	chans := make([]chan struct{}, len(keys))
@@ -1250,12 +1293,13 @@ func NewCacheKeyIterator(cache *Cache, size int) KeyIterator {
 	}
 
 	cki := &cacheKeyIterator{
-		i:      -1,
-		size:   size,
-		cache:  cache,
-		order:  keys,
-		ready:  chans,
-		blocks: make([][]cacheBlock, len(keys)),
+		i:         -1,
+		size:      size,
+		cache:     cache,
+		order:     keys,
+		ready:     chans,
+		blocks:    make([][]cacheBlock, len(keys)),
+		interrupt: interrupt,
 	}
 	go cki.encode()
 	return cki
@@ -1334,6 +1378,13 @@ func (c *cacheKeyIterator) Next() bool {
 }
 
 func (c *cacheKeyIterator) Read() (string, int64, int64, []byte, error) {
+	// See if snapshot compactions were disabled while we were running.
+	select {
+	case <-c.interrupt:
+		return "", 0, 0, nil, errCompactionAborted
+	default:
+	}
+
 	blk := c.blocks[c.i][0]
 	return blk.k, blk.minTime, blk.maxTime, blk.b, blk.err
 }
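One subtlety worth calling out in the compact.go changes above: compact() and WriteSnapshot() copy the interrupt channel into a local (intC) under the read lock and hand that copy to the key iterator. A concurrent DisableCompactions() closes the channel and then nils the struct field, so the field itself is not safe to re-read, but the captured copy still observes the close. Roughly, with illustrative names rather than the real Compactor:

package main

import (
    "errors"
    "fmt"
    "sync"
)

var errCompactionAborted = errors.New("compaction aborted")

type compactor struct {
    mu        sync.RWMutex
    interrupt chan struct{}
}

// compact captures the interrupt channel once, then hands it to the
// inner write loop, mirroring how the real code passes intC into the
// key iterators.
func (c *compactor) compact(blocks int) error {
    c.mu.RLock()
    intC := c.interrupt // capture; the field may be nil'd out later
    c.mu.RUnlock()
    return writeBlocks(blocks, intC)
}

// writeBlocks checks for an interrupt before each block, the same
// per-block check the iterators' Read methods perform above.
func writeBlocks(n int, interrupt chan struct{}) error {
    for i := 0; i < n; i++ {
        select {
        case <-interrupt:
            return errCompactionAborted
        default:
        }
        // ... encode and write one block ...
    }
    return nil
}

func (c *compactor) disable() {
    c.mu.Lock()
    if c.interrupt != nil {
        close(c.interrupt) // broadcast to every captured copy
        c.interrupt = nil  // recreated on enable
    }
    c.mu.Unlock()
}

func main() {
    c := &compactor{interrupt: make(chan struct{})}
    intC := c.interrupt // as if a compaction had already started
    c.disable()         // closes intC, then nils the field
    fmt.Println(writeBlocks(10, intC)) // prints "compaction aborted"
}

Receiving from a nil channel blocks forever, so a job that somehow captured a nil channel simply always takes the default branch; that is harmless here because new work is refused up front via the enabled flags.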
diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go
index ec903fd4a7a..5422f6cd9d3 100644
--- a/tsdb/engine/tsm1/compact_test.go
+++ b/tsdb/engine/tsm1/compact_test.go
@@ -830,7 +830,7 @@ func TestTSMKeyIterator_Single(t *testing.T) {
 
 	r := MustTSMReader(dir, 1, writes)
 
-	iter, err := tsm1.NewTSMKeyIterator(1, false, r)
+	iter, err := tsm1.NewTSMKeyIterator(1, false, nil, r)
 	if err != nil {
 		t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
 	}
@@ -890,7 +890,7 @@ func TestTSMKeyIterator_Duplicate(t *testing.T) {
 
 	r2 := MustTSMReader(dir, 2, writes2)
 
-	iter, err := tsm1.NewTSMKeyIterator(1, false, r1, r2)
+	iter, err := tsm1.NewTSMKeyIterator(1, false, nil, r1, r2)
 	if err != nil {
 		t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
 	}
@@ -951,7 +951,7 @@ func TestTSMKeyIterator_MultipleKeysDeleted(t *testing.T) {
 	r2 := MustTSMReader(dir, 2, points2)
 	r2.Delete([]string{"cpu,host=A#!~#count"})
 
-	iter, err := tsm1.NewTSMKeyIterator(1, false, r1, r2)
+	iter, err := tsm1.NewTSMKeyIterator(1, false, nil, r1, r2)
 	if err != nil {
 		t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
 	}
@@ -993,6 +993,41 @@ func TestTSMKeyIterator_MultipleKeysDeleted(t *testing.T) {
 	}
 }
 
+// Tests that the TSMKeyIterator will abort if the interrupt channel is closed
+func TestTSMKeyIterator_Abort(t *testing.T) {
+	dir := MustTempDir()
+	defer os.RemoveAll(dir)
+
+	v1 := tsm1.NewValue(1, 1.1)
+	writes := map[string][]tsm1.Value{
+		"cpu,host=A#!~#value": []tsm1.Value{v1},
+	}
+
+	r := MustTSMReader(dir, 1, writes)
+
+	intC := make(chan struct{})
+	iter, err := tsm1.NewTSMKeyIterator(1, false, intC, r)
+	if err != nil {
+		t.Fatalf("unexpected error creating TSMKeyIterator: %v", err)
+	}
+
+	var aborted bool
+	for iter.Next() {
+		// Abort the iteration mid-stream.
+		close(intC)
+
+		_, _, _, _, err := iter.Read()
+		if err == nil {
+			t.Fatalf("expected abort error, got nil")
+		}
+		aborted = err != nil
+	}
+
+	if !aborted {
+		t.Fatalf("iteration not aborted")
+	}
+}
+
 func TestCacheKeyIterator_Single(t *testing.T) {
 	v0 := tsm1.NewValue(1, 1.0)
 
@@ -1008,7 +1043,7 @@ func TestCacheKeyIterator_Single(t *testing.T) {
 		}
 	}
 
-	iter := tsm1.NewCacheKeyIterator(c, 1)
+	iter := tsm1.NewCacheKeyIterator(c, 1, nil)
 	var readValues bool
 	for iter.Next() {
 		key, _, _, block, err := iter.Read()
@@ -1056,7 +1091,7 @@ func TestCacheKeyIterator_Chunked(t *testing.T) {
 		}
 	}
 
-	iter := tsm1.NewCacheKeyIterator(c, 1)
+	iter := tsm1.NewCacheKeyIterator(c, 1, nil)
 	var readValues bool
 	var chunk int
 	for iter.Next() {
@@ -1090,6 +1125,43 @@ func TestCacheKeyIterator_Chunked(t *testing.T) {
 	}
 }
 
+// Tests that the CacheKeyIterator will abort if the interrupt channel is closed
+func TestCacheKeyIterator_Abort(t *testing.T) {
+	v0 := tsm1.NewValue(1, 1.0)
+
+	writes := map[string][]tsm1.Value{
+		"cpu,host=A#!~#value": []tsm1.Value{v0},
+	}
+
+	c := tsm1.NewCache(0, "")
+
+	for k, v := range writes {
+		if err := c.Write(k, v); err != nil {
+			t.Fatalf("failed to write key foo to cache: %s", err.Error())
+		}
+	}
+
+	intC := make(chan struct{})
+
+	iter := tsm1.NewCacheKeyIterator(c, 1, intC)
+
+	var aborted bool
+	for iter.Next() {
+		// Abort the iteration mid-stream.
+		close(intC)
+
+		_, _, _, _, err := iter.Read()
+		if err == nil {
+			t.Fatalf("expected abort error, got nil")
+		}
+		aborted = err != nil
+	}
+
+	if !aborted {
+		t.Fatalf("iteration not aborted")
+	}
+}
+
 func TestDefaultPlanner_Plan_Min(t *testing.T) {
 	cp := tsm1.NewDefaultPlanner(
 		&fakeFileStore{
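Both _Abort tests follow the same shape: start iterating, close the interrupt channel mid-stream, then assert that the next Read reports an error. The same shape against the hypothetical writeBlocks helper from the earlier sketch (this would sit in a _test.go file next to it; none of these names are from the real tsm1 test suite):

package main

import "testing"

// TestWriteBlocksAbort mirrors the structure of the tests above:
// close the channel, then verify the very next unit of work fails
// with the abort error.
func TestWriteBlocksAbort(t *testing.T) {
    intC := make(chan struct{})
    close(intC) // interrupt before the work begins

    err := writeBlocks(10, intC)
    if err == nil {
        t.Fatalf("expected abort error, got nil")
    }
    if err != errCompactionAborted {
        t.Fatalf("expected errCompactionAborted, got %v", err)
    }
}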
diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go
index a16d4391ae8..85d341abb75 100644
--- a/tsdb/engine/tsm1/engine.go
+++ b/tsdb/engine/tsm1/engine.go
@@ -290,6 +290,7 @@ func (e *Engine) enableSnapshotCompactions() {
 		return
 	}
 
+	e.Compactor.EnableSnapshots()
 	quit := make(chan struct{})
 	e.snapDone = quit
 	e.snapWG.Add(1)
@@ -304,6 +305,7 @@ func (e *Engine) disableSnapshotCompactions() {
 	if e.snapDone != nil {
 		close(e.snapDone)
 		e.snapDone = nil
+		e.Compactor.DisableSnapshots()
 	}
 
 	e.mu.Unlock()
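Finally, note the ordering the engine.go hunks establish: EnableSnapshots() re-arms the interrupt channel before the snapshot goroutine is started, and DisableSnapshots() runs only after snapDone has been closed, so an in-flight snapshot is aborted rather than left running unsupervised. A compressed sketch of that wiring, with hypothetical types standing in for the real Engine:

package main

import "sync"

type compactor struct {
    mu        sync.Mutex
    interrupt chan struct{}
}

func (c *compactor) enableSnapshots() {
    c.mu.Lock()
    if c.interrupt == nil {
        c.interrupt = make(chan struct{})
    }
    c.mu.Unlock()
}

func (c *compactor) disableSnapshots() {
    c.mu.Lock()
    if c.interrupt != nil {
        close(c.interrupt)
        c.interrupt = nil
    }
    c.mu.Unlock()
}

type engine struct {
    mu        sync.Mutex
    snapDone  chan struct{}
    compactor *compactor
}

func (e *engine) enableSnapshotCompactions() {
    e.mu.Lock()
    defer e.mu.Unlock()
    e.compactor.enableSnapshots() // arm the interrupt channel first
    quit := make(chan struct{})
    e.snapDone = quit
    go func() { <-quit }() // stand-in for the snapshot scheduling loop
}

func (e *engine) disableSnapshotCompactions() {
    e.mu.Lock()
    defer e.mu.Unlock()
    if e.snapDone != nil {
        close(e.snapDone) // stop scheduling new snapshots...
        e.snapDone = nil
        e.compactor.disableSnapshots() // ...then abort any in progress
    }
}

func main() {
    e := &engine{compactor: &compactor{}}
    e.enableSnapshotCompactions()
    e.disableSnapshotCompactions()
}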