Skip to content

Commit

Permalink
feat(influxd): New influxd verify tsm-blocks command
Browse files Browse the repository at this point in the history
This command verifies TSM blocks by checking that:

* expected and actual CRC-32 checksums match
* expected and actual min and max timestamps match decoded
  data
  • Loading branch information
stuartcarnie committed Jul 3, 2019
1 parent 90a529e commit 025a624
Show file tree
Hide file tree
Showing 5 changed files with 222 additions and 0 deletions.
2 changes: 2 additions & 0 deletions cmd/influxd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/influxdata/influxdb/cmd/influxd/generate"
"github.com/influxdata/influxdb/cmd/influxd/inspect"
"github.com/influxdata/influxdb/cmd/influxd/launcher"
"github.com/influxdata/influxdb/cmd/influxd/verify"
_ "github.com/influxdata/influxdb/query/builtin"
_ "github.com/influxdata/influxdb/tsdb/tsi1"
_ "github.com/influxdata/influxdb/tsdb/tsm1"
Expand Down Expand Up @@ -37,6 +38,7 @@ func init() {
rootCmd.AddCommand(launcher.NewCommand())
rootCmd.AddCommand(generate.Command)
rootCmd.AddCommand(inspect.NewCommand())
rootCmd.AddCommand(verify.NewCommand())
}

// find determines the default behavior when running influxd.
Expand Down
147 changes: 147 additions & 0 deletions cmd/influxd/verify/tsm_blocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package verify

import (
"bytes"
"fmt"
"hash/crc32"
"os"
"path/filepath"

"github.com/influxdata/influxdb/kit/cli"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/cursors"
"github.com/influxdata/influxdb/tsdb/tsm1"
"github.com/spf13/cobra"
)

// tsmBlocksFlags holds the flag values for the `tsm-blocks` command.
var tsmBlocksFlags = struct {
// OrgBucket supplies the optional org-id / bucket-id filters; they are
// registered on the command through its AddFlags method.
cli.OrgBucket
// NOTE(review): path is not referenced anywhere in the visible code —
// confirm whether it can be removed.
path string
}{}

// newTSMBlocksCommand builds the `tsm-blocks` sub-command. The command runs
// verifyTSMBlocks and carries the flags registered by tsmBlocksFlags.
func newTSMBlocksCommand() *cobra.Command {
	// Help text shown for the command; kept verbatim.
	const longHelp = `
This command will analyze a set of TSM files for inconsistencies of the
TSM index and the blocks.
The checks performed by this command are:
* CRC-32 checksums match for each block
* TSM index min and max timestamps match decoded data
OPTIONS
<pathspec>...
A list of files or directories to search for TSM files.
An optional organization or organization and bucket may be specified to limit
the analysis.
`

	tsmBlocks := new(cobra.Command)
	tsmBlocks.Use = "tsm-blocks <pathspec>..."
	tsmBlocks.Short = "Verifies consistency of TSM blocks for specified paths"
	tsmBlocks.Long = longHelp
	tsmBlocks.Run = verifyTSMBlocks

	tsmBlocksFlags.AddFlags(tsmBlocks)
	return tsmBlocks
}

// verifyTSMBlocks is the Run handler of the `tsm-blocks` command.
//
// Each element of args is treated as a path. A directory is expanded to the
// files directly inside it whose extension is tsm1.TSMFileExtension; any
// other path is processed as a single file. Problems with one path or file
// are printed to standard output and do not stop the remaining ones from
// being processed.
func verifyTSMBlocks(cmd *cobra.Command, args []string) {
	for _, arg := range args {
		fi, err := os.Stat(arg)
		if err != nil {
			fmt.Printf("Error processing path %q: %v\n", arg, err)
			continue
		}

		var files []string
		if fi.IsDir() {
			// Glob fails only on a malformed pattern, which can happen when
			// the directory name itself contains glob metacharacters.
			files, err = filepath.Glob(filepath.Join(arg, "*."+tsm1.TSMFileExtension))
			if err != nil {
				fmt.Printf("Error processing path %q: %v\n", arg, err)
				continue
			}
		} else {
			files = append(files, arg)
		}

		for _, path := range files {
			if err := processFile(path); err != nil {
				fmt.Printf("Error processing file %q: %v\n", path, err)
			}
		}
	}
}

// processFile verifies the blocks of the TSM file at path and prints every
// inconsistency it finds to standard output.
//
// For each index entry it reads the block, compares the checksum returned by
// the reader with the CRC-32 (IEEE) of the block bytes, decodes the block's
// timestamps, and compares the entry's MinTime/MaxTime with the decoded
// minimum and maximum. When an org (and optionally bucket) ID was supplied
// via tsmBlocksFlags, iteration starts at the encoded prefix and stops at the
// first key that does not carry it.
//
// It returns an error only when the file cannot be opened or a TSM reader
// cannot be created; block-level problems are reported on standard output and
// summarized after the scan.
func processFile(path string) error {
	fmt.Println("processing file: " + path)

	file, err := os.OpenFile(path, os.O_RDONLY, 0600)
	if err != nil {
		return fmt.Errorf("OpenFile: %v", err)
	}

	reader, err := tsm1.NewTSMReader(file)
	if err != nil {
		// No reader exists to release the handle on this path, so close it
		// here; a redundant Close on an *os.File merely returns an error.
		file.Close()
		return fmt.Errorf("failed to create TSM reader for %q: %v", path, err)
	}
	defer reader.Close()

	// Build the optional key prefix used to restrict the scan.
	org, bucket := tsmBlocksFlags.OrgBucketID()
	var start []byte
	if org.Valid() {
		if bucket.Valid() {
			v := tsdb.EncodeName(org, bucket)
			start = v[:]
		} else {
			v := tsdb.EncodeOrgName(org)
			start = v[:]
		}
	}

	var ts cursors.TimestampArray
	count := 0
	totalErrors := 0
	iter := reader.Iterator(start)
	for iter.Next() {
		key := iter.Key()
		if len(start) > 0 && !bytes.HasPrefix(key, start) {
			// Past the requested org/bucket prefix.
			break
		}

		entries := iter.Entries()
		for i := range entries {
			entry := &entries[i]

			checksum, buf, err := reader.ReadBytes(entry, nil)
			if err != nil {
				totalErrors++
				fmt.Printf("could not read block %d due to error: %q\n", count, err)
				count++
				continue
			}

			if expected := crc32.ChecksumIEEE(buf); checksum != expected {
				totalErrors++
				fmt.Printf("unexpected checksum %d, expected %d for key %v, block %d\n", checksum, expected, key, count)
			}

			if err = tsm1.DecodeTimestampArrayBlock(buf, &ts); err != nil {
				totalErrors++
				fmt.Printf("unable to decode timestamps for block %d: %q\n", count, err)
				// ts may hold stale or partial data after a failed decode,
				// so the min/max comparison would be meaningless.
				count++
				continue
			}

			if got, exp := entry.MinTime, ts.MinTime(); got != exp {
				totalErrors++
				fmt.Printf("unexpected min time %d, expected %d for block %d\n", got, exp, count)
			}
			if got, exp := entry.MaxTime, ts.MaxTime(); got != exp {
				totalErrors++
				fmt.Printf("unexpected max time %d, expected %d for block %d\n", got, exp, count)
			}

			count++
		}
	}

	fmt.Printf("Completed checking %d block(s)\n", count)
	if totalErrors > 0 {
		fmt.Printf("Found %d error(s)\n", totalErrors)
	}

	return nil
}
25 changes: 25 additions & 0 deletions cmd/influxd/verify/verify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package verify

import (
"github.com/spf13/cobra"
)

// NewCommand returns the `verify` parent command with all of its
// sub-commands attached.
func NewCommand() *cobra.Command {
	verifyCmd := new(cobra.Command)
	verifyCmd.Use = "verify"
	verifyCmd.Short = "Commands for verifying on-disk database data"

	// Every sub-command of `verify` is registered here; add new ones to
	// this argument list.
	verifyCmd.AddCommand(
		newTSMBlocksCommand(),
	)

	return verifyCmd
}
40 changes: 40 additions & 0 deletions kit/cli/flags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package cli

import (
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/tsdb"
"github.com/spf13/cobra"
)

// ID wraps influxdb.ID and adds the Set and Type methods so that an ID can
// be registered as a command-line flag value (see OrgBucket.AddFlags).
type ID struct {
influxdb.ID
}

// Set parses the hex string v and sets the value of i. It returns whatever
// error the underlying influxdb.ID decoder reports.
func (i *ID) Set(v string) error {
	encoded := []byte(v)
	return i.ID.Decode(encoded)
}

// Type reports the type name of the flag value, which is always "ID".
func (i *ID) Type() string {
	const typeName = "ID"
	return typeName
}

// OrgBucket groups an organization ID and a bucket ID that can be exposed as
// command-line flags through AddFlags.
type OrgBucket struct {
// Org is bound to the "org-id" flag.
Org ID
// Bucket is bound to the "bucket-id" flag.
Bucket ID
}

// AddFlags registers the "org-id" and "bucket-id" flags on cmd, binding them
// to o.Org and o.Bucket respectively.
func (o *OrgBucket) AddFlags(cmd *cobra.Command) {
	cmd.Flags().Var(&o.Org, "org-id", "organization id")
	cmd.Flags().Var(&o.Bucket, "bucket-id", "bucket id")
}

// OrgBucketID returns the organization and bucket IDs currently held by o,
// unwrapped to plain influxdb.ID values.
func (o *OrgBucket) OrgBucketID() (orgID, bucketID influxdb.ID) {
	orgID = o.Org.ID
	bucketID = o.Bucket.ID
	return orgID, bucketID
}

// Name returns the result of tsdb.EncodeName for the organization and bucket
// IDs held by o.
//
// NOTE(review): the return type is sized by influxdb.IDLength while the value
// comes from tsdb.EncodeName; this relies on both array lengths being equal —
// confirm the coupling is intended.
func (o *OrgBucket) Name() [influxdb.IDLength]byte {
	org, bucket := o.OrgBucketID()
	return tsdb.EncodeName(org, bucket)
}
8 changes: 8 additions & 0 deletions tsdb/explode.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ func EncodeName(org, bucket platform.ID) [16]byte {
return nameBytes
}

// EncodeOrgName returns org as 8 big-endian bytes, the tsdb internal
// serialization. The result may be used as a key prefix when searching for
// keys that belong to a specific organization.
func EncodeOrgName(org platform.ID) [8]byte {
	var encoded [8]byte
	binary.BigEndian.PutUint64(encoded[:], uint64(org))
	return encoded
}

// EncodeNameString converts org/bucket pairs to the tsdb internal serialization
func EncodeNameString(org, bucket platform.ID) string {
name := EncodeName(org, bucket)
Expand Down

0 comments on commit 025a624

Please sign in to comment.