Skip to content

Commit

Permalink
Conflict Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
gimral committed Feb 23, 2024
1 parent 7667df1 commit 545f0e3
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.provectus.kafka.ui.model.TopicCreationDTO;
import com.provectus.kafka.ui.model.TopicDTO;
import com.provectus.kafka.ui.model.TopicDetailsDTO;
import com.provectus.kafka.ui.model.TopicProducerStateDTO;
import com.provectus.kafka.ui.model.TopicUpdateDTO;
import com.provectus.kafka.ui.model.TopicsResponseDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
Expand Down Expand Up @@ -327,6 +328,34 @@ public Mono<ResponseEntity<TopicAnalysisDTO>> getTopicAnalysis(String clusterNam
.doOnEach(sig -> audit(context, sig));
}

@Override
public Mono<ResponseEntity<Flux<TopicProducerStateDTO>>> getActiveProducerStates(String clusterName,
String topicName,
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(VIEW)
.operationName("getActiveProducerStates")
.build();

Comparator<TopicProducerStateDTO> ordering =
Comparator.comparingInt(TopicProducerStateDTO::getPartition)
.thenComparing(Comparator.comparing(TopicProducerStateDTO::getProducerId).reversed());

Flux<TopicProducerStateDTO> states = topicsService.getActiveProducersState(getCluster(clusterName), topicName)
.flatMapMany(statesMap ->
Flux.fromStream(
statesMap.entrySet().stream()
.flatMap(e -> e.getValue().stream().map(p -> clusterMapper.map(e.getKey().partition(), p)))
.sorted(ordering)));

return validateAccess(context)
.thenReturn(states)
.map(ResponseEntity::ok)
.doOnEach(sig -> audit(context, sig));
}

private Comparator<InternalTopic> getComparatorForTopic(
TopicColumnsToSortDTO orderBy) {
var defaultComparator = Comparator.comparing(InternalTopic::getName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@
import com.provectus.kafka.ui.model.TopicConfigDTO;
import com.provectus.kafka.ui.model.TopicDTO;
import com.provectus.kafka.ui.model.TopicDetailsDTO;
import com.provectus.kafka.ui.model.TopicProducerStateDTO;
import com.provectus.kafka.ui.service.metrics.RawMetric;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ProducerState;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
Expand Down Expand Up @@ -117,6 +119,17 @@ default BrokerDiskUsageDTO map(Integer id, InternalBrokerDiskUsage internalBroke
return brokerDiskUsage;
}

default TopicProducerStateDTO map(int partition, ProducerState state) {
return new TopicProducerStateDTO()
.partition(partition)
.producerId(state.producerId())
.producerEpoch(state.producerEpoch())
.lastSequence(state.lastSequence())
.lastTimestampMs(state.lastTimestamp())
.coordinatorEpoch(state.coordinatorEpoch().stream().boxed().findAny().orElse(null))
.currentTransactionStartOffset(state.currentTransactionStartOffset().stream().boxed().findAny().orElse(null));
}

static KafkaAclDTO.OperationEnum mapAclOperation(AclOperation operation) {
return switch (operation) {
case ALL -> KafkaAclDTO.OperationEnum.ALL;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.AccessLevel;
Expand All @@ -55,6 +56,7 @@
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.ProducerState;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
Expand Down Expand Up @@ -658,6 +660,21 @@ public Mono<Void> alterReplicaLogDirs(Map<TopicPartitionReplica, String> replica
return toMono(client.alterReplicaLogDirs(replicaAssignment).all());
}

// returns tp -> list of active producer's states (if any)
public Mono<Map<TopicPartition, List<ProducerState>>> getActiveProducersState(String topic) {
return describeTopic(topic)
.map(td -> client.describeProducers(
IntStream.range(0, td.partitions().size())
.mapToObj(i -> new TopicPartition(topic, i))
.toList()
).all()
)
.flatMap(ReactiveAdminClient::toMono)
.map(map -> map.entrySet().stream()
.filter(e -> !e.getValue().activeProducers().isEmpty()) // skipping partitions without producers
.collect(toMap(Map.Entry::getKey, e -> e.getValue().activeProducers())));
}

private Mono<Void> incrementalAlterConfig(String topicName,
List<ConfigEntry> currentConfigs,
Map<String, String> newConfigs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.ProducerState;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
Expand Down Expand Up @@ -459,6 +460,11 @@ public Mono<List<InternalTopic>> getTopicsForPagination(KafkaCluster cluster) {
);
}

public Mono<Map<TopicPartition, List<ProducerState>>> getActiveProducersState(KafkaCluster cluster, String topic) {
return adminClientService.get(cluster)
.flatMap(ac -> ac.getActiveProducersState(topic));
}

private Mono<List<String>> filterExisting(KafkaCluster cluster, Collection<String> topics) {
return adminClientService.get(cluster)
.flatMap(ac -> ac.listTopics(true))
Expand Down

0 comments on commit 545f0e3

Please sign in to comment.