From 93c31a62f9200a4d28e8bf2f18997ff112fd86f8 Mon Sep 17 00:00:00 2001 From: ShannonDing Date: Fri, 10 Jan 2020 22:14:21 +0800 Subject: [PATCH 1/7] fix(send): try to use command v2 to send messages --- src/MQClientFactory.cpp | 3 +- src/common/MQVersion.cpp | 17 +- src/common/MQVersion.h | 1313 ++++++++++++++++++++++++++++- test/src/common/MQVersionTest.cpp | 44 + 4 files changed, 1365 insertions(+), 12 deletions(-) create mode 100644 test/src/common/MQVersionTest.cpp diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp index 6315cb1a8..b242b5942 100644 --- a/src/MQClientFactory.cpp +++ b/src/MQClientFactory.cpp @@ -22,6 +22,7 @@ #include "Rebalance.h" #include "TopicPublishInfo.h" #include "TransactionMQProducer.h" +#include "MQVersion.h" #define MAX_BUFF_SIZE 8192 #define SAFE_BUFF_SIZE 7936 // 8192 - 256 = 7936 @@ -1161,7 +1162,7 @@ ConsumerRunningInfo* MQClientFactory::consumerRunningInfo(const string& consumer } else { runningInfo->setProperty(ConsumerRunningInfo::PROP_CONSUME_TYPE, "CONSUME_ACTIVELY"); } - runningInfo->setProperty(ConsumerRunningInfo::PROP_CLIENT_VERSION, "V3_1_8"); // MQVersion::s_CurrentVersion )); + runningInfo->setProperty(ConsumerRunningInfo::PROP_CLIENT_VERSION, MQVersion::GetVersionDesc(MQVersion::s_CurrentVersion)); // MQVersion::s_CurrentVersion )); return runningInfo; } diff --git a/src/common/MQVersion.cpp b/src/common/MQVersion.cpp index 015390c95..c9791b82c 100644 --- a/src/common/MQVersion.cpp +++ b/src/common/MQVersion.cpp @@ -17,15 +17,18 @@ #include "MQVersion.h" namespace rocketmq { -int MQVersion::s_CurrentVersion = MQVersion::V3_1_8; +int MQVersion::s_CurrentVersion = MQVersion::V4_6_0; //= HIGHER_VERSION) { + currentVersion = HIGHER_VERSION; + } + return RocketMQCPPClientVersion[currentVersion]; } // Date: Fri, 10 Jan 2020 23:30:58 +0800 Subject: [PATCH 2/7] fix(send): try to use command v2 to send messages --- src/MQClientFactory.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp index b242b5942..4d64aa0e0 100644 --- a/src/MQClientFactory.cpp +++ b/src/MQClientFactory.cpp @@ -18,11 +18,11 @@ #include "ConsumerRunningInfo.h" #include "Logging.h" #include "MQClientManager.h" +#include "MQVersion.h" #include "PullRequest.h" #include "Rebalance.h" #include "TopicPublishInfo.h" #include "TransactionMQProducer.h" -#include "MQVersion.h" #define MAX_BUFF_SIZE 8192 #define SAFE_BUFF_SIZE 7936 // 8192 - 256 = 7936 @@ -1162,7 +1162,9 @@ ConsumerRunningInfo* MQClientFactory::consumerRunningInfo(const string& consumer } else { runningInfo->setProperty(ConsumerRunningInfo::PROP_CONSUME_TYPE, "CONSUME_ACTIVELY"); } - runningInfo->setProperty(ConsumerRunningInfo::PROP_CLIENT_VERSION, MQVersion::GetVersionDesc(MQVersion::s_CurrentVersion)); // MQVersion::s_CurrentVersion )); + runningInfo->setProperty( + ConsumerRunningInfo::PROP_CLIENT_VERSION, + MQVersion::GetVersionDesc(MQVersion::s_CurrentVersion)); // MQVersion::s_CurrentVersion )); return runningInfo; } From f6b8a9ad8f8051bd7a51a891e267a49e581ec31a Mon Sep 17 00:00:00 2001 From: ShannonDing Date: Sat, 11 Jan 2020 00:53:37 +0800 Subject: [PATCH 3/7] fix(send): try to use command v2 to send messages --- src/common/VirtualEnvUtil.cpp | 6 ++--- test/src/common/VirtualEnvUtilTest.cpp | 33 ++++++++++++++++++-------- 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/src/common/VirtualEnvUtil.cpp b/src/common/VirtualEnvUtil.cpp index 15517e9f7..4b6eb404b 100644 --- a/src/common/VirtualEnvUtil.cpp +++ b/src/common/VirtualEnvUtil.cpp @@ -28,7 +28,7 @@ string VirtualEnvUtil::buildWithProjectGroup(const string& origin, const string& char prefix[1024]; sprintf(prefix, VIRTUAL_APPGROUP_PREFIX, projectGroup.c_str()); - if (origin.find_last_of(prefix) == string::npos) { + if (origin.find(prefix) == string::npos) { return origin + prefix; } else { return origin; @@ -41,7 +41,7 @@ string VirtualEnvUtil::buildWithProjectGroup(const string& origin, const string& string VirtualEnvUtil::clearProjectGroup(const string& origin, const string& projectGroup) { char prefix[1024]; sprintf(prefix, VIRTUAL_APPGROUP_PREFIX, projectGroup.c_str()); - string::size_type pos = origin.find_last_of(prefix); + auto pos = origin.find(prefix); if (!UtilAll::isBlank(prefix) && pos != string::npos) { return origin.substr(0, pos); @@ -51,4 +51,4 @@ string VirtualEnvUtil::clearProjectGroup(const string& origin, const string& pro } // Date: Sat, 11 Jan 2020 10:18:17 +0800 Subject: [PATCH 4/7] fix(send): try to use command v2 to send messages --- src/common/MQVersion.cpp | 1 + src/common/MQVersion.h | 1 + src/protocol/RemotingCommand.cpp | 8 ++++---- test/src/common/MQVersionTest.cpp | 1 + 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/common/MQVersion.cpp b/src/common/MQVersion.cpp index c9791b82c..5c0c78963 100644 --- a/src/common/MQVersion.cpp +++ b/src/common/MQVersion.cpp @@ -18,6 +18,7 @@ namespace rocketmq { int MQVersion::s_CurrentVersion = MQVersion::V4_6_0; +std::string MQVersion::s_CurrentLanguage = "CPP"; // RemotingCommand::s_seqNumber; // Date: Sat, 11 Jan 2020 16:44:44 +0800 Subject: [PATCH 5/7] fix(send): try to use command v2 to send messages --- src/MQClientAPIImpl.cpp | 6 +- src/common/AsyncCallbackWrap.cpp | 1 - src/common/AsyncCallbackWrap.h | 7 +-- src/producer/DefaultMQProducer.cpp | 1 + src/protocol/CommandHeader.cpp | 66 +++++++++++++++++++--- src/protocol/CommandHeader.h | 44 ++++++++++++++- src/protocol/RemotingCommand.cpp | 1 + test/src/MQClientAPIImpTest.cpp | 2 +- test/src/protocol/CommandHeaderTest.cpp | 73 ++++++++++++++++++++++++- 9 files changed, 182 insertions(+), 19 deletions(-) mode change 100755 => 100644 src/common/AsyncCallbackWrap.cpp diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp index 7520eb5e4..26ab1b0b0 100644 --- a/src/MQClientAPIImpl.cpp +++ b/src/MQClientAPIImpl.cpp @@ -230,7 +230,11 @@ SendResult MQClientAPIImpl::sendMessage(const string& addr, int communicationMode, SendCallback* pSendCallback, const SessionCredentials& sessionCredentials) { - RemotingCommand request(SEND_MESSAGE, pRequestHeader); + // RemotingCommand request(SEND_MESSAGE, pRequestHeader); + // Using MQ V2 Protocol to end messages. + SendMessageRequestHeaderV2* pRequestHeaderV2 = new SendMessageRequestHeaderV2(*pRequestHeader); + RemotingCommand request(SEND_MESSAGE_V2, pRequestHeaderV2); + delete pRequestHeader; // delete to avoid memory leak. string body = msg.getBody(); request.SetBody(body.c_str(), body.length()); request.setMsgBody(body); diff --git a/src/common/AsyncCallbackWrap.cpp b/src/common/AsyncCallbackWrap.cpp old mode 100755 new mode 100644 index cb26fda60..cffa219c2 --- a/src/common/AsyncCallbackWrap.cpp +++ b/src/common/AsyncCallbackWrap.cpp @@ -20,7 +20,6 @@ #include "MQClientAPIImpl.h" #include "MQDecoder.h" #include "MQMessageQueue.h" -#include "MQProtos.h" #include "PullAPIWrapper.h" #include "PullResultExt.h" #include "ResponseFuture.h" diff --git a/src/common/AsyncCallbackWrap.h b/src/common/AsyncCallbackWrap.h index 9b202a8f2..c4b3f66ae 100644 --- a/src/common/AsyncCallbackWrap.h +++ b/src/common/AsyncCallbackWrap.h @@ -20,15 +20,14 @@ #include "AsyncArg.h" #include "AsyncCallback.h" #include "MQMessage.h" -#include "UtilAll.h" #include "RemotingCommand.h" +#include "UtilAll.h" namespace rocketmq { class ResponseFuture; class MQClientAPIImpl; class DefaultMQProducer; -class SendMessageRequestHeader; //sysFlag = (msg.getSysFlag()); requestHeader->bornTimestamp = UtilAll::currentTimeMillis(); requestHeader->flag = (msg.getFlag()); + requestHeader->consumeRetryTimes = 16; requestHeader->batch = isBatchMsg; requestHeader->properties = (MQDecoder::messageProperties2String(msg.getProperties())); diff --git a/src/protocol/CommandHeader.cpp b/src/protocol/CommandHeader.cpp index 46d3cbf91..0701072f3 100644 --- a/src/protocol/CommandHeader.cpp +++ b/src/protocol/CommandHeader.cpp @@ -160,14 +160,6 @@ void SendMessageRequestHeader::Encode(Json::Value& outData) { outData["batch"] = UtilAll::to_string(batch); } -int SendMessageRequestHeader::getReconsumeTimes() { - return reconsumeTimes; -} - -void SendMessageRequestHeader::setReconsumeTimes(int input_reconsumeTimes) { - reconsumeTimes = input_reconsumeTimes; -} - void SendMessageRequestHeader::SetDeclaredFieldOfCommandHeader(map& requestMap) { LOG_DEBUG( "SendMessageRequestHeader producerGroup is:%s,topic is:%s, defaulttopic " @@ -193,6 +185,64 @@ void SendMessageRequestHeader::SetDeclaredFieldOfCommandHeader(map("batch", UtilAll::to_string(batch))); } +//& requestMap) { + LOG_DEBUG( + "SendMessageRequestHeaderV2 producerGroup is:%s,topic is:%s, defaulttopic " + "is:%s, properties is:%s,UtilAll::to_string( defaultTopicQueueNums) " + "is:%s,UtilAll::to_string( queueId):%s, UtilAll::to_string( sysFlag) " + "is:%s, UtilAll::to_string( bornTimestamp) is:%s,UtilAll::to_string( " + "flag) is:%s,UtilAll::to_string( reconsumeTimes) is:%s,UtilAll::to_string( unitMode) is:%s,UtilAll::to_string( " + "batch) is:%s", + a.c_str(), b.c_str(), c.c_str(), i.c_str(), UtilAll::to_string(d).c_str(), UtilAll::to_string(e).c_str(), + UtilAll::to_string(f).c_str(), UtilAll::to_string(g).c_str(), UtilAll::to_string(g).c_str(), + UtilAll::to_string(j).c_str(), UtilAll::to_string(k).c_str(), UtilAll::to_string(m).c_str()); + + requestMap.insert(pair("a", a)); + requestMap.insert(pair("b", b)); + requestMap.insert(pair("c", c)); + requestMap.insert(pair("d", UtilAll::to_string(d))); + requestMap.insert(pair("e", UtilAll::to_string(e))); + requestMap.insert(pair("f", UtilAll::to_string(f))); + requestMap.insert(pair("g", UtilAll::to_string(g))); + requestMap.insert(pair("h", UtilAll::to_string(h))); + requestMap.insert(pair("i", i)); + requestMap.insert(pair("j", UtilAll::to_string(j))); + requestMap.insert(pair("k", UtilAll::to_string(k))); + requestMap.insert(pair("l", UtilAll::to_string(l))); + requestMap.insert(pair("m", UtilAll::to_string(m))); +} +void SendMessageRequestHeaderV2::CreateSendMessageRequestHeaderV1(SendMessageRequestHeader& v1) { + v1.producerGroup = a; + v1.topic = b; + v1.defaultTopic = c; + v1.defaultTopicQueueNums = d; + v1.queueId = e; + v1.sysFlag = f; + v1.bornTimestamp = g; + v1.flag = h; + v1.properties = i; + v1.reconsumeTimes = j; + v1.unitMode = k; + v1.consumeRetryTimes = l; + v1.batch = m; +} //& requestMap); - int getReconsumeTimes(); - void setReconsumeTimes(int input_reconsumeTimes); public: string producerGroup; @@ -169,9 +168,50 @@ class SendMessageRequestHeader : public CommandHeader { string properties; int reconsumeTimes; bool unitMode; + int consumeRetryTimes; bool batch; }; +//& requestMap); + virtual void CreateSendMessageRequestHeaderV1(SendMessageRequestHeader& v1); + + public: + string a; // producerGroup + string b; // topic; + string c; // defaultTopic; + int d; // defaultTopicQueueNums; + int e; // queueId; + int f; // sysFlag; + int64 g; // bornTimestamp; + int h; // flag; + string i; // properties; + int j; // reconsumeTimes; + bool k; // unitMode; + int l; // consumeRetryTimes; + bool m; // batch; +}; + // requestMap; header.SetDeclaredFieldOfCommandHeader(requestMap); EXPECT_EQ(requestMap["topic"], topic); @@ -234,6 +233,76 @@ TEST(commandHeader, SendMessageRequestHeader) { EXPECT_EQ(outData["batch"], "0"); } +TEST(commandHeader, SendMessageRequestHeaderV2) { + string producerGroup = "testProducer"; + string topic = "testTopic"; + string defaultTopic = "defaultTopic"; + int defaultTopicQueueNums = 1; + int queueId = 2; + int sysFlag = 3; + int64 bornTimestamp = 4; + int flag = 5; + string properties = "testProperty"; + int reconsumeTimes = 6; + bool unitMode = true; + bool batch = false; + + SendMessageRequestHeaderV2 header; + header.a = producerGroup; + header.b = topic; + header.c = defaultTopic; + header.d = defaultTopicQueueNums; + header.e = queueId; + header.f = sysFlag; + header.g = bornTimestamp; + header.h = flag; + header.i = properties; + header.j = reconsumeTimes; + header.k = unitMode; + header.m = batch; + map requestMap; + header.SetDeclaredFieldOfCommandHeader(requestMap); + EXPECT_EQ(requestMap["a"], producerGroup); + EXPECT_EQ(requestMap["b"], topic); + EXPECT_EQ(requestMap["c"], defaultTopic); + EXPECT_EQ(requestMap["d"], "1"); + EXPECT_EQ(requestMap["e"], "2"); + EXPECT_EQ(requestMap["f"], "3"); + EXPECT_EQ(requestMap["g"], "4"); + EXPECT_EQ(requestMap["h"], "5"); + EXPECT_EQ(requestMap["i"], properties); + EXPECT_EQ(requestMap["j"], "6"); + EXPECT_EQ(requestMap["k"], "1"); + EXPECT_EQ(requestMap["m"], "0"); + + Value outData; + header.Encode(outData); + EXPECT_EQ(outData["a"], producerGroup); + EXPECT_EQ(outData["b"], topic); + EXPECT_EQ(outData["c"], defaultTopic); + EXPECT_EQ(outData["d"], defaultTopicQueueNums); + EXPECT_EQ(outData["e"], queueId); + EXPECT_EQ(outData["f"], sysFlag); + EXPECT_EQ(outData["g"], "4"); + EXPECT_EQ(outData["h"], flag); + EXPECT_EQ(outData["i"], properties); + EXPECT_EQ(outData["j"], "6"); + EXPECT_EQ(outData["k"], "1"); + EXPECT_EQ(outData["m"], "0"); + + SendMessageRequestHeader v1; + header.CreateSendMessageRequestHeaderV1(v1); + EXPECT_EQ(v1.producerGroup, producerGroup); + EXPECT_EQ(v1.queueId, queueId); + EXPECT_EQ(v1.batch, batch); + + SendMessageRequestHeaderV2 v2(v1); + EXPECT_EQ(header.a, v2.a); + EXPECT_EQ(header.e, v2.e); + EXPECT_EQ(header.m, v2.m); + EXPECT_EQ(header.g, v2.g); +} + TEST(commandHeader, SendMessageResponseHeader) { SendMessageResponseHeader header; header.msgId = "ABCDEFG"; From 7c31947451f544c26eeef7a5ffa56f88ae51a3ca Mon Sep 17 00:00:00 2001 From: ShannonDing Date: Sat, 11 Jan 2020 17:14:10 +0800 Subject: [PATCH 6/7] fix(send): try to use command v2 to send messages --- src/protocol/CommandHeader.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/protocol/CommandHeader.cpp b/src/protocol/CommandHeader.cpp index 0701072f3..1360d0490 100644 --- a/src/protocol/CommandHeader.cpp +++ b/src/protocol/CommandHeader.cpp @@ -198,7 +198,7 @@ void SendMessageRequestHeaderV2::Encode(Json::Value& outData) { outData["i"] = i; // string properties; outData["j"] = UtilAll::to_string(j); // int reconsumeTimes; outData["k"] = UtilAll::to_string(k); // bool unitMode; - outData["l"] = l; // int consumeRetryTimes; + outData["l"] = l; // int consumeRetryTimes; outData["m"] = UtilAll::to_string(m); // bool batch; } From 8ce4bd5eeb72c9b6ae70576fab39a55d763cfc4f Mon Sep 17 00:00:00 2001 From: ShannonDing Date: Sat, 11 Jan 2020 18:02:26 +0800 Subject: [PATCH 7/7] fix(send): try to use command v2 to send messages --- test/src/MQClientAPIImpTest.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/src/MQClientAPIImpTest.cpp b/test/src/MQClientAPIImpTest.cpp index 67b8a9ab4..cf63f67f4 100644 --- a/test/src/MQClientAPIImpTest.cpp +++ b/test/src/MQClientAPIImpTest.cpp @@ -172,6 +172,6 @@ TEST(MQClientAPIImplTest, sendMessage) { } int main(int argc, char* argv[]) { InitGoogleMock(&argc, argv); - testing::GTEST_FLAG(filter) = "MQClientAPIImplTest.sendMessage"; + testing::GTEST_FLAG(filter) = "MQClientAPIImplTest.*"; return RUN_ALL_TESTS(); }