Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Support tracking data on existing connections #36

Merged
merged 5 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
2 changes: 1 addition & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func validateAndRepairOptions(options AgentOptions) AgentOptions {
newOptions.ProcessorsNum = runtime.NumCPU()
}
if newOptions.MessageFilter == nil {
newOptions.MessageFilter = protocol.NoopFilter{}
newOptions.MessageFilter = protocol.BaseFilter{}
}
if newOptions.BPFVerifyLogSize <= 0 {
newOptions.BPFVerifyLogSize = 1 * 1024 * 1024
Expand Down
4 changes: 2 additions & 2 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -937,7 +937,7 @@ func TestTcpV4DoRcv(t *testing.T) {
bpf.AttachKProbeSecuritySocketSendmsgEntry,
bpf.AttachKProbeSecuritySocketRecvmsgEntry,
func(p interface{}) link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTNIC_IN, p)
return ApplyKernelVersionFunctions(t, bpf.AgentStepTDEV_IN, p)
},
func(p interface{}) link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTTCP_IN, p)
Expand Down Expand Up @@ -977,7 +977,7 @@ func TestSkbCopyDatagramIter(t *testing.T) {
bpf.AttachKProbeSecuritySocketSendmsgEntry,
bpf.AttachKProbeSecuritySocketRecvmsgEntry,
func(p interface{}) link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTNIC_IN, p)
return ApplyKernelVersionFunctions(t, bpf.AgentStepTDEV_IN, p)
},
func(p interface{}) link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTUSER_COPY, p)
Expand Down
26 changes: 13 additions & 13 deletions agent/analysis/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ type AnnotatedRecord struct {
endTs uint64
reqSize int
respSize int
totalDuration int
blackBoxDuration int
readFromSocketBufferDuration int
totalDuration float64
blackBoxDuration float64
readFromSocketBufferDuration float64
reqSyscallEventDetails []SyscallEventDetail
respSyscallEventDetails []SyscallEventDetail
reqNicEventDetails []NicEventDetail
Expand Down Expand Up @@ -78,17 +78,17 @@ func (r *AnnotatedRecord) String(options AnnotatedRecordToStringOptions) string
result += r.Record.String(options.RecordToStringOptions)
result += "\n"
if _, ok := options.MetricTypeSet[TotalDuration]; ok {
result += fmt.Sprintf("[total duration] = %d(%s)(start=%s, end=%s)\n", common.ConvertDurationToMillisecondsIfNeeded(int64(r.totalDuration), nano), timeUnitName(nano),
result += fmt.Sprintf("[total duration] = %.3f(%s)(start=%s, end=%s)\n", common.ConvertDurationToMillisecondsIfNeeded(float64(r.totalDuration), nano), timeUnitName(nano),
common.FormatTimestampWithPrecision(r.startTs, nano),
common.FormatTimestampWithPrecision(r.endTs, nano))
}
if _, ok := options.MetricTypeSet[ReadFromSocketBufferDuration]; ok {
result += fmt.Sprintf("[read from sockbuf]=%d(%s)\n", common.ConvertDurationToMillisecondsIfNeeded(int64(r.readFromSocketBufferDuration), nano),
result += fmt.Sprintf("[read from sockbuf]=%.3f(%s)\n", common.ConvertDurationToMillisecondsIfNeeded(float64(r.readFromSocketBufferDuration), nano),
timeUnitName(nano))
}
if _, ok := options.MetricTypeSet[BlackBoxDuration]; ok {
result += fmt.Sprintf("[%s]=%d(%s)\n", r.blackboxName(),
common.ConvertDurationToMillisecondsIfNeeded(int64(r.blackBoxDuration), nano),
result += fmt.Sprintf("[%s]=%.3f(%s)\n", r.blackboxName(),
common.ConvertDurationToMillisecondsIfNeeded(float64(r.blackBoxDuration), nano),
timeUnitName(nano))
}
if _, ok := options.MetricTypeSet[RequestSize]; ok {
Expand Down Expand Up @@ -198,13 +198,13 @@ func (s *StatRecorder) ReceiveRecord(r protocol.Record, connection *conn.Connect
annotatedRecord.reqSize = ingressMessage.ByteSize()
annotatedRecord.respSize = egressMessage.ByteSize()
if hasNicInEvents && hasDevOutEvents {
annotatedRecord.totalDuration = int(annotatedRecord.endTs) - int(annotatedRecord.startTs)
annotatedRecord.totalDuration = float64(annotatedRecord.endTs) - float64(annotatedRecord.startTs)
}
if hasReadSyscallEvents && hasWriteSyscallEvents {
annotatedRecord.blackBoxDuration = int(writeSyscallEvents[len(writeSyscallEvents)-1].GetTimestamp()) - int(readSyscallEvents[0].GetTimestamp())
annotatedRecord.blackBoxDuration = float64(writeSyscallEvents[len(writeSyscallEvents)-1].GetTimestamp()) - float64(readSyscallEvents[0].GetTimestamp())
}
if hasUserCopyEvents && hasTcpInEvents {
annotatedRecord.readFromSocketBufferDuration = int(userCopyEvents[len(userCopyEvents)-1].GetTimestamp()) - int(tcpInEvents[0].GetTimestamp())
annotatedRecord.readFromSocketBufferDuration = float64(userCopyEvents[len(userCopyEvents)-1].GetTimestamp()) - float64(tcpInEvents[0].GetTimestamp())
}
annotatedRecord.reqSyscallEventDetails = KernEventsToEventDetails[SyscallEventDetail](readSyscallEvents)
annotatedRecord.respSyscallEventDetails = KernEventsToEventDetails[SyscallEventDetail](writeSyscallEvents)
Expand All @@ -220,13 +220,13 @@ func (s *StatRecorder) ReceiveRecord(r protocol.Record, connection *conn.Connect
annotatedRecord.reqSize = egressMessage.ByteSize()
annotatedRecord.respSize = ingressMessage.ByteSize()
if hasReadSyscallEvents && hasWriteSyscallEvents {
annotatedRecord.totalDuration = int(annotatedRecord.endTs) - int(annotatedRecord.startTs)
annotatedRecord.totalDuration = float64(annotatedRecord.endTs) - float64(annotatedRecord.startTs)
}
if hasNicInEvents && hasDevOutEvents {
annotatedRecord.blackBoxDuration = int(nicIngressEvents[len(nicIngressEvents)-1].GetTimestamp()) - int(devOutSyscallEvents[0].GetTimestamp())
annotatedRecord.blackBoxDuration = float64(nicIngressEvents[len(nicIngressEvents)-1].GetTimestamp()) - float64(devOutSyscallEvents[0].GetTimestamp())
}
if hasUserCopyEvents && hasTcpInEvents {
annotatedRecord.readFromSocketBufferDuration = int(userCopyEvents[len(userCopyEvents)-1].GetTimestamp()) - int(tcpInEvents[0].GetTimestamp())
annotatedRecord.readFromSocketBufferDuration = float64(userCopyEvents[len(userCopyEvents)-1].GetTimestamp()) - float64(tcpInEvents[0].GetTimestamp())
}
annotatedRecord.reqSyscallEventDetails = KernEventsToEventDetails[SyscallEventDetail](writeSyscallEvents)
annotatedRecord.respSyscallEventDetails = KernEventsToEventDetails[SyscallEventDetail](readSyscallEvents)
Expand Down
1 change: 1 addition & 0 deletions agent/buffer/stream_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func (sb *StreamBuffer) IsEmpty() bool {
}
// Clear empties the stream buffer: it drops every buffered chunk and clears
// the accompanying timestamps container.
func (sb *StreamBuffer) Clear() {
	// Re-slice to length zero so the buffer really becomes empty while the
	// backing array is kept for reuse. A full slice expression (`[:]`) would
	// keep every element and leave the buffer unchanged.
	sb.buffers = sb.buffers[:0]
	sb.timestamps.Clear()
}
func (sb *StreamBuffer) RemovePrefix(length int) {
if sb.IsEmpty() {
Expand Down
112 changes: 89 additions & 23 deletions agent/conn/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"sync"
"sync/atomic"
"time"

"github.com/cilium/ebpf"
)

// var RecordFunc func(protocol.Record, *Connection4) error
Expand Down Expand Up @@ -233,11 +235,11 @@ func (c *Connection4) extractSockKeys() (bpf.AgentSockKey, bpf.AgentSockKey) {
key.Family = uint32(common.AF_INET) // TODO @ipv6

var revKey bpf.AgentSockKey
key.Sip = common.BytesToInt[uint32](c.RemoteIp)
key.Dip = common.BytesToInt[uint32](c.LocalIp)
key.Sport = uint32(c.RemotePort)
key.Dport = uint32(c.LocalPort)
key.Family = uint32(common.AF_INET)
revKey.Sip = common.BytesToInt[uint32](c.RemoteIp)
revKey.Dip = common.BytesToInt[uint32](c.LocalIp)
revKey.Sport = uint32(c.RemotePort)
revKey.Dport = uint32(c.LocalPort)
revKey.Family = uint32(common.AF_INET)
return key, revKey
}

Expand All @@ -249,36 +251,76 @@ func (c *Connection4) OnClose(needClearBpfMap bool) {
err := connInfoMap.Delete(c.TgidFd)
if err != nil {
log.Debugf("clean conn_info_map failed: %v", err)
} else {
log.Debugf("clean conn_info_map deleted")
}
key, revKey := c.extractSockKeys()
sockKeyConnIdMap := bpf.GetMap("SockKeyConnIdMap")
err = sockKeyConnIdMap.Delete(&key)
err = sockKeyConnIdMap.Delete(key)
if err != nil {
log.Debugf("clean sock_key_conn_id_map key failed: %v", err)
} else {
log.Debugf("clean sockKeyConnIdMap deleted key")
}
err = sockKeyConnIdMap.Delete(&revKey)
err = sockKeyConnIdMap.Delete(revKey)
if err != nil {
log.Debugf("clean sock_key_conn_id_map revkey failed: %v", err)
} else {
log.Debugf("clean sockKeyConnIdMap deleted revkey")
}
sockXmitMap := bpf.GetMap("SockXmitMap")
n, err := sockXmitMap.BatchDelete([]bpf.AgentSockKey{key, revKey}, nil)
err = sockXmitMap.Delete(key)
if err == nil {
log.Debugf("clean sockXmitMap deleted: %v", n)
log.Debugf("clean sockXmitMap deleted key")
} else {
log.Debugf("clean sockXmitMap failed: %v", err)
}
err = sockXmitMap.Delete(revKey)
if err == nil {
log.Debugf("clean sockXmitMap deleted revkey")
} else {
log.Debugf("clean sockXmitMap failed: %v", err)
}
}
monitor.UnregisterMetricExporter(c.StreamEvents)
}

func (c *Connection4) OnCloseWithoutClearBpfMap() {
c.OnClose(false)
// UpdateConnectionTraceable switches the NoTrace flag kept for this
// connection in the BPF maps.
//
// It sets NoTrace to !traceable on the SockKeyConnIdMap entries of both sock
// keys returned by extractSockKeys, and on the ConnInfoMap entry keyed by
// c.TgidFd. Updates use ebpf.UpdateExist, so entries are only modified, never
// created. Failures are best-effort: they are logged at debug level and do
// not abort the remaining updates.
func (c *Connection4) UpdateConnectionTraceable(traceable bool) {
	key, revKey := c.extractSockKeys()
	sockKeyConnIdMap := bpf.GetMap("SockKeyConnIdMap")
	c.doUpdateConnIdMapProtocolToUnknwon(key, sockKeyConnIdMap, traceable)
	c.doUpdateConnIdMapProtocolToUnknwon(revKey, sockKeyConnIdMap, traceable)

	connInfoMap := bpf.GetMap("ConnInfoMap")
	connInfo := bpf.AgentConnInfoT{}
	if err := connInfoMap.Lookup(c.TgidFd, &connInfo); err != nil {
		// Lookup can fail for reasons other than a missing key, so report
		// the actual error instead of assuming the entry is absent.
		log.Debugf("try to update %s conn_info_map no_trace=%v, but lookup failed: %v", c.ToString(), !traceable, err)
		return
	}

	connInfo.NoTrace = !traceable
	if err := connInfoMap.Update(c.TgidFd, &connInfo, ebpf.UpdateExist); err != nil {
		log.Debugf("update %s conn_info_map no_trace=%v failed: %v", c.ToString(), !traceable, err)
	}
}

// doUpdateConnIdMapProtocolToUnknwon sets the NoTrace flag of the entry
// stored under key in m to !traceable.
//
// Despite its name, the only field this method modifies is NoTrace. The entry
// is read, changed and written back with ebpf.UpdateExist, so a missing entry
// is never created. Lookup and update failures are logged at debug level and
// otherwise ignored (best-effort).
func (c *Connection4) doUpdateConnIdMapProtocolToUnknwon(key bpf.AgentSockKey, m *ebpf.Map, traceable bool) {
	var connIds bpf.AgentConnIdS_t
	if err := m.Lookup(&key, &connIds); err != nil {
		// Lookup can fail for reasons other than a missing key, so include
		// the underlying error in the message.
		log.Debugf("try to update %s conn_id_map no_trace=%v, but lookup failed: %v, key: %v", c.ToString(), !traceable, err, key)
		return
	}

	connIds.NoTrace = !traceable
	if err := m.Update(&key, &connIds, ebpf.UpdateExist); err != nil {
		log.Debugf("update %s conn_id_map no_trace=%v failed: %v, key: %v", c.ToString(), !traceable, err, key)
	}
}

// func (c *Connection4) OnCloseWithoutClearBpfMap() {
// c.OnClose(false)
// }
func (c *Connection4) OnKernEvent(event *bpf.AgentKernEvt) bool {
isReq := isReq(c, event)
isReq, ok := isReq(c, event)
if event.Len > 0 {
c.StreamEvents.AddKernEvent(event)
} else {
} else if ok {
if (event.Flags&uint8(common.TCP_FLAGS_SYN) != 0) && !isReq && event.Step == bpf.AgentStepTIP_IN {
// Received the SYN packet sent by the server
if c.ServerSynReceived {
Expand All @@ -298,16 +340,15 @@ func (c *Connection4) OnKernEvent(event *bpf.AgentKernEvt) bool {
return true
}
func (c *Connection4) OnSyscallEvent(data []byte, event *bpf.SyscallEventData) {
isReq := isReq(c, &event.SyscallEvent.Ke)
isReq, _ := isReq(c, &event.SyscallEvent.Ke)
if isReq {

c.reqStreamBuffer.Add(event.SyscallEvent.Ke.Seq, data, event.SyscallEvent.Ke.Ts)
} else {
c.respStreamBuffer.Add(event.SyscallEvent.Ke.Seq, data, event.SyscallEvent.Ke.Ts)
}

c.parseStreamBuffer(c.reqStreamBuffer, protocol.Request, &c.ReqQueue)
c.parseStreamBuffer(c.respStreamBuffer, protocol.Response, &c.RespQueue)
c.parseStreamBuffer(c.reqStreamBuffer, protocol.Request, &c.ReqQueue, event.SyscallEvent.Ke.Step)
c.parseStreamBuffer(c.respStreamBuffer, protocol.Response, &c.RespQueue, event.SyscallEvent.Ke.Step)
c.StreamEvents.AddSyscallEvent(event)

parser := c.GetProtocolParser(c.Protocol)
Expand All @@ -321,7 +362,7 @@ func (c *Connection4) OnSyscallEvent(data []byte, event *bpf.SyscallEventData) {
}
}

func (c *Connection4) parseStreamBuffer(streamBuffer *buffer.StreamBuffer, messageType protocol.MessageType, resultQueue *[]protocol.ParsedMessage) {
func (c *Connection4) parseStreamBuffer(streamBuffer *buffer.StreamBuffer, messageType protocol.MessageType, resultQueue *[]protocol.ParsedMessage, step bpf.AgentStepT) {
parser := c.GetProtocolParser(c.Protocol)
if parser == nil {
streamBuffer.Clear()
Expand All @@ -341,8 +382,19 @@ func (c *Connection4) parseStreamBuffer(streamBuffer *buffer.StreamBuffer, messa
parseResult := parser.ParseStream(streamBuffer, messageType)
switch parseResult.ParseState {
case protocol.Success:
*resultQueue = append(*resultQueue, parseResult.ParsedMessages...)
streamBuffer.RemovePrefix(parseResult.ReadBytes)
if c.Role == bpf.AgentEndpointRoleTKRoleUnknown && len(parseResult.ParsedMessages) > 0 {
parsedMessage := parseResult.ParsedMessages[0]
if (step == bpf.AgentStepTSYSCALL_IN && parsedMessage.IsReq()) || (step == bpf.AgentStepTSYSCALL_OUT && !parsedMessage.IsReq()) {
c.Role = bpf.AgentEndpointRoleTKRoleServer
} else {
c.Role = bpf.AgentEndpointRoleTKRoleClient
}
log.Debugf("Update %s role", c.ToString())
c.resetParseProgress()
} else {
*resultQueue = append(*resultQueue, parseResult.ParsedMessages...)
streamBuffer.RemovePrefix(parseResult.ReadBytes)
}
case protocol.Invalid:
pos := parser.FindBoundary(streamBuffer, messageType, 1)
if pos != -1 {
Expand All @@ -363,14 +415,17 @@ func (c *Connection4) parseStreamBuffer(streamBuffer *buffer.StreamBuffer, messa

}

func isReq(conn *Connection4, event *bpf.AgentKernEvt) bool {
func isReq(conn *Connection4, event *bpf.AgentKernEvt) (bool, bool) {
if conn.Role == bpf.AgentEndpointRoleTKRoleUnknown {
return false, false
}
var isReq bool
if !conn.IsServerSide() {
isReq = event.ConnIdS.Direct == bpf.AgentTrafficDirectionTKEgress
} else {
isReq = event.ConnIdS.Direct == bpf.AgentTrafficDirectionTKIngress
}
return isReq
return isReq, true
}

func (c *Connection4) IsServerSide() bool {
Expand Down Expand Up @@ -399,8 +454,12 @@ func (c *Connection4) Identity() string {
}
func (c *Connection4) ToString() string {
direct := "=>"
if c.Role != bpf.AgentEndpointRoleTKRoleClient {
if c.Role == bpf.AgentEndpointRoleTKRoleServer {
direct = "<="
} else if c.Role == bpf.AgentEndpointRoleTKRoleClient {
direct = "=>"
} else {
direct = "<unknown>"
}
return fmt.Sprintf("[tgid=%d fd=%d][protocol=%d][%s] *%s:%d %s %s:%d", c.TgidFd>>32, uint32(c.TgidFd), c.Protocol, c.StatusString(), c.LocalIp.String(), c.LocalPort, direct, c.RemoteIp.String(), c.RemotePort)
}
Expand All @@ -422,3 +481,10 @@ func (c *Connection4) GetProtocolParser(p bpf.AgentTrafficProtocolT) protocol.Pr
return parser
}
}

// resetParseProgress discards all parsing state accumulated so far: both
// stream buffers are cleared and the queues of already parsed request and
// response messages are emptied.
func (c *Connection4) resetParseProgress() {
	c.reqStreamBuffer.Clear()
	c.respStreamBuffer.Clear()
	// Truncate to length zero; a full slice expression (`[:]`) would leave
	// every queued message in place.
	c.ReqQueue = c.ReqQueue[:0]
	c.RespQueue = c.RespQueue[:0]
}
18 changes: 14 additions & 4 deletions agent/conn/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ func (p *Processor) run() {
}
conn.StreamEvents = NewKernEventStream(conn, 300)
if p.side != common.AllSide && p.side != conn.Side() {
conn.OnClose(true)
// conn.OnClose(true)
conn.UpdateConnectionTraceable(false)
continue
}
conn.ConnectStartTs = event.Ts + common.LaunchEpochTime
Expand All @@ -145,30 +146,39 @@ func (p *Processor) run() {
}
go func(c *Connection4) {
time.Sleep(1 * time.Second)
c.OnCloseWithoutClearBpfMap()
c.OnClose(true)
}(conn)
} else if event.ConnType == bpf.AgentConnTypeTKProtocolInfer {
// Protocol inference
conn = p.connManager.FindConnection4Or(TgidFd, event.Ts+common.LaunchEpochTime)
// previousProtocol := conn.Protocol
if conn != nil && conn.Status != Closed {
conn.Protocol = event.ConnInfo.Protocol
} else {
continue
}
isProtocolInterested := conn.Protocol == bpf.AgentTrafficProtocolTKProtocolUnknown ||

if conn.Role == bpf.AgentEndpointRoleTKRoleUnknown && event.ConnInfo.Role != bpf.AgentEndpointRoleTKRoleUnknown {
conn.Role = event.ConnInfo.Role
}

isProtocolInterested := conn.Protocol == bpf.AgentTrafficProtocolTKProtocolUnset ||
conn.MessageFilter.FilterByProtocol(conn.Protocol)

if isProtocolInterested {
if conn.Protocol != bpf.AgentTrafficProtocolTKProtocolUnknown {
for _, sysEvent := range conn.TempSyscallEvents {
log.Debugf("%s process temp syscall events before infer\n", conn.ToString())
conn.OnSyscallEvent(sysEvent.Buf, sysEvent)
}
conn.UpdateConnectionTraceable(true)
}
conn.TempKernEvents = conn.TempKernEvents[0:0]
conn.TempConnEvents = conn.TempConnEvents[0:0]
} else {
log.Debugf("%s discarded due to not interested", conn.ToString())
conn.OnClose(true)
conn.UpdateConnectionTraceable(false)
// conn.OnClose(true)
}
}
eventType := "connect"
Expand Down
21 changes: 21 additions & 0 deletions agent/protocol/generic_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,27 @@ func (NoopFilter) Filter(ParsedMessage, ParsedMessage) bool {
return true
}

// Compile-time check that BaseFilter implements ProtocolFilter.
var _ ProtocolFilter = BaseFilter{}

// BaseFilter is a stateless ProtocolFilter that accepts every message pair
// and every protocol except the unknown one.
type BaseFilter struct{}

// FilterByProtocol reports whether protocol p passes the filter: every
// protocol does, except bpf.AgentTrafficProtocolTKProtocolUnknown.
func (BaseFilter) FilterByProtocol(p bpf.AgentTrafficProtocolT) bool {
	if p == bpf.AgentTrafficProtocolTKProtocolUnknown {
		return false
	}
	return true
}

// FilterByRequest always returns false.
// NOTE(review): presumably this means no request-level filtering is
// configured; confirm against the ProtocolFilter contract.
func (BaseFilter) FilterByRequest() bool { return false }

// FilterByResponse always returns false.
// NOTE(review): presumably this means no response-level filtering is
// configured; confirm against the ProtocolFilter contract.
func (BaseFilter) FilterByResponse() bool { return false }

// Filter returns true for every pair of parsed messages.
func (BaseFilter) Filter(ParsedMessage, ParsedMessage) bool { return true }

func IsNoopFilter(filter ProtocolFilter) bool {
_, ok := filter.(NoopFilter)
return ok
Expand Down
1 change: 1 addition & 0 deletions agent/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type MessageType int
const (
Request MessageType = iota
Response
Unknown
)

const (
Expand Down
Loading
Loading