cmd,pkg: flush storage when hashring changes (#1480)
* pkg/receive: create flushable storage

Signed-off-by: Lucas Servén Marín <lserven@gmail.com>

* cmd,pkg: flush storage when hashring changes

In order to allow the hashring of receive nodes to scale at runtime, we
need to temporarily stop the storage, flush it, and upload the blocks.
This commit ensures that whenever a hashring change is detected, the
entire storage is flushed and the shipper is notified so that it uploads
any new blocks. It also ensures that when the receive component starts
up, any in-progress WAL is flushed and uploaded, so that new data that
may belong to a different tenant is not mixed with old data. Finally, it
ensures that the storage is flushed and uploaded whenever the process
shuts down.

Signed-off-by: Lucas Servén Marín <lserven@gmail.com>
squat authored and brancz committed Sep 30, 2019 · 1 parent fc99659 · commit 91fdbd6
Showing 9 changed files with 339 additions and 128 deletions.
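
The core of the change is a set of channels in cmd/thanos/receive.go (dbReady, updateDB, uploadC, uploadDone) that coordinate the hashring watcher, the TSDB, the StoreAPI gRPC server, and the shipper. The following is a minimal, self-contained sketch of that coordination pattern, not the actual Thanos code: the flush and upload steps are stand-in print statements rather than calls to the real FlushableStorage and shipper.

// Minimal sketch of the coordination pattern implemented in the
// cmd/thanos/receive.go diff below. The channel names mirror the diff,
// but flushing and uploading are simulated with print statements.
package main

import (
	"fmt"
	"sync"
)

func main() {
	updates := make(chan string)       // hashring updates (stand-in for receive.Hashring)
	updateDB := make(chan struct{}, 1) // signals that storage must be flushed
	uploadC := make(chan struct{}, 1)  // signals that new blocks should be uploaded
	uploadDone := make(chan struct{}, 1)

	var wg sync.WaitGroup
	wg.Add(3)

	// Hashring watcher: on every change, mark the handler unready and request a flush.
	go func() {
		defer wg.Done()
		defer close(updateDB)
		for h := range updates {
			fmt.Println("hashring changed to", h, "- handler marked not ready")
			updateDB <- struct{}{}
		}
	}()

	// TSDB loop: flush the head, trigger an upload, wait for it, then become ready again.
	go func() {
		defer wg.Done()
		defer close(uploadC)
		for range updateDB {
			fmt.Println("flushing storage (WAL/head -> immutable block)")
			uploadC <- struct{}{}
			<-uploadDone
			fmt.Println("storage reopened; handler ready again")
		}
	}()

	// Uploader: stand-in for an on-demand shipper sync to object storage.
	go func() {
		defer wg.Done()
		for range uploadC {
			fmt.Println("uploading new blocks to object storage")
			uploadDone <- struct{}{}
		}
	}()

	updates <- "hashring-v2" // simulate one hashring change
	close(updates)
	wg.Wait()
}
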
cmd/thanos/main.go (2 changes: 1 addition & 1 deletion)
@@ -294,7 +294,7 @@ func defaultGRPCServerOpts(logger log.Logger, cert, key, clientCA string) ([]grp
return append(opts, grpc.Creds(credentials.NewTLS(tlsCfg))), nil
}

func newStoreGRPCServer(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, srv storepb.StoreServer, opts []grpc.ServerOption) *grpc.Server {
func newStoreGRPCServer(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer, srv storepb.StoreServer, opts []grpc.ServerOption) *grpc.Server {
met := grpc_prometheus.NewServerMetrics()
met.EnableHandlingTimeHistogram(
grpc_prometheus.WithHistogramBuckets([]float64{
cmd/thanos/receive.go (258 changes: 165 additions & 93 deletions)
@@ -149,114 +149,153 @@ func runReceive(
MaxBlockDuration: tsdbBlockDuration,
WALCompression: true,
}
db := receive.NewFlushableStorage(
dataDir,
log.With(logger, "component", "tsdb"),
reg,
tsdbCfg,
)

localStorage := &tsdb.ReadyStorage{}
receiver := receive.NewWriter(log.With(logger, "component", "receive-writer"), localStorage)
webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
Receiver: receiver,
ListenAddress: remoteWriteAddress,
Registry: reg,
ReadyStorage: localStorage,
Endpoint: endpoint,
TenantHeader: tenantHeader,
ReplicaHeader: replicaHeader,
ReplicationFactor: replicationFactor,
})

statusProber := prober.NewProber(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}
upload := true
if len(confContentYaml) == 0 {
level.Info(logger).Log("msg", "No supported bucket was configured, uploads will be disabled")
upload = false
}

// Start all components while we wait for TSDB to open but only load
// initial config and mark ourselves as ready after it completed.
dbOpen := make(chan struct{})

// dbReady signals when TSDB is ready and the Store gRPC server can start.
dbReady := make(chan struct{}, 1)
// updateDB signals when TSDB needs to be flushed and updated.
updateDB := make(chan struct{}, 1)
// uploadC signals when new blocks should be uploaded.
uploadC := make(chan struct{}, 1)
// uploadDone signals when uploading has finished.
uploadDone := make(chan struct{}, 1)

level.Debug(logger).Log("msg", "setting up tsdb")
{
// TSDB.
cancel := make(chan struct{})
g.Add(
func() error {
level.Info(logger).Log("msg", "starting TSDB ...")
db, err := tsdb.Open(
dataDir,
log.With(logger, "component", "tsdb"),
reg,
tsdbCfg,
)
if err != nil {
close(dbOpen)
return fmt.Errorf("opening storage failed: %s", err)
// Before actually starting, we need to make sure
// the WAL is flushed.
startTimeMargin := int64(2 * time.Duration(tsdbCfg.MinBlockDuration).Seconds() * 1000)
if err := db.Open(); err != nil {
return errors.Wrap(err, "opening storage")
}
if err := db.Flush(); err != nil {
return errors.Wrap(err, "flushing storage")
}
g.Add(func() error {
defer close(dbReady)
defer close(uploadC)

// Before quitting, ensure the WAL is flushed and the DB is closed.
defer func() {
if err := db.Flush(); err != nil {
level.Warn(logger).Log("err", err, "msg", "failed to flush storage")
return
}
if err := db.Close(); err != nil {
level.Warn(logger).Log("err", err, "msg", "failed to close storage")
return
}
level.Info(logger).Log("msg", "tsdb started")
}()

startTimeMargin := int64(2 * time.Duration(tsdbCfg.MinBlockDuration).Seconds() * 1000)
localStorage.Set(db, startTimeMargin)
webHandler.StorageReady()
level.Info(logger).Log("msg", "server is ready to receive web requests.")
close(dbOpen)
<-cancel
return nil
},
func(err error) {
if err := localStorage.Close(); err != nil {
level.Error(logger).Log("msg", "error stopping storage", "err", err)
for {
select {
case <-cancel:
return nil
case _, ok := <-updateDB:
if !ok {
return nil
}
if err := db.Flush(); err != nil {
return errors.Wrap(err, "flushing storage")
}
if upload {
uploadC <- struct{}{}
<-uploadDone
}
level.Info(logger).Log("msg", "tsdb started")
localStorage.Set(db.Get(), startTimeMargin)
webHandler.SetWriter(receive.NewWriter(log.With(logger, "component", "receive-writer"), localStorage))
statusProber.SetReady()
level.Info(logger).Log("msg", "server is ready to receive web requests.")
dbReady <- struct{}{}
}
close(cancel)
},
}
}, func(err error) {
close(cancel)
},
)
}

hashringReady := make(chan struct{})
level.Debug(logger).Log("msg", "setting up hashring")
{
updates := make(chan receive.Hashring)
// Note: the hashring configuration watcher
// is the sender and thus closes the chan.
// In the single-node case, which has no configuration
// watcher, we close the chan ourselves.
updates := make(chan receive.Hashring, 1)
if cw != nil {
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
receive.HashringFromConfig(ctx, updates, cw)
return nil
}, func(error) {
cancel()
close(updates)
})
} else {
defer close(updates)
cancel := make(chan struct{})
g.Add(func() error {
updates <- receive.SingleNodeHashring(endpoint)
<-cancel
return nil
}, func(error) {
close(cancel)
close(updates)
})
}

cancel := make(chan struct{})
g.Add(
func() error {
g.Add(func() error {
defer close(updateDB)
for {
select {
case h := <-updates:
close(hashringReady)
case h, ok := <-updates:
if !ok {
return nil
}
webHandler.SetWriter(nil)
webHandler.Hashring(h)
statusProber.SetReady()
case <-cancel:
close(hashringReady)
return nil
}
select {
// If any new hashring is received, then mark the handler as unready, but keep it alive.
case <-updates:
msg := "hashring has changed; server is not ready to receive web requests."
webHandler.Hashring(nil)
statusProber.SetNotReady(errors.New(msg))
level.Info(logger).Log("msg", msg)
updateDB <- struct{}{}
case <-cancel:
return nil
}
<-cancel
return nil
},
func(err error) {
close(cancel)
},
}
}, func(err error) {
close(cancel)
},
)
}

@@ -269,36 +308,46 @@ func runReceive(
level.Debug(logger).Log("msg", "setting up grpc server")
{
var (
s *grpc.Server
l net.Listener
err error
s *grpc.Server
l net.Listener
)
startGRPC := make(chan struct{})
g.Add(func() error {
<-dbOpen

l, err = net.Listen("tcp", grpcBindAddr)
if err != nil {
return errors.Wrap(err, "listen API address")
}

db := localStorage.Get()
tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), reg, db, component.Receive, lset)

defer close(startGRPC)
opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC server")
}
s := newStoreGRPCServer(logger, reg, tracer, tsdbStore, opts)

// Wait hashring to be ready before start serving metrics
<-hashringReady
level.Info(logger).Log("msg", "listening for StoreAPI gRPC", "address", grpcBindAddr)
return errors.Wrap(s.Serve(l), "serve gRPC")
for range dbReady {
if s != nil {
s.Stop()
}
l, err = net.Listen("tcp", grpcBindAddr)
if err != nil {
return errors.Wrap(err, "listen API address")
}
tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), nil, localStorage.Get(), component.Receive, lset)
s = newStoreGRPCServer(logger, &receive.UnRegisterer{Registerer: reg}, tracer, tsdbStore, opts)
startGRPC <- struct{}{}
}
return nil
}, func(error) {
if s != nil {
s.Stop()
}
})
// We need to be able to start and stop the gRPC server
// whenever the DB changes, thus it needs its own run group.
g.Add(func() error {
for range startGRPC {
level.Info(logger).Log("msg", "listening for StoreAPI gRPC", "address", grpcBindAddr)
if err := s.Serve(l); err != nil {
return errors.Wrap(err, "serve gRPC")
}
}
return nil
}, func(error) {})
}

level.Debug(logger).Log("msg", "setting up receive http handler")
@@ -313,17 +362,6 @@ func runReceive(
)
}

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

upload := true
if len(confContentYaml) == 0 {
level.Info(logger).Log("msg", "No supported bucket was configured, uploads will be disabled")
upload = false
}

if upload {
// The background shipper continuously scans the data directory and uploads
// new blocks to Google Cloud Storage or an S3-compatible storage service.
@@ -341,20 +379,54 @@

s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.ReceiveSource)

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
// Before starting, ensure any old blocks are uploaded.
if uploaded, err := s.Sync(context.Background()); err != nil {
level.Warn(logger).Log("err", err, "failed to upload", uploaded)
}
// Before quitting, ensure all blocks are uploaded.
defer func() {
if uploaded, err := s.Sync(context.Background()); err != nil {
level.Warn(logger).Log("err", err, "failed to upload", uploaded)
}
}()

return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
if uploaded, err := s.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err, "uploaded", uploaded)
}
{
// Run the uploader in a loop.
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
if uploaded, err := s.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err, "uploaded", uploaded)
}

return nil
return nil
})
}, func(error) {
cancel()
})
}, func(error) {
cancel()
})
}

{
// Upload on demand.
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
defer close(uploadC)
for {
select {
case <-ctx.Done():
return nil
case <-uploadC:
if uploaded, err := s.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err, "failed to upload", uploaded)
}
cancel()
uploadDone <- struct{}{}
}
}
}, func(error) {
cancel()
})
}
}

level.Info(logger).Log("msg", "starting receiver")
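Because the StoreAPI gRPC server above is torn down and rebuilt every time dbReady fires, its metrics would collide on the shared Prometheus registry, which is why the diff wraps the registry in receive.UnRegisterer. The pkg/receive diff itself is not shown on this page; the sketch below is an assumed illustration of such a wrapper, not the commit's actual implementation.

// Assumed sketch of a re-registration-tolerant prometheus.Registerer,
// roughly what receive.UnRegisterer appears to provide; the real
// pkg/receive change is not included on this page.
package receive

import "github.com/prometheus/client_golang/prometheus"

// UnRegisterer wraps a Registerer and replaces colliding collectors instead of
// panicking, so a rebuilt gRPC server can register the same metrics again.
type UnRegisterer struct {
	prometheus.Registerer
}

// MustRegister registers each collector, unregistering any previously
// registered collector that collides and retrying once.
func (u *UnRegisterer) MustRegister(cs ...prometheus.Collector) {
	for _, c := range cs {
		if err := u.Register(c); err != nil {
			if are, ok := err.(prometheus.AlreadyRegisteredError); ok && u.Unregister(are.ExistingCollector) {
				u.Registerer.MustRegister(c)
				continue
			}
			panic(err)
		}
	}
}
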
go.mod (2 changes: 1 addition & 1 deletion)
@@ -35,7 +35,7 @@ require (
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v1.1.0
github.com/prometheus/common v0.6.0
github.com/prometheus/prometheus v1.8.2-0.20190819201610-48b2c9c8eae2 // v1.8.2 is misleading as Prometheus does not have v2 module. This is pointing to one commit after 2.12.0.
github.com/prometheus/prometheus v1.8.2-0.20190913102521-8ab628b35467 // v1.8.2 is misleading as Prometheus does not have v2 module.
github.com/uber-go/atomic v1.4.0 // indirect
github.com/uber/jaeger-client-go v2.16.0+incompatible
github.com/uber/jaeger-lib v2.0.0+incompatible
go.sum (8 changes: 4 additions & 4 deletions)
@@ -45,8 +45,8 @@ github.com/armon/go-radix v1.0.0 h1:F4z6KzEeeQIMeLFa97iZU6vupzoecKdU5TX24SNppXI=
github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/aws/aws-sdk-go v1.22.4 h1:Mcq67g9mZEBvBuj/x7mF9KCyw5M8/4I/cjQPkdCsq0I=
github.com/aws/aws-sdk-go v1.22.4/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.23.12 h1:2UnxgNO6Y5J1OrkXS8XNp0UatDxD1bWHiDT62RDPggI=
github.com/aws/aws-sdk-go v1.23.12/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0=
@@ -401,8 +401,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
github.com/prometheus/procfs v0.0.3 h1:CTwfnzjQ+8dS6MhHHu4YswVAD99sL2wjPqP+VkURmKE=
github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ=
github.com/prometheus/prometheus v0.0.0-20180315085919-58e2a31db8de/go.mod h1:oAIUtOny2rjMX0OWN5vPR5/q/twIROJvdqnQKDdil/s=
github.com/prometheus/prometheus v1.8.2-0.20190819201610-48b2c9c8eae2 h1:yZAWzfQYJN+vduRHL5jcTrVw+XwYU52ZrAhprmwoknI=
github.com/prometheus/prometheus v1.8.2-0.20190819201610-48b2c9c8eae2/go.mod h1:rMTlmxGCvukf2KMu3fClMDKLLoJ5hl61MhcJ7xKakf0=
github.com/prometheus/prometheus v1.8.2-0.20190913102521-8ab628b35467 h1:B9IMa7s163/ZDSduepHHfOZZHSKdSbgo/bFY5c+FMAs=
github.com/prometheus/prometheus v1.8.2-0.20190913102521-8ab628b35467/go.mod h1:aojjoH+vNHyJUTJoW15HoQWMKXxNhQylU6/G261nqxQ=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rs/cors v1.6.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
Expand Down
(Diffs for the remaining 5 changed files are not shown here.)
