diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 63a3ba9c360..a3606103a96 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -4,11 +4,13 @@ import ( "context" "fmt" "net" + "strings" "sync" "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/improbable-eng/thanos/pkg/block/metadata" "github.com/improbable-eng/thanos/pkg/component" "github.com/improbable-eng/thanos/pkg/receive" "github.com/improbable-eng/thanos/pkg/runutil" @@ -20,6 +22,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/storage/tsdb" + "github.com/prometheus/tsdb/labels" "google.golang.org/grpc" kingpin "gopkg.in/alecthomas/kingpin.v2" ) @@ -36,6 +39,10 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application, name stri dataDir := cmd.Flag("tsdb.path", "Data directory of TSDB."). Default("./data").String() + externalLabels := cmd.Flag("external-labels", "External labels to announce. Comma separated list of key=value pairs. This flag will be removed in the future when handling multiple tsdb instances is added.").PlaceHolder("key1=value1,key2=value2").String() + + objStoreConfig := regCommonObjStoreFlags(cmd, "", false) + m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error { return runReceive( g, @@ -49,6 +56,8 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application, name stri *httpMetricsBindAddr, *remoteWriteAddress, *dataDir, + objStoreConfig, + *externalLabels, ) } } @@ -65,10 +74,26 @@ func runReceive( httpMetricsBindAddr string, remoteWriteAddress string, dataDir string, + objStoreConfig *pathOrContent, + externalLabelsString string, ) error { logger = log.With(logger, "component", "receive") level.Warn(logger).Log("msg", "setting up receive; the Thanos receive component is EXPERIMENTAL, it may break significantly without notice") + externalLabels := labels.Labels{} + labelStrings := strings.Split(externalLabelsString, ",") + for _, labelString := range labelStrings { + keyValueStrings := strings.Split(labelString, "=") + if len(keyValueStrings) != 2 { + return errors.New("external labels are malformed: a list of labels passed to the --external-labels flag must be comma separated and labels in key=value form") + } + externalLabels = append(externalLabels, labels.Label{ + Name: keyValueStrings[0], + Value: keyValueStrings[1], + }) + } + level.Debug(logger).Log("external_labels", externalLabels.String()) + tsdbCfg := &tsdb.Options{ Retention: model.Duration(time.Hour * 24 * 15), NoLockfile: true, @@ -189,7 +214,7 @@ func runReceive( } db := localStorage.Get() - tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), reg, db, component.Receive, nil) + tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), reg, db, component.Receive, externalLabels) opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA) if err != nil { @@ -225,6 +250,12 @@ func runReceive( }, ) } + + err := addShipper(logger, reg, g, metadata.ReceiveSource, &promMetadata{labels: externalLabels}, dataDir, objStoreConfig, false) + if err != nil { + return errors.Wrap(err, "failed to add block shipper to run group") + } + level.Info(logger).Log("msg", "starting receiver") return nil diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 1e31b54d150..8faa90cb1a5 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -229,6 +229,16 @@ func runSidecar( }) } + err := addShipper(logger, reg, g, metadata.SidecarSource, m, dataDir, objStoreConfig, uploadCompacted) + if err != nil { + return errors.Wrap(err, "failed to add block shipper to run group") + } + + level.Info(logger).Log("msg", "starting sidecar", "peer", peer.Name()) + return nil +} + +func addShipper(logger log.Logger, reg prometheus.Registerer, g *run.Group, sourceType metadata.SourceType, m *promMetadata, dataDir string, objStoreConfig *pathOrContent, uploadCompacted bool) error { confContentYaml, err := objStoreConfig.Content() if err != nil { return err @@ -265,12 +275,12 @@ func runSidecar( var s *shipper.Shipper if uploadCompacted { - s, err = shipper.NewWithCompacted(ctx, logger, reg, dataDir, bkt, m.Labels, metadata.SidecarSource, m.promURL) + s, err = shipper.NewWithCompacted(ctx, logger, reg, dataDir, bkt, m.Labels, sourceType, m.promURL) if err != nil { return errors.Wrap(err, "create shipper") } } else { - s = shipper.New(logger, reg, dataDir, bkt, m.Labels, metadata.SidecarSource) + s = shipper.New(logger, reg, dataDir, bkt, m.Labels, sourceType) } return runutil.Repeat(30*time.Second, ctx.Done(), func() error { @@ -278,15 +288,6 @@ func runSidecar( level.Warn(logger).Log("err", err, "uploaded", uploaded) } - minTime, _, err := s.Timestamps() - if err != nil { - level.Warn(logger).Log("msg", "reading timestamps failed", "err", err) - } else { - m.UpdateTimestamps(minTime, math.MaxInt64) - - mint, maxt := m.Timestamps() - peer.SetTimestamps(mint, maxt) - } return nil }) }, func(error) { @@ -294,7 +295,6 @@ func runSidecar( }) } - level.Info(logger).Log("msg", "starting sidecar", "peer", peer.Name()) return nil } diff --git a/go.sum b/go.sum index 237900b401e..7840ed8f3ae 100644 --- a/go.sum +++ b/go.sum @@ -11,8 +11,6 @@ github.com/Azure/azure-storage-blob-go v0.0.0-20181022225951-5152f14ace1c h1:Y5u github.com/Azure/azure-storage-blob-go v0.0.0-20181022225951-5152f14ace1c/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y= github.com/Azure/go-autorest v10.8.1+incompatible h1:u0jVQf+a6k6x8A+sT60l6EY9XZu+kHdnZVPAYqpVRo0= github.com/Azure/go-autorest v10.8.1+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= -github.com/NYTimes/gziphandler v1.0.1 h1:iLrQrdwjDd52kHDA5op2UBJFjmOb9g+7scBan4RN8F0= -github.com/NYTimes/gziphandler v1.0.1/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ= github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cqUQ3I= github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= @@ -272,6 +270,7 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA= go.opencensus.io v0.19.0 h1:+jrnNy8MR4GZXvwF9PEuSyHxA4NaTf6601oNRwCSXq0= diff --git a/pkg/block/metadata/meta.go b/pkg/block/metadata/meta.go index 0d8b22dd190..7ce07590369 100644 --- a/pkg/block/metadata/meta.go +++ b/pkg/block/metadata/meta.go @@ -23,6 +23,7 @@ type SourceType string const ( UnknownSource SourceType = "" SidecarSource SourceType = "sidecar" + ReceiveSource SourceType = "receive" CompactorSource SourceType = "compactor" CompactorRepairSource SourceType = "compactor.repair" RulerSource SourceType = "ruler" diff --git a/scripts/quickstart.sh b/scripts/quickstart.sh index 334d73f2dab..ce634d2bcf4 100755 --- a/scripts/quickstart.sh +++ b/scripts/quickstart.sh @@ -139,7 +139,9 @@ then --log.level debug \ --tsdb.path "./data/remote-write-receive-data" \ --grpc-address 0.0.0.0:19891 \ - --http-address 0.0.0.0:19691 \ + --http-address 0.0.0.0:18091 \ + --external-labels "receive=true" \ + ${OBJSTORECFG} \ --remote-write.address 0.0.0.0:19291 & mkdir -p "data/local-prometheus-data/" diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index d8b4fc26071..912e1d80bf0 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -145,6 +145,7 @@ func testQuerySimple(t *testing.T, conf testConfig) { "__name__": "up", "instance": model.LabelValue("localhost:9100"), "job": "node", + "receive": "true", }, res[3].Metric) } @@ -201,6 +202,7 @@ func testQuerySimple(t *testing.T, conf testConfig) { "__name__": "up", "instance": model.LabelValue("localhost:9100"), "job": "node", + "receive": "true", }, res[2].Metric) } } diff --git a/test/e2e/spinup_test.go b/test/e2e/spinup_test.go index 34bd40f6954..d52daf0beff 100644 --- a/test/e2e/spinup_test.go +++ b/test/e2e/spinup_test.go @@ -145,6 +145,7 @@ func receiver(i int, config string) cmdScheduleFunc { "--grpc-address", remoteWriteReceiveGRPC(i), "--http-address", remoteWriteReceiveMetricHTTP(i), "--remote-write.address", remoteWriteReceiveHTTP(i), + "--external-labels", "receive=true", "--tsdb.path", promDir, "--log.level", "debug"))), nil }