diff --git a/CHANGELOG.md b/CHANGELOG.md index c663993ed0..ed5b877aef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,12 @@ We use *breaking* word for marking changes that are not backward compatible (rel ## Unreleased + +### Fixed + +- [#1070](https://github.com/improbable-eng/thanos/pull/1070) Downsampling works back again. Deferred closer errors are now properly captured. + + ## [v0.4.0-rc.0](https://github.com/improbable-eng/thanos/releases/tag/v0.4.0-rc.0) - 2019.04.18 :warning: **IMPORTANT** :warning: This is the last release that supports gossip. From Thanos v0.5.0, gossip will be completely removed. diff --git a/pkg/block/index.go b/pkg/block/index.go index fc25cfc378..f9f36d55aa 100644 --- a/pkg/block/index.go +++ b/pkg/block/index.go @@ -340,7 +340,7 @@ func GatherIndexIssueStats(logger log.Logger, fn string, minTime int64, maxTime if err != nil { return stats, errors.Wrap(err, "open index file") } - defer runutil.CloseWithErrCapture(logger, &err, r, "gather index issue file reader") + defer runutil.CloseWithErrCapture(&err, r, "gather index issue file reader") p, err := r.Postings(index.AllPostingsKey()) if err != nil { @@ -460,19 +460,19 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source metadata.SourceT if err != nil { return resid, errors.Wrap(err, "open block") } - defer runutil.CloseWithErrCapture(logger, &err, b, "repair block reader") + defer runutil.CloseWithErrCapture(&err, b, "repair block reader") indexr, err := b.Index() if err != nil { return resid, errors.Wrap(err, "open index") } - defer runutil.CloseWithErrCapture(logger, &err, indexr, "repair index reader") + defer runutil.CloseWithErrCapture(&err, indexr, "repair index reader") chunkr, err := b.Chunks() if err != nil { return resid, errors.Wrap(err, "open chunks") } - defer runutil.CloseWithErrCapture(logger, &err, chunkr, "repair chunk reader") + defer runutil.CloseWithErrCapture(&err, chunkr, "repair chunk reader") resdir := filepath.Join(dir, resid.String()) @@ -480,13 +480,13 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source metadata.SourceT if err != nil { return resid, errors.Wrap(err, "open chunk writer") } - defer runutil.CloseWithErrCapture(logger, &err, chunkw, "repair chunk writer") + defer runutil.CloseWithErrCapture(&err, chunkw, "repair chunk writer") indexw, err := index.NewWriter(filepath.Join(resdir, IndexFilename)) if err != nil { return resid, errors.Wrap(err, "open index writer") } - defer runutil.CloseWithErrCapture(logger, &err, indexw, "repair index writer") + defer runutil.CloseWithErrCapture(&err, indexw, "repair index writer") // TODO(fabxc): adapt so we properly handle the version once we update to an upstream // that has multiple. diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go index 2263d61359..92d07af266 100644 --- a/pkg/compact/downsample/downsample.go +++ b/pkg/compact/downsample/downsample.go @@ -43,13 +43,13 @@ func Downsample( if err != nil { return id, errors.Wrap(err, "open index reader") } - defer runutil.CloseWithErrCapture(logger, &err, indexr, "downsample index reader") + defer runutil.CloseWithErrCapture(&err, indexr, "downsample index reader") chunkr, err := b.Chunks() if err != nil { return id, errors.Wrap(err, "open chunk reader") } - defer runutil.CloseWithErrCapture(logger, &err, chunkr, "downsample chunk reader") + defer runutil.CloseWithErrCapture(&err, chunkr, "downsample chunk reader") // Generate new block id. uid := ulid.MustNew(ulid.Now(), rand.New(rand.NewSource(time.Now().UnixNano()))) @@ -81,12 +81,13 @@ func Downsample( if err != nil { return id, errors.Wrap(err, "get streamed block writer") } - defer runutil.CloseWithErrCapture(logger, &err, streamedBlockWriter, "close stream block writer") + defer runutil.CloseWithErrCapture(&err, streamedBlockWriter, "close stream block writer") postings, err := indexr.Postings(index.AllPostingsKey()) if err != nil { return id, errors.Wrap(err, "get all postings list") } + var ( aggrChunks []*AggrChunk all []sample diff --git a/pkg/compact/downsample/downsample_test.go b/pkg/compact/downsample/downsample_test.go index 731faa5b53..a93ed7c2e7 100644 --- a/pkg/compact/downsample/downsample_test.go +++ b/pkg/compact/downsample/downsample_test.go @@ -163,6 +163,9 @@ func testDownsample(t *testing.T, data []*downsampleTestSet, meta *metadata.Meta id, err := Downsample(log.NewNopLogger(), meta, mb, dir, resolution) testutil.Ok(t, err) + _, err = metadata.Read(filepath.Join(dir, id.String())) + testutil.Ok(t, err) + exp := map[uint64]map[AggrType][]sample{} got := map[uint64]map[AggrType][]sample{} diff --git a/pkg/compact/downsample/streamed_block_writer.go b/pkg/compact/downsample/streamed_block_writer.go index 16213c0cd0..896550a378 100644 --- a/pkg/compact/downsample/streamed_block_writer.go +++ b/pkg/compact/downsample/streamed_block_writer.go @@ -167,26 +167,20 @@ func (w *streamedBlockWriter) Close() error { if w.finalized { return nil } - - var merr tsdb.MultiError w.finalized = true - // Finalise data block only if there wasn't any internal errors. - if !w.ignoreFinalize { - merr.Add(w.finalize()) - } + merr := tsdb.MultiError{} - for _, cl := range w.closers { - merr.Add(cl.Close()) + if w.ignoreFinalize { + // Close open file descriptors anyway. + for _, cl := range w.closers { + merr.Add(cl.Close()) + } + return merr.Err() } - return errors.Wrap(merr.Err(), "close closers") -} + // Finalize saves prepared index and metadata to corresponding files. -// finalize saves prepared index and meta data to corresponding files. -// It is called on Close. Even if an error happened outside of StreamWriter, it will finalize the block anyway, -// so it's a caller's responsibility to remove the block's directory. -func (w *streamedBlockWriter) finalize() error { if err := w.writeLabelSets(); err != nil { return errors.Wrap(err, "write label sets") } @@ -195,7 +189,15 @@ func (w *streamedBlockWriter) finalize() error { return errors.Wrap(err, "write mem postings") } - if err := w.writeIndexCache(); err != nil { + for _, cl := range w.closers { + merr.Add(cl.Close()) + } + + if err := block.WriteIndexCache( + w.logger, + filepath.Join(w.blockDir, block.IndexFilename), + filepath.Join(w.blockDir, block.IndexCacheFilename), + ); err != nil { return errors.Wrap(err, "write index cache") } @@ -207,8 +209,14 @@ func (w *streamedBlockWriter) finalize() error { return errors.Wrap(err, "sync blockDir") } + if err := merr.Err(); err != nil { + return errors.Wrap(err, "finalize") + } + + // No error, claim success. + level.Info(w.logger).Log( - "msg", "write downsampled block", + "msg", "finalized downsampled block", "mint", w.meta.MinTime, "maxt", w.meta.MaxTime, "ulid", w.meta.ULID, @@ -224,7 +232,7 @@ func (w *streamedBlockWriter) syncDir() (err error) { return errors.Wrap(err, "open temporary block blockDir") } - defer runutil.CloseWithErrCapture(w.logger, &err, df, "close temporary block blockDir") + defer runutil.CloseWithErrCapture(&err, df, "close temporary block blockDir") if err := fileutil.Fsync(df); err != nil { return errors.Wrap(err, "sync temporary blockDir") @@ -257,16 +265,6 @@ func (w *streamedBlockWriter) writeMemPostings() error { return nil } -func (w *streamedBlockWriter) writeIndexCache() error { - indexFile := filepath.Join(w.blockDir, block.IndexFilename) - indexCacheFile := filepath.Join(w.blockDir, block.IndexCacheFilename) - if err := block.WriteIndexCache(w.logger, indexFile, indexCacheFile); err != nil { - return errors.Wrap(err, "write index cache") - } - - return nil -} - // writeMetaFile writes meta file. func (w *streamedBlockWriter) writeMetaFile() error { w.meta.Version = metadata.MetaVersion1 diff --git a/pkg/runutil/runutil.go b/pkg/runutil/runutil.go index d6dbfe7e6a..10e58ea4e0 100644 --- a/pkg/runutil/runutil.go +++ b/pkg/runutil/runutil.go @@ -33,7 +33,7 @@ // For capturing error, use CloseWithErrCapture: // // var err error -// defer runutil.CloseWithErrCapture(logger, &err, closer, "log format message") +// defer runutil.CloseWithErrCapture(&err, closer, "log format message") // // // ... // @@ -49,6 +49,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" + "github.com/prometheus/tsdb" ) // Repeat executes f every interval seconds until stopc is closed. @@ -107,26 +108,13 @@ func CloseWithLogOnErr(logger log.Logger, closer io.Closer, format string, a ... level.Warn(logger).Log("msg", "detected close error", "err", errors.Wrap(err, fmt.Sprintf(format, a...))) } -// CloseWithErrCapture runs function and on error tries to return error by argument. -// If error is already there we assume that error has higher priority and we just log the function error. -func CloseWithErrCapture(logger log.Logger, err *error, closer io.Closer, format string, a ...interface{}) { - closeErr := closer.Close() - if closeErr == nil { - return - } - - if *err == nil { - err = &closeErr - return - } +// CloseWithErrCapture runs function and on error return error by argument including the given error (usually +// from caller function). +func CloseWithErrCapture(err *error, closer io.Closer, format string, a ...interface{}) { + merr := tsdb.MultiError{} - // There is already an error, let's log this one. - if logger == nil { - logger = log.NewLogfmtLogger(os.Stderr) - } + merr.Add(*err) + merr.Add(errors.Wrapf(closer.Close(), format, a...)) - level.Warn(logger).Log( - "msg", "detected best effort close error that was preempted from the more important one", - "err", errors.Wrap(closeErr, fmt.Sprintf(format, a...)), - ) + *err = merr.Err() } diff --git a/pkg/runutil/runutil_test.go b/pkg/runutil/runutil_test.go new file mode 100644 index 0000000000..3ed80d17bd --- /dev/null +++ b/pkg/runutil/runutil_test.go @@ -0,0 +1,70 @@ +package runutil + +import ( + "github.com/pkg/errors" + "io" + "testing" +) + +type testCloser struct { + err error +} + +func (c testCloser) Close() error { + return c.err +} + +func TestCloseWithErrCapture(t *testing.T) { + for _, tcase := range []struct{ + err error + closer io.Closer + + expectedErrStr string + }{ + { + err: nil, + closer: testCloser{err:nil}, + expectedErrStr: "", + }, + { + err: errors.New("test"), + closer: testCloser{err:nil}, + expectedErrStr: "test", + }, + { + err: nil, + closer: testCloser{err:errors.New("test")}, + expectedErrStr: "close: test", + }, + { + err: errors.New("test"), + closer: testCloser{err:errors.New("test")}, + expectedErrStr: "2 errors: test; close: test", + }, + }{ + if ok := t.Run("", func(t *testing.T) { + ret := tcase.err + CloseWithErrCapture(&ret, tcase.closer, "close") + + if tcase.expectedErrStr == "" { + if ret != nil { + t.Error("Expected error to be nil") + t.Fail() + } + } else { + if ret == nil { + t.Error("Expected error to be not nil") + t.Fail() + } + + if tcase.expectedErrStr != ret.Error() { + t.Errorf("%s != %s", tcase.expectedErrStr, ret.Error()) + t.Fail() + } + } + + }); !ok { + return + } + } +} \ No newline at end of file diff --git a/pkg/testutil/prometheus.go b/pkg/testutil/prometheus.go index ee65642470..dce6866f4f 100644 --- a/pkg/testutil/prometheus.go +++ b/pkg/testutil/prometheus.go @@ -231,7 +231,7 @@ func (p *Prometheus) SetConfig(s string) (err error) { if err != nil { return err } - defer runutil.CloseWithErrCapture(nil, &err, f, "prometheus config") + defer runutil.CloseWithErrCapture(&err, f, "prometheus config") _, err = f.Write([]byte(s)) return err @@ -302,7 +302,7 @@ func createBlock( if err != nil { return id, errors.Wrap(err, "create head block") } - defer runutil.CloseWithErrCapture(log.NewNopLogger(), &err, h, "TSDB Head") + defer runutil.CloseWithErrCapture(&err, h, "TSDB Head") var g errgroup.Group var timeStepSize = (maxt - mint) / int64(numSamples+1)