Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit 4907b9e

Browse files
committed
report MetricPoint and MetricPointWithoutOrg separately
1 parent 88249aa commit 4907b9e

File tree

5 files changed

+27
-11
lines changed

5 files changed

+27
-11
lines changed

cmd/mt-kafka-mdm-sniff-out-of-order/main.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/raintank/worldping-api/pkg/log"
1919
"github.com/rakyll/globalconf"
2020
"gopkg.in/raintank/schema.v1"
21+
"gopkg.in/raintank/schema.v1/msg"
2122
)
2223

2324
var (
@@ -111,7 +112,7 @@ func (ip *inputOOOFinder) ProcessMetricData(metric *schema.MetricData, partition
111112
ip.lock.Unlock()
112113
}
113114

114-
func (ip *inputOOOFinder) ProcessMetricPoint(mp schema.MetricPoint, partition int32) {
115+
func (ip *inputOOOFinder) ProcessMetricPoint(mp schema.MetricPoint, format msg.Format, partition int32) {
115116
now := Msg{
116117
Part: partition,
117118
Seen: time.Now(),

cmd/mt-kafka-mdm-sniff/main.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/raintank/worldping-api/pkg/log"
1818
"github.com/rakyll/globalconf"
1919
"gopkg.in/raintank/schema.v1"
20+
"gopkg.in/raintank/schema.v1/msg"
2021
)
2122

2223
var (
@@ -72,7 +73,7 @@ func (ip inputPrinter) ProcessMetricData(metric *schema.MetricData, partition in
7273
}
7374
}
7475

75-
func (ip inputPrinter) ProcessMetricPoint(point schema.MetricPoint, partition int32) {
76+
func (ip inputPrinter) ProcessMetricPoint(point schema.MetricPoint, format msg.Format, partition int32) {
7677
stdoutLock.Lock()
7778
err := ip.tplP.Execute(os.Stdout, DataP{
7879
partition,

input/input.go

+11-4
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ import (
66
"fmt"
77
"time"
88

9-
schema "gopkg.in/raintank/schema.v1"
9+
"gopkg.in/raintank/schema.v1"
10+
"gopkg.in/raintank/schema.v1/msg"
1011

1112
"github.com/grafana/metrictank/idx"
1213
"github.com/grafana/metrictank/mdata"
@@ -16,7 +17,7 @@ import (
1617

1718
type Handler interface {
1819
ProcessMetricData(md *schema.MetricData, partition int32)
19-
ProcessMetricPoint(point schema.MetricPoint, partition int32)
20+
ProcessMetricPoint(point schema.MetricPoint, format msg.Format, partition int32)
2021
}
2122

2223
// TODO: clever way to document all metrics for all different inputs
@@ -25,6 +26,7 @@ type Handler interface {
2526
type DefaultHandler struct {
2627
receivedMD *stats.Counter32
2728
receivedMP *stats.Counter32
29+
receivedMPNO *stats.Counter32
2830
invalidMD *stats.Counter32
2931
invalidMP *stats.Counter32
3032
unknownMP *stats.Counter32
@@ -39,6 +41,7 @@ func NewDefaultHandler(metrics mdata.Metrics, metricIndex idx.MetricIndex, input
3941
return DefaultHandler{
4042
receivedMD: stats.NewCounter32(fmt.Sprintf("input.%s.metricdata.received", input)),
4143
receivedMP: stats.NewCounter32(fmt.Sprintf("input.%s.metricpoint.received", input)),
44+
receivedMPNO: stats.NewCounter32(fmt.Sprintf("input.%s.metricpoint_no_org.received", input)),
4245
invalidMD: stats.NewCounter32(fmt.Sprintf("input.%s.metricdata.invalid", input)),
4346
invalidMP: stats.NewCounter32(fmt.Sprintf("input.%s.metricpoint.invalid", input)),
4447
unknownMP: stats.NewCounter32(fmt.Sprintf("input.%s.metricpoint.unknown", input)),
@@ -52,8 +55,12 @@ func NewDefaultHandler(metrics mdata.Metrics, metricIndex idx.MetricIndex, input
5255

5356
// ProcessMetricPoint updates the index if possible, and stores the data if we have an index entry
5457
// concurrency-safe.
55-
func (in DefaultHandler) ProcessMetricPoint(point schema.MetricPoint, partition int32) {
56-
in.receivedMP.Inc()
58+
func (in DefaultHandler) ProcessMetricPoint(point schema.MetricPoint, format msg.Format, partition int32) {
59+
if format == msg.FormatMetricPoint {
60+
in.receivedMP.Inc()
61+
} else {
62+
in.receivedMPNO.Inc()
63+
}
5764
if !point.Valid() {
5865
in.invalidMP.Inc()
5966
log.Debug("in: Invalid metric %v", point)

input/kafkamdm/kafkamdm.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -353,14 +353,15 @@ func (k *KafkaMdm) consumePartition(topic string, partition int32, currentOffset
353353
}
354354

355355
func (k *KafkaMdm) handleMsg(data []byte, partition int32) {
356-
if msg.IsPointMsg(data) {
356+
format, isPointMsg := msg.IsPointMsg(data)
357+
if isPointMsg {
357358
_, point, err := msg.ReadPointMsg(data, uint32(orgId))
358359
if err != nil {
359360
metricsDecodeErr.Inc()
360361
log.Error(3, "kafka-mdm decode error, skipping message. %s", err)
361362
return
362363
}
363-
k.Handler.ProcessMetricPoint(point, partition)
364+
k.Handler.ProcessMetricPoint(point, format, partition)
364365
return
365366
}
366367

vendor/gopkg.in/raintank/schema.v1/msg/msg.go

+9-3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)