Skip to content

Commit

Permalink
[Filebeat] Add network.direction to netflow/log fileset (#23052)
Browse files Browse the repository at this point in the history
* [Filebeat] Add network.direction to netflow/log fileset

* Update changelog

* Run mage update
  • Loading branch information
Andrew Stucki authored Dec 10, 2020
1 parent f78a14c commit 462537d
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 42 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Migrate microsoft/m365_defender to httpjson v2 config {pull}23018[23018]
- Add top_level_domain enrichment for suricata/eve fileset. {pull}23046[23046]
- Add top_level_domain enrichment for zeek/dns fileset. {pull}23046[23046]
- Add `network.direction` to netflow/log fileset. {pull}23052[23052]

*Heartbeat*

Expand Down
5 changes: 5 additions & 0 deletions x-pack/filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1370,6 +1370,11 @@ filebeat.modules:
var:
netflow_host: localhost
netflow_port: 2055
# internal_networks specifies which networks are considered internal or private
# you can specify either a CIDR block or any of the special named ranges listed
# at: https://www.elastic.co/guide/en/beats/filebeat/current/defining-processors.html#condition-network
internal_networks:
- private

#-------------------------- Arbor Peakflow SP Module --------------------------
- module: netscout
Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/input/netflow/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
type config struct {
udp.Config `config:",inline"`
harvester.ForwarderConfig `config:",inline"`
InternalNetworks []string `config:"internal_networks"`
Protocols []string `config:"protocols"`
ExpirationTimeout time.Duration `config:"expiration_timeout"`
PacketQueueSize int `config:"queue_size"`
Expand Down
43 changes: 19 additions & 24 deletions x-pack/filebeat/input/netflow/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/flowhash"
"github.com/elastic/beats/v7/libbeat/conditions"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record"
)

Expand All @@ -35,10 +36,10 @@ var (
}
)

func toBeatEvent(flow record.Record) (event beat.Event) {
func toBeatEvent(flow record.Record, internalNetworks []string) (event beat.Event) {
switch flow.Type {
case record.Flow:
return flowToBeatEvent(flow)
return flowToBeatEvent(flow, internalNetworks)
case record.Options:
return optionsToBeatEvent(flow)
default:
Expand Down Expand Up @@ -113,7 +114,7 @@ func optionsToBeatEvent(flow record.Record) beat.Event {
return toBeatEventCommon(flow)
}

func flowToBeatEvent(flow record.Record) (event beat.Event) {
func flowToBeatEvent(flow record.Record, internalNetworks []string) (event beat.Event) {
event = toBeatEventCommon(flow)

ecsEvent, ok := event.Fields["event"].(common.MapStr)
Expand Down Expand Up @@ -179,12 +180,12 @@ func flowToBeatEvent(flow record.Record) (event beat.Event) {
}
if srcIP != nil {
ecsSource["ip"] = srcIP
ecsSource["locality"] = getIPLocality(srcIP).String()
ecsSource["locality"] = getIPLocality(internalNetworks, srcIP).String()
}
ecsSource["mac"] = srcMac
if dstIP != nil {
ecsDest["ip"] = dstIP
ecsDest["locality"] = getIPLocality(dstIP).String()
ecsDest["locality"] = getIPLocality(internalNetworks, dstIP).String()
}
ecsDest["mac"] = dstMac
}
Expand All @@ -194,7 +195,7 @@ func flowToBeatEvent(flow record.Record) (event beat.Event) {
if ip, found := getKeyIP(flow.Fields, "sourceIPv4Address"); found {
ecsSource["ip"] = ip
relatedIP = append(relatedIP, ip)
ecsSource["locality"] = getIPLocality(ip).String()
ecsSource["locality"] = getIPLocality(internalNetworks, ip).String()
}
if sourcePort, found := getKeyUint64(flow.Fields, "sourceTransportPort"); found {
ecsSource["port"] = sourcePort
Expand All @@ -207,7 +208,7 @@ func flowToBeatEvent(flow record.Record) (event beat.Event) {
if ip, found := getKeyIP(flow.Fields, "destinationIPv4Address"); found {
ecsDest["ip"] = ip
relatedIP = append(relatedIP, ip)
ecsDest["locality"] = getIPLocality(ip).String()
ecsDest["locality"] = getIPLocality(internalNetworks, ip).String()
}
if destPort, found := getKeyUint64(flow.Fields, "destinationTransportPort"); found {
ecsDest["port"] = destPort
Expand Down Expand Up @@ -243,7 +244,7 @@ func flowToBeatEvent(flow record.Record) (event beat.Event) {
dstIP = net.IPv4(0, 0, 0, 0).To4()
}
ecsFlow["id"] = flowID(srcIP, dstIP, srcPort, dstPort, uint8(protocol))
ecsFlow["locality"] = getIPLocality(srcIP, dstIP).String()
ecsFlow["locality"] = getIPLocality(internalNetworks, srcIP, dstIP).String()

// ECS Fields -- network
ecsNetwork := common.MapStr{}
Expand Down Expand Up @@ -394,29 +395,23 @@ func (l Locality) String() string {
return "unknown (" + strconv.Itoa(int(l)) + ")"
}

func isPrivateNetwork(ip net.IP) bool {
for _, net := range privateIPv4 {
if net.Contains(ip) {
return true
}
}

return privateIPv6.Contains(ip)
}

func isLocalOrPrivate(ip net.IP) bool {
return isPrivateNetwork(ip) ||
ip.IsLoopback() ||
func isLocal(ip net.IP) bool {
return ip.IsLoopback() ||
ip.IsUnspecified() ||
ip.Equal(net.IPv4bcast) ||
ip.IsLinkLocalUnicast() ||
ip.IsLinkLocalMulticast() ||
ip.IsInterfaceLocalMulticast()
}

func getIPLocality(ip ...net.IP) Locality {
for _, addr := range ip {
if !isLocalOrPrivate(addr) {
func getIPLocality(internalNetworks []string, ips ...net.IP) Locality {
for _, ip := range ips {
contains, err := conditions.NetworkContains(ip, internalNetworks...)
if err != nil {
return LocalityPublic
}
// always consider loopback/link-local private
if !contains && !isLocal(ip) {
return LocalityPublic
}
}
Expand Down
32 changes: 17 additions & 15 deletions x-pack/filebeat/input/netflow/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,16 @@ type packet struct {
}

type netflowInput struct {
mutex sync.Mutex
udp *udp.Server
decoder *decoder.Decoder
outlet channel.Outleter
forwarder *harvester.Forwarder
logger *logp.Logger
queueC chan packet
queueSize int
started bool
mutex sync.Mutex
udp *udp.Server
decoder *decoder.Decoder
outlet channel.Outleter
forwarder *harvester.Forwarder
internalNetworks []string
logger *logp.Logger
queueC chan packet
queueSize int
started bool
}

func init() {
Expand Down Expand Up @@ -122,11 +123,12 @@ func NewInput(
}

input := &netflowInput{
outlet: out,
forwarder: harvester.NewForwarder(out),
decoder: decoder,
logger: logger,
queueSize: config.PacketQueueSize,
outlet: out,
internalNetworks: config.InternalNetworks,
forwarder: harvester.NewForwarder(out),
decoder: decoder,
logger: logger,
queueSize: config.PacketQueueSize,
}

input.udp = udp.New(&config.Config, input.packetDispatch)
Expand Down Expand Up @@ -243,7 +245,7 @@ func (p *netflowInput) recvRoutine() {
evs := make([]beat.Event, n)
numFlows.Add(uint64(n))
for i, flow := range flows {
evs[i] = toBeatEvent(flow)
evs[i] = toBeatEvent(flow, p.internalNetworks)
}
p.Publish(evs)
}
Expand Down
6 changes: 3 additions & 3 deletions x-pack/filebeat/input/netflow/netflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func getFlowsFromDat(t testing.TB, name string, testCase TestCase) TestResult {
}
ev := make([]beat.Event, len(flows))
for i := range flows {
ev[i] = toBeatEvent(flows[i])
ev[i] = toBeatEvent(flows[i], []string{"private"})
}
//return TestResult{Name: name, Error: err.Error(), Events: flowsToEvents(flows)}
events = append(events, ev...)
Expand Down Expand Up @@ -242,7 +242,7 @@ func getFlowsFromPCAP(t testing.TB, name, pcapFile string) TestResult {
}
ev := make([]beat.Event, len(flows))
for i := range flows {
ev[i] = toBeatEvent(flows[i])
ev[i] = toBeatEvent(flows[i], []string{"private"})
}
events = append(events, ev...)
}
Expand Down Expand Up @@ -341,7 +341,7 @@ func TestReverseFlows(t *testing.T) {

var evs []beat.Event
for _, f := range flows {
evs = append(evs, toBeatEvent(f))
evs = append(evs, toBeatEvent(f, []string{"private"}))
}
if !assert.Len(t, evs, 2) {
t.Fatal()
Expand Down
5 changes: 5 additions & 0 deletions x-pack/filebeat/module/netflow/_meta/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,8 @@
var:
netflow_host: localhost
netflow_port: 2055
# internal_networks specifies which networks are considered internal or private
# you can specify either a CIDR block or any of the special named ranges listed
# at: https://www.elastic.co/guide/en/beats/filebeat/current/defining-processors.html#condition-network
internal_networks:
- private
7 changes: 7 additions & 0 deletions x-pack/filebeat/module/netflow/log/config/netflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ max_message_size: '{{.max_message_size}}'
expiration_timeout: '{{.expiration_timeout}}'
queue_size: {{.queue_size}}

{{if .internal_networks}}
internal_hosts:
{{range .internal_networks}}
- '{{ . }}'
{{end}}
{{end}}

{{if .timeout}}
timeout: '{{.timeout}}'
{{end}}
Expand Down
20 changes: 20 additions & 0 deletions x-pack/filebeat/module/netflow/log/ingest/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,26 @@ processors:
field: destination.as.organization_name
target_field: destination.as.organization.name
ignore_missing: true
- set:
field: network.direction
value: inbound
if: 'ctx?.source?.locality == "external" && ctx?.destination?.locality == "internal"'
- set:
field: network.direction
value: outbound
if: 'ctx?.source?.locality == "internal" && ctx?.destination?.locality == "external"'
- set:
field: network.direction
value: internal
if: 'ctx?.source?.locality == "internal" && ctx?.destination?.locality == "internal"'
- set:
field: network.direction
value: external
if: 'ctx?.source?.locality == "external" && ctx?.destination?.locality == "external"'
- set:
field: network.direction
value: unknown
if: 'ctx?.network?.direction == null'

on_failure:
- set:
Expand Down
5 changes: 5 additions & 0 deletions x-pack/filebeat/modules.d/netflow.yml.disabled
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,8 @@
var:
netflow_host: localhost
netflow_port: 2055
# internal_networks specifies which networks are considered internal or private
# you can specify either a CIDR block or any of the special named ranges listed
# at: https://www.elastic.co/guide/en/beats/filebeat/current/defining-processors.html#condition-network
internal_networks:
- private

0 comments on commit 462537d

Please sign in to comment.