Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduled cleaning in javatraces , and add null pointer checking #514

Merged
merged 23 commits into from
Sep 1, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion collector/docker/kindling-collector-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ analyzers:
# edge_events_window_size is the size of the duration window that seats the edge events.
# The unit is second. The greater it is, the more data will be stored.
edge_events_window_size: 2
# java_trace_delete_interval is the interval for cleaning up expired data in javatraces.
# The unit is seconds.
java_trace_delete_interval: 10
# java_trace_expiration_time is the expiration time for data in javatraces.
# The unit is seconds.
java_trace_expiration_time: 30
tcpconnectanalyzer:
channel_size: 10000
wait_event_second: 10
Expand Down Expand Up @@ -242,4 +248,4 @@ observability:
# Note: DO NOT add the prefix "http://"
endpoint: 10.10.10.10:8080
stdout:
collect_period: 15s
collect_period: 15s
9 changes: 9 additions & 0 deletions collector/pkg/component/analyzer/cpuanalyzer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ type Config struct {
// EdgeEventsWindowSize is the size of the duration window that seats the edge events.
// The unit is seconds. The greater it is, the more data will be stored.
EdgeEventsWindowSize int `mapstructure:"edge_events_window_size"`
// JavaTraceDeleteInterval is the interval for cleaning up expired data in javatraces.
// The unit is seconds.
JavaTraceDeleteInterval int `mapstructure:"java_trace_delete_interval"`
// JavaTraceExpirationTime is the expiration time for data in javatraces.
// The unit is seconds.
JavaTraceExpirationTime int `mapstructure:"java_trace_expiration_time"`

}

func NewDefaultConfig() *Config {
Expand All @@ -29,5 +36,7 @@ func NewDefaultConfig() *Config {
JavaTraceSlowTime: 500,
SegmentSize: 40,
EdgeEventsWindowSize: 2,
JavaTraceDeleteInterval: 10,
JavaTraceExpirationTime: 30,
dxsup marked this conversation as resolved.
Show resolved Hide resolved
}
}
37 changes: 23 additions & 14 deletions collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ const (
)

type CpuAnalyzer struct {
cfg *Config
cpuPidEvents map[uint32]map[uint32]*TimeSegments
routineSize *atomic.Int32
lock sync.RWMutex
telemetry *component.TelemetryTools
tidExpiredQueue *tidDeleteQueue
javaTraces map[string]*TransactionIdEvent
nextConsumers []consumer.Consumer
metadata *kubernetes.K8sMetaDataCache

stopProfileChan chan struct{}
cfg *Config
cpuPidEvents map[uint32]map[uint32]*TimeSegments
routineSize *atomic.Int32
lock sync.RWMutex
telemetry *component.TelemetryTools
tidExpiredQueue *tidDeleteQueue
javaTraces map[string]*TransactionIdEvent
javaTraceExpiredQueue *javaTraceDeleteQueue
nextConsumers []consumer.Consumer
metadata *kubernetes.K8sMetaDataCache
stopProfileChan chan struct{}
}

func (ca *CpuAnalyzer) Type() analyzer.Type {
Expand All @@ -59,13 +59,16 @@ func NewCpuAnalyzer(cfg interface{}, telemetry *component.TelemetryTools, consum
}
ca.cpuPidEvents = make(map[uint32]map[uint32]*TimeSegments, 100000)
ca.tidExpiredQueue = newTidDeleteQueue()
ca.javaTraceExpiredQueue = newJavaTraceDeleteQueue()
ca.javaTraces = make(map[string]*TransactionIdEvent, 100000)
newSelfMetrics(telemetry.MeterProvider, ca)
return ca
}

func (ca *CpuAnalyzer) Start() error {
// Disable receiving and sending the profiling data by default.
interval := time.Duration(ca.cfg.JavaTraceDeleteInterval) * time.Second
expiredDuration :=time.Duration(ca.cfg.JavaTraceExpirationTime) * time.Second
go ca.JavaTraceDelete(interval,expiredDuration)
return nil
}

Expand Down Expand Up @@ -116,10 +119,16 @@ func (ca *CpuAnalyzer) ConsumeTransactionIdEvent(event *model.KindlingEvent) {
}

func (ca *CpuAnalyzer) analyzerJavaTraceTime(ev *TransactionIdEvent) {
key := ev.TraceId+ev.PidString
ca.javaTraceExpiredQueue.Push(deleteVal{key: key,enterTime: time.Now()})
if ev.IsEntry == 1 {
ca.javaTraces[ev.TraceId+ev.PidString] = ev
ca.javaTraces[key] = ev
} else {
oldEvent := ca.javaTraces[ev.TraceId+ev.PidString]
oldEvent,ok := ca.javaTraces[key]
if(!ok){
ca.telemetry.Logger.Warnf("No javaTraces traceid=%s, pid=%s", ev.TraceId,ev.PidString)
return
}
pid, _ := strconv.ParseInt(ev.PidString, 10, 64)
spendTime := ev.Timestamp - oldEvent.Timestamp
contentKey := oldEvent.Url
Expand Down
75 changes: 75 additions & 0 deletions collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package cpuanalyzer

import (
"sync"
"time"
)

type javaTraceDeleteQueue struct {
queueMutex sync.Mutex
queue []deleteVal
}

type deleteVal struct {
key string
enterTime time.Time
}

func newJavaTraceDeleteQueue() *javaTraceDeleteQueue {
return &javaTraceDeleteQueue{queue: make([]deleteVal,0)}
}

func (dq *javaTraceDeleteQueue) GetFront() *deleteVal {
if len(dq.queue) > 0 {
return &dq.queue[0]
}
return nil
}

func (dq *javaTraceDeleteQueue) Push(elem deleteVal) {
dq.queue = append(dq.queue, elem)
}

func (dq *javaTraceDeleteQueue) Pop() {
if len(dq.queue) > 0 {
dq.queue = dq.queue[1:len(dq.queue)]
}
}

func (ca *CpuAnalyzer) JavaTraceDelete(interval time.Duration, expiredDuration time.Duration) {
for {
select {
case <-ca.stopProfileChan:
return
case <-time.After(interval):
ca.telemetry.Logger.Debug("Start regular cleaning of javatrace...")
now := time.Now()
func() {
ca.javaTraceExpiredQueue.queueMutex.Lock()
defer ca.javaTraceExpiredQueue.queueMutex.Unlock()
for {
val := ca.javaTraceExpiredQueue.GetFront()
if val == nil {
break
}
if val.enterTime.Add(expiredDuration).After(now) {
break
}

func() {
ca.lock.Lock()
defer ca.lock.Unlock()
dxsup marked this conversation as resolved.
Show resolved Hide resolved
event := ca.javaTraces[val.key]
if event == nil {
ca.javaTraceExpiredQueue.Pop()
} else {
ca.telemetry.Logger.Debugf("Delete expired javatrace... pid=%s, tid=%s", event.PidString, event.TraceId)
delete(ca.javaTraces, val.key)
ca.javaTraceExpiredQueue.Pop()
}
}()
}
}()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package cpuanalyzer

import (
"math/rand"
"strconv"
"testing"
"time"

"github.com/Kindling-project/kindling/collector/pkg/component"
)

var (
cnt int
quitCnt int
)


func TestJavaTraceDeleteQueue(t *testing.T) {

jt := make(map[string]*TransactionIdEvent, 100000)
testTelemetry := component.NewTelemetryManager().GetGlobalTelemetryTools()
mycfg := &Config{SegmentSize: 40,JavaTraceDeleteInterval:15,JavaTraceExpirationTime: 10}
ca = &CpuAnalyzer{javaTraces: jt, telemetry: testTelemetry, cfg: mycfg}
ca.javaTraceExpiredQueue = newJavaTraceDeleteQueue()
ca.Start()
for i := 0; i < 20; i++ {

ev := new(TransactionIdEvent)
ev.TraceId = strconv.Itoa(rand.Intn(10000))
ev.PidString = strconv.Itoa(rand.Intn(10000))
ev.IsEntry = 1
key:= ev.TraceId + ev.PidString
ca.javaTraces[key] = ev
val := new(deleteVal)
val.key = ev.TraceId+ev.PidString
val.enterTime = time.Now()
ca.javaTraceExpiredQueue.Push(*val)
t.Logf("pid=%s, tid=%s enter time=%s\n",ev.PidString, ev.TraceId, val.enterTime.Format("2006-01-02 15:04:05.000"))
cnt++
time.Sleep(3 * time.Second)
}
time.Sleep(10 * timeDuration)

if cnt != quitCnt {
t.Fatalf("The number of javatraces that entering and exiting the queue is not equal! enterCount=%d, exitCount=%d\n", cnt, quitCnt)
} else {
t.Logf("All javatraces have cleaned normally. enterCount=%d, exitCount=%d\n", cnt, quitCnt)
}
time.Sleep(10*time.Minute)

}
8 changes: 7 additions & 1 deletion deploy/agent/kindling-collector-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ analyzers:
# edge_events_window_size is the size of the duration window that seats the edge events.
# The unit is second. The greater it is, the more data will be stored.
edge_events_window_size: 2
# java_trace_delete_interval is the interval for cleaning up expired data in javatraces.
# The unit is seconds.
java_trace_delete_interval: 10
# java_trace_expiration_time is the expiration time for data in javatraces.
# The unit is seconds.
java_trace_expiration_time: 30
tcpconnectanalyzer:
channel_size: 10000
wait_event_second: 10
Expand Down Expand Up @@ -242,4 +248,4 @@ observability:
# Note: DO NOT add the prefix "http://"
endpoint: 10.10.10.10:8080
stdout:
collect_period: 15s
collect_period: 15s