Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

receive: Improved efficiency of multitsdb appends, upgraded Prometheus deps. #4078

Merged
merged 7 commits into from
Apr 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module github.com/thanos-io/thanos

require (
cloud.google.com/go v0.74.0
cloud.google.com/go v0.79.0
cloud.google.com/go/storage v1.10.0
github.com/Azure/azure-pipeline-go v0.2.2
github.com/Azure/azure-storage-blob-go v0.8.0
Expand All @@ -10,6 +10,7 @@ require (
github.com/aliyun/aliyun-oss-go-sdk v2.0.4+incompatible
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
github.com/cespare/xxhash v1.1.0
github.com/cespare/xxhash/v2 v2.1.1
github.com/chromedp/cdproto v0.0.0-20200424080200-0de008e41fa0
github.com/chromedp/chromedp v0.5.3
github.com/cortexproject/cortex v1.7.1-0.20210316085356-3fedc1108a49
Expand All @@ -33,7 +34,7 @@ require (
github.com/leanovate/gopter v0.2.4
github.com/lightstep/lightstep-tracer-go v0.18.1
github.com/lovoo/gcloud-opentracing v0.3.0
github.com/miekg/dns v1.1.38
github.com/miekg/dns v1.1.41
github.com/minio/minio-go/v7 v7.0.10
github.com/mozillazg/go-cos v0.13.0
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f
Expand All @@ -48,8 +49,8 @@ require (
github.com/prometheus/alertmanager v0.21.1-0.20210310093010-0f9cab6991e6
github.com/prometheus/client_golang v1.10.0
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.18.0
github.com/prometheus/prometheus v1.8.2-0.20210315220929-1cba1741828b
github.com/prometheus/common v0.20.0
github.com/prometheus/prometheus v1.8.2-0.20210413124018-62afcabd01ea
github.com/uber/jaeger-client-go v2.25.0+incompatible
github.com/uber/jaeger-lib v2.4.0+incompatible
github.com/weaveworks/common v0.0.0-20210112142934-23c8d7fa6120
Expand All @@ -58,13 +59,13 @@ require (
go.uber.org/atomic v1.7.0
go.uber.org/automaxprocs v1.2.0
go.uber.org/goleak v1.1.10
golang.org/x/crypto v0.0.0-20201208171446-5f87f3452ae9
golang.org/x/oauth2 v0.0.0-20210210192628-66670185b0cd
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83
golang.org/x/oauth2 v0.0.0-20210323180902-22b0adad7558
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/text v0.3.5
google.golang.org/api v0.39.0
google.golang.org/genproto v0.0.0-20201214200347-8c77b98c765d
google.golang.org/grpc v1.34.0
google.golang.org/api v0.42.0
google.golang.org/genproto v0.0.0-20210312152112-fc591d9ea70f
google.golang.org/grpc v1.36.0
gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/fsnotify.v1 v1.4.7
gopkg.in/yaml.v2 v2.4.0
Expand All @@ -81,7 +82,7 @@ replace (
// TODO: Remove this: https://github.com/thanos-io/thanos/issues/3967.
github.com/minio/minio-go/v7 => github.com/bwplotka/minio-go/v7 v7.0.11-0.20210324165441-f9927e5255a6
// Make sure Prometheus version is pinned as Prometheus semver does not include Go APIs.
github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20210315220929-1cba1741828b
github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20210413124018-62afcabd01ea
github.com/sercand/kuberesolver => github.com/sercand/kuberesolver v2.4.0+incompatible
google.golang.org/grpc => google.golang.org/grpc v1.29.1

Expand Down
123 changes: 80 additions & 43 deletions go.sum

Large diffs are not rendered by default.

32 changes: 20 additions & 12 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
package receive

import (
"bytes"
"context"
"crypto/tls"
"fmt"
"io/ioutil"
"io"
stdlog "log"
"net"
"net/http"
Expand Down Expand Up @@ -265,7 +266,7 @@ func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenant string,
replicated: rep != 0,
}

// on-the-wire format is 1-indexed and in-code is 0-indexed so we decrement the value if it was already replicated.
// On the wire, format is 1-indexed and in-code is 0-indexed so we decrement the value if it was already replicated.
if r.replicated {
r.n--
}
Expand All @@ -280,17 +281,24 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
span, ctx := tracing.StartSpan(r.Context(), "receive_http")
defer span.Finish()

// TODO(bwplotka): Optimize readAll https://github.com/thanos-io/thanos/pull/3334/files.
compressed, err := ioutil.ReadAll(r.Body)
// ioutil.ReadAll dynamically adjust the byte slice for read data, starting from 512B.
// Since this is receive hot path, grow upfront saving allocations and CPU time.
compressed := bytes.Buffer{}
if r.ContentLength >= 0 {
compressed.Grow(int(r.ContentLength))
} else {
compressed.Grow(512)
}
_, err := io.Copy(&compressed, r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
http.Error(w, errors.Wrap(err, "read compressed request body").Error(), http.StatusInternalServerError)
return
}

reqBuf, err := snappy.Decode(nil, compressed)
reqBuf, err := snappy.Decode(nil, compressed.Bytes())
if err != nil {
level.Error(h.logger).Log("msg", "snappy decode error", "err", err)
http.Error(w, err.Error(), http.StatusBadRequest)
http.Error(w, errors.Wrap(err, "snappy decode error").Error(), http.StatusBadRequest)
return
}

Expand Down Expand Up @@ -413,9 +421,9 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma
}
}()

logger := log.With(h.logger, "tenant", tenant)
logTags := []interface{}{"tenant", tenant}
if id, ok := middleware.RequestIDFromContext(pctx); ok {
logger = log.With(logger, "request-id", id)
logTags = append(logTags, "request-id", id)
}

ec := make(chan error)
Expand Down Expand Up @@ -465,7 +473,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma
if err != nil {
// When a MultiError is added to another MultiError, the error slices are concatenated, not nested.
// To avoid breaking the counting logic, we need to flatten the error.
level.Debug(h.logger).Log("msg", "local tsdb write failed", "err", err.Error())
level.Debug(h.logger).Log(append(logTags, "msg", "local tsdb write failed", "err", err.Error()))
ec <- errors.Wrapf(determineWriteErrorCause(err, 1), "store locally for endpoint %v", endpoint)
return
}
Expand Down Expand Up @@ -528,7 +536,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma
b.attempt++
dur := h.expBackoff.ForAttempt(b.attempt)
b.nextAllowed = time.Now().Add(dur)
level.Debug(h.logger).Log("msg", "target unavailable backing off", "for", dur)
level.Debug(h.logger).Log(append(logTags, "msg", "target unavailable backing off", "for", dur))
} else {
h.peerStates[endpoint] = &retryState{nextAllowed: time.Now().Add(h.expBackoff.ForAttempt(0))}
}
Expand Down Expand Up @@ -557,7 +565,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma
go func() {
for err := range ec {
if err != nil {
level.Debug(logger).Log("msg", "request failed, but not needed to achieve quorum", "err", err)
level.Debug(h.logger).Log(append(logTags, "msg", "request failed, but not needed to achieve quorum", "err", err))
}
}
}()
Expand Down
116 changes: 113 additions & 3 deletions pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/golang/snappy"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/exemplar"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
Expand Down Expand Up @@ -185,6 +186,108 @@ func TestDetermineWriteErrorCause(t *testing.T) {
}
}

type fakeTenantAppendable struct {
f *fakeAppendable
}

func newFakeTenantAppendable(f *fakeAppendable) *fakeTenantAppendable {
return &fakeTenantAppendable{f: f}
}

func (t *fakeTenantAppendable) TenantAppendable(_ string) (Appendable, error) {
return t.f, nil
}

type fakeAppendable struct {
appender storage.Appender
appenderErr func() error
}

var _ Appendable = &fakeAppendable{}

func nilErrFn() error {
return nil
}

func (f *fakeAppendable) Appender(_ context.Context) (storage.Appender, error) {
errf := f.appenderErr
if errf == nil {
errf = nilErrFn
}
return f.appender, errf()
}

type fakeAppender struct {
sync.Mutex
samples map[uint64][]prompb.Sample
exemplars map[uint64][]exemplar.Exemplar
appendErr func() error
commitErr func() error
rollbackErr func() error
}

var _ storage.Appender = &fakeAppender{}
var _ storage.GetRef = &fakeAppender{}

func newFakeAppender(appendErr, commitErr, rollbackErr func() error) *fakeAppender { //nolint:unparam
if appendErr == nil {
appendErr = nilErrFn
}
if commitErr == nil {
commitErr = nilErrFn
}
if rollbackErr == nil {
rollbackErr = nilErrFn
}
return &fakeAppender{
samples: make(map[uint64][]prompb.Sample),
appendErr: appendErr,
commitErr: commitErr,
rollbackErr: rollbackErr,
}
}

func (f *fakeAppender) Get(l labels.Labels) []prompb.Sample {
f.Lock()
defer f.Unlock()
s := f.samples[l.Hash()]
res := make([]prompb.Sample, len(s))
copy(res, s)
return res
}

func (f *fakeAppender) Append(ref uint64, l labels.Labels, t int64, v float64) (uint64, error) {
f.Lock()
defer f.Unlock()
if ref == 0 {
ref = l.Hash()
}
f.samples[ref] = append(f.samples[ref], prompb.Sample{Timestamp: t, Value: v})
return ref, f.appendErr()
}

func (f *fakeAppender) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) {
f.Lock()
defer f.Unlock()
if ref == 0 {
ref = l.Hash()
}
f.exemplars[ref] = append(f.exemplars[ref], e)
return ref, f.appendErr()
}

func (f *fakeAppender) GetRef(l labels.Labels) (uint64, labels.Labels) {
return l.Hash(), l
}

func (f *fakeAppender) Commit() error {
return f.commitErr()
}

func (f *fakeAppender) Rollback() error {
return f.rollbackErr()
}

func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64) ([]*Handler, Hashring) {
var (
cfg = []HashringConfig{{Hashring: "test"}}
Expand Down Expand Up @@ -1010,6 +1113,10 @@ func (a *tsOverrideAppender) Append(ref uint64, l labels.Labels, _ int64, v floa
return a.Appender.Append(ref, l, cnt, v)
}

func (a *tsOverrideAppender) GetRef(lset labels.Labels) (uint64, labels.Labels) {
return a.Appender.(storage.GetRef).GetRef(lset)
}

// serializeSeriesWithOneSample returns marshaled and compressed remote write requests like it would
// be send to Thanos receive.
// It has one sample and allow passing multiple series, in same manner as typical Prometheus would batch it.
Expand Down Expand Up @@ -1166,15 +1273,18 @@ func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) {
for i := 0; i < n; i++ {
r := httptest.NewRecorder()
handler.receiveHTTP(r, &http.Request{ContentLength: int64(len(tcase.writeRequest)), Body: ioutil.NopCloser(bytes.NewReader(tcase.writeRequest))})
testutil.Equals(b, http.StatusConflict, r.Code, "%v", i)
testutil.Equals(b, http.StatusConflict, r.Code, "%v-%s", i, func() string {
b, _ := ioutil.ReadAll(r.Body)
return string(b)
}())
}
})
})
}

runtime.GC()
// Take snapshot at the end to reveal how much memory we keep in TSDB.
testutil.Ok(b, Heap("../../"))
testutil.Ok(b, Heap("../../../_dev/thanos/2021/receive2"))

}

Expand All @@ -1183,7 +1293,7 @@ func Heap(dir string) (err error) {
return err
}

f, err := os.Create(filepath.Join(dir, "mem.pprof"))
f, err := os.Create(filepath.Join(dir, "impr5-go1.16.3.pprof"))
if err != nil {
return err
}
Expand Down
27 changes: 6 additions & 21 deletions pkg/receive/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,12 @@ import (
"sort"
"sync"

"github.com/cespare/xxhash"
"github.com/pkg/errors"
"github.com/thanos-io/thanos/pkg/store/labelpb"

"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
)

const sep = '\xff'

// insufficientNodesError is returned when a hashring does not
// have enough nodes to satisfy a request for a node.
type insufficientNodesError struct {
Expand All @@ -39,23 +37,6 @@ type Hashring interface {
GetN(tenant string, timeSeries *prompb.TimeSeries, n uint64) (string, error)
}

// hash returns a hash for the given tenant and time series.
func hash(tenant string, ts *prompb.TimeSeries) uint64 {
// Sort labelset to ensure a stable hash.
sort.Slice(ts.Labels, func(i, j int) bool { return ts.Labels[i].Name < ts.Labels[j].Name })
kakkoyun marked this conversation as resolved.
Show resolved Hide resolved

b := make([]byte, 0, 1024)
b = append(b, []byte(tenant)...)
b = append(b, sep)
kakkoyun marked this conversation as resolved.
Show resolved Hide resolved
for _, v := range ts.Labels {
b = append(b, v.Name...)
b = append(b, sep)
b = append(b, v.Value...)
b = append(b, sep)
}
return xxhash.Sum64(b)
}

// SingleNodeHashring always returns the same node.
type SingleNodeHashring string

Expand Down Expand Up @@ -85,7 +66,11 @@ func (s simpleHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
if n >= uint64(len(s)) {
return "", &insufficientNodesError{have: uint64(len(s)), want: n + 1}
}
return s[(hash(tenant, ts)+n)%uint64(len(s))], nil

// TODO(bwplotka): This might be not needed, double check.
sort.Slice(ts.Labels, func(i, j int) bool { return ts.Labels[i].Name < ts.Labels[j].Name })

return s[(labelpb.HashWithPrefix(tenant, ts.Labels)+n)%uint64(len(s))], nil
}

// multiHashring represents a set of hashrings.
Expand Down
23 changes: 0 additions & 23 deletions pkg/receive/hashring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,6 @@ import (
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
)

func TestHash(t *testing.T) {
ts := &prompb.TimeSeries{
Labels: []labelpb.ZLabel{
{
Name: "foo",
Value: "bar",
},
{
Name: "baz",
Value: "qux",
},
},
}

ts2 := &prompb.TimeSeries{
Labels: []labelpb.ZLabel{ts.Labels[1], ts.Labels[0]},
}

if hash("", ts) != hash("", ts2) {
t.Errorf("expected hashes to be independent of label order")
}
}

func TestHashringGet(t *testing.T) {
ts := &prompb.TimeSeries{
Labels: []labelpb.ZLabel{
Expand Down
Loading