Skip to content

Commit

Permalink
fix: server side ssl event can't be captured correctly (#236)
Browse files Browse the repository at this point in the history
1. collect sendfile syscall event(nginx may send static file to client via sendfile syscall)
2. when conntrack created , transfer old connection's temp events to new conn, because some events may come in before conn created at userspace.
3. ignore recvmsg, recvfrom syscall with flags : MSG_OOB, MSG_PEEK.
  • Loading branch information
hengyoush committed Jan 6, 2025
1 parent e90f9ad commit 44f973f
Show file tree
Hide file tree
Showing 42 changed files with 711 additions and 368 deletions.
23 changes: 15 additions & 8 deletions agent/analysis/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"kyanos/agent/protocol"
"kyanos/bpf"
. "kyanos/common"
"math"

"github.com/jefurry/logrus"
)
Expand Down Expand Up @@ -176,14 +177,20 @@ func (s *StatRecorder) ReceiveRecord(r protocol.Record, connection *conn.Connect
hasUserCopyEvents := len(events.userCopyEvents) > 0
hasTcpInEvents := len(events.tcpInEvents) > 0
if connection.IsServerSide() {
// why not use nicIngressEvents[0] directly?
// because we could missed some nicIngressEvents, the total duration may be negative
annotatedRecord.StartTs = math.MaxUint64
if hasNicInEvents {
annotatedRecord.StartTs = events.nicIngressEvents[0].GetTimestamp()
} else if hasTcpInEvents {
annotatedRecord.StartTs = events.tcpInEvents[0].GetTimestamp()
} else if hasUserCopyEvents {
annotatedRecord.StartTs = events.userCopyEvents[0].GetTimestamp()
} else if hasReadSyscallEvents {
annotatedRecord.StartTs = events.readSyscallEvents[0].GetTimestamp()
annotatedRecord.StartTs = min(events.nicIngressEvents[0].GetTimestamp(), annotatedRecord.StartTs)
}
if hasTcpInEvents {
annotatedRecord.StartTs = min(events.tcpInEvents[0].GetTimestamp(), annotatedRecord.StartTs)
}
if hasUserCopyEvents {
annotatedRecord.StartTs = min(events.userCopyEvents[0].GetTimestamp(), annotatedRecord.StartTs)
}
if hasReadSyscallEvents {
annotatedRecord.StartTs = min(events.readSyscallEvents[0].GetTimestamp(), annotatedRecord.StartTs)
}
if hasDevOutEvents {
annotatedRecord.EndTs = events.devOutEvents[len(events.devOutEvents)-1].GetTimestamp()
Expand All @@ -194,7 +201,7 @@ func (s *StatRecorder) ReceiveRecord(r protocol.Record, connection *conn.Connect
}
annotatedRecord.ReqSize = events.ingressKernLen
annotatedRecord.RespSize = events.egressKernLen
if hasNicInEvents && hasDevOutEvents {
if annotatedRecord.StartTs != math.MaxUint64 && hasDevOutEvents {
annotatedRecord.TotalDuration = float64(annotatedRecord.EndTs) - float64(annotatedRecord.StartTs)
}
if hasReadSyscallEvents && hasWriteSyscallEvents {
Expand Down
13 changes: 13 additions & 0 deletions agent/conn/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,14 @@ func (c *ConnManager) AddConnection4(TgidFd uint64, conn *Connection4) error {
prevConn = append(prevConn, existedConn)
conn.prevConn = prevConn

// transfer existedConn's Events to conn
conn.TempSyscallEvents = append(conn.TempSyscallEvents, existedConn.TempSyscallEvents...)
existedConn.TempSyscallEvents = []*bpf.SyscallEventData{}
conn.TempKernEvents = append(conn.TempKernEvents, existedConn.TempKernEvents...)
existedConn.TempKernEvents = []*bpf.AgentKernEvt{}
conn.TempSslEvents = append(conn.TempSslEvents, existedConn.TempSslEvents...)
existedConn.TempSslEvents = []*bpf.SslData{}

c.connMap.Store(TgidFd, conn)
atomic.AddInt64(&c.connectionAdded, 1)
return nil
Expand Down Expand Up @@ -414,6 +422,11 @@ func (c *Connection4) OnSyscallEvent(data []byte, event *bpf.SyscallEventData, r
} else {
c.addDataToBufferAndTryParse(data, &event.SyscallEvent.Ke)
}
} else if event.SyscallEvent.GetSourceFunction() == bpf.AgentSourceFunctionTKSyscallSendfile {
// sendfile has no data, so we need to fill a fake data
common.ConntrackLog.Errorln("sendfile has no data, so we need to fill a fake data")
fakeData := make([]byte, event.SyscallEvent.Ke.Len)
c.addDataToBufferAndTryParse(fakeData, &event.SyscallEvent.Ke)
}
c.StreamEvents.AddSyscallEvent(event)

Expand Down
40 changes: 27 additions & 13 deletions agent/conn/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,17 +199,23 @@ func (p *Processor) run() {
if isProtocolInterested && !isSideNotMatched(p, conn) {
if conn.Protocol != bpf.AgentTrafficProtocolTKProtocolUnknown {
for _, sysEvent := range conn.TempSyscallEvents {
if common.ConntrackLog.Level >= logrus.DebugLevel {
common.ConntrackLog.Debugf("%s process %d temp syscall events before infer\n", conn.ToString(), len(conn.TempSyscallEvents))
if sysEvent.SyscallEvent.Ke.Ts > conn.ConnectStartTs {
if common.ConntrackLog.Level >= logrus.DebugLevel {
common.ConntrackLog.Debugf("%s process %d temp syscall events before infer\n", conn.ToString(), len(conn.TempSyscallEvents))
}
conn.OnSyscallEvent(sysEvent.Buf, sysEvent, recordChannel)
}
conn.OnSyscallEvent(sysEvent.Buf, sysEvent, recordChannel)
}
conn.TempConnEvents = conn.TempConnEvents[0:0]
for _, sslEvent := range conn.TempSslEvents {
if common.ConntrackLog.Level >= logrus.DebugLevel {
common.ConntrackLog.Debugf("%s process %d temp ssl events before infer\n", conn.ToString(), len(conn.TempSslEvents))
if sslEvent.SslEventHeader.Ke.Ts > conn.ConnectStartTs {
if common.ConntrackLog.Level >= logrus.DebugLevel {
common.ConntrackLog.Debugf("%s process %d temp ssl events before infer\n", conn.ToString(), len(conn.TempSslEvents))
}
conn.OnSslDataEvent(sslEvent.Buf, sslEvent, recordChannel)
}
conn.OnSslDataEvent(sslEvent.Buf, sslEvent, recordChannel)
}
conn.TempSslEvents = conn.TempSslEvents[0:0]
conn.UpdateConnectionTraceable(true)
}
conn.TempKernEvents = conn.TempKernEvents[0:0]
Expand All @@ -228,23 +234,22 @@ func (p *Processor) run() {
eventType = "close"
} else if event.ConnType == bpf.AgentConnTypeTKProtocolInfer {
eventType = "infer"
// 连接推断事件可以不上报
} else if event.ConnType == bpf.AgentConnTypeTKConnect {
conn.AddConnEvent(event)
}

if common.ConntrackLog.Level >= logrus.DebugLevel {
if event.ConnType == bpf.AgentConnTypeTKProtocolInfer && conn.ProtocolInferred() {
common.ConntrackLog.Debugf("[conn] %s | type: %s, protocol: %d, \n", conn.ToString(), eventType, conn.Protocol)
} else {
common.ConntrackLog.Debugf("[conn] %s | type: %s, protocol: %d, \n", conn.ToString(), eventType, conn.Protocol)
}
common.ConntrackLog.Debugf("[conn][ts=%d] %s | type: %s, protocol: %d, \n", event.Ts, conn.ToString(), eventType, conn.Protocol)
}
case event := <-p.syscallEvents:
tgidFd := event.SyscallEvent.Ke.ConnIdS.TgidFd
conn := p.connManager.FindConnection4Or(tgidFd, event.SyscallEvent.Ke.Ts+common.LaunchEpochTime)
event.SyscallEvent.Ke.Ts += common.LaunchEpochTime
if conn != nil && conn.Status == Closed {
conn.AddSyscallEvent(event)
if common.BPFEventLog.Level >= logrus.DebugLevel {
common.BPFEventLog.Debugf("[syscall][closed conn][len=%d][ts=%d][fn=%d]%s | %s", max(event.SyscallEvent.BufSize, event.SyscallEvent.Ke.Len), event.SyscallEvent.Ke.Ts, event.SyscallEvent.GetSourceFunction(), conn.ToString(), string(event.Buf))
}
continue
}
if conn != nil && !conn.tracable {
Expand All @@ -255,7 +260,7 @@ func (p *Processor) run() {
}
if conn != nil && conn.ProtocolInferred() {
if common.BPFEventLog.Level >= logrus.DebugLevel {
common.BPFEventLog.Debugf("[syscall][len=%d][ts=%d]%s | %s", max(event.SyscallEvent.BufSize, event.SyscallEvent.Ke.Len), event.SyscallEvent.Ke.Ts, conn.ToString(), string(event.Buf))
common.BPFEventLog.Debugf("[syscall][len=%d][ts=%d][fn=%d]%s | %s", max(event.SyscallEvent.BufSize, event.SyscallEvent.Ke.Len), event.SyscallEvent.Ke.Ts, event.SyscallEvent.GetSourceFunction(), conn.ToString(), string(event.Buf))
}

conn.OnSyscallEvent(event.Buf, event, recordChannel)
Expand All @@ -279,9 +284,14 @@ func (p *Processor) run() {
conn := p.connManager.FindConnection4Or(tgidFd, event.SslEventHeader.Ke.Ts+common.LaunchEpochTime)
event.SslEventHeader.Ke.Ts += common.LaunchEpochTime
if conn != nil && conn.Status == Closed {
conn.AddSslEvent(event)
if common.BPFEventLog.Level >= logrus.DebugLevel {
common.BPFEventLog.Debugf("[ssl][closed conn][len=%d][ts=%d]%s | %s", event.SslEventHeader.BufSize, event.SslEventHeader.Ke.Ts, conn.ToString(), string(event.Buf))
}
continue
}
if conn != nil && !conn.tracable {
conn.AddSslEvent(event)
if common.BPFEventLog.Level >= logrus.DebugLevel {
common.BPFEventLog.Debugf("[ssl][no-trace][len=%d][ts=%d]%s | %s", event.SslEventHeader.BufSize, event.SslEventHeader.Ke.Ts, conn.ToString(), string(event.Buf))
}
Expand Down Expand Up @@ -312,6 +322,10 @@ func (p *Processor) run() {
tgidFd := event.ConnIdS.TgidFd
conn := p.connManager.FindConnection4Or(tgidFd, event.Ts+common.LaunchEpochTime)
if conn != nil && conn.Status == Closed {
conn.AddKernEvent(event)
if common.BPFEventLog.Level >= logrus.DebugLevel {
common.BPFEventLog.Debugf("[closed conn]%s", FormatKernEvt(event, conn))
}
continue
}
event.Ts += common.LaunchEpochTime
Expand Down
Loading

0 comments on commit 44f973f

Please sign in to comment.