Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Add tsdb.Scan() to unblock from a corrupted db. #320

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions block.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,13 @@ const (
flagStd = 1
)

const indexFilename = "index"
const metaFilename = "meta.json"
const (
indexFilename = "index"
metaFilename = "meta.json"
chunksDirname = "chunks"
)

func chunkDir(dir string) string { return filepath.Join(dir, "chunks") }
func chunkDir(dir string) string { return filepath.Join(dir, chunksDirname) }
func walDir(dir string) string { return filepath.Join(dir, "wal") }

func readMetaFile(dir string) (*BlockMeta, error) {
Expand Down
198 changes: 196 additions & 2 deletions cmd/tsdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"runtime"
"runtime/pprof"
Expand All @@ -31,6 +32,8 @@ import (
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/tsdb"
"github.com/prometheus/tsdb/labels"
Expand All @@ -48,6 +51,10 @@ func main() {
listCmd = cli.Command("ls", "list db blocks")
listCmdHumanReadable = listCmd.Flag("human-readable", "print human readable values").Short('h').Bool()
listPath = listCmd.Arg("db path", "database path (default is benchout/storage)").Default("benchout/storage").String()
scanCmd = cli.Command("scan", "scans the db and lists corrupted blocks")
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
scanCmdHumanReadable = scanCmd.Flag("human-readable", "print human readable values").Short('h').Bool()
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
scanPath = scanCmd.Arg("dir", "database path (default is current dir ./)").Default("./").ExistingDir()
logger = level.NewFilter(log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)), level.AllowError())
)

switch kingpin.MustParse(cli.Parse(os.Args[1:])) {
Expand All @@ -58,16 +65,185 @@ func main() {
samplesFile: *benchSamplesFile,
}
wb.run()

case listCmd.FullCommand():
db, err := tsdb.Open(*listPath, nil, nil, nil)
if err != nil {
exitWithError(err)
}
printBlocks(db.Blocks(), listCmdHumanReadable)

case scanCmd.FullCommand():
scanTmps(*scanPath, scanCmdHumanReadable)
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved

scan, err := tsdb.NewDBScanner(*scanPath, logger)
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
exitWithError(err)
}
scanTmbst(scan, scanCmdHumanReadable)
scanIndexes(scan, scanCmdHumanReadable)
scanOverlapping(scan, scanCmdHumanReadable)

fmt.Println("Scan complete!")
fmt.Println("Hooray! The db is clean(or the scan tool is broken):\U0001f638")
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
}
flag.CommandLine.Set("log.level", "debug")
}

func scanOverlapping(scan tsdb.Scanner, hformat *bool) {
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
overlaps, err := scan.Overlapping()
if err != nil {
exitWithError(err)
}
if len(overlaps) > 0 {
fmt.Println("Overlaping blocks.")
fmt.Println("Deleting these will remove all data in the listed time range.")
var blocksDel []*tsdb.Block
for t, overBcks := range overlaps {
fmt.Printf("overlapping blocks : %v-%v \n", time.Unix(t.Min/1000, 0).Format("06/01/02 15:04:05"), time.Unix(t.Max/1000, 0).Format("15:04:05 06/01/02"))
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved

var largest int
for i, b := range overBcks {
if b.Meta().Stats.NumSamples > overBcks[largest].Meta().Stats.NumSamples {
largest = i
}
}
fmt.Printf("\nBlock %v contains highest samples count and is ommited from the deletion list! \n\n", overBcks[largest])
//Remove the largest block from the slice.
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
o := append(overBcks[:largest], overBcks[largest+1:]...)
// Add this range to all blocks for deletion.
blocksDel = append(blocksDel, o...)
}

var paths []string
for _, b := range blocksDel {
_, folder := path.Split(b.Dir())
if _, err := ulid.Parse(folder); err != nil {
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
fmt.Printf("\nskipping invalid block dir: %v :%v \n\n", b.Dir(), err)
continue
}
paths = append(paths, b.Dir())
}
printBlocks(blocksDel, hformat)
if confirm() {
if err = dellAll(paths); err != nil {
exitWithError(errors.Wrap(err, "deleting overlapping blocks"))
}
}
}
}

func scanIndexes(scan tsdb.Scanner, hformat *bool) {
unrepairable, repaired, err := scan.Indexes()
if err != nil {
exitWithError(err)
}

if len(repaired) > 0 {
fmt.Println("Corrupted indexes that were repaired.")
for _, stats := range repaired {
fmt.Printf("path:%v stats:%+v \n", stats.BlockDir, stats)
}
}

if len(unrepairable) > 0 {
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
for cause, bdirs := range unrepairable {
fmt.Println("Blocks with unrepairable indexes! \n", cause)
printFiles(bdirs, hformat)
if confirm() {
if err = dellAll(bdirs); err != nil {
exitWithError(errors.Wrap(err, "deleting blocks with invalid indexes"))
}
}
}
}
}

func scanTmbst(scan tsdb.Scanner, hformat *bool) {
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
invalid, err := scan.Tombstones()
if err != nil {
exitWithError(errors.Wrap(err, "scannings Tombstones"))
}

if len(invalid) > 0 {
fmt.Println("Tombstones include data to be deleted so removing these will cancel deleting these timeseries.")
for cause, files := range invalid {
for _, p := range files {
_, file := filepath.Split(p)
if file != "tombstone" {
exitWithError(fmt.Errorf("path doesn't contain a valid tombstone filename: %v", p))
}
}
fmt.Println("invalid tombstones:", cause)
printFiles(files, hformat)
if confirm() {
if err = dellAll(files); err != nil {
exitWithError(errors.Wrap(err, "deleting Tombstones"))
}
}
}
}
}

func scanTmps(scanPath string, hformat *bool) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scanTmpCompactFiles to be more verbose?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto with error return

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there were cases with other temps like WAL checkpoint so will leave as is to be more generic and just updated the print message.

var files []string
filepath.Walk(scanPath, func(path string, f os.FileInfo, _ error) error {
if filepath.Ext(path) == ".tmp" {
files = append(files, path)
}
return nil
})
if len(files) > 0 {
fmt.Println(`
These are usually caused by a crash or incomplete compaction and
are safe to delete as long as no other application is currently using this database.`)
for _, p := range files {
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
if filepath.Ext(p) != ".tmp" {
exitWithError(fmt.Errorf("path doesn't contain a valid tmp extension: %v", p))
}
}
printFiles(files, hformat)
if confirm() {
if err := dellAll(files); err != nil {
exitWithError(errors.Wrap(err, "deleting temp files"))
}
}
}
}

func dellAll(paths []string) error {
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
for _, p := range paths {
if err := os.RemoveAll(p); err != nil {
return fmt.Errorf("error deleting: %v, %v", p, err)
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
}
}
return nil
}

func confirm() bool {
for x := 0; x < 3; x++ {
fmt.Println("DELETE (y/N)?")
var s string
_, err := fmt.Scanln(&s)
if err != nil {
exitWithError(err)
}

s = strings.TrimSpace(s)
s = strings.ToLower(s)

if s == "y" || s == "yes" {
return true
}
if s == "n" || s == "no" {
return false
}
fmt.Println(s, "is not a valid answer")
}
fmt.Printf("Bailing out, too many invalid answers! \n\n")
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
return false
}

type writeBenchmark struct {
outPath string
samplesFile string
Expand Down Expand Up @@ -346,22 +522,40 @@ func exitWithError(err error) {
os.Exit(1)
}

func printFiles(files []string, humanReadable *bool) {
tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
defer tw.Flush()

fmt.Fprintln(tw, "PATH\tSIZE\tDATE\t")
for _, path := range files {
f, e := os.Stat(path)
if e != nil {
exitWithError(e)
}
fmt.Fprintf(tw,
"%v\t%v\t%v\n",
path, f.Size(), getFormatedTime(f.ModTime().Unix(), humanReadable),
)
}
}

func printBlocks(blocks []*tsdb.Block, humanReadable *bool) {
tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
defer tw.Flush()

fmt.Fprintln(tw, "BLOCK ULID\tMIN TIME\tMAX TIME\tNUM SAMPLES\tNUM CHUNKS\tNUM SERIES")
fmt.Fprintln(tw, "BLOCK ULID\tMIN TIME\tMAX TIME\tNUM SAMPLES\tNUM CHUNKS\tNUM SERIES\tPATH")
for _, b := range blocks {
meta := b.Meta()

fmt.Fprintf(tw,
"%v\t%v\t%v\t%v\t%v\t%v\n",
"%v\t%v\t%v\t%v\t%v\t%v\t%v\n",
meta.ULID,
getFormatedTime(meta.MinTime, humanReadable),
getFormatedTime(meta.MaxTime, humanReadable),
meta.Stats.NumSamples,
meta.Stats.NumChunks,
meta.Stats.NumSeries,
b.Dir(),
)
}
}
Expand Down
54 changes: 18 additions & 36 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,8 +534,9 @@ func (db *DB) reload() (err error) {
sort.Slice(blocks, func(i, j int) bool {
return blocks[i].Meta().MinTime < blocks[j].Meta().MinTime
})
if err := validateBlockSequence(blocks); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure about changing anything in db.go before resolving conflicts (: I would aim for changing as little as possible. (e.g -type Overlaps map[TimeRange][]BlockMeta -> type Overlaps map[TimeRange][]*Block change)

return errors.Wrap(err, "invalid block sequence")

if overlaps := OverlappingBlocks(blocks); len(overlaps) > 0 {
return errors.Errorf("invalid block sequence , block time ranges overlap: %s", overlaps)
}

// Swap in new blocks first for subsequently created readers to be seen.
Expand Down Expand Up @@ -571,45 +572,26 @@ func (db *DB) reload() (err error) {
return errors.Wrap(db.head.Truncate(maxt), "head truncate failed")
}

// validateBlockSequence returns error if given block meta files indicate that some blocks overlaps within sequence.
func validateBlockSequence(bs []*Block) error {
if len(bs) <= 1 {
return nil
}

var metas []BlockMeta
for _, b := range bs {
metas = append(metas, b.meta)
}

overlaps := OverlappingBlocks(metas)
if len(overlaps) > 0 {
return errors.Errorf("block time ranges overlap: %s", overlaps)
}

return nil
}

// TimeRange specifies minTime and maxTime range.
type TimeRange struct {
Min, Max int64
}

// Overlaps contains overlapping blocks aggregated by overlapping range.
type Overlaps map[TimeRange][]BlockMeta
type Overlaps map[TimeRange][]*Block
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why you need whole blocks if we are using only meta?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is just to get rid of validateBlockSequence function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes and also need the block dir when preparing to delete these.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not using meta.ULID?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see any harm in this refactoring as validateBlockSequence is doing the same thing.


// String returns human readable string form of overlapped blocks.
func (o Overlaps) String() string {
var res []string
for r, overlaps := range o {
var groups []string
for _, m := range overlaps {
for _, b := range overlaps {
groups = append(groups, fmt.Sprintf(
"<ulid: %s, mint: %d, maxt: %d, range: %s>",
m.ULID.String(),
m.MinTime,
m.MaxTime,
(time.Duration((m.MaxTime-m.MinTime)/1000)*time.Second).String(),
b.Meta().ULID.String(),
b.Meta().MinTime,
b.Meta().MaxTime,
(time.Duration((b.Meta().MaxTime-b.Meta().MinTime)/1000)*time.Second).String(),
))
}
res = append(res, fmt.Sprintf(
Expand All @@ -624,15 +606,15 @@ func (o Overlaps) String() string {
}

// OverlappingBlocks returns all overlapping blocks from given meta files.
func OverlappingBlocks(bm []BlockMeta) Overlaps {
func OverlappingBlocks(bm []*Block) Overlaps {
if len(bm) <= 1 {
return nil
}
var (
overlaps [][]BlockMeta
overlaps [][]*Block

// pending contains not ended blocks in regards to "current" timestamp.
pending = []BlockMeta{bm[0]}
pending = []*Block{bm[0]}
// continuousPending helps to aggregate same overlaps to single group.
continuousPending = true
)
Expand All @@ -641,11 +623,11 @@ func OverlappingBlocks(bm []BlockMeta) Overlaps {
// We check if any of the pending block finished (blocks that we have seen before, but their maxTime was still ahead current
// timestamp). If not, it means they overlap with our current block. In the same time current block is assumed pending.
for _, b := range bm[1:] {
var newPending []BlockMeta
var newPending []*Block

for _, p := range pending {
// "b.MinTime" is our current time.
if b.MinTime >= p.MaxTime {
if b.Meta().MinTime >= p.Meta().MaxTime {
continuousPending = false
continue
}
Expand Down Expand Up @@ -676,12 +658,12 @@ func OverlappingBlocks(bm []BlockMeta) Overlaps {

minRange := TimeRange{Min: 0, Max: math.MaxInt64}
for _, b := range overlap {
if minRange.Max > b.MaxTime {
minRange.Max = b.MaxTime
if minRange.Max > b.Meta().MaxTime {
minRange.Max = b.Meta().MaxTime
}

if minRange.Min < b.MinTime {
minRange.Min = b.MinTime
if minRange.Min < b.Meta().MinTime {
minRange.Min = b.Meta().MinTime
}
}
overlapGroups[minRange] = overlap
Expand Down
Loading