Skip to content

Commit

Permalink
pkg/ingester: prevent shutdowns from processing during joining handoff
Browse files Browse the repository at this point in the history
This commit fixes a race condition where an ingester that is shut down
during the joining handoff (i.e., receiving chunks from a leaving
ingester before claiming its tokens) hangs and can never shut down
cleanly. A shutdown mutex is implemented which is obtained at the
start of the handoff process and released after the handoff process
completes. This race condition also prevented the leaving ingester from
completing its shutdown, since it waits for the joining ingester to
claim the tokens.

This means that if a brand new ingester is shut down, it will always
have finished receiving chunks from the previous leaving ingester and
have finished obtaining the tokens from the ingester attempting to
leave.
  • Loading branch information
rfratto authored and slim-bean committed Oct 9, 2019
1 parent b75965e commit 2cb3c82
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 1 deletion.
1 change: 1 addition & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type Ingester struct {
cfg Config
clientConfig client.Config

shutdownMtx sync.Mutex // Allows processes to grab a lock and prevent a shutdown
instancesMtx sync.RWMutex
instances map[string]*instance
readonly bool
Expand Down
9 changes: 9 additions & 0 deletions pkg/ingester/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ var (
// TransferChunks receives all chunks from another ingester. The Ingester
// must be in PENDING state or else the call will fail.
func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer) error {
// Prevent a shutdown from happening until we've completely finished a handoff
// from a leaving ingester.
i.shutdownMtx.Lock()
defer i.shutdownMtx.Unlock()

// Enter JOINING state (only valid from PENDING)
if err := i.lifecycler.ChangeState(stream.Context(), ring.JOINING); err != nil {
return err
Expand Down Expand Up @@ -122,8 +127,12 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer)

// StopIncomingRequests implements ring.Lifecycler. It flips the ingester
// into read-only mode so no new write traffic is accepted.
func (i *Ingester) StopIncomingRequests() {
	// Take the shutdown mutex first: TransferChunks holds it for the whole
	// duration of an incoming handoff, so this call blocks until any
	// in-flight transfer has completed.
	i.shutdownMtx.Lock()
	i.instancesMtx.Lock()
	i.readonly = true
	// Release in reverse acquisition order (same order the deferred
	// unlocks would run in).
	i.instancesMtx.Unlock()
	i.shutdownMtx.Unlock()
}

Expand Down
23 changes: 22 additions & 1 deletion pkg/ingester/transfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,30 +176,51 @@ type testIngesterClient struct {
// TransferChunks returns a fake transfer client wired to the target
// ingester. It starts the real server-side handler in one goroutine, then
// deliberately races a lifecycler shutdown against the transfer: the
// shutdown fires before the wait gate opens, so it must block on the
// ingester's shutdown mutex until the handoff completes.
func (c *testIngesterClient) TransferChunks(context.Context, ...grpc.CallOption) (logproto.Ingester_TransferChunksClient, error) {
	chunkCh := make(chan *logproto.TimeSeriesChunk)       // chunks flowing client -> server
	respCh := make(chan *logproto.TransferChunksResponse) // final response, server -> client
	waitCh := make(chan bool)                             // closed to release gated Sends

	client := &testTransferChunksClient{ch: chunkCh, resp: respCh, wait: waitCh}
	go func() {
		// Drive the real TransferChunks handler on the target ingester.
		server := &testTransferChunksServer{ch: chunkCh, resp: respCh}
		err := c.i.TransferChunks(server)
		require.NoError(c.t, err)
	}()

	// After 50ms, we try killing the target ingester's lifecycler to verify
	// that it obtained a lock on the shutdown process. This operation should
	// block until the transfer completes.
	//
	// Then after another 50ms, we also allow data to start sending. This tests an issue
	// where an ingester is shut down before it completes the handoff and ends up in an
	// unhealthy state, permanently stuck in the handler for claiming tokens.
	go func() {
		time.Sleep(time.Millisecond * 50)
		// NOTE(review): Shutdown's return value is discarded here — confirm
		// the lifecycler's Shutdown cannot fail in a way the test should see.
		c.i.lifecycler.Shutdown()
	}()

	go func() {
		time.Sleep(time.Millisecond * 100)
		close(waitCh) // open the gate: Send calls may now proceed
	}()

	return client, nil
}

// testTransferChunksClient is a fake transfer-chunks client whose Send
// blocks until the wait channel is closed, letting the test control exactly
// when chunk data begins flowing to the server side.
type testTransferChunksClient struct {
	wait chan bool                               // closed by the test to unblock Send
	ch   chan *logproto.TimeSeriesChunk          // chunks forwarded to the fake server
	resp chan *logproto.TransferChunksResponse   // carries the server's final response

	grpc.ClientStream
}

// Send forwards one chunk to the fake server, but first blocks until the
// test closes the wait channel (the gate that delays the start of data).
func (c *testTransferChunksClient) Send(chunk *logproto.TimeSeriesChunk) error {
	<-c.wait // blocks until waitCh is closed by the test driver
	c.ch <- chunk
	return nil
}

func (c *testTransferChunksClient) CloseAndRecv() (*logproto.TransferChunksResponse, error) {
<-c.wait
close(c.ch)
resp := <-c.resp
close(c.resp)
Expand Down

0 comments on commit 2cb3c82

Please sign in to comment.