Skip to content

Commit

Permalink
Packetbeat: add PID to the list of exported fields
Browse files Browse the repository at this point in the history
We had the process names, and this adds also the PID, closing elastic#562.
  • Loading branch information
Tudor Golubenco committed Jun 9, 2017
1 parent cde8f38 commit 5faec69
Show file tree
Hide file tree
Showing 16 changed files with 75 additions and 38 deletions.
1 change: 1 addition & 0 deletions libbeat/common/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ type Endpoint struct {
Name string
Cmdline string
Proc string
PID int
}
3 changes: 2 additions & 1 deletion libbeat/common/tuples.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,5 +131,6 @@ func (t *TCPTuple) Hashable() HashableTCPTuple {

// Source and destination process names, as found by the proc module.
type CmdlineTuple struct {
Src, Dst []byte
Src, Dst string
SrcPID, DstPID int
}
12 changes: 6 additions & 6 deletions packetbeat/procs/procs.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,15 +191,15 @@ func (proc *ProcessesWatcher) FindProcessesTuple(tuple *common.IPPortTuple) (pro

if proc.isLocalIP(tuple.SrcIP) {
logp.Debug("procs", "Looking for port %d", tuple.SrcPort)
procTuple.Src = []byte(proc.findProc(tuple.SrcPort))
procTuple.Src, procTuple.SrcPID = proc.findProc(tuple.SrcPort)
if len(procTuple.Src) > 0 {
logp.Debug("procs", "Found device %s for port %d", procTuple.Src, tuple.SrcPort)
}
}

if proc.isLocalIP(tuple.DstIP) {
logp.Debug("procs", "Looking for port %d", tuple.DstPort)
procTuple.Dst = []byte(proc.findProc(tuple.DstPort))
procTuple.Dst, procTuple.DstPID = proc.findProc(tuple.DstPort)
if len(procTuple.Dst) > 0 {
logp.Debug("procs", "Found device %s for port %d", procTuple.Dst, tuple.DstPort)
}
Expand All @@ -208,13 +208,13 @@ func (proc *ProcessesWatcher) FindProcessesTuple(tuple *common.IPPortTuple) (pro
return
}

func (proc *ProcessesWatcher) findProc(port uint16) (procname string) {
func (proc *ProcessesWatcher) findProc(port uint16) (procname string, pid int) {
procname = ""
defer logp.Recover("FindProc exception")

p, exists := proc.portProcMap[port]
if exists {
return p.proc.name
return p.proc.name, p.pid
}

now := time.Now()
Expand All @@ -226,11 +226,11 @@ func (proc *ProcessesWatcher) findProc(port uint16) (procname string) {
// try again
p, exists := proc.portProcMap[port]
if exists {
return p.proc.name
return p.proc.name, p.pid
}
}

return ""
return "", 0
}

func hexToIpv4(word string) (net.IP, error) {
Expand Down
18 changes: 12 additions & 6 deletions packetbeat/protos/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,14 @@ func (amqp *amqpPlugin) handleAmqpRequest(msg *amqpMessage) {
trans.src = common.Endpoint{
IP: msg.tcpTuple.SrcIP.String(),
Port: msg.tcpTuple.SrcPort,
Proc: string(msg.cmdlineTuple.Src),
Proc: msg.cmdlineTuple.Src,
PID: msg.cmdlineTuple.SrcPID,
}
trans.dst = common.Endpoint{
IP: msg.tcpTuple.DstIP.String(),
Port: msg.tcpTuple.DstPort,
Proc: string(msg.cmdlineTuple.Dst),
Proc: msg.cmdlineTuple.Dst,
PID: msg.cmdlineTuple.DstPID,
}
if msg.direction == tcp.TCPDirectionReverse {
trans.src, trans.dst = trans.dst, trans.src
Expand Down Expand Up @@ -357,12 +359,14 @@ func (amqp *amqpPlugin) handlePublishing(client *amqpMessage) {
trans.src = common.Endpoint{
IP: client.tcpTuple.SrcIP.String(),
Port: client.tcpTuple.SrcPort,
Proc: string(client.cmdlineTuple.Src),
Proc: client.cmdlineTuple.Src,
PID: client.cmdlineTuple.SrcPID,
}
trans.dst = common.Endpoint{
IP: client.tcpTuple.DstIP.String(),
Port: client.tcpTuple.DstPort,
Proc: string(client.cmdlineTuple.Dst),
Proc: client.cmdlineTuple.Dst,
PID: client.cmdlineTuple.DstPID,
}

trans.method = client.method
Expand Down Expand Up @@ -402,12 +406,14 @@ func (amqp *amqpPlugin) handleDelivering(server *amqpMessage) {
trans.src = common.Endpoint{
IP: server.tcpTuple.SrcIP.String(),
Port: server.tcpTuple.SrcPort,
Proc: string(server.cmdlineTuple.Src),
Proc: server.cmdlineTuple.Src,
PID: server.cmdlineTuple.SrcPID,
}
trans.dst = common.Endpoint{
IP: server.tcpTuple.DstIP.String(),
Port: server.tcpTuple.DstPort,
Proc: string(server.cmdlineTuple.Dst),
Proc: server.cmdlineTuple.Dst,
PID: server.cmdlineTuple.DstPID,
}

//for publishing and delivering, bytes in and out represent the length of the
Expand Down
6 changes: 4 additions & 2 deletions packetbeat/protos/applayer/applayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,14 @@ func (t *Transaction) Init(
t.Src = common.Endpoint{
IP: tuple.SrcIP.String(),
Port: tuple.SrcPort,
Proc: string(cmdline.Src),
Proc: cmdline.Src,
PID: cmdline.SrcPID,
}
t.Dst = common.Endpoint{
IP: tuple.DstIP.String(),
Port: tuple.DstPort,
Proc: string(cmdline.Dst),
Proc: cmdline.Dst,
PID: cmdline.DstPID,
}
t.Notes = notes

Expand Down
9 changes: 6 additions & 3 deletions packetbeat/protos/cassandra/pub.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ func (pub *transPub) createEvent(requ, resp *message) common.MapStr {
src := &common.Endpoint{
IP: requ.Tuple.SrcIP.String(),
Port: requ.Tuple.SrcPort,
Proc: string(requ.CmdlineTuple.Src),
Proc: requ.CmdlineTuple.Src,
PID: requ.CmdlineTuple.SrcPID,
}

event["@timestamp"] = common.Time(requ.Ts)
Expand Down Expand Up @@ -83,7 +84,8 @@ func (pub *transPub) createEvent(requ, resp *message) common.MapStr {
dst := &common.Endpoint{
IP: requ.Tuple.DstIP.String(),
Port: requ.Tuple.DstPort,
Proc: string(requ.CmdlineTuple.Dst),
Proc: requ.CmdlineTuple.Dst,
PID: requ.CmdlineTuple.DstPID,
}
event["dst"] = dst

Expand All @@ -95,7 +97,8 @@ func (pub *transPub) createEvent(requ, resp *message) common.MapStr {
dst := &common.Endpoint{
IP: resp.Tuple.DstIP.String(),
Port: resp.Tuple.DstPort,
Proc: string(resp.CmdlineTuple.Dst),
Proc: resp.CmdlineTuple.Dst,
PID: resp.CmdlineTuple.DstPID,
}
event["dst"] = dst
}
Expand Down
6 changes: 4 additions & 2 deletions packetbeat/protos/dns/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,12 +257,14 @@ func newTransaction(ts time.Time, tuple dnsTuple, cmd common.CmdlineTuple) *dnsT
trans.src = common.Endpoint{
IP: tuple.srcIP.String(),
Port: tuple.srcPort,
Proc: string(cmd.Src),
Proc: cmd.Src,
PID: cmd.SrcPID,
}
trans.dst = common.Endpoint{
IP: tuple.dstIP.String(),
Port: tuple.dstPort,
Proc: string(cmd.Dst),
Proc: cmd.Dst,
PID: cmd.DstPID,
}
return trans
}
Expand Down
6 changes: 4 additions & 2 deletions packetbeat/protos/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,12 +454,14 @@ func (http *httpPlugin) newTransaction(requ, resp *message) common.MapStr {
src := common.Endpoint{
IP: requ.tcpTuple.SrcIP.String(),
Port: requ.tcpTuple.SrcPort,
Proc: string(requ.cmdlineTuple.Src),
Proc: requ.cmdlineTuple.Src,
PID: requ.cmdlineTuple.SrcPID,
}
dst := common.Endpoint{
IP: requ.tcpTuple.DstIP.String(),
Port: requ.tcpTuple.DstPort,
Proc: string(requ.cmdlineTuple.Dst),
Proc: requ.cmdlineTuple.Dst,
PID: requ.cmdlineTuple.DstPID,
}
if requ.direction == tcp.TCPDirectionReverse {
src, dst = dst, src
Expand Down
6 changes: 4 additions & 2 deletions packetbeat/protos/mongodb/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,12 +272,14 @@ func newTransaction(requ, resp *mongodbMessage) *transaction {
trans.src = common.Endpoint{
IP: requ.tcpTuple.SrcIP.String(),
Port: requ.tcpTuple.SrcPort,
Proc: string(requ.cmdlineTuple.Src),
Proc: requ.cmdlineTuple.Src,
PID: requ.cmdlineTuple.SrcPID,
}
trans.dst = common.Endpoint{
IP: requ.tcpTuple.DstIP.String(),
Port: requ.tcpTuple.DstPort,
Proc: string(requ.cmdlineTuple.Dst),
Proc: requ.cmdlineTuple.Dst,
PID: requ.cmdlineTuple.DstPID,
}
if requ.direction == tcp.TCPDirectionReverse {
trans.src, trans.dst = trans.dst, trans.src
Expand Down
6 changes: 4 additions & 2 deletions packetbeat/protos/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,12 +594,14 @@ func (mysql *mysqlPlugin) receivedMysqlRequest(msg *mysqlMessage) {
trans.src = common.Endpoint{
IP: msg.tcpTuple.SrcIP.String(),
Port: msg.tcpTuple.SrcPort,
Proc: string(msg.cmdlineTuple.Src),
Proc: msg.cmdlineTuple.Src,
PID: msg.cmdlineTuple.SrcPID,
}
trans.dst = common.Endpoint{
IP: msg.tcpTuple.DstIP.String(),
Port: msg.tcpTuple.DstPort,
Proc: string(msg.cmdlineTuple.Dst),
Proc: msg.cmdlineTuple.Dst,
PID: msg.cmdlineTuple.DstPID,
}
if msg.direction == tcp.TCPDirectionReverse {
trans.src, trans.dst = trans.dst, trans.src
Expand Down
6 changes: 4 additions & 2 deletions packetbeat/protos/pgsql/pgsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,12 +377,14 @@ func (pgsql *pgsqlPlugin) receivedPgsqlRequest(msg *pgsqlMessage) {
trans.src = common.Endpoint{
IP: msg.tcpTuple.SrcIP.String(),
Port: msg.tcpTuple.SrcPort,
Proc: string(msg.cmdlineTuple.Src),
Proc: msg.cmdlineTuple.Src,
PID: msg.cmdlineTuple.SrcPID,
}
trans.dst = common.Endpoint{
IP: msg.tcpTuple.DstIP.String(),
Port: msg.tcpTuple.DstPort,
Proc: string(msg.cmdlineTuple.Dst),
Proc: msg.cmdlineTuple.Dst,
PID: msg.cmdlineTuple.DstPID,
}
if msg.direction == tcp.TCPDirectionReverse {
trans.src, trans.dst = trans.dst, trans.src
Expand Down
6 changes: 4 additions & 2 deletions packetbeat/protos/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,12 +277,14 @@ func (redis *redisPlugin) newTransaction(requ, resp *redisMessage) common.MapStr
src := &common.Endpoint{
IP: requ.tcpTuple.SrcIP.String(),
Port: requ.tcpTuple.SrcPort,
Proc: string(requ.cmdlineTuple.Src),
Proc: requ.cmdlineTuple.Src,
PID: requ.cmdlineTuple.SrcPID,
}
dst := &common.Endpoint{
IP: requ.tcpTuple.DstIP.String(),
Port: requ.tcpTuple.DstPort,
Proc: string(requ.cmdlineTuple.Dst),
Proc: requ.cmdlineTuple.Dst,
PID: requ.cmdlineTuple.DstPID,
}
if requ.direction == tcp.TCPDirectionReverse {
src, dst = dst, src
Expand Down
6 changes: 4 additions & 2 deletions packetbeat/protos/thrift/thrift.go
Original file line number Diff line number Diff line change
Expand Up @@ -988,12 +988,14 @@ func (thrift *thriftPlugin) receivedRequest(msg *thriftMessage) {
trans.src = common.Endpoint{
IP: msg.tcpTuple.SrcIP.String(),
Port: msg.tcpTuple.SrcPort,
Proc: string(msg.cmdlineTuple.Src),
Proc: msg.cmdlineTuple.Src,
PID: msg.cmdlineTuple.SrcPID,
}
trans.dst = common.Endpoint{
IP: msg.tcpTuple.DstIP.String(),
Port: msg.tcpTuple.DstPort,
Proc: string(msg.cmdlineTuple.Dst),
Proc: msg.cmdlineTuple.Dst,
PID: msg.cmdlineTuple.DstPID,
}
if msg.direction == tcp.TCPDirectionReverse {
trans.src, trans.dst = trans.dst, trans.src
Expand Down
6 changes: 6 additions & 0 deletions packetbeat/publish/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ func (p *PacketbeatPublisher) normalizeTransAddr(event common.MapStr) bool {
event["client_ip"] = src.IP
event["client_port"] = src.Port
event["client_proc"] = src.Proc
if src.PID != 0 {
event["client_pid"] = src.PID
}
event["client_server"] = srcServer
delete(event, "src")
}
Expand All @@ -226,6 +229,9 @@ func (p *PacketbeatPublisher) normalizeTransAddr(event common.MapStr) bool {
event["ip"] = dst.IP
event["port"] = dst.Port
event["proc"] = dst.Proc
if dst.PID != 0 {
event["pid"] = dst.PID
}
event["server"] = dstServer
delete(event, "dst")

Expand Down
10 changes: 6 additions & 4 deletions packetbeat/scripts/tcp-protocol/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ This requires [python](https://www.python.org/downloads/) to be installed.
- server: Send echo response upon receiving a message. Echo response begins
with `<` character. Errors responses begin with `!` character. An error message
will be returned for any received request not starting with `>`.

- Echo Server sample code:

```
Expand Down Expand Up @@ -99,7 +99,7 @@ func echo(sock net.Conn) {

### 2.1 Add protocol analyzer (echo) to packetbeat:

Create analyzer skeleton from code generator template.
Create analyzer skeleton from code generator template.

```
$ cd ${GOPATH}/src/github.com/elastic/beats/packetbeat/protos
Expand Down Expand Up @@ -238,12 +238,14 @@ func (pub *transPub) createEvent(requ, resp *message) common.MapStr {
src := &common.Endpoint{
IP: requ.Tuple.SrcIP.String(),
Port: requ.Tuple.SrcPort,
Proc: string(requ.CmdlineTuple.Src),
Proc: requ.CmdlineTuple.Src,
PID: requ.CmdlineTuple.SrcPID,
}
dst := &common.Endpoint{
IP: requ.Tuple.DstIP.String(),
Port: requ.Tuple.DstPort,
Proc: string(requ.CmdlineTuple.Dst),
Proc: requ.CmdlineTuple.Dst,
PID: requ.CmdlineTuple.DstPID,
}
event := common.MapStr{
Expand Down
6 changes: 4 additions & 2 deletions packetbeat/scripts/tcp-protocol/{protocol}/pub.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ func (pub *transPub) createEvent(requ, resp *message) common.MapStr {
src := &common.Endpoint{
IP: requ.Tuple.SrcIP.String(),
Port: requ.Tuple.SrcPort,
Proc: string(requ.CmdlineTuple.Src),
Proc: requ.CmdlineTuple.Src,
PID: requ.CmdlineTuple.SrcPID,
}
dst := &common.Endpoint{
IP: requ.Tuple.DstIP.String(),
Port: requ.Tuple.DstPort,
Proc: string(requ.CmdlineTuple.Dst),
Proc: requ.CmdlineTuple.Dst,
PID: requ.CmdlineTuple.DstPID,
}

event := common.MapStr{
Expand Down

0 comments on commit 5faec69

Please sign in to comment.