Skip to content

Commit

Permalink
Fixed panic on concurrent index-header lazy load / unload
Browse files Browse the repository at this point in the history
Signed-off-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
pracucci committed Feb 2, 2021
1 parent a24acb6 commit 3c7cfae
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re
- [#3560](https://github.com/thanos-io/thanos/pull/3560) query-frontend: Allow separate label cache
- [#3672](https://github.com/thanos-io/thanos/pull/3672) rule: prevent rule crash from no such host error when using `dnssrv+` or `dnssrvnoa+`.
- [#3760](https://github.com/thanos-io/thanos/pull/3760) Store: Fix panic caused by a race condition happening on concurrent index-header reader usage and unload, when `--store.enable-index-header-lazy-reader` is enabled.
- [#3759](https://github.com/thanos-io/thanos/pull/3759) Store: Fix panic caused by a race condition happening on concurrent index-header lazy load and unload, when `--store.enable-index-header-lazy-reader` is enabled.

### Changed

Expand Down
17 changes: 13 additions & 4 deletions pkg/block/indexheader/lazy_binary_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import (
)

var (
errNotIdle = errors.New("the reader is not idle")
errNotIdle = errors.New("the reader is not idle")
errUnloadedWhileLoading = errors.New("the index-header has been concurrently unloaded")
)

// LazyBinaryReaderMetrics holds metrics tracked by LazyBinaryReader.
Expand Down Expand Up @@ -208,7 +209,7 @@ func (r *LazyBinaryReader) LabelNames() ([]string, error) {

// load ensures the underlying binary index-header reader has been successfully loaded. Returns
// an error on failure. This function MUST be called with the read lock already acquired.
func (r *LazyBinaryReader) load() error {
func (r *LazyBinaryReader) load() (returnErr error) {
// Nothing to do if we already tried loading it.
if r.reader != nil {
return nil
Expand All @@ -221,8 +222,16 @@ func (r *LazyBinaryReader) load() error {
// the read lock once done.
r.readerMx.RUnlock()
r.readerMx.Lock()
defer r.readerMx.RLock()
defer r.readerMx.Unlock()
defer func() {
r.readerMx.Unlock()
r.readerMx.RLock()

// Between the write unlock and the subsequent read lock, the unload() may have run,
// so we make sure to catch this edge case.
if returnErr == nil && r.reader == nil {
returnErr = errUnloadedWhileLoading
}
}()

// Ensure no one else tried to load it in the meanwhile.
if r.reader != nil {
Expand Down
69 changes: 69 additions & 0 deletions pkg/block/indexheader/lazy_binary_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -217,3 +218,71 @@ func TestLazyBinaryReader_unload_ShouldReturnErrorIfNotIdle(t *testing.T) {
testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.unloadCount))
testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount))
}

// TestLazyBinaryReader_LoadUnloadRaceCondition runs, for a fixed duration, one
// goroutine that keeps unloading the lazy reader and another that keeps reading
// from it. A read must either succeed or return errUnloadedWhileLoading, and an
// unload must never return an error; anything else fails the test.
func TestLazyBinaryReader_LoadUnloadRaceCondition(t *testing.T) {
	// Run the test for a fixed amount of time.
	const runDuration = 5 * time.Second

	ctx := context.Background()

	// All teardown goes through t.Cleanup, which calls the registered functions
	// in last-added, first-called order: the reader is closed first, then the
	// bucket, and the temporary directory is removed last. Mixing defer with
	// t.Cleanup would remove the directory before the reader gets closed,
	// because deferred calls run before cleanup functions.
	tmpDir, err := ioutil.TempDir("", "test-indexheader")
	testutil.Ok(t, err)
	t.Cleanup(func() { testutil.Ok(t, os.RemoveAll(tmpDir)) })

	bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt"))
	testutil.Ok(t, err)
	t.Cleanup(func() { testutil.Ok(t, bkt.Close()) })

	// Create block.
	blockID, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
		{{Name: "a", Value: "1"}},
		{{Name: "a", Value: "2"}},
	}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124)
	testutil.Ok(t, err)
	testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String())))

	m := NewLazyBinaryReaderMetrics(nil)
	r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, nil)
	testutil.Ok(t, err)
	// The underlying reader must not be loaded until the first read.
	testutil.Assert(t, r.reader == nil)
	t.Cleanup(func() { testutil.Ok(t, r.Close()) })

	// Closing done tells both goroutines to stop once runDuration has elapsed.
	done := make(chan struct{})
	time.AfterFunc(runDuration, func() { close(done) })

	var wg sync.WaitGroup
	wg.Add(2)

	// Failures inside the goroutines below are reported with t.Errorf followed by
	// a return. Only the goroutine running the test may call FailNow/Fatal, and
	// whether the testutil helpers do so is not visible from this file, so they
	// are avoided outside the test goroutine.

	// Start a goroutine which continuously tries to unload the reader.
	go func() {
		defer wg.Done()

		for {
			select {
			case <-done:
				return
			default:
				if err := r.unloadIfIdleSince(0); err != nil {
					t.Errorf("unexpected error while unloading the reader: %v", err)
					return
				}
			}
		}
	}()

	// Try to read multiple times, while the other goroutine continuously tries to unload it.
	go func() {
		defer wg.Done()

		for {
			select {
			case <-done:
				return
			default:
				_, err := r.PostingsOffset("a", "1")
				if err != nil && err != errUnloadedWhileLoading {
					t.Errorf("unexpected error while reading postings offset: %v", err)
					return
				}
			}
		}
	}()

	// Wait until both goroutines are done.
	wg.Wait()
}

0 comments on commit 3c7cfae

Please sign in to comment.