From 6788047cc4874c68ae9d0fa8587320ac509e2c61 Mon Sep 17 00:00:00 2001 From: Hui Date: Thu, 27 Apr 2023 21:00:51 +0800 Subject: [PATCH 01/12] The probe will gracefully exit and close the async-profiler. Signed-off-by: Hui --- collector/pkg/component/receiver/cgoreceiver/cgoreceiver.go | 1 + 1 file changed, 1 insertion(+) diff --git a/collector/pkg/component/receiver/cgoreceiver/cgoreceiver.go b/collector/pkg/component/receiver/cgoreceiver/cgoreceiver.go index e9cb4f860..4f4c6ab1d 100644 --- a/collector/pkg/component/receiver/cgoreceiver/cgoreceiver.go +++ b/collector/pkg/component/receiver/cgoreceiver/cgoreceiver.go @@ -114,6 +114,7 @@ func (r *CgoReceiver) consumeEvents() { func (r *CgoReceiver) Shutdown() error { // TODO stop the C routine + C.stopProfile() close(r.stopCh) r.shutdownWG.Wait() return nil From 67509ca8002368620017bde6324550b28851e808 Mon Sep 17 00:00:00 2001 From: Hui Date: Thu, 11 May 2023 10:53:59 +0800 Subject: [PATCH 02/12] Now javatrace will regularly clean up the space, and users can configure expiration time and cleanup interval through the configuration file.Fixed a bug in javatrace that could cause a null pointer and crash. Signed-off-by: Hui --- collector/docker/build-asyncprofiler.sh | 4 +- .../docker/kindling-collector-config.yml | 6 ++ .../component/analyzer/cpuanalyzer/config.go | 9 +++ .../analyzer/cpuanalyzer/cpu_analyzer.go | 40 ++++++++++++-- .../analyzer/cpuanalyzer/cpu_analyzer_test.go | 55 +++++++++++++++++++ deploy/agent/kindling-collector-config.yml | 6 ++ 6 files changed, 113 insertions(+), 7 deletions(-) create mode 100644 collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer_test.go diff --git a/collector/docker/build-asyncprofiler.sh b/collector/docker/build-asyncprofiler.sh index 681e1d344..acc1f39b7 100755 --- a/collector/docker/build-asyncprofiler.sh +++ b/collector/docker/build-asyncprofiler.sh @@ -1,5 +1,5 @@ -ASYNC_PROFILER=async-profiler-1.0.3-linux-x64.tar.gz -KINDLING_JAVA=kindling-java-1.0.3.tar.gz +ASYNC_PROFILER=async-profiler-1.0.4-linux-x64.tar.gz +KINDLING_JAVA=kindling-java-1.0.4.tar.gz APM_ALL=apm-all-3.1.0.jar SCRIPT_DIR="$(cd "$(dirname "$SCRIPT_BIN")" > /dev/null 2>&1; pwd -P)" diff --git a/collector/docker/kindling-collector-config.yml b/collector/docker/kindling-collector-config.yml index 8062dda93..7a0fc19cc 100644 --- a/collector/docker/kindling-collector-config.yml +++ b/collector/docker/kindling-collector-config.yml @@ -51,6 +51,12 @@ analyzers: # edge_events_window_size is the size of the duration window that seats the edge events. # The unit is second. The greater it is, the more data will be stored. edge_events_window_size: 2 + #JavaTraceDeleteInterval is the interval for cleaning up expired data in javatraces. + #The unit is seconds. + JavaTraceDeleteInterval: 10 + #JavaTraceExpirationTime is the expiration time for data in javatraces. + #The unit is seconds. + JavaTraceExpirationTime: 30 tcpconnectanalyzer: channel_size: 10000 wait_event_second: 10 diff --git a/collector/pkg/component/analyzer/cpuanalyzer/config.go b/collector/pkg/component/analyzer/cpuanalyzer/config.go index 18ee61eb3..8cfdd3cc3 100644 --- a/collector/pkg/component/analyzer/cpuanalyzer/config.go +++ b/collector/pkg/component/analyzer/cpuanalyzer/config.go @@ -20,6 +20,13 @@ type Config struct { // EdgeEventsWindowSize is the size of the duration window that seats the edge events. // The unit is seconds. The greater it is, the more data will be stored. EdgeEventsWindowSize int `mapstructure:"edge_events_window_size"` + //JavaTraceDeleteInterval is the interval for cleaning up expired data in javatraces. + //The unit is seconds. + JavaTraceDeleteInterval int `mapstructure:"java_trace_delete_interval"` + //JavaTraceExpirationTime is the expiration time for data in javatraces. + //The unit is seconds. + JavaTraceExpirationTime int `mapstructure:"java_trace_expiration_time"` + } func NewDefaultConfig() *Config { @@ -29,5 +36,7 @@ func NewDefaultConfig() *Config { JavaTraceSlowTime: 500, SegmentSize: 40, EdgeEventsWindowSize: 2, + JavaTraceDeleteInterval: 10, + JavaTraceExpirationTime: 30, } } diff --git a/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go b/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go index 7d5d07f56..ffe0df505 100644 --- a/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go +++ b/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go @@ -33,9 +33,16 @@ type CpuAnalyzer struct { lock sync.RWMutex telemetry *component.TelemetryTools tidExpiredQueue *tidDeleteQueue - javaTraces map[string]*TransactionIdEvent + javaTraces map[JavaTracesKey]*TransactionIdEvent nextConsumers []consumer.Consumer metadata *kubernetes.K8sMetaDataCache + cleanerTicker *time.Ticker +} + +type JavaTracesKey struct{ + TraceId string + PidString string + StartTime time.Time } func (ca *CpuAnalyzer) Type() analyzer.Type { @@ -57,7 +64,7 @@ func NewCpuAnalyzer(cfg interface{}, telemetry *component.TelemetryTools, consum } ca.cpuPidEvents = make(map[uint32]map[uint32]*TimeSegments, 100000) ca.tidExpiredQueue = newTidDeleteQueue() - ca.javaTraces = make(map[string]*TransactionIdEvent, 100000) + ca.javaTraces = make(map[JavaTracesKey]*TransactionIdEvent, 100000) go ca.TidDelete(30*time.Second, 10*time.Second) go ca.sampleSend() newSelfMetrics(telemetry.MeterProvider, ca) @@ -65,12 +72,26 @@ func NewCpuAnalyzer(cfg interface{}, telemetry *component.TelemetryTools, consum } func (ca *CpuAnalyzer) Start() error { - // Disable receiving and sending the profiling data by default. + ca.cleanerTicker = time.NewTicker(time.Duration(ca.cfg.JavaTraceDeleteInterval) * time.Second) + go func() { + for range ca.cleanerTicker.C { + ca.lock.Lock() + now := time.Now() + for key:= range ca.javaTraces { + if now.Sub(key.StartTime) > time.Duration(ca.cfg.JavaTraceExpirationTime)*time.Second { + delete(ca.javaTraces, key) + fmt.Print("Expired data has been released,pid = " + key.PidString) + } + } + ca.lock.Unlock() + } + }() return nil } func (ca *CpuAnalyzer) Shutdown() error { _ = ca.StopProfile() + ca.cleanerTicker.Stop() return nil } @@ -114,10 +135,19 @@ func (ca *CpuAnalyzer) ConsumeTransactionIdEvent(event *model.KindlingEvent) { } func (ca *CpuAnalyzer) analyzerJavaTraceTime(ev *TransactionIdEvent) { + javatracekey := &JavaTracesKey{ + TraceId: ev.TraceId, + PidString: ev.PidString, + StartTime: time.Now(), + } if ev.IsEntry == 1 { - ca.javaTraces[ev.TraceId+ev.PidString] = ev + ca.javaTraces[*javatracekey] = ev } else { - oldEvent := ca.javaTraces[ev.TraceId+ev.PidString] + oldEvent,ok := ca.javaTraces[*javatracekey] + if(!ok){ + ca.telemetry.Logger.Warnf("No javaTraces traceid=%d, pid=%s", javatracekey.TraceId,javatracekey.PidString) + return + } pid, _ := strconv.ParseInt(ev.PidString, 10, 64) spendTime := ev.Timestamp - oldEvent.Timestamp contentKey := oldEvent.Url diff --git a/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer_test.go b/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer_test.go new file mode 100644 index 000000000..23a48a366 --- /dev/null +++ b/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer_test.go @@ -0,0 +1,55 @@ +package cpuanalyzer + +import ( + "math/rand" + "strconv" + "testing" + "time" +) + +func TestStart(t *testing.T) { + keys := make([]JavaTracesKey, 10) + for i := 0; i < 10; i++ { + offset := time.Duration(rand.Int63n(int64(10 * time.Second))) + keys[i] = JavaTracesKey{ + TraceId: strconv.Itoa(rand.Intn(1000000)), + PidString: strconv.Itoa(rand.Intn(1000000)), + StartTime: time.Now().Add(offset - 5*time.Second), + } + } + tevent := &TransactionIdEvent{ + TraceId: "0", + PidString: "1", + } + javaTraces := make(map[JavaTracesKey]*TransactionIdEvent) + for _, key := range keys { + javaTraces[key] = tevent + } + config:= NewDefaultConfig() + ca := &CpuAnalyzer{ + javaTraces: javaTraces, + cfg: config, + } + ca.cleanerTicker = time.NewTicker(time.Duration(ca.cfg.JavaTraceDeleteInterval) * time.Second) + go func() { + for range ca.cleanerTicker.C { + ca.lock.Lock() + now := time.Now() + for key:= range ca.javaTraces { + if now.Sub(key.StartTime) > time.Duration(ca.cfg.JavaTraceExpirationTime)*time.Second { + delete(ca.javaTraces, key) + t.Log("已删除pid="+ + key.PidString+ ",当前时间:"+ + time.Now().Truncate(time.Second).Format("15:04:05")+ + ",map剩余数量:"+strconv.Itoa(len(javaTraces))) + } + } + ca.lock.Unlock() + } + }() + time.Sleep(10*time.Minute) +} + +func Test(t *testing.T){ + +} diff --git a/deploy/agent/kindling-collector-config.yml b/deploy/agent/kindling-collector-config.yml index 8062dda93..7a0fc19cc 100644 --- a/deploy/agent/kindling-collector-config.yml +++ b/deploy/agent/kindling-collector-config.yml @@ -51,6 +51,12 @@ analyzers: # edge_events_window_size is the size of the duration window that seats the edge events. # The unit is second. The greater it is, the more data will be stored. edge_events_window_size: 2 + #JavaTraceDeleteInterval is the interval for cleaning up expired data in javatraces. + #The unit is seconds. + JavaTraceDeleteInterval: 10 + #JavaTraceExpirationTime is the expiration time for data in javatraces. + #The unit is seconds. + JavaTraceExpirationTime: 30 tcpconnectanalyzer: channel_size: 10000 wait_event_second: 10 From 46db4cfc189e74d78615a4b0fa3c3b155daa6296 Mon Sep 17 00:00:00 2001 From: Hui Date: Thu, 11 May 2023 13:41:07 +0800 Subject: [PATCH 03/12] scheduled cleaning in javatraces , and add null pointer checking Signed-off-by: Hui --- collector/docker/kindling-collector-config.yml | 8 ++++---- deploy/agent/kindling-collector-config.yml | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/collector/docker/kindling-collector-config.yml b/collector/docker/kindling-collector-config.yml index 7a0fc19cc..10d324c23 100644 --- a/collector/docker/kindling-collector-config.yml +++ b/collector/docker/kindling-collector-config.yml @@ -51,12 +51,12 @@ analyzers: # edge_events_window_size is the size of the duration window that seats the edge events. # The unit is second. The greater it is, the more data will be stored. edge_events_window_size: 2 - #JavaTraceDeleteInterval is the interval for cleaning up expired data in javatraces. + #java_trace_delete_interval is the interval for cleaning up expired data in javatraces. #The unit is seconds. - JavaTraceDeleteInterval: 10 - #JavaTraceExpirationTime is the expiration time for data in javatraces. + java_trace_delete_interval: 10 + #java_trace_expiration_time is the expiration time for data in javatraces. #The unit is seconds. - JavaTraceExpirationTime: 30 + java_trace_expiration_time: 30 tcpconnectanalyzer: channel_size: 10000 wait_event_second: 10 diff --git a/deploy/agent/kindling-collector-config.yml b/deploy/agent/kindling-collector-config.yml index 7a0fc19cc..10d324c23 100644 --- a/deploy/agent/kindling-collector-config.yml +++ b/deploy/agent/kindling-collector-config.yml @@ -51,12 +51,12 @@ analyzers: # edge_events_window_size is the size of the duration window that seats the edge events. # The unit is second. The greater it is, the more data will be stored. edge_events_window_size: 2 - #JavaTraceDeleteInterval is the interval for cleaning up expired data in javatraces. + #java_trace_delete_interval is the interval for cleaning up expired data in javatraces. #The unit is seconds. - JavaTraceDeleteInterval: 10 - #JavaTraceExpirationTime is the expiration time for data in javatraces. + java_trace_delete_interval: 10 + #java_trace_expiration_time is the expiration time for data in javatraces. #The unit is seconds. - JavaTraceExpirationTime: 30 + java_trace_expiration_time: 30 tcpconnectanalyzer: channel_size: 10000 wait_event_second: 10 From 002d6d663c596377914c6478bb045f1629d4c025 Mon Sep 17 00:00:00 2001 From: AnthonyHui <48346142+hwz779866221@users.noreply.github.com> Date: Thu, 11 May 2023 13:44:21 +0800 Subject: [PATCH 04/12] Update kindling-collector-config.yml Signed-off-by: AnthonyHui <48346142+hwz779866221@users.noreply.github.com> --- collector/docker/kindling-collector-config.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/collector/docker/kindling-collector-config.yml b/collector/docker/kindling-collector-config.yml index 10d324c23..cbcd57e0a 100644 --- a/collector/docker/kindling-collector-config.yml +++ b/collector/docker/kindling-collector-config.yml @@ -52,11 +52,11 @@ analyzers: # The unit is second. The greater it is, the more data will be stored. edge_events_window_size: 2 #java_trace_delete_interval is the interval for cleaning up expired data in javatraces. - #The unit is seconds. + #The unit is seconds. java_trace_delete_interval: 10 #java_trace_expiration_time is the expiration time for data in javatraces. - #The unit is seconds. - java_trace_expiration_time: 30 + #The unit is seconds. + java_trace_expiration_time: 30 tcpconnectanalyzer: channel_size: 10000 wait_event_second: 10 @@ -248,4 +248,4 @@ observability: # Note: DO NOT add the prefix "http://" endpoint: 10.10.10.10:8080 stdout: - collect_period: 15s \ No newline at end of file + collect_period: 15s From 2e95d2e394ac5faabf1d21c1f2d7fc77f4adbda6 Mon Sep 17 00:00:00 2001 From: AnthonyHui <48346142+hwz779866221@users.noreply.github.com> Date: Thu, 11 May 2023 13:46:25 +0800 Subject: [PATCH 05/12] Update kindling-collector-config.yml Signed-off-by: AnthonyHui <48346142+hwz779866221@users.noreply.github.com> --- deploy/agent/kindling-collector-config.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/deploy/agent/kindling-collector-config.yml b/deploy/agent/kindling-collector-config.yml index 10d324c23..cbcd57e0a 100644 --- a/deploy/agent/kindling-collector-config.yml +++ b/deploy/agent/kindling-collector-config.yml @@ -52,11 +52,11 @@ analyzers: # The unit is second. The greater it is, the more data will be stored. edge_events_window_size: 2 #java_trace_delete_interval is the interval for cleaning up expired data in javatraces. - #The unit is seconds. + #The unit is seconds. java_trace_delete_interval: 10 #java_trace_expiration_time is the expiration time for data in javatraces. - #The unit is seconds. - java_trace_expiration_time: 30 + #The unit is seconds. + java_trace_expiration_time: 30 tcpconnectanalyzer: channel_size: 10000 wait_event_second: 10 @@ -248,4 +248,4 @@ observability: # Note: DO NOT add the prefix "http://" endpoint: 10.10.10.10:8080 stdout: - collect_period: 15s \ No newline at end of file + collect_period: 15s From 5c951f41df7835d85686e580275d867e1f5d1a65 Mon Sep 17 00:00:00 2001 From: Hui Date: Tue, 23 May 2023 16:49:34 +0800 Subject: [PATCH 06/12] test Signed-off-by: Hui --- collector/docker/build-asyncprofiler.sh | 4 +- .../docker/kindling-collector-config.yml | 8 +- .../component/analyzer/cpuanalyzer/config.go | 8 +- .../analyzer/cpuanalyzer/cpu_analyzer.go | 61 +++++-------- .../analyzer/cpuanalyzer/cpu_analyzer_test.go | 55 ------------ .../analyzer/cpuanalyzer/delete_javatrace.go | 76 ++++++++++++++++ .../cpuanalyzer/delete_javatrace_test.go | 90 +++++++++++++++++++ deploy/agent/kindling-collector-config.yml | 8 +- 8 files changed, 200 insertions(+), 110 deletions(-) delete mode 100644 collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer_test.go create mode 100644 collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace.go create mode 100644 collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace_test.go diff --git a/collector/docker/build-asyncprofiler.sh b/collector/docker/build-asyncprofiler.sh index acc1f39b7..681e1d344 100755 --- a/collector/docker/build-asyncprofiler.sh +++ b/collector/docker/build-asyncprofiler.sh @@ -1,5 +1,5 @@ -ASYNC_PROFILER=async-profiler-1.0.4-linux-x64.tar.gz -KINDLING_JAVA=kindling-java-1.0.4.tar.gz +ASYNC_PROFILER=async-profiler-1.0.3-linux-x64.tar.gz +KINDLING_JAVA=kindling-java-1.0.3.tar.gz APM_ALL=apm-all-3.1.0.jar SCRIPT_DIR="$(cd "$(dirname "$SCRIPT_BIN")" > /dev/null 2>&1; pwd -P)" diff --git a/collector/docker/kindling-collector-config.yml b/collector/docker/kindling-collector-config.yml index cbcd57e0a..2675a735e 100644 --- a/collector/docker/kindling-collector-config.yml +++ b/collector/docker/kindling-collector-config.yml @@ -51,11 +51,11 @@ analyzers: # edge_events_window_size is the size of the duration window that seats the edge events. # The unit is second. The greater it is, the more data will be stored. edge_events_window_size: 2 - #java_trace_delete_interval is the interval for cleaning up expired data in javatraces. - #The unit is seconds. + # java_trace_delete_interval is the interval for cleaning up expired data in javatraces. + # The unit is seconds. java_trace_delete_interval: 10 - #java_trace_expiration_time is the expiration time for data in javatraces. - #The unit is seconds. + # java_trace_expiration_time is the expiration time for data in javatraces. + # The unit is seconds. java_trace_expiration_time: 30 tcpconnectanalyzer: channel_size: 10000 diff --git a/collector/pkg/component/analyzer/cpuanalyzer/config.go b/collector/pkg/component/analyzer/cpuanalyzer/config.go index 8cfdd3cc3..1a1da3329 100644 --- a/collector/pkg/component/analyzer/cpuanalyzer/config.go +++ b/collector/pkg/component/analyzer/cpuanalyzer/config.go @@ -20,11 +20,11 @@ type Config struct { // EdgeEventsWindowSize is the size of the duration window that seats the edge events. // The unit is seconds. The greater it is, the more data will be stored. EdgeEventsWindowSize int `mapstructure:"edge_events_window_size"` - //JavaTraceDeleteInterval is the interval for cleaning up expired data in javatraces. - //The unit is seconds. + // JavaTraceDeleteInterval is the interval for cleaning up expired data in javatraces. + // The unit is seconds. JavaTraceDeleteInterval int `mapstructure:"java_trace_delete_interval"` - //JavaTraceExpirationTime is the expiration time for data in javatraces. - //The unit is seconds. + // JavaTraceExpirationTime is the expiration time for data in javatraces. + // The unit is seconds. JavaTraceExpirationTime int `mapstructure:"java_trace_expiration_time"` } diff --git a/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go b/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go index 5b138b74f..824179034 100644 --- a/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go +++ b/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go @@ -27,23 +27,17 @@ const ( ) type CpuAnalyzer struct { - cfg *Config - cpuPidEvents map[uint32]map[uint32]*TimeSegments - routineSize *atomic.Int32 - lock sync.RWMutex - telemetry *component.TelemetryTools - tidExpiredQueue *tidDeleteQueue - javaTraces map[JavaTracesKey]*TransactionIdEvent - nextConsumers []consumer.Consumer - metadata *kubernetes.K8sMetaDataCache - cleanerTicker *time.Ticker - stopProfileChan chan struct{} -} - -type JavaTracesKey struct{ - TraceId string - PidString string - StartTime time.Time + cfg *Config + cpuPidEvents map[uint32]map[uint32]*TimeSegments + routineSize *atomic.Int32 + lock sync.RWMutex + telemetry *component.TelemetryTools + tidExpiredQueue *tidDeleteQueue + javaTraces map[string]*TransactionIdEvent + javaTraceExpiredQueue *javaTraceDeleteQueue + nextConsumers []consumer.Consumer + metadata *kubernetes.K8sMetaDataCache + stopProfileChan chan struct{} } func (ca *CpuAnalyzer) Type() analyzer.Type { @@ -65,26 +59,15 @@ func NewCpuAnalyzer(cfg interface{}, telemetry *component.TelemetryTools, consum } ca.cpuPidEvents = make(map[uint32]map[uint32]*TimeSegments, 100000) ca.tidExpiredQueue = newTidDeleteQueue() - ca.javaTraces = make(map[JavaTracesKey]*TransactionIdEvent, 100000) + ca.javaTraces = make(map[string]*TransactionIdEvent, 100000) newSelfMetrics(telemetry.MeterProvider, ca) return ca } func (ca *CpuAnalyzer) Start() error { - ca.cleanerTicker = time.NewTicker(time.Duration(ca.cfg.JavaTraceDeleteInterval) * time.Second) - go func() { - for range ca.cleanerTicker.C { - ca.lock.Lock() - now := time.Now() - for key:= range ca.javaTraces { - if now.Sub(key.StartTime) > time.Duration(ca.cfg.JavaTraceExpirationTime)*time.Second { - delete(ca.javaTraces, key) - fmt.Print("Expired data has been released,pid = " + key.PidString) - } - } - ca.lock.Unlock() - } - }() + interval := time.Duration(ca.cfg.JavaTraceDeleteInterval) * time.Second + expiredDuration :=time.Duration(ca.cfg.JavaTraceExpirationTime) * time.Second + go ca.JavaTraceDelete(interval,expiredDuration) return nil } @@ -92,7 +75,6 @@ func (ca *CpuAnalyzer) Shutdown() error { if enableProfile { _ = ca.StopProfile() } - ca.cleanerTicker.Stop() return nil } @@ -136,17 +118,13 @@ func (ca *CpuAnalyzer) ConsumeTransactionIdEvent(event *model.KindlingEvent) { } func (ca *CpuAnalyzer) analyzerJavaTraceTime(ev *TransactionIdEvent) { - javatracekey := &JavaTracesKey{ - TraceId: ev.TraceId, - PidString: ev.PidString, - StartTime: time.Now(), - } + key := ev.TraceId+ev.PidString if ev.IsEntry == 1 { - ca.javaTraces[*javatracekey] = ev + ca.javaTraces[key] = ev } else { - oldEvent,ok := ca.javaTraces[*javatracekey] + oldEvent,ok := ca.javaTraces[key] if(!ok){ - ca.telemetry.Logger.Warnf("No javaTraces traceid=%d, pid=%s", javatracekey.TraceId,javatracekey.PidString) + ca.telemetry.Logger.Warnf("No javaTraces traceid=%s, pid=%s", ev.TraceId,ev.PidString) return } pid, _ := strconv.ParseInt(ev.PidString, 10, 64) @@ -179,6 +157,7 @@ func (ca *CpuAnalyzer) analyzerJavaTraceTime(ev *TransactionIdEvent) { ReceiveDataGroupAsSignal(dataGroup) } } + ca.javaTraceExpiredQueue.Push(deleteVal{key: key,enterTime: time.Now()}) } func (ca *CpuAnalyzer) ConsumeJavaFutexEvent(event *model.KindlingEvent) { diff --git a/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer_test.go b/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer_test.go deleted file mode 100644 index 23a48a366..000000000 --- a/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer_test.go +++ /dev/null @@ -1,55 +0,0 @@ -package cpuanalyzer - -import ( - "math/rand" - "strconv" - "testing" - "time" -) - -func TestStart(t *testing.T) { - keys := make([]JavaTracesKey, 10) - for i := 0; i < 10; i++ { - offset := time.Duration(rand.Int63n(int64(10 * time.Second))) - keys[i] = JavaTracesKey{ - TraceId: strconv.Itoa(rand.Intn(1000000)), - PidString: strconv.Itoa(rand.Intn(1000000)), - StartTime: time.Now().Add(offset - 5*time.Second), - } - } - tevent := &TransactionIdEvent{ - TraceId: "0", - PidString: "1", - } - javaTraces := make(map[JavaTracesKey]*TransactionIdEvent) - for _, key := range keys { - javaTraces[key] = tevent - } - config:= NewDefaultConfig() - ca := &CpuAnalyzer{ - javaTraces: javaTraces, - cfg: config, - } - ca.cleanerTicker = time.NewTicker(time.Duration(ca.cfg.JavaTraceDeleteInterval) * time.Second) - go func() { - for range ca.cleanerTicker.C { - ca.lock.Lock() - now := time.Now() - for key:= range ca.javaTraces { - if now.Sub(key.StartTime) > time.Duration(ca.cfg.JavaTraceExpirationTime)*time.Second { - delete(ca.javaTraces, key) - t.Log("已删除pid="+ - key.PidString+ ",当前时间:"+ - time.Now().Truncate(time.Second).Format("15:04:05")+ - ",map剩余数量:"+strconv.Itoa(len(javaTraces))) - } - } - ca.lock.Unlock() - } - }() - time.Sleep(10*time.Minute) -} - -func Test(t *testing.T){ - -} diff --git a/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace.go b/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace.go new file mode 100644 index 000000000..a884ea639 --- /dev/null +++ b/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace.go @@ -0,0 +1,76 @@ +package cpuanalyzer + +import ( + "fmt" + "sync" + "time" +) + +type javaTraceDeleteQueue struct { + queueMutex sync.Mutex + queue []deleteVal +} + +type deleteVal struct { + key string + enterTime time.Time +} + +func newJavaTraceDeleteQueue() *javaTraceDeleteQueue { + return &javaTraceDeleteQueue{queue: make([]deleteVal,0)} +} + +func (dq *javaTraceDeleteQueue) GetFront() *deleteVal { + if len(dq.queue) > 0 { + return &dq.queue[0] + } + return nil +} + +func (dq *javaTraceDeleteQueue) Push(elem deleteVal) { + dq.queue = append(dq.queue, elem) +} + +func (dq *javaTraceDeleteQueue) Pop() { + if len(dq.queue) > 0 { + dq.queue = dq.queue[1:len(dq.queue)] + } +} + +func (ca *CpuAnalyzer) JavaTraceDelete(interval time.Duration, expiredDuration time.Duration) { + for { + select { + case <-ca.stopProfileChan: + return + case <-time.After(interval): + now := time.Now() + func() { + ca.javaTraceExpiredQueue.queueMutex.Lock() + defer ca.javaTraceExpiredQueue.queueMutex.Unlock() + for { + val := ca.javaTraceExpiredQueue.GetFront() + if val == nil { + break + } + if val.enterTime.Add(expiredDuration).After(now) { + break + } + + func() { + ca.lock.Lock() + defer ca.lock.Unlock() + event := ca.javaTraces[val.key] + if event == nil { + ca.javaTraceExpiredQueue.Pop() + } else { + // ca.telemetry.Logger.Debugf("Delete expired thread... pid=%s, tid=%s", event.PidString, event.TraceId) + fmt.Printf("Delete expired thread... pid=%s, tid=%s", event.PidString, event.TraceId) + delete(ca.javaTraces, val.key) + ca.javaTraceExpiredQueue.Pop() + } + }() + } + }() + } + } +} diff --git a/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace_test.go b/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace_test.go new file mode 100644 index 000000000..70cac8df7 --- /dev/null +++ b/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace_test.go @@ -0,0 +1,90 @@ +package cpuanalyzer + +import ( + "math/rand" + "strconv" + "testing" + "time" + + "github.com/Kindling-project/kindling/collector/pkg/component" +) + +var ( + cnt int + quitCnt int +) + + +func TestJavaTraceDeleteQueue(t *testing.T) { + + jt := make(map[string]*TransactionIdEvent, 100000) + testTelemetry := component.NewTelemetryManager().GetGlobalTelemetryTools() + mycfg := &Config{SegmentSize: 40} + ca = &CpuAnalyzer{javaTraces: jt, telemetry: testTelemetry, cfg: mycfg} + ca.javaTraceExpiredQueue = newJavaTraceDeleteQueue() + expiredDuration := time.Second * 1 + interval := time.Second *1 + go func () { + for { + select { + case <-ca.stopProfileChan: + return + case <-time.After(interval): + now := time.Now() + func() { + ca.javaTraceExpiredQueue.queueMutex.Lock() + defer ca.javaTraceExpiredQueue.queueMutex.Unlock() + for { + val := ca.javaTraceExpiredQueue.GetFront() + if val == nil { + break + } + if val.enterTime.Add(expiredDuration).After(now) { + break + } + //Delete expired threads (current_time >= thread_exit_time + interval_time). + func() { + ca.lock.Lock() + defer ca.lock.Unlock() + event := ca.javaTraces[val.key] + if event == nil { + ca.javaTraceExpiredQueue.Pop() + } else { + t.Logf("Delete expired thread... pid=%s, tid=%s", event.PidString, event.TraceId) + delete(ca.javaTraces, val.key) + quitCnt++; + ca.javaTraceExpiredQueue.Pop() + } + }() + } + }() + } + } + }() + for i := 0; i < 20; i++ { + + ev := new(TransactionIdEvent) + ev.TraceId = strconv.Itoa(rand.Intn(10000)) + ev.PidString = strconv.Itoa(rand.Intn(10000)) + ev.IsEntry = 1 + key:= ev.TraceId + ev.PidString + ca.javaTraces[key] = ev + val := new(deleteVal) + val.key = ev.TraceId+ev.PidString + val.enterTime = time.Now() + ca.javaTraceExpiredQueue.Push(*val) + t.Logf("pid=%s, tid=%s enter time=%s\n",ev.PidString, ev.TraceId, val.enterTime.Format("2006-01-02 15:04:05.000")) + cnt++ + time.Sleep(3 * timeDuration) + } + time.Sleep(10 * timeDuration) + + if cnt != quitCnt { + t.Fatalf("The number of javatraces that entering and exiting the queue is not equal! enterCount=%d, exitCount=%d\n", cnt, quitCnt) + } else { + t.Logf("All javatraces have exited normally. enterCount=%d, exitCount=%d\n", cnt, quitCnt) + } + + time.Sleep(10*time.Minute) + +} diff --git a/deploy/agent/kindling-collector-config.yml b/deploy/agent/kindling-collector-config.yml index cbcd57e0a..2675a735e 100644 --- a/deploy/agent/kindling-collector-config.yml +++ b/deploy/agent/kindling-collector-config.yml @@ -51,11 +51,11 @@ analyzers: # edge_events_window_size is the size of the duration window that seats the edge events. # The unit is second. The greater it is, the more data will be stored. edge_events_window_size: 2 - #java_trace_delete_interval is the interval for cleaning up expired data in javatraces. - #The unit is seconds. + # java_trace_delete_interval is the interval for cleaning up expired data in javatraces. + # The unit is seconds. java_trace_delete_interval: 10 - #java_trace_expiration_time is the expiration time for data in javatraces. - #The unit is seconds. + # java_trace_expiration_time is the expiration time for data in javatraces. + # The unit is seconds. java_trace_expiration_time: 30 tcpconnectanalyzer: channel_size: 10000 From 1d558d017617c39741bc935bde014a4d7787e8b0 Mon Sep 17 00:00:00 2001 From: Hui Date: Tue, 23 May 2023 17:55:17 +0800 Subject: [PATCH 07/12] fix bug Signed-off-by: Hui --- collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go | 1 + 1 file changed, 1 insertion(+) diff --git a/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go b/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go index 824179034..449328bca 100644 --- a/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go +++ b/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go @@ -59,6 +59,7 @@ func NewCpuAnalyzer(cfg interface{}, telemetry *component.TelemetryTools, consum } ca.cpuPidEvents = make(map[uint32]map[uint32]*TimeSegments, 100000) ca.tidExpiredQueue = newTidDeleteQueue() + ca.javaTraceExpiredQueue = newJavaTraceDeleteQueue() ca.javaTraces = make(map[string]*TransactionIdEvent, 100000) newSelfMetrics(telemetry.MeterProvider, ca) return ca From 53dab9b524cccd8a5eed62f9f15ed5b855b4f0f9 Mon Sep 17 00:00:00 2001 From: Hui Date: Wed, 24 May 2023 11:12:41 +0800 Subject: [PATCH 08/12] regular clean of javatraces Signed-off-by: Hui --- .../analyzer/cpuanalyzer/cpu_analyzer.go | 2 +- .../analyzer/cpuanalyzer/delete_javatrace.go | 5 +- .../cpuanalyzer/delete_javatrace_test.go | 47 ++----------------- 3 files changed, 8 insertions(+), 46 deletions(-) diff --git a/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go b/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go index 449328bca..b7fc9ed55 100644 --- a/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go +++ b/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go @@ -120,6 +120,7 @@ func (ca *CpuAnalyzer) ConsumeTransactionIdEvent(event *model.KindlingEvent) { func (ca *CpuAnalyzer) analyzerJavaTraceTime(ev *TransactionIdEvent) { key := ev.TraceId+ev.PidString + ca.javaTraceExpiredQueue.Push(deleteVal{key: key,enterTime: time.Now()}) if ev.IsEntry == 1 { ca.javaTraces[key] = ev } else { @@ -158,7 +159,6 @@ func (ca *CpuAnalyzer) analyzerJavaTraceTime(ev *TransactionIdEvent) { ReceiveDataGroupAsSignal(dataGroup) } } - ca.javaTraceExpiredQueue.Push(deleteVal{key: key,enterTime: time.Now()}) } func (ca *CpuAnalyzer) ConsumeJavaFutexEvent(event *model.KindlingEvent) { diff --git a/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace.go b/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace.go index a884ea639..d2101a02e 100644 --- a/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace.go +++ b/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace.go @@ -2,6 +2,7 @@ package cpuanalyzer import ( "fmt" + "strconv" "sync" "time" ) @@ -43,6 +44,7 @@ func (ca *CpuAnalyzer) JavaTraceDelete(interval time.Duration, expiredDuration t case <-ca.stopProfileChan: return case <-time.After(interval): + ca.telemetry.Logger.Debug("Start regular cleaning of javatrace...") now := time.Now() func() { ca.javaTraceExpiredQueue.queueMutex.Lock() @@ -63,8 +65,7 @@ func (ca *CpuAnalyzer) JavaTraceDelete(interval time.Duration, expiredDuration t if event == nil { ca.javaTraceExpiredQueue.Pop() } else { - // ca.telemetry.Logger.Debugf("Delete expired thread... pid=%s, tid=%s", event.PidString, event.TraceId) - fmt.Printf("Delete expired thread... pid=%s, tid=%s", event.PidString, event.TraceId) + ca.telemetry.Logger.Debugf("Delete expired javatrace... pid=%s, tid=%s", event.PidString, event.TraceId) delete(ca.javaTraces, val.key) ca.javaTraceExpiredQueue.Pop() } diff --git a/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace_test.go b/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace_test.go index 70cac8df7..311434213 100644 --- a/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace_test.go +++ b/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace_test.go @@ -19,48 +19,10 @@ func TestJavaTraceDeleteQueue(t *testing.T) { jt := make(map[string]*TransactionIdEvent, 100000) testTelemetry := component.NewTelemetryManager().GetGlobalTelemetryTools() - mycfg := &Config{SegmentSize: 40} + mycfg := &Config{SegmentSize: 40,JavaTraceDeleteInterval:15,JavaTraceExpirationTime: 10} ca = &CpuAnalyzer{javaTraces: jt, telemetry: testTelemetry, cfg: mycfg} ca.javaTraceExpiredQueue = newJavaTraceDeleteQueue() - expiredDuration := time.Second * 1 - interval := time.Second *1 - go func () { - for { - select { - case <-ca.stopProfileChan: - return - case <-time.After(interval): - now := time.Now() - func() { - ca.javaTraceExpiredQueue.queueMutex.Lock() - defer ca.javaTraceExpiredQueue.queueMutex.Unlock() - for { - val := ca.javaTraceExpiredQueue.GetFront() - if val == nil { - break - } - if val.enterTime.Add(expiredDuration).After(now) { - break - } - //Delete expired threads (current_time >= thread_exit_time + interval_time). - func() { - ca.lock.Lock() - defer ca.lock.Unlock() - event := ca.javaTraces[val.key] - if event == nil { - ca.javaTraceExpiredQueue.Pop() - } else { - t.Logf("Delete expired thread... pid=%s, tid=%s", event.PidString, event.TraceId) - delete(ca.javaTraces, val.key) - quitCnt++; - ca.javaTraceExpiredQueue.Pop() - } - }() - } - }() - } - } - }() + ca.Start() for i := 0; i < 20; i++ { ev := new(TransactionIdEvent) @@ -75,16 +37,15 @@ func TestJavaTraceDeleteQueue(t *testing.T) { ca.javaTraceExpiredQueue.Push(*val) t.Logf("pid=%s, tid=%s enter time=%s\n",ev.PidString, ev.TraceId, val.enterTime.Format("2006-01-02 15:04:05.000")) cnt++ - time.Sleep(3 * timeDuration) + time.Sleep(3 * time.Second) } time.Sleep(10 * timeDuration) if cnt != quitCnt { t.Fatalf("The number of javatraces that entering and exiting the queue is not equal! enterCount=%d, exitCount=%d\n", cnt, quitCnt) } else { - t.Logf("All javatraces have exited normally. enterCount=%d, exitCount=%d\n", cnt, quitCnt) + t.Logf("All javatraces have cleaned normally. enterCount=%d, exitCount=%d\n", cnt, quitCnt) } - time.Sleep(10*time.Minute) } From c613333f1aaaa9d770ab6a7443f774c3ad663e5f Mon Sep 17 00:00:00 2001 From: Hui Date: Wed, 24 May 2023 11:46:43 +0800 Subject: [PATCH 09/12] regular clean of javatraces Signed-off-by: Hui --- .../pkg/component/analyzer/cpuanalyzer/delete_javatrace.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace.go b/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace.go index d2101a02e..636f9ef8e 100644 --- a/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace.go +++ b/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace.go @@ -1,8 +1,6 @@ package cpuanalyzer import ( - "fmt" - "strconv" "sync" "time" ) From a59b33e97fd2852fb2ff0820206166296e7fd1ec Mon Sep 17 00:00:00 2001 From: anthonyhui Date: Fri, 2 Jun 2023 09:57:35 +0800 Subject: [PATCH 10/12] scheduled cleaning in javatraces , and add null pointer checking Signed-off-by: anthonyhui --- .../docker/kindling-collector-config.yml | 4 ++-- .../component/analyzer/cpuanalyzer/config.go | 15 +++++++-------- .../analyzer/cpuanalyzer/cpu_analyzer.go | 19 +++++++++++-------- .../analyzer/cpuanalyzer/delete_javatrace.go | 6 +++--- .../cpuanalyzer/delete_javatrace_test.go | 17 ++++++++--------- deploy/agent/kindling-collector-config.yml | 4 ++-- 6 files changed, 33 insertions(+), 32 deletions(-) diff --git a/collector/docker/kindling-collector-config.yml b/collector/docker/kindling-collector-config.yml index 2675a735e..0be79b7f5 100644 --- a/collector/docker/kindling-collector-config.yml +++ b/collector/docker/kindling-collector-config.yml @@ -53,10 +53,10 @@ analyzers: edge_events_window_size: 2 # java_trace_delete_interval is the interval for cleaning up expired data in javatraces. # The unit is seconds. - java_trace_delete_interval: 10 + java_trace_delete_interval: 20 # java_trace_expiration_time is the expiration time for data in javatraces. # The unit is seconds. - java_trace_expiration_time: 30 + java_trace_expiration_time: 120 tcpconnectanalyzer: channel_size: 10000 wait_event_second: 10 diff --git a/collector/pkg/component/analyzer/cpuanalyzer/config.go b/collector/pkg/component/analyzer/cpuanalyzer/config.go index 1a1da3329..64e395d42 100644 --- a/collector/pkg/component/analyzer/cpuanalyzer/config.go +++ b/collector/pkg/component/analyzer/cpuanalyzer/config.go @@ -26,17 +26,16 @@ type Config struct { // JavaTraceExpirationTime is the expiration time for data in javatraces. // The unit is seconds. JavaTraceExpirationTime int `mapstructure:"java_trace_expiration_time"` - } func NewDefaultConfig() *Config { return &Config{ - SamplingInterval: 5, - OpenJavaTraceSampling: false, - JavaTraceSlowTime: 500, - SegmentSize: 40, - EdgeEventsWindowSize: 2, - JavaTraceDeleteInterval: 10, - JavaTraceExpirationTime: 30, + SamplingInterval: 5, + OpenJavaTraceSampling: false, + JavaTraceSlowTime: 500, + SegmentSize: 40, + EdgeEventsWindowSize: 2, + JavaTraceDeleteInterval: 20, + JavaTraceExpirationTime: 120, } } diff --git a/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go b/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go index b7fc9ed55..f37ff65ac 100644 --- a/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go +++ b/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go @@ -27,10 +27,11 @@ const ( ) type CpuAnalyzer struct { - cfg *Config + cfg *Config cpuPidEvents map[uint32]map[uint32]*TimeSegments routineSize *atomic.Int32 lock sync.RWMutex + jtlock sync.RWMutex telemetry *component.TelemetryTools tidExpiredQueue *tidDeleteQueue javaTraces map[string]*TransactionIdEvent @@ -67,8 +68,8 @@ func NewCpuAnalyzer(cfg interface{}, telemetry *component.TelemetryTools, consum func (ca *CpuAnalyzer) Start() error { interval := time.Duration(ca.cfg.JavaTraceDeleteInterval) * time.Second - expiredDuration :=time.Duration(ca.cfg.JavaTraceExpirationTime) * time.Second - go ca.JavaTraceDelete(interval,expiredDuration) + expiredDuration := time.Duration(ca.cfg.JavaTraceExpirationTime) * time.Second + go ca.JavaTraceDelete(interval, expiredDuration) return nil } @@ -119,14 +120,16 @@ func (ca *CpuAnalyzer) ConsumeTransactionIdEvent(event *model.KindlingEvent) { } func (ca *CpuAnalyzer) analyzerJavaTraceTime(ev *TransactionIdEvent) { - key := ev.TraceId+ev.PidString - ca.javaTraceExpiredQueue.Push(deleteVal{key: key,enterTime: time.Now()}) + ca.jtlock.Lock() + defer ca.jtlock.Unlock() + key := ev.TraceId + ev.PidString + ca.javaTraceExpiredQueue.Push(deleteVal{key: key, enterTime: time.Now()}) if ev.IsEntry == 1 { ca.javaTraces[key] = ev } else { - oldEvent,ok := ca.javaTraces[key] - if(!ok){ - ca.telemetry.Logger.Warnf("No javaTraces traceid=%s, pid=%s", ev.TraceId,ev.PidString) + oldEvent, ok := ca.javaTraces[key] + if !ok { + ca.telemetry.Logger.Warnf("No javaTraces traceid=%s, pid=%s", ev.TraceId, ev.PidString) return } pid, _ := strconv.ParseInt(ev.PidString, 10, 64) diff --git a/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace.go b/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace.go index 636f9ef8e..8028d26b3 100644 --- a/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace.go +++ b/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace.go @@ -16,7 +16,7 @@ type deleteVal struct { } func newJavaTraceDeleteQueue() *javaTraceDeleteQueue { - return &javaTraceDeleteQueue{queue: make([]deleteVal,0)} + return &javaTraceDeleteQueue{queue: make([]deleteVal, 0)} } func (dq *javaTraceDeleteQueue) GetFront() *deleteVal { @@ -57,8 +57,8 @@ func (ca *CpuAnalyzer) JavaTraceDelete(interval time.Duration, expiredDuration t } func() { - ca.lock.Lock() - defer ca.lock.Unlock() + ca.jtlock.Lock() + defer ca.jtlock.Unlock() event := ca.javaTraces[val.key] if event == nil { ca.javaTraceExpiredQueue.Pop() diff --git a/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace_test.go b/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace_test.go index 311434213..9571bc1ce 100644 --- a/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace_test.go +++ b/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace_test.go @@ -10,16 +10,15 @@ import ( ) var ( - cnt int - quitCnt int + cnt int + quitCnt int ) - func TestJavaTraceDeleteQueue(t *testing.T) { jt := make(map[string]*TransactionIdEvent, 100000) testTelemetry := component.NewTelemetryManager().GetGlobalTelemetryTools() - mycfg := &Config{SegmentSize: 40,JavaTraceDeleteInterval:15,JavaTraceExpirationTime: 10} + mycfg := &Config{SegmentSize: 40, JavaTraceDeleteInterval: 15, JavaTraceExpirationTime: 10} ca = &CpuAnalyzer{javaTraces: jt, telemetry: testTelemetry, cfg: mycfg} ca.javaTraceExpiredQueue = newJavaTraceDeleteQueue() ca.Start() @@ -29,13 +28,13 @@ func TestJavaTraceDeleteQueue(t *testing.T) { ev.TraceId = strconv.Itoa(rand.Intn(10000)) ev.PidString = strconv.Itoa(rand.Intn(10000)) ev.IsEntry = 1 - key:= ev.TraceId + ev.PidString + key := ev.TraceId + ev.PidString ca.javaTraces[key] = ev val := new(deleteVal) - val.key = ev.TraceId+ev.PidString + val.key = ev.TraceId + ev.PidString val.enterTime = time.Now() ca.javaTraceExpiredQueue.Push(*val) - t.Logf("pid=%s, tid=%s enter time=%s\n",ev.PidString, ev.TraceId, val.enterTime.Format("2006-01-02 15:04:05.000")) + t.Logf("pid=%s, tid=%s enter time=%s\n", ev.PidString, ev.TraceId, val.enterTime.Format("2006-01-02 15:04:05.000")) cnt++ time.Sleep(3 * time.Second) } @@ -44,8 +43,8 @@ func TestJavaTraceDeleteQueue(t *testing.T) { if cnt != quitCnt { t.Fatalf("The number of javatraces that entering and exiting the queue is not equal! enterCount=%d, exitCount=%d\n", cnt, quitCnt) } else { - t.Logf("All javatraces have cleaned normally. enterCount=%d, exitCount=%d\n", cnt, quitCnt) + t.Logf("All javatraces have cleaned normally. enterCount=%d, exitCount=%d\n", cnt, quitCnt) } - time.Sleep(10*time.Minute) + time.Sleep(10 * time.Minute) } diff --git a/deploy/agent/kindling-collector-config.yml b/deploy/agent/kindling-collector-config.yml index 2675a735e..0be79b7f5 100644 --- a/deploy/agent/kindling-collector-config.yml +++ b/deploy/agent/kindling-collector-config.yml @@ -53,10 +53,10 @@ analyzers: edge_events_window_size: 2 # java_trace_delete_interval is the interval for cleaning up expired data in javatraces. # The unit is seconds. - java_trace_delete_interval: 10 + java_trace_delete_interval: 20 # java_trace_expiration_time is the expiration time for data in javatraces. # The unit is seconds. - java_trace_expiration_time: 30 + java_trace_expiration_time: 120 tcpconnectanalyzer: channel_size: 10000 wait_event_second: 10 From 71383902f2d9dd66f59b7c643859fda6c2fbb957 Mon Sep 17 00:00:00 2001 From: anthonyhui Date: Wed, 16 Aug 2023 18:16:40 +0800 Subject: [PATCH 11/12] scheduled cleaning in javatraces , and add null pointer checking Signed-off-by: anthonyhui --- .../analyzer/cpuanalyzer/delete_javatrace_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace_test.go b/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace_test.go index 9571bc1ce..882a9a94e 100644 --- a/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace_test.go +++ b/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace_test.go @@ -21,7 +21,7 @@ func TestJavaTraceDeleteQueue(t *testing.T) { mycfg := &Config{SegmentSize: 40, JavaTraceDeleteInterval: 15, JavaTraceExpirationTime: 10} ca = &CpuAnalyzer{javaTraces: jt, telemetry: testTelemetry, cfg: mycfg} ca.javaTraceExpiredQueue = newJavaTraceDeleteQueue() - ca.Start() + go ca.JavaTraceDelete(1*time.Second, 1*time.Second) for i := 0; i < 20; i++ { ev := new(TransactionIdEvent) @@ -36,14 +36,14 @@ func TestJavaTraceDeleteQueue(t *testing.T) { ca.javaTraceExpiredQueue.Push(*val) t.Logf("pid=%s, tid=%s enter time=%s\n", ev.PidString, ev.TraceId, val.enterTime.Format("2006-01-02 15:04:05.000")) cnt++ - time.Sleep(3 * time.Second) + time.Sleep(1 * time.Second) } time.Sleep(10 * timeDuration) - if cnt != quitCnt { - t.Fatalf("The number of javatraces that entering and exiting the queue is not equal! enterCount=%d, exitCount=%d\n", cnt, quitCnt) + if len(ca.javaTraceExpiredQueue.queue) != 0 { + t.Fatalf("The number of javatraces that entering and exiting the queue is not equal! enterCount=%d\n", cnt) } else { - t.Logf("All javatraces have cleaned normally. enterCount=%d, exitCount=%d\n", cnt, quitCnt) + t.Logf("All javatraces have cleaned normally. enterCount=%d\n", cnt) } time.Sleep(10 * time.Minute) From ace01541c52d13bf25b2d50ffbf3297ea8098d3f Mon Sep 17 00:00:00 2001 From: anthonyhui Date: Thu, 31 Aug 2023 10:33:49 +0800 Subject: [PATCH 12/12] scheduled cleaning in javatraces , and add null pointer checking Signed-off-by: anthonyhui --- .../cpuanalyzer/delete_javatrace_test.go | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace_test.go b/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace_test.go index 882a9a94e..027801342 100644 --- a/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace_test.go +++ b/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace_test.go @@ -9,10 +9,7 @@ import ( "github.com/Kindling-project/kindling/collector/pkg/component" ) -var ( - cnt int - quitCnt int -) +var cnt int func TestJavaTraceDeleteQueue(t *testing.T) { @@ -22,8 +19,8 @@ func TestJavaTraceDeleteQueue(t *testing.T) { ca = &CpuAnalyzer{javaTraces: jt, telemetry: testTelemetry, cfg: mycfg} ca.javaTraceExpiredQueue = newJavaTraceDeleteQueue() go ca.JavaTraceDelete(1*time.Second, 1*time.Second) - for i := 0; i < 20; i++ { + for i := 0; i < 20; i++ { ev := new(TransactionIdEvent) ev.TraceId = strconv.Itoa(rand.Intn(10000)) ev.PidString = strconv.Itoa(rand.Intn(10000)) @@ -36,15 +33,13 @@ func TestJavaTraceDeleteQueue(t *testing.T) { ca.javaTraceExpiredQueue.Push(*val) t.Logf("pid=%s, tid=%s enter time=%s\n", ev.PidString, ev.TraceId, val.enterTime.Format("2006-01-02 15:04:05.000")) cnt++ - time.Sleep(1 * time.Second) } - time.Sleep(10 * timeDuration) + time.Sleep(5 * time.Second) - if len(ca.javaTraceExpiredQueue.queue) != 0 { - t.Fatalf("The number of javatraces that entering and exiting the queue is not equal! enterCount=%d\n", cnt) + if len(ca.javaTraces) != 0 { + t.Fatalf("The number of javatraces that entering and exiting the queue is not equal! "+ + "enterCount=%d , len of javatrace is : %d\n", cnt, len(ca.javaTraces)) } else { t.Logf("All javatraces have cleaned normally. enterCount=%d\n", cnt) } - time.Sleep(10 * time.Minute) - }