Skip to content

Commit

Permalink
metadata: Fixed panic when no ext labels are set; Added more tests.
Browse files Browse the repository at this point in the history
```
caller=fetcher.go:700 msg="block has no labels left, creating one" replica=deduped
panic: assignment to entry in nil map
goroutine 441 [running]:
github.com/thanos-io/thanos/pkg/block.(*ReplicaLabelRemover).Modify(0xc000b19d70, 0x2b261e0, 0xc000c16d00, 0xc0008596e0, 0xc000b1a980, 0x0, 0x0)
	/remote-source/app/pkg/block/fetcher.go:701 +0x3cf
github.com/thanos-io/thanos/pkg/block.(*BaseFetcher).fetch(0xc000d75d40, 0x2b261e0, 0xc000c16d00, 0xc000c16a40, 0xc000c16a00, 0x4, 0x4, 0xc000b0e590, 0x1, 0x1, ...)
	/remote-source/app/pkg/block/fetcher.go:441 +0x65d
github.com/thanos-io/thanos/pkg/block.(*MetaFetcher).Fetch(0xc0002fe480, 0x2b261e0, 0xc000c16d00, 0x5528a0, 0xc000c16340, 0xc0005c6000, 0x8)
	/remote-source/app/pkg/block/fetcher.go:474 +0x9f
github.com/thanos-io/thanos/pkg/compact.(*Syncer).SyncMetas(0xc000c67d80, 0x2b261e0, 0xc000c16d00, 0x0, 0x0)
	/remote-source/app/pkg/compact/compact.go:127 +0xc0
github.com/thanos-io/thanos/pkg/compact.(*BucketCompactor).Compact(0xc0002fed20, 0x2b261e0, 0xc000c16d00, 0x0, 0x0)
	/remote-source/app/pkg/compact/compact.go:945 +0x2c2
main.runCompact.func6(0xc000501c80, 0x0)
	/remote-source/app/cmd/thanos/compact.go:307 +0x15a
main.runCompact.func7.1(0xc0006bbd70, 0xc000bbe0a0)
	/remote-source/app/cmd/thanos/compact.go:367 +0x99
github.com/thanos-io/thanos/pkg/runutil.Repeat(0x45d964b800, 0xc00005a660, 0xc0007e9f30, 0x0, 0x0)
	/remote-source/app/pkg/runutil/runutil.go:72 +0x91
main.runCompact.func7(0x0, 0x0)
	/remote-source/app/cmd/thanos/compact.go:366 +0x29a
github.com/oklog/run.(*Group).Run.func1(0xc0001f9f20, 0xc000c67e00, 0xc000b0ecb0)
	/remote-source/deps/gomod/pkg/mod/github.com/oklog/run@v1.1.0/group.go:38 +0x27
created by github.com/oklog/run.(*Group).Run
	/remote-source/deps/gomod/pkg/mod/github.com/oklog/run@v1.1.0/group.go:37 +0xbb

```

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Nov 18, 2020
1 parent 755b275 commit ff5789d
Show file tree
Hide file tree
Showing 16 changed files with 243 additions and 22 deletions.
4 changes: 4 additions & 0 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ func registerReceive(app *extkingpin.App) {
return errors.Wrap(err, "parse labels")
}

if len(lset) == 0 {
return errors.New("no external labels configured for receive, uniquely identifying external labels must be configured (ideally with `receive_` prefix); see https://thanos.io/tip/thanos/storage.md#external-labels for details.")
}

var cw *receive.ConfigWatcher
if *hashringsFile != "" {
cw, err = receive.NewConfigWatcher(log.With(logger, "component", "config-watcher"), reg, *hashringsFile, *refreshInterval)
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func runSidecar(
}

if len(m.Labels()) == 0 {
return errors.New("no external labels configured on Prometheus server, uniquely identifying external labels must be configured")
return errors.New("no external labels configured on Prometheus server, uniquely identifying external labels must be configured; see https://thanos.io/tip/thanos/storage.md#external-labels for details.")
}

// Periodically query the Prometheus config. We use this as a heartbeat as well as for updating
Expand Down
2 changes: 1 addition & 1 deletion pkg/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st
return errors.Wrap(err, "not a block dir")
}

meta, err := metadata.Read(bdir)
meta, err := metadata.ReadFromDir(bdir)
if err != nil {
// No meta or broken meta file.
return errors.Wrap(err, "read meta")
Expand Down
2 changes: 1 addition & 1 deletion pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (f *BaseFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*metadata.Met

// Best effort load from local dir.
if f.cacheDir != "" {
m, err := metadata.Read(cachedBlockDir)
m, err := metadata.ReadFromDir(cachedBlockDir)
if err == nil {
return m, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/block/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source metadata.SourceT
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
resid = ulid.MustNew(ulid.Now(), entropy)

meta, err := metadata.Read(bdir)
meta, err := metadata.ReadFromDir(bdir)
if err != nil {
return resid, errors.Wrap(err, "read meta file")
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/block/indexheader/header_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestReaders(t *testing.T) {
db.Close()
*/

m, err := metadata.Read("./testdata/index_format_v1")
m, err := metadata.ReadFromDir("./testdata/index_format_v1")
testutil.Ok(t, err)
e2eutil.Copy(t, "./testdata/index_format_v1", filepath.Join(tmpDir, m.ULID.String()))

Expand Down Expand Up @@ -312,7 +312,7 @@ func prepareIndexV2Block(t testing.TB, tmpDir string, bkt objstore.Bucket) *meta
}
*/

m, err := metadata.Read("./testdata/index_format_v2")
m, err := metadata.ReadFromDir("./testdata/index_format_v2")
testutil.Ok(t, err)
e2eutil.Copy(t, "./testdata/index_format_v2", filepath.Join(tmpDir, m.ULID.String()))

Expand Down
26 changes: 19 additions & 7 deletions pkg/block/metadata/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ package metadata
import (
"encoding/json"
"io"
"io/ioutil"
"os"
"path/filepath"

Expand Down Expand Up @@ -57,6 +56,8 @@ type Thanos struct {
// Version of Thanos meta file. If none specified, 1 is assumed (since first version did not have explicit version specified).
Version int `json:"version,omitempty"`

// Labels are the external labels identifying the producer as well as tenant.
// See https://thanos.io/tip/thanos/storage.md#external-labels for details.
Labels map[string]string `json:"labels"`
Downsample ThanosDownsample `json:"downsample"`

Expand Down Expand Up @@ -87,7 +88,7 @@ type ThanosDownsample struct {
// InjectThanos sets Thanos meta to the block meta JSON and saves it to the disk.
// NOTE: It should be used after writing any block by any Thanos component, otherwise we will miss crucial metadata.
func InjectThanos(logger log.Logger, bdir string, meta Thanos, downsampledMeta *tsdb.BlockMeta) (*Meta, error) {
newMeta, err := Read(bdir)
newMeta, err := ReadFromDir(bdir)
if err != nil {
return nil, errors.Wrap(err, "read new meta")
}
Expand Down Expand Up @@ -154,17 +155,23 @@ func renameFile(logger log.Logger, from, to string) error {
return pdir.Close()
}

// Read reads the given meta from <dir>/meta.json.
func Read(dir string) (*Meta, error) {
b, err := ioutil.ReadFile(filepath.Join(dir, MetaFilename))
// ReadFromDir reads the given meta from <dir>/meta.json.
func ReadFromDir(dir string) (*Meta, error) {
f, err := os.Open(filepath.Join(dir, MetaFilename))
if err != nil {
return nil, err
}
var m Meta
return read(f)
}

if err := json.Unmarshal(b, &m); err != nil {
func read(rc io.ReadCloser) (_ *Meta, err error) {
defer runutil.ExhaustCloseWithErrCapture(&err, rc, "close meta JSON")

var m Meta
if err = json.NewDecoder(rc).Decode(&m); err != nil {
return nil, err
}

if m.Version != TSDBVersion1 {
return nil, errors.Errorf("unexpected meta file version %d", m.Version)
}
Expand All @@ -178,5 +185,10 @@ func Read(dir string) (*Meta, error) {
if version != ThanosVersion1 {
return nil, errors.Errorf("unexpected meta file Thanos section version %d", m.Version)
}

if m.Thanos.Labels == nil {
// To avoid extra nil checks, allocate map here if empty.
m.Thanos.Labels = make(map[string]string)
}
return &m, nil
}
205 changes: 205 additions & 0 deletions pkg/block/metadata/meta_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package metadata

import (
"bytes"
"io/ioutil"
"testing"

"github.com/oklog/ulid"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/testutil"
)

func TestMeta_ReadWrite(t *testing.T) {
t.Run("empty write/read/write", func(t *testing.T) {
b := bytes.Buffer{}
testutil.Ok(t, Meta{}.Write(&b))
testutil.Equals(t, `{
"ulid": "00000000000000000000000000",
"minTime": 0,
"maxTime": 0,
"stats": {},
"compaction": {
"level": 0
},
"version": 0,
"thanos": {
"labels": null,
"downsample": {
"resolution": 0
},
"source": ""
}
}
`, b.String())
_, err := read(ioutil.NopCloser(&b))
testutil.NotOk(t, err)
testutil.Equals(t, "unexpected meta file version 0", err.Error())
})

t.Run("real write/read/write", func(t *testing.T) {
b := bytes.Buffer{}
m1 := Meta{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(5, nil),
MinTime: 2424,
MaxTime: 134,
Version: 1,
Compaction: tsdb.BlockMetaCompaction{
Sources: []ulid.ULID{ulid.MustNew(1, nil), ulid.MustNew(2, nil)},
Parents: []tsdb.BlockDesc{
{
ULID: ulid.MustNew(3, nil),
MinTime: 442,
MaxTime: 24225,
},
},
Level: 123,
},
Stats: tsdb.BlockStats{NumChunks: 14, NumSamples: 245, NumSeries: 4},
},
Thanos: Thanos{
Version: 1,
Labels: map[string]string{"ext": "lset1"},
Source: ReceiveSource,
Files: []File{
{RelPath: "index", SizeBytes: 1313},
},
Downsample: ThanosDownsample{
Resolution: 123144,
},
},
}
testutil.Ok(t, m1.Write(&b))
testutil.Equals(t, `{
"ulid": "00000000050000000000000000",
"minTime": 2424,
"maxTime": 134,
"stats": {
"numSamples": 245,
"numSeries": 4,
"numChunks": 14
},
"compaction": {
"level": 123,
"sources": [
"00000000010000000000000000",
"00000000020000000000000000"
],
"parents": [
{
"ulid": "00000000030000000000000000",
"minTime": 442,
"maxTime": 24225
}
]
},
"version": 1,
"thanos": {
"version": 1,
"labels": {
"ext": "lset1"
},
"downsample": {
"resolution": 123144
},
"source": "receive",
"files": [
{
"rel_path": "index",
"size_bytes": 1313
}
]
}
}
`, b.String())
retMeta, err := read(ioutil.NopCloser(&b))
testutil.Ok(t, err)
testutil.Equals(t, m1, *retMeta)
})

t.Run("missing external labels write/read/write", func(t *testing.T) {
b := bytes.Buffer{}
m1 := Meta{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(5, nil),
MinTime: 2424,
MaxTime: 134,
Version: 1,
Compaction: tsdb.BlockMetaCompaction{
Sources: []ulid.ULID{ulid.MustNew(1, nil), ulid.MustNew(2, nil)},
Parents: []tsdb.BlockDesc{
{
ULID: ulid.MustNew(3, nil),
MinTime: 442,
MaxTime: 24225,
},
},
Level: 123,
},
Stats: tsdb.BlockStats{NumChunks: 14, NumSamples: 245, NumSeries: 4},
},
Thanos: Thanos{
Version: 1,
Source: ReceiveSource,
Files: []File{
{RelPath: "index", SizeBytes: 1313},
},
Downsample: ThanosDownsample{
Resolution: 123144,
},
},
}
testutil.Ok(t, m1.Write(&b))
testutil.Equals(t, `{
"ulid": "00000000050000000000000000",
"minTime": 2424,
"maxTime": 134,
"stats": {
"numSamples": 245,
"numSeries": 4,
"numChunks": 14
},
"compaction": {
"level": 123,
"sources": [
"00000000010000000000000000",
"00000000020000000000000000"
],
"parents": [
{
"ulid": "00000000030000000000000000",
"minTime": 442,
"maxTime": 24225
}
]
},
"version": 1,
"thanos": {
"version": 1,
"labels": null,
"downsample": {
"resolution": 123144
},
"source": "receive",
"files": [
{
"rel_path": "index",
"size_bytes": 1313
}
]
}
}
`, b.String())
retMeta, err := read(ioutil.NopCloser(&b))
testutil.Ok(t, err)

// We expect map to be empty but allocated.
testutil.Equals(t, map[string]string(nil), m1.Thanos.Labels)
m1.Thanos.Labels = map[string]string{}
testutil.Equals(t, m1, *retMeta)
})
}
2 changes: 1 addition & 1 deletion pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket,
return retry(errors.Wrapf(err, "download block %s", ie.id))
}

meta, err := metadata.Read(bdir)
meta, err := metadata.ReadFromDir(bdir)
if err != nil {
return errors.Wrapf(err, "read meta from %s", bdir)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ func createAndUpload(t testing.TB, bkt objstore.Bucket, blocks []blockgenSpec) (
}
testutil.Ok(t, err)

meta, err := metadata.Read(filepath.Join(prepareDir, id.String()))
meta, err := metadata.ReadFromDir(filepath.Join(prepareDir, id.String()))
testutil.Ok(t, err)
metas = append(metas, meta)

Expand Down
2 changes: 1 addition & 1 deletion pkg/compact/downsample/downsample_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ func TestDownsample(t *testing.T) {
}
testutil.Ok(t, err)

_, err = metadata.Read(filepath.Join(dir, id.String()))
_, err = metadata.ReadFromDir(filepath.Join(dir, id.String()))
testutil.Ok(t, err)

indexr, err := index.NewFileReader(filepath.Join(dir, id.String(), block.IndexFilename))
Expand Down
2 changes: 1 addition & 1 deletion pkg/compact/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (p *tsdbPlannerAdapter) Plan(_ context.Context, metasByMinTime []*metadata.

var res []*metadata.Meta
for _, pdir := range plan {
meta, err := metadata.Read(pdir)
meta, err := metadata.ReadFromDir(pdir)
if err != nil {
return nil, errors.Wrapf(err, "read meta from %s", pdir)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/shipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ func (s *Shipper) blockMetasFromOldest() (metas []*metadata.Meta, _ error) {
if !fi.IsDir() {
continue
}
m, err := metadata.Read(dir)
m, err := metadata.ReadFromDir(dir)
if err != nil {
return nil, errors.Wrapf(err, "read metadata for block %v", dir)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -2054,7 +2054,7 @@ func (r *bucketIndexReader) decodeSeriesWithReq(b []byte, lbls *labels.Labels, c
*lbls = append(*lbls, labels.Label{Name: ln, Value: lv})
}

// Read the chunks meta data.
// ReadFromDir the chunks meta data.
k = d.Uvarint()

if k == 0 {
Expand Down Expand Up @@ -2227,7 +2227,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, offs []uint32, seq i

fetchBegin = time.Now()

// Read entire chunk into new buffer.
// ReadFromDir entire chunk into new buffer.
nb, err := r.block.readChunkRange(ctx, seq, int64(o), int64(chLen))
if err != nil {
return errors.Wrapf(err, "preloaded chunk too small, expecting %d, and failed to fetch full chunk", chLen)
Expand Down
Loading

0 comments on commit ff5789d

Please sign in to comment.