diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index ba552e67e3d..1292c3d2601 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -68,6 +68,10 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) { tenantHeader := cmd.Flag("receive.tenant-header", "HTTP header to determine tenant for write requests.").Default(receive.DefaultTenantHeader).String() + defaultTenantId := cmd.Flag("receive.default-tenant-id", "HTTP header to determine tenant for write requests.").Default(receive.DefaultTenant).String() + + tenantLabelName := cmd.Flag("receive.tenant-label-name", "Label name through which the tenant will be announced.").Default(receive.DefaultTenantLabel).String() + replicaHeader := cmd.Flag("receive.replica-header", "HTTP header specifying the replica number of a write request.").Default(receive.DefaultReplicaHeader).String() replicationFactor := cmd.Flag("receive.replication-factor", "How many times to replicate incoming write requests.").Default("1").Uint64() @@ -140,6 +144,8 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) { cw, *local, *tenantHeader, + *defaultTenantId, + *tenantLabelName, *replicaHeader, *replicationFactor, comp, @@ -175,6 +181,8 @@ func runReceive( cw *receive.ConfigWatcher, endpoint string, tenantHeader string, + defaultTenantId string, + tenantLabelName string, replicaHeader string, replicationFactor uint64, comp component.SourceStoreAPI, @@ -182,7 +190,6 @@ func runReceive( logger = log.With(logger, "component", "receive") level.Warn(logger).Log("msg", "setting up receive; the Thanos receive component is EXPERIMENTAL, it may break significantly without notice") - localStorage := &tsdb.ReadyStorage{} rwTLSConfig, err := tls.NewServerConfig(log.With(logger, "protocol", "HTTP"), rwServerCert, rwServerKey, rwServerClientCA) if err != nil { return err @@ -196,11 +203,15 @@ func runReceive( return err } + dbs := receive.NewMultiTSDB(dataDir, logger, reg, tsdbOpts, lset, tenantLabelName) + writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs) webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{ + Writer: writer, ListenAddress: rwAddress, Registry: reg, Endpoint: endpoint, TenantHeader: tenantHeader, + DefaultTenantId: defaultTenantId, ReplicaHeader: replicaHeader, ReplicationFactor: replicationFactor, Tracer: tracer, @@ -244,24 +255,13 @@ func runReceive( { // TSDB. cancel := make(chan struct{}) - startTimeMargin := int64(2 * time.Duration(tsdbOpts.MinBlockDuration).Seconds() * 1000) g.Add(func() error { defer close(dbReady) defer close(uploadC) - // Before actually starting, we need to make sure the - // WAL is flushed. The WAL is flushed after the - // hashring is loaded. - db := receive.NewFlushableStorage( - dataDir, - log.With(logger, "component", "tsdb"), - reg, - tsdbOpts, - ) - // Before quitting, ensure the WAL is flushed and the DB is closed. defer func() { - if err := db.Flush(); err != nil { + if err := dbs.Flush(); err != nil { level.Warn(logger).Log("err", err, "msg", "failed to flush storage") } }() @@ -274,10 +274,10 @@ func runReceive( if !ok { return nil } - if err := db.Flush(); err != nil { + if err := dbs.Flush(); err != nil { return errors.Wrap(err, "flushing storage") } - if err := db.Open(); err != nil { + if err := dbs.Open(); err != nil { return errors.Wrap(err, "opening storage") } if upload { @@ -285,8 +285,6 @@ func runReceive( <-uploadDone } level.Info(logger).Log("msg", "tsdb started") - localStorage.Set(db.Get(), startTimeMargin) - webHandler.SetWriter(receive.NewWriter(log.With(logger, "component", "receive-writer"), localStorage)) statusProber.Ready() level.Info(logger).Log("msg", "server is ready to receive web requests.") dbReady <- struct{}{} @@ -294,8 +292,7 @@ func runReceive( } }, func(err error) { close(cancel) - }, - ) + }) } level.Debug(logger).Log("msg", "setting up hashring") @@ -334,7 +331,6 @@ func runReceive( if !ok { return nil } - webHandler.SetWriter(nil) webHandler.Hashring(h) msg := "hashring has changed; server is not ready to receive web requests." statusProber.NotReady(errors.New(msg)) @@ -383,12 +379,12 @@ func runReceive( if s != nil { s.Shutdown(errors.New("reload hashrings")) } - tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), nil, localStorage.Get(), comp, lset) + + multiStore := store.NewMultiTSDBStore(logger, reg, comp, dbs.TSDBClients) rw := store.ReadWriteTSDBStore{ - StoreServer: tsdbStore, + StoreServer: multiStore, WriteableStoreServer: webHandler, } - s = grpcserver.NewReadWrite(logger, &receive.UnRegisterer{Registerer: reg}, tracer, comp, rw, grpcserver.WithListen(grpcBindAddr), grpcserver.WithGracePeriod(grpcGracePeriod), @@ -406,7 +402,7 @@ func runReceive( // whenever the DB changes, thus it needs its own run group. g.Add(func() error { for range startGRPC { - level.Info(logger).Log("msg", "listening for StoreAPI gRPC", "address", grpcBindAddr) + level.Info(logger).Log("msg", "listening for StoreAPI and WritableStoreAPI gRPC", "address", grpcBindAddr) if err := s.ListenAndServe(); err != nil { return errors.Wrap(err, "serve gRPC") } diff --git a/go.sum b/go.sum index d122df9d2c0..307fdee3367 100644 --- a/go.sum +++ b/go.sum @@ -451,6 +451,7 @@ github.com/onsi/gomega v0.0.0-20190113212917-5533ce8a0da3/go.mod h1:ex+gbHU/CVuB github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.1 h1:K0jcRCwNQM3vFGh1ppMtDh/+7ApJrjldlX8fA0jDTLQ= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= +github.com/opentracing-contrib/go-stdlib v0.0.0-20190519235532-cf7a6c988dc9 h1:QsgXACQhd9QJhEmRumbsMQQvBtmdS0mafoVEBplWXEg= github.com/opentracing-contrib/go-stdlib v0.0.0-20190519235532-cf7a6c988dc9/go.mod h1:PLldrQSroqzH70Xl+1DQcGnefIbqsKR7UDaiux3zV+w= github.com/opentracing/basictracer-go v1.0.0 h1:YyUAhaEfjoWXclZVJ9sGoNct7j4TVk7lZWlQw5UXuoo= github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74= @@ -521,8 +522,10 @@ github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdh github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/shurcooL/httpfs v0.0.0-20171119174359-809beceb2371/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg= +github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 h1:bUGsEnyNbVPw06Bs80sCeARAlK8lhwqGyi6UT8ymuGk= github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg= github.com/shurcooL/vfsgen v0.0.0-20180825020608-02ddb050ef6b/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw= +github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd h1:ug7PpSOB5RBPK1Kg6qskGBoP3Vnj/aNYFTznWvlkGo0= github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= @@ -532,6 +535,7 @@ github.com/smartystreets/assertions v1.0.1/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUr github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= +github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 5ee91d10172..ff2c4d78467 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -22,6 +22,7 @@ import ( "github.com/prometheus/common/route" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/tsdb" terrors "github.com/prometheus/prometheus/tsdb/errors" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -36,6 +37,10 @@ import ( const ( // DefaultTenantHeader is the default header used to designate the tenant making a write request. DefaultTenantHeader = "THANOS-TENANT" + // DefaultTenant is the default value used for when no tenant is passed via the tenant header. + DefaultTenant = "default-tenant" + // DefaultTenantLabel is the default label-name used for when no tenant is passed via the tenant header. + DefaultTenantLabel = "tenant_id" // DefaultReplicaHeader is the default header used to designate the replica count of a write request. DefaultReplicaHeader = "THANOS-REPLICA" ) @@ -51,6 +56,7 @@ type Options struct { ListenAddress string Registry prometheus.Registerer TenantHeader string + DefaultTenantId string ReplicaHeader string Endpoint string ReplicationFactor uint64 @@ -114,16 +120,6 @@ func NewHandler(logger log.Logger, o *Options) *Handler { return h } -// SetWriter sets the writer. -// The writer must be set to a non-nil value in order for the -// handler to be ready and usable. -// If the writer is nil, then the handler is marked as not ready. -func (h *Handler) SetWriter(w *Writer) { - h.mtx.Lock() - defer h.mtx.Unlock() - h.writer = w -} - // Hashring sets the hashring for the handler and marks the hashring as ready. // The hashring must be set to a non-nil value in order for the // handler to be ready and usable. @@ -257,11 +253,16 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { } tenant := r.Header.Get(h.options.TenantHeader) + if len(tenant) == 0 { + tenant = h.options.DefaultTenantId + } err = h.handleRequest(r.Context(), rep, tenant, &wreq) switch err { case nil: return + case tsdb.ErrNotReady: + http.Error(w, err.Error(), http.StatusServiceUnavailable) case conflictErr: http.Error(w, err.Error(), http.StatusConflict) case errBadReplica: @@ -348,23 +349,18 @@ func (h *Handler) parallelizeRequests(ctx context.Context, tenant string, replic // can be ignored if the replication factor is met. if endpoint == h.options.Endpoint { go func(endpoint string) { - var err error - h.mtx.RLock() - if h.writer == nil { - err = errors.New("storage is not ready") - } else { - err = h.writer.Write(wreqs[endpoint]) - // When a MultiError is added to another MultiError, the error slices are concatenated, not nested. - // To avoid breaking the counting logic, we need to flatten the error. - if errs, ok := err.(terrors.MultiError); ok { - if countCause(errs, isConflict) > 0 { - err = errors.Wrap(conflictErr, errs.Error()) - } else { - err = errors.New(errs.Error()) - } + err := h.writer.Write(tenant, wreqs[endpoint]) + // When a MultiError is added to another MultiError, the error slices are concatenated, not nested. + // To avoid breaking the counting logic, we need to flatten the error. + if errs, ok := err.(terrors.MultiError); ok { + if countCause(errs, isConflict) > 0 { + err = errors.Wrap(conflictErr, errs.Error()) + } else if countCause(errs, isNotReady) > 0 { + err = tsdb.ErrNotReady + } else { + err = errors.New(errs.Error()) } } - h.mtx.RUnlock() if err != nil { level.Error(h.logger).Log("msg", "storing locally", "err", err, "endpoint", endpoint) } @@ -458,6 +454,9 @@ func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.Wri err := h.parallelizeRequests(ctx, tenant, replicas, wreqs) if errs, ok := err.(terrors.MultiError); ok { + if uint64(countCause(errs, isNotReady)) >= (h.options.ReplicationFactor+1)/2 { + return tsdb.ErrNotReady + } if uint64(countCause(errs, isConflict)) >= (h.options.ReplicationFactor+1)/2 { return errors.Wrap(conflictErr, "did not meet replication threshold") } @@ -475,6 +474,8 @@ func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*st switch err { case nil: return &storepb.WriteResponse{}, nil + case tsdb.ErrNotReady: + return nil, status.Error(codes.Unavailable, err.Error()) case conflictErr: return nil, status.Error(codes.AlreadyExists, err.Error()) case errBadReplica: @@ -515,6 +516,13 @@ func isConflict(err error) bool { status.Code(err) == codes.AlreadyExists } +// isNotReady returns whether or not the given error represents a not ready error +func isNotReady(err error) bool { + return err == tsdb.ErrNotReady || + err.Error() == strconv.Itoa(http.StatusConflict) || + status.Code(err) == codes.Unavailable +} + func newPeerGroup(dialOpts ...grpc.DialOption) *peerGroup { return &peerGroup{ dialOpts: dialOpts, diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index 6f58b84cec0..3990f96f4e8 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -161,7 +161,7 @@ func newHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64) TenantHeader: DefaultTenantHeader, ReplicaHeader: DefaultReplicaHeader, ReplicationFactor: replicationFactor, - Writer: NewWriter(log.NewNopLogger(), appendables[i]), + Writer: NewWriter(log.NewNopLogger(), newFakeTenantAppendable(appendables[i])), }) handlers = append(handlers, h) h.peers = peers @@ -477,12 +477,12 @@ func TestReceive(t *testing.T) { // on which node is erroring and which node is receiving. for i, handler := range handlers { // Test that the correct status is returned. - status, err := makeRequest(handler, tenant, tc.wreq) + rec, err := makeRequest(handler, tenant, tc.wreq) if err != nil { t.Fatalf("handler %d: unexpectedly failed making HTTP request: %v", tc.status, err) } - if status != tc.status { - t.Errorf("handler %d: got unexpected HTTP status code: expected %d, got %d", i, tc.status, status) + if rec.Code != tc.status { + t.Errorf("handler %d: got unexpected HTTP status code: expected %d, got %d; body: %s", i, tc.status, rec.Code, rec.Body.String()) } } // Test that each time series is stored @@ -545,14 +545,14 @@ func cycleErrors(errs []error) func() error { } // makeRequest is a helper to make a correct request against a remote write endpoint given a request. -func makeRequest(h *Handler, tenant string, wreq *prompb.WriteRequest) (int, error) { +func makeRequest(h *Handler, tenant string, wreq *prompb.WriteRequest) (*httptest.ResponseRecorder, error) { buf, err := proto.Marshal(wreq) if err != nil { - return 0, errors.Wrap(err, "marshal request") + return nil, errors.Wrap(err, "marshal request") } req, err := http.NewRequest("POST", h.options.Endpoint, bytes.NewBuffer(snappy.Encode(nil, buf))) if err != nil { - return 0, errors.Wrap(err, "create request") + return nil, errors.Wrap(err, "create request") } req.Header.Add(h.options.TenantHeader, tenant) @@ -560,7 +560,7 @@ func makeRequest(h *Handler, tenant string, wreq *prompb.WriteRequest) (int, err h.receiveHTTP(rec, req) rec.Flush() - return rec.Code, nil + return rec, nil } func randomAddr() string { diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go new file mode 100644 index 00000000000..8be6050231f --- /dev/null +++ b/pkg/receive/multitsdb.go @@ -0,0 +1,208 @@ +package receive + +import ( + "io/ioutil" + "os" + "path" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage/tsdb" + terrors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/store" + "golang.org/x/sync/errgroup" +) + +type MultiTSDB struct { + dataDir string + logger log.Logger + reg prometheus.Registerer + tsdbCfg *tsdb.Options + tenantLabelName string + labels labels.Labels + + mtx *sync.RWMutex + dbs map[string]*FlushableStorage + appendables map[string]*tsdb.ReadyStorage + stores []*store.TSDBStore +} + +func NewMultiTSDB(dataDir string, l log.Logger, reg prometheus.Registerer, tsdbCfg *tsdb.Options, labels labels.Labels, tenantLabelName string) *MultiTSDB { + if l == nil { + l = log.NewNopLogger() + } + + return &MultiTSDB{ + dataDir: dataDir, + logger: l, + reg: reg, + tsdbCfg: tsdbCfg, + mtx: &sync.RWMutex{}, + dbs: map[string]*FlushableStorage{}, + stores: []*store.TSDBStore{}, + appendables: map[string]*tsdb.ReadyStorage{}, + labels: labels, + tenantLabelName: tenantLabelName, + } +} + +func (t *MultiTSDB) Open() error { + if err := os.MkdirAll(t.dataDir, 0777); err != nil { + return err + } + + return t.openTSDBs() +} + +func (t *MultiTSDB) Close() error { + t.mtx.Lock() + defer t.mtx.Unlock() + + errmtx := &sync.Mutex{} + merr := terrors.MultiError{} + wg := &sync.WaitGroup{} + for _, tsdb := range t.dbs { + tsdb := tsdb // https://golang.org/doc/faq#closures_and_goroutines + wg.Add(1) + go func() { + err := tsdb.Close() + if err != nil { + errmtx.Lock() + merr.Add(err) + errmtx.Unlock() + } + wg.Done() + }() + } + + wg.Wait() + return merr.Err() +} + +func (t *MultiTSDB) Flush() error { + t.mtx.Lock() + defer t.mtx.Unlock() + + errmtx := &sync.Mutex{} + merr := terrors.MultiError{} + wg := &sync.WaitGroup{} + for _, tsdb := range t.dbs { + tsdb := tsdb // https://golang.org/doc/faq#closures_and_goroutines + wg.Add(1) + go func() { + err := tsdb.Flush() + if err != nil { + errmtx.Lock() + merr.Add(err) + errmtx.Unlock() + } + wg.Done() + }() + } + + wg.Wait() + return merr.Err() +} + +func (t *MultiTSDB) openTSDBs() error { + files, err := ioutil.ReadDir(t.dataDir) + if err != nil { + return err + } + + var g errgroup.Group + for _, f := range files { + f := f // https://golang.org/doc/faq#closures_and_goroutines + if !f.IsDir() { + continue + } + + g.Go(func() error { + tenantId := f.Name() + _, err := t.tenant(tenantId) + return err + }) + } + + return g.Wait() +} + +func (t *MultiTSDB) TSDBClients() []*store.TSDBStore { + t.mtx.RLock() + res := make([]*store.TSDBStore, len(t.stores)) + copy(res, t.stores) + defer t.mtx.RUnlock() + return res +} + +func (t *MultiTSDB) tenant(tenantId string) (*tsdb.ReadyStorage, error) { + // Fast path, as creating tenants is a very rare operation. + t.mtx.RLock() + db, exist := t.appendables[tenantId] + if exist { + t.mtx.RUnlock() + return db, nil + } + t.mtx.RUnlock() + + // Slow path needs to lock fully and attempt to read again to prevent race + // conditions, where since the fast path was tried, there may have actually + // been the same tenant inserted in the map. + t.mtx.Lock() + db, exist = t.appendables[tenantId] + if exist { + t.mtx.Unlock() + return db, nil + } + + rs := &tsdb.ReadyStorage{} + t.appendables[tenantId] = rs + t.mtx.Unlock() + + go func() { + s := NewFlushableStorage( + path.Join(t.dataDir, tenantId), + log.With(t.logger, "tenant", tenantId), + prometheus.WrapRegistererWith(prometheus.Labels{ + "tenant": tenantId, + }, t.reg), + t.tsdbCfg, + ) + + if err := s.Open(); err != nil { + level.Error(t.logger).Log("msg", "failed to open tsdb", "err", err) + t.mtx.Lock() + delete(t.appendables, tenantId) + t.mtx.Unlock() + return + } + + tstore := store.NewTSDBStore( + log.With(t.logger, "component", "thanos-tsdb-store", "tenant", tenantId), + prometheus.WrapRegistererWith(prometheus.Labels{ + "tenant": tenantId, + }, t.reg), + s.Get(), + component.Receive, + append(t.labels, labels.Label{Name: t.tenantLabelName, Value: tenantId}), + ) + + rs.Set(s.Get(), int64(2*time.Duration(t.tsdbCfg.MinBlockDuration).Seconds()*1000)) + + t.mtx.Lock() + t.stores = append(t.stores, tstore) + t.dbs[tenantId] = s + t.mtx.Unlock() + }() + + return rs, nil +} + +func (t *MultiTSDB) TenantAppendable(tenantId string) (Appendable, error) { + return t.tenant(tenantId) +} diff --git a/pkg/receive/tsdb_test.go b/pkg/receive/tsdb_test.go index 75a80f1b597..5be278f415b 100644 --- a/pkg/receive/tsdb_test.go +++ b/pkg/receive/tsdb_test.go @@ -48,6 +48,9 @@ func TestFlushableStorage(t *testing.T) { // Append data to the WAL. app := db.Appender() + if err != nil { + t.Fatal(err) + } maxt := 1000 for i := 0; i < maxt; i++ { _, err := app.Add(labels.FromStrings("thanos", "flush"), int64(i), 1.0) diff --git a/pkg/receive/writer.go b/pkg/receive/writer.go index 3a8a20f8276..0c67d6dc55a 100644 --- a/pkg/receive/writer.go +++ b/pkg/receive/writer.go @@ -7,9 +7,10 @@ import ( "github.com/go-kit/kit/log/level" "github.com/pkg/errors" "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/tsdb" "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/storage" terrors "github.com/prometheus/prometheus/tsdb/errors" ) @@ -18,29 +19,44 @@ type Appendable interface { Appender() (storage.Appender, error) } +type TenantStorage interface { + TenantAppendable(string) (Appendable, error) +} + type Writer struct { - logger log.Logger - append Appendable + logger log.Logger + multitsdb TenantStorage } -func NewWriter(logger log.Logger, app Appendable) *Writer { +func NewWriter(logger log.Logger, multitsdb TenantStorage) *Writer { return &Writer{ - logger: logger, - append: app, + logger: logger, + multitsdb: multitsdb, } } -func (r *Writer) Write(wreq *prompb.WriteRequest) error { +func (r *Writer) Write(tenantId string, wreq *prompb.WriteRequest) error { var ( numOutOfOrder = 0 numDuplicates = 0 numOutOfBounds = 0 ) - app, err := r.append.Appender() + s, err := r.multitsdb.TenantAppendable(tenantId) + if err != nil { + return errors.Wrap(err, "get tenant appendable") + } + + app, err := s.Appender() + if err == tsdb.ErrNotReady { + return err + } if err != nil { return errors.Wrap(err, "get appender") } + if app == nil { + return errors.New("tsdb not ready yet to be appended to") + } var errs terrors.MultiError for _, t := range wreq.Timeseries { @@ -91,6 +107,18 @@ func (r *Writer) Write(wreq *prompb.WriteRequest) error { return errs.Err() } +type fakeTenantAppendable struct { + f *fakeAppendable +} + +func newFakeTenantAppendable(f *fakeAppendable) *fakeTenantAppendable { + return &fakeTenantAppendable{f: f} +} + +func (t *fakeTenantAppendable) TenantAppendable(tenantId string) (Appendable, error) { + return t.f, nil +} + type fakeAppendable struct { appender storage.Appender appenderErr func() error diff --git a/pkg/server/grpc/grpc.go b/pkg/server/grpc/grpc.go index 38b1a1a3aca..9d7da72ae78 100644 --- a/pkg/server/grpc/grpc.go +++ b/pkg/server/grpc/grpc.go @@ -95,7 +95,7 @@ func (s *Server) ListenAndServe() error { } s.listener = l - level.Info(s.logger).Log("msg", "listening for StoreAPI gRPC", "address", s.opts.listen) + level.Info(s.logger).Log("msg", "listening for serving gRPC", "address", s.opts.listen) return errors.Wrap(s.srv.Serve(s.listener), "serve gRPC") } diff --git a/pkg/store/multitsdb.go b/pkg/store/multitsdb.go new file mode 100644 index 00000000000..936bf9aed49 --- /dev/null +++ b/pkg/store/multitsdb.go @@ -0,0 +1,156 @@ +package store + +import ( + "context" + "sort" + + "github.com/go-kit/kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/store/storepb" +) + +type MultiTSDBStore struct { + logger log.Logger + component component.SourceStoreAPI + tsdbStores func() []*TSDBStore +} + +// NewMultiTSDBStore creates a new TSDBStore. +func NewMultiTSDBStore(logger log.Logger, _ prometheus.Registerer, component component.SourceStoreAPI, tsdbStores func() []*TSDBStore) *MultiTSDBStore { + if logger == nil { + logger = log.NewNopLogger() + } + return &MultiTSDBStore{ + logger: logger, + component: component, + tsdbStores: tsdbStores, + } +} + +// Info returns store information about the Prometheus instance. +func (s *MultiTSDBStore) Info(ctx context.Context, req *storepb.InfoRequest) (*storepb.InfoResponse, error) { + stores := s.tsdbStores() + + resp := &storepb.InfoResponse{ + StoreType: s.component.ToProto(), + } + if len(stores) == 0 { + return resp, nil + } + + infos := make([]*storepb.InfoResponse, 0, len(stores)) + for _, store := range stores { + info, err := store.Info(ctx, req) + if err != nil { + return nil, err + } + infos = append(infos, info) + } + + resp.MinTime = infos[0].MinTime + resp.MaxTime = infos[0].MaxTime + + for i := 1; i < len(infos); i++ { + if resp.MinTime > infos[i].MinTime { + resp.MinTime = infos[i].MinTime + } + if resp.MaxTime < infos[i].MaxTime { + resp.MaxTime = infos[i].MaxTime + } + } + + seenLabelSets := map[uint64]struct{}{} + labelSets := []storepb.LabelSet{} + for _, info := range infos { + for _, labelSet := range info.LabelSets { + ls := storepb.LabelsToPromLabels(labelSet.Labels) + sort.Sort(ls) + h := ls.Hash() + _, seen := seenLabelSets[h] + if !seen { + seenLabelSets[h] = struct{}{} + labelSets = append(labelSets, labelSet) + } + } + } + resp.LabelSets = labelSets + + return resp, nil +} + +// Series returns all series for a requested time range and label matcher. The returned data may +// exceed the requested time bounds. +func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { + stores := s.tsdbStores() + for _, store := range stores { + err := store.Series(r, srv) + if err != nil { + return err + } + } + return nil +} + +// LabelNames returns all known label names. +func (s *MultiTSDBStore) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { + names := map[string]struct{}{} + warnings := map[string]struct{}{} + + stores := s.tsdbStores() + for _, store := range stores { + r, err := store.LabelNames(ctx, req) + if err != nil { + return nil, err + } + + for _, l := range r.Names { + names[l] = struct{}{} + } + + for _, l := range r.Warnings { + warnings[l] = struct{}{} + } + } + + return &storepb.LabelNamesResponse{ + Names: keys(names), + Warnings: keys(warnings), + }, nil +} + +func keys(m map[string]struct{}) []string { + res := make([]string, 0, len(m)) + for k := range m { + res = append(res, k) + } + + return res +} + +// LabelValues returns all known label values for a given label name. +func (s *MultiTSDBStore) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { + values := map[string]struct{}{} + warnings := map[string]struct{}{} + + stores := s.tsdbStores() + for _, store := range stores { + r, err := store.LabelValues(ctx, req) + if err != nil { + return nil, err + } + + for _, l := range r.Values { + values[l] = struct{}{} + } + + for _, l := range r.Warnings { + warnings[l] = struct{}{} + } + } + + return &storepb.LabelValuesResponse{ + Values: keys(values), + Warnings: keys(warnings), + }, nil +} diff --git a/scripts/quickstart.sh b/scripts/quickstart.sh index ccdee968b60..a493c94e166 100755 --- a/scripts/quickstart.sh +++ b/scripts/quickstart.sh @@ -84,6 +84,9 @@ scrape_configs: static_configs: - targets: - "localhost:909${i}" + - "localhost:5909${i}" + - "localhost:5909${i}" + - "localhost:5909${i}" - job_name: thanos-sidecar scrape_interval: 5s static_configs: @@ -99,6 +102,8 @@ scrape_configs: static_configs: - targets: - "localhost:10909" + - "localhost:11909" + - "localhost:12909" - job_name: thanos-query scrape_interval: 5s static_configs: @@ -163,38 +168,55 @@ fi sleep 0.5 if [ -n "${REMOTE_WRITE_ENABLED}" ]; then + +cat <<-EOF > ./data/hashring.json +[{"endpoints":["127.0.0.1:10907","127.0.0.1:11907","127.0.0.1:12907"]}] +EOF + +for i in $(seq 0 1 2); do ${THANOS_EXECUTABLE} receive \ - --debug.name receive \ + --debug.name receive${i} \ --log.level debug \ - --log.level debug \ - --tsdb.path "./data/remote-write-receive-data" \ - --grpc-address 0.0.0.0:10907 \ + --tsdb.path "./data/remote-write-receive-${i}-data" \ + --grpc-address 0.0.0.0:1${i}907 \ --grpc-grace-period 1s \ - --http-address 0.0.0.0:10909 \ + --http-address 0.0.0.0:1${i}909 \ --http-grace-period 1s \ - --label "receive=\"true\"" \ + --receive.replication-factor 3 \ + --label "receive_replica=\"${i}\"" \ + --receive.local-endpoint 127.0.0.1:1${i}907 \ + --receive.hashrings-file ./data/hashring.json \ ${OBJSTORECFG} \ - --remote-write.address 0.0.0.0:10908 & + --remote-write.address 0.0.0.0:1${i}908 & + + STORES="${STORES} --store 127.0.0.1:1${i}907" +done - mkdir -p "data/local-prometheus-data/" - cat <data/local-prometheus-data/prometheus.yml +for i in $(seq 0 1 2); do + mkdir -p "data/local-prometheus-${i}-data/" + cat <data/local-prometheus-${i}-data/prometheus.yml +global: + external_labels: + prometheus: prom${i} + replica: 1 # When the Thanos remote-write-receive component is started, # this is an example configuration of a Prometheus server that # would scrape a local node-exporter and replicate its data to # the remote write endpoint. scrape_configs: - - job_name: node + - job_name: test scrape_interval: 1s static_configs: - - targets: ['localhost:9100'] + - targets: + - fake remote_write: -- url: http://localhost:10908/api/v1/receive +- url: http://localhost:1${i}908/api/v1/receive EOF ${PROMETHEUS_EXECUTABLE} \ - --config.file data/local-prometheus-data/prometheus.yml \ - --storage.tsdb.path "data/local-prometheus-data/" & - - STORES="${STORES} --store 127.0.0.1:10907" + --web.listen-address ":5909${i}" \ + --config.file data/local-prometheus-${i}-data/prometheus.yml \ + --storage.tsdb.path "data/local-prometheus-${i}-data/" & +done fi sleep 0.5 @@ -209,6 +231,7 @@ for i in $(seq 0 1); do --http-address 0.0.0.0:109${i}4 \ --http-grace-period 1s \ --query.replica-label prometheus \ + --query.replica-label receive_replica \ ${STORES} & done diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 720423b4583..aa8e55e84da 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -32,6 +32,7 @@ global: replica: %v scrape_configs: - job_name: 'test' + scrape_interval: 1s static_configs: - targets: ['fake'] `, name, replica) @@ -141,6 +142,7 @@ func TestQuery(t *testing.T) { "prometheus": "prom-both-remote-write-and-sidecar", "receive": model.LabelValue(r.HTTP.Port), "replica": model.LabelValue("1234"), + "tenant_id": "default-tenant", }, res[1].Metric) testutil.Equals(t, model.Metric{ "job": "test", @@ -206,6 +208,7 @@ func TestQuery(t *testing.T) { "job": "test", "prometheus": "prom-both-remote-write-and-sidecar", "receive": model.LabelValue(r.HTTP.Port), + "tenant_id": "default-tenant", }, res[1].Metric) testutil.Equals(t, model.Metric{ "job": "test", diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index be2e1118efd..0b1b30565b0 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -57,20 +57,23 @@ func TestReceive(t *testing.T) { { "job": "test", "prometheus": "prom1", - "receive": model.LabelValue(r1.HTTP.Port), + "receive": model.LabelValue(r2.HTTP.Port), "replica": "1", + "tenant_id": "default-tenant", }, { "job": "test", "prometheus": "prom2", "receive": model.LabelValue(r2.HTTP.Port), "replica": "1", + "tenant_id": "default-tenant", }, { "job": "test", "prometheus": "prom3", "receive": model.LabelValue(r2.HTTP.Port), "replica": "1", + "tenant_id": "default-tenant", }, }}) } @@ -103,18 +106,21 @@ func TestReceive(t *testing.T) { "prometheus": "prom1", "receive": model.LabelValue(r1.HTTP.Port), "replica": "1", + "tenant_id": "default-tenant", }, { "job": "test", "prometheus": "prom1", "receive": model.LabelValue(r2.HTTP.Port), "replica": "1", + "tenant_id": "default-tenant", }, { "job": "test", "prometheus": "prom1", "receive": model.LabelValue(r3.HTTP.Port), "replica": "1", + "tenant_id": "default-tenant", }, }}) } @@ -145,12 +151,14 @@ func TestReceive(t *testing.T) { "prometheus": "prom1", "receive": model.LabelValue(r1.HTTP.Port), "replica": "1", + "tenant_id": "default-tenant", }, { "job": "test", "prometheus": "prom1", "receive": model.LabelValue(r2.HTTP.Port), "replica": "1", + "tenant_id": "default-tenant", }, }}) } diff --git a/test/e2e/spinup_test.go b/test/e2e/spinup_test.go index d51a51a60a6..35e83ae5b8f 100644 --- a/test/e2e/spinup_test.go +++ b/test/e2e/spinup_test.go @@ -206,7 +206,6 @@ func receiver(http, grpc, metric address, replicationFactor int, hashring ...rec } func querier(http, grpc address, storeAddresses []address, fileSDStoreAddresses []address) *serverScheduler { - const replicaLabel = "replica" return &serverScheduler{ HTTP: http, GRPC: grpc, @@ -217,8 +216,8 @@ func querier(http, grpc address, storeAddresses []address, fileSDStoreAddresses "--grpc-address", grpc.HostPort(), "--grpc-grace-period", "0s", "--http-address", http.HostPort(), + "--query.replica-label=replica", "--log.level", "debug", - "--query.replica-label", replicaLabel, "--store.sd-dns-interval", "5s", } for _, addr := range storeAddresses {