diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 4ae3dfd0e52..db9fc63ea92 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -101,6 +101,10 @@ func registerReceive(app *extkingpin.App) { return errors.Wrap(err, "parse labels") } + if len(lset) == 0 { + return errors.New("no external labels configured for receive, uniquely identifying external labels must be configured (ideally with `receive_` prefix); see https://thanos.io/tip/thanos/storage.md#external-labels for details.") + } + var cw *receive.ConfigWatcher if *hashringsFile != "" { cw, err = receive.NewConfigWatcher(log.With(logger, "component", "config-watcher"), reg, *hashringsFile, *refreshInterval) diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 60d3b93d375..ddf0f8993fe 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -166,7 +166,7 @@ func runSidecar( } if len(m.Labels()) == 0 { - return errors.New("no external labels configured on Prometheus server, uniquely identifying external labels must be configured") + return errors.New("no external labels configured on Prometheus server, uniquely identifying external labels must be configured; see https://thanos.io/tip/thanos/storage.md#external-labels for details.") } // Periodically query the Prometheus config. We use this as a heartbeat as well as for updating diff --git a/pkg/block/block.go b/pkg/block/block.go index c8e63d609bd..33993bd446b 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -83,7 +83,7 @@ func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st return errors.Wrap(err, "not a block dir") } - meta, err := metadata.Read(bdir) + meta, err := metadata.ReadFromDir(bdir) if err != nil { // No meta or broken meta file. return errors.Wrap(err, "read meta") diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index 129b827f8e6..b5006e59267 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -228,7 +228,7 @@ func (f *BaseFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*metadata.Met // Best effort load from local dir. if f.cacheDir != "" { - m, err := metadata.Read(cachedBlockDir) + m, err := metadata.ReadFromDir(cachedBlockDir) if err == nil { return m, nil } diff --git a/pkg/block/index.go b/pkg/block/index.go index 12ffe5835b9..2c0721634a0 100644 --- a/pkg/block/index.go +++ b/pkg/block/index.go @@ -376,7 +376,7 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source metadata.SourceT entropy := rand.New(rand.NewSource(time.Now().UnixNano())) resid = ulid.MustNew(ulid.Now(), entropy) - meta, err := metadata.Read(bdir) + meta, err := metadata.ReadFromDir(bdir) if err != nil { return resid, errors.Wrap(err, "read meta file") } diff --git a/pkg/block/indexheader/header_test.go b/pkg/block/indexheader/header_test.go index 2614cc9d552..365cf452ace 100644 --- a/pkg/block/indexheader/header_test.go +++ b/pkg/block/indexheader/header_test.go @@ -76,7 +76,7 @@ func TestReaders(t *testing.T) { db.Close() */ - m, err := metadata.Read("./testdata/index_format_v1") + m, err := metadata.ReadFromDir("./testdata/index_format_v1") testutil.Ok(t, err) e2eutil.Copy(t, "./testdata/index_format_v1", filepath.Join(tmpDir, m.ULID.String())) @@ -312,7 +312,7 @@ func prepareIndexV2Block(t testing.TB, tmpDir string, bkt objstore.Bucket) *meta } */ - m, err := metadata.Read("./testdata/index_format_v2") + m, err := metadata.ReadFromDir("./testdata/index_format_v2") testutil.Ok(t, err) e2eutil.Copy(t, "./testdata/index_format_v2", filepath.Join(tmpDir, m.ULID.String())) diff --git a/pkg/block/metadata/meta.go b/pkg/block/metadata/meta.go index db9e3792aa4..a11d142ad5c 100644 --- a/pkg/block/metadata/meta.go +++ b/pkg/block/metadata/meta.go @@ -11,7 +11,6 @@ package metadata import ( "encoding/json" "io" - "io/ioutil" "os" "path/filepath" @@ -57,6 +56,8 @@ type Thanos struct { // Version of Thanos meta file. If none specified, 1 is assumed (since first version did not have explicit version specified). Version int `json:"version,omitempty"` + // Labels are the external labels identifying the producer as well as tenant. + // See https://thanos.io/tip/thanos/storage.md#external-labels for details. Labels map[string]string `json:"labels"` Downsample ThanosDownsample `json:"downsample"` @@ -87,7 +88,7 @@ type ThanosDownsample struct { // InjectThanos sets Thanos meta to the block meta JSON and saves it to the disk. // NOTE: It should be used after writing any block by any Thanos component, otherwise we will miss crucial metadata. func InjectThanos(logger log.Logger, bdir string, meta Thanos, downsampledMeta *tsdb.BlockMeta) (*Meta, error) { - newMeta, err := Read(bdir) + newMeta, err := ReadFromDir(bdir) if err != nil { return nil, errors.Wrap(err, "read new meta") } @@ -154,17 +155,23 @@ func renameFile(logger log.Logger, from, to string) error { return pdir.Close() } -// Read reads the given meta from /meta.json. -func Read(dir string) (*Meta, error) { - b, err := ioutil.ReadFile(filepath.Join(dir, MetaFilename)) +// ReadFromDir reads the given meta from /meta.json. +func ReadFromDir(dir string) (*Meta, error) { + f, err := os.Open(filepath.Join(dir, MetaFilename)) if err != nil { return nil, err } - var m Meta + return read(f) +} - if err := json.Unmarshal(b, &m); err != nil { +func read(rc io.ReadCloser) (_ *Meta, err error) { + defer runutil.ExhaustCloseWithErrCapture(&err, rc, "close meta JSON") + + var m Meta + if err = json.NewDecoder(rc).Decode(&m); err != nil { return nil, err } + if m.Version != TSDBVersion1 { return nil, errors.Errorf("unexpected meta file version %d", m.Version) } @@ -178,5 +185,10 @@ func Read(dir string) (*Meta, error) { if version != ThanosVersion1 { return nil, errors.Errorf("unexpected meta file Thanos section version %d", m.Version) } + + if m.Thanos.Labels == nil { + // To avoid extra nil checks, allocate map here if empty. + m.Thanos.Labels = make(map[string]string) + } return &m, nil } diff --git a/pkg/block/metadata/meta_test.go b/pkg/block/metadata/meta_test.go new file mode 100644 index 00000000000..dbff00623f4 --- /dev/null +++ b/pkg/block/metadata/meta_test.go @@ -0,0 +1,202 @@ +package metadata + +import ( + "bytes" + "io/ioutil" + "testing" + + "github.com/oklog/ulid" + "github.com/prometheus/prometheus/tsdb" + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestMeta_ReadWrite(t *testing.T) { + t.Run("empty write/read/write", func(t *testing.T) { + b := bytes.Buffer{} + testutil.Ok(t, Meta{}.Write(&b)) + testutil.Equals(t, `{ + "ulid": "00000000000000000000000000", + "minTime": 0, + "maxTime": 0, + "stats": {}, + "compaction": { + "level": 0 + }, + "version": 0, + "thanos": { + "labels": null, + "downsample": { + "resolution": 0 + }, + "source": "" + } +} +`, b.String()) + _, err := read(ioutil.NopCloser(&b)) + testutil.NotOk(t, err) + testutil.Equals(t, "unexpected meta file version 0", err.Error()) + }) + + t.Run("real write/read/write", func(t *testing.T) { + b := bytes.Buffer{} + m1 := Meta{ + BlockMeta: tsdb.BlockMeta{ + ULID: ulid.MustNew(5, nil), + MinTime: 2424, + MaxTime: 134, + Version: 1, + Compaction: tsdb.BlockMetaCompaction{ + Sources: []ulid.ULID{ulid.MustNew(1, nil), ulid.MustNew(2, nil)}, + Parents: []tsdb.BlockDesc{ + { + ULID: ulid.MustNew(3, nil), + MinTime: 442, + MaxTime: 24225, + }, + }, + Level: 123, + }, + Stats: tsdb.BlockStats{NumChunks: 14, NumSamples: 245, NumSeries: 4}, + }, + Thanos: Thanos{ + Version: 1, + Labels: map[string]string{"ext": "lset1"}, + Source: ReceiveSource, + Files: []File{ + {RelPath: "index", SizeBytes: 1313}, + }, + Downsample: ThanosDownsample{ + Resolution: 123144, + }, + }, + } + testutil.Ok(t, m1.Write(&b)) + testutil.Equals(t, `{ + "ulid": "00000000050000000000000000", + "minTime": 2424, + "maxTime": 134, + "stats": { + "numSamples": 245, + "numSeries": 4, + "numChunks": 14 + }, + "compaction": { + "level": 123, + "sources": [ + "00000000010000000000000000", + "00000000020000000000000000" + ], + "parents": [ + { + "ulid": "00000000030000000000000000", + "minTime": 442, + "maxTime": 24225 + } + ] + }, + "version": 1, + "thanos": { + "version": 1, + "labels": { + "ext": "lset1" + }, + "downsample": { + "resolution": 123144 + }, + "source": "receive", + "files": [ + { + "rel_path": "index", + "size_bytes": 1313 + } + ] + } +} +`, b.String()) + retMeta, err := read(ioutil.NopCloser(&b)) + testutil.Ok(t, err) + testutil.Equals(t, m1, *retMeta) + }) + + t.Run("missing external labels write/read/write", func(t *testing.T) { + b := bytes.Buffer{} + m1 := Meta{ + BlockMeta: tsdb.BlockMeta{ + ULID: ulid.MustNew(5, nil), + MinTime: 2424, + MaxTime: 134, + Version: 1, + Compaction: tsdb.BlockMetaCompaction{ + Sources: []ulid.ULID{ulid.MustNew(1, nil), ulid.MustNew(2, nil)}, + Parents: []tsdb.BlockDesc{ + { + ULID: ulid.MustNew(3, nil), + MinTime: 442, + MaxTime: 24225, + }, + }, + Level: 123, + }, + Stats: tsdb.BlockStats{NumChunks: 14, NumSamples: 245, NumSeries: 4}, + }, + Thanos: Thanos{ + Version: 1, + Source: ReceiveSource, + Files: []File{ + {RelPath: "index", SizeBytes: 1313}, + }, + Downsample: ThanosDownsample{ + Resolution: 123144, + }, + }, + } + testutil.Ok(t, m1.Write(&b)) + testutil.Equals(t, `{ + "ulid": "00000000050000000000000000", + "minTime": 2424, + "maxTime": 134, + "stats": { + "numSamples": 245, + "numSeries": 4, + "numChunks": 14 + }, + "compaction": { + "level": 123, + "sources": [ + "00000000010000000000000000", + "00000000020000000000000000" + ], + "parents": [ + { + "ulid": "00000000030000000000000000", + "minTime": 442, + "maxTime": 24225 + } + ] + }, + "version": 1, + "thanos": { + "version": 1, + "labels": null, + "downsample": { + "resolution": 123144 + }, + "source": "receive", + "files": [ + { + "rel_path": "index", + "size_bytes": 1313 + } + ] + } +} +`, b.String()) + retMeta, err := read(ioutil.NopCloser(&b)) + testutil.Ok(t, err) + + // We expect map to be empty but allocated. + testutil.Equals(t, map[string]string(nil), m1.Thanos.Labels) + m1.Thanos.Labels = map[string]string{} + testutil.Equals(t, m1, *retMeta) + }) +} diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 414ae7cefcc..1046057039c 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -644,7 +644,7 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, return retry(errors.Wrapf(err, "download block %s", ie.id)) } - meta, err := metadata.Read(bdir) + meta, err := metadata.ReadFromDir(bdir) if err != nil { return errors.Wrapf(err, "read meta from %s", bdir) } diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index b4ca09b356d..5898f9df2d4 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -417,7 +417,7 @@ func createAndUpload(t testing.TB, bkt objstore.Bucket, blocks []blockgenSpec) ( } testutil.Ok(t, err) - meta, err := metadata.Read(filepath.Join(prepareDir, id.String())) + meta, err := metadata.ReadFromDir(filepath.Join(prepareDir, id.String())) testutil.Ok(t, err) metas = append(metas, meta) diff --git a/pkg/compact/downsample/downsample_test.go b/pkg/compact/downsample/downsample_test.go index f4ff24ae182..09a2ec4fa86 100644 --- a/pkg/compact/downsample/downsample_test.go +++ b/pkg/compact/downsample/downsample_test.go @@ -455,7 +455,7 @@ func TestDownsample(t *testing.T) { } testutil.Ok(t, err) - _, err = metadata.Read(filepath.Join(dir, id.String())) + _, err = metadata.ReadFromDir(filepath.Join(dir, id.String())) testutil.Ok(t, err) indexr, err := index.NewFileReader(filepath.Join(dir, id.String(), block.IndexFilename)) diff --git a/pkg/compact/planner_test.go b/pkg/compact/planner_test.go index 3ac6a9ed773..98814454aec 100644 --- a/pkg/compact/planner_test.go +++ b/pkg/compact/planner_test.go @@ -48,7 +48,7 @@ func (p *tsdbPlannerAdapter) Plan(_ context.Context, metasByMinTime []*metadata. var res []*metadata.Meta for _, pdir := range plan { - meta, err := metadata.Read(pdir) + meta, err := metadata.ReadFromDir(pdir) if err != nil { return nil, errors.Wrapf(err, "read meta from %s", pdir) } diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go index d3f80c3d59b..df66622774b 100644 --- a/pkg/shipper/shipper.go +++ b/pkg/shipper/shipper.go @@ -390,7 +390,7 @@ func (s *Shipper) blockMetasFromOldest() (metas []*metadata.Meta, _ error) { if !fi.IsDir() { continue } - m, err := metadata.Read(dir) + m, err := metadata.ReadFromDir(dir) if err != nil { return nil, errors.Wrapf(err, "read metadata for block %v", dir) } diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 6bcead6b9b4..34534407925 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -2054,7 +2054,7 @@ func (r *bucketIndexReader) decodeSeriesWithReq(b []byte, lbls *labels.Labels, c *lbls = append(*lbls, labels.Label{Name: ln, Value: lv}) } - // Read the chunks meta data. + // ReadFromDir the chunks meta data. k = d.Uvarint() if k == 0 { @@ -2227,7 +2227,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, offs []uint32, seq i fetchBegin = time.Now() - // Read entire chunk into new buffer. + // ReadFromDir entire chunk into new buffer. nb, err := r.block.readChunkRange(ctx, seq, int64(o), int64(chLen)) if err != nil { return errors.Wrapf(err, "preloaded chunk too small, expecting %d, and failed to fetch full chunk", chLen) diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index afe359eed4b..551e5f51508 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -110,7 +110,7 @@ func prepareTestBlocks(t testing.TB, now time.Time, count int, dir string, bkt o dir1, dir2 := filepath.Join(dir, id1.String()), filepath.Join(dir, id2.String()) // Add labels to the meta of the second block. - meta, err := metadata.Read(dir2) + meta, err := metadata.ReadFromDir(dir2) testutil.Ok(t, err) meta.Thanos.Labels = map[string]string{"ext2": "value2"} testutil.Ok(t, meta.WriteToDir(logger, dir2)) diff --git a/pkg/testutil/e2eutil/prometheus.go b/pkg/testutil/e2eutil/prometheus.go index af32be9ec8a..2f609195d8d 100644 --- a/pkg/testutil/e2eutil/prometheus.go +++ b/pkg/testutil/e2eutil/prometheus.go @@ -383,7 +383,7 @@ func CreateBlockWithBlockDelay( return ulid.ULID{}, errors.Wrap(err, "create block id") } - m, err := metadata.Read(path.Join(dir, blockID.String())) + m, err := metadata.ReadFromDir(path.Join(dir, blockID.String())) if err != nil { return ulid.ULID{}, errors.Wrap(err, "open meta file") }