diff --git a/CHANGELOG.md b/CHANGELOG.md index d438abf95d..5f71411212 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re ## Unreleased +- [#3289](https://github.com/thanos-io/thanos/pull/3289) Thanos Object Store: Adding `prefix` to a bucket config + ### Added - [#3469](https://github.com/thanos-io/thanos/pull/3469) StoreAPI: Added `hints` field to `LabelNamesRequest` and `LabelValuesRequest`. Hints in an opaque data structure that can be used to carry additional information from the store and its content is implementation specific. @@ -112,6 +114,10 @@ Highlights: - [#3114](https://github.com/thanos-io/thanos/pull/3114) Query Frontend: Added support for Memacached cache. - **breaking** Renamed flag `log_queries_longer_than` to `log-queries-longer-than`. - [#3166](https://github.com/thanos-io/thanos/pull/3166) UIs: Added UI for passing a `storeMatch[]` parameter to queries. +- [#3184](https://github.com/thanos-io/thanos/pull/3184) Compact: Fix web.prefix-header to use &wc.prefixHeaderName +- [#3154](https://github.com/thanos-io/thanos/pull/3154) Query Frontend: Add metric `thanos_memcached_getmulti_gate_queries_max`. +- [#3147](https://github.com/thanos-io/thanos/pull/3147) Querier: Add `query.metadata.default-time-range` flag to specify the default metadata time range duration for retrieving labels through Labels and Series API when the range parameters are not specified. 
The zero value means range covers the time since the beginning. +- [#3207](https://github.com/thanos-io/thanos/pull/3207) Query Frontend: Add `cache-compression-type` flag to use compression in the query frontend cache. - [#3181](https://github.com/thanos-io/thanos/pull/3181) Logging: Added debug level logging for responses between 300-399 - [#3133](https://github.com/thanos-io/thanos/pull/3133) Query: Allowed passing a `storeMatch[]` to Labels APIs; Time range metadata based store filtering is supported on Labels APIs. - [#3146](https://github.com/thanos-io/thanos/pull/3146) Sidecar: Significantly improved sidecar latency (reduced ~2x). Added `thanos_sidecar_prometheus_store_received_frames` histogram metric. diff --git a/docs/storage.md b/docs/storage.md index 7379b51ee5..03ecd2dad4 100644 --- a/docs/storage.md +++ b/docs/storage.md @@ -93,6 +93,7 @@ config: kms_key_id: "" kms_encryption_context: {} encryption_key: "" +prefix: "" ``` At a minimum, you will need to provide a value for the `bucket`, `endpoint`, `access_key`, and `secret_key` keys. The rest of the keys are optional. 
@@ -243,6 +244,7 @@ type: GCS config: bucket: "" service_account: "" +prefix: "" ``` ##### Using GOOGLE_APPLICATION_CREDENTIALS @@ -327,6 +329,7 @@ config: container: "" endpoint: "" max_retries: 0 +prefix: "" ``` #### OpenStack Swift @@ -372,6 +375,7 @@ config: connect_timeout: 10s timeout: 5m use_dynamic_large_objects: false +prefix: "" ``` #### Tencent COS @@ -389,6 +393,7 @@ config: app_id: "" secret_key: "" secret_id: "" +prefix: "" ``` Set the flags `--objstore.config-file` to reference to the configuration file. @@ -407,6 +412,7 @@ config: bucket: "" access_key_id: "" access_key_secret: "" +prefix: "" ``` Use --objstore.config-file to reference to this configuration file. @@ -426,8 +432,23 @@ This is mainly useful for testing and demos. type: FILESYSTEM config: directory: "" +prefix: "" ``` +## Prefix + +Prefix field allows adding an optional prefix to block (`/`) and block files which are uploaded by any block "producer" (e.g sidecar, ruler, receiver). +The sample config below ensures that all the bucket operations e.g: upload, delete, list, etc are performed on `/tenant-0/prefix0/` path of the object store only. The prefix shall start and end with '/'. +Sample object store config: +```yaml +type: S3 +config: + +prefix: /tenant-0/prefix0/ +``` +It is worth mentioning that this feature can be used to store data of different tenants in different paths of the same bucket. However, for such use-cases, putting data on different paths WILL REQUIRE totally separate Store Gateway / Compactor by design. + + ### How to add a new client to Thanos? 
Following checklist allows adding new Go code client to supported providers: diff --git a/pkg/objstore/client/factory.go b/pkg/objstore/client/factory.go index 5c037bc0ed..935819a46c 100644 --- a/pkg/objstore/client/factory.go +++ b/pkg/objstore/client/factory.go @@ -38,6 +38,8 @@ const ( type BucketConfig struct { Type ObjProvider `yaml:"type"` Config interface{} `yaml:"config"` + // Prefix field allows adding an optional prefix to block (`/`) and block files which are uploaded by any block "producer" (e.g sidecar, ruler, receiver). + Prefix string `yaml:"prefix" default:""` } // NewBucket initializes and returns new object storage clients. @@ -76,5 +78,5 @@ func NewBucket(logger log.Logger, confContentYaml []byte, reg prometheus.Registe if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("create %s client", bucketConf.Type)) } - return objstore.NewTracingBucket(objstore.BucketWithMetrics(bucket.Name(), bucket, reg)), nil + return objstore.NewTracingBucket(objstore.BucketWithMetrics(bucket.Name(), objstore.NewPrefixedBucket(bucket, bucketConf.Prefix), reg)), nil } diff --git a/pkg/objstore/objstore.go b/pkg/objstore/objstore.go index cf5317d43d..2abdd4f7ef 100644 --- a/pkg/objstore/objstore.go +++ b/pkg/objstore/objstore.go @@ -8,6 +8,7 @@ import ( "context" "io" "os" + "path" "path/filepath" "strings" "time" @@ -437,6 +438,91 @@ func (b *metricBucket) Name() string { return b.bkt.Name() } +type prefixedBucket struct { + bkt Bucket + prefix string +} + +// NewPrefixedBucket returns a new prefixedBucket. 
+func NewPrefixedBucket(bkt Bucket, prefix string) *prefixedBucket { + return &prefixedBucket{ + bkt: bkt, + prefix: prefix, + } +} + +func (pbkt *prefixedBucket) nameWithPrefix(name string) string { + return path.Join(pbkt.prefix, name) +} + +func (pbkt *prefixedBucket) WithExpectedErrs(expectedFunc IsOpFailureExpectedFunc) Bucket { + if ib, ok := pbkt.bkt.(InstrumentedBucket); ok { + return &prefixedBucket{ + bkt: ib.WithExpectedErrs(expectedFunc), + prefix: pbkt.prefix, + } + } + return pbkt +} + +func (pbkt *prefixedBucket) ReaderWithExpectedErrs(expectedFunc IsOpFailureExpectedFunc) BucketReader { + return pbkt.WithExpectedErrs(expectedFunc) +} + +func (pbkt *prefixedBucket) Name() string { + return pbkt.bkt.Name() +} + +func (pbkt *prefixedBucket) Upload(ctx context.Context, name string, r io.Reader) (err error) { + pname := pbkt.nameWithPrefix(name) + err = pbkt.bkt.Upload(ctx, pname, r) + return +} + +func (pbkt *prefixedBucket) Delete(ctx context.Context, name string) (err error) { + pname := pbkt.nameWithPrefix(name) + err = pbkt.bkt.Delete(ctx, pname) + return +} + +func (pbkt *prefixedBucket) Close() error { + return pbkt.bkt.Close() +} + +func (pbkt *prefixedBucket) Iter(ctx context.Context, dir string, f func(string) error) error { + pdir := pbkt.nameWithPrefix(dir) + if pbkt.prefix != "" { + return pbkt.bkt.Iter(ctx, pdir, func(s string) error { + return f(strings.TrimPrefix(s, pbkt.nameWithPrefix("")+"/")) + }) + } + return pbkt.bkt.Iter(ctx, pdir, f) +} + +func (pbkt *prefixedBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { + pname := pbkt.nameWithPrefix(name) + return pbkt.bkt.Get(ctx, pname) +} + +func (pbkt *prefixedBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { + pname := pbkt.nameWithPrefix(name) + return pbkt.bkt.GetRange(ctx, pname, off, length) +} + +func (pbkt *prefixedBucket) Exists(ctx context.Context, name string) (bool, error) { + pname := pbkt.nameWithPrefix(name) 
+ return pbkt.bkt.Exists(ctx, pname) +} + +func (pbkt *prefixedBucket) IsObjNotFoundErr(err error) bool { + return pbkt.bkt.IsObjNotFoundErr(err) +} + +func (pbkt *prefixedBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) { + pname := pbkt.nameWithPrefix(name) + return pbkt.bkt.Attributes(ctx, pname) +} + type timingReadCloser struct { io.ReadCloser diff --git a/pkg/objstore/objstore_test.go b/pkg/objstore/objstore_test.go index 73ba167135..4db74b4376 100644 --- a/pkg/objstore/objstore_test.go +++ b/pkg/objstore/objstore_test.go @@ -4,6 +4,10 @@ package objstore import ( + "context" + "io/ioutil" + "sort" + "strings" "testing" promtest "github.com/prometheus/client_golang/prometheus/testutil" @@ -63,3 +67,60 @@ func TestMetricBucket_Close(t *testing.T) { testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsDuration)) testutil.Assert(t, promtest.ToFloat64(bkt.lastSuccessfulUploadTime) > lastUpload) } + +func TestPrefixedBucket(t *testing.T) { + prefix := "/abc/def/" + ctx := context.Background() + bkt := NewPrefixedBucket(NewInMemBucket(), prefix) + testutil.Ok(t, bkt.Upload(ctx, "id1/obj_1.some", strings.NewReader("@test-data@"))) + // Double check we can immediately read it. + rc1, err := bkt.Get(ctx, "id1/obj_1.some") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, rc1.Close()) }() + content, err := ioutil.ReadAll(rc1) + testutil.Ok(t, err) + testutil.Equals(t, "@test-data@", string(content)) + // Upload other objects. + testutil.Ok(t, bkt.Upload(ctx, "id1/obj_2.some", strings.NewReader("@test-data2@"))) + // Upload should be idempotent. + testutil.Ok(t, bkt.Upload(ctx, "id1/obj_2.some", strings.NewReader("@test-data2@"))) + testutil.Ok(t, bkt.Upload(ctx, "id1/obj_3.some", strings.NewReader("@test-data3@"))) + testutil.Ok(t, bkt.Upload(ctx, "id2/obj_4.some", strings.NewReader("@test-data4@"))) + testutil.Ok(t, bkt.Upload(ctx, "obj_5.some", strings.NewReader("@test-data5@"))) + + // Can we iter over items from top dir? 
+ var seen []string + testutil.Ok(t, bkt.Iter(ctx, "", func(fn string) error { + seen = append(seen, fn) + return nil + })) + expected := []string{"obj_5.some", "id1/", "id2/"} + sort.Strings(expected) + sort.Strings(seen) + // fmt.Printf("%v\n", inmembkt.Objects()) + testutil.Equals(t, expected, seen) + + // Can we iter over items from id1/ dir? + seen = []string{} + testutil.Ok(t, bkt.Iter(ctx, "id1/", func(fn string) error { + seen = append(seen, fn) + return nil + })) + testutil.Equals(t, []string{"id1/obj_1.some", "id1/obj_2.some", "id1/obj_3.some"}, seen) + + // Can we iter over items from id1 dir? + seen = []string{} + testutil.Ok(t, bkt.Iter(ctx, "id1", func(fn string) error { + seen = append(seen, fn) + return nil + })) + testutil.Equals(t, []string{"id1/obj_1.some", "id1/obj_2.some", "id1/obj_3.some"}, seen) + + // Can we iter over items from not existing dir? + testutil.Ok(t, bkt.Iter(ctx, "id0", func(fn string) error { + t.Error("Not expected to loop through not existing directory") + t.FailNow() + + return nil + })) +}