diff --git a/lib/BinaryProtoLookupService.cc b/lib/BinaryProtoLookupService.cc
index 2d9ffc42..489d8a27 100644
--- a/lib/BinaryProtoLookupService.cc
+++ b/lib/BinaryProtoLookupService.cc
@@ -22,7 +22,6 @@
 #include "ConnectionPool.h"
 #include "LogUtils.h"
 #include "NamespaceName.h"
-#include "ServiceNameResolver.h"
 #include "TopicName.h"
 
 DECLARE_LOG_OBJECT()
diff --git a/lib/BinaryProtoLookupService.h b/lib/BinaryProtoLookupService.h
index a3c059e4..6132825d 100644
--- a/lib/BinaryProtoLookupService.h
+++ b/lib/BinaryProtoLookupService.h
@@ -38,9 +38,9 @@ using GetSchemaPromisePtr = std::shared_ptr<Promise<Result, SchemaInfo>>;
 
 class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
    public:
-    BinaryProtoLookupService(ServiceNameResolver& serviceNameResolver, ConnectionPool& pool,
+    BinaryProtoLookupService(const std::string& serviceUrl, ConnectionPool& pool,
                              const ClientConfiguration& clientConfiguration)
-        : serviceNameResolver_(serviceNameResolver),
+        : serviceNameResolver_(serviceUrl),
           cnxPool_(pool),
           listenerName_(clientConfiguration.getListenerName()),
           maxLookupRedirects_(clientConfiguration.getMaxLookupRedirects()) {}
@@ -54,6 +54,8 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
 
     Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName, const std::string& version) override;
 
+    ServiceNameResolver& getServiceNameResolver() override { return serviceNameResolver_; }
+
    protected:
     // Mark findBroker as protected to make it accessible from test.
     LookupResultFuture findBroker(const std::string& address, bool authoritative, const std::string& topic,
@@ -63,7 +65,7 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
     std::mutex mutex_;
     uint64_t requestIdGenerator_ = 0;
 
-    ServiceNameResolver& serviceNameResolver_;
+    ServiceNameResolver serviceNameResolver_;
     ConnectionPool& cnxPool_;
     std::string listenerName_;
     const int32_t maxLookupRedirects_;
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 00041b2a..0beb739c 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -403,7 +403,8 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::
             LOG_INFO(cnxString_ << "Connected to broker");
         } else {
             LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_
-                                << ", proxy: " << proxyServiceUrl_);
+                                << ", proxy: " << proxyServiceUrl_
+                                << ", physical address: " << physicalAddress_);
         }
 
         Lock lock(mutex_);
@@ -945,6 +946,10 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
                     handleError(incomingCmd.error());
                     break;
 
+                case BaseCommand::TOPIC_MIGRATED:
+                    handleTopicMigrated(incomingCmd.topicmigrated());
+                    break;
+
                 case BaseCommand::CLOSE_PRODUCER:
                     handleCloseProducer(incomingCmd.close_producer());
                     break;
@@ -1761,6 +1766,57 @@ void ClientConnection::handleError(const proto::CommandError& error) {
     }
 }
 
+std::string ClientConnection::getMigratedBrokerServiceUrl(
+    const proto::CommandTopicMigrated& commandTopicMigrated) {
+    if (tlsSocket_) {
+        if (commandTopicMigrated.has_brokerserviceurltls()) {
+            return commandTopicMigrated.brokerserviceurltls();
+        }
+    } else if (commandTopicMigrated.has_brokerserviceurl()) {
+        return commandTopicMigrated.brokerserviceurl();
+    }
+    return "";
+}
+
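+// Point the producer or consumer identified by resource_id at the migrated cluster's broker service URL.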
+void ClientConnection::handleTopicMigrated(const proto::CommandTopicMigrated& commandTopicMigrated) {
+    const long resourceId = commandTopicMigrated.resource_id();
+    const std::string migratedBrokerServiceUrl = getMigratedBrokerServiceUrl(commandTopicMigrated);
+
+    if (migratedBrokerServiceUrl.empty()) {
+        LOG_WARN("Failed to find the migrated broker url for resource:"
+                 << resourceId
+                 << (commandTopicMigrated.has_brokerserviceurl()
+                         ? ", migratedBrokerUrl: " + commandTopicMigrated.brokerserviceurl()
+                         : "")
+                 << (commandTopicMigrated.has_brokerserviceurltls()
+                         ? ", migratedBrokerUrlTls: " + commandTopicMigrated.brokerserviceurltls()
+                         : ""));
+        return;
+    }
+
+    Lock lock(mutex_);
+    if (commandTopicMigrated.resource_type() == proto::CommandTopicMigrated_ResourceType_Producer) {
+        auto it = producers_.find(resourceId);
+        auto producer = (it != producers_.end()) ? it->second.lock() : nullptr;
+        if (producer) {
+            producer->setRedirectedClusterURI(migratedBrokerServiceUrl);
+            LOG_INFO("Producer id:" << resourceId << " is migrated to " << migratedBrokerServiceUrl);
+        } else {
+            LOG_WARN("Got invalid producer Id in topicMigrated command: " << resourceId);
+        }
+    } else {
+        auto it = consumers_.find(resourceId);
+        auto consumer = (it != consumers_.end()) ? it->second.lock() : nullptr;
+        if (consumer) {
+            consumer->setRedirectedClusterURI(migratedBrokerServiceUrl);
+            LOG_INFO("Consumer id:" << resourceId << " is migrated to " << migratedBrokerServiceUrl);
+        } else {
+            LOG_WARN("Got invalid consumer Id in topicMigrated command: " << resourceId);
+        }
+    }
+}
+
 boost::optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
     const proto::CommandCloseProducer& closeProducer) {
     if (tlsSocket_) {
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index b16fc694..3c83b4db 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -85,6 +85,7 @@ class BrokerEntryMetadata;
 class CommandActiveConsumerChange;
 class CommandAckResponse;
 class CommandMessage;
+class CommandTopicMigrated;
 class CommandCloseConsumer;
 class CommandCloseProducer;
 class CommandConnected;
@@ -414,6 +415,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
     void handleLookupTopicRespose(const proto::CommandLookupTopicResponse&);
     void handleProducerSuccess(const proto::CommandProducerSuccess&);
     void handleError(const proto::CommandError&);
+    void handleTopicMigrated(const proto::CommandTopicMigrated&);
     void handleCloseProducer(const proto::CommandCloseProducer&);
     void handleCloseConsumer(const proto::CommandCloseConsumer&);
     void handleAuthChallenge();
@@ -421,10 +423,9 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
     void handleGetTopicOfNamespaceResponse(const proto::CommandGetTopicsOfNamespaceResponse&);
     void handleGetSchemaResponse(const proto::CommandGetSchemaResponse&);
     void handleAckResponse(const proto::CommandAckResponse&);
-    boost::optional<std::string> getAssignedBrokerServiceUrl(
-        const proto::CommandCloseProducer& closeProducer);
-    boost::optional<std::string> getAssignedBrokerServiceUrl(
-        const proto::CommandCloseConsumer& closeConsumer);
+    boost::optional<std::string> getAssignedBrokerServiceUrl(const proto::CommandCloseProducer&);
+    boost::optional<std::string> getAssignedBrokerServiceUrl(const proto::CommandCloseConsumer&);
+    std::string getMigratedBrokerServiceUrl(const proto::CommandTopicMigrated&);
 };
 }  // namespace pulsar
 
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index ae339731..3d19c426 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -79,8 +79,8 @@ typedef std::vector<std::string> StringList;
 ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration)
     : mutex_(),
       state_(Open),
-      serviceNameResolver_(serviceUrl),
-      clientConfiguration_(ClientConfiguration(clientConfiguration).setUseTls(serviceNameResolver_.useTls())),
+      clientConfiguration_(ClientConfiguration(clientConfiguration)
+                               .setUseTls(ServiceNameResolver::useTls(ServiceURI(serviceUrl)))),
       memoryLimitController_(clientConfiguration.getMemoryLimit()),
       ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(clientConfiguration_.getIOThreads())),
       listenerExecutorProvider_(
@@ -98,25 +98,28 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration&
     if (loggerFactory) {
         LogUtils::setLoggerFactory(std::move(loggerFactory));
     }
+    lookupServicePtr_ = createLookup(serviceUrl);
+}
+
+ClientImpl::~ClientImpl() { shutdown(); }
 
+LookupServicePtr ClientImpl::createLookup(const std::string& serviceUrl) {
     LookupServicePtr underlyingLookupServicePtr;
-    if (serviceNameResolver_.useHttp()) {
+    if (ServiceNameResolver::useHttp(ServiceURI(serviceUrl))) {
         LOG_DEBUG("Using HTTP Lookup");
         underlyingLookupServicePtr = std::make_shared<HTTPLookupService>(
-            std::ref(serviceNameResolver_), std::cref(clientConfiguration_),
-            std::cref(clientConfiguration_.getAuthPtr()));
+            serviceUrl, std::cref(clientConfiguration_), std::cref(clientConfiguration_.getAuthPtr()));
     } else {
         LOG_DEBUG("Using Binary Lookup");
         underlyingLookupServicePtr = std::make_shared<BinaryProtoLookupService>(
-            std::ref(serviceNameResolver_), std::ref(pool_), std::cref(clientConfiguration_));
+            serviceUrl, std::ref(pool_), std::cref(clientConfiguration_));
     }
 
-    lookupServicePtr_ = RetryableLookupService::create(
+    auto lookupServicePtr = RetryableLookupService::create(
         underlyingLookupServicePtr, clientConfiguration_.impl_->operationTimeout, ioExecutorProvider_);
+    return lookupServicePtr;
 }
 
-ClientImpl::~ClientImpl() { shutdown(); }
-
 const ClientConfiguration& ClientImpl::conf() const { return clientConfiguration_; }
 
 MemoryLimitController& ClientImpl::getMemoryLimitController() { return memoryLimitController_; }
@@ -129,7 +132,21 @@ ExecutorServiceProviderPtr ClientImpl::getPartitionListenerExecutorProvider() {
     return partitionListenerExecutorProvider_;
 }
 
-LookupServicePtr ClientImpl::getLookup() { return lookupServicePtr_; }
+LookupServicePtr ClientImpl::getLookup(const std::string& redirectedClusterURI) {
+    if (redirectedClusterURI.empty()) {
+        return lookupServicePtr_;
+    }
+
+    Lock lock(mutex_);
+    auto it = redirectedClusterLookupServicePtrs_.find(redirectedClusterURI);
+    if (it == redirectedClusterLookupServicePtrs_.end()) {
+        auto lookup = createLookup(redirectedClusterURI);
+        redirectedClusterLookupServicePtrs_.emplace(redirectedClusterURI, lookup);
+        return lookup;
+    }
+
+    return it->second;
+}
 
 void ClientImpl::createProducerAsync(const std::string& topic, ProducerConfiguration conf,
                                      CreateProducerCallback callback, bool autoDownloadSchema) {
@@ -517,7 +534,8 @@ void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr co
     }
 }
 
-GetConnectionFuture ClientImpl::getConnection(const std::string& topic, size_t key) {
+GetConnectionFuture ClientImpl::getConnection(const std::string& redirectedClusterURI,
+                                              const std::string& topic, size_t key) {
     Promise<Result, ClientConnectionPtr> promise;
 
     const auto topicNamePtr = TopicName::get(topic);
@@ -528,7 +546,8 @@ GetConnectionFuture ClientImpl::getConnection(const std::string& topic, size_t k
     }
 
     auto self = shared_from_this();
-    lookupServicePtr_->getBroker(*topicNamePtr)
+    getLookup(redirectedClusterURI)
+        ->getBroker(*topicNamePtr)
         .addListener([this, self, promise, key](Result result, const LookupService::LookupResult& data) {
             if (result != ResultOk) {
                 promise.setFailed(result);
@@ -554,16 +573,18 @@ GetConnectionFuture ClientImpl::getConnection(const std::string& topic, size_t k
     return promise.getFuture();
 }
 
-const std::string& ClientImpl::getPhysicalAddress(const std::string& logicalAddress) {
+const std::string& ClientImpl::getPhysicalAddress(const std::string& redirectedClusterURI,
+                                                  const std::string& logicalAddress) {
     if (useProxy_) {
-        return serviceNameResolver_.resolveHost();
+        return getLookup(redirectedClusterURI)->getServiceNameResolver().resolveHost();
     } else {
         return logicalAddress;
     }
 }
 
-GetConnectionFuture ClientImpl::connect(const std::string& logicalAddress, size_t key) {
-    const auto& physicalAddress = getPhysicalAddress(logicalAddress);
+GetConnectionFuture ClientImpl::connect(const std::string& redirectedClusterURI,
+                                        const std::string& logicalAddress, size_t key) {
+    const auto& physicalAddress = getPhysicalAddress(redirectedClusterURI, logicalAddress);
     Promise<Result, ClientConnectionPtr> promise;
     pool_.getConnectionAsync(logicalAddress, physicalAddress, key)
         .addListener([promise](Result result, const ClientConnectionWeakPtr& weakCnx) {
@@ -633,6 +654,9 @@ void ClientImpl::closeAsync(CloseCallback callback) {
 
     memoryLimitController_.close();
     lookupServicePtr_->close();
+    for (const auto& it : redirectedClusterLookupServicePtrs_) {
+        it.second->close();
+    }
 
     auto producers = producers_.move();
     auto consumers = consumers_.move();
diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h
index 7126542b..27cde3a1 100644
--- a/lib/ClientImpl.h
+++ b/lib/ClientImpl.h
@@ -97,9 +97,11 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
     void getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback);
 
     // Use virtual method to test
-    virtual GetConnectionFuture getConnection(const std::string& topic, size_t key);
+    virtual GetConnectionFuture getConnection(const std::string& redirectedClusterURI,
+                                              const std::string& topic, size_t key);
 
-    GetConnectionFuture connect(const std::string& logicalAddress, size_t key);
+    GetConnectionFuture connect(const std::string& redirectedClusterURI, const std::string& logicalAddress,
+                                size_t key);
 
     void closeAsync(CloseCallback callback);
     void shutdown();
@@ -119,7 +121,7 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
     ExecutorServiceProviderPtr getIOExecutorProvider();
     ExecutorServiceProviderPtr getListenerExecutorProvider();
     ExecutorServiceProviderPtr getPartitionListenerExecutorProvider();
-    LookupServicePtr getLookup();
+    LookupServicePtr getLookup(const std::string& redirectedClusterURI = "");
 
     void cleanupProducer(ProducerImplBase* address) { producers_.remove(address); }
 
@@ -165,7 +167,10 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
                                           const std::string& consumerName, const ConsumerConfiguration& conf,
                                           SubscribeCallback callback);
 
-    const std::string& getPhysicalAddress(const std::string& logicalAddress);
+    const std::string& getPhysicalAddress(const std::string& redirectedClusterURI,
+                                          const std::string& logicalAddress);
+
+    LookupServicePtr createLookup(const std::string& serviceUrl);
 
     static std::string getClientVersion(const ClientConfiguration& clientConfiguration);
 
@@ -179,7 +184,6 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
     std::mutex mutex_;
 
     State state_;
-    ServiceNameResolver serviceNameResolver_;
     ClientConfiguration clientConfiguration_;
     MemoryLimitController memoryLimitController_;
 
@@ -188,6 +192,8 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
     ExecutorServiceProviderPtr partitionListenerExecutorProvider_;
 
     LookupServicePtr lookupServicePtr_;
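+    // Lookup services for migrated (redirected) clusters, keyed by their service URL.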
+    std::unordered_map<std::string, LookupServicePtr> redirectedClusterLookupServicePtrs_;
     ConnectionPool pool_;
 
     uint64_t producerIdGenerator_;
diff --git a/lib/HTTPLookupService.cc b/lib/HTTPLookupService.cc
index 0959af2a..93b9db44 100644
--- a/lib/HTTPLookupService.cc
+++ b/lib/HTTPLookupService.cc
@@ -46,11 +46,11 @@ const static std::string ADMIN_PATH_V2 = "/admin/v2/";
 const static std::string PARTITION_METHOD_NAME = "partitions";
 const static int NUMBER_OF_LOOKUP_THREADS = 1;
 
-HTTPLookupService::HTTPLookupService(ServiceNameResolver &serviceNameResolver,
+HTTPLookupService::HTTPLookupService(const std::string &serviceUrl,
                                      const ClientConfiguration &clientConfiguration,
                                      const AuthenticationPtr &authData)
     : executorProvider_(std::make_shared<ExecutorServiceProvider>(NUMBER_OF_LOOKUP_THREADS)),
-      serviceNameResolver_(serviceNameResolver),
+      serviceNameResolver_(serviceUrl),
       authenticationPtr_(authData),
       lookupTimeoutInSeconds_(clientConfiguration.getOperationTimeoutSeconds()),
       maxLookupRedirects_(clientConfiguration.getMaxLookupRedirects()),
diff --git a/lib/HTTPLookupService.h b/lib/HTTPLookupService.h
index cf0b0ad1..17dd110e 100644
--- a/lib/HTTPLookupService.h
+++ b/lib/HTTPLookupService.h
@@ -40,7 +40,7 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t
     typedef Promise<Result, LookupDataResultPtr> LookupPromise;
 
     ExecutorServiceProviderPtr executorProvider_;
-    ServiceNameResolver& serviceNameResolver_;
+    ServiceNameResolver serviceNameResolver_;
     AuthenticationPtr authenticationPtr_;
     int lookupTimeoutInSeconds_;
     const int maxLookupRedirects_;
@@ -64,7 +64,7 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t
     Result sendHTTPRequest(std::string completeUrl, std::string& responseData, long& responseCode);
 
    public:
-    HTTPLookupService(ServiceNameResolver&, const ClientConfiguration&, const AuthenticationPtr&);
+    HTTPLookupService(const std::string&, const ClientConfiguration&, const AuthenticationPtr&);
 
     LookupResultFuture getBroker(const TopicName& topicName) override;
 
@@ -74,6 +74,8 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t
 
     Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
         const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override;
+
+    ServiceNameResolver& getServiceNameResolver() override { return serviceNameResolver_; }
 };
 }  // namespace pulsar
 
diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index 52e20d2d..46b918f6 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -43,7 +43,8 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
       epoch_(0),
       timer_(executor_->createDeadlineTimer()),
       creationTimer_(executor_->createDeadlineTimer()),
-      reconnectionPending_(false) {}
+      reconnectionPending_(false),
+      redirectedClusterURI_("") {}
 
 HandlerBase::~HandlerBase() {
     ASIO_ERROR ignored;
@@ -88,9 +89,9 @@ void HandlerBase::grabCnx() { grabCnx(boost::none); }
 Future<Result, ClientConnectionPtr> HandlerBase::getConnection(
     const ClientImplPtr& client, const boost::optional<std::string>& assignedBrokerUrl) {
     if (assignedBrokerUrl && client->getLookupCount() > 0) {
-        return client->connect(assignedBrokerUrl.get(), connectionKeySuffix_);
+        return client->connect(getRedirectedClusterURI(), assignedBrokerUrl.get(), connectionKeySuffix_);
     } else {
-        return client->getConnection(topic(), connectionKeySuffix_);
+        return client->getConnection(getRedirectedClusterURI(), topic(), connectionKeySuffix_);
     }
 }
 
@@ -209,4 +210,13 @@ Result HandlerBase::convertToTimeoutIfNecessary(Result result, ptime startTimest
     }
 }
 
+void HandlerBase::setRedirectedClusterURI(const std::string& serviceUrl) {
+    Lock lock(mutex_);
+    redirectedClusterURI_ = serviceUrl;
+}
+const std::string& HandlerBase::getRedirectedClusterURI() {
+    Lock lock(mutex_);
+    return redirectedClusterURI_;
+}
+
 }  // namespace pulsar
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index 32e124a2..415e234c 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -52,6 +52,9 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
     ClientConnectionWeakPtr getCnx() const;
     void setCnx(const ClientConnectionPtr& cnx);
     void resetCnx() { setCnx(nullptr); }
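+    // Set/get the cluster service URL this handler was redirected to by a TOPIC_MIGRATED command.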
+    void setRedirectedClusterURI(const std::string& serviceUrl);
+    const std::string& getRedirectedClusterURI();
 
    protected:
     /*
@@ -145,6 +147,8 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
     mutable std::mutex connectionMutex_;
     std::atomic<bool> reconnectionPending_;
     ClientConnectionWeakPtr connection_;
+    std::string redirectedClusterURI_;
+
     friend class ClientConnection;
     friend class PulsarFriend;
 };
diff --git a/lib/LookupService.h b/lib/LookupService.h
index b50d1f82..684984fc 100644
--- a/lib/LookupService.h
+++ b/lib/LookupService.h
@@ -29,6 +29,7 @@
 #include "Future.h"
 #include "LookupDataResult.h"
 #include "ProtoApiEnums.h"
+#include "ServiceNameResolver.h"
 
 namespace pulsar {
 using NamespaceTopicsPtr = std::shared_ptr<std::vector<std::string>>;
@@ -86,6 +87,8 @@ class LookupService {
     virtual Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName,
                                                  const std::string& version = "") = 0;
 
+    virtual ServiceNameResolver& getServiceNameResolver() = 0;
+
     virtual ~LookupService() {}
 
     virtual void close() {}
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 890d476c..f84c255e 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -144,7 +144,11 @@ Future<Result, bool> ProducerImpl::connectionOpened(const ClientConnectionPtr& c
         return promise.getFuture();
     }
 
+    LOG_INFO("Creating producer for topic:" << topic() << ", producerName:" << producerName_ << " on "
+                                            << cnx->cnxString());
     ClientImplPtr client = client_.lock();
+    cnx->registerProducer(producerId_, shared_from_this());  // so TOPIC_MIGRATED can find this producer
+
     int requestId = client->newRequestId();
 
     SharedBuffer cmd = Commands::newProducer(topic(), producerId_, producerName_, requestId,
@@ -214,7 +218,6 @@ Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result
         // set the cnx pointer so that new messages will be sent immediately
         LOG_INFO(getName() << "Created producer on broker " << cnx->cnxString());
 
-        cnx->registerProducer(producerId_, shared_from_this());
         producerName_ = responseData.producerName;
         schemaVersion_ = responseData.schemaVersion;
         producerStr_ = "[" + topic() + ", " + producerName_ + "] ";
diff --git a/lib/RetryableLookupService.h b/lib/RetryableLookupService.h
index 561855f9..8bc40bf3 100644
--- a/lib/RetryableLookupService.h
+++ b/lib/RetryableLookupService.h
@@ -76,6 +76,10 @@ class RetryableLookupService : public LookupService {
         });
     }
 
+    ServiceNameResolver& getServiceNameResolver() override {
+        return lookupService_->getServiceNameResolver();
+    }
+
    private:
     const std::shared_ptr<LookupService> lookupService_;
     RetryableOperationCachePtr<LookupResult> lookupCache_;
diff --git a/lib/ServiceNameResolver.h b/lib/ServiceNameResolver.h
index 8457d0e1..e6b2523b 100644
--- a/lib/ServiceNameResolver.h
+++ b/lib/ServiceNameResolver.h
@@ -36,14 +36,17 @@ class ServiceNameResolver {
     ServiceNameResolver(const ServiceNameResolver&) = delete;
     ServiceNameResolver& operator=(const ServiceNameResolver&) = delete;
 
-    bool useTls() const noexcept {
-        return serviceUri_.getScheme() == PulsarScheme::PULSAR_SSL ||
-               serviceUri_.getScheme() == PulsarScheme::HTTPS;
+    bool useTls() const noexcept { return useTls(serviceUri_); }
+
+    static bool useTls(const ServiceURI& serviceUri) noexcept {
+        return serviceUri.getScheme() == PulsarScheme::PULSAR_SSL ||
+               serviceUri.getScheme() == PulsarScheme::HTTPS;
     }
 
-    bool useHttp() const noexcept {
-        return serviceUri_.getScheme() == PulsarScheme::HTTP ||
-               serviceUri_.getScheme() == PulsarScheme::HTTPS;
+    bool useHttp() const noexcept { return useHttp(serviceUri_); }
+
+    static bool useHttp(const ServiceURI& serviceUri) noexcept {
+        return serviceUri.getScheme() == PulsarScheme::HTTP || serviceUri.getScheme() == PulsarScheme::HTTPS;
     }
 
     const std::string& resolveHost() {
diff --git a/proto/PulsarApi.proto b/proto/PulsarApi.proto
index a2548f3a..4e207913 100644
--- a/proto/PulsarApi.proto
+++ b/proto/PulsarApi.proto
@@ -262,6 +262,7 @@ enum ProtocolVersion {
     v17 = 17; // Added support ack receipt
     v18 = 18; // Add client support for broker entry metadata
     v19 = 19; // Add CommandTcClientConnectRequest and CommandTcClientConnectResponse
+    v20 = 20; // Add client support for topic migration redirection CommandTopicMigrated
 }
 
 message CommandConnect {
@@ -620,6 +621,18 @@ message CommandReachedEndOfTopic {
     required uint64 consumer_id = 1;
 }
 
+message CommandTopicMigrated {
+    enum ResourceType {
+        Producer = 0;
+        Consumer = 1;
+    }
+    required uint64 resource_id = 1;
+    required ResourceType resource_type = 2;
+    optional string brokerServiceUrl      = 3;
+    optional string brokerServiceUrlTls   = 4;
+
+}
+
 message CommandCloseProducer {
     required uint64 producer_id = 1;
     required uint64 request_id = 2;
@@ -1029,6 +1042,7 @@ message BaseCommand {
         WATCH_TOPIC_UPDATE = 66;
         WATCH_TOPIC_LIST_CLOSE = 67;
 
+        TOPIC_MIGRATED = 68;
     }
 
 
@@ -1110,4 +1124,6 @@ message BaseCommand {
     optional CommandWatchTopicListSuccess watchTopicListSuccess = 65;
     optional CommandWatchTopicUpdate watchTopicUpdate = 66;
     optional CommandWatchTopicListClose watchTopicListClose = 67;
+
+    optional CommandTopicMigrated topicMigrated = 68;
 }
diff --git a/run-unit-tests.sh b/run-unit-tests.sh
index 226789c5..698ca62c 100755
--- a/run-unit-tests.sh
+++ b/run-unit-tests.sh
@@ -33,9 +33,12 @@ export https_proxy=
 
 # Run ExtensibleLoadManager tests
 docker compose -f tests/extensibleLM/docker-compose.yml up -d
+docker compose -f tests/blue-green/docker-compose.yml up -d
 until curl http://localhost:8080/metrics > /dev/null 2>&1 ; do sleep 1; done
+until curl http://localhost:8081/metrics > /dev/null 2>&1 ; do sleep 1; done
 sleep 5
 $CMAKE_BUILD_DIRECTORY/tests/ExtensibleLoadManagerTest
+docker compose -f tests/blue-green/docker-compose.yml down
 docker compose -f tests/extensibleLM/docker-compose.yml down
 
 # Run OAuth2 tests
diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc
index 1e8e67d5..b5142860 100644
--- a/tests/ClientTest.cc
+++ b/tests/ClientTest.cc
@@ -443,7 +443,8 @@ TEST(ClientTest, testRetryUntilSucceed) {
     EXPECT_CALL(*clientImpl, getConnection).Times((kFailCount + 1) * 2);
     std::atomic_int count{0};
     ON_CALL(*clientImpl, getConnection)
-        .WillByDefault([&clientImpl, &count](const std::string &topic, size_t index) {
+        .WillByDefault([&clientImpl, &count](const std::string &redirectedClusterURI,
+                                             const std::string &topic, size_t index) {
             if (count++ < kFailCount) {
                 return GetConnectionFuture::failed(ResultRetryable);
             }
@@ -461,9 +462,10 @@ TEST(ClientTest, testRetryTimeout) {
     auto clientImpl =
         std::make_shared<MockClientImpl>(lookupUrl, ClientConfiguration().setOperationTimeoutSeconds(2));
     EXPECT_CALL(*clientImpl, getConnection).Times(AtLeast(2 * 2));
-    ON_CALL(*clientImpl, getConnection).WillByDefault([](const std::string &topic, size_t index) {
-        return GetConnectionFuture::failed(ResultRetryable);
-    });
+    ON_CALL(*clientImpl, getConnection)
+        .WillByDefault([](const std::string &redirectedClusterURI, const std::string &topic, size_t index) {
+            return GetConnectionFuture::failed(ResultRetryable);
+        });
 
     auto topic = "client-test-retry-timeout";
     {
@@ -484,9 +486,10 @@ TEST(ClientTest, testNoRetry) {
     auto clientImpl =
         std::make_shared<MockClientImpl>(lookupUrl, ClientConfiguration().setOperationTimeoutSeconds(100));
     EXPECT_CALL(*clientImpl, getConnection).Times(2);
-    ON_CALL(*clientImpl, getConnection).WillByDefault([](const std::string &, size_t) {
-        return GetConnectionFuture::failed(ResultAuthenticationError);
-    });
+    ON_CALL(*clientImpl, getConnection)
+        .WillByDefault([](const std::string &redirectedClusterURI, const std::string &, size_t) {
+            return GetConnectionFuture::failed(ResultAuthenticationError);
+        });
 
     auto topic = "client-test-no-retry";
     {
diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc
index 0fe13851..924acd43 100644
--- a/tests/LookupServiceTest.cc
+++ b/tests/LookupServiceTest.cc
@@ -84,8 +84,7 @@ TEST(LookupServiceTest, basicLookup) {
     ClientConfiguration conf;
     ExecutorServiceProviderPtr ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(1));
     ConnectionPool pool_(conf, ioExecutorProvider_, authData, "");
-    ServiceNameResolver serviceNameResolver(url);
-    BinaryProtoLookupService lookupService(serviceNameResolver, pool_, conf);
+    BinaryProtoLookupService lookupService(url, pool_, conf);
 
     TopicNamePtr topicName = TopicName::get("topic");
 
@@ -148,26 +147,24 @@ static void testMultiAddresses(LookupService& lookupService) {
 
 TEST(LookupServiceTest, testMultiAddresses) {
     ConnectionPool pool({}, std::make_shared<ExecutorServiceProvider>(1), AuthFactory::Disabled(), "");
-    ServiceNameResolver serviceNameResolver("pulsar://localhost,localhost:9999");
     ClientConfiguration conf;
-    BinaryProtoLookupService binaryLookupService(serviceNameResolver, pool, conf);
+    BinaryProtoLookupService binaryLookupService("pulsar://localhost,localhost:9999", pool, conf);
     testMultiAddresses(binaryLookupService);
 
     // HTTPLookupService calls shared_from_this() internally, we must create a shared pointer to test
-    ServiceNameResolver serviceNameResolverForHttp("http://localhost,localhost:9999");
     auto httpLookupServicePtr = std::make_shared<HTTPLookupService>(
-        std::ref(serviceNameResolverForHttp), ClientConfiguration{}, AuthFactory::Disabled());
+        "http://localhost,localhost:9999", ClientConfiguration{}, AuthFactory::Disabled());
     testMultiAddresses(*httpLookupServicePtr);
 }
 TEST(LookupServiceTest, testRetry) {
     auto executorProvider = std::make_shared<ExecutorServiceProvider>(1);
     ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), "");
-    ServiceNameResolver serviceNameResolver("pulsar://localhost:9999,localhost");
     ClientConfiguration conf;
 
     auto lookupService = RetryableLookupService::create(
-        std::make_shared<BinaryProtoLookupService>(serviceNameResolver, pool, conf), std::chrono::seconds(30),
-        executorProvider);
+        std::make_shared<BinaryProtoLookupService>("pulsar://localhost:9999,localhost", pool, conf),
+        std::chrono::seconds(30), executorProvider);
+    ServiceNameResolver& serviceNameResolver = lookupService->getServiceNameResolver();
 
     PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0);
     auto topicNamePtr = TopicName::get("lookup-service-test-retry");
@@ -196,12 +193,12 @@ TEST(LookupServiceTest, testRetry) {
 TEST(LookupServiceTest, testTimeout) {
     auto executorProvider = std::make_shared<ExecutorServiceProvider>(1);
     ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), "");
-    ServiceNameResolver serviceNameResolver("pulsar://localhost:9990,localhost:9902,localhost:9904");
     ClientConfiguration conf;
 
     constexpr int timeoutInSeconds = 2;
     auto lookupService = RetryableLookupService::create(
-        std::make_shared<BinaryProtoLookupService>(serviceNameResolver, pool, conf),
+        std::make_shared<BinaryProtoLookupService>("pulsar://localhost:9990,localhost:9902,localhost:9904",
+                                                   pool, conf),
         std::chrono::seconds(timeoutInSeconds), executorProvider);
     auto topicNamePtr = TopicName::get("lookup-service-test-retry");
 
@@ -467,9 +464,9 @@ INSTANTIATE_TEST_SUITE_P(Pulsar, LookupServiceTest, ::testing::Values(binaryLook
 
 class BinaryProtoLookupServiceRedirectTestHelper : public BinaryProtoLookupService {
    public:
-    BinaryProtoLookupServiceRedirectTestHelper(ServiceNameResolver& serviceNameResolver, ConnectionPool& pool,
+    BinaryProtoLookupServiceRedirectTestHelper(const std::string& serviceUrl, ConnectionPool& pool,
                                                const ClientConfiguration& clientConfiguration)
-        : BinaryProtoLookupService(serviceNameResolver, pool, clientConfiguration) {}
+        : BinaryProtoLookupService(serviceUrl, pool, clientConfiguration) {}
 
     LookupResultFuture findBroker(const std::string& address, bool authoritative, const std::string& topic,
                                   size_t redirectCount) {
@@ -484,14 +481,13 @@ TEST(LookupServiceTest, testRedirectionLimit) {
     conf.setMaxLookupRedirects(redirect_limit);
     ExecutorServiceProviderPtr ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(1));
     ConnectionPool pool_(conf, ioExecutorProvider_, authData, "");
-    std::string url = "pulsar://localhost:6650";
-    ServiceNameResolver serviceNameResolver(url);
-    BinaryProtoLookupServiceRedirectTestHelper lookupService(serviceNameResolver, pool_, conf);
+    std::string url = "pulsar://localhost:6650";
+    BinaryProtoLookupServiceRedirectTestHelper lookupService(url, pool_, conf);
 
     const auto topicNamePtr = TopicName::get("topic");
     for (auto idx = 0; idx < redirect_limit + 5; ++idx) {
-        auto future =
-            lookupService.findBroker(serviceNameResolver.resolveHost(), false, topicNamePtr->toString(), idx);
+        auto future = lookupService.findBroker(lookupService.getServiceNameResolver().resolveHost(), false,
+                                               topicNamePtr->toString(), idx);
         LookupService::LookupResult lookupResult;
         auto result = future.get(lookupResult);
 
diff --git a/tests/MockClientImpl.h b/tests/MockClientImpl.h
index aa4208a9..074808fc 100644
--- a/tests/MockClientImpl.h
+++ b/tests/MockClientImpl.h
@@ -39,8 +39,8 @@ class MockClientImpl : public ClientImpl {
     MockClientImpl(const std::string& serviceUrl, ClientConfiguration conf = {})
         : ClientImpl(serviceUrl, conf) {}
 
-    MOCK_METHOD((Future<Result, ClientConnectionPtr>), getConnection, (const std::string&, size_t),
-                (override));
+    MOCK_METHOD((Future<Result, ClientConnectionPtr>), getConnection,
+                (const std::string&, const std::string&, size_t), (override));
 
     SyncOpResult createProducer(const std::string& topic) {
         using namespace std::chrono;
@@ -65,7 +65,7 @@ class MockClientImpl : public ClientImpl {
     }
 
     GetConnectionFuture getConnectionReal(const std::string& topic, size_t key) {
-        return ClientImpl::getConnection(topic, key);
+        return ClientImpl::getConnection("", topic, key);
     }
 
     Result close() {
diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h
index 73778842..bfa11ef1 100644
--- a/tests/PulsarFriend.h
+++ b/tests/PulsarFriend.h
@@ -26,6 +26,7 @@
 #include "lib/ClientImpl.h"
 #include "lib/ConsumerConfigurationImpl.h"
 #include "lib/ConsumerImpl.h"
+#include "lib/LookupService.h"
 #include "lib/MessageImpl.h"
 #include "lib/MultiTopicsConsumerImpl.h"
 #include "lib/NamespaceName.h"
@@ -38,6 +39,7 @@
 using std::string;
 
 namespace pulsar {
+using ClientConnectionWeakPtr = std::weak_ptr<ClientConnection>;
 class PulsarFriend {
    public:
     static ProducerStatsImplPtr getProducerStatsPtr(Producer producer) {
@@ -166,6 +168,14 @@ class PulsarFriend {
 
     static ClientConnectionWeakPtr getClientConnection(HandlerBase& handler) { return handler.connection_; }
 
+    static std::string getConnectionPhysicalAddress(HandlerBase& handler) {
+        auto cnx = handler.connection_.lock();
+        if (cnx) {
+            return cnx->physicalAddress_;
+        }
+        return "";
+    }
+
     static void setClientConnection(HandlerBase& handler, ClientConnectionWeakPtr conn) {
         handler.connection_ = conn;
     }
@@ -177,7 +187,7 @@ class PulsarFriend {
     static void setServiceUrlIndex(ServiceNameResolver& resolver, size_t index) { resolver.index_ = index; }
 
     static void setServiceUrlIndex(const Client& client, size_t index) {
-        setServiceUrlIndex(client.impl_->serviceNameResolver_, index);
+        setServiceUrlIndex(client.impl_->lookupServicePtr_->getServiceNameResolver(), index);
     }
 
     static proto::MessageMetadata& getMessageMetadata(Message& message) { return message.impl_->metadata; }
diff --git a/tests/blue-green/docker-compose.yml b/tests/blue-green/docker-compose.yml
new file mode 100644
index 00000000..b2d22ebf
--- /dev/null
+++ b/tests/blue-green/docker-compose.yml
@@ -0,0 +1,152 @@
+version: '3'
+networks:
+  green-pulsar:
+    driver: bridge
+services:
+  # Start ZooKeeper
+  zookeeper:
+    image: apachepulsar/pulsar:latest
+    container_name: green-zookeeper
+    restart: on-failure
+    networks:
+      - green-pulsar
+    environment:
+      - metadataStoreUrl=zk:zookeeper:2181
+      - PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
+    command: >
+      bash -c "bin/apply-config-from-env.py conf/zookeeper.conf && \
+             bin/generate-zookeeper-config.sh conf/zookeeper.conf && \
+             exec bin/pulsar zookeeper"
+    healthcheck:
+      test: ["CMD", "bin/pulsar-zookeeper-ruok.sh"]
+      interval: 10s
+      timeout: 5s
+      retries: 30
+
+  # Initialize cluster metadata
+  pulsar-init:
+    container_name: green-pulsar-init
+    hostname: pulsar-init
+    image: apachepulsar/pulsar:latest
+    networks:
+      - green-pulsar
+    environment:
+      - PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
+    command: >
+      bin/pulsar initialize-cluster-metadata \
+               --cluster cluster-a \
+               --zookeeper zookeeper:2181 \
+               --configuration-store zookeeper:2181 \
+               --web-service-url http://broker-1:8080 \
+               --broker-service-url pulsar://broker-1:6650
+    depends_on:
+      zookeeper:
+        condition: service_healthy
+
+  # Start bookie
+  bookie:
+    image: apachepulsar/pulsar:latest
+    container_name: green-bookie
+    restart: on-failure
+    networks:
+      - green-pulsar
+    environment:
+      - clusterName=cluster-a
+      - zkServers=zookeeper:2181
+      - metadataServiceUri=metadata-store:zk:zookeeper:2181
+      - advertisedAddress=bookie
+      - BOOKIE_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
+    depends_on:
+      zookeeper:
+        condition: service_healthy
+      pulsar-init:
+        condition: service_completed_successfully
+    command: bash -c "bin/apply-config-from-env.py conf/bookkeeper.conf && exec bin/pulsar bookie"
+
+  proxy:
+    image: apachepulsar/pulsar:latest
+    container_name: green-proxy
+    hostname: proxy
+    restart: on-failure
+    networks:
+      - green-pulsar
+    environment:
+      - metadataStoreUrl=zk:zookeeper:2181
+      - zookeeperServers=zookeeper:2181
+      - clusterName=cluster-a
+      - PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
+    ports:
+      - "8081:8080"
+      - "6651:6650"
+    depends_on:
+      broker-1:
+        condition: service_started
+      broker-2:
+        condition: service_started
+    command: bash -c "bin/apply-config-from-env.py conf/proxy.conf && exec bin/pulsar proxy"
+
+  # Start broker 1
+  broker-1:
+    image: apachepulsar/pulsar:latest
+    container_name: green-broker-1
+    hostname: broker-1
+    restart: on-failure
+    networks:
+      - green-pulsar
+    environment:
+      - metadataStoreUrl=zk:zookeeper:2181
+      - zookeeperServers=zookeeper:2181
+      - clusterName=cluster-a
+      - managedLedgerDefaultEnsembleSize=1
+      - managedLedgerDefaultWriteQuorum=1
+      - managedLedgerDefaultAckQuorum=1
+      - advertisedAddress=green-broker-1
+      - internalListenerName=internal
+      - advertisedListeners=internal:pulsar://green-broker-1:6650
+      - PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=56m
+      # Load Manager. Here uses the extensible load balancer, sets the unloading strategy to TransferShedder, and enables debug mode.
+      - loadManagerClassName=org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl
+      - loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder
+      - loadBalancerSheddingEnabled=false
+      - loadBalancerDebugModeEnabled=true
+      - brokerServiceCompactionThresholdInBytes=1000000
+      - PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
+    depends_on:
+      zookeeper:
+        condition: service_healthy
+      bookie:
+        condition: service_started
+    command: bash -c "bin/apply-config-from-env.py conf/broker.conf && exec bin/pulsar broker"
+
+  # Start broker 2
+  broker-2:
+    image: apachepulsar/pulsar:latest
+    container_name: green-broker-2
+    hostname: broker-2
+    restart: on-failure
+    networks:
+      - green-pulsar
+    environment:
+      - metadataStoreUrl=zk:zookeeper:2181
+      - zookeeperServers=zookeeper:2181
+      - clusterName=cluster-a
+      - managedLedgerDefaultEnsembleSize=1
+      - managedLedgerDefaultWriteQuorum=1
+      - managedLedgerDefaultAckQuorum=1
+      - advertisedAddress=green-broker-2
+      - internalListenerName=internal
+      - advertisedListeners=internal:pulsar://green-broker-2:6650
+      - PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=56m
+      # Load Manager. Here uses the extensible load balancer, sets the unloading strategy to TransferShedder, and enables debug mode.
+      - loadManagerClassName=org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl
+      - loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder
+      - loadBalancerSheddingEnabled=false
+      - loadBalancerDebugModeEnabled=true
+      - brokerServiceCompactionThresholdInBytes=1000000
+      - PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
+    depends_on:
+      zookeeper:
+        condition: service_healthy
+      bookie:
+        condition: service_started
+    command: bash -c "bin/apply-config-from-env.py conf/broker.conf && exec bin/pulsar broker"
diff --git a/tests/extensibleLM/ExtensibleLoadManagerTest.cc b/tests/extensibleLM/ExtensibleLoadManagerTest.cc
index a6379f6e..f4e2c81c 100644
--- a/tests/extensibleLM/ExtensibleLoadManagerTest.cc
+++ b/tests/extensibleLM/ExtensibleLoadManagerTest.cc
@@ -20,6 +20,7 @@
 #include <gtest/gtest.h>
 
 #include <thread>
+#include <unordered_set>
 
 #include "include/pulsar/Client.h"
 #include "lib/LogUtils.h"
@@ -35,21 +36,43 @@ bool checkTime() {
     const static auto start = std::chrono::high_resolution_clock::now();
     auto end = std::chrono::high_resolution_clock::now();
     auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
-    return duration < 180 * 1000;
+    return duration < 300 * 1000;
 }
 
 TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) {
-    const static std::string adminUrl = "http://localhost:8080/";
-    const static std::string topicName =
-        "persistent://public/unload-test/topic-1" + std::to_string(time(NULL));
+    const static std::string blueAdminUrl = "http://localhost:8080/";
+    const static std::string greenAdminUrl = "http://localhost:8081/";
+    const static std::string topicNameSuffix = std::to_string(time(NULL));
+    const static std::string topicName = "persistent://public/unload-test/topic-" + topicNameSuffix;
 
     ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
-        std::string url = adminUrl + "admin/v2/namespaces/public/unload-test?bundles=1";
+        std::string url = blueAdminUrl + "admin/v2/clusters/cluster-a/migrate?migrated=false";
+        int res = makePostRequest(url, R"(
+                    {
+                       "serviceUrl": "http://localhost:8081",
+                       "serviceUrlTls":"https://localhost:8085",
+                       "brokerServiceUrl": "pulsar://localhost:6651",
+                       "brokerServiceUrlTls": "pulsar+ssl://localhost:6655"
+                    })");
+        LOG_INFO("res:" << res);
+        return res == 200;
+    }));
+
+    ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
+        std::string url = blueAdminUrl + "admin/v2/namespaces/public/unload-test?bundles=1";
         int res = makePutRequest(url, "");
         return res == 204 || res == 409;
     }));
 
-    Client client{"pulsar://localhost:6650"};
+    ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
+        std::string url = greenAdminUrl + "admin/v2/namespaces/public/unload-test?bundles=1";
+        int res = makePutRequest(url, "");
+        return res == 204 || res == 409;
+    }));
+
+    ClientConfiguration conf;
+    conf.setIOThreads(8);
+    Client client{"pulsar://localhost:6650", conf};
     Producer producer;
     ProducerConfiguration producerConfiguration;
     Result producerResult = client.createProducer(topicName, producerConfiguration, producer);
@@ -58,24 +81,25 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) {
     Result consumerResult = client.subscribe(topicName, "sub", consumer);
     ASSERT_EQ(consumerResult, ResultOk);
 
-    Semaphore firstUnloadSemaphore(0);
-    Semaphore secondUnloadSemaphore(0);
-    Semaphore halfPubWaitSemaphore(0);
-    const int msgCount = 10;
-    int produced = 0;
+    Semaphore unloadSemaphore(0);
+    Semaphore pubWaitSemaphore(0);
+    Semaphore migrationSemaphore(0);
+
+    const int msgCount = 20;
+    SynchronizedHashMap<int, int> producedMsgs;
     auto produce = [&]() {
         int i = 0;
         while (i < msgCount && checkTime()) {
-            if (i == 3) {
-                firstUnloadSemaphore.acquire();
+            if (i == 3 || i == 8 || i == 17) {
+                unloadSemaphore.acquire();
             }
 
-            if (i == 5) {
-                halfPubWaitSemaphore.release();
+            if (i == 5 || i == 15) {
+                pubWaitSemaphore.release();
             }
 
-            if (i == 8) {
-                secondUnloadSemaphore.acquire();
+            if (i == 12) {
+                migrationSemaphore.acquire();
             }
 
             std::string content = std::to_string(i);
@@ -86,37 +110,33 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) {
                 return sendResult == ResultOk;
             }));
 
-            LOG_INFO("produced index:" << i);
-            produced++;
+            LOG_INFO("produced i:" << i);
+            producedMsgs.emplace(i, i);
             i++;
         }
         LOG_INFO("producer finished");
     };
-
-    int consumed = 0;
+    std::atomic<bool> stopConsumer(false);
+    SynchronizedHashMap<int, int> consumedMsgs;
     auto consume = [&]() {
         Message receivedMsg;
-        int i = 0;
-        while (i < msgCount && checkTime()) {
-            ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
-                Result receiveResult =
-                    consumer.receive(receivedMsg, 1000);  // Assumed that we wait 1000 ms for each message
-                return receiveResult == ResultOk;
-            }));
-            LOG_INFO("received index:" << i);
-
-            int id = std::stoi(receivedMsg.getDataAsString());
-            if (id < i) {
+        while (checkTime()) {
+            if (stopConsumer && producedMsgs.size() == msgCount && consumedMsgs.size() == msgCount) {
+                break;
+            }
+            Result receiveResult =
+                consumer.receive(receivedMsg, 1000);  // Assumed that we wait 1000 ms for each message
+            if (receiveResult != ResultOk) {
                 continue;
             }
+            int i = std::stoi(receivedMsg.getDataAsString());
+            LOG_INFO("received i:" << i);
             ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
                 Result ackResult = consumer.acknowledge(receivedMsg);
                 return ackResult == ResultOk;
             }));
-            LOG_INFO("acked index:" << i);
-
-            consumed++;
-            i++;
+            LOG_INFO("acked i:" << i);
+            consumedMsgs.emplace(i, i);
         }
         LOG_INFO("consumer finished");
     };
@@ -124,7 +144,8 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) {
     std::thread produceThread(produce);
     std::thread consumeThread(consume);
 
-    auto unload = [&] {
+    auto unload = [&](bool migrated) {
+        const std::string &adminUrl = migrated ? greenAdminUrl : blueAdminUrl;
         auto clientImplPtr = PulsarFriend::getClientImplPtr(client);
         auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
         auto &producerImpl = PulsarFriend::getProducerImpl(producer);
@@ -135,15 +156,17 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) {
             ASSERT_TRUE(waitUntil(std::chrono::seconds(30),
                                   [&] { return consumerImpl.isConnected() && producerImpl.isConnected(); }));
 
-            std::string url = adminUrl + "lookup/v2/topic/persistent/public/unload-test/topic-1";
+            std::string url =
+                adminUrl + "lookup/v2/topic/persistent/public/unload-test/topic-" + topicNameSuffix;
             std::string responseDataBeforeUnload;
             int res = makeGetRequest(url, responseDataBeforeUnload);
             if (res != 200) {
                 continue;
             }
-            destinationBroker = responseDataBeforeUnload.find("broker-2") == std::string::npos
-                                    ? "broker-2:8080"
-                                    : "broker-1:8080";
+            std::string prefix = migrated ? "green-" : "";
+            destinationBroker =
+                prefix + (responseDataBeforeUnload.find("broker-2") == std::string::npos ? "broker-2:8080"
+                                                                                         : "broker-1:8080");
             lookupCountBeforeUnload = clientImplPtr->getLookupCount();
             ASSERT_TRUE(lookupCountBeforeUnload > 0);
 
@@ -163,31 +186,69 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) {
                                   [&] { return consumerImpl.isConnected() && producerImpl.isConnected(); }));
             std::string responseDataAfterUnload;
             ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
-                url = adminUrl + "lookup/v2/topic/persistent/public/unload-test/topic-1";
+                url = adminUrl + "lookup/v2/topic/persistent/public/unload-test/topic-" + topicNameSuffix;
                 res = makeGetRequest(url, responseDataAfterUnload);
                 return res == 200 && responseDataAfterUnload.find(destinationBroker) != std::string::npos;
             }));
             LOG_INFO("after lookup responseData:" << responseDataAfterUnload << ",res:" << res);
 
             auto lookupCountAfterUnload = clientImplPtr->getLookupCount();
-            ASSERT_EQ(lookupCountBeforeUnload, lookupCountAfterUnload);
+            if (lookupCountBeforeUnload != lookupCountAfterUnload) {
+                continue;
+            }
             break;
         }
     };
-    LOG_INFO("starting first unload");
-    unload();
-    firstUnloadSemaphore.release();
-    halfPubWaitSemaphore.acquire();
-    LOG_INFO("starting second unload");
-    unload();
-    secondUnloadSemaphore.release();
+    LOG_INFO("#### starting first unload ####");
+    unload(false);
+    unloadSemaphore.release();
+    pubWaitSemaphore.acquire();
+    LOG_INFO("#### starting second unload ####");
+    unload(false);
+    unloadSemaphore.release();
 
-    produceThread.join();
+    LOG_INFO("#### migrating the cluster ####");
+    migrationSemaphore.release();
+    ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
+        std::string url = blueAdminUrl + "admin/v2/clusters/cluster-a/migrate?migrated=true";
+        int res = makePostRequest(url, R"({
+                                               "serviceUrl": "http://localhost:8081",
+                                               "serviceUrlTls":"https://localhost:8085",
+                                               "brokerServiceUrl": "pulsar://localhost:6651",
+                                               "brokerServiceUrlTls": "pulsar+ssl://localhost:6655"
+                                            })");
+        LOG_INFO("res:" << res);
+        return res == 200;
+    }));
+    ASSERT_TRUE(waitUntil(std::chrono::seconds(130), [&] {
+        auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
+        auto &producerImpl = PulsarFriend::getProducerImpl(producer);
+        auto consumerConnAddress = PulsarFriend::getConnectionPhysicalAddress(consumerImpl);
+        auto producerConnAddress = PulsarFriend::getConnectionPhysicalAddress(producerImpl);
+        return consumerImpl.isConnected() && producerImpl.isConnected() &&
+               consumerConnAddress.find("6651") != std::string::npos &&
+               producerConnAddress.find("6651") != std::string::npos;
+    }));
+    pubWaitSemaphore.acquire();
+    LOG_INFO("#### starting third unload after migration ####");
+    unload(true);
+    unloadSemaphore.release();
+
+    stopConsumer = true;
     consumeThread.join();
-    ASSERT_EQ(consumed, msgCount);
-    ASSERT_EQ(produced, msgCount);
-    ASSERT_TRUE(checkTime()) << "timed out";
+    produceThread.join();
+    ASSERT_EQ(producedMsgs.size(), msgCount);
+    ASSERT_EQ(consumedMsgs.size(), msgCount);
+    for (int i = 0; i < msgCount; i++) {
+        producedMsgs.remove(i);
+        consumedMsgs.remove(i);
+    }
+    ASSERT_EQ(producedMsgs.size(), 0);
+    ASSERT_EQ(consumedMsgs.size(), 0);
+
     client.close();
+
+    ASSERT_TRUE(checkTime()) << "timed out";
 }
 
 int main(int argc, char *argv[]) {
diff --git a/tests/extensibleLM/docker-compose.yml b/tests/extensibleLM/docker-compose.yml
index 8d3c33a3..14876395 100644
--- a/tests/extensibleLM/docker-compose.yml
+++ b/tests/extensibleLM/docker-compose.yml
@@ -109,6 +109,8 @@ services:
       - loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder
       - loadBalancerSheddingEnabled=false
       - loadBalancerDebugModeEnabled=true
+      - clusterMigrationCheckDurationSeconds=1
+      - brokerServiceCompactionThresholdInBytes=1000000
       - PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
     depends_on:
       zookeeper:
@@ -141,6 +143,8 @@ services:
       - loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder
       - loadBalancerSheddingEnabled=false
       - loadBalancerDebugModeEnabled=true
+      - clusterMigrationCheckDurationSeconds=1
+      - brokerServiceCompactionThresholdInBytes=1000000
       - PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
     depends_on:
       zookeeper: