Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CHORE] adding thanos upload-blocks command #6884

Merged
merged 1 commit into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6984](https://github.com/thanos-io/thanos/pull/6984) Store Gateway: Added `--store.index-header-lazy-download-strategy` to specify how to lazily download index headers when lazy mmap is enabled.
- [#6887](https://github.com/thanos-io/thanos/pull/6887) Query Frontend: *breaking :warning:* Add tenant label to relevant exported metrics. Note that this change may cause some pre-existing custom dashboard queries to be incorrect due to the added label.
- [#7028](https://github.com/thanos-io/thanos/pull/7028) Query|Query Frontend: Add new `--query-frontend.enable-x-functions` flag to enable experimental extended functions.
- [#6884](https://github.com/thanos-io/thanos/pull/6884) Tools: Add upload-block command to upload blocks to object storage.

### Changed

Expand Down
121 changes: 121 additions & 0 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
v1 "github.com/thanos-io/thanos/pkg/api/blocks"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/clientconfig"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/compactv2"
Expand All @@ -56,9 +57,11 @@ import (
"github.com/thanos-io/thanos/pkg/logging"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/replicate"
"github.com/thanos-io/thanos/pkg/runutil"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/ui"
"github.com/thanos-io/thanos/pkg/verifier"
Expand Down Expand Up @@ -158,6 +161,11 @@ type bucketMarkBlockConfig struct {
removeMarker bool
}

type bucketUploadBlocksConfig struct {
path string
prometheus prometheusConfig
}

func (tbc *bucketVerifyConfig) registerBucketVerifyFlag(cmd extkingpin.FlagClause) *bucketVerifyConfig {
cmd.Flag("repair", "Attempt to repair blocks for which issues were detected").
Short('r').Default("false").BoolVar(&tbc.repair)
Expand Down Expand Up @@ -277,6 +285,30 @@ func (tbc *bucketRetentionConfig) registerBucketRetentionFlag(cmd extkingpin.Fla
return tbc
}

func (tbc *bucketUploadBlocksConfig) registerBucketUploadBlocksFlag(cmd extkingpin.FlagClause) *bucketUploadBlocksConfig {
cmd.Flag("path", "Path to the directory containing blocks to upload.").Default("./data").StringVar(&tbc.path)

cmd.Flag("prometheus.url",
"URL at which to reach Prometheus's API. For better performance use local network.").
Default("http://localhost:9090").URLVar(&tbc.prometheus.url)
cmd.Flag("prometheus.ready_timeout",
"Maximum time to wait for the Prometheus instance to start up").
Default("10m").DurationVar(&tbc.prometheus.readyTimeout)
cmd.Flag("prometheus.get_config_interval",
"How often to get Prometheus config").
Default("30s").DurationVar(&tbc.prometheus.getConfigInterval)
cmd.Flag("prometheus.get_config_timeout",
"Timeout for getting Prometheus config").
Default("5s").DurationVar(&tbc.prometheus.getConfigTimeout)
tbc.prometheus.httpClient = extflag.RegisterPathOrContent(
cmd,
"prometheus.http-client",
"YAML file or string with http client configs. See Format details: https://thanos.io/tip/components/sidecar.md/#configuration.",
)

return tbc
}

func registerBucket(app extkingpin.AppClause) {
cmd := app.Command("bucket", "Bucket utility commands")

Expand All @@ -291,6 +323,7 @@ func registerBucket(app extkingpin.AppClause) {
registerBucketMarkBlock(cmd, objStoreConfig)
registerBucketRewrite(cmd, objStoreConfig)
registerBucketRetention(cmd, objStoreConfig)
registerBucketUploadBlocks(cmd, objStoreConfig)
}

func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) {
Expand Down Expand Up @@ -1420,3 +1453,91 @@ func registerBucketRetention(app extkingpin.AppClause, objStoreConfig *extflag.P
return nil
})
}

func registerBucketUploadBlocks(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) {
cmd := app.Command("upload-blocks", "Upload blocks push blocks from the provided path to the object storage.")

tbc := &bucketUploadBlocksConfig{}
tbc.registerBucketUploadBlocksFlag(cmd)

cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
ctx := context.Background()
httpConfContentYaml, err := tbc.prometheus.httpClient.Content()
if err != nil {
return errors.Wrap(err, "getting http client config")
}

httpClientConfig, err := clientconfig.NewHTTPClientConfigFromYAML(httpConfContentYaml)
if err != nil {
return errors.Wrap(err, "parsing http config YAML")
}

httpClient, err := clientconfig.NewHTTPClient(*httpClientConfig, "thanos-tool")
if err != nil {
return errors.Wrap(err, "Improper http client config")
}

m := &promMetadata{
promURL: tbc.prometheus.url,
client: promclient.NewWithTracingClient(logger, httpClient, "thanos-tool"),
}

err = runutil.Retry(2*time.Second, ctx.Done(), func() error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is usecase to upload blocks without running Prometheus. In this case external labels are specified as a flag.
This can be done in future prs

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd be nice to have a command to get those external labels independently of this process.

I can see an example where you want to migrate blocks from one cluster to another, but you want to retain the labels from the old cluster which might already be shut down.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah we should have follow-up where we replace by extflags from commandline I think

if err := m.UpdateLabels(ctx); err != nil {
level.Warn(logger).Log(
"msg", "failed to fetch initial external labels. Is Prometheus running? Retrying",
"err", err,
)
return err
}

level.Info(logger).Log(
"msg", "successfully loaded prometheus external labels",
"external_labels", m.Labels().String(),
)
return nil
})

if err != nil {
return errors.Wrap(err, "initial external labels query")
}

if len(m.Labels()) == 0 {
return errors.New("no external labels configured on Prometheus server, uniquely identifying external labels must be configured; see https://thanos.io/tip/thanos/storage.md#external-labels for details.")
}

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, component.Upload.String())
if err != nil {
return err
}

bkt = objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(bkt, extprom.WrapRegistererWithPrefix("thanos_", reg), bkt.Name()))

// Ensure we close up everything properly.
defer func() {
if err != nil {
runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
}
}()

if err := promclient.IsDirAccessible(tbc.path); err != nil {
level.Error(logger).Log("err", err)
}

defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

s := shipper.New(logger, reg, tbc.path, bkt, m.Labels, metadata.BucketUploadSource,
nil, false, metadata.HashFunc(""), shipper.DefaultMetaFilename)

if _, err := s.Sync(ctx); err != nil {
level.Error(logger).Log("msg", "shipper sync", "err", err)
}

return nil
})
}
62 changes: 62 additions & 0 deletions docs/components/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ Subcommands:
Retention applies retention policies on the given bucket. Please make sure
no compactor is running on the same bucket at the same time.

tools bucket upload-blocks [<flags>]
Upload blocks push blocks from the provided path to the object storeage.

tools rules-check --rules=RULES
Check if the rule files are valid or not.

Expand Down Expand Up @@ -187,6 +190,9 @@ Subcommands:
Retention applies retention policies on the given bucket. Please make sure
no compactor is running on the same bucket at the same time.

tools bucket upload-blocks [<flags>]
Upload blocks push blocks from the provided path to the object storeage.


```

Expand Down Expand Up @@ -841,6 +847,62 @@ Flags:

```

### Bucket Upload Blocks

`tools bucket upload-blocks` uploads a blocks created on the given bucket.

```$ mdox-exec="thanos tools bucket upload-blocks --help"
usage: thanos tools bucket upload-blocks [<flags>]

Upload blocks push blocks from the provided path to the object storeage.

Flags:
-h, --help Show context-sensitive help (also try --help-long and
--help-man).
--log.format=logfmt Log format to use. Possible options: logfmt or json.
--log.level=info Log filtering level.
--objstore.config=<content>
Alternative to 'objstore.config-file' flag (mutually
exclusive). Content of YAML file that contains
object store configuration. See format details:
https://thanos.io/tip/thanos/storage.md/#configuration
--objstore.config-file=<file-path>
Path to YAML file that contains object
store configuration. See format details:
https://thanos.io/tip/thanos/storage.md/#configuration
--path="./data" Path to the directory containing blocks to upload.
--prometheus.get_config_interval=30s
How often to get Prometheus config
--prometheus.get_config_timeout=5s
Timeout for getting Prometheus config
--prometheus.http-client=<content>
Alternative to 'prometheus.http-client-file' flag
(mutually exclusive). Content of YAML file or string
with http client configs. See Format details:
https://thanos.io/tip/components/sidecar.md/#configuration.
--prometheus.http-client-file=<file-path>
Path to YAML file or string with http
client configs. See Format details:
https://thanos.io/tip/components/sidecar.md/#configuration.
--prometheus.ready_timeout=10m
Maximum time to wait for the Prometheus instance to
start up
--prometheus.url=http://localhost:9090
URL at which to reach Prometheus's API. For better
performance use local network.
--tracing.config=<content>
Alternative to 'tracing.config-file' flag
(mutually exclusive). Content of YAML file
with tracing configuration. See format details:
https://thanos.io/tip/thanos/tracing.md/#configuration
--tracing.config-file=<file-path>
Path to YAML file with tracing
configuration. See format details:
https://thanos.io/tip/thanos/tracing.md/#configuration
--version Show application version.

```

## Rules-check

The `tools rules-check` subcommand contains tools for validation of Prometheus rules.
Expand Down
1 change: 1 addition & 0 deletions pkg/block/metadata/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const (
RulerSource SourceType = "ruler"
BucketRepairSource SourceType = "bucket.repair"
BucketRewriteSource SourceType = "bucket.rewrite"
BucketUploadSource SourceType = "bucket.upload"
TestSource SourceType = "test"
)

Expand Down
1 change: 1 addition & 0 deletions pkg/component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ var (
Bucket = source{component: component{name: "bucket"}}
Cleanup = source{component: component{name: "cleanup"}}
Mark = source{component: component{name: "mark"}}
Upload = source{component: component{name: "upload"}}
Rewrite = source{component: component{name: "rewrite"}}
Retention = source{component: component{name: "retention"}}
Compact = source{component: component{name: "compact"}}
Expand Down
16 changes: 16 additions & 0 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,22 @@ func IsWALDirAccessible(dir string) error {
return nil
}

// IsDirAccessible returns no error if dir can be found.
func IsDirAccessible(dir string) error {
const errMsg = "Dir is not accessible."

f, err := os.Stat(dir)
if err != nil {
return errors.Wrap(err, errMsg)
}

if !f.IsDir() {
return errors.New(errMsg)
}

return nil
}

// ExternalLabels returns sorted external labels from /api/v1/status/config Prometheus endpoint.
// Note that configuration can be hot reloadable on Prometheus, so this config might change in runtime.
func (c *Client) ExternalLabels(ctx context.Context, base *url.URL) (labels.Labels, error) {
Expand Down
Loading