Skip to content

Commit

Permalink
feat(collector-agent): error
Browse files Browse the repository at this point in the history
- error analysis
- common,collector-agent  `EXP_V2`
- python new api `add_exception`
  • Loading branch information
eeliu committed May 6, 2024
1 parent 24f9809 commit d41d32d
Show file tree
Hide file tree
Showing 49 changed files with 722 additions and 369 deletions.
106 changes: 53 additions & 53 deletions collector-agent/agent/AgentRouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,20 @@ type AgentRouter struct {
}

type TSpanEvent struct {
Name string `json:"name"`
ExceptionInfo string `json:"EXP,omitempty"`
DestinationId string `json:"dst,omitempty"`
NextSpanId int64 `json:"nsid,string,omitempty"`
EndPoint string `json:"server,omitempty"`
StartElapsed int32 `json:"S"`
EndElapsed int32 `json:"E"`
StartElapsedV2 int32 `json:":S"`
EndElapsedV2 int32 `json:":E"`
ServiceType int32 `json:"stp,string"`
Clues []string `json:"clues,omitempty"`
Calls []TSpanEvent `json:"calls,omitempty"`
SqlMeta string `json:"SQL,omitempty"`
Name string `json:"name"`
ExceptionInfo string `json:"EXP,omitempty"`
ExceptionInfoV2 *TExceptionInfo `json:"EXP_V2,omitempty"`
DestinationId string `json:"dst,omitempty"`
NextSpanId int64 `json:"nsid,string,omitempty"`
EndPoint string `json:"server,omitempty"`
StartElapsed int32 `json:"S"`
EndElapsed int32 `json:"E"`
StartElapsedV2 int32 `json:":S"`
EndElapsedV2 int32 `json:":E"`
ServiceType int32 `json:"stp,string"`
Clues []string `json:"clues,omitempty"`
Calls []*TSpanEvent `json:"calls,omitempty"`
SqlMeta string `json:"SQL,omitempty"`
}

func (spanEv *TSpanEvent) GetEndElapsed() int32 {
Expand All @@ -70,35 +71,43 @@ type TErrorInfo struct {
Line int `json:"line"`
}

type TExceptionInfo struct {
ClassName string `json:"C"`
Message string `json:"M"`
StartTime int64 `json:":S"`
}

type TSpan struct {
AppServerType int32 `json:"FT"`
AppServerTypeV2 int32 `json:":FT"`
ParentAppServerType int32 `json:"ptype,string"`
ParentSpanId int64 `json:"psid,string"`
ParentApplicationName string `json:"pname"`
StartTime int64 `json:"S"`
StartTimeV2 int64 `json:":S"`
ElapsedTime int32 `json:"E"`
ElapsedTimeV2 int32 `json:":E"`
AppId string `json:"appid"`
AppIdV2 string `json:":appid"`
AppName string `json:"appname"`
AppNameV2 string `json:":appname"`
Calls []TSpanEvent `json:"calls"`
Clues []string `json:"clues,omitempty"`
SpanName string `json:"name"`
SpanId int64 `json:"sid,string"`
ServerType int32 `json:"stp,string"`
TransactionId string `json:"tid"`
Uri string `json:"uri"`
UT string `json:"UT,omitempty"`
EndPoint string `json:"server"`
RemoteAddr string `json:"client"`
AcceptorHost string `json:"Ah"`
ExceptionInfo string `json:"EXP,omitempty"`
ErrorInfo *TErrorInfo `json:"ERR,omitempty"`
NginxHeader string `json:"NP,omitempty"`
ApacheHeader string `json:"AP,omitempty"`
AppServerType int32 `json:"FT"`
AppServerTypeV2 int32 `json:":FT"`
ParentAppServerType int32 `json:"ptype,string"`
ParentSpanId int64 `json:"psid,string"`
ParentApplicationName string `json:"pname"`
StartTime int64 `json:"S"`
StartTimeV2 int64 `json:":S"`
ElapsedTime int32 `json:"E"`
ElapsedTimeV2 int32 `json:":E"`
AppId string `json:"appid"`
AppIdV2 string `json:":appid"`
AppName string `json:"appname"`
AppNameV2 string `json:":appname"`
Calls []*TSpanEvent `json:"calls"`
Clues []string `json:"clues,omitempty"`
SpanName string `json:"name"`
SpanId int64 `json:"sid,string"`
ServerType int32 `json:"stp,string"`
TransactionId string `json:"tid"`
Uri string `json:"uri"`
UT string `json:"UT,omitempty"`
EndPoint string `json:"server"`
RemoteAddr string `json:"client"`
AcceptorHost string `json:"Ah"`
ExceptionInfo string `json:"EXP,omitempty"`
ExceptionInfoV2 *TExceptionInfo `json:"EXP_V2,omitempty"`
ErrorInfo *TErrorInfo `json:"ERR,omitempty"`
ErrorMarked int32 `json:"EA,omitempty"`
NginxHeader string `json:"NP,omitempty"`
ApacheHeader string `json:"AP,omitempty"`
}

func (span *TSpan) IsFailed() bool {
Expand Down Expand Up @@ -188,17 +197,6 @@ func (manager *AgentRouter) Clean() {
manager.rwMutex.RUnlock()
}

//todo rename createAgent
func (manager *AgentRouter) createAgent(id, name string, agentType int32, startTime string) *GrpcAgent {
agent := GrpcAgent{PingId: manager.PingId, AgentOnLine: false}
manager.PingId += 1
agent.Init(id, name, agentType, startTime)
agent.Start()

log.Infof("agent:%v is launched", &agent)
return &agent
}

func GetAgentInfo(span *TSpan) (appid, name string, appServerType int32, startTime string, err error) {

// new feat: get current startTime
Expand Down Expand Up @@ -237,6 +235,7 @@ func (manager *AgentRouter) DispatchPacket(packet *RawPacket) error {
span := &TSpan{
// ParentSpanId:-1 is part of logic in pinpoint
ParentSpanId: -1,
ErrorMarked: 0,
}

if err := json.Unmarshal(packet.RawData, span); err != nil {
Expand All @@ -258,7 +257,8 @@ func (manager *AgentRouter) DispatchPacket(packet *RawPacket) error {
if _t, OK := manager.AgentMap[appid]; OK {
agent = _t
} else {
agent = manager.createAgent(appid, appName, serverType, startTime)
agent = createGrpcAgent(appid, appName, serverType, manager.PingId, startTime)
manager.PingId += 1
}
manager.AgentMap[appid] = agent
manager.rwMutex.Unlock()
Expand Down
27 changes: 27 additions & 0 deletions collector-agent/agent/AgentRouter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package agent
import (
"encoding/json"
"testing"

"google.golang.org/grpc/metadata"
)

func TestGetAgentInfo(t *testing.T) {
Expand All @@ -29,6 +31,31 @@ func TestGetAgentInfo(t *testing.T) {

}

func Test_EASpan(t *testing.T) {
msg := `{":E":3,":FT":1700,":S":1714454599243,"EA":1,"ERR":{"file":"FastAPIRequestPlugin","line":0,"msg":"status_code:500 INTERNAL SERVER ERROR"},"UT":"/test_exception_in_Chain","appid":"cd.dev.test.flask","appname":"cd.dev.test.py","calls":[{":E":0,":S":1,"EXP":"abcd","calls":[{":E":0,":S":0,"EXP":"abc","name":"call_exp_01","stp":"1701"},{":E":0,":S":0,"EXP":"abcd","name":"call_exp_02","stp":"1701"}],"name":"main","stp":"1701"}],"client":"172.24.0.1","clues":["206:GET","46:500 INTERNAL SERVER ERROR"],"name":"BaseFlaskrequest","server":"localhost:8184","sid":"212686650","stp":"1700","tid":"cd.dev.test.flask^1714448478218^1205","uri":"/test_exception_in_Chain","EXP_V2":{"M":"asgdf","C":"xxxx",":S":2}}`
var tSpan TSpan

err := json.Unmarshal([]byte(msg), &tSpan)
if err != nil {
t.Error(err)
}

if tSpan.ErrorMarked != 1 {
t.Errorf("EA missed")
}

md := metadata.New(map[string]string{
"test": "2",
"test2": "string",
})
ea := createErrorAnalysisFilter(md)

meta := ea.scanTSpanTree(&tSpan)
if len(meta.Exceptions) == 0 {
t.Errorf("empty exception %v", meta.Exceptions)
}
}

func TestTspan(t *testing.T) {
msg := `{"E":1,"FT":1500,":FT":1500,"ptype":"1500","pname":"abc_d","psid":"23563","NP":"t=1617083759.535 D=0.000","S":1617083759798,"appid":"app-2",":appid":"app-2",
":appname":"APP-2","appname":"APP-2","calls":[{"E":1,"calls":[{"E":1,"S":0,"clues":["-1:input parameters","14:return value"],"name":"abc"}],"S":0,"clues":["-1:input parameters","14:return value"],"name":"app\\AppDate::abc","SQL":"select* from abc"}],"client":"10.34.135.145","clues":["46:200"],"name":"PHP Request: fpm-fcgi","server":"10.34.130.152:8000","sid":"726125302","stp":"1500","tid":"app-2^1617083747^5506","uri":"/index.php?type=get_date","Ah":"123.35.36.3/host","EXP":"exp","ERR":{"msg":"error_msg","file":"file.cc","line":123}}`
Expand Down
117 changes: 117 additions & 0 deletions collector-agent/agent/ErrorAnalysis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package agent

import (
"github.com/pinpoint-apm/pinpoint-c-agent/collector-agent/common"
v1 "github.com/pinpoint-apm/pinpoint-c-agent/collector-agent/pinpoint-grpc-idl/proto/v1"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/metadata"
)

type ErrorAnalysisFilter struct {
md metadata.MD
// only one gocoroutine working on chn, so no needs to lock it
id int64
}

func createErrorAnalysisFilter(base metadata.MD) *ErrorAnalysisFilter {
eaf := &ErrorAnalysisFilter{
md: base,
id: 0,
}
return eaf
}

func (e *ErrorAnalysisFilter) sendExpMetaData(meta *v1.PExceptionMetaData) {
config := common.GetConfig()
conn, err := common.CreateGrpcConnection(config.AgentAddress)
if err != nil {
log.Warnf("connect:%s failed. %s", config.AgentAddress, err)
return
}

defer conn.Close()
client := v1.NewMetadataClient(conn)

ctx, cancel := common.BuildPinpointCtx(config.MetaDataTimeWaitSec, e.md)
defer cancel()
result, err := client.RequestExceptionMetaData(ctx, meta)
if err != nil {
log.Warnf("RequestExceptionMetaData failed. reason: %v", err)
}
log.Debugf("RequestExceptionMetaData %v %v", meta, result)
}

func (e *ErrorAnalysisFilter) parseException(spanEv []*TSpanEvent, exceptions *[]*v1.PException,
parentErrorId int64, depth int32, parentExp *TExceptionInfo, startTime int64) error {
for _, ev := range spanEv {
if ev.ExceptionInfoV2 != nil {
exp := &v1.PException{
ExceptionMessage: ev.ExceptionInfoV2.Message,
ExceptionClassName: ev.ExceptionInfoV2.ClassName,
StartTime: ev.ExceptionInfoV2.StartTime + startTime,
}
// TODO, just compare the message right now
if parentExp != nil && parentExp.Message == ev.ExceptionInfoV2.Message {
exp.ExceptionId = parentErrorId
} else {
exp.ExceptionId = e.getNewExceptionId()
}

exp.ExceptionDepth = depth + 1
*exceptions = append(*exceptions, exp)
if len(ev.Calls) > 0 {
e.parseException(ev.Calls, exceptions, exp.ExceptionId, exp.ExceptionDepth, ev.ExceptionInfoV2, startTime)
}
} else {
if len(ev.Calls) > 0 {
e.parseException(ev.Calls, exceptions, parentErrorId, depth, parentExp, startTime)
}
}
}
return nil
}

func (e *ErrorAnalysisFilter) getNewExceptionId() int64 {
e.id += 1
return e.id
}

func (e *ErrorAnalysisFilter) scanTSpanTree(span *TSpan) *v1.PExceptionMetaData {
e_md := &v1.PExceptionMetaData{
TransactionId: common.TypeV1_String_TransactionId(span.TransactionId),
SpanId: span.SpanId,
UriTemplate: span.UT,
}
var spanExp []*v1.PException
depth := int32(0)
startTime := span.GetStartTime()
if span.ExceptionInfoV2 != nil {
exp := &v1.PException{
ExceptionClassName: span.ExceptionInfoV2.ClassName,
ExceptionMessage: span.ExceptionInfoV2.Message,
ExceptionId: e.getNewExceptionId(),
ExceptionDepth: depth,
StartTime: span.ExceptionInfoV2.StartTime + startTime,
}
spanExp = append(spanExp, exp)
e.parseException(span.Calls, &spanExp, exp.ExceptionId, depth, span.ExceptionInfoV2, startTime)
} else {
e.parseException(span.Calls, &spanExp, 0, depth, nil, startTime)
}

e_md.Exceptions = spanExp

return e_md
}

func (e *ErrorAnalysisFilter) Interceptor(span *TSpan) bool {
if span.ErrorMarked == 1 {
// parse the error
meta := e.scanTSpanTree(span)
// plan one: send once
go e.sendExpMetaData(meta)
} else {
log.Debugf("ErrorAnalysisFilter Interceptor, not exception mark")
}
return true
}
53 changes: 53 additions & 0 deletions collector-agent/agent/ErrorAnalysis_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package agent

import (
"testing"

"google.golang.org/grpc/metadata"
)

func TestErrorAnalysisFilter_Interceptor(t *testing.T) {
md := metadata.New(map[string]string{
"test": "2",
"test2": "string",
})
ea := createErrorAnalysisFilter(md)

cases := []*TSpan{
{
ErrorMarked: 0,
},
{
ErrorMarked: 1,
},
{
ErrorMarked: 1,
ExceptionInfo: "abc",
Calls: []*TSpanEvent{
{
ExceptionInfo: "abc",
},
{
ExceptionInfo: "",
},
},
},
{
ErrorMarked: 0,
ExceptionInfo: "abc",
Calls: []*TSpanEvent{
{
ExceptionInfo: "abc",
},
{
ExceptionInfo: "",
},
},
},
}

for _, c := range cases {
ea.Interceptor(c)
}

}
Loading

0 comments on commit d41d32d

Please sign in to comment.