Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix data race in BucketedBytes pool #4792

Merged
merged 3 commits into from
Oct 20, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#4663](https://github.com/thanos-io/thanos/pull/4663) Fetcher: Fix discovered data races.
- [#4754](https://github.com/thanos-io/thanos/pull/4754) Query: Fix possible panic on stores endpoint.
- [#4753](https://github.com/thanos-io/thanos/pull/4753) Store: Validate block sync concurrency parameter.
- [#4792](https://github.com/thanos-io/thanos/pull/4792) Store: Fix data race in BucketedBytes pool.

## [v0.23.1](https://github.com/thanos-io/thanos/tree/release-0.23) - 2021.10.1

Expand Down
9 changes: 4 additions & 5 deletions pkg/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,9 @@ func (p *BucketedBytes) Put(b *[]byte) {
return
}

sz := cap(*b)
for i, bktSize := range p.sizes {
if cap(*b) > bktSize {
if sz > bktSize {
continue
}
*b = (*b)[:0]
Expand All @@ -118,13 +119,11 @@ func (p *BucketedBytes) Put(b *[]byte) {

p.mtx.Lock()
defer p.mtx.Unlock()

// We could assume here that our users will not make the slices larger
// but lets be on the safe side to avoid an underflow of p.usedTotal.
sz := uint64(cap(*b))
if sz >= p.usedTotal {
if uint64(sz) >= p.usedTotal {
p.usedTotal = 0
} else {
p.usedTotal -= sz
p.usedTotal -= uint64(sz)
}
}
56 changes: 30 additions & 26 deletions pkg/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
package pool

import (
"bytes"
"fmt"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -71,52 +70,57 @@ func TestRacePutGet(t *testing.T) {

s := sync.WaitGroup{}

// Start two goroutines: they always Get and Put two byte slices
// to which they write 'foo' / 'barbazbaz' and check if the data is still
const goroutines = 100

// Start multiple goroutines: they always Get and Put two byte slices
// to which they write their contents and check if the data is still
// there after writing it, before putting it back.
errs := make(chan error, 2)
stop := make(chan bool, 2)
errs := make(chan error, goroutines)
stop := make(chan struct{})

f := func(txt string) {
f := func(txt string, grow bool) {
defer s.Done()
for {
select {
case <-stop:
return
default:
c, err := chunkPool.Get(3)
if err != nil {
errs <- errors.Wrapf(err, "goroutine %s", txt)
return
}

buf := bytes.NewBuffer(*c)

_, err = fmt.Fprintf(buf, "%s", txt)
c, err := chunkPool.Get(len(txt))
if err != nil {
errs <- errors.Wrapf(err, "goroutine %s", txt)
return
}

if buf.String() != txt {
*c = append(*c, txt...)
if string(*c) != txt {
errs <- errors.New("expected to get the data just written")
return
}
if grow {
*c = append(*c, txt...)
*c = append(*c, txt...)
if string(*c) != txt+txt+txt {
errs <- errors.New("expected to get the data just written")
return
}
}

b := buf.Bytes()
chunkPool.Put(&b)
chunkPool.Put(c)
}
}
}

s.Add(2)
go f("foo")
go f("barbazbaz")

time.Sleep(5 * time.Second)
stop <- true
stop <- true
for i := 0; i < goroutines; i++ {
s.Add(1)
// make sure we start multiple goroutines with same len buf requirements, to hit same pools
s := strings.Repeat(string(byte(i)), i % 10)
// some of the goroutines will append more elements to the provided slice
grow := i % 2 == 0
go f(s, grow)
}

time.Sleep(1 * time.Second)
close(stop)
s.Wait()
select {
case err := <-errs:
Expand Down