Skip to content

Commit

Permalink
HOPSWORKS-1740 KafkaAdminClient initialize in case of connection fail…
Browse files Browse the repository at this point in the history
…ure (#95) (#572)
  • Loading branch information
tkakantousis committed May 26, 2020
1 parent 8ae724b commit aca2eba
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.logging.Logger;

@RequestScoped
Expand Down Expand Up @@ -161,8 +160,7 @@ public Response getTopics(
@AllowedProjectRoles({AllowedProjectRoles.DATA_OWNER})
@JWTRequired(acceptedTokens={Audience.API}, allowedUserRoles={"HOPS_ADMIN", "HOPS_USER"})
public Response createTopic(TopicDTO topicDto, @Context UriInfo uriInfo, @Context SecurityContext sc)
throws KafkaException, ProjectException, UserException,
InterruptedException, ExecutionException {
throws KafkaException, ProjectException, UserException {
kafkaController.createTopic(project, topicDto, uriInfo);
URI uri = uriInfo.getAbsolutePathBuilder().path(topicDto.getName()).build();
topicDto.setHref(uri);
Expand All @@ -187,8 +185,7 @@ public Response removeTopic(@PathParam("topic") String topicName, @Context Secur
@AllowedProjectRoles({AllowedProjectRoles.DATA_OWNER, AllowedProjectRoles.DATA_SCIENTIST})
@JWTRequired(acceptedTokens={Audience.API, Audience.JOB}, allowedUserRoles={"HOPS_ADMIN", "HOPS_USER"})
public Response getTopic(@Context UriInfo uriInfo, @PathParam("topic") String topicName, @Context SecurityContext sc)
throws KafkaException, InterruptedException, ExecutionException {

throws KafkaException {
PartitionDetailsDTO dto = topicsBuilder.buildTopicDetails(uriInfo, project, topicName);
return Response.ok().entity(dto).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.hops.hopsworks.common.kafka.KafkaController;
import io.hops.hopsworks.exceptions.KafkaException;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.restutils.RESTCodes;

import javax.ejb.EJB;
import javax.ejb.Stateless;
Expand All @@ -38,6 +39,9 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.stream.Collectors;

import static io.hops.hopsworks.common.dao.kafka.ProjectTopicsFacade.TopicsFilters;
Expand Down Expand Up @@ -113,14 +117,20 @@ private void expand(TopicDTO dto, ResourceRequest resourceRequest) {
}

public PartitionDetailsDTO buildTopicDetails(UriInfo uriInfo, Project project, String topicName)
throws KafkaException, InterruptedException, ExecutionException {
throws KafkaException {

PartitionDetailsDTO dto = new PartitionDetailsDTO();
dto.setHref(topicUri(uriInfo, project, topicName).build());
List<PartitionDetailsDTO> list = kafkaController.getTopicDetails(project, topicName).get();
dto.setCount(Integer.toUnsignedLong(list.size()));
list.forEach(dto::addItem);
return dto;
try {
List<PartitionDetailsDTO> list =
kafkaController.getTopicDetails(project, topicName).get(3000, TimeUnit.MILLISECONDS);
dto.setCount(Integer.toUnsignedLong(list.size()));
list.forEach(dto::addItem);
return dto;
} catch (InterruptedException | ExecutionException | TimeoutException e){
throw new KafkaException(
RESTCodes.KafkaErrorCode.TOPIC_FETCH_FAILED, Level.WARNING, "Topic name: " + topicName, e.getMessage(), e);
}
}

public SharedProjectDTO buildSharedProject(UriInfo uriInfo, Project project, String topicName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,28 @@
import org.apache.kafka.common.config.SslConfigs;

import javax.annotation.PostConstruct;
import javax.ejb.AccessTimeout;
import javax.ejb.ConcurrencyManagement;
import javax.ejb.ConcurrencyManagementType;
import javax.ejb.DependsOn;
import javax.ejb.EJB;
import javax.ejb.Singleton;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import java.time.Duration;
import java.util.Collection;
import java.util.Properties;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

@Singleton
@DependsOn("KafkaBrokers")
@ConcurrencyManagement(ConcurrencyManagementType.CONTAINER)
public class HopsKafkaAdminClient {

private static final Logger LOG = Logger.getLogger(HopsKafkaAdminClient.class.getName());

@EJB
private BaseHadoopClientsService baseHadoopService;
@EJB
Expand All @@ -51,10 +59,25 @@ public class HopsKafkaAdminClient {

@PostConstruct
private void init() {
adminClient = getAdminClient();
try {
LOG.log(Level.FINE, "Initializing Kafka client");
initClient();
} catch (Exception e) {
LOG.log(Level.WARNING, "Kafka is currently unavailable. Will periodically retry to connect");
}
}

private AdminClient getAdminClient() {
@AccessTimeout(value = 5000)
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
private void initClient() {
if (adminClient != null) {
try {
LOG.log(Level.FINE, "Will attempt to close current kafka client");
adminClient.close(Duration.ofSeconds(3));
} catch (Exception e) {
LOG.log(Level.WARNING, "Could not close adminClient, will continue with initialization", e);
}
}
Properties props = new Properties();
Set<String> brokers = kafkaBrokers.getKafkaBrokers();
//Keep only INTERNAL protocol brokers
Expand All @@ -70,22 +93,48 @@ private AdminClient getAdminClient() {
props.setProperty(SslConfigs.SSL_KEY_PASSWORD_CONFIG, baseHadoopService.getSuperKeystorePassword());
props.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,
KafkaConst.KAFKA_ENDPOINT_IDENTIFICATION_ALGORITHM);
return AdminClient.create(props);
LOG.log(Level.FINE, "Will attempt to initialize current kafka client");
adminClient = AdminClient.create(props);
}

public ListTopicsResult listTopics() {
return adminClient.listTopics();
try {
return adminClient.listTopics();
} catch (Exception e) {
LOG.log(Level.WARNING, "Kafka cluster is unavailable", e);
initClient();
return adminClient.listTopics();
}
}

public CreateTopicsResult createTopics(Collection<NewTopic> newTopics) {
return adminClient.createTopics(newTopics);
try {
return adminClient.createTopics(newTopics);
} catch (Exception e) {
LOG.log(Level.WARNING, "Kafka cluster is unavailable", e);
initClient();
return adminClient.createTopics(newTopics);
}
}

public DeleteTopicsResult deleteTopics(Collection<String> topics) {
return adminClient.deleteTopics(topics);
try {
return adminClient.deleteTopics(topics);
} catch (Exception e) {
LOG.log(Level.WARNING, "Kafka cluster is unavailable", e);
initClient();
return adminClient.deleteTopics(topics);
}
}

public DescribeTopicsResult describeTopics(Collection<String> topics) {
return adminClient.describeTopics(topics);
try {
return adminClient.describeTopics(topics);
} catch (Exception e) {
LOG.log(Level.WARNING, "Kafka cluster is unavailable", e);
initClient();
return adminClient.describeTopics(topics);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -133,7 +135,7 @@ public class KafkaController {
private KafkaBrokers kafkaBrokers;

public void createTopic(Project project, TopicDTO topicDto, UriInfo uriInfo) throws KafkaException,
ProjectException, UserException, InterruptedException, ExecutionException {
ProjectException, UserException {

if (topicDto == null) {
throw new IllegalArgumentException("topicDto was not provided.");
Expand Down Expand Up @@ -205,20 +207,25 @@ public List<TopicDTO> findTopicsByProject(Project project) {
return topics;
}

public ProjectTopics createTopicInProject(Project project, TopicDTO topicDto)
throws KafkaException, InterruptedException, ExecutionException {
public ProjectTopics createTopicInProject(Project project, TopicDTO topicDto) throws KafkaException {

Subjects schema =
subjectsFacade.findSubjectByNameAndVersion(project, topicDto.getSchemaName(), topicDto.getSchemaVersion())
.orElseThrow(() ->
new KafkaException(RESTCodes.KafkaErrorCode.SCHEMA_NOT_FOUND, Level.FINE, "topic: " + topicDto.getName()));

// create the topic in kafka
if (createTopicInKafka(topicDto).get() == null) {
throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_ALREADY_EXISTS_IN_ZOOKEEPER, Level.INFO,
"topic name: " + topicDto.getName());
try {
if (createTopicInKafka(topicDto).get(3000, TimeUnit.MILLISECONDS) == null) {
throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_ALREADY_EXISTS_IN_ZOOKEEPER, Level.INFO,
"topic name: " + topicDto.getName());
}
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new KafkaException(
RESTCodes.KafkaErrorCode.TOPIC_FETCH_FAILED, Level.WARNING, "Topic name: " + topicDto.getName(), e.getMessage(),
e);
}

/*
* What is the possibility of the program failing here? The topic is created
* on
Expand All @@ -241,15 +248,21 @@ public ProjectTopics createTopicInProject(Project project, TopicDTO topicDto)
}

private KafkaFuture<CreateTopicsResult> createTopicInKafka(TopicDTO topicDTO) {
return hopsKafkaAdminClient.listTopics().names().thenApply((set) -> {
if (set.contains(topicDTO.getName())) {
return null;
} else {
NewTopic newTopic =
new NewTopic(topicDTO.getName(), topicDTO.getNumOfPartitions(), topicDTO.getNumOfReplicas().shortValue());
return hopsKafkaAdminClient.createTopics(Collections.singleton(newTopic));
}
});
return hopsKafkaAdminClient.listTopics().names().thenApply(
set -> {
if (set.contains(topicDTO.getName())) {
return null;
} else {
NewTopic newTopic =
new NewTopic(topicDTO.getName(), topicDTO.getNumOfPartitions(), topicDTO.getNumOfReplicas().shortValue());
try {
return hopsKafkaAdminClient.createTopics(Collections.singleton(newTopic));
} catch (Exception e) {
LOGGER.log(Level.WARNING, e.getMessage(), e);
return null;
}
}
});
}

private KafkaFuture<List<PartitionDetailsDTO>> getTopicDetailsFromKafkaCluster(String topicName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -1648,10 +1649,10 @@ public void cleanup(Project project, String sessionId,
private void removeProjectInt(Project project, List<HdfsUsers> usersToClean,
List<HdfsGroups> groupsToClean, List<Future<?>> projectCreationFutures,
boolean decreaseCreatedProj, Users owner)
throws IOException, InterruptedException, HopsSecurityException,
ServiceException, ProjectException,
GenericException, TensorBoardException, FeaturestoreException,
ElasticException {
throws IOException, InterruptedException, HopsSecurityException,
ServiceException, ProjectException,
GenericException, TensorBoardException, FeaturestoreException,
ElasticException, TimeoutException, ExecutionException {
DistributedFileSystemOps dfso = null;
try {
dfso = dfs.getDfsOps();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,12 @@ public enum KafkaErrorCode implements RESTErrorCode {
SCHEMA_VERSION_NOT_FOUND(18, "Specified version of the schema not found", Response.Status.NOT_FOUND),
PROJECT_IS_NOT_THE_OWNER_OF_THE_TOPIC(19, "Specified project is not the owner of the topic",
Response.Status.BAD_REQUEST),
ACL_FOR_ANY_USER(20, "Cannot create an ACL for user with email '*'", Response.Status.BAD_REQUEST);
ACL_FOR_ANY_USER(20, "Cannot create an ACL for user with email '*'", Response.Status.BAD_REQUEST),
KAFKA_UNAVAILABLE(21, "Kafka is temporarily unavailable. Please try again later",
Response.Status.SERVICE_UNAVAILABLE),
TOPIC_DELETION_FAILED(22, "Could not delete Kafka topics.", Response.Status.INTERNAL_SERVER_ERROR),
TOPIC_FETCH_FAILED(23, "Could note fetch topic details.", Response.Status.INTERNAL_SERVER_ERROR),
TOPIC_CREATION_FAILED(24, "Could not create topic.", Response.Status.INTERNAL_SERVER_ERROR);


private Integer code;
Expand Down

0 comments on commit aca2eba

Please sign in to comment.