Skip to content

Commit

Permalink
receive: Add block shipping
Browse files Browse the repository at this point in the history
  • Loading branch information
brancz committed Apr 12, 2019
1 parent 4fd0adc commit e1a9a02
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 16 deletions.
33 changes: 32 additions & 1 deletion cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
"context"
"fmt"
"net"
"strings"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/component"
"github.com/improbable-eng/thanos/pkg/receive"
"github.com/improbable-eng/thanos/pkg/runutil"
Expand All @@ -20,6 +22,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/tsdb"
"github.com/prometheus/tsdb/labels"
"google.golang.org/grpc"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)
Expand All @@ -36,6 +39,10 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application, name stri
dataDir := cmd.Flag("tsdb.path", "Data directory of TSDB.").
Default("./data").String()

externalLabels := cmd.Flag("external-labels", "External labels to announce. Comma separated list of key=value pairs. This flag will be removed in the future when handling multiple tsdb instances is added.").PlaceHolder("key1=value1,key2=value2").String()

objStoreConfig := regCommonObjStoreFlags(cmd, "", false)

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
return runReceive(
g,
Expand All @@ -49,6 +56,8 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application, name stri
*httpMetricsBindAddr,
*remoteWriteAddress,
*dataDir,
objStoreConfig,
*externalLabels,
)
}
}
Expand All @@ -65,10 +74,26 @@ func runReceive(
httpMetricsBindAddr string,
remoteWriteAddress string,
dataDir string,
objStoreConfig *pathOrContent,
externalLabelsString string,
) error {
logger = log.With(logger, "component", "receive")
level.Warn(logger).Log("msg", "setting up receive; the Thanos receive component is EXPERIMENTAL, it may break significantly without notice")

externalLabels := labels.Labels{}
labelStrings := strings.Split(externalLabelsString, ",")
for _, labelString := range labelStrings {
keyValueStrings := strings.Split(labelString, "=")
if len(keyValueStrings) != 2 {
return errors.New("external labels are malformed: a list of labels passed to the --external-labels flag must be comma separated and labels in key=value form")
}
externalLabels = append(externalLabels, labels.Label{
Name: keyValueStrings[0],
Value: keyValueStrings[1],
})
}
level.Debug(logger).Log("external_labels", externalLabels.String())

tsdbCfg := &tsdb.Options{
Retention: model.Duration(time.Hour * 24 * 15),
NoLockfile: true,
Expand Down Expand Up @@ -189,7 +214,7 @@ func runReceive(
}

db := localStorage.Get()
tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), reg, db, component.Receive, nil)
tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), reg, db, component.Receive, externalLabels)

opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA)
if err != nil {
Expand Down Expand Up @@ -225,6 +250,12 @@ func runReceive(
},
)
}

err := addShipper(logger, reg, g, metadata.ReceiveSource, &promMetadata{labels: externalLabels}, dataDir, objStoreConfig, false)
if err != nil {
return errors.Wrap(err, "failed to add block shipper to run group")
}

level.Info(logger).Log("msg", "starting receiver")

return nil
Expand Down
24 changes: 12 additions & 12 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,16 @@ func runSidecar(
})
}

err := addShipper(logger, reg, g, metadata.SidecarSource, m, dataDir, objStoreConfig, uploadCompacted)
if err != nil {
return errors.Wrap(err, "failed to add block shipper to run group")
}

level.Info(logger).Log("msg", "starting sidecar", "peer", peer.Name())
return nil
}

func addShipper(logger log.Logger, reg prometheus.Registerer, g *run.Group, sourceType metadata.SourceType, m *promMetadata, dataDir string, objStoreConfig *pathOrContent, uploadCompacted bool) error {
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
Expand Down Expand Up @@ -265,36 +275,26 @@ func runSidecar(

var s *shipper.Shipper
if uploadCompacted {
s, err = shipper.NewWithCompacted(ctx, logger, reg, dataDir, bkt, m.Labels, metadata.SidecarSource, m.promURL)
s, err = shipper.NewWithCompacted(ctx, logger, reg, dataDir, bkt, m.Labels, sourceType, m.promURL)
if err != nil {
return errors.Wrap(err, "create shipper")
}
} else {
s = shipper.New(logger, reg, dataDir, bkt, m.Labels, metadata.SidecarSource)
s = shipper.New(logger, reg, dataDir, bkt, m.Labels, sourceType)
}

return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
if uploaded, err := s.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err, "uploaded", uploaded)
}

minTime, _, err := s.Timestamps()
if err != nil {
level.Warn(logger).Log("msg", "reading timestamps failed", "err", err)
} else {
m.UpdateTimestamps(minTime, math.MaxInt64)

mint, maxt := m.Timestamps()
peer.SetTimestamps(mint, maxt)
}
return nil
})
}, func(error) {
cancel()
})
}

level.Info(logger).Log("msg", "starting sidecar", "peer", peer.Name())
return nil
}

Expand Down
3 changes: 1 addition & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ github.com/Azure/azure-storage-blob-go v0.0.0-20181022225951-5152f14ace1c h1:Y5u
github.com/Azure/azure-storage-blob-go v0.0.0-20181022225951-5152f14ace1c/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y=
github.com/Azure/go-autorest v10.8.1+incompatible h1:u0jVQf+a6k6x8A+sT60l6EY9XZu+kHdnZVPAYqpVRo0=
github.com/Azure/go-autorest v10.8.1+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/NYTimes/gziphandler v1.0.1 h1:iLrQrdwjDd52kHDA5op2UBJFjmOb9g+7scBan4RN8F0=
github.com/NYTimes/gziphandler v1.0.1/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ=
github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cqUQ3I=
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
Expand Down Expand Up @@ -272,6 +270,7 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA=
go.opencensus.io v0.19.0 h1:+jrnNy8MR4GZXvwF9PEuSyHxA4NaTf6601oNRwCSXq0=
Expand Down
1 change: 1 addition & 0 deletions pkg/block/metadata/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type SourceType string
const (
UnknownSource SourceType = ""
SidecarSource SourceType = "sidecar"
ReceiveSource SourceType = "receive"
CompactorSource SourceType = "compactor"
CompactorRepairSource SourceType = "compactor.repair"
RulerSource SourceType = "ruler"
Expand Down
4 changes: 3 additions & 1 deletion scripts/quickstart.sh
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ then
--log.level debug \
--tsdb.path "./data/remote-write-receive-data" \
--grpc-address 0.0.0.0:19891 \
--http-address 0.0.0.0:19691 \
--http-address 0.0.0.0:18091 \
--external-labels "receive=true" \
${OBJSTORECFG} \
--remote-write.address 0.0.0.0:19291 &

mkdir -p "data/local-prometheus-data/"
Expand Down
2 changes: 2 additions & 0 deletions test/e2e/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func testQuerySimple(t *testing.T, conf testConfig) {
"__name__": "up",
"instance": model.LabelValue("localhost:9100"),
"job": "node",
"receive": "true",
}, res[3].Metric)
}

Expand Down Expand Up @@ -201,6 +202,7 @@ func testQuerySimple(t *testing.T, conf testConfig) {
"__name__": "up",
"instance": model.LabelValue("localhost:9100"),
"job": "node",
"receive": "true",
}, res[2].Metric)
}
}
Expand Down
1 change: 1 addition & 0 deletions test/e2e/spinup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func receiver(i int, config string) cmdScheduleFunc {
"--grpc-address", remoteWriteReceiveGRPC(i),
"--http-address", remoteWriteReceiveMetricHTTP(i),
"--remote-write.address", remoteWriteReceiveHTTP(i),
"--external-labels", "receive=true",
"--tsdb.path", promDir,
"--log.level", "debug"))), nil
}
Expand Down

0 comments on commit e1a9a02

Please sign in to comment.