diff --git a/example/CAsyncProducer.c b/example/CAsyncProducer.c new file mode 100644 index 000000000..56496b5ad --- /dev/null +++ b/example/CAsyncProducer.c @@ -0,0 +1,95 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include + +#include "CProducer.h" +#include "CCommon.h" +#include "CMessage.h" +#include "CSendResult.h" + +#ifdef _WIN32 +#include +#else + +#include +#include + +#endif + +void thread_sleep(unsigned milliseconds) { +#ifdef _WIN32 + Sleep(milliseconds); +#else + usleep(milliseconds * 1000); // takes microseconds +#endif +} + +void sendSuccessCallback(CSendResult result){ + printf("Msg Send ID:%s\n", result.msgId); +} + +void sendExceptionCallback(CMQException e){ + printf("asyn send exception error : %d\n" , e.error); + printf("asyn send exception msg : %s\n" , e.msg); + printf("asyn send exception file : %s\n" , e.file); + printf("asyn send exception line : %d\n" , e.line); +} + +void startSendMessage(CProducer *producer) { + int i = 0; + char DestMsg[256]; + CMessage *msg = CreateMessage("T_TestTopic"); + SetMessageTags(msg, "Test_Tag"); + SetMessageKeys(msg, "Test_Keys"); + CSendResult result; + for (i = 0; i < 10; i++) { + printf("send one message : %d\n", i); + memset(DestMsg, 0, sizeof(DestMsg)); + snprintf(DestMsg, sizeof(DestMsg), "New message body: index %d", i); + SetMessageBody(msg, DestMsg); + int code = SendMessageAsync(producer, msg, sendSuccessCallback , sendExceptionCallback); + printf("Async send return code: %d\n", code); + thread_sleep(1000); + } +} + +void CreateProducerAndStartSendMessage(int i){ + printf("Producer Initializing.....\n"); + CProducer *producer = CreateProducer("Group_producer"); + SetProducerNameServerAddress(producer, "127.0.0.1:9876"); + if(i == 1){ + SetProducerSendMsgTimeout(producer , 3); + } + StartProducer(producer); + printf("Producer start.....\n"); + startSendMessage(producer); + ShutdownProducer(producer); + DestroyProducer(producer); + printf("Producer Shutdown!\n"); +} + +int main(int argc, char *argv[]) { + printf("Send Async successCallback.....\n"); + CreateProducerAndStartSendMessage(0); + + printf("Send Async exceptionCallback.....\n"); + CreateProducerAndStartSendMessage(1); + + return 0; +} + diff --git a/example/Producer.c b/example/Producer.c index cef8383cb..5feedd68e 100644 --- a/example/Producer.c +++ b/example/Producer.c @@ -62,11 +62,10 @@ int main(int argc, char *argv[]) { printf("Producer Initializing.....\n"); CProducer *producer = CreateProducer("Group_producer"); - SetProducerNameServerAddress(producer, "172.17.0.2:9876"); + SetProducerNameServerAddress(producer, "127.0.0.1:9876"); StartProducer(producer); printf("Producer start.....\n"); startSendMessage(producer); - ShutdownProducer(producer); DestroyProducer(producer); printf("Producer Shutdown!\n"); diff --git a/example/PushConsumer.cpp b/example/PushConsumer.cpp index d5ce02171..119b7f267 100755 --- a/example/PushConsumer.cpp +++ b/example/PushConsumer.cpp @@ -81,7 +81,7 @@ int main(int argc, char *argv[]) { if (info.syncpush) consumer.setAsyncPull(false); // set sync pull if (info.broadcasting) { - consumer.setMessageModel(BROADCASTING); + consumer.setMessageModel(rocketmq::BROADCASTING); } consumer.setInstanceName(info.groupname); diff --git a/include/CCommon.h b/include/CCommon.h index efcd2aa66..eb9ffbce5 100644 --- a/include/CCommon.h +++ b/include/CCommon.h @@ -36,6 +36,7 @@ typedef enum _CStatus_{ PRODUCER_SEND_SYNC_FAILED = 11, PRODUCER_SEND_ONEWAY_FAILED = 12, PRODUCER_SEND_ORDERLY_FAILED = 13, + PRODUCER_SEND_ASYNC_FAILED = 14, PUSHCONSUMER_ERROR_CODE_START = 20, PUSHCONSUMER_START_FAILED = 20, diff --git a/include/CMQException.h b/include/CMQException.h new file mode 100644 index 000000000..da26edd94 --- /dev/null +++ b/include/CMQException.h @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#ifndef __C_MQEXCPTION_H__ +#define __C_MQEXCPTION_H__ +#include "CCommon.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#define MAX_EXEPTION_CHAR_LENGTH 512 + +typedef struct _CMQException_{ + int error; + int line; + char file[MAX_EXEPTION_CHAR_LENGTH]; + char msg[MAX_EXEPTION_CHAR_LENGTH]; + char type[MAX_EXEPTION_CHAR_LENGTH]; + +} CMQException; + +#ifdef __cplusplus +}; +#endif +#endif diff --git a/include/CProducer.h b/include/CProducer.h index 6edd99e53..e75622c46 100644 --- a/include/CProducer.h +++ b/include/CProducer.h @@ -18,9 +18,9 @@ #ifndef __C_PRODUCER_H__ #define __C_PRODUCER_H__ -#include "CCommon.h" #include "CMessage.h" #include "CSendResult.h" +#include "CMQException.h" #ifdef __cplusplus extern "C" { @@ -29,6 +29,8 @@ extern "C" { //typedef struct _CProducer_ _CProducer; typedef struct CProducer CProducer; typedef int(*QueueSelectorCallback)(int size, CMessage *msg, void *arg); +typedef void(*CSendSuccessCallback)(CSendResult result); +typedef void(*CSendExceptionCallback)(CMQException e); ROCKETMQCLIENT_API CProducer *CreateProducer(const char *groupId); ROCKETMQCLIENT_API int DestroyProducer(CProducer *producer); @@ -49,6 +51,7 @@ ROCKETMQCLIENT_API int SetProducerCompressLevel(CProducer *producer, int level); ROCKETMQCLIENT_API int SetProducerMaxMessageSize(CProducer *producer, int size); ROCKETMQCLIENT_API int SendMessageSync(CProducer *producer, CMessage *msg, CSendResult *result); +ROCKETMQCLIENT_API int SendMessageAsync(CProducer *producer, CMessage *msg, CSendSuccessCallback cSendSuccessCallback , CSendExceptionCallback cSendExceptionCallback); ROCKETMQCLIENT_API int SendMessageOneway(CProducer *producer,CMessage *msg); ROCKETMQCLIENT_API int SendMessageOrderly(CProducer *producer, CMessage *msg, QueueSelectorCallback callback, void *arg, int autoRetryTimes, CSendResult *result); #ifdef __cplusplus diff --git a/include/MQClientException.h b/include/MQClientException.h index bf29863a8..2c3d2eda8 100755 --- a/include/MQClientException.h +++ b/include/MQClientException.h @@ -21,12 +21,17 @@ #include #include #include + +#include #include "RocketMQClient.h" +#include "CCommon.h" + namespace rocketmq { // +#include "CSendResult.h" #include "CMessage.h" +#include "CMQException.h" + +#include +#include + #ifdef __cplusplus extern "C" { @@ -45,6 +52,35 @@ class SelectMessageQueue : public MessageQueueSelector { QueueSelectorCallback m_pCallback; }; +class CSendCallback : public AutoDeleteSendCallBack{ +public: + CSendCallback(CSendSuccessCallback cSendSuccessCallback,CSendExceptionCallback cSendExceptionCallback){ + m_cSendSuccessCallback = cSendSuccessCallback; + m_cSendExceptionCallback = cSendExceptionCallback; + } + virtual ~CSendCallback(){} + virtual void onSuccess(SendResult& sendResult) { + CSendResult result; + result.sendStatus = CSendStatus((int) sendResult.getSendStatus()); + result.offset = sendResult.getQueueOffset(); + strncpy(result.msgId, sendResult.getMsgId().c_str(), MAX_MESSAGE_ID_LENGTH - 1); + result.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0; + m_cSendSuccessCallback(result); + + } + virtual void onException(MQException& e) { + CMQException exception; + exception.error = e.GetError(); + exception.line = e.GetLine(); + strncpy(exception.msg, e.what(), MAX_EXEPTION_CHAR_LENGTH - 1); + strncpy(exception.file, e.GetFile(), MAX_EXEPTION_CHAR_LENGTH - 1); + m_cSendExceptionCallback( exception ); + } +private: + CSendSuccessCallback m_cSendSuccessCallback; + CSendExceptionCallback m_cSendExceptionCallback; +}; + CProducer *CreateProducer(const char *groupId) { if (groupId == NULL) { @@ -127,6 +163,30 @@ int SendMessageSync(CProducer *producer, CMessage *msg, CSendResult *result) { return OK; } +int SendMessageAsync(CProducer *producer, CMessage *msg, CSendSuccessCallback cSendSuccessCallback,CSendExceptionCallback cSendExceptionCallback){ + if (producer == NULL || msg == NULL || cSendSuccessCallback == NULL || cSendExceptionCallback == NULL) { + return NULL_POINTER; + } + DefaultMQProducer *defaultMQProducer = (DefaultMQProducer *) producer; + MQMessage *message = (MQMessage *) msg; + CSendCallback* cSendCallback = new CSendCallback(cSendSuccessCallback , cSendExceptionCallback); + + try { + defaultMQProducer->send(*message ,cSendCallback); + } catch (exception &e) { + if(cSendCallback != NULL){ + if(typeid(e) == typeid( MQException )){ + MQException &mqe = (MQException &)e; + cSendCallback->onException( mqe ); + } + delete cSendCallback; + cSendCallback = NULL; + } + return PRODUCER_SEND_ASYNC_FAILED; + } + return OK; +} + int SendMessageOneway(CProducer *producer, CMessage *msg) { if (producer == NULL || msg == NULL) { return NULL_POINTER; diff --git a/test/src/UrlTest.cpp b/test/src/UrlTest.cpp index c7dead00c..e310cf44b 100644 --- a/test/src/UrlTest.cpp +++ b/test/src/UrlTest.cpp @@ -20,6 +20,15 @@ #include "gtest/gtest.h" #include "gmock/gmock.h" +#include + +#include "CProducer.h" +#include "CCommon.h" +#include "CMessage.h" +#include "CSendResult.h" +#include "CMQException.h" +#include + using namespace std; using ::testing::InitGoogleTest; using ::testing::InitGoogleMock; @@ -53,9 +62,13 @@ TEST(Url, Url) { } + + int main(int argc, char* argv[]) { InitGoogleMock(&argc, argv); testing::GTEST_FLAG(filter) = "Url.Url"; - return RUN_ALL_TESTS(); + int itestts = RUN_ALL_TESTS(); + printf("i %d" , itestts); + return itestts; }