Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

receive: Add block shipping #1011

Merged
merged 1 commit into from
Apr 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 63 additions & 1 deletion cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/component"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/receive"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/shipper"
"github.com/improbable-eng/thanos/pkg/store"
"github.com/improbable-eng/thanos/pkg/store/storepb"
"github.com/oklog/run"
Expand All @@ -20,6 +23,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/tsdb"
"github.com/prometheus/tsdb/labels"
"google.golang.org/grpc"
"gopkg.in/alecthomas/kingpin.v2"
)
Expand All @@ -36,7 +40,16 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application, name stri
dataDir := cmd.Flag("tsdb.path", "Data directory of TSDB.").
Default("./data").String()

labelStrs := cmd.Flag("labels", "External labels to announce. This flag will be removed in the future when handling multiple tsdb instances is added.").PlaceHolder("key=\"value\"").Strings()

objStoreConfig := regCommonObjStoreFlags(cmd, "", false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this required? Basing of fact that retention is not configurable?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a blocker,

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could see people just caring about a single instance of this with just local storage. I can see scenarios where it would not have to be set.

Copy link
Member Author

@brancz brancz Apr 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a side note, retention should be configurable. People should be able to make a choice how they want local storage to be treated when running receive I think. I'll make retention configurable in a follow up PR. Don't want to mix concerns here.


m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
lset, err := parseFlagLabels(*labelStrs)
if err != nil {
return errors.Wrap(err, "parse labels")
}

return runReceive(
g,
logger,
Expand All @@ -49,6 +62,8 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application, name stri
*httpMetricsBindAddr,
*remoteWriteAddress,
*dataDir,
objStoreConfig,
lset,
)
}
}
Expand All @@ -65,6 +80,8 @@ func runReceive(
httpMetricsBindAddr string,
remoteWriteAddress string,
dataDir string,
objStoreConfig *pathOrContent,
lset labels.Labels,
) error {
logger = log.With(logger, "component", "receive")
level.Warn(logger).Log("msg", "setting up receive; the Thanos receive component is EXPERIMENTAL, it may break significantly without notice")
Expand Down Expand Up @@ -189,7 +206,7 @@ func runReceive(
}

db := localStorage.Get()
tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), reg, db, component.Receive, nil)
tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), reg, db, component.Receive, lset)

opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA)
if err != nil {
Expand Down Expand Up @@ -225,6 +242,51 @@ func runReceive(
},
)
}

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe wrap the error here?

}

upload := true
if len(confContentYaml) == 0 {
level.Info(logger).Log("msg", "No supported bucket was configured, uploads will be disabled")
upload = false
}

if upload {
// The background shipper continuously scans the data directory and uploads
// new blocks to Google Cloud Storage or an S3-compatible storage service.
bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Sidecar.String())
if err != nil {
return err
}

// Ensure we close up everything properly.
defer func() {
if err != nil {
runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
}
}()

s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.ReceiveSource)

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
if uploaded, err := s.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err, "uploaded", uploaded)
}

return nil
})
}, func(error) {
cancel()
})
}

level.Info(logger).Log("msg", "starting receiver")

return nil
Expand Down
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ require (
github.com/fortytw2/leaktest v1.2.0
github.com/fsnotify/fsnotify v1.4.7
github.com/go-kit/kit v0.8.0
github.com/gobuffalo/envy v1.6.15 // indirect
github.com/gogo/protobuf v1.2.0
github.com/gohugoio/hugo v0.54.0 // indirect
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db
github.com/google/martian v2.1.0+incompatible // indirect
github.com/googleapis/gax-go v2.0.2+incompatible // indirect
Expand All @@ -23,7 +21,7 @@ require (
github.com/hashicorp/memberlist v0.1.0
github.com/julienschmidt/httprouter v1.1.0 // indirect
github.com/lovoo/gcloud-opentracing v0.3.0
github.com/miekg/dns v1.0.8
github.com/miekg/dns v1.1.4
github.com/minio/minio-go v0.0.0-20190131015406-c8a261de75c1
github.com/mozillazg/go-cos v0.11.0
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223
Expand Down
Loading