Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use local broker address as serviceURL when running WebSocket component embedded in broker #82

Merged
merged 1 commit into from
Oct 25, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@
package com.yahoo.pulsar.broker;

import java.io.IOException;
import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -58,6 +56,7 @@
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.ClusterData;
import com.yahoo.pulsar.websocket.WebSocketConsumerServlet;
import com.yahoo.pulsar.websocket.WebSocketProducerServlet;
import com.yahoo.pulsar.websocket.WebSocketService;
Expand Down Expand Up @@ -252,7 +251,9 @@ public void start() throws PulsarServerException {
this.webService.addRestResources("/lookup", "com.yahoo.pulsar.broker.lookup", true);

if (config.isWebSocketServiceEnabled()) {
this.webSocketService = new WebSocketService(config);
// Use local broker address to avoid different IP address when using a VIP for service discovery
this.webSocketService = new WebSocketService(new ClusterData(webServiceAddress, webServiceAddressTls),
config);
this.webSocketService.start();
this.webService.addServlet(WebSocketProducerServlet.SERVLET_PATH,
new ServletHolder(new WebSocketProducerServlet(webSocketService)), true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,17 @@ public class WebSocketService implements Closeable {
private ServiceConfiguration config;
private ConfigurationCacheService configurationCacheService;

private ClusterData localCluster;

public WebSocketService(ServiceConfiguration config) throws PulsarClientException, MalformedURLException,
ServletException, DeploymentException, PulsarServerException {
this(null, config);
}

public WebSocketService(ClusterData localCluster, ServiceConfiguration config) throws PulsarClientException,
MalformedURLException, ServletException, DeploymentException, PulsarServerException {
this.config = config;
this.localCluster = localCluster;
}

public void start() throws PulsarServerException, PulsarClientException, MalformedURLException, ServletException,
Expand Down Expand Up @@ -129,28 +137,37 @@ public ZooKeeperClientFactory getZooKeeperClientFactory() {
public synchronized PulsarClient getPulsarClient() throws IOException {
// Do lazy initialization of client
if (pulsarClient == null) {
ClusterData localCluster;
try {
localCluster = configurationCacheService.clustersCache()
.get("/admin/clusters/" + config.getClusterName());
} catch (Exception e) {
throw new PulsarServerException(e);
if (localCluster == null) {
// If not explicitly set, read clusters data from ZK
localCluster = retrieveClusterData();
}

ClientConfiguration clientConf = new ClientConfiguration();
clientConf.setStatsInterval(0, TimeUnit.SECONDS);
if (config.isAuthenticationEnabled()) {
clientConf.setAuthentication(config.getBrokerClientAuthenticationPlugin(),
config.getBrokerClientAuthenticationParameters());
}
pulsarClient = createClientInstance(localCluster);
}
return pulsarClient;
}

if (config.isTlsEnabled() && !localCluster.getServiceUrlTls().isEmpty()) {
pulsarClient = PulsarClient.create(localCluster.getServiceUrlTls(), clientConf);
} else {
pulsarClient = PulsarClient.create(localCluster.getServiceUrl(), clientConf);
}
private PulsarClient createClientInstance(ClusterData clusterData) throws IOException {
ClientConfiguration clientConf = new ClientConfiguration();
clientConf.setStatsInterval(0, TimeUnit.SECONDS);
if (config.isAuthenticationEnabled()) {
clientConf.setAuthentication(config.getBrokerClientAuthenticationPlugin(),
config.getBrokerClientAuthenticationParameters());
}

if (config.isTlsEnabled() && !clusterData.getServiceUrlTls().isEmpty()) {
return PulsarClient.create(clusterData.getServiceUrlTls(), clientConf);
} else {
return PulsarClient.create(clusterData.getServiceUrl(), clientConf);
}
}

private ClusterData retrieveClusterData() throws PulsarServerException {
try {
return configurationCacheService.clustersCache().get("/admin/clusters/" + config.getClusterName());
} catch (Exception e) {
throw new PulsarServerException(e);
}
return this.pulsarClient;
}

public ConfigurationCacheService getConfigurationCache() {
Expand Down