From 2743e47285d16c3b09faa4c992db2cc9108fd743 Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Wed, 17 Apr 2024 09:07:27 -0400 Subject: [PATCH 1/7] Data race cleanup for promtail --- .../promtail/client/client_writeto_test.go | 7 +++++ clients/pkg/promtail/client/manager_test.go | 24 +++++++++++++++++ clients/pkg/promtail/promtail_wal_test.go | 26 +++++++++++++++++-- .../promtail/targets/cloudflare/util_test.go | 6 +++++ .../pkg/promtail/targets/file/filetarget.go | 4 +++ .../promtail/targets/file/filetarget_test.go | 13 ++++++++++ .../targets/kafka/target_syncer_test.go | 11 ++++++-- .../pkg/promtail/targets/kafka/target_test.go | 3 +++ .../pkg/promtail/targets/kafka/topics_test.go | 4 +++ clients/pkg/promtail/utils/entries_test.go | 14 +++++++++- clients/pkg/promtail/wal/watcher_test.go | 26 +++++++++++++++++++ clients/pkg/promtail/wal/writer_test.go | 11 ++++++++ 12 files changed, 144 insertions(+), 5 deletions(-) diff --git a/clients/pkg/promtail/client/client_writeto_test.go b/clients/pkg/promtail/client/client_writeto_test.go index 3693b677f2ccf..4044d1641fb12 100644 --- a/clients/pkg/promtail/client/client_writeto_test.go +++ b/clients/pkg/promtail/client/client_writeto_test.go @@ -29,11 +29,14 @@ func TestClientWriter_LogEntriesAreReconstructedAndForwardedCorrectly(t *testing ch := make(chan api.Entry) defer close(ch) + var mu sync.Mutex var receivedEntries []api.Entry go func() { for e := range ch { + mu.Lock() receivedEntries = append(receivedEntries, e) + mu.Unlock() } }() @@ -72,12 +75,16 @@ func TestClientWriter_LogEntriesAreReconstructedAndForwardedCorrectly(t *testing } require.Eventually(t, func() bool { + mu.Lock() + defer mu.Unlock() return len(receivedEntries) == len(lines) }, time.Second*10, time.Second) + mu.Lock() for _, receivedEntry := range receivedEntries { require.Contains(t, lines, receivedEntry.Line, "entry line was not expected") require.Equal(t, model.LabelValue("test"), receivedEntry.Labels["app"]) } + mu.Unlock() } func TestClientWriter_LogEntriesWithoutMatchingSeriesAreIgnored(t *testing.T) { diff --git a/clients/pkg/promtail/client/manager_test.go b/clients/pkg/promtail/client/manager_test.go index f11821c82120a..2105e6a90e3d9 100644 --- a/clients/pkg/promtail/client/manager_test.go +++ b/clients/pkg/promtail/client/manager_test.go @@ -6,6 +6,7 @@ import ( "net/http" "net/url" "os" + "sync" "testing" "time" @@ -127,10 +128,13 @@ func TestManager_WALEnabled(t *testing.T) { require.NoError(t, err) require.Equal(t, "wal:test-client", manager.Name()) + var mu sync.Mutex receivedRequests := []utils.RemoteWriteRequest{} go func() { for req := range rwReceivedReqs { + mu.Lock() receivedRequests = append(receivedRequests, req) + mu.Unlock() } }() @@ -155,17 +159,21 @@ func TestManager_WALEnabled(t *testing.T) { } require.Eventually(t, func() bool { + mu.Lock() + defer mu.Unlock() return len(receivedRequests) == totalLines }, 5*time.Second, time.Second, "timed out waiting for requests to be received") var seenEntries = map[string]struct{}{} // assert over rw client received entries + mu.Lock() for _, req := range receivedRequests { require.Len(t, req.Request.Streams, 1, "expected 1 stream requests to be received") require.Len(t, req.Request.Streams[0].Entries, 1, "expected 1 entry in the only stream received per request") require.Equal(t, `{wal_enabled="true"}`, req.Request.Streams[0].Labels) seenEntries[req.Request.Streams[0].Entries[0].Line] = struct{}{} } + mu.Unlock() require.Len(t, seenEntries, totalLines) } @@ -182,10 +190,13 @@ func TestManager_WALDisabled(t *testing.T) { require.NoError(t, err) require.Equal(t, "multi:test-client", manager.Name()) + var mu sync.Mutex receivedRequests := []utils.RemoteWriteRequest{} go func() { for req := range rwReceivedReqs { + mu.Lock() receivedRequests = append(receivedRequests, req) + mu.Unlock() } }() @@ -209,17 +220,21 @@ func TestManager_WALDisabled(t *testing.T) { } require.Eventually(t, func() bool { + mu.Lock() + defer mu.Unlock() return len(receivedRequests) == totalLines }, 5*time.Second, time.Second, "timed out waiting for requests to be received") var seenEntries = map[string]struct{}{} // assert over rw client received entries + mu.Lock() for _, req := range receivedRequests { require.Len(t, req.Request.Streams, 1, "expected 1 stream requests to be received") require.Len(t, req.Request.Streams[0].Entries, 1, "expected 1 entry in the only stream received per request") require.Equal(t, `{pizza-flavour="fugazzeta"}`, req.Request.Streams[0].Labels) seenEntries[req.Request.Streams[0].Entries[0].Line] = struct{}{} } + mu.Unlock() require.Len(t, seenEntries, totalLines) } @@ -250,15 +265,20 @@ func TestManager_WALDisabled_MultipleConfigs(t *testing.T) { require.NoError(t, err) require.Equal(t, "multi:test-client,test-client-2", manager.Name()) + var mu sync.Mutex receivedRequests := []utils.RemoteWriteRequest{} ctx, cancel := context.WithCancel(context.Background()) go func(ctx context.Context) { for { select { case req := <-rwReceivedReqs: + mu.Lock() receivedRequests = append(receivedRequests, req) + mu.Unlock() case req := <-rwReceivedReqs2: + mu.Lock() receivedRequests = append(receivedRequests, req) + mu.Unlock() case <-ctx.Done(): return } @@ -289,16 +309,20 @@ func TestManager_WALDisabled_MultipleConfigs(t *testing.T) { // times 2 due to clients being run expectedTotalLines := totalLines * 2 require.Eventually(t, func() bool { + mu.Lock() + defer mu.Unlock() return len(receivedRequests) == expectedTotalLines }, 5*time.Second, time.Second, "timed out waiting for requests to be received") var seenEntries = map[string]struct{}{} // assert over rw client received entries + mu.Lock() for _, req := range receivedRequests { require.Len(t, req.Request.Streams, 1, "expected 1 stream requests to be received") require.Len(t, req.Request.Streams[0].Entries, 1, "expected 1 entry in the only stream received per request") seenEntries[fmt.Sprintf("%s-%s", req.Request.Streams[0].Labels, req.Request.Streams[0].Entries[0].Line)] = struct{}{} } + mu.Unlock() require.Len(t, seenEntries, expectedTotalLines) } diff --git a/clients/pkg/promtail/promtail_wal_test.go b/clients/pkg/promtail/promtail_wal_test.go index dfc7ce7273453..b4027ed2d9091 100644 --- a/clients/pkg/promtail/promtail_wal_test.go +++ b/clients/pkg/promtail/promtail_wal_test.go @@ -59,19 +59,25 @@ func TestPromtailWithWAL_SingleTenant(t *testing.T) { // create receive channel and start a collect routine receivedCh := make(chan utils.RemoteWriteRequest) received := map[string][]push.Entry{} + var mu sync.Mutex + // Create a channel for log messages + logCh := make(chan string, 100) // Buffered channel to avoid blocking + wg.Add(1) go func() { defer wg.Done() for req := range receivedCh { + mu.Lock() // Add some observability to the requests received in the remote write endpoint var counts []string for _, str := range req.Request.Streams { counts = append(counts, fmt.Sprint(len(str.Entries))) } - t.Logf("received request: %s", counts) + logCh <- fmt.Sprintf("received request: %s", counts) for _, stream := range req.Request.Streams { received[stream.Labels] = append(received[stream.Labels], stream.Entries...) } + mu.Unlock() } }() @@ -120,14 +126,23 @@ func TestPromtailWithWAL_SingleTenant(t *testing.T) { for i := 0; i < entriesToWrite; i++ { _, err = logsFile.WriteString(fmt.Sprintf("log line # %d\n", i)) if err != nil { - t.Logf("error writing to log file. Err: %s", err.Error()) + logCh <- fmt.Sprintf("error writing to log file. Err: %s", err.Error()) } // not overkill log file time.Sleep(1 * time.Millisecond) } }() + // Goroutine to handle log messages + go func() { + for msg := range logCh { + t.Log(msg) + } + }() + require.Eventually(t, func() bool { + mu.Lock() + defer mu.Unlock() if seen, ok := received[expectedLabelSet]; ok { return len(seen) == entriesToWrite } @@ -158,11 +173,13 @@ func TestPromtailWithWAL_MultipleTenants(t *testing.T) { receivedCh := make(chan utils.RemoteWriteRequest) // received is a mapping from tenant, string-formatted label set to received entries received := map[string]map[string][]push.Entry{} + var mu sync.Mutex var totalReceived = 0 wg.Add(1) go func() { defer wg.Done() for req := range receivedCh { + mu.Lock() // start received label entries map if first time tenant is seen if _, ok := received[req.TenantID]; !ok { received[req.TenantID] = map[string][]push.Entry{} @@ -173,6 +190,7 @@ func TestPromtailWithWAL_MultipleTenants(t *testing.T) { // increment total count totalReceived += len(stream.Entries) } + mu.Unlock() } }() @@ -250,15 +268,19 @@ func TestPromtailWithWAL_MultipleTenants(t *testing.T) { // wait for all entries to be remote written require.Eventually(t, func() bool { + mu.Lock() + defer mu.Unlock() return totalReceived == entriesToWrite }, time.Second*20, time.Second, "timed out waiting for entries to be remote written") // assert over received entries require.Len(t, received, expectedTenantCounts, "not expected tenant count") + mu.Lock() for tenantID := 0; tenantID < expectedTenantCounts; tenantID++ { // we should've received at least entriesToWrite / expectedTenantCounts require.GreaterOrEqual(t, len(received[fmt.Sprint(tenantID)][expectedLabelSet]), entriesToWrite/expectedTenantCounts) } + mu.Unlock() pr.Shutdown() close(receivedCh) diff --git a/clients/pkg/promtail/targets/cloudflare/util_test.go b/clients/pkg/promtail/targets/cloudflare/util_test.go index 18efefee5cb55..a702bb90f5ddf 100644 --- a/clients/pkg/promtail/targets/cloudflare/util_test.go +++ b/clients/pkg/promtail/targets/cloudflare/util_test.go @@ -3,6 +3,7 @@ package cloudflare import ( "context" "errors" + "sync" "time" "github.com/grafana/cloudflare-go" @@ -13,10 +14,13 @@ var ErrorLogpullReceived = errors.New("error logpull received") type fakeCloudflareClient struct { mock.Mock + mu sync.Mutex } func (f *fakeCloudflareClient) CallCount() int { var actualCalls int + f.mu.Lock() + defer f.mu.Unlock() for _, call := range f.Calls { if call.Method == "LogpullReceived" { actualCalls++ @@ -59,7 +63,9 @@ func newFakeCloudflareClient() *fakeCloudflareClient { } func (f *fakeCloudflareClient) LogpullReceived(ctx context.Context, start, end time.Time) (cloudflare.LogpullReceivedIterator, error) { + f.mu.Lock() r := f.Called(ctx, start, end) + f.mu.Unlock() if r.Get(0) != nil { it := r.Get(0).(cloudflare.LogpullReceivedIterator) if it.Err() == ErrorLogpullReceived { diff --git a/clients/pkg/promtail/targets/file/filetarget.go b/clients/pkg/promtail/targets/file/filetarget.go index 97dc10f148293..e0ae5834b6642 100644 --- a/clients/pkg/promtail/targets/file/filetarget.go +++ b/clients/pkg/promtail/targets/file/filetarget.go @@ -4,6 +4,7 @@ import ( "flag" "os" "path/filepath" + "sync" "time" "github.com/bmatcuk/doublestar" @@ -91,6 +92,7 @@ type FileTarget struct { fileEventWatcher chan fsnotify.Event targetEventHandler chan fileTargetEvent + mu sync.Mutex watches map[string]struct{} path string pathExclude string @@ -226,6 +228,8 @@ func (t *FileTarget) run() { } func (t *FileTarget) sync() error { + t.mu.Lock() + defer t.mu.Unlock() var matches, matchesExcluded []string if fi, err := os.Stat(t.path); err == nil && !fi.IsDir() { // if the path points to a file that exists, then it we can skip the Glob search diff --git a/clients/pkg/promtail/targets/file/filetarget_test.go b/clients/pkg/promtail/targets/file/filetarget_test.go index 57bc31b0802ee..583e0bc637386 100644 --- a/clients/pkg/promtail/targets/file/filetarget_test.go +++ b/clients/pkg/promtail/targets/file/filetarget_test.go @@ -75,6 +75,7 @@ func TestFileTargetSync(t *testing.T) { }, DefaultWatchConig, nil, fakeHandler, "", nil) assert.NoError(t, err) + target.mu.Lock() // Start with nothing watched. if len(target.watches) != 0 { t.Fatal("Expected watches to be 0 at this point in the test...") @@ -82,6 +83,7 @@ func TestFileTargetSync(t *testing.T) { if len(target.readers) != 0 { t.Fatal("Expected tails to be 0 at this point in the test...") } + target.mu.Unlock() // Create the base dir, still nothing watched. err = os.MkdirAll(logDir1, 0750) @@ -138,12 +140,14 @@ func TestFileTargetSync(t *testing.T) { err = target.sync() assert.NoError(t, err) + target.mu.Lock() assert.Equal(t, 1, len(target.watches), "Expected watches to be 1 at this point in the test...", ) assert.Equal(t, 1, len(target.readers), "Expected tails to be 1 at this point in the test...", ) + target.mu.Unlock() // Remove the entire directory, other tailer should stop and watcher should go away. err = os.RemoveAll(logDir1) @@ -152,12 +156,14 @@ func TestFileTargetSync(t *testing.T) { err = target.sync() assert.NoError(t, err) + target.mu.Lock() assert.Equal(t, 0, len(target.watches), "Expected watches to be 0 at this point in the test...", ) assert.Equal(t, 0, len(target.readers), "Expected tails to be 0 at this point in the test...", ) + target.mu.Unlock() requireEventually(t, func() bool { return receivedStartWatch.Load() == 1 }, "Expected received starting watch event to be 1 at this point in the test...") @@ -198,6 +204,9 @@ func TestFileTarget_StopsTailersCleanly(t *testing.T) { assert.NoError(t, err) requireEventually(t, func() bool { + + target.mu.Lock() + defer target.mu.Unlock() return len(target.readers) == 1 }, "expected 1 tailer to be created") @@ -213,6 +222,8 @@ func TestFileTarget_StopsTailersCleanly(t *testing.T) { // Tailer will be replaced by a new one requireEventually(t, func() bool { + target.mu.Lock() + defer target.mu.Unlock() return len(target.readers) == 1 && target.readers[logFile].(*tailer) != initailTailer }, "expected dead tailer to be replaced by a new one") @@ -389,9 +400,11 @@ func TestFileTargetPathExclusion(t *testing.T) { assert.NoError(t, err) // Start with nothing watched. + target.mu.Lock() if len(target.watches) != 0 { t.Fatal("Expected watches to be 0 at this point in the test...") } + target.mu.Unlock() if len(target.readers) != 0 { t.Fatal("Expected tails to be 0 at this point in the test...") } diff --git a/clients/pkg/promtail/targets/kafka/target_syncer_test.go b/clients/pkg/promtail/targets/kafka/target_syncer_test.go index 1f0255cedf62e..76dcfa1420df7 100644 --- a/clients/pkg/promtail/targets/kafka/target_syncer_test.go +++ b/clients/pkg/promtail/targets/kafka/target_syncer_test.go @@ -52,21 +52,28 @@ func Test_TopicDiscovery(t *testing.T) { } ts.loop() + tmpTopics := []string{} require.Eventually(t, func() bool { if !group.consuming.Load() { return false } + group.mu.Lock() + defer group.mu.Unlock() + tmpTopics = group.topics return reflect.DeepEqual([]string{"topic1"}, group.topics) - }, 200*time.Millisecond, time.Millisecond, "expected topics: %v, got: %v", []string{"topic1"}, group.topics) + }, 200*time.Millisecond, time.Millisecond, "expected topics: %v, got: %v", []string{"topic1"}, tmpTopics) + client.mu.Lock() client.topics = []string{"topic1", "topic2"} // introduce new topics + client.mu.Unlock() require.Eventually(t, func() bool { if !group.consuming.Load() { return false } + tmpTopics = group.topics return reflect.DeepEqual([]string{"topic1", "topic2"}, group.topics) - }, 200*time.Millisecond, time.Millisecond, "expected topics: %v, got: %v", []string{"topic1", "topic2"}, group.topics) + }, 200*time.Millisecond, time.Millisecond, "expected topics: %v, got: %v", []string{"topic1", "topic2"}, tmpTopics) require.NoError(t, ts.Stop()) require.True(t, closed) diff --git a/clients/pkg/promtail/targets/kafka/target_test.go b/clients/pkg/promtail/targets/kafka/target_test.go index 0f8061027de3a..c786a2fe5e486 100644 --- a/clients/pkg/promtail/targets/kafka/target_test.go +++ b/clients/pkg/promtail/targets/kafka/target_test.go @@ -21,6 +21,7 @@ type testConsumerGroupHandler struct { handler sarama.ConsumerGroupHandler ctx context.Context topics []string + mu sync.Mutex returnErr error @@ -32,7 +33,9 @@ func (c *testConsumerGroupHandler) Consume(ctx context.Context, topics []string, return c.returnErr } c.ctx = ctx + c.mu.Lock() c.topics = topics + c.mu.Unlock() c.handler = handler c.consuming.Store(true) <-ctx.Done() diff --git a/clients/pkg/promtail/targets/kafka/topics_test.go b/clients/pkg/promtail/targets/kafka/topics_test.go index e24d8fd1eb604..447a8a0a65afc 100644 --- a/clients/pkg/promtail/targets/kafka/topics_test.go +++ b/clients/pkg/promtail/targets/kafka/topics_test.go @@ -3,12 +3,14 @@ package kafka import ( "errors" "strings" + "sync" "testing" "github.com/stretchr/testify/require" ) type mockKafkaClient struct { + mu sync.Mutex topics []string err error } @@ -18,6 +20,8 @@ func (m *mockKafkaClient) RefreshMetadata(_ ...string) error { } func (m *mockKafkaClient) Topics() ([]string, error) { + m.mu.Lock() + defer m.mu.Unlock() return m.topics, m.err } diff --git a/clients/pkg/promtail/utils/entries_test.go b/clients/pkg/promtail/utils/entries_test.go index c9b098d9ee4a4..0164794a89d2d 100644 --- a/clients/pkg/promtail/utils/entries_test.go +++ b/clients/pkg/promtail/utils/entries_test.go @@ -43,7 +43,14 @@ func TestFanoutEntryHandler_SuccessfulFanout(t *testing.T) { } require.Eventually(t, func() bool { - return len(eh1.Received) == len(expectedLines) && len(eh2.Received) == len(expectedLines) + eh1.mu.Lock() + len1 := len(eh1.Received) + eh1.mu.Unlock() + eh2.mu.Lock() + len2 := len(eh2.Received) + eh2.mu.Unlock() + + return len1 == len(expectedLines) && len2 == len(expectedLines) }, time.Second*10, time.Second, "expected entries to be received by fanned out channels") } @@ -77,6 +84,8 @@ func TestFanoutEntryHandler_TimeoutWaitingForEntriesToBeSent(t *testing.T) { }() require.Eventually(t, func() bool { + controlEH.mu.Lock() + defer controlEH.mu.Unlock() return len(controlEH.Received) == 1 }, time.Second*5, time.Second, "expected control entry handler to receive an entry") @@ -89,6 +98,7 @@ type savingEntryHandler struct { entries chan api.Entry Received []api.Entry wg sync.WaitGroup + mu sync.Mutex } func newSavingEntryHandler() *savingEntryHandler { @@ -99,7 +109,9 @@ func newSavingEntryHandler() *savingEntryHandler { eh.wg.Add(1) go func() { for e := range eh.entries { + eh.mu.Lock() eh.Received = append(eh.Received, e) + eh.mu.Unlock() } eh.wg.Done() }() diff --git a/clients/pkg/promtail/wal/watcher_test.go b/clients/pkg/promtail/wal/watcher_test.go index b41880f5d20ff..642b76702daa2 100644 --- a/clients/pkg/promtail/wal/watcher_test.go +++ b/clients/pkg/promtail/wal/watcher_test.go @@ -3,6 +3,7 @@ package wal import ( "fmt" "os" + "sync" "testing" "time" @@ -25,6 +26,7 @@ type testWriteTo struct { series map[uint64]model.LabelSet logger log.Logger ReceivedSeriesReset []int + mu sync.Mutex } func (t *testWriteTo) StoreSeries(series []record.RefSeries, _ int) { @@ -42,10 +44,12 @@ func (t *testWriteTo) AppendEntries(entries wal.RefEntries) error { var entry api.Entry if l, ok := t.series[uint64(entries.Ref)]; ok { entry.Labels = l + t.mu.Lock() for _, e := range entries.Entries { entry.Entry = e t.ReadEntries = append(t.ReadEntries, entry) } + t.mu.Unlock() } else { level.Debug(t.logger).Log("series for entry not found") } @@ -94,11 +98,15 @@ var cases = map[string]watcherTest{ res.notifyWrite() require.Eventually(t, func() bool { + res.writeTo.mu.Lock() + defer res.writeTo.mu.Unlock() return len(res.writeTo.ReadEntries) == 3 }, time.Second*10, time.Second, "expected watcher to catch up with written entries") + res.writeTo.mu.Lock() for _, readEntry := range res.writeTo.ReadEntries { require.Contains(t, lines, readEntry.Line, "not expected log line") } + res.writeTo.mu.Unlock() }, "read entries from WAL, just using backup timer to trigger reads": func(t *testing.T, res *watcherTestResources) { @@ -127,11 +135,15 @@ var cases = map[string]watcherTest{ // do not notify, let the backup timer trigger the watcher reads require.Eventually(t, func() bool { + res.writeTo.mu.Lock() + defer res.writeTo.mu.Unlock() return len(res.writeTo.ReadEntries) == 3 }, time.Second*10, time.Second, "expected watcher to catch up with written entries") + res.writeTo.mu.Lock() for _, readEntry := range res.writeTo.ReadEntries { require.Contains(t, lines, readEntry.Line, "not expected log line") } + res.writeTo.mu.Unlock() }, "continue reading entries in next segment after initial segment is closed": func(t *testing.T, res *watcherTestResources) { @@ -164,11 +176,15 @@ var cases = map[string]watcherTest{ res.notifyWrite() require.Eventually(t, func() bool { + res.writeTo.mu.Lock() + defer res.writeTo.mu.Unlock() return len(res.writeTo.ReadEntries) == 3 }, time.Second*10, time.Second, "expected watcher to catch up with written entries") + res.writeTo.mu.Lock() for _, readEntry := range res.writeTo.ReadEntries { require.Contains(t, lines, readEntry.Line, "not expected log line") } + res.writeTo.mu.Unlock() err := res.nextWALSegment() require.NoError(t, err, "expected no error when moving to next wal segment") @@ -186,12 +202,16 @@ var cases = map[string]watcherTest{ res.notifyWrite() require.Eventually(t, func() bool { + res.writeTo.mu.Lock() + defer res.writeTo.mu.Unlock() return len(res.writeTo.ReadEntries) == 6 }, time.Second*10, time.Second, "expected watcher to catch up after new wal segment is cut") // assert over second half of entries + res.writeTo.mu.Lock() for _, readEntry := range res.writeTo.ReadEntries[3:] { require.Contains(t, linesAfter, readEntry.Line, "not expected log line") } + res.writeTo.mu.Unlock() }, "start reading from last segment": func(t *testing.T, res *watcherTestResources) { @@ -234,12 +254,16 @@ var cases = map[string]watcherTest{ res.notifyWrite() require.Eventually(t, func() bool { + res.writeTo.mu.Lock() + defer res.writeTo.mu.Unlock() return len(res.writeTo.ReadEntries) == 3 }, time.Second*10, time.Second, "expected watcher to catch up after new wal segment is cut") // assert over second half of entries + res.writeTo.mu.Lock() for _, readEntry := range res.writeTo.ReadEntries[3:] { require.Contains(t, linesAfter, readEntry.Line, "not expected log line") } + res.writeTo.mu.Unlock() }, "watcher receives segments reclaimed notifications correctly": func(t *testing.T, res *watcherTestResources) { @@ -259,6 +283,8 @@ var cases = map[string]watcherTest{ require.NoError(t, res.syncWAL()) res.notifyWrite() require.Eventually(t, func() bool { + res.writeTo.mu.Lock() + defer res.writeTo.mu.Unlock() return len(res.writeTo.ReadEntries) == expectedReadEntries }, time.Second*10, time.Second, "expected watcher to catch up with written entries") } diff --git a/clients/pkg/promtail/wal/writer_test.go b/clients/pkg/promtail/wal/writer_test.go index a9c637f98b1ce..4dae546044933 100644 --- a/clients/pkg/promtail/wal/writer_test.go +++ b/clients/pkg/promtail/wal/writer_test.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "path/filepath" + "sync" "testing" "time" @@ -77,6 +78,8 @@ func TestWriter_OldSegmentsAreCleanedUp(t *testing.T) { maxSegmentAge := time.Second * 2 + var mu1 sync.Mutex + var mu2 sync.Mutex subscriber1 := []int{} subscriber2 := []int{} @@ -92,10 +95,14 @@ func TestWriter_OldSegmentsAreCleanedUp(t *testing.T) { // add writer events subscriber. Add multiple to test fanout writer.SubscribeCleanup(notifySegmentsCleanedFunc(func(num int) { + mu1.Lock() subscriber1 = append(subscriber1, num) + mu1.Unlock() })) writer.SubscribeCleanup(notifySegmentsCleanedFunc(func(num int) { + mu2.Lock() subscriber2 = append(subscriber2, num) + mu2.Unlock() })) // write entries to wal and sync @@ -148,11 +155,15 @@ func TestWriter_OldSegmentsAreCleanedUp(t *testing.T) { require.ErrorIs(t, err, os.ErrNotExist, "expected file not exists error") // assert all subscribers were notified + mu1.Lock() require.Len(t, subscriber1, 1, "expected one segment reclaimed notification in subscriber1") require.Equal(t, 0, subscriber1[0]) + mu1.Unlock() + mu2.Lock() require.Len(t, subscriber2, 1, "expected one segment reclaimed notification in subscriber2") require.Equal(t, 0, subscriber2[0]) + mu2.Unlock() // Expect last, or "head" segment to still be alive _, err = os.Stat(filepath.Join(dir, "00000001")) From 611ceee1344d2553c7f78683b05c7b6f5f7c7bbb Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Wed, 17 Apr 2024 10:17:35 -0400 Subject: [PATCH 2/7] A few more --- clients/pkg/promtail/targets/file/filetarget_test.go | 2 ++ clients/pkg/promtail/wal/watcher_test.go | 4 ++++ 2 files changed, 6 insertions(+) diff --git a/clients/pkg/promtail/targets/file/filetarget_test.go b/clients/pkg/promtail/targets/file/filetarget_test.go index 583e0bc637386..f1e677f9bcb27 100644 --- a/clients/pkg/promtail/targets/file/filetarget_test.go +++ b/clients/pkg/promtail/targets/file/filetarget_test.go @@ -92,12 +92,14 @@ func TestFileTargetSync(t *testing.T) { err = target.sync() assert.NoError(t, err) + target.mu.Lock() if len(target.watches) != 0 { t.Fatal("Expected watches to be 0 at this point in the test...") } if len(target.readers) != 0 { t.Fatal("Expected tails to be 0 at this point in the test...") } + target.mu.Unlock() // Add a file, which should create a watcher and a tailer. _, err = os.Create(logDir1File1) diff --git a/clients/pkg/promtail/wal/watcher_test.go b/clients/pkg/promtail/wal/watcher_test.go index 642b76702daa2..adf6dbef32de0 100644 --- a/clients/pkg/promtail/wal/watcher_test.go +++ b/clients/pkg/promtail/wal/watcher_test.go @@ -301,6 +301,8 @@ var cases = map[string]watcherTest{ // collecting segment 0 res.notifySegmentReclaimed(0) require.Eventually(t, func() bool { + res.writeTo.mu.Lock() + defer res.writeTo.mu.Unlock() return len(res.writeTo.ReceivedSeriesReset) == 1 && res.writeTo.ReceivedSeriesReset[0] == 0 }, time.Second*10, time.Second, "timed out waiting to receive series reset") @@ -316,6 +318,8 @@ var cases = map[string]watcherTest{ res.notifySegmentReclaimed(2) // Expect second SeriesReset call to have the highest numbered deleted segment, 2 require.Eventually(t, func() bool { + res.writeTo.mu.Lock() + defer res.writeTo.mu.Unlock() t.Logf("received series reset: %v", res.writeTo.ReceivedSeriesReset) return len(res.writeTo.ReceivedSeriesReset) == 2 && res.writeTo.ReceivedSeriesReset[1] == 2 }, time.Second*10, time.Second, "timed out waiting to receive series reset") From 90121c99812f9901ff4ef0b03c2c8c97017b6c6f Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Wed, 17 Apr 2024 10:41:03 -0400 Subject: [PATCH 3/7] Lint --- clients/pkg/promtail/targets/kafka/consumer_test.go | 4 +++- clients/pkg/promtail/targets/kafka/target_syncer_test.go | 3 ++- clients/pkg/promtail/targets/kafka/target_test.go | 2 +- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/clients/pkg/promtail/targets/kafka/consumer_test.go b/clients/pkg/promtail/targets/kafka/consumer_test.go index 7420bdf6c1f11..a4d87e7c3c71e 100644 --- a/clients/pkg/promtail/targets/kafka/consumer_test.go +++ b/clients/pkg/promtail/targets/kafka/consumer_test.go @@ -3,6 +3,7 @@ package kafka import ( "context" "errors" + "sync" "testing" "time" @@ -34,7 +35,7 @@ func (f *fakeTarget) Details() interface{} { return nil } func Test_ComsumerConsume(t *testing.T) { var ( - group = &testConsumerGroupHandler{} + group = &testConsumerGroupHandler{mu: &sync.Mutex{}} session = &testSession{} ctx, cancel = context.WithCancel(context.Background()) c = &consumer{ @@ -86,6 +87,7 @@ func Test_ComsumerConsume(t *testing.T) { func Test_ComsumerRetry(_ *testing.T) { var ( group = &testConsumerGroupHandler{ + mu: &sync.Mutex{}, returnErr: errors.New("foo"), } ctx, cancel = context.WithCancel(context.Background()) diff --git a/clients/pkg/promtail/targets/kafka/target_syncer_test.go b/clients/pkg/promtail/targets/kafka/target_syncer_test.go index 76dcfa1420df7..6514afeefcb01 100644 --- a/clients/pkg/promtail/targets/kafka/target_syncer_test.go +++ b/clients/pkg/promtail/targets/kafka/target_syncer_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "reflect" + "sync" "testing" "time" @@ -24,7 +25,7 @@ import ( func Test_TopicDiscovery(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - group := &testConsumerGroupHandler{} + group := &testConsumerGroupHandler{mu: &sync.Mutex{}} TopicPollInterval = time.Microsecond var closed bool client := &mockKafkaClient{ diff --git a/clients/pkg/promtail/targets/kafka/target_test.go b/clients/pkg/promtail/targets/kafka/target_test.go index c786a2fe5e486..3ffe4ac69f16b 100644 --- a/clients/pkg/promtail/targets/kafka/target_test.go +++ b/clients/pkg/promtail/targets/kafka/target_test.go @@ -21,7 +21,7 @@ type testConsumerGroupHandler struct { handler sarama.ConsumerGroupHandler ctx context.Context topics []string - mu sync.Mutex + mu *sync.Mutex returnErr error From 39a62364829f22a2db3eb6480430b680ce742343 Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Thu, 18 Apr 2024 09:55:34 -0400 Subject: [PATCH 4/7] Rework mutex usage to be contained within filetarget.go, resulting in cleaner code --- .../pkg/promtail/targets/file/filetarget.go | 76 ++++++++++++++++--- .../promtail/targets/file/filetarget_test.go | 70 ++++++++--------- 2 files changed, 95 insertions(+), 51 deletions(-) diff --git a/clients/pkg/promtail/targets/file/filetarget.go b/clients/pkg/promtail/targets/file/filetarget.go index e0ae5834b6642..2c71a39bcafac 100644 --- a/clients/pkg/promtail/targets/file/filetarget.go +++ b/clients/pkg/promtail/targets/file/filetarget.go @@ -92,14 +92,15 @@ type FileTarget struct { fileEventWatcher chan fsnotify.Event targetEventHandler chan fileTargetEvent - mu sync.Mutex watches map[string]struct{} + watchesMutex sync.Mutex path string pathExclude string quit chan struct{} done chan struct{} - readers map[string]Reader + readers map[string]Reader + readersMutex sync.Mutex targetConfig *Config watchConfig WatchConfig @@ -152,7 +153,10 @@ func NewFileTarget( // Ready if at least one file is being tailed func (t *FileTarget) Ready() bool { - return len(t.readers) > 0 + t.readersMutex.Lock() + readersLen := len(t.readers) + t.readersMutex.Unlock() + return readersLen > 0 } // Stop the target. @@ -180,17 +184,21 @@ func (t *FileTarget) Labels() model.LabelSet { // Details implements a Target func (t *FileTarget) Details() interface{} { files := map[string]int64{} + t.readersMutex.Lock() for fileName := range t.readers { files[fileName], _ = t.positions.Get(fileName) } + t.readersMutex.Unlock() return files } func (t *FileTarget) run() { defer func() { + t.readersMutex.Lock() for _, v := range t.readers { v.Stop() } + t.readersMutex.Unlock() level.Info(t.logger).Log("msg", "filetarget: watcher closed, tailer stopped, positions saved", "path", t.path) close(t.done) }() @@ -228,8 +236,6 @@ func (t *FileTarget) run() { } func (t *FileTarget) sync() error { - t.mu.Lock() - defer t.mu.Unlock() var matches, matchesExcluded []string if fi, err := os.Stat(t.path); err == nil && !fi.IsDir() { // if the path points to a file that exists, then it we can skip the Glob search @@ -285,15 +291,22 @@ func (t *FileTarget) sync() error { } // Add any directories which are not already being watched. + t.watchesMutex.Lock() toStartWatching := missing(t.watches, dirs) + t.watchesMutex.Unlock() t.startWatching(toStartWatching) // Remove any directories which no longer need watching. + t.watchesMutex.Lock() toStopWatching := missing(dirs, t.watches) + t.watchesMutex.Unlock() + t.stopWatching(toStopWatching) // fsnotify.Watcher doesn't allow us to see what is currently being watched so we have to track it ourselves. + t.watchesMutex.Lock() t.watches = dirs + t.watchesMutex.Unlock() // Check if any running tailers have stopped because of errors and remove them from the running list // (They will be restarted in startTailing) @@ -303,7 +316,9 @@ func (t *FileTarget) sync() error { t.startTailing(matches) // Stop tailing any files which no longer exist + t.readersMutex.Lock() toStopTailing := toStopTailing(matches, t.readers) + t.readersMutex.Unlock() t.stopTailingAndRemovePosition(toStopTailing) return nil @@ -311,9 +326,10 @@ func (t *FileTarget) sync() error { func (t *FileTarget) startWatching(dirs map[string]struct{}) { for dir := range dirs { - if _, ok := t.watches[dir]; ok { + if _, ok := t.getWatch(dir); ok { continue } + level.Info(t.logger).Log("msg", "watching new directory", "directory", dir) t.targetEventHandler <- fileTargetEvent{ path: dir, @@ -324,9 +340,10 @@ func (t *FileTarget) startWatching(dirs map[string]struct{}) { func (t *FileTarget) stopWatching(dirs map[string]struct{}) { for dir := range dirs { - if _, ok := t.watches[dir]; !ok { + if _, ok := t.getWatch(dir); !ok { continue } + level.Info(t.logger).Log("msg", "removing directory from watcher", "directory", dir) t.targetEventHandler <- fileTargetEvent{ path: dir, @@ -337,7 +354,7 @@ func (t *FileTarget) stopWatching(dirs map[string]struct{}) { func (t *FileTarget) startTailing(ps []string) { for _, p := range ps { - if _, ok := t.readers[p]; ok { + if _, ok := t.getReader(p); ok { continue } @@ -391,7 +408,9 @@ func (t *FileTarget) startTailing(ps []string) { } reader = tailer } + t.readersMutex.Lock() t.readers[p] = reader + t.readersMutex.Unlock() } } @@ -399,10 +418,10 @@ func (t *FileTarget) startTailing(ps []string) { // Call this when a file no longer exists and you want to remove all traces of it. func (t *FileTarget) stopTailingAndRemovePosition(ps []string) { for _, p := range ps { - if reader, ok := t.readers[p]; ok { + if reader, ok := t.getReader(p); ok { reader.Stop() t.positions.Remove(reader.Path()) - delete(t.readers, p) + t.removeReader(p) } } } @@ -410,6 +429,7 @@ func (t *FileTarget) stopTailingAndRemovePosition(ps []string) { // pruneStoppedTailers removes any tailers which have stopped running from // the list of active tailers. This allows them to be restarted if there were errors. func (t *FileTarget) pruneStoppedTailers() { + t.readersMutex.Lock() toRemove := make([]string, 0, len(t.readers)) for k, t := range t.readers { if !t.IsRunning() { @@ -419,6 +439,39 @@ func (t *FileTarget) pruneStoppedTailers() { for _, tr := range toRemove { delete(t.readers, tr) } + t.readersMutex.Unlock() +} + +func (t *FileTarget) getReadersLen() int { + t.readersMutex.Lock() + defer t.readersMutex.Unlock() + return len(t.readers) +} + +func (t *FileTarget) getReader(val string) (Reader, bool) { + t.readersMutex.Lock() + defer t.readersMutex.Unlock() + reader, ok := t.readers[val] + return reader, ok +} + +func (t *FileTarget) getWatch(val string) (struct{}, bool) { + t.watchesMutex.Lock() + defer t.watchesMutex.Unlock() + fileTarget, ok := t.watches[val] + return fileTarget, ok +} + +func (t *FileTarget) removeReader(val string) { + t.readersMutex.Lock() + defer t.readersMutex.Unlock() + delete(t.readers, val) +} + +func (t *FileTarget) getWatchesLen() int { + t.watchesMutex.Lock() + defer t.watchesMutex.Unlock() + return len(t.watches) } func toStopTailing(nt []string, et map[string]Reader) []string { @@ -446,7 +499,7 @@ func toStopTailing(nt []string, et map[string]Reader) []string { func (t *FileTarget) reportSize(ms []string) { for _, m := range ms { // Ask the tailer to update the size if a tailer exists, this keeps position and size metrics in sync - if reader, ok := t.readers[m]; ok { + if reader, ok := t.getReader(m); ok { err := reader.MarkPositionAndSize() if err != nil { level.Warn(t.logger).Log("msg", "failed to get file size from tailer, ", "file", m, "error", err) @@ -463,7 +516,6 @@ func (t *FileTarget) reportSize(ms []string) { } t.metrics.totalBytes.WithLabelValues(m).Set(float64(fi.Size())) } - } } diff --git a/clients/pkg/promtail/targets/file/filetarget_test.go b/clients/pkg/promtail/targets/file/filetarget_test.go index f1e677f9bcb27..5064b79673d3d 100644 --- a/clients/pkg/promtail/targets/file/filetarget_test.go +++ b/clients/pkg/promtail/targets/file/filetarget_test.go @@ -75,15 +75,13 @@ func TestFileTargetSync(t *testing.T) { }, DefaultWatchConig, nil, fakeHandler, "", nil) assert.NoError(t, err) - target.mu.Lock() // Start with nothing watched. - if len(target.watches) != 0 { + if target.getWatchesLen() != 0 { t.Fatal("Expected watches to be 0 at this point in the test...") } - if len(target.readers) != 0 { + if target.getReadersLen() != 0 { t.Fatal("Expected tails to be 0 at this point in the test...") } - target.mu.Unlock() // Create the base dir, still nothing watched. err = os.MkdirAll(logDir1, 0750) @@ -92,14 +90,12 @@ func TestFileTargetSync(t *testing.T) { err = target.sync() assert.NoError(t, err) - target.mu.Lock() - if len(target.watches) != 0 { + if target.getWatchesLen() != 0 { t.Fatal("Expected watches to be 0 at this point in the test...") } - if len(target.readers) != 0 { + if target.getReadersLen() != 0 { t.Fatal("Expected tails to be 0 at this point in the test...") } - target.mu.Unlock() // Add a file, which should create a watcher and a tailer. _, err = os.Create(logDir1File1) @@ -110,10 +106,10 @@ func TestFileTargetSync(t *testing.T) { err = target.sync() assert.NoError(t, err) - assert.Equal(t, 1, len(target.watches), + assert.Equal(t, 1, target.getWatchesLen(), "Expected watches to be 1 at this point in the test...", ) - assert.Equal(t, 1, len(target.readers), + assert.Equal(t, 1, target.getReadersLen(), "Expected tails to be 1 at this point in the test...", ) @@ -128,10 +124,10 @@ func TestFileTargetSync(t *testing.T) { err = target.sync() assert.NoError(t, err) - assert.Equal(t, 1, len(target.watches), + assert.Equal(t, 1, target.getWatchesLen(), "Expected watches to be 1 at this point in the test...", ) - assert.Equal(t, 2, len(target.readers), + assert.Equal(t, 2, target.getReadersLen(), "Expected tails to be 2 at this point in the test...", ) @@ -142,14 +138,12 @@ func TestFileTargetSync(t *testing.T) { err = target.sync() assert.NoError(t, err) - target.mu.Lock() - assert.Equal(t, 1, len(target.watches), + assert.Equal(t, 1, target.getWatchesLen(), "Expected watches to be 1 at this point in the test...", ) - assert.Equal(t, 1, len(target.readers), + assert.Equal(t, 1, target.getReadersLen(), "Expected tails to be 1 at this point in the test...", ) - target.mu.Unlock() // Remove the entire directory, other tailer should stop and watcher should go away. err = os.RemoveAll(logDir1) @@ -158,14 +152,12 @@ func TestFileTargetSync(t *testing.T) { err = target.sync() assert.NoError(t, err) - target.mu.Lock() - assert.Equal(t, 0, len(target.watches), + assert.Equal(t, 0, target.getWatchesLen(), "Expected watches to be 0 at this point in the test...", ) - assert.Equal(t, 0, len(target.readers), + assert.Equal(t, 0, target.getReadersLen(), "Expected tails to be 0 at this point in the test...", ) - target.mu.Unlock() requireEventually(t, func() bool { return receivedStartWatch.Load() == 1 }, "Expected received starting watch event to be 1 at this point in the test...") @@ -206,10 +198,7 @@ func TestFileTarget_StopsTailersCleanly(t *testing.T) { assert.NoError(t, err) requireEventually(t, func() bool { - - target.mu.Lock() - defer target.mu.Unlock() - return len(target.readers) == 1 + return target.getReadersLen() == 1 }, "expected 1 tailer to be created") require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(` @@ -219,14 +208,19 @@ func TestFileTarget_StopsTailersCleanly(t *testing.T) { `), "promtail_files_active_total")) // Inject an error to tailer - initailTailer := target.readers[logFile].(*tailer) + + initialReader, _ := target.getReader(logFile) + initailTailer := initialReader.(*tailer) _ = initailTailer.tail.Tomb.Killf("test: network file systems can be unreliable") // Tailer will be replaced by a new one requireEventually(t, func() bool { - target.mu.Lock() - defer target.mu.Unlock() - return len(target.readers) == 1 && target.readers[logFile].(*tailer) != initailTailer + currentReader, _ := target.getReader(logFile) + var currentTailer *tailer = nil + if currentReader != nil { + currentTailer = currentReader.(*tailer) + } + return target.getReadersLen() == 1 && currentTailer != initailTailer }, "expected dead tailer to be replaced by a new one") // The old tailer should be stopped: @@ -402,12 +396,10 @@ func TestFileTargetPathExclusion(t *testing.T) { assert.NoError(t, err) // Start with nothing watched. - target.mu.Lock() - if len(target.watches) != 0 { + if target.getWatchesLen() != 0 { t.Fatal("Expected watches to be 0 at this point in the test...") } - target.mu.Unlock() - if len(target.readers) != 0 { + if target.getReadersLen() != 0 { t.Fatal("Expected tails to be 0 at this point in the test...") } @@ -422,10 +414,10 @@ func TestFileTargetPathExclusion(t *testing.T) { err = target.sync() assert.NoError(t, err) - if len(target.watches) != 0 { + if target.getWatchesLen() != 0 { t.Fatal("Expected watches to be 0 at this point in the test...") } - if len(target.readers) != 0 { + if target.getReadersLen() != 0 { t.Fatal("Expected tails to be 0 at this point in the test...") } @@ -440,10 +432,10 @@ func TestFileTargetPathExclusion(t *testing.T) { err = target.sync() assert.NoError(t, err) - assert.Equal(t, 2, len(target.watches), + assert.Equal(t, 2, target.getWatchesLen(), "Expected watches to be 2 at this point in the test...", ) - assert.Equal(t, 3, len(target.readers), + assert.Equal(t, 3, target.getReadersLen(), "Expected tails to be 3 at this point in the test...", ) requireEventually(t, func() bool { @@ -461,10 +453,10 @@ func TestFileTargetPathExclusion(t *testing.T) { err = target.sync() assert.NoError(t, err) - assert.Equal(t, 1, len(target.watches), + assert.Equal(t, 1, target.getWatchesLen(), "Expected watches to be 1 at this point in the test...", ) - assert.Equal(t, 1, len(target.readers), + assert.Equal(t, 1, target.getReadersLen(), "Expected tails to be 1 at this point in the test...", ) requireEventually(t, func() bool { @@ -553,7 +545,7 @@ func TestHandleFileCreationEvent(t *testing.T) { Op: fsnotify.Create, } requireEventually(t, func() bool { - return len(target.readers) == 1 + return target.getReadersLen() == 1 }, "Expected tails to be 1 at this point in the test...") } From 9d13e3b03e22f67487840be3d50ef66f0da38cbe Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Thu, 18 Apr 2024 11:07:05 -0400 Subject: [PATCH 5/7] Lint --- clients/pkg/promtail/targets/file/filetarget_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/pkg/promtail/targets/file/filetarget_test.go b/clients/pkg/promtail/targets/file/filetarget_test.go index 5064b79673d3d..579ea19e2e56e 100644 --- a/clients/pkg/promtail/targets/file/filetarget_test.go +++ b/clients/pkg/promtail/targets/file/filetarget_test.go @@ -216,7 +216,7 @@ func TestFileTarget_StopsTailersCleanly(t *testing.T) { // Tailer will be replaced by a new one requireEventually(t, func() bool { currentReader, _ := target.getReader(logFile) - var currentTailer *tailer = nil + var currentTailer *tailer if currentReader != nil { currentTailer = currentReader.(*tailer) } From 792004a6577911fa4c86ed849617954f9de34fd0 Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Thu, 18 Apr 2024 15:45:51 -0400 Subject: [PATCH 6/7] Code review comment --- clients/pkg/promtail/targets/file/filetarget.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/clients/pkg/promtail/targets/file/filetarget.go b/clients/pkg/promtail/targets/file/filetarget.go index 2c71a39bcafac..eca4883003c18 100644 --- a/clients/pkg/promtail/targets/file/filetarget.go +++ b/clients/pkg/promtail/targets/file/filetarget.go @@ -153,10 +153,7 @@ func NewFileTarget( // Ready if at least one file is being tailed func (t *FileTarget) Ready() bool { - t.readersMutex.Lock() - readersLen := len(t.readers) - t.readersMutex.Unlock() - return readersLen > 0 + return t.getReadersLen() > 0 } // Stop the target. From f932a31b657001ff5d80b4234dacece9e1ea772a Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Thu, 18 Apr 2024 16:12:56 -0400 Subject: [PATCH 7/7] Add setReader() function --- clients/pkg/promtail/targets/file/filetarget.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/clients/pkg/promtail/targets/file/filetarget.go b/clients/pkg/promtail/targets/file/filetarget.go index eca4883003c18..0ade51902b492 100644 --- a/clients/pkg/promtail/targets/file/filetarget.go +++ b/clients/pkg/promtail/targets/file/filetarget.go @@ -405,9 +405,7 @@ func (t *FileTarget) startTailing(ps []string) { } reader = tailer } - t.readersMutex.Lock() - t.readers[p] = reader - t.readersMutex.Unlock() + t.setReader(p, reader) } } @@ -452,6 +450,12 @@ func (t *FileTarget) getReader(val string) (Reader, bool) { return reader, ok } +func (t *FileTarget) setReader(val string, reader Reader) { + t.readersMutex.Lock() + defer t.readersMutex.Unlock() + t.readers[val] = reader +} + func (t *FileTarget) getWatch(val string) (struct{}, bool) { t.watchesMutex.Lock() defer t.watchesMutex.Unlock()