Skip to content

Commit

Permalink
Fix NPE in RocketMQ instrumentation (open-telemetry#4901)
Browse files Browse the repository at this point in the history
* Fix NPE in RocketMQ instrumentation

* fix tests

* add exception

* Changed the condition a bit
  • Loading branch information
Mateusz Rzeszutek authored and RashmiRam committed May 23, 2022
1 parent 8b26315 commit c1289c7
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,11 @@ protected String conversationId(MessageExt request) {
return null;
}

@Nullable
@Override
protected Long messagePayloadSize(MessageExt request) {
return (long) request.getBody().length;
byte[] body = request.getBody();
return body == null ? null : (long) body.length;
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import javax.annotation.Nullable;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

class RockerMqProducerAttributeExtractor
extends MessagingAttributesExtractor<SendMessageContext, Void> {
Expand All @@ -28,9 +30,11 @@ protected String destinationKind(SendMessageContext sendMessageContext) {
return SemanticAttributes.MessagingDestinationKindValues.TOPIC;
}

@Nullable
@Override
protected String destination(SendMessageContext sendMessageContext) {
return sendMessageContext.getMessage().getTopic();
Message message = sendMessageContext.getMessage();
return message == null ? null : message.getTopic();
}

@Override
Expand Down Expand Up @@ -77,6 +81,7 @@ protected Long messagePayloadCompressedSize(SendMessageContext sendMessageContex
@Nullable
@Override
protected String messageId(SendMessageContext request, @Nullable Void unused) {
return request.getSendResult().getMsgId();
SendResult sendResult = request.getSendResult();
return sendResult == null ? null : sendResult.getMsgId();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ class RockerMqProducerExperimentalAttributeExtractor

@Override
public void onStart(AttributesBuilder attributes, SendMessageContext request) {
set(attributes, MESSAGING_ROCKETMQ_TAGS, request.getMessage().getTags());
if (request.getMessage() != null) {
set(attributes, MESSAGING_ROCKETMQ_TAGS, request.getMessage().getTags());
}
set(attributes, MESSAGING_ROCKETMQ_BROKER_ADDRESS, request.getBrokerAddr());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ public void sendMessageAfter(SendMessageContext context) {
if (context == null) {
return;
}
if (context.getMqTraceContext() instanceof Context) {
if (context.getMqTraceContext() instanceof Context
&& (context.getSendResult() != null || context.getException() != null)) {
Context otelContext = (Context) context.getMqTraceContext();
instrumenter.end(otelContext, context, null, null);
instrumenter.end(otelContext, context, null, context.getException());
}
}
}

0 comments on commit c1289c7

Please sign in to comment.