Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

add capability to process bigtable indices with mt-index-cat and re-organize code #1909

Merged
merged 5 commits into from
Sep 25, 2020
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
add capability to process bigtable indices and re-organize code
robert-milan committed Sep 24, 2020

Verified

This commit was signed with the committer’s verified signature.
robert-milan Robert Milan
commit 2a63113c9dac459f6c58b1617ba4385dfe11305d
183 changes: 134 additions & 49 deletions cmd/mt-index-cat/main.go
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@ import (

"github.com/grafana/metrictank/cmd/mt-index-cat/out"
"github.com/grafana/metrictank/conf"
"github.com/grafana/metrictank/idx/bigtable"
"github.com/grafana/metrictank/idx/cassandra"
"github.com/grafana/metrictank/idx/memory"
"github.com/grafana/metrictank/logger"
@@ -47,13 +48,15 @@ func main() {
var verbose bool
var limit int
var partitionStr string
var btTotalPartitions int

globalFlags := flag.NewFlagSet("global config flags", flag.ExitOnError)
globalFlags.StringVar(&addr, "addr", "http://localhost:6060", "graphite/metrictank address")
globalFlags.StringVar(&prefix, "prefix", "", "only show metrics that have this prefix")
globalFlags.StringVar(&substr, "substr", "", "only show metrics that have this substring")
globalFlags.StringVar(&suffix, "suffix", "", "only show metrics that have this suffix")
globalFlags.StringVar(&partitionStr, "partitions", "*", "only show metrics from the comma separated list of partitions or * for all")
globalFlags.IntVar(&btTotalPartitions, "bt-total-partitions", -1, "when using bigtable you must set this to the total number of partitions for the instance if you do not specify partitions with the 'partitions' setting")
globalFlags.StringVar(&regexStr, "regex", "", "only show metrics that match this regex")
globalFlags.StringVar(&tags, "tags", "", "tag filter. empty (default), 'some', 'none', 'valid', or 'invalid'")
globalFlags.StringVar(&from, "from", "30min", "for vegeta outputs, will generate requests for data starting from now minus... eg '30min', '5h', '14d', etc. or a unix timestamp")
@@ -63,6 +66,7 @@ func main() {
globalFlags.BoolVar(&verbose, "verbose", false, "print stats to stderr")

cassFlags := cassandra.ConfigSetup()
btFlags := bigtable.ConfigSetup()

outputs := []string{"dump", "list", "vegeta-render", "vegeta-render-patterns"}

@@ -84,9 +88,13 @@ func main() {
fmt.Println(" 'valid' only show metrics whose tags (if any) are valid")
fmt.Println(" 'invalid' only show metrics that have one or more invalid tags")
fmt.Println()
fmt.Printf("idxtype: only 'cass' supported for now\n\n")
fmt.Printf("\n\n")
fmt.Printf("cass config flags:\n\n")
cassFlags.PrintDefaults()
fmt.Printf("\n\n")
fmt.Printf("bigtable config flags:\n\n")
btFlags.PrintDefaults()
fmt.Printf("\n\n")
fmt.Println()
fmt.Println("output:")
fmt.Println()
@@ -102,12 +110,18 @@ func main() {
fmt.Println(" roundDuration: formats an integer-seconds duration using aggressive rounding. for the purpose of getting an idea of overal metrics age")
fmt.Println()
fmt.Println()
fmt.Println("EXAMPLES:")
fmt.Println("Cassandra Examples:")
fmt.Println("mt-index-cat -from 60min cass -hosts cassandra:9042 list")
fmt.Println("mt-index-cat -from 60min cass -hosts cassandra:9042 'sumSeries({{.Name | pattern}})'")
fmt.Println("mt-index-cat -from 60min cass -hosts cassandra:9042 'GET http://localhost:6060/render?target=sumSeries({{.Name | pattern}})&from=-6h\\nX-Org-Id: 1\\n\\n'")
fmt.Println("mt-index-cat cass -hosts cassandra:9042 -timeout 60s '{{.LastUpdate | age | roundDuration}}\\n' | sort | uniq -c")
fmt.Println("mt-index-cat cass -hosts localhost:9042 -schema-file ../../scripts/config/schema-idx-cassandra.toml '{{.Name | patternCustom 15 \"pass\" 40 \"1rcnw\" 15 \"2rcnw\" 10 \"3rcnw\" 10 \"3rccw\" 10 \"2rccw\"}}\\n'")
fmt.Println()
fmt.Println()
fmt.Println("Bigtable Examples:")
fmt.Println("mt-index-cat -max-stale 0 -bt-total-partitions 128 bt -gcp-project your_project -bigtable-instance the_bt_instance -table-name metric_idx -create-cf false list")
fmt.Println("mt-index-cat -max-stale 768h -partitions 1,2,3 bt -gcp-project your_project -bigtable-instance the_bt_instance -table-name metric_idx -create-cf false '{{.NameWithTags}} {{.Id}} {{.OrgId}} {{.LastUpdate}} {{.Partition}}\n'")

}

if len(os.Args) == 2 && (os.Args[1] == "-h" || os.Args[1] == "--help") {
@@ -139,13 +153,24 @@ func main() {
os.Exit(-1)
}
var cassI int
var btI int
for i, v := range os.Args {
if v == "cass" {
cassI = i
}
if v == "bt" {
btI = i
}
}
if cassI == 0 {
log.Println("only indextype 'cass' supported")

if cassI > 0 && btI > 0 {
log.Println("you may only use one index type at a time, either 'cass' or 'bt'")
flag.Usage()
os.Exit(1)
}

if cassI == 0 && btI == 0 {
log.Println("you must use one index type, either 'cass' or 'bt'")
flag.Usage()
os.Exit(1)
}
@@ -156,9 +181,21 @@ func main() {
os.Exit(1)
}

globalFlags.Parse(os.Args[1:cassI])
cassFlags.Parse(os.Args[cassI+1 : len(os.Args)-1])
cassandra.CliConfig.Enabled = true
var idxFlagPos int
if cassI > 0 {
idxFlagPos = cassI
} else {
idxFlagPos = btI
}

globalFlags.Parse(os.Args[1:idxFlagPos])
if cassI > 0 {
cassFlags.Parse(os.Args[idxFlagPos+1 : len(os.Args)-1])
cassandra.CliConfig.Enabled = true
} else {
btFlags.Parse(os.Args[idxFlagPos+1 : len(os.Args)-1])
bigtable.CliConfig.Enabled = true
}

if regexStr != "" {
var err error
@@ -184,8 +221,17 @@ func main() {
show = out.Template(format)
}

idx := cassandra.New(cassandra.CliConfig)
err := idx.InitBare()
var cassIdx *cassandra.CasIdx
var btIdx *bigtable.BigtableIdx
var err error

if cassI > 0 {
cassIdx = cassandra.New(cassandra.CliConfig)
err = cassIdx.InitBare()
} else {
btIdx = bigtable.New(bigtable.CliConfig)
err = btIdx.InitBare()
}
perror(err)

// from should either be a unix timestamp, or a specification that graphite/metrictank will recognize.
@@ -237,53 +283,92 @@ func main() {
}
}

var defs []schema.MetricDefinition
if len(partitions) == 0 {
defs = idx.Load(nil, time.Now())
} else {
defs = idx.LoadPartitions(partitions, nil, time.Now())
}
// set this after doing the query, to assure age can't possibly be negative unless if clocks are misconfigured.
out.QueryTime = time.Now().Unix()
total := len(defs)
shown := 0

for _, d := range defs {
// note that prefix and substr can be "", meaning filter disabled.
// the conditions handle this fine as well.
if !strings.HasPrefix(d.Name, prefix) {
continue
}
if !strings.HasSuffix(d.Name, suffix) {
continue
}
if !strings.Contains(d.Name, substr) {
continue
}
if tags == "none" && len(d.Tags) != 0 {
continue
}
if tags == "some" && len(d.Tags) == 0 {
continue
// if partitionStr is set to all (*) and we are using bigtable then we must
// ensure that we know the total number of partitions
if partitionStr == "*" && btI > 0 {
if btTotalPartitions == -1 {
log.Println("When selecting all partitions with bigtable you must specify the total number of partitions for the instance")
flag.Usage()
os.Exit(-1)
} else {
for i := 0; i < btTotalPartitions; i++ {
partitions = append(partitions, int32(i))
}
}
if regex != nil && !regex.MatchString(d.Name) {
continue
}

var total int
var shown int

processDefs := func(defs []schema.MetricDefinition) {
total += len(defs)
if shown >= limit && limit > 0 {
log.Infof("Limit (%d) reached while processing Metric Definitions", limit)
return
}
if tags == "valid" || tags == "invalid" {
err := schema.ValidateTags(d.Tags)
for _, d := range defs {
// note that prefix and substr can be "", meaning filter disabled.
// the conditions handle this fine as well.
if !strings.HasPrefix(d.Name, prefix) {
continue
}
if !strings.HasSuffix(d.Name, suffix) {
continue
}
if !strings.Contains(d.Name, substr) {
continue
}
if tags == "none" && len(d.Tags) != 0 {
continue
}
if tags == "some" && len(d.Tags) == 0 {
continue
}
if regex != nil && !regex.MatchString(d.Name) {
continue
}
if tags == "valid" || tags == "invalid" {
err := schema.ValidateTags(d.Tags)

// skip the metric if the validation result is not what we want
if (err == nil) != (tags == "valid") {
// skip the metric if the validation result is not what we want
if (err == nil) != (tags == "valid") {
continue
}
}
if cutoffMin != 0 && d.LastUpdate >= cutoffMin {
continue
}
show(d)
shown++
if shown >= limit && limit > 0 {
log.Infof("Limit (%d) reached while processing Metric Definitions", limit)
return
}
}
if cutoffMin != 0 && d.LastUpdate >= cutoffMin {
continue
}

var defs []schema.MetricDefinition
if len(partitions) == 0 {
if cassI > 0 {
defs = cassIdx.Load(nil, time.Now())
// set this after doing the query, to assure age can't possibly be negative unless if clocks are misconfigured.
out.QueryTime = time.Now().Unix()
processDefs(defs)
}
show(d)
shown += 1
if shown == limit {
break
} else {
if cassI > 0 {
defs = cassIdx.LoadPartitions(partitions, nil, time.Now())
// set this after doing the query, to assure age can't possibly be negative unless if clocks are misconfigured.
out.QueryTime = time.Now().Unix()
processDefs(defs)
} else {
now := time.Now()
for _, p := range partitions {
defs = btIdx.LoadPartition(p, nil, now)
// set this after doing the query, to assure age can't possibly be negative unless if clocks are misconfigured.
out.QueryTime = time.Now().Unix()
processDefs(defs)
}
}
}

40 changes: 38 additions & 2 deletions docs/tools.md
Original file line number Diff line number Diff line change
@@ -191,6 +191,8 @@ global config flags:
-addr string
graphite/metrictank address (default "http://localhost:6060")
-bt-total-partitions int
when using bigtable you must set this to the total number of partitions for the instance if you do not specify partitions with the 'partitions' setting (default -1)
-from string
for vegeta outputs, will generate requests for data starting from now minus... eg '30min', '5h', '14d', etc. or a unix timestamp (default "30min")
-limit int
@@ -221,7 +223,7 @@ tags filter:
'valid' only show metrics whose tags (if any) are valid
'invalid' only show metrics that have one or more invalid tags
idxtype: only 'cass' supported for now
cass config flags:
@@ -276,6 +278,34 @@ cass config flags:
-write-queue-size int
Max number of metricDefs allowed to be unwritten to cassandra (default 100000)
bigtable config flags:
-bigtable-instance string
Name of bigtable instance (default "default")
-create-cf
enable the creation of the table and column families (default true)
-enabled
-gcp-project string
Name of GCP project the bigtable cluster resides in (default "default")
-prune-interval duration
Interval at which the index should be checked for stale series. (default 3h0m0s)
-table-name string
Name of bigtable table used for metricDefs (default "metrics")
-update-bigtable-index
synchronize index changes to bigtable. not all your nodes need to do this. (default true)
-update-interval duration
frequency at which we should update the metricDef lastUpdate field, use 0s for instant updates (default 3h0m0s)
-write-concurrency int
Number of writer threads to use (default 5)
-write-max-flush-size int
Max number of metricDefs in each batch write to bigtable (default 10000)
-write-queue-size int
Max number of metricDefs allowed to be unwritten to bigtable. Must be larger then write-max-flush-size (default 100000)
output:
* presets: dump|list|vegeta-render|vegeta-render-patterns
@@ -297,12 +327,18 @@ output:
roundDuration: formats an integer-seconds duration using aggressive rounding. for the purpose of getting an idea of overal metrics age
EXAMPLES:
Cassandra Examples:
mt-index-cat -from 60min cass -hosts cassandra:9042 list
mt-index-cat -from 60min cass -hosts cassandra:9042 'sumSeries({{.Name | pattern}})'
mt-index-cat -from 60min cass -hosts cassandra:9042 'GET http://localhost:6060/render?target=sumSeries({{.Name | pattern}})&from=-6h\nX-Org-Id: 1\n\n'
mt-index-cat cass -hosts cassandra:9042 -timeout 60s '{{.LastUpdate | age | roundDuration}}\n' | sort | uniq -c
mt-index-cat cass -hosts localhost:9042 -schema-file ../../scripts/config/schema-idx-cassandra.toml '{{.Name | patternCustom 15 "pass" 40 "1rcnw" 15 "2rcnw" 10 "3rcnw" 10 "3rccw" 10 "2rccw"}}\n'
Bigtable Examples:
mt-index-cat -max-stale 0 -bt-total-partitions 128 bt -gcp-project your_project -bigtable-instance the_bt_instance -table-name metric_idx -create-cf false list
mt-index-cat -max-stale 768h -partitions 1,2,3 bt -gcp-project your_project -bigtable-instance the_bt_instance -table-name metric_idx -create-cf false '{{.NameWithTags}} {{.Id}} {{.OrgId}} {{.LastUpdate}} {{.Partition}}
'
```


3 changes: 2 additions & 1 deletion idx/bigtable/config.go
Original file line number Diff line number Diff line change
@@ -57,7 +57,7 @@ func NewIdxConfig() *IdxConfig {

var CliConfig = NewIdxConfig()

func ConfigSetup() {
func ConfigSetup() *flag.FlagSet {
btIdx := flag.NewFlagSet("bigtable-idx", flag.ExitOnError)

btIdx.BoolVar(&CliConfig.Enabled, "enabled", CliConfig.Enabled, "")
@@ -73,6 +73,7 @@ func ConfigSetup() {
btIdx.BoolVar(&CliConfig.CreateCF, "create-cf", CliConfig.CreateCF, "enable the creation of the table and column families")

globalconf.Register("bigtable-idx", btIdx, flag.ExitOnError)
return btIdx
}

func ConfigProcess() {