From de9ea92898f7efa5a28abf73ae4161d97742f700 Mon Sep 17 00:00:00 2001 From: laohu <2372554140@qq.com> Date: Sun, 6 Jan 2019 21:46:44 +0800 Subject: [PATCH 1/8] asyn --- example/Producer.c | 29 ++++++++++++++++++++++++--- include/CProducer.h | 3 +++ src/extern/CProducer.cpp | 42 +++++++++++++++++++++++++++++++++++++++- 3 files changed, 70 insertions(+), 4 deletions(-) diff --git a/example/Producer.c b/example/Producer.c index cef8383cb..9a9912d23 100644 --- a/example/Producer.c +++ b/example/Producer.c @@ -57,16 +57,39 @@ void startSendMessage(CProducer *producer) { } } +void sendSuccessCallback(CSendResult *result){ + printf("Msg Send ID:%s\n", result->msgId); +} + +void sendExceptionCallback(char *exceptionInfo){ + +} + +void startSendMessageAsync(CProducer *producer){ + int i = 0; + char DestMsg[256]; + CMessage *msg = CreateMessage("T_TestTopic"); + SetMessageTags(msg, "Test_Tag"); + SetMessageKeys(msg, "Test_Keys"); + for (i = 0; i < 10; i++) { + printf("send one message : %d\n", i); + memset(DestMsg, 0, sizeof(DestMsg)); + snprintf(DestMsg, 255, "New message body: index %d", i); + SetMessageBody(msg, DestMsg); + SendMessageAsync(producer, msg, sendSuccessCallback , sendExceptionCallback); + thread_sleep(1000); + } +} int main(int argc, char *argv[]) { printf("Producer Initializing.....\n"); CProducer *producer = CreateProducer("Group_producer"); - SetProducerNameServerAddress(producer, "172.17.0.2:9876"); + SetProducerNameServerAddress(producer, "127.0.0.1:9876"); StartProducer(producer); printf("Producer start.....\n"); - startSendMessage(producer); - + //startSendMessage(producer); + startSendMessageAsync(producer); ShutdownProducer(producer); DestroyProducer(producer); printf("Producer Shutdown!\n"); diff --git a/include/CProducer.h b/include/CProducer.h index 6edd99e53..1b0229ce2 100644 --- a/include/CProducer.h +++ b/include/CProducer.h @@ -29,6 +29,8 @@ extern "C" { //typedef struct _CProducer_ _CProducer; typedef struct CProducer CProducer; typedef int(*QueueSelectorCallback)(int size, CMessage *msg, void *arg); +typedef void(*CSendSuccessCallback)(CSendResult *result); +typedef void(*CSendExceptionCallback)(char *exceptionInfo); ROCKETMQCLIENT_API CProducer *CreateProducer(const char *groupId); ROCKETMQCLIENT_API int DestroyProducer(CProducer *producer); @@ -49,6 +51,7 @@ ROCKETMQCLIENT_API int SetProducerCompressLevel(CProducer *producer, int level); ROCKETMQCLIENT_API int SetProducerMaxMessageSize(CProducer *producer, int size); ROCKETMQCLIENT_API int SendMessageSync(CProducer *producer, CMessage *msg, CSendResult *result); +ROCKETMQCLIENT_API int SendMessageAsync(CProducer *producer, CMessage *msg, CSendSuccessCallback cSendSuccessCallback , CSendExceptionCallback cSendExceptionCallback); ROCKETMQCLIENT_API int SendMessageOneway(CProducer *producer,CMessage *msg); ROCKETMQCLIENT_API int SendMessageOrderly(CProducer *producer, CMessage *msg, QueueSelectorCallback callback, void *arg, int autoRetryTimes, CSendResult *result); #ifdef __cplusplus diff --git a/src/extern/CProducer.cpp b/src/extern/CProducer.cpp index 0715942ac..a08b0e96c 100644 --- a/src/extern/CProducer.cpp +++ b/src/extern/CProducer.cpp @@ -16,11 +16,15 @@ */ #include "DefaultMQProducer.h" +#include "AsyncCallback.h" + #include "CProducer.h" #include "CCommon.h" -#include +#include "CSendResult.h" #include "CMessage.h" +#include + #ifdef __cplusplus extern "C" { #endif @@ -45,6 +49,27 @@ class SelectMessageQueue : public MessageQueueSelector { QueueSelectorCallback m_pCallback; }; +class CSendCallback : public SendCallback{ +public: + CSendCallback(CSendSuccessCallback cSendSuccessCallback ,CSendExceptionCallback cSendExceptionCallback){ + m_cSendSuccessCallback = cSendSuccessCallback; + m_cSendExceptionCallback= cSendExceptionCallback; + } + + virtual void onSuccess(SendResult& sendResult) { + CSendResult *result; + result->sendStatus = CSendStatus((int) sendResult.getSendStatus()); + result->offset = sendResult.getQueueOffset(); + strncpy(result->msgId, sendResult.getMsgId().c_str(), MAX_MESSAGE_ID_LENGTH - 1); + result->msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0; + m_cSendSuccessCallback( result); + } + virtual void onException(MQException& e) { cout << "send Exception\n"; } +private: + CSendSuccessCallback m_cSendSuccessCallback; + CSendExceptionCallback m_cSendExceptionCallback; +}; + CProducer *CreateProducer(const char *groupId) { if (groupId == NULL) { @@ -127,6 +152,21 @@ int SendMessageSync(CProducer *producer, CMessage *msg, CSendResult *result) { return OK; } +int SendMessageAsync(CProducer *producer, CMessage *msg, CSendSuccessCallback cSendSuccessCallback,CSendExceptionCallback cSendExceptionCallback){ + if (producer == NULL || msg == NULL || cSendSuccessCallback ==NULL || cSendExceptionCallback==NULL) { + return NULL_POINTER; + } + DefaultMQProducer *defaultMQProducer = (DefaultMQProducer *) producer; + MQMessage *message = (MQMessage *) msg; + CSendCallback cSendCallback(cSendSuccessCallback, cSendExceptionCallback); + try { + defaultMQProducer->send(*message ,&cSendCallback,true); + } catch (exception &e) { + return PRODUCER_SEND_ONEWAY_FAILED; + } + return OK; +} + int SendMessageOneway(CProducer *producer, CMessage *msg) { if (producer == NULL || msg == NULL) { return NULL_POINTER; From e0779faec8456fe6e1d83254dcd37cf1ff8d8530 Mon Sep 17 00:00:00 2001 From: laohu <2372554140@qq.com> Date: Mon, 7 Jan 2019 19:16:04 +0800 Subject: [PATCH 2/8] pure virtual method called --- example/Producer.c | 26 +---------------------- include/CProducer.h | 2 +- src/extern/CProducer.cpp | 28 ++++++++++++------------ test/src/UrlTest.cpp | 46 ++++++++++++++++++++++++++++++++++++++-- 4 files changed, 61 insertions(+), 41 deletions(-) diff --git a/example/Producer.c b/example/Producer.c index 9a9912d23..5feedd68e 100644 --- a/example/Producer.c +++ b/example/Producer.c @@ -57,29 +57,6 @@ void startSendMessage(CProducer *producer) { } } -void sendSuccessCallback(CSendResult *result){ - printf("Msg Send ID:%s\n", result->msgId); -} - -void sendExceptionCallback(char *exceptionInfo){ - -} - -void startSendMessageAsync(CProducer *producer){ - int i = 0; - char DestMsg[256]; - CMessage *msg = CreateMessage("T_TestTopic"); - SetMessageTags(msg, "Test_Tag"); - SetMessageKeys(msg, "Test_Keys"); - for (i = 0; i < 10; i++) { - printf("send one message : %d\n", i); - memset(DestMsg, 0, sizeof(DestMsg)); - snprintf(DestMsg, 255, "New message body: index %d", i); - SetMessageBody(msg, DestMsg); - SendMessageAsync(producer, msg, sendSuccessCallback , sendExceptionCallback); - thread_sleep(1000); - } -} int main(int argc, char *argv[]) { printf("Producer Initializing.....\n"); @@ -88,8 +65,7 @@ int main(int argc, char *argv[]) { SetProducerNameServerAddress(producer, "127.0.0.1:9876"); StartProducer(producer); printf("Producer start.....\n"); - //startSendMessage(producer); - startSendMessageAsync(producer); + startSendMessage(producer); ShutdownProducer(producer); DestroyProducer(producer); printf("Producer Shutdown!\n"); diff --git a/include/CProducer.h b/include/CProducer.h index 1b0229ce2..2b7bafb26 100644 --- a/include/CProducer.h +++ b/include/CProducer.h @@ -29,7 +29,7 @@ extern "C" { //typedef struct _CProducer_ _CProducer; typedef struct CProducer CProducer; typedef int(*QueueSelectorCallback)(int size, CMessage *msg, void *arg); -typedef void(*CSendSuccessCallback)(CSendResult *result); +typedef void(*CSendSuccessCallback)(CSendResult result); typedef void(*CSendExceptionCallback)(char *exceptionInfo); ROCKETMQCLIENT_API CProducer *CreateProducer(const char *groupId); diff --git a/src/extern/CProducer.cpp b/src/extern/CProducer.cpp index a08b0e96c..015b87ce8 100644 --- a/src/extern/CProducer.cpp +++ b/src/extern/CProducer.cpp @@ -51,21 +51,21 @@ class SelectMessageQueue : public MessageQueueSelector { class CSendCallback : public SendCallback{ public: - CSendCallback(CSendSuccessCallback cSendSuccessCallback ,CSendExceptionCallback cSendExceptionCallback){ - m_cSendSuccessCallback = cSendSuccessCallback; - m_cSendExceptionCallback= cSendExceptionCallback; - } virtual void onSuccess(SendResult& sendResult) { - CSendResult *result; - result->sendStatus = CSendStatus((int) sendResult.getSendStatus()); - result->offset = sendResult.getQueueOffset(); - strncpy(result->msgId, sendResult.getMsgId().c_str(), MAX_MESSAGE_ID_LENGTH - 1); - result->msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0; - m_cSendSuccessCallback( result); + cout << "send onSuccess\n"; + CSendResult result; + result.sendStatus = CSendStatus((int) sendResult.getSendStatus()); + result.offset = sendResult.getQueueOffset(); + strncpy(result.msgId, sendResult.getMsgId().c_str(), MAX_MESSAGE_ID_LENGTH - 1); + result.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0; + m_cSendSuccessCallback(result); + } - virtual void onException(MQException& e) { cout << "send Exception\n"; } -private: + virtual void onException(MQException& e) { + cout << "send Exception\n"; + } +public: CSendSuccessCallback m_cSendSuccessCallback; CSendExceptionCallback m_cSendExceptionCallback; }; @@ -158,7 +158,9 @@ int SendMessageAsync(CProducer *producer, CMessage *msg, CSendSuccessCallback cS } DefaultMQProducer *defaultMQProducer = (DefaultMQProducer *) producer; MQMessage *message = (MQMessage *) msg; - CSendCallback cSendCallback(cSendSuccessCallback, cSendExceptionCallback); + CSendCallback cSendCallback; + cSendCallback.m_cSendSuccessCallback = cSendSuccessCallback; + cSendCallback.m_cSendExceptionCallback = cSendExceptionCallback; try { defaultMQProducer->send(*message ,&cSendCallback,true); } catch (exception &e) { diff --git a/test/src/UrlTest.cpp b/test/src/UrlTest.cpp index c7dead00c..9b5d483cc 100644 --- a/test/src/UrlTest.cpp +++ b/test/src/UrlTest.cpp @@ -20,6 +20,14 @@ #include "gtest/gtest.h" #include "gmock/gmock.h" +#include + +#include "CProducer.h" +#include "CCommon.h" +#include "CMessage.h" +#include "CSendResult.h" +#include + using namespace std; using ::testing::InitGoogleTest; using ::testing::InitGoogleMock; @@ -53,9 +61,43 @@ TEST(Url, Url) { } + +void sendSuccessCallback(CSendResult result){ + printf("Msg Send ID:%d\n", result.offset); +} + +void sendExceptionCallback(char *exceptionInfo){ + +} + +TEST(Producer, asynSend){ + CProducer *producer = CreateProducer("testGroup"); + SetProducerNameServerAddress(producer, "172.17.0.1:9876"); + StartProducer(producer); + printf("Producer start.....\n"); + + int i = 0; + char DestMsg[256]; + CMessage *msg = CreateMessage("test"); + SetMessageTags(msg, "Test_Tag"); + SetMessageKeys(msg, "Test_Keys"); + printf("send one message : %d\n", i); + memset(DestMsg, 0, sizeof(DestMsg)); + snprintf(DestMsg, 255, "New message body: index %d", 1); + SetMessageBody(msg, DestMsg); + SendMessageAsync(producer, msg, sendSuccessCallback , sendExceptionCallback); + usleep(100 * 1000); + ShutdownProducer(producer); + DestroyProducer(producer); + printf("Producer Shutdown!\n"); +} + + int main(int argc, char* argv[]) { InitGoogleMock(&argc, argv); - testing::GTEST_FLAG(filter) = "Url.Url"; - return RUN_ALL_TESTS(); + testing::GTEST_FLAG(filter) = "Producer.asynSend"; + int itestts = RUN_ALL_TESTS(); + printf("i %d" , itestts); + return itestts; } From 04a708d6f7fd17649de65dc065e0f95aab790ef3 Mon Sep 17 00:00:00 2001 From: laohu <2372554140@qq.com> Date: Wed, 9 Jan 2019 16:55:06 +0800 Subject: [PATCH 3/8] complete --- example/CAsyncProducer.c | 92 ++++++++++++++++++++++++++++++++++++++++ include/CProducer.h | 2 +- src/extern/CProducer.cpp | 26 +++++++----- test/src/UrlTest.cpp | 9 ++-- 4 files changed, 113 insertions(+), 16 deletions(-) create mode 100644 example/CAsyncProducer.c diff --git a/example/CAsyncProducer.c b/example/CAsyncProducer.c new file mode 100644 index 000000000..52f9edc78 --- /dev/null +++ b/example/CAsyncProducer.c @@ -0,0 +1,92 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include + +#include "CProducer.h" +#include "CCommon.h" +#include "CMessage.h" +#include "CSendResult.h" + +#ifdef _WIN32 +#include +#else + +#include +#include + +#endif + +void thread_sleep(unsigned milliseconds) { +#ifdef _WIN32 + Sleep(milliseconds); +#else + usleep(milliseconds * 1000); // takes microseconds +#endif +} + +void sendSuccessCallback(CSendResult result){ + printf("Msg Send ID:%s\n", result.msgId); +} + +void sendExceptionCallback(const char* exceptionInfo){ + printf("asyn send exception info : %s\n" , exceptionInfo); +} + +void startSendMessage(CProducer *producer) { + int i = 0; + char DestMsg[256]; + CMessage *msg = CreateMessage("T_TestTopic"); + SetMessageTags(msg, "Test_Tag"); + SetMessageKeys(msg, "Test_Keys"); + CSendResult result; + for (i = 0; i < 10; i++) { + printf("send one message : %d\n", i); + memset(DestMsg, 0, sizeof(DestMsg)); + snprintf(DestMsg, 255, "New message body: index %d", i); + SetMessageBody(msg, DestMsg); + SendMessageAsync(producer, msg, sendSuccessCallback , sendExceptionCallback); + printf("Msg Send ID:%s\n", result.msgId); + thread_sleep(1000); + } +} + +void CreateProducerAndStartSendMessage(int i){ + printf("Producer Initializing.....\n"); + CProducer *producer = CreateProducer("Group_producer"); + SetProducerNameServerAddress(producer, "127.0.0.1:9876"); + if(i == 1){ + SetProducerSendMsgTimeout(producer , 3); + } + StartProducer(producer); + printf("Producer start.....\n"); + startSendMessage(producer); + ShutdownProducer(producer); + DestroyProducer(producer); + printf("Producer Shutdown!\n"); +} + +int main(int argc, char *argv[]) { + printf("Send Async successCallback.....\n"); + CreateProducerAndStartSendMessage(0); + + printf("Send Async exceptionCallback.....\n"); + CreateProducerAndStartSendMessage(1); + + return 0; +} + diff --git a/include/CProducer.h b/include/CProducer.h index 2b7bafb26..c7cee64c4 100644 --- a/include/CProducer.h +++ b/include/CProducer.h @@ -30,7 +30,7 @@ extern "C" { typedef struct CProducer CProducer; typedef int(*QueueSelectorCallback)(int size, CMessage *msg, void *arg); typedef void(*CSendSuccessCallback)(CSendResult result); -typedef void(*CSendExceptionCallback)(char *exceptionInfo); +typedef void(*CSendExceptionCallback)(const char* exceptionInfo); ROCKETMQCLIENT_API CProducer *CreateProducer(const char *groupId); ROCKETMQCLIENT_API int DestroyProducer(CProducer *producer); diff --git a/src/extern/CProducer.cpp b/src/extern/CProducer.cpp index 015b87ce8..363ea5ed6 100644 --- a/src/extern/CProducer.cpp +++ b/src/extern/CProducer.cpp @@ -49,9 +49,13 @@ class SelectMessageQueue : public MessageQueueSelector { QueueSelectorCallback m_pCallback; }; -class CSendCallback : public SendCallback{ +class CSendCallback : public AutoDeleteSendCallBack{ public: - + CSendCallback(CSendSuccessCallback cSendSuccessCallback,CSendExceptionCallback cSendExceptionCallback){ + m_cSendSuccessCallback = cSendSuccessCallback; + m_cSendExceptionCallback = cSendExceptionCallback; + } + virtual ~CSendCallback(){} virtual void onSuccess(SendResult& sendResult) { cout << "send onSuccess\n"; CSendResult result; @@ -62,10 +66,11 @@ class CSendCallback : public SendCallback{ m_cSendSuccessCallback(result); } - virtual void onException(MQException& e) { - cout << "send Exception\n"; - } -public: + virtual void onException(MQException& e) { + m_cSendExceptionCallback( e.what() ); + + } +private: CSendSuccessCallback m_cSendSuccessCallback; CSendExceptionCallback m_cSendExceptionCallback; }; @@ -153,16 +158,15 @@ int SendMessageSync(CProducer *producer, CMessage *msg, CSendResult *result) { } int SendMessageAsync(CProducer *producer, CMessage *msg, CSendSuccessCallback cSendSuccessCallback,CSendExceptionCallback cSendExceptionCallback){ - if (producer == NULL || msg == NULL || cSendSuccessCallback ==NULL || cSendExceptionCallback==NULL) { + if (producer == NULL || msg == NULL || cSendSuccessCallback == NULL || cSendExceptionCallback == NULL) { return NULL_POINTER; } DefaultMQProducer *defaultMQProducer = (DefaultMQProducer *) producer; MQMessage *message = (MQMessage *) msg; - CSendCallback cSendCallback; - cSendCallback.m_cSendSuccessCallback = cSendSuccessCallback; - cSendCallback.m_cSendExceptionCallback = cSendExceptionCallback; + CSendCallback* cSendCallback = new CSendCallback(cSendSuccessCallback , cSendExceptionCallback); + try { - defaultMQProducer->send(*message ,&cSendCallback,true); + defaultMQProducer->send(*message ,cSendCallback); } catch (exception &e) { return PRODUCER_SEND_ONEWAY_FAILED; } diff --git a/test/src/UrlTest.cpp b/test/src/UrlTest.cpp index 9b5d483cc..a0414f19e 100644 --- a/test/src/UrlTest.cpp +++ b/test/src/UrlTest.cpp @@ -63,16 +63,17 @@ TEST(Url, Url) { void sendSuccessCallback(CSendResult result){ - printf("Msg Send ID:%d\n", result.offset); + printf("Msg Send ID:%s\n", result.msgId); } -void sendExceptionCallback(char *exceptionInfo){ - +void sendExceptionCallback(const char* exceptionInfo){ + printf("asyn send exception info : %s\n" , exceptionInfo); } TEST(Producer, asynSend){ CProducer *producer = CreateProducer("testGroup"); - SetProducerNameServerAddress(producer, "172.17.0.1:9876"); + SetProducerNameServerAddress(producer, "127.0.0.1:9876"); + SetProducerSendMsgTimeout(producer , 3); StartProducer(producer); printf("Producer start.....\n"); From 6a9776ba25bebf3deaa00f437d7ebf2db932af72 Mon Sep 17 00:00:00 2001 From: laohu <2372554140@qq.com> Date: Thu, 10 Jan 2019 15:18:35 +0800 Subject: [PATCH 4/8] add CMQException --- example/CAsyncProducer.c | 7 +++++-- include/CMQException.h | 39 +++++++++++++++++++++++++++++++++++++ include/CProducer.h | 3 ++- include/MQClientException.h | 10 ++++++++++ src/extern/CProducer.cpp | 9 +++++++-- test/src/UrlTest.cpp | 34 ++------------------------------ 6 files changed, 65 insertions(+), 37 deletions(-) create mode 100644 include/CMQException.h diff --git a/example/CAsyncProducer.c b/example/CAsyncProducer.c index 52f9edc78..c5b24488a 100644 --- a/example/CAsyncProducer.c +++ b/example/CAsyncProducer.c @@ -43,8 +43,11 @@ void sendSuccessCallback(CSendResult result){ printf("Msg Send ID:%s\n", result.msgId); } -void sendExceptionCallback(const char* exceptionInfo){ - printf("asyn send exception info : %s\n" , exceptionInfo); +void sendExceptionCallback(CMQException e){ + printf("asyn send exception error : %d\n" , e.error); + printf("asyn send exception msg : %s\n" , e.msg); + printf("asyn send exception file : %s\n" , e.file); + printf("asyn send exception line : %d\n" , e.line); } void startSendMessage(CProducer *producer) { diff --git a/include/CMQException.h b/include/CMQException.h new file mode 100644 index 000000000..13df478c8 --- /dev/null +++ b/include/CMQException.h @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#ifndef __C_MQEXCPTION_H__ +#define __C_MQEXCPTION_H__ +#include "CCommon.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct _CMQException_{ + int error; + int line; + char file[MAX_MESSAGE_ID_LENGTH]; + char msg[MAX_MESSAGE_ID_LENGTH]; + char type[MAX_MESSAGE_ID_LENGTH]; + +} CMQException; + +#ifdef __cplusplus +}; +#endif +#endif diff --git a/include/CProducer.h b/include/CProducer.h index c7cee64c4..aa54a0ba6 100644 --- a/include/CProducer.h +++ b/include/CProducer.h @@ -21,6 +21,7 @@ #include "CCommon.h" #include "CMessage.h" #include "CSendResult.h" +#include "CMQException.h" #ifdef __cplusplus extern "C" { @@ -30,7 +31,7 @@ extern "C" { typedef struct CProducer CProducer; typedef int(*QueueSelectorCallback)(int size, CMessage *msg, void *arg); typedef void(*CSendSuccessCallback)(CSendResult result); -typedef void(*CSendExceptionCallback)(const char* exceptionInfo); +typedef void(*CSendExceptionCallback)(CMQException e); ROCKETMQCLIENT_API CProducer *CreateProducer(const char *groupId); ROCKETMQCLIENT_API int DestroyProducer(CProducer *producer); diff --git a/include/MQClientException.h b/include/MQClientException.h index bf29863a8..8b1b425b4 100755 --- a/include/MQClientException.h +++ b/include/MQClientException.h @@ -21,12 +21,17 @@ #include #include #include + +#include #include "RocketMQClient.h" +#include "CCommon.h" + namespace rocketmq { // @@ -67,8 +68,12 @@ class CSendCallback : public AutoDeleteSendCallBack{ } virtual void onException(MQException& e) { - m_cSendExceptionCallback( e.what() ); - + CMQException exception; + exception.error = e.GetError(); + exception.line = e.GetLine(); + strncpy(exception.msg, e.what(), MAX_MESSAGE_ID_LENGTH - 1); + strncpy(exception.file, e.GetFile(), MAX_MESSAGE_ID_LENGTH - 1); + m_cSendExceptionCallback( exception ); } private: CSendSuccessCallback m_cSendSuccessCallback; diff --git a/test/src/UrlTest.cpp b/test/src/UrlTest.cpp index a0414f19e..e310cf44b 100644 --- a/test/src/UrlTest.cpp +++ b/test/src/UrlTest.cpp @@ -26,6 +26,7 @@ #include "CCommon.h" #include "CMessage.h" #include "CSendResult.h" +#include "CMQException.h" #include using namespace std; @@ -62,42 +63,11 @@ TEST(Url, Url) { } -void sendSuccessCallback(CSendResult result){ - printf("Msg Send ID:%s\n", result.msgId); -} - -void sendExceptionCallback(const char* exceptionInfo){ - printf("asyn send exception info : %s\n" , exceptionInfo); -} - -TEST(Producer, asynSend){ - CProducer *producer = CreateProducer("testGroup"); - SetProducerNameServerAddress(producer, "127.0.0.1:9876"); - SetProducerSendMsgTimeout(producer , 3); - StartProducer(producer); - printf("Producer start.....\n"); - - int i = 0; - char DestMsg[256]; - CMessage *msg = CreateMessage("test"); - SetMessageTags(msg, "Test_Tag"); - SetMessageKeys(msg, "Test_Keys"); - printf("send one message : %d\n", i); - memset(DestMsg, 0, sizeof(DestMsg)); - snprintf(DestMsg, 255, "New message body: index %d", 1); - SetMessageBody(msg, DestMsg); - SendMessageAsync(producer, msg, sendSuccessCallback , sendExceptionCallback); - usleep(100 * 1000); - ShutdownProducer(producer); - DestroyProducer(producer); - printf("Producer Shutdown!\n"); -} - int main(int argc, char* argv[]) { InitGoogleMock(&argc, argv); - testing::GTEST_FLAG(filter) = "Producer.asynSend"; + testing::GTEST_FLAG(filter) = "Url.Url"; int itestts = RUN_ALL_TESTS(); printf("i %d" , itestts); return itestts; From 1541a5ddaf411d1c28ca825d70cafb71a1bb2546 Mon Sep 17 00:00:00 2001 From: laohu <2372554140@qq.com> Date: Thu, 10 Jan 2019 19:44:34 +0800 Subject: [PATCH 5/8] BROADCASTING conflict --- example/PushConsumer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/example/PushConsumer.cpp b/example/PushConsumer.cpp index d5ce02171..119b7f267 100755 --- a/example/PushConsumer.cpp +++ b/example/PushConsumer.cpp @@ -81,7 +81,7 @@ int main(int argc, char *argv[]) { if (info.syncpush) consumer.setAsyncPull(false); // set sync pull if (info.broadcasting) { - consumer.setMessageModel(BROADCASTING); + consumer.setMessageModel(rocketmq::BROADCASTING); } consumer.setInstanceName(info.groupname); From dd4ddc9ae8bb0504b7ab403499e4367dc35b0ba0 Mon Sep 17 00:00:00 2001 From: laohu <2372554140@qq.com> Date: Fri, 11 Jan 2019 14:51:38 +0800 Subject: [PATCH 6/8] exception delete cSendCallback --- include/CMQException.h | 8 +++++--- src/extern/CProducer.cpp | 24 +++++++++++++++++------- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/include/CMQException.h b/include/CMQException.h index 13df478c8..50560ef00 100644 --- a/include/CMQException.h +++ b/include/CMQException.h @@ -24,12 +24,14 @@ extern "C" { #endif +#define MAX_EXEPTION_CHAR_LENGTH 256 + typedef struct _CMQException_{ int error; int line; - char file[MAX_MESSAGE_ID_LENGTH]; - char msg[MAX_MESSAGE_ID_LENGTH]; - char type[MAX_MESSAGE_ID_LENGTH]; + char file[MAX_EXEPTION_CHAR_LENGTH]; + char msg[MAX_EXEPTION_CHAR_LENGTH]; + char type[MAX_EXEPTION_CHAR_LENGTH]; } CMQException; diff --git a/src/extern/CProducer.cpp b/src/extern/CProducer.cpp index ef8109081..f3b7a65c8 100644 --- a/src/extern/CProducer.cpp +++ b/src/extern/CProducer.cpp @@ -25,6 +25,8 @@ #include "CMQException.h" #include +#include + #ifdef __cplusplus extern "C" { @@ -68,16 +70,16 @@ class CSendCallback : public AutoDeleteSendCallBack{ } virtual void onException(MQException& e) { - CMQException exception; - exception.error = e.GetError(); - exception.line = e.GetLine(); - strncpy(exception.msg, e.what(), MAX_MESSAGE_ID_LENGTH - 1); - strncpy(exception.file, e.GetFile(), MAX_MESSAGE_ID_LENGTH - 1); + CMQException exception; + exception.error = e.GetError(); + exception.line = e.GetLine(); + strncpy(exception.msg, e.what(), MAX_EXEPTION_CHAR_LENGTH - 1); + strncpy(exception.file, e.GetFile(), MAX_EXEPTION_CHAR_LENGTH - 1); m_cSendExceptionCallback( exception ); } private: - CSendSuccessCallback m_cSendSuccessCallback; - CSendExceptionCallback m_cSendExceptionCallback; + CSendSuccessCallback m_cSendSuccessCallback; + CSendExceptionCallback m_cSendExceptionCallback; }; @@ -173,6 +175,14 @@ int SendMessageAsync(CProducer *producer, CMessage *msg, CSendSuccessCallback cS try { defaultMQProducer->send(*message ,cSendCallback); } catch (exception &e) { + if(cSendCallback != NULL){ + if(typeid(e) == typeid( MQException )){ + MQException &mqe = (MQException &)e; + cSendCallback->onException( mqe ); + } + delete cSendCallback; + cSendCallback = NULL; + } return PRODUCER_SEND_ONEWAY_FAILED; } return OK; From 0cb2025b65f4ef486a714c6d89c7db79636306f9 Mon Sep 17 00:00:00 2001 From: laohu <2372554140@qq.com> Date: Tue, 15 Jan 2019 10:10:06 +0800 Subject: [PATCH 7/8] change --- include/CProducer.h | 1 - include/MQClientException.h | 4 ++-- src/extern/CProducer.cpp | 2 -- 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/include/CProducer.h b/include/CProducer.h index aa54a0ba6..e75622c46 100644 --- a/include/CProducer.h +++ b/include/CProducer.h @@ -18,7 +18,6 @@ #ifndef __C_PRODUCER_H__ #define __C_PRODUCER_H__ -#include "CCommon.h" #include "CMessage.h" #include "CSendResult.h" #include "CMQException.h" diff --git a/include/MQClientException.h b/include/MQClientException.h index 8b1b425b4..2c3d2eda8 100755 --- a/include/MQClientException.h +++ b/include/MQClientException.h @@ -65,9 +65,9 @@ class ROCKETMQCLIENT_API MQException : public std::exception { virtual const char* GetType() const throw() { return m_type.c_str(); } - int GetLine() const throw(){ return m_line;} + int GetLine() { return m_line;} - const char* GetFile() const throw() { return m_file.c_str(); } + const char* GetFile() { return m_file.c_str(); } protected: int m_error; diff --git a/src/extern/CProducer.cpp b/src/extern/CProducer.cpp index f3b7a65c8..435e26972 100644 --- a/src/extern/CProducer.cpp +++ b/src/extern/CProducer.cpp @@ -19,7 +19,6 @@ #include "AsyncCallback.h" #include "CProducer.h" -#include "CCommon.h" #include "CSendResult.h" #include "CMessage.h" #include "CMQException.h" @@ -60,7 +59,6 @@ class CSendCallback : public AutoDeleteSendCallBack{ } virtual ~CSendCallback(){} virtual void onSuccess(SendResult& sendResult) { - cout << "send onSuccess\n"; CSendResult result; result.sendStatus = CSendStatus((int) sendResult.getSendStatus()); result.offset = sendResult.getQueueOffset(); From 214d6834bcbb57347622cc28c1d1471f90b6954d Mon Sep 17 00:00:00 2001 From: laohu <2372554140@qq.com> Date: Tue, 15 Jan 2019 14:54:43 +0800 Subject: [PATCH 8/8] change --- example/CAsyncProducer.c | 6 +++--- include/CCommon.h | 1 + include/CMQException.h | 2 +- src/extern/CProducer.cpp | 3 ++- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/example/CAsyncProducer.c b/example/CAsyncProducer.c index c5b24488a..56496b5ad 100644 --- a/example/CAsyncProducer.c +++ b/example/CAsyncProducer.c @@ -60,10 +60,10 @@ void startSendMessage(CProducer *producer) { for (i = 0; i < 10; i++) { printf("send one message : %d\n", i); memset(DestMsg, 0, sizeof(DestMsg)); - snprintf(DestMsg, 255, "New message body: index %d", i); + snprintf(DestMsg, sizeof(DestMsg), "New message body: index %d", i); SetMessageBody(msg, DestMsg); - SendMessageAsync(producer, msg, sendSuccessCallback , sendExceptionCallback); - printf("Msg Send ID:%s\n", result.msgId); + int code = SendMessageAsync(producer, msg, sendSuccessCallback , sendExceptionCallback); + printf("Async send return code: %d\n", code); thread_sleep(1000); } } diff --git a/include/CCommon.h b/include/CCommon.h index efcd2aa66..eb9ffbce5 100644 --- a/include/CCommon.h +++ b/include/CCommon.h @@ -36,6 +36,7 @@ typedef enum _CStatus_{ PRODUCER_SEND_SYNC_FAILED = 11, PRODUCER_SEND_ONEWAY_FAILED = 12, PRODUCER_SEND_ORDERLY_FAILED = 13, + PRODUCER_SEND_ASYNC_FAILED = 14, PUSHCONSUMER_ERROR_CODE_START = 20, PUSHCONSUMER_START_FAILED = 20, diff --git a/include/CMQException.h b/include/CMQException.h index 50560ef00..da26edd94 100644 --- a/include/CMQException.h +++ b/include/CMQException.h @@ -24,7 +24,7 @@ extern "C" { #endif -#define MAX_EXEPTION_CHAR_LENGTH 256 +#define MAX_EXEPTION_CHAR_LENGTH 512 typedef struct _CMQException_{ int error; diff --git a/src/extern/CProducer.cpp b/src/extern/CProducer.cpp index 435e26972..c238e5924 100644 --- a/src/extern/CProducer.cpp +++ b/src/extern/CProducer.cpp @@ -19,6 +19,7 @@ #include "AsyncCallback.h" #include "CProducer.h" +#include "CCommon.h" #include "CSendResult.h" #include "CMessage.h" #include "CMQException.h" @@ -181,7 +182,7 @@ int SendMessageAsync(CProducer *producer, CMessage *msg, CSendSuccessCallback cS delete cSendCallback; cSendCallback = NULL; } - return PRODUCER_SEND_ONEWAY_FAILED; + return PRODUCER_SEND_ASYNC_FAILED; } return OK; }