This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Add tsdb.Scan() to unblock from a corrupted db. #320

Open · wants to merge 19 commits into master
4 changes: 3 additions & 1 deletion CHANGELOG.md
@@ -1,8 +1,10 @@
## master / unreleased
- [FEATURE] New public `Scanner` interface for building custom CLI tools that repair or handle unrepairable blocks.
- [FEATURE] New `scan` command for the `tsdb` CLI tool that uses the new scan interface to run a repair and move all unrepairable blocks out of the data folder, unblocking Prometheus at its next startup.
- [CHANGE] New `WALSegmentSize` option to override `DefaultOptions.WALSegmentSize`. Added to allow using smaller WAL files, for example with tmpfs on a Raspberry Pi to minimise SD card wear from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed.

## 0.3.1
- [BUGFIX] Fixed most windows test and some actual bugs for unclosed file readers.

## 0.3.0
- [CHANGE] `LastCheckpoint()` used to return just the segment name and now it returns the full relative path.
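The CHANGELOG entries above introduce the public `Scanner` interface, which the new `scan` command further down in this diff is built on. As a rough illustration of how a custom CLI tool might drive it programmatically, here is a minimal sketch that only uses calls visible in this diff (`tsdb.NewDBScanner`, `Tombstones()`, `Dir()`); the `"data"` directory and the exact shape of the returned map are assumptions, not part of this PR.

```go
package main

import (
	"fmt"
	"os"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/tsdb"
)

func main() {
	logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))

	// "data" is a placeholder for the database directory.
	scan, err := tsdb.NewDBScanner("data", logger)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}

	// The scan groups affected files by corruption cause; a custom tool can
	// then decide to repair, move or delete them instead of prompting.
	invalid, err := scan.Tombstones()
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	for cause, files := range invalid {
		fmt.Println("invalid tombstones:", cause, files)
	}
	fmt.Println("scanned db dir:", scan.Dir())
}
```

Likewise, a hedged sketch of the `WALSegmentSize` override described in the [CHANGE] entry; the `WALSegmentSize` field and `DefaultOptions` names come from that entry, while the concrete value and directory are made up for illustration.

```go
package main

import "github.com/prometheus/tsdb"

func main() {
	// Copy the exposed defaults and shrink only the WAL segment size,
	// e.g. for a tmpfs-backed WAL on a Raspberry Pi.
	opts := *tsdb.DefaultOptions
	opts.WALSegmentSize = 1 * 1024 * 1024 // assumed to be a size in bytes

	db, err := tsdb.Open("data", nil, nil, &opts) // "data" is a placeholder dir
	if err != nil {
		panic(err)
	}
	defer db.Close()
}
```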
222 changes: 215 additions & 7 deletions cmd/tsdb/main.go
@@ -31,8 +31,10 @@ import (
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/tsdb"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels"
"gopkg.in/alecthomas/kingpin.v2"
)
@@ -48,6 +50,10 @@ func main() {
listCmd = cli.Command("ls", "list db blocks")
listCmdHumanReadable = listCmd.Flag("human-readable", "print human readable values").Short('h').Bool()
listPath = listCmd.Arg("db path", "database path (default is "+filepath.Join("benchout", "storage")+")").Default(filepath.Join("benchout", "storage")).String()
scanCmd = cli.Command("scan", "scans the db and promts to remove corrupted blocks")
scanCmdHumanReadable = scanCmd.Flag("human-readable", "print human readable values").Short('h').Bool()
scanPath = scanCmd.Arg("dir", "database path (default is current dir ./)").Default("./").ExistingDir()
logger = level.NewFilter(log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)), level.AllowError())
)

switch kingpin.MustParse(cli.Parse(os.Args[1:])) {
@@ -58,16 +64,201 @@ func main() {
samplesFile: *benchSamplesFile,
}
wb.run()

case listCmd.FullCommand():
db, err := tsdb.Open(*listPath, nil, nil, nil)
if err != nil {
exitWithError(err)
}
printBlocks(db.Blocks(), listCmdHumanReadable)

case scanCmd.FullCommand():
if err := scanTmps(*scanPath, scanCmdHumanReadable); err != nil {
exitWithError(err)
}

scan, err := tsdb.NewDBScanner(*scanPath, logger)
if err != nil {
exitWithError(err)
}
if err := scanTombstones(scan, scanCmdHumanReadable); err != nil {
exitWithError(err)
}
if err := scanIndexes(scan, scanCmdHumanReadable); err != nil {
exitWithError(err)
}
if err := scanOverlappingBlocks(scan, scanCmdHumanReadable); err != nil {
exitWithError(err)
}

fmt.Println("Scan complete!")
}
flag.CommandLine.Set("log.level", "debug")
}

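// scanOverlappingBlocks finds blocks with overlapping time ranges, keeps the
// block with the highest sample count in each overlapping group, and prompts to
// move the remaining blocks into an "overlappingBlocks" dir under the db path.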
func scanOverlappingBlocks(scan tsdb.Scanner, hformat *bool) error {
overlaps, err := scan.Overlapping()
if err != nil {
return err
}
if len(overlaps) > 0 {
fmt.Println("Overlaping blocks.")
fmt.Println("Deleting these will remove all data in the listed time range.")
var blocksDel []*tsdb.Block
for t, overBcks := range overlaps {
var ULIDS string
for _, b := range overBcks {
ULIDS = ULIDS + b.Meta().ULID.String() + " "
}
fmt.Printf("overlapping blocks : %v %v-%v \n", ULIDS, time.Unix(t.Min/1000, 0).Format("06/01/02 15:04:05"), time.Unix(t.Max/1000, 0).Format("15:04:05 06/01/02"))

var largest int
for i, b := range overBcks {
if b.Meta().Stats.NumSamples > overBcks[largest].Meta().Stats.NumSamples {
largest = i
}
}
fmt.Printf("\nBlock %v contains highest samples count and is ommited from the deletion list! \n\n", overBcks[largest])
// Remove the largest block from the slice.
o := append(overBcks[:largest], overBcks[largest+1:]...)
// Add this range to all blocks for deletion.
blocksDel = append(blocksDel, o...)
}

var paths []string
for _, b := range blocksDel {
paths = append(paths, b.Dir())
}
printBlocks(blocksDel, hformat)
moveTo := filepath.Join(scan.Dir(), "overlappingBlocks")
confirmed, err := confirm("Confirm moving the overlapping blocks to: " + moveTo)
if err != nil {
return err
}
if confirmed {
for _, file := range paths {
fileutil.Replace(file, moveTo)
}
}
}
return nil
}

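// scanIndexes finds blocks with a corrupted index, grouped by cause, and
// prompts to move them into a "blocksWithInvalidIndexes" dir under the db path.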
func scanIndexes(scan tsdb.Scanner, hformat *bool) error {
corrupted, err := scan.Index()
if err != nil {
return err
}

for cause, bdirs := range corrupted {
fmt.Println("Blocks with corrupted index! \n", cause)
printFiles(bdirs, hformat)

moveTo := filepath.Join(scan.Dir(), "blocksWithInvalidIndexes")
confirmed, err := confirm("Confirm moving corrupted indexes to: " + moveTo)
if err != nil {
return err
}
if confirmed {
for _, file := range bdirs {
fileutil.Replace(file, moveTo)
}
}
}
return nil
}

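// scanTombstones finds invalid tombstone files, grouped by cause, and prompts
// to move them into a "badTombstones" dir under the db path.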
func scanTombstones(scan tsdb.Scanner, hformat *bool) error {
invalid, err := scan.Tombstones()
if err != nil {
return errors.Wrap(err, "scannings Tombstones")
}

if len(invalid) > 0 {
fmt.Println("Tombstones include data to be deleted so removing these will cancel deleting these timeseries.")
for cause, files := range invalid {
for _, p := range files {
_, file := filepath.Split(p)
if file != "tombstone" {
return fmt.Errorf("path doesn't contain a valid tombstone filename: %v", p)
}
}
fmt.Println("invalid tombstones:", cause)
printFiles(files, hformat)
moveTo := filepath.Join(scan.Dir(), "badTombstones")
confirmed, err := confirm("Confirm moving corrupted tombstones to: " + moveTo)
if err != nil {
return err
}
if confirmed {
for _, file := range files {
fileutil.Replace(file, moveTo)
}
}
}
}
return nil
}

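// scanTmps walks the db path for leftover .tmp files and prompts to delete them.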
func scanTmps(scanPath string, hformat *bool) error {
var files []string
err := filepath.Walk(scanPath, func(path string, f os.FileInfo, err error) error {
if err != nil {
return err
}
if filepath.Ext(path) == ".tmp" {
files = append(files, path)
}
return nil
})
if err != nil {
return errors.Wrap(err, "walking the db dir for temp files")
}
if len(files) > 0 {
fmt.Println(`
Found leftover temporary (.tmp) files.
These are usually caused by a crash or an incomplete operation and
are safe to delete as long as no other application is currently using this database.`)
printFiles(files, hformat)
confirmed, err := confirm("DELETE")
if err != nil {
return err
}
if confirmed {
if err := delAll(files); err != nil {
return errors.Wrap(err, "deleting temp files")
}
}
}
return nil
}

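// delAll removes all given paths and returns the first error encountered.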
func delAll(paths []string) error {
for _, p := range paths {
if err := os.RemoveAll(p); err != nil {
return errors.Wrapf(err, "error deleting:%v", p)
}
}
return nil
}

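// confirm asks the given yes/no question on stdin and returns the answer,
// giving up after three invalid replies.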
func confirm(action string) (bool, error) {
for x := 0; x < 3; x++ {
fmt.Println(action, " (y/N)?")
var s string
_, err := fmt.Scanln(&s)
if err != nil {
return false, err
}

s = strings.TrimSpace(s)
s = strings.ToLower(s)

if s == "y" || s == "yes" {
return true, nil
}
if s == "n" || s == "no" {
return false, nil
}
fmt.Println(s, "is not a valid answer")
}
fmt.Printf("Bailing out, too many invalid answers! \n\n")
return false, nil
}

type writeBenchmark struct {
outPath string
samplesFile string
@@ -248,23 +439,22 @@ func (b *writeBenchmark) startProfiling() {
// Start CPU profiling.
b.cpuprof, err = os.Create(filepath.Join(b.outPath, "cpu.prof"))
if err != nil {
exitWithError(fmt.Errorf("bench: could not create cpu profile: %v", err))
exitWithError(errors.Wrap(err, "bench: could not create cpu profile"))
}
if err := pprof.StartCPUProfile(b.cpuprof); err != nil {
exitWithError(fmt.Errorf("bench: could not start CPU profile: %v", err))
exitWithError(errors.Wrap(err, "bench: could not start CPU profile"))
}

// Start memory profiling.
b.memprof, err = os.Create(filepath.Join(b.outPath, "mem.prof"))
if err != nil {
exitWithError(fmt.Errorf("bench: could not create memory profile: %v", err))
exitWithError(errors.Wrap(err, "bench: could not create memory profile: %v"))
}
runtime.MemProfileRate = 64 * 1024

// Start fatal profiling.
b.blockprof, err = os.Create(filepath.Join(b.outPath, "block.prof"))
if err != nil {
exitWithError(fmt.Errorf("bench: could not create block profile: %v", err))
exitWithError(errors.Wrap(err, "bench: could not create block profile: %v"))
}
runtime.SetBlockProfileRate(20)

@@ -357,22 +547,40 @@ func exitWithError(err error) {
os.Exit(1)
}

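// printFiles writes a table with the path, size and modification time of each file.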
func printFiles(files []string, humanReadable *bool) {
tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
defer tw.Flush()

fmt.Fprintln(tw, "PATH\tSIZE\tDATE\t")
for _, path := range files {
f, e := os.Stat(path)
if e != nil {
exitWithError(e)
}
fmt.Fprintf(tw,
"%v\t%v\t%v\n",
path, f.Size(), getFormatedTime(f.ModTime().Unix(), humanReadable),
)
}
}

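// printBlocks writes a table with the ULID, time range, stats and directory of each block.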
func printBlocks(blocks []*tsdb.Block, humanReadable *bool) {
tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
defer tw.Flush()

fmt.Fprintln(tw, "BLOCK ULID\tMIN TIME\tMAX TIME\tNUM SAMPLES\tNUM CHUNKS\tNUM SERIES")
fmt.Fprintln(tw, "BLOCK ULID\tMIN TIME\tMAX TIME\tNUM SAMPLES\tNUM CHUNKS\tNUM SERIES\tPATH")
for _, b := range blocks {
meta := b.Meta()

fmt.Fprintf(tw,
"%v\t%v\t%v\t%v\t%v\t%v\n",
"%v\t%v\t%v\t%v\t%v\t%v\t%v\n",
meta.ULID,
getFormatedTime(meta.MinTime, humanReadable),
getFormatedTime(meta.MaxTime, humanReadable),
meta.Stats.NumSamples,
meta.Stats.NumChunks,
meta.Stats.NumSeries,
b.Dir(),
)
}
}