Skip to content

Commit

Permalink
Merge pull request #98 from hengyoush/feature/gotls
Browse files Browse the repository at this point in the history
feat. support gotls
hengyoush authored Nov 9, 2024

Unverified

The committer email address is not verified.
2 parents d1be6b4 + b256242 commit 52b267c
Showing 408 changed files with 4,401 additions and 709 deletions.
53 changes: 41 additions & 12 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -156,8 +156,8 @@ jobs:
sudo apt install -y python3 python3-pip pipx

- name: Test side
- name: Test gotls
uses: cilium/little-vm-helper@97c89f004bd0ab4caeacfe92ebc956e13e362e6b # v0.0.19
with:
provision: 'false'
@@ -167,13 +167,13 @@ jobs:
cat /etc/issue
pushd /host
if [ -f "/var/lib/kyanos/btf/current.btf" ]; then
bash /host/testdata/test_side.sh '/host/kyanos/kyanos $kyanos_log_option --btf /var/lib/kyanos/btf/current.btf'
bash /host/testdata/test_gotls.sh '/host/kyanos/kyanos $kyanos_log_option --btf /var/lib/kyanos/btf/current.btf'
else
bash /host/testdata/test_side.sh '/host/kyanos/kyanos $kyanos_log_option'
bash /host/testdata/test_gotls.sh '/host/kyanos/kyanos $kyanos_log_option'
fi
popd
- name: Test mysql
- name: Test https
uses: cilium/little-vm-helper@97c89f004bd0ab4caeacfe92ebc956e13e362e6b # v0.0.19
with:
provision: 'false'
@@ -183,13 +183,13 @@ jobs:
cat /etc/issue
pushd /host
if [ -f "/var/lib/kyanos/btf/current.btf" ]; then
bash /host/testdata/test_mysql.sh '/host/kyanos/kyanos $kyanos_log_option --btf /var/lib/kyanos/btf/current.btf'
bash /host/testdata/test_https.sh '/host/kyanos/kyanos $kyanos_log_option --btf /var/lib/kyanos/btf/current.btf'
else
bash /host/testdata/test_mysql.sh '/host/kyanos/kyanos $kyanos_log_option'
bash /host/testdata/test_https.sh '/host/kyanos/kyanos $kyanos_log_option'
fi
popd
- name: Test https
- name: Test side
uses: cilium/little-vm-helper@97c89f004bd0ab4caeacfe92ebc956e13e362e6b # v0.0.19
with:
provision: 'false'
@@ -199,9 +199,25 @@ jobs:
cat /etc/issue
pushd /host
if [ -f "/var/lib/kyanos/btf/current.btf" ]; then
bash /host/testdata/test_https.sh '/host/kyanos/kyanos $kyanos_log_option --btf /var/lib/kyanos/btf/current.btf'
bash /host/testdata/test_side.sh '/host/kyanos/kyanos $kyanos_log_option --btf /var/lib/kyanos/btf/current.btf'
else
bash /host/testdata/test_https.sh '/host/kyanos/kyanos $kyanos_log_option'
bash /host/testdata/test_side.sh '/host/kyanos/kyanos $kyanos_log_option'
fi
popd
- name: Test mysql
uses: cilium/little-vm-helper@97c89f004bd0ab4caeacfe92ebc956e13e362e6b # v0.0.19
with:
provision: 'false'
cmd: |
set -ex
uname -a
cat /etc/issue
pushd /host
if [ -f "/var/lib/kyanos/btf/current.btf" ]; then
bash /host/testdata/test_mysql.sh '/host/kyanos/kyanos $kyanos_log_option --btf /var/lib/kyanos/btf/current.btf'
else
bash /host/testdata/test_mysql.sh '/host/kyanos/kyanos $kyanos_log_option'
fi
popd
@@ -331,3 +347,16 @@ jobs:
else
bash /host/testdata/test_redis.sh '/host/kyanos/kyanos $kyanos_log_option'
fi
- name: Test k8s
if: ${{ startsWith(matrix.kernel, '6.') }}
uses: cilium/little-vm-helper@97c89f004bd0ab4caeacfe92ebc956e13e362e6b # v0.0.19
with:
provision: 'false'
cmd: |
set -ex
uname -a
cat /etc/issue
pushd /host
bash /host/testdata/run_k8s_test.sh "" 1
popd
245 changes: 83 additions & 162 deletions README.md

Large diffs are not rendered by default.

253 changes: 83 additions & 170 deletions README_CN.md

Large diffs are not rendered by default.

80 changes: 47 additions & 33 deletions agent/agent.go
Original file line number Diff line number Diff line change
@@ -8,14 +8,17 @@ import (
"kyanos/agent/compatible"
"kyanos/agent/conn"
"kyanos/agent/protocol"
loader_render "kyanos/agent/render/loader"
"kyanos/agent/render/stat"
"kyanos/agent/render/watch"
"kyanos/bpf"
"kyanos/bpf/loader"
"kyanos/common"
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/cilium/ebpf/rlimit"
)
@@ -56,36 +59,52 @@ func SetupAgent(options ac.AgentOptions) {
common.AgentLog.Warn("Remove memlock:", err)
}

kernelVersion := compatible.GetCurrentKernelVersion()
options.Kv = &kernelVersion
var err error
bf, err := loader.LoadBPF(options)
if err != nil {
if bf != nil {
bf.Close()
wg := new(sync.WaitGroup)
wg.Add(1)

var _bf loader.BPF
go func(_bf *loader.BPF) {
options.LoadPorgressChannel <- "🍩 Kyanos starting..."
kernelVersion := compatible.GetCurrentKernelVersion()
options.Kv = &kernelVersion
var err error
bf, err := loader.LoadBPF(options)
if err != nil {
if bf != nil {
bf.Close()
}
return
}
return
}
defer bf.Close()
err = bpf.PullSyscallDataEvents(ctx, pm.GetSyscallEventsChannels(), 2048, options.CustomSyscallEventHook)
if err != nil {
return
}
err = bpf.PullSslDataEvents(ctx, pm.GetSslEventsChannels(), 512, options.CustomSslEventHook)
if err != nil {
return
}
err = bpf.PullConnDataEvents(ctx, pm.GetConnEventsChannels(), 4, options.CustomConnEventHook)
if err != nil {
return
}
err = bpf.PullKernEvents(ctx, pm.GetKernEventsChannels(), 32, options.CustomKernEventHook)
if err != nil {
return
*_bf = *bf
err = bpf.PullSyscallDataEvents(ctx, pm.GetSyscallEventsChannels(), 2048, options.CustomSyscallEventHook)
if err != nil {
return
}
err = bpf.PullSslDataEvents(ctx, pm.GetSslEventsChannels(), 512, options.CustomSslEventHook)
if err != nil {
return
}
err = bpf.PullConnDataEvents(ctx, pm.GetConnEventsChannels(), 4, options.CustomConnEventHook)
if err != nil {
return
}
err = bpf.PullKernEvents(ctx, pm.GetKernEventsChannels(), 32, options.CustomKernEventHook)
if err != nil {
return
}
bf.AttachProgs(options)
options.LoadPorgressChannel <- "🍹 All programs attached"
options.LoadPorgressChannel <- "🍭 Waiting for events.."
time.Sleep(500 * time.Millisecond)
options.LoadPorgressChannel <- "quit"
}(&_bf)
defer func() {
_bf.Close()
}()
if !options.WatchOptions.DebugOutput {
loader_render.Start(ctx, options)
}

bf.AttachProgs(options)

stop := false
go func() {
<-stopper
@@ -95,7 +114,7 @@ func SetupAgent(options ac.AgentOptions) {
stop = true
}()

common.AgentLog.Info("Waiting for events..")
// common.AgentLog.Info("Waiting for events..")

if options.InitCompletedHook != nil {
options.InitCompletedHook()
@@ -107,11 +126,6 @@ func SetupAgent(options ac.AgentOptions) {
analyzer := analysis.CreateAnalyzer(recordsChannel, &options.AnalysisOptions, resultChannel, renderStopper, options.Ctx)
go analyzer.Run()
stat.StartStatRender(ctx, resultChannel, options.AnalysisOptions)
// render := render.CreateRender(resultChannel, renderStopper, analyzer.AnalysisOptions)
// go render.Run()
// for !stop {
// time.Sleep(time.Second * 1)
// }
} else {
watch.RunWatchRender(ctx, recordsChannel, options.WatchOptions)
}
12 changes: 6 additions & 6 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
@@ -634,20 +634,20 @@ func TestSslEventsCanRelatedToKernEvents(t *testing.T) {
sslEvent := sslEvents[0]
conn := connManager.FindConnection4Exactly(sslEvent.SslEventHeader.Ke.ConnIdS.TgidFd)
se := conn.StreamEvents
sslOutEvents := se.FindAndRemoveSslEventsBySeqAndLen(bpf.AgentStepTSSL_OUT, 1, 1000)
sslOutEvents := se.FindSslEventsBySeqAndLen(bpf.AgentStepTSSL_OUT, 1, 1000)
assert.True(t, len(sslOutEvents) > 0)
sslInEvents := se.FindAndRemoveSslEventsBySeqAndLen(bpf.AgentStepTSSL_IN, 1, 10000)
sslInEvents := se.FindSslEventsBySeqAndLen(bpf.AgentStepTSSL_IN, 1, 10000)
assert.True(t, len(sslInEvents) > 0)

kernSeq := sslInEvents[0].KernSeq
kernLen := sslInEvents[0].KernLen
devinEvents := se.FindAndRemoveEventsBySeqAndLen(bpf.AgentStepTDEV_IN, kernSeq, kernLen)
devinEvents := se.FindEventsBySeqAndLen(bpf.AgentStepTDEV_IN, kernSeq, kernLen)
assert.True(t, len(devinEvents) > 0)
ipinEvents := se.FindAndRemoveEventsBySeqAndLen(bpf.AgentStepTIP_IN, kernSeq, kernLen)
ipinEvents := se.FindEventsBySeqAndLen(bpf.AgentStepTIP_IN, kernSeq, kernLen)
assert.True(t, len(ipinEvents) > 0)
usercopyEvents := se.FindAndRemoveEventsBySeqAndLen(bpf.AgentStepTUSER_COPY, kernSeq, kernLen)
usercopyEvents := se.FindEventsBySeqAndLen(bpf.AgentStepTUSER_COPY, kernSeq, kernLen)
assert.True(t, len(usercopyEvents) > 0)
syscallInEvents := se.FindAndRemoveEventsBySeqAndLen(bpf.AgentStepTSYSCALL_IN, kernSeq, kernLen)
syscallInEvents := se.FindEventsBySeqAndLen(bpf.AgentStepTSYSCALL_IN, kernSeq, kernLen)
assert.True(t, len(syscallInEvents) > 0)
}

28 changes: 17 additions & 11 deletions agent/analysis/stat.go
Original file line number Diff line number Diff line change
@@ -100,8 +100,8 @@ func prepareEvents(r protocol.Record, connection *conn.Connection4) *events {
if ssl {
ingressSeq = ingressMessage.Seq()
egressSeq = egressMessage.Seq()
sslWriteSyscallEvents = streamEvents.FindAndRemoveSslEventsBySeqAndLen(bpf.AgentStepTSSL_OUT, egressMessage.Seq(), egressMessage.ByteSize())
sslReadSyscallEvents = streamEvents.FindAndRemoveSslEventsBySeqAndLen(bpf.AgentStepTSSL_IN, ingressMessage.Seq(), ingressMessage.ByteSize())
sslWriteSyscallEvents = streamEvents.FindSslEventsBySeqAndLen(bpf.AgentStepTSSL_OUT, egressMessage.Seq(), egressMessage.ByteSize())
sslReadSyscallEvents = streamEvents.FindSslEventsBySeqAndLen(bpf.AgentStepTSSL_IN, ingressMessage.Seq(), ingressMessage.ByteSize())

egressKernSeq, egressKernLen = getKernSeqAndLen(sslWriteSyscallEvents)
ingressKernSeq, ingressKernLen = getKernSeqAndLen(sslReadSyscallEvents)
@@ -114,16 +114,16 @@ func prepareEvents(r protocol.Record, connection *conn.Connection4) *events {
egressKernSeq = egressSeq
egressKernLen = egressMessage.ByteSize()
}
writeSyscallEvents = streamEvents.FindAndRemoveEventsBySeqAndLen(bpf.AgentStepTSYSCALL_OUT, egressKernSeq, egressKernLen)
readSyscallEvents = streamEvents.FindAndRemoveEventsBySeqAndLen(bpf.AgentStepTSYSCALL_IN, ingressKernSeq, ingressKernLen)
writeSyscallEvents = streamEvents.FindEventsBySeqAndLen(bpf.AgentStepTSYSCALL_OUT, egressKernSeq, egressKernLen)
readSyscallEvents = streamEvents.FindEventsBySeqAndLen(bpf.AgentStepTSYSCALL_IN, ingressKernSeq, ingressKernLen)

devOutEvents = streamEvents.FindAndRemoveEventsBySeqAndLen(bpf.AgentStepTDEV_OUT, egressKernSeq, egressKernLen)
nicIngressEvents = streamEvents.FindAndRemoveEventsBySeqAndLen(bpf.AgentStepTNIC_IN, ingressKernSeq, ingressKernLen)
userCopyEvents = streamEvents.FindAndRemoveEventsBySeqAndLen(bpf.AgentStepTUSER_COPY, ingressKernSeq, ingressKernLen)
tcpInEvents = streamEvents.FindAndRemoveEventsBySeqAndLen(bpf.AgentStepTTCP_IN, ingressKernSeq, ingressKernLen)
devOutEvents = streamEvents.FindEventsBySeqAndLen(bpf.AgentStepTDEV_OUT, egressKernSeq, egressKernLen)
nicIngressEvents = streamEvents.FindEventsBySeqAndLen(bpf.AgentStepTNIC_IN, ingressKernSeq, ingressKernLen)
userCopyEvents = streamEvents.FindEventsBySeqAndLen(bpf.AgentStepTUSER_COPY, ingressKernSeq, ingressKernLen)
tcpInEvents = streamEvents.FindEventsBySeqAndLen(bpf.AgentStepTTCP_IN, ingressKernSeq, ingressKernLen)

if len(nicIngressEvents) == 0 {
nicIngressEvents = streamEvents.FindAndRemoveEventsBySeqAndLen(bpf.AgentStepTDEV_IN, ingressKernSeq, ingressKernLen)
nicIngressEvents = streamEvents.FindEventsBySeqAndLen(bpf.AgentStepTDEV_IN, ingressKernSeq, ingressKernLen)
}
events.sslReadSyscallEvents = sslReadSyscallEvents
events.sslWriteSyscallEvents = sslWriteSyscallEvents
@@ -248,8 +248,14 @@ func (s *StatRecorder) ReceiveRecord(r protocol.Record, connection *conn.Connect
annotatedRecord.ReqNicEventDetails = KernEventsToNicEventDetails(events.devOutEvents)
annotatedRecord.RespNicEventDetails = KernEventsToNicEventDetails(events.nicIngressEvents)
}
streamEvents.DiscardEventsBySeq(events.egressKernSeq+uint64(events.egressKernLen), true)
streamEvents.DiscardEventsBySeq(events.ingressKernSeq+uint64(events.ingressKernLen), false)
streamEvents.MarkNeedDiscardSeq(events.egressKernSeq+uint64(events.egressKernLen), true)
streamEvents.MarkNeedDiscardSeq(events.ingressKernSeq+uint64(events.ingressKernLen), false)
if connection.IsSsl() {
streamEvents.MarkNeedDiscardSslSeq(events.egressSeq+uint64(events.egressMessage.ByteSize()), true)
streamEvents.MarkNeedDiscardSslSeq(events.ingressSeq+uint64(events.ingressMessage.ByteSize()), false)
}
// streamEvents.DiscardEventsBySeq(events.egressKernSeq+uint64(events.egressKernLen), true)
// streamEvents.DiscardEventsBySeq(events.ingressKernSeq+uint64(events.ingressKernLen), false)
if recordsChannel == nil {
outputLog.Infoln(annotatedRecord.String(analysisCommon.AnnotatedRecordToStringOptions{
Nano: false,
11 changes: 7 additions & 4 deletions agent/common/options.go
Original file line number Diff line number Diff line change
@@ -47,6 +47,7 @@ type AgentOptions struct {
PerfEventBufferSizeForEvent int
DisableOpensslUprobe bool
WatchOptions watch.WatchOptions
PerformanceMode bool

DockerEndpoint string
ContainerdEndpoint string
@@ -56,10 +57,11 @@ type AgentOptions struct {
PodName string
PodNameSpace string

Cc *metadata.ContainerCache
Objs any
Ctx context.Context
Kv *compatible.KernelVersion
Cc *metadata.ContainerCache
Objs any
Ctx context.Context
Kv *compatible.KernelVersion
LoadPorgressChannel chan string
}

func (o AgentOptions) FilterByContainer() bool {
@@ -118,5 +120,6 @@ func ValidateAndRepairOptions(options AgentOptions) AgentOptions {
newOptions.CriRuntimeEndpoint = getEndpoint(newOptions.CriRuntimeEndpoint)
}
newOptions.WatchOptions.Init()
newOptions.LoadPorgressChannel = make(chan string, 10)
return newOptions
}
1 change: 1 addition & 0 deletions agent/compatible/type.go
Original file line number Diff line number Diff line change
@@ -129,6 +129,7 @@ func init() {
v5d4 := copyKernelVersion(v5d15)
v5d4.Version = "5.4.0"
v5d4.addBackupInstrumentFunction(bpf.AgentStepTIP_IN, InstrumentFunction{"kprobe/ip_rcv_core.isra.0", "IpRcvCore"})
v5d4.addBackupInstrumentFunction(bpf.AgentStepTIP_IN, InstrumentFunction{"kprobe/ip_rcv_core.isra.20", "IpRcvCore"})
v5d4.removeCapability(SupportRingBuffer).removeCapability(SupportXDP)
KernelVersionsMap.Put(v5d4.Version, v5d4)

41 changes: 40 additions & 1 deletion agent/conn/conntrack.go
Original file line number Diff line number Diff line change
@@ -57,6 +57,44 @@ type Connection4 struct {

prevConn []*Connection4
}

func NewConnFromEvent(event *bpf.AgentConnEvtT, p *Processor) *Connection4 {
TgidFd := uint64(event.ConnInfo.ConnId.Upid.Pid)<<32 | uint64(event.ConnInfo.ConnId.Fd)
isIpv6 := event.ConnInfo.Laddr.In6.Sin6Family == common.AF_INET6
conn := &Connection4{
LocalIp: common.BytesToNetIP(event.ConnInfo.Laddr.In6.Sin6Addr.In6U.U6Addr8[:], isIpv6),
// LocalIp: common.IntToBytes(event.ConnInfo.Laddr.In4.SinAddr.S_addr),
RemoteIp: common.BytesToNetIP(event.ConnInfo.Raddr.In6.Sin6Addr.In6U.U6Addr8[:], isIpv6),
// RemoteIp: common.IntToBytes(event.ConnInfo.Raddr.In4.SinAddr.S_addr),
LocalPort: common.Port(event.ConnInfo.Laddr.In6.Sin6Port),
RemotePort: common.Port(event.ConnInfo.Raddr.In6.Sin6Port),
Protocol: event.ConnInfo.Protocol,
Role: event.ConnInfo.Role,
TgidFd: TgidFd,
Status: Connected,
tracable: true,

MessageFilter: p.messageFilter,
LatencyFilter: p.latencyFilter,
SizeFilter: p.SizeFilter,

reqStreamBuffer: buffer.New(1024 * 1024),
respStreamBuffer: buffer.New(1024 * 1024),
ReqQueue: make([]protocol.ParsedMessage, 0),
RespQueue: make([]protocol.ParsedMessage, 0),

prevConn: []*Connection4{},

protocolParsers: make(map[bpf.AgentTrafficProtocolT]protocol.ProtocolStreamParser),
}
conn.onRoleChanged = func() {
onRoleChanged(p, conn)
}
conn.StreamEvents = NewKernEventStream(conn, 300)
conn.ConnectStartTs = event.Ts + common.LaunchEpochTime
return conn
}

type ConnStatus uint8

type TCPHandshakeStatus struct {
@@ -391,7 +429,7 @@ func (c *Connection4) parseStreamBuffer(streamBuffer *buffer.StreamBuffer, messa
// parseState = parseResult.ParseState
switch parseResult.ParseState {
case protocol.Success:
common.ConntrackLog.Debugf("[parseStreamBuffer] Success, %s(%s)", c.ToString(), messageType.String())
// common.ConntrackLog.Debugf("[parseStreamBuffer] Success, %s(%s) read bytes: %d, headsize: %d", c.ToString(), messageType.String(), parseResult.ReadBytes, streamBuffer.Head().Len())
if c.Role == bpf.AgentEndpointRoleTKRoleUnknown && len(parseResult.ParsedMessages) > 0 {
parsedMessage := parseResult.ParsedMessages[0]
if (bpf.IsIngressStep(ke.Step) && parsedMessage.IsReq()) || (bpf.IsEgressStep(ke.Step) && !parsedMessage.IsReq()) {
@@ -444,6 +482,7 @@ func (c *Connection4) parseStreamBuffer(streamBuffer *buffer.StreamBuffer, messa
common.ConntrackLog.Debugf("[parseStreamBuffer] Needs more data, %s Removed streambuffer head due to stuck from %s queue", c.ToString(), messageType.String())
stop = false
} else {
// common.ConntrackLog.Debugf("[parseStreamBuffer] Needs more data, %s stop processing %s queue, headsize: %d", c.ToString(), messageType.String(), streamBuffer.Head().Len())
stop = true
}
case protocol.Ignore:
Loading

0 comments on commit 52b267c

Please sign in to comment.