Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Open db in Read only mode #588

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
0997a15
Added db read only open mode and use it for the tsdb cli.
Apr 24, 2019
cd08753
add the DBReadOnly struct
Apr 24, 2019
8128c53
fix wal closing
Apr 24, 2019
83d3683
refactored to update blocks on demand
Apr 25, 2019
a704541
rename to DBView
Apr 25, 2019
a16cd17
revert renaming
Apr 25, 2019
ce25efc
remove metrics
Apr 25, 2019
825d6c0
detach db from openBlocks()
May 6, 2019
ca72a4c
Merge remote-tracking branch 'upstream/master' into read-only-alterna…
May 6, 2019
93af048
Merge remote-tracking branch 'upstream/master' into read-only-alterna…
May 29, 2019
c2d809b
refactor and add tests
May 30, 2019
1b68f0d
added db.Close to close all open blocks for windows
May 30, 2019
4778d73
add a read only interface for a block.
May 31, 2019
32d5aae
nits
Jun 3, 2019
b722198
Merge remote-tracking branch 'upstream/master' into read-only-alterna…
Jun 6, 2019
add6f1e
nits
krasi-georgiev Jun 11, 2019
a2726b4
Merge remote-tracking branch 'upstream/master' into read-only-alterna…
krasi-georgiev Jun 11, 2019
6092063
simplified
krasi-georgiev Jun 12, 2019
fbea5e8
refactored to use the Blockreader API.
krasi-georgiev Jun 13, 2019
3b76b2f
non blocking head closing and use dir hash to ensure read only.
krasi-georgiev Jun 14, 2019
95ba508
Merge branch 'master' into read-only-alternative
krasi-georgiev Jun 24, 2019
764d307
fix wal corruption metrics and head test
krasi-georgiev Jun 25, 2019
bd79d07
nits
krasi-georgiev Jun 25, 2019
106d0e9
nit
krasi-georgiev Jun 25, 2019
2b4ddbc
nits
krasi-georgiev Jul 2, 2019
32ae42e
Merge branch 'master' into read-only-alternative
krasi-georgiev Jul 4, 2019
fb75682
refactor error handling for DirHash and DirSize
krasi-georgiev Jul 9, 2019
1cf409f
NumSeries in Meta() of Head
codesome Jul 16, 2019
0a645ab
Merge pull request #5 from codesome/num-series-read-only
krasi-georgiev Jul 16, 2019
af10877
nits
krasi-georgiev Jul 17, 2019
b8c4781
Merge branch 'read-only-alternative' of github.com:krasi-georgiev/tsd…
krasi-georgiev Jul 17, 2019
df4a374
use channel for the db closing and remove mutex
krasi-georgiev Jul 18, 2019
b3f7774
add ErrClosed and add a test for it.
krasi-georgiev Jul 18, 2019
1a97569
handle multiple errors.
krasi-georgiev Jul 19, 2019
03b33b1
nits
krasi-georgiev Jul 19, 2019
beeaefa
head meta comment
krasi-georgiev Jul 22, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
## master / unreleased
- [FEATURE] New `OpenReadOnly` API to allow opening a database in read only mode.

## 0.7.0
- [CHANGE] tsdb now requires golang 1.12 or higher.
Expand Down
24 changes: 15 additions & 9 deletions cmd/tsdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ func main() {
dumpMaxTime = dumpCmd.Flag("max-time", "maximum timestamp to dump").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()
)

safeDBOptions := *tsdb.DefaultOptions
safeDBOptions.RetentionDuration = 0

switch kingpin.MustParse(cli.Parse(os.Args[1:])) {
case benchWriteCmd.FullCommand():
wb := &writeBenchmark{
Expand All @@ -74,17 +71,25 @@ func main() {
}
wb.run()
case listCmd.FullCommand():
db, err := tsdb.Open(*listPath, nil, nil, &safeDBOptions)
db, err := tsdb.OpenReadOnly(*listPath, nil, nil)
if err != nil {
exitWithError(err)
}
printBlocks(db.Blocks(), listCmdHumanReadable)
blocks, err := db.Blocks()
if err != nil {
exitWithError(err)
}
printBlocks(blocks, listCmdHumanReadable)
case analyzeCmd.FullCommand():
db, err := tsdb.Open(*analyzePath, nil, nil, &safeDBOptions)
db, err := tsdb.OpenReadOnly(*analyzePath, nil, nil)

krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
exitWithError(err)
}
blocks, err := db.Blocks()
if err != nil {
exitWithError(err)
}
blocks := db.Blocks()
var block *tsdb.Block
if *analyzeBlockID != "" {
for _, b := range blocks {
Expand All @@ -101,7 +106,8 @@ func main() {
}
analyzeBlock(block, *analyzeLimit)
case dumpCmd.FullCommand():
db, err := tsdb.Open(*dumpPath, nil, nil, &safeDBOptions)
db, err := tsdb.OpenReadOnly(*dumpPath, nil, nil)

krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
exitWithError(err)
}
Expand Down Expand Up @@ -548,7 +554,7 @@ func analyzeBlock(b *tsdb.Block, limit int) {
printInfo(postingInfos)
}

func dumpSamples(db *tsdb.DB, mint, maxt int64) {
func dumpSamples(db *tsdb.DBReadOnly, mint, maxt int64) {
q, err := db.Querier(mint, maxt)
if err != nil {
exitWithError(err)
Expand Down
119 changes: 117 additions & 2 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ type dbMetrics struct {
tombCleanTimer prometheus.Histogram
blocksBytes prometheus.Gauge
sizeRetentionCount prometheus.Counter
walCorruptionsTotal prometheus.Counter
}

func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
Expand Down Expand Up @@ -222,6 +223,10 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
Name: "prometheus_tsdb_size_retentions_total",
Help: "The number of times that blocks were deleted because the maximum number of bytes was exceeded.",
})
m.walCorruptionsTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_wal_corruptions_total",
Help: "Total number of WAL corruptions.",
})

if r != nil {
r.MustRegister(
Expand All @@ -240,6 +245,107 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
return m
}

// DBReadOnly provides APIs for read only operations on a database.
type DBReadOnly struct {
logger log.Logger
dir string
registerer prometheus.Registerer
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
}

// OpenReadOnly returns a new DB in the given directory only for read operations.
func OpenReadOnly(dir string, l log.Logger, r prometheus.Registerer) (*DBReadOnly, error) {
if _, err := os.Stat(dir); err != nil {
return nil, err
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
}

if l == nil {
l = log.NewNopLogger()
}

db := &DBReadOnly{
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
logger: l,
dir: dir,
registerer: r,
}

return db, nil
}

// Querier loads the wal and returns a new querier over the data partition for the given time range.
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
// A goroutine must not handle more than one open Querier.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean? It is not thread safe? You can only invoke Querier once from single DbReadOnly? Can reword comment for that then?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a direct comment copy from the existing db.Querier.

It was added here, but can't tell why. Maybe you can give an idea why?
3065be9

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Not sure either, I think it might mean similar one querier = one goroutine, but no sure 100% (:
That will work for now.

func (dbRead *DBReadOnly) Querier(mint, maxt int64) (Querier, error) {
	// An in-memory head is needed so that samples still sitting in the WAL
	// can be queried alongside the persisted blocks.
	head, err := NewHead(dbRead.registerer, dbRead.logger, nil, 1)
	if err != nil {
		return nil, err
	}

	blocks, err := dbRead.Blocks()
	if err != nil {
		return nil, err
	}

	// Set the min valid time for the ingested wal samples
	// to be no lower than the maxt of the last block.
	walMinTime := int64(math.MinInt64)
	if n := len(blocks); n > 0 {
		walMinTime = blocks[n-1].Meta().MaxTime
	}

	readDB := &DB{
		dir:    dbRead.dir,
		logger: dbRead.logger,
		head:   head,
		blocks: blocks,
	}

	if err := readDB.head.Init(walMinTime); err != nil {
		return nil, errors.Wrap(err, "read WAL")
	}

	return readDB.Querier(mint, maxt)
}

// Blocks returns the databases persisted blocks.
func (dbRead *DBReadOnly) Blocks() ([]*Block, error) {
db := &DB{
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
dir: dbRead.dir,
logger: dbRead.logger,
}
db.metrics = newDBMetrics(db, dbRead.registerer)

loadable, corrupted, err := db.openBlocks()
if err != nil {
return nil, err
}

// Corrupted blocks that have been replaced by parents can be safely ignored.
codesome marked this conversation as resolved.
Show resolved Hide resolved
for _, block := range loadable {
for _, b := range block.Meta().Compaction.Parents {
delete(corrupted, b.ULID)
}
}
if len(corrupted) > 0 {
return nil, fmt.Errorf("unexpected corrupted block:%v", corrupted)
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
}

if len(loadable) == 0 {
return nil, errors.New("no blocks found")
}

sort.Slice(loadable, func(i, j int) bool {
return loadable[i].Meta().MinTime < loadable[j].Meta().MinTime
})

blockMetas := make([]BlockMeta, 0, len(loadable))
for _, b := range loadable {
blockMetas = append(blockMetas, b.Meta())
}
if overlaps := OverlappingBlocks(blockMetas); len(overlaps) > 0 {
level.Warn(dbRead.logger).Log("msg", "overlapping blocks found during opening", "detail", overlaps.String())
}
return loadable, nil
}

// Open returns a new DB in the given directory.
func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db *DB, err error) {
if err := os.MkdirAll(dir, 0777); err != nil {
Expand Down Expand Up @@ -322,8 +428,17 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
minValidTime = blocks[len(blocks)-1].Meta().MaxTime
}

if err := db.head.Init(minValidTime); err != nil {
return nil, errors.Wrap(err, "read WAL")
if initErr := db.head.Init(minValidTime); initErr != nil {
err := errors.Cause(initErr) // So that we can pick up errors even when wrapped.
if _, ok := err.(*wal.CorruptionErr); ok {
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
level.Warn(db.logger).Log("msg", "encountered WAL corruption error, attempting repair", "err", err)
db.metrics.walCorruptionsTotal.Inc()
if err := wlog.Repair(err); err != nil {
return nil, errors.Wrap(err, "repair corrupted WAL")
}
} else {
return nil, errors.Wrap(initErr, "read WAL")
}
}

go db.run()
Expand Down
54 changes: 30 additions & 24 deletions head.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ type headMetrics struct {
maxTime prometheus.GaugeFunc
samplesAppended prometheus.Counter
walTruncateDuration prometheus.Summary
walCorruptionsTotal prometheus.Counter
headTruncateFail prometheus.Counter
headTruncateTotal prometheus.Counter
checkpointDeleteFail prometheus.Counter
Expand Down Expand Up @@ -159,10 +158,6 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
Name: "prometheus_tsdb_wal_truncate_duration_seconds",
Help: "Duration of WAL truncation.",
})
m.walCorruptionsTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_wal_corruptions_total",
Help: "Total number of WAL corruptions.",
})
m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_samples_appended_total",
Help: "Total number of appended samples.",
Expand Down Expand Up @@ -206,7 +201,6 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
m.maxTime,
m.gcDuration,
m.walTruncateDuration,
m.walCorruptionsTotal,
m.samplesAppended,
m.headTruncateFail,
m.headTruncateTotal,
Expand Down Expand Up @@ -312,7 +306,7 @@ func (h *Head) updateMinMaxTime(mint, maxt int64) {
}
}

func (h *Head) loadWAL(r *wal.Reader) error {
func (h *Head) loadWAL(r *wal.Reader) (err error) {
// Track number of samples that referenced a series we don't know about
// for error reporting.
var unknownRefs uint64
Expand All @@ -328,6 +322,18 @@ func (h *Head) loadWAL(r *wal.Reader) error {
)
wg.Add(n)

defer func() {
// For CorruptionErr ensure to terminate all workers before exiting.
if _, ok := err.(*wal.CorruptionErr); ok {
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
for i := 0; i < n; i++ {
close(inputs[i])
for range outputs[i] {
}
}
wg.Wait()
}
}()

for i := 0; i < n; i++ {
outputs[i] = make(chan []RefSample, 300)
inputs[i] = make(chan []RefSample, 300)
Expand All @@ -345,9 +351,12 @@ func (h *Head) loadWAL(r *wal.Reader) error {
samples []RefSample
tstones []Stone
allStones = newMemTombstones()
err error
)
defer allStones.Close()
defer func() {
if err := allStones.Close(); err != nil {
level.Warn(h.logger).Log("msg", "closing memTombstones during wal read", "err", err)
}
}()
for r.Next() {
series, samples, tstones = series[:0], samples[:0], tstones[:0]
rec := r.Record()
Expand Down Expand Up @@ -432,9 +441,6 @@ func (h *Head) loadWAL(r *wal.Reader) error {
}
}
}
if r.Err() != nil {
return errors.Wrap(r.Err(), "read records")
}

// Signal termination to each worker and wait for it to close its output channel.
for i := 0; i < n; i++ {
Expand All @@ -444,6 +450,10 @@ func (h *Head) loadWAL(r *wal.Reader) error {
}
wg.Wait()

if r.Err() != nil {
return errors.Wrap(r.Err(), "read records")
}

if err := allStones.Iter(func(ref uint64, dranges Intervals) error {
return h.chunkRewrite(ref, dranges)
}); err != nil {
Expand Down Expand Up @@ -488,23 +498,19 @@ func (h *Head) Init(minValidTime int64) error {
startFrom++
}

// Backfill segments from the last checkpoint onwards
// Backfill segments from the last checkpoint onwards.
sr, err := wal.NewSegmentsRangeReader(wal.SegmentRange{Dir: h.wal.Dir(), First: startFrom, Last: -1})
if err != nil {
return errors.Wrap(err, "open WAL segments")
}

defer func() {
if err := sr.Close(); err != nil {
level.Warn(h.logger).Log("msg", "error while closing the wal segments reader", "err", err)
}
}()
err = h.loadWAL(wal.NewReader(sr))
sr.Close() // Close the reader so that if there was an error the repair can remove the corrupted file under Windows.
if err == nil {
return nil
}
level.Warn(h.logger).Log("msg", "encountered WAL error, attempting repair", "err", err)
h.metrics.walCorruptionsTotal.Inc()
if err := h.wal.Repair(err); err != nil {
return errors.Wrap(err, "repair corrupted WAL")
}
return nil

return err
}

// Truncate removes old data before mint from the head.
Expand Down
Loading