Skip to content

Commit

Permalink
feat(inputs.suricata) v2 message parsing
Browse files Browse the repository at this point in the history
Adds a new v2 message parsing that will parse any event type that is
received.

fixes: influxdata#13032
  • Loading branch information
powersj committed Apr 11, 2023
1 parent 4b91ab0 commit 0d16601
Show file tree
Hide file tree
Showing 11 changed files with 710 additions and 32 deletions.
35 changes: 24 additions & 11 deletions plugins/inputs/suricata/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,30 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
```toml @sample.conf
# Suricata stats and alerts plugin
[[inputs.suricata]]
## Data sink for Suricata stats log.
# This is expected to be a filename of a
# unix socket to be created for listening.
source = "/var/run/suricata-stats.sock"

# Delimiter for flattening field keys, e.g. subitem "alert" of "detect"
# becomes "detect_alert" when delimiter is "_".
delimiter = "_"

# Detect alert logs
alerts = false
## Source
## Data sink for Suricata stats log. This is expected to be a filename of a
## unix socket to be created for listening.
# source = "/var/run/suricata-stats.sock"

## Delimiter
## Used for flattening field keys, e.g. subitem "alert" of "detect" becomes
## "detect_alert" when delimiter is "_".
# delimiter = "_"

## Metric version
## Version 1 only collects stats and optionally will look for alerts if
## the configuration setting alerts is set to true.
## Version 2 parses any event type message by default and produced metrics
## under a single metric name using a tag to differentiate between event
## types. The timestamp for the message is applied to the generated metric.
## Additional tags and fields are included as well.
# version = "1"

## Alerts
## In metric version 1, only status is captured by default, alerts must be
## turned on with this configuration option. This option does not apply for
## metric version 2.
# alerts = false
```

## Metrics
Expand Down
31 changes: 22 additions & 9 deletions plugins/inputs/suricata/sample.conf
Original file line number Diff line number Diff line change
@@ -1,13 +1,26 @@
# Suricata stats and alerts plugin
[[inputs.suricata]]
## Data sink for Suricata stats log.
# This is expected to be a filename of a
# unix socket to be created for listening.
source = "/var/run/suricata-stats.sock"
## Source
## Data sink for Suricata stats log. This is expected to be a filename of a
## unix socket to be created for listening.
# source = "/var/run/suricata-stats.sock"

# Delimiter for flattening field keys, e.g. subitem "alert" of "detect"
# becomes "detect_alert" when delimiter is "_".
delimiter = "_"
## Delimiter
## Used for flattening field keys, e.g. subitem "alert" of "detect" becomes
## "detect_alert" when delimiter is "_".
# delimiter = "_"

# Detect alert logs
alerts = false
## Metric version
## Version 1 only collects stats and optionally will look for alerts if
## the configuration setting alerts is set to true.
## Version 2 parses any event type message by default and produced metrics
## under a single metric name using a tag to differentiate between event
## types. The timestamp for the message is applied to the generated metric.
## Additional tags and fields are included as well.
# version = "1"

## Alerts
## In metric version 1, only status is captured by default, alerts must be
## turned on with this configuration option. This option does not apply for
## metric version 2.
# alerts = false
116 changes: 104 additions & 12 deletions plugins/inputs/suricata/suricata.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import (
"net"
"strings"
"sync"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
)

Expand All @@ -32,6 +34,7 @@ type Suricata struct {
Source string `toml:"source"`
Delimiter string `toml:"delimiter"`
Alerts bool `toml:"alerts"`
Version string `toml:"version"`

inputListener *net.UnixListener
cancel context.CancelFunc
Expand All @@ -45,6 +48,26 @@ func (*Suricata) SampleConfig() string {
return sampleConfig
}

func (s *Suricata) Init() error {
if s.Source == "" {
s.Source = "/var/run/suricata-stats.sock"
}

if s.Delimiter == "" {
s.Delimiter = "_"
}

switch s.Version {
case "":
s.Version = "1"
case "1", "2":
default:
return fmt.Errorf("invalid version %q, use either 1 or 2", s.Version)
}

return nil
}

// Start initiates background collection of JSON data from the socket
// provided to Suricata.
func (s *Suricata) Start(acc telegraf.Accumulator) error {
Expand Down Expand Up @@ -223,25 +246,97 @@ func (s *Suricata) parseStats(acc telegraf.Accumulator, result map[string]interf
}
}

func (s *Suricata) parseGeneric(acc telegraf.Accumulator, result map[string]interface{}) error {
eventType := ""
if _, ok := result["event_type"]; !ok {
return fmt.Errorf("unable to determine event type of message: %s", result)
}
value, err := internal.ToString(result["event_type"])
if err != nil {
return fmt.Errorf("unable to convert event type %q to string: %w", result["event_type"], err)
}
eventType = value

timestamp := time.Now()
if val, ok := result["timestamp"]; ok {
value, err := internal.ToString(val)
if err != nil {
return fmt.Errorf("unable to convert timestamp %q to string: %w", val, err)
}
timestamp, err = time.Parse("2006-01-02T15:04:05.999999-0700", value)
if err != nil {
return fmt.Errorf("unable to parse timestamp %q: %w", val, err)
}
}

// Make sure the event key exists first
if _, ok := result[eventType].(map[string]interface{}); !ok {
return fmt.Errorf("unable to find key %q in %s", eventType, result)
}

fields := make(map[string]interface{})
for k, v := range result[eventType].(map[string]interface{}) {
err := flexFlatten(fields, k, v, s.Delimiter)
if err != nil {
s.Log.Debugf("Flattening %q failed: %v", eventType, err)
continue
}
}

tags := map[string]string{
"event_type": eventType,
}

// best effort to gather these tags and fields, if errors are encountered
// we ignore and move on
for _, key := range []string{"proto", "out_iface", "in_iface"} {
if val, ok := result[key]; ok {
if convertedVal, err := internal.ToString(val); err == nil {
tags[key] = convertedVal
}
}
}
for _, key := range []string{"src_ip", "dest_ip"} {
if val, ok := result[key]; ok {
if convertedVal, err := internal.ToString(val); err == nil {
fields[key] = convertedVal
}
}
}
for _, key := range []string{"src_port", "dest_port"} {
if val, ok := result[key]; ok {
if convertedVal, err := internal.ToInt64(val); err == nil {
fields[key] = convertedVal
}
}
}

acc.AddFields("suricata", fields, tags, timestamp)
return nil
}

func (s *Suricata) parse(acc telegraf.Accumulator, sjson []byte) error {
// initial parsing
var result map[string]interface{}
err := json.Unmarshal(sjson, &result)
if err != nil {
return err
}
// check for presence of relevant stats or alert
_, ok := result["stats"]
_, ok2 := result["alert"]
if !ok && !ok2 {
s.Log.Debugf("Invalid input without 'stats' or 'alert' object: %v", result)
return fmt.Errorf("input does not contain 'stats' or 'alert' object")

if s.Version == "2" {
return s.parseGeneric(acc, result)
}
if ok {

// Version 1 parsing of stats and optionally alerts
if _, ok := result["stats"]; ok {
s.parseStats(acc, result)
} else if ok2 && s.Alerts {
} else if _, ok := result["alert"]; ok && s.Alerts {
s.parseAlert(acc, result)
} else {
s.Log.Debugf("Invalid input without 'stats' or 'alert' object: %v", result)
return fmt.Errorf("input does not contain 'stats' or 'alert' object")
}

return nil
}

Expand All @@ -253,9 +348,6 @@ func (s *Suricata) Gather(_ telegraf.Accumulator) error {

func init() {
inputs.Add("suricata", func() telegraf.Input {
return &Suricata{
Source: "/var/run/suricata-stats.sock",
Delimiter: "_",
}
return &Suricata{}
})
}
43 changes: 43 additions & 0 deletions plugins/inputs/suricata/suricata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,3 +380,46 @@ func TestSuricataParse(t *testing.T) {
testutil.RequireMetricsEqual(t, tc.expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
}
}

func TestSuricataParseVersion2(t *testing.T) {
tests := []struct {
filename string
expected []telegraf.Metric
}{{
filename: "v2/flow.json",
expected: []telegraf.Metric{
testutil.MustMetric(
"suricata",
map[string]string{
"event_type": "flow",
"in_iface": "eth1",
"proto": "TCP",
},
map[string]interface{}{
"age": float64(0),
"dest_ip": "142.251.130.3",
"dest_port": int64(443),
"src_ip": "192.168.0.121",
"src_port": int64(50212),
"state": "new",
},
time.Unix(0, 0),
),
},
},
}

for _, tc := range tests {
data, err := os.ReadFile("testdata/" + tc.filename)
require.NoError(t, err)

s := Suricata{
Version: "2",
Log: testutil.Logger{},
}
acc := testutil.Accumulator{}
require.NoError(t, s.parse(&acc, data))

testutil.RequireMetricsEqual(t, tc.expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
}
}
35 changes: 35 additions & 0 deletions plugins/inputs/suricata/testdata/v2/alert.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
{
"timestamp": "2021-05-30T20:07:13.208777+0200",
"flow_id": 1696236471136137,
"in_iface": "s1-suricata",
"event_type": "alert",
"src_ip": "10.0.0.5",
"src_port": 18715,
"dest_ip": "179.60.192.3",
"dest_port": 80,
"proto": "TCP",
"alert": {
"action": "allowed",
"gid": 1,
"source": {
"ip": "10.0.0.5",
"port": 18715
},
"target": {
"ip": "179.60.192.3",
"port": 80
},
"signature_id": 6,
"rev": 0,
"signature": "Corrupted HTTP body",
"severity": 3,
"category": "Misc activity"
},
"flow": {
"pkts_toserver": 1,
"pkts_toclient": 0,
"bytes_toserver": 174,
"bytes_toclient": 0,
"start": "2021-05-30T20:07:13.208777+0200"
}
}
18 changes: 18 additions & 0 deletions plugins/inputs/suricata/testdata/v2/dns.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"timestamp": "2023-04-07T00:20:57.995497+0800",
"flow_id": 2150129093506313,
"in_iface": "eth1",
"event_type": "dns",
"src_ip": "192.168.0.100",
"src_port": 39262,
"dest_ip": "192.168.0.1",
"dest_port": 53,
"proto": "UDP",
"dns": {
"type": "query",
"id": 7145,
"rrname": "reddit.com",
"rrtype": "A",
"tx_id": 10
}
}
29 changes: 29 additions & 0 deletions plugins/inputs/suricata/testdata/v2/drop.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{
"timestamp": "2023-04-07T00:21:01.318245+0800",
"flow_id": 180225164834117,
"in_iface": "eth1",
"event_type": "drop",
"src_ip": "192.168.0.110",
"src_port": 46016,
"dest_ip": "54.192.18.125",
"dest_port": 443,
"proto": "TCP",
"drop": {
"len": 76,
"tos": 0,
"ttl": 64,
"ipid": 62316,
"tcpseq": 3900248957,
"tcpack": 2339873683,
"tcpwin": 501,
"syn": false,
"ack": true,
"psh": true,
"rst": false,
"urg": false,
"fin": true,
"tcpres": 0,
"tcpurgp": 0,
"reason": "stream error"
}
}
21 changes: 21 additions & 0 deletions plugins/inputs/suricata/testdata/v2/flow.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"timestamp": "2023-04-07T00:28:22.136079+0800",
"flow_id": 911610881873910,
"in_iface": "eth1",
"event_type": "flow",
"src_ip": "192.168.0.121",
"src_port": 50212,
"dest_ip": "142.251.130.3",
"dest_port": 443,
"proto": "TCP",
"flow": {
"age": 0,
"state": "new",
"alerted": false
},
"tcp": {
"tcp_flags": "00",
"tcp_flags_ts": "00",
"tcp_flags_tc": "00"
}
}
Loading

0 comments on commit 0d16601

Please sign in to comment.