Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Add tool for reporting out of order and duplicate metrics #2014

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
/cmd/mt-index-migrate/mt-index-migrate
/cmd/mt-index-prune/mt-index-prune
/cmd/mt-indexdump-rules-analyzer/mt-indexdump-rules-analyzer
/cmd/mt-kafka-mdm-report-out-of-order/mt-kafka-mdm-report-out-of-order
/cmd/mt-kafka-mdm-sniff-out-of-order/mt-kafka-mdm-sniff-out-of-order
/cmd/mt-kafka-mdm-sniff/mt-kafka-mdm-sniff
/cmd/mt-kafka-persist-sniff/mt-kafka-persist-sniff
Expand Down
50 changes: 50 additions & 0 deletions cmd/mt-kafka-mdm-report-out-of-order/aggregate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package main

import (
"strings"

log "github.com/sirupsen/logrus"
)

// Aggregate accumulates metric point statistics for one report group
// (a metric name, or one value of the group-by tag).
type Aggregate struct {
	Count           int // total metric points seen for this group
	OutOfOrderCount int // points classified as out of order by the reorder buffer
	DuplicateCount  int // points classified as duplicates (new value for an existing timestamp)
}

// aggregateByName sums the per-series counters into one Aggregate per
// metric name.
func aggregateByName(tracker Tracker) map[string]Aggregate {
	// Every distinct name yields at most one entry, so len(tracker) is a
	// safe upper bound for pre-sizing.
	aggregates := make(map[string]Aggregate, len(tracker))

	for _, track := range tracker {
		// The zero-value Aggregate is a valid starting point for names we
		// have not seen yet, so a plain map index suffices (no ", ok" or
		// ", _" needed).
		aggregate := aggregates[track.Name]
		aggregate.Count += track.Count
		aggregate.OutOfOrderCount += track.OutOfOrderCount
		aggregate.DuplicateCount += track.DuplicateCount
		aggregates[track.Name] = aggregate
	}

	return aggregates
}

// aggregateByTag sums the per-series counters into one Aggregate per
// distinct value of the tag named groupByTag. Series that do not carry that
// tag contribute nothing; tags that are not exactly "key=value" are logged
// and skipped.
func aggregateByTag(tracker Tracker, groupByTag string) map[string]Aggregate {
	aggregates := map[string]Aggregate{}

	for _, track := range tracker {
		for _, tag := range track.Tags {
			// Tags are expected to be encoded as exactly "key=value";
			// anything else (including values containing '=') is rejected,
			// matching the strictness of the index encoding.
			kv := strings.Split(tag, "=")
			if len(kv) != 2 {
				log.Errorf("unexpected tag encoding for metric with name=%q tag=%q", track.Name, tag)
				continue
			}
			if kv[0] != groupByTag {
				continue
			}
			// Zero-value Aggregate is a valid start for unseen tag values.
			aggregate := aggregates[kv[1]]
			aggregate.Count += track.Count
			aggregate.OutOfOrderCount += track.OutOfOrderCount
			aggregate.DuplicateCount += track.DuplicateCount
			aggregates[kv[1]] = aggregate
		}
	}

	return aggregates
}
146 changes: 146 additions & 0 deletions cmd/mt-kafka-mdm-report-out-of-order/flags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package main

import (
"flag"
"fmt"
"os"
"time"

"github.com/grafana/globalconf"
"github.com/grafana/metrictank/idx/cassandra"
inKafkaMdm "github.com/grafana/metrictank/input/kafkamdm"
log "github.com/sirupsen/logrus"
)

// ParseFlags builds the application's flag set, installs its usage text as
// the global flag usage, and parses the process arguments. Parsing errors
// terminate the process.
func ParseFlags() Flags {
	f := NewFlags()
	flag.Usage = f.Usage
	f.Parse(os.Args[1:])
	return *f
}

// Flags holds the parsed command line configuration for the tool.
type Flags struct {
	flagSet *flag.FlagSet // underlying flag set; referenced by Parse and Usage

	RunDuration   time.Duration // how long to run the program before reporting
	Config        string        // path to the metrictank ini configuration file
	PartitionFrom int           // the partition to load the index from
	PartitionTo   int           // exclusive upper partition bound; -1 means only PartitionFrom is loaded
	ReorderWindow uint          // size of the reorder buffer window
	Prefix        string        // only report metrics whose name has this prefix
	Substr        string        // only report metrics whose name contains this substring
	GroupByName   bool          // group out-of-order/duplicate metrics by name
	GroupByTag    string        // group out-of-order/duplicate metrics by this tag
}

// NewFlags returns a Flags instance with all command line options registered
// on a dedicated flag set.
func NewFlags() *Flags {
	f := &Flags{}

	fs := flag.NewFlagSet("application flags", flag.ExitOnError)
	fs.DurationVar(&f.RunDuration, "run-duration", 5*time.Minute, "the duration of time to run the program")
	fs.StringVar(&f.Config, "config", "/etc/metrictank/metrictank.ini", "configuration file path")
	fs.IntVar(&f.PartitionFrom, "partition-from", 0, "the partition to load the index from")
	fs.IntVar(&f.PartitionTo, "partition-to", -1, "load the index from all partitions up to this one (exclusive). If unset, only the partition defined with \"--partition-from\" is loaded from")
	fs.UintVar(&f.ReorderWindow, "reorder-window", 1, "the size of the reorder buffer window")
	fs.StringVar(&f.Prefix, "prefix", "", "only report metrics with a name that has this prefix")
	fs.StringVar(&f.Substr, "substr", "", "only report metrics with a name that has this substring")
	fs.BoolVar(&f.GroupByName, "group-by-name", false, "group out-of-order metrics by name")
	fs.StringVar(&f.GroupByTag, "group-by-tag", "", "group out-of-order metrics by the specified tag")
	fs.Usage = f.Usage

	f.flagSet = fs
	return f
}

// Parse parses the given command line arguments, then the metrictank
// configuration file (plus MT_-prefixed environment overrides), and finally
// validates the flag combination. Any failure terminates the process via
// log.Fatalf, which already exits with status 1 — the previous explicit
// os.Exit(1) calls after it were dead code and have been removed.
func (flags *Flags) Parse(args []string) {
	if err := flags.flagSet.Parse(args); err != nil {
		log.Fatalf("failed to parse application flags %v: %s", args, err.Error())
	}

	// Only hand the config file to globalconf if it actually exists, so the
	// tool can still run from environment variables and defaults alone.
	path := ""
	if _, err := os.Stat(flags.Config); err == nil {
		path = flags.Config
	}
	config, err := globalconf.NewWithOptions(&globalconf.Options{
		Filename:  path,
		EnvPrefix: "MT_",
	})
	if err != nil {
		log.Fatalf("error with configuration file: %s", err.Error())
	}
	// Register the config sections this tool depends on before parsing.
	_ = cassandra.ConfigSetup()
	inKafkaMdm.ConfigSetup()
	config.Parse()

	if !flags.GroupByName && flags.GroupByTag == "" {
		log.Fatalf("must specify at least one of -group-by-name or -group-by-tag")
	}

	if flags.ReorderWindow < 1 {
		log.Fatalf("-reorder-window must be greater than zero")
	}
}

// Usage prints the tool description, example output, field explanations,
// flag defaults and invocation examples to stderr.
//
// Fixes two typos in the help text: a stray unbalanced ')' at the end of the
// description line, and "last point correct point" -> "last correct point".
func (flags *Flags) Usage() {
	// p writes one line to stderr; with no arguments it emits a blank line.
	p := func(a ...interface{}) {
		fmt.Fprintln(os.Stderr, a...)
	}
	p("mt-kafka-mdm-report-out-of-order")
	p()
	p("Inspects what's flowing through kafka (in mdm format) and reports out of order data grouped by metric name or tag, taking into account the reorder buffer")
	p()
	p("# Mechanism")
	p("* it sniffs points being added on a per-series (metric Id) level")
	p("* for every series, tracks the last 'correct' point. E.g. a point that was able to be added to the series because its timestamp is higher than any previous timestamp")
	p("* if for any series, a point comes in with a timestamp equal or lower than the last correct point - which metrictank would not add unless it falls within the reorder buffer - it triggers an event for this out-of-order point")
	p("* the reorder buffer is described by the window size")
	p("Usage:")
	p("  mt-kafka-mdm-report-out-of-order [flags]")
	p()
	p("Example output:")
	p()
	p("  total metric points count=2710806")
	p("  total out-of-order metric points count=3878")
	p("  out-of-order metric points grouped by name:")
	p("    out-of-order metric points for name=\"fruit.weight\" count=4 percentGroup=4.301075 percentClass=0.096131 percentTotal=0.000129")
	p("    out-of-order metric points for name=\"fruit.height\" count=1 percentGroup=4.545455 percentClass=0.024033 percentTotal=0.000032")
	p("    ...")
	p("  out-of-order metric points grouped by tag=\"fruit\":")
	p("    out-of-order metric points for tag=\"fruit\" value=\"apple\" count=80 percentGroup=5.856515 percentClass=2.062919 percentTotal=0.002951")
	p("    out-of-order metric points for tag=\"fruit\" value=\"orange\" count=2912 percentGroup=0.306267 percentClass=75.090253 percentTotal=0.107422")
	p("    ...")
	p("  total duplicate metric points count=12760")
	p("  duplicate metric points grouped by name:")
	p("    duplicate metric points for name=\"fruit.width\" count=105 percentGroup=19.266055 percentClass=0.760704 percentTotal=0.003397")
	p("    duplicate metric points for name=\"fruit.length\" count=123 percentGroup=15.688776 percentClass=0.891111 percentTotal=0.003979")
	p("    ...")
	p("  duplicate metric points grouped by tag=\"fruit\":")
	p("    duplicate metric points for tag=\"fruit\" value=\"banana\" count=4002 percentGroup=17.201066 percentClass=31.363636 percentTotal=0.147631")
	p("    duplicate metric points for tag=\"fruit\" value=\"orange\" count=4796 percentGroup=0.504415 percentClass=37.586207 percentTotal=0.176922")
	p("    ...")
	p()
	p("Fields:")
	p()
	p("  name:         the name of the metric (when grouped by name)")
	p("  tag:          the tag key (when grouped by tag)")
	p("  value:        the tag value (when grouped by tag)")
	p("  count:        the number of metric points")
	p("                the example above shows that 4002 metric points that had tag \"fruit\"=\"banana\" were duplicates")
	p("  percentGroup: the percentage of all of the metric points which had the same name/tag (depending on grouping) that were out of order/duplicates (depending on classification)")
	p("                the example above shows that ~4.301% of all metric points with name \"fruit.weight\" were out of order")
	p("  percentClass: the percentage of all of the metric points which were out of order/duplicates (depending on classification) that had this name/tag (depending on grouping)")
	p("                the example above shows that ~2.063% of all metric points that were out of order had tag fruit=apple")
	p("  percentTotal: the percentage of all metric points that had this name/tag (depending on grouping) and were out of order/duplicates (depending on classification)")
	p("                the example above shows that ~0.177% of all metric points had tag \"fruit\"=\"orange\" and were duplicates")
	p()
	p("flags:")
	flags.flagSet.PrintDefaults()
	p()
	p("EXAMPLES:")
	p("  mt-kafka-mdm-report-out-of-order -group-by-name -config metrictank.ini -partition-from 0")
	p("  mt-kafka-mdm-report-out-of-order -group-by-name -group-by-tag namespace -config metrictank.ini -partition-from 0 -partition-to 3 -reorder-window 5 -run-duration 5m")
}
130 changes: 130 additions & 0 deletions cmd/mt-kafka-mdm-report-out-of-order/inputooofinder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package main

import (
"os"
"sync"
"time"

"github.com/grafana/metrictank/idx/cassandra"
"github.com/grafana/metrictank/mdata"
"github.com/grafana/metrictank/mdata/errors"
"github.com/grafana/metrictank/schema"
"github.com/grafana/metrictank/schema/msg"
log "github.com/sirupsen/logrus"
)

// inputOOOFinder finds out-of-order and duplicate metric points: for every
// series it feeds incoming timestamps through a reorder buffer and counts
// the points the buffer rejects as too old or as duplicates.
type inputOOOFinder struct {
	reorderWindow uint32  // window size for each series' reorder buffer
	tracker       Tracker // per-series (MKey) identity and counters

	lock sync.Mutex // guards tracker; Process* methods may run concurrently — confirm against caller
}

// newInputOOOFinder loads metric definitions for the requested kafka
// partitions from the cassandra index and prepares a tracker entry (with a
// reorder buffer) for each known series.
//
// partitionTo == -1 means "only partitionFrom"; a positive partitionTo loads
// partitions [partitionFrom, partitionTo). Index initialization failure
// terminates the process (log.Fatalf already exits; the previous explicit
// os.Exit(1) after it was dead code).
func newInputOOOFinder(partitionFrom int, partitionTo int, reorderWindow uint32) *inputOOOFinder {
	cassandraIndex := cassandra.New(cassandra.CliConfig)
	if err := cassandraIndex.InitBare(); err != nil {
		log.Fatalf("error initializing cassandra index: %s", err.Error())
	}

	metricDefinitions := make([]schema.MetricDefinition, 0)
	for partition := partitionFrom; (partitionTo == -1 && partition == partitionFrom) || (partitionTo > 0 && partition < partitionTo); partition++ {
		metricDefinitions = cassandraIndex.LoadPartitions([]int32{int32(partition)}, metricDefinitions, time.Now())
	}

	tracker := make(Tracker, len(metricDefinitions))
	for _, md := range metricDefinitions {
		tracker[md.Id] = Track{
			Name:          md.Name,
			Tags:          md.Tags,
			reorderBuffer: mdata.NewReorderBuffer(reorderWindow, uint32(md.Interval), false),
			// counters start at their zero values
		}
	}

	return &inputOOOFinder{
		reorderWindow: reorderWindow,
		tracker:       tracker,
		// lock is usable at its zero value
	}
}

// ProcessMetricData records one full MetricData message. Series that were
// not loaded from the index get a fresh tracker entry created on the fly,
// since the full message carries the name, tags and interval needed for one.
func (ip *inputOOOFinder) ProcessMetricData(metric *schema.MetricData, partition int32) {
	key, err := schema.MKeyFromString(metric.Id)
	if err != nil {
		log.Errorf("failed to get metric key from id=%v: %s", metric.Id, err.Error())
		return
	}

	ip.lock.Lock()
	defer ip.lock.Unlock()

	t, ok := ip.tracker[key]
	if !ok {
		t = Track{
			Name:          metric.Name,
			Tags:          metric.Tags,
			reorderBuffer: mdata.NewReorderBuffer(ip.reorderWindow, uint32(metric.Interval), false),
		}
	}

	ip.incrementCounts(key, metric.Time, t, partition)
}

// ProcessMetricPoint records one compact MetricPoint message. Points for
// series missing from the tracker are logged and dropped — only the key and
// timestamp are available here, so no tracker entry can be created.
func (ip *inputOOOFinder) ProcessMetricPoint(mp schema.MetricPoint, format msg.Format, partition int32) {
	ip.lock.Lock()
	defer ip.lock.Unlock()

	t, ok := ip.tracker[mp.MKey]
	if !ok {
		log.Errorf("track for metric with key=%v from partition=%d not found", mp.MKey, partition)
		return
	}

	ip.incrementCounts(mp.MKey, int64(mp.Time), t, partition)
}

// ProcessIndexControlMsg is a no-op: index control messages carry no metric
// points, so they are irrelevant for out-of-order/duplicate tracking.
func (ip *inputOOOFinder) ProcessIndexControlMsg(msg schema.ControlMsg, partition int32) {

}

// incrementCounts feeds one point's timestamp into the series' reorder
// buffer and updates the per-series counters. The caller must hold ip.lock.
// track is passed by value, so the updated copy is written back into the
// tracker map at the end.
func (ip *inputOOOFinder) incrementCounts(metricKey schema.MKey, metricTime int64, track Track, partition int32) {
	track.Count++

	_, _, err := track.reorderBuffer.Add(uint32(metricTime), 0) // value is irrelevant, only timestamps matter
	switch err {
	case nil:
		// point accepted in order; only Count changes
	case errors.ErrMetricTooOld:
		track.OutOfOrderCount++
	case errors.ErrMetricNewValueForTimestamp:
		track.DuplicateCount++
	default:
		// NOTE(review): returning here drops the Count increment for points
		// that fail with an unexpected error — confirm that is intended.
		log.Errorf("failed to add metric with Name=%q and timestamp=%d from partition=%d to reorder buffer: %s", track.Name, metricTime, partition, err)
		return
	}

	ip.tracker[metricKey] = track
}

// Tracker returns the per-series tracking state.
//
// The receiver must be a pointer: the value receiver used previously copied
// the struct, including its sync.Mutex, which go vet's copylocks check
// flags as a bug. All callers hold a *inputOOOFinder (the constructor
// returns a pointer), so this change is backward-compatible.
func (ip *inputOOOFinder) Tracker() Tracker {
	return ip.tracker
}

// Tracker maps each series key to its tracking state.
type Tracker map[schema.MKey]Track

// Track holds the identity and counters for a single series.
type Track struct {
	Name string   // metric name, used when grouping reports by name
	Tags []string // "key=value" tag strings, used when grouping reports by tag

	// reorderBuffer classifies each incoming timestamp as in-order,
	// out of order, or a duplicate.
	reorderBuffer *mdata.ReorderBuffer

	Count           int // total points seen for this series
	OutOfOrderCount int // points rejected by the reorder buffer as too old
	DuplicateCount  int // points rejected as a new value for an existing timestamp
}
Loading