diff --git a/CHANGELOG.md b/CHANGELOG.md index 731c42ca..88e6c0b6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,13 +1,31 @@ ## master / unreleased + - [FEATURE] Added `DBReadOnly` to allow opening a database in read only mode. - `DBReadOnly.Blocks()` exposes a slice of `BlockReader`s and for this the interface is refactored to return the full block meta instead of just MinTime/MaxTime. Required to allow reading the ULID of a block. + +## 0.9.1 + + - [CHANGE] LiveReader metrics are now injected rather than global. + +## 0.9.0 + - [FEATURE] Provide option to compress WAL records using Snappy. [#609](https://github.com/prometheus/tsdb/pull/609) - [BUGFIX] Re-calculate block size when calling `block.Delete`. + - [BUGFIX] Re-encode all head chunks at compaction that are open (being appended to) or outside the Maxt block range. This avoids writing out corrupt data. It happens when snapshotting with the head included. + - [BUGFIX] Improved handling of multiple refs for the same series in WAL reading. + - [BUGFIX] `prometheus_tsdb_compactions_failed_total` is now incremented on any compaction failure. - [CHANGE] The meta file `BlockStats` no longer holds size information. This is now dynamically calculated and kept in memory. It also includes the meta file size which was not included before. + - [CHANGE] Create new clean segment when starting the WAL. + - [CHANGE] Renamed metric from `prometheus_tsdb_wal_reader_corruption_errors` to `prometheus_tsdb_wal_reader_corruption_errors_total`. + - [ENHANCEMENT] Improved atomicity of .tmp block replacement during compaction for usual case. + - [ENHANCEMENT] Improved postings intersection matching. + - [ENHANCEMENT] Reduced disk usage for WAL for small setups. + - [ENHANCEMENT] Optimize queries using regexp for set lookups. ## 0.8.0 + - [BUGFIX] Calling `Close` more than once on a querier returns an error instead of a panic. - [BUGFIX] Don't panic and recover nicely when running out of disk space. - [BUGFIX] Correctly handle empty labels. @@ -17,9 +35,11 @@ - [FEATURE] Added `currentSegment` metric for the current WAL segment it is being written to. ## 0.7.1 + - [ENHANCEMENT] Reduce memory usage in mergedPostings.Seek ## 0.7.0 + - [CHANGE] tsdb now requires golang 1.12 or higher. - [REMOVED] `chunks.NewReader` is removed as it wasn't used anywhere. - [REMOVED] `FromData` is considered unused so was removed. @@ -35,12 +55,15 @@ - [ENHANCEMENT] PostListings and NotMatcher now public. ## 0.6.1 + - [BUGFIX] Update `last` after appending a non-overlapping chunk in `chunks.MergeOverlappingChunks`. [#539](https://github.com/prometheus/tsdb/pull/539) ## 0.6.0 + - [CHANGE] `AllowOverlappingBlock` is now `AllowOverlappingBlocks`. ## 0.5.0 + - [FEATURE] Time-ovelapping blocks are now allowed. [#370](https://github.com/prometheus/tsdb/pull/370) - Disabled by default and can be enabled via `AllowOverlappingBlock` option. - Added `MergeChunks` function in `chunkenc/xor.go` to merge 2 time-overlapping chunks. @@ -56,6 +79,7 @@ - [BUGFIX] LiveReader can get into an infinite loop on corrupt WALs. ## 0.4.0 + - [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller wal files. For example using tmpfs on a RPI to minimise the SD card wear out from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed. - [CHANGE] Empty blocks are not written during compaction [#374](https://github.com/prometheus/tsdb/pull/374) - [FEATURE] Size base retention through `Options.MaxBytes`. As part of this change: @@ -67,9 +91,11 @@ - [FEATURE] Add new `LiveReader` to WAL pacakge. Added to allow live tailing of a WAL segment, used by Prometheus Remote Write after refactor. The main difference between the new reader and the existing `Reader` is that for `LiveReader` a call to `Next()` that returns false does not mean that there will never be more data to read. ## 0.3.1 + - [BUGFIX] Fixed most windows test and some actual bugs for unclosed file readers. ## 0.3.0 + - [CHANGE] `LastCheckpoint()` used to return just the segment name and now it returns the full relative path. - [CHANGE] `NewSegmentsRangeReader()` can now read over miltiple wal ranges by using the new `SegmentRange{}` struct. - [CHANGE] `CorruptionErr{}` now also exposes the Segment `Dir` which is added when displaying any errors. diff --git a/Makefile.common b/Makefile.common index 2dcf764a..48d2ff84 100644 --- a/Makefile.common +++ b/Makefile.common @@ -69,7 +69,7 @@ else GO_BUILD_PLATFORM ?= $(GOHOSTOS)-$(GOHOSTARCH) endif -PROMU_VERSION ?= 0.4.0 +PROMU_VERSION ?= 0.5.0 PROMU_URL := https://github.com/prometheus/promu/releases/download/v$(PROMU_VERSION)/promu-$(PROMU_VERSION).$(GO_BUILD_PLATFORM).tar.gz GOLANGCI_LINT := diff --git a/block.go b/block.go index 717a5b2d..d0fe2b2f 100644 --- a/block.go +++ b/block.go @@ -450,7 +450,10 @@ func (r blockIndexReader) LabelValues(names ...string) (index.StringTuples, erro func (r blockIndexReader) Postings(name, value string) (index.Postings, error) { p, err := r.ir.Postings(name, value) - return p, errors.Wrapf(err, "block: %s", r.b.Meta().ULID) + if err != nil { + return p, errors.Wrapf(err, "block: %s", r.b.Meta().ULID) + } + return p, nil } func (r blockIndexReader) SortedPostings(p index.Postings) index.Postings { @@ -458,11 +461,10 @@ func (r blockIndexReader) SortedPostings(p index.Postings) index.Postings { } func (r blockIndexReader) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error { - return errors.Wrapf( - r.ir.Series(ref, lset, chks), - "block: %s", - r.b.Meta().ULID, - ) + if err := r.ir.Series(ref, lset, chks); err != nil { + return errors.Wrapf(err, "block: %s", r.b.Meta().ULID) + } + return nil } func (r blockIndexReader) LabelIndices() ([][]string, error) { diff --git a/block_test.go b/block_test.go index 6718051a..c56d08c4 100644 --- a/block_test.go +++ b/block_test.go @@ -16,6 +16,7 @@ package tsdb import ( "context" "encoding/binary" + "errors" "io/ioutil" "math/rand" @@ -56,7 +57,7 @@ func TestSetCompactionFailed(t *testing.T) { testutil.Ok(t, os.RemoveAll(tmpdir)) }() - blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 0)) + blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 1)) b, err := OpenBlock(nil, blockDir, nil) testutil.Ok(t, err) testutil.Equals(t, false, b.meta.Compaction.Failed) @@ -133,7 +134,7 @@ func TestCorruptedChunk(t *testing.T) { testutil.Ok(t, os.RemoveAll(tmpdir)) }() - blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 0)) + blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 1)) files, err := sequenceFiles(chunkDir(blockDir)) testutil.Ok(t, err) testutil.Assert(t, len(files) > 0, "No chunk created.") @@ -213,7 +214,9 @@ func createBlock(tb testing.TB, dir string, series []Series) string { testutil.Ok(tb, os.MkdirAll(dir, 0777)) - ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime(), nil) + // Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime). + // Because of this block intervals are always +1 than the total samples it includes. + ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime()+1, nil) testutil.Ok(tb, err) return filepath.Join(dir, ulid.String()) } @@ -265,7 +268,7 @@ func genSeries(totalSeries, labelCount int, mint, maxt int64) []Series { lbls[defaultLabelName+strconv.Itoa(j)] = defaultLabelValue + strconv.Itoa(j) } samples := make([]tsdbutil.Sample, 0, maxt-mint+1) - for t := mint; t <= maxt; t++ { + for t := mint; t < maxt; t++ { samples = append(samples, sample{t: t, v: rand.Float64()}) } series[i] = newSeries(lbls, samples) diff --git a/chunks/chunks.go b/chunks/chunks.go index 70cb119c..9ce8c57d 100644 --- a/chunks/chunks.go +++ b/chunks/chunks.go @@ -51,7 +51,9 @@ type Meta struct { Ref uint64 Chunk chunkenc.Chunk - MinTime, MaxTime int64 // time range the data covers + // Time range the data covers. + // When MaxTime == math.MaxInt64 the chunk is still open and being appended to. + MinTime, MaxTime int64 } // writeHash writes the chunk encoding and raw data into the provided hash. @@ -218,7 +220,7 @@ func MergeOverlappingChunks(chks []Meta) ([]Meta, error) { // So never overlaps with newChks[last-1] or anything before that. if c.MinTime > newChks[last].MaxTime { newChks = append(newChks, c) - last += 1 + last++ continue } nc := &newChks[last] diff --git a/compact.go b/compact.go index 29b98eea..a41b5252 100644 --- a/compact.go +++ b/compact.go @@ -757,6 +757,21 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } for i, chk := range chks { + // Re-encode head chunks that are still open (being appended to) or + // outside the compacted MaxTime range. + // The chunk.Bytes() method is not safe for open chunks hence the re-encoding. + // This happens when snapshotting the head block. + // + // Block time range is half-open: [meta.MinTime, meta.MaxTime) and + // chunks are closed hence the chk.MaxTime >= meta.MaxTime check. + // + // TODO think how to avoid the typecasting to verify when it is head block. + if _, isHeadChunk := chk.Chunk.(*safeChunk); isHeadChunk && chk.MaxTime >= meta.MaxTime { + dranges = append(dranges, Interval{Mint: meta.MaxTime, Maxt: math.MaxInt64}) + + } else + // Sanity check for disk blocks. + // chk.MaxTime == meta.MaxTime shouldn't happen as well, but will brake many users so not checking for that. if chk.MinTime < meta.MinTime || chk.MaxTime > meta.MaxTime { return errors.Errorf("found chunk with minTime: %d maxTime: %d outside of compacted minTime: %d maxTime: %d", chk.MinTime, chk.MaxTime, meta.MinTime, meta.MaxTime) @@ -774,12 +789,21 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } it := &deletedIterator{it: chk.Chunk.Iterator(), intervals: dranges} + + var ( + t int64 + v float64 + ) for it.Next() { - ts, v := it.At() - app.Append(ts, v) + t, v = it.At() + app.Append(t, v) + } + if err := it.Err(); err != nil { + return errors.Wrap(err, "iterate chunk while re-encoding") } chks[i].Chunk = newChunk + chks[i].MaxTime = t } } diff --git a/db.go b/db.go index 7ad6f615..e9aa5cdd 100644 --- a/db.go +++ b/db.go @@ -1101,8 +1101,20 @@ func (db *DB) Snapshot(dir string, withHead bool) error { if !withHead { return nil } - _, err := db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime(), nil) - return errors.Wrap(err, "snapshot head block") + + mint := db.head.MinTime() + maxt := db.head.MaxTime() + head := &rangeHead{ + head: db.head, + mint: mint, + maxt: maxt, + } + // Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime). + // Because of this block intervals are always +1 than the total samples it includes. + if _, err := db.compactor.Write(dir, head, mint, maxt+1, nil); err != nil { + return errors.Wrap(err, "snapshot head block") + } + return nil } // Querier returns a new querier over the data partition for the given time range. diff --git a/db_test.go b/db_test.go index 0732c5dd..14502cce 100644 --- a/db_test.go +++ b/db_test.go @@ -489,6 +489,62 @@ func TestDB_Snapshot(t *testing.T) { testutil.Equals(t, 1000.0, sum) } +// TestDB_Snapshot_ChunksOutsideOfCompactedRange ensures that a snapshot removes chunks samples +// that are outside the set block time range. +// See https://github.com/prometheus/prometheus/issues/5105 +func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) { + db, delete := openTestDB(t, nil) + defer delete() + + app := db.Appender() + mint := int64(1414141414000) + for i := 0; i < 1000; i++ { + _, err := app.Add(labels.FromStrings("foo", "bar"), mint+int64(i), 1.0) + testutil.Ok(t, err) + } + testutil.Ok(t, app.Commit()) + testutil.Ok(t, app.Rollback()) + + snap, err := ioutil.TempDir("", "snap") + testutil.Ok(t, err) + + // Hackingly introduce "race", by having lower max time then maxTime in last chunk. + db.head.maxTime = db.head.maxTime - 10 + + defer func() { + testutil.Ok(t, os.RemoveAll(snap)) + }() + testutil.Ok(t, db.Snapshot(snap, true)) + testutil.Ok(t, db.Close()) + + // Reopen DB from snapshot. + db, err = Open(snap, nil, nil, nil) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, db.Close()) }() + + querier, err := db.Querier(mint, mint+1000) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, querier.Close()) }() + + // Sum values. + seriesSet, err := querier.Select(labels.NewEqualMatcher("foo", "bar")) + testutil.Ok(t, err) + + sum := 0.0 + for seriesSet.Next() { + series := seriesSet.At().Iterator() + for series.Next() { + _, v := series.At() + sum += v + } + testutil.Ok(t, series.Err()) + } + testutil.Ok(t, seriesSet.Err()) + + // Since we snapshotted with MaxTime - 10, so expect 10 less samples. + testutil.Equals(t, 1000.0-10, sum) +} + func TestDB_SnapshotWithDelete(t *testing.T) { numSamples := int64(10) @@ -930,7 +986,7 @@ func TestTombstoneCleanFail(t *testing.T) { // totalBlocks should be >=2 so we have enough blocks to trigger compaction failure. totalBlocks := 2 for i := 0; i < totalBlocks; i++ { - blockDir := createBlock(t, db.Dir(), genSeries(1, 1, 0, 0)) + blockDir := createBlock(t, db.Dir(), genSeries(1, 1, 0, 1)) block, err := OpenBlock(nil, blockDir, nil) testutil.Ok(t, err) // Add some some fake tombstones to trigger the compaction. @@ -974,7 +1030,7 @@ func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int6 return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail") } - block, err := OpenBlock(nil, createBlock(c.t, dest, genSeries(1, 1, 0, 0)), nil) + block, err := OpenBlock(nil, createBlock(c.t, dest, genSeries(1, 1, 0, 1)), nil) testutil.Ok(c.t, err) testutil.Ok(c.t, block.Close()) // Close block as we won't be using anywhere. c.blocks = append(c.blocks, block) diff --git a/head.go b/head.go index 138e1598..878c508b 100644 --- a/head.go +++ b/head.go @@ -1318,9 +1318,15 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks if !c.OverlapsClosedInterval(h.mint, h.maxt) { continue } + // Set the head chunks as open (being appended to). + maxTime := c.maxTime + if s.headChunk == c { + maxTime = math.MaxInt64 + } + *chks = append(*chks, chunks.Meta{ MinTime: c.minTime, - MaxTime: c.maxTime, + MaxTime: maxTime, Ref: packChunkID(s.ref, uint64(s.chunkID(i))), }) } diff --git a/wal/live_reader.go b/wal/live_reader.go index fb048523..94175e79 100644 --- a/wal/live_reader.go +++ b/wal/live_reader.go @@ -27,26 +27,40 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +// liveReaderMetrics holds all metrics exposed by the LiveReader. +type liveReaderMetrics struct { + readerCorruptionErrors *prometheus.CounterVec +} + +// LiveReaderMetrics instatiates, registers and returns metrics to be injected +// at LiveReader instantiation. +func NewLiveReaderMetrics(reg prometheus.Registerer) *liveReaderMetrics { + m := &liveReaderMetrics{ + readerCorruptionErrors: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "prometheus_tsdb_wal_reader_corruption_errors_total", + Help: "Errors encountered when reading the WAL.", + }, []string{"error"}), + } + + if reg != nil { + reg.Register(m.readerCorruptionErrors) + } + + return m +} + // NewLiveReader returns a new live reader. -func NewLiveReader(logger log.Logger, reg prometheus.Registerer, r io.Reader) *LiveReader { +func NewLiveReader(logger log.Logger, metrics *liveReaderMetrics, r io.Reader) *LiveReader { lr := &LiveReader{ - logger: logger, - rdr: r, + logger: logger, + rdr: r, + metrics: metrics, // Until we understand how they come about, make readers permissive // to records spanning pages. permissive: true, } - lr.readerCorruptionErrors = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "prometheus_tsdb_wal_reader_corruption_errors_total", - Help: "Errors encountered when reading the WAL.", - }, []string{"error"}) - - if reg != nil { - reg.MustRegister(lr.readerCorruptionErrors) - } - return lr } @@ -74,7 +88,7 @@ type LiveReader struct { // NB the non-ive Reader implementation allows for this. permissive bool - readerCorruptionErrors *prometheus.CounterVec + metrics *liveReaderMetrics } // Err returns any errors encountered reading the WAL. io.EOFs are not terminal @@ -282,7 +296,7 @@ func (r *LiveReader) readRecord() ([]byte, int, error) { if !r.permissive { return nil, 0, fmt.Errorf("record would overflow current page: %d > %d", r.readIndex+recordHeaderSize+length, pageSize) } - r.readerCorruptionErrors.WithLabelValues("record_span_page").Inc() + r.metrics.readerCorruptionErrors.WithLabelValues("record_span_page").Inc() level.Warn(r.logger).Log("msg", "record spans page boundaries", "start", r.readIndex, "end", recordHeaderSize+length, "pageSize", pageSize) } if recordHeaderSize+length > pageSize { diff --git a/wal/reader_test.go b/wal/reader_test.go index 1e15cae8..96d15225 100644 --- a/wal/reader_test.go +++ b/wal/reader_test.go @@ -51,7 +51,7 @@ var readerConstructors = map[string]func(io.Reader) reader{ return NewReader(r) }, "LiveReader": func(r io.Reader) reader { - lr := NewLiveReader(log.NewNopLogger(), nil, r) + lr := NewLiveReader(log.NewNopLogger(), NewLiveReaderMetrics(nil), r) lr.eofNonErr = true return lr }, @@ -216,7 +216,7 @@ func TestReader_Live(t *testing.T) { // Read from a second FD on the same file. readFd, err := os.Open(writeFd.Name()) testutil.Ok(t, err) - reader := NewLiveReader(logger, nil, readFd) + reader := NewLiveReader(logger, NewLiveReaderMetrics(nil), readFd) for _, exp := range testReaderCases[i].exp { for !reader.Next() { testutil.Assert(t, reader.Err() == io.EOF, "expect EOF, got: %v", reader.Err()) @@ -518,7 +518,7 @@ func TestLiveReaderCorrupt_RecordTooLongAndShort(t *testing.T) { testutil.Ok(t, err) defer seg.Close() - r := NewLiveReader(logger, nil, seg) + r := NewLiveReader(logger, NewLiveReaderMetrics(nil), seg) testutil.Assert(t, r.Next() == false, "expected no records") testutil.Assert(t, r.Err().Error() == "record length greater than a single page: 65542 > 32768", "expected error, got: %v", r.Err()) }