fixing bug in chunk cache that can cause panic during shutdown #4398

Merged: 3 commits merged on Aug 26, 2021

1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -45,6 +45,7 @@
* [BUGFIX] Ruler: fixed counting of PromQL evaluation errors as user-errors when updating `cortex_ruler_queries_failed_total`. #4335
* [BUGFIX] Ingester: When using block storage, prevent any reads or writes while the ingester is stopping. This will prevent accessing TSDB blocks once they have been already closed. #4304
* [BUGFIX] Ingester: fixed ingester stuck on start up (LEAVING ring state) when `-ingester.heartbeat-period=0` and `-ingester.unregister-on-shutdown=false`. #4366
* [BUGFIX] Ingester: panic during shutdown while fetching batches from cache. #4397
* [BUGFIX] Querier: After query-frontend restart, querier may have lower than configured concurrency. #4417

## 1.10.0 / 2021-08-03
24 changes: 24 additions & 0 deletions pkg/chunk/cache/cache_test.go
@@ -125,6 +125,25 @@ func testChunkFetcher(t *testing.T, c cache.Cache, keys []string, chunks []chunk.Chunk) {
require.Equal(t, chunks, found)
}

// testChunkFetcherStop checks that stopping the fetcher while it is fetching chunks doesn't result in an error
func testChunkFetcherStop(t *testing.T, c cache.Cache, keys []string, chunks []chunk.Chunk) {
fetcher, err := chunk.NewChunkFetcher(c, false, chunk.NewMockStorage())
require.NoError(t, err)

done := make(chan struct{})
go func() {
defer close(done)
if _, err := fetcher.FetchChunks(context.Background(), chunks, keys); err != nil {
// Since we stop the fetcher while FetchChunks is running, it may not get
// everything back from the cache and will fall back to storage, which does
// not have the keys, so an error here is expected. We still assert on err
// to satisfy the linter.
require.NotNil(t, err)
}
}()
fetcher.Stop()
<-done
}

type byExternalKey []chunk.Chunk

func (a byExternalKey) Len() int { return len(a) }
@@ -155,6 +174,11 @@ func testCache(t *testing.T, cache cache.Cache) {
t.Run("Fetcher", func(t *testing.T) {
testChunkFetcher(t, cache, keys, chunks)
})
t.Run("FetcherStop", func(t *testing.T) {
// Refill the cache to avoid a nil pointer error when the fetch falls back to storage for missing keys
keys, chunks = fillCache(t, cache)
testChunkFetcherStop(t, cache, keys, chunks)
})
}

func TestMemcache(t *testing.T) {
46 changes: 34 additions & 12 deletions pkg/chunk/cache/memcached.go
@@ -55,6 +55,7 @@ type Memcached struct {

wg sync.WaitGroup
inputCh chan *work
quit chan struct{}

logger log.Logger
}
@@ -83,19 +84,24 @@ func NewMemcached(cfg MemcachedConfig, client MemcachedClient, name string, reg
}

c.inputCh = make(chan *work)
c.quit = make(chan struct{})
c.wg.Add(cfg.Parallelism)

for i := 0; i < cfg.Parallelism; i++ {
go func() {
for input := range c.inputCh {
res := &result{
batchID: input.batchID,
defer c.wg.Done()
for {
select {
case <-c.quit:
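// Stop() closed the quit channel; shut this worker down.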
return
case input := <-c.inputCh:
res := &result{
batchID: input.batchID,
}
res.found, res.bufs, res.missed = c.fetch(input.ctx, input.keys)
input.resultCh <- res
}
res.found, res.bufs, res.missed = c.fetch(input.ctx, input.keys)
input.resultCh <- res
}

c.wg.Done()
}()
}

@@ -187,11 +193,15 @@ func (c *Memcached) fetchKeysBatched(ctx context.Context, keys []string) (found
go func() {
for i, j := 0, 0; i < len(keys); i += batchSize {
batchKeys := keys[i:math.Min(i+batchSize, len(keys))]
c.inputCh <- &work{
select {
case <-c.quit:
return
case c.inputCh <- &work{
keys: batchKeys,
ctx: ctx,
resultCh: resultsCh,
batchID: j,
}:
}
j++
}
@@ -205,13 +215,21 @@

// We need to order found by the input keys order.
results := make([]*result, numResults)
loopResults:
for i := 0; i < numResults; i++ {
result := <-resultsCh
results[result.batchID] = result
select {
case <-c.quit:
break loopResults
case result := <-resultsCh:
results[result.batchID] = result
}
}
close(resultsCh)

for _, result := range results {
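// Slots for batches abandoned after quit fired remain nil; skip them.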
if result == nil {
continue
}
found = append(found, result.found...)
bufs = append(bufs, result.bufs...)
missed = append(missed, result.missed...)
@@ -239,11 +257,15 @@ func (c *Memcached) Store(ctx context.Context, keys []string, bufs [][]byte) {

// Stop shuts down the background worker goroutines.
func (c *Memcached) Stop() {
if c.inputCh == nil {
if c.quit == nil {
Contributor: [nit] c.quit is set in the constructor and I can't see where we ever set it to nil, so I'm not sure we need this check.

Contributor: Not a blocker, we can merge anyway.

Contributor (author): If BatchSize or Parallelism is zero, we return the cache before the quit/inputCh channels are set:

if cfg.BatchSize == 0 || cfg.Parallelism == 0 { return c }

(A sketch of this early return follows this file's diff.)

Contributor: Oh thanks! Didn't notice it.
return
}

close(c.inputCh)
select {
case <-c.quit:
default:
close(c.quit)
}
c.wg.Wait()
}

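For context on the review thread above, here is a minimal sketch of the early return the author points to, with an assumed, simplified constructor signature and field setup (the real constructor also wires up metrics and other fields):

// Sketch only: with batching disabled, the constructor returns before the
// channels are created, so they remain nil and Stop() must tolerate that.
func NewMemcached(cfg MemcachedConfig, client MemcachedClient, name string, logger log.Logger) *Memcached {
    c := &Memcached{logger: logger}
    if cfg.BatchSize == 0 || cfg.Parallelism == 0 {
        return c // inputCh and quit stay nil here
    }
    c.inputCh = make(chan *work)
    c.quit = make(chan struct{})
    c.wg.Add(cfg.Parallelism)
    // ... start cfg.Parallelism worker goroutines (see the diff above) ...
    return c
}
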
36 changes: 36 additions & 0 deletions pkg/chunk/cache/memcached_test.go
@@ -156,3 +156,39 @@ func testMemcacheFailing(t *testing.T, memcache *cache.Memcached) {
}
}
}

func TestMemcacheStop(t *testing.T) {
t.Run("unbatched", func(t *testing.T) {
client := newMockMemcacheFailing()
memcache := cache.NewMemcached(cache.MemcachedConfig{}, client,
"test", nil, log.NewNopLogger())

testMemcachedStopping(t, memcache)
})

t.Run("batched", func(t *testing.T) {
client := newMockMemcacheFailing()
memcache := cache.NewMemcached(cache.MemcachedConfig{
BatchSize: 10,
Parallelism: 5,
}, client, "test", nil, log.NewNopLogger())

testMemcachedStopping(t, memcache)
})
}

func testMemcachedStopping(t *testing.T, memcache *cache.Memcached) {
numKeys := 1000
ctx := context.Background()
keys := make([]string, 0, numKeys)
bufs := make([][]byte, 0, numKeys)
for i := 0; i < numKeys; i++ {
keys = append(keys, fmt.Sprint(i))
bufs = append(bufs, []byte(fmt.Sprint(i)))
}

memcache.Store(ctx, keys, bufs)

go memcache.Fetch(ctx, keys)
memcache.Stop()
}
55 changes: 38 additions & 17 deletions pkg/chunk/chunk_store_utils.go
@@ -86,6 +86,7 @@ type Fetcher struct {

wait sync.WaitGroup
decodeRequests chan decodeRequest
quit chan struct{}
}

type decodeRequest struct {
@@ -105,6 +106,7 @@ func NewChunkFetcher(cacher cache.Cache, cacheStubs bool, storage Client) (*Fetc
cache: cacher,
cacheStubs: cacheStubs,
decodeRequests: make(chan decodeRequest),
quit: make(chan struct{}),
}

c.wait.Add(chunkDecodeParallelism)
@@ -117,22 +119,32 @@

// Stop the ChunkFetcher.
func (c *Fetcher) Stop() {
close(c.decodeRequests)
select {
case <-c.quit:
default:
close(c.quit)
}

c.wait.Wait()
c.cache.Stop()
}

func (c *Fetcher) worker() {
defer c.wait.Done()
decodeContext := NewDecodeContext()
for req := range c.decodeRequests {
err := req.chunk.Decode(decodeContext, req.buf)
if err != nil {
cacheCorrupt.Inc()
}
req.responses <- decodeResponse{
chunk: req.chunk,
err: err,
for {
select {
case <-c.quit:
return
case req := <-c.decodeRequests:
err := req.chunk.Decode(decodeContext, req.buf)
if err != nil {
cacheCorrupt.Inc()
}
req.responses <- decodeResponse{
chunk: req.chunk,
err: err,
}
}
}
}
@@ -230,22 +242,31 @@ func (c *Fetcher) processCacheResponse(ctx context.Context, chunks []Chunk, keys

go func() {
for _, request := range requests {
c.decodeRequests <- request
select {
case <-c.quit:
return
case c.decodeRequests <- request:
}
}
}()

var (
err error
found []Chunk
)
for i := 0; i < len(requests); i++ {
response := <-responses

// Don't exit early, as we don't want to block the workers.
if response.err != nil {
err = response.err
} else {
found = append(found, response.chunk)
loopResponses:
for i := 0; i < len(requests); i++ {
select {
case <-c.quit:
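// The fetcher is stopping; abandon the remaining decode responses.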
break loopResponses
case response := <-responses:
// Don't exit early, as we don't want to block the workers.
if response.err != nil {
err = response.err
} else {
found = append(found, response.chunk)
}
}
}
return found, missing, err
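
A side note on the shutdown pattern: both Stop methods in this PR guard the close with a select/default so that calling Stop twice in sequence doesn't panic on a double close. The guard is not atomic, though; two goroutines calling Stop concurrently could both take the default branch and close the channel twice. A hypothetical race-safe alternative, not what this PR uses, is sync.Once:

package cache

import "sync"

// stopper is a hypothetical illustration: sync.Once makes Stop idempotent
// even when called from multiple goroutines at once.
type stopper struct {
    quit     chan struct{}
    stopOnce sync.Once
}

func (s *stopper) Stop() {
    s.stopOnce.Do(func() {
        close(s.quit)
    })
}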