Skip to content

Commit dc602a7

Browse files
committed
Update: change method signature 'operationComplete' to 'onComplete' in AsyncCallbackWrap.
- change 'void operationComplete(ResponseFuture* pResponseFuture, bool bProducePullRequest)' to 'onComplete(ResponseFuture *pResponseFuture)'
1 parent 4a32838 commit dc602a7

File tree

3 files changed

+113
-113
lines changed

3 files changed

+113
-113
lines changed

src/common/AsyncCallbackWrap.cpp

Lines changed: 91 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -27,168 +27,169 @@
2727

2828
namespace rocketmq {
2929
//<!***************************************************************************
30-
AsyncCallbackWrap::AsyncCallbackWrap(AsyncCallback* pAsyncCallback,
31-
MQClientAPIImpl* pclientAPI)
32-
: m_pAsyncCallBack(pAsyncCallback), m_pClientAPI(pclientAPI) {}
30+
AsyncCallbackWrap::AsyncCallbackWrap(AsyncCallback *pAsyncCallback, MQClientAPIImpl *pClientAPI)
31+
: m_pAsyncCallBack(pAsyncCallback), m_pClientAPI(pClientAPI) {}
3332

3433
AsyncCallbackWrap::~AsyncCallbackWrap() {
35-
m_pAsyncCallBack = NULL;
36-
m_pClientAPI = NULL;
34+
m_pAsyncCallBack = nullptr;
35+
m_pClientAPI = nullptr;
3736
}
3837

3938
//<!************************************************************************
40-
SendCallbackWrap::SendCallbackWrap(const string& brokerName,
41-
const MQMessage& msg,
42-
AsyncCallback* pAsyncCallback,
43-
MQClientAPIImpl* pclientAPI)
44-
: AsyncCallbackWrap(pAsyncCallback, pclientAPI),
45-
m_msg(msg),
46-
m_brokerName(brokerName) {}
39+
SendCallbackWrap::SendCallbackWrap(
40+
const string &brokerName, const MQMessage &msg, SendCallback *pAsyncCallback, MQClientAPIImpl *pClientAPI)
41+
: AsyncCallbackWrap(pAsyncCallback, pClientAPI), m_msg(msg), m_brokerName(brokerName) {}
4742

4843
void SendCallbackWrap::onException() {
49-
if (m_pAsyncCallBack == NULL) return;
44+
if (m_pAsyncCallBack == nullptr) {
45+
return;
46+
}
5047

51-
SendCallback* pCallback = static_cast<SendCallback*>(m_pAsyncCallBack);
52-
if (pCallback) {
48+
auto *pCallback = static_cast<SendCallback *>(m_pAsyncCallBack);
49+
if (pCallback != nullptr) {
5350
unique_ptr<MQException> exception(new MQException(
54-
"send msg failed due to wait response timeout or network error", -1,
55-
__FILE__, __LINE__));
51+
"send msg failed due to wait response timeout or network error", -1, __FILE__, __LINE__));
5652
pCallback->onException(*exception);
5753
if (pCallback->getSendCallbackType() == autoDeleteSendCallback) {
5854
deleteAndZero(pCallback);
5955
}
6056
}
6157
}
6258

63-
void SendCallbackWrap::operationComplete(ResponseFuture* pResponseFuture,
64-
bool bProducePullRequest) {
59+
void SendCallbackWrap::onComplete(ResponseFuture *pResponseFuture) {
6560
unique_ptr<RemotingCommand> pResponse(pResponseFuture->getCommand());
6661

67-
if (m_pAsyncCallBack == NULL) {
62+
if (m_pAsyncCallBack == nullptr) {
6863
return;
6964
}
7065
int opaque = pResponseFuture->getOpaque();
71-
SendCallback* pCallback = static_cast<SendCallback*>(m_pAsyncCallBack);
66+
auto *pCallback = static_cast<SendCallback *>(m_pAsyncCallBack);
7267

7368
if (!pResponse) {
74-
string err = "unknow reseaon";
69+
string err = "unknown reason";
7570
if (!pResponseFuture->isSendRequestOK()) {
7671
err = "send request failed";
77-
7872
} else if (pResponseFuture->isTimeOut()) {
79-
// pResponseFuture->setAsyncResponseFlag();
8073
err = "wait response timeout";
8174
}
82-
if (pCallback) {
75+
76+
if (pCallback != nullptr) {
8377
MQException exception(err, -1, __FILE__, __LINE__);
8478
pCallback->onException(exception);
8579
}
8680
LOG_ERROR("send failed of:%d", pResponseFuture->getOpaque());
8781
} else {
8882
try {
89-
SendResult ret = m_pClientAPI->processSendResponse(m_brokerName, m_msg, pResponse.get());
90-
if (pCallback) {
91-
LOG_DEBUG("operationComplete: processSendResponse success, opaque:%d, maxRetryTime:%d, retrySendTimes:%d", opaque, pResponseFuture->getMaxRetrySendTimes(), pResponseFuture->getRetrySendTimes());
92-
pCallback->onSuccess(ret);
93-
}
94-
} catch (MQException& e) {
95-
LOG_ERROR("operationComplete: processSendResponse exception: %s", e.what());
96-
97-
//broker may return exception, need consider retry send
98-
int maxRetryTimes = pResponseFuture->getMaxRetrySendTimes();
99-
int retryTimes = pResponseFuture->getRetrySendTimes();
100-
if (pResponseFuture->getAsyncFlag() && retryTimes < maxRetryTimes && maxRetryTimes > 1) {
101-
102-
int64 left_timeout_ms = pResponseFuture->leftTime();
103-
string brokerAddr = pResponseFuture->getBrokerAddr();
104-
const RemotingCommand& requestCommand = pResponseFuture->getRequestCommand();
105-
retryTimes += 1;
106-
LOG_WARN("retry send, opaque:%d, sendTimes:%d, maxRetryTimes:%d, left_timeout:%lld, brokerAddr:%s, msg:%s",
107-
opaque, retryTimes, maxRetryTimes, left_timeout_ms, brokerAddr.data(), m_msg.toString().data());
108-
109-
bool exception_flag = false;
110-
try {
111-
m_pClientAPI->sendMessageAsync(pResponseFuture->getBrokerAddr(), m_brokerName, m_msg, (RemotingCommand&)requestCommand, pCallback, left_timeout_ms, maxRetryTimes, retryTimes);
112-
} catch (MQClientException& e) {
113-
LOG_ERROR("retry send exception:%s, opaque:%d, retryTimes:%d, msg:%s, not retry send again", e.what(), opaque, retryTimes, m_msg.toString().data());
114-
exception_flag = true;
115-
}
116-
117-
if (exception_flag == false) {
118-
return; //send retry again, here need return
119-
}
120-
}
121-
122-
if (pCallback) {
123-
MQException exception("process send response error", -1, __FILE__,
124-
__LINE__);
125-
pCallback->onException(exception);
126-
}
83+
SendResult ret = m_pClientAPI->processSendResponse(m_brokerName, m_msg, pResponse.get());
84+
85+
if (pCallback != nullptr) {
86+
LOG_DEBUG("operationComplete: processSendResponse success, opaque:%d, maxRetryTime:%d, retrySendTimes:%d",
87+
opaque, pResponseFuture->getMaxRetrySendTimes(), pResponseFuture->getRetrySendTimes());
88+
pCallback->onSuccess(ret);
89+
}
90+
} catch (MQException &e) {
91+
LOG_ERROR("operationComplete: processSendResponse exception: %s", e.what());
92+
93+
//broker may return exception, need consider retry send
94+
int maxRetryTimes = pResponseFuture->getMaxRetrySendTimes();
95+
int retryTimes = pResponseFuture->getRetrySendTimes();
96+
if (pResponseFuture->getAsyncFlag() && retryTimes < maxRetryTimes && maxRetryTimes > 1) {
97+
98+
int64 left_timeout_ms = pResponseFuture->leftTime();
99+
string brokerAddr = pResponseFuture->getBrokerAddr();
100+
auto &requestCommand = const_cast<RemotingCommand &>(pResponseFuture->getRequestCommand());
101+
retryTimes += 1;
102+
LOG_WARN("retry send, opaque:%d, sendTimes:%d, maxRetryTimes:%d, left_timeout:%lld, brokerAddr:%s, msg:%s",
103+
opaque, retryTimes, maxRetryTimes, left_timeout_ms, brokerAddr.data(), m_msg.toString().data());
104+
105+
bool exception_flag = false;
106+
try {
107+
m_pClientAPI->sendMessageAsync(pResponseFuture->getBrokerAddr(), m_brokerName, m_msg, requestCommand,
108+
pCallback, left_timeout_ms, maxRetryTimes, retryTimes);
109+
} catch (MQClientException &e) {
110+
LOG_ERROR("retry send exception:%s, opaque:%d, retryTimes:%d, msg:%s, not retry send again",
111+
e.what(), opaque, retryTimes, m_msg.toString().data());
112+
exception_flag = true;
113+
}
114+
115+
if (!exception_flag) {
116+
return; //send retry again, here need return
117+
}
118+
}
119+
120+
if (pCallback != nullptr) {
121+
MQException exception("process send response error", -1, __FILE__, __LINE__);
122+
pCallback->onException(exception);
123+
}
127124
}
128125
}
129-
if (pCallback && pCallback->getSendCallbackType() == autoDeleteSendCallback) {
126+
127+
if (pCallback != nullptr && pCallback->getSendCallbackType() == autoDeleteSendCallback) {
130128
deleteAndZero(pCallback);
131129
}
132130
}
133131

134132
//<!************************************************************************
135-
PullCallbackWarp::PullCallbackWarp(AsyncCallback* pAsyncCallback,
136-
MQClientAPIImpl* pclientAPI, void* pArg)
137-
: AsyncCallbackWrap(pAsyncCallback, pclientAPI) {
138-
m_pArg = *static_cast<AsyncArg*>(pArg);
133+
PullCallbackWarp::PullCallbackWarp(AsyncCallback *pAsyncCallback, MQClientAPIImpl *pClientAPI, void *pArg)
134+
: AsyncCallbackWrap(pAsyncCallback, pClientAPI) {
135+
m_pArg = *static_cast<AsyncArg *>(pArg);
139136
}
140137

141-
PullCallbackWarp::~PullCallbackWarp() {}
138+
PullCallbackWarp::~PullCallbackWarp() = default;
142139

143140
void PullCallbackWarp::onException() {
144-
if (m_pAsyncCallBack == NULL) return;
141+
if (m_pAsyncCallBack == nullptr) {
142+
return;
143+
}
145144

146-
PullCallback* pCallback = static_cast<PullCallback*>(m_pAsyncCallBack);
147-
if (pCallback) {
145+
auto *pCallback = static_cast<PullCallback *>(m_pAsyncCallBack);
146+
if (pCallback != nullptr) {
148147
MQException exception("wait response timeout", -1, __FILE__, __LINE__);
149148
pCallback->onException(exception);
150149
} else {
151150
LOG_ERROR("PullCallback is NULL, AsyncPull could not continue");
152151
}
153152
}
154153

155-
void PullCallbackWarp::operationComplete(ResponseFuture* pResponseFuture,
156-
bool bProducePullRequest) {
154+
void PullCallbackWarp::onComplete(ResponseFuture *pResponseFuture) {
157155
unique_ptr<RemotingCommand> pResponse(pResponseFuture->getCommand());
158-
if (m_pAsyncCallBack == NULL) {
156+
if (m_pAsyncCallBack == nullptr) {
159157
LOG_ERROR("m_pAsyncCallBack is NULL, AsyncPull could not continue");
160158
return;
161159
}
162-
PullCallback* pCallback = static_cast<PullCallback*>(m_pAsyncCallBack);
160+
auto *pCallback = static_cast<PullCallback *>(m_pAsyncCallBack);
163161
if (!pResponse) {
164-
string err = "unknow reseaon";
162+
string err = "unknown reason";
165163
if (!pResponseFuture->isSendRequestOK()) {
166164
err = "send request failed";
167-
168165
} else if (pResponseFuture->isTimeOut()) {
169-
// pResponseFuture->setAsyncResponseFlag();
170166
err = "wait response timeout";
171167
}
172168
MQException exception(err, -1, __FILE__, __LINE__);
173-
LOG_ERROR("Async pull exception of opaque:%d",
174-
pResponseFuture->getOpaque());
175-
if (pCallback && bProducePullRequest) pCallback->onException(exception);
169+
LOG_ERROR("Async pull exception of opaque:%d", pResponseFuture->getOpaque());
170+
171+
if (pCallback != nullptr) {
172+
pCallback->onException(exception);
173+
}
176174
} else {
177175
try {
178176
if (m_pArg.pPullWrapper) {
179-
unique_ptr<PullResult> pullResult(
180-
m_pClientAPI->processPullResponse(pResponse.get()));
181-
PullResult result = m_pArg.pPullWrapper->processPullResult(
182-
m_pArg.mq, pullResult.get(), &m_pArg.subData);
183-
if (pCallback)
184-
pCallback->onSuccess(m_pArg.mq, result, bProducePullRequest);
177+
unique_ptr<PullResult> pullResult(m_pClientAPI->processPullResponse(pResponse.get()));
178+
PullResult result = m_pArg.pPullWrapper->processPullResult(m_pArg.mq, pullResult.get(), &m_pArg.subData);
179+
180+
if (pCallback != nullptr) {
181+
pCallback->onSuccess(m_pArg.mq, result, true);
182+
}
185183
} else {
186184
LOG_ERROR("pPullWrapper had been destroyed with consumer");
187185
}
188-
} catch (MQException& e) {
186+
} catch (MQException &e) {
189187
LOG_ERROR(e.what());
190188
MQException exception("pullResult error", -1, __FILE__, __LINE__);
191-
if (pCallback && bProducePullRequest) pCallback->onException(exception);
189+
190+
if (pCallback != nullptr) {
191+
pCallback->onException(exception);
192+
}
192193
}
193194
}
194195
}

src/common/AsyncCallbackWrap.h

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ class ResponseFuture;
2929
class MQClientAPIImpl;
3030
class DefaultMQProducer;
3131
class SendMessageRequestHeader;
32+
3233
//<!***************************************************************************
3334
enum asyncCallBackType {
3435
asyncCallbackWrap = 0,
@@ -38,45 +39,41 @@ enum asyncCallBackType {
3839

3940
struct AsyncCallbackWrap {
4041
public:
41-
AsyncCallbackWrap(AsyncCallback* pAsyncCallback, MQClientAPIImpl* pclientAPI);
42+
AsyncCallbackWrap(AsyncCallback *pAsyncCallback, MQClientAPIImpl *pClientAPI);
4243
virtual ~AsyncCallbackWrap();
43-
virtual void operationComplete(ResponseFuture* pResponseFuture,
44-
bool bProducePullRequest) = 0;
44+
virtual void onComplete(ResponseFuture *pResponseFuture) = 0;
4545
virtual void onException() = 0;
46-
virtual asyncCallBackType getCallbackType() = 0;
46+
virtual asyncCallBackType getCallbackType() const = 0;
4747

4848
protected:
49-
AsyncCallback* m_pAsyncCallBack;
50-
MQClientAPIImpl* m_pClientAPI;
49+
AsyncCallback *m_pAsyncCallBack;
50+
MQClientAPIImpl *m_pClientAPI;
5151
};
5252

5353
//<!************************************************************************
5454
class SendCallbackWrap : public AsyncCallbackWrap {
5555
public:
56-
SendCallbackWrap(const string& brokerName, const MQMessage& msg,
57-
AsyncCallback* pAsyncCallback, MQClientAPIImpl* pclientAPI);
56+
SendCallbackWrap(const string &brokerName, const MQMessage &msg,
57+
AsyncCallback *pAsyncCallback, MQClientAPIImpl *pClientAPI);
5858

59-
virtual ~SendCallbackWrap(){};
60-
virtual void operationComplete(ResponseFuture* pResponseFuture,
61-
bool bProducePullRequest);
62-
virtual void onException();
63-
virtual asyncCallBackType getCallbackType() { return sendCallbackWrap; }
59+
~SendCallbackWrap() override = default;;
60+
void onComplete(ResponseFuture *pResponseFuture) override;
61+
void onException() override;
62+
asyncCallBackType getCallbackType() const override { return sendCallbackWrap; }
6463

6564
private:
6665
MQMessage m_msg;
67-
string m_brokerName;
66+
std::string m_brokerName;
6867
};
6968

7069
//<!***************************************************************************
7170
class PullCallbackWarp : public AsyncCallbackWrap {
7271
public:
73-
PullCallbackWarp(AsyncCallback* pAsyncCallback, MQClientAPIImpl* pclientAPI,
74-
void* pArg);
75-
virtual ~PullCallbackWarp();
76-
virtual void operationComplete(ResponseFuture* pResponseFuture,
77-
bool bProducePullRequest);
78-
virtual void onException();
79-
virtual asyncCallBackType getCallbackType() { return pullCallbackWarp; }
72+
PullCallbackWarp(AsyncCallback *pAsyncCallback, MQClientAPIImpl *pClientAPI, void *pArg);
73+
~PullCallbackWarp() override;
74+
void onComplete(ResponseFuture *pResponseFuture) override;
75+
void onException() override;
76+
asyncCallBackType getCallbackType() const override { return pullCallbackWarp; }
8077

8178
private:
8279
AsyncArg m_pArg;

src/transport/ResponseFuture.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@ ResponseFuture::~ResponseFuture() {
4747
*/
4848
}
4949

50-
void ResponseFuture::releaseThreadCondition() { m_defaultEvent.notify_all(); }
50+
void ResponseFuture::releaseThreadCondition() {
51+
m_defaultEvent.notify_all();
52+
}
5153

5254
RemotingCommand *ResponseFuture::waitResponse(int timeoutMillis) {
5355
boost::unique_lock<boost::mutex> eventLock(m_defaultEventLock);
@@ -105,7 +107,7 @@ void ResponseFuture::executeInvokeCallback() {
105107
deleteAndZero(m_pResponseCommand);
106108
return;
107109
} else {
108-
m_pCallbackWrap->operationComplete(this, true);
110+
m_pCallbackWrap->onComplete(this);
109111
}
110112
}
111113

0 commit comments

Comments
 (0)