diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp index 7520eb5e4..26ab1b0b0 100644 --- a/src/MQClientAPIImpl.cpp +++ b/src/MQClientAPIImpl.cpp @@ -230,7 +230,11 @@ SendResult MQClientAPIImpl::sendMessage(const string& addr, int communicationMode, SendCallback* pSendCallback, const SessionCredentials& sessionCredentials) { - RemotingCommand request(SEND_MESSAGE, pRequestHeader); + // RemotingCommand request(SEND_MESSAGE, pRequestHeader); + // Using MQ V2 Protocol to end messages. + SendMessageRequestHeaderV2* pRequestHeaderV2 = new SendMessageRequestHeaderV2(*pRequestHeader); + RemotingCommand request(SEND_MESSAGE_V2, pRequestHeaderV2); + delete pRequestHeader; // delete to avoid memory leak. string body = msg.getBody(); request.SetBody(body.c_str(), body.length()); request.setMsgBody(body); diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp index 6315cb1a8..4d64aa0e0 100644 --- a/src/MQClientFactory.cpp +++ b/src/MQClientFactory.cpp @@ -18,6 +18,7 @@ #include "ConsumerRunningInfo.h" #include "Logging.h" #include "MQClientManager.h" +#include "MQVersion.h" #include "PullRequest.h" #include "Rebalance.h" #include "TopicPublishInfo.h" @@ -1161,7 +1162,9 @@ ConsumerRunningInfo* MQClientFactory::consumerRunningInfo(const string& consumer } else { runningInfo->setProperty(ConsumerRunningInfo::PROP_CONSUME_TYPE, "CONSUME_ACTIVELY"); } - runningInfo->setProperty(ConsumerRunningInfo::PROP_CLIENT_VERSION, "V3_1_8"); // MQVersion::s_CurrentVersion )); + runningInfo->setProperty( + ConsumerRunningInfo::PROP_CLIENT_VERSION, + MQVersion::GetVersionDesc(MQVersion::s_CurrentVersion)); // MQVersion::s_CurrentVersion )); return runningInfo; } diff --git a/src/common/AsyncCallbackWrap.cpp b/src/common/AsyncCallbackWrap.cpp old mode 100755 new mode 100644 index cb26fda60..cffa219c2 --- a/src/common/AsyncCallbackWrap.cpp +++ b/src/common/AsyncCallbackWrap.cpp @@ -20,7 +20,6 @@ #include "MQClientAPIImpl.h" #include "MQDecoder.h" #include "MQMessageQueue.h" -#include "MQProtos.h" #include "PullAPIWrapper.h" #include "PullResultExt.h" #include "ResponseFuture.h" diff --git a/src/common/AsyncCallbackWrap.h b/src/common/AsyncCallbackWrap.h index 9b202a8f2..c4b3f66ae 100644 --- a/src/common/AsyncCallbackWrap.h +++ b/src/common/AsyncCallbackWrap.h @@ -20,15 +20,14 @@ #include "AsyncArg.h" #include "AsyncCallback.h" #include "MQMessage.h" -#include "UtilAll.h" #include "RemotingCommand.h" +#include "UtilAll.h" namespace rocketmq { class ResponseFuture; class MQClientAPIImpl; class DefaultMQProducer; -class SendMessageRequestHeader; //= HIGHER_VERSION) { + currentVersion = HIGHER_VERSION; + } + return RocketMQCPPClientVersion[currentVersion]; } //sysFlag = (msg.getSysFlag()); requestHeader->bornTimestamp = UtilAll::currentTimeMillis(); requestHeader->flag = (msg.getFlag()); + requestHeader->consumeRetryTimes = 16; requestHeader->batch = isBatchMsg; requestHeader->properties = (MQDecoder::messageProperties2String(msg.getProperties())); diff --git a/src/protocol/CommandHeader.cpp b/src/protocol/CommandHeader.cpp index 46d3cbf91..1360d0490 100644 --- a/src/protocol/CommandHeader.cpp +++ b/src/protocol/CommandHeader.cpp @@ -160,14 +160,6 @@ void SendMessageRequestHeader::Encode(Json::Value& outData) { outData["batch"] = UtilAll::to_string(batch); } -int SendMessageRequestHeader::getReconsumeTimes() { - return reconsumeTimes; -} - -void SendMessageRequestHeader::setReconsumeTimes(int input_reconsumeTimes) { - reconsumeTimes = input_reconsumeTimes; -} - void SendMessageRequestHeader::SetDeclaredFieldOfCommandHeader(map& requestMap) { LOG_DEBUG( "SendMessageRequestHeader producerGroup is:%s,topic is:%s, defaulttopic " @@ -193,6 +185,64 @@ void SendMessageRequestHeader::SetDeclaredFieldOfCommandHeader(map("batch", UtilAll::to_string(batch))); } +//& requestMap) { + LOG_DEBUG( + "SendMessageRequestHeaderV2 producerGroup is:%s,topic is:%s, defaulttopic " + "is:%s, properties is:%s,UtilAll::to_string( defaultTopicQueueNums) " + "is:%s,UtilAll::to_string( queueId):%s, UtilAll::to_string( sysFlag) " + "is:%s, UtilAll::to_string( bornTimestamp) is:%s,UtilAll::to_string( " + "flag) is:%s,UtilAll::to_string( reconsumeTimes) is:%s,UtilAll::to_string( unitMode) is:%s,UtilAll::to_string( " + "batch) is:%s", + a.c_str(), b.c_str(), c.c_str(), i.c_str(), UtilAll::to_string(d).c_str(), UtilAll::to_string(e).c_str(), + UtilAll::to_string(f).c_str(), UtilAll::to_string(g).c_str(), UtilAll::to_string(g).c_str(), + UtilAll::to_string(j).c_str(), UtilAll::to_string(k).c_str(), UtilAll::to_string(m).c_str()); + + requestMap.insert(pair("a", a)); + requestMap.insert(pair("b", b)); + requestMap.insert(pair("c", c)); + requestMap.insert(pair("d", UtilAll::to_string(d))); + requestMap.insert(pair("e", UtilAll::to_string(e))); + requestMap.insert(pair("f", UtilAll::to_string(f))); + requestMap.insert(pair("g", UtilAll::to_string(g))); + requestMap.insert(pair("h", UtilAll::to_string(h))); + requestMap.insert(pair("i", i)); + requestMap.insert(pair("j", UtilAll::to_string(j))); + requestMap.insert(pair("k", UtilAll::to_string(k))); + requestMap.insert(pair("l", UtilAll::to_string(l))); + requestMap.insert(pair("m", UtilAll::to_string(m))); +} +void SendMessageRequestHeaderV2::CreateSendMessageRequestHeaderV1(SendMessageRequestHeader& v1) { + v1.producerGroup = a; + v1.topic = b; + v1.defaultTopic = c; + v1.defaultTopicQueueNums = d; + v1.queueId = e; + v1.sysFlag = f; + v1.bornTimestamp = g; + v1.flag = h; + v1.properties = i; + v1.reconsumeTimes = j; + v1.unitMode = k; + v1.consumeRetryTimes = l; + v1.batch = m; +} //& requestMap); - int getReconsumeTimes(); - void setReconsumeTimes(int input_reconsumeTimes); public: string producerGroup; @@ -169,9 +168,50 @@ class SendMessageRequestHeader : public CommandHeader { string properties; int reconsumeTimes; bool unitMode; + int consumeRetryTimes; bool batch; }; +//& requestMap); + virtual void CreateSendMessageRequestHeaderV1(SendMessageRequestHeader& v1); + + public: + string a; // producerGroup + string b; // topic; + string c; // defaultTopic; + int d; // defaultTopicQueueNums; + int e; // queueId; + int f; // sysFlag; + int64 g; // bornTimestamp; + int h; // flag; + string i; // properties; + int j; // reconsumeTimes; + bool k; // unitMode; + int l; // consumeRetryTimes; + bool m; // batch; +}; + // RemotingCommand::s_seqNumber; // requestMap; header.SetDeclaredFieldOfCommandHeader(requestMap); EXPECT_EQ(requestMap["topic"], topic); @@ -234,6 +233,76 @@ TEST(commandHeader, SendMessageRequestHeader) { EXPECT_EQ(outData["batch"], "0"); } +TEST(commandHeader, SendMessageRequestHeaderV2) { + string producerGroup = "testProducer"; + string topic = "testTopic"; + string defaultTopic = "defaultTopic"; + int defaultTopicQueueNums = 1; + int queueId = 2; + int sysFlag = 3; + int64 bornTimestamp = 4; + int flag = 5; + string properties = "testProperty"; + int reconsumeTimes = 6; + bool unitMode = true; + bool batch = false; + + SendMessageRequestHeaderV2 header; + header.a = producerGroup; + header.b = topic; + header.c = defaultTopic; + header.d = defaultTopicQueueNums; + header.e = queueId; + header.f = sysFlag; + header.g = bornTimestamp; + header.h = flag; + header.i = properties; + header.j = reconsumeTimes; + header.k = unitMode; + header.m = batch; + map requestMap; + header.SetDeclaredFieldOfCommandHeader(requestMap); + EXPECT_EQ(requestMap["a"], producerGroup); + EXPECT_EQ(requestMap["b"], topic); + EXPECT_EQ(requestMap["c"], defaultTopic); + EXPECT_EQ(requestMap["d"], "1"); + EXPECT_EQ(requestMap["e"], "2"); + EXPECT_EQ(requestMap["f"], "3"); + EXPECT_EQ(requestMap["g"], "4"); + EXPECT_EQ(requestMap["h"], "5"); + EXPECT_EQ(requestMap["i"], properties); + EXPECT_EQ(requestMap["j"], "6"); + EXPECT_EQ(requestMap["k"], "1"); + EXPECT_EQ(requestMap["m"], "0"); + + Value outData; + header.Encode(outData); + EXPECT_EQ(outData["a"], producerGroup); + EXPECT_EQ(outData["b"], topic); + EXPECT_EQ(outData["c"], defaultTopic); + EXPECT_EQ(outData["d"], defaultTopicQueueNums); + EXPECT_EQ(outData["e"], queueId); + EXPECT_EQ(outData["f"], sysFlag); + EXPECT_EQ(outData["g"], "4"); + EXPECT_EQ(outData["h"], flag); + EXPECT_EQ(outData["i"], properties); + EXPECT_EQ(outData["j"], "6"); + EXPECT_EQ(outData["k"], "1"); + EXPECT_EQ(outData["m"], "0"); + + SendMessageRequestHeader v1; + header.CreateSendMessageRequestHeaderV1(v1); + EXPECT_EQ(v1.producerGroup, producerGroup); + EXPECT_EQ(v1.queueId, queueId); + EXPECT_EQ(v1.batch, batch); + + SendMessageRequestHeaderV2 v2(v1); + EXPECT_EQ(header.a, v2.a); + EXPECT_EQ(header.e, v2.e); + EXPECT_EQ(header.m, v2.m); + EXPECT_EQ(header.g, v2.g); +} + TEST(commandHeader, SendMessageResponseHeader) { SendMessageResponseHeader header; header.msgId = "ABCDEFG";