Skip to content

Commit

Permalink
Fixes a race when using specific tenant and multi-client. (#3573)
Browse files Browse the repository at this point in the history
* Fixes a race when using specific tenant and multi-client.

This was because each client would try to mutate the original set of labels.
Instead of cloning the map which can be expensive, I created a little helper that generate the string
from the labelset without a set of label name.

Fixes #3571

I also added a test to see if this was reproducible and it was indeed:

```
~/go/src/github.com/grafana/loki master*
❯ go test -timeout 30s -tags dev,gofuzz -race -run ^TestMultiClient_Handle_Race$ github.com/grafana/loki/pkg/promtail/client -v -count=1 -timeout=0s
=== RUN   TestMultiClient_Handle_Race
==================
WARNING: DATA RACE
Read at 0x00c0000b77d0 by goroutine 22:
  runtime.mapaccess2_faststr()
      /usr/local/Cellar/go/1.16.2/libexec/src/runtime/map_faststr.go:107 +0x0
  github.com/grafana/loki/pkg/promtail/client.(*client).getTenantID()
      /Users/ctovena/go/src/github.com/grafana/loki/pkg/promtail/client/client.go:376 +0xcb
  github.com/grafana/loki/pkg/promtail/client.(*client).processEntry()
      /Users/ctovena/go/src/github.com/grafana/loki/pkg/promtail/client/client.go:407 +0x9a
  github.com/grafana/loki/pkg/promtail/client.(*client).run()
      /Users/ctovena/go/src/github.com/grafana/loki/pkg/promtail/client/client.go:235 +0x36a

Previous write at 0x00c0000b77d0 by goroutine 21:
  runtime.mapdelete_faststr()
      /usr/local/Cellar/go/1.16.2/libexec/src/runtime/map_faststr.go:297 +0x0
  github.com/grafana/loki/pkg/promtail/client.(*client).processEntry()
      /Users/ctovena/go/src/github.com/grafana/loki/pkg/promtail/client/client.go:408 +0x144
  github.com/grafana/loki/pkg/promtail/client.(*client).run()
      /Users/ctovena/go/src/github.com/grafana/loki/pkg/promtail/client/client.go:235 +0x36a

Goroutine 22 (running) created at:
  github.com/grafana/loki/pkg/promtail/client.New()
      /Users/ctovena/go/src/github.com/grafana/loki/pkg/promtail/client/client.go:198 +0x7dc
  github.com/grafana/loki/pkg/promtail/client.TestMultiClient_Handle_Race()
      /Users/ctovena/go/src/github.com/grafana/loki/pkg/promtail/client/multi_test.go:126 +0x2c7
  testing.tRunner()
      /usr/local/Cellar/go/1.16.2/libexec/src/testing/testing.go:1194 +0x202

Goroutine 21 (running) created at:
  github.com/grafana/loki/pkg/promtail/client.New()
      /Users/ctovena/go/src/github.com/grafana/loki/pkg/promtail/client/client.go:198 +0x7dc
  github.com/grafana/loki/pkg/promtail/client.TestMultiClient_Handle_Race()
      /Users/ctovena/go/src/github.com/grafana/loki/pkg/promtail/client/multi_test.go:124 +0x187
  testing.tRunner()
      /usr/local/Cellar/go/1.16.2/libexec/src/testing/testing.go:1194 +0x202
==================
    testing.go:1093: race detected during execution of test
--- FAIL: TestMultiClient_Handle_Race (0.00s)
=== CONT
    testing.go:1093: race detected during execution of test
FAIL
FAIL	github.com/grafana/loki/pkg/promtail/client	0.022s
FAIL
```

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* don't overload variable name

Co-authored-by: Edward Welch <edward.welch@grafana.com>
(cherry picked from commit 0107a11)
  • Loading branch information
cyriltovena authored and slim-bean committed Apr 6, 2021
1 parent e6bddfc commit a1a9e6b
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 2 deletions.
22 changes: 21 additions & 1 deletion pkg/promtail/client/batch.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package client

import (
"fmt"
"sort"
"strings"
"time"

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/promtail/api"
Expand Down Expand Up @@ -40,7 +44,7 @@ func (b *batch) add(entry api.Entry) {
b.bytes += len(entry.Line)

// Append the entry to an already existing stream (if any)
labels := entry.Labels.String()
labels := labelsMapToString(entry.Labels, ReservedLabelTenantID)
if stream, ok := b.streams[labels]; ok {
stream.Entries = append(stream.Entries, entry.Entry)
return
Expand All @@ -53,6 +57,22 @@ func (b *batch) add(entry api.Entry) {
}
}

func labelsMapToString(ls model.LabelSet, without ...model.LabelName) string {
lstrs := make([]string, 0, len(ls))
Outer:
for l, v := range ls {
for _, w := range without {
if l == w {
continue Outer
}
}
lstrs = append(lstrs, fmt.Sprintf("%s=%q", l, v))
}

sort.Strings(lstrs)
return fmt.Sprintf("{%s}", strings.Join(lstrs, ", "))
}

// sizeBytes returns the current batch size in bytes
func (b *batch) sizeBytes() int {
return b.bytes
Expand Down
1 change: 0 additions & 1 deletion pkg/promtail/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,6 @@ func (c *client) processEntry(e api.Entry) (api.Entry, string) {
e.Labels = c.externalLabels.Merge(e.Labels)
}
tenantID := c.getTenantID(e.Labels)
delete(e.Labels, ReservedLabelTenantID)
return e, tenantID
}

Expand Down
22 changes: 22 additions & 0 deletions pkg/promtail/client/multi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@ import (
"testing"
"time"

"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/promtail/api"
Expand Down Expand Up @@ -115,3 +118,22 @@ func TestMultiClient_Handle(t *testing.T) {
t.Fatal("missing handle call")
}
}

func TestMultiClient_Handle_Race(t *testing.T) {
u := flagext.URLValue{}
require.NoError(t, u.Set("http://localhost"))
c1, err := New(nil, Config{URL: u, BackoffConfig: util.BackoffConfig{MaxRetries: 1}, Timeout: time.Microsecond}, log.NewNopLogger())
require.NoError(t, err)
c2, err := New(nil, Config{URL: u, BackoffConfig: util.BackoffConfig{MaxRetries: 1}, Timeout: time.Microsecond}, log.NewNopLogger())
require.NoError(t, err)
clients := []Client{c1, c2}
m := &MultiClient{
clients: clients,
entries: make(chan api.Entry),
}
m.start()

m.Chan() <- api.Entry{Labels: model.LabelSet{"foo": "bar", ReservedLabelTenantID: "1"}, Entry: logproto.Entry{Line: "foo"}}

m.Stop()
}

0 comments on commit a1a9e6b

Please sign in to comment.