Skip to content

Commit b2c11a2

Browse files
committed
Patch apache#402
1 parent 68b4244 commit b2c11a2

23 files changed

+477
-127
lines changed

lib/BinaryProtoLookupService.cc

-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
#include "ConnectionPool.h"
2323
#include "LogUtils.h"
2424
#include "NamespaceName.h"
25-
#include "ServiceNameResolver.h"
2625
#include "TopicName.h"
2726

2827
DECLARE_LOG_OBJECT()

lib/BinaryProtoLookupService.h

+5-3
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ using GetSchemaPromisePtr = std::shared_ptr<Promise<Result, SchemaInfo>>;
3838

3939
class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
4040
public:
41-
BinaryProtoLookupService(ServiceNameResolver& serviceNameResolver, ConnectionPool& pool,
41+
BinaryProtoLookupService(const std::string& serviceUrl, ConnectionPool& pool,
4242
const ClientConfiguration& clientConfiguration)
43-
: serviceNameResolver_(serviceNameResolver),
43+
: serviceNameResolver_(serviceUrl),
4444
cnxPool_(pool),
4545
listenerName_(clientConfiguration.getListenerName()),
4646
maxLookupRedirects_(clientConfiguration.getMaxLookupRedirects()) {}
@@ -54,6 +54,8 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
5454

5555
Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName, const std::string& version) override;
5656

57+
ServiceNameResolver& getServiceNameResolver() override { return serviceNameResolver_; }
58+
5759
protected:
5860
// Mark findBroker as protected to make it accessible from test.
5961
LookupResultFuture findBroker(const std::string& address, bool authoritative, const std::string& topic,
@@ -63,7 +65,7 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
6365
std::mutex mutex_;
6466
uint64_t requestIdGenerator_ = 0;
6567

66-
ServiceNameResolver& serviceNameResolver_;
68+
ServiceNameResolver serviceNameResolver_;
6769
ConnectionPool& cnxPool_;
6870
std::string listenerName_;
6971
const int32_t maxLookupRedirects_;

lib/ClientConnection.cc

+56-1
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,8 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::
403403
LOG_INFO(cnxString_ << "Connected to broker");
404404
} else {
405405
LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_
406-
<< ", proxy: " << proxyServiceUrl_);
406+
<< ", proxy: " << proxyServiceUrl_
407+
<< ", physical address:" << physicalAddress_);
407408
}
408409

409410
Lock lock(mutex_);
@@ -945,6 +946,10 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
945946
handleError(incomingCmd.error());
946947
break;
947948

949+
case BaseCommand::TOPIC_MIGRATED:
950+
handleTopicMigrated(incomingCmd.topicmigrated());
951+
break;
952+
948953
case BaseCommand::CLOSE_PRODUCER:
949954
handleCloseProducer(incomingCmd.close_producer());
950955
break;
@@ -1761,6 +1766,56 @@ void ClientConnection::handleError(const proto::CommandError& error) {
17611766
}
17621767
}
17631768

1769+
std::string ClientConnection::getMigratedBrokerServiceUrl(
1770+
const proto::CommandTopicMigrated& commandTopicMigrated) {
1771+
if (tlsSocket_) {
1772+
if (commandTopicMigrated.has_brokerserviceurltls()) {
1773+
return commandTopicMigrated.brokerserviceurltls();
1774+
}
1775+
} else if (commandTopicMigrated.has_brokerserviceurl()) {
1776+
return commandTopicMigrated.brokerserviceurl();
1777+
}
1778+
return "";
1779+
}
1780+
1781+
void ClientConnection::handleTopicMigrated(const proto::CommandTopicMigrated& commandTopicMigrated) {
1782+
const long resourceId = commandTopicMigrated.resource_id();
1783+
const std::string migratedBrokerServiceUrl = getMigratedBrokerServiceUrl(commandTopicMigrated);
1784+
1785+
if (migratedBrokerServiceUrl.empty()) {
1786+
LOG_WARN("Failed to find the migrated broker url for resource:"
1787+
<< resourceId
1788+
<< (commandTopicMigrated.has_brokerserviceurl()
1789+
? ", migratedBrokerUrl: " + commandTopicMigrated.brokerserviceurl()
1790+
: "")
1791+
<< (commandTopicMigrated.has_brokerserviceurltls()
1792+
? ", migratedBrokerUrlTls: " + commandTopicMigrated.brokerserviceurltls()
1793+
: ""));
1794+
return;
1795+
}
1796+
1797+
Lock lock(mutex_);
1798+
if (commandTopicMigrated.resource_type() == proto::CommandTopicMigrated_ResourceType_Producer) {
1799+
auto it = producers_.find(resourceId);
1800+
if (it != producers_.end()) {
1801+
auto producer = it->second.lock();
1802+
producer->setRedirectedClusterURI(migratedBrokerServiceUrl);
1803+
LOG_INFO("Producer id:" << resourceId << " is migrated to " << migratedBrokerServiceUrl);
1804+
} else {
1805+
LOG_WARN("Got invalid producer Id in topicMigrated command: " << resourceId);
1806+
}
1807+
} else {
1808+
auto it = consumers_.find(resourceId);
1809+
if (it != consumers_.end()) {
1810+
auto consumer = it->second.lock();
1811+
consumer->setRedirectedClusterURI(migratedBrokerServiceUrl);
1812+
LOG_INFO("Consumer id:" << resourceId << " is migrated to " << migratedBrokerServiceUrl);
1813+
} else {
1814+
LOG_WARN("Got invalid consumer Id in topicMigrated command: " << resourceId);
1815+
}
1816+
}
1817+
}
1818+
17641819
boost::optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
17651820
const proto::CommandCloseProducer& closeProducer) {
17661821
if (tlsSocket_) {

lib/ClientConnection.h

+5-4
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ class BrokerEntryMetadata;
8585
class CommandActiveConsumerChange;
8686
class CommandAckResponse;
8787
class CommandMessage;
88+
class CommandTopicMigrated;
8889
class CommandCloseConsumer;
8990
class CommandCloseProducer;
9091
class CommandConnected;
@@ -414,17 +415,17 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
414415
void handleLookupTopicRespose(const proto::CommandLookupTopicResponse&);
415416
void handleProducerSuccess(const proto::CommandProducerSuccess&);
416417
void handleError(const proto::CommandError&);
418+
void handleTopicMigrated(const proto::CommandTopicMigrated&);
417419
void handleCloseProducer(const proto::CommandCloseProducer&);
418420
void handleCloseConsumer(const proto::CommandCloseConsumer&);
419421
void handleAuthChallenge();
420422
void handleGetLastMessageIdResponse(const proto::CommandGetLastMessageIdResponse&);
421423
void handleGetTopicOfNamespaceResponse(const proto::CommandGetTopicsOfNamespaceResponse&);
422424
void handleGetSchemaResponse(const proto::CommandGetSchemaResponse&);
423425
void handleAckResponse(const proto::CommandAckResponse&);
424-
boost::optional<std::string> getAssignedBrokerServiceUrl(
425-
const proto::CommandCloseProducer& closeProducer);
426-
boost::optional<std::string> getAssignedBrokerServiceUrl(
427-
const proto::CommandCloseConsumer& closeConsumer);
426+
boost::optional<std::string> getAssignedBrokerServiceUrl(const proto::CommandCloseProducer&);
427+
boost::optional<std::string> getAssignedBrokerServiceUrl(const proto::CommandCloseConsumer&);
428+
std::string getMigratedBrokerServiceUrl(const proto::CommandTopicMigrated&);
428429
};
429430
} // namespace pulsar
430431

lib/ClientImpl.cc

+40-16
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,8 @@ typedef std::vector<std::string> StringList;
7979
ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration)
8080
: mutex_(),
8181
state_(Open),
82-
serviceNameResolver_(serviceUrl),
83-
clientConfiguration_(ClientConfiguration(clientConfiguration).setUseTls(serviceNameResolver_.useTls())),
82+
clientConfiguration_(ClientConfiguration(clientConfiguration)
83+
.setUseTls(ServiceNameResolver::useTls(ServiceURI(serviceUrl)))),
8484
memoryLimitController_(clientConfiguration.getMemoryLimit()),
8585
ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(clientConfiguration_.getIOThreads())),
8686
listenerExecutorProvider_(
@@ -98,25 +98,28 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration&
9898
if (loggerFactory) {
9999
LogUtils::setLoggerFactory(std::move(loggerFactory));
100100
}
101+
lookupServicePtr_ = createLookup(serviceUrl);
102+
}
103+
104+
ClientImpl::~ClientImpl() { shutdown(); }
101105

106+
LookupServicePtr ClientImpl::createLookup(const std::string& serviceUrl) {
102107
LookupServicePtr underlyingLookupServicePtr;
103-
if (serviceNameResolver_.useHttp()) {
108+
if (ServiceNameResolver::useHttp(ServiceURI(serviceUrl))) {
104109
LOG_DEBUG("Using HTTP Lookup");
105110
underlyingLookupServicePtr = std::make_shared<HTTPLookupService>(
106-
std::ref(serviceNameResolver_), std::cref(clientConfiguration_),
107-
std::cref(clientConfiguration_.getAuthPtr()));
111+
serviceUrl, std::cref(clientConfiguration_), std::cref(clientConfiguration_.getAuthPtr()));
108112
} else {
109113
LOG_DEBUG("Using Binary Lookup");
110114
underlyingLookupServicePtr = std::make_shared<BinaryProtoLookupService>(
111-
std::ref(serviceNameResolver_), std::ref(pool_), std::cref(clientConfiguration_));
115+
serviceUrl, std::ref(pool_), std::cref(clientConfiguration_));
112116
}
113117

114-
lookupServicePtr_ = RetryableLookupService::create(
118+
auto lookupServicePtr = RetryableLookupService::create(
115119
underlyingLookupServicePtr, clientConfiguration_.impl_->operationTimeout, ioExecutorProvider_);
120+
return lookupServicePtr;
116121
}
117122

118-
ClientImpl::~ClientImpl() { shutdown(); }
119-
120123
const ClientConfiguration& ClientImpl::conf() const { return clientConfiguration_; }
121124

122125
MemoryLimitController& ClientImpl::getMemoryLimitController() { return memoryLimitController_; }
@@ -129,7 +132,21 @@ ExecutorServiceProviderPtr ClientImpl::getPartitionListenerExecutorProvider() {
129132
return partitionListenerExecutorProvider_;
130133
}
131134

132-
LookupServicePtr ClientImpl::getLookup() { return lookupServicePtr_; }
135+
LookupServicePtr ClientImpl::getLookup(const std::string& redirectedClusterURI) {
136+
if (redirectedClusterURI.empty()) {
137+
return lookupServicePtr_;
138+
}
139+
140+
Lock lock(mutex_);
141+
auto it = redirectedClusterLookupServicePtrs_.find(redirectedClusterURI);
142+
if (it == redirectedClusterLookupServicePtrs_.end()) {
143+
auto lookup = createLookup(redirectedClusterURI);
144+
redirectedClusterLookupServicePtrs_.emplace(redirectedClusterURI, lookup);
145+
return lookup;
146+
}
147+
148+
return it->second;
149+
}
133150

134151
void ClientImpl::createProducerAsync(const std::string& topic, ProducerConfiguration conf,
135152
CreateProducerCallback callback, bool autoDownloadSchema) {
@@ -517,7 +534,8 @@ void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr co
517534
}
518535
}
519536

520-
GetConnectionFuture ClientImpl::getConnection(const std::string& topic, size_t key) {
537+
GetConnectionFuture ClientImpl::getConnection(const std::string& redirectedClusterURI,
538+
const std::string& topic, size_t key) {
521539
Promise<Result, ClientConnectionPtr> promise;
522540

523541
const auto topicNamePtr = TopicName::get(topic);
@@ -528,7 +546,8 @@ GetConnectionFuture ClientImpl::getConnection(const std::string& topic, size_t k
528546
}
529547

530548
auto self = shared_from_this();
531-
lookupServicePtr_->getBroker(*topicNamePtr)
549+
getLookup(redirectedClusterURI)
550+
->getBroker(*topicNamePtr)
532551
.addListener([this, self, promise, key](Result result, const LookupService::LookupResult& data) {
533552
if (result != ResultOk) {
534553
promise.setFailed(result);
@@ -554,16 +573,18 @@ GetConnectionFuture ClientImpl::getConnection(const std::string& topic, size_t k
554573
return promise.getFuture();
555574
}
556575

557-
const std::string& ClientImpl::getPhysicalAddress(const std::string& logicalAddress) {
576+
const std::string& ClientImpl::getPhysicalAddress(const std::string& redirectedClusterURI,
577+
const std::string& logicalAddress) {
558578
if (useProxy_) {
559-
return serviceNameResolver_.resolveHost();
579+
return getLookup(redirectedClusterURI)->getServiceNameResolver().resolveHost();
560580
} else {
561581
return logicalAddress;
562582
}
563583
}
564584

565-
GetConnectionFuture ClientImpl::connect(const std::string& logicalAddress, size_t key) {
566-
const auto& physicalAddress = getPhysicalAddress(logicalAddress);
585+
GetConnectionFuture ClientImpl::connect(const std::string& redirectedClusterURI,
586+
const std::string& logicalAddress, size_t key) {
587+
const auto& physicalAddress = getPhysicalAddress(redirectedClusterURI, logicalAddress);
567588
Promise<Result, ClientConnectionPtr> promise;
568589
pool_.getConnectionAsync(logicalAddress, physicalAddress, key)
569590
.addListener([promise](Result result, const ClientConnectionWeakPtr& weakCnx) {
@@ -633,6 +654,9 @@ void ClientImpl::closeAsync(CloseCallback callback) {
633654

634655
memoryLimitController_.close();
635656
lookupServicePtr_->close();
657+
for (const auto& it : redirectedClusterLookupServicePtrs_) {
658+
it.second->close();
659+
}
636660

637661
auto producers = producers_.move();
638662
auto consumers = consumers_.move();

lib/ClientImpl.h

+10-5
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,11 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
9797
void getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback);
9898

9999
// Use virtual method to test
100-
virtual GetConnectionFuture getConnection(const std::string& topic, size_t key);
100+
virtual GetConnectionFuture getConnection(const std::string& redirectedClusterURI,
101+
const std::string& topic, size_t key);
101102

102-
GetConnectionFuture connect(const std::string& logicalAddress, size_t key);
103+
GetConnectionFuture connect(const std::string& redirectedClusterURI, const std::string& logicalAddress,
104+
size_t key);
103105

104106
void closeAsync(CloseCallback callback);
105107
void shutdown();
@@ -119,7 +121,7 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
119121
ExecutorServiceProviderPtr getIOExecutorProvider();
120122
ExecutorServiceProviderPtr getListenerExecutorProvider();
121123
ExecutorServiceProviderPtr getPartitionListenerExecutorProvider();
122-
LookupServicePtr getLookup();
124+
LookupServicePtr getLookup(const std::string& redirectedClusterURI = "");
123125

124126
void cleanupProducer(ProducerImplBase* address) { producers_.remove(address); }
125127

@@ -165,7 +167,10 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
165167
const std::string& consumerName, const ConsumerConfiguration& conf,
166168
SubscribeCallback callback);
167169

168-
const std::string& getPhysicalAddress(const std::string& logicalAddress);
170+
const std::string& getPhysicalAddress(const std::string& redirectedClusterURI,
171+
const std::string& logicalAddress);
172+
173+
LookupServicePtr createLookup(const std::string& serviceUrl);
169174

170175
static std::string getClientVersion(const ClientConfiguration& clientConfiguration);
171176

@@ -179,7 +184,6 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
179184
std::mutex mutex_;
180185

181186
State state_;
182-
ServiceNameResolver serviceNameResolver_;
183187
ClientConfiguration clientConfiguration_;
184188
MemoryLimitController memoryLimitController_;
185189

@@ -188,6 +192,7 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
188192
ExecutorServiceProviderPtr partitionListenerExecutorProvider_;
189193

190194
LookupServicePtr lookupServicePtr_;
195+
std::unordered_map<std::string, LookupServicePtr> redirectedClusterLookupServicePtrs_;
191196
ConnectionPool pool_;
192197

193198
uint64_t producerIdGenerator_;

lib/HTTPLookupService.cc

+2-2
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,11 @@ const static std::string ADMIN_PATH_V2 = "/admin/v2/";
4646
const static std::string PARTITION_METHOD_NAME = "partitions";
4747
const static int NUMBER_OF_LOOKUP_THREADS = 1;
4848

49-
HTTPLookupService::HTTPLookupService(ServiceNameResolver &serviceNameResolver,
49+
HTTPLookupService::HTTPLookupService(const std::string &serviceUrl,
5050
const ClientConfiguration &clientConfiguration,
5151
const AuthenticationPtr &authData)
5252
: executorProvider_(std::make_shared<ExecutorServiceProvider>(NUMBER_OF_LOOKUP_THREADS)),
53-
serviceNameResolver_(serviceNameResolver),
53+
serviceNameResolver_(serviceUrl),
5454
authenticationPtr_(authData),
5555
lookupTimeoutInSeconds_(clientConfiguration.getOperationTimeoutSeconds()),
5656
maxLookupRedirects_(clientConfiguration.getMaxLookupRedirects()),

lib/HTTPLookupService.h

+4-2
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t
4040
typedef Promise<Result, LookupDataResultPtr> LookupPromise;
4141

4242
ExecutorServiceProviderPtr executorProvider_;
43-
ServiceNameResolver& serviceNameResolver_;
43+
ServiceNameResolver serviceNameResolver_;
4444
AuthenticationPtr authenticationPtr_;
4545
int lookupTimeoutInSeconds_;
4646
const int maxLookupRedirects_;
@@ -64,7 +64,7 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t
6464
Result sendHTTPRequest(std::string completeUrl, std::string& responseData, long& responseCode);
6565

6666
public:
67-
HTTPLookupService(ServiceNameResolver&, const ClientConfiguration&, const AuthenticationPtr&);
67+
HTTPLookupService(const std::string&, const ClientConfiguration&, const AuthenticationPtr&);
6868

6969
LookupResultFuture getBroker(const TopicName& topicName) override;
7070

@@ -74,6 +74,8 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t
7474

7575
Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
7676
const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override;
77+
78+
ServiceNameResolver& getServiceNameResolver() override { return serviceNameResolver_; }
7779
};
7880
} // namespace pulsar
7981

lib/HandlerBase.cc

+4-3
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
4343
epoch_(0),
4444
timer_(executor_->createDeadlineTimer()),
4545
creationTimer_(executor_->createDeadlineTimer()),
46-
reconnectionPending_(false) {}
46+
reconnectionPending_(false),
47+
redirectedClusterURI_("") {}
4748

4849
HandlerBase::~HandlerBase() {
4950
ASIO_ERROR ignored;
@@ -88,9 +89,9 @@ void HandlerBase::grabCnx() { grabCnx(boost::none); }
8889
Future<Result, ClientConnectionPtr> HandlerBase::getConnection(
8990
const ClientImplPtr& client, const boost::optional<std::string>& assignedBrokerUrl) {
9091
if (assignedBrokerUrl && client->getLookupCount() > 0) {
91-
return client->connect(assignedBrokerUrl.get(), connectionKeySuffix_);
92+
return client->connect(redirectedClusterURI_, assignedBrokerUrl.get(), connectionKeySuffix_);
9293
} else {
93-
return client->getConnection(topic(), connectionKeySuffix_);
94+
return client->getConnection(redirectedClusterURI_, topic(), connectionKeySuffix_);
9495
}
9596
}
9697

lib/HandlerBase.h

+3
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
5252
ClientConnectionWeakPtr getCnx() const;
5353
void setCnx(const ClientConnectionPtr& cnx);
5454
void resetCnx() { setCnx(nullptr); }
55+
void setRedirectedClusterURI(const std::string serviceUrl) { redirectedClusterURI_ = serviceUrl; }
5556

5657
protected:
5758
/*
@@ -145,6 +146,8 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
145146
mutable std::mutex connectionMutex_;
146147
std::atomic<bool> reconnectionPending_;
147148
ClientConnectionWeakPtr connection_;
149+
std::string redirectedClusterURI_;
150+
148151
friend class ClientConnection;
149152
friend class PulsarFriend;
150153
};

0 commit comments

Comments
 (0)