Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] Fix getProxyConnection when the topic is migrated #22085

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ protected void grabCnx(Optional<URI> hostURI) {
URI uri = hostURI.get();
InetSocketAddress address = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
if (useProxy) {
cnxFuture = state.client.getProxyConnection(address, randomKeyForSelectConnection);
cnxFuture = state.client.getProxyConnection(state.redirectedClusterURI, address,
randomKeyForSelectConnection);
} else {
cnxFuture = state.client.getConnection(address, address, randomKeyForSelectConnection);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.netty.util.Timer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -990,8 +991,13 @@ public CompletableFuture<ClientCnx> getConnectionToServiceUrl() {
return getConnection(address, address, cnxPool.genRandomKeyToSelectCon());
}

public CompletableFuture<ClientCnx> getProxyConnection(final InetSocketAddress logicalAddress,
public CompletableFuture<ClientCnx> getProxyConnection(final URI redirectedClusterURI,
final InetSocketAddress logicalAddress,
final int randomKeyForSelectConnection) {

LookupService lookup =
redirectedClusterURI == null ? this.lookup : getLookup(redirectedClusterURI.toString());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since these connections are cached and pooled, will this still work if the logicalAddress stays the same in the source and destination clusters? Or is that not possible? Otherwise, we might have to use an actually random key for connection selection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's a fair concern.

It appears that the connection cache key should be "logicalAddr + physicalAddr + random-key", and the pool can be a flat map, like the one below. I am not sure why physicalAddr is excluded now.

ConcurrentHashMap<String, CompletableFuture<ClientCnx> pool>

I wonder if we need to address this in a separate PR, as this is a design change for the connection cache. (We could isolate this PR to fix the wrong physical address after migration).

Also, the below code(and other getConnection funcs) might need to be revisited since they generate a new ran-key for every getConnection, which is a different connection cache access behavior after migration.

https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java#L115

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears that the connection cache key should be "logicalAddr + physicalAddr + random-key", and the pool can be a flat map, like the one below. I am not sure why physicalAddr is excluded now.

The correct solution might not be so obvious. I think that the commit & PR history reveals some important details for understanding it. IIRC, the connection pool has some gotchas that easily trick into making wrong assumptions.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At least for me, the connection pool and the various concepts around it in the Pulsar client are very misleading.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Raised a PR to add the physical address in the key #22196

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#22196 got merged. Plz continue this PR review.


if (!(lookup instanceof BinaryProtoLookupService)) {
return FutureUtil.failedFuture(new PulsarClientException.InvalidServiceURL(
"Cannot proxy connection through HTTP service URL", null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
Expand All @@ -43,6 +46,7 @@
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -256,4 +260,21 @@ public void testBothExecutorProvidersMustBeSpecified() throws PulsarClientExcept
.internalExecutorProvider(executorProvider)
.build();
}

@Test
public void getProxyConnection() throws PulsarClientException {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setDnsServerAddresses(DefaultDnsServerAddressStreamProvider.defaultAddressList());
conf.setServiceUrl("pulsar://broker-1:6650");
initializeEventLoopGroup(conf);
@Cleanup
PulsarClientImpl client = spy(new PulsarClientImpl(conf, eventLoopGroup));
client.getProxyConnection(null,
InetSocketAddress.createUnresolved("pulsar://broker-1", 6650), 0);
verify(client, times(0)).getLookup(anyString());

client.getProxyConnection(URI.create("pulsar://new-cluster:6650"),
InetSocketAddress.createUnresolved("pulsar://broker-1", 6650), 0);
verify(client, times(1)).getLookup("pulsar://new-cluster:6650");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,10 @@ public void testProxyProduceConsume() throws Exception {
producerFuture.get();
consumerFuture.get();

verify(producerClient, times(1)).getProxyConnection(any(), anyInt());
verify(producerClient, times(1)).getProxyConnection(any(), any(), anyInt());
verify(producerLookupServiceSpy, never()).getBroker(topicName);

verify(consumerClient, times(1)).getProxyConnection(any(), anyInt());
verify(consumerClient, times(1)).getProxyConnection(any(), any(), anyInt());
verify(consumerLookupServiceSpy, never()).getBroker(topicName);
}

Expand Down Expand Up @@ -330,10 +330,10 @@ public void testClientReconnectsToBrokerOnProxyClosing() throws Exception {
assertEquals(FieldUtils.readDeclaredField(producer.getConnectionHandler(), "useProxy", true), Boolean.FALSE);
assertEquals(FieldUtils.readDeclaredField(consumer.getConnectionHandler(), "useProxy", true), Boolean.FALSE);

verify(producerClient, times(1)).getProxyConnection(any(), anyInt());
verify(producerClient, times(1)).getProxyConnection(any(), any(), anyInt());
verify(producerLookupServiceSpy, times(1)).getBroker(topicName);

verify(consumerClient, times(1)).getProxyConnection(any(), anyInt());
verify(consumerClient, times(1)).getProxyConnection(any(), any(), anyInt());
verify(consumerLookupServiceSpy, times(1)).getBroker(topicName);
}
}
Loading