Skip to content

Commit

Permalink
dns processor - Add A, AAAA, and TXT query support
Browse files Browse the repository at this point in the history
The dns processor previously supported only reverse DNS lookups.
This adds support for performing A, AAAA, and TXT record queries.

The response.ptr.histogram metric was renamed to request_duration.histogram.
This naming allows the metric to represent the duration of the DNS request
for all query types.

Closes elastic#11416
  • Loading branch information
andrewkroh committed Aug 22, 2023
1 parent ecc82aa commit ed0f822
Show file tree
Hide file tree
Showing 9 changed files with 265 additions and 157 deletions.
68 changes: 34 additions & 34 deletions libbeat/processors/dns/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,38 +24,38 @@ import (
"github.com/elastic/elastic-agent-libs/monitoring"
)

type ptrRecord struct {
host string
type successRecord struct {
data []string
expires time.Time
}

func (r ptrRecord) IsExpired(now time.Time) bool {
func (r successRecord) IsExpired(now time.Time) bool {
return now.After(r.expires)
}

type ptrCache struct {
type successCache struct {
sync.RWMutex
data map[string]ptrRecord
data map[string]successRecord
maxSize int
minSuccessTTL time.Duration
}

func (c *ptrCache) set(now time.Time, key string, ptr *PTR) {
func (c *successCache) set(now time.Time, key string, result *result) {
c.Lock()
defer c.Unlock()

if len(c.data) >= c.maxSize {
c.evict()
}

c.data[key] = ptrRecord{
host: ptr.Host,
expires: now.Add(time.Duration(ptr.TTL) * time.Second),
c.data[key] = successRecord{
data: result.Data,
expires: now.Add(time.Duration(result.TTL) * time.Second),
}
}

// evict removes a single random key from the cache.
func (c *ptrCache) evict() {
func (c *successCache) evict() {
var key string
for k := range c.data {
key = k
Expand All @@ -64,13 +64,13 @@ func (c *ptrCache) evict() {
delete(c.data, key)
}

func (c *ptrCache) get(now time.Time, key string) *PTR {
func (c *successCache) get(now time.Time, key string) *result {
c.RLock()
defer c.RUnlock()

r, found := c.data[key]
if found && !r.IsExpired(now) {
return &PTR{r.host, uint32(r.expires.Sub(now) / time.Second)}
return &result{r.data, uint32(r.expires.Sub(now) / time.Second)}
}
return nil
}
Expand Down Expand Up @@ -132,13 +132,13 @@ type cachedError struct {
func (ce *cachedError) Error() string { return ce.err.Error() + " (from failure cache)" }
func (ce *cachedError) Cause() error { return ce.err }

// PTRLookupCache is a cache for storing and retrieving the results of
// reverse DNS queries. It caches the results of queries regardless of their
// LookupCache is a cache for storing and retrieving the results of
// DNS queries. It caches the results of queries regardless of their
// outcome (success or failure).
type PTRLookupCache struct {
success *ptrCache
type LookupCache struct {
success *successCache
failure *failureCache
resolver PTRResolver
resolver resolver
stats cacheStats
}

Expand All @@ -147,15 +147,15 @@ type cacheStats struct {
Miss *monitoring.Int
}

// NewPTRLookupCache returns a new cache.
func NewPTRLookupCache(reg *monitoring.Registry, conf CacheConfig, resolver PTRResolver) (*PTRLookupCache, error) {
// NewLookupCache returns a new cache.
func NewLookupCache(reg *monitoring.Registry, conf CacheConfig, resolver resolver) (*LookupCache, error) {
if err := conf.Validate(); err != nil {
return nil, err
}

c := &PTRLookupCache{
success: &ptrCache{
data: make(map[string]ptrRecord, conf.SuccessCache.InitialCapacity),
c := &LookupCache{
success: &successCache{
data: make(map[string]successRecord, conf.SuccessCache.InitialCapacity),
maxSize: conf.SuccessCache.MaxCapacity,
minSuccessTTL: conf.SuccessCache.MinTTL,
},
Expand All @@ -174,36 +174,36 @@ func NewPTRLookupCache(reg *monitoring.Registry, conf CacheConfig, resolver PTRR
return c, nil
}

// LookupPTR performs a reverse lookup on the given IP address. A cached result
// Lookup performs a lookup on the given query string. A cached result
// will be returned if it is contained in the cache, otherwise a lookup is
// performed.
func (c PTRLookupCache) LookupPTR(ip string) (*PTR, error) {
func (c LookupCache) Lookup(q string, qt queryType) (*result, error) {
now := time.Now()

ptr := c.success.get(now, ip)
if ptr != nil {
r := c.success.get(now, q)
if r != nil {
c.stats.Hit.Inc()
return ptr, nil
return r, nil
}

err := c.failure.get(now, ip)
err := c.failure.get(now, q)
if err != nil {
c.stats.Hit.Inc()
return nil, err
}
c.stats.Miss.Inc()

ptr, err = c.resolver.LookupPTR(ip)
r, err = c.resolver.Lookup(q, qt)
if err != nil {
c.failure.set(now, ip, &cachedError{err})
c.failure.set(now, q, &cachedError{err})
return nil, err
}

// We set the ptr.TTL to the minimum TTL in case it is less than that.
ptr.TTL = max(ptr.TTL, uint32(c.success.minSuccessTTL/time.Second))
// We set the result TTL to the minimum TTL in case it is less than that.
r.TTL = max(r.TTL, uint32(c.success.minSuccessTTL/time.Second))

c.success.set(now, ip, ptr)
return ptr, nil
c.success.set(now, q, r)
return r, nil
}

func max(a, b uint32) uint32 {
Expand Down
52 changes: 26 additions & 26 deletions libbeat/processors/dns/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,85 +29,85 @@ import (

type stubResolver struct{}

func (r *stubResolver) LookupPTR(ip string) (*PTR, error) {
func (r *stubResolver) Lookup(ip string, _ queryType) (*result, error) {
switch ip {
case gatewayIP:
return &PTR{Host: gatewayName, TTL: gatewayTTL}, nil
return &result{Data: []string{gatewayName}, TTL: gatewayTTL}, nil
case gatewayIP + "1":
return nil, io.ErrUnexpectedEOF
case gatewayIP + "2":
return &PTR{Host: gatewayName, TTL: 0}, nil
return &result{Data: []string{gatewayName}, TTL: 0}, nil
}
return nil, &dnsError{"fake lookup returned NXDOMAIN"}
}

func TestCache(t *testing.T) {
c, err := NewPTRLookupCache(
c, err := NewLookupCache(
monitoring.NewRegistry(),
defaultConfig.CacheConfig,
defaultConfig().CacheConfig,
&stubResolver{})
if err != nil {
t.Fatal(err)
}

// Initial success query.
ptr, err := c.LookupPTR(gatewayIP)
r, err := c.Lookup(gatewayIP, typePTR)
if assert.NoError(t, err) {
assert.EqualValues(t, gatewayName, ptr.Host)
assert.EqualValues(t, gatewayTTL, ptr.TTL)
assert.EqualValues(t, []string{gatewayName}, r.Data)
assert.EqualValues(t, gatewayTTL, r.TTL)
assert.EqualValues(t, 0, c.stats.Hit.Get())
assert.EqualValues(t, 1, c.stats.Miss.Get())
}

// Cached success query.
ptr, err = c.LookupPTR(gatewayIP)
r, err = c.Lookup(gatewayIP, typePTR)
if assert.NoError(t, err) {
assert.EqualValues(t, gatewayName, ptr.Host)
assert.EqualValues(t, []string{gatewayName}, r.Data)
// TTL counts down while in cache.
assert.InDelta(t, gatewayTTL, ptr.TTL, 1)
assert.InDelta(t, gatewayTTL, r.TTL, 1)
assert.EqualValues(t, 1, c.stats.Hit.Get())
assert.EqualValues(t, 1, c.stats.Miss.Get())
}

// Initial failure query (like a dns error response code).
ptr, err = c.LookupPTR(gatewayIP + "0")
r, err = c.Lookup(gatewayIP+"0", typePTR)
if assert.Error(t, err) {
assert.Nil(t, ptr)
assert.Nil(t, r)
assert.EqualValues(t, 1, c.stats.Hit.Get())
assert.EqualValues(t, 2, c.stats.Miss.Get())
}

// Cached failure query.
ptr, err = c.LookupPTR(gatewayIP + "0")
r, err = c.Lookup(gatewayIP+"0", typePTR)
if assert.Error(t, err) {
assert.Nil(t, ptr)
assert.Nil(t, r)
assert.EqualValues(t, 2, c.stats.Hit.Get())
assert.EqualValues(t, 2, c.stats.Miss.Get())
}

// Initial network failure (like I/O timeout).
ptr, err = c.LookupPTR(gatewayIP + "1")
r, err = c.Lookup(gatewayIP+"1", typePTR)
if assert.Error(t, err) {
assert.Nil(t, ptr)
assert.Nil(t, r)
assert.EqualValues(t, 2, c.stats.Hit.Get())
assert.EqualValues(t, 3, c.stats.Miss.Get())
}

// Check for a cache hit for the network failure.
ptr, err = c.LookupPTR(gatewayIP + "1")
r, err = c.Lookup(gatewayIP+"1", typePTR)
if assert.Error(t, err) {
assert.Nil(t, ptr)
assert.Nil(t, r)
assert.EqualValues(t, 3, c.stats.Hit.Get())
assert.EqualValues(t, 3, c.stats.Miss.Get()) // Cache miss.
}

minTTL := defaultConfig.CacheConfig.SuccessCache.MinTTL
minTTL := defaultConfig().CacheConfig.SuccessCache.MinTTL
// Initial success returned TTL=0 with MinTTL.
ptr, err = c.LookupPTR(gatewayIP + "2")
r, err = c.Lookup(gatewayIP+"2", typePTR)
if assert.NoError(t, err) {
assert.EqualValues(t, gatewayName, ptr.Host)
assert.EqualValues(t, []string{gatewayName}, r.Data)

assert.EqualValues(t, minTTL/time.Second, ptr.TTL)
assert.EqualValues(t, minTTL/time.Second, r.TTL)
assert.EqualValues(t, 3, c.stats.Hit.Get())
assert.EqualValues(t, 4, c.stats.Miss.Get())

Expand All @@ -117,11 +117,11 @@ func TestCache(t *testing.T) {
}

// Cached success from a previous TTL=0 response.
ptr, err = c.LookupPTR(gatewayIP + "2")
r, err = c.Lookup(gatewayIP+"2", typePTR)
if assert.NoError(t, err) {
assert.EqualValues(t, gatewayName, ptr.Host)
assert.EqualValues(t, []string{gatewayName}, r.Data)
// TTL counts down while in cache.
assert.InDelta(t, minTTL/time.Second, ptr.TTL, 1)
assert.InDelta(t, minTTL/time.Second, r.TTL, 1)
assert.EqualValues(t, 4, c.stats.Hit.Get())
assert.EqualValues(t, 4, c.stats.Miss.Get())
}
Expand Down
80 changes: 55 additions & 25 deletions libbeat/processors/dns/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"strings"
"time"

"github.com/miekg/dns"

"github.com/elastic/elastic-agent-libs/mapstr"
)

Expand All @@ -31,7 +33,7 @@ type Config struct {
CacheConfig `config:",inline"`
Nameservers []string `config:"nameservers"` // Required on Windows. /etc/resolv.conf is used if none are given.
Timeout time.Duration `config:"timeout"` // Per request timeout (with 2 nameservers the total timeout would be 2x).
Type string `config:"type" validate:"required"` // Reverse is the only supported type currently.
Type queryType `config:"type" validate:"required"` // Reverse is the only supported type currently.
Action FieldAction `config:"action"` // Append or replace (defaults to append) when target exists.
TagOnFailure []string `config:"tag_on_failure"` // Tags to append when a failure occurs.
Fields mapstr.M `config:"fields"` // Mapping of source fields to target fields.
Expand Down Expand Up @@ -75,6 +77,41 @@ func (fa *FieldAction) Unpack(v string) error {
return nil
}

// queryType represents a DNS query type.
type queryType uint16

const (
typePTR = queryType(dns.TypePTR)
typeA = queryType(dns.TypeA)
typeAAAA = queryType(dns.TypeAAAA)
typeTXT = queryType(dns.TypeTXT)
)

func (qt queryType) String() string {
if name := dns.TypeToString[uint16(qt)]; name != "" {
return name
}
return strconv.FormatUint(uint64(qt), 10)
}

// Unpack unpacks a string to a queryType.
func (qt *queryType) Unpack(v string) error {
switch strings.ToLower(v) {
case "a":
*qt = typeA
case "aaaa":
*qt = typeAAAA
case "reverse", "ptr":
*qt = typePTR
case "txt":
*qt = typeTXT
default:
return fmt.Errorf("invalid dns lookup type '%v' specified in "+
"config (valid values are: A, AAAA, PTR, reverse, TXT)", v)
}
return nil
}

// CacheConfig defines the success and failure caching parameters.
type CacheConfig struct {
SuccessCache CacheSettings `config:"success_cache"`
Expand All @@ -100,15 +137,6 @@ type CacheSettings struct {

// Validate validates the data contained in the config.
func (c *Config) Validate() error {
// Validate lookup type.
c.Type = strings.ToLower(c.Type)
switch c.Type {
case "reverse":
default:
return fmt.Errorf("invalid dns lookup type '%v' specified in "+
"config (valid values are: reverse)", c.Type)
}

// Flatten the mapping of source fields to target fields.
c.reverseFlat = map[string]string{}
for k, v := range c.Fields.Flatten() {
Expand Down Expand Up @@ -157,20 +185,22 @@ func (c *CacheConfig) Validate() error {
return nil
}

var defaultConfig = Config{
CacheConfig: CacheConfig{
SuccessCache: CacheSettings{
MinTTL: time.Minute,
InitialCapacity: 1000,
MaxCapacity: 10000,
func defaultConfig() Config {
return Config{
CacheConfig: CacheConfig{
SuccessCache: CacheSettings{
MinTTL: time.Minute,
InitialCapacity: 1000,
MaxCapacity: 10000,
},
FailureCache: CacheSettings{
MinTTL: time.Minute,
TTL: time.Minute,
InitialCapacity: 1000,
MaxCapacity: 10000,
},
},
FailureCache: CacheSettings{
MinTTL: time.Minute,
TTL: time.Minute,
InitialCapacity: 1000,
MaxCapacity: 10000,
},
},
Transport: "udp",
Timeout: 500 * time.Millisecond,
Transport: "udp",
Timeout: 500 * time.Millisecond,
}
}
Loading

0 comments on commit ed0f822

Please sign in to comment.