Skip to content

Commit

Permalink
调整链路实现
Browse files Browse the repository at this point in the history
  • Loading branch information
steden committed Nov 30, 2024
1 parent 52fae26 commit cc9e6db
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 81 deletions.
2 changes: 1 addition & 1 deletion flog/fopsProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (r *fopsLoggerPersistent) Log(LogLevel eumLogLevel.Enum, log *LogData, exce
if LogLevel != eumLogLevel.NoneLevel {
// 上传到FOPS时需要
if t := trace.CurTraceContext.Get(); t != nil {
log.TraceId = t.GetTraceId()
log.TraceId = t.TraceId
}
log.Content = mustCompile.ReplaceAllString(log.Content, "")
log.AppId = strconv.FormatInt(core.AppId, 10)
Expand Down
8 changes: 2 additions & 6 deletions test/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package test

import (
"fmt"
"testing"

"github.com/farseer-go/fs/trace"
"github.com/farseer-go/fs/trace/eumCallType"
"github.com/stretchr/testify/assert"
"testing"
)

func TestTrace(t *testing.T) {
Expand Down Expand Up @@ -55,15 +56,10 @@ func TestTrace(t *testing.T) {
iManager.TraceHand("").SetSql("", "", "", "", 0)
iManager.TraceHand("").SetHttpRequest("", nil, nil, "", "", 0)

iManager.EntryQueueConsumer("", "").End(nil)
iManager.EntryQueueConsumer("", "").Ignore()
iManager.EntryQueueConsumer("", "").GetList()
iManager.EntryQueueConsumer("", "").GetAppInfo()
iManager.EntryQueueConsumer("", "").GetTraceId()
iManager.EntryQueueConsumer("", "").GetTraceLevel()
iManager.EntryQueueConsumer("", "").AddDetail(nil)
iManager.EntryQueueConsumer("", "").Error(nil)
iManager.EntryQueueConsumer("", "").GetStartTs()
iManager.EntryQueueConsumer("", "").SetBody("", 0, "")
}

Expand Down
54 changes: 19 additions & 35 deletions trace/emptyManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,35 @@ package trace
type EmptyManager struct {
}

func (*EmptyManager) EntryWebApi(domain string, path string, method string, contentType string, header map[string]string, requestIp string) ITraceContext {
return &emptyTraceContext{}
func (*EmptyManager) EntryWebApi(domain string, path string, method string, contentType string, header map[string]string, requestIp string) *TraceContext {
return &TraceContext{}
}
func (*EmptyManager) EntryWebSocket(domain string, path string, header map[string]string, requestIp string) ITraceContext {
return &emptyTraceContext{}
func (*EmptyManager) EntryWebSocket(domain string, path string, header map[string]string, requestIp string) *TraceContext {
return &TraceContext{}
}

func (*EmptyManager) EntryMqConsumer(parentTraceId, parentAppName, server string, queueName string, routingKey string) ITraceContext {
return &emptyTraceContext{}
func (*EmptyManager) EntryMqConsumer(parentTraceId, parentAppName, server string, queueName string, routingKey string) *TraceContext {
return &TraceContext{}
}
func (*EmptyManager) EntryQueueConsumer(queueName, subscribeName string) ITraceContext {
return &emptyTraceContext{}
func (*EmptyManager) EntryQueueConsumer(queueName, subscribeName string) *TraceContext {
return &TraceContext{}
}
func (*EmptyManager) EntryEventConsumer(server, eventName, subscribeName string) ITraceContext {
return &emptyTraceContext{}
func (*EmptyManager) EntryEventConsumer(server, eventName, subscribeName string) *TraceContext {
return &TraceContext{}
}
func (*EmptyManager) EntryTask(taskName string) ITraceContext {
return &emptyTraceContext{}
func (*EmptyManager) EntryTask(taskName string) *TraceContext {
return &TraceContext{}
}

func (*EmptyManager) EntryFSchedule(taskGroupName string, taskId int64, data map[string]string) ITraceContext {
return &emptyTraceContext{}
func (*EmptyManager) EntryFSchedule(taskGroupName string, taskId int64, data map[string]string) *TraceContext {
return &TraceContext{}
}
func (*EmptyManager) EntryTaskGroup(taskName string, taskGroupName string, taskId int64) ITraceContext {
return &emptyTraceContext{}
func (*EmptyManager) EntryTaskGroup(taskName string, taskGroupName string, taskId int64) *TraceContext {
return &TraceContext{}
}

func (*EmptyManager) EntryWatchKey(key string) ITraceContext { return &emptyTraceContext{} }
func (*EmptyManager) GetCurTrace() ITraceContext { return nil }
func (*EmptyManager) EntryWatchKey(key string) *TraceContext { return &TraceContext{} }
func (*EmptyManager) GetCurTrace() *TraceContext { return nil }
func (*EmptyManager) GetTraceId() string {
return ""
}
Expand All @@ -58,23 +58,7 @@ func (*EmptyManager) TraceRedis(method string, key string, field string) ITraceD
}
func (*EmptyManager) TraceHttp(method string, url string) ITraceDetail { return &emptyTraceDetail{} }

type emptyTraceContext struct{}

func (*emptyTraceContext) Error(err error) {}
func (*emptyTraceContext) SetBody(requestBody string, statusCode int, responseBody string) {}
func (*emptyTraceContext) SetResponseBody(responseBody string) {}
func (*emptyTraceContext) GetTraceId() string { return "" }
func (*emptyTraceContext) GetTraceLevel() int { return 0 }
func (*emptyTraceContext) GetStartTs() int64 { return 0 }
func (*emptyTraceContext) End(err error) {}
func (*emptyTraceContext) Ignore() {}
func (*emptyTraceContext) AddDetail(detail ITraceDetail) {}
func (*emptyTraceContext) GetList() []any {
return []any{}
}
func (*emptyTraceContext) GetAppInfo() (string, string, string, string, string) {
return "", "", "", "", ""
}
func (*EmptyManager) Push(traceContext *TraceContext, err error) {}

type emptyTraceDetail struct{}

Expand Down
34 changes: 34 additions & 0 deletions trace/eumTraceType/enum.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package eumTraceType

type Enum int

const (
WebApi Enum = iota // WebApi
MqConsumer // MQ消费
QueueConsumer // 本地消费
FSchedule // 调度中心
Task // 本地任务
WatchKey // ETCD
EventConsumer // 事件消费
WebSocket // WebSocket
)

func (e Enum) ToString() string {
switch e {
case WebApi:
return "WebApi"
case MqConsumer:
return "MqConsumer"
case QueueConsumer:
return "QueueConsumer"
case FSchedule:
return "FSchedule"
case Task:
return "Task"
case WatchKey:
return "WatchKey"
case EventConsumer:
return "EventConsumer"
}
return ""
}
9 changes: 8 additions & 1 deletion trace/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
var ScopeLevel = asyncLocal.New[[]BaseTraceDetail]()

// CurTraceContext 当前请求的Trace上下文
var CurTraceContext = asyncLocal.New[ITraceContext]()
var CurTraceContext = asyncLocal.New[*TraceContext]()
var detailComment = asyncLocal.New[string]()

// SetComment 添加操作的注释
Expand All @@ -29,3 +29,10 @@ func GetComment() string {
detailComment.Remove()
return cmt
}

func GetTraceId() string {
if traceContext := CurTraceContext.Get(); traceContext != nil {
return traceContext.TraceId
}
return ""
}
24 changes: 12 additions & 12 deletions trace/iManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,25 @@ package trace

// IManager 链路追踪管理
type IManager interface {
GetCurTrace() ITraceContext
// GetTraceId 获取当前TraceId
GetTraceId() string
// EntryWebApi 创建webapi的链路追踪入口
EntryWebApi(domain string, path string, method string, contentType string, headerDictionary map[string]string, requestIp string) ITraceContext
EntryWebApi(domain string, path string, method string, contentType string, headerDictionary map[string]string, requestIp string) *TraceContext
// EntryWebSocket 创建WebSocket的链路追踪入口
EntryWebSocket(domain string, path string, headerDictionary map[string]string, requestIp string) ITraceContext
EntryWebSocket(domain string, path string, headerDictionary map[string]string, requestIp string) *TraceContext
// EntryMqConsumer 创建MQ消费入口
EntryMqConsumer(parentTraceId, parentAppName, server string, queueName string, routingKey string) ITraceContext
EntryMqConsumer(parentTraceId, parentAppName, server string, queueName string, routingKey string) *TraceContext
// EntryQueueConsumer 创建Queue消费入口
EntryQueueConsumer(queueName, subscribeName string) ITraceContext
EntryQueueConsumer(queueName, subscribeName string) *TraceContext
// EntryEventConsumer 创建Event消费入口
EntryEventConsumer(server, eventName, subscribeName string) ITraceContext
EntryEventConsumer(server, eventName, subscribeName string) *TraceContext
// EntryTask 创建本地任务入口
EntryTask(taskName string) ITraceContext
EntryTask(taskName string) *TraceContext
// EntryTaskGroup 创建本地任务入口(调度中心专用)
EntryTaskGroup(taskName string, taskGroupName string, taskId int64) ITraceContext
EntryTaskGroup(taskName string, taskGroupName string, taskId int64) *TraceContext
// EntryFSchedule 创建调度中心入口
EntryFSchedule(taskGroupName string, taskId int64, data map[string]string) ITraceContext
EntryFSchedule(taskGroupName string, taskId int64, data map[string]string) *TraceContext
// EntryWatchKey 创建etcd入口
EntryWatchKey(key string) ITraceContext
EntryWatchKey(key string) *TraceContext

// TraceDatabaseOpen 数据库埋点
TraceDatabaseOpen(dbName string, connectString string) ITraceDetail
// TraceDatabase 数据库埋点
Expand All @@ -43,4 +41,6 @@ type IManager interface {
TraceRedis(method string, key string, field string) ITraceDetail
// TraceHttp http埋点
TraceHttp(method string, url string) ITraceDetail
// 推送到队列
Push(traceContext *TraceContext, err error)
}
26 changes: 0 additions & 26 deletions trace/iTraceContext.go

This file was deleted.

128 changes: 128 additions & 0 deletions trace/trackContext.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package trace

import (
"time"

"github.com/farseer-go/collections"
"github.com/farseer-go/fs/dateTime"
"github.com/farseer-go/fs/trace/eumTraceType"
)

type TraceContext struct {
TraceId string `json:"tid"` // 上下文ID
AppId string `json:"aid"` // 应用ID
AppName string `json:"an"` // 应用名称
AppIp string `json:"aip"` // 应用IP
ParentAppName string `json:"pn"` // 上游应用
TraceLevel int `json:"tl"` // 逐层递增(显示上下游顺序)
StartTs int64 `json:"st"` // 调用开始时间戳(微秒)
EndTs int64 `json:"et"` // 调用结束时间戳(微秒)
UseTs time.Duration `json:"ut"` // 总共使用时间(微秒)
UseDesc string `json:"ud"` // 总共使用时间(描述)
TraceType eumTraceType.Enum `json:"tt"` // 状态码
List []any `json:"l"` // 调用的上下文trace.ITraceDetail
TraceCount int `json:"tc"` // 追踪明细数量
ignore bool // 忽略这次的链路追踪
ignoreDetail bool // 忽略链路明细
Exception *ExceptionStack `json:"e"` // 异常信息
WebContext
ConsumerContext
TaskContext
WatchKeyContext
CreateAt dateTime.DateTime `json:"ca"` // 请求时间
}

type WebContext struct {
WebDomain string `json:"wd"` // 请求域名
WebPath string `json:"wp"` // 请求地址
WebMethod string `json:"wm"` // 请求方式
WebContentType string `json:"wct"` // 请求内容类型
WebStatusCode int `json:"wsc"` // 状态码
WebHeaders collections.Dictionary[string, string] `json:"wh"` // 请求头部
WebRequestBody string `json:"wrb"` // 请求参数
WebResponseBody string `json:"wpb"` // 输出参数
WebRequestIp string `json:"wip"` // 客户端IP
}

func (receiver WebContext) IsNil() bool {
return receiver.WebDomain == "" && receiver.WebPath == "" && receiver.WebMethod == "" && receiver.WebContentType == "" && receiver.WebStatusCode == 0
}

type ConsumerContext struct {
ConsumerServer string `json:"cs"` // MQ服务器
ConsumerQueueName string `json:"cq"` // 队列名称
ConsumerRoutingKey string `json:"cr"` // 路由KEY
}

func (receiver ConsumerContext) IsNil() bool {
return receiver.ConsumerServer == "" && receiver.ConsumerQueueName == "" && receiver.ConsumerRoutingKey == ""
}

type TaskContext struct {
TaskName string `json:"tn"` // 任务名称
TaskGroupName string `json:"tgn"` // 任务组ID
TaskId int64 `json:"tid"` // 任务ID
TaskData collections.Dictionary[string, string] `json:"td"` // 任务数据
}

func (receiver TaskContext) IsNil() bool {
return receiver.TaskName == "" && receiver.TaskGroupName == "" && receiver.TaskId == 0
}

type WatchKeyContext struct {
WatchKey string `json:"wk"` // KEY
}

func (receiver WatchKeyContext) IsNil() bool {
return receiver.WatchKey == ""
}

func (receiver *TraceContext) SetBody(requestBody string, statusCode int, responseBody string) {
receiver.WebContext.WebRequestBody = requestBody
receiver.WebContext.WebStatusCode = statusCode
receiver.WebContext.WebResponseBody = responseBody
}

func (receiver *TraceContext) SetResponseBody(responseBody string) {
receiver.WebContext.WebResponseBody = responseBody
}

func (receiver *TraceContext) Ignore() {
receiver.ignore = true
}

func (receiver *TraceContext) IsIgnore() bool {
return receiver.ignore
}

func (receiver *TraceContext) IgnoreDetail() {
//receiver.AddDetail()
receiver.ignoreDetail = true
}

// GetList 获取链路明细
func (receiver *TraceContext) GetList() []any {
return receiver.List
}

// AddDetail 添加链路明细
func (receiver *TraceContext) AddDetail(detail ITraceDetail) {
// 没有忽略明细,才要加入
if !receiver.ignoreDetail {
receiver.List = append(receiver.List, detail)
}
}

func (receiver *TraceContext) Error(err error) {
if err != nil {
receiver.Exception = &ExceptionStack{
ExceptionIsException: true,
ExceptionMessage: err.Error(),
}
receiver.Exception.ExceptionCallFile, receiver.Exception.ExceptionCallFuncName, receiver.Exception.ExceptionCallLine = GetCallerInfo()
}
}

func (receiver *TraceContext) GetAppInfo() (string, string, string, string, string) {
return receiver.TraceId, receiver.AppName, receiver.AppId, receiver.AppIp, receiver.ParentAppName
}

0 comments on commit cc9e6db

Please sign in to comment.