Skip to content

Commit b5b3b35

Browse files
authored
feat(client): add timer to clean off line broker and test case. (#222)
* fix(tranport): using recursive mutex to avoid death lock * fix(transport): add test case for client factory. * feat(unittest): open test case for client api impl * feat(unittest): open test case for client api impl
1 parent 2faaa1f commit b5b3b35

12 files changed

+557
-191
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ bin
44
build
55
libs/signature/lib
66
tmp_*
7+
Testing

build.sh

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,11 @@ ExecutionTesting() {
368368
fi
369369
echo "############# unit test start ###########"
370370
cd ${build_dir}
371-
make test
371+
if [ $verbose -eq 0 ]; then
372+
ctest
373+
else
374+
ctest -V
375+
fi
372376
if [ $? -ne 0 ]; then
373377
echo "############# unit test failed ###########"
374378
exit 1

src/MQClientAPIImpl.cpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -252,9 +252,9 @@ SendResult MQClientAPIImpl::sendMessage(const string& addr,
252252
return SendResult();
253253
}
254254

255-
void MQClientAPIImpl::sendHearbeat(const string& addr,
256-
HeartbeatData* pHeartbeatData,
257-
const SessionCredentials& sessionCredentials) {
255+
void MQClientAPIImpl::sendHeartbeat(const string& addr,
256+
HeartbeatData* pHeartbeatData,
257+
const SessionCredentials& sessionCredentials) {
258258
RemotingCommand request(HEART_BEAT, NULL);
259259

260260
string body;
@@ -265,7 +265,9 @@ void MQClientAPIImpl::sendHearbeat(const string& addr,
265265
request.Encode();
266266

267267
if (m_pRemotingClient->invokeHeartBeat(addr, request)) {
268-
LOG_INFO("sendheartbeat to broker:%s success", addr.c_str());
268+
LOG_DEBUG("sendHeartbeat to broker:%s success", addr.c_str());
269+
} else {
270+
LOG_WARN("sendHeartbeat to broker:%s failed", addr.c_str());
269271
}
270272
}
271273

@@ -314,6 +316,7 @@ TopicRouteData* MQClientAPIImpl::getTopicRouteInfoFromNameServer(const string& t
314316
}
315317
}
316318
case TOPIC_NOT_EXIST: {
319+
LOG_WARN("Get topic[%s] route failed [TOPIC_NOT_EXIST].", topic.c_str());
317320
return NULL;
318321
}
319322
default:
@@ -323,6 +326,7 @@ TopicRouteData* MQClientAPIImpl::getTopicRouteInfoFromNameServer(const string& t
323326
return NULL;
324327
}
325328
}
329+
LOG_WARN("Get topic[%s] route failed [Null Response].", topic.c_str());
326330
return NULL;
327331
}
328332

src/MQClientAPIImpl.h

Lines changed: 142 additions & 138 deletions
Original file line numberDiff line numberDiff line change
@@ -49,150 +49,152 @@ class MQClientAPIImpl {
4949
uint64_t tcpTransportTryLockTimeout,
5050
string unitName);
5151
virtual ~MQClientAPIImpl();
52-
void stopAllTcpTransportThread();
53-
bool writeDataToFile(string filename, string data, bool isSync);
54-
string fetchNameServerAddr(const string& NSDomain);
55-
void updateNameServerAddr(const string& addrs);
56-
57-
void callSignatureBeforeRequest(const string& addr,
58-
RemotingCommand& request,
59-
const SessionCredentials& session_credentials);
60-
void createTopic(const string& addr,
61-
const string& defaultTopic,
62-
TopicConfig topicConfig,
63-
const SessionCredentials& sessionCredentials);
64-
void endTransactionOneway(std::string addr,
65-
EndTransactionRequestHeader* requestHeader,
66-
std::string remark,
67-
const SessionCredentials& sessionCredentials);
68-
69-
SendResult sendMessage(const string& addr,
70-
const string& brokerName,
71-
const MQMessage& msg,
72-
SendMessageRequestHeader* pRequestHeader,
73-
int timeoutMillis,
74-
int maxRetrySendTimes,
75-
int communicationMode,
76-
SendCallback* pSendCallback,
77-
const SessionCredentials& sessionCredentials);
78-
79-
PullResult* pullMessage(const string& addr,
80-
PullMessageRequestHeader* pRequestHeader,
81-
int timeoutMillis,
82-
int communicationMode,
83-
PullCallback* pullCallback,
84-
void* pArg,
85-
const SessionCredentials& sessionCredentials);
86-
87-
void sendHearbeat(const string& addr, HeartbeatData* pHeartbeatData, const SessionCredentials& sessionCredentials);
88-
89-
void unregisterClient(const string& addr,
90-
const string& clientID,
91-
const string& producerGroup,
92-
const string& consumerGroup,
93-
const SessionCredentials& sessionCredentials);
94-
95-
TopicRouteData* getTopicRouteInfoFromNameServer(const string& topic,
96-
int timeoutMillis,
97-
const SessionCredentials& sessionCredentials);
98-
99-
TopicList* getTopicListFromNameServer(const SessionCredentials& sessionCredentials);
100-
101-
int wipeWritePermOfBroker(const string& namesrvAddr, const string& brokerName, int timeoutMillis);
102-
103-
void deleteTopicInBroker(const string& addr, const string& topic, int timeoutMillis);
104-
105-
void deleteTopicInNameServer(const string& addr, const string& topic, int timeoutMillis);
106-
107-
void deleteSubscriptionGroup(const string& addr, const string& groupName, int timeoutMillis);
108-
109-
string getKVConfigByValue(const string& projectNamespace, const string& projectGroup, int timeoutMillis);
110-
111-
KVTable getKVListByNamespace(const string& projectNamespace, int timeoutMillis);
112-
113-
void deleteKVConfigByValue(const string& projectNamespace, const string& projectGroup, int timeoutMillis);
114-
115-
SendResult processSendResponse(const string& brokerName, const MQMessage& msg, RemotingCommand* pResponse);
116-
117-
PullResult* processPullResponse(RemotingCommand* pResponse);
118-
119-
int64 getMinOffset(const string& addr,
120-
const string& topic,
121-
int queueId,
122-
int timeoutMillis,
123-
const SessionCredentials& sessionCredentials);
124-
125-
int64 getMaxOffset(const string& addr,
126-
const string& topic,
127-
int queueId,
128-
int timeoutMillis,
129-
const SessionCredentials& sessionCredentials);
130-
131-
int64 searchOffset(const string& addr,
132-
const string& topic,
133-
int queueId,
134-
uint64_t timestamp,
135-
int timeoutMillis,
136-
const SessionCredentials& sessionCredentials);
137-
138-
MQMessageExt* viewMessage(const string& addr,
139-
int64 phyoffset,
140-
int timeoutMillis,
141-
const SessionCredentials& sessionCredentials);
142-
143-
int64 getEarliestMsgStoretime(const string& addr,
144-
const string& topic,
145-
int queueId,
146-
int timeoutMillis,
147-
const SessionCredentials& sessionCredentials);
52+
virtual void stopAllTcpTransportThread();
53+
virtual bool writeDataToFile(string filename, string data, bool isSync);
54+
virtual string fetchNameServerAddr(const string& NSDomain);
55+
virtual void updateNameServerAddr(const string& addrs);
56+
57+
virtual void callSignatureBeforeRequest(const string& addr,
58+
RemotingCommand& request,
59+
const SessionCredentials& session_credentials);
60+
virtual void createTopic(const string& addr,
61+
const string& defaultTopic,
62+
TopicConfig topicConfig,
63+
const SessionCredentials& sessionCredentials);
64+
virtual void endTransactionOneway(std::string addr,
65+
EndTransactionRequestHeader* requestHeader,
66+
std::string remark,
67+
const SessionCredentials& sessionCredentials);
68+
69+
virtual SendResult sendMessage(const string& addr,
70+
const string& brokerName,
71+
const MQMessage& msg,
72+
SendMessageRequestHeader* pRequestHeader,
73+
int timeoutMillis,
74+
int maxRetrySendTimes,
75+
int communicationMode,
76+
SendCallback* pSendCallback,
77+
const SessionCredentials& sessionCredentials);
78+
79+
virtual PullResult* pullMessage(const string& addr,
80+
PullMessageRequestHeader* pRequestHeader,
81+
int timeoutMillis,
82+
int communicationMode,
83+
PullCallback* pullCallback,
84+
void* pArg,
85+
const SessionCredentials& sessionCredentials);
86+
87+
virtual void sendHeartbeat(const string& addr,
88+
HeartbeatData* pHeartbeatData,
89+
const SessionCredentials& sessionCredentials);
14890

149-
void getConsumerIdListByGroup(const string& addr,
91+
virtual void unregisterClient(const string& addr,
92+
const string& clientID,
93+
const string& producerGroup,
15094
const string& consumerGroup,
151-
vector<string>& cids,
152-
int timeoutMillis,
15395
const SessionCredentials& sessionCredentials);
15496

155-
int64 queryConsumerOffset(const string& addr,
156-
QueryConsumerOffsetRequestHeader* pRequestHeader,
157-
int timeoutMillis,
158-
const SessionCredentials& sessionCredentials);
97+
virtual TopicRouteData* getTopicRouteInfoFromNameServer(const string& topic,
98+
int timeoutMillis,
99+
const SessionCredentials& sessionCredentials);
159100

160-
void updateConsumerOffset(const string& addr,
161-
UpdateConsumerOffsetRequestHeader* pRequestHeader,
162-
int timeoutMillis,
163-
const SessionCredentials& sessionCredentials);
101+
virtual TopicList* getTopicListFromNameServer(const SessionCredentials& sessionCredentials);
164102

165-
void updateConsumerOffsetOneway(const string& addr,
166-
UpdateConsumerOffsetRequestHeader* pRequestHeader,
167-
int timeoutMillis,
168-
const SessionCredentials& sessionCredentials);
103+
virtual int wipeWritePermOfBroker(const string& namesrvAddr, const string& brokerName, int timeoutMillis);
169104

170-
void consumerSendMessageBack(const string addr,
171-
MQMessageExt& msg,
172-
const string& consumerGroup,
173-
int delayLevel,
174-
int timeoutMillis,
175-
const SessionCredentials& sessionCredentials);
176-
177-
void lockBatchMQ(const string& addr,
178-
LockBatchRequestBody* requestBody,
179-
vector<MQMessageQueue>& mqs,
180-
int timeoutMillis,
181-
const SessionCredentials& sessionCredentials);
182-
183-
void unlockBatchMQ(const string& addr,
184-
UnlockBatchRequestBody* requestBody,
185-
int timeoutMillis,
186-
const SessionCredentials& sessionCredentials);
187-
188-
void sendMessageAsync(const string& addr,
189-
const string& brokerName,
190-
const MQMessage& msg,
191-
RemotingCommand& request,
192-
SendCallback* pSendCallback,
193-
int64 timeoutMilliseconds,
194-
int maxRetryTimes = 1,
195-
int retrySendTimes = 1);
105+
virtual void deleteTopicInBroker(const string& addr, const string& topic, int timeoutMillis);
106+
107+
virtual void deleteTopicInNameServer(const string& addr, const string& topic, int timeoutMillis);
108+
109+
virtual void deleteSubscriptionGroup(const string& addr, const string& groupName, int timeoutMillis);
110+
111+
virtual string getKVConfigByValue(const string& projectNamespace, const string& projectGroup, int timeoutMillis);
112+
113+
virtual KVTable getKVListByNamespace(const string& projectNamespace, int timeoutMillis);
114+
115+
virtual void deleteKVConfigByValue(const string& projectNamespace, const string& projectGroup, int timeoutMillis);
116+
117+
virtual SendResult processSendResponse(const string& brokerName, const MQMessage& msg, RemotingCommand* pResponse);
118+
119+
virtual PullResult* processPullResponse(RemotingCommand* pResponse);
120+
121+
virtual int64 getMinOffset(const string& addr,
122+
const string& topic,
123+
int queueId,
124+
int timeoutMillis,
125+
const SessionCredentials& sessionCredentials);
126+
127+
virtual int64 getMaxOffset(const string& addr,
128+
const string& topic,
129+
int queueId,
130+
int timeoutMillis,
131+
const SessionCredentials& sessionCredentials);
132+
133+
virtual int64 searchOffset(const string& addr,
134+
const string& topic,
135+
int queueId,
136+
uint64_t timestamp,
137+
int timeoutMillis,
138+
const SessionCredentials& sessionCredentials);
139+
140+
virtual MQMessageExt* viewMessage(const string& addr,
141+
int64 phyoffset,
142+
int timeoutMillis,
143+
const SessionCredentials& sessionCredentials);
144+
145+
virtual int64 getEarliestMsgStoretime(const string& addr,
146+
const string& topic,
147+
int queueId,
148+
int timeoutMillis,
149+
const SessionCredentials& sessionCredentials);
150+
151+
virtual void getConsumerIdListByGroup(const string& addr,
152+
const string& consumerGroup,
153+
vector<string>& cids,
154+
int timeoutMillis,
155+
const SessionCredentials& sessionCredentials);
156+
157+
virtual int64 queryConsumerOffset(const string& addr,
158+
QueryConsumerOffsetRequestHeader* pRequestHeader,
159+
int timeoutMillis,
160+
const SessionCredentials& sessionCredentials);
161+
162+
virtual void updateConsumerOffset(const string& addr,
163+
UpdateConsumerOffsetRequestHeader* pRequestHeader,
164+
int timeoutMillis,
165+
const SessionCredentials& sessionCredentials);
166+
167+
virtual void updateConsumerOffsetOneway(const string& addr,
168+
UpdateConsumerOffsetRequestHeader* pRequestHeader,
169+
int timeoutMillis,
170+
const SessionCredentials& sessionCredentials);
171+
172+
virtual void consumerSendMessageBack(const string addr,
173+
MQMessageExt& msg,
174+
const string& consumerGroup,
175+
int delayLevel,
176+
int timeoutMillis,
177+
const SessionCredentials& sessionCredentials);
178+
179+
virtual void lockBatchMQ(const string& addr,
180+
LockBatchRequestBody* requestBody,
181+
vector<MQMessageQueue>& mqs,
182+
int timeoutMillis,
183+
const SessionCredentials& sessionCredentials);
184+
185+
virtual void unlockBatchMQ(const string& addr,
186+
UnlockBatchRequestBody* requestBody,
187+
int timeoutMillis,
188+
const SessionCredentials& sessionCredentials);
189+
190+
virtual void sendMessageAsync(const string& addr,
191+
const string& brokerName,
192+
const MQMessage& msg,
193+
RemotingCommand& request,
194+
SendCallback* pSendCallback,
195+
int64 timeoutMilliseconds,
196+
int maxRetryTimes = 1,
197+
int retrySendTimes = 1);
196198

197199
private:
198200
SendResult sendMessageSync(const string& addr,
@@ -213,8 +215,10 @@ class MQClientAPIImpl {
213215
PullCallback* pullCallback,
214216
void* pArg);
215217

216-
private:
218+
protected:
217219
unique_ptr<TcpRemotingClient> m_pRemotingClient;
220+
221+
private:
218222
unique_ptr<TopAddressing> m_topAddressing;
219223
string m_nameSrvAddr;
220224
bool m_firstFetchNameSrv;

0 commit comments

Comments
 (0)