diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 821b02f46031..42cca371e052 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -70,6 +70,7 @@ type Ingester struct {
 	cfg          Config
 	clientConfig client.Config
 
+	shutdownMtx  sync.Mutex // Allows processes to grab a lock and prevent a shutdown
 	instancesMtx sync.RWMutex
 	instances    map[string]*instance
 	readonly     bool
diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go
index ab3f1398990d..7b9a66370feb 100644
--- a/pkg/ingester/transfer.go
+++ b/pkg/ingester/transfer.go
@@ -33,6 +33,11 @@ var (
 // TransferChunks receives all chunks from another ingester. The Ingester
 // must be in PENDING state or else the call will fail.
 func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer) error {
+	// Prevent a shutdown from happening until we've completely finished a handoff
+	// from a leaving ingester.
+	i.shutdownMtx.Lock()
+	defer i.shutdownMtx.Unlock()
+
 	// Entry JOINING state (only valid from PENDING)
 	if err := i.lifecycler.ChangeState(stream.Context(), ring.JOINING); err != nil {
 		return err
@@ -122,8 +127,12 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer)
 
 // StopIncomingRequests implements ring.Lifecycler.
 func (i *Ingester) StopIncomingRequests() {
+	i.shutdownMtx.Lock()
+	defer i.shutdownMtx.Unlock()
+
 	i.instancesMtx.Lock()
 	defer i.instancesMtx.Unlock()
+
 	i.readonly = true
 }
 
diff --git a/pkg/ingester/transfer_test.go b/pkg/ingester/transfer_test.go
index ea87cd54f590..dbf95c89453a 100644
--- a/pkg/ingester/transfer_test.go
+++ b/pkg/ingester/transfer_test.go
@@ -176,18 +176,37 @@ type testIngesterClient struct {
 func (c *testIngesterClient) TransferChunks(context.Context, ...grpc.CallOption) (logproto.Ingester_TransferChunksClient, error) {
 	chunkCh := make(chan *logproto.TimeSeriesChunk)
 	respCh := make(chan *logproto.TransferChunksResponse)
+	waitCh := make(chan bool)
 
-	client := &testTransferChunksClient{ch: chunkCh, resp: respCh}
+	client := &testTransferChunksClient{ch: chunkCh, resp: respCh, wait: waitCh}
 	go func() {
 		server := &testTransferChunksServer{ch: chunkCh, resp: respCh}
 		err := c.i.TransferChunks(server)
 		require.NoError(c.t, err)
 	}()
 
+	// After 50ms, we try killing the target ingester's lifecycler to verify
+	// that it obtained a lock on the shutdown process. This operation should
+	// block until the transfer completes.
+	//
+	// Then after another 50ms, we also allow data to start sending. This tests an issue
+	// where an ingester is shut down before it completes the handoff and ends up in an
+	// unhealthy state, permanently stuck in the handler for claiming tokens.
+	go func() {
+		time.Sleep(time.Millisecond * 50)
+		c.i.lifecycler.Shutdown()
+	}()
+
+	go func() {
+		time.Sleep(time.Millisecond * 100)
+		close(waitCh)
+	}()
+
 	return client, nil
 }
 
 type testTransferChunksClient struct {
+	wait chan bool
 	ch   chan *logproto.TimeSeriesChunk
 	resp chan *logproto.TransferChunksResponse
 
@@ -195,11 +214,13 @@ type testTransferChunksClient struct {
 }
 
 func (c *testTransferChunksClient) Send(chunk *logproto.TimeSeriesChunk) error {
+	<-c.wait
 	c.ch <- chunk
 	return nil
 }
 
 func (c *testTransferChunksClient) CloseAndRecv() (*logproto.TransferChunksResponse, error) {
+	<-c.wait
 	close(c.ch)
 	resp := <-c.resp
 	close(c.resp)
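
Reviewer note (not part of the diff): a minimal standalone sketch of the locking pattern this patch introduces, assuming only the standard library. The Ingester, TransferChunks, and StopIncomingRequests names mirror the patch, but the bodies are illustrative stubs: the transfer holds shutdownMtx for its full duration, so a concurrent shutdown's call to StopIncomingRequests blocks until the handoff has finished.

// locking_sketch.go — illustrative only; simplified stand-in for the real ingester types.
package main

import (
	"fmt"
	"sync"
	"time"
)

type Ingester struct {
	shutdownMtx  sync.Mutex   // grabbed to prevent a shutdown while a handoff is in flight
	instancesMtx sync.RWMutex // guards instances/readonly, as in the real struct
	readonly     bool
}

// TransferChunks holds shutdownMtx until the incoming handoff is complete.
func (i *Ingester) TransferChunks() error {
	i.shutdownMtx.Lock()
	defer i.shutdownMtx.Unlock()

	time.Sleep(100 * time.Millisecond) // stand-in for receiving chunks from the leaving ingester
	return nil
}

// StopIncomingRequests is what a shutdown calls; it now waits for any transfer to finish.
func (i *Ingester) StopIncomingRequests() {
	i.shutdownMtx.Lock()
	defer i.shutdownMtx.Unlock()

	i.instancesMtx.Lock()
	defer i.instancesMtx.Unlock()

	i.readonly = true
}

func main() {
	ing := &Ingester{}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		_ = ing.TransferChunks()
	}()

	time.Sleep(10 * time.Millisecond) // let the transfer grab shutdownMtx first
	ing.StopIncomingRequests()        // blocks until TransferChunks releases shutdownMtx
	wg.Wait()

	fmt.Println("readonly:", ing.readonly)
}

The design point is simply that the shutdown path and the handoff path now contend on one mutex, so a lifecycler shutdown that races with an in-progress TransferChunks can no longer leave the joining ingester stuck mid-handoff.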