-
Notifications
You must be signed in to change notification settings - Fork 167
Realization C asynSend #65
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
de9ea92
e0779fa
04a708d
6a9776b
1541a5d
dd4ddc9
0cb2025
214d683
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
#include <stdio.h> | ||
|
||
#include "CProducer.h" | ||
#include "CCommon.h" | ||
#include "CMessage.h" | ||
#include "CSendResult.h" | ||
|
||
#ifdef _WIN32 | ||
#include <windows.h> | ||
#else | ||
|
||
#include <unistd.h> | ||
#include <memory.h> | ||
|
||
#endif | ||
|
||
void thread_sleep(unsigned milliseconds) { | ||
#ifdef _WIN32 | ||
Sleep(milliseconds); | ||
#else | ||
usleep(milliseconds * 1000); // takes microseconds | ||
#endif | ||
} | ||
|
||
void sendSuccessCallback(CSendResult result){ | ||
printf("Msg Send ID:%s\n", result.msgId); | ||
} | ||
|
||
void sendExceptionCallback(CMQException e){ | ||
printf("asyn send exception error : %d\n" , e.error); | ||
printf("asyn send exception msg : %s\n" , e.msg); | ||
printf("asyn send exception file : %s\n" , e.file); | ||
printf("asyn send exception line : %d\n" , e.line); | ||
} | ||
|
||
void startSendMessage(CProducer *producer) { | ||
int i = 0; | ||
char DestMsg[256]; | ||
CMessage *msg = CreateMessage("T_TestTopic"); | ||
SetMessageTags(msg, "Test_Tag"); | ||
SetMessageKeys(msg, "Test_Keys"); | ||
CSendResult result; | ||
for (i = 0; i < 10; i++) { | ||
printf("send one message : %d\n", i); | ||
memset(DestMsg, 0, sizeof(DestMsg)); | ||
snprintf(DestMsg, sizeof(DestMsg), "New message body: index %d", i); | ||
SetMessageBody(msg, DestMsg); | ||
int code = SendMessageAsync(producer, msg, sendSuccessCallback , sendExceptionCallback); | ||
printf("Async send return code: %d\n", code); | ||
thread_sleep(1000); | ||
} | ||
} | ||
|
||
void CreateProducerAndStartSendMessage(int i){ | ||
printf("Producer Initializing.....\n"); | ||
CProducer *producer = CreateProducer("Group_producer"); | ||
SetProducerNameServerAddress(producer, "127.0.0.1:9876"); | ||
if(i == 1){ | ||
SetProducerSendMsgTimeout(producer , 3); | ||
} | ||
StartProducer(producer); | ||
printf("Producer start.....\n"); | ||
startSendMessage(producer); | ||
ShutdownProducer(producer); | ||
DestroyProducer(producer); | ||
printf("Producer Shutdown!\n"); | ||
} | ||
|
||
int main(int argc, char *argv[]) { | ||
printf("Send Async successCallback.....\n"); | ||
CreateProducerAndStartSendMessage(0); | ||
|
||
printf("Send Async exceptionCallback.....\n"); | ||
CreateProducerAndStartSendMessage(1); | ||
|
||
return 0; | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
|
||
#ifndef __C_MQEXCPTION_H__ | ||
#define __C_MQEXCPTION_H__ | ||
#include "CCommon.h" | ||
|
||
#ifdef __cplusplus | ||
extern "C" { | ||
#endif | ||
|
||
#define MAX_EXEPTION_CHAR_LENGTH 512 | ||
|
||
typedef struct _CMQException_{ | ||
int error; | ||
int line; | ||
char file[MAX_EXEPTION_CHAR_LENGTH]; | ||
char msg[MAX_EXEPTION_CHAR_LENGTH]; | ||
char type[MAX_EXEPTION_CHAR_LENGTH]; | ||
|
||
} CMQException; | ||
|
||
#ifdef __cplusplus | ||
}; | ||
#endif | ||
#endif |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,9 +18,9 @@ | |
#ifndef __C_PRODUCER_H__ | ||
#define __C_PRODUCER_H__ | ||
|
||
#include "CCommon.h" | ||
#include "CMessage.h" | ||
#include "CSendResult.h" | ||
#include "CMQException.h" | ||
|
||
#ifdef __cplusplus | ||
extern "C" { | ||
|
@@ -29,6 +29,8 @@ extern "C" { | |
//typedef struct _CProducer_ _CProducer; | ||
typedef struct CProducer CProducer; | ||
typedef int(*QueueSelectorCallback)(int size, CMessage *msg, void *arg); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why need the parameter size, what's it function? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. QueueSelectorCallback is not asyn send callback function |
||
typedef void(*CSendSuccessCallback)(CSendResult result); | ||
typedef void(*CSendExceptionCallback)(CMQException e); | ||
|
||
ROCKETMQCLIENT_API CProducer *CreateProducer(const char *groupId); | ||
ROCKETMQCLIENT_API int DestroyProducer(CProducer *producer); | ||
|
@@ -49,6 +51,7 @@ ROCKETMQCLIENT_API int SetProducerCompressLevel(CProducer *producer, int level); | |
ROCKETMQCLIENT_API int SetProducerMaxMessageSize(CProducer *producer, int size); | ||
|
||
ROCKETMQCLIENT_API int SendMessageSync(CProducer *producer, CMessage *msg, CSendResult *result); | ||
ROCKETMQCLIENT_API int SendMessageAsync(CProducer *producer, CMessage *msg, CSendSuccessCallback cSendSuccessCallback , CSendExceptionCallback cSendExceptionCallback); | ||
ROCKETMQCLIENT_API int SendMessageOneway(CProducer *producer,CMessage *msg); | ||
ROCKETMQCLIENT_API int SendMessageOrderly(CProducer *producer, CMessage *msg, QueueSelectorCallback callback, void *arg, int autoRetryTimes, CSendResult *result); | ||
#ifdef __cplusplus | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,12 +21,17 @@ | |
#include <ostream> | ||
#include <sstream> | ||
#include <string> | ||
|
||
#include <string.h> | ||
#include "RocketMQClient.h" | ||
#include "CCommon.h" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why include C type head file here? it is better to remove it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Already processed |
||
|
||
|
||
|
||
namespace rocketmq { | ||
//<!*************************************************************************** | ||
class ROCKETMQCLIENT_API MQException : public std::exception { | ||
|
||
public: | ||
MQException(const std::string& msg, int error, const char* file, | ||
int line) throw() | ||
|
@@ -60,6 +65,10 @@ class ROCKETMQCLIENT_API MQException : public std::exception { | |
|
||
virtual const char* GetType() const throw() { return m_type.c_str(); } | ||
|
||
int GetLine() { return m_line;} | ||
|
||
const char* GetFile() { return m_file.c_str(); } | ||
|
||
protected: | ||
int m_error; | ||
int m_line; | ||
|
@@ -68,6 +77,7 @@ class ROCKETMQCLIENT_API MQException : public std::exception { | |
std::string m_type; | ||
}; | ||
|
||
|
||
inline std::ostream& operator<<(std::ostream& os, const MQException& e) { | ||
os << "Type: " << e.GetType() << " , " << e.what(); | ||
return os; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,10 +16,17 @@ | |
*/ | ||
|
||
#include "DefaultMQProducer.h" | ||
#include "AsyncCallback.h" | ||
|
||
#include "CProducer.h" | ||
#include "CCommon.h" | ||
#include <string.h> | ||
#include "CSendResult.h" | ||
#include "CMessage.h" | ||
#include "CMQException.h" | ||
|
||
#include <string.h> | ||
#include <typeinfo> | ||
|
||
|
||
#ifdef __cplusplus | ||
extern "C" { | ||
|
@@ -45,6 +52,35 @@ class SelectMessageQueue : public MessageQueueSelector { | |
QueueSelectorCallback m_pCallback; | ||
}; | ||
|
||
class CSendCallback : public AutoDeleteSendCallBack{ | ||
public: | ||
CSendCallback(CSendSuccessCallback cSendSuccessCallback,CSendExceptionCallback cSendExceptionCallback){ | ||
m_cSendSuccessCallback = cSendSuccessCallback; | ||
m_cSendExceptionCallback = cSendExceptionCallback; | ||
} | ||
virtual ~CSendCallback(){} | ||
virtual void onSuccess(SendResult& sendResult) { | ||
CSendResult result; | ||
result.sendStatus = CSendStatus((int) sendResult.getSendStatus()); | ||
result.offset = sendResult.getQueueOffset(); | ||
strncpy(result.msgId, sendResult.getMsgId().c_str(), MAX_MESSAGE_ID_LENGTH - 1); | ||
result.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0; | ||
m_cSendSuccessCallback(result); | ||
|
||
} | ||
virtual void onException(MQException& e) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please format the alignment There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Already processed |
||
CMQException exception; | ||
exception.error = e.GetError(); | ||
exception.line = e.GetLine(); | ||
strncpy(exception.msg, e.what(), MAX_EXEPTION_CHAR_LENGTH - 1); | ||
strncpy(exception.file, e.GetFile(), MAX_EXEPTION_CHAR_LENGTH - 1); | ||
m_cSendExceptionCallback( exception ); | ||
} | ||
private: | ||
CSendSuccessCallback m_cSendSuccessCallback; | ||
CSendExceptionCallback m_cSendExceptionCallback; | ||
}; | ||
|
||
|
||
CProducer *CreateProducer(const char *groupId) { | ||
if (groupId == NULL) { | ||
|
@@ -127,6 +163,30 @@ int SendMessageSync(CProducer *producer, CMessage *msg, CSendResult *result) { | |
return OK; | ||
} | ||
|
||
int SendMessageAsync(CProducer *producer, CMessage *msg, CSendSuccessCallback cSendSuccessCallback,CSendExceptionCallback cSendExceptionCallback){ | ||
if (producer == NULL || msg == NULL || cSendSuccessCallback == NULL || cSendExceptionCallback == NULL) { | ||
return NULL_POINTER; | ||
} | ||
DefaultMQProducer *defaultMQProducer = (DefaultMQProducer *) producer; | ||
MQMessage *message = (MQMessage *) msg; | ||
CSendCallback* cSendCallback = new CSendCallback(cSendSuccessCallback , cSendExceptionCallback); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See MQClientAPIImpl::sendMessageAsync and AsyncCallbackWrap::operationComplete(90 line),and AsyncCallbackWrap::onException()[ 117 line] |
||
|
||
try { | ||
defaultMQProducer->send(*message ,cSendCallback); | ||
} catch (exception &e) { | ||
if(cSendCallback != NULL){ | ||
if(typeid(e) == typeid( MQException )){ | ||
MQException &mqe = (MQException &)e; | ||
cSendCallback->onException( mqe ); | ||
} | ||
delete cSendCallback; | ||
cSendCallback = NULL; | ||
} | ||
return PRODUCER_SEND_ASYNC_FAILED; | ||
} | ||
return OK; | ||
} | ||
|
||
int SendMessageOneway(CProducer *producer, CMessage *msg) { | ||
if (producer == NULL || msg == NULL) { | ||
return NULL_POINTER; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is better to define the MACRO separately, and the length of msg maybe bigger than 256, IMO, maybe 512 is better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Already processed