Merge branch 'master' into read-only-alternative
krasi-georgiev committed Jul 4, 2019
2 parents 2b4ddbc + d230c67 commit 32ae42e
Showing 11 changed files with 182 additions and 37 deletions.
26 changes: 26 additions & 0 deletions CHANGELOG.md
@@ -1,13 +1,31 @@
## master / unreleased

- [FEATURE] Added `DBReadOnly` to allow opening a database in read-only mode.
- `DBReadOnly.Blocks()` exposes a slice of `BlockReader`s; for this, the interface is refactored to return the full block meta instead of just MinTime/MaxTime, which is required to allow reading the ULID of a block (see the usage sketch below).
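As a rough illustration of the feature above, the sketch below opens a database read-only and prints each block's ULID. Only `DBReadOnly`, its `Blocks()` method and the availability of the full block meta come from this entry; the constructor name `OpenDBReadOnly`, its signature, the error returns and the `Meta()` accessor on `BlockReader` are assumptions made for the example.

```go
package main

import (
	"fmt"
	"log"

	"github.com/prometheus/tsdb"
)

func main() {
	// Hypothetical constructor name and signature; the changelog only states
	// that a DBReadOnly type exists for opening a database in read-only mode.
	db, err := tsdb.OpenDBReadOnly("data", nil)
	if err != nil {
		log.Fatal(err)
	}

	// Blocks() exposing a slice of BlockReaders comes from the entry above;
	// the error return and the Meta() accessor are assumed for illustration.
	blocks, err := db.Blocks()
	if err != nil {
		log.Fatal(err)
	}
	for _, b := range blocks {
		fmt.Println(b.Meta().ULID) // full block meta instead of just MinTime/MaxTime
	}
}
```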

## 0.9.1

- [CHANGE] LiveReader metrics are now injected rather than global.

## 0.9.0

- [FEATURE] Provide option to compress WAL records using Snappy. [#609](https://github.com/prometheus/tsdb/pull/609)
- [BUGFIX] Re-calculate block size when calling `block.Delete`.
- [BUGFIX] Re-encode all head chunks at compaction that are open (being appended to) or outside the block's Maxt range. This avoids writing out corrupt data, which can happen when snapshotting with the head included.
- [BUGFIX] Improved handling of multiple refs for the same series in WAL reading.
- [BUGFIX] `prometheus_tsdb_compactions_failed_total` is now incremented on any compaction failure.
- [CHANGE] The meta file `BlockStats` no longer holds size information. This is now dynamically calculated and kept in memory. It also includes the meta file size which was not included before.
- [CHANGE] Create new clean segment when starting the WAL.
- [CHANGE] Renamed metric from `prometheus_tsdb_wal_reader_corruption_errors` to `prometheus_tsdb_wal_reader_corruption_errors_total`.
- [ENHANCEMENT] Improved atomicity of .tmp block replacement during compaction for the usual case.
- [ENHANCEMENT] Improved postings intersection matching.
- [ENHANCEMENT] Reduced disk usage for WAL for small setups.
- [ENHANCEMENT] Optimize queries using regexp for set lookups.

## 0.8.0

- [BUGFIX] Calling `Close` more than once on a querier returns an error instead of a panic.
- [BUGFIX] Don't panic and recover nicely when running out of disk space.
- [BUGFIX] Correctly handle empty labels.
@@ -17,9 +35,11 @@
- [FEATURE] Added `currentSegment` metric for the WAL segment that is currently being written to.

## 0.7.1

- [ENHANCEMENT] Reduce memory usage in mergedPostings.Seek

## 0.7.0

- [CHANGE] tsdb now requires golang 1.12 or higher.
- [REMOVED] `chunks.NewReader` is removed as it wasn't used anywhere.
- [REMOVED] `FromData` was considered unused, so it was removed.
@@ -35,12 +55,15 @@
- [ENHANCEMENT] PostListings and NotMatcher now public.

## 0.6.1

- [BUGFIX] Update `last` after appending a non-overlapping chunk in `chunks.MergeOverlappingChunks`. [#539](https://github.com/prometheus/tsdb/pull/539)

## 0.6.0

- [CHANGE] `AllowOverlappingBlock` is now `AllowOverlappingBlocks`.

## 0.5.0

- [FEATURE] Time-overlapping blocks are now allowed. [#370](https://github.com/prometheus/tsdb/pull/370)
- Disabled by default and can be enabled via `AllowOverlappingBlock` option.
- Added `MergeChunks` function in `chunkenc/xor.go` to merge 2 time-overlapping chunks.
@@ -56,6 +79,7 @@
- [BUGFIX] LiveReader can get into an infinite loop on corrupt WALs.

## 0.4.0

- [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller WAL files, for example with tmpfs on a Raspberry Pi to minimise SD card wear from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed.
- [CHANGE] Empty blocks are not written during compaction. [#374](https://github.com/prometheus/tsdb/pull/374)
- [FEATURE] Size-based retention through `Options.MaxBytes`. As part of this change:
@@ -67,9 +91,11 @@
- [FEATURE] Add new `LiveReader` to the WAL package. Added to allow live tailing of a WAL segment, used by Prometheus Remote Write after the refactor. The main difference between the new reader and the existing `Reader` is that for `LiveReader` a call to `Next()` that returns false does not mean that there will never be more data to read.

## 0.3.1

- [BUGFIX] Fixed most Windows tests and some actual bugs with unclosed file readers.

## 0.3.0

- [CHANGE] `LastCheckpoint()` used to return just the segment name; now it returns the full relative path.
- [CHANGE] `NewSegmentsRangeReader()` can now read over multiple WAL ranges by using the new `SegmentRange{}` struct.
- [CHANGE] `CorruptionErr{}` now also exposes the segment `Dir`, which is included when displaying any errors.
2 changes: 1 addition & 1 deletion Makefile.common
@@ -69,7 +69,7 @@ else
GO_BUILD_PLATFORM ?= $(GOHOSTOS)-$(GOHOSTARCH)
endif

PROMU_VERSION ?= 0.4.0
PROMU_VERSION ?= 0.5.0
PROMU_URL := https://github.com/prometheus/promu/releases/download/v$(PROMU_VERSION)/promu-$(PROMU_VERSION).$(GO_BUILD_PLATFORM).tar.gz

GOLANGCI_LINT :=
14 changes: 8 additions & 6 deletions block.go
@@ -450,19 +450,21 @@ func (r blockIndexReader) LabelValues(names ...string) (index.StringTuples, erro

func (r blockIndexReader) Postings(name, value string) (index.Postings, error) {
p, err := r.ir.Postings(name, value)
return p, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
if err != nil {
return p, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
}
return p, nil
}

func (r blockIndexReader) SortedPostings(p index.Postings) index.Postings {
return r.ir.SortedPostings(p)
}

func (r blockIndexReader) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error {
return errors.Wrapf(
r.ir.Series(ref, lset, chks),
"block: %s",
r.b.Meta().ULID,
)
if err := r.ir.Series(ref, lset, chks); err != nil {
return errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
}
return nil
}

func (r blockIndexReader) LabelIndices() ([][]string, error) {
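For context on the guards added above: assuming the `errors` package here is github.com/pkg/errors (as the `errors.Wrapf`/`errors.Errorf` calls suggest), wrapping a nil error yields nil, so returning the wrapped error unconditionally and returning it only when `err != nil` behave the same; the guard presumably just makes the success path explicit. A minimal standalone check of that property:

```go
package main

import (
	"fmt"

	"github.com/pkg/errors"
)

func main() {
	var err error // nil error, as on the success path of Postings/Series above
	wrapped := errors.Wrapf(err, "block: %s", "01ABCDEF")
	fmt.Println(wrapped == nil) // true: Wrapf passes a nil error straight through
}
```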
11 changes: 7 additions & 4 deletions block_test.go
@@ -16,6 +16,7 @@ package tsdb
import (
"context"
"encoding/binary"

"errors"
"io/ioutil"
"math/rand"
@@ -56,7 +57,7 @@ func TestSetCompactionFailed(t *testing.T) {
testutil.Ok(t, os.RemoveAll(tmpdir))
}()

blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 0))
blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 1))
b, err := OpenBlock(nil, blockDir, nil)
testutil.Ok(t, err)
testutil.Equals(t, false, b.meta.Compaction.Failed)
@@ -133,7 +134,7 @@ func TestCorruptedChunk(t *testing.T) {
testutil.Ok(t, os.RemoveAll(tmpdir))
}()

blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 0))
blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 1))
files, err := sequenceFiles(chunkDir(blockDir))
testutil.Ok(t, err)
testutil.Assert(t, len(files) > 0, "No chunk created.")
@@ -213,7 +214,7 @@ func createBlock(tb testing.TB, dir string, series []Series) string {

testutil.Ok(tb, os.MkdirAll(dir, 0777))

ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime(), nil)
// Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime).
// Because of this, the block's maxt must be one greater than the timestamp of the last sample it covers.
ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime()+1, nil)
testutil.Ok(tb, err)
return filepath.Join(dir, ulid.String())
}
@@ -265,7 +268,7 @@ func genSeries(totalSeries, labelCount int, mint, maxt int64) []Series {
lbls[defaultLabelName+strconv.Itoa(j)] = defaultLabelValue + strconv.Itoa(j)
}
samples := make([]tsdbutil.Sample, 0, maxt-mint+1)
for t := mint; t <= maxt; t++ {
for t := mint; t < maxt; t++ {
samples = append(samples, sample{t: t, v: rand.Float64()})
}
series[i] = newSeries(lbls, samples)
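For context on the two edits above (genSeries now iterating with `t < maxt`, and createBlock passing `head.MaxTime()+1` to the compactor), here is a small standalone sketch of the half-open interval arithmetic; it uses plain Go only and none of the tsdb test helpers:

```go
package main

import "fmt"

func main() {
	// Mirrors genSeries(1, 1, 0, 1): samples are generated over the half-open
	// range [mint, maxt), so maxt=1 yields exactly one sample at t=0.
	mint, maxt := int64(0), int64(1)
	var timestamps []int64
	for t := mint; t < maxt; t++ {
		timestamps = append(timestamps, t)
	}

	// Block intervals are half-open as well: [MinTime, MaxTime). To cover the
	// last sample at head.MaxTime(), the block must be written with maxt+1.
	headMaxTime := timestamps[len(timestamps)-1]
	blockMaxt := headMaxTime + 1
	fmt.Printf("samples=%d block=[%d,%d)\n", len(timestamps), mint, blockMaxt) // samples=1 block=[0,1)
}
```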
6 changes: 4 additions & 2 deletions chunks/chunks.go
@@ -51,7 +51,9 @@ type Meta struct {
Ref uint64
Chunk chunkenc.Chunk

MinTime, MaxTime int64 // time range the data covers
// Time range the data covers.
// When MaxTime == math.MaxInt64 the chunk is still open and being appended to.
MinTime, MaxTime int64
}

// writeHash writes the chunk encoding and raw data into the provided hash.
@@ -218,7 +220,7 @@ func MergeOverlappingChunks(chks []Meta) ([]Meta, error) {
// So never overlaps with newChks[last-1] or anything before that.
if c.MinTime > newChks[last].MaxTime {
newChks = append(newChks, c)
last += 1
last++
continue
}
nc := &newChks[last]
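A minimal sketch of how a consumer might rely on the convention now documented on `Meta` (an open, still-appended-to chunk is flagged with `MaxTime == math.MaxInt64`); the `isOpen` helper is illustrative and not part of the package:

```go
package main

import (
	"fmt"
	"math"

	"github.com/prometheus/tsdb/chunks"
)

// isOpen reports whether a chunk meta describes a chunk that is still open
// (being appended to), per the MaxTime == math.MaxInt64 convention above.
func isOpen(m chunks.Meta) bool {
	return m.MaxTime == math.MaxInt64
}

func main() {
	open := chunks.Meta{MinTime: 1000, MaxTime: math.MaxInt64} // open head chunk
	sealed := chunks.Meta{MinTime: 1000, MaxTime: 2000}        // closed chunk
	fmt.Println(isOpen(open), isOpen(sealed))                  // true false
}
```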
28 changes: 26 additions & 2 deletions compact.go
@@ -757,6 +757,21 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
}

for i, chk := range chks {
// Re-encode head chunks that are still open (being appended to) or
// outside the compacted MaxTime range.
// The chunk.Bytes() method is not safe for open chunks, hence the re-encoding.
// This happens when snapshotting the head block.
//
// Block time range is half-open: [meta.MinTime, meta.MaxTime) and
// chunk intervals are closed, hence the chk.MaxTime >= meta.MaxTime check.
//
// TODO: think how to avoid the type assertion used to detect the head block.
if _, isHeadChunk := chk.Chunk.(*safeChunk); isHeadChunk && chk.MaxTime >= meta.MaxTime {
dranges = append(dranges, Interval{Mint: meta.MaxTime, Maxt: math.MaxInt64})

} else
// Sanity check for disk blocks.
// chk.MaxTime == meta.MaxTime shouldn't happen either, but checking for it would break many existing users, so it is not checked.
if chk.MinTime < meta.MinTime || chk.MaxTime > meta.MaxTime {
return errors.Errorf("found chunk with minTime: %d maxTime: %d outside of compacted minTime: %d maxTime: %d",
chk.MinTime, chk.MaxTime, meta.MinTime, meta.MaxTime)
@@ -774,12 +789,21 @@
}

it := &deletedIterator{it: chk.Chunk.Iterator(), intervals: dranges}

var (
t int64
v float64
)
for it.Next() {
ts, v := it.At()
app.Append(ts, v)
t, v = it.At()
app.Append(t, v)
}
if err := it.Err(); err != nil {
return errors.Wrap(err, "iterate chunk while re-encoding")
}

chks[i].Chunk = newChunk
chks[i].MaxTime = t
}
}

16 changes: 14 additions & 2 deletions db.go
@@ -1101,8 +1101,20 @@ func (db *DB) Snapshot(dir string, withHead bool) error {
if !withHead {
return nil
}
_, err := db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime(), nil)
return errors.Wrap(err, "snapshot head block")

mint := db.head.MinTime()
maxt := db.head.MaxTime()
head := &rangeHead{
head: db.head,
mint: mint,
maxt: maxt,
}
// Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime).
// Because of this, the block's maxt must be one greater than the timestamp of the last sample it covers.
if _, err := db.compactor.Write(dir, head, mint, maxt+1, nil); err != nil {
return errors.Wrap(err, "snapshot head block")
}
return nil
}

// Querier returns a new querier over the data partition for the given time range.
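A hedged usage sketch for the exported method changed above: `Snapshot(dir, withHead)` with `withHead=true` now writes the head as a block over the half-open range `[head.MinTime(), head.MaxTime()+1)`. The four-argument `Open` mirrors its use in the tests below; directory paths and error handling are illustrative only:

```go
package main

import (
	"log"

	"github.com/prometheus/tsdb"
)

func main() {
	// nil logger, registerer and options, as in the tests in this change.
	db, err := tsdb.Open("data", nil, nil, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// withHead=true also snapshots the in-memory head; per the comment above,
	// the snapshotted head block's interval is [head.MinTime(), head.MaxTime()+1).
	if err := db.Snapshot("snapshots/20190704", true); err != nil {
		log.Fatal(err)
	}
}
```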
60 changes: 58 additions & 2 deletions db_test.go
@@ -489,6 +489,62 @@ func TestDB_Snapshot(t *testing.T) {
testutil.Equals(t, 1000.0, sum)
}

// TestDB_Snapshot_ChunksOutsideOfCompactedRange ensures that a snapshot removes chunk samples
// that are outside the set block time range.
// See https://github.com/prometheus/prometheus/issues/5105
func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) {
db, delete := openTestDB(t, nil)
defer delete()

app := db.Appender()
mint := int64(1414141414000)
for i := 0; i < 1000; i++ {
_, err := app.Add(labels.FromStrings("foo", "bar"), mint+int64(i), 1.0)
testutil.Ok(t, err)
}
testutil.Ok(t, app.Commit())
testutil.Ok(t, app.Rollback())

snap, err := ioutil.TempDir("", "snap")
testutil.Ok(t, err)

// Hackily introduce a "race" by setting the head max time lower than the maxTime of the last chunk.
db.head.maxTime = db.head.maxTime - 10

defer func() {
testutil.Ok(t, os.RemoveAll(snap))
}()
testutil.Ok(t, db.Snapshot(snap, true))
testutil.Ok(t, db.Close())

// Reopen DB from snapshot.
db, err = Open(snap, nil, nil, nil)
testutil.Ok(t, err)
defer func() { testutil.Ok(t, db.Close()) }()

querier, err := db.Querier(mint, mint+1000)
testutil.Ok(t, err)
defer func() { testutil.Ok(t, querier.Close()) }()

// Sum values.
seriesSet, err := querier.Select(labels.NewEqualMatcher("foo", "bar"))
testutil.Ok(t, err)

sum := 0.0
for seriesSet.Next() {
series := seriesSet.At().Iterator()
for series.Next() {
_, v := series.At()
sum += v
}
testutil.Ok(t, series.Err())
}
testutil.Ok(t, seriesSet.Err())

// Since we snapshotted with MaxTime - 10, expect 10 fewer samples.
testutil.Equals(t, 1000.0-10, sum)
}

func TestDB_SnapshotWithDelete(t *testing.T) {
numSamples := int64(10)

@@ -930,7 +986,7 @@ func TestTombstoneCleanFail(t *testing.T) {
// totalBlocks should be >=2 so we have enough blocks to trigger compaction failure.
totalBlocks := 2
for i := 0; i < totalBlocks; i++ {
blockDir := createBlock(t, db.Dir(), genSeries(1, 1, 0, 0))
blockDir := createBlock(t, db.Dir(), genSeries(1, 1, 0, 1))
block, err := OpenBlock(nil, blockDir, nil)
testutil.Ok(t, err)
// Add some fake tombstones to trigger the compaction.
@@ -974,7 +1030,7 @@ func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int6
return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail")
}

block, err := OpenBlock(nil, createBlock(c.t, dest, genSeries(1, 1, 0, 0)), nil)
block, err := OpenBlock(nil, createBlock(c.t, dest, genSeries(1, 1, 0, 1)), nil)
testutil.Ok(c.t, err)
testutil.Ok(c.t, block.Close()) // Close the block as we won't be using it anywhere.
c.blocks = append(c.blocks, block)
8 changes: 7 additions & 1 deletion head.go
@@ -1318,9 +1318,15 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks
if !c.OverlapsClosedInterval(h.mint, h.maxt) {
continue
}
// Mark the head chunk as open (being appended to).
maxTime := c.maxTime
if s.headChunk == c {
maxTime = math.MaxInt64
}

*chks = append(*chks, chunks.Meta{
MinTime: c.minTime,
MaxTime: c.maxTime,
MaxTime: maxTime,
Ref: packChunkID(s.ref, uint64(s.chunkID(i))),
})
}
