diff --git a/src/common/AsyncCallbackWrap.cpp b/src/common/AsyncCallbackWrap.cpp old mode 100644 new mode 100755 index 52e037470..cb26fda60 --- a/src/common/AsyncCallbackWrap.cpp +++ b/src/common/AsyncCallbackWrap.cpp @@ -1,193 +1,193 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "AsyncCallbackWrap.h" -#include "Logging.h" -#include "MQClientAPIImpl.h" -#include "MQDecoder.h" -#include "MQMessageQueue.h" -#include "MQProtos.h" -#include "PullAPIWrapper.h" -#include "PullResultExt.h" -#include "ResponseFuture.h" - -namespace rocketmq { -//(m_pAsyncCallBack); - if (pCallback) { - unique_ptr exception( - new MQException("send msg failed due to wait response timeout or network error", -1, __FILE__, __LINE__)); - pCallback->onException(*exception); - if (pCallback->getSendCallbackType() == autoDeleteSendCallback) { - deleteAndZero(pCallback); - } - } -} - -void SendCallbackWrap::operationComplete(ResponseFuture* pResponseFuture, bool bProducePullRequest) { - unique_ptr pResponse(pResponseFuture->getCommand()); - - if (m_pAsyncCallBack == NULL) { - return; - } - int opaque = pResponseFuture->getOpaque(); - SendCallback* pCallback = static_cast(m_pAsyncCallBack); - - if (!pResponse) { - string err = "unknow reseaon"; - if (!pResponseFuture->isSendRequestOK()) { - err = "send request failed"; - - } else if (pResponseFuture->isTimeOut()) { - // pResponseFuture->setAsyncResponseFlag(); - err = "wait response timeout"; - } - if (pCallback) { - MQException exception(err, -1, __FILE__, __LINE__); - pCallback->onException(exception); - } - LOG_ERROR("send failed of:%d", pResponseFuture->getOpaque()); - } else { - try { - SendResult ret = m_pClientAPI->processSendResponse(m_brokerName, m_msg, pResponse.get()); - if (pCallback) { - LOG_DEBUG("operationComplete: processSendResponse success, opaque:%d, maxRetryTime:%d, retrySendTimes:%d", - opaque, pResponseFuture->getMaxRetrySendTimes(), pResponseFuture->getRetrySendTimes()); - pCallback->onSuccess(ret); - } - } catch (MQException& e) { - LOG_ERROR("operationComplete: processSendResponse exception: %s", e.what()); - - // broker may return exception, need consider retry send - int maxRetryTimes = pResponseFuture->getMaxRetrySendTimes(); - int retryTimes = pResponseFuture->getRetrySendTimes(); - if (pResponseFuture->getASyncFlag() && retryTimes < maxRetryTimes && maxRetryTimes > 1) { - int64 left_timeout_ms = pResponseFuture->leftTime(); - string brokerAddr = pResponseFuture->getBrokerAddr(); - const RemotingCommand& requestCommand = pResponseFuture->getRequestCommand(); - retryTimes += 1; - LOG_WARN("retry send, opaque:%d, sendTimes:%d, maxRetryTimes:%d, left_timeout:%lld, brokerAddr:%s, msg:%s", - opaque, retryTimes, maxRetryTimes, left_timeout_ms, brokerAddr.data(), 
m_msg.toString().data()); - - bool exception_flag = false; - try { - m_pClientAPI->sendMessageAsync(pResponseFuture->getBrokerAddr(), m_brokerName, m_msg, - (RemotingCommand&)requestCommand, pCallback, left_timeout_ms, maxRetryTimes, - retryTimes); - } catch (MQClientException& e) { - LOG_ERROR("retry send exception:%s, opaque:%d, retryTimes:%d, msg:%s, not retry send again", e.what(), opaque, - retryTimes, m_msg.toString().data()); - exception_flag = true; - } - - if (exception_flag == false) { - return; // send retry again, here need return - } - } - - if (pCallback) { - MQException exception("process send response error", -1, __FILE__, __LINE__); - pCallback->onException(exception); - } - } - } - if (pCallback && pCallback->getSendCallbackType() == autoDeleteSendCallback) { - deleteAndZero(pCallback); - } -} - -//(pArg); -} - -PullCallbackWarp::~PullCallbackWarp() {} - -void PullCallbackWarp::onException() { - if (m_pAsyncCallBack == NULL) - return; - - PullCallback* pCallback = static_cast(m_pAsyncCallBack); - if (pCallback) { - MQException exception("wait response timeout", -1, __FILE__, __LINE__); - pCallback->onException(exception); - } else { - LOG_ERROR("PullCallback is NULL, AsyncPull could not continue"); - } -} - -void PullCallbackWarp::operationComplete(ResponseFuture* pResponseFuture, bool bProducePullRequest) { - unique_ptr pResponse(pResponseFuture->getCommand()); - if (m_pAsyncCallBack == NULL) { - LOG_ERROR("m_pAsyncCallBack is NULL, AsyncPull could not continue"); - return; - } - PullCallback* pCallback = static_cast(m_pAsyncCallBack); - if (!pResponse) { - string err = "unknow reseaon"; - if (!pResponseFuture->isSendRequestOK()) { - err = "send request failed"; - - } else if (pResponseFuture->isTimeOut()) { - // pResponseFuture->setAsyncResponseFlag(); - err = "wait response timeout"; - } - MQException exception(err, -1, __FILE__, __LINE__); - LOG_ERROR("Async pull exception of opaque:%d", pResponseFuture->getOpaque()); - if (pCallback && bProducePullRequest) - pCallback->onException(exception); - } else { - try { - if (m_pArg.pPullWrapper) { - unique_ptr pullResult(m_pClientAPI->processPullResponse(pResponse.get())); - PullResult result = m_pArg.pPullWrapper->processPullResult(m_pArg.mq, pullResult.get(), &m_pArg.subData); - if (pCallback) - pCallback->onSuccess(m_pArg.mq, result, bProducePullRequest); - } else { - LOG_ERROR("pPullWrapper had been destroyed with consumer"); - } - } catch (MQException& e) { - LOG_ERROR(e.what()); - MQException exception("pullResult error", -1, __FILE__, __LINE__); - if (pCallback && bProducePullRequest) - pCallback->onException(exception); - } - } -} - -//(m_pAsyncCallBack); + if (pCallback) { + unique_ptr exception( + new MQException("send msg failed due to wait response timeout or network error", -1, __FILE__, __LINE__)); + pCallback->onException(*exception); + if (pCallback->getSendCallbackType() == autoDeleteSendCallback) { + deleteAndZero(pCallback); + } + } +} + +void SendCallbackWrap::operationComplete(ResponseFuture* pResponseFuture, bool bProducePullRequest) { + unique_ptr pResponse(pResponseFuture->getCommand()); + + if (m_pAsyncCallBack == NULL) { + return; + } + int opaque = pResponseFuture->getOpaque(); + SendCallback* pCallback = static_cast(m_pAsyncCallBack); + + if (!pResponse) { + string err = "unknow reseaon"; + if (!pResponseFuture->isSendRequestOK()) { + err = "send request failed"; + + } else if (pResponseFuture->isTimeOut()) { + // pResponseFuture->setAsyncResponseFlag(); + err = "wait response timeout"; + } + if 
(pCallback) { + MQException exception(err, -1, __FILE__, __LINE__); + pCallback->onException(exception); + } + LOG_ERROR("send failed of:%d", pResponseFuture->getOpaque()); + } else { + try { + SendResult ret = m_pClientAPI->processSendResponse(m_brokerName, m_msg, pResponse.get()); + if (pCallback) { + LOG_DEBUG("operationComplete: processSendResponse success, opaque:%d, maxRetryTime:%d, retrySendTimes:%d", + opaque, pResponseFuture->getMaxRetrySendTimes(), pResponseFuture->getRetrySendTimes()); + pCallback->onSuccess(ret); + } + } catch (MQException& e) { + LOG_ERROR("operationComplete: processSendResponse exception: %s", e.what()); + + // broker may return exception, need consider retry send + int maxRetryTimes = pResponseFuture->getMaxRetrySendTimes(); + int retryTimes = pResponseFuture->getRetrySendTimes(); + if (pResponseFuture->getAsyncFlag() && retryTimes < maxRetryTimes && maxRetryTimes > 1) { + int64 left_timeout_ms = pResponseFuture->leftTime(); + string brokerAddr = pResponseFuture->getBrokerAddr(); + const RemotingCommand& requestCommand = pResponseFuture->getRequestCommand(); + retryTimes += 1; + LOG_WARN("retry send, opaque:%d, sendTimes:%d, maxRetryTimes:%d, left_timeout:%lld, brokerAddr:%s, msg:%s", + opaque, retryTimes, maxRetryTimes, left_timeout_ms, brokerAddr.data(), m_msg.toString().data()); + + bool exception_flag = false; + try { + m_pClientAPI->sendMessageAsync(pResponseFuture->getBrokerAddr(), m_brokerName, m_msg, + (RemotingCommand&)requestCommand, pCallback, left_timeout_ms, maxRetryTimes, + retryTimes); + } catch (MQClientException& e) { + LOG_ERROR("retry send exception:%s, opaque:%d, retryTimes:%d, msg:%s, not retry send again", e.what(), opaque, + retryTimes, m_msg.toString().data()); + exception_flag = true; + } + + if (exception_flag == false) { + return; // send retry again, here need return + } + } + + if (pCallback) { + MQException exception("process send response error", -1, __FILE__, __LINE__); + pCallback->onException(exception); + } + } + } + if (pCallback && pCallback->getSendCallbackType() == autoDeleteSendCallback) { + deleteAndZero(pCallback); + } +} + +//(pArg); +} + +PullCallbackWarp::~PullCallbackWarp() {} + +void PullCallbackWarp::onException() { + if (m_pAsyncCallBack == NULL) + return; + + PullCallback* pCallback = static_cast(m_pAsyncCallBack); + if (pCallback) { + MQException exception("wait response timeout", -1, __FILE__, __LINE__); + pCallback->onException(exception); + } else { + LOG_ERROR("PullCallback is NULL, AsyncPull could not continue"); + } +} + +void PullCallbackWarp::operationComplete(ResponseFuture* pResponseFuture, bool bProducePullRequest) { + unique_ptr pResponse(pResponseFuture->getCommand()); + if (m_pAsyncCallBack == NULL) { + LOG_ERROR("m_pAsyncCallBack is NULL, AsyncPull could not continue"); + return; + } + PullCallback* pCallback = static_cast(m_pAsyncCallBack); + if (!pResponse) { + string err = "unknow reseaon"; + if (!pResponseFuture->isSendRequestOK()) { + err = "send request failed"; + + } else if (pResponseFuture->isTimeOut()) { + // pResponseFuture->setAsyncResponseFlag(); + err = "wait response timeout"; + } + MQException exception(err, -1, __FILE__, __LINE__); + LOG_ERROR("Async pull exception of opaque:%d", pResponseFuture->getOpaque()); + if (pCallback && bProducePullRequest) + pCallback->onException(exception); + } else { + try { + if (m_pArg.pPullWrapper) { + unique_ptr pullResult(m_pClientAPI->processPullResponse(pResponse.get())); + PullResult result = 
m_pArg.pPullWrapper->processPullResult(m_pArg.mq, pullResult.get(), &m_pArg.subData); + if (pCallback) + pCallback->onSuccess(m_pArg.mq, result, bProducePullRequest); + } else { + LOG_ERROR("pPullWrapper had been destroyed with consumer"); + } + } catch (MQException& e) { + LOG_ERROR(e.what()); + MQException exception("pullResult error", -1, __FILE__, __LINE__); + if (pCallback && bProducePullRequest) + pCallback->onException(exception); + } + } +} + +// +#endif + +#include + +#include "Logging.h" +#include "UtilAll.h" + +namespace rocketmq { + +EventLoop* EventLoop::GetDefaultEventLoop() { + static EventLoop defaultEventLoop; + return &defaultEventLoop; +} + +EventLoop::EventLoop(const struct event_config* config, bool run_immediately) + : m_eventBase(nullptr), m_loopThread(nullptr), _is_running(false) { + // tell libevent support multi-threads +#ifdef WIN32 + evthread_use_windows_threads(); +#else + evthread_use_pthreads(); +#endif + + if (config == nullptr) { + m_eventBase = event_base_new(); + } else { + m_eventBase = event_base_new_with_config(config); + } + + if (m_eventBase == nullptr) { + // failure... + LOG_ERROR("Failed to create event base!"); + return; + } + + evthread_make_base_notifiable(m_eventBase); + + if (run_immediately) { + start(); + } +} + +EventLoop::~EventLoop() { + stop(); + + if (m_eventBase != nullptr) { + event_base_free(m_eventBase); + m_eventBase = nullptr; + } +} + +void EventLoop::start() { + if (m_loopThread == nullptr) { + // start event loop +#if !defined(WIN32) && !defined(__APPLE__) + string taskName = UtilAll::getProcessName(); + prctl(PR_SET_NAME, "EventLoop", 0, 0, 0); +#endif + m_loopThread = new std::thread(&EventLoop::runLoop, this); +#if !defined(WIN32) && !defined(__APPLE__) + prctl(PR_SET_NAME, taskName.c_str(), 0, 0, 0); +#endif + } +} + +void EventLoop::stop() { + if (m_loopThread != nullptr /*&& m_loopThread.joinable()*/) { + _is_running = false; + m_loopThread->join(); + + delete m_loopThread; + m_loopThread = nullptr; + } +} + +void EventLoop::runLoop() { + _is_running = true; + + while (_is_running) { + int ret; + + ret = event_base_dispatch(m_eventBase); + // ret = event_base_loop(m_eventBase, EVLOOP_NONBLOCK); + + if (ret == 1) { + // no event + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + } +} + +#define OPT_UNLOCK_CALLBACKS (BEV_OPT_DEFER_CALLBACKS | BEV_OPT_UNLOCK_CALLBACKS) + +BufferEvent* EventLoop::createBufferEvent(socket_t fd, int options) { + struct bufferevent* event = bufferevent_socket_new(m_eventBase, fd, options); + if (event == nullptr) { + return nullptr; + } + + bool unlock = (options & OPT_UNLOCK_CALLBACKS) == OPT_UNLOCK_CALLBACKS; + + return new BufferEvent(event, unlock); +} + +BufferEvent::BufferEvent(struct bufferevent* event, bool unlockCallbacks) + : m_bufferEvent(event), + m_unlockCallbacks(unlockCallbacks), + m_readCallback(nullptr), + m_writeCallback(nullptr), + m_eventCallback(nullptr), + m_callbackTransport() { +#ifdef ROCKETMQ_BUFFEREVENT_PROXY_ALL_CALLBACK + if (m_bufferEvent != nullptr) { + bufferevent_setcb(m_bufferEvent, read_callback, write_callback, event_callback, this); + } +#endif // ROCKETMQ_BUFFEREVENT_PROXY_ALL_CALLBACK +} + +BufferEvent::~BufferEvent() { + if (m_bufferEvent != nullptr) { + // free function will set all callbacks to NULL first. 
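
The static `read_callback`/`write_callback`/`event_callback` proxies that follow all share one idiom: take the bufferevent lock (only when `BEV_OPT_UNLOCK_CALLBACKS` deferred it, i.e. `m_unlockCallbacks`), copy the registered callback, promote the `m_callbackTransport` weak_ptr to a shared_ptr, then invoke the callback outside the lock. The weak_ptr is what breaks the `TcpTransport`/`BufferEvent` reference cycle while still guaranteeing the transport stays alive for the duration of the call. Below is a minimal standalone sketch of that promotion idiom; `Event` and `Transport` are hypothetical stand-ins, not types from this patch:

```cpp
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>

// Hypothetical stand-in for TcpTransport.
struct Transport {
  void onData() { std::cout << "data callback ran\n"; }
};

// Hypothetical reduction of BufferEvent's callback bookkeeping.
class Event {
 public:
  void setCallback(std::function<void(Transport*)> cb, std::shared_ptr<Transport> t) {
    std::lock_guard<std::mutex> lk(m_lock);
    m_cb = std::move(cb);
    m_transport = t;  // stored as weak_ptr: Event must not keep Transport alive
  }

  // Analogue of the static read_callback proxy running on the loop thread.
  void fire() {
    std::function<void(Transport*)> cb;
    std::shared_ptr<Transport> t;
    {
      std::lock_guard<std::mutex> lk(m_lock);  // copy under the lock...
      cb = m_cb;
      t = m_transport.lock();  // ...and promote weak_ptr -> shared_ptr
    }
    if (cb && t) cb(t.get());  // invoke outside the lock
  }

 private:
  std::mutex m_lock;
  std::function<void(Transport*)> m_cb;
  std::weak_ptr<Transport> m_transport;
};

int main() {
  auto transport = std::make_shared<Transport>();
  Event ev;
  ev.setCallback([](Transport* t) { t->onData(); }, transport);
  ev.fire();          // transport alive: callback runs
  transport.reset();  // transport destroyed elsewhere
  ev.fire();          // weak_ptr lock() fails: callback safely skipped
}
```
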
+ bufferevent_free(m_bufferEvent); + m_bufferEvent = nullptr; + } +} + +void BufferEvent::setCallback(BufferEventDataCallback readCallback, + BufferEventDataCallback writeCallback, + BufferEventEventCallback eventCallback, + std::shared_ptr transport) { + // use lock in bufferevent + bufferevent_lock(m_bufferEvent); + + // wrap callback + m_readCallback = readCallback; + m_writeCallback = writeCallback; + m_eventCallback = eventCallback; + m_callbackTransport = transport; + +#ifndef ROCKETMQ_BUFFEREVENT_PROXY_ALL_CALLBACK + bufferevent_data_cb readcb = readCallback != nullptr ? read_callback : nullptr; + bufferevent_data_cb writecb = writeCallback != nullptr ? write_callback : nullptr; + bufferevent_event_cb eventcb = eventCallback != nullptr ? event_callback : nullptr; + + bufferevent_setcb(m_bufferEvent, readcb, writecb, eventcb, this); +#endif // ROCKETMQ_BUFFEREVENT_PROXY_ALL_CALLBACK + + bufferevent_unlock(m_bufferEvent); +} + +void BufferEvent::read_callback(struct bufferevent* bev, void* ctx) { + auto event = static_cast(ctx); + + if (event->m_unlockCallbacks) + bufferevent_lock(event->m_bufferEvent); + + BufferEventDataCallback callback = event->m_readCallback; + std::shared_ptr transport = event->m_callbackTransport.lock(); + + if (event->m_unlockCallbacks) + bufferevent_unlock(event->m_bufferEvent); + + if (callback) { + callback(event, transport.get()); + } +} + +void BufferEvent::write_callback(struct bufferevent* bev, void* ctx) { + auto event = static_cast(ctx); + + if (event->m_unlockCallbacks) + bufferevent_lock(event->m_bufferEvent); + + BufferEventDataCallback callback = event->m_writeCallback; + std::shared_ptr transport = event->m_callbackTransport.lock(); + + if (event->m_unlockCallbacks) + bufferevent_unlock(event->m_bufferEvent); + + if (callback) { + callback(event, transport.get()); + } +} + +static std::string buildPeerAddrPort(socket_t fd) { + sockaddr_in addr; + socklen_t len = sizeof(addr); + + getpeername(fd, (struct sockaddr*)&addr, &len); + + LOG_DEBUG("socket: %d, addr: %s, port: %d", fd, inet_ntoa(addr.sin_addr), ntohs(addr.sin_port)); + std::string addrPort(inet_ntoa(addr.sin_addr)); + addrPort.append(":"); + addrPort.append(UtilAll::to_string(ntohs(addr.sin_port))); + + return addrPort; +} + +void BufferEvent::event_callback(struct bufferevent* bev, short what, void* ctx) { + auto event = static_cast(ctx); + + if (what & BEV_EVENT_CONNECTED) { + socket_t fd = event->getfd(); + event->m_peerAddrPort = buildPeerAddrPort(fd); + } + + if (event->m_unlockCallbacks) + bufferevent_lock(event->m_bufferEvent); + + BufferEventEventCallback callback = event->m_eventCallback; + std::shared_ptr transport = event->m_callbackTransport.lock(); + + if (event->m_unlockCallbacks) + bufferevent_unlock(event->m_bufferEvent); + + if (callback) { + callback(event, what, transport.get()); + } +} + +} // namespace rocketmq diff --git a/src/transport/EventLoop.h b/src/transport/EventLoop.h new file mode 100644 index 000000000..c974479f0 --- /dev/null +++ b/src/transport/EventLoop.h @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __EVENTLOOP_H__ +#define __EVENTLOOP_H__ + +#include +#include + +#include +#include +#include + +#include "noncopyable.h" + +using socket_t = evutil_socket_t; + +namespace rocketmq { + +class BufferEvent; + +class EventLoop : public noncopyable { + public: + static EventLoop* GetDefaultEventLoop(); + + public: + explicit EventLoop(const struct event_config* config = nullptr, bool run_immediately = true); + virtual ~EventLoop(); + + void start(); + void stop(); + + BufferEvent* createBufferEvent(socket_t fd, int options); + + private: + void runLoop(); + + private: + struct event_base* m_eventBase; + std::thread* m_loopThread; + + bool _is_running; // aotmic is unnecessary +}; + +class TcpTransport; + +using BufferEventDataCallback = void (*)(BufferEvent* event, TcpTransport* transport); +using BufferEventEventCallback = void (*)(BufferEvent* event, short what, TcpTransport* transport); + +class BufferEvent : public noncopyable { + public: + virtual ~BufferEvent(); + + void setCallback(BufferEventDataCallback readCallback, + BufferEventDataCallback writeCallback, + BufferEventEventCallback eventCallback, + std::shared_ptr transport); + + void setWatermark(short events, size_t lowmark, size_t highmark) { + bufferevent_setwatermark(m_bufferEvent, events, lowmark, highmark); + } + + int enable(short event) { return bufferevent_enable(m_bufferEvent, event); } + + int connect(const struct sockaddr* addr, int socklen) { + return bufferevent_socket_connect(m_bufferEvent, (struct sockaddr*)addr, socklen); + } + + int write(const void* data, size_t size) { return bufferevent_write(m_bufferEvent, data, size); } + + size_t read(void* data, size_t size) { return bufferevent_read(m_bufferEvent, data, size); } + + struct evbuffer* getInput() { + return bufferevent_get_input(m_bufferEvent); + } + + socket_t getfd() const { return bufferevent_getfd(m_bufferEvent); } + + std::string getPeerAddrPort() const { return m_peerAddrPort; } + + private: + BufferEvent(struct bufferevent* event, bool unlockCallbacks); + friend EventLoop; + + static void read_callback(struct bufferevent* bev, void* ctx); + static void write_callback(struct bufferevent* bev, void* ctx); + static void event_callback(struct bufferevent* bev, short what, void* ctx); + + private: + struct bufferevent* m_bufferEvent; + const bool m_unlockCallbacks; + + BufferEventDataCallback m_readCallback; + BufferEventDataCallback m_writeCallback; + BufferEventEventCallback m_eventCallback; + std::weak_ptr m_callbackTransport; // avoid reference cycle + + // cache properties + std::string m_peerAddrPort; +}; + +} // namespace rocketmq + +#endif //__EVENTLOOP_H__ diff --git a/src/transport/ResponseFuture.cpp b/src/transport/ResponseFuture.cpp old mode 100644 new mode 100755 index cc10daa40..b0b261391 --- a/src/transport/ResponseFuture.cpp +++ b/src/transport/ResponseFuture.cpp @@ -15,46 +15,42 @@ * limitations under the License. 
*/ #include "ResponseFuture.h" + +#include + #include "Logging.h" #include "TcpRemotingClient.h" namespace rocketmq { + // lk(m_defaultEventLock); - if (!m_defaultEvent.timed_wait(lk, boost::posix_time::milliseconds(timeoutMillis))) { - LOG_WARN("waitResponse of code:%d with opaque:%d timeout", m_requestCode, m_opaque); - m_syncResponse.store(true); + std::unique_lock eventLock(m_defaultEventLock); + if (!m_haveResponse) { + if (timeoutMillis <= 0) { + timeoutMillis = m_timeout; + } + if (m_defaultEvent.wait_for(eventLock, std::chrono::milliseconds(timeoutMillis)) == std::cv_status::timeout) { + LOG_WARN("waitResponse of code:%d with opaque:%d timeout", m_requestCode, m_opaque); + m_haveResponse = true; + } } return m_pResponseCommand; } -void ResponseFuture::setResponse(RemotingCommand* pResponseCommand) { - // LOG_DEBUG("setResponse of opaque:%d",m_opaque); - m_pResponseCommand = pResponseCommand; +bool ResponseFuture::setResponse(RemotingCommand* pResponseCommand) { + std::unique_lock eventLock(m_defaultEventLock); - if (!getASyncFlag()) { - if (m_syncResponse.load() == false) { - m_defaultEvent.notify_all(); - m_syncResponse.store(true); - } + if (m_haveResponse) { + return false; } -} -const bool ResponseFuture::getSyncResponseFlag() { - if (m_syncResponse.load() == true) { - return true; - } - return false; -} + m_pResponseCommand = pResponseCommand; + m_haveResponse = true; -const bool ResponseFuture::getAsyncResponseFlag() { - if (m_asyncResponse.load() == true) { - // LOG_DEBUG("ASYNC flag is TRUE,opaque is:%d",getOpaque() ); - return true; + if (!getAsyncFlag()) { + m_defaultEvent.notify_all(); } - return false; -} - -void ResponseFuture::setAsyncResponseFlag() { - m_asyncResponse.store(true); + return true; } -const bool ResponseFuture::getASyncFlag() { - if (m_bAsync.load() == true) { - // LOG_DEBUG("ASYNC flag is TRUE,opaque is:%d",getOpaque() ); - return true; - } - return false; +const bool ResponseFuture::getAsyncFlag() { + return m_bAsync; } -bool ResponseFuture::isSendRequestOK() { +bool ResponseFuture::isSendRequestOK() const { return m_sendRequestOK; } @@ -126,58 +108,32 @@ int ResponseFuture::getRequestCode() const { return m_requestCode; } -void ResponseFuture::setAsyncCallBackStatus(asyncCallBackStatus asyncCallbackStatus) { - boost::lock_guard lock(m_asyncCallbackLock); - if (m_asyncCallbackStatus == asyncCallBackStatus_init) { - m_asyncCallbackStatus = asyncCallbackStatus; - } -} - -void ResponseFuture::executeInvokeCallback() { - if (m_pCallbackWrap == NULL) { +void ResponseFuture::invokeCompleteCallback() { + if (m_pCallbackWrap == nullptr) { deleteAndZero(m_pResponseCommand); return; } else { - if (m_asyncCallbackStatus == asyncCallBackStatus_response) { - m_pCallbackWrap->operationComplete(this, true); - } else { - if (m_pResponseCommand) - deleteAndZero(m_pResponseCommand); // the responseCommand from - // RemotingCommand::Decode(mem) will - // only deleted by operationComplete - // automatically - LOG_WARN( - "timeout and response incoming concurrently of opaque:%d, and " - "executeInvokeCallbackException was called earlier", - m_opaque); - } + m_pCallbackWrap->operationComplete(this, true); } } -void ResponseFuture::executeInvokeCallbackException() { - if (m_pCallbackWrap == NULL) { +void ResponseFuture::invokeExceptionCallback() { + if (m_pCallbackWrap == nullptr) { LOG_ERROR("m_pCallbackWrap is NULL, critical error"); return; } else { - if (m_asyncCallbackStatus == asyncCallBackStatus_timeout) { - // here no need retrySendTimes process because of it have 
timeout - LOG_ERROR("send msg, callback timeout, opaque:%d, sendTimes:%d, maxRetryTimes:%d", getOpaque(), - getRetrySendTimes(), getMaxRetrySendTimes()); - - m_pCallbackWrap->onException(); - } else { - LOG_WARN( - "timeout and response incoming concurrently of opaque:%d, and " - "executeInvokeCallback was called earlier", - m_opaque); - } + // here no need retrySendTimes process because of it have timeout + LOG_ERROR("send msg, callback timeout, opaque:%d, sendTimes:%d, maxRetryTimes:%d", getOpaque(), getRetrySendTimes(), + getMaxRetrySendTimes()); + + m_pCallbackWrap->onException(); } } bool ResponseFuture::isTimeOut() const { int64 diff = UtilAll::currentTimeMillis() - m_beginTimestamp; // m_timeout; + return m_bAsync && diff > m_timeout; } int ResponseFuture::getMaxRetrySendTimes() const { @@ -197,16 +153,17 @@ void ResponseFuture::setRetrySendTimes(int retryTimes) { void ResponseFuture::setBrokerAddr(const std::string& brokerAddr) { m_brokerAddr = brokerAddr; } + +std::string ResponseFuture::getBrokerAddr() const { + return m_brokerAddr; +} + void ResponseFuture::setRequestCommand(const RemotingCommand& requestCommand) { m_requestCommand = requestCommand; } - const RemotingCommand& ResponseFuture::getRequestCommand() { return m_requestCommand; } -std::string ResponseFuture::getBrokerAddr() const { - return m_brokerAddr; -} int64 ResponseFuture::leftTime() const { int64 diff = UtilAll::currentTimeMillis() - m_beginTimestamp; @@ -222,4 +179,4 @@ AsyncCallbackWrap* ResponseFuture::getAsyncCallbackWrap() { } // -#include + +#include +#include + #include "AsyncCallbackWrap.h" #include "RemotingCommand.h" #include "UtilAll.h" namespace rocketmq { -typedef enum asyncCallBackStatus { - asyncCallBackStatus_init = 0, - asyncCallBackStatus_response = 1, - asyncCallBackStatus_timeout = 2 -} asyncCallBackStatus; +typedef enum AsyncCallbackStatus { + ASYNC_CALLBACK_STATUS_INIT = 0, + ASYNC_CALLBACK_STATUS_RESPONSE = 1, + ASYNC_CALLBACK_STATUS_TIMEOUT = 2 +} AsyncCallbAackStatus; class TcpRemotingClient; // m_bAsync; - RemotingCommand* m_pResponseCommand; // m_asyncResponse; - boost::atomic m_syncResponse; + + AsyncCallbackStatus m_asyncCallbackStatus; + std::mutex m_asyncCallbackLock; + + bool m_haveResponse; + std::mutex m_defaultEventLock; + std::condition_variable m_defaultEvent; + + int64 m_beginTimestamp; + bool m_sendRequestOK; + RemotingCommand* m_pResponseCommand; // #endif + #include "Logging.h" #include "MemoryOutputStream.h" #include "TopAddressing.h" @@ -35,27 +36,31 @@ TcpRemotingClient::TcpRemotingClient(int pullThreadNum, uint64_t tcpConnectTimeo m_ioServiceWork(m_ioService) { #if !defined(WIN32) && !defined(__APPLE__) string taskName = UtilAll::getProcessName(); - prctl(PR_SET_NAME, "networkTP", 0, 0, 0); + prctl(PR_SET_NAME, "NetworkTP", 0, 0, 0); #endif - for (int i = 0; i != pullThreadNum; ++i) { + for (int i = 0; i != m_pullThreadNum; ++i) { m_threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &m_ioService)); } #if !defined(WIN32) && !defined(__APPLE__) prctl(PR_SET_NAME, taskName.c_str(), 0, 0, 0); #endif - LOG_INFO( - "m_tcpConnectTimeout:%ju, m_tcpTransportTryLockTimeout:%ju, " - "m_pullThreadNum:%d", - m_tcpConnectTimeout, m_tcpTransportTryLockTimeout, m_pullThreadNum); + + LOG_INFO("m_tcpConnectTimeout:%ju, m_tcpTransportTryLockTimeout:%ju, m_pullThreadNum:%d", m_tcpConnectTimeout, + m_tcpTransportTryLockTimeout, m_pullThreadNum); + m_async_service_thread.reset(new boost::thread(boost::bind(&TcpRemotingClient::boost_asio_work, this))); } void 
TcpRemotingClient::boost_asio_work() { - LOG_INFO("TcpRemotingClient::boost asio async service runing"); - boost::asio::io_service::work work(m_async_ioService); // avoid async io - // service stops after - // first timer timeout - // callback + LOG_INFO("TcpRemotingClient::boost asio async service running"); + +#if !defined(WIN32) && !defined(__APPLE__) + prctl(PR_SET_NAME, "RemotingAsioT", 0, 0, 0); +#endif + + // avoid async io service stops after first timer timeout callback + boost::asio::io_service::work work(m_async_ioService); + m_async_ioService.run(); } @@ -69,15 +74,15 @@ TcpRemotingClient::~TcpRemotingClient() { void TcpRemotingClient::stopAllTcpTransportThread() { LOG_DEBUG("TcpRemotingClient::stopAllTcpTransportThread Begin"); + m_async_ioService.stop(); m_async_service_thread->interrupt(); m_async_service_thread->join(); removeAllTimerCallback(); { - TcpMap::iterator it = m_tcpTable.begin(); - for (; it != m_tcpTable.end(); ++it) { - it->second->disconnect(it->first); + for (const auto& trans : m_tcpTable) { + trans.second->disconnect(trans.first); } m_tcpTable.clear(); } @@ -86,62 +91,66 @@ void TcpRemotingClient::stopAllTcpTransportThread() { m_threadpool.join_all(); { - boost::lock_guard lock(m_futureTableMutex); - for (ResMap::iterator it = m_futureTable.begin(); it != m_futureTable.end(); ++it) { - if (it->second) - it->second->releaseThreadCondition(); + std::lock_guard lock(m_futureTableLock); + for (const auto& future : m_futureTable) { + if (future.second) + future.second->releaseThreadCondition(); } } + LOG_DEBUG("TcpRemotingClient::stopAllTcpTransportThread End"); } void TcpRemotingClient::updateNameServerAddressList(const string& addrs) { LOG_INFO("updateNameServerAddressList: [%s]", addrs.c_str()); - if (!addrs.empty()) { - boost::unique_lock lock(m_namesrvlock, boost::try_to_lock); - if (!lock.owns_lock()) { - if (!lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(10))) { - LOG_ERROR("updateNameServerAddressList get timed_mutex timeout"); - return; - } + + if (addrs.empty()) { + return; + } + + std::unique_lock lock(m_namesrvLock, std::try_to_lock); + if (!lock.owns_lock()) { + if (!lock.try_lock_for(std::chrono::seconds(10))) { + LOG_ERROR("updateNameServerAddressList get timed_mutex timeout"); + return; } - // clear first; - m_namesrvAddrList.clear(); - - vector out; - UtilAll::Split(out, addrs, ";"); - for (size_t i = 0; i < out.size(); i++) { - string addr = out[i]; - UtilAll::Trim(addr); - - string hostName; - short portNumber; - if (UtilAll::SplitURL(addr, hostName, portNumber)) { - LOG_INFO("update Namesrv:%s", addr.c_str()); - m_namesrvAddrList.push_back(addr); - } else { - LOG_INFO("This may be invalid namer server: [%s]", addr.c_str()); - } + } + + // clear first; + m_namesrvAddrList.clear(); + + vector out; + UtilAll::Split(out, addrs, ";"); + for (auto addr : out) { + UtilAll::Trim(addr); + + string hostName; + short portNumber; + if (UtilAll::SplitURL(addr, hostName, portNumber)) { + LOG_INFO("update Namesrv:%s", addr.c_str()); + m_namesrvAddrList.push_back(addr); + } else { + LOG_INFO("This may be invalid namer server: [%s]", addr.c_str()); } - out.clear(); } + out.clear(); } -bool TcpRemotingClient::invokeHeartBeat(const string& addr, RemotingCommand& request) { - boost::shared_ptr pTcp = GetTransport(addr, true); - if (pTcp != NULL) { +bool TcpRemotingClient::invokeHeartBeat(const string& addr, RemotingCommand& request, int timeoutMillis) { + std::shared_ptr pTcp = GetTransport(addr, true); + if (pTcp != nullptr) { int code = 
request.getCode(); int opaque = request.getOpaque(); - boost::shared_ptr responseFuture(new ResponseFuture(code, opaque, this, 3000, false, NULL)); + + std::shared_ptr responseFuture(new ResponseFuture(code, opaque, this, timeoutMillis)); addResponseFuture(opaque, responseFuture); - // LOG_INFO("invokeHeartbeat success, addr:%s, code:%d, opaque:%d, - // timeoutms:%d", addr.c_str(), code, opaque, 3000); if (SendCommand(pTcp, request)) { responseFuture->setSendRequestOK(true); - unique_ptr pRsp(responseFuture->waitResponse(3000)); - if (pRsp == NULL) { + unique_ptr pRsp(responseFuture->waitResponse()); + if (pRsp == nullptr) { LOG_ERROR("wait response timeout of heartbeat, so closeTransport of addr:%s", addr.c_str()); + // avoid responseFuture leak; findAndDeleteResponseFuture(opaque); CloseTransport(addr, pTcp); return false; @@ -152,6 +161,7 @@ bool TcpRemotingClient::invokeHeartBeat(const string& addr, RemotingCommand& req return false; } } else { + // avoid responseFuture leak; findAndDeleteResponseFuture(opaque); CloseTransport(addr, pTcp); } @@ -159,34 +169,28 @@ bool TcpRemotingClient::invokeHeartBeat(const string& addr, RemotingCommand& req return false; } -RemotingCommand* TcpRemotingClient::invokeSync(const string& addr, - RemotingCommand& request, - int timeoutMillis /* = 3000 */) { +RemotingCommand* TcpRemotingClient::invokeSync(const string& addr, RemotingCommand& request, int timeoutMillis) { LOG_DEBUG("InvokeSync:", addr.c_str()); - boost::shared_ptr pTcp = GetTransport(addr, true); - if (pTcp != NULL) { + std::shared_ptr pTcp = GetTransport(addr, true); + if (pTcp != nullptr) { int code = request.getCode(); int opaque = request.getOpaque(); - boost::shared_ptr responseFuture( - new ResponseFuture(code, opaque, this, timeoutMillis, false, NULL)); + + std::shared_ptr responseFuture(new ResponseFuture(code, opaque, this, timeoutMillis)); addResponseFuture(opaque, responseFuture); if (SendCommand(pTcp, request)) { - // LOG_INFO("invokeSync success, addr:%s, code:%d, opaque:%d, - // timeoutms:%d", addr.c_str(), code, opaque, timeoutMillis); responseFuture->setSendRequestOK(true); - RemotingCommand* pRsp = responseFuture->waitResponse(timeoutMillis); - if (pRsp == NULL) { + RemotingCommand* pRsp = responseFuture->waitResponse(); + if (pRsp == nullptr) { if (code != GET_CONSUMER_LIST_BY_GROUP) { - LOG_WARN( - "wait response timeout or get NULL response of code:%d, so " - "closeTransport of addr:%s", - code, addr.c_str()); + LOG_WARN("wait response timeout or get NULL response of code:%d, so closeTransport of addr:%s", code, + addr.c_str()); CloseTransport(addr, pTcp); } // avoid responseFuture leak; findAndDeleteResponseFuture(opaque); - return NULL; + return nullptr; } else { return pRsp; } @@ -197,137 +201,130 @@ RemotingCommand* TcpRemotingClient::invokeSync(const string& addr, } } LOG_DEBUG("InvokeSync [%s] Failed: Cannot Get Transport.", addr.c_str()); - return NULL; + return nullptr; } bool TcpRemotingClient::invokeAsync(const string& addr, RemotingCommand& request, - AsyncCallbackWrap* cbw, - int64 timeoutMilliseconds, + AsyncCallbackWrap* callback, + int64 timeoutMillis, int maxRetrySendTimes, int retrySendTimes) { - boost::shared_ptr pTcp = GetTransport(addr, true); - if (pTcp != NULL) { - // pTcp = GetTransport(addr, true); + if (pTcp != nullptr) { int code = request.getCode(); int opaque = request.getOpaque(); - boost::shared_ptr responseFuture( - new ResponseFuture(code, opaque, this, timeoutMilliseconds, true, cbw)); + + // delete in callback + std::shared_ptr 
responseFuture( + new ResponseFuture(code, opaque, this, timeoutMillis, true, callback)); responseFuture->setMaxRetrySendTimes(maxRetrySendTimes); responseFuture->setRetrySendTimes(retrySendTimes); responseFuture->setBrokerAddr(addr); responseFuture->setRequestCommand(request); addAsyncResponseFuture(opaque, responseFuture); - if (cbw) { + + if (callback) { boost::asio::deadline_timer* t = - new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::milliseconds(timeoutMilliseconds)); + new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::milliseconds(timeoutMillis)); addTimerCallback(t, opaque); - boost::system::error_code e; - t->async_wait(boost::bind(&TcpRemotingClient::handleAsyncPullForResponseTimeout, this, e, opaque)); + t->async_wait( + boost::bind(&TcpRemotingClient::handleAsyncRequestTimeout, this, boost::asio::placeholders::error, opaque)); } - if (SendCommand(pTcp, request)) // Even if send failed, asyncTimerThread - // will trigger next pull request or report - // send msg failed - { + // Even if send failed, asyncTimerThread will trigger next pull request or report send msg failed + if (SendCommand(pTcp, request)) { LOG_DEBUG("invokeAsync success, addr:%s, code:%d, opaque:%d", addr.c_str(), code, opaque); responseFuture->setSendRequestOK(true); } return true; } + LOG_ERROR("invokeAsync failed of addr:%s", addr.c_str()); return false; } void TcpRemotingClient::invokeOneway(const string& addr, RemotingCommand& request) { // pTcp = GetTransport(addr, true); - if (pTcp != NULL) { + std::shared_ptr pTcp = GetTransport(addr, true); + if (pTcp != nullptr) { request.markOnewayRPC(); - LOG_DEBUG("invokeOneway success, addr:%s, code:%d", addr.c_str(), request.getCode()); - SendCommand(pTcp, request); + if (SendCommand(pTcp, request)) { + LOG_DEBUG("invokeOneway success. addr:%s, code:%d", addr.c_str(), request.getCode()); + } else { + LOG_WARN("invokeOneway failed. addr:%s, code:%d", addr.c_str(), request.getCode()); + } + } else { + LOG_WARN("invokeOneway failed: NULL transport. 
addr:%s, code:%d", addr.c_str(), request.getCode()); } } -boost::shared_ptr TcpRemotingClient::GetTransport(const string& addr, bool needRespons) { +std::shared_ptr TcpRemotingClient::GetTransport(const string& addr, bool needResponse) { if (addr.empty()) { LOG_DEBUG("GetTransport of NameServer"); - return CreateNameserverTransport(needRespons); + return CreateNameServerTransport(needResponse); } - return CreateTransport(addr, needRespons); + return CreateTransport(addr, needResponse); } -boost::shared_ptr TcpRemotingClient::CreateTransport(const string& addr, bool needRespons) { - boost::shared_ptr tts; +std::shared_ptr TcpRemotingClient::CreateTransport(const string& addr, bool needResponse) { + std::shared_ptr tts; + { // try get m_tcpLock util m_tcpTransportTryLockTimeout to avoid blocking - // long - // time, if could not get m_tcpLock, return NULL - bool bGetMutex = false; - boost::unique_lock lock(m_tcpLock, boost::try_to_lock); + // long time, if could not get m_tcpLock, return NULL + std::unique_lock lock(m_tcpTableLock, std::try_to_lock); if (!lock.owns_lock()) { - if (!lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) { + if (!lock.try_lock_for(std::chrono::seconds(m_tcpTransportTryLockTimeout))) { LOG_ERROR("GetTransport of:%s get timed_mutex timeout", addr.c_str()); - boost::shared_ptr pTcp; + std::shared_ptr pTcp; return pTcp; - } else { - bGetMutex = true; } - } else { - bGetMutex = true; } - if (bGetMutex) { - if (m_tcpTable.find(addr) != m_tcpTable.end()) { - boost::weak_ptr weakPtcp(m_tcpTable[addr]); - boost::shared_ptr tcp = weakPtcp.lock(); - if (tcp) { - tcpConnectStatus connectStatus = tcp->getTcpConnectStatus(); - if (connectStatus == e_connectWaitResponse) { - boost::shared_ptr pTcp; - return pTcp; - } else if (connectStatus == e_connectFail) { - LOG_ERROR("tcpTransport with server disconnected, erase server:%s", addr.c_str()); - tcp->disconnect(addr); // avoid coredump when connection with broker was broken - m_tcpTable.erase(addr); - } else if (connectStatus == e_connectSuccess) { - return tcp; - } else { - LOG_ERROR( - "go to fault state, erase:%s from tcpMap, and reconnect " - "it", - addr.c_str()); - m_tcpTable.erase(addr); - } + + // check for reuse + if (m_tcpTable.find(addr) != m_tcpTable.end()) { + std::shared_ptr tcp = m_tcpTable[addr]; + + if (tcp) { + TcpConnectStatus connectStatus = tcp->getTcpConnectStatus(); + if (connectStatus == TCP_CONNECT_STATUS_SUCCESS) { + return tcp; + } else if (connectStatus == TCP_CONNECT_STATUS_WAIT) { + std::shared_ptr pTcp; + return pTcp; + } else if (connectStatus == TCP_CONNECT_STATUS_FAILED) { + LOG_ERROR("tcpTransport with server disconnected, erase server:%s", addr.c_str()); + tcp->disconnect(addr); // avoid coredump when connection with broker was broken + m_tcpTable.erase(addr); + } else { + LOG_ERROR("go to fault state, erase:%s from tcpMap, and reconnect it", addr.c_str()); + m_tcpTable.erase(addr); } } + } - //connect(addr, m_tcpConnectTimeout); - if (connectStatus != e_connectWaitResponse) { - LOG_WARN("can not connect to :%s", addr.c_str()); - tts->disconnect(addr); - boost::shared_ptr pTcp; - return pTcp; - } else { - m_tcpTable[addr] = tts; // even if connecting failed finally, this - // server transport will be erased by next - // CreateTransport - } - } else { - LOG_WARN("get tcpTransport mutex failed :%s", addr.c_str()); - boost::shared_ptr pTcp; + tts = TcpTransport::CreateTransport(this, callback); + TcpConnectStatus connectStatus = tts->connect(addr, 
0); // use non-block + if (connectStatus != TCP_CONNECT_STATUS_WAIT) { + LOG_WARN("can not connect to:%s", addr.c_str()); + tts->disconnect(addr); + std::shared_ptr pTcp; return pTcp; + } else { + // even if connecting failed finally, this server transport will be erased by next CreateTransport + m_tcpTable[addr] = tts; } } - tcpConnectStatus connectStatus = tts->waitTcpConnectEvent(m_tcpConnectTimeout); - if (connectStatus != e_connectSuccess) { + TcpConnectStatus connectStatus = tts->waitTcpConnectEvent(static_cast(m_tcpConnectTimeout)); + if (connectStatus != TCP_CONNECT_STATUS_SUCCESS) { LOG_WARN("can not connect to server:%s", addr.c_str()); tts->disconnect(addr); - boost::shared_ptr pTcp; + std::shared_ptr pTcp; return pTcp; } else { LOG_INFO("connect server with addr:%s success", addr.c_str()); @@ -335,165 +332,122 @@ boost::shared_ptr TcpRemotingClient::CreateTransport(const string& } } -boost::shared_ptr TcpRemotingClient::CreateNameserverTransport(bool needRespons) { +std::shared_ptr TcpRemotingClient::CreateNameServerTransport(bool needResponse) { // m_namesrvLock was added to avoid operation of nameServer was blocked by // m_tcpLock, it was used by single Thread mostly, so no performance impact - // try get m_tcpLock util m_tcpTransportTryLockTimeout to avoid blocking long + // try get m_tcpLock until m_tcpTransportTryLockTimeout to avoid blocking long // time, if could not get m_namesrvlock, return NULL LOG_DEBUG("--CreateNameserverTransport--"); - bool bGetMutex = false; - boost::unique_lock lock(m_namesrvlock, boost::try_to_lock); + std::unique_lock lock(m_namesrvLock, std::try_to_lock); if (!lock.owns_lock()) { - if (!lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) { + if (!lock.try_lock_for(std::chrono::seconds(m_tcpTransportTryLockTimeout))) { LOG_ERROR("CreateNameserverTransport get timed_mutex timeout"); - boost::shared_ptr pTcp; + std::shared_ptr pTcp; return pTcp; - } else { - bGetMutex = true; } - } else { - bGetMutex = true; } - if (bGetMutex) { - if (!m_namesrvAddrChoosed.empty()) { - boost::shared_ptr pTcp = GetTransport(m_namesrvAddrChoosed, true); - if (pTcp) - return pTcp; - else - m_namesrvAddrChoosed.clear(); - } + if (!m_namesrvAddrChoosed.empty()) { + std::shared_ptr pTcp = CreateTransport(m_namesrvAddrChoosed, true); + if (pTcp) + return pTcp; + else + m_namesrvAddrChoosed.clear(); + } - vector::iterator itp = m_namesrvAddrList.begin(); - for (; itp != m_namesrvAddrList.end(); ++itp) { - unsigned int index = m_namesrvIndex % m_namesrvAddrList.size(); - if (m_namesrvIndex == numeric_limits::max()) - m_namesrvIndex = 0; - m_namesrvIndex++; - LOG_INFO("namesrvIndex is:%d, index:%d, namesrvaddrlist size:" SIZET_FMT "", m_namesrvIndex, index, - m_namesrvAddrList.size()); - boost::shared_ptr pTcp = GetTransport(m_namesrvAddrList[index], true); - if (pTcp) { - m_namesrvAddrChoosed = m_namesrvAddrList[index]; - return pTcp; - } + for (int i = 0; i < m_namesrvAddrList.size(); i++) { + unsigned int index = m_namesrvIndex++ % m_namesrvAddrList.size(); + LOG_INFO("namesrvIndex is:%d, index:%d, namesrvaddrlist size:" SIZET_FMT "", m_namesrvIndex, index, + m_namesrvAddrList.size()); + std::shared_ptr pTcp = CreateTransport(m_namesrvAddrList[index], true); + if (pTcp) { + m_namesrvAddrChoosed = m_namesrvAddrList[index]; + return pTcp; } - boost::shared_ptr pTcp; - return pTcp; - } else { - LOG_WARN("get nameServer tcpTransport mutex failed"); - boost::shared_ptr pTcp; - return pTcp; } + + std::shared_ptr pTcp; + 
return pTcp; } -void TcpRemotingClient::CloseTransport(const string& addr, boost::shared_ptr pTcp) { +bool TcpRemotingClient::CloseTransport(const string& addr, std::shared_ptr pTcp) { if (addr.empty()) { return CloseNameServerTransport(pTcp); } - bool bGetMutex = false; - boost::unique_lock lock(m_tcpLock, boost::try_to_lock); + std::unique_lock lock(m_tcpTableLock, std::try_to_lock); if (!lock.owns_lock()) { - if (!lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) { + if (!lock.try_lock_for(std::chrono::seconds(m_tcpTransportTryLockTimeout))) { LOG_ERROR("CloseTransport of:%s get timed_mutex timeout", addr.c_str()); - return; - } else { - bGetMutex = true; + return false; } - } else { - bGetMutex = true; } + LOG_ERROR("CloseTransport of:%s", addr.c_str()); - if (bGetMutex) { - bool removeItemFromTable = true; - if (m_tcpTable.find(addr) != m_tcpTable.end()) { - if (m_tcpTable[addr]->getStartTime() != pTcp->getStartTime()) { - LOG_INFO( - "tcpTransport with addr:%s has been closed before, and has been " - "created again, nothing to do", - addr.c_str()); - removeItemFromTable = false; - } - } else { - LOG_INFO("tcpTransport with addr:%s had been removed from tcpTable before", addr.c_str()); - removeItemFromTable = false; - } - if (removeItemFromTable == true) { - LOG_WARN("closeTransport: disconnect broker:%s with state:%d", addr.c_str(), - m_tcpTable[addr]->getTcpConnectStatus()); - if (m_tcpTable[addr]->getTcpConnectStatus() == e_connectSuccess) - m_tcpTable[addr]->disconnect(addr); // avoid coredump when connection with server was broken - LOG_WARN("closeTransport: erase broker: %s", addr.c_str()); - m_tcpTable.erase(addr); + bool removeItemFromTable = true; + if (m_tcpTable.find(addr) != m_tcpTable.end()) { + if (m_tcpTable[addr]->getStartTime() != pTcp->getStartTime()) { + LOG_INFO("tcpTransport with addr:%s has been closed before, and has been created again, nothing to do", + addr.c_str()); + removeItemFromTable = false; } } else { - LOG_WARN("CloseTransport::get tcpTransport mutex failed:%s", addr.c_str()); - return; + LOG_INFO("tcpTransport with addr:%s had been removed from tcpTable before", addr.c_str()); + removeItemFromTable = false; } + + if (removeItemFromTable) { + LOG_WARN("closeTransport: disconnect:%s with state:%d", addr.c_str(), m_tcpTable[addr]->getTcpConnectStatus()); + if (m_tcpTable[addr]->getTcpConnectStatus() == TCP_CONNECT_STATUS_SUCCESS) + m_tcpTable[addr]->disconnect(addr); // avoid coredump when connection with server was broken + LOG_WARN("closeTransport: erase broker: %s", addr.c_str()); + m_tcpTable.erase(addr); + } + LOG_ERROR("CloseTransport of:%s end", addr.c_str()); + + return removeItemFromTable; } -void TcpRemotingClient::CloseNameServerTransport(boost::shared_ptr pTcp) { - bool bGetMutex = false; - boost::unique_lock lock(m_namesrvlock, boost::try_to_lock); +bool TcpRemotingClient::CloseNameServerTransport(std::shared_ptr pTcp) { + std::unique_lock lock(m_namesrvLock, std::try_to_lock); if (!lock.owns_lock()) { - if (!lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) { - LOG_ERROR("CreateNameserverTransport get timed_mutex timeout"); - return; - } else { - bGetMutex = true; + if (!lock.try_lock_for(std::chrono::seconds(m_tcpTransportTryLockTimeout))) { + LOG_ERROR("CreateNameServerTransport get timed_mutex timeout"); + return false; } - } else { - bGetMutex = true; } - if (bGetMutex) { - string addr = m_namesrvAddrChoosed; - bool 
removeItemFromTable = true; - if (m_tcpTable.find(addr) != m_tcpTable.end()) { - if (m_tcpTable[addr]->getStartTime() != pTcp->getStartTime()) { - LOG_INFO( - "tcpTransport with addr:%s has been closed before, and has been " - "created again, nothing to do", - addr.c_str()); - removeItemFromTable = false; - } - } else { - LOG_INFO("tcpTransport with addr:%s had been removed from tcpTable before", addr.c_str()); - removeItemFromTable = false; - } - if (removeItemFromTable == true) { - m_tcpTable[addr]->disconnect(addr); // avoid coredump when connection with server was broken - LOG_WARN("closeTransport: erase broker: %s", addr.c_str()); - m_tcpTable.erase(addr); - m_namesrvAddrChoosed.clear(); - } - } else { - LOG_WARN("CloseNameServerTransport::get tcpTransport mutex failed:%s", m_namesrvAddrChoosed.c_str()); - return; + string addr = m_namesrvAddrChoosed; + + bool removeItemFromTable = CloseTransport(addr, pTcp); + if (removeItemFromTable) { + m_namesrvAddrChoosed.clear(); } + + return removeItemFromTable; } -bool TcpRemotingClient::SendCommand(boost::shared_ptr pTts, RemotingCommand& msg) { - const MemoryBlock* phead = msg.GetHead(); - const MemoryBlock* pbody = msg.GetBody(); +bool TcpRemotingClient::SendCommand(std::shared_ptr pTts, RemotingCommand& msg) { + const MemoryBlock* pHead = msg.GetHead(); + const MemoryBlock* pBody = msg.GetBody(); - unique_ptr result(new MemoryOutputStream(1024)); - if (phead->getData()) { - result->write(phead->getData(), phead->getSize()); + unique_ptr buffer(new MemoryOutputStream(1024)); + if (pHead->getSize() > 0) { + buffer->write(pHead->getData(), static_cast(pHead->getSize())); } - if (pbody->getData()) { - result->write(pbody->getData(), pbody->getSize()); + if (pBody->getSize() > 0) { + buffer->write(pBody->getData(), static_cast(pBody->getSize())); } - const char* pData = static_cast(result->getData()); - int len = result->getDataSize(); + + const char* pData = static_cast(buffer->getData()); + size_t len = buffer->getDataSize(); return pTts->sendMessage(pData, len); } void TcpRemotingClient::static_messageReceived(void* context, const MemoryBlock& mem, const string& addr) { - TcpRemotingClient* pTcpRemotingClient = (TcpRemotingClient*)context; + auto* pTcpRemotingClient = reinterpret_cast(context); if (pTcpRemotingClient) pTcpRemotingClient->messageReceived(mem, addr); } @@ -503,11 +457,11 @@ void TcpRemotingClient::messageReceived(const MemoryBlock& mem, const string& ad } void TcpRemotingClient::ProcessData(const MemoryBlock& mem, const string& addr) { - RemotingCommand* pRespondCmd = NULL; + RemotingCommand* pRespondCmd = nullptr; try { pRespondCmd = RemotingCommand::Decode(mem); } catch (...) 
{ - LOG_ERROR("processData_error"); + LOG_ERROR("processData error"); return; } @@ -515,43 +469,58 @@ void TcpRemotingClient::ProcessData(const MemoryBlock& mem, const string& addr) //isResponseType()) { - boost::shared_ptr pFuture(findAndDeleteAsyncResponseFuture(opaque)); + std::shared_ptr pFuture = findAndDeleteAsyncResponseFuture(opaque); if (!pFuture) { pFuture = findAndDeleteResponseFuture(opaque); - if (pFuture) { - if (pFuture->getSyncResponseFlag()) { - LOG_WARN("waitResponse already timeout of opaque:%d", opaque); - deleteAndZero(pRespondCmd); - return; - } - LOG_DEBUG("find_response opaque:%d", opaque); - } else { + if (!pFuture) { LOG_DEBUG("responseFuture was deleted by timeout of opaque:%d", opaque); deleteAndZero(pRespondCmd); return; } } + + LOG_DEBUG("find_response opaque:%d", opaque); processResponseCommand(pRespondCmd, pFuture); } else { processRequestCommand(pRespondCmd, addr); } } -void TcpRemotingClient::processResponseCommand(RemotingCommand* pCmd, boost::shared_ptr pfuture) { - int code = pfuture->getRequestCode(); +void TcpRemotingClient::processResponseCommand(RemotingCommand* pCmd, std::shared_ptr pFuture) { + int code = pFuture->getRequestCode(); + pCmd->SetExtHeader(code); // set head, for response use + int opaque = pCmd->getOpaque(); - LOG_DEBUG("processResponseCommand, code:%d,opaque:%d, maxRetryTimes:%d, retrySendTimes:%d", code, opaque, - pfuture->getMaxRetrySendTimes(), pfuture->getRetrySendTimes()); - pCmd->SetExtHeader(code); // set head , for response use - - pfuture->setResponse(pCmd); - - if (pfuture->getASyncFlag()) { - if (!pfuture->getAsyncResponseFlag()) { - pfuture->setAsyncResponseFlag(); - pfuture->setAsyncCallBackStatus(asyncCallBackStatus_response); - cancelTimerCallback(opaque); - pfuture->executeInvokeCallback(); + LOG_DEBUG("processResponseCommand, code:%d, opaque:%d, maxRetryTimes:%d, retrySendTimes:%d", code, opaque, + pFuture->getMaxRetrySendTimes(), pFuture->getRetrySendTimes()); + + if (!pFuture->setResponse(pCmd)) { + // this branch is unreachable normally. 
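
Note how the old `asyncCallBackStatus` handshake disappears here: `ResponseFuture::setResponse()` now returns `false` when the future was already completed, so whichever of the two racing paths (response arrival on the I/O thread vs. timer expiry in `handleAsyncRequestTimeout` below) gets in first wins, and the loser simply drops its work. A minimal sketch of that first-writer-wins arbitration, assuming only the standard library (`Future::complete` is a hypothetical reduction of `setResponse`):

```cpp
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

// Hypothetical reduction of ResponseFuture: the first complete() call wins;
// a concurrent response/timeout path sees false and backs off.
class Future {
 public:
  bool complete(const char* who) {
    std::lock_guard<std::mutex> lk(m_lock);
    if (m_haveResponse) return false;  // already completed by the other path
    m_haveResponse = true;
    m_winner = who;
    m_cond.notify_all();
    return true;
  }

  const char* wait() {
    std::unique_lock<std::mutex> lk(m_lock);
    m_cond.wait(lk, [this] { return m_haveResponse; });
    return m_winner;
  }

 private:
  std::mutex m_lock;
  std::condition_variable m_cond;
  bool m_haveResponse = false;
  const char* m_winner = nullptr;
};

int main() {
  Future f;
  std::thread response([&f] { f.complete("response"); });
  std::thread timeout([&f] { f.complete("timeout"); });
  response.join();
  timeout.join();
  // Exactly one of the two racing paths completed the future.
  std::cout << "completed by: " << f.wait() << std::endl;
}
```

This is also why the code above can simply `deleteAndZero(pCmd)` on a `false` return: ownership of the response command stays with whichever side failed to publish it.
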
+ LOG_WARN("response already timeout of opaque:%d", opaque); + deleteAndZero(pCmd); + return; + } + + if (pFuture->getAsyncFlag()) { + cancelTimerCallback(opaque); + pFuture->invokeCompleteCallback(); + } +} + +void TcpRemotingClient::handleAsyncRequestTimeout(const boost::system::error_code& e, int opaque) { + if (e == boost::asio::error::operation_aborted) { + LOG_DEBUG("handleAsyncRequestTimeout aborted opaque:%d, e_code:%d, msg:%s", opaque, e.value(), e.message().data()); + return; + } + + LOG_DEBUG("handleAsyncRequestTimeout opaque:%d, e_code:%d, msg:%s", opaque, e.value(), e.message().data()); + + std::shared_ptr pFuture(findAndDeleteAsyncResponseFuture(opaque)); + if (pFuture) { + LOG_ERROR("no response got for opaque:%d", opaque); + eraseTimerCallback(opaque); + if (pFuture->getAsyncCallbackWrap()) { + pFuture->invokeExceptionCallback(); } } } @@ -575,16 +544,16 @@ void TcpRemotingClient::processRequestCommand(RemotingCommand* pCmd, const strin } } -void TcpRemotingClient::addResponseFuture(int opaque, boost::shared_ptr pfuture) { - boost::lock_guard lock(m_futureTableMutex); - m_futureTable[opaque] = pfuture; +void TcpRemotingClient::addResponseFuture(int opaque, std::shared_ptr pFuture) { + std::lock_guard lock(m_futureTableLock); + m_futureTable[opaque] = pFuture; } // Note: after call this function, shared_ptr of m_syncFutureTable[opaque] will // be erased, so caller must ensure the life cycle of returned shared_ptr; -boost::shared_ptr TcpRemotingClient::findAndDeleteResponseFuture(int opaque) { - boost::lock_guard lock(m_futureTableMutex); - boost::shared_ptr pResponseFuture; +std::shared_ptr TcpRemotingClient::findAndDeleteResponseFuture(int opaque) { + std::lock_guard lock(m_futureTableLock); + std::shared_ptr pResponseFuture; if (m_futureTable.find(opaque) != m_futureTable.end()) { pResponseFuture = m_futureTable[opaque]; m_futureTable.erase(opaque); @@ -592,42 +561,20 @@ boost::shared_ptr TcpRemotingClient::findAndDeleteResponseFuture return pResponseFuture; } -void TcpRemotingClient::handleAsyncPullForResponseTimeout(const boost::system::error_code& e, int opaque) { - if (e == boost::asio::error::operation_aborted) { - LOG_INFO("handleAsyncPullForResponseTimeout aborted opaque:%d, e_code:%d, msg:%s", opaque, e.value(), - e.message().data()); - return; - } - - LOG_DEBUG("handleAsyncPullForResponseTimeout opaque:%d, e_code:%d, msg:%s", opaque, e.value(), e.message().data()); - boost::shared_ptr pFuture(findAndDeleteAsyncResponseFuture(opaque)); - if (pFuture && pFuture->getASyncFlag() && (pFuture->getAsyncCallbackWrap())) { - if ((pFuture->getAsyncResponseFlag() != true)) // if no response received, then check timeout or not - { - LOG_ERROR("no response got for opaque:%d", opaque); - pFuture->setAsyncCallBackStatus(asyncCallBackStatus_timeout); - pFuture->executeInvokeCallbackException(); - } - } - - eraseTimerCallback(opaque); -} - -void TcpRemotingClient::addAsyncResponseFuture(int opaque, boost::shared_ptr pfuture) { - boost::lock_guard lock(m_asyncFutureLock); - m_asyncFutureTable[opaque] = pfuture; +void TcpRemotingClient::addAsyncResponseFuture(int opaque, std::shared_ptr pFuture) { + std::lock_guard lock(m_asyncFutureTableLock); + m_asyncFutureTable[opaque] = pFuture; } // Note: after call this function, shared_ptr of m_asyncFutureTable[opaque] will // be erased, so caller must ensure the life cycle of returned shared_ptr; -boost::shared_ptr TcpRemotingClient::findAndDeleteAsyncResponseFuture(int opaque) { - boost::lock_guard lock(m_asyncFutureLock); - 
boost::shared_ptr pResponseFuture; +std::shared_ptr TcpRemotingClient::findAndDeleteAsyncResponseFuture(int opaque) { + std::lock_guard lock(m_asyncFutureTableLock); + std::shared_ptr pResponseFuture; if (m_asyncFutureTable.find(opaque) != m_asyncFutureTable.end()) { pResponseFuture = m_asyncFutureTable[opaque]; m_asyncFutureTable.erase(opaque); } - return pResponseFuture; } @@ -638,56 +585,64 @@ void TcpRemotingClient::registerProcessor(MQRequestCode requestCode, ClientRemot } void TcpRemotingClient::addTimerCallback(boost::asio::deadline_timer* t, int opaque) { - boost::lock_guard lock(m_timerMapMutex); - if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) { + std::lock_guard lock(m_asyncTimerTableLock); + if (m_asyncTimerTable.find(opaque) != m_asyncTimerTable.end()) { LOG_DEBUG("addTimerCallback:erase timerCallback opaque:%lld", opaque); - boost::asio::deadline_timer* old_t = m_async_timer_map[opaque]; - old_t->cancel(); + boost::asio::deadline_timer* old_t = m_asyncTimerTable[opaque]; + m_asyncTimerTable.erase(opaque); + try { + old_t->cancel(); + } catch (const std::exception& ec) { + LOG_WARN("encounter exception when cancel old timer: %s", ec.what()); + } delete old_t; - old_t = NULL; - m_async_timer_map.erase(opaque); } - m_async_timer_map[opaque] = t; + m_asyncTimerTable[opaque] = t; } void TcpRemotingClient::eraseTimerCallback(int opaque) { - boost::lock_guard lock(m_timerMapMutex); - if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) { + std::lock_guard lock(m_asyncTimerTableLock); + if (m_asyncTimerTable.find(opaque) != m_asyncTimerTable.end()) { LOG_DEBUG("eraseTimerCallback: opaque:%lld", opaque); - boost::asio::deadline_timer* t = m_async_timer_map[opaque]; + boost::asio::deadline_timer* t = m_asyncTimerTable[opaque]; + m_asyncTimerTable.erase(opaque); delete t; - t = NULL; - m_async_timer_map.erase(opaque); } } void TcpRemotingClient::cancelTimerCallback(int opaque) { - boost::lock_guard lock(m_timerMapMutex); - if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) { + std::lock_guard lock(m_asyncTimerTableLock); + if (m_asyncTimerTable.find(opaque) != m_asyncTimerTable.end()) { LOG_DEBUG("cancelTimerCallback: opaque:%lld", opaque); - boost::asio::deadline_timer* t = m_async_timer_map[opaque]; - t->cancel(); + boost::asio::deadline_timer* t = m_asyncTimerTable[opaque]; + m_asyncTimerTable.erase(opaque); + try { + t->cancel(); + } catch (const std::exception& ec) { + LOG_WARN("encounter exception when cancel timer: %s", ec.what()); + } delete t; - t = NULL; - m_async_timer_map.erase(opaque); } } void TcpRemotingClient::removeAllTimerCallback() { - boost::lock_guard lock(m_timerMapMutex); - for (asyncTimerMap::iterator it = m_async_timer_map.begin(); it != m_async_timer_map.end(); ++it) { - boost::asio::deadline_timer* t = it->second; - t->cancel(); + std::lock_guard lock(m_asyncTimerTableLock); + for (const auto& timer : m_asyncTimerTable) { + boost::asio::deadline_timer* t = timer.second; + try { + t->cancel(); + } catch (const std::exception& ec) { + LOG_WARN("encounter exception when cancel timer: %s", ec.what()); + } delete t; - t = NULL; } - m_async_timer_map.clear(); + m_asyncTimerTable.clear(); } void TcpRemotingClient::deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque) { - // delete the map record of opaque<->ResponseFuture, so the answer for the pull request will discard when receive it - // later - boost::shared_ptr pFuture(findAndDeleteAsyncResponseFuture(opaque)); + // delete the map record of 
 
 void TcpRemotingClient::deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque) {
-  // delete the map record of opaque<->ResponseFuture, so the answer for the pull request will discard when receive it
-  // later
-  boost::shared_ptr<ResponseFuture> pFuture(findAndDeleteAsyncResponseFuture(opaque));
+  // delete the map record of opaque<->ResponseFuture, so the response to the
+  // pull request will be discarded when it is received later
+  std::shared_ptr<ResponseFuture> pFuture(findAndDeleteAsyncResponseFuture(opaque));
   if (!pFuture) {
     pFuture = findAndDeleteResponseFuture(opaque);
     if (pFuture) {
@@ -695,9 +650,9 @@ void TcpRemotingClient::deleteOpaqueForDropPullRequest(const MQMessageQueue& mq,
     }
   } else {
     LOG_DEBUG("succ deleted the async pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
+    // delete the timeout timer for the pull request's opaque
+    cancelTimerCallback(opaque);
   }
-  // delete the timeout timer for opaque for pullrequest
-  cancelTimerCallback(opaque);
 }
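After deleteOpaqueForDropPullRequest runs, a response that arrives late simply finds no future and is dropped. A runnable sketch of that effect, using hypothetical stand-ins for the client's future table:

    #include <iostream>
    #include <map>
    #include <memory>
    #include <mutex>

    // Hypothetical stand-ins for the client's async future table.
    std::map<int, std::shared_ptr<int>> g_asyncFutureTable;
    std::mutex g_asyncFutureTableLock;

    std::shared_ptr<int> findAndDeleteAsyncFuture(int opaque) {
      std::lock_guard<std::mutex> lock(g_asyncFutureTableLock);
      auto it = g_asyncFutureTable.find(opaque);
      if (it == g_asyncFutureTable.end())
        return nullptr;
      auto f = it->second;
      g_asyncFutureTable.erase(it);
      return f;
    }

    int main() {
      g_asyncFutureTable[7] = std::make_shared<int>(0);
      findAndDeleteAsyncFuture(7);  // drop the pull request: erase its opaque entry
      if (!findAndDeleteAsyncFuture(7)) {
        // the late response finds no future and is simply discarded
        std::cout << "no future for opaque:7, response discarded\n";
      }
      return 0;
    }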
 
 //
-#include
-#include
-#include
-#include
-#include
-#include "ClientRemotingProcessor.h"
-#include "RemotingCommand.h"
-#include "ResponseFuture.h"
-#include "SocketUtil.h"
-#include "TcpTransport.h"
-
-namespace rocketmq {
-//
-  boost::shared_ptr<TcpTransport> GetTransport(const string& addr, bool needRespons);
-  boost::shared_ptr<TcpTransport> CreateTransport(const string& addr, bool needRespons);
-  boost::shared_ptr<TcpTransport> CreateNameserverTransport(bool needRespons);
-  void CloseTransport(const string& addr, boost::shared_ptr<TcpTransport> pTcp);
-  void CloseNameServerTransport(boost::shared_ptr<TcpTransport> pTcp);
-  bool SendCommand(boost::shared_ptr<TcpTransport> pTts, RemotingCommand& msg);
-  void processRequestCommand(RemotingCommand* pCmd, const string& addr);
-  void processResponseCommand(RemotingCommand* pCmd, boost::shared_ptr<ResponseFuture> pfuture);
-
-  void addResponseFuture(int opaque, boost::shared_ptr<ResponseFuture> pfuture);
-  boost::shared_ptr<ResponseFuture> findAndDeleteResponseFuture(int opaque);
-
-  void addAsyncResponseFuture(int opaque, boost::shared_ptr<ResponseFuture> pfuture);
-  boost::shared_ptr<ResponseFuture> findAndDeleteAsyncResponseFuture(int opaque);
-
-  void addTimerCallback(boost::asio::deadline_timer* t, int opaque);
-  void eraseTimerCallback(int opaque);
-  void cancelTimerCallback(int opaque);
-  void removeAllTimerCallback();
-
- private:
-  typedef map<string, boost::shared_ptr<TcpTransport>> TcpMap;
-  typedef map<int, boost::shared_ptr<ResponseFuture>> ResMap;
-
-  typedef map<int, ClientRemotingProcessor*> RequestMap;
-  RequestMap m_requestTable;
-
-  boost::mutex m_futureTableMutex;
-  ResMap m_futureTable;  //<opaque, future>
-
-  ResMap m_asyncFutureTable;
-  boost::mutex m_asyncFutureLock;
-
-  TcpMap m_tcpTable;  //<addr, tcp>
-  boost::timed_mutex m_tcpLock;
-
-  // ThreadPool m_threadpool;
-  int m_pullThreadNum;
-  uint64_t m_tcpConnectTimeout;           // ms
-  uint64_t m_tcpTransportTryLockTimeout;  // s
-
-  //
-  vector<string> m_namesrvAddrList;
-  string m_namesrvAddrChoosed;
-  unsigned int m_namesrvIndex;
-  boost::asio::io_service m_ioService;
-  boost::thread_group m_threadpool;
-  boost::asio::io_service::work m_ioServiceWork;
-
-  boost::asio::io_service m_async_ioService;
-  unique_ptr<boost::thread> m_async_service_thread;
-
-  typedef map<int, boost::asio::deadline_timer*> asyncTimerMap;
-  boost::mutex m_timerMapMutex;
-  asyncTimerMap m_async_timer_map;
-};
-
-//
+#include
+
+#include
+#include
+#include
+#include
+
+#include "ClientRemotingProcessor.h"
+#include "RemotingCommand.h"
+#include "ResponseFuture.h"
+#include "SocketUtil.h"
+#include "TcpTransport.h"
+
+namespace rocketmq {
+
+//
+  void processResponseCommand(RemotingCommand* pCmd, std::shared_ptr<ResponseFuture> pFuture);
+  void handleAsyncRequestTimeout(const boost::system::error_code& e, int opaque);
+
+  std::shared_ptr<TcpTransport> GetTransport(const string& addr, bool needResponse);
+  std::shared_ptr<TcpTransport> CreateTransport(const string& addr, bool needResponse);
+  std::shared_ptr<TcpTransport> CreateNameServerTransport(bool needResponse);
+
+  bool CloseTransport(const string& addr, std::shared_ptr<TcpTransport> pTcp);
+  bool CloseNameServerTransport(std::shared_ptr<TcpTransport> pTcp);
+
+  bool SendCommand(std::shared_ptr<TcpTransport> pTts, RemotingCommand& msg);
+
+  void addResponseFuture(int opaque, std::shared_ptr<ResponseFuture> pFuture);
+  std::shared_ptr<ResponseFuture> findAndDeleteResponseFuture(int opaque);
+
+  void addAsyncResponseFuture(int opaque, std::shared_ptr<ResponseFuture> pFuture);
+  std::shared_ptr<ResponseFuture> findAndDeleteAsyncResponseFuture(int opaque);
+
+  void addTimerCallback(boost::asio::deadline_timer* t, int opaque);
+  void eraseTimerCallback(int opaque);
+  void cancelTimerCallback(int opaque);
+  void removeAllTimerCallback();
+
+  void boost_asio_work();
+
+ private:
+  using RequestMap = map<int, ClientRemotingProcessor*>;
+  using TcpMap = map<string, std::shared_ptr<TcpTransport>>;
+  using ResMap = map<int, std::shared_ptr<ResponseFuture>>;
+  using AsyncTimerMap = map<int, boost::asio::deadline_timer*>;
+
+  RequestMap m_requestTable;
+
+  TcpMap m_tcpTable;  //<addr, tcp>
+  std::timed_mutex m_tcpTableLock;
+
+  ResMap m_futureTable;  //<opaque, future>
+  std::mutex m_futureTableLock;
+
+  ResMap m_asyncFutureTable;
+  std::mutex m_asyncFutureTableLock;
+
+  AsyncTimerMap m_asyncTimerTable;
+  std::mutex m_asyncTimerTableLock;
+
+  int m_pullThreadNum;
+  uint64_t m_tcpConnectTimeout;           // ms
+  uint64_t m_tcpTransportTryLockTimeout;  // s
+
+  //
+  vector<string> m_namesrvAddrList;
+  string m_namesrvAddrChoosed;
+  unsigned int m_namesrvIndex;
+
+  boost::asio::io_service m_ioService;
+  boost::asio::io_service::work m_ioServiceWork;
+  boost::thread_group m_threadpool;
+
+  boost::asio::io_service m_async_ioService;
+  unique_ptr<boost::thread> m_async_service_thread;
+};
+
+//
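The m_ioServiceWork member declared above is what keeps the m_ioService pool threads alive: io_service::run() returns as soon as its handler queue drains unless a work object is outstanding. A minimal sketch of this boost idiom (the pool size of 4 is an arbitrary choice here):

    #include <boost/asio.hpp>
    #include <boost/thread.hpp>
    #include <iostream>
    #include <memory>

    int main() {
      boost::asio::io_service ioService;
      // Without an outstanding work object, run() would return immediately
      // once the queue is empty and the pool threads would exit.
      std::unique_ptr<boost::asio::io_service::work> work(new boost::asio::io_service::work(ioService));

      boost::thread_group threadpool;
      for (int i = 0; i < 4; i++) {
        threadpool.create_thread([&ioService] { ioService.run(); });
      }

      ioService.post([] { std::cout << "handler executed on a pool thread" << std::endl; });

      work.reset();  // let run() return once the queued handlers finish
      threadpool.join_all();
      return 0;
    }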
 
 #ifndef WIN32
 #include <arpa/inet.h>   // for sockaddr_in and inet_ntoa...
 #include <netinet/tcp.h>
 #include <sys/socket.h>  // for socket(), bind(), and connect()...
 #endif
+
 #include "Logging.h"
 #include "TcpRemotingClient.h"
 #include "UtilAll.h"
@@ -27,99 +31,56 @@ namespace rocketmq {
 
 //
-  boost::lock_guard<boost::mutex> lock(m_socketLock);
-
-  struct sockaddr_in sin;
-  memset(&sin, 0, sizeof(sin));
-  sin.sin_family = AF_INET;
-  sin.sin_addr.s_addr = getInetAddr(hostName);
-
-  sin.sin_port = htons(portNumber);
-
-  m_eventBase = event_base_new();
-  m_bufferEvent = bufferevent_socket_new(m_eventBase, -1, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE);
-  bufferevent_setcb(m_bufferEvent, readNextMessageIntCallback, NULL, eventcb, this);
-  bufferevent_enable(m_bufferEvent, EV_READ | EV_WRITE);
-  bufferevent_setwatermark(m_bufferEvent, EV_READ, 4, 0);
-
-  setTcpConnectStatus(e_connectWaitResponse);
-  if (bufferevent_socket_connect(m_bufferEvent, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
-    LOG_INFO("connect to fd:%d failed", bufferevent_getfd(m_bufferEvent));
-    setTcpConnectStatus(e_connectFail);
-    freeBufferEvent();
-    return e_connectFail;
-  } else {
-    int fd = bufferevent_getfd(m_bufferEvent);
-    LOG_INFO("try to connect to fd:%d, addr:%s", fd, (hostName.c_str()));
-
-    evthread_make_base_notifiable(m_eventBase);
-
-    m_ReadDatathread = new boost::thread(boost::bind(&TcpTransport::runThread, this));
-
-    while (!m_event_base_status) {
-      LOG_INFO("Wait till event base is looping");
-      boost::system_time const timeout = boost::get_system_time() + boost::posix_time::milliseconds(1000);
-      boost::unique_lock<boost::mutex> lock(m_event_base_mtx);
-      m_event_base_cv.timed_wait(lock, timeout);
-    }
+void TcpTransport::freeBufferEvent() {
+  // freeBufferEvent is idempotent.
-
-    return e_connectWaitResponse;
+  // first, unlink the BufferEvent's callbacks
+  if (m_event != nullptr) {
+    m_event->setCallback(nullptr, nullptr, nullptr, nullptr);
   }
+
+  // then, release the BufferEvent
+  m_event.reset();
 }
 
-void TcpTransport::setTcpConnectStatus(tcpConnectStatus connectStatus) {
+void TcpTransport::setTcpConnectStatus(TcpConnectStatus connectStatus) {
   m_tcpConnectStatus = connectStatus;
 }
 
-tcpConnectStatus TcpTransport::getTcpConnectStatus() {
+TcpConnectStatus TcpTransport::getTcpConnectStatus() {
   return m_tcpConnectStatus;
 }
 
-tcpConnectStatus TcpTransport::waitTcpConnectEvent(int timeoutMillisecs) {
-  boost::unique_lock<boost::mutex> lk(m_connectEventLock);
-  if (!m_connectEvent.timed_wait(lk, boost::posix_time::milliseconds(timeoutMillisecs))) {
-    LOG_INFO("connect timeout");
+TcpConnectStatus TcpTransport::waitTcpConnectEvent(int timeoutMillis) {
+  std::unique_lock<std::mutex> eventLock(m_connectEventLock);
+  if (m_tcpConnectStatus == TCP_CONNECT_STATUS_WAIT) {
+    if (m_connectEvent.wait_for(eventLock, std::chrono::milliseconds(timeoutMillis)) == std::cv_status::timeout) {
+      LOG_INFO("connect timeout");
+    }
   }
-  return getTcpConnectStatus();
+  return m_tcpConnectStatus;
 }
 
-void TcpTransport::setTcpConnectEvent(tcpConnectStatus connectStatus) {
-  tcpConnectStatus baseStatus(getTcpConnectStatus());
-  setTcpConnectStatus(connectStatus);
-  if (baseStatus == e_connectWaitResponse) {
-    LOG_INFO("received libevent callback event");
+// internal method
+void TcpTransport::setTcpConnectEvent(TcpConnectStatus connectStatus) {
+  TcpConnectStatus baseStatus = m_tcpConnectStatus.exchange(connectStatus, std::memory_order_relaxed);
+  if (baseStatus == TCP_CONNECT_STATUS_WAIT) {
+    std::unique_lock<std::mutex> eventLock(m_connectEventLock);
    m_connectEvent.notify_all();
   }
 }
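waitTcpConnectEvent re-checks the status before blocking, and setTcpConnectEvent swaps the status first and then takes the lock to notify, so a wakeup racing with the connect callback still observes the final status and a notification cannot slip between the check and the wait. The general shape of this pattern, as a sketch (Status, signal and waitFor are illustrative names, not the class API):

    #include <atomic>
    #include <chrono>
    #include <condition_variable>
    #include <mutex>

    enum Status { WAIT, SUCCESS, FAILED };

    std::atomic<Status> g_status(WAIT);
    std::mutex g_lock;
    std::condition_variable g_event;

    // Called from the event-loop callback thread.
    void signal(Status s) {
      Status old = g_status.exchange(s, std::memory_order_relaxed);
      if (old == WAIT) {
        // Holding the mutex around notify_all() prevents a lost wakeup:
        // the waiter holds it from its status check until it blocks.
        std::unique_lock<std::mutex> lk(g_lock);
        g_event.notify_all();
      }
    }

    // Called from the connecting thread.
    Status waitFor(int timeoutMillis) {
      std::unique_lock<std::mutex> lk(g_lock);
      if (g_status == WAIT) {
        g_event.wait_for(lk, std::chrono::milliseconds(timeoutMillis));
      }
      return g_status;  // WAIT here means a real timeout
    }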
 
@@ -165,129 +126,109 @@ u_long TcpTransport::getInetAddr(string& hostname) {
 }
 
 void TcpTransport::disconnect(const string& addr) {
-  boost::lock_guard<boost::mutex> lock(m_socketLock);
-  if (getTcpConnectStatus() != e_connectInit) {
-    clearBufferEventCallback();
-    LOG_INFO("disconnect:%s start", addr.c_str());
-    m_connectEvent.notify_all();
-    setTcpConnectStatus(e_connectInit);
-    if (m_ReadDatathread) {
-      m_ReadDatathread->interrupt();
-      exitBaseDispatch();
-      while (m_ReadDatathread->timed_join(boost::posix_time::seconds(1)) == false) {
-        LOG_WARN("join readDataThread fail, retry");
-        m_ReadDatathread->interrupt();
-        exitBaseDispatch();
-      }
-      delete m_ReadDatathread;
-      m_ReadDatathread = NULL;
-    }
+  // disconnect is idempotent.
+  std::lock_guard<std::mutex> lock(m_eventLock);
+  if (getTcpConnectStatus() != TCP_CONNECT_STATUS_INIT) {
+    LOG_INFO("disconnect:%s start. event:%p", addr.c_str(), m_event.get());
     freeBufferEvent();
+    setTcpConnectEvent(TCP_CONNECT_STATUS_INIT);
     LOG_INFO("disconnect:%s completely", addr.c_str());
   }
 }
 
-void TcpTransport::clearBufferEventCallback() {
-  if (m_bufferEvent) {
-    // Bufferevents are internally reference-counted, so if the bufferevent has
-    // pending deferred callbacks when you free it, it won't be deleted until
-    // the callbacks are done.
-    // so just empty callback to avoid future callback by libevent
-    bufferevent_setcb(m_bufferEvent, NULL, NULL, NULL, NULL);
+TcpConnectStatus TcpTransport::connect(const string& strServerURL, int timeoutMillis) {
+  string hostname;
+  short port;
+  LOG_DEBUG("connect to [%s].", strServerURL.c_str());
+  if (!UtilAll::SplitURL(strServerURL, hostname, port)) {
+    LOG_INFO("connect to [%s] failed, invalid url.", strServerURL.c_str());
+    return TCP_CONNECT_STATUS_FAILED;
   }
-}
 
-void TcpTransport::freeBufferEvent() {
-  if (m_bufferEvent) {
-    bufferevent_free(m_bufferEvent);
-    m_bufferEvent = NULL;
-  }
-  if (m_eventBase) {
-    event_base_free(m_eventBase);
-    m_eventBase = NULL;
-  }
-}
-
-void TcpTransport::exitBaseDispatch() {
-  if (m_eventBase) {
-    event_base_loopbreak(m_eventBase);
-    // event_base_loopexit(m_eventBase, NULL);  //Note: memory leak will be
-    // occured when timer callback was not done;
+  {
+    std::lock_guard<std::mutex> lock(m_eventLock);
+
+    struct sockaddr_in sin;
+    memset(&sin, 0, sizeof(sin));
+    sin.sin_family = AF_INET;
+    sin.sin_addr.s_addr = getInetAddr(hostname);
+    sin.sin_port = htons(port);
+
+    m_event.reset(EventLoop::GetDefaultEventLoop()->createBufferEvent(-1, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE));
+    m_event->setCallback(readNextMessageIntCallback, nullptr, eventCallback, shared_from_this());
+    m_event->setWatermark(EV_READ, 4, 0);
+    m_event->enable(EV_READ | EV_WRITE);
+
+    setTcpConnectStatus(TCP_CONNECT_STATUS_WAIT);
+    if (m_event->connect((struct sockaddr*)&sin, sizeof(sin)) < 0) {
+      LOG_INFO("connect to fd:%d failed", m_event->getfd());
+      freeBufferEvent();
+      setTcpConnectStatus(TCP_CONNECT_STATUS_FAILED);
+      return TCP_CONNECT_STATUS_FAILED;
+    }
   }
-}
 
-void TcpTransport::runThread() {
-  if (m_eventBase != NULL) {
-    if (!m_event_base_status) {
-      boost::mutex::scoped_lock lock(m_event_base_mtx);
-      m_event_base_status.store(true);
-      m_event_base_cv.notify_all();
-      LOG_INFO("Notify on event_base_dispatch");
-    }
-    event_base_dispatch(m_eventBase);
-    // event_base_loop(m_eventBase, EVLOOP_ONCE);  //EVLOOP_NONBLOCK should not
-    // be used, as could not callback event immediatly
+  if (timeoutMillis <= 0) {
+    LOG_INFO("try to connect to fd:%d, addr:%s", m_event->getfd(), hostname.c_str());
+    return TCP_CONNECT_STATUS_WAIT;
   }
-  LOG_INFO("event_base_dispatch exit once");
-  boost::this_thread::sleep(boost::posix_time::milliseconds(1));
-  if (getTcpConnectStatus() != e_connectSuccess)
-    return;
-}
 
-void TcpTransport::timeoutcb(evutil_socket_t fd, short what, void* arg) {
-  LOG_INFO("timeoutcb: received event:%d on fd:%d", what, fd);
-  TcpTransport* tcpTrans = (TcpTransport*)arg;
-  if (tcpTrans->getTcpConnectStatus() != e_connectSuccess) {
-    LOG_INFO("timeoutcb: after connect time, tcp was not established on fd:%d", fd);
-    tcpTrans->setTcpConnectStatus(e_connectFail);
-  } else {
-    LOG_INFO("timeoutcb: after connect time, tcp was established on fd:%d", fd);
+  TcpConnectStatus connectStatus = waitTcpConnectEvent(timeoutMillis);
+  if (connectStatus != TCP_CONNECT_STATUS_SUCCESS) {
+    LOG_WARN("can not connect to server:%s", strServerURL.c_str());
+
+    std::lock_guard<std::mutex> lock(m_eventLock);
+    freeBufferEvent();
+    setTcpConnectStatus(TCP_CONNECT_STATUS_FAILED);
+    return TCP_CONNECT_STATUS_FAILED;
   }
+
+  return TCP_CONNECT_STATUS_SUCCESS;
 }
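Under these semantics a caller blocks in connect() only when it passes a positive timeout; with timeoutMillis <= 0 it gets TCP_CONNECT_STATUS_WAIT back immediately and must check again later. A hypothetical caller fragment (readCallback, data and len are placeholders, not the client's actual variables; this is not compilable on its own):

    // Fragment only; assumes the surrounding client code provides readCallback.
    std::shared_ptr<TcpTransport> tcp = TcpTransport::CreateTransport(this, readCallback);
    TcpConnectStatus status = tcp->connect("127.0.0.1:10911", 3000);  // block up to 3s
    if (status != TCP_CONNECT_STATUS_SUCCESS) {
      LOG_WARN("connect to broker failed or timed out");
    } else {
      tcp->sendMessage(data, len);  // transport is ready for framed packets
    }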
 
-void TcpTransport::eventcb(struct bufferevent* bev, short what, void* ctx) {
-  evutil_socket_t fd = bufferevent_getfd(bev);
-  TcpTransport* tcpTrans = (TcpTransport*)ctx;
+void TcpTransport::eventCallback(BufferEvent* event, short what, TcpTransport* transport) {
+  socket_t fd = event->getfd();
   LOG_INFO("eventcb: received event:%x on fd:%d", what, fd);
   if (what & BEV_EVENT_CONNECTED) {
+    LOG_INFO("eventcb: connect to fd:%d successfully", fd);
+
+    // disable Nagle
     int val = 1;
-    setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char*)&val, sizeof(val));
-    LOG_INFO("eventcb:connect to fd:%d successfully", fd);
-    tcpTrans->setTcpConnectEvent(e_connectSuccess);
+    setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (void*)&val, sizeof(val));
+    transport->setTcpConnectEvent(TCP_CONNECT_STATUS_SUCCESS);
   } else if (what & (BEV_EVENT_ERROR | BEV_EVENT_EOF | BEV_EVENT_READING | BEV_EVENT_WRITING)) {
-    LOG_INFO("eventcb:rcv error event cb:%x on fd:%d", what, fd);
-    tcpTrans->setTcpConnectEvent(e_connectFail);
-    bufferevent_setcb(bev, NULL, NULL, NULL, NULL);
-    // bufferevent_disable(bev, EV_READ|EV_WRITE);
-    // bufferevent_free(bev);
+    LOG_INFO("eventcb: received error event cb:%x on fd:%d", what, fd);
+    // on error, stop further callbacks.
+    event->setCallback(nullptr, nullptr, nullptr, nullptr);
+    transport->setTcpConnectEvent(TCP_CONNECT_STATUS_FAILED);
   } else {
     LOG_ERROR("eventcb: received error event:%d on fd:%d", what, fd);
   }
 }
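Setting TCP_NODELAY right after BEV_EVENT_CONNECTED disables Nagle's algorithm, trading some bandwidth efficiency for latency so that small RPC frames leave immediately instead of being coalesced. The same call as a standalone helper:

    #include <cstdio>
    #include <netinet/in.h>
    #include <netinet/tcp.h>
    #include <sys/socket.h>

    // Disable Nagle's algorithm on a connected TCP socket so that small
    // request/response frames are sent immediately.
    int disableNagle(int fd) {
      int val = 1;
      if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (void*)&val, sizeof(val)) < 0) {
        perror("setsockopt(TCP_NODELAY)");
        return -1;
      }
      return 0;
    }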
 
-void TcpTransport::readNextMessageIntCallback(struct bufferevent* bev, void* ctx) {
+void TcpTransport::readNextMessageIntCallback(BufferEvent* event, TcpTransport* transport) {
   /* This callback is invoked when there is data to read on bev. */
 
   // protocol: <length> <header length> <header data> <body data>
-  //            1        2               3             4
+  //           1        2               3             4
   // rocketmq protocol contains 4 parts as following:
   //     1, big endian 4 bytes int, its length is sum of 2, 3 and 4
   //     2, big endian 4 bytes int, its length is 3
   //     3, use json to serialization data
-  //     4, application could self-defination binary data
+  //     4, application self-defined binary data
 
-  struct evbuffer* input = bufferevent_get_input(bev);
+  struct evbuffer* input = event->getInput();
   while (1) {
     struct evbuffer_iovec v[4];
     int n = evbuffer_peek(input, 4, NULL, v, sizeof(v) / sizeof(v[0]));
 
-    int idx = 0;
     char hdr[4];
     char* p = hdr;
-    unsigned int needed = 4;
+    size_t needed = 4;
 
-    for (idx = 0; idx < n; idx++) {
-      if (needed) {
-        unsigned int tmp = needed < v[idx].iov_len ? needed : v[idx].iov_len;
+    for (int idx = 0; idx < n; idx++) {
+      if (needed > 0) {
+        size_t tmp = needed < v[idx].iov_len ? needed : v[idx].iov_len;
         memcpy(p, v[idx].iov_base, tmp);
         p += tmp;
         needed -= tmp;
@@ -296,80 +237,54 @@
       }
     }
 
-    if (needed) {
-      LOG_DEBUG(" too little data received with sum = %d ", 4 - needed);
+    if (needed > 0) {
+      LOG_DEBUG("too little data received with sum = %d", 4 - needed);
       return;
     }
 
-    uint32 totalLenOfOneMsg = *(uint32*)hdr;  // first 4 bytes, which indicates 1st part of protocol
-    uint32 bytesInMessage = ntohl(totalLenOfOneMsg);
-    LOG_DEBUG("fd:%d, totalLen:" SIZET_FMT ", bytesInMessage:%d", bufferevent_getfd(bev), v[0].iov_len, bytesInMessage);
-    uint32 len = evbuffer_get_length(input);
-    if (len >= bytesInMessage + 4) {
-      LOG_DEBUG("had received all data with len:%d from fd:%d", len, bufferevent_getfd(bev));
+    uint32 totalLenOfOneMsg = *(uint32*)hdr;  // first 4 bytes, which indicates 1st part of protocol
+    uint32 msgLen = ntohl(totalLenOfOneMsg);
+    size_t recvLen = evbuffer_get_length(input);
+    if (recvLen >= msgLen + 4) {
+      LOG_DEBUG("had received all data. msgLen:%d, from:%d, recvLen:%d", msgLen, event->getfd(), recvLen);
     } else {
-      LOG_DEBUG("didn't received whole bytesInMessage:%d, from fd:%d, totalLen:%d", bytesInMessage,
-                bufferevent_getfd(bev), len);
+      LOG_DEBUG("didn't receive whole message. msgLen:%d, from:%d, recvLen:%d", msgLen, event->getfd(), recvLen);
       return;  // consider large data which was not received completely by now
     }
 
-    if (bytesInMessage > 0) {
-      MemoryBlock messageData(bytesInMessage, true);
-      uint32 bytesRead = 0;
-      char* data = messageData.getData() + bytesRead;
-      bufferevent_read(bev, data, 4);
-      bytesRead = bufferevent_read(bev, data, bytesInMessage);
+    if (msgLen > 0) {
+      MemoryBlock msg(msgLen, true);
 
-      TcpTransport* tcpTrans = (TcpTransport*)ctx;
-      tcpTrans->messageReceived(messageData);
+      event->read(hdr, 4);  // skip the 4-byte length field
+      size_t bytesRead = event->read(msg.getData(), msgLen);
+
+      transport->messageReceived(msg, event->getPeerAddrPort());
     }
   }
 }
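Because the first field is a 4-byte big-endian total length covering parts 2 through 4, a reader can decide from four bytes alone whether a complete frame is buffered, which is exactly what the peek/ntohl logic above does. A self-contained sketch of the same check (completeFrameSize is a hypothetical helper):

    #include <arpa/inet.h>  // ntohl
    #include <cstdint>
    #include <cstring>

    // Returns the total frame size (4-byte length field + payload) if the buffer
    // holds at least one complete frame, or 0 if more bytes are needed.
    // Layout: | 4-byte big-endian total length | 4-byte header length | JSON header | body |
    size_t completeFrameSize(const char* buf, size_t len) {
      if (len < 4)
        return 0;  // the length field itself is not complete yet
      uint32_t msgLen;
      std::memcpy(&msgLen, buf, 4);  // avoid an unaligned *(uint32_t*) access
      msgLen = ntohl(msgLen);        // wire format is big endian
      if (len < msgLen + 4)
        return 0;  // frame still incomplete, wait for more data
      return msgLen + 4;
    }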
 
-bool TcpTransport::sendMessage(const char* pData, int len) {
-  boost::lock_guard<boost::mutex> lock(m_socketLock);
-  if (getTcpConnectStatus() != e_connectSuccess) {
-    return false;
-  }
-
-  int bytes_left = len;
-  int bytes_written = 0;
-  const char* ptr = pData;
-
-  /*NOTE:
-      1. do not need to consider large data which could not send by once, as
-      bufferevent could handle this case;
-  */
-  if (m_bufferEvent) {
-    bytes_written = bufferevent_write(m_bufferEvent, ptr, bytes_left);
-    if (bytes_written == 0)
-      return true;
-    else
-      return false;
+void TcpTransport::messageReceived(const MemoryBlock& mem, const std::string& addr) {
+  if (m_readCallback != nullptr) {
+    m_readCallback(m_tcpRemotingClient, mem, addr);
   }
-  return false;
 }
 
-void TcpTransport::messageReceived(const MemoryBlock& mem) {
-  if (m_readcallback) {
-    m_readcallback(m_tcpRemotingClient, mem, getPeerAddrAndPort());
+bool TcpTransport::sendMessage(const char* pData, size_t len) {
+  std::lock_guard<std::mutex> lock(m_eventLock);
+  if (getTcpConnectStatus() != TCP_CONNECT_STATUS_SUCCESS) {
+    return false;
   }
+
+  /* NOTE:
+      do not need to consider large data which could not be sent at once, as
+      bufferevent could handle this case;
+  */
+  return m_event != nullptr && m_event->write(pData, len) == 0;
 }
 
 const string TcpTransport::getPeerAddrAndPort() {
-  struct sockaddr_in broker;
-  socklen_t cLen = sizeof(broker);
-
-  // getsockname(m_socket->getRawSocketHandle(), (struct sockaddr*)&s, &sLen);  // ! use connectSock here.
-  getpeername(bufferevent_getfd(m_bufferEvent), (struct sockaddr*)&broker, &cLen);  // ! use connectSock here.
-  LOG_DEBUG("broker addr: %s, broker port: %d", inet_ntoa(broker.sin_addr), ntohs(broker.sin_port));
-  string brokerAddr(inet_ntoa(broker.sin_addr));
-  brokerAddr.append(":");
-  string brokerPort(UtilAll::to_string(ntohs(broker.sin_port)));
-  brokerAddr.append(brokerPort);
-  LOG_DEBUG("brokerAddr:%s", brokerAddr.c_str());
-  return brokerAddr;
+  std::lock_guard<std::mutex> lock(m_eventLock);
+  return m_event ? m_event->getPeerAddrPort() : "";
 }
 
 const uint64_t TcpTransport::getStartTime() const {
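sendMessage expects a fully framed packet and hands the whole thing to the bufferevent in one write. Composing such a frame per the 4-part protocol described above could look like the following sketch (encodeFrame is a hypothetical helper, not the client's RemotingCommand encoder):

    #include <arpa/inet.h>  // htonl
    #include <cstdint>
    #include <cstring>
    #include <string>

    // Build one wire frame: | total length | header length | JSON header | body |
    // where both length fields are 4-byte big-endian ints and "total" covers
    // everything after itself (header length field + header + body).
    std::string encodeFrame(const std::string& jsonHeader, const std::string& body) {
      uint32_t headerLen = htonl(static_cast<uint32_t>(jsonHeader.size()));
      uint32_t totalLen = htonl(static_cast<uint32_t>(4 + jsonHeader.size() + body.size()));

      std::string frame;
      frame.reserve(8 + jsonHeader.size() + body.size());
      frame.append(reinterpret_cast<const char*>(&totalLen), 4);
      frame.append(reinterpret_cast<const char*>(&headerLen), 4);
      frame.append(jsonHeader);
      frame.append(body);
      return frame;  // pass frame.data()/frame.size() to sendMessage()
    }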
diff --git a/src/transport/TcpTransport.h b/src/transport/TcpTransport.h
old mode 100644
new mode 100755
index cda03ca78..bff23ddfc
--- a/src/transport/TcpTransport.h
+++ b/src/transport/TcpTransport.h
@@ -17,73 +17,77 @@
 #ifndef __TCPTRANSPORT_H__
 #define __TCPTRANSPORT_H__
 
-#include
-#include
-#include
-#include
-#include "dataBlock.h"
+#include <atomic>
+#include <memory>
+#include <mutex>
 
-extern "C" {
-#include "event2/buffer.h"
-#include "event2/bufferevent.h"
-#include "event2/event.h"
-#include "event2/thread.h"
-}
+#include "EventLoop.h"
+#include "dataBlock.h"
 
 namespace rocketmq {
 
+//
+class TcpTransport : public std::enable_shared_from_this<TcpTransport> {
  public:
-  TcpTransport(TcpRemotingClient* pTcpRemointClient, READ_CALLBACK handle = NULL);
+  static std::shared_ptr<TcpTransport> CreateTransport(TcpRemotingClient* pTcpRemotingClient,
+                                                       TcpTransportReadCallback handle = nullptr) {
+    // transport must be managed by smart pointer
+    std::shared_ptr<TcpTransport> transport(new TcpTransport(pTcpRemotingClient, handle));
+    return transport;
+  }
+
   virtual ~TcpTransport();
 
-  tcpConnectStatus connect(const std::string& strServerURL, int timeOutMillisecs = 3000);
   void disconnect(const std::string& addr);
-  tcpConnectStatus waitTcpConnectEvent(int timeoutMillisecs = 3000);
-  void setTcpConnectStatus(tcpConnectStatus connectStatus);
-  tcpConnectStatus getTcpConnectStatus();
-  bool sendMessage(const char* pData, int len);
+  TcpConnectStatus connect(const std::string& strServerURL, int timeoutMillis = 3000);
+  TcpConnectStatus waitTcpConnectEvent(int timeoutMillis = 3000);
+  TcpConnectStatus getTcpConnectStatus();
+
+  bool sendMessage(const char* pData, size_t len);
   const std::string getPeerAddrAndPort();
   const uint64_t getStartTime() const;
 
  private:
-  void messageReceived(const MemoryBlock& mem);
-  static void readNextMessageIntCallback(struct bufferevent* bev, void* ctx);
-  static void eventcb(struct bufferevent* bev, short what, void* ctx);
-  static void timeoutcb(evutil_socket_t fd, short what, void* arg);
-  void runThread();
-  void clearBufferEventCallback();
-  void freeBufferEvent();
-  void exitBaseDispatch();
-  void setTcpConnectEvent(tcpConnectStatus connectStatus);
+  TcpTransport(TcpRemotingClient* pTcpRemotingClient, TcpTransportReadCallback handle = nullptr);
+
+  static void readNextMessageIntCallback(BufferEvent* event, TcpTransport* transport);
+  static void eventCallback(BufferEvent* event, short what, TcpTransport* transport);
+
+  void messageReceived(const MemoryBlock& mem, const std::string& addr);
+  void freeBufferEvent();  // not thread-safe
+
+  void setTcpConnectEvent(TcpConnectStatus connectStatus);
+  void setTcpConnectStatus(TcpConnectStatus connectStatus);
+
   u_long getInetAddr(std::string& hostname);
 
 private:
   uint64_t m_startTime;
-  boost::mutex m_socketLock;
-  struct event_base* m_eventBase;
-  struct bufferevent* m_bufferEvent;
-  boost::atomic<tcpConnectStatus> m_tcpConnectStatus;
-  boost::mutex m_connectEventLock;
-  boost::condition_variable_any m_connectEvent;
-  boost::atomic<bool> m_event_base_status;
-  boost::mutex m_event_base_mtx;
-  boost::condition_variable_any m_event_base_cv;
+  std::shared_ptr<BufferEvent> m_event;  // NOTE: using m_event inside callbacks is unsafe.
+  std::mutex m_eventLock;
+  std::atomic<TcpConnectStatus> m_tcpConnectStatus;
 
-  //