diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index f16b53592ef..ce05b27ade9 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -757,6 +757,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Migrate microsoft/m365_defender to httpjson v2 config {pull}23018[23018] - Add top_level_domain enrichment for suricata/eve fileset. {pull}23046[23046] - Add top_level_domain enrichment for zeek/dns fileset. {pull}23046[23046] +- Add `network.direction` to netflow/log fileset. {pull}23052[23052] *Heartbeat* diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index bf894ba38c8..0abd5c15fed 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -1370,6 +1370,11 @@ filebeat.modules: var: netflow_host: localhost netflow_port: 2055 + # internal_networks specifies which networks are considered internal or private + # you can specify either a CIDR block or any of the special named ranges listed + # at: https://www.elastic.co/guide/en/beats/filebeat/current/defining-processors.html#condition-network + internal_networks: + - private #-------------------------- Arbor Peakflow SP Module -------------------------- - module: netscout diff --git a/x-pack/filebeat/input/netflow/config.go b/x-pack/filebeat/input/netflow/config.go index cc0094f4ed8..4d795a44eec 100644 --- a/x-pack/filebeat/input/netflow/config.go +++ b/x-pack/filebeat/input/netflow/config.go @@ -16,6 +16,7 @@ import ( type config struct { udp.Config `config:",inline"` harvester.ForwarderConfig `config:",inline"` + InternalNetworks []string `config:"internal_networks"` Protocols []string `config:"protocols"` ExpirationTimeout time.Duration `config:"expiration_timeout"` PacketQueueSize int `config:"queue_size"` diff --git a/x-pack/filebeat/input/netflow/convert.go b/x-pack/filebeat/input/netflow/convert.go index 69d7a2ea6cd..29d2609e5c8 100644 --- a/x-pack/filebeat/input/netflow/convert.go +++ b/x-pack/filebeat/input/netflow/convert.go @@ -17,6 +17,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/flowhash" + "github.com/elastic/beats/v7/libbeat/conditions" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record" ) @@ -35,10 +36,10 @@ var ( } ) -func toBeatEvent(flow record.Record) (event beat.Event) { +func toBeatEvent(flow record.Record, internalNetworks []string) (event beat.Event) { switch flow.Type { case record.Flow: - return flowToBeatEvent(flow) + return flowToBeatEvent(flow, internalNetworks) case record.Options: return optionsToBeatEvent(flow) default: @@ -113,7 +114,7 @@ func optionsToBeatEvent(flow record.Record) beat.Event { return toBeatEventCommon(flow) } -func flowToBeatEvent(flow record.Record) (event beat.Event) { +func flowToBeatEvent(flow record.Record, internalNetworks []string) (event beat.Event) { event = toBeatEventCommon(flow) ecsEvent, ok := event.Fields["event"].(common.MapStr) @@ -179,12 +180,12 @@ func flowToBeatEvent(flow record.Record) (event beat.Event) { } if srcIP != nil { ecsSource["ip"] = srcIP - ecsSource["locality"] = getIPLocality(srcIP).String() + ecsSource["locality"] = getIPLocality(internalNetworks, srcIP).String() } ecsSource["mac"] = srcMac if dstIP != nil { ecsDest["ip"] = dstIP - ecsDest["locality"] = getIPLocality(dstIP).String() + ecsDest["locality"] = getIPLocality(internalNetworks, dstIP).String() } ecsDest["mac"] = dstMac } @@ -194,7 +195,7 @@ func flowToBeatEvent(flow record.Record) (event beat.Event) { if ip, found := getKeyIP(flow.Fields, "sourceIPv4Address"); found { ecsSource["ip"] = ip relatedIP = append(relatedIP, ip) - ecsSource["locality"] = getIPLocality(ip).String() + ecsSource["locality"] = getIPLocality(internalNetworks, ip).String() } if sourcePort, found := getKeyUint64(flow.Fields, "sourceTransportPort"); found { ecsSource["port"] = sourcePort @@ -207,7 +208,7 @@ func flowToBeatEvent(flow record.Record) (event beat.Event) { if ip, found := getKeyIP(flow.Fields, "destinationIPv4Address"); found { ecsDest["ip"] = ip relatedIP = append(relatedIP, ip) - ecsDest["locality"] = getIPLocality(ip).String() + ecsDest["locality"] = getIPLocality(internalNetworks, ip).String() } if destPort, found := getKeyUint64(flow.Fields, "destinationTransportPort"); found { ecsDest["port"] = destPort @@ -243,7 +244,7 @@ func flowToBeatEvent(flow record.Record) (event beat.Event) { dstIP = net.IPv4(0, 0, 0, 0).To4() } ecsFlow["id"] = flowID(srcIP, dstIP, srcPort, dstPort, uint8(protocol)) - ecsFlow["locality"] = getIPLocality(srcIP, dstIP).String() + ecsFlow["locality"] = getIPLocality(internalNetworks, srcIP, dstIP).String() // ECS Fields -- network ecsNetwork := common.MapStr{} @@ -394,19 +395,8 @@ func (l Locality) String() string { return "unknown (" + strconv.Itoa(int(l)) + ")" } -func isPrivateNetwork(ip net.IP) bool { - for _, net := range privateIPv4 { - if net.Contains(ip) { - return true - } - } - - return privateIPv6.Contains(ip) -} - -func isLocalOrPrivate(ip net.IP) bool { - return isPrivateNetwork(ip) || - ip.IsLoopback() || +func isLocal(ip net.IP) bool { + return ip.IsLoopback() || ip.IsUnspecified() || ip.Equal(net.IPv4bcast) || ip.IsLinkLocalUnicast() || @@ -414,9 +404,14 @@ func isLocalOrPrivate(ip net.IP) bool { ip.IsInterfaceLocalMulticast() } -func getIPLocality(ip ...net.IP) Locality { - for _, addr := range ip { - if !isLocalOrPrivate(addr) { +func getIPLocality(internalNetworks []string, ips ...net.IP) Locality { + for _, ip := range ips { + contains, err := conditions.NetworkContains(ip, internalNetworks...) + if err != nil { + return LocalityPublic + } + // always consider loopback/link-local private + if !contains && !isLocal(ip) { return LocalityPublic } } diff --git a/x-pack/filebeat/input/netflow/input.go b/x-pack/filebeat/input/netflow/input.go index 0fbb7dfe2ae..b0114e584fd 100644 --- a/x-pack/filebeat/input/netflow/input.go +++ b/x-pack/filebeat/input/netflow/input.go @@ -45,15 +45,16 @@ type packet struct { } type netflowInput struct { - mutex sync.Mutex - udp *udp.Server - decoder *decoder.Decoder - outlet channel.Outleter - forwarder *harvester.Forwarder - logger *logp.Logger - queueC chan packet - queueSize int - started bool + mutex sync.Mutex + udp *udp.Server + decoder *decoder.Decoder + outlet channel.Outleter + forwarder *harvester.Forwarder + internalNetworks []string + logger *logp.Logger + queueC chan packet + queueSize int + started bool } func init() { @@ -122,11 +123,12 @@ func NewInput( } input := &netflowInput{ - outlet: out, - forwarder: harvester.NewForwarder(out), - decoder: decoder, - logger: logger, - queueSize: config.PacketQueueSize, + outlet: out, + internalNetworks: config.InternalNetworks, + forwarder: harvester.NewForwarder(out), + decoder: decoder, + logger: logger, + queueSize: config.PacketQueueSize, } input.udp = udp.New(&config.Config, input.packetDispatch) @@ -243,7 +245,7 @@ func (p *netflowInput) recvRoutine() { evs := make([]beat.Event, n) numFlows.Add(uint64(n)) for i, flow := range flows { - evs[i] = toBeatEvent(flow) + evs[i] = toBeatEvent(flow, p.internalNetworks) } p.Publish(evs) } diff --git a/x-pack/filebeat/input/netflow/netflow_test.go b/x-pack/filebeat/input/netflow/netflow_test.go index 7ffa78c7749..8a073df00df 100644 --- a/x-pack/filebeat/input/netflow/netflow_test.go +++ b/x-pack/filebeat/input/netflow/netflow_test.go @@ -197,7 +197,7 @@ func getFlowsFromDat(t testing.TB, name string, testCase TestCase) TestResult { } ev := make([]beat.Event, len(flows)) for i := range flows { - ev[i] = toBeatEvent(flows[i]) + ev[i] = toBeatEvent(flows[i], []string{"private"}) } //return TestResult{Name: name, Error: err.Error(), Events: flowsToEvents(flows)} events = append(events, ev...) @@ -242,7 +242,7 @@ func getFlowsFromPCAP(t testing.TB, name, pcapFile string) TestResult { } ev := make([]beat.Event, len(flows)) for i := range flows { - ev[i] = toBeatEvent(flows[i]) + ev[i] = toBeatEvent(flows[i], []string{"private"}) } events = append(events, ev...) } @@ -341,7 +341,7 @@ func TestReverseFlows(t *testing.T) { var evs []beat.Event for _, f := range flows { - evs = append(evs, toBeatEvent(f)) + evs = append(evs, toBeatEvent(f, []string{"private"})) } if !assert.Len(t, evs, 2) { t.Fatal() diff --git a/x-pack/filebeat/module/netflow/_meta/config.yml b/x-pack/filebeat/module/netflow/_meta/config.yml index 20d1905b6f4..91fe3953e94 100644 --- a/x-pack/filebeat/module/netflow/_meta/config.yml +++ b/x-pack/filebeat/module/netflow/_meta/config.yml @@ -4,3 +4,8 @@ var: netflow_host: localhost netflow_port: 2055 + # internal_networks specifies which networks are considered internal or private + # you can specify either a CIDR block or any of the special named ranges listed + # at: https://www.elastic.co/guide/en/beats/filebeat/current/defining-processors.html#condition-network + internal_networks: + - private diff --git a/x-pack/filebeat/module/netflow/log/config/netflow.yml b/x-pack/filebeat/module/netflow/log/config/netflow.yml index 0f25b5149e4..68b7b43feb5 100644 --- a/x-pack/filebeat/module/netflow/log/config/netflow.yml +++ b/x-pack/filebeat/module/netflow/log/config/netflow.yml @@ -5,6 +5,13 @@ max_message_size: '{{.max_message_size}}' expiration_timeout: '{{.expiration_timeout}}' queue_size: {{.queue_size}} +{{if .internal_networks}} +internal_hosts: +{{range .internal_networks}} +- '{{ . }}' +{{end}} +{{end}} + {{if .timeout}} timeout: '{{.timeout}}' {{end}} diff --git a/x-pack/filebeat/module/netflow/log/ingest/pipeline.yml b/x-pack/filebeat/module/netflow/log/ingest/pipeline.yml index a793268db3d..539540fc2ad 100644 --- a/x-pack/filebeat/module/netflow/log/ingest/pipeline.yml +++ b/x-pack/filebeat/module/netflow/log/ingest/pipeline.yml @@ -51,6 +51,26 @@ processors: field: destination.as.organization_name target_field: destination.as.organization.name ignore_missing: true + - set: + field: network.direction + value: inbound + if: 'ctx?.source?.locality == "external" && ctx?.destination?.locality == "internal"' + - set: + field: network.direction + value: outbound + if: 'ctx?.source?.locality == "internal" && ctx?.destination?.locality == "external"' + - set: + field: network.direction + value: internal + if: 'ctx?.source?.locality == "internal" && ctx?.destination?.locality == "internal"' + - set: + field: network.direction + value: external + if: 'ctx?.source?.locality == "external" && ctx?.destination?.locality == "external"' + - set: + field: network.direction + value: unknown + if: 'ctx?.network?.direction == null' on_failure: - set: diff --git a/x-pack/filebeat/modules.d/netflow.yml.disabled b/x-pack/filebeat/modules.d/netflow.yml.disabled index 522307d7e71..f0d03a1fef2 100644 --- a/x-pack/filebeat/modules.d/netflow.yml.disabled +++ b/x-pack/filebeat/modules.d/netflow.yml.disabled @@ -7,3 +7,8 @@ var: netflow_host: localhost netflow_port: 2055 + # internal_networks specifies which networks are considered internal or private + # you can specify either a CIDR block or any of the special named ranges listed + # at: https://www.elastic.co/guide/en/beats/filebeat/current/defining-processors.html#condition-network + internal_networks: + - private