Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit ad1f59f

Browse files
committed Feb 12, 2024
[feat] PIP-188 Support blue-green migration
1 parent 68b4244 commit ad1f59f

23 files changed

+473
-126
lines changed
 

‎lib/BinaryProtoLookupService.cc

-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
#include "ConnectionPool.h"
2323
#include "LogUtils.h"
2424
#include "NamespaceName.h"
25-
#include "ServiceNameResolver.h"
2625
#include "TopicName.h"
2726

2827
DECLARE_LOG_OBJECT()

‎lib/BinaryProtoLookupService.h

+5-3
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ using GetSchemaPromisePtr = std::shared_ptr<Promise<Result, SchemaInfo>>;
3838

3939
class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
4040
public:
41-
BinaryProtoLookupService(ServiceNameResolver& serviceNameResolver, ConnectionPool& pool,
41+
BinaryProtoLookupService(const std::string& serviceUrl, ConnectionPool& pool,
4242
const ClientConfiguration& clientConfiguration)
43-
: serviceNameResolver_(serviceNameResolver),
43+
: serviceNameResolver_(serviceUrl),
4444
cnxPool_(pool),
4545
listenerName_(clientConfiguration.getListenerName()),
4646
maxLookupRedirects_(clientConfiguration.getMaxLookupRedirects()) {}
@@ -54,6 +54,8 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
5454

5555
Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName, const std::string& version) override;
5656

57+
// Returns the resolver this lookup service built from the service URL passed to its constructor.
ServiceNameResolver& getServiceNameResolver() override {
    return serviceNameResolver_;
}
58+
5759
protected:
5860
// Mark findBroker as protected to make it accessible from test.
5961
LookupResultFuture findBroker(const std::string& address, bool authoritative, const std::string& topic,
@@ -63,7 +65,7 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
6365
std::mutex mutex_;
6466
uint64_t requestIdGenerator_ = 0;
6567

66-
ServiceNameResolver& serviceNameResolver_;
68+
ServiceNameResolver serviceNameResolver_;
6769
ConnectionPool& cnxPool_;
6870
std::string listenerName_;
6971
const int32_t maxLookupRedirects_;

‎lib/ClientConnection.cc

+55-1
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,8 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::
403403
LOG_INFO(cnxString_ << "Connected to broker");
404404
} else {
405405
LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_
406-
<< ", proxy: " << proxyServiceUrl_);
406+
<< ", proxy: " << proxyServiceUrl_
407+
<< ", physical address:" << physicalAddress_);
407408
}
408409

409410
Lock lock(mutex_);
@@ -945,6 +946,10 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
945946
handleError(incomingCmd.error());
946947
break;
947948

949+
case BaseCommand::TOPIC_MIGRATED:
950+
handleTopicMigrated(incomingCmd.topicmigrated());
951+
break;
952+
948953
case BaseCommand::CLOSE_PRODUCER:
949954
handleCloseProducer(incomingCmd.close_producer());
950955
break;
@@ -1761,6 +1766,55 @@ void ClientConnection::handleError(const proto::CommandError& error) {
17611766
}
17621767
}
17631768

1769+
std::string ClientConnection::getMigratedBrokerServiceUrl(
1770+
const proto::CommandTopicMigrated& commandTopicMigrated) {
1771+
if (tlsSocket_) {
1772+
if (commandTopicMigrated.has_brokerserviceurltls()) {
1773+
return commandTopicMigrated.brokerserviceurltls();
1774+
}
1775+
} else if (commandTopicMigrated.has_brokerserviceurl()) {
1776+
return commandTopicMigrated.brokerserviceurl();
1777+
}
1778+
return "";
1779+
}
1780+
1781+
/**
 * Handle a TOPIC_MIGRATED command.
 *
 * Looks up the producer or consumer named by the command's resource id and stores the migrated
 * cluster's service URL on it via setRedirectedClusterURI(). The command is ignored (with a
 * warning) when it carries no URL usable by this connection, when the id is unknown, or when the
 * registered handler has already been destroyed.
 *
 * @param commandTopicMigrated the command received from the broker
 */
void ClientConnection::handleTopicMigrated(const proto::CommandTopicMigrated& commandTopicMigrated) {
    const long resourceId = commandTopicMigrated.resource_id();
    const std::string migratedBrokerServiceUrl = getMigratedBrokerServiceUrl(commandTopicMigrated);

    if (migratedBrokerServiceUrl.empty()) {
        LOG_WARN("Failed to find the migrated broker url for resource:"
                 << resourceId
                 << (commandTopicMigrated.has_brokerserviceurl()
                         ? ", migratedBrokerUrl: " + commandTopicMigrated.brokerserviceurl()
                         : "")
                 << (commandTopicMigrated.has_brokerserviceurltls()
                         ? ", migratedBrokerUrlTls: " + commandTopicMigrated.brokerserviceurltls()
                         : ""));
        return;
    }

    // NOTE(review): producers_/consumers_ are read here without taking mutex_; confirm whether the
    // caller already serializes access before adding a lock.
    if (commandTopicMigrated.resource_type() == proto::CommandTopicMigrated_ResourceType_Producer) {
        auto it = producers_.find(resourceId);
        if (it == producers_.end()) {
            LOG_WARN("Got invalid producer Id in topicMigrated command: " << resourceId);
            return;
        }
        // lock() yields an empty pointer when the producer is gone but still registered;
        // dereferencing it unconditionally would crash.
        auto producer = it->second.lock();
        if (!producer) {
            LOG_WARN("Producer id:" << resourceId
                                    << " no longer exists, ignoring topicMigrated command");
            return;
        }
        producer->setRedirectedClusterURI(migratedBrokerServiceUrl);
        LOG_INFO("Producer id:" << resourceId << " is migrated to " << migratedBrokerServiceUrl);
    } else {
        auto it = consumers_.find(resourceId);
        if (it == consumers_.end()) {
            LOG_WARN("Got invalid consumer Id in topicMigrated command: " << resourceId);
            return;
        }
        // Same expiry check as for producers.
        auto consumer = it->second.lock();
        if (!consumer) {
            LOG_WARN("Consumer id:" << resourceId
                                    << " no longer exists, ignoring topicMigrated command");
            return;
        }
        consumer->setRedirectedClusterURI(migratedBrokerServiceUrl);
        LOG_INFO("Consumer id:" << resourceId << " is migrated to " << migratedBrokerServiceUrl);
    }
}
1817+
17641818
boost::optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
17651819
const proto::CommandCloseProducer& closeProducer) {
17661820
if (tlsSocket_) {

‎lib/ClientConnection.h

+5-4
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ class BrokerEntryMetadata;
8585
class CommandActiveConsumerChange;
8686
class CommandAckResponse;
8787
class CommandMessage;
88+
class CommandTopicMigrated;
8889
class CommandCloseConsumer;
8990
class CommandCloseProducer;
9091
class CommandConnected;
@@ -414,17 +415,17 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
414415
void handleLookupTopicRespose(const proto::CommandLookupTopicResponse&);
415416
void handleProducerSuccess(const proto::CommandProducerSuccess&);
416417
void handleError(const proto::CommandError&);
418+
void handleTopicMigrated(const proto::CommandTopicMigrated&);
417419
void handleCloseProducer(const proto::CommandCloseProducer&);
418420
void handleCloseConsumer(const proto::CommandCloseConsumer&);
419421
void handleAuthChallenge();
420422
void handleGetLastMessageIdResponse(const proto::CommandGetLastMessageIdResponse&);
421423
void handleGetTopicOfNamespaceResponse(const proto::CommandGetTopicsOfNamespaceResponse&);
422424
void handleGetSchemaResponse(const proto::CommandGetSchemaResponse&);
423425
void handleAckResponse(const proto::CommandAckResponse&);
424-
boost::optional<std::string> getAssignedBrokerServiceUrl(
425-
const proto::CommandCloseProducer& closeProducer);
426-
boost::optional<std::string> getAssignedBrokerServiceUrl(
427-
const proto::CommandCloseConsumer& closeConsumer);
426+
boost::optional<std::string> getAssignedBrokerServiceUrl(const proto::CommandCloseProducer&);
427+
boost::optional<std::string> getAssignedBrokerServiceUrl(const proto::CommandCloseConsumer&);
428+
std::string getMigratedBrokerServiceUrl(const proto::CommandTopicMigrated&);
428429
};
429430
} // namespace pulsar
430431

‎lib/ClientImpl.cc

+40-16
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,8 @@ typedef std::vector<std::string> StringList;
7979
ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration)
8080
: mutex_(),
8181
state_(Open),
82-
serviceNameResolver_(serviceUrl),
83-
clientConfiguration_(ClientConfiguration(clientConfiguration).setUseTls(serviceNameResolver_.useTls())),
82+
clientConfiguration_(ClientConfiguration(clientConfiguration)
83+
.setUseTls(ServiceNameResolver::useTls(ServiceURI(serviceUrl)))),
8484
memoryLimitController_(clientConfiguration.getMemoryLimit()),
8585
ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(clientConfiguration_.getIOThreads())),
8686
listenerExecutorProvider_(
@@ -98,25 +98,28 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration&
9898
if (loggerFactory) {
9999
LogUtils::setLoggerFactory(std::move(loggerFactory));
100100
}
101+
lookupServicePtr_ = createLookup(serviceUrl);
102+
}
103+
104+
ClientImpl::~ClientImpl() { shutdown(); }
101105

106+
LookupServicePtr ClientImpl::createLookup(const std::string& serviceUrl) {
102107
LookupServicePtr underlyingLookupServicePtr;
103-
if (serviceNameResolver_.useHttp()) {
108+
if (ServiceNameResolver::useHttp(ServiceURI(serviceUrl))) {
104109
LOG_DEBUG("Using HTTP Lookup");
105110
underlyingLookupServicePtr = std::make_shared<HTTPLookupService>(
106-
std::ref(serviceNameResolver_), std::cref(clientConfiguration_),
107-
std::cref(clientConfiguration_.getAuthPtr()));
111+
serviceUrl, std::cref(clientConfiguration_), std::cref(clientConfiguration_.getAuthPtr()));
108112
} else {
109113
LOG_DEBUG("Using Binary Lookup");
110114
underlyingLookupServicePtr = std::make_shared<BinaryProtoLookupService>(
111-
std::ref(serviceNameResolver_), std::ref(pool_), std::cref(clientConfiguration_));
115+
serviceUrl, std::ref(pool_), std::cref(clientConfiguration_));
112116
}
113117

114-
lookupServicePtr_ = RetryableLookupService::create(
118+
auto lookupServicePtr = RetryableLookupService::create(
115119
underlyingLookupServicePtr, clientConfiguration_.impl_->operationTimeout, ioExecutorProvider_);
120+
return lookupServicePtr;
116121
}
117122

118-
ClientImpl::~ClientImpl() { shutdown(); }
119-
120123
const ClientConfiguration& ClientImpl::conf() const { return clientConfiguration_; }
121124

122125
MemoryLimitController& ClientImpl::getMemoryLimitController() { return memoryLimitController_; }
@@ -129,7 +132,21 @@ ExecutorServiceProviderPtr ClientImpl::getPartitionListenerExecutorProvider() {
129132
return partitionListenerExecutorProvider_;
130133
}
131134

132-
LookupServicePtr ClientImpl::getLookup() { return lookupServicePtr_; }
135+
/**
 * Return the lookup service to use for the given cluster.
 *
 * An empty URI selects the client's primary lookup service. Any other URI selects a lookup
 * service cached per URI, which is created with createLookup() on first use. The cache is
 * accessed under mutex_.
 *
 * @param redirectedClusterURI service URL of the redirected cluster, or empty for the primary one
 */
LookupServicePtr ClientImpl::getLookup(const std::string& redirectedClusterURI) {
    if (redirectedClusterURI.empty()) {
        return lookupServicePtr_;
    }

    Lock lock(mutex_);
    const auto cached = redirectedClusterLookupServicePtrs_.find(redirectedClusterURI);
    if (cached != redirectedClusterLookupServicePtrs_.end()) {
        return cached->second;
    }

    // First request for this cluster: build its lookup service and remember it.
    auto created = createLookup(redirectedClusterURI);
    redirectedClusterLookupServicePtrs_.emplace(redirectedClusterURI, created);
    return created;
}
133150

134151
void ClientImpl::createProducerAsync(const std::string& topic, ProducerConfiguration conf,
135152
CreateProducerCallback callback, bool autoDownloadSchema) {
@@ -517,7 +534,8 @@ void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr co
517534
}
518535
}
519536

520-
GetConnectionFuture ClientImpl::getConnection(const std::string& topic, size_t key) {
537+
GetConnectionFuture ClientImpl::getConnection(const std::string& redirectedClusterURI,
538+
const std::string& topic, size_t key) {
521539
Promise<Result, ClientConnectionPtr> promise;
522540

523541
const auto topicNamePtr = TopicName::get(topic);
@@ -528,7 +546,8 @@ GetConnectionFuture ClientImpl::getConnection(const std::string& topic, size_t k
528546
}
529547

530548
auto self = shared_from_this();
531-
lookupServicePtr_->getBroker(*topicNamePtr)
549+
getLookup(redirectedClusterURI)
550+
->getBroker(*topicNamePtr)
532551
.addListener([this, self, promise, key](Result result, const LookupService::LookupResult& data) {
533552
if (result != ResultOk) {
534553
promise.setFailed(result);
@@ -554,16 +573,18 @@ GetConnectionFuture ClientImpl::getConnection(const std::string& topic, size_t k
554573
return promise.getFuture();
555574
}
556575

557-
const std::string& ClientImpl::getPhysicalAddress(const std::string& logicalAddress) {
576+
const std::string& ClientImpl::getPhysicalAddress(const std::string& redirectedClusterURI,
577+
const std::string& logicalAddress) {
558578
if (useProxy_) {
559-
return serviceNameResolver_.resolveHost();
579+
return getLookup(redirectedClusterURI)->getServiceNameResolver().resolveHost();
560580
} else {
561581
return logicalAddress;
562582
}
563583
}
564584

565-
GetConnectionFuture ClientImpl::connect(const std::string& logicalAddress, size_t key) {
566-
const auto& physicalAddress = getPhysicalAddress(logicalAddress);
585+
GetConnectionFuture ClientImpl::connect(const std::string& redirectedClusterURI,
586+
const std::string& logicalAddress, size_t key) {
587+
const auto& physicalAddress = getPhysicalAddress(redirectedClusterURI, logicalAddress);
567588
Promise<Result, ClientConnectionPtr> promise;
568589
pool_.getConnectionAsync(logicalAddress, physicalAddress, key)
569590
.addListener([promise](Result result, const ClientConnectionWeakPtr& weakCnx) {
@@ -633,6 +654,9 @@ void ClientImpl::closeAsync(CloseCallback callback) {
633654

634655
memoryLimitController_.close();
635656
lookupServicePtr_->close();
657+
for (const auto& it : redirectedClusterLookupServicePtrs_) {
658+
it.second->close();
659+
}
636660

637661
auto producers = producers_.move();
638662
auto consumers = consumers_.move();

‎lib/ClientImpl.h

+10-5
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,11 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
9797
void getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback);
9898

9999
// Use virtual method to test
100-
virtual GetConnectionFuture getConnection(const std::string& topic, size_t key);
100+
virtual GetConnectionFuture getConnection(const std::string& redirectedClusterURI,
101+
const std::string& topic, size_t key);
101102

102-
GetConnectionFuture connect(const std::string& logicalAddress, size_t key);
103+
GetConnectionFuture connect(const std::string& redirectedClusterURI, const std::string& logicalAddress,
104+
size_t key);
103105

104106
void closeAsync(CloseCallback callback);
105107
void shutdown();
@@ -119,7 +121,7 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
119121
ExecutorServiceProviderPtr getIOExecutorProvider();
120122
ExecutorServiceProviderPtr getListenerExecutorProvider();
121123
ExecutorServiceProviderPtr getPartitionListenerExecutorProvider();
122-
LookupServicePtr getLookup();
124+
LookupServicePtr getLookup(const std::string& redirectedClusterURI = "");
123125

124126
void cleanupProducer(ProducerImplBase* address) { producers_.remove(address); }
125127

@@ -165,7 +167,10 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
165167
const std::string& consumerName, const ConsumerConfiguration& conf,
166168
SubscribeCallback callback);
167169

168-
const std::string& getPhysicalAddress(const std::string& logicalAddress);
170+
const std::string& getPhysicalAddress(const std::string& redirectedClusterURI,
171+
const std::string& logicalAddress);
172+
173+
LookupServicePtr createLookup(const std::string& serviceUrl);
169174

170175
static std::string getClientVersion(const ClientConfiguration& clientConfiguration);
171176

@@ -179,7 +184,6 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
179184
std::mutex mutex_;
180185

181186
State state_;
182-
ServiceNameResolver serviceNameResolver_;
183187
ClientConfiguration clientConfiguration_;
184188
MemoryLimitController memoryLimitController_;
185189

@@ -188,6 +192,7 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
188192
ExecutorServiceProviderPtr partitionListenerExecutorProvider_;
189193

190194
LookupServicePtr lookupServicePtr_;
195+
std::unordered_map<std::string, LookupServicePtr> redirectedClusterLookupServicePtrs_;
191196
ConnectionPool pool_;
192197

193198
uint64_t producerIdGenerator_;

‎lib/HTTPLookupService.cc

+2-2
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,11 @@ const static std::string ADMIN_PATH_V2 = "/admin/v2/";
4646
const static std::string PARTITION_METHOD_NAME = "partitions";
4747
const static int NUMBER_OF_LOOKUP_THREADS = 1;
4848

49-
HTTPLookupService::HTTPLookupService(ServiceNameResolver &serviceNameResolver,
49+
HTTPLookupService::HTTPLookupService(const std::string &serviceUrl,
5050
const ClientConfiguration &clientConfiguration,
5151
const AuthenticationPtr &authData)
5252
: executorProvider_(std::make_shared<ExecutorServiceProvider>(NUMBER_OF_LOOKUP_THREADS)),
53-
serviceNameResolver_(serviceNameResolver),
53+
serviceNameResolver_(serviceUrl),
5454
authenticationPtr_(authData),
5555
lookupTimeoutInSeconds_(clientConfiguration.getOperationTimeoutSeconds()),
5656
maxLookupRedirects_(clientConfiguration.getMaxLookupRedirects()),

‎lib/HTTPLookupService.h

+4-2
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t
4040
typedef Promise<Result, LookupDataResultPtr> LookupPromise;
4141

4242
ExecutorServiceProviderPtr executorProvider_;
43-
ServiceNameResolver& serviceNameResolver_;
43+
ServiceNameResolver serviceNameResolver_;
4444
AuthenticationPtr authenticationPtr_;
4545
int lookupTimeoutInSeconds_;
4646
const int maxLookupRedirects_;
@@ -64,7 +64,7 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t
6464
Result sendHTTPRequest(std::string completeUrl, std::string& responseData, long& responseCode);
6565

6666
public:
67-
HTTPLookupService(ServiceNameResolver&, const ClientConfiguration&, const AuthenticationPtr&);
67+
HTTPLookupService(const std::string&, const ClientConfiguration&, const AuthenticationPtr&);
6868

6969
LookupResultFuture getBroker(const TopicName& topicName) override;
7070

@@ -74,6 +74,8 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t
7474

7575
Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
7676
const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override;
77+
78+
// Returns the resolver this lookup service built from the service URL passed to its constructor.
ServiceNameResolver& getServiceNameResolver() override {
    return serviceNameResolver_;
}
7779
};
7880
} // namespace pulsar
7981

‎lib/HandlerBase.cc

+4-3
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
4343
epoch_(0),
4444
timer_(executor_->createDeadlineTimer()),
4545
creationTimer_(executor_->createDeadlineTimer()),
46-
reconnectionPending_(false) {}
46+
reconnectionPending_(false),
47+
redirectedClusterURI_("") {}
4748

4849
HandlerBase::~HandlerBase() {
4950
ASIO_ERROR ignored;
@@ -88,9 +89,9 @@ void HandlerBase::grabCnx() { grabCnx(boost::none); }
8889
Future<Result, ClientConnectionPtr> HandlerBase::getConnection(
8990
const ClientImplPtr& client, const boost::optional<std::string>& assignedBrokerUrl) {
9091
if (assignedBrokerUrl && client->getLookupCount() > 0) {
91-
return client->connect(assignedBrokerUrl.get(), connectionKeySuffix_);
92+
return client->connect(redirectedClusterURI_, assignedBrokerUrl.get(), connectionKeySuffix_);
9293
} else {
93-
return client->getConnection(topic(), connectionKeySuffix_);
94+
return client->getConnection(redirectedClusterURI_, topic(), connectionKeySuffix_);
9495
}
9596
}
9697

‎lib/HandlerBase.h

+3
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
5252
ClientConnectionWeakPtr getCnx() const;
5353
void setCnx(const ClientConnectionPtr& cnx);
5454
void resetCnx() { setCnx(nullptr); }
55+
// Stores the service URL of the cluster this handler was redirected to. HandlerBase::getConnection
// passes the stored value to the client on the next connection attempt.
// Takes the argument by const reference: the previous `const std::string` by-value parameter
// forced an extra copy that could not be moved from.
void setRedirectedClusterURI(const std::string& serviceUrl) { redirectedClusterURI_ = serviceUrl; }
5556

5657
protected:
5758
/*
@@ -145,6 +146,8 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
145146
mutable std::mutex connectionMutex_;
146147
std::atomic<bool> reconnectionPending_;
147148
ClientConnectionWeakPtr connection_;
149+
std::string redirectedClusterURI_;
150+
148151
friend class ClientConnection;
149152
friend class PulsarFriend;
150153
};

‎lib/LookupService.h

+3
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "Future.h"
3030
#include "LookupDataResult.h"
3131
#include "ProtoApiEnums.h"
32+
#include "ServiceNameResolver.h"
3233

3334
namespace pulsar {
3435
using NamespaceTopicsPtr = std::shared_ptr<std::vector<std::string>>;
@@ -86,6 +87,8 @@ class LookupService {
8687
virtual Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName,
8788
const std::string& version = "") = 0;
8889

90+
virtual ServiceNameResolver& getServiceNameResolver() = 0;
91+
8992
virtual ~LookupService() {}
9093

9194
virtual void close() {}

‎lib/ProducerImpl.cc

+4-1
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,11 @@ Future<Result, bool> ProducerImpl::connectionOpened(const ClientConnectionPtr& c
144144
return promise.getFuture();
145145
}
146146

147+
LOG_DEBUG("Creating producer" << topic() << "producerName: " << producerName_ << " on "
148+
<< cnx->cnxString());
147149
ClientImplPtr client = client_.lock();
150+
cnx->registerProducer(producerId_, shared_from_this());
151+
148152
int requestId = client->newRequestId();
149153

150154
SharedBuffer cmd = Commands::newProducer(topic(), producerId_, producerName_, requestId,
@@ -214,7 +218,6 @@ Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result
214218
// set the cnx pointer so that new messages will be sent immediately
215219
LOG_INFO(getName() << "Created producer on broker " << cnx->cnxString());
216220

217-
cnx->registerProducer(producerId_, shared_from_this());
218221
producerName_ = responseData.producerName;
219222
schemaVersion_ = responseData.schemaVersion;
220223
producerStr_ = "[" + topic() + ", " + producerName_ + "] ";

‎lib/RetryableLookupService.h

+4
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ class RetryableLookupService : public LookupService {
7676
});
7777
}
7878

79+
// Forwards to the wrapped lookup service; this wrapper keeps no resolver of its own.
ServiceNameResolver& getServiceNameResolver() override {
    auto& resolver = lookupService_->getServiceNameResolver();
    return resolver;
}
82+
7983
private:
8084
const std::shared_ptr<LookupService> lookupService_;
8185
RetryableOperationCachePtr<LookupResult> lookupCache_;

‎lib/ServiceNameResolver.h

+9-6
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,17 @@ class ServiceNameResolver {
3636
ServiceNameResolver(const ServiceNameResolver&) = delete;
3737
ServiceNameResolver& operator=(const ServiceNameResolver&) = delete;
3838

39-
bool useTls() const noexcept {
40-
return serviceUri_.getScheme() == PulsarScheme::PULSAR_SSL ||
41-
serviceUri_.getScheme() == PulsarScheme::HTTPS;
39+
// Whether this resolver's service URI uses a TLS scheme; delegates to the static overload.
bool useTls() const noexcept {
    return useTls(serviceUri_);
}
40+
41+
static bool useTls(const ServiceURI& serviceUri) noexcept {
42+
return serviceUri.getScheme() == PulsarScheme::PULSAR_SSL ||
43+
serviceUri.getScheme() == PulsarScheme::HTTPS;
4244
}
4345

44-
bool useHttp() const noexcept {
45-
return serviceUri_.getScheme() == PulsarScheme::HTTP ||
46-
serviceUri_.getScheme() == PulsarScheme::HTTPS;
46+
// Whether this resolver's service URI uses an HTTP or HTTPS scheme; delegates to the static
// overload. (Previously delegated to useTls() by mistake, which misreported plain http:// URLs
// as non-HTTP and pulsar+ssl:// URLs as HTTP.)
bool useHttp() const noexcept { return useHttp(serviceUri_); }
47+
48+
static bool useHttp(const ServiceURI& serviceUri) noexcept {
49+
return serviceUri.getScheme() == PulsarScheme::HTTP || serviceUri.getScheme() == PulsarScheme::HTTPS;
4750
}
4851

4952
const std::string& resolveHost() {

‎proto/PulsarApi.proto

+16
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,7 @@ enum ProtocolVersion {
262262
v17 = 17; // Added support ack receipt
263263
v18 = 18; // Add client support for broker entry metadata
264264
v19 = 19; // Add CommandTcClientConnectRequest and CommandTcClientConnectResponse
265+
v20 = 20; // Add client support for topic migration redirection CommandTopicMigrated
265266
}
266267

267268
message CommandConnect {
@@ -620,6 +621,18 @@ message CommandReachedEndOfTopic {
620621
required uint64 consumer_id = 1;
621622
}
622623

624+
// Tells a client that the topic behind one of its producers or consumers has migrated to another cluster.
message CommandTopicMigrated {
625+
enum ResourceType {
626+
Producer = 0;
627+
Consumer = 1;
628+
}
629+
required uint64 resource_id = 1; // producer id or consumer id, as selected by resource_type
630+
required ResourceType resource_type = 2; // which kind of handler resource_id refers to
631+
optional string brokerServiceUrl = 3; // service URL of the migrated cluster, used on non-TLS connections
632+
optional string brokerServiceUrlTls = 4; // service URL of the migrated cluster, used on TLS connections
633+
634+
}
635+
623636
message CommandCloseProducer {
624637
required uint64 producer_id = 1;
625638
required uint64 request_id = 2;
@@ -1029,6 +1042,7 @@ message BaseCommand {
10291042
WATCH_TOPIC_UPDATE = 66;
10301043
WATCH_TOPIC_LIST_CLOSE = 67;
10311044

1045+
TOPIC_MIGRATED = 68;
10321046
}
10331047

10341048

@@ -1110,4 +1124,6 @@ message BaseCommand {
11101124
optional CommandWatchTopicListSuccess watchTopicListSuccess = 65;
11111125
optional CommandWatchTopicUpdate watchTopicUpdate = 66;
11121126
optional CommandWatchTopicListClose watchTopicListClose = 67;
1127+
1128+
optional CommandTopicMigrated topicMigrated = 68;
11131129
}

‎run-unit-tests.sh

+3
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,12 @@ export https_proxy=
3333

3434
# Run ExtensibleLoadManager tests
3535
docker compose -f tests/extensibleLM/docker-compose.yml up -d
36+
docker compose -f tests/blue-green/docker-compose.yml up -d
3637
until curl http://localhost:8080/metrics > /dev/null 2>&1 ; do sleep 1; done
38+
until curl http://localhost:8081/metrics > /dev/null 2>&1 ; do sleep 1; done
3739
sleep 5
3840
$CMAKE_BUILD_DIRECTORY/tests/ExtensibleLoadManagerTest
41+
docker compose -f tests/blue-green/docker-compose.yml down
3942
docker compose -f tests/extensibleLM/docker-compose.yml down
4043

4144
# Run OAuth2 tests

‎tests/ClientTest.cc

+10-7
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,8 @@ TEST(ClientTest, testRetryUntilSucceed) {
443443
EXPECT_CALL(*clientImpl, getConnection).Times((kFailCount + 1) * 2);
444444
std::atomic_int count{0};
445445
ON_CALL(*clientImpl, getConnection)
446-
.WillByDefault([&clientImpl, &count](const std::string &topic, size_t index) {
446+
.WillByDefault([&clientImpl, &count](const std::string &redirectedClusterURI,
447+
const std::string &topic, size_t index) {
447448
if (count++ < kFailCount) {
448449
return GetConnectionFuture::failed(ResultRetryable);
449450
}
@@ -461,9 +462,10 @@ TEST(ClientTest, testRetryTimeout) {
461462
auto clientImpl =
462463
std::make_shared<MockClientImpl>(lookupUrl, ClientConfiguration().setOperationTimeoutSeconds(2));
463464
EXPECT_CALL(*clientImpl, getConnection).Times(AtLeast(2 * 2));
464-
ON_CALL(*clientImpl, getConnection).WillByDefault([](const std::string &topic, size_t index) {
465-
return GetConnectionFuture::failed(ResultRetryable);
466-
});
465+
ON_CALL(*clientImpl, getConnection)
466+
.WillByDefault([](const std::string &redirectedClusterURI, const std::string &topic, size_t index) {
467+
return GetConnectionFuture::failed(ResultRetryable);
468+
});
467469

468470
auto topic = "client-test-retry-timeout";
469471
{
@@ -484,9 +486,10 @@ TEST(ClientTest, testNoRetry) {
484486
auto clientImpl =
485487
std::make_shared<MockClientImpl>(lookupUrl, ClientConfiguration().setOperationTimeoutSeconds(100));
486488
EXPECT_CALL(*clientImpl, getConnection).Times(2);
487-
ON_CALL(*clientImpl, getConnection).WillByDefault([](const std::string &, size_t) {
488-
return GetConnectionFuture::failed(ResultAuthenticationError);
489-
});
489+
ON_CALL(*clientImpl, getConnection)
490+
.WillByDefault([](const std::string &redirectedClusterURI, const std::string &, size_t) {
491+
return GetConnectionFuture::failed(ResultAuthenticationError);
492+
});
490493

491494
auto topic = "client-test-no-retry";
492495
{

‎tests/LookupServiceTest.cc

+14-18
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,7 @@ TEST(LookupServiceTest, basicLookup) {
8484
ClientConfiguration conf;
8585
ExecutorServiceProviderPtr ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(1));
8686
ConnectionPool pool_(conf, ioExecutorProvider_, authData, "");
87-
ServiceNameResolver serviceNameResolver(url);
88-
BinaryProtoLookupService lookupService(serviceNameResolver, pool_, conf);
87+
BinaryProtoLookupService lookupService(url, pool_, conf);
8988

9089
TopicNamePtr topicName = TopicName::get("topic");
9190

@@ -148,26 +147,24 @@ static void testMultiAddresses(LookupService& lookupService) {
148147

149148
TEST(LookupServiceTest, testMultiAddresses) {
150149
ConnectionPool pool({}, std::make_shared<ExecutorServiceProvider>(1), AuthFactory::Disabled(), "");
151-
ServiceNameResolver serviceNameResolver("pulsar://localhost,localhost:9999");
152150
ClientConfiguration conf;
153-
BinaryProtoLookupService binaryLookupService(serviceNameResolver, pool, conf);
151+
BinaryProtoLookupService binaryLookupService("pulsar://localhost,localhost:9999", pool, conf);
154152
testMultiAddresses(binaryLookupService);
155153

156154
// HTTPLookupService calls shared_from_this() internally, we must create a shared pointer to test
157-
ServiceNameResolver serviceNameResolverForHttp("http://localhost,localhost:9999");
158155
auto httpLookupServicePtr = std::make_shared<HTTPLookupService>(
159-
std::ref(serviceNameResolverForHttp), ClientConfiguration{}, AuthFactory::Disabled());
156+
"http://localhost,localhost:9999", ClientConfiguration{}, AuthFactory::Disabled());
160157
testMultiAddresses(*httpLookupServicePtr);
161158
}
162159
TEST(LookupServiceTest, testRetry) {
163160
auto executorProvider = std::make_shared<ExecutorServiceProvider>(1);
164161
ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), "");
165-
ServiceNameResolver serviceNameResolver("pulsar://localhost:9999,localhost");
166162
ClientConfiguration conf;
167163

168164
auto lookupService = RetryableLookupService::create(
169-
std::make_shared<BinaryProtoLookupService>(serviceNameResolver, pool, conf), std::chrono::seconds(30),
170-
executorProvider);
165+
std::make_shared<BinaryProtoLookupService>("pulsar://localhost:9999,localhost", pool, conf),
166+
std::chrono::seconds(30), executorProvider);
167+
ServiceNameResolver& serviceNameResolver = lookupService->getServiceNameResolver();
171168

172169
PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0);
173170
auto topicNamePtr = TopicName::get("lookup-service-test-retry");
@@ -196,12 +193,12 @@ TEST(LookupServiceTest, testRetry) {
196193
TEST(LookupServiceTest, testTimeout) {
197194
auto executorProvider = std::make_shared<ExecutorServiceProvider>(1);
198195
ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), "");
199-
ServiceNameResolver serviceNameResolver("pulsar://localhost:9990,localhost:9902,localhost:9904");
200196
ClientConfiguration conf;
201197

202198
constexpr int timeoutInSeconds = 2;
203199
auto lookupService = RetryableLookupService::create(
204-
std::make_shared<BinaryProtoLookupService>(serviceNameResolver, pool, conf),
200+
std::make_shared<BinaryProtoLookupService>("pulsar://localhost:9990,localhost:9902,localhost:9904",
201+
pool, conf),
205202
std::chrono::seconds(timeoutInSeconds), executorProvider);
206203
auto topicNamePtr = TopicName::get("lookup-service-test-retry");
207204

@@ -467,9 +464,9 @@ INSTANTIATE_TEST_SUITE_P(Pulsar, LookupServiceTest, ::testing::Values(binaryLook
467464

468465
class BinaryProtoLookupServiceRedirectTestHelper : public BinaryProtoLookupService {
469466
public:
470-
BinaryProtoLookupServiceRedirectTestHelper(ServiceNameResolver& serviceNameResolver, ConnectionPool& pool,
467+
BinaryProtoLookupServiceRedirectTestHelper(const std::string& serviceUrl, ConnectionPool& pool,
471468
const ClientConfiguration& clientConfiguration)
472-
: BinaryProtoLookupService(serviceNameResolver, pool, clientConfiguration) {}
469+
: BinaryProtoLookupService(serviceUrl, pool, clientConfiguration) {}
473470

474471
LookupResultFuture findBroker(const std::string& address, bool authoritative, const std::string& topic,
475472
size_t redirectCount) {
@@ -484,14 +481,13 @@ TEST(LookupServiceTest, testRedirectionLimit) {
484481
conf.setMaxLookupRedirects(redirect_limit);
485482
ExecutorServiceProviderPtr ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(1));
486483
ConnectionPool pool_(conf, ioExecutorProvider_, authData, "");
487-
std::string url = "pulsar://localhost:6650";
488-
ServiceNameResolver serviceNameResolver(url);
489-
BinaryProtoLookupServiceRedirectTestHelper lookupService(serviceNameResolver, pool_, conf);
484+
string url = "pulsar://localhost:6650";
485+
BinaryProtoLookupServiceRedirectTestHelper lookupService(url, pool_, conf);
490486

491487
const auto topicNamePtr = TopicName::get("topic");
492488
for (auto idx = 0; idx < redirect_limit + 5; ++idx) {
493-
auto future =
494-
lookupService.findBroker(serviceNameResolver.resolveHost(), false, topicNamePtr->toString(), idx);
489+
auto future = lookupService.findBroker(lookupService.getServiceNameResolver().resolveHost(), false,
490+
topicNamePtr->toString(), idx);
495491
LookupService::LookupResult lookupResult;
496492
auto result = future.get(lookupResult);
497493

‎tests/MockClientImpl.h

+3-3
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ class MockClientImpl : public ClientImpl {
3939
MockClientImpl(const std::string& serviceUrl, ClientConfiguration conf = {})
4040
: ClientImpl(serviceUrl, conf) {}
4141

42-
MOCK_METHOD((Future<Result, ClientConnectionPtr>), getConnection, (const std::string&, size_t),
43-
(override));
42+
MOCK_METHOD((Future<Result, ClientConnectionPtr>), getConnection,
43+
(const std::string&, const std::string&, size_t), (override));
4444

4545
SyncOpResult createProducer(const std::string& topic) {
4646
using namespace std::chrono;
@@ -65,7 +65,7 @@ class MockClientImpl : public ClientImpl {
6565
}
6666

6767
GetConnectionFuture getConnectionReal(const std::string& topic, size_t key) {
68-
return ClientImpl::getConnection(topic, key);
68+
return ClientImpl::getConnection("", topic, key);
6969
}
7070

7171
Result close() {

‎tests/PulsarFriend.h

+11-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "lib/ClientImpl.h"
2727
#include "lib/ConsumerConfigurationImpl.h"
2828
#include "lib/ConsumerImpl.h"
29+
#include "lib/LookupService.h"
2930
#include "lib/MessageImpl.h"
3031
#include "lib/MultiTopicsConsumerImpl.h"
3132
#include "lib/NamespaceName.h"
@@ -38,6 +39,7 @@
3839
using std::string;
3940

4041
namespace pulsar {
42+
using ClientConnectionWeakPtr = std::weak_ptr<ClientConnection>;
4143
class PulsarFriend {
4244
public:
4345
static ProducerStatsImplPtr getProducerStatsPtr(Producer producer) {
@@ -166,6 +168,14 @@ class PulsarFriend {
166168

167169
static ClientConnectionWeakPtr getClientConnection(HandlerBase& handler) { return handler.connection_; }
168170

171+
static std::string getConnectionPhysicalAddress(HandlerBase& handler) {
172+
auto cnx = handler.connection_.lock();
173+
if (cnx) {
174+
return cnx->physicalAddress_;
175+
}
176+
return "";
177+
}
178+
169179
static void setClientConnection(HandlerBase& handler, ClientConnectionWeakPtr conn) {
170180
handler.connection_ = conn;
171181
}
@@ -177,7 +187,7 @@ class PulsarFriend {
177187
static void setServiceUrlIndex(ServiceNameResolver& resolver, size_t index) { resolver.index_ = index; }
178188

179189
static void setServiceUrlIndex(const Client& client, size_t index) {
180-
setServiceUrlIndex(client.impl_->serviceNameResolver_, index);
190+
setServiceUrlIndex(client.impl_->lookupServicePtr_->getServiceNameResolver(), index);
181191
}
182192

183193
static proto::MessageMetadata& getMessageMetadata(Message& message) { return message.impl_->metadata; }

‎tests/blue-green/docker-compose.yml

+152
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
version: '3'
2+
networks:
3+
green-pulsar:
4+
driver: bridge
5+
services:
6+
# Start ZooKeeper
7+
zookeeper:
8+
image: apachepulsar/pulsar:latest
9+
container_name: green-zookeeper
10+
restart: on-failure
11+
networks:
12+
- green-pulsar
13+
environment:
14+
- metadataStoreUrl=zk:zookeeper:2181
15+
- PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
16+
command: >
17+
bash -c "bin/apply-config-from-env.py conf/zookeeper.conf && \
18+
bin/generate-zookeeper-config.sh conf/zookeeper.conf && \
19+
exec bin/pulsar zookeeper"
20+
healthcheck:
21+
test: ["CMD", "bin/pulsar-zookeeper-ruok.sh"]
22+
interval: 10s
23+
timeout: 5s
24+
retries: 30
25+
26+
# Initialize cluster metadata
27+
pulsar-init:
28+
container_name: green-pulsar-init
29+
hostname: pulsar-init
30+
image: apachepulsar/pulsar:latest
31+
networks:
32+
- green-pulsar
33+
environment:
34+
- PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
35+
command: >
36+
bin/pulsar initialize-cluster-metadata \
37+
--cluster cluster-a \
38+
--zookeeper zookeeper:2181 \
39+
--configuration-store zookeeper:2181 \
40+
--web-service-url http://broker-1:8080 \
41+
--broker-service-url pulsar://broker-1:6650
42+
depends_on:
43+
zookeeper:
44+
condition: service_healthy
45+
46+
# Start bookie
47+
bookie:
48+
image: apachepulsar/pulsar:latest
49+
container_name: green-bookie
50+
restart: on-failure
51+
networks:
52+
- green-pulsar
53+
environment:
54+
- clusterName=cluster-a
55+
- zkServers=zookeeper:2181
56+
- metadataServiceUri=metadata-store:zk:zookeeper:2181
57+
- advertisedAddress=bookie
58+
- BOOKIE_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
59+
depends_on:
60+
zookeeper:
61+
condition: service_healthy
62+
pulsar-init:
63+
condition: service_completed_successfully
64+
command: bash -c "bin/apply-config-from-env.py conf/bookkeeper.conf && exec bin/pulsar bookie"
65+
66+
proxy:
67+
image: apachepulsar/pulsar:latest
68+
container_name: green-proxy
69+
hostname: proxy
70+
restart: on-failure
71+
networks:
72+
- green-pulsar
73+
environment:
74+
- metadataStoreUrl=zk:zookeeper:2181
75+
- zookeeperServers=zookeeper:2181
76+
- clusterName=cluster-a
77+
- PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
78+
ports:
79+
- "8081:8080"
80+
- "6651:6650"
81+
depends_on:
82+
broker-1:
83+
condition: service_started
84+
broker-2:
85+
condition: service_started
86+
command: bash -c "bin/apply-config-from-env.py conf/proxy.conf && exec bin/pulsar proxy"
87+
88+
# Start broker 1
89+
broker-1:
90+
image: apachepulsar/pulsar:latest
91+
container_name: green-broker-1
92+
hostname: broker-1
93+
restart: on-failure
94+
networks:
95+
- green-pulsar
96+
environment:
97+
- metadataStoreUrl=zk:zookeeper:2181
98+
- zookeeperServers=zookeeper:2181
99+
- clusterName=cluster-a
100+
- managedLedgerDefaultEnsembleSize=1
101+
- managedLedgerDefaultWriteQuorum=1
102+
- managedLedgerDefaultAckQuorum=1
103+
- advertisedAddress=green-broker-1
104+
- internalListenerName=internal
105+
- advertisedListeners=internal:pulsar://green-broker-1:6650
106+
- PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=56m
107+
# Load manager: use the extensible load balancer, set the unloading strategy to TransferShedder, and enable debug mode.
108+
- loadManagerClassName=org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl
109+
- loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder
110+
- loadBalancerSheddingEnabled=false
111+
- loadBalancerDebugModeEnabled=true
112+
- brokerServiceCompactionThresholdInBytes=1000000
113+
- PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
114+
depends_on:
115+
zookeeper:
116+
condition: service_healthy
117+
bookie:
118+
condition: service_started
119+
command: bash -c "bin/apply-config-from-env.py conf/broker.conf && exec bin/pulsar broker"
120+
121+
# Start broker 2
122+
broker-2:
123+
image: apachepulsar/pulsar:latest
124+
container_name: green-broker-2
125+
hostname: broker-2
126+
restart: on-failure
127+
networks:
128+
- green-pulsar
129+
environment:
130+
- metadataStoreUrl=zk:zookeeper:2181
131+
- zookeeperServers=zookeeper:2181
132+
- clusterName=cluster-a
133+
- managedLedgerDefaultEnsembleSize=1
134+
- managedLedgerDefaultWriteQuorum=1
135+
- managedLedgerDefaultAckQuorum=1
136+
- advertisedAddress=green-broker-2
137+
- internalListenerName=internal
138+
- advertisedListeners=internal:pulsar://green-broker-2:6650
139+
- PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=56m
140+
# Load manager: use the extensible load balancer, set the unloading strategy to TransferShedder, and enable debug mode.
141+
- loadManagerClassName=org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl
142+
- loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder
143+
- loadBalancerSheddingEnabled=false
144+
- loadBalancerDebugModeEnabled=true
145+
- brokerServiceCompactionThresholdInBytes=1000000
146+
- PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
147+
depends_on:
148+
zookeeper:
149+
condition: service_healthy
150+
bookie:
151+
condition: service_started
152+
command: bash -c "bin/apply-config-from-env.py conf/broker.conf && exec bin/pulsar broker"

‎tests/extensibleLM/ExtensibleLoadManagerTest.cc

+112-53
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <gtest/gtest.h>
2121

2222
#include <thread>
23+
#include <unordered_set>
2324

2425
#include "include/pulsar/Client.h"
2526
#include "lib/LogUtils.h"
@@ -35,16 +36,36 @@ bool checkTime() {
3536
const static auto start = std::chrono::high_resolution_clock::now();
3637
auto end = std::chrono::high_resolution_clock::now();
3738
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
38-
return duration < 180 * 1000;
39+
return duration < 300 * 1000;
3940
}
4041

4142
TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) {
42-
const static std::string adminUrl = "http://localhost:8080/";
43-
const static std::string topicName =
44-
"persistent://public/unload-test/topic-1" + std::to_string(time(NULL));
43+
const static std::string blueAdminUrl = "http://localhost:8080/";
44+
const static std::string greenAdminUrl = "http://localhost:8081/";
45+
const static std::string topicNameSuffix = std::to_string(time(NULL));
46+
const static std::string topicName = "persistent://public/unload-test/topic-" + topicNameSuffix;
4547

4648
ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
47-
std::string url = adminUrl + "admin/v2/namespaces/public/unload-test?bundles=1";
49+
std::string url = blueAdminUrl + "admin/v2/clusters/cluster-a/migrate?migrated=false";
50+
int res = makePostRequest(url, R"(
51+
{
52+
"serviceUrl": "http://localhost:8081",
53+
"serviceUrlTls":"https://localhost:8085",
54+
"brokerServiceUrl": "pulsar://localhost:6651",
55+
"brokerServiceUrlTls": "pulsar+ssl://localhost:6655"
56+
})");
57+
LOG_INFO("res:" << res);
58+
return res == 200;
59+
}));
60+
61+
ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
62+
std::string url = blueAdminUrl + "admin/v2/namespaces/public/unload-test?bundles=1";
63+
int res = makePutRequest(url, "");
64+
return res == 204 || res == 409;
65+
}));
66+
67+
ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
68+
std::string url = greenAdminUrl + "admin/v2/namespaces/public/unload-test?bundles=1";
4869
int res = makePutRequest(url, "");
4970
return res == 204 || res == 409;
5071
}));
@@ -58,24 +79,25 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) {
5879
Result consumerResult = client.subscribe(topicName, "sub", consumer);
5980
ASSERT_EQ(consumerResult, ResultOk);
6081

61-
Semaphore firstUnloadSemaphore(0);
62-
Semaphore secondUnloadSemaphore(0);
63-
Semaphore halfPubWaitSemaphore(0);
64-
const int msgCount = 10;
65-
int produced = 0;
82+
Semaphore unloadSemaphore(0);
83+
Semaphore pubWaitSemaphore(0);
84+
Semaphore migrationSemaphore(0);
85+
86+
const int msgCount = 20;
87+
SynchronizedHashMap<int, int> producedMsgs;
6688
auto produce = [&]() {
6789
int i = 0;
6890
while (i < msgCount && checkTime()) {
69-
if (i == 3) {
70-
firstUnloadSemaphore.acquire();
91+
if (i == 3 || i == 8 || i == 17) {
92+
unloadSemaphore.acquire();
7193
}
7294

73-
if (i == 5) {
74-
halfPubWaitSemaphore.release();
95+
if (i == 5 || i == 15) {
96+
pubWaitSemaphore.release();
7597
}
7698

77-
if (i == 8) {
78-
secondUnloadSemaphore.acquire();
99+
if (i == 12) {
100+
migrationSemaphore.acquire();
79101
}
80102

81103
std::string content = std::to_string(i);
@@ -86,45 +108,42 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) {
86108
return sendResult == ResultOk;
87109
}));
88110

89-
LOG_INFO("produced index:" << i);
90-
produced++;
111+
LOG_INFO("produced i:" << i);
112+
producedMsgs.emplace(i, i);
91113
i++;
92114
}
93115
LOG_INFO("producer finished");
94116
};
95-
96-
int consumed = 0;
117+
std::atomic<bool> stopConsumer(false);
118+
SynchronizedHashMap<int, int> consumedMsgs;
97119
auto consume = [&]() {
98120
Message receivedMsg;
99-
int i = 0;
100-
while (i < msgCount && checkTime()) {
101-
ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
102-
Result receiveResult =
103-
consumer.receive(receivedMsg, 1000); // Assumed that we wait 1000 ms for each message
104-
return receiveResult == ResultOk;
105-
}));
106-
LOG_INFO("received index:" << i);
107-
108-
int id = std::stoi(receivedMsg.getDataAsString());
109-
if (id < i) {
121+
while (checkTime()) {
122+
if (stopConsumer && producedMsgs.size() == msgCount && consumedMsgs.size() == msgCount) {
123+
break;
124+
}
125+
Result receiveResult =
126+
consumer.receive(receivedMsg, 1000); // Assumed that we wait 1000 ms for each message
127+
if (receiveResult != ResultOk) {
110128
continue;
111129
}
130+
int i = std::stoi(receivedMsg.getDataAsString());
131+
LOG_INFO("received i:" << i);
112132
ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
113133
Result ackResult = consumer.acknowledge(receivedMsg);
114134
return ackResult == ResultOk;
115135
}));
116-
LOG_INFO("acked index:" << i);
117-
118-
consumed++;
119-
i++;
136+
LOG_INFO("acked i:" << i);
137+
consumedMsgs.emplace(i, i);
120138
}
121139
LOG_INFO("consumer finished");
122140
};
123141

124142
std::thread produceThread(produce);
125143
std::thread consumeThread(consume);
126144

127-
auto unload = [&] {
145+
auto unload = [&](bool migrated) {
146+
const std::string &adminUrl = migrated ? greenAdminUrl : blueAdminUrl;
128147
auto clientImplPtr = PulsarFriend::getClientImplPtr(client);
129148
auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
130149
auto &producerImpl = PulsarFriend::getProducerImpl(producer);
@@ -135,15 +154,17 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) {
135154
ASSERT_TRUE(waitUntil(std::chrono::seconds(30),
136155
[&] { return consumerImpl.isConnected() && producerImpl.isConnected(); }));
137156

138-
std::string url = adminUrl + "lookup/v2/topic/persistent/public/unload-test/topic-1";
157+
std::string url =
158+
adminUrl + "lookup/v2/topic/persistent/public/unload-test/topic-" + topicNameSuffix;  // "topic-" + suffix, so the lookup targets the same topic as topicName
139159
std::string responseDataBeforeUnload;
140160
int res = makeGetRequest(url, responseDataBeforeUnload);
141161
if (res != 200) {
142162
continue;
143163
}
144-
destinationBroker = responseDataBeforeUnload.find("broker-2") == std::string::npos
145-
? "broker-2:8080"
146-
: "broker-1:8080";
164+
std::string prefix = migrated ? "green-" : "";
165+
destinationBroker =
166+
prefix + (responseDataBeforeUnload.find("broker-2") == std::string::npos ? "broker-2:8080"
167+
: "broker-1:8080");
147168
lookupCountBeforeUnload = clientImplPtr->getLookupCount();
148169
ASSERT_TRUE(lookupCountBeforeUnload > 0);
149170

@@ -163,31 +184,69 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) {
163184
[&] { return consumerImpl.isConnected() && producerImpl.isConnected(); }));
164185
std::string responseDataAfterUnload;
165186
ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
166-
url = adminUrl + "lookup/v2/topic/persistent/public/unload-test/topic-1";
187+
url = adminUrl + "lookup/v2/topic/persistent/public/unload-test/topic-" + topicNameSuffix;  // "topic-" + suffix, so the lookup targets the same topic as topicName
167188
res = makeGetRequest(url, responseDataAfterUnload);
168189
return res == 200 && responseDataAfterUnload.find(destinationBroker) != std::string::npos;
169190
}));
170191
LOG_INFO("after lookup responseData:" << responseDataAfterUnload << ",res:" << res);
171192

172193
auto lookupCountAfterUnload = clientImplPtr->getLookupCount();
173-
ASSERT_EQ(lookupCountBeforeUnload, lookupCountAfterUnload);
194+
if (lookupCountBeforeUnload != lookupCountAfterUnload) {
195+
continue;
196+
}
174197
break;
175198
}
176199
};
177-
LOG_INFO("starting first unload");
178-
unload();
179-
firstUnloadSemaphore.release();
180-
halfPubWaitSemaphore.acquire();
181-
LOG_INFO("starting second unload");
182-
unload();
183-
secondUnloadSemaphore.release();
200+
LOG_INFO("#### starting first unload ####");
201+
unload(false);
202+
unloadSemaphore.release();
203+
pubWaitSemaphore.acquire();
204+
LOG_INFO("#### starting second unload ####");
205+
unload(false);
206+
unloadSemaphore.release();
184207

185-
produceThread.join();
208+
LOG_INFO("#### migrating the cluster ####");
209+
migrationSemaphore.release();
210+
ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
211+
std::string url = blueAdminUrl + "admin/v2/clusters/cluster-a/migrate?migrated=true";
212+
int res = makePostRequest(url, R"({
213+
"serviceUrl": "http://localhost:8081",
214+
"serviceUrlTls":"https://localhost:8085",
215+
"brokerServiceUrl": "pulsar://localhost:6651",
216+
"brokerServiceUrlTls": "pulsar+ssl://localhost:6655"
217+
})");
218+
LOG_INFO("res:" << res);
219+
return res == 200;
220+
}));
221+
ASSERT_TRUE(waitUntil(std::chrono::seconds(130), [&] {
222+
auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
223+
auto &producerImpl = PulsarFriend::getProducerImpl(producer);
224+
auto consumerConnAddress = PulsarFriend::getConnectionPhysicalAddress(consumerImpl);
225+
auto producerConnAddress = PulsarFriend::getConnectionPhysicalAddress(producerImpl);
226+
return consumerImpl.isConnected() && producerImpl.isConnected() &&
227+
consumerConnAddress.find("6651") != std::string::npos &&
228+
producerConnAddress.find("6651") != std::string::npos;
229+
}));
230+
pubWaitSemaphore.acquire();
231+
LOG_INFO("#### starting third unload after migration ####");
232+
unload(true);
233+
unloadSemaphore.release();
234+
235+
stopConsumer = true;
186236
consumeThread.join();
187-
ASSERT_EQ(consumed, msgCount);
188-
ASSERT_EQ(produced, msgCount);
189-
ASSERT_TRUE(checkTime()) << "timed out";
237+
produceThread.join();
238+
ASSERT_EQ(producedMsgs.size(), msgCount);
239+
ASSERT_EQ(consumedMsgs.size(), msgCount);
240+
for (int i = 0; i < msgCount; i++) {
241+
producedMsgs.remove(i);
242+
consumedMsgs.remove(i);
243+
}
244+
ASSERT_EQ(producedMsgs.size(), 0);
245+
ASSERT_EQ(consumedMsgs.size(), 0);
246+
190247
client.close();
248+
249+
ASSERT_TRUE(checkTime()) << "timed out";
191250
}
192251

193252
int main(int argc, char *argv[]) {

‎tests/extensibleLM/docker-compose.yml

+4
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,8 @@ services:
109109
- loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder
110110
- loadBalancerSheddingEnabled=false
111111
- loadBalancerDebugModeEnabled=true
112+
- clusterMigrationCheckDurationSeconds=1
113+
- brokerServiceCompactionThresholdInBytes=1000000
112114
- PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
113115
depends_on:
114116
zookeeper:
@@ -141,6 +143,8 @@ services:
141143
- loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder
142144
- loadBalancerSheddingEnabled=false
143145
- loadBalancerDebugModeEnabled=true
146+
- clusterMigrationCheckDurationSeconds=1
147+
- brokerServiceCompactionThresholdInBytes=1000000
144148
- PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
145149
depends_on:
146150
zookeeper:

0 commit comments

Comments
 (0)
Please sign in to comment.