diff --git a/agent/agent.go b/agent/agent.go index 0ee30191..ce41954c 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -698,13 +698,13 @@ func getBestMatchedBTFFile() ([]uint8, error) { common.AgentLog.Debugf("[sys info] vendor: %s, os_arch: %s, kernel_arch: %s", si.OS.Vendor, si.OS.Architecture, si.Kernel.Architecture) if si.OS.Vendor != "ubuntu" && si.OS.Vendor != "centos" { - panic("Current only support centos and ubuntu") + common.AgentLog.Fatal("Current only support centos and ubuntu") } if si.OS.Architecture != "amd64" { - panic("Current only support amd64") + common.AgentLog.Fatal("Current only support amd64") } if si.Kernel.Architecture != "x86_64" { - panic("Current only support x86_64") + common.AgentLog.Fatal("Current only support x86_64") } var btfFileDir string diff --git a/agent/agent_test.go b/agent/agent_test.go index 9ad79f5b..9bb0dac9 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -160,6 +160,100 @@ func TestAccept(t *testing.T) { }) } +func TestExistedConn(t *testing.T) { + StartEchoTcpServerAndWait() + ip := "127.0.0.1" + sendMsg := "GET TestRead\n" + connection := WriteToEchoTcpServerAndReadResponse(WriteToEchoServerOptions{ + t: t, + server: ip + ":" + fmt.Sprint(echoTcpServerPort), + message: sendMsg, + readResponse: true, + writeSyscall: Write, + readSyscall: Read, + keepConnection: true, + }) + // established conn and write data before start kyanos + + connEventList := make([]bpf.AgentConnEvtT, 0) + syscallEventList := make([]bpf.SyscallEventData, 0) + kernEventList := make([]bpf.AgentKernEvt, 0) + var connManager *conn.ConnManager = conn.InitConnManager() + agentStopper := make(chan os.Signal) + StartAgent( + nil, + &connEventList, + &syscallEventList, + &kernEventList, + func(cm *conn.ConnManager) { + *connManager = *cm + }, agentStopper) + defer func() { + agentStopper <- MySignal{} + }() + // then write data + WriteToEchoTcpServerAndReadResponse(WriteToEchoServerOptions{ + t: t, + server: ip + ":" + fmt.Sprint(echoTcpServerPort), + message: sendMsg, + readResponse: true, + writeSyscall: Write, + readSyscall: Read, + keepConnection: true, + existedConnection: connection, + }) + WriteToEchoTcpServerAndReadResponse(WriteToEchoServerOptions{ + t: t, + server: ip + ":" + fmt.Sprint(echoTcpServerPort), + message: sendMsg, + readResponse: true, + writeSyscall: Write, + readSyscall: Read, + keepConnection: true, + existedConnection: connection, + }) + time.Sleep(1500 * time.Millisecond) + + assert.True(t, len(syscallEventList) > 0) + assert.True(t, len(connEventList) > 0) + assert.True(t, len(kernEventList) > 0) + assert.True(t, len(findInterestedKernEvents(t, kernEventList, FindInterestedKernEventOptions{ + connEventList: connEventList, + findByStep: true, + step: bpf.AgentStepTIP_OUT, + })) > 0) + assert.True(t, len(findInterestedKernEvents(t, kernEventList, FindInterestedKernEventOptions{ + connEventList: connEventList, + findByStep: true, + step: bpf.AgentStepTQDISC_OUT, + })) > 0) + assert.True(t, len(findInterestedKernEvents(t, kernEventList, FindInterestedKernEventOptions{ + connEventList: connEventList, + findByStep: true, + step: bpf.AgentStepTDEV_OUT, + })) > 0) + assert.True(t, len(findInterestedKernEvents(t, kernEventList, FindInterestedKernEventOptions{ + connEventList: connEventList, + findByStep: true, + step: bpf.AgentStepTDEV_IN, + })) > 0) + assert.True(t, len(findInterestedKernEvents(t, kernEventList, FindInterestedKernEventOptions{ + connEventList: connEventList, + findByStep: true, + step: bpf.AgentStepTIP_IN, + })) > 0) + assert.True(t, len(findInterestedKernEvents(t, kernEventList, FindInterestedKernEventOptions{ + connEventList: connEventList, + findByStep: true, + step: bpf.AgentStepTTCP_IN, + })) > 0) + assert.True(t, len(findInterestedKernEvents(t, kernEventList, FindInterestedKernEventOptions{ + connEventList: connEventList, + findByStep: true, + step: bpf.AgentStepTUSER_COPY, + })) > 0) +} + func TestRead(t *testing.T) { StartEchoTcpServerAndWait() connEventList := make([]bpf.AgentConnEvtT, 0) @@ -667,98 +761,20 @@ func TestSendMsg(t *testing.T) { } } -func KernRcvTestWithHTTP(t *testing.T, progs []bpf.AttachBpfProgFunction, kernEvtFilter FindInterestedKernEventOptions, kernEvtAsserts KernDataEventAssertConditions) { - connEventList := make([]bpf.AgentConnEvtT, 0) - syscallEventList := make([]bpf.SyscallEventData, 0) - kernEventList := make([]bpf.AgentKernEvt, 0) - var connManager *conn.ConnManager = conn.InitConnManager() - agentStopper := make(chan os.Signal, 1) - StartAgent( - progs, - &connEventList, - &syscallEventList, - &kernEventList, - func(cm *conn.ConnManager) { - *connManager = *cm - }, agentStopper) - - defer func() { - agentStopper <- MySignal{} - time.Sleep(1 * time.Second) - }() - sendTestRequest(t, SendTestHttpRequestOptions{ - targetUrl: "http://www.baidu.com/abc", - disableKeepAlived: true, - }) - - kernEvtFilter.connEventList = connEventList - time.Sleep(500 * time.Millisecond) - intersetedKernEvents := findInterestedKernEvents(t, kernEventList, kernEvtFilter) - assert.Equal(t, 1, len(intersetedKernEvents)) - kernEvent := intersetedKernEvents[0] - conn := connManager.FindConnection4Exactly(kernEvent.ConnIdS.TgidFd) - if !kernEvtAsserts.ignoreFd && kernEvtAsserts.fd == 0 { - kernEvtAsserts.fd = uint32(conn.TgidFd) - } - AssertKernEvent(t, &kernEvent, kernEvtAsserts) -} - -func KernTestWithTcpEchoServer(t *testing.T, progs []bpf.AttachBpfProgFunction, testMessage string, writeSyscall WriteSyscallType, - readSyscall ReadSyscallType, kernEvtFilter FindInterestedKernEventOptions, kernEvtAsserts KernDataEventAssertConditions) { - StartEchoTcpServerAndWait() - connEventList := make([]bpf.AgentConnEvtT, 0) - syscallEventList := make([]bpf.SyscallEventData, 0) - kernEventList := make([]bpf.AgentKernEvt, 0) - var connManager *conn.ConnManager = conn.InitConnManager() - agentStopper := make(chan os.Signal, 1) - StartAgent( - progs, - &connEventList, - &syscallEventList, - &kernEventList, - func(cm *conn.ConnManager) { - *connManager = *cm - }, agentStopper) - - defer func() { - agentStopper <- MySignal{} - }() - ip := "127.0.0.1" - sendMsg := testMessage - WriteToEchoTcpServerAndReadResponse(WriteToEchoServerOptions{ - t: t, - server: ip + ":" + fmt.Sprint(echoTcpServerPort), - message: sendMsg, - readResponse: true, - writeSyscall: writeSyscall, - readSyscall: readSyscall, - }) - time.Sleep(500 * time.Millisecond) - kernEvtFilter.connEventList = connEventList - intersetedKernEvents := findInterestedKernEvents(t, kernEventList, kernEvtFilter) - assert.Equal(t, 1, len(intersetedKernEvents)) - kernEvent := intersetedKernEvents[0] - conn := connManager.FindConnection4Exactly(kernEvent.ConnIdS.TgidFd) - if !kernEvtAsserts.ignoreFd && kernEvtAsserts.fd == 0 { - kernEvtAsserts.fd = uint32(conn.TgidFd) - } - - if !kernEvtAsserts.ignoreDataLen && kernEvtAsserts.dataLen == 0 { - kernEvtAsserts.dataLen = uint32(len(sendMsg)) - } - AssertKernEvent(t, &kernEvent, kernEvtAsserts) -} func TestIpXmit(t *testing.T) { - KernTestWithTcpEchoServer(t, []bpf.AttachBpfProgFunction{ - bpf.AttachSyscallConnectEntry, - bpf.AttachSyscallConnectExit, - bpf.AttachSyscallWriteEntry, - bpf.AttachSyscallWriteExit, - bpf.AttachKProbeSecuritySocketSendmsgEntry, - func(p interface{}) link.Link { - return ApplyKernelVersionFunctions(t, bpf.AgentStepTIP_OUT, p) + options := KernTestWithTcpEchoServerOptions{ + t, + []bpf.AttachBpfProgFunction{ + bpf.AttachSyscallConnectEntry, + bpf.AttachSyscallConnectExit, + bpf.AttachSyscallWriteEntry, + bpf.AttachSyscallWriteExit, + bpf.AttachKProbeSecuritySocketSendmsgEntry, + func(p interface{}) link.Link { + return ApplyKernelVersionFunctions(t, bpf.AgentStepTIP_OUT, p) + }, }, - }, "GET TestIpXmit\n", Write, Read, + "GET TestIpXmit\n", Write, Read, FindInterestedKernEventOptions{ findDataLenGtZeroEvent: true, findByDirect: true, @@ -772,11 +788,13 @@ func TestIpXmit(t *testing.T) { seq: 1, step: bpf.AgentStepTIP_OUT, tsAssertFunction: func(u uint64) bool { return u > 0 }, - }) + }, + } + KernTestWithTcpEchoServer(options) } func TestDevQueueXmit(t *testing.T) { - KernTestWithTcpEchoServer(t, []bpf.AttachBpfProgFunction{ + options := KernTestWithTcpEchoServerOptions{t, []bpf.AttachBpfProgFunction{ bpf.AttachSyscallConnectEntry, bpf.AttachSyscallConnectExit, bpf.AttachSyscallWriteEntry, @@ -804,23 +822,26 @@ func TestDevQueueXmit(t *testing.T) { seq: 1, step: bpf.AgentStepTQDISC_OUT, tsAssertFunction: func(u uint64) bool { return u > 0 }, - }) + }, + } + KernTestWithTcpEchoServer(options) } func TestDevHardStartXmit(t *testing.T) { - KernTestWithTcpEchoServer(t, []bpf.AttachBpfProgFunction{ - bpf.AttachSyscallConnectEntry, - bpf.AttachSyscallConnectExit, - bpf.AttachSyscallWriteEntry, - bpf.AttachSyscallWriteExit, - bpf.AttachKProbeSecuritySocketSendmsgEntry, - func(p interface{}) link.Link { - return ApplyKernelVersionFunctions(t, bpf.AgentStepTIP_OUT, p) - }, - func(p interface{}) link.Link { - return ApplyKernelVersionFunctions(t, bpf.AgentStepTDEV_OUT, p) - }, - }, "GET DevHardStartXmit\n", Write, Read, + options := KernTestWithTcpEchoServerOptions{ + t, []bpf.AttachBpfProgFunction{ + bpf.AttachSyscallConnectEntry, + bpf.AttachSyscallConnectExit, + bpf.AttachSyscallWriteEntry, + bpf.AttachSyscallWriteExit, + bpf.AttachKProbeSecuritySocketSendmsgEntry, + func(p interface{}) link.Link { + return ApplyKernelVersionFunctions(t, bpf.AgentStepTIP_OUT, p) + }, + func(p interface{}) link.Link { + return ApplyKernelVersionFunctions(t, bpf.AgentStepTDEV_OUT, p) + }, + }, "GET DevHardStartXmit\n", Write, Read, FindInterestedKernEventOptions{ findDataLenGtZeroEvent: true, findByDirect: true, @@ -836,7 +857,9 @@ func TestDevHardStartXmit(t *testing.T) { seq: 1, step: bpf.AgentStepTDEV_OUT, tsAssertFunction: func(u uint64) bool { return u > 0 }, - }) + }, + } + KernTestWithTcpEchoServer(options) } func TestTracepointNetifReceiveSkb(t *testing.T) { diff --git a/agent/agent_utils_test.go b/agent/agent_utils_test.go index 93724a89..b4643a92 100644 --- a/agent/agent_utils_test.go +++ b/agent/agent_utils_test.go @@ -22,6 +22,7 @@ import ( "sync" "syscall" "testing" + "time" "github.com/cilium/ebpf/link" "github.com/jefurry/logrus" @@ -37,22 +38,27 @@ func StartAgent(bpfAttachFunctions []bpf.AttachBpfProgFunction, agentStopper chan os.Signal) { wg := sync.WaitGroup{} wg.Add(1) + + var loadBpfProgramFunction agent.LoadBpfProgramFunction = nil + if bpfAttachFunctions != nil { + loadBpfProgramFunction = func(objs interface{}) *list.List { + progs := list.New() + for _, each := range bpfAttachFunctions { + if each != nil { + progs.PushBack(each(objs)) + } + } + return progs + } + } go func(pid int) { cmd.FilterPid = int64(pid) cmd.DefaultLogLevel = int32(logrus.DebugLevel) cmd.Debug = true cmd.InitLog() agent.SetupAgent(agent.AgentOptions{ - Stopper: agentStopper, - LoadBpfProgramFunction: func(objs interface{}) *list.List { - progs := list.New() - for _, each := range bpfAttachFunctions { - if each != nil { - progs.PushBack(each(objs)) - } - } - return progs - }, + Stopper: agentStopper, + LoadBpfProgramFunction: loadBpfProgramFunction, CustomSyscallEventHook: func(evt *bpf.SyscallEventData) { if syscallEventList != nil { *syscallEventList = append(*syscallEventList, *evt) @@ -451,11 +457,22 @@ type WriteToEchoServerOptions struct { readSyscall ReadSyscallType readBufSizeSlice []int useNonBlockingSoscket bool + keepConnection bool + existedConnection net.Conn } -func WriteToEchoTcpServerAndReadResponse(options WriteToEchoServerOptions) { - connection, fd, _ := getConnectionAndFdToRemoteServer(options.server) - defer connection.Close() +func WriteToEchoTcpServerAndReadResponse(options WriteToEchoServerOptions) net.Conn { + var connection net.Conn + var fd int64 + + if options.existedConnection != nil { + fd = getFdFromConn(options.existedConnection) + } else { + connection, fd, _ = getConnectionAndFdToRemoteServer(options.server) + } + if !options.keepConnection { + defer connection.Close() + } if options.useNonBlockingSoscket { syscall.SetNonblock(int(fd), true) } else { @@ -533,6 +550,11 @@ func WriteToEchoTcpServerAndReadResponse(options WriteToEchoServerOptions) { } fmt.Printf("Read from conn: %s\n", string(readBytes)) } + if options.keepConnection { + return connection + } else { + return nil + } } func getConnectionAndFdToRemoteServer(server string) (net.Conn, int64, error) { connection, err := net.Dial("tcp", server) @@ -540,10 +562,14 @@ func getConnectionAndFdToRemoteServer(server string) (net.Conn, int64, error) { fmt.Fprintf(os.Stderr, "无法连接到服务器 %s: %v\n", server, err) return nil, 0, err } + return connection, getFdFromConn(connection), nil +} + +func getFdFromConn(connection net.Conn) int64 { tcpConn := connection.(*net.TCPConn) tcpConnV := reflect.ValueOf(*tcpConn) fd := tcpConnV.FieldByName("conn").FieldByName("fd").Elem().FieldByName("pfd").FieldByName("Sysfd").Int() - return connection, fd, nil + return fd } var echoTcpServerPort int = 10266 @@ -682,3 +708,96 @@ func ApplyKernelVersionFunctions(t *testing.T, step bpf.AgentStepT, programs any t.FailNow() return nil } + +func KernRcvTestWithHTTP(t *testing.T, progs []bpf.AttachBpfProgFunction, kernEvtFilter FindInterestedKernEventOptions, kernEvtAsserts KernDataEventAssertConditions) { + connEventList := make([]bpf.AgentConnEvtT, 0) + syscallEventList := make([]bpf.SyscallEventData, 0) + kernEventList := make([]bpf.AgentKernEvt, 0) + var connManager *conn.ConnManager = conn.InitConnManager() + agentStopper := make(chan os.Signal, 1) + StartAgent( + progs, + &connEventList, + &syscallEventList, + &kernEventList, + func(cm *conn.ConnManager) { + *connManager = *cm + }, agentStopper) + + defer func() { + agentStopper <- MySignal{} + time.Sleep(1 * time.Second) + }() + sendTestRequest(t, SendTestHttpRequestOptions{ + targetUrl: "http://www.baidu.com/abc", + disableKeepAlived: true, + }) + + kernEvtFilter.connEventList = connEventList + time.Sleep(500 * time.Millisecond) + intersetedKernEvents := findInterestedKernEvents(t, kernEventList, kernEvtFilter) + assert.Equal(t, 1, len(intersetedKernEvents)) + kernEvent := intersetedKernEvents[0] + conn := connManager.FindConnection4Exactly(kernEvent.ConnIdS.TgidFd) + if !kernEvtAsserts.ignoreFd && kernEvtAsserts.fd == 0 { + kernEvtAsserts.fd = uint32(conn.TgidFd) + } + AssertKernEvent(t, &kernEvent, kernEvtAsserts) +} + +type KernTestWithTcpEchoServerOptions struct { + t *testing.T + progs []bpf.AttachBpfProgFunction + testMessage string + writeSyscall WriteSyscallType + readSyscall ReadSyscallType + kernEvtFilter FindInterestedKernEventOptions + kernEvtAsserts KernDataEventAssertConditions + // start bool +} + +func KernTestWithTcpEchoServer(options KernTestWithTcpEchoServerOptions) { + t := options.t + StartEchoTcpServerAndWait() + connEventList := make([]bpf.AgentConnEvtT, 0) + syscallEventList := make([]bpf.SyscallEventData, 0) + kernEventList := make([]bpf.AgentKernEvt, 0) + var connManager *conn.ConnManager = conn.InitConnManager() + agentStopper := make(chan os.Signal, 1) + StartAgent( + options.progs, + &connEventList, + &syscallEventList, + &kernEventList, + func(cm *conn.ConnManager) { + *connManager = *cm + }, agentStopper) + + defer func() { + agentStopper <- MySignal{} + }() + ip := "127.0.0.1" + sendMsg := options.testMessage + WriteToEchoTcpServerAndReadResponse(WriteToEchoServerOptions{ + t: options.t, + server: ip + ":" + fmt.Sprint(echoTcpServerPort), + message: sendMsg, + readResponse: true, + writeSyscall: options.writeSyscall, + readSyscall: options.readSyscall, + }) + time.Sleep(500 * time.Millisecond) + options.kernEvtFilter.connEventList = connEventList + intersetedKernEvents := findInterestedKernEvents(t, kernEventList, options.kernEvtFilter) + assert.Equal(t, 1, len(intersetedKernEvents)) + kernEvent := intersetedKernEvents[0] + conn := connManager.FindConnection4Exactly(kernEvent.ConnIdS.TgidFd) + if !options.kernEvtAsserts.ignoreFd && options.kernEvtAsserts.fd == 0 { + options.kernEvtAsserts.fd = uint32(conn.TgidFd) + } + + if !options.kernEvtAsserts.ignoreDataLen && options.kernEvtAsserts.dataLen == 0 { + options.kernEvtAsserts.dataLen = uint32(len(sendMsg)) + } + AssertKernEvent(t, &kernEvent, options.kernEvtAsserts) +}