Add tool for reporting out of order/duplicate metrics grouped by name or tag #112

Merged
51 commits
0a1ed09
Add mt-kafka-mdm-report-out-of-order
pub-djedruszczak Oct 25, 2021
a84c106
Clean up mt-kafka-mdm-report-out-of-order
pub-djedruszczak Oct 25, 2021
f3144be
Restore temporary changes
pub-djedruszczak Oct 29, 2021
eaca2f7
Remove extraneous newline
pub-djedruszczak Oct 29, 2021
31f7cac
Add documentation, add tag encoding check
pub-djedruszczak Oct 29, 2021
3c26ac9
Add documentation
pub-djedruszczak Oct 29, 2021
f20b59a
Fix copy paste issue
pub-djedruszczak Oct 29, 2021
cfc82e3
Move away from mt-kafka-mdm-sniff-out-of-order tracker
pub-djedruszczak Nov 1, 2021
43646b8
Fix latest check
pub-djedruszczak Nov 1, 2021
87fd461
Fix todo
pub-djedruszczak Nov 1, 2021
3721b91
Fix bad comparison
pub-djedruszczak Nov 1, 2021
3d41e88
Refactor processing, save memory on grouping
pub-djedruszczak Nov 2, 2021
824da12
Remove unused flag
pub-djedruszczak Nov 2, 2021
b7fcbf0
Include metric key in index miss error
pub-djedruszczak Nov 2, 2021
107a305
Use k=v logging
pub-djedruszczak Nov 2, 2021
c4a68e4
Be more verbose
pub-djedruszczak Nov 2, 2021
3595046
Remove cassandra-idx flags, rework flags
pub-djedruszczak Nov 2, 2021
4893fa0
Encapsulate tracker
pub-djedruszczak Nov 2, 2021
09d2437
Rework reorder buffer
pub-djedruszczak Nov 2, 2021
db2ba01
Reorder input args
pub-djedruszczak Nov 2, 2021
3a4db64
Remove unused variable
pub-djedruszczak Nov 2, 2021
75faf55
Use DurationVar
pub-djedruszczak Nov 5, 2021
43849f7
Use metric interval from definition
pub-djedruszczak Nov 5, 2021
5694140
Remove legacy arguments length check
pub-djedruszczak Nov 5, 2021
3e94a59
Set flag set usage
pub-djedruszczak Nov 5, 2021
19de257
Add partition to metric miss log
pub-djedruszczak Nov 5, 2021
ecd80d9
Remove metric interval references from usage
pub-djedruszczak Nov 5, 2021
e2e67bf
Use reorder buffer, count duplicates
pub-djedruszczak Nov 8, 2021
d761093
Clean up todos
pub-djedruszczak Nov 8, 2021
92c94ce
Disambiguate flag usage
pub-djedruszczak Nov 8, 2021
c2b376c
Fix reorder window default
pub-djedruszczak Nov 8, 2021
b030cad
Use flag name in flag validation error
pub-djedruszczak Nov 8, 2021
ed89f27
Replace reference to old variable name
pub-djedruszczak Nov 8, 2021
8c62f84
Update tracker documentation
pub-djedruszczak Nov 8, 2021
bb1502c
Propagate partition to errors
pub-djedruszczak Nov 8, 2021
45dbf3c
Use uint for reorder window flag, remove unused struct field
pub-djedruszczak Nov 8, 2021
dd2da74
Use consistent language
pub-djedruszczak Nov 8, 2021
7f6ac8d
Use all flags in second example
pub-djedruszczak Nov 8, 2021
ee7cd84
Initialize reorder buffer for new metrics
pub-djedruszczak Nov 8, 2021
a3b5e7c
Update example output in help
pub-djedruszczak Nov 8, 2021
6a09623
Move aggregation out of finder
pub-djedruszczak Nov 9, 2021
01f62d7
Add forgotten file
pub-djedruszczak Nov 9, 2021
9fae36e
Update usage
pub-djedruszczak Nov 9, 2021
6ae1913
Use print line
pub-djedruszczak Nov 9, 2021
af7d1fd
Update usage
pub-djedruszczak Nov 9, 2021
b24bd67
Update usage
pub-djedruszczak Nov 9, 2021
1898e0b
Group output by classification, then grouping
pub-djedruszczak Nov 9, 2021
eb02e7c
Align help fields and add count example
pub-djedruszczak Nov 9, 2021
499e691
Remove bad return
pub-djedruszczak Nov 10, 2021
16c1a6d
Use declarative statements for output field descriptions
pub-djedruszczak Nov 10, 2021
9492f81
Use helper for percentage calculation
pub-djedruszczak Nov 10, 2021
129 changes: 129 additions & 0 deletions cmd/mt-kafka-mdm-report-out-of-order/flags.go
@@ -0,0 +1,129 @@
package main

import (
    "flag"
    "fmt"
    "os"
    "time"

    "github.com/grafana/globalconf"
    "github.com/grafana/metrictank/idx/cassandra"
    inKafkaMdm "github.com/grafana/metrictank/input/kafkamdm"
    log "github.com/sirupsen/logrus"
)

type Flags struct {
    flagSet *flag.FlagSet

    RunDuration   time.Duration
    Config        string
    PartitionFrom int
    PartitionTo   int
    ReorderWindow uint
    Prefix        string
    Substr        string
    GroupByName   bool
    GroupByTag    string
}

func NewFlags() *Flags {
    var flags Flags

    flags.flagSet = flag.NewFlagSet("application flags", flag.ExitOnError)
    flags.flagSet.DurationVar(&flags.RunDuration, "run-duration", 5*time.Minute, "the duration of time to run the program")
    flags.flagSet.StringVar(&flags.Config, "config", "/etc/metrictank/metrictank.ini", "configuration file path")
    flags.flagSet.IntVar(&flags.PartitionFrom, "partition-from", 0, "the partition to load the index from")
    flags.flagSet.IntVar(&flags.PartitionTo, "partition-to", -1, "load the index from all partitions up to this one (exclusive). If unset, only the partition defined with \"--partition-from\" is loaded from")
    flags.flagSet.UintVar(&flags.ReorderWindow, "reorder-window", 1, "the size of the reorder buffer window")
    flags.flagSet.StringVar(&flags.Prefix, "prefix", "", "only report metrics with a name that has this prefix")
    flags.flagSet.StringVar(&flags.Substr, "substr", "", "only report metrics with a name that has this substring")
    flags.flagSet.BoolVar(&flags.GroupByName, "group-by-name", false, "group out-of-order metrics by name")
    flags.flagSet.StringVar(&flags.GroupByTag, "group-by-tag", "", "group out-of-order metrics by the specified tag")

    flags.flagSet.Usage = flags.Usage
    return &flags
}

func (flags *Flags) Parse(args []string) {
    err := flags.flagSet.Parse(args)
    if err != nil {
        log.Fatalf("failed to parse application flags %v: %s", args, err.Error())
        os.Exit(1)
    }

    path := ""
    if _, err := os.Stat(flags.Config); err == nil {
        path = flags.Config
    }
    config, err := globalconf.NewWithOptions(&globalconf.Options{
        Filename:  path,
        EnvPrefix: "MT_",
    })
    if err != nil {
        log.Fatalf("error with configuration file: %s", err.Error())
        os.Exit(1)
    }
    _ = cassandra.ConfigSetup()
    inKafkaMdm.ConfigSetup()
    config.Parse()

    if flags.GroupByName == false && flags.GroupByTag == "" {
        log.Fatalf("must specify at least one of -group-by-name or -group-by-tag")
        os.Exit(1)
    }

    if flags.ReorderWindow < 1 {
        log.Fatalf("-reorder-window must be greater than zero")
        os.Exit(1)
    }
}

func (flags *Flags) Usage() {
    fmt.Fprintln(os.Stderr, "mt-kafka-mdm-report-out-of-order")
    fmt.Fprintln(os.Stderr)
    fmt.Fprintln(os.Stderr, "Inspects what's flowing through kafka (in mdm format) and reports out-of-order data grouped by metric name or tag, taking the reorder buffer into account")
    fmt.Fprintln(os.Stderr)
    fmt.Fprintln(os.Stderr, "# Mechanism")
    fmt.Fprintln(os.Stderr, "* it sniffs points being added on a per-series (metric Id) level")
    fmt.Fprintln(os.Stderr, "* for every series, it tracks the last 'correct' point, i.e. a point that could be added to the series because its timestamp is higher than any previous timestamp")
    fmt.Fprintln(os.Stderr, "* if, for any series, a point comes in with a timestamp equal to or lower than the last correct point - which metrictank would not add unless it falls within the reorder buffer - it triggers an event for this out-of-order point")
    fmt.Fprintln(os.Stderr, "* the size of the reorder buffer is set by the reorder window")
    fmt.Fprintln(os.Stderr, "Usage:")
    fmt.Fprintln(os.Stderr, "  mt-kafka-mdm-report-out-of-order [flags]")
    fmt.Fprintln(os.Stderr)
    fmt.Fprintln(os.Stderr, "out-of-order and duplicate metrics are printed in the following formats, depending on grouping:")
    fmt.Fprintln(os.Stderr)
    fmt.Fprintln(os.Stderr, "out-of-order metrics grouped by name:")
fmt.Fprintln(os.Stderr, "out-of-order name=<name1> count=<count1>")
fmt.Fprintln(os.Stderr, "out-of-order name=<name1> count=<count1>")
fmt.Fprintln(os.Stderr, "...")
fmt.Fprintln(os.Stderr, "duplicate metrics grouped by name:")
fmt.Fprintln(os.Stderr, "duplicate name=<name1> count=<count1>")
fmt.Fprintln(os.Stderr, "duplicate name=<name1> count=<count1>")
fmt.Fprintln(os.Stderr, "...")
fmt.Fprintln(os.Stderr, "out-of-order metrics grouped by tag:")
fmt.Fprintln(os.Stderr, "out-of-order tag=<tag1> count=<count1>")
fmt.Fprintln(os.Stderr, "out-of-order tag=<tag2> count=<count2>")
fmt.Fprintln(os.Stderr, "...")
fmt.Fprintln(os.Stderr, "duplicate metrics grouped by tag:")
fmt.Fprintln(os.Stderr, "duplicate tag=<tag1> count=<count1>")
fmt.Fprintln(os.Stderr, "duplicate tag=<tag2> count=<count2>")
fmt.Fprintln(os.Stderr, "...")
fmt.Fprintln(os.Stderr)
fmt.Fprintf(os.Stderr, "flags:")
    flags.flagSet.PrintDefaults()
    fmt.Fprintln(os.Stderr)
    fmt.Fprintln(os.Stderr, "EXAMPLES:")
    fmt.Fprintln(os.Stderr, "  mt-kafka-mdm-report-out-of-order -group-by-name -config metrictank.ini -partition-from 0")
    fmt.Fprintln(os.Stderr, "  mt-kafka-mdm-report-out-of-order -group-by-name -group-by-tag namespace -config metrictank.ini -partition-from 0 -partition-to 3 -reorder-window 5 -run-duration 5m")
}

func ParseFlags() Flags {
    flags := NewFlags()

    flag.Usage = flags.Usage

    flags.Parse(os.Args[1:])

    return *flags
}
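
To make the mechanism described in the usage text above concrete, the sketch below classifies a handful of timestamps for a single series, using the same mdata.ReorderBuffer calls and error values that inputooofinder.go (below) relies on. The classify helper, the sample timestamps, and the assumed 10s interval are illustrative only; the window of 1 mirrors the -reorder-window default.

package main

import (
    "fmt"

    "github.com/grafana/metrictank/mdata"
    "github.com/grafana/metrictank/mdata/errors"
)

// classify feeds one timestamp into a per-series reorder buffer and reports
// how the tool would count the point. Hypothetical helper, for illustration only.
func classify(buf *mdata.ReorderBuffer, ts uint32) string {
    _, err := buf.Add(ts, 0) // the value does not matter for ordering, so 0 is passed
    switch err {
    case nil:
        return "accepted"
    case errors.ErrMetricTooOld:
        return "out-of-order" // too old for the reorder window; counted as out-of-order
    case errors.ErrMetricNewValueForTimestamp:
        return "duplicate" // a timestamp that was already seen; counted as duplicate
    default:
        return "error: " + err.Error()
    }
}

func main() {
    // a window of 1 mirrors the -reorder-window default; a 10s interval is assumed here
    buf := mdata.NewReorderBuffer(1, 10, false)
    for _, ts := range []uint32{100, 110, 110, 90, 120} {
        fmt.Printf("ts=%d -> %s\n", ts, classify(buf, ts))
    }
}
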
159 changes: 159 additions & 0 deletions cmd/mt-kafka-mdm-report-out-of-order/inputooofinder.go
@@ -0,0 +1,159 @@
package main

import (
    "os"
    "strings"
    "sync"
    "time"

    "github.com/grafana/metrictank/idx/cassandra"
    "github.com/grafana/metrictank/mdata"
    "github.com/grafana/metrictank/mdata/errors"
    "github.com/grafana/metrictank/schema"
    "github.com/grafana/metrictank/schema/msg"
    log "github.com/sirupsen/logrus"
)

type Track struct {
    Name string
    Tags []string

    reorderBuffer *mdata.ReorderBuffer
}

type Tracker map[schema.MKey]Track

// find out-of-order and duplicate metrics
type inputOOOFinder struct {
    prefix string
    substr string

    reorderWindow uint32
    tracker       Tracker

    groupByName             bool
    outOfOrderGroupedByName *map[string]int
    duplicatesGroupedByName *map[string]int
    groupByTag              string
    outOfOrderGroupedByTag  *map[string]int
    duplicatesGroupedByTag  *map[string]int

    lock sync.Mutex
}

func newInputOOOFinder(prefix string, substr string, partitionFrom int, partitionTo int, reorderWindow uint32, groupByName bool, outOfOrderGroupedByName *map[string]int, duplicatesGroupedByName *map[string]int, groupByTag string, outOfOrderGroupedByTag *map[string]int, duplicatesGroupedByTag *map[string]int) *inputOOOFinder {
    cassandraIndex := cassandra.New(cassandra.CliConfig)
    err := cassandraIndex.InitBare()
    if err != nil {
        log.Fatalf("error initializing cassandra index: %s", err.Error())
        os.Exit(1)
    }

    metricDefinitions := make([]schema.MetricDefinition, 0)
    for partition := partitionFrom; (partitionTo == -1 && partition == partitionFrom) || (partitionTo > 0 && partition < partitionTo); partition++ {
        metricDefinitions = cassandraIndex.LoadPartitions([]int32{int32(partition)}, metricDefinitions, time.Now())
    }

    tracker := Tracker{}
    for _, metricDefinition := range metricDefinitions {
        tracker[metricDefinition.Id] = Track{
            Name:          metricDefinition.Name,
            Tags:          metricDefinition.Tags,
            reorderBuffer: mdata.NewReorderBuffer(reorderWindow, uint32(metricDefinition.Interval), false),
        }
    }

    return &inputOOOFinder{
        prefix: prefix,
        substr: substr,

        reorderWindow: reorderWindow,
        tracker:       tracker,

        groupByName:             groupByName,
        outOfOrderGroupedByName: outOfOrderGroupedByName,
        duplicatesGroupedByName: duplicatesGroupedByName,
        groupByTag:              groupByTag,
        outOfOrderGroupedByTag:  outOfOrderGroupedByTag,
        duplicatesGroupedByTag:  duplicatesGroupedByTag,

        lock: sync.Mutex{},
    }
}

func (ip *inputOOOFinder) incrementGroupings(groupedByName *map[string]int, groupedByTag *map[string]int, track Track) {
    if ip.groupByName == true {
        (*groupedByName)[track.Name]++
    }

    if ip.groupByTag != "" {
        for _, tag := range track.Tags {
            kv := strings.Split(tag, "=")
            if len(kv) != 2 {
                log.Errorf("unexpected tag encoding tag=%q", tag)
                continue
            }
            if kv[0] == ip.groupByTag {
                (*groupedByTag)[kv[1]]++
            }
        }
    }
}

func (ip *inputOOOFinder) processTrack(metricKey schema.MKey, metricTime int64, track Track, partition int32) {
    if ip.prefix != "" && !strings.HasPrefix(track.Name, ip.prefix) {
        return
    }
    if ip.substr != "" && !strings.Contains(track.Name, ip.substr) {
        return
    }

    _, err := track.reorderBuffer.Add(uint32(metricTime), 0) // ignore value
    if err == errors.ErrMetricTooOld {
        ip.incrementGroupings(ip.outOfOrderGroupedByName, ip.outOfOrderGroupedByTag, track)
    } else if err == errors.ErrMetricNewValueForTimestamp {
        ip.incrementGroupings(ip.duplicatesGroupedByName, ip.duplicatesGroupedByTag, track)
    } else if err != nil {
        log.Errorf("failed to add metric with name=%q and timestamp=%d from partition=%d to reorder buffer: %s", track.Name, metricTime, partition, err)
    }
}

func (ip *inputOOOFinder) ProcessMetricData(metric *schema.MetricData, partition int32) {
    metricKey, err := schema.MKeyFromString(metric.Id)
    if err != nil {
        log.Errorf("failed to get metric key from id=%v: %s", metric.Id, err.Error())
        return
    }

    ip.lock.Lock()
    defer ip.lock.Unlock()

    track, exists := ip.tracker[metricKey]
    if !exists {
        ip.tracker[metricKey] = Track{
            Name:          metric.Name,
            Tags:          metric.Tags,
            reorderBuffer: mdata.NewReorderBuffer(ip.reorderWindow, uint32(metric.Interval), false),
        }
        return
    }

    ip.processTrack(metricKey, metric.Time, track, partition)
}

func (ip *inputOOOFinder) ProcessMetricPoint(mp schema.MetricPoint, format msg.Format, partition int32) {
    ip.lock.Lock()
    defer ip.lock.Unlock()

    track, exists := ip.tracker[mp.MKey]
    if !exists {
        log.Errorf("track for metric with key=%v from partition=%d not found", mp.MKey, partition)
        return
    }

    ip.processTrack(mp.MKey, int64(mp.Time), track, partition)
}

func (ip *inputOOOFinder) ProcessIndexControlMsg(msg schema.ControlMsg, partition int32) {

}
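
As a small self-contained illustration of the tag grouping performed by incrementGroupings above (the "namespace" key and the tag values are hypothetical), a metric carrying the tag "namespace=foo" is counted under "foo" when grouping by the "namespace" tag:

package main

import (
    "fmt"
    "strings"
)

func main() {
    groupByTag := "namespace" // value of the -group-by-tag flag
    counts := map[string]int{} // tag value -> number of out-of-order points
    for _, tag := range []string{"namespace=foo", "host=a"} {
        kv := strings.Split(tag, "=") // tags are stored as "key=value" strings
        if len(kv) == 2 && kv[0] == groupByTag {
            counts[kv[1]]++ // counted under the tag's value, here "foo"
        }
    }
    fmt.Println(counts) // map[foo:1]
}
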
87 changes: 87 additions & 0 deletions cmd/mt-kafka-mdm-report-out-of-order/main.go
@@ -0,0 +1,87 @@
package main

import (
    "context"
    "math/rand"
    "os"
    "os/signal"
    "strconv"
    "syscall"
    "time"

    inKafkaMdm "github.com/grafana/metrictank/input/kafkamdm"
    "github.com/grafana/metrictank/logger"
    log "github.com/sirupsen/logrus"
)

func configureLogging() {
    formatter := &logger.TextFormatter{}
    formatter.TimestampFormat = "2006-01-02 15:04:05.000"
    log.SetFormatter(formatter)
    log.SetLevel(log.InfoLevel)
}

func main() {
    configureLogging()

    flags := ParseFlags()

    inKafkaMdm.ConfigProcess("mt-kafka-mdm-report-out-of-order" + strconv.Itoa(rand.Int()))
    kafkaMdm := inKafkaMdm.New()

    outOfOrderGroupedByName := map[string]int{}
    duplicatesGroupedByName := map[string]int{}
    outOfOrderGroupedByTag := map[string]int{}
    duplicatesGroupedByTag := map[string]int{}
    inputOOOFinder := newInputOOOFinder(
        flags.Prefix,
        flags.Substr,
        flags.PartitionFrom,
        flags.PartitionTo,
        uint32(flags.ReorderWindow),
        flags.GroupByName,
        &outOfOrderGroupedByName,
        &duplicatesGroupedByName,
        flags.GroupByTag,
        &outOfOrderGroupedByTag,
        &duplicatesGroupedByTag,
    )

    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

    ctx, cancel := context.WithCancel(context.Background())
    kafkaMdm.Start(inputOOOFinder, cancel)
    select {
    case sig := <-sigChan:
        log.Infof("Received signal %q. Shutting down", sig)
    case <-ctx.Done():
        log.Info("Mdm input plugin signalled a fatal error. Shutting down")
    case <-time.After(flags.RunDuration):
        log.Infof("Finished scanning")
    }
    kafkaMdm.Stop()

    if flags.GroupByName {
log.Info("out-of-order metrics grouped by name:")
for key, value := range outOfOrderGroupedByName {
log.Infof("out-of-order name=%q count=%d", key, value)
}

log.Info("duplicate metrics grouped by name:")
for key, value := range duplicatesGroupedByName {
log.Infof("duplicate name=%q count=%d", key, value)
}
}
if flags.GroupByTag != "" {
log.Info("out-of-order metrics grouped by tag:")
for key, value := range outOfOrderGroupedByTag {
log.Infof("out-of-order tag=%q count=%d", key, value)
}

log.Info("duplicate metrics grouped by tag:")
for key, value := range duplicatesGroupedByTag {
log.Infof("duplicate tag=%q count=%d", key, value)
}
}
}