Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add event.kind and event.category #10357

Merged
merged 2 commits into from
Jan 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packetbeat/flows/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ func createEvent(
"end": common.Time(f.ts),
"duration": f.ts.Sub(f.createTS),
"dataset": "flow",
"kind": "event",
"category": "network_traffic",
"action": "network_flow",
}
flow := common.MapStr{
"id": common.NetString(f.id.Serialize()),
Expand Down
39 changes: 34 additions & 5 deletions packetbeat/pb/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ import (
// event at publish time.
const FieldsKey = "_packetbeat"

// Network direction values.
const (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have these in the ECS go code ;-)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. 👍

Inbound = "inbound"
Outbound = "outbound"
Internal = "internal"
)

// Fields contains common fields used in Packetbeat events. Protocol
// implementations can publish a Fields pointer in a beat.Event and it will
// be marshaled into the event following the ECS schema where applicable.
Expand All @@ -54,13 +61,23 @@ type Fields struct {
DestinationProcess *ecs.Process `ecs:"destination.process"`
Process *ecs.Process `ecs:"process"`

Error struct {
Message []string
}

ICMPType uint8 // ICMP message type for use in computing network.community_id.
ICMPCode uint8 // ICMP message code for use in computing network.community_id.
}

// NewFields returns a new Fields value.
func NewFields() *Fields {
return &Fields{Event: ecs.Event{Duration: -1}}
return &Fields{
Event: ecs.Event{
Duration: -1,
Kind: "event",
Category: "network_traffic",
},
}
}

// NewBeatEvent creates a new beat.Event populated with a Fields value and
Expand Down Expand Up @@ -181,18 +198,23 @@ func (f *Fields) ComputeValues(localIPs []net.IP) error {
}

// network.direction
if len(localIPs) > 0 {
if len(localIPs) > 0 && f.Network.Direction == "" {
if flow.SourceIP != nil {
for _, ip := range localIPs {
if flow.SourceIP.Equal(ip) {
f.Network.Direction = "outbound"
f.Network.Direction = Outbound
break
}
}
} else if flow.DestinationIP != nil {
}
if flow.DestinationIP != nil {
for _, ip := range localIPs {
if flow.DestinationIP.Equal(ip) {
f.Network.Direction = "inbound"
if f.Network.Direction == Outbound {
f.Network.Direction = Internal
} else {
f.Network.Direction = Inbound
}
break
}
}
Expand Down Expand Up @@ -256,6 +278,13 @@ func (f *Fields) MarshalMapStr(m common.MapStr) error {
return err
}
}

if len(f.Error.Message) == 1 {
m.Put("error.message", f.Error.Message[0])
} else if len(f.Error.Message) > 1 {
m.Put("error.message", f.Error.Message)
}

return nil
}

Expand Down
8 changes: 7 additions & 1 deletion packetbeat/pb/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,13 @@ func TestMarshalMapStr(t *testing.T) {
t.Fatal(err)
}

assert.Equal(t, common.MapStr{"source": common.MapStr{"ip": "127.0.0.1"}}, m)
assert.Equal(t, common.MapStr{
"event": common.MapStr{
"kind": "event",
"category": "network_traffic",
},
"source": common.MapStr{"ip": "127.0.0.1"},
}, m)
}

func TestComputeValues(t *testing.T) {
Expand Down
7 changes: 1 addition & 6 deletions packetbeat/protos/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ func (amqp *amqpPlugin) publishTransaction(t *amqpTransaction) {
pbf.Event.Dataset = "amqp"
pbf.Network.Protocol = pbf.Event.Dataset
pbf.Network.Transport = "tcp"
pbf.Error.Message = t.notes

fields := evt.Fields
fields["type"] = pbf.Event.Dataset
Expand Down Expand Up @@ -490,12 +491,6 @@ func (amqp *amqpPlugin) publishTransaction(t *amqpTransaction) {
}
}

if len(t.notes) == 1 {
evt.PutValue("error.message", t.notes[0])
} else if len(t.notes) > 1 {
evt.PutValue("error.message", t.notes)
}

amqp.results(evt)
}

Expand Down
15 changes: 2 additions & 13 deletions packetbeat/protos/amqp/amqp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,16 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"

"github.com/elastic/beats/packetbeat/pb"
"github.com/elastic/beats/packetbeat/protos"
"github.com/elastic/beats/packetbeat/publish"
)

type eventStore struct {
events []beat.Event
}

func (e *eventStore) publish(event beat.Event) {
pbf, err := pb.GetFields(event.Fields)
if err != nil || pbf == nil {
panic("_packetbeat not found")
}
delete(event.Fields, pb.FieldsKey)
if err = pbf.ComputeValues(nil); err != nil {
panic(err)
}
if err = pbf.MarshalMapStr(event.Fields); err != nil {
panic(err)
}

publish.MarshalPacketbeatFields(&event, nil)
e.events = append(e.events, event)
}

Expand Down
8 changes: 2 additions & 6 deletions packetbeat/protos/applayer/applayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (t *Transaction) InitWithMsg(
func (t *Transaction) Event(event *beat.Event) error {
event.Timestamp = t.Ts.Ts

pbf := &pb.Fields{}
pbf := pb.NewFields()
pbf.SetSource(&t.Src)
pbf.SetDestination(&t.Dst)
pbf.Source.Bytes = int64(t.BytesIn)
Expand All @@ -233,16 +233,12 @@ func (t *Transaction) Event(event *beat.Event) error {
pbf.Event.End = t.EndTime
pbf.Network.Transport = t.Transport.String()
pbf.Network.Protocol = pbf.Event.Dataset
pbf.Error.Message = t.Notes

fields := event.Fields
fields[pb.FieldsKey] = pbf
fields["type"] = pbf.Event.Dataset
fields["status"] = t.Status
if len(t.Notes) == 1 {
event.PutValue("error.message", t.Notes[0])
} else if len(t.Notes) > 1 {
event.PutValue("error.message", t.Notes)
}
return nil
}

Expand Down
11 changes: 2 additions & 9 deletions packetbeat/protos/cassandra/pub.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,12 @@ func (pub *transPub) createEvent(requ, resp *message) beat.Event {

cassandra := common.MapStr{}
status := common.OK_STATUS
var notes []string

//requ can be null, if the message is a PUSHed message
if requ != nil {
pbf.Source.Bytes = int64(requ.Size)
pbf.Event.Start = requ.Ts
notes = append(notes, requ.Notes...)
pbf.Error.Message = requ.Notes

if pub.sendRequest {
if pub.sendRequestHeader {
Expand All @@ -107,7 +106,7 @@ func (pub *transPub) createEvent(requ, resp *message) beat.Event {
if resp != nil {
pbf.Destination.Bytes = int64(resp.Size)
pbf.Event.End = resp.Ts
notes = append(notes, resp.Notes...)
pbf.Error.Message = append(pbf.Error.Message, resp.Notes...)

if resp.failed {
status = common.ERROR_STATUS
Expand All @@ -133,11 +132,5 @@ func (pub *transPub) createEvent(requ, resp *message) beat.Event {
fields["cassandra"] = cassandra
}

if len(notes) == 1 {
fields.Put("error.message", notes[0])
} else if len(notes) > 1 {
fields.Put("error.message", notes)
}

return evt
}
37 changes: 13 additions & 24 deletions packetbeat/protos/dhcpv4/dhcpv4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/packetbeat/pb"
"github.com/elastic/beats/packetbeat/protos"
"github.com/elastic/beats/packetbeat/publish"
)

var _ protos.UDPPlugin = &dhcpv4Plugin{}
Expand Down Expand Up @@ -117,8 +117,10 @@ func TestParseDHCPRequest(t *testing.T) {
"port": 67,
},
"event": common.MapStr{
"dataset": "dhcpv4",
"start": pkt.Ts,
"category": "network_traffic",
"dataset": "dhcpv4",
"kind": "event",
"start": pkt.Ts,
},
"network": common.MapStr{
"type": "ipv4",
Expand Down Expand Up @@ -150,8 +152,9 @@ func TestParseDHCPRequest(t *testing.T) {
},
}

actual := marshalPacketbeatFields(t, p.parseDHCPv4(pkt))
actual := p.parseDHCPv4(pkt)
if assert.NotNil(t, actual) {
publish.MarshalPacketbeatFields(actual, nil)
t.Logf("DHCP event: %+v", actual)
assertEqual(t, expected, *actual)
}
Expand Down Expand Up @@ -194,8 +197,10 @@ func TestParseDHCPACK(t *testing.T) {
"bytes": 300,
},
"event": common.MapStr{
"dataset": "dhcpv4",
"start": pkt.Ts,
"category": "network_traffic",
"dataset": "dhcpv4",
"kind": "event",
"start": pkt.Ts,
},
"network": common.MapStr{
"type": "ipv4",
Expand Down Expand Up @@ -226,8 +231,9 @@ func TestParseDHCPACK(t *testing.T) {
},
}

actual := marshalPacketbeatFields(t, p.parseDHCPv4(pkt))
actual := p.parseDHCPv4(pkt)
if assert.NotNil(t, actual) {
publish.MarshalPacketbeatFields(actual, nil)
t.Logf("DHCP event: %+v", actual)
assertEqual(t, expected, *actual)
}
Expand All @@ -249,20 +255,3 @@ func normalizeEvent(t testing.TB, event beat.Event) interface{} {
}
return out
}

func marshalPacketbeatFields(t testing.TB, evt *beat.Event) *beat.Event {
pbf, err := pb.GetFields(evt.Fields)
if err != nil || pbf == nil {
t.Fatal("failed getting _packetbeat", err)
}
delete(evt.Fields, pb.FieldsKey)

if err = pbf.ComputeValues(nil); err != nil {
t.Fatal(err)
}

if err = pbf.MarshalMapStr(evt.Fields); err != nil {
t.Fatal(err)
}
return evt
}
6 changes: 1 addition & 5 deletions packetbeat/protos/dns/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,15 +370,11 @@ func (dns *dnsPlugin) publishTransaction(t *dnsTransaction) {
pbf.SetDestination(&t.dst)
pbf.Network.Transport = t.transport.String()
pbf.Network.Protocol = "dns"
pbf.Error.Message = t.notes

fields := evt.Fields
fields["type"] = "dns"
fields["status"] = common.ERROR_STATUS
if len(t.notes) == 1 {
fields.Put("error.message", t.notes[0])
} else if len(t.notes) > 1 {
fields.Put("error.message", t.notes)
}

dnsEvent := common.MapStr{}
fields["dns"] = dnsEvent
Expand Down
15 changes: 2 additions & 13 deletions packetbeat/protos/dns/dns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/packetbeat/pb"
"github.com/elastic/beats/packetbeat/protos"
"github.com/elastic/beats/packetbeat/publish"
)

// Test Constants
Expand Down Expand Up @@ -79,18 +79,7 @@ type eventStore struct {
}

func (e *eventStore) publish(event beat.Event) {
pbf, err := pb.GetFields(event.Fields)
if err != nil || pbf == nil {
panic("_packetbeat not found")
}
delete(event.Fields, pb.FieldsKey)
if err = pbf.ComputeValues(nil); err != nil {
panic(err)
}
if err = pbf.MarshalMapStr(event.Fields); err != nil {
panic(err)
}

publish.MarshalPacketbeatFields(&event, nil)
e.events = append(e.events, event)
}

Expand Down
12 changes: 3 additions & 9 deletions packetbeat/protos/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,6 @@ func (http *httpPlugin) newTransaction(requ, resp *message) beat.Event {
fields["status"] = status

var httpFields ProtocolFields
var notes []string
if requ != nil {
http.decodeBody(requ)
path, params, err := http.extractParameters(requ)
Expand All @@ -534,7 +533,7 @@ func (http *httpPlugin) newTransaction(requ, resp *message) beat.Event {
}
pbf.Event.Start = requ.ts
pbf.Network.ForwardedIP = string(requ.realIP)
notes = append(notes, requ.notes...)
pbf.Error.Message = requ.notes

// http
httpFields.Version = requ.version.String()
Expand All @@ -560,6 +559,7 @@ func (http *httpPlugin) newTransaction(requ, resp *message) beat.Event {
if http.sendRequest {
fields["request"] = string(http.makeRawMessage(requ))
}
fields["method"] = httpFields.RequestMethod
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this upper / lower case? Should we use strings.ToLower here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This value has already been ToLowered.

fields["query"] = fmt.Sprintf("%s %s", requ.method, path)
}

Expand All @@ -568,7 +568,7 @@ func (http *httpPlugin) newTransaction(requ, resp *message) beat.Event {

pbf.Destination.Bytes = int64(resp.size)
pbf.Event.End = resp.ts
notes = append(notes, resp.notes...)
pbf.Error.Message = append(pbf.Error.Message, resp.notes...)

// http
httpFields.ResponseStatusCode = int64(resp.statusCode)
Expand All @@ -587,12 +587,6 @@ func (http *httpPlugin) newTransaction(requ, resp *message) beat.Event {
}
}

if len(notes) == 1 {
fields.Put("error.message", notes[0])
} else if len(notes) > 1 {
fields.Put("error.message", notes)
}

pb.MarshalStruct(evt.Fields, "http", httpFields)
return evt
}
Expand Down
Loading