Skip to content
1 change: 1 addition & 0 deletions src/MQClientAPIImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

namespace rocketmq {
//<!************************************************************************
MQClientAPIImpl::MQClientAPIImpl(const string& mqClientId) : m_firstFetchNameSrv(true), m_mqClientId(mqClientId) {}
MQClientAPIImpl::MQClientAPIImpl(const string& mqClientId,
ClientRemotingProcessor* clientRemotingProcessor,
int pullThreadNum,
Expand Down
1 change: 1 addition & 0 deletions src/MQClientAPIImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ namespace rocketmq {
//<!************************************************************************
class MQClientAPIImpl {
public:
MQClientAPIImpl(const string& mqClientId);
MQClientAPIImpl(const string& mqClientId,
ClientRemotingProcessor* clientRemotingProcessor,
int pullThreadNum,
Expand Down
5 changes: 4 additions & 1 deletion src/MQClientFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@

namespace rocketmq {
//<!***************************************************************************
MQClientFactory::MQClientFactory(const string& clientID) : m_bFetchNSService(true) {
m_clientId = clientID;
}
MQClientFactory::MQClientFactory(const string& clientID,
int pullThreadNum,
uint64_t tcpConnectTimeout,
Expand Down Expand Up @@ -716,7 +719,7 @@ void MQClientFactory::checkTransactionState(const std::string& addr,
}
}

MQClientAPIImpl* MQClientFactory::getMQClientAPIImpl() const {
MQClientAPIImpl* MQClientFactory::getMQClientAPIImpl() {
return m_pClientAPIImpl.get();
}

Expand Down
23 changes: 12 additions & 11 deletions src/MQClientFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,15 @@ class MQClientFactory {
uint64_t tcpConnectTimeout,
uint64_t tcpTransportTryLockTimeout,
string unitName);
MQClientFactory(const string& clientID);
virtual ~MQClientFactory();

void start();
void shutdown();
bool registerProducer(MQProducer* pProducer);
void unregisterProducer(MQProducer* pProducer);
bool registerConsumer(MQConsumer* pConsumer);
void unregisterConsumer(MQConsumer* pConsumer);
virtual void start();
virtual void shutdown();
virtual bool registerProducer(MQProducer* pProducer);
virtual void unregisterProducer(MQProducer* pProducer);
virtual bool registerConsumer(MQConsumer* pConsumer);
virtual void unregisterConsumer(MQConsumer* pConsumer);

void createTopic(const string& key,
const string& newTopic,
Expand All @@ -76,7 +77,7 @@ class MQClientFactory {
void checkTransactionState(const std::string& addr,
const MQMessageExt& message,
const CheckTransactionStateRequestHeader& checkRequestHeader);
MQClientAPIImpl* getMQClientAPIImpl() const;
virtual MQClientAPIImpl* getMQClientAPIImpl();
MQProducer* selectProducer(const string& group);
MQConsumer* selectConsumer(const string& group);

Expand All @@ -88,10 +89,10 @@ class MQClientFactory {

FindBrokerResult* findBrokerAddressInAdmin(const string& brokerName);

string findBrokerAddressInPublish(const string& brokerName);
virtual string findBrokerAddressInPublish(const string& brokerName);

boost::shared_ptr<TopicPublishInfo> tryToFindTopicPublishInfo(const string& topic,
const SessionCredentials& session_credentials);
virtual boost::shared_ptr<TopicPublishInfo> tryToFindTopicPublishInfo(const string& topic,
const SessionCredentials& session_credentials);

void fetchSubscribeMessageQueues(const string& topic,
vector<MQMessageQueue>& mqs,
Expand All @@ -102,7 +103,7 @@ class MQClientFactory {
bool isDefault = false);
void rebalanceImmediately();
void doRebalanceByConsumerGroup(const string& consumerGroup);
void sendHeartbeatToAllBroker();
virtual void sendHeartbeatToAllBroker();

void cleanOfflineBrokers();

Expand Down
3 changes: 3 additions & 0 deletions src/common/DefaultMQClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ void DefaultMQClient::shutdown() {
MQClientFactory* DefaultMQClient::getFactory() const {
return m_clientFactory;
}
void DefaultMQClient::setFactory(MQClientFactory* factory) {
m_clientFactory = factory;
}

bool DefaultMQClient::isServiceStateOk() {
return m_serviceState == RUNNING;
Expand Down
10 changes: 5 additions & 5 deletions src/common/NameSpaceUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ string NameSpaceUtil::getNameSpaceFromNsURL(string nameServerAddr) {
LOG_DEBUG("Try to get Name Space from nameServerAddr [%s]", nameServerAddr.c_str());
string nsAddr = formatNameServerURL(nameServerAddr);
string nameSpace;
auto index = nameServerAddr.find(NAMESPACE_PREFIX);
auto index = nsAddr.find(NAMESPACE_PREFIX);
if (index != string::npos) {
auto indexDot = nameServerAddr.find('.');
if (indexDot != string::npos) {
nameSpace = nameServerAddr.substr(index, indexDot);
auto indexDot = nsAddr.find('.');
if (indexDot != string::npos && indexDot > index) {
nameSpace = nsAddr.substr(index, indexDot - index);
LOG_INFO("Get Name Space [%s] from nameServerAddr [%s]", nameSpace.c_str(), nameServerAddr.c_str());
return nameSpace;
}
Expand Down Expand Up @@ -83,7 +83,7 @@ string NameSpaceUtil::withNameSpace(string source, string ns) {
}

bool NameSpaceUtil::hasNameSpace(string source, string ns) {
if (source.length() >= ns.length() && source.find(ns) != string::npos) {
if (!ns.empty() && source.length() >= ns.length() && source.find(ns) != string::npos) {
return true;
}
return false;
Expand Down
4 changes: 2 additions & 2 deletions src/common/UtilAll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ int UtilAll::Split(vector<string>& ret_, const string& strIn, const string& sep)
return ret_.size();
}

int32_t UtilAll::StringToInt32(const std::string& str, int32_t& out) {
bool UtilAll::StringToInt32(const std::string& str, int32_t& out) {
out = 0;
if (str.empty()) {
return false;
Expand All @@ -196,7 +196,7 @@ int32_t UtilAll::StringToInt32(const std::string& str, int32_t& out) {
return true;
}

int64_t UtilAll::StringToInt64(const std::string& str, int64_t& val) {
bool UtilAll::StringToInt64(const std::string& str, int64_t& val) {
char* endptr = NULL;
errno = 0; /* To distinguish success/failure after call */
val = strtoll(str.c_str(), &endptr, 10);
Expand Down
4 changes: 2 additions & 2 deletions src/common/UtilAll.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ class UtilAll {
static int Split(vector<string>& ret_, const string& strIn, const char sep);
static int Split(vector<string>& ret_, const string& strIn, const string& sep);

static int32_t StringToInt32(const std::string& str, int32_t& out);
static int64_t StringToInt64(const std::string& str, int64_t& val);
static bool StringToInt32(const std::string& str, int32_t& out);
static bool StringToInt64(const std::string& str, int64_t& val);

static string getLocalHostName();
static string getLocalAddress();
Expand Down
2 changes: 2 additions & 0 deletions src/include/DefaultMQClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ class DefaultMQClient {
const std::string& input_onsChannel);
const SessionCredentials& getSessionCredentials() const;

virtual void setFactory(MQClientFactory*);

protected:
virtual void start();
virtual void shutdown();
Expand Down
6 changes: 5 additions & 1 deletion src/producer/TopicPublishInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/thread/thread.hpp>
#include <map>
#include <string>
#include <vector>
#include "Logging.h"
#include "MQMessageQueue.h"
#include "UtilAll.h"

namespace rocketmq {
//<!************************************************************************/
Expand Down Expand Up @@ -257,6 +261,6 @@ class TopicPublishInfo {
};

//<!***************************************************************************
} //<!end namespace;
} // namespace rocketmq

#endif
93 changes: 93 additions & 0 deletions test/src/common/NameSpaceUtilTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <string>

#include "gmock/gmock.h"
#include "gtest/gtest.h"

#include "NameSpaceUtil.h"

using std::string;

using ::testing::InitGoogleMock;
using ::testing::InitGoogleTest;
using testing::Return;

using rocketmq::NameSpaceUtil;

TEST(NameSpaceUtil, isEndPointURL) {
const string url = "http://rocketmq.nameserver.com";
EXPECT_TRUE(NameSpaceUtil::isEndPointURL(url));
EXPECT_FALSE(NameSpaceUtil::isEndPointURL("rocketmq.nameserver.com"));
EXPECT_FALSE(NameSpaceUtil::isEndPointURL("127.0.0.1"));
}
TEST(NameSpaceUtil, formatNameServerURL) {
string url = "http://rocketmq.nameserver.com";
string urlFormatted = "rocketmq.nameserver.com";
EXPECT_EQ(NameSpaceUtil::formatNameServerURL(url), urlFormatted);
EXPECT_EQ(NameSpaceUtil::formatNameServerURL(urlFormatted), urlFormatted);
}
TEST(NameSpaceUtil, getNameSpaceFromNsURL) {
string url = "http://MQ_INST_UNITTEST.rocketmq.nameserver.com";
string url2 = "MQ_INST_UNITTEST.rocketmq.nameserver.com";
string noInstUrl = "http://rocketmq.nameserver.com";
string inst = "MQ_INST_UNITTEST";
EXPECT_EQ(NameSpaceUtil::getNameSpaceFromNsURL(url), inst);
EXPECT_EQ(NameSpaceUtil::getNameSpaceFromNsURL(url2), inst);
EXPECT_EQ(NameSpaceUtil::getNameSpaceFromNsURL(noInstUrl), "");
}
TEST(NameSpaceUtil, checkNameSpaceExistInNsURL) {
string url = "http://MQ_INST_UNITTEST.rocketmq.nameserver.com";
string url2 = "MQ_INST_UNITTEST.rocketmq.nameserver.com";
string noInstUrl = "http://rocketmq.nameserver.com";
EXPECT_TRUE(NameSpaceUtil::checkNameSpaceExistInNsURL(url));
EXPECT_FALSE(NameSpaceUtil::checkNameSpaceExistInNsURL(url2));
EXPECT_FALSE(NameSpaceUtil::checkNameSpaceExistInNsURL(noInstUrl));
}
TEST(NameSpaceUtil, checkNameSpaceExistInNameServer) {
string url = "http://MQ_INST_UNITTEST.rocketmq.nameserver.com";
string url2 = "MQ_INST_UNITTEST.rocketmq.nameserver.com";
string noInstUrl = "rocketmq.nameserver.com";
string nsIP = "127.0.0.1";
EXPECT_TRUE(NameSpaceUtil::checkNameSpaceExistInNameServer(url));
EXPECT_TRUE(NameSpaceUtil::checkNameSpaceExistInNameServer(url2));
EXPECT_FALSE(NameSpaceUtil::checkNameSpaceExistInNameServer(noInstUrl));
EXPECT_FALSE(NameSpaceUtil::checkNameSpaceExistInNameServer(nsIP));
}
TEST(NameSpaceUtil, withNameSpace) {
string source = "testTopic";
string ns = "MQ_INST_UNITTEST";
string nsSource = "MQ_INST_UNITTEST%testTopic";
EXPECT_EQ(NameSpaceUtil::withNameSpace(source, ns), nsSource);
EXPECT_EQ(NameSpaceUtil::withNameSpace(source, ""), source);
}
TEST(NameSpaceUtil, hasNameSpace) {
string source = "testTopic";
string ns = "MQ_INST_UNITTEST";
string nsSource = "MQ_INST_UNITTEST%testTopic";
EXPECT_TRUE(NameSpaceUtil::hasNameSpace(nsSource, ns));
EXPECT_FALSE(NameSpaceUtil::hasNameSpace(source, ns));
EXPECT_FALSE(NameSpaceUtil::hasNameSpace(source, ""));
}
int main(int argc, char* argv[]) {
InitGoogleMock(&argc, argv);
testing::GTEST_FLAG(throw_on_failure) = true;
testing::GTEST_FLAG(filter) = "NameSpaceUtil.*";
int itestts = RUN_ALL_TESTS();
return itestts;
}
120 changes: 120 additions & 0 deletions test/src/common/UtilAllTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <string>

#include "gmock/gmock.h"
#include "gtest/gtest.h"

#include "UtilAll.h"

using std::string;

using ::testing::InitGoogleMock;
using ::testing::InitGoogleTest;
using testing::Return;

using rocketmq::UtilAll;

TEST(UtilAll, startsWith_retry) {
string source = "testTopic";
string retrySource = "%RETRY%testTopic";
string noRetrySource = "%DLQ%testTopic";
EXPECT_TRUE(UtilAll::startsWith_retry(retrySource));
EXPECT_FALSE(UtilAll::startsWith_retry(source));
EXPECT_FALSE(UtilAll::startsWith_retry(noRetrySource));
}
TEST(UtilAll, getRetryTopic) {
string source = "testTopic";
string retrySource = "%RETRY%testTopic";
EXPECT_EQ(UtilAll::getRetryTopic(source), retrySource);
}
TEST(UtilAll, Trim) {
string source = "testTopic";
string preSource = " testTopic";
string surSource = "testTopic ";
string allSource = " testTopic ";
UtilAll::Trim(preSource);
UtilAll::Trim(surSource);
UtilAll::Trim(allSource);
EXPECT_EQ(preSource, source);
EXPECT_EQ(surSource, source);
EXPECT_EQ(allSource, source);
}
TEST(UtilAll, hexstr2ull) {
const char* a = "1";
const char* b = "FF";
const char* c = "1a";
const char* d = "101";
EXPECT_EQ(UtilAll::hexstr2ull(a), 1);
EXPECT_EQ(UtilAll::hexstr2ull(b), 255);
EXPECT_EQ(UtilAll::hexstr2ull(c), 26);
EXPECT_EQ(UtilAll::hexstr2ull(d), 257);
}
TEST(UtilAll, SplitURL) {
string source = "127.0.0.1";
string source1 = "127.0.0.1:0";
string source2 = "127.0.0.1:9876";
string addr;
string addr1;
string addr2;
short port;
EXPECT_FALSE(UtilAll::SplitURL(source, addr, port));
EXPECT_FALSE(UtilAll::SplitURL(source1, addr1, port));
EXPECT_TRUE(UtilAll::SplitURL(source2, addr2, port));
EXPECT_EQ(addr2, "127.0.0.1");
EXPECT_EQ(port, 9876);
}
TEST(UtilAll, SplitOne) {
string source = "127.0.0.1:9876";
vector<string> ret;
EXPECT_EQ(UtilAll::Split(ret, source, '.'), 4);
EXPECT_EQ(ret[0], "127");
}
TEST(UtilAll, SplitStr) {
string source = "11AA222AA3333AA44444AA5";
vector<string> ret;
EXPECT_EQ(UtilAll::Split(ret, source, "AA"), 5);
EXPECT_EQ(ret[0], "11");
}
TEST(UtilAll, StringToInt32) {
string source = "123";
int value;
EXPECT_TRUE(UtilAll::StringToInt32(source, value));
EXPECT_EQ(123, value);
EXPECT_FALSE(UtilAll::StringToInt32("123456789X123456789", value));
EXPECT_FALSE(UtilAll::StringToInt32("-1234567890123456789", value));
EXPECT_FALSE(UtilAll::StringToInt32("1234567890123456789", value));
}
TEST(UtilAll, StringToInt64) {
string source = "123";
int64_t value;
EXPECT_TRUE(UtilAll::StringToInt64(source, value));
EXPECT_EQ(123, value);
EXPECT_FALSE(UtilAll::StringToInt64("XXXXXXXXXXX", value));
EXPECT_FALSE(UtilAll::StringToInt64("123456789X123456789", value));
EXPECT_EQ(123456789, value);
EXPECT_FALSE(UtilAll::StringToInt64("-123456789012345678901234567890123456789012345678901234567890", value));
EXPECT_FALSE(UtilAll::StringToInt64("123456789012345678901234567890123456789012345678901234567890", value));
}
int main(int argc, char* argv[]) {
InitGoogleMock(&argc, argv);
testing::GTEST_FLAG(throw_on_failure) = true;
testing::GTEST_FLAG(filter) = "UtilAll.*";
int itestts = RUN_ALL_TESTS();
return itestts;
}
Loading