Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improvement] Support setting different log levels for different modules. #41

Merged
merged 1 commit into from
Sep 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 47 additions & 49 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"io/fs"
"kyanos/agent/analysis"
ac "kyanos/agent/common"
"kyanos/agent/compatible"
"kyanos/agent/conn"
"kyanos/agent/protocol"
Expand All @@ -34,7 +35,6 @@ import (
"github.com/cilium/ebpf/ringbuf"
"github.com/cilium/ebpf/rlimit"
"github.com/emirpasic/gods/maps/treemap"
"github.com/jefurry/logrus"
"github.com/spf13/viper"
"github.com/zcalusic/sysinfo"
)
Expand All @@ -46,8 +46,6 @@ type KernEventHook func(evt *bpf.AgentKernEvt)
type InitCompletedHook func()
type ConnManagerInitHook func(*conn.ConnManager)

var log *logrus.Logger = common.Log

const perfEventDataBufferSize = 30 * 1024 * 1024
const perfEventControlBufferSize = 2 * 1024 * 1024

Expand Down Expand Up @@ -131,7 +129,7 @@ func SetupAgent(options AgentOptions) {

// Remove resource limits for kernels <5.11.
if err := rlimit.RemoveMemlock(); err != nil {
log.Warn("Remove memlock:", err)
common.AgentLog.Warn("Remove memlock:", err)
}

kernelVersion := compatible.GetCurrentKernelVersion()
Expand All @@ -144,7 +142,7 @@ func SetupAgent(options AgentOptions) {
if options.BTFFilePath != "" {
btfPath, err := btf.LoadSpec(options.BTFFilePath)
if err != nil {
log.Fatalf("can't load btf spec: %v", err)
common.AgentLog.Fatalf("can't load btf spec: %v", err)
}
collectionOptions = &ebpf.CollectionOptions{
Programs: ebpf.ProgramOptions{
Expand All @@ -155,22 +153,22 @@ func SetupAgent(options AgentOptions) {
} else {
fileBytes, err := getBestMatchedBTFFile()
if err != nil {
log.Fatalln(err)
common.AgentLog.Fatalln(err)
return
}
needGenerateBTF := fileBytes != nil

if needGenerateBTF {
btfFilePath, err := writeToFile(fileBytes, ".kyanos.btf")
if err != nil {
log.Fatalln(err)
common.AgentLog.Fatalln(err)
return
}
defer os.Remove(btfFilePath)

btfPath, err := btf.LoadSpec(btfFilePath)
if err != nil {
log.Fatalf("can't load btf spec: %v", err)
common.AgentLog.Fatalf("can't load btf spec: %v", err)
}
collectionOptions = &ebpf.CollectionOptions{
Programs: ebpf.ProgramOptions{
Expand All @@ -191,13 +189,13 @@ func SetupAgent(options AgentOptions) {
objs = &bpf.AgentOldObjects{}
spec, err = bpf.LoadAgentOld()
if err != nil {
log.Fatal("load Agent error:", err)
common.AgentLog.Fatal("load Agent error:", err)
}
} else {
objs = &bpf.AgentObjects{}
spec, err = bpf.LoadAgent()
if err != nil {
log.Fatal("load Agent error:", err)
common.AgentLog.Fatal("load Agent error:", err)
}
}
bpf.Objs = objs
Expand All @@ -216,9 +214,9 @@ func SetupAgent(options AgentOptions) {
inner_err, ok := err.(*ebpf.VerifierError)
if ok {
inner_err.Truncated = false
log.Errorf("loadAgentObjects: %+v", inner_err)
common.AgentLog.Errorf("loadAgentObjects: %+v", inner_err)
} else {
log.Errorf("loadAgentObjects: %+v", err)
common.AgentLog.Errorf("loadAgentObjects: %+v", err)
}
return
}
Expand Down Expand Up @@ -268,11 +266,11 @@ func SetupAgent(options AgentOptions) {
err := l.Close()
if err != nil {
info, _ := l.Info()
log.Errorf("Fail to close link for: %v\n", info)
common.AgentLog.Errorf("Fail to close link for: %v\n", info)
}
}
}
log.Debugln("All links closed!")
common.AgentLog.Debugln("All links closed!")
}()
// Close the reader when the process receives a signal, which will exit
// the read loop.
Expand All @@ -282,24 +280,24 @@ func SetupAgent(options AgentOptions) {
for _, reader := range readers {
reader.Close()
}
log.Warnf("setup event reader err: %v", err)
common.AgentLog.Warnf("setup event reader err: %v", err)
return
}

go func() {
<-stopper
common.SendStopSignal()
log.Debugln("stop!")
ac.SendStopSignal()
common.AgentLog.Debugln("stop!")
for _, reader := range readers {
if err := reader.Close(); err != nil {
log.Fatalf("closing reader(%v) error: %s", reader, err)
common.AgentLog.Fatalf("closing reader(%v) error: %s", reader, err)
}
}
pm.StopAll()
stop = true
}()

log.Info("Waiting for events..")
common.AgentLog.Info("Waiting for events..")

startReaders(options, kernelVersion, pm, readers)

Expand All @@ -309,7 +307,7 @@ func SetupAgent(options AgentOptions) {
for !stop {
time.Sleep(time.Second * 1)
}
log.Infoln("Kyanos Stopped")
common.AgentLog.Infoln("Kyanos Stopped")
return
}

Expand Down Expand Up @@ -381,14 +379,14 @@ func startRingbufferReader(reader io.Closer, consumeFunction func(ringbuf.Record
record, err := ringbuffer.Read()
if err != nil {
if errors.Is(err, perf.ErrClosed) {
log.Debug("[dataReader] Received signal, exiting..")
common.AgentLog.Debug("[dataReader] Received signal, exiting..")
return
}
log.Debugf("[dataReader] reading from reader: %s\n", err)
common.AgentLog.Debugf("[dataReader] reading from reader: %s\n", err)
continue
}
if err := consumeFunction(record); err != nil {
log.Errorf("[dataReader] handleKernEvt err: %s\n", err)
common.AgentLog.Errorf("[dataReader] handleKernEvt err: %s\n", err)
continue
}
}
Expand All @@ -402,17 +400,17 @@ func startPerfeventReader(reader io.Closer, consumeFunction func(perf.Record) er
record, err := perfReader.Read()
if err != nil {
if errors.Is(err, perf.ErrClosed) {
log.Debug("[dataReader] Received signal, exiting..")
common.AgentLog.Debug("[dataReader] Received signal, exiting..")
return
}
log.Debugf("[dataReader] reading from reader: %s\n", err)
common.AgentLog.Debugf("[dataReader] reading from reader: %s\n", err)
continue
}
if err := consumeFunction(record); err != nil {
log.Errorf("[dataReader] handleKernEvt err: %s\n", err)
common.AgentLog.Errorf("[dataReader] handleKernEvt err: %s\n", err)
continue
} else if record.LostSamples > 0 {
log.Warnf("[dataReader] lost sample: %d", record.LostSamples)
common.AgentLog.Warnf("[dataReader] lost sample: %d", record.LostSamples)
}
}
}()
Expand All @@ -425,73 +423,73 @@ func setAndValidateParameters() bool {
var enabledLocalPortMap *ebpf.Map = bpf.GetMap("EnabledLocalPortMap")

if targetPid := viper.GetInt64(common.FilterPidVarName); targetPid > 0 {
log.Infoln("filter for pid: ", targetPid)
common.AgentLog.Infoln("filter for pid: ", targetPid)
controlValues.Update(bpf.AgentControlValueIndexTKTargetTGIDIndex, targetPid, ebpf.UpdateAny)
}

remotePorts := viper.GetStringSlice(common.RemotePortsVarName)
oneKey := uint16(1)
zeroValue := uint8(0)
if len(remotePorts) > 0 {
log.Infoln("filter for remote ports: ", remotePorts)
common.AgentLog.Infoln("filter for remote ports: ", remotePorts)
err := enabledRemotePortMap.Update(oneKey, zeroValue, ebpf.UpdateAny)
if err != nil {
log.Errorln("Update EnabledRemotePortMap failed: ", err)
common.AgentLog.Errorln("Update EnabledRemotePortMap failed: ", err)
}
for _, each := range remotePorts {
portInt, err := strconv.Atoi(each)
if err != nil || portInt <= 0 {
log.Errorf("Invalid remote port : %s\n", each)
common.AgentLog.Errorf("Invalid remote port : %s\n", each)
return false
}
portNumber := uint16(portInt)
err = enabledRemotePortMap.Update(portNumber, zeroValue, ebpf.UpdateAny)
if err != nil {
log.Errorln("Update EnabledRemotePortMap failed: ", err)
common.AgentLog.Errorln("Update EnabledRemotePortMap failed: ", err)
}
}
}

remoteIps := viper.GetStringSlice(common.RemoteIpsVarName)
if len(remoteIps) > 0 {
log.Infoln("filter for remote ips: ", remoteIps)
common.AgentLog.Infoln("filter for remote ips: ", remoteIps)
oneKeyU32 := uint32(1)
err := enabledRemoteIpv4Map.Update(&oneKeyU32, &zeroValue, ebpf.UpdateAny)
if err != nil {
log.Errorln("Update EnabledRemoteIpv4Map failed: ", err)
common.AgentLog.Errorln("Update EnabledRemoteIpv4Map failed: ", err)
}
for _, each := range remoteIps {
ipInt32, err := common.IPv4ToUint32(each)
if err != nil {
log.Errorf("IPv4ToUint32 parse failed, ip string is: %s, err: %v", each, err)
common.AgentLog.Errorf("IPv4ToUint32 parse failed, ip string is: %s, err: %v", each, err)
return false
} else {
log.Debugln("Update EnabledRemoteIpv4Map, key: ", ipInt32, common.IntToIP(ipInt32))
common.AgentLog.Debugln("Update EnabledRemoteIpv4Map, key: ", ipInt32, common.IntToIP(ipInt32))
err = enabledRemoteIpv4Map.Update(&ipInt32, &zeroValue, ebpf.UpdateAny)
if err != nil {
log.Errorln("Update EnabledRemoteIpv4Map failed: ", err)
common.AgentLog.Errorln("Update EnabledRemoteIpv4Map failed: ", err)
}
}
}
}

localPorts := viper.GetStringSlice(common.LocalPortsVarName)
if len(localPorts) > 0 {
log.Infoln("filter for local ports: ", localPorts)
common.AgentLog.Infoln("filter for local ports: ", localPorts)
err := enabledLocalPortMap.Update(oneKey, uint8(oneKey), ebpf.UpdateAny)
if err != nil {
log.Errorln("Update EnabledLocalPortMap failed: ", err)
common.AgentLog.Errorln("Update EnabledLocalPortMap failed: ", err)
}
for _, each := range localPorts {
portInt, err := strconv.Atoi(each)
if err != nil || portInt <= 0 {
log.Errorf("Invalid local port : %s\n", each)
common.AgentLog.Errorf("Invalid local port : %s\n", each)
return false
}
portNumber := uint16(portInt)
err = enabledLocalPortMap.Update(portNumber, zeroValue, ebpf.UpdateAny)
if err != nil {
log.Errorln("Update EnabledLocalPortMap failed: ", err)
common.AgentLog.Errorln("Update EnabledLocalPortMap failed: ", err)
}
}
}
Expand Down Expand Up @@ -560,7 +558,7 @@ func attachBpfProgs(programs any, ifName string, kernelVersion compatible.Kernel
if kernelVersion.SupportCapability(compatible.SupportXDP) {
l, err := bpf.AttachXdpWithSpecifiedIfName(programs, options.IfName)
if err != nil {
log.Warnf("Attach XDP program failed, fallbacking...")
common.AgentLog.Warnf("Attach XDP program failed, fallbacking...")
} else {
linkList.PushBack(l)
}
Expand All @@ -569,7 +567,7 @@ func attachBpfProgs(programs any, ifName string, kernelVersion compatible.Kernel
if kernelVersion.SupportCapability(compatible.SupportRawTracepoint) {
l, err := bpf.AttachRawTracepointTcpDestroySockEntry(programs)
if err != nil {
log.Warnf("Attach TCP destroy raw tracepoint failed, fallbacking...")
common.AgentLog.Warnf("Attach TCP destroy raw tracepoint failed, fallbacking...")
} else {
linkList.PushBack(l)
}
Expand All @@ -591,7 +589,7 @@ func attachBpfProgs(programs any, ifName string, kernelVersion compatible.Kernel
}
if err != nil {
if idx == len(functions)-1 {
log.Fatalf("Attach failed: %v, functions: %v", err, functions)
common.AgentLog.Fatalf("Attach failed: %v, functions: %v", err, functions)
}
} else {
linkList.PushBack(l)
Expand Down Expand Up @@ -694,7 +692,7 @@ func getBestMatchedBTFFile() ([]uint8, error) {

var si sysinfo.SysInfo
si.GetSysInfo()
log.Debugf("[sys info] vendor: %s, os_arch: %s, kernel_arch: %s", si.OS.Vendor, si.OS.Architecture, si.Kernel.Architecture)
common.AgentLog.Debugf("[sys info] vendor: %s, os_arch: %s, kernel_arch: %s", si.OS.Vendor, si.OS.Architecture, si.Kernel.Architecture)

if si.OS.Vendor != "ubuntu" && si.OS.Vendor != "centos" {
panic("Current only support centos and ubuntu")
Expand All @@ -717,7 +715,7 @@ func getBestMatchedBTFFile() ([]uint8, error) {
btfFileDir += "/" + si.Kernel.Architecture
dir, err := bpf.BtfFiles.ReadDir(btfFileDir)
if err != nil {
log.Warnf("btf file not exists, path: %s", btfFileDir)
common.AgentLog.Warnf("btf file not exists, path: %s", btfFileDir)
return nil, err
}
btfFileNames := treemap.NewWithStringComparator()
Expand All @@ -731,22 +729,22 @@ func getBestMatchedBTFFile() ([]uint8, error) {

release := si.Kernel.Release
if value, found := btfFileNames.Get(release); found {
log.Debug("find btf file exactly!")
common.AgentLog.Debug("find btf file exactly!")
dirEntry := value.(fs.DirEntry)
fileName := dirEntry.Name()
file, err := bpf.BtfFiles.ReadFile(btfFileDir + "/" + fileName)
if err == nil {
return file, nil
}
} else {
log.Debug("find btf file exactly failed, try to find a lower version btf file...")
common.AgentLog.Debug("find btf file exactly failed, try to find a lower version btf file...")
}

key, value := btfFileNames.Floor(release)
if key != nil {
dirEntry := value.(fs.DirEntry)
fileName := dirEntry.Name()
log.Debugf("find a lower version btf file success: %s", fileName)
common.AgentLog.Debugf("find a lower version btf file success: %s", fileName)
file, err := bpf.BtfFiles.ReadFile(btfFileDir + "/" + fileName)
if err == nil {
return file, nil
Expand Down
3 changes: 1 addition & 2 deletions agent/agent_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,8 @@ func StartAgent(bpfAttachFunctions []bpf.AttachBpfProgFunction,
wg.Add(1)
go func(pid int) {
cmd.FilterPid = int64(pid)
cmd.Verbose = true
cmd.DefaultLogLevel = int32(logrus.DebugLevel)
cmd.Debug = true
common.Log.SetLevel(logrus.DebugLevel)
agent.SetupAgent(agent.AgentOptions{
Stopper: agentStopper,
LoadBpfProgramFunction: func(objs interface{}) *list.List {
Expand Down
5 changes: 3 additions & 2 deletions agent/analysis/analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package analysis

import (
"cmp"
ac "kyanos/agent/common"
"kyanos/agent/protocol"
"kyanos/common"
"math"
Expand Down Expand Up @@ -124,7 +125,7 @@ type Analyzer struct {

func CreateAnalyzer(recordsChannel <-chan *AnnotatedRecord, showOption *AnalysisOptions, resultChannel chan<- []*ConnStat, renderStopper chan int) *Analyzer {
stopper := make(chan int)
common.AddToFastStopper(stopper)
ac.AddToFastStopper(stopper)
analyzer := &Analyzer{
Classfier: getClassfier(showOption.ClassfierType),
recordsChannel: recordsChannel,
Expand Down Expand Up @@ -190,6 +191,6 @@ func (a *Analyzer) analyze(record *AnnotatedRecord) {
}
aggregator.receive(record)
} else {
log.Errorf("classify error: %v\n", err)
common.DefaultLog.Warnf("classify error: %v\n", err)
}
}
Loading
Loading