Skip to content

Commit

Permalink
receive: Add block shipping
Browse files Browse the repository at this point in the history
  • Loading branch information
brancz committed Apr 17, 2019
1 parent 9fad981 commit 98912c5
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 125 deletions.
35 changes: 34 additions & 1 deletion cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/component"
"github.com/improbable-eng/thanos/pkg/receive"
"github.com/improbable-eng/thanos/pkg/runutil"
Expand All @@ -20,6 +21,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/tsdb"
"github.com/prometheus/tsdb/labels"
"google.golang.org/grpc"
"gopkg.in/alecthomas/kingpin.v2"
)
Expand All @@ -36,7 +38,16 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application, name stri
dataDir := cmd.Flag("tsdb.path", "Data directory of TSDB.").
Default("./data").String()

labelStrs := cmd.Flag("labels", "External labels to announce. This flag will be removed in the future when handling multiple tsdb instances is added.").PlaceHolder("key=\"value\").Strings()

objStoreConfig := regCommonObjStoreFlags(cmd, "", false)

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
lset, err := parseFlagLabels(*labelStrs)
if err != nil {
return errors.Wrap(err, "parse labels")
}

return runReceive(
g,
logger,
Expand All @@ -49,6 +60,8 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application, name stri
*httpMetricsBindAddr,
*remoteWriteAddress,
*dataDir,
objStoreConfig,
lset,
)
}
}
Expand All @@ -65,6 +78,8 @@ func runReceive(
httpMetricsBindAddr string,
remoteWriteAddress string,
dataDir string,
objStoreConfig *pathOrContent,
externalLabels labels.Labels,
) error {
logger = log.With(logger, "component", "receive")
level.Warn(logger).Log("msg", "setting up receive; the Thanos receive component is EXPERIMENTAL, it may break significantly without notice")
Expand Down Expand Up @@ -189,7 +204,7 @@ func runReceive(
}

db := localStorage.Get()
tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), reg, db, component.Receive, nil)
tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), reg, db, component.Receive, externalLabels)

opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA)
if err != nil {
Expand Down Expand Up @@ -225,6 +240,24 @@ func runReceive(
},
)
}

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

upload := true
if len(confContentYaml) == 0 {
level.Info(logger).Log("msg", "No supported bucket was configured, uploads will be disabled")
upload = false
}

if upload {
if err := addShipper(logger, reg, g, metadata.ReceiveSource, &promMetadata{labels: externalLabels}, dataDir, confContentYaml, false); err != nil {
return errors.Wrap(err, "add block shipper to run group")
}
}

level.Info(logger).Log("msg", "starting receiver")

return nil
Expand Down
91 changes: 45 additions & 46 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,67 +234,66 @@ func runSidecar(
return err
}

var uploads = true
upload := true
if len(confContentYaml) == 0 {
level.Info(logger).Log("msg", "No supported bucket was configured, uploads will be disabled")
uploads = false
upload = false
}

if uploads {
// The background shipper continuously scans the data directory and uploads
// new blocks to Google Cloud Storage or an S3-compatible storage service.
bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Sidecar.String())
if err != nil {
return err
if upload {
if err := addShipper(logger, reg, g, metadata.SidecarSource, m, dataDir, confContentYaml, uploadCompacted); err != nil {
return errors.Wrap(err, "add block shipper to run group")
}
}

// Ensure we close up everything properly.
defer func() {
if err != nil {
runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
}
}()
level.Info(logger).Log("msg", "starting sidecar", "peer", peer.Name())
return nil
}

func addShipper(logger log.Logger, reg prometheus.Registerer, g *run.Group, sourceType metadata.SourceType, m *promMetadata, dataDir string, confContentYaml []byte, uploadCompacted bool) error {
// The background shipper continuously scans the data directory and uploads
// new blocks to Google Cloud Storage or an S3-compatible storage service.
bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Sidecar.String())
if err != nil {
return err
}

if err := promclient.IsWALDirAccesible(dataDir); err != nil {
level.Error(logger).Log("err", err)
// Ensure we close up everything properly.
defer func() {
if err != nil {
runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
}
}()

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
if err := promclient.IsWALDirAccesible(dataDir); err != nil {
level.Error(logger).Log("err", err)
}

var s *shipper.Shipper
if uploadCompacted {
s, err = shipper.NewWithCompacted(ctx, logger, reg, dataDir, bkt, m.Labels, metadata.SidecarSource, m.promURL)
if err != nil {
return errors.Wrap(err, "create shipper")
}
} else {
s = shipper.New(logger, reg, dataDir, bkt, m.Labels, metadata.SidecarSource)
}
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
if uploaded, err := s.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err, "uploaded", uploaded)
}
var s *shipper.Shipper
if uploadCompacted {
s, err = shipper.NewWithCompacted(ctx, logger, reg, dataDir, bkt, m.Labels, sourceType, m.promURL)
if err != nil {
return errors.Wrap(err, "create shipper")
}
} else {
s = shipper.New(logger, reg, dataDir, bkt, m.Labels, sourceType)
}

minTime, _, err := s.Timestamps()
if err != nil {
level.Warn(logger).Log("msg", "reading timestamps failed", "err", err)
} else {
m.UpdateTimestamps(minTime, math.MaxInt64)
return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
if uploaded, err := s.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err, "uploaded", uploaded)
}

mint, maxt := m.Timestamps()
peer.SetTimestamps(mint, maxt)
}
return nil
})
}, func(error) {
cancel()
return nil
})
}
}, func(error) {
cancel()
})

level.Info(logger).Log("msg", "starting sidecar", "peer", peer.Name())
return nil
}

Expand Down
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ require (
github.com/fortytw2/leaktest v1.2.0
github.com/fsnotify/fsnotify v1.4.7
github.com/go-kit/kit v0.8.0
github.com/gobuffalo/envy v1.6.15 // indirect
github.com/gogo/protobuf v1.2.0
github.com/gohugoio/hugo v0.54.0 // indirect
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db
github.com/google/martian v2.1.0+incompatible // indirect
github.com/googleapis/gax-go v2.0.2+incompatible // indirect
Expand All @@ -23,7 +21,7 @@ require (
github.com/hashicorp/memberlist v0.1.0
github.com/julienschmidt/httprouter v1.1.0 // indirect
github.com/lovoo/gcloud-opentracing v0.3.0
github.com/miekg/dns v1.0.8
github.com/miekg/dns v1.1.4
github.com/minio/minio-go v0.0.0-20190131015406-c8a261de75c1
github.com/mozillazg/go-cos v0.11.0
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223
Expand Down
Loading

0 comments on commit 98912c5

Please sign in to comment.