diff --git a/api/build.gradle b/api/build.gradle index ca071b806..dfbbf8201 100644 --- a/api/build.gradle +++ b/api/build.gradle @@ -54,6 +54,10 @@ dependencies { antlr libs.antlr implementation libs.antlr.runtime + implementation libs.lucene + implementation libs.lucene.queryparser + implementation libs.lucene.analysis.common + implementation libs.opendatadiscovery.oddrn implementation(libs.opendatadiscovery.client) { exclude group: 'org.springframework.boot', module: 'spring-boot-starter-webflux' @@ -68,7 +72,6 @@ dependencies { // CVE Fixes implementation libs.apache.commons.compress implementation libs.okhttp3.logging.intercepter - // CVE Fixes End implementation libs.modelcontextprotocol.spring.webflux implementation libs.victools.jsonschema.generator diff --git a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java index 8d5be375a..495229ca0 100644 --- a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java +++ b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java @@ -41,6 +41,7 @@ public class ClustersProperties { MetricsStorage defaultMetricsStorage = new MetricsStorage(); CacheProperties cache = new CacheProperties(); + ClusterFtsProperties fts = new ClusterFtsProperties(); @Data public static class Cluster { @@ -217,6 +218,25 @@ public static class CacheProperties { Duration connectClusterCacheExpiry = Duration.ofHours(24); } + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class NgramProperties { + int ngramMin = 1; + int ngramMax = 4; + } + + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class ClusterFtsProperties { + boolean enabled = false; + NgramProperties schemas = new NgramProperties(1, 4); + NgramProperties consumers = new NgramProperties(1, 4); + NgramProperties connect = new NgramProperties(1, 4); + NgramProperties acl = new NgramProperties(1, 4); + } + @PostConstruct public void validateAndSetDefaults() { if (clusters 
!= null) { diff --git a/api/src/main/java/io/kafbat/ui/controller/SchemasController.java b/api/src/main/java/io/kafbat/ui/controller/SchemasController.java index a34110031..4df01cacb 100644 --- a/api/src/main/java/io/kafbat/ui/controller/SchemasController.java +++ b/api/src/main/java/io/kafbat/ui/controller/SchemasController.java @@ -1,8 +1,7 @@ package io.kafbat.ui.controller; -import static org.apache.commons.lang3.Strings.CI; - import io.kafbat.ui.api.SchemasApi; +import io.kafbat.ui.config.ClustersProperties; import io.kafbat.ui.exception.ValidationException; import io.kafbat.ui.mapper.KafkaSrMapper; import io.kafbat.ui.mapper.KafkaSrMapperImpl; @@ -15,13 +14,13 @@ import io.kafbat.ui.model.rbac.AccessContext; import io.kafbat.ui.model.rbac.permission.SchemaAction; import io.kafbat.ui.service.SchemaRegistryService; +import io.kafbat.ui.service.index.SchemasFilter; import io.kafbat.ui.service.mcp.McpTool; import java.util.List; import java.util.Map; import javax.validation.Valid; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.server.ServerWebExchange; @@ -38,6 +37,7 @@ public class SchemasController extends AbstractController implements SchemasApi, private final KafkaSrMapper kafkaSrMapper = new KafkaSrMapperImpl(); private final SchemaRegistryService schemaRegistryService; + private final ClustersProperties clustersProperties; @Override protected KafkaCluster getCluster(String clusterName) { @@ -214,6 +214,8 @@ public Mono> getSchemas(String cluster .operationName("getSchemas") .build(); + ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts(); + return schemaRegistryService .getAllSubjectNames(getCluster(clusterName)) .flatMapIterable(l -> l) @@ -222,10 +224,10 @@ public Mono> getSchemas(String cluster .flatMap(subjects -> { int 
pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE; int subjectToSkip = ((pageNum != null && pageNum > 0 ? pageNum : 1) - 1) * pageSize; - List filteredSubjects = subjects - .stream() - .filter(subj -> search == null || CI.contains(subj, search)) - .sorted().toList(); + + SchemasFilter filter = new SchemasFilter(subjects, fts.isEnabled(), fts.getSchemas()); + List filteredSubjects = filter.find(search); + var totalPages = (filteredSubjects.size() / pageSize) + (filteredSubjects.size() % pageSize == 0 ? 0 : 1); List subjectsToRender = filteredSubjects.stream() diff --git a/api/src/main/java/io/kafbat/ui/controller/TopicsController.java b/api/src/main/java/io/kafbat/ui/controller/TopicsController.java index b72a4e395..4ad3f11e9 100644 --- a/api/src/main/java/io/kafbat/ui/controller/TopicsController.java +++ b/api/src/main/java/io/kafbat/ui/controller/TopicsController.java @@ -10,6 +10,7 @@ import static org.apache.commons.lang3.Strings.CI; import io.kafbat.ui.api.TopicsApi; +import io.kafbat.ui.config.ClustersProperties; import io.kafbat.ui.mapper.ClusterMapper; import io.kafbat.ui.model.InternalTopic; import io.kafbat.ui.model.InternalTopicConfig; @@ -37,7 +38,6 @@ import javax.validation.Valid; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.RestController; @@ -55,6 +55,7 @@ public class TopicsController extends AbstractController implements TopicsApi, M private final TopicsService topicsService; private final TopicAnalysisService topicAnalysisService; private final ClusterMapper clusterMapper; + private final ClustersProperties clustersProperties; @Override public Mono> createTopic( @@ -181,23 +182,23 @@ public Mono> getTopics(String clusterName, .operationName("getTopics") .build(); - return 
topicsService.getTopicsForPagination(getCluster(clusterName)) + return topicsService.getTopicsForPagination(getCluster(clusterName), search, showInternal) .flatMap(topics -> accessControlService.filterViewableTopics(topics, clusterName)) .flatMap(topics -> { int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE; var topicsToSkip = ((page != null && page > 0 ? page : 1) - 1) * pageSize; + ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts(); + Comparator comparatorForTopic = getComparatorForTopic(orderBy, fts.isEnabled()); var comparator = sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC) - ? getComparatorForTopic(orderBy) : getComparatorForTopic(orderBy).reversed(); - List filtered = topics.stream() - .filter(topic -> !topic.isInternal() - || showInternal != null && showInternal) - .filter(topic -> search == null || CI.contains(topic.getName(), search)) - .sorted(comparator) - .toList(); + ? comparatorForTopic : comparatorForTopic.reversed(); + + List filtered = topics.stream().sorted(comparator).toList(); + var totalPages = (filtered.size() / pageSize) + (filtered.size() % pageSize == 0 ? 
0 : 1); List topicsPage = filtered.stream() + .filter(t -> !t.isInternal() || showInternal != null && showInternal) .skip(topicsToSkip) .limit(pageSize) .map(InternalTopic::getName) @@ -348,9 +349,12 @@ public Mono>> getActiveProducerStates } private Comparator getComparatorForTopic( - TopicColumnsToSortDTO orderBy) { + TopicColumnsToSortDTO orderBy, + boolean ftsEnabled) { var defaultComparator = Comparator.comparing(InternalTopic::getName); - if (orderBy == null) { + if (orderBy == null && ftsEnabled) { + return (o1, o2) -> 0; + } else if (orderBy == null) { return defaultComparator; } return switch (orderBy) { diff --git a/api/src/main/java/io/kafbat/ui/model/InternalTopic.java b/api/src/main/java/io/kafbat/ui/model/InternalTopic.java index 1e8c31b5b..5dfab7c42 100644 --- a/api/src/main/java/io/kafbat/ui/model/InternalTopic.java +++ b/api/src/main/java/io/kafbat/ui/model/InternalTopic.java @@ -38,6 +38,16 @@ public class InternalTopic { private final long segmentSize; private final long segmentCount; + + public InternalTopic withMetrics(Metrics metrics) { + var builder = toBuilder(); + if (metrics != null) { + builder.bytesInPerSec(metrics.getIoRates().topicBytesInPerSec().get(this.name)); + builder.bytesOutPerSec(metrics.getIoRates().topicBytesOutPerSec().get(this.name)); + } + return builder.build(); + } + public static InternalTopic from(TopicDescription topicDescription, List configs, InternalPartitionsOffsets partitionsOffsets, @@ -113,8 +123,10 @@ public static InternalTopic from(TopicDescription topicDescription, topic.segmentSize(stats.getSegmentSize()); }); - topic.bytesInPerSec(metrics.getIoRates().topicBytesInPerSec().get(topicDescription.name())); - topic.bytesOutPerSec(metrics.getIoRates().topicBytesOutPerSec().get(topicDescription.name())); + if (metrics != null) { + topic.bytesInPerSec(metrics.getIoRates().topicBytesInPerSec().get(topicDescription.name())); + 
topic.bytesOutPerSec(metrics.getIoRates().topicBytesOutPerSec().get(topicDescription.name())); + } topic.topicConfigs( configs.stream().map(InternalTopicConfig::from).collect(Collectors.toList())); diff --git a/api/src/main/java/io/kafbat/ui/model/Statistics.java b/api/src/main/java/io/kafbat/ui/model/Statistics.java index 6caba634a..d5b6ebd1b 100644 --- a/api/src/main/java/io/kafbat/ui/model/Statistics.java +++ b/api/src/main/java/io/kafbat/ui/model/Statistics.java @@ -11,7 +11,7 @@ @Value @Builder(toBuilder = true) -public class Statistics { +public class Statistics implements AutoCloseable { ServerStatusDTO status; Throwable lastKafkaException; String version; @@ -46,4 +46,11 @@ public Stream topicDescriptions() { public Statistics withClusterState(UnaryOperator stateUpdate) { return toBuilder().clusterState(stateUpdate.apply(clusterState)).build(); } + + @Override + public void close() throws Exception { + if (clusterState != null) { + clusterState.close(); + } + } } diff --git a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java index 5fda6d4ce..1f610ff2c 100644 --- a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java +++ b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java @@ -1,15 +1,15 @@ package io.kafbat.ui.service; -import static org.apache.commons.lang3.Strings.CI; - import com.google.common.collect.Streams; import com.google.common.collect.Table; +import io.kafbat.ui.config.ClustersProperties; import io.kafbat.ui.emitter.EnhancedConsumer; import io.kafbat.ui.model.ConsumerGroupOrderingDTO; import io.kafbat.ui.model.InternalConsumerGroup; import io.kafbat.ui.model.InternalTopicConsumerGroup; import io.kafbat.ui.model.KafkaCluster; import io.kafbat.ui.model.SortOrderDTO; +import io.kafbat.ui.service.index.ConsumerGroupFilter; import io.kafbat.ui.service.rbac.AccessControlService; import io.kafbat.ui.util.ApplicationMetrics; import 
io.kafbat.ui.util.KafkaClientSslPropertiesUtil; @@ -25,7 +25,6 @@ import java.util.stream.Stream; import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; -import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.admin.ConsumerGroupListing; import org.apache.kafka.clients.admin.OffsetSpec; @@ -41,6 +40,7 @@ public class ConsumerGroupService { private final AdminClientService adminClientService; private final AccessControlService accessControlService; + private final ClustersProperties clustersProperties; private Mono> getConsumerGroups( ReactiveAdminClient ac, @@ -114,11 +114,7 @@ public Mono getConsumerGroupsPage( SortOrderDTO sortOrderDto) { return adminClientService.get(cluster).flatMap(ac -> ac.listConsumerGroups() - .map(listing -> search == null - ? listing - : listing.stream() - .filter(g -> CI.contains(g.groupId(), search)) - .toList() + .map(listing -> filterGroups(listing, search) ) .flatMapIterable(lst -> lst) .filterWhen(cg -> accessControlService.isConsumerGroupAccessible(cg.groupId(), cluster.getName())) @@ -131,6 +127,12 @@ public Mono getConsumerGroupsPage( (allGroups.size() / perPage) + (allGroups.size() % perPage == 0 ? 
0 : 1)))))); } + private Collection filterGroups(Collection groups, String search) { + ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts(); + ConsumerGroupFilter filter = new ConsumerGroupFilter(groups, fts.isEnabled(), fts.getConsumers()); + return filter.find(search, false); + } + private Mono> loadSortedDescriptions(ReactiveAdminClient ac, List groups, int pageNum, diff --git a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java index b3e5aa058..2d6dfae15 100644 --- a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java +++ b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java @@ -27,6 +27,7 @@ import io.kafbat.ui.model.NewConnectorDTO; import io.kafbat.ui.model.TaskDTO; import io.kafbat.ui.model.connect.InternalConnectorInfo; +import io.kafbat.ui.service.index.KafkaConnectNgramFilter; import io.kafbat.ui.util.ReactiveFailover; import jakarta.validation.Valid; import java.util.List; @@ -151,15 +152,16 @@ public Flux getAllConnectors(final KafkaCluster cluster, .topics(tuple.getT4().getTopics()) .build()))) .map(kafkaConnectMapper::fullConnectorInfo) - .filter(matchesSearchTerm(search)); + .collectList() + .map(lst -> filterConnectors(lst, search)) + .flatMapMany(Flux::fromIterable); } - private Predicate matchesSearchTerm(@Nullable final String search) { - if (search == null) { - return c -> true; - } - return connector -> getStringsForSearch(connector) - .anyMatch(string -> CI.contains(string, search)); + private List filterConnectors(List connectors, String search) { + ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts(); + KafkaConnectNgramFilter filter = + new KafkaConnectNgramFilter(connectors, fts.isEnabled(), fts.getConnect()); + return filter.find(search); } private Stream getStringsForSearch(FullConnectorInfoDTO fullConnectorInfo) { diff --git a/api/src/main/java/io/kafbat/ui/service/StatisticsCache.java 
b/api/src/main/java/io/kafbat/ui/service/StatisticsCache.java index 04306e9e8..0b8274959 100644 --- a/api/src/main/java/io/kafbat/ui/service/StatisticsCache.java +++ b/api/src/main/java/io/kafbat/ui/service/StatisticsCache.java @@ -1,16 +1,20 @@ package io.kafbat.ui.service; +import io.kafbat.ui.config.ClustersProperties; import io.kafbat.ui.model.InternalPartitionsOffsets; import io.kafbat.ui.model.KafkaCluster; +import io.kafbat.ui.model.ServerStatusDTO; import io.kafbat.ui.model.Statistics; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.TopicDescription; import org.springframework.stereotype.Component; +@Slf4j @Component public class StatisticsCache { @@ -28,12 +32,22 @@ public synchronized void replace(KafkaCluster c, Statistics stats) { public synchronized void update(KafkaCluster c, Map descriptions, Map> configs, - InternalPartitionsOffsets partitionsOffsets) { + InternalPartitionsOffsets partitionsOffsets, + ClustersProperties clustersProperties) { var stats = get(c); replace( c, - stats.withClusterState(s -> s.updateTopics(descriptions, configs, partitionsOffsets)) + stats.withClusterState(s -> + s.updateTopics(descriptions, configs, partitionsOffsets, clustersProperties) + ) ); + try { + if (!stats.getStatus().equals(ServerStatusDTO.INITIALIZING)) { + stats.close(); + } + } catch (Exception e) { + log.error("Error closing cluster {} stats", c.getName(), e); + } } public synchronized void onTopicDelete(KafkaCluster c, String topic) { diff --git a/api/src/main/java/io/kafbat/ui/service/StatisticsService.java b/api/src/main/java/io/kafbat/ui/service/StatisticsService.java index 9e302596e..9185838dd 100644 --- a/api/src/main/java/io/kafbat/ui/service/StatisticsService.java +++ b/api/src/main/java/io/kafbat/ui/service/StatisticsService.java @@ -2,6 +2,7 @@ import 
static io.kafbat.ui.service.ReactiveAdminClient.ClusterDescription; +import io.kafbat.ui.config.ClustersProperties; import io.kafbat.ui.model.ClusterFeature; import io.kafbat.ui.model.KafkaCluster; import io.kafbat.ui.model.Metrics; @@ -22,6 +23,7 @@ public class StatisticsService { private final AdminClientService adminClientService; private final FeatureService featureService; private final StatisticsCache cache; + private final ClustersProperties clustersProperties; public Mono updateCache(KafkaCluster c) { return getStatistics(c).doOnSuccess(m -> cache.replace(c, m)); @@ -62,7 +64,7 @@ private Statistics createStats(ClusterDescription description, private Mono loadClusterState(ClusterDescription clusterDescription, ReactiveAdminClient ac) { - return ScrapedClusterState.scrape(clusterDescription, ac); + return ScrapedClusterState.scrape(clusterDescription, ac, clustersProperties); } private Mono scrapeMetrics(KafkaCluster cluster, diff --git a/api/src/main/java/io/kafbat/ui/service/TopicsService.java b/api/src/main/java/io/kafbat/ui/service/TopicsService.java index becfd55ad..79da561dd 100644 --- a/api/src/main/java/io/kafbat/ui/service/TopicsService.java +++ b/api/src/main/java/io/kafbat/ui/service/TopicsService.java @@ -26,6 +26,7 @@ import io.kafbat.ui.model.TopicUpdateDTO; import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState; import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState.TopicState; +import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; @@ -48,6 +49,7 @@ import org.apache.kafka.common.errors.TopicExistsException; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.retry.Retry; @@ -76,7 +78,7 @@ public Mono> loadTopics(KafkaCluster c, List topics) ac.describeTopics(topics).zipWith(ac.getTopicsConfig(topics, false), (descriptions, 
configs) -> getPartitionOffsets(descriptions, ac).map(offsets -> { - statisticsCache.update(c, descriptions, configs, offsets); + statisticsCache.update(c, descriptions, configs, offsets, clustersProperties); var stats = statisticsCache.get(c); return createList( topics, @@ -465,23 +467,19 @@ public Mono cloneTopic( ); } - public Mono> getTopicsForPagination(KafkaCluster cluster) { + public Mono> getTopicsForPagination(KafkaCluster cluster, String search, Boolean showInternal) { Statistics stats = statisticsCache.get(cluster); - Map topicStates = stats.getClusterState().getTopicStates(); - return filterExisting(cluster, topicStates.keySet()) - .map(lst -> lst.stream() - .map(topicName -> - InternalTopic.from( - topicStates.get(topicName).description(), - topicStates.get(topicName).configs(), - InternalPartitionsOffsets.empty(), - stats.getMetrics(), - Optional.ofNullable(topicStates.get(topicName)) - .map(TopicState::segmentStats).orElse(null), - Optional.ofNullable(topicStates.get(topicName)) - .map(TopicState::partitionsSegmentStats).orElse(null), - clustersProperties.getInternalTopicPrefix() - )).collect(toList())); + ScrapedClusterState clusterState = stats.getClusterState(); + + try { + return Mono.just( + clusterState.getTopicIndex().find(search, showInternal, null) + ).flatMap(lst -> filterExisting(cluster, lst)).map(lst -> + lst.stream().map(t -> t.withMetrics(stats.getMetrics())).toList() + ); + } catch (Exception e) { + return Mono.error(e); + } } public Mono>> getActiveProducersState(KafkaCluster cluster, String topic) { @@ -489,12 +487,12 @@ public Mono>> getActiveProducersState(Ka .flatMap(ac -> ac.getActiveProducersState(topic)); } - private Mono> filterExisting(KafkaCluster cluster, Collection topics) { + private Mono> filterExisting(KafkaCluster cluster, Collection topics) { return adminClientService.get(cluster) .flatMap(ac -> ac.listTopics(true)) - .map(existing -> existing + .map(existing -> topics .stream() - .filter(topics::contains) + 
.filter(s -> existing.contains(s.getName())) .collect(toList())); } diff --git a/api/src/main/java/io/kafbat/ui/service/acl/AclsService.java b/api/src/main/java/io/kafbat/ui/service/acl/AclsService.java index c0ee2469a..9e9943946 100644 --- a/api/src/main/java/io/kafbat/ui/service/acl/AclsService.java +++ b/api/src/main/java/io/kafbat/ui/service/acl/AclsService.java @@ -15,12 +15,14 @@ import static org.apache.kafka.common.resource.ResourceType.TRANSACTIONAL_ID; import com.google.common.collect.Sets; +import io.kafbat.ui.config.ClustersProperties; import io.kafbat.ui.model.CreateConsumerAclDTO; import io.kafbat.ui.model.CreateProducerAclDTO; import io.kafbat.ui.model.CreateStreamAppAclDTO; import io.kafbat.ui.model.KafkaCluster; import io.kafbat.ui.service.AdminClientService; import io.kafbat.ui.service.ReactiveAdminClient; +import io.kafbat.ui.service.index.AclBindingNgramFilter; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; @@ -48,6 +50,7 @@ public class AclsService { private final AdminClientService adminClientService; + private final ClustersProperties clustersProperties; public Mono createAcl(KafkaCluster cluster, AclBinding aclBinding) { return adminClientService.get(cluster) @@ -70,10 +73,19 @@ public Mono deleteAcl(KafkaCluster cluster, AclBinding aclBinding) { public Flux listAcls(KafkaCluster cluster, ResourcePatternFilter filter, String principalSearch) { return adminClientService.get(cluster) - .flatMap(c -> c.listAcls(filter)) - .flatMapIterable(acls -> acls) - .filter(acl -> principalSearch == null || acl.entry().principal().contains(principalSearch)) - .sort(Comparator.comparing(AclBinding::toString)); //sorting to keep stable order on different calls + .flatMap(c -> c.listAcls(filter)) + .flatMapIterable(acls -> acls) + .filter(acl -> principalSearch == null || acl.entry().principal().contains(principalSearch)) + .collectList() + .map(lst -> filter(lst, principalSearch)) + .flatMapMany(Flux::fromIterable) + 
.sort(Comparator.comparing(AclBinding::toString)); //sorting to keep stable order on different calls + } + + private List filter(List acls, String principalSearch) { + ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts(); + AclBindingNgramFilter filter = new AclBindingNgramFilter(acls, fts.isEnabled(), fts.getAcl()); + return filter.find(principalSearch); } public Mono getAclAsCsvString(KafkaCluster cluster) { diff --git a/api/src/main/java/io/kafbat/ui/service/index/AclBindingNgramFilter.java b/api/src/main/java/io/kafbat/ui/service/index/AclBindingNgramFilter.java new file mode 100644 index 000000000..1622e20f6 --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/service/index/AclBindingNgramFilter.java @@ -0,0 +1,29 @@ +package io.kafbat.ui.service.index; + +import io.kafbat.ui.config.ClustersProperties; +import java.util.Collection; +import java.util.List; +import org.apache.kafka.common.acl.AclBinding; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; + +public class AclBindingNgramFilter extends NgramFilter { + private final List, AclBinding>> bindings; + + public AclBindingNgramFilter(Collection bindings) { + this(bindings, true, new ClustersProperties.NgramProperties(1, 4)); + } + + public AclBindingNgramFilter( + Collection bindings, + boolean enabled, + ClustersProperties.NgramProperties properties) { + super(properties, enabled); + this.bindings = bindings.stream().map(g -> Tuples.of(List.of(g.entry().principal()), g)).toList(); + } + + @Override + protected List, AclBinding>> getItems() { + return this.bindings; + } +} diff --git a/api/src/main/java/io/kafbat/ui/service/index/ConsumerGroupFilter.java b/api/src/main/java/io/kafbat/ui/service/index/ConsumerGroupFilter.java new file mode 100644 index 000000000..cc0ab8e03 --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/service/index/ConsumerGroupFilter.java @@ -0,0 +1,29 @@ +package io.kafbat.ui.service.index; + +import io.kafbat.ui.config.ClustersProperties; 
+import java.util.Collection; +import java.util.List; +import org.apache.kafka.clients.admin.ConsumerGroupListing; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; + +public class ConsumerGroupFilter extends NgramFilter { + private final List, ConsumerGroupListing>> groups; + + public ConsumerGroupFilter(Collection groups) { + this(groups, true, new ClustersProperties.NgramProperties(1, 4)); + } + + public ConsumerGroupFilter( + Collection groups, + boolean enabled, + ClustersProperties.NgramProperties properties) { + super(properties, enabled); + this.groups = groups.stream().map(g -> Tuples.of(List.of(g.groupId()), g)).toList(); + } + + @Override + protected List, ConsumerGroupListing>> getItems() { + return this.groups; + } +} diff --git a/api/src/main/java/io/kafbat/ui/service/index/FilterTopicIndex.java b/api/src/main/java/io/kafbat/ui/service/index/FilterTopicIndex.java new file mode 100644 index 000000000..0b5a30900 --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/service/index/FilterTopicIndex.java @@ -0,0 +1,35 @@ +package io.kafbat.ui.service.index; + +import static org.apache.commons.lang3.Strings.CI; + +import io.kafbat.ui.model.InternalTopic; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Stream; + +public class FilterTopicIndex implements TopicsIndex { + private List topics; + + public FilterTopicIndex(List topics) { + this.topics = topics; + } + + @Override + public List find(String search, Boolean showInternal, String sort, Integer count) { + if (search == null || search.isBlank()) { + return new ArrayList<>(this.topics); + } + Stream stream = topics.stream().filter(topic -> !topic.isInternal() + || showInternal != null && showInternal) + .filter( + topic -> search == null || CI.contains(topic.getName(), search) + ); + + return stream.toList(); + } + + @Override + public void close() throws Exception { + + } +} diff --git 
a/api/src/main/java/io/kafbat/ui/service/index/KafkaConnectNgramFilter.java b/api/src/main/java/io/kafbat/ui/service/index/KafkaConnectNgramFilter.java new file mode 100644 index 000000000..99ffd5275 --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/service/index/KafkaConnectNgramFilter.java @@ -0,0 +1,40 @@ +package io.kafbat.ui.service.index; + +import io.kafbat.ui.config.ClustersProperties; +import io.kafbat.ui.model.FullConnectorInfoDTO; +import java.util.Collection; +import java.util.List; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; + +public class KafkaConnectNgramFilter extends NgramFilter { + private final List, FullConnectorInfoDTO>> connectors; + + public KafkaConnectNgramFilter(Collection connectors) { + this(connectors, true, new ClustersProperties.NgramProperties(1, 4)); + } + + public KafkaConnectNgramFilter( + Collection connectors, + boolean enabled, + ClustersProperties.NgramProperties properties) { + super(properties, enabled); + this.connectors = connectors.stream().map(this::getItem).toList(); + } + + private Tuple2, FullConnectorInfoDTO> getItem(FullConnectorInfoDTO connector) { + return Tuples.of( + List.of( + connector.getName(), + connector.getConnect(), + connector.getStatus().getState().getValue(), + connector.getType().getValue() + ), connector + ); + } + + @Override + protected List, FullConnectorInfoDTO>> getItems() { + return this.connectors; + } +} diff --git a/api/src/main/java/io/kafbat/ui/service/index/LuceneTopicsIndex.java b/api/src/main/java/io/kafbat/ui/service/index/LuceneTopicsIndex.java new file mode 100644 index 000000000..6953add26 --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/service/index/LuceneTopicsIndex.java @@ -0,0 +1,152 @@ +package io.kafbat.ui.service.index; + +import io.kafbat.ui.config.ClustersProperties; +import io.kafbat.ui.model.InternalTopic; +import io.kafbat.ui.model.InternalTopicConfig; +import java.io.IOException; +import java.io.UncheckedIOException; +import 
java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.IntPoint; +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.document.StringField; +import org.apache.lucene.document.TextField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.Term; +import org.apache.lucene.queryparser.classic.ParseException; +import org.apache.lucene.queryparser.classic.QueryParser; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.store.ByteBuffersDirectory; +import org.apache.lucene.store.Directory; + +public class LuceneTopicsIndex implements TopicsIndex { + public static final String FIELD_NAME_RAW = "name_raw"; + + private final Directory directory; + private final DirectoryReader indexReader; + private final IndexSearcher indexSearcher; + private final Analyzer analyzer; + private final int maxSize; + private final ReadWriteLock closeLock = new ReentrantReadWriteLock(); + private final Map topicMap; + + public LuceneTopicsIndex(List topics) throws IOException { + this.analyzer = new ShortWordAnalyzer(); + this.topicMap = topics.stream().collect(Collectors.toMap(InternalTopic::getName, 
Function.identity())); + this.directory = build(topics); + this.indexReader = DirectoryReader.open(directory); + this.indexSearcher = new IndexSearcher(indexReader); + this.maxSize = topics.size(); + } + + private Directory build(List topics) { + Directory directory = new ByteBuffersDirectory(); + try (IndexWriter directoryWriter = new IndexWriter(directory, new IndexWriterConfig(this.analyzer))) { + for (InternalTopic topic : topics) { + Document doc = new Document(); + doc.add(new StringField(FIELD_NAME_RAW, topic.getName(), Field.Store.YES)); + doc.add(new TextField(FIELD_NAME, topic.getName(), Field.Store.NO)); + doc.add(new IntPoint(FIELD_PARTITIONS, topic.getPartitionCount())); + doc.add(new IntPoint(FIELD_REPLICATION, topic.getReplicationFactor())); + doc.add(new LongPoint(FIELD_SIZE, topic.getSegmentSize())); + if (topic.getTopicConfigs() != null && !topic.getTopicConfigs().isEmpty()) { + for (InternalTopicConfig topicConfig : topic.getTopicConfigs()) { + doc.add(new StringField(FIELD_CONFIG_PREFIX + "_" + topicConfig.getName(), topicConfig.getValue(), + Field.Store.NO)); + } + } + doc.add(new StringField(FIELD_INTERNAL, String.valueOf(topic.isInternal()), Field.Store.NO)); + directoryWriter.addDocument(doc); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + return directory; + } + + @Override + public void close() throws Exception { + this.closeLock.writeLock().lock(); + try { + if (indexReader != null) { + this.indexReader.close(); + } + if (this.directory != null) { + this.directory.close(); + } + } finally { + this.closeLock.writeLock().unlock(); + } + } + + public List find(String search, Boolean showInternal, String sort, Integer count) { + return find(search, showInternal, sort, count, 0.0f); + } + + public List find(String search, Boolean showInternal, + String sortField, Integer count, float minScore) { + if (search == null || search.isBlank()) { + return new ArrayList<>(this.topicMap.values()); + } + 
closeLock.readLock().lock(); + try { + + QueryParser queryParser = new PrefixQueryParser(FIELD_NAME, this.analyzer); + queryParser.setDefaultOperator(QueryParser.Operator.AND); + Query nameQuery = queryParser.parse(search);; + + Query internalFilter = new TermQuery(new Term(FIELD_INTERNAL, "true")); + + BooleanQuery.Builder queryBuilder = new BooleanQuery.Builder(); + queryBuilder.add(nameQuery, BooleanClause.Occur.MUST); + if (showInternal == null || !showInternal) { + queryBuilder.add(internalFilter, BooleanClause.Occur.MUST_NOT); + } + + List sortFields = new ArrayList<>(); + sortFields.add(SortField.FIELD_SCORE); + if (!sortField.equals(FIELD_NAME)) { + sortFields.add(new SortField(sortField, SortField.Type.INT, true)); + } + + Sort sort = new Sort(sortFields.toArray(new SortField[0])); + + TopDocs result = this.indexSearcher.search(queryBuilder.build(), count != null ? count : this.maxSize, sort); + + List topics = new ArrayList<>(); + for (ScoreDoc scoreDoc : result.scoreDocs) { + if (minScore > 0.00001f && scoreDoc.score < minScore) { + continue; + } + Document document = this.indexSearcher.storedFields().document(scoreDoc.doc); + topics.add(document.get(FIELD_NAME_RAW)); + } + return topics.stream().map(topicMap::get).filter(Objects::nonNull).toList(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } catch (ParseException e) { + throw new RuntimeException(e); + } finally { + this.closeLock.readLock().unlock(); + } + } +} diff --git a/api/src/main/java/io/kafbat/ui/service/index/NgramFilter.java b/api/src/main/java/io/kafbat/ui/service/index/NgramFilter.java new file mode 100644 index 000000000..58ab5d582 --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/service/index/NgramFilter.java @@ -0,0 +1,131 @@ +package io.kafbat.ui.service.index; + +import static org.apache.commons.lang3.Strings.CI; + +import com.google.common.cache.CacheBuilder; +import io.kafbat.ui.config.ClustersProperties; +import java.util.ArrayList; +import 
// ---------------------------------------------------------------------------
// io/kafbat/ui/service/index/NgramFilter.java
// ---------------------------------------------------------------------------
package io.kafbat.ui.service.index;

import static org.apache.commons.lang3.Strings.CI;

import com.google.common.cache.CacheBuilder;
import io.kafbat.ui.config.ClustersProperties;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import reactor.util.function.Tuple2;

/**
 * Base class for n-gram based fuzzy filters (consumer groups, schemas, ACLs, ...).
 *
 * <p>Each item exposes a list of searchable strings plus the item itself. A query
 * matches an item when every n-gram token of the query appears among the item's
 * tokens; matches are ranked by cosine similarity of term frequencies. When FTS is
 * disabled, a plain case-insensitive substring match is used instead.
 *
 * @param <T> item type returned by {@link #find(String)}
 */
@Slf4j
public abstract class NgramFilter<T> {
  private final Analyzer analyzer;
  private final boolean enabled;

  public NgramFilter(ClustersProperties.NgramProperties properties, boolean enabled) {
    this.enabled = enabled;
    this.analyzer = new ShortWordNGramAnalyzer(properties.getNgramMin(), properties.getNgramMax(), false);
  }

  /** Items to search: (searchable fields, item) pairs. */
  protected abstract List<Tuple2<List<String>, T>> getItems();

  // Cache key pairs the analyzer with the text.
  // fix: the cache is static (shared across all filter instances) but was keyed on
  // text alone, so filters configured with different n-gram min/max would read each
  // other's token lists. Analyzer has no equals(), so record equality degrades to
  // identity on that component — exactly the per-instance isolation we need.
  private record CacheKey(Analyzer analyzer, String text) {
  }

  private static final Map<CacheKey, List<String>> cache = CacheBuilder.newBuilder()
      .maximumSize(1_000)
      .<CacheKey, List<String>>build()
      .asMap();

  public List<T> find(String search) {
    return find(search, true);
  }

  /**
   * Finds items matching {@code search}.
   *
   * @param search query; blank/null returns all items
   * @param sort   when true, results are ordered by relevance (or natural order
   *               for the no-query / disabled paths — assumes T is Comparable there)
   */
  public List<T> find(String search, boolean sort) {
    if (search == null || search.isBlank()) {
      return list(this.getItems().stream().map(Tuple2::getT2), sort);
    }
    if (!enabled) {
      // FTS disabled: simple case-insensitive substring containment on any field.
      return list(this.getItems()
          .stream()
          .filter(t -> t.getT1().stream().anyMatch(s -> CI.contains(s, search)))
          .map(Tuple2::getT2), sort);
    }
    try {
      List<SearchResult<T>> result = new ArrayList<>();
      List<String> queryTokens = tokenizeString(analyzer, search);
      Map<String, Integer> queryFreq = termFreq(queryTokens);

      for (Tuple2<List<String>, T> item : getItems()) {
        // fix: an item matching on several fields was previously added once per
        // matching field, producing duplicate results; keep only the best score.
        double bestScore = -1.0;
        for (String field : item.getT1()) {
          List<String> itemTokens = tokenizeString(analyzer, field);
          HashSet<String> itemTokensSet = new HashSet<>(itemTokens);
          if (itemTokensSet.containsAll(queryTokens)) {
            bestScore = Math.max(bestScore, cosineSimilarity(queryFreq, itemTokens));
          }
        }
        if (bestScore >= 0) {
          result.add(new SearchResult<>(item.getT2(), bestScore));
        }
      }
      if (sort) {
        result.sort((o1, o2) -> Double.compare(o2.score(), o1.score()));
      }
      return result.stream().map(SearchResult::item).toList();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  private List<T> list(Stream<T> stream, boolean sort) {
    if (sort) {
      return stream.sorted().toList();
    } else {
      return stream.toList();
    }
  }

  /** A matched item with its cosine-similarity score. */
  private record SearchResult<T>(T item, double score) {
  }

  static List<String> tokenizeString(Analyzer analyzer, String text) {
    return cache.computeIfAbsent(new CacheKey(analyzer, text), k -> tokenizeStringSimple(analyzer, text));
  }

  @SneakyThrows
  static List<String> tokenizeStringSimple(Analyzer analyzer, String text) {
    List<String> tokens = new ArrayList<>();
    try (TokenStream tokenStream = analyzer.tokenStream(null, text)) {
      CharTermAttribute attr = tokenStream.addAttribute(CharTermAttribute.class);
      tokenStream.reset();
      while (tokenStream.incrementToken()) {
        tokens.add(attr.toString());
      }
      tokenStream.end();
    }
    return tokens;
  }

  /** Cosine similarity between the query term-frequency map and an item's tokens. */
  private static double cosineSimilarity(Map<String, Integer> queryFreq, List<String> itemTokens) {
    Map<String, Integer> terms = termFreq(itemTokens);

    double dot = 0.0;
    double mag1 = 0.0;
    double mag2 = 0.0;

    // Iterating item terms is sufficient: callers only invoke this when every
    // query token is contained in the item tokens, so no query term is skipped.
    for (String term : terms.keySet()) {
      int f1 = queryFreq.getOrDefault(term, 0);
      int f2 = terms.getOrDefault(term, 0);
      dot += f1 * f2;
      mag1 += f1 * f1;
      mag2 += f2 * f2;
    }

    return (mag1 == 0 || mag2 == 0) ? 0.0 : dot / (Math.sqrt(mag1) * Math.sqrt(mag2));
  }

  private static Map<String, Integer> termFreq(List<String> tokens) {
    Map<String, Integer> freq = new HashMap<>();
    for (String token : tokens) {
      freq.merge(token, 1, Integer::sum);
    }
    return freq;
  }
}

// ---------------------------------------------------------------------------
// io/kafbat/ui/service/index/PrefixQueryParser.java
// ---------------------------------------------------------------------------
package io.kafbat.ui.service.index;

import static org.apache.lucene.search.BoostAttribute.DEFAULT_BOOST;

import io.kafbat.ui.service.index.TopicsIndex.FieldType;
import java.util.Optional;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.IntPoint;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.Term;
import org.apache.lucene.queryparser.classic.QueryParser;
import org.apache.lucene.search.BoostQuery;
import org.apache.lucene.search.PrefixQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;

/**
 * Classic query parser that (a) turns bare terms on string fields into prefix
 * queries, and (b) rewrites term/range queries on numeric fields (see
 * {@link TopicsIndex#FIELD_TYPES}) into IntPoint/LongPoint queries.
 */
public class PrefixQueryParser extends QueryParser {

  public PrefixQueryParser(String field, Analyzer analyzer) {
    super(field, analyzer);
  }

  @Override
  protected Query newRangeQuery(String field, String part1, String part2, boolean startInclusive,
                                boolean endInclusive) {
    FieldType fieldType = Optional.ofNullable(field)
        .map(TopicsIndex.FIELD_TYPES::get)
        .orElse(FieldType.STRING);

    return switch (fieldType) {
      case STRING, BOOLEAN -> super.newRangeQuery(field, part1, part2, startInclusive, endInclusive);
      case INT -> {
        int lo = parseInt(part1, true);
        int hi = parseInt(part2, false);
        // fix: point range queries are always inclusive — honor '{'/'}' exclusive
        // bounds by shifting them, as the classic syntax promises.
        if (!startInclusive && lo != Integer.MAX_VALUE) {
          lo++;
        }
        if (!endInclusive && hi != Integer.MIN_VALUE) {
          hi--;
        }
        yield IntPoint.newRangeQuery(field, lo, hi);
      }
      case LONG -> {
        long lo = parseLong(part1, true);
        long hi = parseLong(part2, false);
        if (!startInclusive && lo != Long.MAX_VALUE) {
          lo++;
        }
        if (!endInclusive && hi != Long.MIN_VALUE) {
          hi--;
        }
        yield LongPoint.newRangeQuery(field, lo, hi);
      }
    };
  }

  /** Resolves "*" / missing bound to the open end of the int range. */
  private Integer parseInt(String value, boolean min) {
    if ("*".equals(value) || value == null) {
      return min ? Integer.MIN_VALUE : Integer.MAX_VALUE;
    } else {
      return Integer.parseInt(value);
    }
  }

  /** Resolves "*" / missing bound to the open end of the long range. */
  private Long parseLong(String value, boolean min) {
    if ("*".equals(value) || value == null) {
      return min ? Long.MIN_VALUE : Long.MAX_VALUE;
    } else {
      return Long.parseLong(value);
    }
  }

  @Override
  protected Query newTermQuery(Term term, float boost) {
    FieldType fieldType = Optional.ofNullable(term.field())
        .map(TopicsIndex.FIELD_TYPES::get)
        .orElse(FieldType.STRING);

    Query query = switch (fieldType) {
      case STRING -> new PrefixQuery(term);
      case INT -> IntPoint.newExactQuery(term.field(), Integer.parseInt(term.text()));
      case LONG -> LongPoint.newExactQuery(term.field(), Long.parseLong(term.text()));
      case BOOLEAN -> new TermQuery(term);
    };

    if (boost == DEFAULT_BOOST) {
      return query;
    }
    return new BoostQuery(query, boost);
  }
}

// ---------------------------------------------------------------------------
// io/kafbat/ui/service/index/SchemasFilter.java
// ---------------------------------------------------------------------------
package io.kafbat.ui.service.index;

import io.kafbat.ui.config.ClustersProperties;
import java.util.Collection;
import java.util.List;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

/** N-gram filter over schema-registry subject names; the item is the name itself. */
public class SchemasFilter extends NgramFilter<String> {
  private final List<Tuple2<List<String>, String>> subjects;

  public SchemasFilter(Collection<String> subjects, boolean enabled, ClustersProperties.NgramProperties properties) {
    super(properties, enabled);
    this.subjects = subjects.stream().map(g -> Tuples.of(List.of(g), g)).toList();
  }

  @Override
  protected List<Tuple2<List<String>, String>> getItems() {
    return this.subjects;
  }
}
b/api/src/main/java/io/kafbat/ui/service/index/ShortWordAnalyzer.java new file mode 100644 index 000000000..ee278fb09 --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/service/index/ShortWordAnalyzer.java @@ -0,0 +1,33 @@ +package io.kafbat.ui.service.index; + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.LowerCaseFilter; +import org.apache.lucene.analysis.TokenStream; +import org.apache.lucene.analysis.Tokenizer; +import org.apache.lucene.analysis.miscellaneous.WordDelimiterGraphFilter; +import org.apache.lucene.analysis.standard.StandardTokenizer; + +class ShortWordAnalyzer extends Analyzer { + + public ShortWordAnalyzer() {} + + @Override + protected TokenStreamComponents createComponents(String fieldName) { + Tokenizer tokenizer = new StandardTokenizer(); + + TokenStream tokenStream = new WordDelimiterGraphFilter( + tokenizer, + WordDelimiterGraphFilter.GENERATE_WORD_PARTS + | WordDelimiterGraphFilter.SPLIT_ON_CASE_CHANGE + | WordDelimiterGraphFilter.PRESERVE_ORIGINAL + | WordDelimiterGraphFilter.GENERATE_NUMBER_PARTS + | WordDelimiterGraphFilter.STEM_ENGLISH_POSSESSIVE, + + null + ); + + tokenStream = new LowerCaseFilter(tokenStream); + + return new TokenStreamComponents(tokenizer, tokenStream); + } +} diff --git a/api/src/main/java/io/kafbat/ui/service/index/ShortWordNGramAnalyzer.java b/api/src/main/java/io/kafbat/ui/service/index/ShortWordNGramAnalyzer.java new file mode 100644 index 000000000..2bb0bcaaa --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/service/index/ShortWordNGramAnalyzer.java @@ -0,0 +1,46 @@ +package io.kafbat.ui.service.index; + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.LowerCaseFilter; +import org.apache.lucene.analysis.TokenStream; +import org.apache.lucene.analysis.Tokenizer; +import org.apache.lucene.analysis.miscellaneous.WordDelimiterGraphFilter; +import org.apache.lucene.analysis.ngram.NGramTokenFilter; +import 
package io.kafbat.ui.service.index;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.LowerCaseFilter;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.Tokenizer;
import org.apache.lucene.analysis.miscellaneous.WordDelimiterGraphFilter;
import org.apache.lucene.analysis.ngram.NGramTokenFilter;
import org.apache.lucene.analysis.standard.StandardTokenizer;

/**
 * Word-splitting analyzer that additionally emits character n-grams of each
 * word part, sized by the configured [minGram, maxGram] range (the bounds are
 * constructor parameters, not fixed values).
 */
class ShortWordNGramAnalyzer extends Analyzer {
  private final int minGram;
  private final int maxGram;
  private final boolean preserveOriginal;

  public ShortWordNGramAnalyzer(int minGram, int maxGram) {
    this(minGram, maxGram, true);
  }

  public ShortWordNGramAnalyzer(int minGram, int maxGram, boolean preserveOriginal) {
    this.minGram = minGram;
    this.maxGram = maxGram;
    this.preserveOriginal = preserveOriginal;
  }

  @Override
  protected TokenStreamComponents createComponents(String fieldName) {
    Tokenizer source = new StandardTokenizer();

    int delimiterFlags = WordDelimiterGraphFilter.GENERATE_WORD_PARTS
        | WordDelimiterGraphFilter.SPLIT_ON_CASE_CHANGE
        | WordDelimiterGraphFilter.STEM_ENGLISH_POSSESSIVE;

    TokenStream stream = new WordDelimiterGraphFilter(source, delimiterFlags, null);
    stream = new LowerCaseFilter(stream);
    // Emit n-grams in [minGram, maxGram]; optionally keep the full word too.
    stream = new NGramTokenFilter(stream, minGram, maxGram, this.preserveOriginal);

    return new TokenStreamComponents(source, stream);
  }
}

// ---------------------------------------------------------------------------
// io/kafbat/ui/service/index/TopicsIndex.java
// ---------------------------------------------------------------------------
package io.kafbat.ui.service.index;

import io.kafbat.ui.model.InternalTopic;
import java.util.List;
import java.util.Map;

/**
 * Searchable index over a snapshot of topics. Implementations: Lucene-backed
 * ({@code LuceneTopicsIndex}) and plain filtering ({@code FilterTopicIndex}).
 */
public interface TopicsIndex extends AutoCloseable {
  String FIELD_NAME = "name";
  String FIELD_INTERNAL = "internal";
  String FIELD_PARTITIONS = "partitions";
  String FIELD_REPLICATION = "replication";
  String FIELD_SIZE = "size";
  String FIELD_CONFIG_PREFIX = "config";

  enum FieldType {
    STRING,
    INT,
    LONG,
    BOOLEAN
  }

  /** Lucene field name -> logical type, used by the query parser to pick query kinds. */
  Map<String, FieldType> FIELD_TYPES = Map.of(
      FIELD_NAME, FieldType.STRING,
      FIELD_INTERNAL, FieldType.BOOLEAN,
      FIELD_PARTITIONS, FieldType.INT,
      FIELD_REPLICATION, FieldType.INT,
      FIELD_SIZE, FieldType.LONG,
      FIELD_CONFIG_PREFIX, FieldType.STRING
  );

  default List<InternalTopic> find(String search, Boolean showInternal, Integer count) {
    return this.find(search, showInternal, FIELD_NAME, count);
  }

  List<InternalTopic> find(String search, Boolean showInternal, String sort, Integer count);
}
FIELD_PARTITIONS, FieldType.INT, + FIELD_REPLICATION, FieldType.INT, + FIELD_SIZE, FieldType.LONG, + FIELD_CONFIG_PREFIX, FieldType.STRING + ); + + default List find(String search, Boolean showInternal, Integer count) { + return this.find(search, showInternal, FIELD_NAME, count); + } + + List find(String search, Boolean showInternal, String sort, Integer count); +} diff --git a/api/src/main/java/io/kafbat/ui/service/metrics/scrape/ScrapedClusterState.java b/api/src/main/java/io/kafbat/ui/service/metrics/scrape/ScrapedClusterState.java index e5d8c059c..fe8c2e6fc 100644 --- a/api/src/main/java/io/kafbat/ui/service/metrics/scrape/ScrapedClusterState.java +++ b/api/src/main/java/io/kafbat/ui/service/metrics/scrape/ScrapedClusterState.java @@ -5,9 +5,14 @@ import static io.kafbat.ui.service.ReactiveAdminClient.ClusterDescription; import com.google.common.collect.Table; +import io.kafbat.ui.config.ClustersProperties; import io.kafbat.ui.model.InternalLogDirStats; import io.kafbat.ui.model.InternalPartitionsOffsets; +import io.kafbat.ui.model.InternalTopic; import io.kafbat.ui.service.ReactiveAdminClient; +import io.kafbat.ui.service.index.FilterTopicIndex; +import io.kafbat.ui.service.index.LuceneTopicsIndex; +import io.kafbat.ui.service.index.TopicsIndex; import jakarta.annotation.Nullable; import java.time.Instant; import java.util.HashMap; @@ -19,6 +24,7 @@ import lombok.Builder; import lombok.RequiredArgsConstructor; import lombok.Value; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.admin.ConsumerGroupListing; @@ -31,12 +37,21 @@ @Builder(toBuilder = true) @RequiredArgsConstructor @Value -public class ScrapedClusterState { +@Slf4j +public class ScrapedClusterState implements AutoCloseable { Instant scrapeFinishedAt; Map nodesStates; Map topicStates; Map consumerGroupsStates; + TopicsIndex topicIndex; + + @Override + public void 
close() throws Exception { + if (this.topicIndex != null) { + this.topicIndex.close(); + } + } public record NodeState(int id, Node node, @@ -71,7 +86,8 @@ public static ScrapedClusterState empty() { public ScrapedClusterState updateTopics(Map descriptions, Map> configs, - InternalPartitionsOffsets partitionsOffsets) { + InternalPartitionsOffsets partitionsOffsets, + ClustersProperties clustersProperties) { var updatedTopicStates = new HashMap<>(topicStates); descriptions.forEach((topic, description) -> { SegmentStats segmentStats = null; @@ -93,8 +109,10 @@ public ScrapedClusterState updateTopics(Map descriptio ) ); }); + return toBuilder() .topicStates(updatedTopicStates) + .topicIndex(buildTopicIndex(clustersProperties, updatedTopicStates)) .build(); } @@ -107,7 +125,7 @@ public ScrapedClusterState topicDeleted(String topic) { } public static Mono scrape(ClusterDescription clusterDescription, - ReactiveAdminClient ac) { + ReactiveAdminClient ac, ClustersProperties clustersProperties) { return Mono.zip( ac.describeLogDirs(clusterDescription.getNodes().stream().map(Node::id).toList()) .map(InternalLogDirStats::new), @@ -126,7 +144,8 @@ public static Mono scrape(ClusterDescription clusterDescrip phase1.getT1(), topicStateMap(phase1.getT1(), phase1.getT3(), phase1.getT4(), phase2.getT1(), phase2.getT2()), phase2.getT3(), - phase2.getT4() + phase2.getT4(), + clustersProperties ))); } @@ -157,7 +176,8 @@ private static ScrapedClusterState create(ClusterDescription clusterDescription, InternalLogDirStats segmentStats, Map topicStates, Map consumerDescriptions, - Table consumerOffsets) { + Table consumerOffsets, + ClustersProperties clustersProperties) { Map consumerGroupsStates = new HashMap<>(); consumerDescriptions.forEach((name, desc) -> @@ -184,10 +204,28 @@ private static ScrapedClusterState create(ClusterDescription clusterDescription, Instant.now(), nodesStates, topicStates, - consumerGroupsStates + consumerGroupsStates, + buildTopicIndex(clustersProperties, 
topicStates) ); } + private static TopicsIndex buildTopicIndex(ClustersProperties clustersProperties, + Map topicStates) { + ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts(); + List topics = topicStates.values().stream().map( + topicState -> buildInternalTopic(topicState, clustersProperties) + ).toList(); + + if (fts.isEnabled()) { + try { + return new LuceneTopicsIndex(topics); + } catch (Exception e) { + log.error("Error creating lucene topics index", e); + } + } + return new FilterTopicIndex(topics); + } + private static Map filterTopic(String topicForFilter, Map tpMap) { return tpMap.entrySet() .stream() @@ -195,5 +233,17 @@ private static Map filterTopic(String topicForFilter, Map e.getKey().partition(), Map.Entry::getValue)); } + private static InternalTopic buildInternalTopic(TopicState state, ClustersProperties clustersProperties) { + return InternalTopic.from( + state.description(), + state.configs(), + InternalPartitionsOffsets.empty(), + null, + state.segmentStats(), + state.partitionsSegmentStats(), + clustersProperties.getInternalTopicPrefix() + ); + } + } diff --git a/api/src/main/resources/application-localtest.yaml b/api/src/main/resources/application-localtest.yaml index 91b266390..89550e4e6 100644 --- a/api/src/main/resources/application-localtest.yaml +++ b/api/src/main/resources/application-localtest.yaml @@ -10,6 +10,8 @@ kafka: - name: local bootstrapServers: localhost:9092 schemaRegistry: http://localhost:8085 + fts: + enabled: true dynamic.config.enabled: true diff --git a/api/src/test/java/io/kafbat/ui/service/SchemaRegistryPaginationTest.java b/api/src/test/java/io/kafbat/ui/service/SchemaRegistryPaginationTest.java index 43cb29382..8b77e7cdb 100644 --- a/api/src/test/java/io/kafbat/ui/service/SchemaRegistryPaginationTest.java +++ b/api/src/test/java/io/kafbat/ui/service/SchemaRegistryPaginationTest.java @@ -6,6 +6,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import 
io.kafbat.ui.config.ClustersProperties; import io.kafbat.ui.controller.SchemasController; import io.kafbat.ui.model.KafkaCluster; import io.kafbat.ui.model.SchemaSubjectDTO; @@ -43,7 +44,7 @@ private void init(List subjects) { new SchemaRegistryService.SubjectWithCompatibilityLevel( new SchemaSubject().subject(a.getArgument(1)), Compatibility.FULL))); - this.controller = new SchemasController(schemaRegistryService); + this.controller = new SchemasController(schemaRegistryService, new ClustersProperties()); this.controller.setAccessControlService(new AccessControlServiceMock().getMock()); this.controller.setAuditService(mock(AuditService.class)); this.controller.setClustersStorage(clustersStorage); diff --git a/api/src/test/java/io/kafbat/ui/service/TopicsServicePaginationTest.java b/api/src/test/java/io/kafbat/ui/service/TopicsServicePaginationTest.java index 3fb1a804f..9ad9c651f 100644 --- a/api/src/test/java/io/kafbat/ui/service/TopicsServicePaginationTest.java +++ b/api/src/test/java/io/kafbat/ui/service/TopicsServicePaginationTest.java @@ -1,20 +1,26 @@ package io.kafbat.ui.service; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import io.kafbat.ui.config.ClustersProperties; import io.kafbat.ui.controller.TopicsController; import io.kafbat.ui.mapper.ClusterMapper; import io.kafbat.ui.mapper.ClusterMapperImpl; import io.kafbat.ui.model.InternalLogDirStats; +import io.kafbat.ui.model.InternalPartition; import io.kafbat.ui.model.InternalPartitionsOffsets; import io.kafbat.ui.model.InternalTopic; import io.kafbat.ui.model.KafkaCluster; import io.kafbat.ui.model.Metrics; import io.kafbat.ui.model.SortOrderDTO; +import 
io.kafbat.ui.model.Statistics; import io.kafbat.ui.model.TopicColumnsToSortDTO; import io.kafbat.ui.model.TopicDTO; import io.kafbat.ui.service.analyze.TopicAnalysisService; @@ -34,6 +40,7 @@ import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.TopicPartitionInfo; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentMatchers; import org.mockito.Mockito; import reactor.core.publisher.Mono; @@ -41,30 +48,80 @@ class TopicsServicePaginationTest { private static final String LOCAL_KAFKA_CLUSTER_NAME = "local"; - private final TopicsService topicsService = Mockito.mock(TopicsService.class); + private final AdminClientService adminClientService = Mockito.mock(AdminClientService.class); + private final ReactiveAdminClient reactiveAdminClient = Mockito.mock(ReactiveAdminClient.class); private final ClustersStorage clustersStorage = Mockito.mock(ClustersStorage.class); + private final StatisticsCache statisticsCache = new StatisticsCache(clustersStorage); + private final ClustersProperties clustersProperties = new ClustersProperties(); + private final TopicsService topicsService = new TopicsService( + adminClientService, + statisticsCache, + clustersProperties + ); + + private final TopicsService mockTopicsService = Mockito.mock(TopicsService.class); private final ClusterMapper clusterMapper = new ClusterMapperImpl(); + private final AccessControlService accessControlService = new AccessControlServiceMock().getMock(); private final TopicsController topicsController = - new TopicsController(topicsService, mock(TopicAnalysisService.class), clusterMapper); + new TopicsController(mockTopicsService, mock(TopicAnalysisService.class), clusterMapper, clustersProperties); private void init(Map topicsInCache) { - + KafkaCluster kafkaCluster = buildKafkaCluster(LOCAL_KAFKA_CLUSTER_NAME); + statisticsCache.replace(kafkaCluster, Statistics.empty()); + statisticsCache.update( + kafkaCluster, + topicsInCache.entrySet().stream().collect( + 
Collectors.toMap( + Map.Entry::getKey, + v -> toTopicDescription(v.getValue()) + ) + ), + Map.of(), + new InternalPartitionsOffsets(Map.of()), + clustersProperties + ); + when(adminClientService.get(isA(KafkaCluster.class))).thenReturn(Mono.just(reactiveAdminClient)); + when(reactiveAdminClient.listTopics(anyBoolean())).thenReturn(Mono.just(topicsInCache.keySet())); when(clustersStorage.getClusterByName(isA(String.class))) - .thenReturn(Optional.of(buildKafkaCluster(LOCAL_KAFKA_CLUSTER_NAME))); - when(topicsService.getTopicsForPagination(isA(KafkaCluster.class))) - .thenReturn(Mono.just(new ArrayList<>(topicsInCache.values()))); - when(topicsService.loadTopics(isA(KafkaCluster.class), anyList())) + .thenReturn(Optional.of(kafkaCluster)); + when(mockTopicsService.getTopicsForPagination(isA(KafkaCluster.class), any(), any())) + .thenAnswer(a -> + topicsService.getTopicsForPagination( + a.getArgument(0), + a.getArgument(1), + a.getArgument(2) + ) + ); + + + when(mockTopicsService.loadTopics(isA(KafkaCluster.class), anyList())) .thenAnswer(a -> { List lst = a.getArgument(1); return Mono.just(lst.stream().map(topicsInCache::get).collect(Collectors.toList())); }); + topicsController.setAccessControlService(accessControlService); topicsController.setAuditService(mock(AuditService.class)); topicsController.setClustersStorage(clustersStorage); } + private TopicDescription toTopicDescription(InternalTopic t) { + return new TopicDescription( + t.getName(), t.isInternal(), + t.getPartitions().values().stream().map(p -> toTopicPartitionInfo(p)).toList() + ); + } + + private TopicPartitionInfo toTopicPartitionInfo(InternalPartition p) { + return new TopicPartitionInfo( + p.getPartition(), + null, List.of(), List.of() + ); + } + + @Test void shouldListFirst25Topics() { init( diff --git a/api/src/test/java/io/kafbat/ui/service/acl/AclsServiceTest.java b/api/src/test/java/io/kafbat/ui/service/acl/AclsServiceTest.java index 109683014..517f3ce7d 100644 --- 
// ---------------------------------------------------------------------------
// io/kafbat/ui/service/index/ConsumerGroupsFilterTest.java
// ---------------------------------------------------------------------------
package io.kafbat.ui.service.index;

import static org.assertj.core.api.Assertions.assertThat;

import io.kafbat.ui.model.InternalConsumerGroup;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.junit.jupiter.api.Test;

/** Verifies fuzzy matching of consumer-group names by the n-gram filter. */
class ConsumerGroupsFilterTest {
  private final List<String> names = List.of(
      "connect-test-10",
      "connect-test-9",
      "connect-test-90",
      "connect-local-file-sink",
      "martech-mt0-data-integration-platform-rapi-orders",
      "pmp-recon-connect.payments.crypto.ctt.agent.created.v1",
      "rmd-flink.volatility.trigger.v26",
      "iat-ntapi-account-dd-consumer-prod"
  );

  @Test
  void testFindTopicsByName() throws Exception {
    List<ConsumerGroupListing> groups =
        names.stream().map(n -> new ConsumerGroupListing(n, true)).toList();

    ConsumerGroupFilter filter = new ConsumerGroupFilter(groups);

    // query -> expected number of matches
    Map<String, Integer> tests = Map.of(
        "test 10", 1,
        "test", 3,
        "payment created", 1,
        "test 9", 2,
        "volatility 26", 1
    );

    for (Map.Entry<String, Integer> entry : tests.entrySet()) {
      var result = filter.find(entry.getKey());
      assertThat(result).size()
          .withFailMessage("Expected %d results for '%s', but got %s", entry.getValue(), entry.getKey(), result)
          .isEqualTo(entry.getValue());
    }
  }
}

// ---------------------------------------------------------------------------
// io/kafbat/ui/service/index/LuceneTopicsIndexTest.java
// ---------------------------------------------------------------------------
package io.kafbat.ui.service.index;

import io.kafbat.ui.model.InternalPartition;
import io.kafbat.ui.model.InternalTopic;
import io.kafbat.ui.model.InternalTopicConfig;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.assertj.core.api.SoftAssertions;
import org.junit.jupiter.api.Test;

/** Exercises Lucene-backed topic search: name queries, config fields, numeric ranges. */
class LuceneTopicsIndexTest {
  @Test
  void testFindTopicsByName() throws Exception {
    List<InternalTopic> topics = new ArrayList<>(
        Stream.of("topic", "topic-1", "topic-2", "topic-3",
                "topic-4", "topic-5", "topic-6", "topic-7",
                "topic-8", "red-dog",
                "sk.payment.events",
                "sk.payment.events.dlq",
                "sk.payment.commands",
                "sk.payment.changes",
                "sk.payment.stats",
                "sk.currency.rates",
                "audit.payment.events",
                "audit.clients.state",
                "audit.clients.repartitioned.status",
                "reporting.payments.by.clientId",
                "reporting.payments.by.currencyId")
            .map(s -> InternalTopic.builder().name(s).partitions(Map.of()).build())
            .toList());

    // One topic carrying a config entry and one with ten partitions, to cover
    // the config_* and numeric-range query paths.
    topics.addAll(List.of(
        InternalTopic.builder().name("configurable").partitions(Map.of()).topicConfigs(
            List.of(InternalTopicConfig.builder().name("retention").value("compact").build())
        ).build(),
        InternalTopic.builder().name("multiple_parts").partitionCount(10).partitions(
            IntStream.range(0, 10)
                .mapToObj(i -> InternalPartition.builder().partition(i).build())
                .collect(Collectors.toMap(InternalPartition::getPartition, Function.identity()))
        ).build()
    ));

    int testTopicsCount = (int) topics.stream().filter(s -> s.getName().contains("topic")).count();

    // query -> expected hit count
    Map<String, Integer> examples = Map.ofEntries(
        Map.entry("topic", testTopicsCount),
        Map.entry("8", 1),
        Map.entry("9", 0),
        Map.entry("dog red", 1),
        Map.entry("topic-1", 1),
        Map.entry("payment dlq", 1),
        Map.entry("stats dlq", 0),
        Map.entry("stat", 3),
        Map.entry("changes", 1),
        Map.entry("commands", 1),
        Map.entry("id", 2),
        Map.entry("config_retention:compact", 1),
        Map.entry("partitions:10", 1),
        Map.entry("partitions:{1 TO *]", 1),
        Map.entry("partitions:{* TO 9]", topics.size() - 1)
    );

    SoftAssertions softly = new SoftAssertions();
    try (LuceneTopicsIndex index = new LuceneTopicsIndex(topics)) {
      for (Map.Entry<String, Integer> entry : examples.entrySet()) {
        List<InternalTopic> resultAll = index.find(entry.getKey(), null, topics.size());
        softly.assertThat(resultAll.size())
            .withFailMessage("Expected %d results for '%s', but got %s", entry.getValue(), entry.getKey(), resultAll)
            .isEqualTo(entry.getValue());
      }
    }
    softly.assertAll();
  }
}
com.github.victools.jsonschema.generator.SchemaGeneratorConfigBuilder; import com.github.victools.jsonschema.generator.SchemaVersion; +import io.kafbat.ui.config.ClustersProperties; import io.kafbat.ui.controller.TopicsController; import io.kafbat.ui.mapper.ClusterMapper; import io.kafbat.ui.model.SortOrderDTO; @@ -35,7 +36,8 @@ private static SchemaGenerator schemaGenerator() { @Test void testConvertController() { TopicsController topicsController = new TopicsController( - mock(TopicsService.class), mock(TopicAnalysisService.class), mock(ClusterMapper.class) + mock(TopicsService.class), mock(TopicAnalysisService.class), mock(ClusterMapper.class), + mock(ClustersProperties.class) ); List specifications = MCP_SPECIFICATION_GENERATOR.convertTool(topicsController); diff --git a/e2e-tests/src/main/java/io/kafbat/ui/settings/drivers/WebDriver.java b/e2e-tests/src/main/java/io/kafbat/ui/settings/drivers/WebDriver.java index c884e259e..50efa30fa 100644 --- a/e2e-tests/src/main/java/io/kafbat/ui/settings/drivers/WebDriver.java +++ b/e2e-tests/src/main/java/io/kafbat/ui/settings/drivers/WebDriver.java @@ -11,8 +11,10 @@ import io.kafbat.ui.settings.BaseSource; import io.qameta.allure.Step; import io.qameta.allure.selenide.AllureSelenide; +import io.qameta.allure.selenide.LogType; import java.util.HashMap; import java.util.Map; +import java.util.logging.Level; import lombok.extern.slf4j.Slf4j; import org.openqa.selenium.chrome.ChromeOptions; @@ -97,6 +99,8 @@ public static void browserQuit() { public static void selenideLoggerSetup() { SelenideLogger.addListener("AllureSelenide", new AllureSelenide() .savePageSource(true) - .screenshots(true)); + .screenshots(true) + .enableLogs(LogType.BROWSER, Level.ALL) + ); } } diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index d938ea0b6..15ec71b23 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -42,6 +42,7 @@ testng = '7.10.0' bonigarcia-webdrivermanager = '6.1.1' aspectj = '1.9.21' 
prometheus = '1.3.6' +lucene = '10.2.2' [plugins] spring-boot = { id = 'org.springframework.boot', version.ref = 'spring-boot' } @@ -149,3 +150,7 @@ prometheus-metrics-textformats = { module = 'io.prometheus:prometheus-metrics-ex prometheus-metrics-exporter-pushgateway = { module = 'io.prometheus:prometheus-metrics-exporter-pushgateway', version.ref = 'prometheus'} snappy = {module = 'org.xerial.snappy:snappy-java', version = '1.1.10.7'} + +lucene = {module = 'org.apache.lucene:lucene-core', version.ref = 'lucene'} +lucene-queryparser = {module = 'org.apache.lucene:lucene-queryparser', version.ref = 'lucene'} +lucene-analysis-common = {module = 'org.apache.lucene:lucene-analysis-common', version.ref = 'lucene'}