diff --git a/pkg/querier/tail.go b/pkg/querier/tail.go index 4505ad9fd77c..ad9e3027da84 100644 --- a/pkg/querier/tail.go +++ b/pkg/querier/tail.go @@ -52,7 +52,7 @@ type Tailer struct { tailDisconnectedIngesters func([]string) (map[string]logproto.Querier_TailClient, error) querierTailClients map[string]logproto.Querier_TailClient // addr -> grpc clients for tailing logs from ingesters - querierTailClientsMtx sync.Mutex + querierTailClientsMtx sync.RWMutex stopped bool delayFor time.Duration @@ -66,6 +66,9 @@ type Tailer struct { } func (t *Tailer) readTailClients() { + t.querierTailClientsMtx.RLock() + defer t.querierTailClientsMtx.RUnlock() + for addr, querierTailClient := range t.querierTailClients { go t.readTailClient(addr, querierTailClient) } @@ -126,7 +129,11 @@ func (t *Tailer) loop() { // If no entry has been consumed we should ensure it's not caused by all ingesters // connections dropped and then throttle for a while if len(tailResponse.Streams) == 0 { - if len(t.querierTailClients) == 0 { + t.querierTailClientsMtx.RLock() + numClients := len(t.querierTailClients) + t.querierTailClientsMtx.RUnlock() + + if numClients == 0 { // All the connections to ingesters are dropped, try reconnecting or return error if err := t.checkIngesterConnections(); err != nil { level.Error(util.Logger).Log("Error reconnecting to ingesters", fmt.Sprintf("%v", err)) @@ -164,6 +171,9 @@ func (t *Tailer) loop() { // Checks whether we are connected to all the ingesters to tail the logs. // Helps in connecting to disconnected ingesters or connecting to new ingesters func (t *Tailer) checkIngesterConnections() error { + t.querierTailClientsMtx.Lock() + defer t.querierTailClientsMtx.Unlock() + connectedIngestersAddr := make([]string, 0, len(t.querierTailClients)) for addr := range t.querierTailClients { connectedIngestersAddr = append(connectedIngestersAddr, addr)