Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat. add overview subcommand #81

Merged
merged 2 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 43 additions & 8 deletions agent/analysis/analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,20 @@ import (
type aggregator struct {
*analysis_common.AnalysisOptions
*ConnStat
isSub bool
}

func createAggregatorWithHumanReadableClassId(humanReadableClassId string,
classId analysis_common.ClassId, aggregateOption *analysis_common.AnalysisOptions) *aggregator {
aggregator := createAggregator(classId, aggregateOption)
classId analysis_common.ClassId,
aggregateOption *analysis_common.AnalysisOptions, isSub bool) *aggregator {
aggregator := createAggregator(classId, aggregateOption, isSub)
aggregator.HumanReadbleClassId = humanReadableClassId
return aggregator
}

func createAggregator(classId analysis_common.ClassId, aggregateOption *analysis_common.AnalysisOptions) *aggregator {
func createAggregator(classId analysis_common.ClassId, aggregateOption *analysis_common.AnalysisOptions, isSub bool) *aggregator {
aggregator := aggregator{}
aggregator.isSub = isSub
aggregator.reset(classId, aggregateOption)
return &aggregator
}
Expand All @@ -34,6 +37,10 @@ func (a *aggregator) reset(classId analysis_common.ClassId, aggregateOption *ana
a.ConnStat = &ConnStat{
ClassId: classId,
ClassfierType: aggregateOption.ClassfierType,
IsSub: a.isSub,
}
if a.isSub {
a.ConnStat.ClassfierType = aggregateOption.SubClassfierType
}
a.SamplesMap = make(map[analysis_common.MetricType][]*analysis_common.AnnotatedRecord)
a.PercentileCalculators = make(map[analysis_common.MetricType]*PercentileCalculator)
Expand Down Expand Up @@ -65,9 +72,14 @@ func (a *aggregator) receive(record *analysis_common.AnnotatedRecord) error {
metricType := analysis_common.MetricType(rawMetricType)

if enabled {
samples := a.SamplesMap[metricType]
MetricExtract := analysis_common.GetMetricExtractFunc[float64](metricType)
a.SamplesMap[metricType] = AddToSamples(samples, record, MetricExtract, a.AnalysisOptions.SampleLimit)
samples := a.SamplesMap[metricType]
// only sample if aggregator is sub or no sub classfier
if a.isSub || a.SubClassfierType == analysis_common.None {
a.SamplesMap[metricType] = AddToSamples(samples, record, MetricExtract, a.AnalysisOptions.SampleLimit)
} else {
a.SamplesMap[metricType] = AddToSamples(samples, record, MetricExtract, 1)
}

metricValue := MetricExtract(record)

Expand Down Expand Up @@ -102,6 +114,7 @@ func AddToSamples[T analysis_common.MetricValueType](samples []*analysis_common.

type Analyzer struct {
Classfier
subClassfier Classfier
*analysis_common.AnalysisOptions
common.SideEnum // 那一边的统计指标TODO 根据参数自动推断
Aggregators map[analysis_common.ClassId]*aggregator
Expand All @@ -120,7 +133,7 @@ func CreateAnalyzer(recordsChannel <-chan *analysis_common.AnnotatedRecord, opts
// ac.AddToFastStopper(stopper)
opts.Init()
analyzer := &Analyzer{
Classfier: getClassfier(opts.ClassfierType),
Classfier: getClassfier(opts.ClassfierType, *opts),
recordsChannel: recordsChannel,
Aggregators: make(map[analysis_common.ClassId]*aggregator),
AnalysisOptions: opts,
Expand All @@ -129,6 +142,9 @@ func CreateAnalyzer(recordsChannel <-chan *analysis_common.AnnotatedRecord, opts
renderStopper: renderStopper,
ctx: ctx,
}
if opts.SubClassfierType != analysis_common.None {
analyzer.subClassfier = getClassfier(opts.SubClassfierType, *opts)
}
opts.CurrentReceivedSamples = func() int {
return analyzer.recordReceived
}
Expand Down Expand Up @@ -189,14 +205,33 @@ func (a *Analyzer) analyze(record *analysis_common.AnnotatedRecord) {
if ok {
humanReadableClassId := humanReadableFunc(record)
a.Aggregators[class] = createAggregatorWithHumanReadableClassId(humanReadableClassId,
class, a.AnalysisOptions)
class, a.AnalysisOptions, false)
} else {
a.Aggregators[class] = createAggregator(class, a.AnalysisOptions)
a.Aggregators[class] = createAggregator(class, a.AnalysisOptions, false)
}

aggregator = a.Aggregators[class]
}
aggregator.receive(record)

if a.subClassfier != nil {
subClassId, err := a.subClassfier(record)
if err == nil {
fullClassId := class + "||" + subClassId
subAggregator, exists := a.Aggregators[fullClassId]
if !exists {
subHumanReadableFunc, ok := getClassIdHumanReadableFunc(a.AnalysisOptions.SubClassfierType, *a.AnalysisOptions)
if ok {
subHumanReadableClassId := subHumanReadableFunc(record)
a.Aggregators[fullClassId] = createAggregatorWithHumanReadableClassId(subHumanReadableClassId, fullClassId, a.AnalysisOptions, true)
} else {
a.Aggregators[fullClassId] = createAggregator(fullClassId, a.AnalysisOptions, true)
}
subAggregator = a.Aggregators[fullClassId]
}
subAggregator.receive(record)
}
}
} else {
common.DefaultLog.Warnf("classify error: %v\n", err)
}
Expand Down
49 changes: 47 additions & 2 deletions agent/analysis/classfier.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,25 @@ func init() {
}
}

classfierMap[anc.ProtocolAdaptive] = func(ar *anc.AnnotatedRecord) (anc.ClassId, error) {
redisReq, ok := ar.Record.Request().(*protocol.RedisMessage)
if !ok {
return "_not_a_redis_req_", nil
} else {
return anc.ClassId(redisReq.Command()), nil
}
}

classIdHumanReadableMap = make(map[anc.ClassfierType]ClassIdAsHumanReadable)
classIdHumanReadableMap[anc.RemoteIp] = func(ar *anc.AnnotatedRecord) string {
return ar.ConnDesc.RemoteAddr.String()
}
classIdHumanReadableMap[anc.RemotePort] = func(ar *anc.AnnotatedRecord) string {
return fmt.Sprintf("%d", ar.ConnDesc.RemotePort)
}
classIdHumanReadableMap[anc.LocalPort] = func(ar *anc.AnnotatedRecord) string {
return fmt.Sprintf("%d", ar.ConnDesc.LocalPort)
}
classIdHumanReadableMap[anc.Conn] = func(ar *anc.AnnotatedRecord) string {
return ar.ConnDesc.SimpleString()
}
Expand All @@ -73,6 +91,33 @@ func init() {
}
}

func getClassfier(classfierType anc.ClassfierType) Classfier {
return classfierMap[classfierType]
func getClassfier(classfierType anc.ClassfierType, options anc.AnalysisOptions) Classfier {
if classfierType == anc.ProtocolAdaptive {
return func(ar *anc.AnnotatedRecord) (anc.ClassId, error) {
c, ok := options.ProtocolSpecificClassfiers[bpf.AgentTrafficProtocolT(ar.Protocol)]
if !ok {
return classfierMap[anc.RemoteIp](ar)
} else {
return classfierMap[c](ar)
}
}
} else {
return classfierMap[classfierType]
}
}

func getClassIdHumanReadableFunc(classfierType anc.ClassfierType, options anc.AnalysisOptions) (ClassIdAsHumanReadable, bool) {
if classfierType == anc.ProtocolAdaptive {
return func(ar *anc.AnnotatedRecord) string {
c, ok := options.ProtocolSpecificClassfiers[bpf.AgentTrafficProtocolT(ar.Protocol)]
if !ok {
return classIdHumanReadableMap[anc.RemoteIp](ar)
} else {
return classIdHumanReadableMap[c](ar)
}
}, true
} else {
f, ok := classIdHumanReadableMap[classfierType]
return f, ok
}
}
21 changes: 12 additions & 9 deletions agent/analysis/common/classfier.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package common

var ClassfierTypeNames = map[ClassfierType]string{
None: "none",
Conn: "conn",
RemotePort: "remote-port",
LocalPort: "local-port",
RemoteIp: "remote-ip",
Protocol: "protocol",
HttpPath: "http-path",
RedisCommand: "redis-command",
Default: "default",
None: "none",
Conn: "conn",
RemotePort: "remote-port",
LocalPort: "local-port",
RemoteIp: "remote-ip",
Protocol: "protocol",
HttpPath: "http-path",
RedisCommand: "redis-command",
ProtocolAdaptive: "protocol-adaptive",
Default: "default",
}

const (
Expand All @@ -26,6 +27,8 @@ const (

// Redis
RedisCommand

ProtocolAdaptive
)

type ClassId string
8 changes: 7 additions & 1 deletion agent/analysis/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package common
import (
"fmt"
"kyanos/agent/protocol"
"kyanos/bpf"
"kyanos/common"
ac "kyanos/common"

Expand All @@ -14,7 +15,9 @@ type AnalysisOptions struct {
SampleLimit int
Side ac.SideEnum
ClassfierType
CleanWhenHarvest bool
SubClassfierType ClassfierType
ProtocolSpecificClassfiers map[bpf.AgentTrafficProtocolT]ClassfierType
CleanWhenHarvest bool

// Fast Inspect Options
SlowMode bool
Expand All @@ -23,6 +26,9 @@ type AnalysisOptions struct {
TargetSamples int
CurrentReceivedSamples func() int
HavestSignal chan struct{}

// overview mode
Overview bool
}

func (a *AnalysisOptions) Init() {
Expand Down
6 changes: 5 additions & 1 deletion agent/analysis/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type ConnStat struct {
ClassId anc.ClassId
HumanReadbleClassId string
ClassfierType anc.ClassfierType
IsSub bool
}

func (c *ConnStat) ClassIdAsHumanReadable(classId anc.ClassId) string {
Expand All @@ -31,10 +32,13 @@ func (c *ConnStat) ClassIdAsHumanReadable(classId anc.ClassId) string {
case anc.LocalPort:
fallthrough
case anc.RemoteIp:
return string(classId)
return c.HumanReadbleClassId
case anc.Protocol:
return c.HumanReadbleClassId
default:
if c.HumanReadbleClassId != "" {
return c.HumanReadbleClassId
}
return string(classId)
}
}
Expand Down
Loading
Loading