receive: Add support for TSDB per tenant (#2012)
* receive: Add support for TSDB per tenant

Signed-off-by: Frederic Branczyk <fbranczyk@gmail.com>

* pkg/store: Merge SeriesSets of multiple TSDB stores

This is required because the Series gRPC method of the StoreAPI requires
the returned series to be sorted.

Signed-off-by: Frederic Branczyk <fbranczyk@gmail.com>
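
For illustration, a minimal sketch of one standard way to keep the merged stream sorted: a k-way heap merge over already-sorted sets. The `Series`/`SeriesSet` types below are simplified stand-ins for Thanos's storepb types, and the actual merge in this commit may be implemented differently (duplicates across tenants are not deduplicated here):

```go
package main

import (
	"container/heap"
	"fmt"
)

// Series stands in for a series identified by its sorted label set.
type Series struct{ Labels string }

// SeriesSet stands in for an iterator over series sorted by labels.
type SeriesSet interface {
	Next() bool
	At() Series
}

// setHeap orders SeriesSets by the labels of their current element.
type setHeap []SeriesSet

func (h setHeap) Len() int            { return len(h) }
func (h setHeap) Less(i, j int) bool  { return h[i].At().Labels < h[j].At().Labels }
func (h setHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *setHeap) Push(x interface{}) { *h = append(*h, x.(SeriesSet)) }
func (h *setHeap) Pop() interface{} {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

// mergedSet yields series from all inputs in globally sorted order.
type mergedSet struct {
	h   setHeap
	cur Series
}

func newMergedSet(sets ...SeriesSet) *mergedSet {
	m := &mergedSet{}
	for _, s := range sets {
		if s.Next() { // prime each iterator before pushing it.
			m.h = append(m.h, s)
		}
	}
	heap.Init(&m.h)
	return m
}

func (m *mergedSet) Next() bool {
	if m.h.Len() == 0 {
		return false
	}
	m.cur = m.h[0].At()
	if m.h[0].Next() {
		heap.Fix(&m.h, 0) // head advanced; restore heap order.
	} else {
		heap.Pop(&m.h) // iterator exhausted; drop it.
	}
	return true
}

func (m *mergedSet) At() Series { return m.cur }

// sliceSet adapts a sorted slice to SeriesSet for the demo below.
type sliceSet struct {
	s []Series
	i int
}

func (s *sliceSet) Next() bool { s.i++; return s.i <= len(s.s) }
func (s *sliceSet) At() Series { return s.s[s.i-1] }

func main() {
	a := &sliceSet{s: []Series{{"a"}, {"c"}}}
	b := &sliceSet{s: []Series{{"b"}, {"d"}}}
	for m := newMergedSet(a, b); m.Next(); {
		fmt.Println(m.At().Labels) // a b c d
	}
}
```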

* pkg/receive: Add multitsdb shipper support

Signed-off-by: Frederic Branczyk <fbranczyk@gmail.com>
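
A rough sketch of what per-tenant shipping implies: one shipper rooted at each tenant's TSDB directory, with a fan-out Sync that collects per-tenant failures. The `multiTSDB`/`shipper` names below are illustrative stand-ins, not the actual Thanos API:

```go
package main

import (
	"context"
	"fmt"
)

// shipper stands in for pkg/shipper's uploader: the real one scans a TSDB
// directory and uploads finished blocks to object storage.
type shipper interface {
	Sync(ctx context.Context) (uploaded int, err error)
}

// multiTSDB stands in for receive's per-tenant registry.
type multiTSDB struct {
	shippers map[string]shipper // tenant ID -> shipper for that tenant's dir.
}

// Sync fans out to every tenant's shipper and collects failures, so one
// tenant's upload error does not block the others.
func (m *multiTSDB) Sync(ctx context.Context) error {
	var errs []error
	for tenant, s := range m.shippers {
		if _, err := s.Sync(ctx); err != nil {
			errs = append(errs, fmt.Errorf("sync tenant %s: %w", tenant, err))
		}
	}
	if len(errs) > 0 {
		return fmt.Errorf("%d tenant(s) failed to sync: %v", len(errs), errs)
	}
	return nil
}

type noopShipper struct{}

func (noopShipper) Sync(context.Context) (int, error) { return 0, nil }

func main() {
	m := &multiTSDB{shippers: map[string]shipper{"team-a": noopShipper{}}}
	fmt.Println(m.Sync(context.Background())) // <nil>
}
```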

* Address comments

Signed-off-by: Frederic Branczyk <fbranczyk@gmail.com>

* Add more comments on types and functions

Signed-off-by: Frederic Branczyk <fbranczyk@gmail.com>

* pkg/store/multitsdb.go: Remove unused struct field

Signed-off-by: Frederic Branczyk <fbranczyk@gmail.com>

* pkg/receive/multitsdb.go: Remove unused Close method

TSDBs are implicitly closed when the database is flushed, which is ensured
on shutdown, so there is no need for an explicit Close method.

Signed-off-by: Frederic Branczyk <fbranczyk@gmail.com>
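
A minimal sketch of the lifecycle this relies on, assuming — as stated above — that Flush both persists the WAL and closes the underlying TSDBs. The `flushableStorage` interface is illustrative:

```go
package main

import "log"

// flushableStorage is an illustrative stand-in: Flush is assumed to persist
// the WAL as a block and close the underlying TSDBs.
type flushableStorage interface {
	Flush() error
}

// run serves until shutdown; the deferred Flush is what makes a separate
// Close call unnecessary.
func run(dbs flushableStorage, done <-chan struct{}) error {
	defer func() {
		if err := dbs.Flush(); err != nil {
			log.Printf("failed to flush storage: %v", err)
		}
	}()
	<-done
	return nil
}

type noopStorage struct{}

func (noopStorage) Flush() error { return nil }

func main() {
	done := make(chan struct{})
	close(done) // shut down immediately for the demo.
	_ = run(noopStorage{}, done)
}
```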

* pkg/store/multitsdb.go: Make errors and warnings tenant aware

Signed-off-by: Frederic Branczyk <fbranczyk@gmail.com>

* pkg/store/multitsdb.go: Consistent tenant aware errors and warnings

Signed-off-by: Frederic Branczyk <fbranczyk@gmail.com>
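
An illustrative sketch of what "tenant aware" means here — each error or warning surfaced by the multi-TSDB store is annotated with the tenant it came from, so operators can tell whose data misbehaved. The exact message format in multitsdb.go may differ:

```go
package main

import "fmt"

// annotate wraps an error with the tenant it originated from.
func annotate(tenant string, err error) error {
	if err == nil {
		return nil
	}
	return fmt.Errorf("tenant %s: %w", tenant, err)
}

func main() {
	err := annotate("team-a", fmt.Errorf("series limit exceeded"))
	fmt.Println(err) // tenant team-a: series limit exceeded
}
```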
brancz authored May 4, 2020
1 parent d68c1b6 commit 8448c05
Showing 12 changed files with 784 additions and 123 deletions.
4 changes: 3 additions & 1 deletion Makefile
@@ -16,6 +16,8 @@ export GO111MODULE
GOPROXY ?= https://proxy.golang.org
export GOPROXY

GOTEST_OPTS ?= -failfast -timeout 10m -v

# Tools.
EMBEDMD ?= $(GOBIN)/embedmd-$(EMBEDMD_VERSION)
# v2.0.0
@@ -253,7 +255,7 @@ test-e2e: docker
@echo ">> running /test/e2e tests."
# NOTE(bwplotka):
# * If you see errors on CI (timeouts), but not locally, try to add -parallel 1 to limit to single CPU to reproduce small 1CPU machine.
@go test -failfast -timeout 10m -v ./test/e2e/...
@go test $(GOTEST_OPTS) ./test/e2e/...

.PHONY: install-deps
install-deps: ## Installs dependencies for integration tests. It installs supported versions of Prometheus and alertmanager to test against in integration tests.
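With `GOTEST_OPTS` now a variable, individual runs can be tuned without editing the Makefile. For example, following the NOTE above about reproducing small 1-CPU CI machines, an invocation such as `make test-e2e GOTEST_OPTS="-failfast -timeout 10m -v -parallel 1"` (illustrative values) overrides the defaults for a single run.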
129 changes: 67 additions & 62 deletions cmd/thanos/receive.go
@@ -20,18 +20,17 @@ import (
"github.com/prometheus/prometheus/storage/tsdb"
kingpin "gopkg.in/alecthomas/kingpin.v2"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/extgrpc"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/receive"
"github.com/thanos-io/thanos/pkg/runutil"
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/tls"
)
@@ -72,6 +71,10 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {

tenantHeader := cmd.Flag("receive.tenant-header", "HTTP header to determine tenant for write requests.").Default(receive.DefaultTenantHeader).String()

defaultTenantID := cmd.Flag("receive.default-tenant-id", "Default tenant ID to use when none is provided via a header.").Default(receive.DefaultTenant).String()

tenantLabelName := cmd.Flag("receive.tenant-label-name", "Label name through which the tenant will be announced.").Default(receive.DefaultTenantLabel).String()

replicaHeader := cmd.Flag("receive.replica-header", "HTTP header specifying the replica number of a write request.").Default(receive.DefaultReplicaHeader).String()

replicationFactor := cmd.Flag("receive.replication-factor", "How many times to replicate incoming write requests.").Default("1").Uint64()
@@ -144,6 +147,8 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {
cw,
*local,
*tenantHeader,
*defaultTenantID,
*tenantLabelName,
*replicaHeader,
*replicationFactor,
comp,
@@ -179,14 +184,15 @@ func runReceive(
cw *receive.ConfigWatcher,
endpoint string,
tenantHeader string,
defaultTenantID string,
tenantLabelName string,
replicaHeader string,
replicationFactor uint64,
comp component.SourceStoreAPI,
) error {
logger = log.With(logger, "component", "receive")
level.Warn(logger).Log("msg", "setting up receive; the Thanos receive component is EXPERIMENTAL, it may break significantly without notice")

localStorage := &tsdb.ReadyStorage{}
rwTLSConfig, err := tls.NewServerConfig(log.With(logger, "protocol", "HTTP"), rwServerCert, rwServerKey, rwServerClientCA)
if err != nil {
return err
@@ -196,11 +202,47 @@
return err
}

var bkt objstore.Bucket
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}
upload := len(confContentYaml) > 0
if upload {
if tsdbOpts.MinBlockDuration != tsdbOpts.MaxBlockDuration {
if !ignoreBlockSize {
return errors.Errorf("found that TSDB Max time is %s and Min time is %s. "+
"Compaction needs to be disabled (tsdb.min-block-duration = tsdb.max-block-duration)", tsdbOpts.MaxBlockDuration, tsdbOpts.MinBlockDuration)
}
level.Warn(logger).Log("msg", "flag to ignore min/max block duration flags differing is being used. If the upload of a 2h block fails and a tsdb compaction happens that block may be missing from your Thanos bucket storage.")
}
// The background shipper continuously scans the data directory and uploads
// new blocks to object storage service.
bkt, err = client.NewBucket(logger, confContentYaml, reg, comp.String())
if err != nil {
return err
}
} else {
level.Info(logger).Log("msg", "no supported bucket was configured, uploads will be disabled")
}

dbs := receive.NewMultiTSDB(
dataDir,
logger,
reg,
tsdbOpts,
lset,
tenantLabelName,
bkt,
)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs)
webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
Writer: writer,
ListenAddress: rwAddress,
Registry: reg,
Endpoint: endpoint,
TenantHeader: tenantHeader,
DefaultTenantID: defaultTenantID,
ReplicaHeader: replicaHeader,
ReplicationFactor: replicationFactor,
Tracer: tracer,
@@ -216,24 +258,6 @@ func runReceive(
prober.NewInstrumentation(comp, logger, extprom.WrapRegistererWithPrefix("thanos_", reg)),
)

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}
upload := true
if len(confContentYaml) == 0 {
level.Info(logger).Log("msg", "No supported bucket was configured, uploads will be disabled")
upload = false
}

if upload && tsdbOpts.MinBlockDuration != tsdbOpts.MaxBlockDuration {
if !ignoreBlockSize {
return errors.Errorf("found that TSDB Max time is %s and Min time is %s. "+
"Compaction needs to be disabled (tsdb.min-block-duration = tsdb.max-block-duration)", tsdbOpts.MaxBlockDuration, tsdbOpts.MinBlockDuration)
}
level.Warn(logger).Log("msg", "flag to ignore min/max block duration flags differing is being used. If the upload of a 2h block fails and a tsdb compaction happens that block may be missing from your Thanos bucket storage.")
}

// Start all components while we wait for TSDB to open but only load
// initial config and mark ourselves as ready after it completed.

@@ -250,24 +274,13 @@ func runReceive(
{
// TSDB.
cancel := make(chan struct{})
startTimeMargin := int64(2 * time.Duration(tsdbOpts.MinBlockDuration).Seconds() * 1000)
g.Add(func() error {
defer close(dbReady)
defer close(uploadC)

// Before actually starting, we need to make sure the
// WAL is flushed. The WAL is flushed after the
// hashring is loaded.
db := receive.NewFlushableStorage(
dataDir,
log.With(logger, "component", "tsdb"),
reg,
tsdbOpts,
)

// Before quitting, ensure the WAL is flushed and the DB is closed.
defer func() {
if err := db.Flush(); err != nil {
if err := dbs.Flush(); err != nil {
level.Warn(logger).Log("err", err, "msg", "failed to flush storage")
}
}()
@@ -283,28 +296,24 @@

level.Info(logger).Log("msg", "updating DB")

if err := db.Flush(); err != nil {
if err := dbs.Flush(); err != nil {
return errors.Wrap(err, "flushing storage")
}
if err := db.Open(); err != nil {
if err := dbs.Open(); err != nil {
return errors.Wrap(err, "opening storage")
}
if upload {
uploadC <- struct{}{}
<-uploadDone
}
level.Info(logger).Log("msg", "tsdb started")
localStorage.Set(db.Get(), startTimeMargin)
webHandler.SetWriter(receive.NewWriter(log.With(logger, "component", "receive-writer"), localStorage))
statusProber.Ready()
level.Info(logger).Log("msg", "server is ready to receive web requests")
level.Info(logger).Log("msg", "tsdb started, and server is ready to receive web requests")
dbReady <- struct{}{}
}
}
}, func(err error) {
close(cancel)
},
)
})
}
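
The block above follows the actor pattern from github.com/oklog/run, which Thanos uses to coordinate components: each `g.Add` pairs an execute function with an interrupt function, and once any actor returns, all others are interrupted. A minimal sketch:

```go
package main

import (
	"fmt"

	"github.com/oklog/run"
)

func main() {
	var g run.Group

	// Actor 1 blocks until interrupted, like the TSDB actor above.
	cancel := make(chan struct{})
	g.Add(func() error {
		<-cancel
		return nil
	}, func(error) {
		close(cancel)
	})

	// Actor 2 returns immediately, triggering interruption of actor 1.
	g.Add(func() error {
		return fmt.Errorf("shutting down")
	}, func(error) {})

	// Run returns the first error once every actor has exited.
	fmt.Println(g.Run()) // shutting down
}
```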

level.Debug(logger).Log("msg", "setting up hashring")
@@ -349,7 +358,6 @@ func runReceive(
if !ok {
return nil
}
webHandler.SetWriter(nil)
webHandler.Hashring(h)
msg := "hashring has changed; server is not ready to receive web requests."
statusProber.NotReady(errors.New(msg))
@@ -397,9 +405,14 @@ func runReceive(
if s != nil {
s.Shutdown(errors.New("reload hashrings"))
}
tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), nil, localStorage.Get(), comp, lset)

rw := store.ReadWriteTSDBStore{
StoreServer: tsdbStore,
StoreServer: store.NewMultiTSDBStore(
logger,
reg,
comp,
dbs.TSDBStores,
),
WriteableStoreServer: webHandler,
}

@@ -419,6 +432,7 @@
// whenever the DB changes, thus it needs its own run group.
g.Add(func() error {
for range startGRPC {
level.Info(logger).Log("msg", "listening for StoreAPI and WritableStoreAPI gRPC", "address", grpcBindAddr)
if err := s.ListenAndServe(); err != nil {
return errors.Wrap(err, "serve gRPC")
}
@@ -440,27 +454,18 @@
}

if upload {
// The background shipper continuously scans the data directory and uploads
// new blocks to Google Cloud Storage or an S3-compatible storage service.
bkt, err := client.NewBucket(logger, confContentYaml, reg, comp.String())
if err != nil {
return err
}

s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.ReceiveSource)

// Before starting, ensure any old blocks are uploaded.
if uploaded, err := s.Sync(context.Background()); err != nil {
level.Warn(logger).Log("err", err, "failed to upload", uploaded)
level.Debug(logger).Log("msg", "upload enabled")
if err := dbs.Sync(context.Background()); err != nil {
level.Warn(logger).Log("msg", "initial upload failed", "err", err)
}

{
// Run the uploader in a loop.
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
if uploaded, err := s.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err, "uploaded", uploaded)
if err := dbs.Sync(ctx); err != nil {
level.Warn(logger).Log("msg", "interval upload failed", "err", err)
}

return nil
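
The loop above relies on Thanos's `runutil.Repeat` helper. A self-contained sketch approximating the behavior assumed here (run `f` periodically until `stopc` closes or `f` errors; in this sketch `f` also runs once up front):

```go
package main

import (
	"fmt"
	"time"
)

// repeat approximates runutil.Repeat as used above: run f once, then on
// every tick, until stopc closes or f returns an error.
func repeat(interval time.Duration, stopc <-chan struct{}, f func() error) error {
	tick := time.NewTicker(interval)
	defer tick.Stop()
	for {
		if err := f(); err != nil {
			return err
		}
		select {
		case <-stopc:
			return nil
		case <-tick.C:
		}
	}
}

func main() {
	stop := make(chan struct{})
	go func() {
		time.Sleep(120 * time.Millisecond)
		close(stop)
	}()
	_ = repeat(50*time.Millisecond, stop, func() error {
		fmt.Println("sync") // stands in for dbs.Sync(ctx).
		return nil
	})
}
```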
@@ -481,8 +486,8 @@
// Before quitting, ensure all blocks are uploaded.
defer func() {
<-uploadC
if uploaded, err := s.Sync(context.Background()); err != nil {
level.Warn(logger).Log("err", err, "failed to upload", uploaded)
if err := dbs.Sync(context.Background()); err != nil {
level.Warn(logger).Log("msg", "on demnad upload failed", "err", err)
}
}()
defer close(uploadDone)
@@ -496,8 +501,8 @@
case <-ctx.Done():
return nil
case <-uploadC:
if uploaded, err := s.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err, "failed to upload", uploaded)
if err := dbs.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err)
}
uploadDone <- struct{}{}
}
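Tying the new receive flags together, a hedged sketch of how `--receive.tenant-header` and `--receive.default-tenant-id` are meant to interact (the header name and handler logic below are assumptions for illustration, not the actual implementation):

```go
package main

import (
	"fmt"
	"net/http"
)

// tenantFrom resolves the tenant for a remote-write request: use the
// configured header if present, otherwise fall back to the default.
func tenantFrom(r *http.Request, header, defaultTenant string) string {
	if t := r.Header.Get(header); t != "" {
		return t
	}
	return defaultTenant
}

func main() {
	r, _ := http.NewRequest(http.MethodPost, "/api/v1/receive", nil)
	fmt.Println(tenantFrom(r, "THANOS-TENANT", "default-tenant")) // default-tenant
	r.Header.Set("THANOS-TENANT", "team-a")
	fmt.Println(tenantFrom(r, "THANOS-TENANT", "default-tenant")) // team-a
}
```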
3 changes: 3 additions & 0 deletions go.sum
@@ -649,8 +649,10 @@ github.com/sercand/kuberesolver v2.1.0+incompatible h1:iJ1oCzPQ/aacsbCWLfJW1hPKk
github.com/sercand/kuberesolver v2.1.0+incompatible/go.mod h1:lWF3GL0xptCB/vCiJPl/ZshwPsX/n4Y7u0CW9E7aQIQ=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/shurcooL/httpfs v0.0.0-20171119174359-809beceb2371/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg=
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 h1:bUGsEnyNbVPw06Bs80sCeARAlK8lhwqGyi6UT8ymuGk=
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg=
github.com/shurcooL/vfsgen v0.0.0-20180825020608-02ddb050ef6b/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw=
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd h1:ug7PpSOB5RBPK1Kg6qskGBoP3Vnj/aNYFTznWvlkGo0=
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw=
github.com/sirupsen/logrus v1.0.5/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
@@ -665,6 +667,7 @@ github.com/smartystreets/assertions v1.0.1/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUr
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=