diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp index 8d72d649f..df1effa19 100644 --- a/src/MQClientAPIImpl.cpp +++ b/src/MQClientAPIImpl.cpp @@ -27,6 +27,7 @@ namespace rocketmq { // tryToFindTopicPublishInfo(const string& topic, - const SessionCredentials& session_credentials); + virtual boost::shared_ptr tryToFindTopicPublishInfo(const string& topic, + const SessionCredentials& session_credentials); void fetchSubscribeMessageQueues(const string& topic, vector& mqs, @@ -102,7 +103,7 @@ class MQClientFactory { bool isDefault = false); void rebalanceImmediately(); void doRebalanceByConsumerGroup(const string& consumerGroup); - void sendHeartbeatToAllBroker(); + virtual void sendHeartbeatToAllBroker(); void cleanOfflineBrokers(); diff --git a/src/common/DefaultMQClient.cpp b/src/common/DefaultMQClient.cpp index 8e29760d0..e0fcd8ed6 100644 --- a/src/common/DefaultMQClient.cpp +++ b/src/common/DefaultMQClient.cpp @@ -157,6 +157,9 @@ void DefaultMQClient::shutdown() { MQClientFactory* DefaultMQClient::getFactory() const { return m_clientFactory; } +void DefaultMQClient::setFactory(MQClientFactory* factory) { + m_clientFactory = factory; +} bool DefaultMQClient::isServiceStateOk() { return m_serviceState == RUNNING; diff --git a/src/common/NameSpaceUtil.cpp b/src/common/NameSpaceUtil.cpp index 118bd4ff4..0a08debc0 100644 --- a/src/common/NameSpaceUtil.cpp +++ b/src/common/NameSpaceUtil.cpp @@ -41,11 +41,11 @@ string NameSpaceUtil::getNameSpaceFromNsURL(string nameServerAddr) { LOG_DEBUG("Try to get Name Space from nameServerAddr [%s]", nameServerAddr.c_str()); string nsAddr = formatNameServerURL(nameServerAddr); string nameSpace; - auto index = nameServerAddr.find(NAMESPACE_PREFIX); + auto index = nsAddr.find(NAMESPACE_PREFIX); if (index != string::npos) { - auto indexDot = nameServerAddr.find('.'); - if (indexDot != string::npos) { - nameSpace = nameServerAddr.substr(index, indexDot); + auto indexDot = nsAddr.find('.'); + if (indexDot != string::npos && indexDot > index) { + nameSpace = nsAddr.substr(index, indexDot - index); LOG_INFO("Get Name Space [%s] from nameServerAddr [%s]", nameSpace.c_str(), nameServerAddr.c_str()); return nameSpace; } @@ -83,7 +83,7 @@ string NameSpaceUtil::withNameSpace(string source, string ns) { } bool NameSpaceUtil::hasNameSpace(string source, string ns) { - if (source.length() >= ns.length() && source.find(ns) != string::npos) { + if (!ns.empty() && source.length() >= ns.length() && source.find(ns) != string::npos) { return true; } return false; diff --git a/src/common/UtilAll.cpp b/src/common/UtilAll.cpp index c2f95632e..7122499a3 100644 --- a/src/common/UtilAll.cpp +++ b/src/common/UtilAll.cpp @@ -176,7 +176,7 @@ int UtilAll::Split(vector& ret_, const string& strIn, const string& sep) return ret_.size(); } -int32_t UtilAll::StringToInt32(const std::string& str, int32_t& out) { +bool UtilAll::StringToInt32(const std::string& str, int32_t& out) { out = 0; if (str.empty()) { return false; @@ -196,7 +196,7 @@ int32_t UtilAll::StringToInt32(const std::string& str, int32_t& out) { return true; } -int64_t UtilAll::StringToInt64(const std::string& str, int64_t& val) { +bool UtilAll::StringToInt64(const std::string& str, int64_t& val) { char* endptr = NULL; errno = 0; /* To distinguish success/failure after call */ val = strtoll(str.c_str(), &endptr, 10); diff --git a/src/common/UtilAll.h b/src/common/UtilAll.h index a2993cf7d..6d021f876 100644 --- a/src/common/UtilAll.h +++ b/src/common/UtilAll.h @@ -112,8 +112,8 @@ class UtilAll { static int Split(vector& ret_, const string& strIn, const char sep); static int Split(vector& ret_, const string& strIn, const string& sep); - static int32_t StringToInt32(const std::string& str, int32_t& out); - static int64_t StringToInt64(const std::string& str, int64_t& val); + static bool StringToInt32(const std::string& str, int32_t& out); + static bool StringToInt64(const std::string& str, int64_t& val); static string getLocalHostName(); static string getLocalAddress(); diff --git a/src/include/DefaultMQClient.h b/src/include/DefaultMQClient.h index 0ae4934a0..ebf54a510 100644 --- a/src/include/DefaultMQClient.h +++ b/src/include/DefaultMQClient.h @@ -167,6 +167,8 @@ class DefaultMQClient { const std::string& input_onsChannel); const SessionCredentials& getSessionCredentials() const; + virtual void setFactory(MQClientFactory*); + protected: virtual void start(); virtual void shutdown(); diff --git a/src/producer/TopicPublishInfo.h b/src/producer/TopicPublishInfo.h index 5f0380e68..c57253508 100644 --- a/src/producer/TopicPublishInfo.h +++ b/src/producer/TopicPublishInfo.h @@ -24,8 +24,12 @@ #include #include #include +#include +#include +#include #include "Logging.h" #include "MQMessageQueue.h" +#include "UtilAll.h" namespace rocketmq { // + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +#include "NameSpaceUtil.h" + +using std::string; + +using ::testing::InitGoogleMock; +using ::testing::InitGoogleTest; +using testing::Return; + +using rocketmq::NameSpaceUtil; + +TEST(NameSpaceUtil, isEndPointURL) { + const string url = "http://rocketmq.nameserver.com"; + EXPECT_TRUE(NameSpaceUtil::isEndPointURL(url)); + EXPECT_FALSE(NameSpaceUtil::isEndPointURL("rocketmq.nameserver.com")); + EXPECT_FALSE(NameSpaceUtil::isEndPointURL("127.0.0.1")); +} +TEST(NameSpaceUtil, formatNameServerURL) { + string url = "http://rocketmq.nameserver.com"; + string urlFormatted = "rocketmq.nameserver.com"; + EXPECT_EQ(NameSpaceUtil::formatNameServerURL(url), urlFormatted); + EXPECT_EQ(NameSpaceUtil::formatNameServerURL(urlFormatted), urlFormatted); +} +TEST(NameSpaceUtil, getNameSpaceFromNsURL) { + string url = "http://MQ_INST_UNITTEST.rocketmq.nameserver.com"; + string url2 = "MQ_INST_UNITTEST.rocketmq.nameserver.com"; + string noInstUrl = "http://rocketmq.nameserver.com"; + string inst = "MQ_INST_UNITTEST"; + EXPECT_EQ(NameSpaceUtil::getNameSpaceFromNsURL(url), inst); + EXPECT_EQ(NameSpaceUtil::getNameSpaceFromNsURL(url2), inst); + EXPECT_EQ(NameSpaceUtil::getNameSpaceFromNsURL(noInstUrl), ""); +} +TEST(NameSpaceUtil, checkNameSpaceExistInNsURL) { + string url = "http://MQ_INST_UNITTEST.rocketmq.nameserver.com"; + string url2 = "MQ_INST_UNITTEST.rocketmq.nameserver.com"; + string noInstUrl = "http://rocketmq.nameserver.com"; + EXPECT_TRUE(NameSpaceUtil::checkNameSpaceExistInNsURL(url)); + EXPECT_FALSE(NameSpaceUtil::checkNameSpaceExistInNsURL(url2)); + EXPECT_FALSE(NameSpaceUtil::checkNameSpaceExistInNsURL(noInstUrl)); +} +TEST(NameSpaceUtil, checkNameSpaceExistInNameServer) { + string url = "http://MQ_INST_UNITTEST.rocketmq.nameserver.com"; + string url2 = "MQ_INST_UNITTEST.rocketmq.nameserver.com"; + string noInstUrl = "rocketmq.nameserver.com"; + string nsIP = "127.0.0.1"; + EXPECT_TRUE(NameSpaceUtil::checkNameSpaceExistInNameServer(url)); + EXPECT_TRUE(NameSpaceUtil::checkNameSpaceExistInNameServer(url2)); + EXPECT_FALSE(NameSpaceUtil::checkNameSpaceExistInNameServer(noInstUrl)); + EXPECT_FALSE(NameSpaceUtil::checkNameSpaceExistInNameServer(nsIP)); +} +TEST(NameSpaceUtil, withNameSpace) { + string source = "testTopic"; + string ns = "MQ_INST_UNITTEST"; + string nsSource = "MQ_INST_UNITTEST%testTopic"; + EXPECT_EQ(NameSpaceUtil::withNameSpace(source, ns), nsSource); + EXPECT_EQ(NameSpaceUtil::withNameSpace(source, ""), source); +} +TEST(NameSpaceUtil, hasNameSpace) { + string source = "testTopic"; + string ns = "MQ_INST_UNITTEST"; + string nsSource = "MQ_INST_UNITTEST%testTopic"; + EXPECT_TRUE(NameSpaceUtil::hasNameSpace(nsSource, ns)); + EXPECT_FALSE(NameSpaceUtil::hasNameSpace(source, ns)); + EXPECT_FALSE(NameSpaceUtil::hasNameSpace(source, "")); +} +int main(int argc, char* argv[]) { + InitGoogleMock(&argc, argv); + testing::GTEST_FLAG(throw_on_failure) = true; + testing::GTEST_FLAG(filter) = "NameSpaceUtil.*"; + int itestts = RUN_ALL_TESTS(); + return itestts; +} diff --git a/test/src/common/UtilAllTest.cpp b/test/src/common/UtilAllTest.cpp new file mode 100644 index 000000000..86e1130fb --- /dev/null +++ b/test/src/common/UtilAllTest.cpp @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +#include "UtilAll.h" + +using std::string; + +using ::testing::InitGoogleMock; +using ::testing::InitGoogleTest; +using testing::Return; + +using rocketmq::UtilAll; + +TEST(UtilAll, startsWith_retry) { + string source = "testTopic"; + string retrySource = "%RETRY%testTopic"; + string noRetrySource = "%DLQ%testTopic"; + EXPECT_TRUE(UtilAll::startsWith_retry(retrySource)); + EXPECT_FALSE(UtilAll::startsWith_retry(source)); + EXPECT_FALSE(UtilAll::startsWith_retry(noRetrySource)); +} +TEST(UtilAll, getRetryTopic) { + string source = "testTopic"; + string retrySource = "%RETRY%testTopic"; + EXPECT_EQ(UtilAll::getRetryTopic(source), retrySource); +} +TEST(UtilAll, Trim) { + string source = "testTopic"; + string preSource = " testTopic"; + string surSource = "testTopic "; + string allSource = " testTopic "; + UtilAll::Trim(preSource); + UtilAll::Trim(surSource); + UtilAll::Trim(allSource); + EXPECT_EQ(preSource, source); + EXPECT_EQ(surSource, source); + EXPECT_EQ(allSource, source); +} +TEST(UtilAll, hexstr2ull) { + const char* a = "1"; + const char* b = "FF"; + const char* c = "1a"; + const char* d = "101"; + EXPECT_EQ(UtilAll::hexstr2ull(a), 1); + EXPECT_EQ(UtilAll::hexstr2ull(b), 255); + EXPECT_EQ(UtilAll::hexstr2ull(c), 26); + EXPECT_EQ(UtilAll::hexstr2ull(d), 257); +} +TEST(UtilAll, SplitURL) { + string source = "127.0.0.1"; + string source1 = "127.0.0.1:0"; + string source2 = "127.0.0.1:9876"; + string addr; + string addr1; + string addr2; + short port; + EXPECT_FALSE(UtilAll::SplitURL(source, addr, port)); + EXPECT_FALSE(UtilAll::SplitURL(source1, addr1, port)); + EXPECT_TRUE(UtilAll::SplitURL(source2, addr2, port)); + EXPECT_EQ(addr2, "127.0.0.1"); + EXPECT_EQ(port, 9876); +} +TEST(UtilAll, SplitOne) { + string source = "127.0.0.1:9876"; + vector ret; + EXPECT_EQ(UtilAll::Split(ret, source, '.'), 4); + EXPECT_EQ(ret[0], "127"); +} +TEST(UtilAll, SplitStr) { + string source = "11AA222AA3333AA44444AA5"; + vector ret; + EXPECT_EQ(UtilAll::Split(ret, source, "AA"), 5); + EXPECT_EQ(ret[0], "11"); +} +TEST(UtilAll, StringToInt32) { + string source = "123"; + int value; + EXPECT_TRUE(UtilAll::StringToInt32(source, value)); + EXPECT_EQ(123, value); + EXPECT_FALSE(UtilAll::StringToInt32("123456789X123456789", value)); + EXPECT_FALSE(UtilAll::StringToInt32("-1234567890123456789", value)); + EXPECT_FALSE(UtilAll::StringToInt32("1234567890123456789", value)); +} +TEST(UtilAll, StringToInt64) { + string source = "123"; + int64_t value; + EXPECT_TRUE(UtilAll::StringToInt64(source, value)); + EXPECT_EQ(123, value); + EXPECT_FALSE(UtilAll::StringToInt64("XXXXXXXXXXX", value)); + EXPECT_FALSE(UtilAll::StringToInt64("123456789X123456789", value)); + EXPECT_EQ(123456789, value); + EXPECT_FALSE(UtilAll::StringToInt64("-123456789012345678901234567890123456789012345678901234567890", value)); + EXPECT_FALSE(UtilAll::StringToInt64("123456789012345678901234567890123456789012345678901234567890", value)); +} +int main(int argc, char* argv[]) { + InitGoogleMock(&argc, argv); + testing::GTEST_FLAG(throw_on_failure) = true; + testing::GTEST_FLAG(filter) = "UtilAll.*"; + int itestts = RUN_ALL_TESTS(); + return itestts; +} diff --git a/test/src/producer/DefaultMQProducerImplTest.cpp b/test/src/producer/DefaultMQProducerImplTest.cpp new file mode 100644 index 000000000..033e60649 --- /dev/null +++ b/test/src/producer/DefaultMQProducerImplTest.cpp @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +#include "DefaultMQProducerImpl.h" +#include "MQClientFactory.h" +#include "TopicPublishInfo.h" + +using namespace std; +using namespace rocketmq; +using rocketmq::DefaultMQProducerImpl; +using rocketmq::MQClientAPIImpl; +using rocketmq::MQClientFactory; +using rocketmq::TopicPublishInfo; +using testing::_; +using ::testing::InitGoogleMock; +using ::testing::InitGoogleTest; +using testing::Return; + +class MySendCallback : public SendCallback { + virtual void onSuccess(SendResult& sendResult) {} + virtual void onException(MQException& e) {} +}; + +class MyMessageQueueSelector : public MessageQueueSelector { + virtual MQMessageQueue select(const std::vector& mqs, const MQMessage& msg, void* arg) { + return MQMessageQueue("TestTopic", "BrokerA", 0); + } +}; +class MockMQClientFactory : public MQClientFactory { + public: + MockMQClientFactory(const string& mqClientId) : MQClientFactory(mqClientId) {} + MOCK_METHOD0(start, void()); + MOCK_METHOD0(shutdown, void()); + MOCK_METHOD0(sendHeartbeatToAllBroker, void()); + MOCK_METHOD0(getMQClientAPIImpl, MQClientAPIImpl*()); + MOCK_METHOD1(registerProducer, bool(MQProducer*)); + MOCK_METHOD1(unregisterProducer, void(MQProducer*)); + MOCK_METHOD1(findBrokerAddressInPublish, string(const string&)); + MOCK_METHOD2(tryToFindTopicPublishInfo, + boost::shared_ptr(const string&, const SessionCredentials&)); +}; +class MockMQClientAPIImpl : public MQClientAPIImpl { + public: + MockMQClientAPIImpl() : MQClientAPIImpl("testMockMQClientAPIImpl") {} + + MOCK_METHOD9(sendMessage, + SendResult(const string&, + const string&, + const MQMessage&, + SendMessageRequestHeader*, + int, + int, + int, + SendCallback*, + const SessionCredentials&)); +}; +TEST(DefaultMQProducerImplTest, init) { + DefaultMQProducerImpl* impl = new DefaultMQProducerImpl("testMQProducerGroup"); + EXPECT_EQ(impl->getGroupName(), "testMQProducerGroup"); + impl->setUnitName("testUnit"); + EXPECT_EQ(impl->getUnitName(), "testUnit"); + impl->setTcpTransportPullThreadNum(64); + EXPECT_EQ(impl->getTcpTransportPullThreadNum(), 64); + impl->setTcpTransportConnectTimeout(2000); + EXPECT_EQ(impl->getTcpTransportConnectTimeout(), 2000); + impl->setTcpTransportTryLockTimeout(3000); + // need fix the unit + EXPECT_EQ(impl->getTcpTransportTryLockTimeout(), 3); + impl->setRetryTimes4Async(4); + EXPECT_EQ(impl->getRetryTimes4Async(), 4); + impl->setRetryTimes(2); + EXPECT_EQ(impl->getRetryTimes(), 2); + impl->setSendMsgTimeout(1000); + EXPECT_EQ(impl->getSendMsgTimeout(), 1000); + impl->setCompressMsgBodyOverHowmuch(1024); + EXPECT_EQ(impl->getCompressMsgBodyOverHowmuch(), 1024); + impl->setCompressLevel(2); + EXPECT_EQ(impl->getCompressLevel(), 2); + impl->setMaxMessageSize(2048); + EXPECT_EQ(impl->getMaxMessageSize(), 2048); + + impl->setNamesrvAddr("http://rocketmq.nameserver.com"); + EXPECT_EQ(impl->getNamesrvAddr(), "rocketmq.nameserver.com"); + impl->setNameSpace("MQ_INST_NAMESPACE_TEST"); + EXPECT_EQ(impl->getNameSpace(), "MQ_INST_NAMESPACE_TEST"); + // impl->start(); + // EXPECT_EQ(impl->getGroupName(), "MQ_INST_NAMESPACE_TEST%testMQProducerGroup"); + // impl->shutdown(); +} +TEST(DefaultMQProducerImplTest, Sends) { + DefaultMQProducerImpl* impl = new DefaultMQProducerImpl("testMockSendMQProducerGroup"); + MockMQClientFactory* mockFactory = new MockMQClientFactory("testClientId"); + MockMQClientAPIImpl* apiImpl = new MockMQClientAPIImpl(); + + impl->setFactory(mockFactory); + impl->setNamesrvAddr("http://rocketmq.nameserver.com"); + + // prepare send + boost::shared_ptr topicPublishInfo = boost::make_shared(); + MQMessageQueue mqA("TestTopic", "BrokerA", 0); + MQMessageQueue mqB("TestTopic", "BrokerB", 0); + topicPublishInfo->updateMessageQueueList(mqA); + topicPublishInfo->updateMessageQueueList(mqB); + + SendResult okMQAResult(SEND_OK, "MSSAGEID", "OFFSETID", mqA, 1024); + SendResult okMQBResult(SEND_OK, "MSSAGEID", "OFFSETID", mqB, 2048); + SendResult errorMQBResult(SEND_SLAVE_NOT_AVAILABLE, "MSSAGEID", "OFFSETID", mqB, 2048); + + EXPECT_CALL(*mockFactory, start()).Times(1).WillOnce(Return()); + EXPECT_CALL(*mockFactory, shutdown()).Times(1).WillOnce(Return()); + EXPECT_CALL(*mockFactory, registerProducer(_)).Times(1).WillOnce(Return(true)); + EXPECT_CALL(*mockFactory, unregisterProducer(_)).Times(1).WillOnce(Return()); + EXPECT_CALL(*mockFactory, sendHeartbeatToAllBroker()).Times(1).WillOnce(Return()); + EXPECT_CALL(*mockFactory, tryToFindTopicPublishInfo(_, _)).WillRepeatedly(Return(topicPublishInfo)); + EXPECT_CALL(*mockFactory, findBrokerAddressInPublish(_)).WillRepeatedly(Return("BrokerA")); + EXPECT_CALL(*mockFactory, getMQClientAPIImpl()).WillRepeatedly(Return(apiImpl)); + + EXPECT_CALL(*apiImpl, sendMessage(_, _, _, _, _, _, _, _, _)) + .WillOnce(Return(okMQAResult)) + .WillOnce(Return(okMQBResult)) + .WillOnce(Return(errorMQBResult)) + .WillOnce(Return(okMQAResult)) + .WillOnce(Return(okMQAResult)) + .WillRepeatedly(Return(okMQAResult)); + + // Start Producer. + impl->start(); + + MQMessage msg("testTopic", "testTag", "testKey", "testBodysA"); + SendResult s1 = impl->send(msg); + EXPECT_EQ(s1.getSendStatus(), SEND_OK); + EXPECT_EQ(s1.getQueueOffset(), 1024); + SendResult s2 = impl->send(msg, mqB); + EXPECT_EQ(s2.getSendStatus(), SEND_OK); + EXPECT_EQ(s2.getQueueOffset(), 2048); + MessageQueueSelector* pSelect = new MyMessageQueueSelector(); + SendResult s3 = impl->send(msg, pSelect, nullptr, 3, true); + EXPECT_EQ(s3.getSendStatus(), SEND_OK); + EXPECT_EQ(s3.getQueueOffset(), 1024); + SendResult s33 = impl->send(msg, pSelect, nullptr); + EXPECT_EQ(s33.getSendStatus(), SEND_OK); + EXPECT_EQ(s33.getQueueOffset(), 1024); + + SendCallback* pCallback = new MySendCallback(); + EXPECT_NO_THROW(impl->send(msg, pCallback, true)); + EXPECT_NO_THROW(impl->send(msg, pSelect, nullptr, pCallback)); + EXPECT_NO_THROW(impl->send(msg, mqA, pCallback)); + + EXPECT_NO_THROW(impl->sendOneway(msg)); + EXPECT_NO_THROW(impl->sendOneway(msg, mqA)); + EXPECT_NO_THROW(impl->sendOneway(msg, pSelect, nullptr)); + + MQMessage msgB("testTopic", "testTag", "testKey", "testBodysB"); + vector msgs; + msgs.push_back(msg); + msgs.push_back(msgB); + SendResult s4 = impl->send(msgs); + EXPECT_EQ(s4.getSendStatus(), SEND_OK); + EXPECT_EQ(s4.getQueueOffset(), 1024); + SendResult s5 = impl->send(msgs, mqA); + EXPECT_EQ(s5.getSendStatus(), SEND_OK); + EXPECT_EQ(s5.getQueueOffset(), 1024); + + impl->shutdown(); + delete mockFactory; + delete apiImpl; +} +int main(int argc, char* argv[]) { + InitGoogleMock(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/test/src/producer/TopicPublishInfoTest.cpp b/test/src/producer/TopicPublishInfoTest.cpp new file mode 100644 index 000000000..d1548a425 --- /dev/null +++ b/test/src/producer/TopicPublishInfoTest.cpp @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +#include "MQMessageQueue.h" +#include "TopicPublishInfo.h" + +using namespace std; +using namespace rocketmq; +using rocketmq::MQMessageQueue; +using rocketmq::TopicPublishInfo; +using ::testing::InitGoogleMock; +using ::testing::InitGoogleTest; +using testing::Return; + +TEST(TopicPublishInfoTest, testAll) { + TopicPublishInfo* info = new TopicPublishInfo(); + + MQMessageQueue mqA("TestTopicA", "BrokerA", 0); + MQMessageQueue mqB("TestTopicA", "BrokerB", 0); + int index = -1; + EXPECT_EQ(info->getWhichQueue(), 0); + EXPECT_FALSE(info->ok()); + EXPECT_EQ(info->selectOneMessageQueue(mqA, index), MQMessageQueue()); + EXPECT_EQ(info->selectOneActiveMessageQueue(mqA, index), MQMessageQueue()); + info->updateMessageQueueList(mqA); + info->updateMessageQueueList(mqB); + EXPECT_TRUE(info->ok()); + EXPECT_EQ(info->getMessageQueueList().size(), 2); + + EXPECT_EQ(info->selectOneMessageQueue(mqA, index), MQMessageQueue()); + EXPECT_EQ(info->selectOneActiveMessageQueue(mqA, index), MQMessageQueue()); + index = 0; + MQMessageQueue mqSelect1 = info->selectOneMessageQueue(MQMessageQueue(), index); + EXPECT_EQ(index, 0); + EXPECT_EQ(mqSelect1, mqA); + EXPECT_EQ(info->getWhichQueue(), 1); + MQMessageQueue mqSelect2 = info->selectOneMessageQueue(mqSelect1, index); + EXPECT_EQ(index, 1); + EXPECT_EQ(mqSelect2, mqB); + EXPECT_EQ(info->getWhichQueue(), 3); + index = 0; + MQMessageQueue mqActiveSelect1 = info->selectOneActiveMessageQueue(MQMessageQueue(), index); + EXPECT_EQ(index, 0); + EXPECT_EQ(mqActiveSelect1, mqA); + MQMessageQueue mqActiveSelect2 = info->selectOneActiveMessageQueue(mqActiveSelect1, index); + EXPECT_EQ(index, 1); + EXPECT_EQ(mqActiveSelect2, mqB); + EXPECT_EQ(info->getWhichQueue(), 6); + info->updateNonServiceMessageQueue(mqA, 1000); + info->updateNonServiceMessageQueue(mqA, 1000); + index = 0; + MQMessageQueue mqActiveSelect3 = info->selectOneActiveMessageQueue(mqActiveSelect1, index); + EXPECT_EQ(index, 1); + EXPECT_EQ(mqActiveSelect3, mqB); + MQMessageQueue mqActiveSelect4 = info->selectOneActiveMessageQueue(mqActiveSelect2, index); + EXPECT_EQ(index, 1); + EXPECT_EQ(mqActiveSelect4, mqA); + info->updateNonServiceMessageQueue(mqB, 1000); + index = 0; + MQMessageQueue mqSelect3 = info->selectOneMessageQueue(MQMessageQueue(), index); + EXPECT_EQ(index, 0); + EXPECT_EQ(mqSelect3, mqA); + index = 0; + MQMessageQueue mqActiveSelect5 = info->selectOneActiveMessageQueue(MQMessageQueue(), index); + EXPECT_EQ(index, 1); + EXPECT_EQ(mqActiveSelect5, mqA); + index = 0; + MQMessageQueue mqActiveSelect6 = info->selectOneActiveMessageQueue(mqB, index); + EXPECT_EQ(index, 0); + EXPECT_EQ(mqActiveSelect6, mqA); + info->updateMessageQueueList(mqSelect3); + info->resumeNonServiceMessageQueueList(); +} + +int main(int argc, char* argv[]) { + InitGoogleMock(&argc, argv); + return RUN_ALL_TESTS(); +}