Skip to content

Commit 900b937

Browse files
authored
[ISSUE #275] Add trace message for pub and sub. (#276)
* feat(trace): add trace message for sync producer. * feat(trace): add message trace for push consumer * feat(trace): add test case for trace message of push consumer * feat(trace): add default key value to trace message to avoid the bug in broker.
1 parent 753ec41 commit 900b937

40 files changed

+2149
-53
lines changed

include/DefaultMQProducer.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,8 @@ class ROCKETMQCLIENT_API DefaultMQProducer {
136136

137137
void setUnitName(std::string unitName);
138138
const std::string& getUnitName() const;
139+
void setMessageTrace(bool messageTrace);
140+
bool getMessageTrace() const;
139141

140142
private:
141143
DefaultMQProducerImpl* impl;

include/DefaultMQPushConsumer.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,8 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer {
132132
const std::string& getUnitName() const;
133133

134134
void setAsyncPull(bool asyncFlag);
135+
void setMessageTrace(bool messageTrace);
136+
bool getMessageTrace() const;
135137

136138
private:
137139
DefaultMQPushConsumerImpl* impl;

src/MQClientFactory.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ void MQClientFactory::start() {
7777
m_serviceState = RUNNING;
7878
break;
7979
case RUNNING:
80+
LOG_INFO("The Factory object:%s start before with now state:%d", m_clientId.c_str(), m_serviceState);
81+
break;
8082
case SHUTDOWN_ALREADY:
8183
case START_FAILED:
8284
LOG_INFO("The Factory object:%s start failed with fault state:%d", m_clientId.c_str(), m_serviceState);

src/common/DefaultMQClient.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ DefaultMQClient::DefaultMQClient() {
4646
m_tcpConnectTimeout = 3000; // 3s
4747
m_tcpTransportTryLockTimeout = 3; // 3s
4848
m_unitName = "";
49+
m_messageTrace = false;
4950
}
5051

5152
DefaultMQClient::~DefaultMQClient() {}
@@ -216,6 +217,14 @@ const string& DefaultMQClient::getUnitName() const {
216217
return m_unitName;
217218
}
218219

220+
// Returns the current value of the message-trace switch (m_messageTrace).
// The constructor initialises the switch to false.
bool DefaultMQClient::getMessageTrace() const {
221+
return m_messageTrace;
222+
}
223+
224+
// Stores the message-trace switch for this client.
// mMessageTrace: new value copied into m_messageTrace.
void DefaultMQClient::setMessageTrace(bool mMessageTrace) {
225+
m_messageTrace = mMessageTrace;
226+
}
227+
219228
void DefaultMQClient::setSessionCredentials(const string& input_accessKey,
220229
const string& input_secretKey,
221230
const string& input_onsChannel) {
@@ -239,6 +248,7 @@ void DefaultMQClient::showClientConfigs() {
239248
LOG_WARN("PullThreadNum:%d", m_pullThreadNum);
240249
LOG_WARN("TcpConnectTimeout:%lld ms", m_tcpConnectTimeout);
241250
LOG_WARN("TcpTransportTryLockTimeout:%lld s", m_tcpTransportTryLockTimeout);
251+
LOG_WARN("OpenMessageTrace:%s", m_messageTrace ? "true" : "false");
242252
// LOG_WARN("*****************************************************************************");
243253
}
244254
//<!************************************************************************

src/common/NameSpaceUtil.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include "NameSpaceUtil.h"
1919
#include "Logging.h"
20+
#include "TraceContant.h"
2021

2122
namespace rocketmq {
2223

@@ -75,6 +76,15 @@ bool NameSpaceUtil::checkNameSpaceExistInNameServer(string nameServerAddr) {
7576
return false;
7677
}
7778

79+
/**
 * Removes a namespace marker from a resource name.
 *
 * Searches `source` for the first occurrence of `nameSpace` immediately
 * followed by NAMESPACE_SPLIT_FLAG and returns everything after that marker.
 * Text in front of the marker is dropped as well, as it was before.
 *
 * @param source    resource name that may carry a namespace.
 * @param nameSpace namespace to strip; an empty value leaves `source` as is.
 * @return the remainder after the marker, or `source` unchanged when the
 *         marker does not occur.
 */
string NameSpaceUtil::withoutNameSpace(string source, string nameSpace) {
  if (nameSpace.empty()) {
    return source;
  }

  // Match the namespace together with its separator. Looking for the bare
  // namespace made the old substr() start past the end of `source` (and throw
  // std::out_of_range) whenever no separator followed, e.g. source == nameSpace.
  const string marker = nameSpace + NAMESPACE_SPLIT_FLAG;
  const string::size_type index = source.find(marker);
  if (index == string::npos) {
    return source;
  }

  // index + marker.length() <= source.length() is guaranteed by find().
  return source.substr(index + marker.length());
}
7888
string NameSpaceUtil::withNameSpace(string source, string ns) {
7989
if (!ns.empty()) {
8090
return ns + NAMESPACE_SPLIT_FLAG + source;
@@ -83,6 +93,10 @@ string NameSpaceUtil::withNameSpace(string source, string ns) {
8393
}
8494

8595
bool NameSpaceUtil::hasNameSpace(string source, string ns) {
96+
if (source.find(TraceContant::TRACE_TOPIC) != string::npos) {
97+
LOG_DEBUG("Find Trace Topic [%s]", source.c_str());
98+
return true;
99+
}
86100
if (!ns.empty() && source.length() >= ns.length() && source.find(ns) != string::npos) {
87101
return true;
88102
}

src/common/NameSpaceUtil.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ class NameSpaceUtil {
4343

4444
static string withNameSpace(string source, string ns);
4545

46+
static string withoutNameSpace(string source, string ns);
47+
4648
static bool hasNameSpace(string source, string ns);
4749
};
4850

src/consumer/ConsumeMessageConcurrentlyService.cpp

Lines changed: 59 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717
#if !defined(WIN32) && !defined(__APPLE__)
1818
#include <sys/prctl.h>
1919
#endif
20+
2021
#include "ConsumeMsgService.h"
2122
#include "DefaultMQPushConsumer.h"
2223
#include "Logging.h"
2324
#include "MessageAccessor.h"
2425
#include "UtilAll.h"
26+
2527
namespace rocketmq {
2628

2729
//<!************************************************************************
@@ -80,6 +82,7 @@ void ConsumeMessageConcurrentlyService::submitConsumeRequest(boost::weak_ptr<Pul
8082
request->m_messageQueue.toString().c_str());
8183
}
8284
}
85+
8386
void ConsumeMessageConcurrentlyService::submitConsumeRequestLater(boost::weak_ptr<PullRequest> pullRequest,
8487
vector<MQMessageExt>& msgs,
8588
int millis) {
@@ -146,14 +149,26 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullReque
146149
if (request->isDropped()) {
147150
LOG_WARN("the pull request for %s Had been dropped before", request->m_messageQueue.toString().c_str());
148151
request->clearAllMsgs(); // add clear operation to avoid bad state when
149-
// dropped pullRequest returns normal
152+
// dropped pullRequest returns normal
150153
return;
151154
}
152155
if (msgs.empty()) {
153156
LOG_WARN("the msg of pull result is NULL,its mq:%s", (request->m_messageQueue).toString().c_str());
154157
return;
155158
}
156-
159+
ConsumeMessageContext consumeMessageContext;
160+
DefaultMQPushConsumerImpl* pConsumer = dynamic_cast<DefaultMQPushConsumerImpl*>(m_pConsumer);
161+
if (pConsumer) {
162+
if (pConsumer->getMessageTrace() && pConsumer->hasConsumeMessageHook()) {
163+
consumeMessageContext.setDefaultMQPushConsumer(pConsumer);
164+
consumeMessageContext.setConsumerGroup(pConsumer->getGroupName());
165+
consumeMessageContext.setMessageQueue(request->m_messageQueue);
166+
consumeMessageContext.setMsgList(msgs);
167+
consumeMessageContext.setSuccess(false);
168+
consumeMessageContext.setNameSpace(pConsumer->getNameSpace());
169+
pConsumer->executeConsumeMessageHookBefore(&consumeMessageContext);
170+
}
171+
}
157172
ConsumeStatus status = CONSUME_SUCCESS;
158173
if (m_pMessageListener != NULL) {
159174
resetRetryTopic(msgs);
@@ -163,11 +178,48 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullReque
163178
if (m_pConsumer->isUseNameSpaceMode()) {
164179
MessageAccessor::withoutNameSpace(msgs, m_pConsumer->getNameSpace());
165180
}
166-
try {
167-
status = m_pMessageListener->consumeMessage(msgs);
168-
} catch (...) {
169-
status = RECONSUME_LATER;
170-
LOG_ERROR("Consumer's code is buggy. Un-caught exception raised");
181+
182+
if (pConsumer->getMessageTrace() && pConsumer->hasConsumeMessageHook()) {
183+
// For open trace message, consume message one by one.
184+
for (size_t i = 0; i < msgs.size(); ++i) {
185+
LOG_DEBUG("=====Trace Receive Messages,Topic[%s], MsgId[%s],Body[%s],RetryTimes[%d]",
186+
msgs[i].getTopic().c_str(), msgs[i].getMsgId().c_str(), msgs[i].getBody().c_str(),
187+
msgs[i].getReconsumeTimes());
188+
std::vector<MQMessageExt> msgInner;
189+
msgInner.push_back(msgs[i]);
190+
if (status != CONSUME_SUCCESS) {
191+
// all the Messages behind should be set to failed.
192+
status = RECONSUME_LATER;
193+
consumeMessageContext.setMsgIndex(i);
194+
consumeMessageContext.setStatus("RECONSUME_LATER");
195+
consumeMessageContext.setSuccess(false);
196+
pConsumer->executeConsumeMessageHookAfter(&consumeMessageContext);
197+
continue;
198+
}
199+
try {
200+
status = m_pMessageListener->consumeMessage(msgInner);
201+
} catch (...) {
202+
status = RECONSUME_LATER;
203+
LOG_ERROR("Consumer's code is buggy. Un-caught exception raised");
204+
}
205+
consumeMessageContext.setMsgIndex(i); // indicate message position,not support batch consumer
206+
if (status == CONSUME_SUCCESS) {
207+
consumeMessageContext.setStatus("CONSUME_SUCCESS");
208+
consumeMessageContext.setSuccess(true);
209+
} else {
210+
status = RECONSUME_LATER;
211+
consumeMessageContext.setStatus("RECONSUME_LATER");
212+
consumeMessageContext.setSuccess(false);
213+
}
214+
pConsumer->executeConsumeMessageHookAfter(&consumeMessageContext);
215+
}
216+
} else {
217+
try {
218+
status = m_pMessageListener->consumeMessage(msgs);
219+
} catch (...) {
220+
status = RECONSUME_LATER;
221+
LOG_ERROR("Consumer's code is buggy. Un-caught exception raised");
222+
}
171223
}
172224
}
173225

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#include "ConsumeMessageHookImpl.h"
19+
#include <memory>
20+
#include <string>
21+
#include "ConsumeMessageContext.h"
22+
#include "DefaultMQPushConsumerImpl.h"
23+
#include "Logging.h"
24+
#include "MQClientException.h"
25+
#include "NameSpaceUtil.h"
26+
#include "TraceContant.h"
27+
#include "TraceContext.h"
28+
#include "TraceTransferBean.h"
29+
#include "TraceUtil.h"
30+
#include "UtilAll.h"
31+
namespace rocketmq {
32+
33+
class TraceMessageConsumeCallback : public SendCallback {
34+
virtual void onSuccess(SendResult& sendResult) {
35+
LOG_DEBUG("TraceMessageConsumeCallback, MsgId:[%s],OffsetMsgId[%s]", sendResult.getMsgId().c_str(),
36+
sendResult.getOffsetMsgId().c_str());
37+
}
38+
virtual void onException(MQException& e) {}
39+
};
40+
static TraceMessageConsumeCallback* consumeTraceCallback = new TraceMessageConsumeCallback();
41+
// Returns the fixed name of this hook, "RocketMQConsumeMessageHookImpl".
std::string ConsumeMessageHookImpl::getHookName() {
42+
return "RocketMQConsumeMessageHookImpl";
43+
}
44+
45+
/**
 * Pre-consume hook: builds a SubBefore trace record for the messages in
 * `context` and submits it for asynchronous sending.
 *
 * A new TraceContext is attached to `context` via setTraceContext() so that
 * executeHookAfter() can read the region, group, request id and start time
 * back. One TraceBean is recorded per message.
 *
 * @param context consume context; nothing happens when it is NULL or carries
 *                no messages.
 */
void ConsumeMessageHookImpl::executeHookBefore(ConsumeMessageContext* context) {
  if (context == NULL) {
    return;
  }
  // Fetch the list once; auto&& binds whether the accessor returns by value
  // or by reference, so no extra copy of the messages is made here.
  auto&& msgs = context->getMsgList();
  if (msgs.empty()) {
    return;
  }

  // NOTE(review): setTraceContext() presumably takes ownership of the raw
  // pointer (getTraceContext() hands back a std::shared_ptr) - confirm.
  TraceContext* traceContext = new TraceContext();
  context->setTraceContext(traceContext);
  traceContext->setTraceType(SubBefore);
  traceContext->setGroupName(NameSpaceUtil::withoutNameSpace(context->getConsumerGroup(), context->getNameSpace()));

  for (auto& msg : msgs) {
    TraceBean bean;
    bean.setTopic(msg.getTopic());
    bean.setMsgId(msg.getMsgId());
    bean.setTags(msg.getTags());
    bean.setKeys(msg.getKeys());
    bean.setStoreHost(msg.getStoreHostString());
    bean.setStoreTime(msg.getStoreTimestamp());
    bean.setBodyLength(msg.getStoreSize());
    bean.setRetryTimes(msg.getReconsumeTimes());

    std::string regionId = msg.getProperty(MQMessage::PROPERTY_MSG_REGION);
    if (regionId.empty()) {
      regionId = TraceContant::DEFAULT_REDION;
    }
    // The region is overwritten on every iteration, so the last message wins.
    traceContext->setRegionId(regionId);
    traceContext->setTraceBean(bean);
  }
  traceContext->setTimeStamp(UtilAll::currentTimeMillis());

  // The trace context stays attached even if the send is skipped below, so a
  // later executeHookAfter() still finds it.
  auto consumer = context->getDefaultMQPushConsumer();
  if (!consumer) {
    LOG_WARN("ConsumeMessageHookImpl: no consumer set in context, skip SubBefore trace message");
    return;
  }

  const std::string topic = TraceContant::TRACE_TOPIC + traceContext->getRegionId();
  TraceTransferBean ben = TraceUtil::CovertTraceContextToTransferBean(traceContext);
  MQMessage message(topic, ben.getTransData());
  message.setKeys(ben.getTransKey());

  // send trace message async.
  consumer->submitSendTraceRequest(message, consumeTraceCallback);
}
86+
87+
/**
 * Post-consume hook: builds a SubAfter trace record for the message at
 * context->getMsgIndex() and submits it for asynchronous sending.
 *
 * Region, group, request id and start time come from the trace context that
 * executeHookBefore() attached to `context`; the cost time is the difference
 * between now and that start time.
 *
 * @param context consume context; nothing happens when it is NULL, carries no
 *                messages, has no trace context or consumer attached, or its
 *                message index has no matching trace bean.
 */
void ConsumeMessageHookImpl::executeHookAfter(ConsumeMessageContext* context) {
  if (context == NULL || context->getMsgList().empty()) {
    return;
  }

  // Without the SubBefore context there is nothing to pair this record with;
  // dereferencing an empty pointer here used to be undefined behaviour.
  std::shared_ptr<TraceContext> subBeforeContext = context->getTraceContext();
  if (!subBeforeContext) {
    LOG_WARN("ConsumeMessageHookImpl: no SubBefore trace context, skip SubAfter trace message");
    return;
  }

  auto consumer = context->getDefaultMQPushConsumer();
  if (!consumer) {
    LOG_WARN("ConsumeMessageHookImpl: no consumer set in context, skip SubAfter trace message");
    return;
  }

  TraceContext subAfterContext;
  subAfterContext.setTraceType(SubAfter);
  subAfterContext.setRegionId(subBeforeContext->getRegionId());
  subAfterContext.setGroupName(subBeforeContext->getGroupName());
  subAfterContext.setRequestId(subBeforeContext->getRequestId());
  subAfterContext.setStatus(context->getSuccess());
  int costTime = static_cast<int>(UtilAll::currentTimeMillis() - subBeforeContext->getTimeStamp());
  subAfterContext.setCostTime(costTime);
  subAfterContext.setTraceBeanIndex(context->getMsgIndex());

  // Validate the index before using it: an out-of-range operator[] on the
  // bean list is undefined behaviour.
  auto&& beforeBeans = subBeforeContext->getTraceBeans();
  const auto beanIndex = subAfterContext.getTraceBeanIndex();
  if (beanIndex < 0 || static_cast<size_t>(beanIndex) >= beforeBeans.size()) {
    LOG_WARN("ConsumeMessageHookImpl: trace bean index %d out of range, skip SubAfter trace message",
             static_cast<int>(beanIndex));
    return;
  }
  TraceBean bean = beforeBeans[beanIndex];
  subAfterContext.setTraceBean(bean);

  const std::string topic = TraceContant::TRACE_TOPIC + subAfterContext.getRegionId();
  TraceTransferBean ben = TraceUtil::CovertTraceContextToTransferBean(&subAfterContext);
  MQMessage message(topic, ben.getTransData());
  message.setKeys(ben.getTransKey());

  // send trace message async.
  consumer->submitSendTraceRequest(message, consumeTraceCallback);
}
114+
} // namespace rocketmq
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
#ifndef __ROCKETMQ_CONSUME_MESSAGE_RPC_HOOK_IMPL_H__
18+
#define __ROCKETMQ_CONSUME_MESSAGE_RPC_HOOK_IMPL_H__
19+
20+
#include <string>
21+
#include "ConsumeMessageContext.h"
22+
#include "ConsumeMessageHook.h"
23+
namespace rocketmq {
24+
// ConsumeMessageHook implementation that emits message-trace records around
// message consumption. All members override the ConsumeMessageHook interface.
class ConsumeMessageHookImpl : public ConsumeMessageHook {
25+
public:
26+
virtual ~ConsumeMessageHookImpl() {}
27+
// Returns the fixed hook name "RocketMQConsumeMessageHookImpl".
virtual std::string getHookName();
28+
// Called before the listener runs: builds a SubBefore trace record for the
// messages in `context` and submits it for asynchronous sending.
virtual void executeHookBefore(ConsumeMessageContext* context);
29+
// Called after the listener ran: builds a SubAfter trace record for the
// message at context->getMsgIndex() and submits it for asynchronous sending.
virtual void executeHookAfter(ConsumeMessageContext* context);
30+
};
31+
} // namespace rocketmq
32+
#endif

src/consumer/ConsumeMessageOrderlyService.cpp

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,27 @@ void ConsumeMessageOrderlyService::ConsumeRequest(boost::weak_ptr<PullRequest> p
185185
if (m_pConsumer->isUseNameSpaceMode()) {
186186
MessageAccessor::withoutNameSpace(msgs, m_pConsumer->getNameSpace());
187187
}
188+
ConsumeMessageContext consumeMessageContext;
189+
DefaultMQPushConsumerImpl* pConsumer = dynamic_cast<DefaultMQPushConsumerImpl*>(m_pConsumer);
190+
if (pConsumer) {
191+
if (pConsumer->getMessageTrace() && pConsumer->hasConsumeMessageHook()) {
192+
consumeMessageContext.setDefaultMQPushConsumer(pConsumer);
193+
consumeMessageContext.setConsumerGroup(pConsumer->getGroupName());
194+
consumeMessageContext.setMessageQueue(request->m_messageQueue);
195+
consumeMessageContext.setMsgList(msgs);
196+
consumeMessageContext.setSuccess(false);
197+
consumeMessageContext.setNameSpace(pConsumer->getNameSpace());
198+
pConsumer->executeConsumeMessageHookBefore(&consumeMessageContext);
199+
}
200+
}
188201
ConsumeStatus consumeStatus = m_pMessageListener->consumeMessage(msgs);
189202
if (consumeStatus == RECONSUME_LATER) {
203+
if (pConsumer) {
204+
consumeMessageContext.setMsgIndex(0);
205+
consumeMessageContext.setStatus("RECONSUME_LATER");
206+
consumeMessageContext.setSuccess(false);
207+
pConsumer->executeConsumeMessageHookAfter(&consumeMessageContext);
208+
}
190209
if (msgs[0].getReconsumeTimes() <= 15) {
191210
msgs[0].setReconsumeTimes(msgs[0].getReconsumeTimes() + 1);
192211
request->makeMessageToCosumeAgain(msgs);
@@ -202,6 +221,12 @@ void ConsumeMessageOrderlyService::ConsumeRequest(boost::weak_ptr<PullRequest> p
202221
tryLockLaterAndReconsumeDelay(request, false, 5000);
203222
}
204223
} else {
224+
if (pConsumer) {
225+
consumeMessageContext.setMsgIndex(0);
226+
consumeMessageContext.setStatus("CONSUME_SUCCESS");
227+
consumeMessageContext.setSuccess(true);
228+
pConsumer->executeConsumeMessageHookAfter(&consumeMessageContext);
229+
}
205230
m_pConsumer->updateConsumeOffset(request->m_messageQueue, request->commit());
206231
}
207232
} else {

0 commit comments

Comments
 (0)