Skip to content

Commit 29848f9

Browse files
committed
Update: change method signature 'operationComplete' to 'onComplete' in AsyncCallbackWrap.
- change 'void operationComplete(ResponseFuture* pResponseFuture, bool bProducePullRequest)' to 'onComplete(ResponseFuture *pResponseFuture)'
1 parent 59438f7 commit 29848f9

File tree

3 files changed

+97
-105
lines changed

3 files changed

+97
-105
lines changed

src/common/AsyncCallbackWrap.cpp

Lines changed: 77 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -27,56 +27,49 @@
2727

2828
namespace rocketmq {
2929
//<!***************************************************************************
30-
AsyncCallbackWrap::AsyncCallbackWrap(AsyncCallback* pAsyncCallback,
31-
MQClientAPIImpl* pclientAPI)
32-
: m_pAsyncCallBack(pAsyncCallback), m_pClientAPI(pclientAPI) {}
30+
AsyncCallbackWrap::AsyncCallbackWrap(AsyncCallback *pAsyncCallback, MQClientAPIImpl *pClientAPI)
31+
: m_pAsyncCallBack(pAsyncCallback), m_pClientAPI(pClientAPI) {}
3332

3433
AsyncCallbackWrap::~AsyncCallbackWrap() {
35-
m_pAsyncCallBack = NULL;
36-
m_pClientAPI = NULL;
34+
m_pAsyncCallBack = nullptr;
35+
m_pClientAPI = nullptr;
3736
}
3837

3938
//<!************************************************************************
40-
SendCallbackWrap::SendCallbackWrap(const string& brokerName,
41-
const MQMessage& msg,
42-
AsyncCallback* pAsyncCallback,
43-
MQClientAPIImpl* pclientAPI)
44-
: AsyncCallbackWrap(pAsyncCallback, pclientAPI),
39+
SendCallbackWrap::SendCallbackWrap(
40+
const string &brokerName, const MQMessage &msg, AsyncCallback *pAsyncCallback, MQClientAPIImpl *pClientAPI)
41+
: AsyncCallbackWrap(pAsyncCallback, pClientAPI),
4542
m_msg(msg),
4643
m_brokerName(brokerName) {}
4744

4845
void SendCallbackWrap::onException() {
49-
if (m_pAsyncCallBack == NULL) return;
46+
if (m_pAsyncCallBack == nullptr) return;
5047

51-
SendCallback* pCallback = static_cast<SendCallback*>(m_pAsyncCallBack);
48+
auto *pCallback = static_cast<SendCallback *>(m_pAsyncCallBack);
5249
if (pCallback) {
5350
unique_ptr<MQException> exception(new MQException(
54-
"send msg failed due to wait response timeout or network error", -1,
55-
__FILE__, __LINE__));
51+
"send msg failed due to wait response timeout or network error", -1, __FILE__, __LINE__));
5652
pCallback->onException(*exception);
5753
if (pCallback->getSendCallbackType() == autoDeleteSendCallback) {
5854
deleteAndZero(pCallback);
5955
}
6056
}
6157
}
6258

63-
void SendCallbackWrap::operationComplete(ResponseFuture* pResponseFuture,
64-
bool bProducePullRequest) {
59+
void SendCallbackWrap::onComplete(ResponseFuture *pResponseFuture) {
6560
unique_ptr<RemotingCommand> pResponse(pResponseFuture->getCommand());
6661

67-
if (m_pAsyncCallBack == NULL) {
62+
if (m_pAsyncCallBack == nullptr) {
6863
return;
6964
}
7065
int opaque = pResponseFuture->getOpaque();
71-
SendCallback* pCallback = static_cast<SendCallback*>(m_pAsyncCallBack);
66+
auto *pCallback = static_cast<SendCallback *>(m_pAsyncCallBack);
7267

7368
if (!pResponse) {
74-
string err = "unknow reseaon";
69+
string err = "unknown reason";
7570
if (!pResponseFuture->isSendRequestOK()) {
7671
err = "send request failed";
77-
7872
} else if (pResponseFuture->isTimeOut()) {
79-
// pResponseFuture->setAsyncResponseFlag();
8073
err = "wait response timeout";
8174
}
8275
if (pCallback) {
@@ -86,44 +79,46 @@ void SendCallbackWrap::operationComplete(ResponseFuture* pResponseFuture,
8679
LOG_ERROR("send failed of:%d", pResponseFuture->getOpaque());
8780
} else {
8881
try {
89-
SendResult ret = m_pClientAPI->processSendResponse(m_brokerName, m_msg, pResponse.get());
90-
if (pCallback) {
91-
LOG_DEBUG("operationComplete: processSendResponse success, opaque:%d, maxRetryTime:%d, retrySendTimes:%d", opaque, pResponseFuture->getMaxRetrySendTimes(), pResponseFuture->getRetrySendTimes());
92-
pCallback->onSuccess(ret);
93-
}
94-
} catch (MQException& e) {
95-
LOG_ERROR("operationComplete: processSendResponse exception: %s", e.what());
96-
97-
//broker may return exception, need consider retry send
98-
int maxRetryTimes = pResponseFuture->getMaxRetrySendTimes();
99-
int retryTimes = pResponseFuture->getRetrySendTimes();
100-
if (pResponseFuture->getAsyncFlag() && retryTimes < maxRetryTimes && maxRetryTimes > 1) {
101-
102-
int64 left_timeout_ms = pResponseFuture->leftTime();
103-
string brokerAddr = pResponseFuture->getBrokerAddr();
104-
const RemotingCommand& requestCommand = pResponseFuture->getRequestCommand();
105-
retryTimes += 1;
106-
LOG_WARN("retry send, opaque:%d, sendTimes:%d, maxRetryTimes:%d, left_timeout:%lld, brokerAddr:%s, msg:%s",
107-
opaque, retryTimes, maxRetryTimes, left_timeout_ms, brokerAddr.data(), m_msg.toString().data());
108-
109-
bool exception_flag = false;
110-
try {
111-
m_pClientAPI->sendMessageAsync(pResponseFuture->getBrokerAddr(), m_brokerName, m_msg, (RemotingCommand&)requestCommand, pCallback, left_timeout_ms, maxRetryTimes, retryTimes);
112-
} catch (MQClientException& e) {
113-
LOG_ERROR("retry send exception:%s, opaque:%d, retryTimes:%d, msg:%s, not retry send again", e.what(), opaque, retryTimes, m_msg.toString().data());
114-
exception_flag = true;
115-
}
116-
117-
if (exception_flag == false) {
118-
return; //send retry again, here need return
119-
}
120-
}
121-
122-
if (pCallback) {
123-
MQException exception("process send response error", -1, __FILE__,
124-
__LINE__);
125-
pCallback->onException(exception);
126-
}
82+
SendResult ret = m_pClientAPI->processSendResponse(m_brokerName, m_msg, pResponse.get());
83+
if (pCallback) {
84+
LOG_DEBUG("operationComplete: processSendResponse success, opaque:%d, maxRetryTime:%d, retrySendTimes:%d",
85+
opaque, pResponseFuture->getMaxRetrySendTimes(), pResponseFuture->getRetrySendTimes());
86+
pCallback->onSuccess(ret);
87+
}
88+
} catch (MQException &e) {
89+
LOG_ERROR("operationComplete: processSendResponse exception: %s", e.what());
90+
91+
//broker may return exception, need consider retry send
92+
int maxRetryTimes = pResponseFuture->getMaxRetrySendTimes();
93+
int retryTimes = pResponseFuture->getRetrySendTimes();
94+
if (pResponseFuture->getAsyncFlag() && retryTimes < maxRetryTimes && maxRetryTimes > 1) {
95+
96+
int64 left_timeout_ms = pResponseFuture->leftTime();
97+
string brokerAddr = pResponseFuture->getBrokerAddr();
98+
auto &requestCommand = const_cast<RemotingCommand &>(pResponseFuture->getRequestCommand());
99+
retryTimes += 1;
100+
LOG_WARN("retry send, opaque:%d, sendTimes:%d, maxRetryTimes:%d, left_timeout:%lld, brokerAddr:%s, msg:%s",
101+
opaque, retryTimes, maxRetryTimes, left_timeout_ms, brokerAddr.data(), m_msg.toString().data());
102+
103+
bool exception_flag = false;
104+
try {
105+
m_pClientAPI->sendMessageAsync(pResponseFuture->getBrokerAddr(), m_brokerName, m_msg, requestCommand,
106+
pCallback, left_timeout_ms, maxRetryTimes, retryTimes);
107+
} catch (MQClientException &e) {
108+
LOG_ERROR("retry send exception:%s, opaque:%d, retryTimes:%d, msg:%s, not retry send again",
109+
e.what(), opaque, retryTimes, m_msg.toString().data());
110+
exception_flag = true;
111+
}
112+
113+
if (!exception_flag) {
114+
return; //send retry again, here need return
115+
}
116+
}
117+
118+
if (pCallback) {
119+
MQException exception("process send response error", -1, __FILE__, __LINE__);
120+
pCallback->onException(exception);
121+
}
127122
}
128123
}
129124
if (pCallback && pCallback->getSendCallbackType() == autoDeleteSendCallback) {
@@ -132,18 +127,17 @@ void SendCallbackWrap::operationComplete(ResponseFuture* pResponseFuture,
132127
}
133128

134129
//<!************************************************************************
135-
PullCallbackWarp::PullCallbackWarp(AsyncCallback* pAsyncCallback,
136-
MQClientAPIImpl* pclientAPI, void* pArg)
137-
: AsyncCallbackWrap(pAsyncCallback, pclientAPI) {
138-
m_pArg = *static_cast<AsyncArg*>(pArg);
130+
PullCallbackWarp::PullCallbackWarp(AsyncCallback *pAsyncCallback, MQClientAPIImpl *pClientAPI, void *pArg)
131+
: AsyncCallbackWrap(pAsyncCallback, pClientAPI) {
132+
m_pArg = *static_cast<AsyncArg *>(pArg);
139133
}
140134

141-
PullCallbackWarp::~PullCallbackWarp() {}
135+
PullCallbackWarp::~PullCallbackWarp() = default;
142136

143137
void PullCallbackWarp::onException() {
144-
if (m_pAsyncCallBack == NULL) return;
138+
if (m_pAsyncCallBack == nullptr) return;
145139

146-
PullCallback* pCallback = static_cast<PullCallback*>(m_pAsyncCallBack);
140+
auto *pCallback = static_cast<PullCallback *>(m_pAsyncCallBack);
147141
if (pCallback) {
148142
MQException exception("wait response timeout", -1, __FILE__, __LINE__);
149143
pCallback->onException(exception);
@@ -152,43 +146,42 @@ void PullCallbackWarp::onException() {
152146
}
153147
}
154148

155-
void PullCallbackWarp::operationComplete(ResponseFuture* pResponseFuture,
156-
bool bProducePullRequest) {
149+
void PullCallbackWarp::onComplete(ResponseFuture *pResponseFuture) {
157150
unique_ptr<RemotingCommand> pResponse(pResponseFuture->getCommand());
158-
if (m_pAsyncCallBack == NULL) {
151+
if (m_pAsyncCallBack == nullptr) {
159152
LOG_ERROR("m_pAsyncCallBack is NULL, AsyncPull could not continue");
160153
return;
161154
}
162-
PullCallback* pCallback = static_cast<PullCallback*>(m_pAsyncCallBack);
155+
auto *pCallback = static_cast<PullCallback *>(m_pAsyncCallBack);
163156
if (!pResponse) {
164-
string err = "unknow reseaon";
157+
string err = "unknown reason";
165158
if (!pResponseFuture->isSendRequestOK()) {
166159
err = "send request failed";
167-
168160
} else if (pResponseFuture->isTimeOut()) {
169-
// pResponseFuture->setAsyncResponseFlag();
170161
err = "wait response timeout";
171162
}
172163
MQException exception(err, -1, __FILE__, __LINE__);
173-
LOG_ERROR("Async pull exception of opaque:%d",
174-
pResponseFuture->getOpaque());
175-
if (pCallback && bProducePullRequest) pCallback->onException(exception);
164+
LOG_ERROR("Async pull exception of opaque:%d", pResponseFuture->getOpaque());
165+
if (pCallback) {
166+
pCallback->onException(exception);
167+
}
176168
} else {
177169
try {
178170
if (m_pArg.pPullWrapper) {
179-
unique_ptr<PullResult> pullResult(
180-
m_pClientAPI->processPullResponse(pResponse.get()));
181-
PullResult result = m_pArg.pPullWrapper->processPullResult(
182-
m_pArg.mq, pullResult.get(), &m_pArg.subData);
183-
if (pCallback)
184-
pCallback->onSuccess(m_pArg.mq, result, bProducePullRequest);
171+
unique_ptr<PullResult> pullResult(m_pClientAPI->processPullResponse(pResponse.get()));
172+
PullResult result = m_pArg.pPullWrapper->processPullResult(m_pArg.mq, pullResult.get(), &m_pArg.subData);
173+
if (pCallback) {
174+
pCallback->onSuccess(m_pArg.mq, result, true);
175+
}
185176
} else {
186177
LOG_ERROR("pPullWrapper had been destroyed with consumer");
187178
}
188-
} catch (MQException& e) {
179+
} catch (MQException &e) {
189180
LOG_ERROR(e.what());
190181
MQException exception("pullResult error", -1, __FILE__, __LINE__);
191-
if (pCallback && bProducePullRequest) pCallback->onException(exception);
182+
if (pCallback) {
183+
pCallback->onException(exception);
184+
}
192185
}
193186
}
194187
}

src/common/AsyncCallbackWrap.h

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ class ResponseFuture;
2929
class MQClientAPIImpl;
3030
class DefaultMQProducer;
3131
class SendMessageRequestHeader;
32+
3233
//<!***************************************************************************
3334
enum asyncCallBackType {
3435
asyncCallbackWrap = 0,
@@ -38,29 +39,27 @@ enum asyncCallBackType {
3839

3940
struct AsyncCallbackWrap {
4041
public:
41-
AsyncCallbackWrap(AsyncCallback* pAsyncCallback, MQClientAPIImpl* pclientAPI);
42+
AsyncCallbackWrap(AsyncCallback *pAsyncCallback, MQClientAPIImpl *pClientAPI);
4243
virtual ~AsyncCallbackWrap();
43-
virtual void operationComplete(ResponseFuture* pResponseFuture,
44-
bool bProducePullRequest) = 0;
44+
virtual void onComplete(ResponseFuture *pResponseFuture) = 0;
4545
virtual void onException() = 0;
4646
virtual asyncCallBackType getCallbackType() = 0;
4747

4848
protected:
49-
AsyncCallback* m_pAsyncCallBack;
50-
MQClientAPIImpl* m_pClientAPI;
49+
AsyncCallback *m_pAsyncCallBack;
50+
MQClientAPIImpl *m_pClientAPI;
5151
};
5252

5353
//<!************************************************************************
5454
class SendCallbackWrap : public AsyncCallbackWrap {
5555
public:
56-
SendCallbackWrap(const string& brokerName, const MQMessage& msg,
57-
AsyncCallback* pAsyncCallback, MQClientAPIImpl* pclientAPI);
56+
SendCallbackWrap(const string &brokerName, const MQMessage &msg,
57+
AsyncCallback *pAsyncCallback, MQClientAPIImpl *pClientAPI);
5858

59-
virtual ~SendCallbackWrap(){};
60-
virtual void operationComplete(ResponseFuture* pResponseFuture,
61-
bool bProducePullRequest);
62-
virtual void onException();
63-
virtual asyncCallBackType getCallbackType() { return sendCallbackWrap; }
59+
~SendCallbackWrap() override = default;;
60+
void onComplete(ResponseFuture *pResponseFuture) override;
61+
void onException() override;
62+
asyncCallBackType getCallbackType() override { return sendCallbackWrap; }
6463

6564
private:
6665
MQMessage m_msg;
@@ -70,13 +69,11 @@ class SendCallbackWrap : public AsyncCallbackWrap {
7069
//<!***************************************************************************
7170
class PullCallbackWarp : public AsyncCallbackWrap {
7271
public:
73-
PullCallbackWarp(AsyncCallback* pAsyncCallback, MQClientAPIImpl* pclientAPI,
74-
void* pArg);
75-
virtual ~PullCallbackWarp();
76-
virtual void operationComplete(ResponseFuture* pResponseFuture,
77-
bool bProducePullRequest);
78-
virtual void onException();
79-
virtual asyncCallBackType getCallbackType() { return pullCallbackWarp; }
72+
PullCallbackWarp(AsyncCallback *pAsyncCallback, MQClientAPIImpl *pClientAPI, void *pArg);
73+
~PullCallbackWarp() override;
74+
void onComplete(ResponseFuture *pResponseFuture) override;
75+
void onException() override;
76+
asyncCallBackType getCallbackType() override { return pullCallbackWarp; }
8077

8178
private:
8279
AsyncArg m_pArg;

src/transport/ResponseFuture.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@ ResponseFuture::~ResponseFuture() {
4747
*/
4848
}
4949

50-
void ResponseFuture::releaseThreadCondition() { m_defaultEvent.notify_all(); }
50+
void ResponseFuture::releaseThreadCondition() {
51+
m_defaultEvent.notify_all();
52+
}
5153

5254
RemotingCommand *ResponseFuture::waitResponse(int timeoutMillis) {
5355
boost::unique_lock<boost::mutex> eventLock(m_defaultEventLock);
@@ -105,7 +107,7 @@ void ResponseFuture::executeInvokeCallback() {
105107
deleteAndZero(m_pResponseCommand);
106108
return;
107109
} else {
108-
m_pCallbackWrap->operationComplete(this, true);
110+
m_pCallbackWrap->onComplete(this);
109111
}
110112
}
111113

0 commit comments

Comments
 (0)