Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit f8a0abd

Browse files
committed
add mt-kafka-persist-sniff tool
1 parent 3106f88 commit f8a0abd

File tree

4 files changed

+114
-0
lines changed

4 files changed

+114
-0
lines changed

cmd/mt-kafka-persist-sniff/handler.go

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package main
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
7+
"github.com/grafana/metrictank/mdata"
8+
"github.com/raintank/schema"
9+
log "github.com/sirupsen/logrus"
10+
)
11+
12+
type PrintNotifierHandler struct{}
13+
14+
func NewPrintNotifierHandler() PrintNotifierHandler {
15+
return PrintNotifierHandler{}
16+
}
17+
18+
func (dn PrintNotifierHandler) PartitionOf(key schema.MKey) (int32, bool) {
19+
return 0, false
20+
}
21+
22+
func (dn PrintNotifierHandler) Handle(data []byte) {
23+
version := uint8(data[0])
24+
if version == uint8(mdata.PersistMessageBatchV1) {
25+
batch := mdata.PersistMessageBatch{}
26+
err := json.Unmarshal(data[1:], &batch)
27+
if err != nil {
28+
log.Errorf("failed to unmarsh batch message: %s -- skipping", err)
29+
return
30+
}
31+
for _, c := range batch.SavedChunks {
32+
fmt.Printf("%s %d %s\n", batch.Instance, c.T0, c.Key)
33+
}
34+
} else {
35+
log.Errorf("unknown message version %d", version)
36+
}
37+
return
38+
}

cmd/mt-kafka-persist-sniff/main.go

+62
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package main
2+
3+
import (
4+
"flag"
5+
"fmt"
6+
"math/rand"
7+
"os"
8+
"os/signal"
9+
"strconv"
10+
"syscall"
11+
12+
"github.com/grafana/metrictank/logger"
13+
"github.com/grafana/metrictank/mdata/notifierKafka"
14+
"github.com/grafana/metrictank/stats"
15+
"github.com/rakyll/globalconf"
16+
log "github.com/sirupsen/logrus"
17+
)
18+
19+
var confFile = flag.String("config", "/etc/metrictank/metrictank.ini", "configuration file path")
20+
21+
func main() {
22+
flag.Usage = func() {
23+
fmt.Fprintln(os.Stderr, "mt-kafka-persist-sniff")
24+
fmt.Fprintln(os.Stderr, "Print what's flowing through kafka metric persist topic")
25+
fmt.Fprintf(os.Stderr, "\nFlags:\n\n")
26+
flag.PrintDefaults()
27+
}
28+
flag.Parse()
29+
formatter := &logger.TextFormatter{}
30+
formatter.TimestampFormat = "2006-01-02 15:04:05.000"
31+
log.SetFormatter(formatter)
32+
log.SetLevel(log.InfoLevel)
33+
instance := "mt-kafka-persist-sniff" + strconv.Itoa(rand.Int())
34+
35+
// Only try and parse the conf file if it exists
36+
path := ""
37+
if _, err := os.Stat(*confFile); err == nil {
38+
path = *confFile
39+
}
40+
conf, err := globalconf.NewWithOptions(&globalconf.Options{
41+
Filename: path,
42+
EnvPrefix: "MT_",
43+
})
44+
if err != nil {
45+
log.Fatalf("error with configuration file: %s", err.Error())
46+
os.Exit(1)
47+
}
48+
conf.ParseAll()
49+
50+
// config may have had it disabled
51+
notifierKafka.Enabled = true
52+
53+
stats.NewDevnull() // make sure metrics don't pile up without getting discarded
54+
55+
notifierKafka.ConfigProcess(instance)
56+
notifierKafka.New(instance, NewPrintNotifierHandler())
57+
sigChan := make(chan os.Signal, 1)
58+
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
59+
sig := <-sigChan
60+
log.Infof("Received signal %q. Shutting down", sig)
61+
// notifierKafka.Stop()
62+
}

docs/tools.md

+13
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,19 @@ Flags:
260260
```
261261

262262

263+
## mt-kafka-persist-sniff
264+
265+
```
266+
mt-kafka-persist-sniff
267+
Print what's flowing through kafka metric persist topic
268+
269+
Flags:
270+
271+
-config string
272+
configuration file path (default "/etc/metrictank/metrictank.ini")
273+
```
274+
275+
263276
## mt-schemas-explain
264277

265278
```

mdata/notifierKafka/notifierKafka.go

+1
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ func New(instance string, handler mdata.NotifierHandler) *NotifierKafka {
5151
instance: instance,
5252
in: make(chan mdata.SavedChunk),
5353
bPool: util.NewBufferPool(),
54+
handler: handler,
5455
client: client,
5556
consumer: consumer,
5657
producer: producer,

0 commit comments

Comments
 (0)