Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Topic lookup using Pulsar binary protocol #105

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.cache.LocalZooKeeperCacheService;
import com.yahoo.pulsar.broker.web.PulsarWebResource;
import com.yahoo.pulsar.broker.web.RestException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.client.api.PulsarClientException;

/**
*/
Expand Down Expand Up @@ -356,21 +358,7 @@ public PartitionedTopicMetadata getPartitionedTopicMetadata(@PathParam("property

String path = path(PARTITIONED_TOPIC_PATH_ZNODE, property, cluster, namespace, domain(),
dn.getEncodedLocalName());
PartitionedTopicMetadata partitionMetadata;
try {
// gets the number of partitions from the zk cache
partitionMetadata = globalZkCache().getData(path, new Deserializer<PartitionedTopicMetadata>() {

@Override
public PartitionedTopicMetadata deserialize(String key, byte[] content) throws Exception {
return jsonMapper().readValue(content, PartitionedTopicMetadata.class);
}
}).orElse(
// if the partitioned topic is not found in zk, then the topic is not partitioned
new PartitionedTopicMetadata());
} catch (Exception e) {
throw new RestException(e);
}
PartitionedTopicMetadata partitionMetadata = fetchPartitionedTopicMetadata(pulsar(), path);

if (log.isDebugEnabled()) {
log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId(), dn,
Expand Down Expand Up @@ -993,7 +981,79 @@ public PersistentOfflineTopicStats getBacklog(@PathParam("property") String prop
return offlineTopicStats;
}

/**
public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(PulsarService pulsar,
String clientAppId, DestinationName dn) {
CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
try {
// (1) authorize client
try {
checkAuthorization(pulsar, dn, clientAppId);
} catch (RestException e) {
try {
validateAdminAccessOnProperty(pulsar, clientAppId, dn.getProperty());
} catch (RestException authException) {
log.warn("Failed to authorize {} on cluster {}", clientAppId, dn.toString());
throw new PulsarClientException(String.format("Authorization failed %s on cluster %s with error %s",
clientAppId, dn.toString(), authException.getMessage()));
}
}
String path = path(PARTITIONED_TOPIC_PATH_ZNODE, dn.getProperty(), dn.getCluster(),
dn.getNamespacePortion(), "persistent", dn.getEncodedLocalName());
fetchPartitionedTopicMetadataAsync(pulsar, path).thenAccept(metadata -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, dn,
metadata.partitions);
}
metadataFuture.complete(metadata);
}).exceptionally(ex -> {
metadataFuture.completeExceptionally(ex);
return null;
});
} catch (Exception ex) {
metadataFuture.completeExceptionally(ex);
}
return metadataFuture;
}

private static PartitionedTopicMetadata fetchPartitionedTopicMetadata(PulsarService pulsar, String path) {
try {
return fetchPartitionedTopicMetadataAsync(pulsar, path).get();
} catch (Exception e) {
if (e.getCause() instanceof RestException) {
throw (RestException) e;
}
throw new RestException(e);
}
}

private static CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataAsync(PulsarService pulsar,
String path) {
CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
try {
// gets the number of partitions from the zk cache
pulsar.getGlobalZkCache().getDataAsync(path, new Deserializer<PartitionedTopicMetadata>() {
@Override
public PartitionedTopicMetadata deserialize(String key, byte[] content) throws Exception {
return jsonMapper().readValue(content, PartitionedTopicMetadata.class);
}
}).thenAccept(metadata -> {
// if the partitioned topic is not found in zk, then the topic is not partitioned
if (metadata.isPresent()) {
metadataFuture.complete(metadata.get());
} else {
metadataFuture.complete(new PartitionedTopicMetadata());
}
}).exceptionally(ex -> {
metadataFuture.complete(new PartitionedTopicMetadata());
return null;
});
} catch (Exception e) {
metadataFuture.completeExceptionally(e);
}
return metadataFuture;
}

/**
* Get the Topic object reference from the Pulsar broker
*/
private PersistentTopic getTopicReference(DestinationName dn) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,24 @@
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.yahoo.pulsar.broker.web.NoSwaggerDocumentation;
import com.yahoo.pulsar.broker.web.PulsarWebResource;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.web.RestException;
import static com.yahoo.pulsar.common.api.Commands.newLookupResponse;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType;
import com.yahoo.pulsar.common.api.proto.PulsarApi.ServerError;
import com.yahoo.pulsar.common.lookup.data.LookupData;
import com.yahoo.pulsar.common.policies.data.ClusterData;
import static com.google.common.base.Preconditions.checkNotNull;

import io.netty.buffer.ByteBuf;

@Path("/v2/destination/")
@NoSwaggerDocumentation
Expand All @@ -55,7 +66,7 @@ public void lookupDestinationAsync(@PathParam("property") String property, @Path
try {
validateClusterOwnership(topic.getCluster());
checkConnect(topic);
validateReplicationSettingsOnNamespace(topic.getNamespaceObject());
validateReplicationSettingsOnNamespace(pulsar(), topic.getNamespaceObject());
} catch (Throwable t) {
// Validation checks failed
log.error("Validation check failed: {}", t.getMessage());
Expand All @@ -74,23 +85,24 @@ public void lookupDestinationAsync(@PathParam("property") String property, @Path
}

// We have found either a broker that owns the topic, or a broker to which we should redirect the client to
if (result.isHttpRedirect()) {
if (result.isRedirect()) {
boolean newAuthoritative = this.isLeaderBroker();
URI redirect;
try {
redirect = new URI(String.format("%s%s%s?authoritative=%s", result.getHttpRedirectAddress(),
"/lookup/v2/destination/", topic.getLookupName(), newAuthoritative));
String redirectUrl = isRequestHttps() ? result.getLookupData().getHttpUrlTls()
: result.getLookupData().getHttpUrl();
redirect = new URI(String.format("%s%s%s?authoritative=%s", redirectUrl, "/lookup/v2/destination/",
topic.getLookupName(), newAuthoritative));
} catch (URISyntaxException e) {
log.error("Error in preparing redirect url for {}: {}", topic, e.getMessage(), e);
asyncResponse.resume(e);
return;
}

if (log.isDebugEnabled()) {
log.debug("Redirect lookup for topic {} to {}", topic, redirect);
}

asyncResponse.resume(new WebApplicationException(Response.temporaryRedirect(redirect).build()));

} else {
// Found broker owning the topic
if (log.isDebugEnabled()) {
Expand All @@ -103,7 +115,113 @@ public void lookupDestinationAsync(@PathParam("property") String property, @Path
asyncResponse.resume(exception);
return null;
});

}


/**
*
* Lookup broker-service address for a given namespace-bundle which contains given topic.
*
* a. Returns broker-address if namespace-bundle is already owned by any broker
* b. If current-broker receives lookup-request and if it's not a leader
* then current broker redirects request to leader by returning leader-service address.
* c. If current-broker is leader then it finds out least-loaded broker to own namespace bundle and
* redirects request by returning least-loaded broker.
* d. If current-broker receives request to own the namespace-bundle then it owns a bundle and returns
* success(connect) response to client.
*
* @param pulsarService
* @param fqdn
* @param authoritative
* @param clientAppId
* @param requestId
* @return
*/
public static CompletableFuture<ByteBuf> lookupDestinationAsync(PulsarService pulsarService, DestinationName fqdn, boolean authoritative,
String clientAppId, long requestId) {

final CompletableFuture<ByteBuf> validationFuture = new CompletableFuture<>();
final CompletableFuture<ByteBuf> lookupfuture = new CompletableFuture<>();
final String cluster = fqdn.getCluster();

// (1) validate cluster
getClusterDataIfDifferentCluster(pulsarService, cluster, clientAppId).thenAccept(differentClusterData -> {

if (differentClusterData != null) {
if (log.isDebugEnabled()) {
log.debug("[{}] Redirecting the lookup call to {}/{} cluster={}", clientAppId,
differentClusterData.getBrokerServiceUrl(), differentClusterData.getBrokerServiceUrlTls(), cluster);
}
validationFuture.complete(newLookupResponse(differentClusterData.getBrokerServiceUrl(),
differentClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect, requestId));
} else {
// (2) authorize client
try {
checkAuthorization(pulsarService, fqdn, clientAppId);
} catch (Exception e) {
log.warn("Failed to authorized {} on cluster {}", clientAppId, fqdn.toString());
validationFuture
.complete(newLookupResponse(ServerError.AuthorizationError, e.getMessage(), requestId));
return;
}
// (3) validate global namespace
validateReplicationSettingsOnNamespaceAsync(pulsarService, fqdn.getNamespaceObject())
.thenAccept(success -> {
// (4) all validation passed: initiate lookup
validationFuture.complete(null);
}).exceptionally(ex -> {
validationFuture
.complete(newLookupResponse(ServerError.MetadataError, ex.getMessage(), requestId));
return null;
});
}
}).exceptionally(ex -> {
validationFuture.completeExceptionally(ex);
return null;
});

// Initiate lookup once validation completes
validationFuture.thenAccept(validaitonFailureResponse -> {
if (validaitonFailureResponse != null) {
lookupfuture.complete(validaitonFailureResponse);
} else {
pulsarService.getNamespaceService().getBrokerServiceUrlAsync(fqdn, authoritative)
.thenAccept(lookupResult -> {

if (log.isDebugEnabled()) {
log.debug("[{}] Lookup result {}", fqdn.toString(), lookupResult);
}

LookupData lookupData = lookupResult.getLookupData();
if (lookupResult.isRedirect()) {
boolean newAuthoritative = isLeaderBroker(pulsarService);
lookupfuture.complete(
newLookupResponse(lookupData.getBrokerUrl(), lookupData.getBrokerUrlTls(),
newAuthoritative, LookupType.Redirect, requestId));
} else {
lookupfuture.complete(
newLookupResponse(lookupData.getBrokerUrl(), lookupData.getBrokerUrlTls(),
true /* authoritative */, LookupType.Connect, requestId));
}
}).exceptionally(e -> {
log.warn("Failed to lookup {} for topic {} with error {}", clientAppId, fqdn.toString(),
e.getMessage(), e);
lookupfuture.complete(
newLookupResponse(ServerError.ServiceNotReady, e.getMessage(), requestId));
return null;
});
}

}).exceptionally(ex -> {
log.warn("Failed to lookup {} for topic {} with error {}", clientAppId, fqdn.toString(), ex.getMessage(),
ex);
lookupfuture.complete(newLookupResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId));
return null;
});

return lookupfuture;
}

private static final Logger log = LoggerFactory.getLogger(DestinationLookup.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,41 +30,41 @@
*/
public class LookupResult {
enum Type {
BrokerUrl, HttpRedirectUrl
BrokerUrl, RedirectUrl
}

private final Type type;
private final LookupData lookupData;
private final URI httpRedirectAddress;

public LookupResult(NamespaceEphemeralData namespaceEphemeralData) {
this.type = Type.BrokerUrl;
this.lookupData = new LookupData(namespaceEphemeralData.getNativeUrl(),
namespaceEphemeralData.getNativeUrlTls(), namespaceEphemeralData.getHttpUrl());
this.httpRedirectAddress = null;
namespaceEphemeralData.getNativeUrlTls(), namespaceEphemeralData.getHttpUrl(),
namespaceEphemeralData.getHttpUrlTls());
}

public LookupResult(URI httpRedirectAddress) {
this.type = Type.HttpRedirectUrl;
this.lookupData = null;
this.httpRedirectAddress = httpRedirectAddress;
public LookupResult(String httpUrl, String httpUrlTls, String brokerServiceUrl, String brokerServiceUrlTls) {
this.type = Type.RedirectUrl; // type = reidrect => as current broker is
// not owner and prepares LookupResult
// with other broker's urls
this.lookupData = new LookupData(brokerServiceUrl, brokerServiceUrlTls, httpUrl, httpUrlTls);
}

public boolean isBrokerUrl() {
return type == Type.BrokerUrl;
}

public boolean isHttpRedirect() {
return type == Type.HttpRedirectUrl;
public boolean isRedirect() {
return type == Type.RedirectUrl;
}

public LookupData getLookupData() {
checkArgument(isBrokerUrl());
return lookupData;
}

public URI getHttpRedirectAddress() {
checkArgument(isHttpRedirect());
return httpRedirectAddress;
@Override
public String toString() {
return "LookupResult [type=" + type + ", lookupData=" + lookupData + "]";
}

}
Loading