Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(agent): sql v3 support #595

Merged
merged 3 commits into from
Apr 17, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions .github/workflows/publish-collector-agent.yml
Original file line number Diff line number Diff line change
@@ -3,8 +3,13 @@ name: publish-collector-agent
on:
workflow_dispatch:
inputs:
tag:
description: "Give me a name: 1.2.3"
type: string
required: true
default: "0.0.0"
push:
description: Do you want to push to ghcr.io
description: "Do you want to push to ghcr.io"
required: true
default: false
type: boolean
@@ -27,14 +32,15 @@ jobs:
- name: Build the collector-agent Docker image
run: |
IMAGE_NAME=collector-agent
IMAGE_TAG=$(echo "${{ github.ref }}" | sed -e 's,.*/\(.*\),\1,')
[[ "${{ github.ref }}" == "refs/tags/"* ]] && IMAGE_TAG=$(echo $IMAGE_TAG | sed -e 's/^v//')
# IMAGE_TAG=$(echo "${{ github.ref }}" | sed -e 's,.*/\(.*\),\1,')
# [[ "${{ github.ref }}" == "refs/tags/"* ]] && IMAGE_TAG=$(echo $IMAGE_TAG | sed -e 's/^v//')
IMAGE_TAG=${{ inputs.tag }}
IMAGE_ID=ghcr.io/$GITHUB_REPOSITORY

# docker build --pull --no-cache . --file collector-agent/Dockerfile --tag $IMAGE_NAME
cd collector-agent && make docker TAG=$IMAGE_NAME
docker tag $IMAGE_NAME $IMAGE_ID/$IMAGE_NAME:$IMAGE_TAG
docker tag $IMAGE_NAME $IMAGE_ID/$IMAGE_NAME:latest
cd collector-agent && make docker TAG=$IMAGE_TAG
docker tag $IMAGE_NAME:$IMAGE_TAG $IMAGE_ID/$IMAGE_NAME:$IMAGE_TAG
docker tag $IMAGE_NAME:$IMAGE_TAG $IMAGE_ID/$IMAGE_NAME:latest
if [[ ${{ inputs.push }} == "true" ]]; then
echo ${{ inputs.push }}
docker push --all-tags $IMAGE_ID/$IMAGE_NAME
6 changes: 3 additions & 3 deletions collector-agent/agent/GrpcAgent.go
Original file line number Diff line number Diff line change
@@ -26,7 +26,7 @@ type GrpcAgent struct {
pingMd metadata.MD
PingId int32
spanFilter []Filter
spanSender SpanSender
spanSender *SpanSender
AgentOnLine bool
requestCounter RequestProfiler
utReport *UrlTemplateReport
@@ -180,7 +180,7 @@ func (agent *GrpcAgent) registerFilter() {
agent.AddFilter(agent.utReport)
// send span
agent.log.Debug("register spanSender filter")
agent.AddFilter(&agent.spanSender)
agent.AddFilter(agent.spanSender)

}

@@ -296,7 +296,7 @@ func (agent *GrpcAgent) Init(id, _name string, _type int32, StartTime string) {

agent.tSpanCh = make(chan *TSpan, config.AgentChannelSize)
agent.ExitCh = make(chan bool)
agent.spanSender = SpanSender{Md: agent.BaseMD, ExitCh: agent.ExitCh}
agent.spanSender = CreateSpanSender(agent.BaseMD, agent.ExitCh)
agent.spanSender.Init()
agent.requestCounter.CTime = time.Now().Unix()
agent.registerFilter()
48 changes: 0 additions & 48 deletions collector-agent/agent/MetaData.go

This file was deleted.

133 changes: 86 additions & 47 deletions collector-agent/agent/SpanSender.go
Original file line number Diff line number Diff line change
@@ -11,24 +11,35 @@ import (
"github.com/golang/protobuf/ptypes/wrappers"
"github.com/pinpoint-apm/pinpoint-c-agent/collector-agent/common"
v1 "github.com/pinpoint-apm/pinpoint-c-agent/collector-agent/pinpoint-grpc-idl/proto/v1"
"github.com/spaolacci/murmur3"

log "github.com/sirupsen/logrus"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/types/known/wrapperspb"
)

type ApiIdMap map[string]interface{}

var unique_id_count = int32(1)

type SpanSender struct {
sequenceId int32
apiMeta MetaData
stringMeta MetaData
sqlMeta MetaData
idMap ApiIdMap
Md metadata.MD
ExitCh chan bool
exitCh chan bool
spanMessageCh chan *v1.PSpanMessage
spanRespCh chan int32
wg sync.WaitGroup
}

func CreateSpanSender(base metadata.MD, exitCh chan bool) *SpanSender {
sender := &SpanSender{
Md: base, exitCh: exitCh,
idMap: make(ApiIdMap),
}
return sender
}

func (spanSender *SpanSender) Stop() {
log.Warn("Try to close spanSend goroutine")
close(spanSender.spanMessageCh)
@@ -51,7 +62,7 @@ func (spanSender *SpanSender) senderMain() {

stream, err := client.SendSpan(ctx)
if err != nil {
log.Warnf("create streamfailed. %s", err)
log.Warnf("create stream failed. %s", err)
return
}
defer stream.CloseSend()
@@ -75,17 +86,17 @@ func (spanSender *SpanSender) sendThread() {
for {
spanSender.senderMain()
config := common.GetConfig()
if common.WaitChannelEvent(spanSender.ExitCh, config.SpanTimeWaitSec) == common.E_AGENT_STOPPING {
if common.WaitChannelEvent(spanSender.exitCh, config.SpanTimeWaitSec) == common.E_AGENT_STOPPING {
break
}
}
log.Info("sendThread exit")
}

func (spanSender *SpanSender) Init() {
spanSender.sqlMeta = MetaData{MetaDataType: common.META_SQL, IDMap: make(PARAMS_TYPE), Sender: spanSender}
spanSender.apiMeta = MetaData{MetaDataType: common.META_API, IDMap: make(PARAMS_TYPE), Sender: spanSender}
spanSender.stringMeta = MetaData{MetaDataType: common.META_STRING, IDMap: make(PARAMS_TYPE), Sender: spanSender}
// spanSender.sqlMeta = MetaData{MetaDataType: common.META_SQL_UID, IDMap: make(PARAMS_TYPE), Sender: spanSender}
// spanSender.apiMeta = MetaData{MetaDataType: common.META_API, IDMap: make(PARAMS_TYPE), Sender: spanSender}
// spanSender.stringMeta = MetaData{MetaDataType: common.META_STRING, IDMap: make(PARAMS_TYPE), Sender: spanSender}

spanSender.spanMessageCh = make(chan *v1.PSpanMessage, common.GetConfig().AgentChannelSize)
spanSender.spanRespCh = make(chan int32, 1)
@@ -98,9 +109,7 @@ func (spanSender *SpanSender) Init() {

func (spanSender *SpanSender) cleanAllMetaData() {
log.Info("Clean all metaData")
spanSender.sqlMeta.ResetMeta()
spanSender.apiMeta.ResetMeta()
spanSender.stringMeta.ResetMeta()
spanSender.idMap = make(ApiIdMap)
}

func (spanSender *SpanSender) makePinpointSpanEv(genSpan *v1.PSpan, spanEv *TSpanEvent, depth int32) error {
@@ -118,13 +127,38 @@ func (spanSender *SpanSender) makePinpointSpanEv(genSpan *v1.PSpan, spanEv *TSpa
}
}

func (spanSender *SpanSender) getMetaApiId(name string, metaType int32) int32 {
id, ok := spanSender.idMap[name]
if ok {
return id.(int32)
} else {
unique_id_count += 1
spanSender.idMap[name] = unique_id_count
spanSender.SenderGrpcMetaData(name, metaType)
return unique_id_count
}
}

func (spanSender *SpanSender) getSqlUidMetaApiId(name string) []byte {
id, ok := spanSender.idMap[name]
if ok {
return id.([]byte)
} else {
h1, h2 := murmur3.Sum128([]byte(name))
id := []byte(strconv.FormatUint(h1, 16) + strconv.FormatUint(h2, 16))
spanSender.idMap[name] = id
spanSender.SenderGrpcMetaData(name, common.META_Sql_uid_api)
return id
}
}

func (spanSender *SpanSender) createPinpointSpanEv(spanEv *TSpanEvent) (*v1.PSpanEvent, error) {
pbSpanEv := &v1.PSpanEvent{}

pbSpanEv.ApiId = int32(spanSender.apiMeta.GetId(spanEv.Name, PARAMS_TYPE{"type": common.API_DEFAULT}))
pbSpanEv.ApiId = spanSender.getMetaApiId(spanEv.Name, common.META_Default_api)

if len(spanEv.ExceptionInfo) > 0 {
id := spanSender.stringMeta.GetId("EXP", nil)
id := spanSender.getMetaApiId("___EXP___", common.META_String_api)
pbSpanEv.ExceptionInfo = &v1.PIntStringValue{}
pbSpanEv.ExceptionInfo.IntValue = id
stringValue := wrapperspb.StringValue{Value: spanEv.ExceptionInfo}
@@ -152,30 +186,30 @@ func (spanSender *SpanSender) createPinpointSpanEv(spanEv *TSpanEvent) (*v1.PSpa
if value, err := strconv.ParseInt(ann[0:iColon], 10, 32); err == nil {
stringValue := v1.PAnnotationValue_StringValue{StringValue: ann[iColon+1:]}

pAnvalue := v1.PAnnotationValue{
v := v1.PAnnotationValue{
Field: &stringValue,
}
ann := v1.PAnnotation{
Key: int32(value),
Value: &pAnvalue,
Value: &v,
}
pbSpanEv.Annotation = append(pbSpanEv.Annotation, &ann)
}
}

if len(spanEv.SqlMeta) > 0 {
id := spanSender.sqlMeta.GetId(spanEv.SqlMeta, nil)
sqlvalue := v1.PIntStringStringValue{
IntValue: id,
id := spanSender.getSqlUidMetaApiId(spanEv.SqlMeta)
sqlByteSv := &v1.PBytesStringStringValue{
BytesValue: id,
StringValue1: &wrappers.StringValue{
Value: spanEv.SqlMeta,
},
}
pbSpanEv.Annotation = append(pbSpanEv.Annotation, &v1.PAnnotation{
Key: 20,
Key: 25,
Value: &v1.PAnnotationValue{
Field: &v1.PAnnotationValue_IntStringStringValue{
IntStringStringValue: &sqlvalue,
Field: &v1.PAnnotationValue_BytesStringStringValue{
BytesStringStringValue: sqlByteSv,
},
},
})
@@ -188,7 +222,7 @@ func (spanSender *SpanSender) makePinpointSpan(span *TSpan) (*v1.PSpan, error) {
spanSender.sequenceId = 0
pbSpan := &v1.PSpan{}
pbSpan.Version = 1
pbSpan.ApiId = spanSender.apiMeta.GetId(span.GetAppid(), PARAMS_TYPE{"type": int32(common.API_WEB_REQUEST)})
pbSpan.ApiId = spanSender.getMetaApiId(span.GetAppid(), common.META_Web_request_api)

pbSpan.ServiceType = span.ServerType

@@ -226,14 +260,14 @@ func (spanSender *SpanSender) makePinpointSpan(span *TSpan) (*v1.PSpan, error) {
pbSpan.AcceptEvent = &acceptEv
// changes: ERRs's priority bigger EXP, so ERR will replace EXP
if len(span.ExceptionInfo) > 0 {
id := spanSender.stringMeta.GetId("EXP", nil)
id := spanSender.getMetaApiId("___EXP___", common.META_String_api)
stringValue := wrapperspb.StringValue{Value: span.ExceptionInfo}
pbSpan.ExceptionInfo = &v1.PIntStringValue{IntValue: id,
StringValue: &stringValue}
}

if span.ErrorInfo != nil {
id := spanSender.stringMeta.GetId("ERR", nil)
id := spanSender.getMetaApiId("___ERR___", common.META_String_api)
pbSpan.Err = 1 // mark as an error
pbSpan.ExceptionInfo = &v1.PIntStringValue{
IntValue: id,
@@ -360,7 +394,7 @@ func (spanSender *SpanSender) Interceptor(span *TSpan) bool {
return true
}

func (spanSender *SpanSender) SenderGrpcMetaData(name string, id int32, Type int32, params PARAMS_TYPE) error {
func (spanSender *SpanSender) SenderGrpcMetaData(name string, metaType int32) error {
config := common.GetConfig()
conn, err := common.CreateGrpcConnection(config.AgentAddress)
if err != nil {
@@ -374,27 +408,31 @@ func (spanSender *SpanSender) SenderGrpcMetaData(name string, id int32, Type int
ctx, cancel := common.BuildPinpointCtx(config.MetaDataTimeWaitSec, spanSender.Md)
defer cancel()

switch Type {
case common.META_API:
switch metaType {
case common.META_Default_api:
{
//line := int32(0)
_type := int32(common.API_DEFAULT) //
if params != nil {
if value, OK := params["type"]; OK {
_type = value
}
id := spanSender.idMap[name].(int32)
apiMeta := v1.PApiMetaData{ApiId: id, ApiInfo: name, Type: common.API_DEFAULT}

if _, err = client.RequestApiMetaData(ctx, &apiMeta); err != nil {
log.Warnf("agentOnline api meta failed %s", err)
return errors.New("SenderGrpcMetaData: PApiMetaData failed")
}
// disable line
apiMeta := v1.PApiMetaData{ApiId: id, ApiInfo: name, Type: _type}
}

case common.META_Web_request_api:
{
id := spanSender.idMap[name].(int32)
apiMeta := v1.PApiMetaData{ApiId: id, ApiInfo: name, Type: common.API_WEB_REQUEST}

if _, err = client.RequestApiMetaData(ctx, &apiMeta); err != nil {
log.Warnf("agentOnline api meta failed %s", err)
return errors.New("SenderGrpcMetaData: RequestApiMetaData failed")
return errors.New("SenderGrpcMetaData: PApiMetaData failed")
}
}
case common.META_STRING:
case common.META_String_api:
{
id := spanSender.idMap[name].(int32)
metaMeta := v1.PStringMetaData{
StringId: id,
StringValue: name,
@@ -405,22 +443,23 @@ func (spanSender *SpanSender) SenderGrpcMetaData(name string, id int32, Type int
return errors.New("SenderGrpcMetaData: RequestStringMetaData failed")
}
}
case common.META_SQL:

case common.META_Sql_uid_api:
{
sqlMeta := v1.PSqlMetaData{
SqlId: id,
Sql: name,
id := spanSender.idMap[name].([]byte)
sqlUidMeta := v1.PSqlUidMetaData{
SqlUid: id,
Sql: name,
}

if _, err = client.RequestSqlMetaData(ctx, &sqlMeta); err != nil {
if _, err = client.RequestSqlUidMetaData(ctx, &sqlUidMeta); err != nil {
log.Warnf("agentOnline api meta failed %s", err)
return errors.New("SenderGrpcMetaData: RequestSqlMetaData failed")
return errors.New("SenderGrpcMetaData: RequestSqlUidMetaData failed")
}
}
default:
log.Warnf("SenderGrpcMetaData: No such Type:%d", Type)
log.Warnf("SenderGrpcMetaData: No such Type:%d", metaType)
}

log.Debugf("send metaData type:%d,Id:%d,value:%s para:%v", Type, id, name, params)
log.Debugf("send metaData %s", name)
return nil
}
8 changes: 5 additions & 3 deletions collector-agent/common/Defines.go
Original file line number Diff line number Diff line change
@@ -2,8 +2,10 @@ package common

const API_WEB_REQUEST = 100
const API_DEFAULT = 0

const (
META_API = 1
META_STRING = 2
META_SQL = 3
META_Default_api = iota
META_Web_request_api = iota
META_String_api = iota
META_Sql_uid_api = iota
)
Loading
Loading