@@ -79,8 +79,8 @@ typedef std::vector<std::string> StringList;
79
79
ClientImpl::ClientImpl (const std::string& serviceUrl, const ClientConfiguration& clientConfiguration)
80
80
: mutex_ (),
81
81
state_ (Open),
82
- serviceNameResolver_ (serviceUrl),
83
- clientConfiguration_ ( ClientConfiguration (clientConfiguration) .setUseTls (serviceNameResolver_. useTls ())),
82
+ clientConfiguration_ ( ClientConfiguration (clientConfiguration)
83
+ .setUseTls (ServiceNameResolver:: useTls (ServiceURI (serviceUrl) ))),
84
84
memoryLimitController_ (clientConfiguration.getMemoryLimit ()),
85
85
ioExecutorProvider_ (std::make_shared<ExecutorServiceProvider>(clientConfiguration_.getIOThreads ())),
86
86
listenerExecutorProvider_ (
@@ -98,25 +98,28 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration&
98
98
if (loggerFactory) {
99
99
LogUtils::setLoggerFactory (std::move (loggerFactory));
100
100
}
101
+ lookupServicePtr_ = createLookup (serviceUrl);
102
+ }
103
+
104
+ ClientImpl::~ClientImpl () { shutdown (); }
101
105
106
+ LookupServicePtr ClientImpl::createLookup (const std::string& serviceUrl) {
102
107
LookupServicePtr underlyingLookupServicePtr;
103
- if (serviceNameResolver_. useHttp ()) {
108
+ if (ServiceNameResolver:: useHttp (ServiceURI (serviceUrl) )) {
104
109
LOG_DEBUG (" Using HTTP Lookup" );
105
110
underlyingLookupServicePtr = std::make_shared<HTTPLookupService>(
106
- std::ref (serviceNameResolver_), std::cref (clientConfiguration_),
107
- std::cref (clientConfiguration_.getAuthPtr ()));
111
+ serviceUrl, std::cref (clientConfiguration_), std::cref (clientConfiguration_.getAuthPtr ()));
108
112
} else {
109
113
LOG_DEBUG (" Using Binary Lookup" );
110
114
underlyingLookupServicePtr = std::make_shared<BinaryProtoLookupService>(
111
- std::ref (serviceNameResolver_) , std::ref (pool_), std::cref (clientConfiguration_));
115
+ serviceUrl , std::ref (pool_), std::cref (clientConfiguration_));
112
116
}
113
117
114
- lookupServicePtr_ = RetryableLookupService::create (
118
+ auto lookupServicePtr = RetryableLookupService::create (
115
119
underlyingLookupServicePtr, clientConfiguration_.impl_ ->operationTimeout , ioExecutorProvider_);
120
+ return lookupServicePtr;
116
121
}
117
122
118
- ClientImpl::~ClientImpl () { shutdown (); }
119
-
120
123
const ClientConfiguration& ClientImpl::conf () const { return clientConfiguration_; }
121
124
122
125
MemoryLimitController& ClientImpl::getMemoryLimitController () { return memoryLimitController_; }
@@ -129,7 +132,21 @@ ExecutorServiceProviderPtr ClientImpl::getPartitionListenerExecutorProvider() {
129
132
return partitionListenerExecutorProvider_;
130
133
}
131
134
132
- LookupServicePtr ClientImpl::getLookup () { return lookupServicePtr_; }
135
+ LookupServicePtr ClientImpl::getLookup (const std::string& redirectedClusterURI) {
136
+ if (redirectedClusterURI.empty ()) {
137
+ return lookupServicePtr_;
138
+ }
139
+
140
+ Lock lock (mutex_);
141
+ auto it = redirectedClusterLookupServicePtrs_.find (redirectedClusterURI);
142
+ if (it == redirectedClusterLookupServicePtrs_.end ()) {
143
+ auto lookup = createLookup (redirectedClusterURI);
144
+ redirectedClusterLookupServicePtrs_.emplace (redirectedClusterURI, lookup);
145
+ return lookup;
146
+ }
147
+
148
+ return it->second ;
149
+ }
133
150
134
151
void ClientImpl::createProducerAsync (const std::string& topic, ProducerConfiguration conf,
135
152
CreateProducerCallback callback, bool autoDownloadSchema) {
@@ -517,7 +534,8 @@ void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr co
517
534
}
518
535
}
519
536
520
- GetConnectionFuture ClientImpl::getConnection (const std::string& topic, size_t key) {
537
+ GetConnectionFuture ClientImpl::getConnection (const std::string& redirectedClusterURI,
538
+ const std::string& topic, size_t key) {
521
539
Promise<Result, ClientConnectionPtr> promise;
522
540
523
541
const auto topicNamePtr = TopicName::get (topic);
@@ -528,7 +546,8 @@ GetConnectionFuture ClientImpl::getConnection(const std::string& topic, size_t k
528
546
}
529
547
530
548
auto self = shared_from_this ();
531
- lookupServicePtr_->getBroker (*topicNamePtr)
549
+ getLookup (redirectedClusterURI)
550
+ ->getBroker (*topicNamePtr)
532
551
.addListener ([this , self, promise, key](Result result, const LookupService::LookupResult& data) {
533
552
if (result != ResultOk) {
534
553
promise.setFailed (result);
@@ -554,16 +573,18 @@ GetConnectionFuture ClientImpl::getConnection(const std::string& topic, size_t k
554
573
return promise.getFuture ();
555
574
}
556
575
557
- const std::string& ClientImpl::getPhysicalAddress (const std::string& logicalAddress) {
576
+ const std::string& ClientImpl::getPhysicalAddress (const std::string& redirectedClusterURI,
577
+ const std::string& logicalAddress) {
558
578
if (useProxy_) {
559
- return serviceNameResolver_ .resolveHost ();
579
+ return getLookup (redirectedClusterURI)-> getServiceNameResolver () .resolveHost ();
560
580
} else {
561
581
return logicalAddress;
562
582
}
563
583
}
564
584
565
- GetConnectionFuture ClientImpl::connect (const std::string& logicalAddress, size_t key) {
566
- const auto & physicalAddress = getPhysicalAddress (logicalAddress);
585
+ GetConnectionFuture ClientImpl::connect (const std::string& redirectedClusterURI,
586
+ const std::string& logicalAddress, size_t key) {
587
+ const auto & physicalAddress = getPhysicalAddress (redirectedClusterURI, logicalAddress);
567
588
Promise<Result, ClientConnectionPtr> promise;
568
589
pool_.getConnectionAsync (logicalAddress, physicalAddress, key)
569
590
.addListener ([promise](Result result, const ClientConnectionWeakPtr& weakCnx) {
@@ -633,6 +654,9 @@ void ClientImpl::closeAsync(CloseCallback callback) {
633
654
634
655
memoryLimitController_.close ();
635
656
lookupServicePtr_->close ();
657
+ for (const auto & it : redirectedClusterLookupServicePtrs_) {
658
+ it.second ->close ();
659
+ }
636
660
637
661
auto producers = producers_.move ();
638
662
auto consumers = consumers_.move ();
0 commit comments