Skip to content

Commit

Permalink
fix: promtail race fixes (#12656)
Browse files Browse the repository at this point in the history
  • Loading branch information
paul1r authored Apr 19, 2024
1 parent 4a05964 commit 4e04d07
Show file tree
Hide file tree
Showing 13 changed files with 234 additions and 41 deletions.
7 changes: 7 additions & 0 deletions clients/pkg/promtail/client/client_writeto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@ func TestClientWriter_LogEntriesAreReconstructedAndForwardedCorrectly(t *testing
ch := make(chan api.Entry)
defer close(ch)

var mu sync.Mutex
var receivedEntries []api.Entry

go func() {
for e := range ch {
mu.Lock()
receivedEntries = append(receivedEntries, e)
mu.Unlock()
}
}()

Expand Down Expand Up @@ -72,12 +75,16 @@ func TestClientWriter_LogEntriesAreReconstructedAndForwardedCorrectly(t *testing
}

require.Eventually(t, func() bool {
mu.Lock()
defer mu.Unlock()
return len(receivedEntries) == len(lines)
}, time.Second*10, time.Second)
mu.Lock()
for _, receivedEntry := range receivedEntries {
require.Contains(t, lines, receivedEntry.Line, "entry line was not expected")
require.Equal(t, model.LabelValue("test"), receivedEntry.Labels["app"])
}
mu.Unlock()
}

func TestClientWriter_LogEntriesWithoutMatchingSeriesAreIgnored(t *testing.T) {
Expand Down
24 changes: 24 additions & 0 deletions clients/pkg/promtail/client/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"
"net/url"
"os"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -127,10 +128,13 @@ func TestManager_WALEnabled(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "wal:test-client", manager.Name())

var mu sync.Mutex
receivedRequests := []utils.RemoteWriteRequest{}
go func() {
for req := range rwReceivedReqs {
mu.Lock()
receivedRequests = append(receivedRequests, req)
mu.Unlock()
}
}()

Expand All @@ -155,17 +159,21 @@ func TestManager_WALEnabled(t *testing.T) {
}

require.Eventually(t, func() bool {
mu.Lock()
defer mu.Unlock()
return len(receivedRequests) == totalLines
}, 5*time.Second, time.Second, "timed out waiting for requests to be received")

var seenEntries = map[string]struct{}{}
// assert over rw client received entries
mu.Lock()
for _, req := range receivedRequests {
require.Len(t, req.Request.Streams, 1, "expected 1 stream requests to be received")
require.Len(t, req.Request.Streams[0].Entries, 1, "expected 1 entry in the only stream received per request")
require.Equal(t, `{wal_enabled="true"}`, req.Request.Streams[0].Labels)
seenEntries[req.Request.Streams[0].Entries[0].Line] = struct{}{}
}
mu.Unlock()
require.Len(t, seenEntries, totalLines)
}

Expand All @@ -182,10 +190,13 @@ func TestManager_WALDisabled(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "multi:test-client", manager.Name())

var mu sync.Mutex
receivedRequests := []utils.RemoteWriteRequest{}
go func() {
for req := range rwReceivedReqs {
mu.Lock()
receivedRequests = append(receivedRequests, req)
mu.Unlock()
}
}()

Expand All @@ -209,17 +220,21 @@ func TestManager_WALDisabled(t *testing.T) {
}

require.Eventually(t, func() bool {
mu.Lock()
defer mu.Unlock()
return len(receivedRequests) == totalLines
}, 5*time.Second, time.Second, "timed out waiting for requests to be received")

var seenEntries = map[string]struct{}{}
// assert over rw client received entries
mu.Lock()
for _, req := range receivedRequests {
require.Len(t, req.Request.Streams, 1, "expected 1 stream requests to be received")
require.Len(t, req.Request.Streams[0].Entries, 1, "expected 1 entry in the only stream received per request")
require.Equal(t, `{pizza-flavour="fugazzeta"}`, req.Request.Streams[0].Labels)
seenEntries[req.Request.Streams[0].Entries[0].Line] = struct{}{}
}
mu.Unlock()
require.Len(t, seenEntries, totalLines)
}

Expand Down Expand Up @@ -250,15 +265,20 @@ func TestManager_WALDisabled_MultipleConfigs(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "multi:test-client,test-client-2", manager.Name())

var mu sync.Mutex
receivedRequests := []utils.RemoteWriteRequest{}
ctx, cancel := context.WithCancel(context.Background())
go func(ctx context.Context) {
for {
select {
case req := <-rwReceivedReqs:
mu.Lock()
receivedRequests = append(receivedRequests, req)
mu.Unlock()
case req := <-rwReceivedReqs2:
mu.Lock()
receivedRequests = append(receivedRequests, req)
mu.Unlock()
case <-ctx.Done():
return
}
Expand Down Expand Up @@ -289,16 +309,20 @@ func TestManager_WALDisabled_MultipleConfigs(t *testing.T) {
// times 2 due to clients being run
expectedTotalLines := totalLines * 2
require.Eventually(t, func() bool {
mu.Lock()
defer mu.Unlock()
return len(receivedRequests) == expectedTotalLines
}, 5*time.Second, time.Second, "timed out waiting for requests to be received")

var seenEntries = map[string]struct{}{}
// assert over rw client received entries
mu.Lock()
for _, req := range receivedRequests {
require.Len(t, req.Request.Streams, 1, "expected 1 stream requests to be received")
require.Len(t, req.Request.Streams[0].Entries, 1, "expected 1 entry in the only stream received per request")
seenEntries[fmt.Sprintf("%s-%s", req.Request.Streams[0].Labels, req.Request.Streams[0].Entries[0].Line)] = struct{}{}
}
mu.Unlock()
require.Len(t, seenEntries, expectedTotalLines)
}

Expand Down
26 changes: 24 additions & 2 deletions clients/pkg/promtail/promtail_wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,25 @@ func TestPromtailWithWAL_SingleTenant(t *testing.T) {
// create receive channel and start a collect routine
receivedCh := make(chan utils.RemoteWriteRequest)
received := map[string][]push.Entry{}
var mu sync.Mutex
// Create a channel for log messages
logCh := make(chan string, 100) // Buffered channel to avoid blocking

wg.Add(1)
go func() {
defer wg.Done()
for req := range receivedCh {
mu.Lock()
// Add some observability to the requests received in the remote write endpoint
var counts []string
for _, str := range req.Request.Streams {
counts = append(counts, fmt.Sprint(len(str.Entries)))
}
t.Logf("received request: %s", counts)
logCh <- fmt.Sprintf("received request: %s", counts)
for _, stream := range req.Request.Streams {
received[stream.Labels] = append(received[stream.Labels], stream.Entries...)
}
mu.Unlock()
}
}()

Expand Down Expand Up @@ -120,14 +126,23 @@ func TestPromtailWithWAL_SingleTenant(t *testing.T) {
for i := 0; i < entriesToWrite; i++ {
_, err = logsFile.WriteString(fmt.Sprintf("log line # %d\n", i))
if err != nil {
t.Logf("error writing to log file. Err: %s", err.Error())
logCh <- fmt.Sprintf("error writing to log file. Err: %s", err.Error())
}
// not overkill log file
time.Sleep(1 * time.Millisecond)
}
}()

// Goroutine to handle log messages
go func() {
for msg := range logCh {
t.Log(msg)
}
}()

require.Eventually(t, func() bool {
mu.Lock()
defer mu.Unlock()
if seen, ok := received[expectedLabelSet]; ok {
return len(seen) == entriesToWrite
}
Expand Down Expand Up @@ -158,11 +173,13 @@ func TestPromtailWithWAL_MultipleTenants(t *testing.T) {
receivedCh := make(chan utils.RemoteWriteRequest)
// received is a mapping from tenant, string-formatted label set to received entries
received := map[string]map[string][]push.Entry{}
var mu sync.Mutex
var totalReceived = 0
wg.Add(1)
go func() {
defer wg.Done()
for req := range receivedCh {
mu.Lock()
// start received label entries map if first time tenant is seen
if _, ok := received[req.TenantID]; !ok {
received[req.TenantID] = map[string][]push.Entry{}
Expand All @@ -173,6 +190,7 @@ func TestPromtailWithWAL_MultipleTenants(t *testing.T) {
// increment total count
totalReceived += len(stream.Entries)
}
mu.Unlock()
}
}()

Expand Down Expand Up @@ -250,15 +268,19 @@ func TestPromtailWithWAL_MultipleTenants(t *testing.T) {

// wait for all entries to be remote written
require.Eventually(t, func() bool {
mu.Lock()
defer mu.Unlock()
return totalReceived == entriesToWrite
}, time.Second*20, time.Second, "timed out waiting for entries to be remote written")

// assert over received entries
require.Len(t, received, expectedTenantCounts, "not expected tenant count")
mu.Lock()
for tenantID := 0; tenantID < expectedTenantCounts; tenantID++ {
// we should've received at least entriesToWrite / expectedTenantCounts
require.GreaterOrEqual(t, len(received[fmt.Sprint(tenantID)][expectedLabelSet]), entriesToWrite/expectedTenantCounts)
}
mu.Unlock()

pr.Shutdown()
close(receivedCh)
Expand Down
6 changes: 6 additions & 0 deletions clients/pkg/promtail/targets/cloudflare/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cloudflare
import (
"context"
"errors"
"sync"
"time"

"github.com/grafana/cloudflare-go"
Expand All @@ -13,10 +14,13 @@ var ErrorLogpullReceived = errors.New("error logpull received")

type fakeCloudflareClient struct {
mock.Mock
mu sync.Mutex
}

func (f *fakeCloudflareClient) CallCount() int {
var actualCalls int
f.mu.Lock()
defer f.mu.Unlock()
for _, call := range f.Calls {
if call.Method == "LogpullReceived" {
actualCalls++
Expand Down Expand Up @@ -59,7 +63,9 @@ func newFakeCloudflareClient() *fakeCloudflareClient {
}

func (f *fakeCloudflareClient) LogpullReceived(ctx context.Context, start, end time.Time) (cloudflare.LogpullReceivedIterator, error) {
f.mu.Lock()
r := f.Called(ctx, start, end)
f.mu.Unlock()
if r.Get(0) != nil {
it := r.Get(0).(cloudflare.LogpullReceivedIterator)
if it.Err() == ErrorLogpullReceived {
Expand Down
Loading

0 comments on commit 4e04d07

Please sign in to comment.