feat: port influx inspect report-tsm to 2.x #21814

Merged: 6 commits, Jul 20, 2021

Changes from all commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -33,6 +33,7 @@ This release adds an embedded SQLite database for storing metadata required by t
1. [21786](https://github.com/influxdata/influxdb/pull/21786): Ported the `influxd inspect deletetsm` command from 1.x.
1. [21802](https://github.com/influxdata/influxdb/pull/21802): Removed unused `chronograf-migator` package & chronograf API service, and updated various "chronograf" references.
1. [21828](https://github.com/influxdata/influxdb/pull/21828): Added the command `influx inspect verify-wal`.
1. [21814](https://github.com/influxdata/influxdb/pull/21814): Ported the `influxd inspect report-tsm` command from 1.x.

### Bug Fixes

2 changes: 2 additions & 0 deletions cmd/influxd/inspect/inspect.go
@@ -7,6 +7,7 @@ import (
"github.com/influxdata/influxdb/v2/cmd/influxd/inspect/export_index"
"github.com/influxdata/influxdb/v2/cmd/influxd/inspect/export_lp"
"github.com/influxdata/influxdb/v2/cmd/influxd/inspect/report_tsi"
"github.com/influxdata/influxdb/v2/cmd/influxd/inspect/report_tsm"
"github.com/influxdata/influxdb/v2/cmd/influxd/inspect/verify_seriesfile"
"github.com/influxdata/influxdb/v2/cmd/influxd/inspect/verify_tombstone"
"github.com/influxdata/influxdb/v2/cmd/influxd/inspect/verify_tsm"
@@ -40,6 +41,7 @@ func NewCommand(v *viper.Viper) (*cobra.Command, error) {
base.AddCommand(dump_tsi.NewDumpTSICommand())
base.AddCommand(delete_tsm.NewDeleteTSMCommand())
base.AddCommand(verify_wal.NewVerifyWALCommand())
base.AddCommand(report_tsm.NewReportTSMCommand())

return base, nil
}
366 changes: 366 additions & 0 deletions cmd/influxd/inspect/report_tsm/report_tsm.go
@@ -0,0 +1,366 @@
package report_tsm

import (
"errors"
"fmt"
"math"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"text/tabwriter"
"time"

"github.com/influxdata/influxdb/v2/internal/fs"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
"github.com/retailnext/hllpp"
"github.com/spf13/cobra"
)

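// args holds the CLI flags accepted by report-tsm.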
type args struct {
dir string
pattern string
detailed bool
exact bool
}

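// NewReportTSMCommand builds the "report-tsm" cobra command. Illustrative
// invocations (the paths below are examples, not defaults):
//
//	influxd inspect report-tsm
//	influxd inspect report-tsm --exact
//	influxd inspect report-tsm --detailed --data-dir ~/.influxdbv2/engine/data/<db>/autogen/1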
func NewReportTSMCommand() *cobra.Command {
var arguments args
cmd := &cobra.Command{
Use: "report-tsm",
Short: "Run TSM report",
Long: `
This command will analyze TSM files within a storage engine directory, reporting
the cardinality within the files as well as the time range that the point data
covers.
This command only interrogates the index within each file, and does not read any
block data. To reduce heap requirements, by default report-tsm estimates the
overall cardinality in the file set by using the HLL++ algorithm. Exact
cardinalities can be determined by using the --exact flag.
For each file, the following is output:
* The full filename;
* The series cardinality within the file;
* The number of series first encountered within the file;
* The min and max timestamp associated with TSM data in the file; and
* The time taken to load the TSM index and apply any tombstones.
The summary section then outputs the total time range and series cardinality for
the fileset. Depending on the --detailed flag, series cardinality is segmented
in the following ways:
* Series cardinality for each organization;
* Series cardinality for each bucket;
* Series cardinality for each measurement;
* Number of field keys for each measurement; and
* Number of tag values for each tag key.`,
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {

// --detailed is only meaningful when pointed at a single shard directory.
err := arguments.isShardDir(arguments.dir)
if arguments.detailed && err != nil {
return errors.New("--detailed only supported for shard dirs")
}

return arguments.Run(cmd)
},
}

cmd.Flags().StringVarP(&arguments.pattern, "pattern", "", "", "only process TSM files containing pattern")
cmd.Flags().BoolVarP(&arguments.exact, "exact", "", false, "calculate an exact cardinality count (warning: may use significant memory)")
cmd.Flags().BoolVarP(&arguments.detailed, "detailed", "", false, "emit series cardinality segmented by measurements, tag keys and fields (warning: may take a while)")

dir, err := fs.InfluxDir()
if err != nil {
panic(err)
}
dir = filepath.Join(dir, "engine/data")
cmd.Flags().StringVarP(&arguments.dir, "data-dir", "", dir, "use provided data directory")

return cmd
}

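// isShardDir reports whether dir looks like a shard directory: its base name
// must be a positive integer shard ID (e.g. ".../mydb/autogen/1").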
func (a *args) isShardDir(dir string) error {
name := filepath.Base(dir)
if id, err := strconv.Atoi(name); err != nil || id < 1 {
return fmt.Errorf("not a valid shard dir: %s", dir)
}

return nil
}

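// Run scans every TSM file under the data directory (or the single shard
// directory provided), prints one tab-aligned row per file, and finishes
// with a cardinality summary.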
func (a *args) Run(cmd *cobra.Command) error {
// Create the cardinality counter
newCounterFn := newHLLCounter
estTitle := " (est)"
if a.exact {
estTitle = ""
newCounterFn = newExactCounter
}

totalSeries := newCounterFn()
tagCardinalities := map[string]counter{}
measCardinalities := map[string]counter{}
fieldCardinalities := map[string]counter{}

dbCardinalities := map[string]counter{}

start := time.Now()

tw := tabwriter.NewWriter(cmd.OutOrStdout(), 8, 2, 1, ' ', 0)
_, _ = fmt.Fprintln(tw, strings.Join([]string{"DB", "RP", "Shard", "File", "Series", "New" + estTitle, "Min Time", "Max Time", "Load Time"}, "\t"))

minTime, maxTime := int64(math.MaxInt64), int64(math.MinInt64)
var fileCount int
if err := a.walkShardDirs(a.dir, func(db, rp, id, path string) error {
if a.pattern != "" && !strings.Contains(path, a.pattern) {
return nil
}

file, err := os.OpenFile(path, os.O_RDONLY, 0600)
if err != nil {
_, _ = fmt.Fprintf(cmd.ErrOrStderr(), "error opening %q, skipping: %v\n", path, err)
return nil
}

loadStart := time.Now()
reader, err := tsm1.NewTSMReader(file)
if err != nil {
_, _ = fmt.Fprintf(cmd.ErrOrStderr(), "error reading %q, skipping: %v\n", file.Name(), err)
return nil
}
loadTime := time.Since(loadStart)
fileCount++

dbCount, ok := dbCardinalities[db]
if !ok {
dbCount = newCounterFn()
dbCardinalities[db] = dbCount
}

oldCount := dbCount.Count()

seriesCount := reader.KeyCount()
for i := 0; i < seriesCount; i++ {
key, _ := reader.KeyAt(i)
totalSeries.Add(key)
dbCount.Add(key)

if a.detailed {
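// TSM index keys have the form "<measurement>,<tags>#!~#<field>";
// split on the 4-byte "#!~#" separator to recover the series key
// and the field name.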
sep := strings.Index(string(key), "#!~#")
seriesKey, field := key[:sep], key[sep+4:]
measurement, tags := models.ParseKey(seriesKey)

measCount, ok := measCardinalities[measurement]
if !ok {
measCount = newCounterFn()
measCardinalities[measurement] = measCount
}
measCount.Add(key)

fieldCount, ok := fieldCardinalities[measurement]
if !ok {
fieldCount = newCounterFn()
fieldCardinalities[measurement] = fieldCount
}
fieldCount.Add(field)

for _, t := range tags {
tagCount, ok := tagCardinalities[string(t.Key)]
if !ok {
tagCount = newCounterFn()
tagCardinalities[string(t.Key)] = tagCount
}
tagCount.Add(t.Value)
}
}
}
minT, maxT := reader.TimeRange()
if minT < minTime {
minTime = minT
}
if maxT > maxTime {
maxTime = maxT
}
err = reader.Close()
if err != nil {
return fmt.Errorf("failed to close TSM Reader: %v", err)
}

_, _ = fmt.Fprintln(tw, strings.Join([]string{
db, rp, id,
filepath.Base(file.Name()),
strconv.FormatInt(int64(seriesCount), 10),
strconv.FormatInt(int64(dbCount.Count()-oldCount), 10),
time.Unix(0, minT).UTC().Format(time.RFC3339Nano),
time.Unix(0, maxT).UTC().Format(time.RFC3339Nano),
loadTime.String(),
}, "\t"))
if a.detailed {
err = tw.Flush()
if err != nil {
return fmt.Errorf("failed to flush tabwriter: %v", err)
}
}
return nil
}); err != nil {
return err
}

err := tw.Flush()
if err != nil {
return fmt.Errorf("failed to flush tabwriter: %v", err)
}

printSummary(cmd, printArgs{
fileCount: fileCount,
minTime: minTime,
maxTime: maxTime,
estTitle: estTitle,
totalSeries: totalSeries,
detailed: a.detailed,
tagCardinalities: tagCardinalities,
measCardinalities: measCardinalities,
fieldCardinalities: fieldCardinalities,
dbCardinalities: dbCardinalities,
})

cmd.Printf("Completed in %s\n", time.Since(start))
return nil
}

type printArgs struct {
fileCount int
minTime, maxTime int64
estTitle string
totalSeries counter
detailed bool

tagCardinalities map[string]counter
measCardinalities map[string]counter
fieldCardinalities map[string]counter
dbCardinalities map[string]counter
}

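// printSummary renders the totals gathered in Run: overall time range,
// per-database series counts, and (with --detailed) per-measurement,
// per-field, and per-tag-key breakdowns.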
func printSummary(cmd *cobra.Command, p printArgs) {
cmd.Printf("\nSummary:")
cmd.Printf(" Files: %d\n", p.fileCount)
cmd.Printf(" Time Range: %s - %s\n",
time.Unix(0, p.minTime).UTC().Format(time.RFC3339Nano),
time.Unix(0, p.maxTime).UTC().Format(time.RFC3339Nano),
)
cmd.Printf(" Duration: %s \n\n", time.Unix(0, p.maxTime).Sub(time.Unix(0, p.minTime)))

cmd.Printf("Statistics\n")
cmd.Printf(" Series:\n")
for db, counts := range p.dbCardinalities {
cmd.Printf(" - %s%s: %d (%d%%)\n", db, p.estTitle, counts.Count(), int(float64(counts.Count())/float64(p.totalSeries.Count())*100))
}
cmd.Printf(" Total%s: %d\n", p.estTitle, p.totalSeries.Count())

if p.detailed {
cmd.Printf("\n Measurements (est):\n")
for _, t := range sortKeys(p.measCardinalities) {
cmd.Printf(" - %v: %d (%d%%)\n", t, p.measCardinalities[t].Count(), int((float64(p.measCardinalities[t].Count())/float64(p.totalSeries.Count()))*100))
}

cmd.Printf("\n Fields (est):\n")
for _, t := range sortKeys(p.fieldCardinalities) {
cmd.Printf(" - %v: %d\n", t, p.fieldCardinalities[t].Count())
}

cmd.Printf("\n Tags (est):\n")
for _, t := range sortKeys(p.tagCardinalities) {
cmd.Printf(" - %v: %d\n", t, p.tagCardinalities[t].Count())
}
}
}

// sortKeys is a quick helper to return the sorted set of a map's keys
func sortKeys(vals map[string]counter) (keys []string) {
for k := range vals {
keys = append(keys, k)
}
sort.Strings(keys)

return keys
}

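// walkShardDirs walks root assuming the engine layout
// <root>/<db>/<rp>/<shard id>/*.tsm, then calls fn once per TSM file in
// ascending shard-ID order.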
func (a *args) walkShardDirs(root string, fn func(db, rp, id, path string) error) error {
type location struct {
db, rp, id, path string
}

var tsms []location
if err := filepath.WalkDir(root, func(path string, info os.DirEntry, err error) error {
if err != nil {
return err
}

if info.IsDir() {
return nil
}

if filepath.Ext(info.Name()) == "."+tsm1.TSMFileExtension {
shardDir := filepath.Dir(path)

if err := a.isShardDir(shardDir); err != nil {
return err
}
absPath, err := filepath.Abs(path)
if err != nil {
return err
}
parts := strings.Split(absPath, string(filepath.Separator))
db, rp, id := parts[len(parts)-4], parts[len(parts)-3], parts[len(parts)-2]
tsms = append(tsms, location{db: db, rp: rp, id: id, path: path})
return nil
}
return nil
}); err != nil {
return err
}

sort.Slice(tsms, func(i, j int) bool {
a, _ := strconv.Atoi(tsms[i].id)
b, _ := strconv.Atoi(tsms[j].id)
return a < b
})

for _, shard := range tsms {
if err := fn(shard.db, shard.rp, shard.id, shard.path); err != nil {
return err
}
}
return nil
}

// counter abstracts a method of counting keys.
type counter interface {
Add(key []byte)
Count() uint64
}
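
// Two implementations follow: an HLL++-backed estimator (the default) and an
// exact counter backed by a set, selected in Run via the --exact flag.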

// newHLLCounter returns an approximate counter using HyperLogLogs for cardinality estimation.
func newHLLCounter() counter {
return hllpp.New()
}

// exactCounter counts keys exactly by tracking all distinct items in a set.
type exactCounter struct {
m map[string]struct{}
}

func (c *exactCounter) Add(key []byte) {
c.m[string(key)] = struct{}{}
}

func (c *exactCounter) Count() uint64 {
return uint64(len(c.m))
}

func newExactCounter() counter {
return &exactCounter{
m: make(map[string]struct{}),
}
}