Skip to content

Commit

Permalink
[bugfix] AMQP consumers are not living up to expectations (#210)
Browse files Browse the repository at this point in the history
  • Loading branch information
renyansongno1 authored Dec 18, 2024
1 parent f989faf commit 988702e
Show file tree
Hide file tree
Showing 8 changed files with 184 additions and 155 deletions.
29 changes: 25 additions & 4 deletions plugins/amqp/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,36 @@ import (
"github.com/apache/skywalking-go/plugins/core/operator"
)

type ConsumersSendInterceptor struct {
}

func (c *ConsumersSendInterceptor) BeforeInvoke(invocation operator.Invocation) error {
return nil
}

func (c *ConsumersSendInterceptor) AfterInvoke(invocation operator.Invocation, results ...interface{}) error {
return GeneralConsumersSendAfterInvoke(invocation, results...)
}

type ConsumerInterceptor struct {
}

func (c *ConsumerInterceptor) BeforeInvoke(invocation operator.Invocation) error {
args := invocation.Args()[6].(amqp091.Table)
return GeneralConsumerBeforeInvoke(invocation, args)
}

func (c *ConsumerInterceptor) AfterInvoke(operator.Invocation, ...interface{}) error {
return nil
}

func (c *ConsumerInterceptor) AfterInvoke(invocation operator.Invocation, results ...interface{}) error {
queue, consumerTag, args := invocation.Args()[0].(string), invocation.Args()[1].(string),
invocation.Args()[6].(amqp091.Table)
return GeneralConsumerAfterInvoke(invocation, queue, consumerTag, args, results...)
type ConsumersCloseInterceptor struct {
}

func (c *ConsumersCloseInterceptor) BeforeInvoke(invocation operator.Invocation) error {
return GeneralConsumerCloseBeforeInvoke(invocation)
}

func (c *ConsumersCloseInterceptor) AfterInvoke(operator.Invocation, ...interface{}) error {
return nil
}
37 changes: 0 additions & 37 deletions plugins/amqp/consumer_with_ctx.go

This file was deleted.

87 changes: 64 additions & 23 deletions plugins/amqp/general_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package amqp

import (
"fmt"
"os"
"strconv"
"sync/atomic"

"github.com/rabbitmq/amqp091-go"

Expand All @@ -27,44 +30,82 @@ import (
)

const (
ConsumerComponentID = 145
amqpConsumerPrefix = "AMQP/"
amqpConsumerSuffix = "/Consumer"
tagMQConsumerTag = "mq.consumer_tag"
tagMQReplyTo = "mq.reply_to"
tagMQCorrelationID = "mq.correlation_id"
tagMQArgs = "mq.args"
ConsumerComponentID = 145
amqpConsumerPrefix = "AMQP/"
amqpConsumerSuffix = "/Consumer"
tagMQConsumerTag = "mq.consumer_tag"
tagMQReplyTo = "mq.reply_to"
tagMQCorrelationID = "mq.correlation_id"
tagMQArgs = "mq.args"
consumerTagLengthMax = 0xFF
)

func GeneralConsumerAfterInvoke(invocation operator.Invocation, queue, consumerTag string, args amqp091.Table, results ...interface{}) error {
deliveries := <-results[0].(<-chan Delivery)
if consumerTag == "" {
consumerTag = deliveries.ConsumerTag
}
operationName := amqpConsumerPrefix + queue + "/" + consumerTag + amqpConsumerSuffix
var consumerSeq uint64
var queueConsumerTagMapping = make(map[string]string)

channel := invocation.CallerInstance().(*nativeChannel)
func GeneralConsumersSendAfterInvoke(invocation operator.Invocation, results ...interface{}) error {
if foundConsumer := results[0].(bool); !foundConsumer {
return nil
}
consumerTag, _ := invocation.Args()[0].(string)
delivery, _ := invocation.Args()[1].(*Delivery)
operationName := amqpConsumerPrefix + queueConsumerTagMapping[consumerTag] + "/" + consumerTag + amqpConsumerSuffix
channel, _ := delivery.Acknowledger.(*nativeChannel)
peer := getPeerInfo(channel.connection)

span, err := tracing.CreateEntrySpan(operationName, func(headerKey string) (string, error) {
return deliveries.Headers[headerKey].(string), nil
header, _ := delivery.Headers[headerKey].(string)
return header, nil
}, tracing.WithLayer(tracing.SpanLayerMQ),
tracing.WithComponent(ConsumerComponentID),
tracing.WithTag(tracing.TagMQBroker, peer),
tracing.WithTag(tracing.TagMQQueue, queue),
tracing.WithTag(tracing.TagMQMsgID, deliveries.MessageId),
tracing.WithTag(tracing.TagMQQueue, queueConsumerTagMapping[consumerTag]),
tracing.WithTag(tracing.TagMQMsgID, delivery.MessageId),
tracing.WithTag(tagMQConsumerTag, consumerTag),
tracing.WithTag(tagMQCorrelationID, deliveries.CorrelationId),
tracing.WithTag(tagMQReplyTo, deliveries.ReplyTo),
tracing.WithTag(tagMQArgs, fmt.Sprintf("%v", args)),
tracing.WithTag(tagMQCorrelationID, delivery.CorrelationId),
tracing.WithTag(tagMQReplyTo, delivery.ReplyTo),
tracing.WithTag(tagMQArgs, fmt.Sprintf("%v", delivery.Headers)),
)
if err != nil {
return err
}
span.SetPeer(peer)
if err, ok := results[1].(error); ok && err != nil {
span.Error(err.Error())
}
span.End()
return nil
}

func GeneralConsumerBeforeInvoke(invocation operator.Invocation, args amqp091.Table) error {
queue := invocation.Args()[0].(string)
consumerTag := invocation.Args()[1].(string)
if consumerTag == "" {
consumerTag = uniqueConsumerTag()
}
queueConsumerTagMapping[consumerTag] = queue
return nil
}

func GeneralConsumerCloseBeforeInvoke(invocation operator.Invocation) error {
consumers, _ := invocation.CallerInstance().(*nativeConsumers)
consumers.Lock()
defer consumers.Unlock()
for consumerTag := range consumers.chans {
delete(queueConsumerTagMapping, consumerTag)
}
return nil
}

func uniqueConsumerTag() string {
return commandNameBasedUniqueConsumerTag(os.Args[0])
}

func commandNameBasedUniqueConsumerTag(commandName string) string {
tagPrefix := "ctag-"
tagInfix := commandName
tagSuffix := "-" + strconv.FormatUint(atomic.AddUint64(&consumerSeq, 1), 10)

if len(tagPrefix)+len(tagInfix)+len(tagSuffix) > consumerTagLengthMax {
tagInfix = "streadway/amqp"
}

return tagPrefix + tagInfix + tagSuffix
}
33 changes: 20 additions & 13 deletions plugins/amqp/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,30 +65,37 @@ func (i *Instrument) Points() []*instrument.Point {
{
PackagePath: "",
PackageName: "amqp091",
At: instrument.NewMethodEnhance("*Channel", "Consume",
instrument.WithArgsCount(7),
At: instrument.NewMethodEnhance("*consumers", "send",
instrument.WithArgsCount(2),
instrument.WithArgType(0, "string"),
instrument.WithArgType(1, "string"),
instrument.WithArgType(6, "Table"),
instrument.WithResultCount(2),
instrument.WithResultType(0, "<-chan Delivery"),
instrument.WithResultType(1, "error"),
instrument.WithArgType(1, "*Delivery"),
instrument.WithResultCount(1),
instrument.WithResultType(0, "bool"),
),
Interceptor: "ConsumerInterceptor",
Interceptor: "ConsumersSendInterceptor",
},
{
PackagePath: "",
PackageName: "amqp091",
At: instrument.NewMethodEnhance("*consumers", "close",
instrument.WithArgsCount(0),
instrument.WithResultCount(0),
),
Interceptor: "ConsumersCloseInterceptor",
},
{
PackagePath: "",
PackageName: "amqp091",
At: instrument.NewMethodEnhance("*Channel", "ConsumeWithContext",
instrument.WithArgsCount(8),
At: instrument.NewMethodEnhance("*Channel", "Consume",
instrument.WithArgsCount(7),
instrument.WithArgType(0, "string"),
instrument.WithArgType(1, "string"),
instrument.WithArgType(2, "string"),
instrument.WithArgType(7, "Table"),
instrument.WithArgType(6, "Table"),
instrument.WithResultCount(2),
instrument.WithResultType(0, "<-chan Delivery"),
instrument.WithResultType(1, "error"),
),
Interceptor: "ConsumerWithCtxInterceptor",
Interceptor: "ConsumerInterceptor",
},
{
PackagePath: "",
Expand Down
27 changes: 27 additions & 0 deletions plugins/amqp/structures.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,27 @@ package amqp

import (
"io"
"sync"
)

//skywalking:native github.com/rabbitmq/amqp091-go Channel
type nativeChannel struct {
connection *nativeConnection
}

func (ch *nativeChannel) Ack(tag uint64, multiple bool) error {
return nil
}
func (ch *nativeChannel) Nack(tag uint64, multiple, requeue bool) error {
return nil
}
func (ch *nativeChannel) Reject(tag uint64, requeue bool) error {
return nil
}

//skywalking:native github.com/rabbitmq/amqp091-go Delivery
type Delivery struct {
Acknowledger nativeAcknowledger
Headers Table
MessageId string //nolint
ConsumerTag string
Expand All @@ -44,3 +56,18 @@ type Table map[string]interface{}
type nativeConnection struct {
conn io.ReadWriteCloser
}

//skywalking:native github.com/rabbitmq/amqp091-go Acknowledger
type nativeAcknowledger interface {
Ack(tag uint64, multiple bool) error
Nack(tag uint64, multiple bool, requeue bool) error
Reject(tag uint64, requeue bool) error
}

//skywalking:native github.com/rabbitmq/amqp091-go consumers
type nativeConsumers struct {
sync.Mutex
chans consumerBuffers
}

type consumerBuffers map[string]chan *Delivery
40 changes: 20 additions & 20 deletions test/plugins/scenarios/amqp/config/excepted.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,27 @@ segmentItems:
- { key: mq.broker, value: 'amqp-server:5672' }
- { key: mq.exchange, value: not null }
- { key: mq.routing_key, value: sw-queue-1 }
- operationName: AMQP/sw-queue-1/sw-consumer-1/Consumer
- operationName: AMQP/sw-queue-1/Producer
parentSpanId: 0
spanId: 2
spanLayer: MQ
startTime: nq 0
endTime: nq 0
componentId: 144
isError: false
spanType: Exit
peer: amqp-server:5672
skipAnalysis: false
tags:
- { key: mq.broker, value: 'amqp-server:5672' }
- { key: mq.exchange, value: not null }
- { key: mq.routing_key, value: sw-queue-1 }
- operationName: AMQP/sw-queue-1/sw-consumer-1/Consumer
parentSpanId: 0
spanId: 3
spanLayer: MQ
startTime: nq 0
endTime: nq 0
componentId: 145
isError: false
spanType: Entry
Expand All @@ -59,22 +74,7 @@ segmentItems:
parentSpanId: 1, parentTraceSegmentId: not null,
parentServiceInstance: not null, parentService: amqp,
traceId: not null }
- operationName: AMQP/sw-queue-2/Producer
parentSpanId: 0
spanId: 3
spanLayer: MQ
startTime: nq 0
endTime: nq 0
componentId: 144
isError: false
spanType: Exit
peer: amqp-server:5672
skipAnalysis: false
tags:
- { key: mq.broker, value: 'amqp-server:5672' }
- { key: mq.exchange, value: not null }
- { key: mq.routing_key, value: sw-queue-2 }
- operationName: AMQP/sw-queue-2/sw-consumer-2/Consumer
- operationName: AMQP/sw-queue-1/sw-consumer-1/Consumer
parentSpanId: 0
spanId: 4
spanLayer: MQ
Expand All @@ -87,15 +87,15 @@ segmentItems:
skipAnalysis: false
tags:
- { key: mq.broker, value: 'amqp-server:5672' }
- { key: mq.queue, value: sw-queue-2 }
- { key: mq.queue, value: sw-queue-1 }
- { key: mq.msg.id, value: not null }
- { key: mq.consumer_tag, value: sw-consumer-2 }
- { key: mq.consumer_tag, value: sw-consumer-1 }
- { key: mq.correlation_id, value: not null }
- { key: mq.reply_to, value: not null }
- { key: mq.args, value: not null }
refs:
- { parentEndpoint: 'GET:/execute', networkAddress: 'amqp-server:5672', refType: CrossProcess,
parentSpanId: 3, parentTraceSegmentId: not null,
parentSpanId: 2, parentTraceSegmentId: not null,
parentServiceInstance: not null, parentService: amqp,
traceId: not null }
- operationName: GET:/execute
Expand Down
Loading

0 comments on commit 988702e

Please sign in to comment.