3
3
package input
4
4
5
5
import (
6
+ "flag"
6
7
"fmt"
7
8
"math"
8
9
"strconv"
9
10
10
- "github.com/raintank/schema"
11
- "github.com/raintank/schema/msg"
12
-
11
+ "github.com/grafana/globalconf"
13
12
"github.com/grafana/metrictank/idx"
14
13
"github.com/grafana/metrictank/mdata"
15
14
"github.com/grafana/metrictank/stats"
15
+ "github.com/raintank/schema"
16
+ "github.com/raintank/schema/msg"
16
17
log "github.com/sirupsen/logrus"
17
18
)
18
19
20
+ var rejectInvalidTags bool
21
+
22
+ func ConfigSetup () {
23
+ input := flag .NewFlagSet ("input" , flag .ExitOnError )
24
+ input .BoolVar (& rejectInvalidTags , "reject-invalid-tags" , true , "reject received metrics that have invalid tags" )
25
+ globalconf .Register ("input" , input , flag .ExitOnError )
26
+ }
27
+
19
28
type Handler interface {
20
29
ProcessMetricData (md * schema.MetricData , partition int32 )
21
30
ProcessMetricPoint (point schema.MetricPoint , format msg.Format , partition int32 )
@@ -29,6 +38,7 @@ type DefaultHandler struct {
29
38
receivedMP * stats.Counter32
30
39
receivedMPNO * stats.Counter32
31
40
invalidMD * stats.CounterRate32
41
+ invalidTagMD * stats.CounterRate32
32
42
invalidMP * stats.CounterRate32
33
43
unknownMP * stats.Counter32
34
44
@@ -57,6 +67,9 @@ func NewDefaultHandler(metrics mdata.Metrics, metricIndex idx.MetricIndex, input
57
67
receivedMPNO : stats .NewCounter32 (fmt .Sprintf ("input.%s.metricpoint_no_org.received" , input )),
58
68
// metric input.%s.metricdata.discarded.invalid is a count of times a metricdata was invalid by input plugin
59
69
invalidMD : stats .NewCounterRate32 (fmt .Sprintf ("input.%s.metricdata.discarded.invalid" , input )),
70
+ // metric input.%s.metricdata.discarded.invalid_tags is a count of times a metricdata was considered invalid due to
71
+ // invalid tags in the metric definition. all rejected metrics counted here are also counted in the above "invalid" counter
72
+ invalidTagMD : stats .NewCounterRate32 (fmt .Sprintf ("input.%s.metricdata.discarded.invalid_tag" , input )),
60
73
// metric input.%s.metricpoint.discarded.invalid is a count of times a metricpoint was invalid by input plugin
61
74
invalidMP : stats .NewCounterRate32 (fmt .Sprintf ("input.%s.metricpoint.discarded.invalid" , input )),
62
75
// metric input.%s.metricpoint.discarded.unknown is the count of times the ID of a received metricpoint was not in the index, by input plugin
@@ -103,27 +116,42 @@ func (in DefaultHandler) ProcessMetricData(md *schema.MetricData, partition int3
103
116
err := md .Validate ()
104
117
if err != nil {
105
118
in .invalidMD .Inc ()
106
- log .Debugf ("in: Invalid metric %v: %s" , md , err )
107
-
108
- var reason string
109
- switch err {
110
- case schema .ErrInvalidIntervalzero :
111
- reason = invalidInterval
112
- case schema .ErrInvalidOrgIdzero :
113
- reason = invalidOrgId
114
- case schema .ErrInvalidEmptyName :
115
- reason = invalidName
116
- case schema .ErrInvalidMtype :
117
- reason = invalidMtype
118
- case schema .ErrInvalidTagFormat :
119
- reason = invalidTagFormat
120
- default :
121
- reason = "unknown"
119
+
120
+ ignoreError := false
121
+ // assuming that the tag format was the only issue found by Validate()
122
+ // better make sure that the tag format is the last check which Validate() checks, to not accidentally ignore a potential following error
123
+ if err == schema .ErrInvalidTagFormat {
124
+ in .invalidTagMD .Inc ()
125
+ if ! rejectInvalidTags {
126
+ log .Debugf ("in: Invalid metric %v, not rejecting it because rejection due to invalid tags is disabled: %s" , md , err )
127
+ ignoreError = true
128
+ }
122
129
}
123
- mdata .PromDiscardedSamples .WithLabelValues (reason , strconv .Itoa (md .OrgId )).Inc ()
124
130
125
- return
131
+ if ! ignoreError {
132
+ log .Debugf ("in: Invalid metric %v: %s" , md , err )
133
+
134
+ var reason string
135
+ switch err {
136
+ case schema .ErrInvalidIntervalzero :
137
+ reason = invalidInterval
138
+ case schema .ErrInvalidOrgIdzero :
139
+ reason = invalidOrgId
140
+ case schema .ErrInvalidEmptyName :
141
+ reason = invalidName
142
+ case schema .ErrInvalidMtype :
143
+ reason = invalidMtype
144
+ case schema .ErrInvalidTagFormat :
145
+ reason = invalidTagFormat
146
+ default :
147
+ reason = "unknown"
148
+ }
149
+ mdata .PromDiscardedSamples .WithLabelValues (reason , strconv .Itoa (md .OrgId )).Inc ()
150
+
151
+ return
152
+ }
126
153
}
154
+
127
155
// in cassandra we store timestamps and interval as 32bit signed integers.
128
156
// math.MaxInt32 = Jan 19 03:14:07 UTC 2038
129
157
if md .Time <= 0 || md .Time >= math .MaxInt32 {
0 commit comments