reversedns.go (forked from influxdata/telegraf)
package reverse_dns

import (
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/config"
	"github.com/influxdata/telegraf/plugins/processors"
	"github.com/influxdata/telegraf/plugins/processors/reverse_dns/parallel"
)

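// sampleConfig documents the processor's configuration options; it is the
// string returned by SampleConfig below.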
const sampleConfig = `
  ## For optimal performance, you may want to limit which metrics are passed to
  ## this processor, e.g.:
  ## namepass = ["my_metric_*"]

  ## cache_ttl is how long DNS entries should stay cached.
  ## Longer is generally better, but if you expect a large number of diverse
  ## lookups you'll want to consider memory use.
  cache_ttl = "24h"

  ## lookup_timeout is how long to wait for a single DNS request to respond.
  ## This is also the maximum acceptable latency for a metric travelling through
  ## the reverse_dns processor. After lookup_timeout is exceeded, a metric is
  ## passed on unaltered.
  ## Multiple simultaneous resolution requests for the same IP will only make a
  ## single rDNS request, and they will all wait this long for the answer.
  lookup_timeout = "3s"

  ## max_parallel_lookups is the maximum number of DNS requests to have in flight
  ## at the same time. Requests that hit cached values do not count against this
  ## total, and neither do multiple requests for the same IP.
  ## It's probably best to keep this number fairly low.
  max_parallel_lookups = 10

  ## ordered controls whether the metrics must stay in the order this plugin
  ## received them in. If false, the plugin may reorder metrics, with requests
  ## hitting cached results passing through immediately rather than waiting on
  ## slower lookups. If you depend on metric order staying the same, set this
  ## to true. Keeping the metrics ordered may be slightly slower.
  ordered = false

  [[processors.reverse_dns.lookup]]
    ## Get the IP from the field "source_ip" and put the result in the field "source_name".
    field = "source_ip"
    dest = "source_name"

  [[processors.reverse_dns.lookup]]
    ## Get the IP from the tag "destination_ip" and put the result in the tag
    ## "destination_name".
    tag = "destination_ip"
    dest = "destination_name"

    ## If you would prefer destination_name to be a field instead, you can use a
    ## processors.converter after this one, specifying the order attribute.
`

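// lookupEntry describes a single lookup: the source tag or field holding an IP
// address, and the destination key to write the resolved DNS name to.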
type lookupEntry struct {
	Tag   string `toml:"tag"`
	Field string `toml:"field"`
	Dest  string `toml:"dest"`
}

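// ReverseDNS is a streaming processor that resolves IP addresses found in the
// configured tags and fields to DNS names, caching results in a ReverseDNSCache.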
type ReverseDNS struct {
	reverseDNSCache *ReverseDNSCache
	acc             telegraf.Accumulator
	parallel        parallel.Parallel

	Lookups            []lookupEntry   `toml:"lookup"`
	CacheTTL           config.Duration `toml:"cache_ttl"`
	LookupTimeout      config.Duration `toml:"lookup_timeout"`
	MaxParallelLookups int             `toml:"max_parallel_lookups"`
	Ordered            bool            `toml:"ordered"`
	Log                telegraf.Logger `toml:"-"`
}

func (r *ReverseDNS) SampleConfig() string {
	return sampleConfig
}

func (r *ReverseDNS) Description() string {
	return "ReverseDNS does a reverse lookup on IP addresses to retrieve the DNS name"
}

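// Start sets up the DNS cache and the parallel worker pool, choosing an
// ordered or unordered pipeline based on the Ordered setting.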
func (r *ReverseDNS) Start(acc telegraf.Accumulator) error {
	r.acc = acc
	r.reverseDNSCache = NewReverseDNSCache(
		time.Duration(r.CacheTTL),
		time.Duration(r.LookupTimeout),
		r.MaxParallelLookups, // max parallel reverse-dns lookups
	)
	if r.Ordered {
		r.parallel = parallel.NewOrdered(acc, r.asyncAdd, 10000, r.MaxParallelLookups)
	} else {
		r.parallel = parallel.NewUnordered(acc, r.asyncAdd, r.MaxParallelLookups)
	}
	return nil
}

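// Stop shuts down the worker pool and the DNS cache.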
func (r *ReverseDNS) Stop() error {
	r.parallel.Stop()
	r.reverseDNSCache.Stop()
	return nil
}

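// Add enqueues a metric for asynchronous processing; results are delivered to
// the accumulator passed to Start.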
func (r *ReverseDNS) Add(metric telegraf.Metric, acc telegraf.Accumulator) error {
	r.parallel.Enqueue(metric)
	return nil
}

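// asyncAdd performs the configured lookups for a single metric. Lookup errors
// are logged and the metric is passed through with that destination unset.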
func (r *ReverseDNS) asyncAdd(metric telegraf.Metric) []telegraf.Metric {
	for _, lookup := range r.Lookups {
		if len(lookup.Field) > 0 {
			if ipField, ok := metric.GetField(lookup.Field); ok {
				if ip, ok := ipField.(string); ok {
					result, err := r.reverseDNSCache.Lookup(ip)
					if err != nil {
						r.Log.Errorf("lookup error: %v", err)
						continue
					}
					if len(result) > 0 {
						metric.AddField(lookup.Dest, result[0])
					}
				}
			}
		}
		if len(lookup.Tag) > 0 {
			if ipTag, ok := metric.GetTag(lookup.Tag); ok {
				result, err := r.reverseDNSCache.Lookup(ipTag)
				if err != nil {
					r.Log.Errorf("lookup error: %v", err)
					continue
				}
				if len(result) > 0 {
					metric.AddTag(lookup.Dest, result[0])
				}
			}
		}
	}
	return []telegraf.Metric{metric}
}

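// init registers the processor with telegraf under the name "reverse_dns".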
func init() {
	processors.AddStreaming("reverse_dns", func() telegraf.StreamingProcessor {
		return newReverseDNS()
	})
}

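// newReverseDNS returns a ReverseDNS with the default cache TTL (24h), lookup
// timeout (1m), and parallel lookup limit (10).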
func newReverseDNS() *ReverseDNS {
	return &ReverseDNS{
		CacheTTL:           config.Duration(24 * time.Hour),
		LookupTimeout:      config.Duration(1 * time.Minute),
		MaxParallelLookups: 10,
	}
}
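
Below is a minimal sketch (not part of the original file) of how the processor's Start/Add/Stop lifecycle could be exercised from a companion _test.go file in the same package. It assumes telegraf's testutil helpers (MustMetric, Accumulator, Logger, GetTelegrafMetrics) behave as in recent telegraf versions, and that a working resolver is available; the test name and the use of 8.8.8.8 are purely illustrative.

package reverse_dns

import (
	"testing"
	"time"

	"github.com/influxdata/telegraf/testutil"
	"github.com/stretchr/testify/require"
)

// TestReverseDNSLookupSketch is a hypothetical test sketch: it feeds one metric
// through the processor and checks that the configured field lookup added a
// resolved name. It requires network access for the real rDNS lookup.
func TestReverseDNSLookupSketch(t *testing.T) {
	// A metric with an IP in a field and another in a tag, mirroring the two
	// lookup styles shown in the sample config.
	m := testutil.MustMetric("request",
		map[string]string{"destination_ip": "8.8.8.8"},
		map[string]interface{}{"source_ip": "8.8.8.8"},
		time.Now(),
	)

	dns := newReverseDNS()
	dns.Log = testutil.Logger{} // assumed to satisfy telegraf.Logger
	dns.Lookups = []lookupEntry{
		{Field: "source_ip", Dest: "source_name"},
		{Tag: "destination_ip", Dest: "destination_name"},
	}

	acc := &testutil.Accumulator{}
	require.NoError(t, dns.Start(acc))
	require.NoError(t, dns.Add(m, acc))
	dns.Stop() // flushes the parallel pipeline into acc

	processed := acc.GetTelegrafMetrics()
	require.Len(t, processed, 1)
	name, ok := processed[0].GetField("source_name")
	require.True(t, ok)           // only set if the resolver answered in time
	require.NotEmpty(t, name)     // e.g. "dns.google." when resolving 8.8.8.8
}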