Skip to content

Commit

Permalink
Fix data race in BucketedBytes pool (#4792)
Browse files Browse the repository at this point in the history
* Fix data race in BucketedBytes pool

Previous test didn't detect the data race: we copied the bytes header to
the bytes.Buffer so when appending to the slice we were not modifying
the original one.

However, the usage of this in bucketChunkReader.save() actually modifies
the referenced slice, so the test was modified to test that it can be
done safely.

The race condition happened because we were reading the referenced slice
capacity after putting it back to the pool, when someone else might
already retrieved and modified it.

Before modifying the implementation, this was the data race reported:

==================
WARNING: DATA RACE
Read at 0x00c0000bc900 by goroutine 36:
  github.com/thanos-io/thanos/pkg/pool.(*BucketedBytes).Put()
      /Users/oleg/w/github.com/thanos-io/thanos/pkg/pool/pool.go:124 +0x1f9
  github.com/thanos-io/thanos/pkg/pool.TestRacePutGet.func1()
      /Users/oleg/w/github.com/thanos-io/thanos/pkg/pool/pool_test.go:108 +0xfa
  github.com/thanos-io/thanos/pkg/pool.TestRacePutGet·dwrap·3()
      /Users/oleg/w/github.com/thanos-io/thanos/pkg/pool/pool_test.go:119 +0x65

Previous write at 0x00c0000bc900 by goroutine 27:
  github.com/thanos-io/thanos/pkg/pool.TestRacePutGet.func1()
      /Users/oleg/w/github.com/thanos-io/thanos/pkg/pool/pool_test.go:94 +0x1fa
  github.com/thanos-io/thanos/pkg/pool.TestRacePutGet·dwrap·3()
      /Users/oleg/w/github.com/thanos-io/thanos/pkg/pool/pool_test.go:119 +0x65

Goroutine 36 (running) created at:
  github.com/thanos-io/thanos/pkg/pool.TestRacePutGet()
      /Users/oleg/w/github.com/thanos-io/thanos/pkg/pool/pool_test.go:119 +0x257
  testing.tRunner()
      /usr/local/Cellar/go/1.17/libexec/src/testing/testing.go:1259 +0x22f
  testing.(*T).Run·dwrap·21()
  1 Fix data race in BucketedBytes pool
      /usr/local/Cellar/go/1.17/libexec/src/testing/testing.go:1306 +0x47

Goroutine 27 (running) created at:
  github.com/thanos-io/thanos/pkg/pool.TestRacePutGet()
      /Users/oleg/w/github.com/thanos-io/thanos/pkg/pool/pool_test.go:119 +0x257
  testing.tRunner()
      /usr/local/Cellar/go/1.17/libexec/src/testing/testing.go:1259 +0x22f
  testing.(*T).Run·dwrap·21()
      /usr/local/Cellar/go/1.17/libexec/src/testing/testing.go:1306 +0x47
==================

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>

* Update CHANGELOG.md

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>

* goimports fix

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
  • Loading branch information
colega authored Oct 20, 2021
1 parent 6723c86 commit d2d53e5
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#4663](https://github.com/thanos-io/thanos/pull/4663) Fetcher: Fix discovered data races.
- [#4754](https://github.com/thanos-io/thanos/pull/4754) Query: Fix possible panic on stores endpoint.
- [#4753](https://github.com/thanos-io/thanos/pull/4753) Store: validate block sync concurrency parameter
- [#4792](https://github.com/thanos-io/thanos/pull/4792) Store: Fix data race in BucketedBytes pool.

## [v0.23.1](https://github.com/thanos-io/thanos/tree/release-0.23) - 2021.10.1

Expand Down
9 changes: 4 additions & 5 deletions pkg/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,9 @@ func (p *BucketedBytes) Put(b *[]byte) {
return
}

sz := cap(*b)
for i, bktSize := range p.sizes {
if cap(*b) > bktSize {
if sz > bktSize {
continue
}
*b = (*b)[:0]
Expand All @@ -118,13 +119,11 @@ func (p *BucketedBytes) Put(b *[]byte) {

p.mtx.Lock()
defer p.mtx.Unlock()

// We could assume here that our users will not make the slices larger
// but lets be on the safe side to avoid an underflow of p.usedTotal.
sz := uint64(cap(*b))
if sz >= p.usedTotal {
if uint64(sz) >= p.usedTotal {
p.usedTotal = 0
} else {
p.usedTotal -= sz
p.usedTotal -= uint64(sz)
}
}
56 changes: 30 additions & 26 deletions pkg/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
package pool

import (
"bytes"
"fmt"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -71,52 +70,57 @@ func TestRacePutGet(t *testing.T) {

s := sync.WaitGroup{}

// Start two goroutines: they always Get and Put two byte slices
// to which they write 'foo' / 'barbazbaz' and check if the data is still
const goroutines = 100

// Start multiple goroutines: they always Get and Put two byte slices
// to which they write their contents and check if the data is still
// there after writing it, before putting it back.
errs := make(chan error, 2)
stop := make(chan bool, 2)
errs := make(chan error, goroutines)
stop := make(chan struct{})

f := func(txt string) {
f := func(txt string, grow bool) {
defer s.Done()
for {
select {
case <-stop:
return
default:
c, err := chunkPool.Get(3)
if err != nil {
errs <- errors.Wrapf(err, "goroutine %s", txt)
return
}

buf := bytes.NewBuffer(*c)

_, err = fmt.Fprintf(buf, "%s", txt)
c, err := chunkPool.Get(len(txt))
if err != nil {
errs <- errors.Wrapf(err, "goroutine %s", txt)
return
}

if buf.String() != txt {
*c = append(*c, txt...)
if string(*c) != txt {
errs <- errors.New("expected to get the data just written")
return
}
if grow {
*c = append(*c, txt...)
*c = append(*c, txt...)
if string(*c) != txt+txt+txt {
errs <- errors.New("expected to get the data just written")
return
}
}

b := buf.Bytes()
chunkPool.Put(&b)
chunkPool.Put(c)
}
}
}

s.Add(2)
go f("foo")
go f("barbazbaz")

time.Sleep(5 * time.Second)
stop <- true
stop <- true
for i := 0; i < goroutines; i++ {
s.Add(1)
// make sure we start multiple goroutines with same len buf requirements, to hit same pools
s := strings.Repeat(string(byte(i)), i%10)
// some of the goroutines will append more elements to the provided slice
grow := i%2 == 0
go f(s, grow)
}

time.Sleep(1 * time.Second)
close(stop)
s.Wait()
select {
case err := <-errs:
Expand Down

0 comments on commit d2d53e5

Please sign in to comment.