Skip to content

Commit

Permalink
Merge pull request #58 from hengyoush/feature/cloud
Browse files Browse the repository at this point in the history
[Feature] Support NATed flow translation
  • Loading branch information
hengyoush authored Oct 8, 2024
2 parents d600af6 + 068e84b commit 23ba67f
Show file tree
Hide file tree
Showing 410 changed files with 4,818 additions and 1,773 deletions.
870 changes: 33 additions & 837 deletions agent/agent.go

Large diffs are not rendered by default.

78 changes: 39 additions & 39 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package agent_test

import (
"fmt"
"kyanos/agent"
ac "kyanos/agent/common"
"kyanos/agent/compatible"
"kyanos/agent/conn"
"kyanos/bpf"
Expand All @@ -15,11 +15,11 @@ import (
"github.com/stretchr/testify/assert"
)

var customAgentOptions agent.AgentOptions = agent.AgentOptions{}
var customAgentOptions ac.AgentOptions = ac.AgentOptions{}

func TestMain(m *testing.M) {
// call flag.Parse() here if TestMain uses flags]
customAgentOptions = agent.AgentOptions{}
customAgentOptions = ac.AgentOptions{}
retCode := m.Run()
tearDown()
os.Exit(retCode)
Expand Down Expand Up @@ -586,26 +586,26 @@ func TestSslEventsCanRelatedToKernEvents(t *testing.T) {
bpf.AttachSyscallWriteExit,
bpf.AttachKProbeSecuritySocketSendmsgEntry,
bpf.AttachKProbeSecuritySocketRecvmsgEntry,
func(p interface{}) link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTDEV_IN, p)
func() link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTDEV_IN)
},
func(p interface{}) link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTUSER_COPY, p)
func() link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTUSER_COPY)
},
func(p interface{}) link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTIP_IN, p)
func() link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTIP_IN)
},
func(p interface{}) link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTTCP_IN, p)
func() link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTTCP_IN)
},
func(p interface{}) link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTIP_OUT, p)
func() link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTIP_OUT)
},
func(p interface{}) link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTDEV_OUT, p)
func() link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTDEV_OUT)
},
func(p interface{}) link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTQDISC_OUT, p)
func() link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTQDISC_OUT)
},
},
&connEventList,
Expand Down Expand Up @@ -964,8 +964,8 @@ func TestIpXmit(t *testing.T) {
bpf.AttachSyscallWriteEntry,
bpf.AttachSyscallWriteExit,
bpf.AttachKProbeSecuritySocketSendmsgEntry,
func(p interface{}) link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTIP_OUT, p)
func() link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTIP_OUT)
},
},
"GET TestIpXmit\n", Write, Read,
Expand Down Expand Up @@ -994,11 +994,11 @@ func TestDevQueueXmit(t *testing.T) {
bpf.AttachSyscallWriteEntry,
bpf.AttachSyscallWriteExit,
bpf.AttachKProbeSecuritySocketSendmsgEntry,
func(p interface{}) link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTIP_OUT, p)
func() link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTIP_OUT)
},
func(p interface{}) link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTQDISC_OUT, p)
func() link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTQDISC_OUT)
},
}, "GET DevQueueXmit\n", Write, Read,
FindInterestedKernEventOptions{
Expand Down Expand Up @@ -1029,11 +1029,11 @@ func TestDevHardStartXmit(t *testing.T) {
bpf.AttachSyscallWriteEntry,
bpf.AttachSyscallWriteExit,
bpf.AttachKProbeSecuritySocketSendmsgEntry,
func(p interface{}) link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTIP_OUT, p)
func() link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTIP_OUT)
},
func(p interface{}) link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTDEV_OUT, p)
func() link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTDEV_OUT)
},
}, "GET DevHardStartXmit\n", Write, Read,
FindInterestedKernEventOptions{
Expand Down Expand Up @@ -1110,11 +1110,11 @@ func TestIpRcvCore(t *testing.T) {
bpf.AttachSyscallWriteExit,
bpf.AttachKProbeSecuritySocketSendmsgEntry,
bpf.AttachKProbeSecuritySocketRecvmsgEntry,
func(p interface{}) link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTDEV_IN, p)
func() link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTDEV_IN)
},
func(p interface{}) link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTIP_IN, p)
func() link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTIP_IN)
},
},
FindInterestedKernEventOptions{
Expand Down Expand Up @@ -1150,11 +1150,11 @@ func TestTcpV4DoRcv(t *testing.T) {
bpf.AttachSyscallWriteExit,
bpf.AttachKProbeSecuritySocketSendmsgEntry,
bpf.AttachKProbeSecuritySocketRecvmsgEntry,
func(p interface{}) link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTDEV_IN, p)
func() link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTDEV_IN)
},
func(p interface{}) link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTTCP_IN, p)
func() link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTTCP_IN)
},
},
FindInterestedKernEventOptions{
Expand Down Expand Up @@ -1190,11 +1190,11 @@ func TestSkbCopyDatagramIter(t *testing.T) {
bpf.AttachSyscallWriteExit,
bpf.AttachKProbeSecuritySocketSendmsgEntry,
bpf.AttachKProbeSecuritySocketRecvmsgEntry,
func(p interface{}) link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTDEV_IN, p)
func() link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTDEV_IN)
},
func(p interface{}) link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTUSER_COPY, p)
func() link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTUSER_COPY)
},
},
FindInterestedKernEventOptions{
Expand Down
21 changes: 11 additions & 10 deletions agent/agent_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"io"
"kyanos/agent"
ac "kyanos/agent/common"
"kyanos/agent/compatible"
"kyanos/agent/conn"
"kyanos/bpf"
Expand Down Expand Up @@ -45,26 +46,26 @@ func StartAgent0(bpfAttachFunctions []bpf.AttachBpfProgFunction,
wg := sync.WaitGroup{}
wg.Add(1)

var loadBpfProgramFunction agent.LoadBpfProgramFunction = nil
var loadBpfProgramFunction ac.LoadBpfProgramFunction = nil
if bpfAttachFunctions != nil {
loadBpfProgramFunction = func(objs interface{}) *list.List {
loadBpfProgramFunction = func() *list.List {
progs := list.New()
for _, each := range bpfAttachFunctions {
if each != nil {
progs.PushBack(each(objs))
progs.PushBack(each())
}
}
return progs
}
}
go func(pid int) {
if useSelfPidAsFitler {
cmd.FilterPid = int64(pid)
cmd.FilterPids = []string{strconv.Itoa(pid)}
}
cmd.DefaultLogLevel = int32(logrus.DebugLevel)
cmd.Debug = true
cmd.InitLog()
agent.SetupAgent(agent.AgentOptions{
agent.SetupAgent(ac.AgentOptions{
Stopper: agentStopper,
LoadBpfProgramFunction: loadBpfProgramFunction,
DisableOpensslUprobe: customAgentOptions.DisableOpensslUprobe,
Expand Down Expand Up @@ -776,11 +777,11 @@ func min(a, b int) int {
// compatilbeMode = b
// }

func ApplyKernelVersionFunctions(t *testing.T, step bpf.AgentStepT, programs any) link.Link {
func ApplyKernelVersionFunctions(t *testing.T, step bpf.AgentStepT) link.Link {
v := compatible.GetCurrentKernelVersion()
if step == bpf.AgentStepTNIC_IN {
if v.SupportCapability(compatible.SupportXDP) {
l, err := bpf.AttachXdp(programs)
l, err := bpf.AttachXdp()
if err != nil {
t.Fatal(err)
} else {
Expand All @@ -798,12 +799,12 @@ func ApplyKernelVersionFunctions(t *testing.T, step bpf.AgentStepT, programs any
var err error
var l link.Link
if function.IsKprobe() {
l, err = bpf.Kprobe(function.GetKprobeName(), bpf.GetProgram(programs, function.BPFGoProgName))
l, err = bpf.Kprobe(function.GetKprobeName(), bpf.GetProgramFromObjs(bpf.Objs, function.BPFGoProgName))
} else if function.IsTracepoint() {
l, err = bpf.Tracepoint(function.GetTracepointGroupName(), function.GetTracepointName(),
bpf.GetProgram(programs, function.BPFGoProgName))
bpf.GetProgramFromObjs(bpf.Objs, function.BPFGoProgName))
} else if function.IsKRetprobe() {
l, err = bpf.Kretprobe(function.GetKprobeName(), bpf.GetProgram(programs, function.BPFGoProgName))
l, err = bpf.Kretprobe(function.GetKprobeName(), bpf.GetProgramFromObjs(bpf.Objs, function.BPFGoProgName))
} else {
panic(fmt.Sprintf("invalid program type: %v", function))
}
Expand Down
64 changes: 28 additions & 36 deletions agent/analysis/analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,63 +2,53 @@ package analysis

import (
"cmp"
ac "kyanos/agent/common"
"context"
analysis_common "kyanos/agent/analysis/common"
"kyanos/agent/protocol"
"kyanos/common"
"math"
"slices"
"time"
)

type AnalysisOptions struct {
EnabledMetricTypeSet MetricTypeSet
SampleLimit int
DisplayLimit int
Interval int
Side common.SideEnum
ClassfierType
SortBy LatencyMetric
FullRecordBody bool
}

type aggregator struct {
*AnalysisOptions
*analysis_common.AnalysisOptions
*ConnStat
}

func createAggregatorWithHumanReadableClassId(humanReadableClassId string,
classId ClassId, aggregateOption *AnalysisOptions) *aggregator {
classId ClassId, aggregateOption *analysis_common.AnalysisOptions) *aggregator {
aggregator := createAggregator(classId, aggregateOption)
aggregator.HumanReadbleClassId = humanReadableClassId
return aggregator
}

func createAggregator(classId ClassId, aggregateOption *AnalysisOptions) *aggregator {
func createAggregator(classId ClassId, aggregateOption *analysis_common.AnalysisOptions) *aggregator {
aggregator := aggregator{}
aggregator.reset(classId, aggregateOption)
return &aggregator
}

func (a *aggregator) reset(classId ClassId, aggregateOption *AnalysisOptions) {
func (a *aggregator) reset(classId ClassId, aggregateOption *analysis_common.AnalysisOptions) {
a.AnalysisOptions = aggregateOption
a.ConnStat = &ConnStat{
ClassId: classId,
ClassfierType: aggregateOption.ClassfierType,
}
a.SamplesMap = make(map[MetricType][]*AnnotatedRecord)
a.PercentileCalculators = make(map[MetricType]*PercentileCalculator)
a.MaxMap = make(map[MetricType]float32)
a.SumMap = make(map[MetricType]float64)
a.SamplesMap = make(map[analysis_common.MetricType][]*analysis_common.AnnotatedRecord)
a.PercentileCalculators = make(map[analysis_common.MetricType]*PercentileCalculator)
a.MaxMap = make(map[analysis_common.MetricType]float32)
a.SumMap = make(map[analysis_common.MetricType]float64)
for rawMetricType, enabled := range aggregateOption.EnabledMetricTypeSet {
if enabled {
metricType := MetricType(rawMetricType)
metricType := analysis_common.MetricType(rawMetricType)
a.PercentileCalculators[metricType] = NewPercentileCalculator()
a.SamplesMap[metricType] = make([]*AnnotatedRecord, 0)
a.SamplesMap[metricType] = make([]*analysis_common.AnnotatedRecord, 0)
}
}
}

func (a *aggregator) receive(record *AnnotatedRecord) error {
func (a *aggregator) receive(record *analysis_common.AnnotatedRecord) error {
o := a.ConnStat

o.Count++
Expand All @@ -72,12 +62,12 @@ func (a *aggregator) receive(record *AnnotatedRecord) error {
a.ConnStat.Side = record.ConnDesc.Side

for rawMetricType, enabled := range a.AnalysisOptions.EnabledMetricTypeSet {
metricType := MetricType(rawMetricType)
metricType := analysis_common.MetricType(rawMetricType)

if enabled {
samples := a.SamplesMap[metricType]
MetricExtract := GetMetricExtractFunc[float64](metricType)
a.SamplesMap[metricType] = AddToSamples(samples, record, MetricExtract, a.SampleLimit)
MetricExtract := analysis_common.GetMetricExtractFunc[float64](metricType)
a.SamplesMap[metricType] = AddToSamples(samples, record, MetricExtract, a.AnalysisOptions.SampleLimit)

metricValue := MetricExtract(record)

Expand All @@ -91,10 +81,10 @@ func (a *aggregator) receive(record *AnnotatedRecord) error {
return nil
}

func AddToSamples[T MetricValueType](samples []*AnnotatedRecord, newSample *AnnotatedRecord, extractMetric MetricExtract[T], maxSamplesNum int) []*AnnotatedRecord {
func AddToSamples[T analysis_common.MetricValueType](samples []*analysis_common.AnnotatedRecord, newSample *analysis_common.AnnotatedRecord, extractMetric analysis_common.MetricExtract[T], maxSamplesNum int) []*analysis_common.AnnotatedRecord {
result := samples
isFull := len(samples) == maxSamplesNum
idx, _ := slices.BinarySearchFunc(samples, newSample, func(o1 *AnnotatedRecord, o2 *AnnotatedRecord) int {
idx, _ := slices.BinarySearchFunc(samples, newSample, func(o1 *analysis_common.AnnotatedRecord, o2 *analysis_common.AnnotatedRecord) int {
t1, t2 := extractMetric(o1), extractMetric(o2)
return cmp.Compare(t1, t2)
})
Expand All @@ -112,20 +102,21 @@ func AddToSamples[T MetricValueType](samples []*AnnotatedRecord, newSample *Anno

type Analyzer struct {
Classfier
*AnalysisOptions
*analysis_common.AnalysisOptions
common.SideEnum // 那一边的统计指标TODO 根据参数自动推断
Aggregators map[ClassId]*aggregator
recordsChannel <-chan *AnnotatedRecord
recordsChannel <-chan *analysis_common.AnnotatedRecord
stopper <-chan int
resultChannel chan<- []*ConnStat
renderStopper chan int
ticker *time.Ticker
tickerC <-chan time.Time
ctx context.Context
}

func CreateAnalyzer(recordsChannel <-chan *AnnotatedRecord, showOption *AnalysisOptions, resultChannel chan<- []*ConnStat, renderStopper chan int) *Analyzer {
func CreateAnalyzer(recordsChannel <-chan *analysis_common.AnnotatedRecord, showOption *analysis_common.AnalysisOptions, resultChannel chan<- []*ConnStat, renderStopper chan int, ctx context.Context) *Analyzer {
stopper := make(chan int)
ac.AddToFastStopper(stopper)
// ac.AddToFastStopper(stopper)
analyzer := &Analyzer{
Classfier: getClassfier(showOption.ClassfierType),
recordsChannel: recordsChannel,
Expand All @@ -147,7 +138,8 @@ func CreateAnalyzer(recordsChannel <-chan *AnnotatedRecord, showOption *Analysis
func (a *Analyzer) Run() {
for {
select {
case <-a.stopper:
// case <-a.stopper:
case <-a.ctx.Done():
if a.AnalysisOptions.Interval == 0 {
a.resultChannel <- a.harvest()
time.Sleep(1 * time.Second)
Expand All @@ -166,19 +158,19 @@ func (a *Analyzer) harvest() []*ConnStat {
result := make([]*ConnStat, 0)
for _, aggregator := range a.Aggregators {
connstat := aggregator.ConnStat
// aggregator.reset(classId, a.AnalysisOptions)
// aggregator.reset(classId, a.analysis_common.AnalysisOptions)
result = append(result, connstat)
}
a.Aggregators = make(map[ClassId]*aggregator)
return result
}

func (a *Analyzer) analyze(record *AnnotatedRecord) {
func (a *Analyzer) analyze(record *analysis_common.AnnotatedRecord) {
class, err := a.Classfier(record)
if err == nil {
aggregator, exists := a.Aggregators[class]
if !exists {
humanReadableFunc, ok := classIdHumanReadableMap[a.ClassfierType]
humanReadableFunc, ok := classIdHumanReadableMap[a.AnalysisOptions.ClassfierType]
if ok {
humanReadableClassId := humanReadableFunc(record)
a.Aggregators[class] = createAggregatorWithHumanReadableClassId(humanReadableClassId,
Expand Down
Loading

0 comments on commit 23ba67f

Please sign in to comment.