Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(proxy): add metrics and trace for remoting send and pull RPC #844

Merged
merged 1 commit into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class ProxyContextExt extends ProxyContext implements TraceContext {
private boolean relayed;

private final Tracer tracer;
private Span rootSpan;

private final BlockingDeque<Span> spanStack = new LinkedBlockingDeque<>();

Expand Down Expand Up @@ -78,6 +79,10 @@ public Long getRemainingMs() {
return super.getRemainingMs() == null ? DEFAULT_TIMEOUT_MILLIS : super.getRemainingMs();
}

/**
 * Returns the root span recorded for this context.
 *
 * @return the current value of the {@code rootSpan} field; may be {@code null} when no span has
 *         been attached yet (the field has no initializer in the visible code)
 */
public Span rootSpan() {
    return this.rootSpan;
}

/**
 * Exposes the deque that holds the spans attached to this context.
 *
 * @return the {@code spanStack} deque itself, not a copy
 */
public BlockingDeque<Span> spanStack() {
    return this.spanStack;
}
Expand All @@ -94,6 +99,9 @@ public Optional<Span> span() {

/**
 * Attaches a span to this context.
 * <p>
 * When the span stack is empty at the time of the call, the span is also remembered as the root
 * span. In every case the span is then placed at the head of the span stack.
 *
 * @param span the span to attach
 */
@Override
public void attachSpan(Span span) {
    final boolean stackWasEmpty = spanStack.isEmpty();
    if (stackWasEmpty) {
        this.rootSpan = span;
    }
    // Deque#push is specified as addFirst, so the newest span always sits at the head.
    spanStack.addFirst(span);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@

package com.automq.rocketmq.proxy.remoting.activity;

import com.automq.rocketmq.common.trace.TraceHelper;
import com.automq.rocketmq.proxy.exception.ExceptionHandler;
import com.automq.rocketmq.proxy.model.ProxyContextExt;
import com.automq.rocketmq.proxy.remoting.RemotingUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.FileRegion;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
Expand All @@ -35,10 +40,12 @@
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.proxy.common.ContextVariable;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
import org.apache.rocketmq.proxy.remoting.activity.PullMessageActivity;
import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.ForbiddenType;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
Expand Down Expand Up @@ -95,11 +102,26 @@
protected void writeResponse(ChannelHandlerContext ctx, ProxyContext context, RemotingCommand request,
RemotingCommand response, Throwable t) {
recordRpcLatency(context, response);
ProxyContextExt contextExt = (ProxyContextExt) context;
Span rootSpan = contextExt.rootSpan();
rootSpan.setAttribute("code", RemotingHelper.getResponseCodeDesc(response.getCode()));
TraceHelper.endSpan(contextExt, rootSpan, t);

Check warning on line 108 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendPullMessageActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendPullMessageActivity.java#L105-L108

Added lines #L105 - L108 were not covered by tests
super.writeResponse(ctx, context, request, response, t);
}

private RemotingCommand pullMessage(ChannelHandlerContext ctx, RemotingCommand request,
ProxyContext context) throws RemotingCommandException {
ProxyContextExt contextExt = (ProxyContextExt) context;
Tracer tracer = contextExt.tracer().get();
Span rootSpan = tracer.spanBuilder("PullMessage")
.setNoParent()
.setSpanKind(SpanKind.SERVER)
.setAttribute(ContextVariable.PROTOCOL_TYPE, contextExt.getProtocolType())
.setAttribute(ContextVariable.ACTION, contextExt.getAction())
.setAttribute(ContextVariable.CLIENT_ID, contextExt.getClientID())
.startSpan();
contextExt.attachSpan(rootSpan);

Check warning on line 123 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendPullMessageActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendPullMessageActivity.java#L114-L123

Added lines #L114 - L123 were not covered by tests

// Retrieve the request header.
final PullMessageRequestHeader requestHeader = (PullMessageRequestHeader)
request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
Expand Down Expand Up @@ -234,13 +256,15 @@
ctx.writeAndFlush(fileRegion)
.addListener(future -> {
recordRpcLatency(context, response);
rootSpan.setAttribute("code", RemotingHelper.getResponseCodeDesc(response.getCode()));
TraceHelper.endSpan(contextExt, rootSpan, future.cause());

Check warning on line 260 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendPullMessageActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendPullMessageActivity.java#L259-L260

Added lines #L259 - L260 were not covered by tests
if (!future.isSuccess()) {
LOGGER.error("Write pull message response failed", future.cause());
}
});
return;
}
writeResponse(ctx, context, request, response);
writeResponse(ctx, context, request, response, null);

Check warning on line 267 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendPullMessageActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendPullMessageActivity.java#L267

Added line #L267 was not covered by tests
});

return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@

package com.automq.rocketmq.proxy.remoting.activity;

import com.automq.rocketmq.common.trace.TraceHelper;
import com.automq.rocketmq.proxy.exception.ExceptionHandler;
import com.automq.rocketmq.proxy.model.ProxyContextExt;
import com.automq.rocketmq.proxy.processor.ExtendMessagingProcessor;
import com.automq.rocketmq.proxy.remoting.RemotingUtil;
import io.netty.channel.ChannelHandlerContext;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Map;
Expand All @@ -32,12 +37,14 @@
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.proxy.common.ContextVariable;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.processor.QueueSelector;
import org.apache.rocketmq.proxy.remoting.activity.SendMessageActivity;
import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
import org.apache.rocketmq.proxy.service.route.MessageQueueView;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
Expand Down Expand Up @@ -77,6 +84,17 @@
@Override
protected RemotingCommand sendMessage(ChannelHandlerContext ctx, RemotingCommand request,
ProxyContext context) throws Exception {
ProxyContextExt contextExt = (ProxyContextExt) context;
Tracer tracer = contextExt.tracer().get();
Span rootSpan = tracer.spanBuilder("SendMessage")
.setNoParent()
.setSpanKind(SpanKind.SERVER)
.setAttribute(ContextVariable.PROTOCOL_TYPE, contextExt.getProtocolType())
.setAttribute(ContextVariable.ACTION, contextExt.getAction())
.setAttribute(ContextVariable.CLIENT_ID, contextExt.getClientID())
.startSpan();
contextExt.attachSpan(rootSpan);

Check warning on line 96 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendSendMessageActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendSendMessageActivity.java#L87-L96

Added lines #L87 - L96 were not covered by tests

// The parent class already checked the message type, added the transaction subscription.
RemotingCommand superResponse = super.sendMessage(ctx, request, context);
if (superResponse != null) {
Expand Down Expand Up @@ -136,7 +154,7 @@
// TODO: Support batch message in the future.
SendResult sendResult = sendResults.get(0);
fillSendMessageResponse(response, sendResult);
writeResponse(ctx, context, request, response);
writeResponse(ctx, context, request, response, null);

Check warning on line 157 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendSendMessageActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendSendMessageActivity.java#L157

Added line #L157 was not covered by tests
});

// Return null to the caller; the response will be sent back asynchronously.
Expand All @@ -146,6 +164,17 @@
@Override
protected RemotingCommand consumerSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
ProxyContext context) throws Exception {
ProxyContextExt contextExt = (ProxyContextExt) context;
Tracer tracer = contextExt.tracer().get();
Span rootSpan = tracer.spanBuilder("ConsumerSendMessage")
.setNoParent()
.setSpanKind(SpanKind.SERVER)
.setAttribute(ContextVariable.PROTOCOL_TYPE, contextExt.getProtocolType())
.setAttribute(ContextVariable.ACTION, contextExt.getAction())
.setAttribute(ContextVariable.CLIENT_ID, contextExt.getClientID())
.startSpan();
contextExt.attachSpan(rootSpan);

Check warning on line 176 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendSendMessageActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendSendMessageActivity.java#L167-L176

Added lines #L167 - L176 were not covered by tests

ConsumerSendMsgBackRequestHeader requestHeader = (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
messagingProcessor.getServiceManager()
.getMessageService()
Expand All @@ -155,7 +184,7 @@
writeErrResponse(ctx, context, request, error);
return;
}
writeResponse(ctx, context, request, finalResponse);
writeResponse(ctx, context, request, finalResponse, null);

Check warning on line 187 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendSendMessageActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendSendMessageActivity.java#L187

Added line #L187 was not covered by tests
});

return null;
Expand All @@ -178,6 +207,10 @@
protected void writeResponse(ChannelHandlerContext ctx, ProxyContext context, RemotingCommand request,
RemotingCommand response, Throwable t) {
recordRpcLatency(context, response);
ProxyContextExt contextExt = (ProxyContextExt) context;
Span rootSpan = contextExt.rootSpan();
rootSpan.setAttribute("code", RemotingHelper.getResponseCodeDesc(response.getCode()));
TraceHelper.endSpan(contextExt, rootSpan, t);

Check warning on line 213 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendSendMessageActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendSendMessageActivity.java#L210-L213

Added lines #L210 - L213 were not covered by tests
super.writeResponse(ctx, context, request, response, t);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@
Topic topic = pair.getLeft();
ConsumerGroup group = pair.getRight();

return store.pull(group.getGroupId(), topic.getTopicId(), virtualQueue.physicalQueueId(),
return store.pull(StoreContext.EMPTY, group.getGroupId(), topic.getTopicId(), virtualQueue.physicalQueueId(),
Filter.DEFAULT_FILTER, requestHeader.getOffset(), 1, false)
.thenApply(pullResult -> {
if (pullResult.status() == com.automq.rocketmq.store.model.message.PullResult.Status.FOUND) {
Expand Down Expand Up @@ -809,36 +809,49 @@
metadataService.updateConsumerOffset(group.getGroupId(), topic.getTopicId(), virtualQueue.physicalQueueId(), requestHeader.getCommitOffset());
}

return store.pull(group.getGroupId(), topic.getTopicId(), virtualQueue.physicalQueueId(), filter, requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), false);
StoreContext storeContext = ContextUtil.buildStoreContext(ctx, topic.getName(), group.getName());
return store.pull(storeContext, group.getGroupId(), topic.getTopicId(), virtualQueue.physicalQueueId(), filter, requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), false);
})
.thenCompose(result -> {
Topic topic = topicReference.get();
ConsumerGroup group = consumerGroupReference.get();
if (result.messageList().isEmpty()) {
if (result.maxOffset() - result.nextBeginOffset() > 0) {
// There are messages in the queue, but none of them match the filter, so we should not fall back to long polling.
return CompletableFuture.completedFuture(new PullResult(PullStatus.NO_MATCHED_MSG, result.nextBeginOffset(), result.minOffset(), result.maxOffset(), Collections.emptyList()));
} else {
StoreContext storeContext = ContextUtil.buildStoreContext(ctx, topic.getName(), group.getName());
return suspendRequestService.suspendRequest((ProxyContextExt) ctx, requestHeader.getTopic(), virtualQueue.physicalQueueId(), filter, timeoutMillis,
// Function to pull message later.
timeout -> store.pull(consumerGroupReference.get().getGroupId(), topicReference.get().getTopicId(), virtualQueue.physicalQueueId(), filter,
timeout -> store.pull(storeContext, group.getGroupId(), topic.getTopicId(), virtualQueue.physicalQueueId(), filter,
requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), false)
.thenApply(PullResultWrapper::new))
.thenApply(resultWrapper -> {
if (resultWrapper.isEmpty() || resultWrapper.get().inner().messageList().isEmpty()) {
return new PullResult(PullStatus.NO_MATCHED_MSG, result.nextBeginOffset(), result.minOffset(), result.maxOffset(), Collections.emptyList());
}
com.automq.rocketmq.store.model.message.PullResult suspendResult = resultWrapper.get().inner();
recordMetricsForPulling(topic.getName(), group.getName(), suspendResult);

Check warning on line 834 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L834

Added line #L834 was not covered by tests
return new PullResult(PullStatus.FOUND, suspendResult.nextBeginOffset(), suspendResult.minOffset(), suspendResult.maxOffset(),
FlatMessageUtil.convertTo(null, suspendResult.messageList(), requestHeader.getTopic(), 0, config.hostName(), config.remotingListenPort()));
});
}
}

recordMetricsForPulling(topic.getName(), group.getName(), result);
return CompletableFuture.completedFuture(
new PullResult(PullStatus.FOUND, result.nextBeginOffset(), result.minOffset(), result.maxOffset(),
FlatMessageUtil.convertTo(null, result.messageList(), requestHeader.getTopic(), 0, config.hostName(), config.remotingListenPort()))
);
});
}

/**
 * Records outgoing-message metrics for one store pull result.
 * <p>
 * Reports the number of messages in the result and the sum of the remaining bytes of each
 * message payload buffer, and flags whether the topic name starts with
 * {@code MixAll.RETRY_GROUP_TOPIC_PREFIX}.
 *
 * @param topic  the topic name the messages were pulled from
 * @param group  the consumer group name that pulled the messages
 * @param result the store pull result whose message list is measured
 */
private void recordMetricsForPulling(String topic, String group,
    com.automq.rocketmq.store.model.message.PullResult result) {
    // Sum the payload sizes with a primitive stream so no Integer is boxed per message.
    int messageBytesTotal = result.messageList().stream()
        .mapToInt(message -> message.message().payloadAsByteBuffer().remaining())
        .sum();
    ProxyMetricsManager.recordOutgoingMessages(topic, group, result.messageList().size(), messageBytesTotal,
        topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX));
}

@Override
public CompletableFuture<Set<MessageQueue>> lockBatchMQ(ProxyContext ctx, AddressableMessageQueue messageQueue,
LockBatchRequestBody requestBody, long timeoutMillis) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ public CompletableFuture<PopResult> pop(StoreContext context, long consumerGroup
}

@Override
public CompletableFuture<PullResult> pull(long consumerGroupId, long topicId, int queueId, Filter filter,
public CompletableFuture<PullResult> pull(StoreContext context, long consumerGroupId, long topicId, int queueId,
Filter filter,
long offset, int batchSize, boolean retry) {
if (retry) {
return CompletableFuture.completedFuture(new PullResult(PullResult.Status.NO_NEW_MSG, 0L, 0L, 0L, new ArrayList<>()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,14 +174,16 @@
}

@Override
public CompletableFuture<PullResult> pull(long consumerGroupId, long topicId, int queueId, Filter filter,
@WithSpan(kind = SpanKind.SERVER)
public CompletableFuture<PullResult> pull(StoreContext context, long consumerGroupId, long topicId, int queueId,
Filter filter,
long offset, int batchSize, boolean retry) {
return logicQueueManager.getOrCreate(StoreContext.EMPTY, topicId, queueId)
return logicQueueManager.getOrCreate(context, topicId, queueId)
.thenCompose(topicQueue -> {
if (retry) {
return topicQueue.pullRetry(consumerGroupId, filter, offset, batchSize);
return topicQueue.pullRetry(context, consumerGroupId, filter, offset, batchSize);

Check warning on line 184 in store/src/main/java/com/automq/rocketmq/store/MessageStoreImpl.java

View check run for this annotation

Codecov / codecov/patch

store/src/main/java/com/automq/rocketmq/store/MessageStoreImpl.java#L184

Added line #L184 was not covered by tests
}
return topicQueue.pullNormal(consumerGroupId, filter, offset, batchSize);
return topicQueue.pullNormal(context, consumerGroupId, filter, offset, batchSize);

Check warning on line 186 in store/src/main/java/com/automq/rocketmq/store/MessageStoreImpl.java

View check run for this annotation

Codecov / codecov/patch

store/src/main/java/com/automq/rocketmq/store/MessageStoreImpl.java#L186

Added line #L186 was not covered by tests
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,12 @@ public abstract CompletableFuture<ChangeInvisibleDurationResult> changeInvisible

public abstract int getInflightStats(long consumerGroupId);

public abstract CompletableFuture<PullResult> pullNormal(long consumerGroupId, Filter filter, long startOffset,
public abstract CompletableFuture<PullResult> pullNormal(StoreContext context, long consumerGroupId, Filter filter,
long startOffset,
int batchSize);

public abstract CompletableFuture<PullResult> pullRetry(long consumerGroupId, Filter filter, long startOffset,
public abstract CompletableFuture<PullResult> pullRetry(StoreContext context, long consumerGroupId, Filter filter,
long startOffset,
int batchSize);

public abstract long getConsumeOffset(long consumerGroupId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ CompletableFuture<PopResult> pop(StoreContext context, long consumerGroupId, lon
* @param retry whether to pull retry messages
* @return pull result, see {@link PullResult}
*/
CompletableFuture<PullResult> pull(long consumerGroupId, long topicId, int queueId, Filter filter, long offset,
CompletableFuture<PullResult> pull(StoreContext context, long consumerGroupId, long topicId, int queueId,
Filter filter, long offset,
int batchSize, boolean retry);

/**
Expand Down
Loading
Loading