From 07739f2118bfa7ad7745ee31a214158962b80960 Mon Sep 17 00:00:00 2001 From: Bartek Plotka Date: Wed, 2 Jan 2019 19:21:28 +0000 Subject: [PATCH] Upgraded Prometheus and TSDB deps. This PR does not add anything, however by upgrading Prometheus from 2.3.2 to master tip and same for TSDB it may affects few things. Bigger packages we use and changes I found manually: * prometheus/prometheus/discovery/file * [ENHANCEMENT] Discovery: Improve performance of previously slow updates of changes of targets. #4526 ?? cc @ivan-valkov * [BUGFIX] Wait for service discovery to stop before exiting #4508 ?? * prometheus/prometheus/promql: * [BUGFIX] PromQL: Fix a goroutine leak in the lexer/parser. #4858 * [BUGFIX] Change max/min over_time to handle NaNs properly. #438 * [BUGFIX] Check label name for `count_values` PromQL function. #4585 * [BUGFIX] Ensure that vectors and matrices do not contain identical label-sets. #4589 * [ENHANCEMENT] Optimize PromQL aggregations #4248 * [BUGFIX] Only add LookbackDelta to vector selectors #4399 * [BUGFIX] Reduce floating point errors in stddev and related functions #4533 * prometheus/prometheus/rules: * New metrics exposed! (prometheus evaluation!) * [ENHANCEMENT] Rules: Error out at load time for invalid templates, rather than at evaluation time. #4537 * prometheus/tsdb/index: Index reader optimizations. There are things/fixes we may reuse in next PRs (TODO create gh issues for those): * api changes (warnings support on Prometheus UI and Query API) * UI fixes: * [ENHANCEMENT] Web UI: Support console queries at specific times. #4764 * [ENHANCEMENT] Web UI: Avoid browser spell-checking in expression field. #472 * Use notifier package once https://github.com/prometheus/prometheus/pull/5025 is merged. * Ruler UI fixes: * [ENHANCEMENT] Show rule evaluation errors in UI #4457 Follopw up issues we can fix in further PRs: * QueryAPI has now api/v1/labels that Thanos does not yet support. Created issue with details: https://github.com/improbable-eng/thanos/issues/702 * https://github.com/improbable-eng/thanos/issues/703 Signed-off-by: Bartek Plotka --- Gopkg.toml | 5 +- Makefile | 4 +- cmd/thanos/bucket.go | 10 +- cmd/thanos/downsample.go | 12 +- cmd/thanos/query.go | 11 +- cmd/thanos/rule.go | 17 ++- cmd/thanos/sidecar.go | 36 +++--- pkg/block/block.go | 133 ++------------------ pkg/block/index.go | 89 ++++++++++++-- pkg/block/index_test.go | 46 +++++++ pkg/block/metadata/meta.go | 142 ++++++++++++++++++++++ pkg/compact/compact.go | 41 ++++--- pkg/compact/compact_e2e_test.go | 95 ++++++++++++--- pkg/compact/downsample/downsample.go | 24 ++-- pkg/compact/downsample/downsample_test.go | 10 +- pkg/compact/downsample/pool.go | 5 +- pkg/compact/retention_test.go | 8 +- pkg/query/api/v1.go | 7 +- pkg/query/api/v1_test.go | 55 +++++---- pkg/query/querier.go | 19 ++- pkg/query/querier_test.go | 2 +- pkg/query/test_print.go | 34 ++++++ pkg/shipper/shipper.go | 18 +-- pkg/shipper/shipper_e2e_test.go | 7 +- pkg/shipper/shipper_test.go | 14 +-- pkg/store/bucket.go | 15 +-- pkg/store/bucket_e2e_test.go | 7 +- pkg/store/bucket_test.go | 14 +-- pkg/store/prometheus_test.go | 1 + pkg/testutil/prometheus.go | 17 +-- pkg/verifier/index_issue.go | 4 +- 31 files changed, 587 insertions(+), 315 deletions(-) create mode 100644 pkg/block/index_test.go create mode 100644 pkg/block/metadata/meta.go create mode 100644 pkg/query/test_print.go diff --git a/Gopkg.toml b/Gopkg.toml index 98bbc9494d..057fa27ac4 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -37,7 +37,8 @@ ignored = ["github.com/improbable-eng/thanos/benchmark/*"] name = "github.com/prometheus/common" [[constraint]] - version = "v2.3.2" + # TODO(bwplotka): Move to released version once our recent fixes will be released (v2.7.0) + revision = "3bd41cc92c7800cc6072171bd4237406126fa169" name = "github.com/prometheus/prometheus" [[override]] @@ -46,7 +47,7 @@ ignored = ["github.com/improbable-eng/thanos/benchmark/*"] [[constraint]] name = "github.com/prometheus/tsdb" - revision = "bd832fc8274e8fe63999ac749daaaff9d881241f" + version = "v0.4.0" [[constraint]] branch = "master" diff --git a/Makefile b/Makefile index 4d0e27aab2..97ee8ff044 100644 --- a/Makefile +++ b/Makefile @@ -36,7 +36,9 @@ PROTOC_VERSION ?= 3.4.0 # E2e test deps. # Referenced by github.com/improbable-eng/thanos/blob/master/docs/getting_started.md#prometheus -SUPPORTED_PROM_VERSIONS ?=v2.0.0 v2.2.1 v2.3.2 v2.4.3 v2.5.0 + +# Limitied prom version, because testing was not possibe. This should fix it: https://github.com/improbable-eng/thanos/issues/758 +SUPPORTED_PROM_VERSIONS ?=v2.4.3 v2.5.0 ALERTMANAGER_VERSION ?=v0.15.2 MINIO_SERVER_VERSION ?=RELEASE.2018-10-06T00-15-16Z diff --git a/cmd/thanos/bucket.go b/cmd/thanos/bucket.go index 01b9b54bf6..ff68feee2a 100644 --- a/cmd/thanos/bucket.go +++ b/cmd/thanos/bucket.go @@ -10,10 +10,9 @@ import ( "text/template" "time" - "github.com/prometheus/tsdb/labels" - "github.com/go-kit/kit/log" "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/metadata" "github.com/improbable-eng/thanos/pkg/objstore/client" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/improbable-eng/thanos/pkg/verifier" @@ -23,6 +22,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/tsdb/labels" "golang.org/x/text/language" "golang.org/x/text/message" "gopkg.in/alecthomas/kingpin.v2" @@ -254,7 +254,7 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin defer cancel() // Getting Metas. - var blockMetas []*block.Meta + var blockMetas []*metadata.Meta if err = bkt.Iter(ctx, "", func(name string) error { id, ok := block.IsBlockDir(name) if !ok { @@ -277,7 +277,7 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin } } -func printTable(blockMetas []*block.Meta, selectorLabels labels.Labels, sortBy []string) error { +func printTable(blockMetas []*metadata.Meta, selectorLabels labels.Labels, sortBy []string) error { header := inspectColumns var lines [][]string @@ -355,7 +355,7 @@ func getKeysAlphabetically(labels map[string]string) []string { // matchesSelector checks if blockMeta contains every label from // the selector with the correct value -func matchesSelector(blockMeta *block.Meta, selectorLabels labels.Labels) bool { +func matchesSelector(blockMeta *metadata.Meta, selectorLabels labels.Labels) bool { for _, l := range selectorLabels { if v, ok := blockMeta.Thanos.Labels[l.Name]; !ok || v != l.Value { return false diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go index c00aadd43d..5f3846ff50 100644 --- a/cmd/thanos/downsample.go +++ b/cmd/thanos/downsample.go @@ -8,11 +8,10 @@ import ( "path/filepath" "time" - "github.com/prometheus/tsdb/chunkenc" - "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/metadata" "github.com/improbable-eng/thanos/pkg/compact/downsample" "github.com/improbable-eng/thanos/pkg/objstore" "github.com/improbable-eng/thanos/pkg/objstore/client" @@ -23,6 +22,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/tsdb" + "github.com/prometheus/tsdb/chunkenc" "gopkg.in/alecthomas/kingpin.v2" ) @@ -105,7 +105,7 @@ func downsampleBucket( if err := os.MkdirAll(dir, 0777); err != nil { return errors.Wrap(err, "create dir") } - var metas []*block.Meta + var metas []*metadata.Meta err := bkt.Iter(ctx, "", func(name string) error { id, ok := block.IsBlockDir(name) @@ -119,7 +119,7 @@ func downsampleBucket( } defer runutil.CloseWithLogOnErr(logger, rc, "block reader") - var m block.Meta + var m metadata.Meta if err := json.NewDecoder(rc).Decode(&m); err != nil { return errors.Wrap(err, "decode meta") } @@ -201,7 +201,7 @@ func downsampleBucket( return nil } -func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *block.Meta, dir string, resolution int64) error { +func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta, dir string, resolution int64) error { begin := time.Now() bdir := filepath.Join(dir, m.ULID.String()) @@ -224,7 +224,7 @@ func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bu pool = downsample.NewPool() } - b, err := tsdb.OpenBlock(bdir, pool) + b, err := tsdb.OpenBlock(logger, bdir, pool) if err != nil { return errors.Wrapf(err, "open block %s", m.ULID) } diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index bf6cfa64fe..9e6c5ac19e 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -296,7 +296,16 @@ func runQuery( return stores.Get(), nil }, selectorLset) queryableCreator = query.NewQueryableCreator(logger, proxy, replicaLabel) - engine = promql.NewEngine(logger, reg, maxConcurrentQueries, queryTimeout) + engine = promql.NewEngine( + promql.EngineOpts{ + Logger: logger, + Reg: reg, + MaxConcurrent: maxConcurrentQueries, + // TODO(bwplotka): Expose this as a flag: https://github.com/improbable-eng/thanos/issues/703 + MaxSamples: math.MaxInt32, + Timeout: queryTimeout, + }, + ) ) // Periodically update the store set with the addresses we see in our cluster. { diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 03fed102d2..00d7e2bb98 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -19,15 +19,14 @@ import ( "syscall" "time" - "github.com/improbable-eng/thanos/pkg/extprom" - "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/alert" - "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/metadata" "github.com/improbable-eng/thanos/pkg/cluster" "github.com/improbable-eng/thanos/pkg/discovery/cache" "github.com/improbable-eng/thanos/pkg/discovery/dns" + "github.com/improbable-eng/thanos/pkg/extprom" "github.com/improbable-eng/thanos/pkg/objstore/client" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/improbable-eng/thanos/pkg/shipper" @@ -117,7 +116,6 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application, name string) MaxBlockDuration: *tsdbBlockDuration, Retention: *tsdbRetention, NoLockfile: true, - WALFlushInterval: 30 * time.Second, } lookupQueries := map[string]struct{}{} @@ -290,7 +288,7 @@ func runRule( ctx, cancel := context.WithCancel(context.Background()) ctx = tracing.ContextWithTracer(ctx, tracer) - notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) error { + notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) { res := make([]*alert.Alert, 0, len(alerts)) for _, alrt := range alerts { // Only send actually firing alerts. @@ -309,17 +307,18 @@ func runRule( res = append(res, a) } alertQ.Push(res) - - return nil } + + st := tsdb.Adapter(db, 0) mgr = rules.NewManager(&rules.ManagerOptions{ Context: ctx, QueryFunc: queryFn, NotifyFunc: notify, Logger: log.With(logger, "component", "rules"), - Appendable: tsdb.Adapter(db, 0), + Appendable: st, Registerer: reg, ExternalURL: nil, + TSDB: st, }) g.Add(func() error { mgr.Run() @@ -579,7 +578,7 @@ func runRule( } }() - s := shipper.New(logger, nil, dataDir, bkt, func() labels.Labels { return lset }, block.RulerSource) + s := shipper.New(logger, nil, dataDir, bkt, func() labels.Labels { return lset }, metadata.RulerSource) ctx, cancel := context.WithCancel(context.Background()) diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 61f493f93a..bd462b2f65 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -14,7 +14,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/metadata" "github.com/improbable-eng/thanos/pkg/cluster" "github.com/improbable-eng/thanos/pkg/objstore/client" "github.com/improbable-eng/thanos/pkg/reloader" @@ -102,7 +102,7 @@ func runSidecar( reloader *reloader.Reloader, component string, ) error { - var metadata = &metadata{ + var m = &promMetadata{ promURL: promURL, // Start out with the full time range. The shipper will constrain it later. @@ -128,7 +128,7 @@ func runSidecar( // Blocking query of external labels before joining as a Source Peer into gossip. // We retry infinitely until we reach and fetch labels from our Prometheus. err := runutil.Retry(2*time.Second, ctx.Done(), func() error { - if err := metadata.UpdateLabels(ctx, logger); err != nil { + if err := m.UpdateLabels(ctx, logger); err != nil { level.Warn(logger).Log( "msg", "failed to fetch initial external labels. Is Prometheus running? Retrying", "err", err, @@ -145,14 +145,14 @@ func runSidecar( return errors.Wrap(err, "initial external labels query") } - if len(metadata.Labels()) == 0 { + if len(m.Labels()) == 0 { return errors.New("no external labels configured on Prometheus server, uniquely identifying external labels must be configured") } // New gossip cluster. - mint, maxt := metadata.Timestamps() + mint, maxt := m.Timestamps() if err = peer.Join(cluster.PeerTypeSource, cluster.PeerMetadata{ - Labels: metadata.LabelsPB(), + Labels: m.LabelsPB(), MinTime: mint, MaxTime: maxt, }); err != nil { @@ -165,12 +165,12 @@ func runSidecar( iterCtx, iterCancel := context.WithTimeout(context.Background(), 5*time.Second) defer iterCancel() - if err := metadata.UpdateLabels(iterCtx, logger); err != nil { + if err := m.UpdateLabels(iterCtx, logger); err != nil { level.Warn(logger).Log("msg", "heartbeat failed", "err", err) promUp.Set(0) } else { // Update gossip. - peer.SetLabels(metadata.LabelsPB()) + peer.SetLabels(m.LabelsPB()) promUp.Set(1) lastHeartbeat.Set(float64(time.Now().UnixNano()) / 1e9) @@ -204,7 +204,7 @@ func runSidecar( var client http.Client promStore, err := store.NewPrometheusStore( - logger, &client, promURL, metadata.Labels, metadata.Timestamps) + logger, &client, promURL, m.Labels, m.Timestamps) if err != nil { return errors.Wrap(err, "create Prometheus store") } @@ -252,7 +252,7 @@ func runSidecar( } }() - s := shipper.New(logger, nil, dataDir, bkt, metadata.Labels, block.SidecarSource) + s := shipper.New(logger, nil, dataDir, bkt, m.Labels, metadata.SidecarSource) ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { @@ -265,9 +265,9 @@ func runSidecar( if err != nil { level.Warn(logger).Log("msg", "reading timestamps failed", "err", err) } else { - metadata.UpdateTimestamps(minTime, math.MaxInt64) + m.UpdateTimestamps(minTime, math.MaxInt64) - mint, maxt := metadata.Timestamps() + mint, maxt := m.Timestamps() peer.SetTimestamps(mint, maxt) } return nil @@ -281,7 +281,7 @@ func runSidecar( return nil } -type metadata struct { +type promMetadata struct { promURL *url.URL mtx sync.Mutex @@ -290,7 +290,7 @@ type metadata struct { labels labels.Labels } -func (s *metadata) UpdateLabels(ctx context.Context, logger log.Logger) error { +func (s *promMetadata) UpdateLabels(ctx context.Context, logger log.Logger) error { elset, err := queryExternalLabels(ctx, logger, s.promURL) if err != nil { return err @@ -303,7 +303,7 @@ func (s *metadata) UpdateLabels(ctx context.Context, logger log.Logger) error { return nil } -func (s *metadata) UpdateTimestamps(mint int64, maxt int64) { +func (s *promMetadata) UpdateTimestamps(mint int64, maxt int64) { s.mtx.Lock() defer s.mtx.Unlock() @@ -311,14 +311,14 @@ func (s *metadata) UpdateTimestamps(mint int64, maxt int64) { s.maxt = maxt } -func (s *metadata) Labels() labels.Labels { +func (s *promMetadata) Labels() labels.Labels { s.mtx.Lock() defer s.mtx.Unlock() return s.labels } -func (s *metadata) LabelsPB() []storepb.Label { +func (s *promMetadata) LabelsPB() []storepb.Label { s.mtx.Lock() defer s.mtx.Unlock() @@ -332,7 +332,7 @@ func (s *metadata) LabelsPB() []storepb.Label { return lset } -func (s *metadata) Timestamps() (mint int64, maxt int64) { +func (s *promMetadata) Timestamps() (mint int64, maxt int64) { s.mtx.Lock() defer s.mtx.Unlock() diff --git a/pkg/block/block.go b/pkg/block/block.go index 118b7ea96c..cdc52a3e08 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -5,11 +5,12 @@ package block import ( "context" "encoding/json" - "io/ioutil" "os" "path" "path/filepath" + "github.com/improbable-eng/thanos/pkg/block/metadata" + "fmt" "github.com/go-kit/kit/log" @@ -17,8 +18,6 @@ import ( "github.com/improbable-eng/thanos/pkg/runutil" "github.com/oklog/ulid" "github.com/pkg/errors" - "github.com/prometheus/tsdb" - "github.com/prometheus/tsdb/fileutil" ) const ( @@ -33,103 +32,6 @@ const ( DebugMetas = "debug/metas" ) -type SourceType string - -const ( - UnknownSource SourceType = "" - SidecarSource SourceType = "sidecar" - CompactorSource SourceType = "compactor" - CompactorRepairSource SourceType = "compactor.repair" - RulerSource SourceType = "ruler" - BucketRepairSource SourceType = "bucket.repair" - TestSource SourceType = "test" -) - -// Meta describes the a block's meta. It wraps the known TSDB meta structure and -// extends it by Thanos-specific fields. -type Meta struct { - Version int `json:"version"` - - tsdb.BlockMeta - - Thanos ThanosMeta `json:"thanos"` -} - -// ThanosMeta holds block meta information specific to Thanos. -type ThanosMeta struct { - Labels map[string]string `json:"labels"` - Downsample ThanosDownsampleMeta `json:"downsample"` - - // Source is a real upload source of the block. - Source SourceType `json:"source"` -} - -type ThanosDownsampleMeta struct { - Resolution int64 `json:"resolution"` -} - -// WriteMetaFile writes the given meta into /meta.json. -func WriteMetaFile(logger log.Logger, dir string, meta *Meta) error { - // Make any changes to the file appear atomic. - path := filepath.Join(dir, MetaFilename) - tmp := path + ".tmp" - - f, err := os.Create(tmp) - if err != nil { - return err - } - - enc := json.NewEncoder(f) - enc.SetIndent("", "\t") - - if err := enc.Encode(meta); err != nil { - runutil.CloseWithLogOnErr(logger, f, "close meta") - return err - } - if err := f.Close(); err != nil { - return err - } - return renameFile(logger, tmp, path) -} - -// ReadMetaFile reads the given meta from /meta.json. -func ReadMetaFile(dir string) (*Meta, error) { - b, err := ioutil.ReadFile(filepath.Join(dir, MetaFilename)) - if err != nil { - return nil, err - } - var m Meta - - if err := json.Unmarshal(b, &m); err != nil { - return nil, err - } - if m.Version != 1 { - return nil, errors.Errorf("unexpected meta file version %d", m.Version) - } - return &m, nil -} - -func renameFile(logger log.Logger, from, to string) error { - if err := os.RemoveAll(to); err != nil { - return err - } - if err := os.Rename(from, to); err != nil { - return err - } - - // Directory was renamed; sync parent dir to persist rename. - pdir, err := fileutil.OpenDir(filepath.Dir(to)) - if err != nil { - return err - } - - if err = fileutil.Fsync(pdir); err != nil { - runutil.CloseWithLogOnErr(logger, pdir, "close dir") - return err - } - return pdir.Close() -} - // Download downloads directory that is mean to be block directory. func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id ulid.ULID, dst string) error { if err := objstore.DownloadDir(ctx, logger, bucket, id.String(), dst); err != nil { @@ -169,7 +71,7 @@ func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st return errors.Wrap(err, "not a block dir") } - meta, err := ReadMetaFile(bdir) + meta, err := metadata.Read(bdir) if err != nil { // No meta or broken meta file. return errors.Wrap(err, "read meta") @@ -216,16 +118,16 @@ func Delete(ctx context.Context, bucket objstore.Bucket, id ulid.ULID) error { } // DownloadMeta downloads only meta file from bucket by block ID. -func DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (Meta, error) { +func DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (metadata.Meta, error) { rc, err := bkt.Get(ctx, path.Join(id.String(), MetaFilename)) if err != nil { - return Meta{}, errors.Wrapf(err, "meta.json bkt get for %s", id.String()) + return metadata.Meta{}, errors.Wrapf(err, "meta.json bkt get for %s", id.String()) } defer runutil.CloseWithLogOnErr(logger, rc, "download meta bucket client") - var m Meta + var m metadata.Meta if err := json.NewDecoder(rc).Decode(&m); err != nil { - return Meta{}, errors.Wrapf(err, "decode meta.json for block %s", id.String()) + return metadata.Meta{}, errors.Wrapf(err, "decode meta.json for block %s", id.String()) } return m, nil } @@ -234,24 +136,3 @@ func IsBlockDir(path string) (id ulid.ULID, ok bool) { id, err := ulid.Parse(filepath.Base(path)) return id, err == nil } - -// InjectThanosMeta sets Thanos meta to the block meta JSON and saves it to the disk. -// NOTE: It should be used after writing any block by any Thanos component, otherwise we will miss crucial metadata. -func InjectThanosMeta(logger log.Logger, bdir string, meta ThanosMeta, downsampledMeta *tsdb.BlockMeta) (*Meta, error) { - newMeta, err := ReadMetaFile(bdir) - if err != nil { - return nil, errors.Wrap(err, "read new meta") - } - newMeta.Thanos = meta - - // While downsampling we need to copy original compaction. - if downsampledMeta != nil { - newMeta.Compaction = downsampledMeta.Compaction - } - - if err := WriteMetaFile(logger, bdir, newMeta); err != nil { - return nil, errors.Wrap(err, "write new meta") - } - - return newMeta, nil -} diff --git a/pkg/block/index.go b/pkg/block/index.go index 2249863b2d..eb0a3689c0 100644 --- a/pkg/block/index.go +++ b/pkg/block/index.go @@ -11,6 +11,10 @@ import ( "strings" "time" + "github.com/improbable-eng/thanos/pkg/block/metadata" + + "github.com/prometheus/tsdb/fileutil" + "github.com/go-kit/kit/log" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/oklog/ulid" @@ -36,23 +40,84 @@ type indexCache struct { Postings []postingsRange } +type realByteSlice []byte + +func (b realByteSlice) Len() int { + return len(b) +} + +func (b realByteSlice) Range(start, end int) []byte { + return b[start:end] +} + +func (b realByteSlice) Sub(start, end int) index.ByteSlice { + return b[start:end] +} + +func getSymbolTable(b index.ByteSlice) (map[uint32]string, error) { + version := int(b.Range(4, 5)[0]) + + if version != 1 && version != 2 { + return nil, errors.Errorf("unknown index file version %d", version) + } + + toc, err := index.NewTOCFromByteSlice(b) + if err != nil { + return nil, errors.Wrap(err, "read TOC") + } + + symbolsV2, symbolsV1, err := index.ReadSymbols(b, version, int(toc.Symbols)) + if err != nil { + return nil, errors.Wrap(err, "read symbols") + } + + symbolsTable := make(map[uint32]string, len(symbolsV1)+len(symbolsV2)) + for o, s := range symbolsV1 { + symbolsTable[o] = s + } + for o, s := range symbolsV2 { + symbolsTable[uint32(o)] = s + } + + return symbolsTable, nil +} + // WriteIndexCache writes a cache file containing the first lookup stages // for an index file. -func WriteIndexCache(logger log.Logger, fn string, r *index.Reader) error { +func WriteIndexCache(logger log.Logger, indexFn string, fn string) error { + indexFile, err := fileutil.OpenMmapFile(indexFn) + if err != nil { + return errors.Wrapf(err, "open mmap index file %s", indexFn) + } + defer runutil.CloseWithLogOnErr(logger, indexFile, "close index cache mmap file from %s", indexFn) + + b := realByteSlice(indexFile.Bytes()) + indexr, err := index.NewReader(b) + if err != nil { + return errors.Wrap(err, "open index reader") + } + defer runutil.CloseWithLogOnErr(logger, indexr, "load index cache reader") + + // We assume reader verified index already. + symbols, err := getSymbolTable(b) + if err != nil { + return err + } + f, err := os.Create(fn) if err != nil { - return errors.Wrap(err, "create file") + return errors.Wrap(err, "create index cache file") } defer runutil.CloseWithLogOnErr(logger, f, "index cache writer") v := indexCache{ - Version: r.Version(), - Symbols: r.SymbolTable(), + Version: indexr.Version(), + Symbols: symbols, LabelValues: map[string][]string{}, } // Extract label value indices. - lnames, err := r.LabelIndices() + lnames, err := indexr.LabelIndices() if err != nil { return errors.Wrap(err, "read label indices") } @@ -62,7 +127,7 @@ func WriteIndexCache(logger log.Logger, fn string, r *index.Reader) error { } ln := lns[0] - tpls, err := r.LabelValues(ln) + tpls, err := indexr.LabelValues(ln) if err != nil { return errors.Wrap(err, "get label values") } @@ -82,7 +147,7 @@ func WriteIndexCache(logger log.Logger, fn string, r *index.Reader) error { } // Extract postings ranges. - pranges, err := r.PostingsRanges() + pranges, err := indexr.PostingsRanges() if err != nil { return errors.Wrap(err, "read postings ranges") } @@ -346,7 +411,7 @@ type ignoreFnType func(mint, maxt int64, prev *chunks.Meta, curr *chunks.Meta) ( // - removes all near "complete" outside chunks introduced by https://github.com/prometheus/tsdb/issues/347. // Fixable inconsistencies are resolved in the new block. // TODO(bplotka): https://github.com/improbable-eng/thanos/issues/378 -func Repair(logger log.Logger, dir string, id ulid.ULID, source SourceType, ignoreChkFns ...ignoreFnType) (resid ulid.ULID, err error) { +func Repair(logger log.Logger, dir string, id ulid.ULID, source metadata.SourceType, ignoreChkFns ...ignoreFnType) (resid ulid.ULID, err error) { if len(ignoreChkFns) == 0 { return resid, errors.New("no ignore chunk function specified") } @@ -355,7 +420,7 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source SourceType, igno entropy := rand.New(rand.NewSource(time.Now().UnixNano())) resid = ulid.MustNew(ulid.Now(), entropy) - meta, err := ReadMetaFile(bdir) + meta, err := metadata.Read(bdir) if err != nil { return resid, errors.Wrap(err, "read meta file") } @@ -363,7 +428,7 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source SourceType, igno return resid, errors.New("cannot repair downsampled block") } - b, err := tsdb.OpenBlock(bdir, nil) + b, err := tsdb.OpenBlock(logger, bdir, nil) if err != nil { return resid, errors.Wrap(err, "open block") } @@ -405,7 +470,7 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source SourceType, igno if err := rewrite(indexr, chunkr, indexw, chunkw, &resmeta, ignoreChkFns); err != nil { return resid, errors.Wrap(err, "rewrite block") } - if err := WriteMetaFile(logger, resdir, &resmeta); err != nil { + if err := metadata.Write(logger, resdir, &resmeta); err != nil { return resid, err } return resid, nil @@ -494,7 +559,7 @@ OUTER: func rewrite( indexr tsdb.IndexReader, chunkr tsdb.ChunkReader, indexw tsdb.IndexWriter, chunkw tsdb.ChunkWriter, - meta *Meta, + meta *metadata.Meta, ignoreChkFns []ignoreFnType, ) error { symbols, err := indexr.Symbols() diff --git a/pkg/block/index_test.go b/pkg/block/index_test.go new file mode 100644 index 0000000000..80c10e8e6e --- /dev/null +++ b/pkg/block/index_test.go @@ -0,0 +1,46 @@ +package block + +import ( + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/go-kit/kit/log" + "github.com/improbable-eng/thanos/pkg/testutil" + "github.com/prometheus/tsdb/labels" +) + +func TestWriteReadIndexCache(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "test-compact-prepare") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + b, err := testutil.CreateBlock(tmpDir, []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}}, + {{Name: "a", Value: "3"}}, + {{Name: "a", Value: "4"}}, + {{Name: "b", Value: "1"}}, + }, 100, 0, 1000, nil, 124) + testutil.Ok(t, err) + + fn := filepath.Join(tmpDir, "index.cache.json") + testutil.Ok(t, WriteIndexCache(log.NewNopLogger(), filepath.Join(tmpDir, b.String(), "index"), fn)) + + version, symbols, lvals, postings, err := ReadIndexCache(log.NewNopLogger(), fn) + testutil.Ok(t, err) + + testutil.Equals(t, 2, version) + testutil.Equals(t, 6, len(symbols)) + testutil.Equals(t, 2, len(lvals)) + + vals, ok := lvals["a"] + testutil.Assert(t, ok, "") + testutil.Equals(t, []string{"1", "2", "3", "4"}, vals) + + vals, ok = lvals["b"] + testutil.Assert(t, ok, "") + testutil.Equals(t, []string{"1"}, vals) + testutil.Equals(t, 6, len(postings)) +} diff --git a/pkg/block/metadata/meta.go b/pkg/block/metadata/meta.go new file mode 100644 index 0000000000..44f1d3768d --- /dev/null +++ b/pkg/block/metadata/meta.go @@ -0,0 +1,142 @@ +package metadata + +// metadata package implements writing and reading wrapped meta.json where Thanos puts its metadata. +// Those metadata contains external labels, downsampling resolution and source type. +// This package is minimal and separated because it used by testutils which limits test helpers we can use in +// this package. + +import ( + "encoding/json" + "io/ioutil" + "os" + "path/filepath" + + "github.com/go-kit/kit/log" + "github.com/improbable-eng/thanos/pkg/runutil" + "github.com/pkg/errors" + "github.com/prometheus/tsdb" + "github.com/prometheus/tsdb/fileutil" +) + +type SourceType string + +const ( + UnknownSource SourceType = "" + SidecarSource SourceType = "sidecar" + CompactorSource SourceType = "compactor" + CompactorRepairSource SourceType = "compactor.repair" + RulerSource SourceType = "ruler" + BucketRepairSource SourceType = "bucket.repair" + TestSource SourceType = "test" +) + +const ( + // MetaFilename is the known JSON filename for meta information. + MetaFilename = "meta.json" +) + +// Meta describes the a block's meta. It wraps the known TSDB meta structure and +// extends it by Thanos-specific fields. +type Meta struct { + Version int `json:"version"` + + tsdb.BlockMeta + + Thanos Thanos `json:"thanos"` +} + +// Thanos holds block meta information specific to Thanos. +type Thanos struct { + Labels map[string]string `json:"labels"` + Downsample ThanosDownsample `json:"downsample"` + + // Source is a real upload source of the block. + Source SourceType `json:"source"` +} + +type ThanosDownsample struct { + Resolution int64 `json:"resolution"` +} + +// InjectThanos sets Thanos meta to the block meta JSON and saves it to the disk. +// NOTE: It should be used after writing any block by any Thanos component, otherwise we will miss crucial metadata. +func InjectThanos(logger log.Logger, bdir string, meta Thanos, downsampledMeta *tsdb.BlockMeta) (*Meta, error) { + newMeta, err := Read(bdir) + if err != nil { + return nil, errors.Wrap(err, "read new meta") + } + newMeta.Thanos = meta + + // While downsampling we need to copy original compaction. + if downsampledMeta != nil { + newMeta.Compaction = downsampledMeta.Compaction + } + + if err := Write(logger, bdir, newMeta); err != nil { + return nil, errors.Wrap(err, "write new meta") + } + + return newMeta, nil +} + +// Write writes the given meta into /meta.json. +func Write(logger log.Logger, dir string, meta *Meta) error { + // Make any changes to the file appear atomic. + path := filepath.Join(dir, MetaFilename) + tmp := path + ".tmp" + + f, err := os.Create(tmp) + if err != nil { + return err + } + + enc := json.NewEncoder(f) + enc.SetIndent("", "\t") + + if err := enc.Encode(meta); err != nil { + runutil.CloseWithLogOnErr(logger, f, "close meta") + return err + } + if err := f.Close(); err != nil { + return err + } + return renameFile(logger, tmp, path) +} + +func renameFile(logger log.Logger, from, to string) error { + if err := os.RemoveAll(to); err != nil { + return err + } + if err := os.Rename(from, to); err != nil { + return err + } + + // Directory was renamed; sync parent dir to persist rename. + pdir, err := fileutil.OpenDir(filepath.Dir(to)) + if err != nil { + return err + } + + if err = fileutil.Fsync(pdir); err != nil { + runutil.CloseWithLogOnErr(logger, pdir, "close dir") + return err + } + return pdir.Close() +} + +// Read reads the given meta from /meta.json. +func Read(dir string) (*Meta, error) { + b, err := ioutil.ReadFile(filepath.Join(dir, MetaFilename)) + if err != nil { + return nil, err + } + var m Meta + + if err := json.Unmarshal(b, &m); err != nil { + return nil, err + } + if m.Version != 1 { + return nil, errors.Errorf("unexpected meta file version %d", m.Version) + } + return &m, nil +} diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 544de920ea..29302f2a1c 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -9,6 +9,8 @@ import ( "sync" "time" + "github.com/improbable-eng/thanos/pkg/block/metadata" + "io/ioutil" "github.com/go-kit/kit/log" @@ -39,7 +41,7 @@ type Syncer struct { bkt objstore.Bucket syncDelay time.Duration mtx sync.Mutex - blocks map[ulid.ULID]*block.Meta + blocks map[ulid.ULID]*metadata.Meta metrics *syncerMetrics } @@ -130,7 +132,7 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket logger: logger, reg: reg, syncDelay: syncDelay, - blocks: map[ulid.ULID]*block.Meta{}, + blocks: map[ulid.ULID]*metadata.Meta{}, bkt: bkt, metrics: newSyncerMetrics(reg), }, nil @@ -185,9 +187,9 @@ func (c *Syncer) syncMetas(ctx context.Context) error { // NOTE: It is not safe to miss "old" block (even that it is newly created) in sync step. Compactor needs to aware of ALL old blocks. // TODO(bplotka): https://github.com/improbable-eng/thanos/issues/377 if ulid.Now()-id.Time() < uint64(c.syncDelay/time.Millisecond) && - meta.Thanos.Source != block.BucketRepairSource && - meta.Thanos.Source != block.CompactorSource && - meta.Thanos.Source != block.CompactorRepairSource { + meta.Thanos.Source != metadata.BucketRepairSource && + meta.Thanos.Source != metadata.CompactorSource && + meta.Thanos.Source != metadata.CompactorRepairSource { level.Debug(c.logger).Log("msg", "block is too fresh for now", "block", id) return nil @@ -214,7 +216,7 @@ func (c *Syncer) syncMetas(ctx context.Context) error { // GroupKey returns a unique identifier for the group the block belongs to. It considers // the downsampling resolution and the block's labels. -func GroupKey(meta block.Meta) string { +func GroupKey(meta metadata.Meta) string { return groupKey(meta.Thanos.Downsample.Resolution, labels.FromMap(meta.Thanos.Labels)) } @@ -381,7 +383,7 @@ type Group struct { labels labels.Labels resolution int64 mtx sync.Mutex - blocks map[ulid.ULID]*block.Meta + blocks map[ulid.ULID]*metadata.Meta compactions prometheus.Counter compactionFailures prometheus.Counter groupGarbageCollectedBlocks prometheus.Counter @@ -405,7 +407,7 @@ func newGroup( bkt: bkt, labels: lset, resolution: resolution, - blocks: map[ulid.ULID]*block.Meta{}, + blocks: map[ulid.ULID]*metadata.Meta{}, compactions: compactions, compactionFailures: compactionFailures, groupGarbageCollectedBlocks: groupGarbageCollectedBlocks, @@ -419,7 +421,7 @@ func (cg *Group) Key() string { } // Add the block with the given meta to the group. -func (cg *Group) Add(meta *block.Meta) error { +func (cg *Group) Add(meta *metadata.Meta) error { cg.mtx.Lock() defer cg.mtx.Unlock() @@ -541,7 +543,7 @@ func IsRetryError(err error) bool { return ok } -func (cg *Group) areBlocksOverlapping(include *block.Meta, excludeDirs ...string) error { +func (cg *Group) areBlocksOverlapping(include *metadata.Meta, excludeDirs ...string) error { var ( metas []tsdb.BlockMeta exclude = map[ulid.ULID]struct{}{} @@ -566,6 +568,9 @@ func (cg *Group) areBlocksOverlapping(include *block.Meta, excludeDirs ...string metas = append(metas, include.BlockMeta) } + sort.Slice(metas, func(i, j int) bool { + return metas[i].MinTime < metas[j].MinTime + }) if overlaps := tsdb.OverlappingBlocks(metas); len(overlaps) > 0 { return errors.Errorf("overlaps found while gathering blocks. %s", overlaps) } @@ -597,12 +602,12 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, return retry(errors.Wrapf(err, "download block %s", ie.id)) } - meta, err := block.ReadMetaFile(bdir) + meta, err := metadata.Read(bdir) if err != nil { return errors.Wrapf(err, "read meta from %s", bdir) } - resid, err := block.Repair(logger, tmpdir, ie.id, block.CompactorRepairSource, block.IgnoreIssue347OutsideChunk) + resid, err := block.Repair(logger, tmpdir, ie.id, metadata.CompactorRepairSource, block.IgnoreIssue347OutsideChunk) if err != nil { return errors.Wrapf(err, "repair failed for block %s", ie.id) } @@ -647,7 +652,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( if err := os.MkdirAll(bdir, 0777); err != nil { return compID, errors.Wrap(err, "create planning block dir") } - if err := block.WriteMetaFile(cg.logger, bdir, meta); err != nil { + if err := metadata.Write(cg.logger, bdir, meta); err != nil { return compID, errors.Wrap(err, "write planning meta file") } } @@ -670,7 +675,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( begin := time.Now() for _, pdir := range plan { - meta, err := block.ReadMetaFile(pdir) + meta, err := metadata.Read(pdir) if err != nil { return compID, errors.Wrapf(err, "read meta from %s", pdir) } @@ -718,7 +723,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( begin = time.Now() - compID, err = comp.Compact(dir, plan...) + compID, err = comp.Compact(dir, plan, nil) if err != nil { return compID, halt(errors.Wrapf(err, "compact blocks %v", plan)) } @@ -727,10 +732,10 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( bdir := filepath.Join(dir, compID.String()) - newMeta, err := block.InjectThanosMeta(cg.logger, bdir, block.ThanosMeta{ + newMeta, err := metadata.InjectThanos(cg.logger, bdir, metadata.Thanos{ Labels: cg.labels.Map(), - Downsample: block.ThanosDownsampleMeta{Resolution: cg.resolution}, - Source: block.CompactorSource, + Downsample: metadata.ThanosDownsample{Resolution: cg.resolution}, + Source: metadata.CompactorSource, }, nil) if err != nil { return compID, errors.Wrapf(err, "failed to finalize the block %s", bdir) diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index 74df73cc6f..11213dc047 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "math/rand" "os" "path" "path/filepath" @@ -15,12 +16,14 @@ import ( "github.com/go-kit/kit/log" "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/metadata" "github.com/improbable-eng/thanos/pkg/objstore" "github.com/improbable-eng/thanos/pkg/objstore/objtesting" "github.com/improbable-eng/thanos/pkg/testutil" "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/tsdb" + "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" ) @@ -37,13 +40,13 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) { // After the first synchronization the first 5 should be dropped and the // last 5 be loaded from the bucket. var ids []ulid.ULID - var metas []*block.Meta + var metas []*metadata.Meta for i := 0; i < 15; i++ { id, err := ulid.New(uint64(i), nil) testutil.Ok(t, err) - var meta block.Meta + var meta metadata.Meta meta.Version = 1 meta.ULID = id @@ -56,7 +59,7 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) { for _, m := range metas[5:] { var buf bytes.Buffer testutil.Ok(t, json.NewEncoder(&buf).Encode(&m)) - testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), block.MetaFilename), &buf)) + testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), metadata.MetaFilename), &buf)) } groups, err := sy.Groups() @@ -79,11 +82,11 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { // Generate 10 source block metas and construct higher level blocks // that are higher compactions of them. - var metas []*block.Meta + var metas []*metadata.Meta var ids []ulid.ULID for i := 0; i < 10; i++ { - var m block.Meta + var m metadata.Meta m.Version = 1 m.ULID = ulid.MustNew(uint64(i), nil) @@ -94,28 +97,28 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { metas = append(metas, &m) } - var m1 block.Meta + var m1 metadata.Meta m1.Version = 1 m1.ULID = ulid.MustNew(100, nil) m1.Compaction.Level = 2 m1.Compaction.Sources = ids[:4] m1.Thanos.Downsample.Resolution = 0 - var m2 block.Meta + var m2 metadata.Meta m2.Version = 1 m2.ULID = ulid.MustNew(200, nil) m2.Compaction.Level = 2 m2.Compaction.Sources = ids[4:8] // last two source IDs is not part of a level 2 block. m2.Thanos.Downsample.Resolution = 0 - var m3 block.Meta + var m3 metadata.Meta m3.Version = 1 m3.ULID = ulid.MustNew(300, nil) m3.Compaction.Level = 3 m3.Compaction.Sources = ids[:9] // last source ID is not part of level 3 block. m3.Thanos.Downsample.Resolution = 0 - var m4 block.Meta + var m4 metadata.Meta m4.Version = 14 m4.ULID = ulid.MustNew(400, nil) m4.Compaction.Level = 2 @@ -127,7 +130,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { fmt.Println("create", m.ULID) var buf bytes.Buffer testutil.Ok(t, json.NewEncoder(&buf).Encode(&m)) - testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), block.MetaFilename), &buf)) + testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), metadata.MetaFilename), &buf)) } // Do one initial synchronization with the bucket. @@ -173,7 +176,7 @@ func TestGroup_Compact_e2e(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) defer cancel() - var metas []*block.Meta + var metas []*metadata.Meta extLset := labels.Labels{{Name: "e1", Value: "1"}} b1, err := testutil.CreateBlock(prepareDir, []labels.Labels{ {{Name: "a", Value: "1"}}, @@ -183,7 +186,7 @@ func TestGroup_Compact_e2e(t *testing.T) { }, 100, 0, 1000, extLset, 124) testutil.Ok(t, err) - meta, err := block.ReadMetaFile(filepath.Join(prepareDir, b1.String())) + meta, err := metadata.Read(filepath.Join(prepareDir, b1.String())) testutil.Ok(t, err) metas = append(metas, meta) @@ -196,15 +199,18 @@ func TestGroup_Compact_e2e(t *testing.T) { testutil.Ok(t, err) // Mix order to make sure compact is able to deduct min time / max time. - meta, err = block.ReadMetaFile(filepath.Join(prepareDir, b3.String())) + meta, err = metadata.Read(filepath.Join(prepareDir, b3.String())) testutil.Ok(t, err) metas = append(metas, meta) - // Empty block. This can happen when TSDB does not have any samples for min-block-size time. - b2, err := testutil.CreateBlock(prepareDir, []labels.Labels{}, 100, 1001, 2000, extLset, 124) + // Currently TSDB does not produces empty blocks (see: https://github.com/prometheus/tsdb/pull/374). However before v2.7.0 it was + // so we still want to mimick this case as close as possible. + b2, err := createEmptyBlock(prepareDir, 1001, 2000, extLset, 124) testutil.Ok(t, err) - meta, err = block.ReadMetaFile(filepath.Join(prepareDir, b2.String())) + // blocks" count=3 mint=0 maxt=3000 ulid=01D1RQCRRJM77KQQ4GYDSC50GM sources="[01D1RQCRMNZBVHBPGRPG2M3NZQ 01D1RQCRPJMYN45T65YA1PRWB7 01D1RQCRNMTWJKTN5QQXFNKKH8]" + + meta, err = metadata.Read(filepath.Join(prepareDir, b2.String())) testutil.Ok(t, err) metas = append(metas, meta) @@ -217,7 +223,7 @@ func TestGroup_Compact_e2e(t *testing.T) { }, 100, 3001, 4000, extLset, 124) testutil.Ok(t, err) - meta, err = block.ReadMetaFile(filepath.Join(prepareDir, freshB.String())) + meta, err = metadata.Read(filepath.Join(prepareDir, freshB.String())) testutil.Ok(t, err) metas = append(metas, meta) @@ -263,7 +269,7 @@ func TestGroup_Compact_e2e(t *testing.T) { resDir := filepath.Join(dir, id.String()) testutil.Ok(t, block.Download(ctx, log.NewNopLogger(), bkt, id, resDir)) - meta, err = block.ReadMetaFile(resDir) + meta, err = metadata.Read(resDir) testutil.Ok(t, err) testutil.Equals(t, int64(0), meta.MinTime) @@ -294,3 +300,56 @@ func TestGroup_Compact_e2e(t *testing.T) { testutil.Ok(t, err) }) } + +// createEmptyBlock produces empty block like it was the case before fix: https://github.com/prometheus/tsdb/pull/374. +// (Prometheus pre v2.7.0) +func createEmptyBlock(dir string, mint int64, maxt int64, extLset labels.Labels, resolution int64) (ulid.ULID, error) { + entropy := rand.New(rand.NewSource(time.Now().UnixNano())) + uid := ulid.MustNew(ulid.Now(), entropy) + + if err := os.Mkdir(path.Join(dir, uid.String()), os.ModePerm); err != nil { + return ulid.ULID{}, errors.Wrap(err, "close index") + } + + if err := os.Mkdir(path.Join(dir, uid.String(), "chunks"), os.ModePerm); err != nil { + return ulid.ULID{}, errors.Wrap(err, "close index") + } + + w, err := index.NewWriter(path.Join(dir, uid.String(), "index")) + if err != nil { + return ulid.ULID{}, errors.Wrap(err, "new index") + } + + if err := w.Close(); err != nil { + return ulid.ULID{}, errors.Wrap(err, "close index") + } + + m := tsdb.BlockMeta{ + Version: 1, + ULID: uid, + MinTime: mint, + MaxTime: maxt, + Compaction: tsdb.BlockMetaCompaction{ + Level: 1, + Sources: []ulid.ULID{uid}, + }, + } + b, err := json.Marshal(&m) + if err != nil { + return ulid.ULID{}, err + } + + if err := ioutil.WriteFile(path.Join(dir, uid.String(), "meta.json"), b, os.ModePerm); err != nil { + return ulid.ULID{}, errors.Wrap(err, "saving meta.json") + } + + if _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(dir, uid.String()), metadata.Thanos{ + Labels: extLset.Map(), + Downsample: metadata.ThanosDownsample{Resolution: resolution}, + Source: metadata.TestSource, + }, nil); err != nil { + return ulid.ULID{}, errors.Wrap(err, "finalize block") + } + + return uid, nil +} diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go index 305f72021c..f5afecdcd0 100644 --- a/pkg/compact/downsample/downsample.go +++ b/pkg/compact/downsample/downsample.go @@ -5,7 +5,8 @@ import ( "path/filepath" "sort" - "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/metadata" + "github.com/prometheus/prometheus/pkg/value" "github.com/prometheus/tsdb/chunkenc" @@ -31,7 +32,7 @@ const ( // Downsample downsamples the given block. It writes a new block into dir and returns its ID. func Downsample( logger log.Logger, - origMeta *block.Meta, + origMeta *metadata.Meta, b tsdb.BlockReader, dir string, resolution int64, @@ -113,6 +114,7 @@ func Downsample( origMeta.Thanos.Downsample.Resolution, resolution, ) + if err != nil { return id, errors.Wrap(err, "downsample aggregate block") } @@ -125,18 +127,18 @@ func Downsample( if err != nil { return id, errors.Wrap(err, "create compactor") } - id, err = comp.Write(dir, newb, origMeta.MinTime, origMeta.MaxTime) + id, err = comp.Write(dir, newb, origMeta.MinTime, origMeta.MaxTime, &origMeta.BlockMeta) if err != nil { return id, errors.Wrap(err, "compact head") } bdir := filepath.Join(dir, id.String()) - var tmeta block.ThanosMeta + var tmeta metadata.Thanos tmeta = origMeta.Thanos - tmeta.Source = block.CompactorSource + tmeta.Source = metadata.CompactorSource tmeta.Downsample.Resolution = resolution - _, err = block.InjectThanosMeta(logger, bdir, tmeta, &origMeta.BlockMeta) + _, err = metadata.InjectThanos(logger, bdir, tmeta, &origMeta.BlockMeta) if err != nil { return id, errors.Wrapf(err, "failed to finalize the block %s", bdir) } @@ -228,13 +230,20 @@ func (b *memBlock) Chunks() (tsdb.ChunkReader, error) { } func (b *memBlock) Tombstones() (tsdb.TombstoneReader, error) { - return tsdb.EmptyTombstoneReader(), nil + return emptyTombstoneReader{}, nil } func (b *memBlock) Close() error { return nil } +type emptyTombstoneReader struct{} + +func (emptyTombstoneReader) Get(ref uint64) (tsdb.Intervals, error) { return nil, nil } +func (emptyTombstoneReader) Iter(func(uint64, tsdb.Intervals) error) error { return nil } +func (emptyTombstoneReader) Total() uint64 { return 0 } +func (emptyTombstoneReader) Close() error { return nil } + // currentWindow returns the end timestamp of the window that t falls into. func currentWindow(t, r int64) int64 { // The next timestamp is the next number after s.t that's aligned with window. @@ -412,6 +421,7 @@ func downsampleRaw(data []sample, resolution int64) []chunks.Meta { chks = append(chks, ab.encode()) } + return chks } diff --git a/pkg/compact/downsample/downsample_test.go b/pkg/compact/downsample/downsample_test.go index d384478416..bb2c38b17a 100644 --- a/pkg/compact/downsample/downsample_test.go +++ b/pkg/compact/downsample/downsample_test.go @@ -1,12 +1,15 @@ package downsample import ( + "github.com/prometheus/tsdb" "io/ioutil" "math" "os" "path/filepath" "testing" + "github.com/improbable-eng/thanos/pkg/block/metadata" + "github.com/prometheus/prometheus/pkg/value" "github.com/prometheus/tsdb/chunks" @@ -59,7 +62,7 @@ func TestDownsampleRaw(t *testing.T) { }, }, } - testDownsample(t, input, &block.Meta{}, 100) + testDownsample(t, input, &metadata.Meta{BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 250}}, 100) } func TestDownsampleAggr(t *testing.T) { @@ -96,8 +99,9 @@ func TestDownsampleAggr(t *testing.T) { }, }, } - var meta block.Meta + var meta metadata.Meta meta.Thanos.Downsample.Resolution = 10 + meta.BlockMeta = tsdb.BlockMeta{MinTime: 99, MaxTime: 1300} testDownsample(t, input, &meta, 500) } @@ -123,7 +127,7 @@ type downsampleTestSet struct { // testDownsample inserts the input into a block and invokes the downsampler with the given resolution. // The chunk ranges within the input block are aligned at 500 time units. -func testDownsample(t *testing.T, data []*downsampleTestSet, meta *block.Meta, resolution int64) { +func testDownsample(t *testing.T, data []*downsampleTestSet, meta *metadata.Meta, resolution int64) { t.Helper() dir, err := ioutil.TempDir("", "downsample-raw") diff --git a/pkg/compact/downsample/pool.go b/pkg/compact/downsample/pool.go index 9b199e7ab9..17094cd0c6 100644 --- a/pkg/compact/downsample/pool.go +++ b/pkg/compact/downsample/pool.go @@ -6,14 +6,14 @@ import ( "github.com/prometheus/tsdb/chunkenc" ) -// Pool is a memory pool of chunk objects, supporting Thanos aggr chunk encoding. +// Pool is a memory pool of chunk objects, supporting Thanos aggregated chunk encoding. // It maintains separate pools for xor and aggr chunks. type pool struct { wrapped chunkenc.Pool aggr sync.Pool } -// TODO(bplotka): Add reasonable limits to our sync pools them to detect OOMs early. +// TODO(bwplotka): Add reasonable limits to our sync pooling them to detect OOMs early. func NewPool() chunkenc.Pool { return &pool{ wrapped: chunkenc.NewPool(), @@ -51,6 +51,7 @@ func (p *pool) Put(c chunkenc.Chunk) error { // Clear []byte. *ac = AggrChunk(nil) p.aggr.Put(ac) + return nil } return p.wrapped.Put(c) diff --git a/pkg/compact/retention_test.go b/pkg/compact/retention_test.go index 0aef91c697..1a26b1e5db 100644 --- a/pkg/compact/retention_test.go +++ b/pkg/compact/retention_test.go @@ -9,7 +9,7 @@ import ( "time" "github.com/go-kit/kit/log" - "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/metadata" "github.com/improbable-eng/thanos/pkg/compact" "github.com/improbable-eng/thanos/pkg/objstore" "github.com/improbable-eng/thanos/pkg/objstore/inmem" @@ -253,15 +253,15 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) { func uploadMockBlock(t *testing.T, bkt objstore.Bucket, id string, minTime, maxTime time.Time, resolutionLevel int64) { t.Helper() - meta1 := block.Meta{ + meta1 := metadata.Meta{ Version: 1, BlockMeta: tsdb.BlockMeta{ ULID: ulid.MustParse(id), MinTime: minTime.Unix() * 1000, MaxTime: maxTime.Unix() * 1000, }, - Thanos: block.ThanosMeta{ - Downsample: block.ThanosDownsampleMeta{ + Thanos: metadata.Thanos{ + Downsample: metadata.ThanosDownsample{ Resolution: resolutionLevel, }, }, diff --git a/pkg/query/api/v1.go b/pkg/query/api/v1.go index 23e5b0a801..cf693dd2fb 100644 --- a/pkg/query/api/v1.go +++ b/pkg/query/api/v1.go @@ -527,15 +527,16 @@ func (api *API) series(r *http.Request) (interface{}, []error, *apiError) { var sets []storage.SeriesSet for _, mset := range matcherSets { - s, err := q.Select(&storage.SelectParams{}, mset...) + s, _, err := q.Select(&storage.SelectParams{}, mset...) if err != nil { return nil, nil, &apiError{errorExec, err} } sets = append(sets, s) } - set := storage.NewMergeSeriesSet(sets) - metrics := []labels.Labels{} + set := storage.NewMergeSeriesSet(sets, nil) + + var metrics []labels.Labels for set.Next() { metrics = append(metrics, set.At().Labels()) } diff --git a/pkg/query/api/v1_test.go b/pkg/query/api/v1_test.go index 4afef1d054..bcf17907b0 100644 --- a/pkg/query/api/v1_test.go +++ b/pkg/query/api/v1_test.go @@ -333,7 +333,7 @@ func TestEndpoints(t *testing.T) { "start": []string{"-2"}, "end": []string{"-1"}, }, - response: []labels.Labels{}, + response: []labels.Labels(nil), }, // Start and end after series ends. { @@ -343,7 +343,7 @@ func TestEndpoints(t *testing.T) { "start": []string{"100000"}, "end": []string{"100001"}, }, - response: []labels.Labels{}, + response: []labels.Labels(nil), }, // Start before series starts, end after series ends. { @@ -409,33 +409,38 @@ func TestEndpoints(t *testing.T) { } for _, test := range tests { - // Build a context with the correct request params. - ctx := context.Background() - for p, v := range test.params { - ctx = route.WithParam(ctx, p, v) - } - t.Logf("run query %q", test.query.Encode()) + if ok := t.Run(test.query.Encode(), func(t *testing.T) { + // Build a context with the correct request params. + ctx := context.Background() + for p, v := range test.params { + ctx = route.WithParam(ctx, p, v) + } - req, err := http.NewRequest("ANY", fmt.Sprintf("http://example.com?%s", test.query.Encode()), nil) - if err != nil { - t.Fatal(err) - } - resp, _, apiErr := test.endpoint(req.WithContext(ctx)) - if apiErr != nil { - if test.errType == errorNone { - t.Fatalf("Unexpected error: %s", apiErr) + req, err := http.NewRequest("ANY", fmt.Sprintf("http://example.com?%s", test.query.Encode()), nil) + if err != nil { + t.Fatal(err) } - if test.errType != apiErr.typ { - t.Fatalf("Expected error of type %q but got type %q", test.errType, apiErr.typ) + resp, _, apiErr := test.endpoint(req.WithContext(ctx)) + if apiErr != nil { + if test.errType == errorNone { + t.Fatalf("Unexpected error: %s", apiErr) + } + if test.errType != apiErr.typ { + t.Fatalf("Expected error of type %q but got type %q", test.errType, apiErr.typ) + } + return } - continue - } - if apiErr == nil && test.errType != errorNone { - t.Fatalf("Expected error of type %q but got none", test.errType) - } - if !reflect.DeepEqual(resp, test.response) { - t.Fatalf("Response does not match, expected:\n%+v\ngot:\n%+v", test.response, resp) + if apiErr == nil && test.errType != errorNone { + t.Fatalf("Expected error of type %q but got none", test.errType) + } + + if !reflect.DeepEqual(resp, test.response) { + t.Fatalf("Response does not match, expected:\n%+v\ngot:\n%+v", test.response, resp) + } + }); !ok { + return } + } } diff --git a/pkg/query/querier.go b/pkg/query/querier.go index 6e962f6472..819ff3ac2a 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -169,13 +169,13 @@ func aggrsFromFunc(f string) ([]storepb.Aggr, resAggr) { return []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM}, resAggrAvg } -func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, error) { +func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { span, ctx := tracing.StartSpan(q.ctx, "querier_select") defer span.Finish() sms, err := translateMatchers(ms...) if err != nil { - return nil, errors.Wrap(err, "convert matchers") + return nil, nil, errors.Wrap(err, "convert matchers") } queryAggrs, resAggr := aggrsFromFunc(params.Func) @@ -189,10 +189,12 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s Aggregates: queryAggrs, PartialResponseDisabled: !q.partialResponse, }, resp); err != nil { - return nil, errors.Wrap(err, "proxy Series()") + return nil, nil, errors.Wrap(err, "proxy Series()") } for _, w := range resp.warnings { + // NOTE(bwplotka): We could use warnings return arguments here, however need reporter anyway for LabelValues and LabelNames method, + // so we choose to be consistent and keep reporter. q.warningReporter(errors.New(w)) } @@ -203,7 +205,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s maxt: q.maxt, set: newStoreSeriesSet(resp.seriesSet), aggr: resAggr, - }, nil + }, nil, nil } // TODO(fabxc): this could potentially pushed further down into the store API @@ -220,7 +222,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s // The merged series set assembles all potentially-overlapping time ranges // of the same series into a single one. The series are ordered so that equal series // from different replicas are sequential. We can now deduplicate those. - return newDedupSeriesSet(set, q.replicaLabel), nil + return newDedupSeriesSet(set, q.replicaLabel), nil, nil } // sortDedupLabels resorts the set so that the same series with different replica @@ -245,6 +247,7 @@ func sortDedupLabels(set []storepb.Series, replicaLabel string) { }) } +// LabelValues returns all potential values for a label name. func (q *querier) LabelValues(name string) ([]string, error) { span, ctx := tracing.StartSpan(q.ctx, "querier_label_values") defer span.Finish() @@ -261,6 +264,12 @@ func (q *querier) LabelValues(name string) ([]string, error) { return resp.Values, nil } +// LabelNames returns all the unique label names present in the block in sorted order. +// TODO(bwplotka): Consider adding labelNames to thanos Query API https://github.com/improbable-eng/thanos/issues/702. +func (q *querier) LabelNames() ([]string, error) { + return nil, errors.New("not implemented") +} + func (q *querier) Close() error { q.cancel() return nil diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go index 46b51a7f3e..980d837213 100644 --- a/pkg/query/querier_test.go +++ b/pkg/query/querier_test.go @@ -36,7 +36,7 @@ func TestQuerier_Series(t *testing.T) { q := newQuerier(context.Background(), nil, 1, 300, "", testProxy, false, 0, true, nil) defer func() { testutil.Ok(t, q.Close()) }() - res, err := q.Select(&storage.SelectParams{}) + res, _, err := q.Select(&storage.SelectParams{}) testutil.Ok(t, err) expected := []struct { diff --git a/pkg/query/test_print.go b/pkg/query/test_print.go new file mode 100644 index 0000000000..70bc292439 --- /dev/null +++ b/pkg/query/test_print.go @@ -0,0 +1,34 @@ +package query + +import ( + "fmt" + + "github.com/prometheus/prometheus/storage" +) + +type printSeriesSet struct { + set storage.SeriesSet +} + +func newPrintSeriesSet(set storage.SeriesSet) storage.SeriesSet { + return &printSeriesSet{set: set} +} + +func (s *printSeriesSet) Next() bool { + return s.set.Next() +} + +func (s *printSeriesSet) At() storage.Series { + at := s.set.At() + fmt.Println("Series", at.Labels()) + + i := at.Iterator() + for i.Next() { + fmt.Println(i.At()) + } + return at +} + +func (s *printSeriesSet) Err() error { + return s.set.Err() +} diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go index 5c1df9a9b7..2163ec8b7a 100644 --- a/pkg/shipper/shipper.go +++ b/pkg/shipper/shipper.go @@ -11,6 +11,8 @@ import ( "path" "path/filepath" + "github.com/improbable-eng/thanos/pkg/block/metadata" + "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/block" @@ -69,7 +71,7 @@ type Shipper struct { metrics *metrics bucket objstore.Bucket labels func() labels.Labels - source block.SourceType + source metadata.SourceType } // New creates a new shipper that detects new TSDB blocks in dir and uploads them @@ -80,7 +82,7 @@ func New( dir string, bucket objstore.Bucket, lbls func() labels.Labels, - source block.SourceType, + source metadata.SourceType, ) *Shipper { if logger == nil { logger = log.NewNopLogger() @@ -114,7 +116,7 @@ func (s *Shipper) Timestamps() (minTime, maxSyncTime int64, err error) { minTime = math.MaxInt64 maxSyncTime = math.MinInt64 - if err := s.iterBlockMetas(func(m *block.Meta) error { + if err := s.iterBlockMetas(func(m *metadata.Meta) error { if m.MinTime < minTime { minTime = m.MinTime } @@ -158,7 +160,7 @@ func (s *Shipper) Sync(ctx context.Context) { // TODO(bplotka): If there are no blocks in the system check for WAL dir to ensure we have actually // access to real TSDB dir (!). - if err = s.iterBlockMetas(func(m *block.Meta) error { + if err = s.iterBlockMetas(func(m *metadata.Meta) error { // Do not sync a block if we already uploaded it. If it is no longer found in the bucket, // it was generally removed by the compaction process. if _, ok := hasUploaded[m.ULID]; !ok { @@ -180,7 +182,7 @@ func (s *Shipper) Sync(ctx context.Context) { } } -func (s *Shipper) sync(ctx context.Context, meta *block.Meta) (err error) { +func (s *Shipper) sync(ctx context.Context, meta *metadata.Meta) (err error) { dir := filepath.Join(s.dir, meta.ULID.String()) // We only ship of the first compacted block level. @@ -225,7 +227,7 @@ func (s *Shipper) sync(ctx context.Context, meta *block.Meta) (err error) { meta.Thanos.Labels = lset.Map() } meta.Thanos.Source = s.source - if err := block.WriteMetaFile(s.logger, updir, meta); err != nil { + if err := metadata.Write(s.logger, updir, meta); err != nil { return errors.Wrap(err, "write meta file") } return block.Upload(ctx, s.logger, s.bucket, updir) @@ -234,7 +236,7 @@ func (s *Shipper) sync(ctx context.Context, meta *block.Meta) (err error) { // iterBlockMetas calls f with the block meta for each block found in dir. It logs // an error and continues if it cannot access a meta.json file. // If f returns an error, the function returns with the same error. -func (s *Shipper) iterBlockMetas(f func(m *block.Meta) error) error { +func (s *Shipper) iterBlockMetas(f func(m *metadata.Meta) error) error { names, err := fileutil.ReadDir(s.dir) if err != nil { return errors.Wrap(err, "read dir") @@ -253,7 +255,7 @@ func (s *Shipper) iterBlockMetas(f func(m *block.Meta) error) error { if !fi.IsDir() { continue } - m, err := block.ReadMetaFile(dir) + m, err := metadata.Read(dir) if err != nil { level.Warn(s.logger).Log("msg", "reading meta file failed", "err", err) continue diff --git a/pkg/shipper/shipper_e2e_test.go b/pkg/shipper/shipper_e2e_test.go index 8f0e70ecd6..496d53322f 100644 --- a/pkg/shipper/shipper_e2e_test.go +++ b/pkg/shipper/shipper_e2e_test.go @@ -14,6 +14,7 @@ import ( "github.com/go-kit/kit/log" "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/metadata" "github.com/improbable-eng/thanos/pkg/objstore" "github.com/improbable-eng/thanos/pkg/objstore/objtesting" "github.com/improbable-eng/thanos/pkg/testutil" @@ -32,7 +33,7 @@ func TestShipper_UploadBlocks_e2e(t *testing.T) { }() extLset := labels.FromStrings("prometheus", "prom-1") - shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, block.TestSource) + shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -54,7 +55,7 @@ func TestShipper_UploadBlocks_e2e(t *testing.T) { testutil.Ok(t, os.Mkdir(tmp, 0777)) - meta := block.Meta{ + meta := metadata.Meta{ BlockMeta: tsdb.BlockMeta{ MinTime: timestamp.FromTime(now.Add(time.Duration(i) * time.Hour)), MaxTime: timestamp.FromTime(now.Add((time.Duration(i) * time.Hour) + 1)), @@ -62,7 +63,7 @@ func TestShipper_UploadBlocks_e2e(t *testing.T) { } meta.Version = 1 meta.ULID = id - meta.Thanos.Source = block.TestSource + meta.Thanos.Source = metadata.TestSource metab, err := json.Marshal(&meta) testutil.Ok(t, err) diff --git a/pkg/shipper/shipper_test.go b/pkg/shipper/shipper_test.go index 368c5e92b9..150b5264db 100644 --- a/pkg/shipper/shipper_test.go +++ b/pkg/shipper/shipper_test.go @@ -2,15 +2,13 @@ package shipper import ( "io/ioutil" - "os" - "testing" - "math" - + "os" "path" + "testing" "github.com/go-kit/kit/log" - "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/metadata" "github.com/improbable-eng/thanos/pkg/testutil" "github.com/oklog/ulid" "github.com/prometheus/tsdb" @@ -23,7 +21,7 @@ func TestShipperTimestamps(t *testing.T) { testutil.Ok(t, os.RemoveAll(dir)) }() - s := New(nil, nil, dir, nil, nil, block.TestSource) + s := New(nil, nil, dir, nil, nil, metadata.TestSource) // Missing thanos meta file. _, _, err = s.Timestamps() @@ -41,7 +39,7 @@ func TestShipperTimestamps(t *testing.T) { id1 := ulid.MustNew(1, nil) testutil.Ok(t, os.Mkdir(path.Join(dir, id1.String()), os.ModePerm)) - testutil.Ok(t, block.WriteMetaFile(log.NewNopLogger(), path.Join(dir, id1.String()), &block.Meta{ + testutil.Ok(t, metadata.Write(log.NewNopLogger(), path.Join(dir, id1.String()), &metadata.Meta{ Version: 1, BlockMeta: tsdb.BlockMeta{ ULID: id1, @@ -56,7 +54,7 @@ func TestShipperTimestamps(t *testing.T) { id2 := ulid.MustNew(2, nil) testutil.Ok(t, os.Mkdir(path.Join(dir, id2.String()), os.ModePerm)) - testutil.Ok(t, block.WriteMetaFile(log.NewNopLogger(), path.Join(dir, id2.String()), &block.Meta{ + testutil.Ok(t, metadata.Write(log.NewNopLogger(), path.Join(dir, id2.String()), &metadata.Meta{ Version: 1, BlockMeta: tsdb.BlockMeta{ ULID: id2, diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index ee23a69124..3b45756cbd 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -19,6 +19,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/metadata" "github.com/improbable-eng/thanos/pkg/compact/downsample" "github.com/improbable-eng/thanos/pkg/objstore" "github.com/improbable-eng/thanos/pkg/pool" @@ -960,7 +961,7 @@ func (s *bucketBlockSet) labelMatchers(matchers ...labels.Matcher) ([]labels.Mat type bucketBlock struct { logger log.Logger bucket objstore.BucketReader - meta *block.Meta + meta *metadata.Meta dir string indexCache *indexCache chunkPool *pool.BytesPool @@ -1024,7 +1025,7 @@ func (b *bucketBlock) loadMeta(ctx context.Context, id ulid.ULID) error { } else if err != nil { return err } - meta, err := block.ReadMetaFile(b.dir) + meta, err := metadata.Read(b.dir) if err != nil { return errors.Wrap(err, "read meta.json") } @@ -1054,13 +1055,7 @@ func (b *bucketBlock) loadIndexCache(ctx context.Context) (err error) { } }() - indexr, err := index.NewFileReader(fn) - if err != nil { - return errors.Wrap(err, "open index reader") - } - defer runutil.CloseWithLogOnErr(b.logger, indexr, "load index cache reader") - - if err := block.WriteIndexCache(b.logger, cachefn, indexr); err != nil { + if err := block.WriteIndexCache(b.logger, fn, cachefn); err != nil { return errors.Wrap(err, "write index cache") } @@ -1144,7 +1139,7 @@ func newBucketIndexReader(ctx context.Context, logger log.Logger, block *bucketB cache: cache, loadedSeries: map[uint64][]byte{}, } - r.dec.SetSymbolTable(r.block.symbols) + r.dec.LookupSymbol = r.lookupSymbol return r } diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index c5528121ac..f1f6780397 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -10,6 +10,7 @@ import ( "github.com/go-kit/kit/log" "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/metadata" "github.com/improbable-eng/thanos/pkg/objstore" "github.com/improbable-eng/thanos/pkg/objstore/objtesting" "github.com/improbable-eng/thanos/pkg/runutil" @@ -59,10 +60,10 @@ func prepareStoreWithTestBlocks(t testing.TB, ctx context.Context, dir string, b dir1, dir2 := filepath.Join(dir, id1.String()), filepath.Join(dir, id2.String()) // Add labels to the meta of the second block. - meta, err := block.ReadMetaFile(dir2) + meta, err := metadata.Read(dir2) testutil.Ok(t, err) meta.Thanos.Labels = map[string]string{"ext2": "value2"} - testutil.Ok(t, block.WriteMetaFile(log.NewNopLogger(), dir2, meta)) + testutil.Ok(t, metadata.Write(log.NewNopLogger(), dir2, meta)) testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, dir1)) testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, dir2)) @@ -72,7 +73,7 @@ func prepareStoreWithTestBlocks(t testing.TB, ctx context.Context, dir string, b testutil.Ok(t, os.RemoveAll(dir2)) } - store, err := NewBucketStore(nil, nil, bkt, dir, 100, 0, false) + store, err := NewBucketStore(log.NewLogfmtLogger(os.Stderr), nil, bkt, dir, 100, 0, false) testutil.Ok(t, err) go func() { diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 06d9d2f98b..f2880dd9b4 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -4,13 +4,11 @@ import ( "testing" "time" - "github.com/oklog/ulid" - - "github.com/improbable-eng/thanos/pkg/compact/downsample" - "github.com/fortytw2/leaktest" - "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/metadata" + "github.com/improbable-eng/thanos/pkg/compact/downsample" "github.com/improbable-eng/thanos/pkg/testutil" + "github.com/oklog/ulid" "github.com/prometheus/tsdb/labels" ) @@ -41,7 +39,7 @@ func TestBucketBlockSet_addGet(t *testing.T) { } for _, in := range input { - var m block.Meta + var m metadata.Meta m.Thanos.Downsample.Resolution = in.window m.MinTime = in.mint m.MaxTime = in.maxt @@ -102,7 +100,7 @@ func TestBucketBlockSet_addGet(t *testing.T) { var exp []*bucketBlock for _, b := range c.res { - var m block.Meta + var m metadata.Meta m.Thanos.Downsample.Resolution = b.window m.MinTime = b.mint m.MaxTime = b.maxt @@ -129,7 +127,7 @@ func TestBucketBlockSet_remove(t *testing.T) { } for _, in := range input { - var m block.Meta + var m metadata.Meta m.ULID = in.id m.MinTime = in.mint m.MaxTime = in.maxt diff --git a/pkg/store/prometheus_test.go b/pkg/store/prometheus_test.go index 567d6a5fe1..c4218a1e14 100644 --- a/pkg/store/prometheus_test.go +++ b/pkg/store/prometheus_test.go @@ -122,6 +122,7 @@ func TestPrometheusStore_LabelValues_e2e(t *testing.T) { _, err = a.Add(labels.FromStrings("a", "c"), 0, 1) testutil.Ok(t, err) _, err = a.Add(labels.FromStrings("a", "a"), 0, 1) + testutil.Ok(t, err) testutil.Ok(t, a.Commit()) ctx, cancel := context.WithCancel(context.Background()) diff --git a/pkg/testutil/prometheus.go b/pkg/testutil/prometheus.go index 807b61f8a0..7cf869f4d5 100644 --- a/pkg/testutil/prometheus.go +++ b/pkg/testutil/prometheus.go @@ -12,8 +12,9 @@ import ( "syscall" "time" + "github.com/improbable-eng/thanos/pkg/block/metadata" + "github.com/go-kit/kit/log" - "github.com/improbable-eng/thanos/pkg/block" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/oklog/ulid" "github.com/pkg/errors" @@ -157,7 +158,7 @@ func (p *Prometheus) SetConfig(s string) (err error) { // Stop terminates Prometheus and clean up its data directory. func (p *Prometheus) Stop() error { if err := p.cmd.Process.Signal(syscall.SIGTERM); err != nil { - return errors.Wrapf(err, "failed to Prometheus. Kill it manually and cleanr %s dir", p.db.Dir()) + return errors.Wrapf(err, "failed to Prometheus. Kill it manually and clean %s dir", p.db.Dir()) } time.Sleep(time.Second / 2) @@ -188,7 +189,7 @@ func CreateBlock( extLset labels.Labels, resolution int64, ) (id ulid.ULID, err error) { - h, err := tsdb.NewHead(nil, nil, tsdb.NopWAL(), 10000000000) + h, err := tsdb.NewHead(nil, nil, nil, 10000000000) if err != nil { return id, errors.Wrap(err, "create head block") } @@ -238,15 +239,15 @@ func CreateBlock( return id, errors.Wrap(err, "create compactor") } - id, err = c.Write(dir, h, mint, maxt) + id, err = c.Write(dir, h, mint, maxt, nil) if err != nil { return id, errors.Wrap(err, "write block") } - if _, err = block.InjectThanosMeta(log.NewNopLogger(), filepath.Join(dir, id.String()), block.ThanosMeta{ + if _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(dir, id.String()), metadata.Thanos{ Labels: extLset.Map(), - Downsample: block.ThanosDownsampleMeta{Resolution: resolution}, - Source: block.TestSource, + Downsample: metadata.ThanosDownsample{Resolution: resolution}, + Source: metadata.TestSource, }, nil); err != nil { return id, errors.Wrap(err, "finalize block") } @@ -256,4 +257,4 @@ func CreateBlock( } return id, nil -} +} \ No newline at end of file diff --git a/pkg/verifier/index_issue.go b/pkg/verifier/index_issue.go index 54a20703d4..72100b15bc 100644 --- a/pkg/verifier/index_issue.go +++ b/pkg/verifier/index_issue.go @@ -8,6 +8,8 @@ import ( "path" "path/filepath" + "github.com/improbable-eng/thanos/pkg/block/metadata" + "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/block" @@ -94,7 +96,7 @@ func IndexIssue(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bac logger, tmpdir, id, - block.BucketRepairSource, + metadata.BucketRepairSource, block.IgnoreCompleteOutsideChunk, block.IgnoreDuplicateOutsideChunk, block.IgnoreIssue347OutsideChunk,