Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.6](backport #33656) [Filebeat] Add parse_aws_vpc_flow_log processor #33705

Merged
merged 1 commit into from
Nov 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Improve httpjson documentation for split processor. {pull}33473[33473]
- Added separation of transform context object inside httpjson. Introduced new clause `.parent_last_response.*` {pull}33499[33499]
- Cloud Foundry input uses server-side filtering when retrieving logs. {pull}33456[33456]
- Add `parse_aws_vpc_flow_log` processor. {pull}33656[33656]
- Modified `aws-s3` input to reduce mutex contention when multiple SQS message are being processed concurrently. {pull}33658[33658]
- Disable "event normalization" processing for the aws-s3 input to reduce allocations. {pull}33673[33673]
- Add Common Expression Language input. {pull}31233[31233]
Expand Down
1 change: 1 addition & 0 deletions auditbeat/docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ include::{asciidoc-dir}/../../shared/attributes.asciidoc[]
:linux_os:
:no_decode_cef_processor:
:no_decode_csv_fields_processor:
:no_parse_aws_vpc_flow_log_processor:
:no_script_processor:
:no_timestamp_processor:

Expand Down
1 change: 1 addition & 0 deletions heartbeat/docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ include::{asciidoc-dir}/../../shared/attributes.asciidoc[]
:no_dashboards:
:no_decode_cef_processor:
:no_decode_csv_fields_processor:
:no_parse_aws_vpc_flow_log_processor:
:no_timestamp_processor:

include::{libbeat-dir}/shared-beats-attributes.asciidoc[]
Expand Down
6 changes: 6 additions & 0 deletions libbeat/docs/processors-list.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ endif::[]
ifndef::no_rename_processor[]
* <<rename-fields,`rename`>>
endif::[]
ifndef::no_parse_aws_vpc_flow_log_processor[]
* <<processor-parse-aws-vpc-flow-log, `parse_aws_vpc_flow_log`>>
endif::[]
ifndef::no_script_processor[]
* <<processor-script,`script`>>
endif::[]
Expand Down Expand Up @@ -231,6 +234,9 @@ endif::[]
ifndef::no_rename_processor[]
include::{libbeat-processors-dir}/actions/docs/rename.asciidoc[]
endif::[]
ifndef::no_parse_aws_vpc_flow_log_processor[]
include::{x-filebeat-processors-dir}/aws_vpcflow/docs/parse_aws_vpc_flow_log.asciidoc[]
endif::[]
ifndef::no_script_processor[]
include::{libbeat-processors-dir}/script/docs/script.asciidoc[]
endif::[]
Expand Down
1 change: 1 addition & 0 deletions metricbeat/docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ include::{asciidoc-dir}/../../shared/attributes.asciidoc[]
:win_os:
:no_decode_cef_processor:
:no_decode_csv_fields_processor:
:no_parse_aws_vpc_flow_log_processor:
:no_timestamp_processor:

:kubernetes_default_indexers: {docdir}/kubernetes-default-indexers-matchers.asciidoc
Expand Down
1 change: 1 addition & 0 deletions packetbeat/docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ include::{asciidoc-dir}/../../shared/attributes.asciidoc[]
:win_os:
:no_decode_cef_processor:
:no_decode_csv_fields_processor:
:no_parse_aws_vpc_flow_log_processor:
:no_script_processor:
:no_timestamp_processor:

Expand Down
1 change: 1 addition & 0 deletions winlogbeat/docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ include::{asciidoc-dir}/../../shared/attributes.asciidoc[]
:win_only:
:no_decode_cef_processor:
:no_decode_csv_fields_processor:
:no_parse_aws_vpc_flow_log_processor:
:include_translate_sid_processor:
:export_pipeline:

Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/include/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

140 changes: 140 additions & 0 deletions x-pack/filebeat/processors/aws_vpcflow/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package aws_vpcflow

import (
"errors"
"fmt"
"strings"
)

// mode represents the processing mode (original, ecs, ecs_and_original).
type mode uint8

const (
originalMode mode = iota // originalMode generates the fields specified in the format string.
ecsMode // ecsMode maps the original fields to ECS and removes the original field if it was mapped.
ecsAndOriginalMode // ecsAndOriginalMode maps the original fields to ECS and retains all the original fields.
)

var modeStrings = map[mode]string{
originalMode: "original",
ecsMode: "ecs",
ecsAndOriginalMode: "ecs_and_original",
}

func (m *mode) Unpack(s string) error {
for modeConst, modeStr := range modeStrings {
if strings.EqualFold(modeStr, s) {
*m = modeConst
return nil
}
}
return fmt.Errorf("invalid mode type %q for "+procName, s)
}

func (m *mode) UnmarshalYAML(unmarshal func(interface{}) error) error {
var str string
if err := unmarshal(&str); err != nil {
return err
}
return m.Unpack(str)
}

func (m *mode) String() string {
if m == nil {
return "<nil>"
}
if s, found := modeStrings[*m]; found {
return s
}
return "unknown mode"
}

// config contains the configuration options for the processor.
type config struct {
Format formats `config:"format" validate:"required"` // VPC flow log format. In config, it can accept a string or list of strings. Each format must have a unique number of fields to enable matching it to a flow log message.
Mode mode `config:"mode"` // Mode controls what fields are generated.
Field string `config:"field"` // Source field containing the VPC flow log message.
TargetField string `config:"target_field"` // Target field for the VPC flow log object. This applies only to the original VPC flow log fields. ECS fields are written to the standard location.
IgnoreMissing bool `config:"ignore_missing"` // Ignore missing source field.
IgnoreFailure bool `config:"ignore_failure"` // Ignore failures while parsing and transforming the flow log message.
ID string `config:"id"` // Instance ID for debugging purposes.
}

// Validate validates the format strings. Each format must have a unique number
// of fields.
func (c *config) Validate() error {
counts := map[int]struct{}{}
for _, format := range c.Format {
fields, err := parseFormat(format)
if err != nil {
return err
}

_, found := counts[len(fields)]
if found {
return fmt.Errorf("each format must have a unique number of fields")
}
counts[len(fields)] = struct{}{}
}
return nil
}

func defaultConfig() config {
return config{
Mode: ecsMode,
Field: "message",
TargetField: "aws.vpcflow",
}
}

// parseFormat parses VPC flow log format string and returns an ordered list of
// the expected fields.
func parseFormat(format string) ([]vpcFlowField, error) {
tokens := strings.Fields(format)
if len(tokens) == 0 {
return nil, errors.New("format must contain at lease one field")
}

fields := make([]vpcFlowField, 0, len(tokens))
for _, token := range tokens {
// Elastic uses underscores in field names rather than dashes.
underscoreToken := strings.ReplaceAll(token, "-", "_")

field, found := nameToFieldMap[underscoreToken]
if !found {
return nil, fmt.Errorf("unknown field %q", token)
}

fields = append(fields, field)
}

return fields, nil
}

type formats []string

func (f *formats) Unpack(value interface{}) error {
switch v := value.(type) {
case string:
*f = []string{v}
case []string:
*f = v
case []interface{}:
list := make([]string, 0, len(v))
for _, ifc := range v {
s, ok := ifc.(string)
if !ok {
return fmt.Errorf("format values must be strings, got %T", ifc)
}
list = append(list, s)
}
*f = list
default:
return fmt.Errorf("format must be a string or list of strings, got %T", v)
}
return nil
}
101 changes: 101 additions & 0 deletions x-pack/filebeat/processors/aws_vpcflow/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package aws_vpcflow

import (
"strconv"
"testing"

"github.com/stretchr/testify/require"

conf "github.com/elastic/elastic-agent-libs/config"
)

func TestConfigUnpack(t *testing.T) {
testCases := []struct {
yamlConfig string
error bool
}{
{
yamlConfig: `
---
mode: ecs_and_original
id: us-east-vpcflow
format: instance-id interface-id srcaddr dstaddr pkt-srcaddr pkt-dstaddr
`,
},
{
yamlConfig: `
---
mode: original
format: version interface-id account-id vpc-id subnet-id instance-id srcaddr dstaddr srcport dstport protocol tcp-flags type pkt-srcaddr pkt-dstaddr action log-status
`,
},
{
yamlConfig: `
---
mode: ecs
format: version srcaddr dstaddr srcport dstport protocol start end type packets bytes account-id vpc-id subnet-id instance-id interface-id region az-id sublocation-type sublocation-id action tcp-flags pkt-srcaddr pkt-dstaddr pkt-src-aws-service pkt-dst-aws-service traffic-path flow-direction log-status
`,
},
{
yamlConfig: `
---
mode: ecs
format: version srcaddr dstaddr srcport dstport protocol start end type packets bytes account-id vpc-id subnet-id instance-id interface-id region az-id sublocation-type sublocation-id action tcp-flags pkt-srcaddr pkt-dstaddr pkt-src-aws-service pkt-dst-aws-service traffic-path flow-direction log-status
`,
},
{
error: true,
yamlConfig: `
---
mode: invalid
format: version
`,
},
{
error: false,
yamlConfig: `
---
mode: ecs
format:
- version srcaddr dstaddr
- version srcaddr dstaddr srcport dstport protocol
`,
},
{
// Each format must have a unique token count.
error: true,
yamlConfig: `
---
mode: ecs
format:
- version srcaddr dstaddr
- srcport dstport protocol
`,
},
}

for i, tc := range testCases {
tc := tc
t.Run(strconv.Itoa(i), func(t *testing.T) {
rawConfig := conf.MustNewConfigFrom(tc.yamlConfig)

c := defaultConfig()
err := rawConfig.Unpack(&c)
if tc.error {
require.Error(t, err, "config: %v", tc.yamlConfig)
t.Log("Error:", err)
return
}
require.NoError(t, err)

// Make sure valid configs produce processors.
p, err := New(rawConfig)
require.NoError(t, err)
require.NotNil(t, p)
})
}
}
Loading