Skip to content

Commit

Permalink
新增:事件驱动和链路追踪
Browse files Browse the repository at this point in the history
  • Loading branch information
steden committed Jan 16, 2024
1 parent 659f11c commit 68c8321
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 2 deletions.
21 changes: 21 additions & 0 deletions TraceDetailEvent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package linkTrace

import (
"fmt"
"github.com/farseer-go/fs/flog"
"github.com/farseer-go/fs/trace"
)

// TraceDetailEventConsumer 事件埋点
type TraceDetailEventConsumer struct {
trace.BaseTraceDetail
Name string
}

func (receiver *TraceDetailEventConsumer) GetTraceDetail() *trace.BaseTraceDetail {
return &receiver.BaseTraceDetail
}

func (receiver *TraceDetailEventConsumer) ToString() string {
return fmt.Sprintf("[%s]耗时:%s, %s", flog.Yellow(receiver.CallType.ToString()), flog.Red(receiver.UseTs.String()), receiver.Name)
}
3 changes: 3 additions & 0 deletions eumTraceType/enum.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const (
FSchedule // 调度中心
Task // 本地任务
WatchKey // ETCD
EventConsumer // 事件消费
)

func (e Enum) ToString() string {
Expand All @@ -25,6 +26,8 @@ func (e Enum) ToString() string {
return "Task"
case WatchKey:
return "WatchKey"
case EventConsumer:
return "EventConsumer"
}
return ""
}
41 changes: 40 additions & 1 deletion traceManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (*traceManager) EntryQueueConsumer(queueName, subscribeName string) trace.I
StartTs: time.Now().UnixMicro(),
TraceType: eumTraceType.QueueConsumer,
ConsumerContext: ConsumerContext{
ConsumerServer: fmt.Sprintf("%s/%s/%v", core.AppName, core.AppIp, core.AppId),
ConsumerServer: fmt.Sprintf("本地Queue/%s/%s/%v", core.AppName, core.AppIp, core.AppId),
ConsumerQueueName: queueName + "/" + subscribeName,
},
}
Expand All @@ -96,6 +96,35 @@ func (*traceManager) EntryQueueConsumer(queueName, subscribeName string) trace.I
return context
}

// EntryEventConsumer event 事件消费埋点
func (receiver *traceManager) EntryEventConsumer(eventName, subscribeName string) trace.ITraceContext {
// 事件消费,一般是由其它入口的程序触发的,所以这里先看能不能取到之前的上下文
var traceId string
var parentAppName string
if cur := receiver.GetCurTrace(); cur != nil {
traceId = cur.GetTraceId()
parentAppName = core.AppName
} else {
traceId = parse.ToString(sonyflake.GenerateId())
}
context := &TraceContext{
AppId: parse.ToString(core.AppId),
AppName: core.AppName,
AppIp: core.AppIp,
ParentAppName: parentAppName,
TraceId: traceId,
StartTs: time.Now().UnixMicro(),
TraceType: eumTraceType.EventConsumer,
ConsumerContext: ConsumerContext{
ConsumerServer: fmt.Sprintf("本地Event/%s/%s/%v", core.AppName, core.AppIp, core.AppId),
ConsumerQueueName: eventName + "/" + subscribeName,
},
}
trace.CurTraceContext.Set(context)
trace.ScopeLevel.Set([]trace.BaseTraceDetail{})
return context
}

// EntryTask 创建本地任务入口
func (*traceManager) EntryTask(taskName string) trace.ITraceContext {
traceId := parse.ToString(sonyflake.GenerateId())
Expand Down Expand Up @@ -233,6 +262,16 @@ func (*traceManager) TraceHand(name string) trace.ITraceDetail {
return detail
}

// TraceEventPublish 事件发布
func (*traceManager) TraceEventPublish(eventName string) trace.ITraceDetail {
detail := &TraceDetailEventConsumer{
BaseTraceDetail: newTraceDetail(eumCallType.EventPublish, ""),
Name: eventName,
}
add(detail)
return detail
}

// TraceMqSend mq发送埋点
func (*traceManager) TraceMqSend(method string, server string, exchange string, routingKey string) trace.ITraceDetail {
detail := &TraceDetailMq{
Expand Down
2 changes: 1 addition & 1 deletion trackContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (receiver *TraceContext) printLog() {
switch receiver.TraceType {
case eumTraceType.WebApi:
flog.Printf("【%s链路追踪】TraceId:%s,耗时:%s,%s\n%s\n", receiver.TraceType.ToString(), flog.Green(parse.ToString(receiver.TraceId)), flog.Red(receiver.UseTs.String()), receiver.WebContext.WebPath, logs)
case eumTraceType.MqConsumer, eumTraceType.QueueConsumer:
case eumTraceType.MqConsumer, eumTraceType.QueueConsumer, eumTraceType.EventConsumer:
flog.Printf("【%s链路追踪】TraceId:%s,耗时:%s,%s\n%s\n", receiver.TraceType.ToString(), flog.Green(parse.ToString(receiver.TraceId)), flog.Red(receiver.UseTs.String()), receiver.ConsumerContext.ConsumerQueueName, logs)
case eumTraceType.Task, eumTraceType.FSchedule:
flog.Printf("【%s链路追踪】TraceId:%s,耗时:%s,%s %s\n%s\n", receiver.TraceType.ToString(), flog.Green(parse.ToString(receiver.TraceId)), flog.Red(receiver.UseTs.String()), receiver.TaskContext.TaskName, receiver.TaskContext.TaskGroupName, logs)
Expand Down

0 comments on commit 68c8321

Please sign in to comment.