diff --git a/CHANGELOG.md b/CHANGELOG.md index cf3013e82d..7a2d5be115 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re - [#3560](https://github.com/thanos-io/thanos/pull/3560) query-frontend: Allow separate label cache - [#3672](https://github.com/thanos-io/thanos/pull/3672) rule: prevent rule crash from no such host error when using `dnssrv+` or `dnssrvnoa+`. - [#3760](https://github.com/thanos-io/thanos/pull/3760) Store: Fix panic caused by a race condition happening on concurrent index-header reader usage and unload, when `--store.enable-index-header-lazy-reader` is enabled. +- [#3759](https://github.com/thanos-io/thanos/pull/3759) Store: Fix panic caused by a race condition happening on concurrent index-header lazy load and unload, when `--store.enable-index-header-lazy-reader` is enabled. ### Changed diff --git a/pkg/block/indexheader/lazy_binary_reader.go b/pkg/block/indexheader/lazy_binary_reader.go index 0598e46f49..d4b9dee03b 100644 --- a/pkg/block/indexheader/lazy_binary_reader.go +++ b/pkg/block/indexheader/lazy_binary_reader.go @@ -24,7 +24,8 @@ import ( ) var ( - errNotIdle = errors.New("the reader is not idle") + errNotIdle = errors.New("the reader is not idle") + errUnloadedWhileLoading = errors.New("the index-header has been concurrently unloaded") ) // LazyBinaryReaderMetrics holds metrics tracked by LazyBinaryReader. @@ -208,7 +209,7 @@ func (r *LazyBinaryReader) LabelNames() ([]string, error) { // load ensures the underlying binary index-header reader has been successfully loaded. Returns // an error on failure. This function MUST be called with the read lock already acquired. -func (r *LazyBinaryReader) load() error { +func (r *LazyBinaryReader) load() (returnErr error) { // Nothing to do if we already tried loading it. if r.reader != nil { return nil @@ -221,8 +222,16 @@ func (r *LazyBinaryReader) load() error { // the read lock once done. r.readerMx.RUnlock() r.readerMx.Lock() - defer r.readerMx.RLock() - defer r.readerMx.Unlock() + defer func() { + r.readerMx.Unlock() + r.readerMx.RLock() + + // Between the write unlock and the subsequent read lock, the unload() may have run, + // so we make sure to catch this edge case. + if returnErr == nil && r.reader == nil { + returnErr = errUnloadedWhileLoading + } + }() // Ensure none else tried to load it in the meanwhile. if r.reader != nil { diff --git a/pkg/block/indexheader/lazy_binary_reader_test.go b/pkg/block/indexheader/lazy_binary_reader_test.go index 0b130199b6..886c0516fe 100644 --- a/pkg/block/indexheader/lazy_binary_reader_test.go +++ b/pkg/block/indexheader/lazy_binary_reader_test.go @@ -8,6 +8,7 @@ import ( "io/ioutil" "os" "path/filepath" + "sync" "testing" "time" @@ -217,3 +218,71 @@ func TestLazyBinaryReader_unload_ShouldReturnErrorIfNotIdle(t *testing.T) { testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.unloadCount)) testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) } + +func TestLazyBinaryReader_LoadUnloadRaceCondition(t *testing.T) { + // Run the test for a fixed amount of time. + const runDuration = 5 * time.Second + + ctx := context.Background() + + tmpDir, err := ioutil.TempDir("", "test-indexheader") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, bkt.Close()) }() + + // Create block. + blockID, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}}, + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124) + testutil.Ok(t, err) + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()))) + + m := NewLazyBinaryReaderMetrics(nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, nil) + testutil.Ok(t, err) + testutil.Assert(t, r.reader == nil) + t.Cleanup(func() { + testutil.Ok(t, r.Close()) + }) + + done := make(chan struct{}) + time.AfterFunc(runDuration, func() { close(done) }) + wg := sync.WaitGroup{} + wg.Add(2) + + // Start a goroutine which continuously try to unload the reader. + go func() { + defer wg.Done() + + for { + select { + case <-done: + return + default: + testutil.Ok(t, r.unloadIfIdleSince(0)) + } + } + }() + + // Try to read multiple times, while the other goroutine continuously try to unload it. + go func() { + defer wg.Done() + + for { + select { + case <-done: + return + default: + _, err := r.PostingsOffset("a", "1") + testutil.Assert(t, err == nil || err == errUnloadedWhileLoading) + } + } + }() + + // Wait until both goroutines have done. + wg.Wait() +}