Skip to content

Commit

Permalink
Add event.kind and event.category (#10357)
Browse files Browse the repository at this point in the history
* Add event.kind and event.category

Part of #7968

Adds event.kind = event and event.category = network_traffic to all Packetbeat events.
Packetbeat flow events will additional have event.action = network_flow (same as Filebeat
netflow).

This also does some cleanup of redundant and unused code that resulted from the ECS
migration.

* Set method for http
  • Loading branch information
andrewkroh authored Jan 28, 2019
1 parent 3895a23 commit 4e78750
Show file tree
Hide file tree
Showing 22 changed files with 199 additions and 366 deletions.
3 changes: 3 additions & 0 deletions packetbeat/flows/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ func createEvent(
"end": common.Time(f.ts),
"duration": f.ts.Sub(f.createTS),
"dataset": "flow",
"kind": "event",
"category": "network_traffic",
"action": "network_flow",
}
flow := common.MapStr{
"id": common.NetString(f.id.Serialize()),
Expand Down
39 changes: 34 additions & 5 deletions packetbeat/pb/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ import (
// event at publish time.
const FieldsKey = "_packetbeat"

// Network direction values.
const (
Inbound = "inbound"
Outbound = "outbound"
Internal = "internal"
)

// Fields contains common fields used in Packetbeat events. Protocol
// implementations can publish a Fields pointer in a beat.Event and it will
// be marshaled into the event following the ECS schema where applicable.
Expand All @@ -54,13 +61,23 @@ type Fields struct {
DestinationProcess *ecs.Process `ecs:"destination.process"`
Process *ecs.Process `ecs:"process"`

Error struct {
Message []string
}

ICMPType uint8 // ICMP message type for use in computing network.community_id.
ICMPCode uint8 // ICMP message code for use in computing network.community_id.
}

// NewFields returns a new Fields value.
func NewFields() *Fields {
return &Fields{Event: ecs.Event{Duration: -1}}
return &Fields{
Event: ecs.Event{
Duration: -1,
Kind: "event",
Category: "network_traffic",
},
}
}

// NewBeatEvent creates a new beat.Event populated with a Fields value and
Expand Down Expand Up @@ -181,18 +198,23 @@ func (f *Fields) ComputeValues(localIPs []net.IP) error {
}

// network.direction
if len(localIPs) > 0 {
if len(localIPs) > 0 && f.Network.Direction == "" {
if flow.SourceIP != nil {
for _, ip := range localIPs {
if flow.SourceIP.Equal(ip) {
f.Network.Direction = "outbound"
f.Network.Direction = Outbound
break
}
}
} else if flow.DestinationIP != nil {
}
if flow.DestinationIP != nil {
for _, ip := range localIPs {
if flow.DestinationIP.Equal(ip) {
f.Network.Direction = "inbound"
if f.Network.Direction == Outbound {
f.Network.Direction = Internal
} else {
f.Network.Direction = Inbound
}
break
}
}
Expand Down Expand Up @@ -256,6 +278,13 @@ func (f *Fields) MarshalMapStr(m common.MapStr) error {
return err
}
}

if len(f.Error.Message) == 1 {
m.Put("error.message", f.Error.Message[0])
} else if len(f.Error.Message) > 1 {
m.Put("error.message", f.Error.Message)
}

return nil
}

Expand Down
8 changes: 7 additions & 1 deletion packetbeat/pb/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,13 @@ func TestMarshalMapStr(t *testing.T) {
t.Fatal(err)
}

assert.Equal(t, common.MapStr{"source": common.MapStr{"ip": "127.0.0.1"}}, m)
assert.Equal(t, common.MapStr{
"event": common.MapStr{
"kind": "event",
"category": "network_traffic",
},
"source": common.MapStr{"ip": "127.0.0.1"},
}, m)
}

func TestComputeValues(t *testing.T) {
Expand Down
7 changes: 1 addition & 6 deletions packetbeat/protos/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ func (amqp *amqpPlugin) publishTransaction(t *amqpTransaction) {
pbf.Event.Dataset = "amqp"
pbf.Network.Protocol = pbf.Event.Dataset
pbf.Network.Transport = "tcp"
pbf.Error.Message = t.notes

fields := evt.Fields
fields["type"] = pbf.Event.Dataset
Expand Down Expand Up @@ -490,12 +491,6 @@ func (amqp *amqpPlugin) publishTransaction(t *amqpTransaction) {
}
}

if len(t.notes) == 1 {
evt.PutValue("error.message", t.notes[0])
} else if len(t.notes) > 1 {
evt.PutValue("error.message", t.notes)
}

amqp.results(evt)
}

Expand Down
15 changes: 2 additions & 13 deletions packetbeat/protos/amqp/amqp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,16 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"

"github.com/elastic/beats/packetbeat/pb"
"github.com/elastic/beats/packetbeat/protos"
"github.com/elastic/beats/packetbeat/publish"
)

type eventStore struct {
events []beat.Event
}

func (e *eventStore) publish(event beat.Event) {
pbf, err := pb.GetFields(event.Fields)
if err != nil || pbf == nil {
panic("_packetbeat not found")
}
delete(event.Fields, pb.FieldsKey)
if err = pbf.ComputeValues(nil); err != nil {
panic(err)
}
if err = pbf.MarshalMapStr(event.Fields); err != nil {
panic(err)
}

publish.MarshalPacketbeatFields(&event, nil)
e.events = append(e.events, event)
}

Expand Down
8 changes: 2 additions & 6 deletions packetbeat/protos/applayer/applayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (t *Transaction) InitWithMsg(
func (t *Transaction) Event(event *beat.Event) error {
event.Timestamp = t.Ts.Ts

pbf := &pb.Fields{}
pbf := pb.NewFields()
pbf.SetSource(&t.Src)
pbf.SetDestination(&t.Dst)
pbf.Source.Bytes = int64(t.BytesIn)
Expand All @@ -233,16 +233,12 @@ func (t *Transaction) Event(event *beat.Event) error {
pbf.Event.End = t.EndTime
pbf.Network.Transport = t.Transport.String()
pbf.Network.Protocol = pbf.Event.Dataset
pbf.Error.Message = t.Notes

fields := event.Fields
fields[pb.FieldsKey] = pbf
fields["type"] = pbf.Event.Dataset
fields["status"] = t.Status
if len(t.Notes) == 1 {
event.PutValue("error.message", t.Notes[0])
} else if len(t.Notes) > 1 {
event.PutValue("error.message", t.Notes)
}
return nil
}

Expand Down
11 changes: 2 additions & 9 deletions packetbeat/protos/cassandra/pub.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,12 @@ func (pub *transPub) createEvent(requ, resp *message) beat.Event {

cassandra := common.MapStr{}
status := common.OK_STATUS
var notes []string

//requ can be null, if the message is a PUSHed message
if requ != nil {
pbf.Source.Bytes = int64(requ.Size)
pbf.Event.Start = requ.Ts
notes = append(notes, requ.Notes...)
pbf.Error.Message = requ.Notes

if pub.sendRequest {
if pub.sendRequestHeader {
Expand All @@ -107,7 +106,7 @@ func (pub *transPub) createEvent(requ, resp *message) beat.Event {
if resp != nil {
pbf.Destination.Bytes = int64(resp.Size)
pbf.Event.End = resp.Ts
notes = append(notes, resp.Notes...)
pbf.Error.Message = append(pbf.Error.Message, resp.Notes...)

if resp.failed {
status = common.ERROR_STATUS
Expand All @@ -133,11 +132,5 @@ func (pub *transPub) createEvent(requ, resp *message) beat.Event {
fields["cassandra"] = cassandra
}

if len(notes) == 1 {
fields.Put("error.message", notes[0])
} else if len(notes) > 1 {
fields.Put("error.message", notes)
}

return evt
}
37 changes: 13 additions & 24 deletions packetbeat/protos/dhcpv4/dhcpv4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/packetbeat/pb"
"github.com/elastic/beats/packetbeat/protos"
"github.com/elastic/beats/packetbeat/publish"
)

var _ protos.UDPPlugin = &dhcpv4Plugin{}
Expand Down Expand Up @@ -117,8 +117,10 @@ func TestParseDHCPRequest(t *testing.T) {
"port": 67,
},
"event": common.MapStr{
"dataset": "dhcpv4",
"start": pkt.Ts,
"category": "network_traffic",
"dataset": "dhcpv4",
"kind": "event",
"start": pkt.Ts,
},
"network": common.MapStr{
"type": "ipv4",
Expand Down Expand Up @@ -150,8 +152,9 @@ func TestParseDHCPRequest(t *testing.T) {
},
}

actual := marshalPacketbeatFields(t, p.parseDHCPv4(pkt))
actual := p.parseDHCPv4(pkt)
if assert.NotNil(t, actual) {
publish.MarshalPacketbeatFields(actual, nil)
t.Logf("DHCP event: %+v", actual)
assertEqual(t, expected, *actual)
}
Expand Down Expand Up @@ -194,8 +197,10 @@ func TestParseDHCPACK(t *testing.T) {
"bytes": 300,
},
"event": common.MapStr{
"dataset": "dhcpv4",
"start": pkt.Ts,
"category": "network_traffic",
"dataset": "dhcpv4",
"kind": "event",
"start": pkt.Ts,
},
"network": common.MapStr{
"type": "ipv4",
Expand Down Expand Up @@ -226,8 +231,9 @@ func TestParseDHCPACK(t *testing.T) {
},
}

actual := marshalPacketbeatFields(t, p.parseDHCPv4(pkt))
actual := p.parseDHCPv4(pkt)
if assert.NotNil(t, actual) {
publish.MarshalPacketbeatFields(actual, nil)
t.Logf("DHCP event: %+v", actual)
assertEqual(t, expected, *actual)
}
Expand All @@ -249,20 +255,3 @@ func normalizeEvent(t testing.TB, event beat.Event) interface{} {
}
return out
}

func marshalPacketbeatFields(t testing.TB, evt *beat.Event) *beat.Event {
pbf, err := pb.GetFields(evt.Fields)
if err != nil || pbf == nil {
t.Fatal("failed getting _packetbeat", err)
}
delete(evt.Fields, pb.FieldsKey)

if err = pbf.ComputeValues(nil); err != nil {
t.Fatal(err)
}

if err = pbf.MarshalMapStr(evt.Fields); err != nil {
t.Fatal(err)
}
return evt
}
6 changes: 1 addition & 5 deletions packetbeat/protos/dns/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,15 +370,11 @@ func (dns *dnsPlugin) publishTransaction(t *dnsTransaction) {
pbf.SetDestination(&t.dst)
pbf.Network.Transport = t.transport.String()
pbf.Network.Protocol = "dns"
pbf.Error.Message = t.notes

fields := evt.Fields
fields["type"] = "dns"
fields["status"] = common.ERROR_STATUS
if len(t.notes) == 1 {
fields.Put("error.message", t.notes[0])
} else if len(t.notes) > 1 {
fields.Put("error.message", t.notes)
}

dnsEvent := common.MapStr{}
fields["dns"] = dnsEvent
Expand Down
15 changes: 2 additions & 13 deletions packetbeat/protos/dns/dns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/packetbeat/pb"
"github.com/elastic/beats/packetbeat/protos"
"github.com/elastic/beats/packetbeat/publish"
)

// Test Constants
Expand Down Expand Up @@ -79,18 +79,7 @@ type eventStore struct {
}

func (e *eventStore) publish(event beat.Event) {
pbf, err := pb.GetFields(event.Fields)
if err != nil || pbf == nil {
panic("_packetbeat not found")
}
delete(event.Fields, pb.FieldsKey)
if err = pbf.ComputeValues(nil); err != nil {
panic(err)
}
if err = pbf.MarshalMapStr(event.Fields); err != nil {
panic(err)
}

publish.MarshalPacketbeatFields(&event, nil)
e.events = append(e.events, event)
}

Expand Down
12 changes: 3 additions & 9 deletions packetbeat/protos/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,6 @@ func (http *httpPlugin) newTransaction(requ, resp *message) beat.Event {
fields["status"] = status

var httpFields ProtocolFields
var notes []string
if requ != nil {
http.decodeBody(requ)
path, params, err := http.extractParameters(requ)
Expand All @@ -534,7 +533,7 @@ func (http *httpPlugin) newTransaction(requ, resp *message) beat.Event {
}
pbf.Event.Start = requ.ts
pbf.Network.ForwardedIP = string(requ.realIP)
notes = append(notes, requ.notes...)
pbf.Error.Message = requ.notes

// http
httpFields.Version = requ.version.String()
Expand All @@ -560,6 +559,7 @@ func (http *httpPlugin) newTransaction(requ, resp *message) beat.Event {
if http.sendRequest {
fields["request"] = string(http.makeRawMessage(requ))
}
fields["method"] = httpFields.RequestMethod
fields["query"] = fmt.Sprintf("%s %s", requ.method, path)
}

Expand All @@ -568,7 +568,7 @@ func (http *httpPlugin) newTransaction(requ, resp *message) beat.Event {

pbf.Destination.Bytes = int64(resp.size)
pbf.Event.End = resp.ts
notes = append(notes, resp.notes...)
pbf.Error.Message = append(pbf.Error.Message, resp.notes...)

// http
httpFields.ResponseStatusCode = int64(resp.statusCode)
Expand All @@ -587,12 +587,6 @@ func (http *httpPlugin) newTransaction(requ, resp *message) beat.Event {
}
}

if len(notes) == 1 {
fields.Put("error.message", notes[0])
} else if len(notes) > 1 {
fields.Put("error.message", notes)
}

pb.MarshalStruct(evt.Fields, "http", httpFields)
return evt
}
Expand Down
Loading

0 comments on commit 4e78750

Please sign in to comment.