Skip to content

Commit

Permalink
Merge branch 'main' into metricbeat-process-multierr
Browse files Browse the repository at this point in the history
  • Loading branch information
VihasMakwana authored Jul 23, 2024
2 parents 9433065 + dd671a6 commit 58bc2ff
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Improve robustness and error reporting from packetbeat default route testing. {pull}39757[39757]
- Move x-pack/filebeat/input/salesforce jwt import to v5. {pull}39823[39823]
- Drop x-pack/filebeat/input dependency on github.com/lestrrat-go/jwx/v2. {pull}39968[39968]
- Added `ignore_empty_values` flag in `decode_cef` Filebeat processor. {pull}40268[40268]

==== Deprecated

Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Filebeat, when running with Elastic-Agent, reports status for Filestream input. {pull}40121[40121]
- Implement Elastic Agent status and health reporting for Winlog Filebeat input. {pull}40163[40163]
- Fix filestream's registry GC: registry entries will never be removed if clean_inactive is set to "-1". {pull}40258[40258]
- Added `ignore_empty_values` flag in `decode_cef` Filebeat processor. {pull}40268[40268]

*Heartbeat*

Expand Down
7 changes: 7 additions & 0 deletions x-pack/filebeat/processors/decode_cef/cef/cef.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,13 @@ func (e *Event) Unpack(data string, opts ...Option) error {

// Mark the data type and do the actual conversion.
field.Type = mapping.Type

if settings.removeEmptyValues && field.String == "" {
// Drop the key because the value is empty field.
delete(e.Extensions, key)
continue
}

field.Interface, err = toType(field.String, mapping.Type, &settings)
if err != nil {
// Drop the key because the field value is invalid.
Expand Down
19 changes: 19 additions & 0 deletions x-pack/filebeat/processors/decode_cef/cef/cef_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ const (

truncatedHeader = "CEF:0|SentinelOne|Mgmt|activityID=1111111111111111111 activityType=3505 siteId=None siteName=None accountId=1222222222222222222 accountName=foo-bar mdr notificationScope=ACCOUNT"

noValueInExtension = `CEF:26|security|threat=manager|1.0|100|trojan successfully stopped|10|src= dst=12.121.122.82 spt=`

// Found by fuzzing but minimised by hand.
fuzz0 = `CEF:0|a=\\ b|`
fuzz1 = `CEF:0|\|a=|b=`
Expand All @@ -84,6 +86,7 @@ var testMessages = []string{
tabMessage,
escapedMessage,
truncatedHeader,
noValueInExtension,
fuzz0,
fuzz1,
fuzz2,
Expand Down Expand Up @@ -161,6 +164,22 @@ func TestEventUnpack(t *testing.T) {
}, e.Extensions)
})

t.Run("noValueInExtension", func(t *testing.T) {
var e Event
err := e.Unpack(noValueInExtension, WithRemoveEmptyValues())
assert.NoError(t, err)
assert.Equal(t, 26, e.Version)
assert.Equal(t, "security", e.DeviceVendor)
assert.Equal(t, "threat=manager", e.DeviceProduct)
assert.Equal(t, "1.0", e.DeviceVersion)
assert.Equal(t, "100", e.DeviceEventClassID)
assert.Equal(t, "trojan successfully stopped", e.Name)
assert.Equal(t, "10", e.Severity)
assert.Equal(t, map[string]*Field{
"dst": IPField("12.121.122.82"),
}, e.Extensions)
})

t.Run("equalsSignInHeader", func(t *testing.T) {
var e Event
err := e.Unpack(equalsSignInHeader)
Expand Down
14 changes: 13 additions & 1 deletion x-pack/filebeat/processors/decode_cef/cef/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,20 @@ type Option interface {
// Settings for unpacking messages.
type Settings struct {
fullExtensionNames bool
removeEmptyValues bool
timezone *time.Location
}

timezone *time.Location
type withRemoveEmptyValues struct{}

func (w withRemoveEmptyValues) Apply(s *Settings) {
s.removeEmptyValues = true
}

// WithRemoveEmptyValues causes CEF extension keys without values to be
// dropped.
func WithRemoveEmptyValues() Option {
return withRemoveEmptyValues{}
}

type withFullExtensionNames struct{}
Expand Down
15 changes: 8 additions & 7 deletions x-pack/filebeat/processors/decode_cef/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ package decode_cef
import "github.com/elastic/beats/v7/libbeat/common/cfgtype"

type config struct {
Field string `config:"field"` // Source field containing the CEF message.
TargetField string `config:"target_field"` // Target field for the CEF object.
IgnoreMissing bool `config:"ignore_missing"` // Ignore missing source field.
IgnoreFailure bool `config:"ignore_failure"` // Ignore failures when the source field does not contain a CEF message. Parse errors do not cause failures, but are added to error.message.
ID string `config:"id"` // Instance ID for debugging purposes.
ECS bool `config:"ecs"` // Generate ECS fields.
Timezone *cfgtype.Timezone `config:"timezone"` // Timezone used when parsing timestamps that do not contain a time zone or offset.
Field string `config:"field"` // Source field containing the CEF message.
TargetField string `config:"target_field"` // Target field for the CEF object.
IgnoreMissing bool `config:"ignore_missing"` // Ignore missing source field.
IgnoreFailure bool `config:"ignore_failure"` // Ignore failures when the source field does not contain a CEF message. Parse errors do not cause failures, but are added to error.message.
IgnoreEmptyValues bool `config:"ignore_empty_values"` // Ignore CEF extensions with empty values
ID string `config:"id"` // Instance ID for debugging purposes.
ECS bool `config:"ecs"` // Generate ECS fields.
Timezone *cfgtype.Timezone `config:"timezone"` // Timezone used when parsing timestamps that do not contain a time zone or offset.
}

func defaultConfig() config {
Expand Down
12 changes: 7 additions & 5 deletions x-pack/filebeat/processors/decode_cef/decode_cef.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (p *processor) Run(event *beat.Event) (*beat.Event, error) {

// If the version < 0 after parsing then none of the data is valid so return here.
var ce cef.Event
if err = ce.Unpack(cefData, cef.WithFullExtensionNames(), cef.WithTimezone(p.Timezone.Location())); ce.Version < 0 && err != nil {
if err = ce.Unpack(cefData, cef.WithFullExtensionNames(), cef.WithTimezone(p.Timezone.Location()), cef.WithRemoveEmptyValues()); ce.Version < 0 && err != nil {
if p.IgnoreFailure {
return event, nil
}
Expand Down Expand Up @@ -141,7 +141,6 @@ func (p *processor) Run(event *beat.Event) (*beat.Event, error) {
return event, nil
}

//nolint:errcheck // All errors are from mapstr puts.
func toCEFObject(cefEvent *cef.Event) mapstr.M {
// Add CEF header fields.
cefObject := mapstr.M{"version": strconv.Itoa(cefEvent.Version)}
Expand Down Expand Up @@ -180,32 +179,35 @@ func toCEFObject(cefEvent *cef.Event) mapstr.M {
return cefObject
}

//nolint:errcheck // All errors are from mapstr puts.
func writeCEFHeaderToECS(cefEvent *cef.Event, event *beat.Event) {
if cefEvent.DeviceVendor != "" {
//nolint:errcheck // All errors are from mapstr puts.
event.PutValue("observer.vendor", cefEvent.DeviceVendor)
}
if cefEvent.DeviceProduct != "" {
// TODO: observer.product is not officially part of ECS.
//nolint:errcheck // All errors are from mapstr puts.
event.PutValue("observer.product", cefEvent.DeviceProduct)
}
if cefEvent.DeviceVersion != "" {
//nolint:errcheck // All errors are from mapstr puts.
event.PutValue("observer.version", cefEvent.DeviceVersion)
}
if cefEvent.DeviceEventClassID != "" {
//nolint:errcheck // All errors are from mapstr puts.
event.PutValue("event.code", cefEvent.DeviceEventClassID)
}
if cefEvent.Name != "" {
//nolint:errcheck // All errors are from mapstr puts.
event.PutValue("message", cefEvent.Name)
}
if cefEvent.Severity != "" {
if sev, ok := cefSeverityToNumber(cefEvent.Severity); ok {
//nolint:errcheck // All errors are from mapstr puts.
event.PutValue("event.severity", sev)
}
}
}

//nolint:errcheck // All errors are from mapstr puts.
func appendErrorMessage(m mapstr.M, msg string) error {
const field = "error.message"
list, _ := m.GetValue(field)
Expand Down
27 changes: 27 additions & 0 deletions x-pack/filebeat/processors/decode_cef/decode_cef_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,33 @@ func TestProcessorRun(t *testing.T) {
"source.user.name": "admin",
},
},
"empty_field_values": {
config: func() config {
c := defaultConfig()
c.TargetField = ""
c.IgnoreEmptyValues = true
return c
},
message: "CEF:1|Trend Micro|Deep Security Manager|1.2.3|600|User Signed In|3|src= suser= target=admin msg=User signed in from 2001:db8::5",
fields: mapstr.M{
"version": "1",
"device.event_class_id": "600",
"device.product": "Deep Security Manager",
"device.vendor": "Trend Micro",
"device.version": "1.2.3",
"name": "User Signed In",
"severity": "3",
"event.severity": 3,
"extensions.message": "User signed in from 2001:db8::5",
"extensions.target": "admin",
// ECS
"event.code": "600",
"message": "User signed in from 2001:db8::5",
"observer.product": "Deep Security Manager",
"observer.vendor": "Trend Micro",
"observer.version": "1.2.3",
},
},
"parse_errors": {
message: "CEF:0|Trend Micro|Deep Security Manager|1.2.3|600|User Signed In|Low|msg=User signed in with =xyz",
fields: mapstr.M{
Expand Down
19 changes: 10 additions & 9 deletions x-pack/filebeat/processors/decode_cef/docs/decode_cef.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ The `decode_cef` processor has the following configuration settings.
.Decode CEF options
[options="header"]
|======
| Name | Required | Default | Description |
| `field` | no | message | Source field containing the CEF message to be parsed. |
| `target_field` | no | cef | Target field where the parsed CEF object will be written. |
| `ecs` | no | true | Generate Elastic Common Schema (ECS) fields from the CEF data.
Certain CEF header and extension values will be used to populate ECS fields. |
| `timezone` | no | UTC | IANA time zone name (e.g. `America/New_York`) or fixed time offset (e.g. `+0200`) to use when parsing times that do not contain a time zone. `Local` may be specified to use the machine's local time zone.|
| `ignore_missing` | no | false | Ignore errors when the source field is missing. |
| `ignore_failure` | no | false | Ignore failures when the source field does not contain a CEF message. |
| `id` | no | | An identifier for this processor instance. Useful for debugging. |
| Name | Required | Default | Description |
| `field` | no | message | Source field containing the CEF message to be parsed. |
| `target_field` | no | cef | Target field where the parsed CEF object will be written. |
| `ecs` | no | true | Generate Elastic Common Schema (ECS) fields from the CEF data.
Certain CEF header and extension values will be used to populate ECS fields. |
| `timezone` | no | UTC | IANA time zone name (e.g. `America/New_York`) or fixed time offset (e.g. `+0200`) to use when parsing times that do not contain a time zone. `Local` may be specified to use the machine's local time zone.|
| `ignore_missing` | no | false | Ignore errors when the source field is missing. |
| `ignore_failure` | no | false | Ignore failures when the source field does not contain a CEF message. |
| `ignore_empty_values` | no | false | Ignore CEF extensions with empty values (e.g. `spt= type=1`) |
| `id` | no | | An identifier for this processor instance. Useful for debugging. |
|======

0 comments on commit 58bc2ff

Please sign in to comment.