diff --git a/Gopkg.toml b/Gopkg.toml
index 98bbc9494d..057fa27ac4 100644
--- a/Gopkg.toml
+++ b/Gopkg.toml
@@ -37,7 +37,8 @@ ignored = ["github.com/improbable-eng/thanos/benchmark/*"]
name = "github.com/prometheus/common"
[[constraint]]
- version = "v2.3.2"
+	# TODO(bwplotka): Move to a released version once our recent fixes are released (v2.7.0)
+ revision = "3bd41cc92c7800cc6072171bd4237406126fa169"
name = "github.com/prometheus/prometheus"
[[override]]
@@ -46,7 +47,7 @@ ignored = ["github.com/improbable-eng/thanos/benchmark/*"]
[[constraint]]
name = "github.com/prometheus/tsdb"
- revision = "bd832fc8274e8fe63999ac749daaaff9d881241f"
+ version = "v0.4.0"
[[constraint]]
branch = "master"
diff --git a/Makefile b/Makefile
index 4d0e27aab2..97ee8ff044 100644
--- a/Makefile
+++ b/Makefile
@@ -36,7 +36,9 @@ PROTOC_VERSION ?= 3.4.0
# E2e test deps.
# Referenced by github.com/improbable-eng/thanos/blob/master/docs/getting_started.md#prometheus
-SUPPORTED_PROM_VERSIONS ?=v2.0.0 v2.2.1 v2.3.2 v2.4.3 v2.5.0
+
+# Limited prom versions, because testing was not possible. This should fix it: https://github.com/improbable-eng/thanos/issues/758
+SUPPORTED_PROM_VERSIONS ?=v2.4.3 v2.5.0
ALERTMANAGER_VERSION ?=v0.15.2
MINIO_SERVER_VERSION ?=RELEASE.2018-10-06T00-15-16Z
diff --git a/cmd/thanos/bucket.go b/cmd/thanos/bucket.go
index 01b9b54bf6..ff68feee2a 100644
--- a/cmd/thanos/bucket.go
+++ b/cmd/thanos/bucket.go
@@ -10,10 +10,9 @@ import (
"text/template"
"time"
- "github.com/prometheus/tsdb/labels"
-
"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/block"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/verifier"
@@ -23,6 +22,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/tsdb/labels"
"golang.org/x/text/language"
"golang.org/x/text/message"
"gopkg.in/alecthomas/kingpin.v2"
@@ -254,7 +254,7 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin
defer cancel()
// Getting Metas.
- var blockMetas []*block.Meta
+ var blockMetas []*metadata.Meta
if err = bkt.Iter(ctx, "", func(name string) error {
id, ok := block.IsBlockDir(name)
if !ok {
@@ -277,7 +277,7 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin
}
}
-func printTable(blockMetas []*block.Meta, selectorLabels labels.Labels, sortBy []string) error {
+func printTable(blockMetas []*metadata.Meta, selectorLabels labels.Labels, sortBy []string) error {
header := inspectColumns
var lines [][]string
@@ -355,7 +355,7 @@ func getKeysAlphabetically(labels map[string]string) []string {
// matchesSelector checks if blockMeta contains every label from
// the selector with the correct value
-func matchesSelector(blockMeta *block.Meta, selectorLabels labels.Labels) bool {
+func matchesSelector(blockMeta *metadata.Meta, selectorLabels labels.Labels) bool {
for _, l := range selectorLabels {
if v, ok := blockMeta.Thanos.Labels[l.Name]; !ok || v != l.Value {
return false
diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go
index c00aadd43d..5f3846ff50 100644
--- a/cmd/thanos/downsample.go
+++ b/cmd/thanos/downsample.go
@@ -8,11 +8,10 @@ import (
"path/filepath"
"time"
- "github.com/prometheus/tsdb/chunkenc"
-
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/compact/downsample"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/objstore/client"
@@ -23,6 +22,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb"
+ "github.com/prometheus/tsdb/chunkenc"
"gopkg.in/alecthomas/kingpin.v2"
)
@@ -105,7 +105,7 @@ func downsampleBucket(
if err := os.MkdirAll(dir, 0777); err != nil {
return errors.Wrap(err, "create dir")
}
- var metas []*block.Meta
+ var metas []*metadata.Meta
err := bkt.Iter(ctx, "", func(name string) error {
id, ok := block.IsBlockDir(name)
@@ -119,7 +119,7 @@ func downsampleBucket(
}
defer runutil.CloseWithLogOnErr(logger, rc, "block reader")
- var m block.Meta
+ var m metadata.Meta
if err := json.NewDecoder(rc).Decode(&m); err != nil {
return errors.Wrap(err, "decode meta")
}
@@ -201,7 +201,7 @@ func downsampleBucket(
return nil
}
-func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *block.Meta, dir string, resolution int64) error {
+func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta, dir string, resolution int64) error {
begin := time.Now()
bdir := filepath.Join(dir, m.ULID.String())
@@ -224,7 +224,7 @@ func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bu
pool = downsample.NewPool()
}
- b, err := tsdb.OpenBlock(bdir, pool)
+ b, err := tsdb.OpenBlock(logger, bdir, pool)
if err != nil {
return errors.Wrapf(err, "open block %s", m.ULID)
}
diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go
index bf6cfa64fe..9e6c5ac19e 100644
--- a/cmd/thanos/query.go
+++ b/cmd/thanos/query.go
@@ -296,7 +296,16 @@ func runQuery(
return stores.Get(), nil
}, selectorLset)
queryableCreator = query.NewQueryableCreator(logger, proxy, replicaLabel)
- engine = promql.NewEngine(logger, reg, maxConcurrentQueries, queryTimeout)
+ engine = promql.NewEngine(
+ promql.EngineOpts{
+ Logger: logger,
+ Reg: reg,
+ MaxConcurrent: maxConcurrentQueries,
+ // TODO(bwplotka): Expose this as a flag: https://github.com/improbable-eng/thanos/issues/703
+ MaxSamples: math.MaxInt32,
+ Timeout: queryTimeout,
+ },
+ )
)
// Periodically update the store set with the addresses we see in our cluster.
{
diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go
index 03fed102d2..00d7e2bb98 100644
--- a/cmd/thanos/rule.go
+++ b/cmd/thanos/rule.go
@@ -19,15 +19,14 @@ import (
"syscall"
"time"
- "github.com/improbable-eng/thanos/pkg/extprom"
-
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/alert"
- "github.com/improbable-eng/thanos/pkg/block"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/cluster"
"github.com/improbable-eng/thanos/pkg/discovery/cache"
"github.com/improbable-eng/thanos/pkg/discovery/dns"
+ "github.com/improbable-eng/thanos/pkg/extprom"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/shipper"
@@ -117,7 +116,6 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application, name string)
MaxBlockDuration: *tsdbBlockDuration,
Retention: *tsdbRetention,
NoLockfile: true,
- WALFlushInterval: 30 * time.Second,
}
lookupQueries := map[string]struct{}{}
@@ -290,7 +288,7 @@ func runRule(
ctx, cancel := context.WithCancel(context.Background())
ctx = tracing.ContextWithTracer(ctx, tracer)
- notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) error {
+ notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) {
res := make([]*alert.Alert, 0, len(alerts))
for _, alrt := range alerts {
// Only send actually firing alerts.
@@ -309,17 +307,18 @@ func runRule(
res = append(res, a)
}
alertQ.Push(res)
-
- return nil
}
+
+ st := tsdb.Adapter(db, 0)
mgr = rules.NewManager(&rules.ManagerOptions{
Context: ctx,
QueryFunc: queryFn,
NotifyFunc: notify,
Logger: log.With(logger, "component", "rules"),
- Appendable: tsdb.Adapter(db, 0),
+ Appendable: st,
Registerer: reg,
ExternalURL: nil,
+ TSDB: st,
})
g.Add(func() error {
mgr.Run()
@@ -579,7 +578,7 @@ func runRule(
}
}()
- s := shipper.New(logger, nil, dataDir, bkt, func() labels.Labels { return lset }, block.RulerSource)
+ s := shipper.New(logger, nil, dataDir, bkt, func() labels.Labels { return lset }, metadata.RulerSource)
ctx, cancel := context.WithCancel(context.Background())
diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go
index 61f493f93a..bd462b2f65 100644
--- a/cmd/thanos/sidecar.go
+++ b/cmd/thanos/sidecar.go
@@ -14,7 +14,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
- "github.com/improbable-eng/thanos/pkg/block"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/cluster"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/reloader"
@@ -102,7 +102,7 @@ func runSidecar(
reloader *reloader.Reloader,
component string,
) error {
- var metadata = &metadata{
+ var m = &promMetadata{
promURL: promURL,
// Start out with the full time range. The shipper will constrain it later.
@@ -128,7 +128,7 @@ func runSidecar(
// Blocking query of external labels before joining as a Source Peer into gossip.
// We retry infinitely until we reach and fetch labels from our Prometheus.
err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
- if err := metadata.UpdateLabels(ctx, logger); err != nil {
+ if err := m.UpdateLabels(ctx, logger); err != nil {
level.Warn(logger).Log(
"msg", "failed to fetch initial external labels. Is Prometheus running? Retrying",
"err", err,
@@ -145,14 +145,14 @@ func runSidecar(
return errors.Wrap(err, "initial external labels query")
}
- if len(metadata.Labels()) == 0 {
+ if len(m.Labels()) == 0 {
return errors.New("no external labels configured on Prometheus server, uniquely identifying external labels must be configured")
}
// New gossip cluster.
- mint, maxt := metadata.Timestamps()
+ mint, maxt := m.Timestamps()
if err = peer.Join(cluster.PeerTypeSource, cluster.PeerMetadata{
- Labels: metadata.LabelsPB(),
+ Labels: m.LabelsPB(),
MinTime: mint,
MaxTime: maxt,
}); err != nil {
@@ -165,12 +165,12 @@ func runSidecar(
iterCtx, iterCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer iterCancel()
- if err := metadata.UpdateLabels(iterCtx, logger); err != nil {
+ if err := m.UpdateLabels(iterCtx, logger); err != nil {
level.Warn(logger).Log("msg", "heartbeat failed", "err", err)
promUp.Set(0)
} else {
// Update gossip.
- peer.SetLabels(metadata.LabelsPB())
+ peer.SetLabels(m.LabelsPB())
promUp.Set(1)
lastHeartbeat.Set(float64(time.Now().UnixNano()) / 1e9)
@@ -204,7 +204,7 @@ func runSidecar(
var client http.Client
promStore, err := store.NewPrometheusStore(
- logger, &client, promURL, metadata.Labels, metadata.Timestamps)
+ logger, &client, promURL, m.Labels, m.Timestamps)
if err != nil {
return errors.Wrap(err, "create Prometheus store")
}
@@ -252,7 +252,7 @@ func runSidecar(
}
}()
- s := shipper.New(logger, nil, dataDir, bkt, metadata.Labels, block.SidecarSource)
+ s := shipper.New(logger, nil, dataDir, bkt, m.Labels, metadata.SidecarSource)
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
@@ -265,9 +265,9 @@ func runSidecar(
if err != nil {
level.Warn(logger).Log("msg", "reading timestamps failed", "err", err)
} else {
- metadata.UpdateTimestamps(minTime, math.MaxInt64)
+ m.UpdateTimestamps(minTime, math.MaxInt64)
- mint, maxt := metadata.Timestamps()
+ mint, maxt := m.Timestamps()
peer.SetTimestamps(mint, maxt)
}
return nil
@@ -281,7 +281,7 @@ func runSidecar(
return nil
}
-type metadata struct {
+type promMetadata struct {
promURL *url.URL
mtx sync.Mutex
@@ -290,7 +290,7 @@ type metadata struct {
labels labels.Labels
}
-func (s *metadata) UpdateLabels(ctx context.Context, logger log.Logger) error {
+func (s *promMetadata) UpdateLabels(ctx context.Context, logger log.Logger) error {
elset, err := queryExternalLabels(ctx, logger, s.promURL)
if err != nil {
return err
@@ -303,7 +303,7 @@ func (s *metadata) UpdateLabels(ctx context.Context, logger log.Logger) error {
return nil
}
-func (s *metadata) UpdateTimestamps(mint int64, maxt int64) {
+func (s *promMetadata) UpdateTimestamps(mint int64, maxt int64) {
s.mtx.Lock()
defer s.mtx.Unlock()
@@ -311,14 +311,14 @@ func (s *metadata) UpdateTimestamps(mint int64, maxt int64) {
s.maxt = maxt
}
-func (s *metadata) Labels() labels.Labels {
+func (s *promMetadata) Labels() labels.Labels {
s.mtx.Lock()
defer s.mtx.Unlock()
return s.labels
}
-func (s *metadata) LabelsPB() []storepb.Label {
+func (s *promMetadata) LabelsPB() []storepb.Label {
s.mtx.Lock()
defer s.mtx.Unlock()
@@ -332,7 +332,7 @@ func (s *metadata) LabelsPB() []storepb.Label {
return lset
}
-func (s *metadata) Timestamps() (mint int64, maxt int64) {
+func (s *promMetadata) Timestamps() (mint int64, maxt int64) {
s.mtx.Lock()
defer s.mtx.Unlock()
diff --git a/pkg/block/block.go b/pkg/block/block.go
index 118b7ea96c..cdc52a3e08 100644
--- a/pkg/block/block.go
+++ b/pkg/block/block.go
@@ -5,11 +5,12 @@ package block
import (
"context"
"encoding/json"
- "io/ioutil"
"os"
"path"
"path/filepath"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
+
"fmt"
"github.com/go-kit/kit/log"
@@ -17,8 +18,6 @@ import (
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/oklog/ulid"
"github.com/pkg/errors"
- "github.com/prometheus/tsdb"
- "github.com/prometheus/tsdb/fileutil"
)
const (
@@ -33,103 +32,6 @@ const (
DebugMetas = "debug/metas"
)
-type SourceType string
-
-const (
- UnknownSource SourceType = ""
- SidecarSource SourceType = "sidecar"
- CompactorSource SourceType = "compactor"
- CompactorRepairSource SourceType = "compactor.repair"
- RulerSource SourceType = "ruler"
- BucketRepairSource SourceType = "bucket.repair"
- TestSource SourceType = "test"
-)
-
-// Meta describes the a block's meta. It wraps the known TSDB meta structure and
-// extends it by Thanos-specific fields.
-type Meta struct {
- Version int `json:"version"`
-
- tsdb.BlockMeta
-
- Thanos ThanosMeta `json:"thanos"`
-}
-
-// ThanosMeta holds block meta information specific to Thanos.
-type ThanosMeta struct {
- Labels map[string]string `json:"labels"`
- Downsample ThanosDownsampleMeta `json:"downsample"`
-
- // Source is a real upload source of the block.
- Source SourceType `json:"source"`
-}
-
-type ThanosDownsampleMeta struct {
- Resolution int64 `json:"resolution"`
-}
-
-// WriteMetaFile writes the given meta into
/meta.json.
-func WriteMetaFile(logger log.Logger, dir string, meta *Meta) error {
- // Make any changes to the file appear atomic.
- path := filepath.Join(dir, MetaFilename)
- tmp := path + ".tmp"
-
- f, err := os.Create(tmp)
- if err != nil {
- return err
- }
-
- enc := json.NewEncoder(f)
- enc.SetIndent("", "\t")
-
- if err := enc.Encode(meta); err != nil {
- runutil.CloseWithLogOnErr(logger, f, "close meta")
- return err
- }
- if err := f.Close(); err != nil {
- return err
- }
- return renameFile(logger, tmp, path)
-}
-
-// ReadMetaFile reads the given meta from /meta.json.
-func ReadMetaFile(dir string) (*Meta, error) {
- b, err := ioutil.ReadFile(filepath.Join(dir, MetaFilename))
- if err != nil {
- return nil, err
- }
- var m Meta
-
- if err := json.Unmarshal(b, &m); err != nil {
- return nil, err
- }
- if m.Version != 1 {
- return nil, errors.Errorf("unexpected meta file version %d", m.Version)
- }
- return &m, nil
-}
-
-func renameFile(logger log.Logger, from, to string) error {
- if err := os.RemoveAll(to); err != nil {
- return err
- }
- if err := os.Rename(from, to); err != nil {
- return err
- }
-
- // Directory was renamed; sync parent dir to persist rename.
- pdir, err := fileutil.OpenDir(filepath.Dir(to))
- if err != nil {
- return err
- }
-
- if err = fileutil.Fsync(pdir); err != nil {
- runutil.CloseWithLogOnErr(logger, pdir, "close dir")
- return err
- }
- return pdir.Close()
-}
-
// Download downloads directory that is mean to be block directory.
func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id ulid.ULID, dst string) error {
if err := objstore.DownloadDir(ctx, logger, bucket, id.String(), dst); err != nil {
@@ -169,7 +71,7 @@ func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st
return errors.Wrap(err, "not a block dir")
}
- meta, err := ReadMetaFile(bdir)
+ meta, err := metadata.Read(bdir)
if err != nil {
// No meta or broken meta file.
return errors.Wrap(err, "read meta")
@@ -216,16 +118,16 @@ func Delete(ctx context.Context, bucket objstore.Bucket, id ulid.ULID) error {
}
// DownloadMeta downloads only meta file from bucket by block ID.
-func DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (Meta, error) {
+func DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (metadata.Meta, error) {
rc, err := bkt.Get(ctx, path.Join(id.String(), MetaFilename))
if err != nil {
- return Meta{}, errors.Wrapf(err, "meta.json bkt get for %s", id.String())
+ return metadata.Meta{}, errors.Wrapf(err, "meta.json bkt get for %s", id.String())
}
defer runutil.CloseWithLogOnErr(logger, rc, "download meta bucket client")
- var m Meta
+ var m metadata.Meta
if err := json.NewDecoder(rc).Decode(&m); err != nil {
- return Meta{}, errors.Wrapf(err, "decode meta.json for block %s", id.String())
+ return metadata.Meta{}, errors.Wrapf(err, "decode meta.json for block %s", id.String())
}
return m, nil
}
@@ -234,24 +136,3 @@ func IsBlockDir(path string) (id ulid.ULID, ok bool) {
id, err := ulid.Parse(filepath.Base(path))
return id, err == nil
}
-
-// InjectThanosMeta sets Thanos meta to the block meta JSON and saves it to the disk.
-// NOTE: It should be used after writing any block by any Thanos component, otherwise we will miss crucial metadata.
-func InjectThanosMeta(logger log.Logger, bdir string, meta ThanosMeta, downsampledMeta *tsdb.BlockMeta) (*Meta, error) {
- newMeta, err := ReadMetaFile(bdir)
- if err != nil {
- return nil, errors.Wrap(err, "read new meta")
- }
- newMeta.Thanos = meta
-
- // While downsampling we need to copy original compaction.
- if downsampledMeta != nil {
- newMeta.Compaction = downsampledMeta.Compaction
- }
-
- if err := WriteMetaFile(logger, bdir, newMeta); err != nil {
- return nil, errors.Wrap(err, "write new meta")
- }
-
- return newMeta, nil
-}
diff --git a/pkg/block/index.go b/pkg/block/index.go
index 2249863b2d..eb0a3689c0 100644
--- a/pkg/block/index.go
+++ b/pkg/block/index.go
@@ -11,6 +11,10 @@ import (
"strings"
"time"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
+
+ "github.com/prometheus/tsdb/fileutil"
+
"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/oklog/ulid"
@@ -36,23 +40,84 @@ type indexCache struct {
Postings []postingsRange
}
+type realByteSlice []byte
+
+func (b realByteSlice) Len() int {
+ return len(b)
+}
+
+func (b realByteSlice) Range(start, end int) []byte {
+ return b[start:end]
+}
+
+func (b realByteSlice) Sub(start, end int) index.ByteSlice {
+ return b[start:end]
+}
+
+func getSymbolTable(b index.ByteSlice) (map[uint32]string, error) {
+ version := int(b.Range(4, 5)[0])
+
+ if version != 1 && version != 2 {
+ return nil, errors.Errorf("unknown index file version %d", version)
+ }
+
+ toc, err := index.NewTOCFromByteSlice(b)
+ if err != nil {
+ return nil, errors.Wrap(err, "read TOC")
+ }
+
+ symbolsV2, symbolsV1, err := index.ReadSymbols(b, version, int(toc.Symbols))
+ if err != nil {
+ return nil, errors.Wrap(err, "read symbols")
+ }
+
+ symbolsTable := make(map[uint32]string, len(symbolsV1)+len(symbolsV2))
+ for o, s := range symbolsV1 {
+ symbolsTable[o] = s
+ }
+ for o, s := range symbolsV2 {
+ symbolsTable[uint32(o)] = s
+ }
+
+ return symbolsTable, nil
+}
+
// WriteIndexCache writes a cache file containing the first lookup stages
// for an index file.
-func WriteIndexCache(logger log.Logger, fn string, r *index.Reader) error {
+func WriteIndexCache(logger log.Logger, indexFn string, fn string) error {
+ indexFile, err := fileutil.OpenMmapFile(indexFn)
+ if err != nil {
+ return errors.Wrapf(err, "open mmap index file %s", indexFn)
+ }
+ defer runutil.CloseWithLogOnErr(logger, indexFile, "close index cache mmap file from %s", indexFn)
+
+ b := realByteSlice(indexFile.Bytes())
+ indexr, err := index.NewReader(b)
+ if err != nil {
+ return errors.Wrap(err, "open index reader")
+ }
+ defer runutil.CloseWithLogOnErr(logger, indexr, "load index cache reader")
+
+ // We assume reader verified index already.
+ symbols, err := getSymbolTable(b)
+ if err != nil {
+ return err
+ }
+
f, err := os.Create(fn)
if err != nil {
- return errors.Wrap(err, "create file")
+ return errors.Wrap(err, "create index cache file")
}
defer runutil.CloseWithLogOnErr(logger, f, "index cache writer")
v := indexCache{
- Version: r.Version(),
- Symbols: r.SymbolTable(),
+ Version: indexr.Version(),
+ Symbols: symbols,
LabelValues: map[string][]string{},
}
// Extract label value indices.
- lnames, err := r.LabelIndices()
+ lnames, err := indexr.LabelIndices()
if err != nil {
return errors.Wrap(err, "read label indices")
}
@@ -62,7 +127,7 @@ func WriteIndexCache(logger log.Logger, fn string, r *index.Reader) error {
}
ln := lns[0]
- tpls, err := r.LabelValues(ln)
+ tpls, err := indexr.LabelValues(ln)
if err != nil {
return errors.Wrap(err, "get label values")
}
@@ -82,7 +147,7 @@ func WriteIndexCache(logger log.Logger, fn string, r *index.Reader) error {
}
// Extract postings ranges.
- pranges, err := r.PostingsRanges()
+ pranges, err := indexr.PostingsRanges()
if err != nil {
return errors.Wrap(err, "read postings ranges")
}
@@ -346,7 +411,7 @@ type ignoreFnType func(mint, maxt int64, prev *chunks.Meta, curr *chunks.Meta) (
// - removes all near "complete" outside chunks introduced by https://github.com/prometheus/tsdb/issues/347.
// Fixable inconsistencies are resolved in the new block.
// TODO(bplotka): https://github.com/improbable-eng/thanos/issues/378
-func Repair(logger log.Logger, dir string, id ulid.ULID, source SourceType, ignoreChkFns ...ignoreFnType) (resid ulid.ULID, err error) {
+func Repair(logger log.Logger, dir string, id ulid.ULID, source metadata.SourceType, ignoreChkFns ...ignoreFnType) (resid ulid.ULID, err error) {
if len(ignoreChkFns) == 0 {
return resid, errors.New("no ignore chunk function specified")
}
@@ -355,7 +420,7 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source SourceType, igno
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
resid = ulid.MustNew(ulid.Now(), entropy)
- meta, err := ReadMetaFile(bdir)
+ meta, err := metadata.Read(bdir)
if err != nil {
return resid, errors.Wrap(err, "read meta file")
}
@@ -363,7 +428,7 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source SourceType, igno
return resid, errors.New("cannot repair downsampled block")
}
- b, err := tsdb.OpenBlock(bdir, nil)
+ b, err := tsdb.OpenBlock(logger, bdir, nil)
if err != nil {
return resid, errors.Wrap(err, "open block")
}
@@ -405,7 +470,7 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source SourceType, igno
if err := rewrite(indexr, chunkr, indexw, chunkw, &resmeta, ignoreChkFns); err != nil {
return resid, errors.Wrap(err, "rewrite block")
}
- if err := WriteMetaFile(logger, resdir, &resmeta); err != nil {
+ if err := metadata.Write(logger, resdir, &resmeta); err != nil {
return resid, err
}
return resid, nil
@@ -494,7 +559,7 @@ OUTER:
func rewrite(
indexr tsdb.IndexReader, chunkr tsdb.ChunkReader,
indexw tsdb.IndexWriter, chunkw tsdb.ChunkWriter,
- meta *Meta,
+ meta *metadata.Meta,
ignoreChkFns []ignoreFnType,
) error {
symbols, err := indexr.Symbols()
diff --git a/pkg/block/index_test.go b/pkg/block/index_test.go
new file mode 100644
index 0000000000..80c10e8e6e
--- /dev/null
+++ b/pkg/block/index_test.go
@@ -0,0 +1,46 @@
+package block
+
+import (
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "testing"
+
+ "github.com/go-kit/kit/log"
+ "github.com/improbable-eng/thanos/pkg/testutil"
+ "github.com/prometheus/tsdb/labels"
+)
+
+func TestWriteReadIndexCache(t *testing.T) {
+ tmpDir, err := ioutil.TempDir("", "test-compact-prepare")
+ testutil.Ok(t, err)
+ defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()
+
+ b, err := testutil.CreateBlock(tmpDir, []labels.Labels{
+ {{Name: "a", Value: "1"}},
+ {{Name: "a", Value: "2"}},
+ {{Name: "a", Value: "3"}},
+ {{Name: "a", Value: "4"}},
+ {{Name: "b", Value: "1"}},
+ }, 100, 0, 1000, nil, 124)
+ testutil.Ok(t, err)
+
+ fn := filepath.Join(tmpDir, "index.cache.json")
+ testutil.Ok(t, WriteIndexCache(log.NewNopLogger(), filepath.Join(tmpDir, b.String(), "index"), fn))
+
+ version, symbols, lvals, postings, err := ReadIndexCache(log.NewNopLogger(), fn)
+ testutil.Ok(t, err)
+
+ testutil.Equals(t, 2, version)
+ testutil.Equals(t, 6, len(symbols))
+ testutil.Equals(t, 2, len(lvals))
+
+ vals, ok := lvals["a"]
+ testutil.Assert(t, ok, "")
+ testutil.Equals(t, []string{"1", "2", "3", "4"}, vals)
+
+ vals, ok = lvals["b"]
+ testutil.Assert(t, ok, "")
+ testutil.Equals(t, []string{"1"}, vals)
+ testutil.Equals(t, 6, len(postings))
+}
diff --git a/pkg/block/metadata/meta.go b/pkg/block/metadata/meta.go
new file mode 100644
index 0000000000..44f1d3768d
--- /dev/null
+++ b/pkg/block/metadata/meta.go
@@ -0,0 +1,142 @@
+package metadata
+
+// Package metadata implements writing and reading of wrapped meta.json where Thanos puts its metadata.
+// That metadata contains external labels, downsampling resolution, and the source type.
+// This package is minimal and separate because it is used by testutil, which limits the test
+// helpers we can use in this package.
+
+import (
+ "encoding/json"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+
+ "github.com/go-kit/kit/log"
+ "github.com/improbable-eng/thanos/pkg/runutil"
+ "github.com/pkg/errors"
+ "github.com/prometheus/tsdb"
+ "github.com/prometheus/tsdb/fileutil"
+)
+
+type SourceType string
+
+const (
+ UnknownSource SourceType = ""
+ SidecarSource SourceType = "sidecar"
+ CompactorSource SourceType = "compactor"
+ CompactorRepairSource SourceType = "compactor.repair"
+ RulerSource SourceType = "ruler"
+ BucketRepairSource SourceType = "bucket.repair"
+ TestSource SourceType = "test"
+)
+
+const (
+ // MetaFilename is the known JSON filename for meta information.
+ MetaFilename = "meta.json"
+)
+
+// Meta describes a block's meta. It wraps the known TSDB meta structure and
+// extends it by Thanos-specific fields.
+type Meta struct {
+ Version int `json:"version"`
+
+ tsdb.BlockMeta
+
+ Thanos Thanos `json:"thanos"`
+}
+
+// Thanos holds block meta information specific to Thanos.
+type Thanos struct {
+ Labels map[string]string `json:"labels"`
+ Downsample ThanosDownsample `json:"downsample"`
+
+ // Source is a real upload source of the block.
+ Source SourceType `json:"source"`
+}
+
+type ThanosDownsample struct {
+ Resolution int64 `json:"resolution"`
+}
+
+// InjectThanos sets Thanos meta to the block meta JSON and saves it to the disk.
+// NOTE: It should be used after writing any block by any Thanos component, otherwise we will miss crucial metadata.
+func InjectThanos(logger log.Logger, bdir string, meta Thanos, downsampledMeta *tsdb.BlockMeta) (*Meta, error) {
+ newMeta, err := Read(bdir)
+ if err != nil {
+ return nil, errors.Wrap(err, "read new meta")
+ }
+ newMeta.Thanos = meta
+
+ // While downsampling we need to copy original compaction.
+ if downsampledMeta != nil {
+ newMeta.Compaction = downsampledMeta.Compaction
+ }
+
+ if err := Write(logger, bdir, newMeta); err != nil {
+ return nil, errors.Wrap(err, "write new meta")
+ }
+
+ return newMeta, nil
+}
+
+// Write writes the given meta into <dir>/meta.json.
+func Write(logger log.Logger, dir string, meta *Meta) error {
+ // Make any changes to the file appear atomic.
+ path := filepath.Join(dir, MetaFilename)
+ tmp := path + ".tmp"
+
+ f, err := os.Create(tmp)
+ if err != nil {
+ return err
+ }
+
+ enc := json.NewEncoder(f)
+ enc.SetIndent("", "\t")
+
+ if err := enc.Encode(meta); err != nil {
+ runutil.CloseWithLogOnErr(logger, f, "close meta")
+ return err
+ }
+ if err := f.Close(); err != nil {
+ return err
+ }
+ return renameFile(logger, tmp, path)
+}
+
+func renameFile(logger log.Logger, from, to string) error {
+ if err := os.RemoveAll(to); err != nil {
+ return err
+ }
+ if err := os.Rename(from, to); err != nil {
+ return err
+ }
+
+ // Directory was renamed; sync parent dir to persist rename.
+ pdir, err := fileutil.OpenDir(filepath.Dir(to))
+ if err != nil {
+ return err
+ }
+
+ if err = fileutil.Fsync(pdir); err != nil {
+ runutil.CloseWithLogOnErr(logger, pdir, "close dir")
+ return err
+ }
+ return pdir.Close()
+}
+
+// Read reads the given meta from <dir>/meta.json.
+func Read(dir string) (*Meta, error) {
+ b, err := ioutil.ReadFile(filepath.Join(dir, MetaFilename))
+ if err != nil {
+ return nil, err
+ }
+ var m Meta
+
+ if err := json.Unmarshal(b, &m); err != nil {
+ return nil, err
+ }
+ if m.Version != 1 {
+ return nil, errors.Errorf("unexpected meta file version %d", m.Version)
+ }
+ return &m, nil
+}
diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go
index 544de920ea..29302f2a1c 100644
--- a/pkg/compact/compact.go
+++ b/pkg/compact/compact.go
@@ -9,6 +9,8 @@ import (
"sync"
"time"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
+
"io/ioutil"
"github.com/go-kit/kit/log"
@@ -39,7 +41,7 @@ type Syncer struct {
bkt objstore.Bucket
syncDelay time.Duration
mtx sync.Mutex
- blocks map[ulid.ULID]*block.Meta
+ blocks map[ulid.ULID]*metadata.Meta
metrics *syncerMetrics
}
@@ -130,7 +132,7 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket
logger: logger,
reg: reg,
syncDelay: syncDelay,
- blocks: map[ulid.ULID]*block.Meta{},
+ blocks: map[ulid.ULID]*metadata.Meta{},
bkt: bkt,
metrics: newSyncerMetrics(reg),
}, nil
@@ -185,9 +187,9 @@ func (c *Syncer) syncMetas(ctx context.Context) error {
// NOTE: It is not safe to miss "old" block (even that it is newly created) in sync step. Compactor needs to aware of ALL old blocks.
// TODO(bplotka): https://github.com/improbable-eng/thanos/issues/377
if ulid.Now()-id.Time() < uint64(c.syncDelay/time.Millisecond) &&
- meta.Thanos.Source != block.BucketRepairSource &&
- meta.Thanos.Source != block.CompactorSource &&
- meta.Thanos.Source != block.CompactorRepairSource {
+ meta.Thanos.Source != metadata.BucketRepairSource &&
+ meta.Thanos.Source != metadata.CompactorSource &&
+ meta.Thanos.Source != metadata.CompactorRepairSource {
level.Debug(c.logger).Log("msg", "block is too fresh for now", "block", id)
return nil
@@ -214,7 +216,7 @@ func (c *Syncer) syncMetas(ctx context.Context) error {
// GroupKey returns a unique identifier for the group the block belongs to. It considers
// the downsampling resolution and the block's labels.
-func GroupKey(meta block.Meta) string {
+func GroupKey(meta metadata.Meta) string {
return groupKey(meta.Thanos.Downsample.Resolution, labels.FromMap(meta.Thanos.Labels))
}
@@ -381,7 +383,7 @@ type Group struct {
labels labels.Labels
resolution int64
mtx sync.Mutex
- blocks map[ulid.ULID]*block.Meta
+ blocks map[ulid.ULID]*metadata.Meta
compactions prometheus.Counter
compactionFailures prometheus.Counter
groupGarbageCollectedBlocks prometheus.Counter
@@ -405,7 +407,7 @@ func newGroup(
bkt: bkt,
labels: lset,
resolution: resolution,
- blocks: map[ulid.ULID]*block.Meta{},
+ blocks: map[ulid.ULID]*metadata.Meta{},
compactions: compactions,
compactionFailures: compactionFailures,
groupGarbageCollectedBlocks: groupGarbageCollectedBlocks,
@@ -419,7 +421,7 @@ func (cg *Group) Key() string {
}
// Add the block with the given meta to the group.
-func (cg *Group) Add(meta *block.Meta) error {
+func (cg *Group) Add(meta *metadata.Meta) error {
cg.mtx.Lock()
defer cg.mtx.Unlock()
@@ -541,7 +543,7 @@ func IsRetryError(err error) bool {
return ok
}
-func (cg *Group) areBlocksOverlapping(include *block.Meta, excludeDirs ...string) error {
+func (cg *Group) areBlocksOverlapping(include *metadata.Meta, excludeDirs ...string) error {
var (
metas []tsdb.BlockMeta
exclude = map[ulid.ULID]struct{}{}
@@ -566,6 +568,9 @@ func (cg *Group) areBlocksOverlapping(include *block.Meta, excludeDirs ...string
metas = append(metas, include.BlockMeta)
}
+ sort.Slice(metas, func(i, j int) bool {
+ return metas[i].MinTime < metas[j].MinTime
+ })
if overlaps := tsdb.OverlappingBlocks(metas); len(overlaps) > 0 {
return errors.Errorf("overlaps found while gathering blocks. %s", overlaps)
}
@@ -597,12 +602,12 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket,
return retry(errors.Wrapf(err, "download block %s", ie.id))
}
- meta, err := block.ReadMetaFile(bdir)
+ meta, err := metadata.Read(bdir)
if err != nil {
return errors.Wrapf(err, "read meta from %s", bdir)
}
- resid, err := block.Repair(logger, tmpdir, ie.id, block.CompactorRepairSource, block.IgnoreIssue347OutsideChunk)
+ resid, err := block.Repair(logger, tmpdir, ie.id, metadata.CompactorRepairSource, block.IgnoreIssue347OutsideChunk)
if err != nil {
return errors.Wrapf(err, "repair failed for block %s", ie.id)
}
@@ -647,7 +652,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
if err := os.MkdirAll(bdir, 0777); err != nil {
return compID, errors.Wrap(err, "create planning block dir")
}
- if err := block.WriteMetaFile(cg.logger, bdir, meta); err != nil {
+ if err := metadata.Write(cg.logger, bdir, meta); err != nil {
return compID, errors.Wrap(err, "write planning meta file")
}
}
@@ -670,7 +675,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
begin := time.Now()
for _, pdir := range plan {
- meta, err := block.ReadMetaFile(pdir)
+ meta, err := metadata.Read(pdir)
if err != nil {
return compID, errors.Wrapf(err, "read meta from %s", pdir)
}
@@ -718,7 +723,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
begin = time.Now()
- compID, err = comp.Compact(dir, plan...)
+ compID, err = comp.Compact(dir, plan, nil)
if err != nil {
return compID, halt(errors.Wrapf(err, "compact blocks %v", plan))
}
@@ -727,10 +732,10 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
bdir := filepath.Join(dir, compID.String())
- newMeta, err := block.InjectThanosMeta(cg.logger, bdir, block.ThanosMeta{
+ newMeta, err := metadata.InjectThanos(cg.logger, bdir, metadata.Thanos{
Labels: cg.labels.Map(),
- Downsample: block.ThanosDownsampleMeta{Resolution: cg.resolution},
- Source: block.CompactorSource,
+ Downsample: metadata.ThanosDownsample{Resolution: cg.resolution},
+ Source: metadata.CompactorSource,
}, nil)
if err != nil {
return compID, errors.Wrapf(err, "failed to finalize the block %s", bdir)
diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go
index 74df73cc6f..11213dc047 100644
--- a/pkg/compact/compact_e2e_test.go
+++ b/pkg/compact/compact_e2e_test.go
@@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
+ "math/rand"
"os"
"path"
"path/filepath"
@@ -15,12 +16,14 @@ import (
"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/block"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/objstore/objtesting"
"github.com/improbable-eng/thanos/pkg/testutil"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/tsdb"
+ "github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels"
)
@@ -37,13 +40,13 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) {
// After the first synchronization the first 5 should be dropped and the
// last 5 be loaded from the bucket.
var ids []ulid.ULID
- var metas []*block.Meta
+ var metas []*metadata.Meta
for i := 0; i < 15; i++ {
id, err := ulid.New(uint64(i), nil)
testutil.Ok(t, err)
- var meta block.Meta
+ var meta metadata.Meta
meta.Version = 1
meta.ULID = id
@@ -56,7 +59,7 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) {
for _, m := range metas[5:] {
var buf bytes.Buffer
testutil.Ok(t, json.NewEncoder(&buf).Encode(&m))
- testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), block.MetaFilename), &buf))
+ testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), metadata.MetaFilename), &buf))
}
groups, err := sy.Groups()
@@ -79,11 +82,11 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
// Generate 10 source block metas and construct higher level blocks
// that are higher compactions of them.
- var metas []*block.Meta
+ var metas []*metadata.Meta
var ids []ulid.ULID
for i := 0; i < 10; i++ {
- var m block.Meta
+ var m metadata.Meta
m.Version = 1
m.ULID = ulid.MustNew(uint64(i), nil)
@@ -94,28 +97,28 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
metas = append(metas, &m)
}
- var m1 block.Meta
+ var m1 metadata.Meta
m1.Version = 1
m1.ULID = ulid.MustNew(100, nil)
m1.Compaction.Level = 2
m1.Compaction.Sources = ids[:4]
m1.Thanos.Downsample.Resolution = 0
- var m2 block.Meta
+ var m2 metadata.Meta
m2.Version = 1
m2.ULID = ulid.MustNew(200, nil)
m2.Compaction.Level = 2
m2.Compaction.Sources = ids[4:8] // last two source IDs is not part of a level 2 block.
m2.Thanos.Downsample.Resolution = 0
- var m3 block.Meta
+ var m3 metadata.Meta
m3.Version = 1
m3.ULID = ulid.MustNew(300, nil)
m3.Compaction.Level = 3
m3.Compaction.Sources = ids[:9] // last source ID is not part of level 3 block.
m3.Thanos.Downsample.Resolution = 0
- var m4 block.Meta
+ var m4 metadata.Meta
m4.Version = 14
m4.ULID = ulid.MustNew(400, nil)
m4.Compaction.Level = 2
@@ -127,7 +130,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
fmt.Println("create", m.ULID)
var buf bytes.Buffer
testutil.Ok(t, json.NewEncoder(&buf).Encode(&m))
- testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), block.MetaFilename), &buf))
+ testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), metadata.MetaFilename), &buf))
}
// Do one initial synchronization with the bucket.
@@ -173,7 +176,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()
- var metas []*block.Meta
+ var metas []*metadata.Meta
extLset := labels.Labels{{Name: "e1", Value: "1"}}
b1, err := testutil.CreateBlock(prepareDir, []labels.Labels{
{{Name: "a", Value: "1"}},
@@ -183,7 +186,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
}, 100, 0, 1000, extLset, 124)
testutil.Ok(t, err)
- meta, err := block.ReadMetaFile(filepath.Join(prepareDir, b1.String()))
+ meta, err := metadata.Read(filepath.Join(prepareDir, b1.String()))
testutil.Ok(t, err)
metas = append(metas, meta)
@@ -196,15 +199,18 @@ func TestGroup_Compact_e2e(t *testing.T) {
testutil.Ok(t, err)
// Mix order to make sure compact is able to deduct min time / max time.
- meta, err = block.ReadMetaFile(filepath.Join(prepareDir, b3.String()))
+ meta, err = metadata.Read(filepath.Join(prepareDir, b3.String()))
testutil.Ok(t, err)
metas = append(metas, meta)
- // Empty block. This can happen when TSDB does not have any samples for min-block-size time.
- b2, err := testutil.CreateBlock(prepareDir, []labels.Labels{}, 100, 1001, 2000, extLset, 124)
+	// Currently TSDB does not produce empty blocks (see: https://github.com/prometheus/tsdb/pull/374). However before v2.7.0 it did,
+	// so we still want to mimic this case as closely as possible.
+ b2, err := createEmptyBlock(prepareDir, 1001, 2000, extLset, 124)
testutil.Ok(t, err)
- meta, err = block.ReadMetaFile(filepath.Join(prepareDir, b2.String()))
+	// Expected compaction log output (for reference): blocks" count=3 mint=0 maxt=3000 ulid=01D1RQCRRJM77KQQ4GYDSC50GM sources="[01D1RQCRMNZBVHBPGRPG2M3NZQ 01D1RQCRPJMYN45T65YA1PRWB7 01D1RQCRNMTWJKTN5QQXFNKKH8]"
+
+ meta, err = metadata.Read(filepath.Join(prepareDir, b2.String()))
testutil.Ok(t, err)
metas = append(metas, meta)
@@ -217,7 +223,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
}, 100, 3001, 4000, extLset, 124)
testutil.Ok(t, err)
- meta, err = block.ReadMetaFile(filepath.Join(prepareDir, freshB.String()))
+ meta, err = metadata.Read(filepath.Join(prepareDir, freshB.String()))
testutil.Ok(t, err)
metas = append(metas, meta)
@@ -263,7 +269,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
resDir := filepath.Join(dir, id.String())
testutil.Ok(t, block.Download(ctx, log.NewNopLogger(), bkt, id, resDir))
- meta, err = block.ReadMetaFile(resDir)
+ meta, err = metadata.Read(resDir)
testutil.Ok(t, err)
testutil.Equals(t, int64(0), meta.MinTime)
@@ -294,3 +300,56 @@ func TestGroup_Compact_e2e(t *testing.T) {
testutil.Ok(t, err)
})
}
+
+// createEmptyBlock produces empty block like it was the case before fix: https://github.com/prometheus/tsdb/pull/374.
+// (Prometheus pre v2.7.0)
+func createEmptyBlock(dir string, mint int64, maxt int64, extLset labels.Labels, resolution int64) (ulid.ULID, error) {
+	entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
+	uid := ulid.MustNew(ulid.Now(), entropy)
+
+	if err := os.Mkdir(path.Join(dir, uid.String()), os.ModePerm); err != nil {
+		return ulid.ULID{}, errors.Wrap(err, "create block dir")
+	}
+
+	if err := os.Mkdir(path.Join(dir, uid.String(), "chunks"), os.ModePerm); err != nil {
+		return ulid.ULID{}, errors.Wrap(err, "create chunks dir")
+	}
+
+	w, err := index.NewWriter(path.Join(dir, uid.String(), "index"))
+	if err != nil {
+		return ulid.ULID{}, errors.Wrap(err, "new index")
+	}
+
+	if err := w.Close(); err != nil {
+		return ulid.ULID{}, errors.Wrap(err, "close index")
+	}
+
+	m := tsdb.BlockMeta{
+		Version: 1,
+		ULID:    uid,
+		MinTime: mint,
+		MaxTime: maxt,
+		Compaction: tsdb.BlockMetaCompaction{
+			Level:   1,
+			Sources: []ulid.ULID{uid},
+		},
+	}
+	b, err := json.Marshal(&m)
+	if err != nil {
+		return ulid.ULID{}, err
+	}
+
+	if err := ioutil.WriteFile(path.Join(dir, uid.String(), "meta.json"), b, os.ModePerm); err != nil {
+		return ulid.ULID{}, errors.Wrap(err, "saving meta.json")
+	}
+
+	if _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(dir, uid.String()), metadata.Thanos{
+		Labels:     extLset.Map(),
+		Downsample: metadata.ThanosDownsample{Resolution: resolution},
+		Source:     metadata.TestSource,
+	}, nil); err != nil {
+		return ulid.ULID{}, errors.Wrap(err, "finalize block")
+	}
+
+	return uid, nil
+}
diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go
index 305f72021c..f5afecdcd0 100644
--- a/pkg/compact/downsample/downsample.go
+++ b/pkg/compact/downsample/downsample.go
@@ -5,7 +5,8 @@ import (
"path/filepath"
"sort"
- "github.com/improbable-eng/thanos/pkg/block"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
+
"github.com/prometheus/prometheus/pkg/value"
"github.com/prometheus/tsdb/chunkenc"
@@ -31,7 +32,7 @@ const (
// Downsample downsamples the given block. It writes a new block into dir and returns its ID.
func Downsample(
logger log.Logger,
- origMeta *block.Meta,
+ origMeta *metadata.Meta,
b tsdb.BlockReader,
dir string,
resolution int64,
@@ -113,6 +114,7 @@ func Downsample(
origMeta.Thanos.Downsample.Resolution,
resolution,
)
+
if err != nil {
return id, errors.Wrap(err, "downsample aggregate block")
}
@@ -125,18 +127,18 @@ func Downsample(
if err != nil {
return id, errors.Wrap(err, "create compactor")
}
- id, err = comp.Write(dir, newb, origMeta.MinTime, origMeta.MaxTime)
+ id, err = comp.Write(dir, newb, origMeta.MinTime, origMeta.MaxTime, &origMeta.BlockMeta)
if err != nil {
return id, errors.Wrap(err, "compact head")
}
bdir := filepath.Join(dir, id.String())
- var tmeta block.ThanosMeta
+ var tmeta metadata.Thanos
tmeta = origMeta.Thanos
- tmeta.Source = block.CompactorSource
+ tmeta.Source = metadata.CompactorSource
tmeta.Downsample.Resolution = resolution
- _, err = block.InjectThanosMeta(logger, bdir, tmeta, &origMeta.BlockMeta)
+ _, err = metadata.InjectThanos(logger, bdir, tmeta, &origMeta.BlockMeta)
if err != nil {
return id, errors.Wrapf(err, "failed to finalize the block %s", bdir)
}
@@ -228,13 +230,20 @@ func (b *memBlock) Chunks() (tsdb.ChunkReader, error) {
}
func (b *memBlock) Tombstones() (tsdb.TombstoneReader, error) {
- return tsdb.EmptyTombstoneReader(), nil
+ return emptyTombstoneReader{}, nil
}
func (b *memBlock) Close() error {
return nil
}
+type emptyTombstoneReader struct{}
+
+func (emptyTombstoneReader) Get(ref uint64) (tsdb.Intervals, error) { return nil, nil }
+func (emptyTombstoneReader) Iter(func(uint64, tsdb.Intervals) error) error { return nil }
+func (emptyTombstoneReader) Total() uint64 { return 0 }
+func (emptyTombstoneReader) Close() error { return nil }
+
// currentWindow returns the end timestamp of the window that t falls into.
func currentWindow(t, r int64) int64 {
// The next timestamp is the next number after s.t that's aligned with window.
@@ -412,6 +421,7 @@ func downsampleRaw(data []sample, resolution int64) []chunks.Meta {
chks = append(chks, ab.encode())
}
+
return chks
}
diff --git a/pkg/compact/downsample/downsample_test.go b/pkg/compact/downsample/downsample_test.go
index d384478416..bb2c38b17a 100644
--- a/pkg/compact/downsample/downsample_test.go
+++ b/pkg/compact/downsample/downsample_test.go
@@ -1,12 +1,15 @@
package downsample
import (
+ "github.com/prometheus/tsdb"
"io/ioutil"
"math"
"os"
"path/filepath"
"testing"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
+
"github.com/prometheus/prometheus/pkg/value"
"github.com/prometheus/tsdb/chunks"
@@ -59,7 +62,7 @@ func TestDownsampleRaw(t *testing.T) {
},
},
}
- testDownsample(t, input, &block.Meta{}, 100)
+ testDownsample(t, input, &metadata.Meta{BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 250}}, 100)
}
func TestDownsampleAggr(t *testing.T) {
@@ -96,8 +99,9 @@ func TestDownsampleAggr(t *testing.T) {
},
},
}
- var meta block.Meta
+ var meta metadata.Meta
meta.Thanos.Downsample.Resolution = 10
+ meta.BlockMeta = tsdb.BlockMeta{MinTime: 99, MaxTime: 1300}
testDownsample(t, input, &meta, 500)
}
@@ -123,7 +127,7 @@ type downsampleTestSet struct {
// testDownsample inserts the input into a block and invokes the downsampler with the given resolution.
// The chunk ranges within the input block are aligned at 500 time units.
-func testDownsample(t *testing.T, data []*downsampleTestSet, meta *block.Meta, resolution int64) {
+func testDownsample(t *testing.T, data []*downsampleTestSet, meta *metadata.Meta, resolution int64) {
t.Helper()
dir, err := ioutil.TempDir("", "downsample-raw")
diff --git a/pkg/compact/downsample/pool.go b/pkg/compact/downsample/pool.go
index 9b199e7ab9..17094cd0c6 100644
--- a/pkg/compact/downsample/pool.go
+++ b/pkg/compact/downsample/pool.go
@@ -6,14 +6,14 @@ import (
"github.com/prometheus/tsdb/chunkenc"
)
-// Pool is a memory pool of chunk objects, supporting Thanos aggr chunk encoding.
+// Pool is a memory pool of chunk objects, supporting Thanos aggregated chunk encoding.
// It maintains separate pools for xor and aggr chunks.
type pool struct {
wrapped chunkenc.Pool
aggr sync.Pool
}
-// TODO(bplotka): Add reasonable limits to our sync pools them to detect OOMs early.
+// TODO(bwplotka): Add reasonable limits to our sync pools so we can detect OOMs early.
func NewPool() chunkenc.Pool {
return &pool{
wrapped: chunkenc.NewPool(),
@@ -51,6 +51,7 @@ func (p *pool) Put(c chunkenc.Chunk) error {
// Clear []byte.
*ac = AggrChunk(nil)
p.aggr.Put(ac)
+ return nil
}
return p.wrapped.Put(c)
diff --git a/pkg/compact/retention_test.go b/pkg/compact/retention_test.go
index 0aef91c697..1a26b1e5db 100644
--- a/pkg/compact/retention_test.go
+++ b/pkg/compact/retention_test.go
@@ -9,7 +9,7 @@ import (
"time"
"github.com/go-kit/kit/log"
- "github.com/improbable-eng/thanos/pkg/block"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/compact"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/objstore/inmem"
@@ -253,15 +253,15 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) {
func uploadMockBlock(t *testing.T, bkt objstore.Bucket, id string, minTime, maxTime time.Time, resolutionLevel int64) {
t.Helper()
- meta1 := block.Meta{
+ meta1 := metadata.Meta{
Version: 1,
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustParse(id),
MinTime: minTime.Unix() * 1000,
MaxTime: maxTime.Unix() * 1000,
},
- Thanos: block.ThanosMeta{
- Downsample: block.ThanosDownsampleMeta{
+ Thanos: metadata.Thanos{
+ Downsample: metadata.ThanosDownsample{
Resolution: resolutionLevel,
},
},
diff --git a/pkg/query/api/v1.go b/pkg/query/api/v1.go
index 23e5b0a801..cf693dd2fb 100644
--- a/pkg/query/api/v1.go
+++ b/pkg/query/api/v1.go
@@ -527,15 +527,16 @@ func (api *API) series(r *http.Request) (interface{}, []error, *apiError) {
var sets []storage.SeriesSet
for _, mset := range matcherSets {
- s, err := q.Select(&storage.SelectParams{}, mset...)
+ s, _, err := q.Select(&storage.SelectParams{}, mset...)
if err != nil {
return nil, nil, &apiError{errorExec, err}
}
sets = append(sets, s)
}
- set := storage.NewMergeSeriesSet(sets)
- metrics := []labels.Labels{}
+ set := storage.NewMergeSeriesSet(sets, nil)
+
+ var metrics []labels.Labels
for set.Next() {
metrics = append(metrics, set.At().Labels())
}
diff --git a/pkg/query/api/v1_test.go b/pkg/query/api/v1_test.go
index 4afef1d054..bcf17907b0 100644
--- a/pkg/query/api/v1_test.go
+++ b/pkg/query/api/v1_test.go
@@ -333,7 +333,7 @@ func TestEndpoints(t *testing.T) {
"start": []string{"-2"},
"end": []string{"-1"},
},
- response: []labels.Labels{},
+ response: []labels.Labels(nil),
},
// Start and end after series ends.
{
@@ -343,7 +343,7 @@ func TestEndpoints(t *testing.T) {
"start": []string{"100000"},
"end": []string{"100001"},
},
- response: []labels.Labels{},
+ response: []labels.Labels(nil),
},
// Start before series starts, end after series ends.
{
@@ -409,33 +409,38 @@ func TestEndpoints(t *testing.T) {
}
for _, test := range tests {
- // Build a context with the correct request params.
- ctx := context.Background()
- for p, v := range test.params {
- ctx = route.WithParam(ctx, p, v)
- }
- t.Logf("run query %q", test.query.Encode())
+ if ok := t.Run(test.query.Encode(), func(t *testing.T) {
+ // Build a context with the correct request params.
+ ctx := context.Background()
+ for p, v := range test.params {
+ ctx = route.WithParam(ctx, p, v)
+ }
- req, err := http.NewRequest("ANY", fmt.Sprintf("http://example.com?%s", test.query.Encode()), nil)
- if err != nil {
- t.Fatal(err)
- }
- resp, _, apiErr := test.endpoint(req.WithContext(ctx))
- if apiErr != nil {
- if test.errType == errorNone {
- t.Fatalf("Unexpected error: %s", apiErr)
+ req, err := http.NewRequest("ANY", fmt.Sprintf("http://example.com?%s", test.query.Encode()), nil)
+ if err != nil {
+ t.Fatal(err)
}
- if test.errType != apiErr.typ {
- t.Fatalf("Expected error of type %q but got type %q", test.errType, apiErr.typ)
+ resp, _, apiErr := test.endpoint(req.WithContext(ctx))
+ if apiErr != nil {
+ if test.errType == errorNone {
+ t.Fatalf("Unexpected error: %s", apiErr)
+ }
+ if test.errType != apiErr.typ {
+ t.Fatalf("Expected error of type %q but got type %q", test.errType, apiErr.typ)
+ }
+ return
}
- continue
- }
- if apiErr == nil && test.errType != errorNone {
- t.Fatalf("Expected error of type %q but got none", test.errType)
- }
- if !reflect.DeepEqual(resp, test.response) {
- t.Fatalf("Response does not match, expected:\n%+v\ngot:\n%+v", test.response, resp)
+ if apiErr == nil && test.errType != errorNone {
+ t.Fatalf("Expected error of type %q but got none", test.errType)
+ }
+
+ if !reflect.DeepEqual(resp, test.response) {
+ t.Fatalf("Response does not match, expected:\n%+v\ngot:\n%+v", test.response, resp)
+ }
+ }); !ok {
+ return
}
+
}
}
diff --git a/pkg/query/querier.go b/pkg/query/querier.go
index 6e962f6472..819ff3ac2a 100644
--- a/pkg/query/querier.go
+++ b/pkg/query/querier.go
@@ -169,13 +169,13 @@ func aggrsFromFunc(f string) ([]storepb.Aggr, resAggr) {
return []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM}, resAggrAvg
}
-func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, error) {
+func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
span, ctx := tracing.StartSpan(q.ctx, "querier_select")
defer span.Finish()
sms, err := translateMatchers(ms...)
if err != nil {
- return nil, errors.Wrap(err, "convert matchers")
+ return nil, nil, errors.Wrap(err, "convert matchers")
}
queryAggrs, resAggr := aggrsFromFunc(params.Func)
@@ -189,10 +189,12 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s
Aggregates: queryAggrs,
PartialResponseDisabled: !q.partialResponse,
}, resp); err != nil {
- return nil, errors.Wrap(err, "proxy Series()")
+ return nil, nil, errors.Wrap(err, "proxy Series()")
}
for _, w := range resp.warnings {
+		// NOTE(bwplotka): We could use the warnings return argument here, however we need a reporter anyway for the LabelValues and LabelNames methods,
+		// so we choose to be consistent and keep the reporter.
q.warningReporter(errors.New(w))
}
@@ -203,7 +205,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s
maxt: q.maxt,
set: newStoreSeriesSet(resp.seriesSet),
aggr: resAggr,
- }, nil
+ }, nil, nil
}
// TODO(fabxc): this could potentially pushed further down into the store API
@@ -220,7 +222,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s
// The merged series set assembles all potentially-overlapping time ranges
// of the same series into a single one. The series are ordered so that equal series
// from different replicas are sequential. We can now deduplicate those.
- return newDedupSeriesSet(set, q.replicaLabel), nil
+ return newDedupSeriesSet(set, q.replicaLabel), nil, nil
}
// sortDedupLabels resorts the set so that the same series with different replica
@@ -245,6 +247,7 @@ func sortDedupLabels(set []storepb.Series, replicaLabel string) {
})
}
+// LabelValues returns all potential values for a label name.
func (q *querier) LabelValues(name string) ([]string, error) {
span, ctx := tracing.StartSpan(q.ctx, "querier_label_values")
defer span.Finish()
@@ -261,6 +264,12 @@ func (q *querier) LabelValues(name string) ([]string, error) {
return resp.Values, nil
}
+// LabelNames returns all the unique label names present in the block in sorted order.
+// TODO(bwplotka): Consider adding labelNames to thanos Query API https://github.com/improbable-eng/thanos/issues/702.
+func (q *querier) LabelNames() ([]string, error) {
+ return nil, errors.New("not implemented")
+}
+
func (q *querier) Close() error {
q.cancel()
return nil
diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go
index 46b51a7f3e..980d837213 100644
--- a/pkg/query/querier_test.go
+++ b/pkg/query/querier_test.go
@@ -36,7 +36,7 @@ func TestQuerier_Series(t *testing.T) {
q := newQuerier(context.Background(), nil, 1, 300, "", testProxy, false, 0, true, nil)
defer func() { testutil.Ok(t, q.Close()) }()
- res, err := q.Select(&storage.SelectParams{})
+ res, _, err := q.Select(&storage.SelectParams{})
testutil.Ok(t, err)
expected := []struct {
diff --git a/pkg/query/test_print.go b/pkg/query/test_print.go
new file mode 100644
index 0000000000..70bc292439
--- /dev/null
+++ b/pkg/query/test_print.go
@@ -0,0 +1,34 @@
+package query
+
+import (
+ "fmt"
+
+ "github.com/prometheus/prometheus/storage"
+)
+
+type printSeriesSet struct {
+ set storage.SeriesSet
+}
+
+func newPrintSeriesSet(set storage.SeriesSet) storage.SeriesSet {
+ return &printSeriesSet{set: set}
+}
+
+func (s *printSeriesSet) Next() bool {
+ return s.set.Next()
+}
+
+func (s *printSeriesSet) At() storage.Series {
+ at := s.set.At()
+ fmt.Println("Series", at.Labels())
+
+ i := at.Iterator()
+ for i.Next() {
+ fmt.Println(i.At())
+ }
+ return at
+}
+
+func (s *printSeriesSet) Err() error {
+ return s.set.Err()
+}
diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go
index 5c1df9a9b7..2163ec8b7a 100644
--- a/pkg/shipper/shipper.go
+++ b/pkg/shipper/shipper.go
@@ -11,6 +11,8 @@ import (
"path"
"path/filepath"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
+
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block"
@@ -69,7 +71,7 @@ type Shipper struct {
metrics *metrics
bucket objstore.Bucket
labels func() labels.Labels
- source block.SourceType
+ source metadata.SourceType
}
// New creates a new shipper that detects new TSDB blocks in dir and uploads them
@@ -80,7 +82,7 @@ func New(
dir string,
bucket objstore.Bucket,
lbls func() labels.Labels,
- source block.SourceType,
+ source metadata.SourceType,
) *Shipper {
if logger == nil {
logger = log.NewNopLogger()
@@ -114,7 +116,7 @@ func (s *Shipper) Timestamps() (minTime, maxSyncTime int64, err error) {
minTime = math.MaxInt64
maxSyncTime = math.MinInt64
- if err := s.iterBlockMetas(func(m *block.Meta) error {
+ if err := s.iterBlockMetas(func(m *metadata.Meta) error {
if m.MinTime < minTime {
minTime = m.MinTime
}
@@ -158,7 +160,7 @@ func (s *Shipper) Sync(ctx context.Context) {
// TODO(bplotka): If there are no blocks in the system check for WAL dir to ensure we have actually
// access to real TSDB dir (!).
- if err = s.iterBlockMetas(func(m *block.Meta) error {
+ if err = s.iterBlockMetas(func(m *metadata.Meta) error {
// Do not sync a block if we already uploaded it. If it is no longer found in the bucket,
// it was generally removed by the compaction process.
if _, ok := hasUploaded[m.ULID]; !ok {
@@ -180,7 +182,7 @@ func (s *Shipper) Sync(ctx context.Context) {
}
}
-func (s *Shipper) sync(ctx context.Context, meta *block.Meta) (err error) {
+func (s *Shipper) sync(ctx context.Context, meta *metadata.Meta) (err error) {
dir := filepath.Join(s.dir, meta.ULID.String())
// We only ship of the first compacted block level.
@@ -225,7 +227,7 @@ func (s *Shipper) sync(ctx context.Context, meta *block.Meta) (err error) {
meta.Thanos.Labels = lset.Map()
}
meta.Thanos.Source = s.source
- if err := block.WriteMetaFile(s.logger, updir, meta); err != nil {
+ if err := metadata.Write(s.logger, updir, meta); err != nil {
return errors.Wrap(err, "write meta file")
}
return block.Upload(ctx, s.logger, s.bucket, updir)
@@ -234,7 +236,7 @@ func (s *Shipper) sync(ctx context.Context, meta *block.Meta) (err error) {
// iterBlockMetas calls f with the block meta for each block found in dir. It logs
// an error and continues if it cannot access a meta.json file.
// If f returns an error, the function returns with the same error.
-func (s *Shipper) iterBlockMetas(f func(m *block.Meta) error) error {
+func (s *Shipper) iterBlockMetas(f func(m *metadata.Meta) error) error {
names, err := fileutil.ReadDir(s.dir)
if err != nil {
return errors.Wrap(err, "read dir")
@@ -253,7 +255,7 @@ func (s *Shipper) iterBlockMetas(f func(m *block.Meta) error) error {
if !fi.IsDir() {
continue
}
- m, err := block.ReadMetaFile(dir)
+ m, err := metadata.Read(dir)
if err != nil {
level.Warn(s.logger).Log("msg", "reading meta file failed", "err", err)
continue
diff --git a/pkg/shipper/shipper_e2e_test.go b/pkg/shipper/shipper_e2e_test.go
index 8f0e70ecd6..496d53322f 100644
--- a/pkg/shipper/shipper_e2e_test.go
+++ b/pkg/shipper/shipper_e2e_test.go
@@ -14,6 +14,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/block"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/objstore/objtesting"
"github.com/improbable-eng/thanos/pkg/testutil"
@@ -32,7 +33,7 @@ func TestShipper_UploadBlocks_e2e(t *testing.T) {
}()
extLset := labels.FromStrings("prometheus", "prom-1")
- shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, block.TestSource)
+ shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -54,7 +55,7 @@ func TestShipper_UploadBlocks_e2e(t *testing.T) {
testutil.Ok(t, os.Mkdir(tmp, 0777))
- meta := block.Meta{
+ meta := metadata.Meta{
BlockMeta: tsdb.BlockMeta{
MinTime: timestamp.FromTime(now.Add(time.Duration(i) * time.Hour)),
MaxTime: timestamp.FromTime(now.Add((time.Duration(i) * time.Hour) + 1)),
@@ -62,7 +63,7 @@ func TestShipper_UploadBlocks_e2e(t *testing.T) {
}
meta.Version = 1
meta.ULID = id
- meta.Thanos.Source = block.TestSource
+ meta.Thanos.Source = metadata.TestSource
metab, err := json.Marshal(&meta)
testutil.Ok(t, err)
diff --git a/pkg/shipper/shipper_test.go b/pkg/shipper/shipper_test.go
index 368c5e92b9..150b5264db 100644
--- a/pkg/shipper/shipper_test.go
+++ b/pkg/shipper/shipper_test.go
@@ -2,15 +2,13 @@ package shipper
import (
"io/ioutil"
- "os"
- "testing"
-
"math"
-
+ "os"
"path"
+ "testing"
"github.com/go-kit/kit/log"
- "github.com/improbable-eng/thanos/pkg/block"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/testutil"
"github.com/oklog/ulid"
"github.com/prometheus/tsdb"
@@ -23,7 +21,7 @@ func TestShipperTimestamps(t *testing.T) {
testutil.Ok(t, os.RemoveAll(dir))
}()
- s := New(nil, nil, dir, nil, nil, block.TestSource)
+ s := New(nil, nil, dir, nil, nil, metadata.TestSource)
// Missing thanos meta file.
_, _, err = s.Timestamps()
@@ -41,7 +39,7 @@ func TestShipperTimestamps(t *testing.T) {
id1 := ulid.MustNew(1, nil)
testutil.Ok(t, os.Mkdir(path.Join(dir, id1.String()), os.ModePerm))
- testutil.Ok(t, block.WriteMetaFile(log.NewNopLogger(), path.Join(dir, id1.String()), &block.Meta{
+ testutil.Ok(t, metadata.Write(log.NewNopLogger(), path.Join(dir, id1.String()), &metadata.Meta{
Version: 1,
BlockMeta: tsdb.BlockMeta{
ULID: id1,
@@ -56,7 +54,7 @@ func TestShipperTimestamps(t *testing.T) {
id2 := ulid.MustNew(2, nil)
testutil.Ok(t, os.Mkdir(path.Join(dir, id2.String()), os.ModePerm))
- testutil.Ok(t, block.WriteMetaFile(log.NewNopLogger(), path.Join(dir, id2.String()), &block.Meta{
+ testutil.Ok(t, metadata.Write(log.NewNopLogger(), path.Join(dir, id2.String()), &metadata.Meta{
Version: 1,
BlockMeta: tsdb.BlockMeta{
ULID: id2,
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index ee23a69124..3b45756cbd 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -19,6 +19,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/compact/downsample"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/pool"
@@ -960,7 +961,7 @@ func (s *bucketBlockSet) labelMatchers(matchers ...labels.Matcher) ([]labels.Mat
type bucketBlock struct {
logger log.Logger
bucket objstore.BucketReader
- meta *block.Meta
+ meta *metadata.Meta
dir string
indexCache *indexCache
chunkPool *pool.BytesPool
@@ -1024,7 +1025,7 @@ func (b *bucketBlock) loadMeta(ctx context.Context, id ulid.ULID) error {
} else if err != nil {
return err
}
- meta, err := block.ReadMetaFile(b.dir)
+ meta, err := metadata.Read(b.dir)
if err != nil {
return errors.Wrap(err, "read meta.json")
}
@@ -1054,13 +1055,7 @@ func (b *bucketBlock) loadIndexCache(ctx context.Context) (err error) {
}
}()
- indexr, err := index.NewFileReader(fn)
- if err != nil {
- return errors.Wrap(err, "open index reader")
- }
- defer runutil.CloseWithLogOnErr(b.logger, indexr, "load index cache reader")
-
- if err := block.WriteIndexCache(b.logger, cachefn, indexr); err != nil {
+ if err := block.WriteIndexCache(b.logger, fn, cachefn); err != nil {
return errors.Wrap(err, "write index cache")
}
@@ -1144,7 +1139,7 @@ func newBucketIndexReader(ctx context.Context, logger log.Logger, block *bucketB
cache: cache,
loadedSeries: map[uint64][]byte{},
}
- r.dec.SetSymbolTable(r.block.symbols)
+ r.dec.LookupSymbol = r.lookupSymbol
return r
}
diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go
index c5528121ac..f1f6780397 100644
--- a/pkg/store/bucket_e2e_test.go
+++ b/pkg/store/bucket_e2e_test.go
@@ -10,6 +10,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/block"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/objstore/objtesting"
"github.com/improbable-eng/thanos/pkg/runutil"
@@ -59,10 +60,10 @@ func prepareStoreWithTestBlocks(t testing.TB, ctx context.Context, dir string, b
dir1, dir2 := filepath.Join(dir, id1.String()), filepath.Join(dir, id2.String())
// Add labels to the meta of the second block.
- meta, err := block.ReadMetaFile(dir2)
+ meta, err := metadata.Read(dir2)
testutil.Ok(t, err)
meta.Thanos.Labels = map[string]string{"ext2": "value2"}
- testutil.Ok(t, block.WriteMetaFile(log.NewNopLogger(), dir2, meta))
+ testutil.Ok(t, metadata.Write(log.NewNopLogger(), dir2, meta))
testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, dir1))
testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, dir2))
@@ -72,7 +73,7 @@ func prepareStoreWithTestBlocks(t testing.TB, ctx context.Context, dir string, b
testutil.Ok(t, os.RemoveAll(dir2))
}
- store, err := NewBucketStore(nil, nil, bkt, dir, 100, 0, false)
+ store, err := NewBucketStore(log.NewLogfmtLogger(os.Stderr), nil, bkt, dir, 100, 0, false)
testutil.Ok(t, err)
go func() {
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index 06d9d2f98b..f2880dd9b4 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -4,13 +4,11 @@ import (
"testing"
"time"
- "github.com/oklog/ulid"
-
- "github.com/improbable-eng/thanos/pkg/compact/downsample"
-
"github.com/fortytw2/leaktest"
- "github.com/improbable-eng/thanos/pkg/block"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
+ "github.com/improbable-eng/thanos/pkg/compact/downsample"
"github.com/improbable-eng/thanos/pkg/testutil"
+ "github.com/oklog/ulid"
"github.com/prometheus/tsdb/labels"
)
@@ -41,7 +39,7 @@ func TestBucketBlockSet_addGet(t *testing.T) {
}
for _, in := range input {
- var m block.Meta
+ var m metadata.Meta
m.Thanos.Downsample.Resolution = in.window
m.MinTime = in.mint
m.MaxTime = in.maxt
@@ -102,7 +100,7 @@ func TestBucketBlockSet_addGet(t *testing.T) {
var exp []*bucketBlock
for _, b := range c.res {
- var m block.Meta
+ var m metadata.Meta
m.Thanos.Downsample.Resolution = b.window
m.MinTime = b.mint
m.MaxTime = b.maxt
@@ -129,7 +127,7 @@ func TestBucketBlockSet_remove(t *testing.T) {
}
for _, in := range input {
- var m block.Meta
+ var m metadata.Meta
m.ULID = in.id
m.MinTime = in.mint
m.MaxTime = in.maxt
diff --git a/pkg/store/prometheus_test.go b/pkg/store/prometheus_test.go
index 567d6a5fe1..c4218a1e14 100644
--- a/pkg/store/prometheus_test.go
+++ b/pkg/store/prometheus_test.go
@@ -122,6 +122,7 @@ func TestPrometheusStore_LabelValues_e2e(t *testing.T) {
_, err = a.Add(labels.FromStrings("a", "c"), 0, 1)
testutil.Ok(t, err)
_, err = a.Add(labels.FromStrings("a", "a"), 0, 1)
+ testutil.Ok(t, err)
testutil.Ok(t, a.Commit())
ctx, cancel := context.WithCancel(context.Background())
diff --git a/pkg/testutil/prometheus.go b/pkg/testutil/prometheus.go
index 807b61f8a0..7cf869f4d5 100644
--- a/pkg/testutil/prometheus.go
+++ b/pkg/testutil/prometheus.go
@@ -12,8 +12,9 @@ import (
"syscall"
"time"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
+
"github.com/go-kit/kit/log"
- "github.com/improbable-eng/thanos/pkg/block"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/oklog/ulid"
"github.com/pkg/errors"
@@ -157,7 +158,7 @@ func (p *Prometheus) SetConfig(s string) (err error) {
// Stop terminates Prometheus and clean up its data directory.
func (p *Prometheus) Stop() error {
if err := p.cmd.Process.Signal(syscall.SIGTERM); err != nil {
- return errors.Wrapf(err, "failed to Prometheus. Kill it manually and cleanr %s dir", p.db.Dir())
+ return errors.Wrapf(err, "failed to stop Prometheus. Kill it manually and clean %s dir", p.db.Dir())
}
time.Sleep(time.Second / 2)
@@ -188,7 +189,7 @@ func CreateBlock(
extLset labels.Labels,
resolution int64,
) (id ulid.ULID, err error) {
- h, err := tsdb.NewHead(nil, nil, tsdb.NopWAL(), 10000000000)
+ h, err := tsdb.NewHead(nil, nil, nil, 10000000000)
if err != nil {
return id, errors.Wrap(err, "create head block")
}
@@ -238,15 +239,15 @@ func CreateBlock(
return id, errors.Wrap(err, "create compactor")
}
- id, err = c.Write(dir, h, mint, maxt)
+ id, err = c.Write(dir, h, mint, maxt, nil)
if err != nil {
return id, errors.Wrap(err, "write block")
}
- if _, err = block.InjectThanosMeta(log.NewNopLogger(), filepath.Join(dir, id.String()), block.ThanosMeta{
+ if _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(dir, id.String()), metadata.Thanos{
Labels: extLset.Map(),
- Downsample: block.ThanosDownsampleMeta{Resolution: resolution},
- Source: block.TestSource,
+ Downsample: metadata.ThanosDownsample{Resolution: resolution},
+ Source: metadata.TestSource,
}, nil); err != nil {
return id, errors.Wrap(err, "finalize block")
}
@@ -256,4 +257,4 @@ func CreateBlock(
}
return id, nil
-}
+}
\ No newline at end of file
diff --git a/pkg/verifier/index_issue.go b/pkg/verifier/index_issue.go
index 54a20703d4..72100b15bc 100644
--- a/pkg/verifier/index_issue.go
+++ b/pkg/verifier/index_issue.go
@@ -8,6 +8,8 @@ import (
"path"
"path/filepath"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
+
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block"
@@ -94,7 +96,7 @@ func IndexIssue(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bac
logger,
tmpdir,
id,
- block.BucketRepairSource,
+ metadata.BucketRepairSource,
block.IgnoreCompleteOutsideChunk,
block.IgnoreDuplicateOutsideChunk,
block.IgnoreIssue347OutsideChunk,