Skip to content

Commit

Permalink
receive: Add support for TSDB per tenant
Browse files Browse the repository at this point in the history
Signed-off-by: Frederic Branczyk <fbranczyk@gmail.com>
  • Loading branch information
brancz committed Jan 21, 2020
1 parent 46a97fd commit 5446bef
Show file tree
Hide file tree
Showing 13 changed files with 524 additions and 85 deletions.
44 changes: 20 additions & 24 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {

tenantHeader := cmd.Flag("receive.tenant-header", "HTTP header to determine tenant for write requests.").Default(receive.DefaultTenantHeader).String()

defaultTenantId := cmd.Flag("receive.default-tenant-id", "HTTP header to determine tenant for write requests.").Default(receive.DefaultTenant).String()

tenantLabelName := cmd.Flag("receive.tenant-label-name", "Label name through which the tenant will be announced.").Default(receive.DefaultTenantLabel).String()

replicaHeader := cmd.Flag("receive.replica-header", "HTTP header specifying the replica number of a write request.").Default(receive.DefaultReplicaHeader).String()

replicationFactor := cmd.Flag("receive.replication-factor", "How many times to replicate incoming write requests.").Default("1").Uint64()
Expand Down Expand Up @@ -140,6 +144,8 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {
cw,
*local,
*tenantHeader,
*defaultTenantId,
*tenantLabelName,
*replicaHeader,
*replicationFactor,
comp,
Expand Down Expand Up @@ -175,14 +181,15 @@ func runReceive(
cw *receive.ConfigWatcher,
endpoint string,
tenantHeader string,
defaultTenantId string,
tenantLabelName string,
replicaHeader string,
replicationFactor uint64,
comp component.SourceStoreAPI,
) error {
logger = log.With(logger, "component", "receive")
level.Warn(logger).Log("msg", "setting up receive; the Thanos receive component is EXPERIMENTAL, it may break significantly without notice")

localStorage := &tsdb.ReadyStorage{}
rwTLSConfig, err := tls.NewServerConfig(log.With(logger, "protocol", "HTTP"), rwServerCert, rwServerKey, rwServerClientCA)
if err != nil {
return err
Expand All @@ -196,11 +203,15 @@ func runReceive(
return err
}

dbs := receive.NewMultiTSDB(dataDir, logger, reg, tsdbOpts, lset, tenantLabelName)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs)
webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
Writer: writer,
ListenAddress: rwAddress,
Registry: reg,
Endpoint: endpoint,
TenantHeader: tenantHeader,
DefaultTenantId: defaultTenantId,
ReplicaHeader: replicaHeader,
ReplicationFactor: replicationFactor,
Tracer: tracer,
Expand Down Expand Up @@ -244,24 +255,13 @@ func runReceive(
{
// TSDB.
cancel := make(chan struct{})
startTimeMargin := int64(2 * time.Duration(tsdbOpts.MinBlockDuration).Seconds() * 1000)
g.Add(func() error {
defer close(dbReady)
defer close(uploadC)

// Before actually starting, we need to make sure the
// WAL is flushed. The WAL is flushed after the
// hashring is loaded.
db := receive.NewFlushableStorage(
dataDir,
log.With(logger, "component", "tsdb"),
reg,
tsdbOpts,
)

// Before quitting, ensure the WAL is flushed and the DB is closed.
defer func() {
if err := db.Flush(); err != nil {
if err := dbs.Flush(); err != nil {
level.Warn(logger).Log("err", err, "msg", "failed to flush storage")
}
}()
Expand All @@ -274,28 +274,25 @@ func runReceive(
if !ok {
return nil
}
if err := db.Flush(); err != nil {
if err := dbs.Flush(); err != nil {
return errors.Wrap(err, "flushing storage")
}
if err := db.Open(); err != nil {
if err := dbs.Open(); err != nil {
return errors.Wrap(err, "opening storage")
}
if upload {
uploadC <- struct{}{}
<-uploadDone
}
level.Info(logger).Log("msg", "tsdb started")
localStorage.Set(db.Get(), startTimeMargin)
webHandler.SetWriter(receive.NewWriter(log.With(logger, "component", "receive-writer"), localStorage))
statusProber.Ready()
level.Info(logger).Log("msg", "server is ready to receive web requests.")
dbReady <- struct{}{}
}
}
}, func(err error) {
close(cancel)
},
)
})
}

level.Debug(logger).Log("msg", "setting up hashring")
Expand Down Expand Up @@ -334,7 +331,6 @@ func runReceive(
if !ok {
return nil
}
webHandler.SetWriter(nil)
webHandler.Hashring(h)
msg := "hashring has changed; server is not ready to receive web requests."
statusProber.NotReady(errors.New(msg))
Expand Down Expand Up @@ -383,12 +379,12 @@ func runReceive(
if s != nil {
s.Shutdown(errors.New("reload hashrings"))
}
tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), nil, localStorage.Get(), comp, lset)

multiStore := store.NewMultiTSDBStore(logger, reg, comp, dbs.TSDBClients)
rw := store.ReadWriteTSDBStore{
StoreServer: tsdbStore,
StoreServer: multiStore,
WriteableStoreServer: webHandler,
}

s = grpcserver.NewReadWrite(logger, &receive.UnRegisterer{Registerer: reg}, tracer, comp, rw,
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
Expand All @@ -406,7 +402,7 @@ func runReceive(
// whenever the DB changes, thus it needs its own run group.
g.Add(func() error {
for range startGRPC {
level.Info(logger).Log("msg", "listening for StoreAPI gRPC", "address", grpcBindAddr)
level.Info(logger).Log("msg", "listening for StoreAPI and WritableStoreAPI gRPC", "address", grpcBindAddr)
if err := s.ListenAndServe(); err != nil {
return errors.Wrap(err, "serve gRPC")
}
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ github.com/onsi/gomega v0.0.0-20190113212917-5533ce8a0da3/go.mod h1:ex+gbHU/CVuB
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.1 h1:K0jcRCwNQM3vFGh1ppMtDh/+7ApJrjldlX8fA0jDTLQ=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/opentracing-contrib/go-stdlib v0.0.0-20190519235532-cf7a6c988dc9 h1:QsgXACQhd9QJhEmRumbsMQQvBtmdS0mafoVEBplWXEg=
github.com/opentracing-contrib/go-stdlib v0.0.0-20190519235532-cf7a6c988dc9/go.mod h1:PLldrQSroqzH70Xl+1DQcGnefIbqsKR7UDaiux3zV+w=
github.com/opentracing/basictracer-go v1.0.0 h1:YyUAhaEfjoWXclZVJ9sGoNct7j4TVk7lZWlQw5UXuoo=
github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74=
Expand Down Expand Up @@ -521,8 +522,10 @@ github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdh
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/shurcooL/httpfs v0.0.0-20171119174359-809beceb2371/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg=
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 h1:bUGsEnyNbVPw06Bs80sCeARAlK8lhwqGyi6UT8ymuGk=
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg=
github.com/shurcooL/vfsgen v0.0.0-20180825020608-02ddb050ef6b/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw=
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd h1:ug7PpSOB5RBPK1Kg6qskGBoP3Vnj/aNYFTznWvlkGo0=
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
Expand All @@ -532,6 +535,7 @@ github.com/smartystreets/assertions v1.0.1/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUr
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
Expand Down
58 changes: 33 additions & 25 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/tsdb"
terrors "github.com/prometheus/prometheus/tsdb/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand All @@ -36,6 +37,10 @@ import (
const (
// DefaultTenantHeader is the default header used to designate the tenant making a write request.
DefaultTenantHeader = "THANOS-TENANT"
// DefaultTenant is the default value used for when no tenant is passed via the tenant header.
DefaultTenant = "default-tenant"
// DefaultTenantLabel is the default label-name used for when no tenant is passed via the tenant header.
DefaultTenantLabel = "tenant_id"
// DefaultReplicaHeader is the default header used to designate the replica count of a write request.
DefaultReplicaHeader = "THANOS-REPLICA"
)
Expand All @@ -51,6 +56,7 @@ type Options struct {
ListenAddress string
Registry prometheus.Registerer
TenantHeader string
DefaultTenantId string
ReplicaHeader string
Endpoint string
ReplicationFactor uint64
Expand Down Expand Up @@ -114,16 +120,6 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
return h
}

// SetWriter sets the writer.
// The writer must be set to a non-nil value in order for the
// handler to be ready and usable.
// If the writer is nil, then the handler is marked as not ready.
func (h *Handler) SetWriter(w *Writer) {
h.mtx.Lock()
defer h.mtx.Unlock()
h.writer = w
}

// Hashring sets the hashring for the handler and marks the hashring as ready.
// The hashring must be set to a non-nil value in order for the
// handler to be ready and usable.
Expand Down Expand Up @@ -257,11 +253,16 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
}

tenant := r.Header.Get(h.options.TenantHeader)
if len(tenant) == 0 {
tenant = h.options.DefaultTenantId
}

err = h.handleRequest(r.Context(), rep, tenant, &wreq)
switch err {
case nil:
return
case tsdb.ErrNotReady:
http.Error(w, err.Error(), http.StatusServiceUnavailable)
case conflictErr:
http.Error(w, err.Error(), http.StatusConflict)
case errBadReplica:
Expand Down Expand Up @@ -348,23 +349,18 @@ func (h *Handler) parallelizeRequests(ctx context.Context, tenant string, replic
// can be ignored if the replication factor is met.
if endpoint == h.options.Endpoint {
go func(endpoint string) {
var err error
h.mtx.RLock()
if h.writer == nil {
err = errors.New("storage is not ready")
} else {
err = h.writer.Write(wreqs[endpoint])
// When a MultiError is added to another MultiError, the error slices are concatenated, not nested.
// To avoid breaking the counting logic, we need to flatten the error.
if errs, ok := err.(terrors.MultiError); ok {
if countCause(errs, isConflict) > 0 {
err = errors.Wrap(conflictErr, errs.Error())
} else {
err = errors.New(errs.Error())
}
err := h.writer.Write(tenant, wreqs[endpoint])
// When a MultiError is added to another MultiError, the error slices are concatenated, not nested.
// To avoid breaking the counting logic, we need to flatten the error.
if errs, ok := err.(terrors.MultiError); ok {
if countCause(errs, isConflict) > 0 {
err = errors.Wrap(conflictErr, errs.Error())
} else if countCause(errs, isNotReady) > 0 {
err = tsdb.ErrNotReady
} else {
err = errors.New(errs.Error())
}
}
h.mtx.RUnlock()
if err != nil {
level.Error(h.logger).Log("msg", "storing locally", "err", err, "endpoint", endpoint)
}
Expand Down Expand Up @@ -458,6 +454,9 @@ func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.Wri

err := h.parallelizeRequests(ctx, tenant, replicas, wreqs)
if errs, ok := err.(terrors.MultiError); ok {
if uint64(countCause(errs, isNotReady)) >= (h.options.ReplicationFactor+1)/2 {
return tsdb.ErrNotReady
}
if uint64(countCause(errs, isConflict)) >= (h.options.ReplicationFactor+1)/2 {
return errors.Wrap(conflictErr, "did not meet replication threshold")
}
Expand All @@ -475,6 +474,8 @@ func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*st
switch err {
case nil:
return &storepb.WriteResponse{}, nil
case tsdb.ErrNotReady:
return nil, status.Error(codes.Unavailable, err.Error())
case conflictErr:
return nil, status.Error(codes.AlreadyExists, err.Error())
case errBadReplica:
Expand Down Expand Up @@ -515,6 +516,13 @@ func isConflict(err error) bool {
status.Code(err) == codes.AlreadyExists
}

// isNotReady returns whether or not the given error represents a not ready error.
func isNotReady(err error) bool {
return err == tsdb.ErrNotReady ||
err.Error() == strconv.Itoa(http.StatusConflict) ||
status.Code(err) == codes.Unavailable
}

func newPeerGroup(dialOpts ...grpc.DialOption) *peerGroup {
return &peerGroup{
dialOpts: dialOpts,
Expand Down
16 changes: 8 additions & 8 deletions pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func newHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64)
TenantHeader: DefaultTenantHeader,
ReplicaHeader: DefaultReplicaHeader,
ReplicationFactor: replicationFactor,
Writer: NewWriter(log.NewNopLogger(), appendables[i]),
Writer: NewWriter(log.NewNopLogger(), newFakeTenantAppendable(appendables[i])),
})
handlers = append(handlers, h)
h.peers = peers
Expand Down Expand Up @@ -477,12 +477,12 @@ func TestReceive(t *testing.T) {
// on which node is erroring and which node is receiving.
for i, handler := range handlers {
// Test that the correct status is returned.
status, err := makeRequest(handler, tenant, tc.wreq)
rec, err := makeRequest(handler, tenant, tc.wreq)
if err != nil {
t.Fatalf("handler %d: unexpectedly failed making HTTP request: %v", tc.status, err)
}
if status != tc.status {
t.Errorf("handler %d: got unexpected HTTP status code: expected %d, got %d", i, tc.status, status)
if rec.Code != tc.status {
t.Errorf("handler %d: got unexpected HTTP status code: expected %d, got %d; body: %s", i, tc.status, rec.Code, rec.Body.String())
}
}
// Test that each time series is stored
Expand Down Expand Up @@ -545,22 +545,22 @@ func cycleErrors(errs []error) func() error {
}

// makeRequest is a helper to make a correct request against a remote write endpoint given a request.
func makeRequest(h *Handler, tenant string, wreq *prompb.WriteRequest) (int, error) {
func makeRequest(h *Handler, tenant string, wreq *prompb.WriteRequest) (*httptest.ResponseRecorder, error) {
buf, err := proto.Marshal(wreq)
if err != nil {
return 0, errors.Wrap(err, "marshal request")
return nil, errors.Wrap(err, "marshal request")
}
req, err := http.NewRequest("POST", h.options.Endpoint, bytes.NewBuffer(snappy.Encode(nil, buf)))
if err != nil {
return 0, errors.Wrap(err, "create request")
return nil, errors.Wrap(err, "create request")
}
req.Header.Add(h.options.TenantHeader, tenant)

rec := httptest.NewRecorder()
h.receiveHTTP(rec, req)
rec.Flush()

return rec.Code, nil
return rec, nil
}

func randomAddr() string {
Expand Down
Loading

0 comments on commit 5446bef

Please sign in to comment.