From aca2ebac5d2c915b99b3d66611a29b11e26e0ae6 Mon Sep 17 00:00:00 2001 From: Theofilos Kakantousis Date: Tue, 26 May 2020 14:43:41 +0200 Subject: [PATCH] HOPSWORKS-1740 KafkaAdminClient initialize in case of connection failure (#95) (#572) --- .../hopsworks/api/kafka/KafkaResource.java | 7 +-- .../api/kafka/topics/TopicsBuilder.java | 20 ++++-- .../dao/kafka/HopsKafkaAdminClient.java | 63 ++++++++++++++++--- .../common/kafka/KafkaController.java | 45 ++++++++----- .../common/project/ProjectController.java | 9 +-- .../hops/hopsworks/restutils/RESTCodes.java | 7 ++- 6 files changed, 113 insertions(+), 38 deletions(-) diff --git a/hopsworks-api/src/main/java/io/hops/hopsworks/api/kafka/KafkaResource.java b/hopsworks-api/src/main/java/io/hops/hopsworks/api/kafka/KafkaResource.java index f869466256..76a7cd94ef 100644 --- a/hopsworks-api/src/main/java/io/hops/hopsworks/api/kafka/KafkaResource.java +++ b/hopsworks-api/src/main/java/io/hops/hopsworks/api/kafka/KafkaResource.java @@ -97,7 +97,6 @@ import java.util.Arrays; import java.util.List; import java.util.Optional; -import java.util.concurrent.ExecutionException; import java.util.logging.Logger; @RequestScoped @@ -161,8 +160,7 @@ public Response getTopics( @AllowedProjectRoles({AllowedProjectRoles.DATA_OWNER}) @JWTRequired(acceptedTokens={Audience.API}, allowedUserRoles={"HOPS_ADMIN", "HOPS_USER"}) public Response createTopic(TopicDTO topicDto, @Context UriInfo uriInfo, @Context SecurityContext sc) - throws KafkaException, ProjectException, UserException, - InterruptedException, ExecutionException { + throws KafkaException, ProjectException, UserException { kafkaController.createTopic(project, topicDto, uriInfo); URI uri = uriInfo.getAbsolutePathBuilder().path(topicDto.getName()).build(); topicDto.setHref(uri); @@ -187,8 +185,7 @@ public Response removeTopic(@PathParam("topic") String topicName, @Context Secur @AllowedProjectRoles({AllowedProjectRoles.DATA_OWNER, AllowedProjectRoles.DATA_SCIENTIST}) @JWTRequired(acceptedTokens={Audience.API, Audience.JOB}, allowedUserRoles={"HOPS_ADMIN", "HOPS_USER"}) public Response getTopic(@Context UriInfo uriInfo, @PathParam("topic") String topicName, @Context SecurityContext sc) - throws KafkaException, InterruptedException, ExecutionException { - + throws KafkaException { PartitionDetailsDTO dto = topicsBuilder.buildTopicDetails(uriInfo, project, topicName); return Response.ok().entity(dto).build(); } diff --git a/hopsworks-api/src/main/java/io/hops/hopsworks/api/kafka/topics/TopicsBuilder.java b/hopsworks-api/src/main/java/io/hops/hopsworks/api/kafka/topics/TopicsBuilder.java index d4fbf9f839..50d877552d 100644 --- a/hopsworks-api/src/main/java/io/hops/hopsworks/api/kafka/topics/TopicsBuilder.java +++ b/hopsworks-api/src/main/java/io/hops/hopsworks/api/kafka/topics/TopicsBuilder.java @@ -25,6 +25,7 @@ import io.hops.hopsworks.common.kafka.KafkaController; import io.hops.hopsworks.exceptions.KafkaException; import io.hops.hopsworks.persistence.entity.project.Project; +import io.hops.hopsworks.restutils.RESTCodes; import javax.ejb.EJB; import javax.ejb.Stateless; @@ -38,6 +39,9 @@ import java.util.List; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; import java.util.stream.Collectors; import static io.hops.hopsworks.common.dao.kafka.ProjectTopicsFacade.TopicsFilters; @@ -113,14 +117,20 @@ private void expand(TopicDTO dto, ResourceRequest resourceRequest) { } public PartitionDetailsDTO buildTopicDetails(UriInfo uriInfo, Project project, String topicName) - throws KafkaException, InterruptedException, ExecutionException { + throws KafkaException { PartitionDetailsDTO dto = new PartitionDetailsDTO(); dto.setHref(topicUri(uriInfo, project, topicName).build()); - List list = kafkaController.getTopicDetails(project, topicName).get(); - dto.setCount(Integer.toUnsignedLong(list.size())); - list.forEach(dto::addItem); - return dto; + try { + List list = + kafkaController.getTopicDetails(project, topicName).get(3000, TimeUnit.MILLISECONDS); + dto.setCount(Integer.toUnsignedLong(list.size())); + list.forEach(dto::addItem); + return dto; + } catch (InterruptedException | ExecutionException | TimeoutException e){ + throw new KafkaException( + RESTCodes.KafkaErrorCode.TOPIC_FETCH_FAILED, Level.WARNING, "Topic name: " + topicName, e.getMessage(), e); + } } public SharedProjectDTO buildSharedProject(UriInfo uriInfo, Project project, String topicName) { diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/kafka/HopsKafkaAdminClient.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/kafka/HopsKafkaAdminClient.java index 08c3780d8f..4e4017b810 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/kafka/HopsKafkaAdminClient.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/kafka/HopsKafkaAdminClient.java @@ -28,20 +28,28 @@ import org.apache.kafka.common.config.SslConfigs; import javax.annotation.PostConstruct; +import javax.ejb.AccessTimeout; import javax.ejb.ConcurrencyManagement; import javax.ejb.ConcurrencyManagementType; import javax.ejb.DependsOn; import javax.ejb.EJB; import javax.ejb.Singleton; +import javax.ejb.TransactionAttribute; +import javax.ejb.TransactionAttributeType; +import java.time.Duration; import java.util.Collection; import java.util.Properties; import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; @Singleton @DependsOn("KafkaBrokers") @ConcurrencyManagement(ConcurrencyManagementType.CONTAINER) public class HopsKafkaAdminClient { + private static final Logger LOG = Logger.getLogger(HopsKafkaAdminClient.class.getName()); + @EJB private BaseHadoopClientsService baseHadoopService; @EJB @@ -51,10 +59,25 @@ public class HopsKafkaAdminClient { @PostConstruct private void init() { - adminClient = getAdminClient(); + try { + LOG.log(Level.FINE, "Initializing Kafka client"); + initClient(); + } catch (Exception e) { + LOG.log(Level.WARNING, "Kafka is currently unavailable. Will periodically retry to connect"); + } } - private AdminClient getAdminClient() { + @AccessTimeout(value = 5000) + @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED) + private void initClient() { + if (adminClient != null) { + try { + LOG.log(Level.FINE, "Will attempt to close current kafka client"); + adminClient.close(Duration.ofSeconds(3)); + } catch (Exception e) { + LOG.log(Level.WARNING, "Could not close adminClient, will continue with initialization", e); + } + } Properties props = new Properties(); Set brokers = kafkaBrokers.getKafkaBrokers(); //Keep only INTERNAL protocol brokers @@ -70,22 +93,48 @@ private AdminClient getAdminClient() { props.setProperty(SslConfigs.SSL_KEY_PASSWORD_CONFIG, baseHadoopService.getSuperKeystorePassword()); props.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, KafkaConst.KAFKA_ENDPOINT_IDENTIFICATION_ALGORITHM); - return AdminClient.create(props); + LOG.log(Level.FINE, "Will attempt to initialize current kafka client"); + adminClient = AdminClient.create(props); } public ListTopicsResult listTopics() { - return adminClient.listTopics(); + try { + return adminClient.listTopics(); + } catch (Exception e) { + LOG.log(Level.WARNING, "Kafka cluster is unavailable", e); + initClient(); + return adminClient.listTopics(); + } } public CreateTopicsResult createTopics(Collection newTopics) { - return adminClient.createTopics(newTopics); + try { + return adminClient.createTopics(newTopics); + } catch (Exception e) { + LOG.log(Level.WARNING, "Kafka cluster is unavailable", e); + initClient(); + return adminClient.createTopics(newTopics); + } } public DeleteTopicsResult deleteTopics(Collection topics) { - return adminClient.deleteTopics(topics); + try { + return adminClient.deleteTopics(topics); + } catch (Exception e) { + LOG.log(Level.WARNING, "Kafka cluster is unavailable", e); + initClient(); + return adminClient.deleteTopics(topics); + } } public DescribeTopicsResult describeTopics(Collection topics) { - return adminClient.describeTopics(topics); + try { + return adminClient.describeTopics(topics); + } catch (Exception e) { + LOG.log(Level.WARNING, "Kafka cluster is unavailable", e); + initClient(); + return adminClient.describeTopics(topics); + } } + } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/kafka/KafkaController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/kafka/KafkaController.java index f56177ade9..2faf106aee 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/kafka/KafkaController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/kafka/KafkaController.java @@ -93,6 +93,8 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -133,7 +135,7 @@ public class KafkaController { private KafkaBrokers kafkaBrokers; public void createTopic(Project project, TopicDTO topicDto, UriInfo uriInfo) throws KafkaException, - ProjectException, UserException, InterruptedException, ExecutionException { + ProjectException, UserException { if (topicDto == null) { throw new IllegalArgumentException("topicDto was not provided."); @@ -205,8 +207,7 @@ public List findTopicsByProject(Project project) { return topics; } - public ProjectTopics createTopicInProject(Project project, TopicDTO topicDto) - throws KafkaException, InterruptedException, ExecutionException { + public ProjectTopics createTopicInProject(Project project, TopicDTO topicDto) throws KafkaException { Subjects schema = subjectsFacade.findSubjectByNameAndVersion(project, topicDto.getSchemaName(), topicDto.getSchemaVersion()) @@ -214,11 +215,17 @@ public ProjectTopics createTopicInProject(Project project, TopicDTO topicDto) new KafkaException(RESTCodes.KafkaErrorCode.SCHEMA_NOT_FOUND, Level.FINE, "topic: " + topicDto.getName())); // create the topic in kafka - if (createTopicInKafka(topicDto).get() == null) { - throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_ALREADY_EXISTS_IN_ZOOKEEPER, Level.INFO, - "topic name: " + topicDto.getName()); + try { + if (createTopicInKafka(topicDto).get(3000, TimeUnit.MILLISECONDS) == null) { + throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_ALREADY_EXISTS_IN_ZOOKEEPER, Level.INFO, + "topic name: " + topicDto.getName()); + } + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new KafkaException( + RESTCodes.KafkaErrorCode.TOPIC_FETCH_FAILED, Level.WARNING, "Topic name: " + topicDto.getName(), e.getMessage(), + e); } - + /* * What is the possibility of the program failing here? The topic is created * on @@ -241,15 +248,21 @@ public ProjectTopics createTopicInProject(Project project, TopicDTO topicDto) } private KafkaFuture createTopicInKafka(TopicDTO topicDTO) { - return hopsKafkaAdminClient.listTopics().names().thenApply((set) -> { - if (set.contains(topicDTO.getName())) { - return null; - } else { - NewTopic newTopic = - new NewTopic(topicDTO.getName(), topicDTO.getNumOfPartitions(), topicDTO.getNumOfReplicas().shortValue()); - return hopsKafkaAdminClient.createTopics(Collections.singleton(newTopic)); - } - }); + return hopsKafkaAdminClient.listTopics().names().thenApply( + set -> { + if (set.contains(topicDTO.getName())) { + return null; + } else { + NewTopic newTopic = + new NewTopic(topicDTO.getName(), topicDTO.getNumOfPartitions(), topicDTO.getNumOfReplicas().shortValue()); + try { + return hopsKafkaAdminClient.createTopics(Collections.singleton(newTopic)); + } catch (Exception e) { + LOGGER.log(Level.WARNING, e.getMessage(), e); + return null; + } + } + }); } private KafkaFuture> getTopicDetailsFromKafkaCluster(String topicName) { diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java index eaf58108db..e05026d4e7 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java @@ -183,6 +183,7 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.TimeoutException; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -1648,10 +1649,10 @@ public void cleanup(Project project, String sessionId, private void removeProjectInt(Project project, List usersToClean, List groupsToClean, List> projectCreationFutures, boolean decreaseCreatedProj, Users owner) - throws IOException, InterruptedException, HopsSecurityException, - ServiceException, ProjectException, - GenericException, TensorBoardException, FeaturestoreException, - ElasticException { + throws IOException, InterruptedException, HopsSecurityException, + ServiceException, ProjectException, + GenericException, TensorBoardException, FeaturestoreException, + ElasticException, TimeoutException, ExecutionException { DistributedFileSystemOps dfso = null; try { dfso = dfs.getDfsOps(); diff --git a/hopsworks-rest-utils/src/main/java/io/hops/hopsworks/restutils/RESTCodes.java b/hopsworks-rest-utils/src/main/java/io/hops/hopsworks/restutils/RESTCodes.java index 84a0cbe272..36f8394548 100644 --- a/hopsworks-rest-utils/src/main/java/io/hops/hopsworks/restutils/RESTCodes.java +++ b/hopsworks-rest-utils/src/main/java/io/hops/hopsworks/restutils/RESTCodes.java @@ -696,7 +696,12 @@ public enum KafkaErrorCode implements RESTErrorCode { SCHEMA_VERSION_NOT_FOUND(18, "Specified version of the schema not found", Response.Status.NOT_FOUND), PROJECT_IS_NOT_THE_OWNER_OF_THE_TOPIC(19, "Specified project is not the owner of the topic", Response.Status.BAD_REQUEST), - ACL_FOR_ANY_USER(20, "Cannot create an ACL for user with email '*'", Response.Status.BAD_REQUEST); + ACL_FOR_ANY_USER(20, "Cannot create an ACL for user with email '*'", Response.Status.BAD_REQUEST), + KAFKA_UNAVAILABLE(21, "Kafka is temporarily unavailable. Please try again later", + Response.Status.SERVICE_UNAVAILABLE), + TOPIC_DELETION_FAILED(22, "Could not delete Kafka topics.", Response.Status.INTERNAL_SERVER_ERROR), + TOPIC_FETCH_FAILED(23, "Could note fetch topic details.", Response.Status.INTERNAL_SERVER_ERROR), + TOPIC_CREATION_FAILED(24, "Could not create topic.", Response.Status.INTERNAL_SERVER_ERROR); private Integer code;