diff --git a/flog/fopsProvider.go b/flog/fopsProvider.go index f0d12c494..469608d42 100644 --- a/flog/fopsProvider.go +++ b/flog/fopsProvider.go @@ -42,7 +42,7 @@ func (r *fopsLoggerPersistent) Log(LogLevel eumLogLevel.Enum, log *LogData, exce if LogLevel != eumLogLevel.NoneLevel { // 上传到FOPS时需要 if t := trace.CurTraceContext.Get(); t != nil { - log.TraceId = t.GetTraceId() + log.TraceId = t.TraceId } log.Content = mustCompile.ReplaceAllString(log.Content, "") log.AppId = strconv.FormatInt(core.AppId, 10) diff --git a/test/trace_test.go b/test/trace_test.go index 560d71c86..73d7886a4 100644 --- a/test/trace_test.go +++ b/test/trace_test.go @@ -2,10 +2,11 @@ package test import ( "fmt" + "testing" + "github.com/farseer-go/fs/trace" "github.com/farseer-go/fs/trace/eumCallType" "github.com/stretchr/testify/assert" - "testing" ) func TestTrace(t *testing.T) { @@ -55,15 +56,10 @@ func TestTrace(t *testing.T) { iManager.TraceHand("").SetSql("", "", "", "", 0) iManager.TraceHand("").SetHttpRequest("", nil, nil, "", "", 0) - iManager.EntryQueueConsumer("", "").End(nil) iManager.EntryQueueConsumer("", "").Ignore() - iManager.EntryQueueConsumer("", "").GetList() iManager.EntryQueueConsumer("", "").GetAppInfo() - iManager.EntryQueueConsumer("", "").GetTraceId() - iManager.EntryQueueConsumer("", "").GetTraceLevel() iManager.EntryQueueConsumer("", "").AddDetail(nil) iManager.EntryQueueConsumer("", "").Error(nil) - iManager.EntryQueueConsumer("", "").GetStartTs() iManager.EntryQueueConsumer("", "").SetBody("", 0, "") } diff --git a/trace/emptyManager.go b/trace/emptyManager.go index 6e8900c23..d0b62f570 100644 --- a/trace/emptyManager.go +++ b/trace/emptyManager.go @@ -3,35 +3,35 @@ package trace type EmptyManager struct { } -func (*EmptyManager) EntryWebApi(domain string, path string, method string, contentType string, header map[string]string, requestIp string) ITraceContext { - return &emptyTraceContext{} +func (*EmptyManager) EntryWebApi(domain string, path string, method string, contentType string, header map[string]string, requestIp string) *TraceContext { + return &TraceContext{} } -func (*EmptyManager) EntryWebSocket(domain string, path string, header map[string]string, requestIp string) ITraceContext { - return &emptyTraceContext{} +func (*EmptyManager) EntryWebSocket(domain string, path string, header map[string]string, requestIp string) *TraceContext { + return &TraceContext{} } -func (*EmptyManager) EntryMqConsumer(parentTraceId, parentAppName, server string, queueName string, routingKey string) ITraceContext { - return &emptyTraceContext{} +func (*EmptyManager) EntryMqConsumer(parentTraceId, parentAppName, server string, queueName string, routingKey string) *TraceContext { + return &TraceContext{} } -func (*EmptyManager) EntryQueueConsumer(queueName, subscribeName string) ITraceContext { - return &emptyTraceContext{} +func (*EmptyManager) EntryQueueConsumer(queueName, subscribeName string) *TraceContext { + return &TraceContext{} } -func (*EmptyManager) EntryEventConsumer(server, eventName, subscribeName string) ITraceContext { - return &emptyTraceContext{} +func (*EmptyManager) EntryEventConsumer(server, eventName, subscribeName string) *TraceContext { + return &TraceContext{} } -func (*EmptyManager) EntryTask(taskName string) ITraceContext { - return &emptyTraceContext{} +func (*EmptyManager) EntryTask(taskName string) *TraceContext { + return &TraceContext{} } -func (*EmptyManager) EntryFSchedule(taskGroupName string, taskId int64, data map[string]string) ITraceContext { - return &emptyTraceContext{} +func (*EmptyManager) EntryFSchedule(taskGroupName string, taskId int64, data map[string]string) *TraceContext { + return &TraceContext{} } -func (*EmptyManager) EntryTaskGroup(taskName string, taskGroupName string, taskId int64) ITraceContext { - return &emptyTraceContext{} +func (*EmptyManager) EntryTaskGroup(taskName string, taskGroupName string, taskId int64) *TraceContext { + return &TraceContext{} } -func (*EmptyManager) EntryWatchKey(key string) ITraceContext { return &emptyTraceContext{} } -func (*EmptyManager) GetCurTrace() ITraceContext { return nil } +func (*EmptyManager) EntryWatchKey(key string) *TraceContext { return &TraceContext{} } +func (*EmptyManager) GetCurTrace() *TraceContext { return nil } func (*EmptyManager) GetTraceId() string { return "" } @@ -58,23 +58,7 @@ func (*EmptyManager) TraceRedis(method string, key string, field string) ITraceD } func (*EmptyManager) TraceHttp(method string, url string) ITraceDetail { return &emptyTraceDetail{} } -type emptyTraceContext struct{} - -func (*emptyTraceContext) Error(err error) {} -func (*emptyTraceContext) SetBody(requestBody string, statusCode int, responseBody string) {} -func (*emptyTraceContext) SetResponseBody(responseBody string) {} -func (*emptyTraceContext) GetTraceId() string { return "" } -func (*emptyTraceContext) GetTraceLevel() int { return 0 } -func (*emptyTraceContext) GetStartTs() int64 { return 0 } -func (*emptyTraceContext) End(err error) {} -func (*emptyTraceContext) Ignore() {} -func (*emptyTraceContext) AddDetail(detail ITraceDetail) {} -func (*emptyTraceContext) GetList() []any { - return []any{} -} -func (*emptyTraceContext) GetAppInfo() (string, string, string, string, string) { - return "", "", "", "", "" -} +func (*EmptyManager) Push(traceContext *TraceContext, err error) {} type emptyTraceDetail struct{} diff --git a/trace/eumTraceType/enum.go b/trace/eumTraceType/enum.go new file mode 100644 index 000000000..268079410 --- /dev/null +++ b/trace/eumTraceType/enum.go @@ -0,0 +1,34 @@ +package eumTraceType + +type Enum int + +const ( + WebApi Enum = iota // WebApi + MqConsumer // MQ消费 + QueueConsumer // 本地消费 + FSchedule // 调度中心 + Task // 本地任务 + WatchKey // ETCD + EventConsumer // 事件消费 + WebSocket // WebSocket +) + +func (e Enum) ToString() string { + switch e { + case WebApi: + return "WebApi" + case MqConsumer: + return "MqConsumer" + case QueueConsumer: + return "QueueConsumer" + case FSchedule: + return "FSchedule" + case Task: + return "Task" + case WatchKey: + return "WatchKey" + case EventConsumer: + return "EventConsumer" + } + return "" +} diff --git a/trace/global.go b/trace/global.go index 7730c61b0..dc857c73c 100644 --- a/trace/global.go +++ b/trace/global.go @@ -8,7 +8,7 @@ import ( var ScopeLevel = asyncLocal.New[[]BaseTraceDetail]() // CurTraceContext 当前请求的Trace上下文 -var CurTraceContext = asyncLocal.New[ITraceContext]() +var CurTraceContext = asyncLocal.New[*TraceContext]() var detailComment = asyncLocal.New[string]() // SetComment 添加操作的注释 @@ -29,3 +29,10 @@ func GetComment() string { detailComment.Remove() return cmt } + +func GetTraceId() string { + if traceContext := CurTraceContext.Get(); traceContext != nil { + return traceContext.TraceId + } + return "" +} diff --git a/trace/iManager.go b/trace/iManager.go index 4e7a720cd..24e9ccbc4 100644 --- a/trace/iManager.go +++ b/trace/iManager.go @@ -2,27 +2,25 @@ package trace // IManager 链路追踪管理 type IManager interface { - GetCurTrace() ITraceContext - // GetTraceId 获取当前TraceId - GetTraceId() string // EntryWebApi 创建webapi的链路追踪入口 - EntryWebApi(domain string, path string, method string, contentType string, headerDictionary map[string]string, requestIp string) ITraceContext + EntryWebApi(domain string, path string, method string, contentType string, headerDictionary map[string]string, requestIp string) *TraceContext // EntryWebSocket 创建WebSocket的链路追踪入口 - EntryWebSocket(domain string, path string, headerDictionary map[string]string, requestIp string) ITraceContext + EntryWebSocket(domain string, path string, headerDictionary map[string]string, requestIp string) *TraceContext // EntryMqConsumer 创建MQ消费入口 - EntryMqConsumer(parentTraceId, parentAppName, server string, queueName string, routingKey string) ITraceContext + EntryMqConsumer(parentTraceId, parentAppName, server string, queueName string, routingKey string) *TraceContext // EntryQueueConsumer 创建Queue消费入口 - EntryQueueConsumer(queueName, subscribeName string) ITraceContext + EntryQueueConsumer(queueName, subscribeName string) *TraceContext // EntryEventConsumer 创建Event消费入口 - EntryEventConsumer(server, eventName, subscribeName string) ITraceContext + EntryEventConsumer(server, eventName, subscribeName string) *TraceContext // EntryTask 创建本地任务入口 - EntryTask(taskName string) ITraceContext + EntryTask(taskName string) *TraceContext // EntryTaskGroup 创建本地任务入口(调度中心专用) - EntryTaskGroup(taskName string, taskGroupName string, taskId int64) ITraceContext + EntryTaskGroup(taskName string, taskGroupName string, taskId int64) *TraceContext // EntryFSchedule 创建调度中心入口 - EntryFSchedule(taskGroupName string, taskId int64, data map[string]string) ITraceContext + EntryFSchedule(taskGroupName string, taskId int64, data map[string]string) *TraceContext // EntryWatchKey 创建etcd入口 - EntryWatchKey(key string) ITraceContext + EntryWatchKey(key string) *TraceContext + // TraceDatabaseOpen 数据库埋点 TraceDatabaseOpen(dbName string, connectString string) ITraceDetail // TraceDatabase 数据库埋点 @@ -43,4 +41,6 @@ type IManager interface { TraceRedis(method string, key string, field string) ITraceDetail // TraceHttp http埋点 TraceHttp(method string, url string) ITraceDetail + // 推送到队列 + Push(traceContext *TraceContext, err error) } diff --git a/trace/iTraceContext.go b/trace/iTraceContext.go deleted file mode 100644 index 7f18d2988..000000000 --- a/trace/iTraceContext.go +++ /dev/null @@ -1,26 +0,0 @@ -package trace - -type ITraceContext interface { - // End 结束 - End(err error) - // SetBody 设置webapi的响应报文 - SetBody(requestBody string, statusCode int, responseBody string) - // SetResponseBody 设置webapi的响应报文 - SetResponseBody(responseBody string) - // GetTraceId 获取traceId - GetTraceId() string - // GetStartTs 获取链路开启时间 - GetStartTs() int64 - // GetList 获取链路明细 - GetList() []any - // AddDetail 添加链路明细 - AddDetail(detail ITraceDetail) - // Error 异常信息 - Error(err error) - // Ignore 忽略这次的链路追踪 - Ignore() - // GetAppInfo 获取应用信息 - GetAppInfo() (string, string, string, string, string) - // GetTraceLevel 得到当前链路层 - GetTraceLevel() int -} diff --git a/trace/trackContext.go b/trace/trackContext.go new file mode 100644 index 000000000..71b5aae41 --- /dev/null +++ b/trace/trackContext.go @@ -0,0 +1,128 @@ +package trace + +import ( + "time" + + "github.com/farseer-go/collections" + "github.com/farseer-go/fs/dateTime" + "github.com/farseer-go/fs/trace/eumTraceType" +) + +type TraceContext struct { + TraceId string `json:"tid"` // 上下文ID + AppId string `json:"aid"` // 应用ID + AppName string `json:"an"` // 应用名称 + AppIp string `json:"aip"` // 应用IP + ParentAppName string `json:"pn"` // 上游应用 + TraceLevel int `json:"tl"` // 逐层递增(显示上下游顺序) + StartTs int64 `json:"st"` // 调用开始时间戳(微秒) + EndTs int64 `json:"et"` // 调用结束时间戳(微秒) + UseTs time.Duration `json:"ut"` // 总共使用时间(微秒) + UseDesc string `json:"ud"` // 总共使用时间(描述) + TraceType eumTraceType.Enum `json:"tt"` // 状态码 + List []any `json:"l"` // 调用的上下文trace.ITraceDetail + TraceCount int `json:"tc"` // 追踪明细数量 + ignore bool // 忽略这次的链路追踪 + ignoreDetail bool // 忽略链路明细 + Exception *ExceptionStack `json:"e"` // 异常信息 + WebContext + ConsumerContext + TaskContext + WatchKeyContext + CreateAt dateTime.DateTime `json:"ca"` // 请求时间 +} + +type WebContext struct { + WebDomain string `json:"wd"` // 请求域名 + WebPath string `json:"wp"` // 请求地址 + WebMethod string `json:"wm"` // 请求方式 + WebContentType string `json:"wct"` // 请求内容类型 + WebStatusCode int `json:"wsc"` // 状态码 + WebHeaders collections.Dictionary[string, string] `json:"wh"` // 请求头部 + WebRequestBody string `json:"wrb"` // 请求参数 + WebResponseBody string `json:"wpb"` // 输出参数 + WebRequestIp string `json:"wip"` // 客户端IP +} + +func (receiver WebContext) IsNil() bool { + return receiver.WebDomain == "" && receiver.WebPath == "" && receiver.WebMethod == "" && receiver.WebContentType == "" && receiver.WebStatusCode == 0 +} + +type ConsumerContext struct { + ConsumerServer string `json:"cs"` // MQ服务器 + ConsumerQueueName string `json:"cq"` // 队列名称 + ConsumerRoutingKey string `json:"cr"` // 路由KEY +} + +func (receiver ConsumerContext) IsNil() bool { + return receiver.ConsumerServer == "" && receiver.ConsumerQueueName == "" && receiver.ConsumerRoutingKey == "" +} + +type TaskContext struct { + TaskName string `json:"tn"` // 任务名称 + TaskGroupName string `json:"tgn"` // 任务组ID + TaskId int64 `json:"tid"` // 任务ID + TaskData collections.Dictionary[string, string] `json:"td"` // 任务数据 +} + +func (receiver TaskContext) IsNil() bool { + return receiver.TaskName == "" && receiver.TaskGroupName == "" && receiver.TaskId == 0 +} + +type WatchKeyContext struct { + WatchKey string `json:"wk"` // KEY +} + +func (receiver WatchKeyContext) IsNil() bool { + return receiver.WatchKey == "" +} + +func (receiver *TraceContext) SetBody(requestBody string, statusCode int, responseBody string) { + receiver.WebContext.WebRequestBody = requestBody + receiver.WebContext.WebStatusCode = statusCode + receiver.WebContext.WebResponseBody = responseBody +} + +func (receiver *TraceContext) SetResponseBody(responseBody string) { + receiver.WebContext.WebResponseBody = responseBody +} + +func (receiver *TraceContext) Ignore() { + receiver.ignore = true +} + +func (receiver *TraceContext) IsIgnore() bool { + return receiver.ignore +} + +func (receiver *TraceContext) IgnoreDetail() { + //receiver.AddDetail() + receiver.ignoreDetail = true +} + +// GetList 获取链路明细 +func (receiver *TraceContext) GetList() []any { + return receiver.List +} + +// AddDetail 添加链路明细 +func (receiver *TraceContext) AddDetail(detail ITraceDetail) { + // 没有忽略明细,才要加入 + if !receiver.ignoreDetail { + receiver.List = append(receiver.List, detail) + } +} + +func (receiver *TraceContext) Error(err error) { + if err != nil { + receiver.Exception = &ExceptionStack{ + ExceptionIsException: true, + ExceptionMessage: err.Error(), + } + receiver.Exception.ExceptionCallFile, receiver.Exception.ExceptionCallFuncName, receiver.Exception.ExceptionCallLine = GetCallerInfo() + } +} + +func (receiver *TraceContext) GetAppInfo() (string, string, string, string, string) { + return receiver.TraceId, receiver.AppName, receiver.AppId, receiver.AppIp, receiver.ParentAppName +}