From fa9547b95a4f40ea28be8868f0ad169c53ac7208 Mon Sep 17 00:00:00 2001 From: Ilya Kuramshin Date: Fri, 11 Aug 2023 16:41:07 +0400 Subject: [PATCH] BE: Controllers structure minor refactr (#4110) Audit & access controll services moved to AbstractController --- .../ui/controller/AbstractController.java | 27 +++++++++- .../kafka/ui/controller/AclsController.java | 36 ++++++------- .../ApplicationConfigController.java | 22 ++++---- .../ui/controller/BrokersController.java | 29 +++++----- .../ui/controller/ClustersController.java | 16 +++--- .../controller/ConsumerGroupsController.java | 24 ++++----- .../ui/controller/KafkaConnectController.java | 46 ++++++++-------- .../kafka/ui/controller/KsqlController.java | 18 +++---- .../ui/controller/MessagesController.java | 18 +++---- .../ui/controller/SchemasController.java | 46 ++++++++-------- .../kafka/ui/controller/TopicsController.java | 54 +++++++++---------- .../service/SchemaRegistryPaginationTest.java | 5 +- .../service/TopicsServicePaginationTest.java | 6 ++- 13 files changed, 167 insertions(+), 180 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AbstractController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AbstractController.java index fd323d55a14..e4dbb3cfcf7 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AbstractController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AbstractController.java @@ -2,12 +2,19 @@ import com.provectus.kafka.ui.exception.ClusterNotFoundException; import com.provectus.kafka.ui.model.KafkaCluster; +import com.provectus.kafka.ui.model.rbac.AccessContext; import com.provectus.kafka.ui.service.ClustersStorage; +import com.provectus.kafka.ui.service.audit.AuditService; +import com.provectus.kafka.ui.service.rbac.AccessControlService; import org.springframework.beans.factory.annotation.Autowired; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Signal; public abstract class AbstractController { - private ClustersStorage clustersStorage; + protected ClustersStorage clustersStorage; + protected AccessControlService accessControlService; + protected AuditService auditService; protected KafkaCluster getCluster(String name) { return clustersStorage.getClusterByName(name) @@ -15,8 +22,26 @@ protected KafkaCluster getCluster(String name) { String.format("Cluster with name '%s' not found", name))); } + protected Mono validateAccess(AccessContext context) { + return accessControlService.validateAccess(context); + } + + protected void audit(AccessContext acxt, Signal sig) { + auditService.audit(acxt, sig); + } + @Autowired public void setClustersStorage(ClustersStorage clustersStorage) { this.clustersStorage = clustersStorage; } + + @Autowired + public void setAccessControlService(AccessControlService accessControlService) { + this.accessControlService = accessControlService; + } + + @Autowired + public void setAuditService(AuditService auditService) { + this.auditService = auditService; + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java index 71700e3f7b7..2ba0add5be0 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java @@ -11,8 +11,6 @@ import com.provectus.kafka.ui.model.rbac.AccessContext; import com.provectus.kafka.ui.model.rbac.permission.AclAction; import com.provectus.kafka.ui.service.acl.AclsService; -import com.provectus.kafka.ui.service.audit.AuditService; -import com.provectus.kafka.ui.service.rbac.AccessControlService; import java.util.Optional; import lombok.RequiredArgsConstructor; import org.apache.kafka.common.resource.PatternType; @@ -29,8 +27,6 @@ public class AclsController extends AbstractController implements AclsApi { private final AclsService aclsService; - private final AccessControlService accessControlService; - private final AuditService auditService; @Override public Mono> createAcl(String clusterName, Mono kafkaAclDto, @@ -41,11 +37,11 @@ public Mono> createAcl(String clusterName, Mono aclsService.createAcl(getCluster(clusterName), binding)) - .doOnEach(sig -> auditService.audit(context, sig)) + .doOnEach(sig -> audit(context, sig)) .thenReturn(ResponseEntity.ok().build()); } @@ -58,11 +54,11 @@ public Mono> deleteAcl(String clusterName, Mono aclsService.deleteAcl(getCluster(clusterName), binding)) - .doOnEach(sig -> auditService.audit(context, sig)) + .doOnEach(sig -> audit(context, sig)) .thenReturn(ResponseEntity.ok().build()); } @@ -88,12 +84,12 @@ public Mono>> listAcls(String clusterName, var filter = new ResourcePatternFilter(resourceType, resourceName, namePatternType); - return accessControlService.validateAccess(context).then( + return validateAccess(context).then( Mono.just( ResponseEntity.ok( aclsService.listAcls(getCluster(clusterName), filter) .map(ClusterMapper::toKafkaAclDto))) - ).doOnEach(sig -> auditService.audit(context, sig)); + ).doOnEach(sig -> audit(context, sig)); } @Override @@ -104,11 +100,11 @@ public Mono> getAclAsCsv(String clusterName, ServerWebExc .operationName("getAclAsCsv") .build(); - return accessControlService.validateAccess(context).then( + return validateAccess(context).then( aclsService.getAclAsCsvString(getCluster(clusterName)) .map(ResponseEntity::ok) .flatMap(Mono::just) - .doOnEach(sig -> auditService.audit(context, sig)) + .doOnEach(sig -> audit(context, sig)) ); } @@ -120,10 +116,10 @@ public Mono> syncAclsCsv(String clusterName, Mono c .operationName("syncAclsCsv") .build(); - return accessControlService.validateAccess(context) + return validateAccess(context) .then(csvMono) .flatMap(csv -> aclsService.syncAclWithAclCsv(getCluster(clusterName), csv)) - .doOnEach(sig -> auditService.audit(context, sig)) + .doOnEach(sig -> audit(context, sig)) .thenReturn(ResponseEntity.ok().build()); } @@ -137,10 +133,10 @@ public Mono> createConsumerAcl(String clusterName, .operationName("createConsumerAcl") .build(); - return accessControlService.validateAccess(context) + return validateAccess(context) .then(createConsumerAclDto) .flatMap(req -> aclsService.createConsumerAcl(getCluster(clusterName), req)) - .doOnEach(sig -> auditService.audit(context, sig)) + .doOnEach(sig -> audit(context, sig)) .thenReturn(ResponseEntity.ok().build()); } @@ -154,10 +150,10 @@ public Mono> createProducerAcl(String clusterName, .operationName("createProducerAcl") .build(); - return accessControlService.validateAccess(context) + return validateAccess(context) .then(createProducerAclDto) .flatMap(req -> aclsService.createProducerAcl(getCluster(clusterName), req)) - .doOnEach(sig -> auditService.audit(context, sig)) + .doOnEach(sig -> audit(context, sig)) .thenReturn(ResponseEntity.ok().build()); } @@ -171,10 +167,10 @@ public Mono> createStreamAppAcl(String clusterName, .operationName("createStreamAppAcl") .build(); - return accessControlService.validateAccess(context) + return validateAccess(context) .then(createStreamAppAclDto) .flatMap(req -> aclsService.createStreamAppAcl(getCluster(clusterName), req)) - .doOnEach(sig -> auditService.audit(context, sig)) + .doOnEach(sig -> audit(context, sig)) .thenReturn(ResponseEntity.ok().build()); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ApplicationConfigController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ApplicationConfigController.java index 5f03c9ab5c6..480d62b1784 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ApplicationConfigController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ApplicationConfigController.java @@ -15,8 +15,6 @@ import com.provectus.kafka.ui.model.rbac.AccessContext; import com.provectus.kafka.ui.service.ApplicationInfoService; import com.provectus.kafka.ui.service.KafkaClusterFactory; -import com.provectus.kafka.ui.service.audit.AuditService; -import com.provectus.kafka.ui.service.rbac.AccessControlService; import com.provectus.kafka.ui.util.ApplicationRestarter; import com.provectus.kafka.ui.util.DynamicConfigOperations; import com.provectus.kafka.ui.util.DynamicConfigOperations.PropertiesStructure; @@ -39,7 +37,7 @@ @Slf4j @RestController @RequiredArgsConstructor -public class ApplicationConfigController implements ApplicationConfigApi { +public class ApplicationConfigController extends AbstractController implements ApplicationConfigApi { private static final PropertiesMapper MAPPER = Mappers.getMapper(PropertiesMapper.class); @@ -51,12 +49,10 @@ interface PropertiesMapper { ApplicationConfigPropertiesDTO toDto(PropertiesStructure propertiesStructure); } - private final AccessControlService accessControlService; private final DynamicConfigOperations dynamicConfigOperations; private final ApplicationRestarter restarter; private final KafkaClusterFactory kafkaClusterFactory; private final ApplicationInfoService applicationInfoService; - private final AuditService auditService; @Override public Mono> getApplicationInfo(ServerWebExchange exchange) { @@ -69,12 +65,12 @@ public Mono> getCurrentConfig(ServerWebExch .applicationConfigActions(VIEW) .operationName("getCurrentConfig") .build(); - return accessControlService.validateAccess(context) + return validateAccess(context) .then(Mono.fromSupplier(() -> ResponseEntity.ok( new ApplicationConfigDTO() .properties(MAPPER.toDto(dynamicConfigOperations.getCurrentProperties())) ))) - .doOnEach(sig -> auditService.audit(context, sig)); + .doOnEach(sig -> audit(context, sig)); } @Override @@ -84,14 +80,14 @@ public Mono> restartWithConfig(Mono rest .applicationConfigActions(EDIT) .operationName("restartWithConfig") .build(); - return accessControlService.validateAccess(context) + return validateAccess(context) .then(restartRequestDto) .>map(dto -> { dynamicConfigOperations.persist(MAPPER.fromDto(dto.getConfig().getProperties())); restarter.requestRestart(); return ResponseEntity.ok().build(); }) - .doOnEach(sig -> auditService.audit(context, sig)); + .doOnEach(sig -> audit(context, sig)); } @Override @@ -101,13 +97,13 @@ public Mono> uploadConfigRelatedFile(Flux dynamicConfigOperations.uploadConfigRelatedFile((FilePart) file) .map(path -> new UploadedFileInfoDTO().location(path.toString())) .map(ResponseEntity::ok)) - .doOnEach(sig -> auditService.audit(context, sig)); + .doOnEach(sig -> audit(context, sig)); } @Override @@ -117,7 +113,7 @@ public Mono> validateConfig(Mono< .applicationConfigActions(EDIT) .operationName("validateConfig") .build(); - return accessControlService.validateAccess(context) + return validateAccess(context) .then(configDto) .flatMap(config -> { PropertiesStructure propertiesStructure = MAPPER.fromDto(config.getProperties()); @@ -126,7 +122,7 @@ public Mono> validateConfig(Mono< .map(validations -> new ApplicationConfigValidationDTO().clusters(validations)); }) .map(ResponseEntity::ok) - .doOnEach(sig -> auditService.audit(context, sig)); + .doOnEach(sig -> audit(context, sig)); } private Mono> validateClustersConfig( diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java index 81965179fcc..a3209ce7227 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java @@ -11,8 +11,6 @@ import com.provectus.kafka.ui.model.rbac.AccessContext; import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction; import com.provectus.kafka.ui.service.BrokerService; -import com.provectus.kafka.ui.service.audit.AuditService; -import com.provectus.kafka.ui.service.rbac.AccessControlService; import java.util.List; import java.util.Map; import javax.annotation.Nullable; @@ -31,9 +29,6 @@ public class BrokersController extends AbstractController implements BrokersApi private final BrokerService brokerService; private final ClusterMapper clusterMapper; - private final AuditService auditService; - private final AccessControlService accessControlService; - @Override public Mono>> getBrokers(String clusterName, ServerWebExchange exchange) { @@ -43,9 +38,9 @@ public Mono>> getBrokers(String clusterName, .build(); var job = brokerService.getBrokers(getCluster(clusterName)).map(clusterMapper::toBrokerDto); - return accessControlService.validateAccess(context) + return validateAccess(context) .thenReturn(ResponseEntity.ok(job)) - .doOnEach(sig -> auditService.audit(context, sig)); + .doOnEach(sig -> audit(context, sig)); } @Override @@ -57,14 +52,14 @@ public Mono> getBrokersMetrics(String clusterNa .operationParams(Map.of("id", id)) .build(); - return accessControlService.validateAccess(context) + return validateAccess(context) .then( brokerService.getBrokerMetrics(getCluster(clusterName), id) .map(clusterMapper::toBrokerMetrics) .map(ResponseEntity::ok) .onErrorReturn(ResponseEntity.notFound().build()) ) - .doOnEach(sig -> auditService.audit(context, sig)); + .doOnEach(sig -> audit(context, sig)); } @Override @@ -80,10 +75,10 @@ public Mono>> getAllBrokersLogdirs(String .operationParams(Map.of("brokerIds", brokerIds)) .build(); - return accessControlService.validateAccess(context) + return validateAccess(context) .thenReturn(ResponseEntity.ok( brokerService.getAllBrokersLogdirs(getCluster(clusterName), brokerIds))) - .doOnEach(sig -> auditService.audit(context, sig)); + .doOnEach(sig -> audit(context, sig)); } @Override @@ -97,11 +92,11 @@ public Mono>> getBrokerConfig(String cluste .operationParams(Map.of("brokerId", id)) .build(); - return accessControlService.validateAccess(context).thenReturn( + return validateAccess(context).thenReturn( ResponseEntity.ok( brokerService.getBrokerConfig(getCluster(clusterName), id) .map(clusterMapper::toBrokerConfig)) - ).doOnEach(sig -> auditService.audit(context, sig)); + ).doOnEach(sig -> audit(context, sig)); } @Override @@ -116,11 +111,11 @@ public Mono> updateBrokerTopicPartitionLogDir(String cluste .operationParams(Map.of("brokerId", id)) .build(); - return accessControlService.validateAccess(context).then( + return validateAccess(context).then( brokerLogdir .flatMap(bld -> brokerService.updateBrokerLogDir(getCluster(clusterName), id, bld)) .map(ResponseEntity::ok) - ).doOnEach(sig -> auditService.audit(context, sig)); + ).doOnEach(sig -> audit(context, sig)); } @Override @@ -136,11 +131,11 @@ public Mono> updateBrokerConfigByName(String clusterName, .operationParams(Map.of("brokerId", id)) .build(); - return accessControlService.validateAccess(context).then( + return validateAccess(context).then( brokerConfig .flatMap(bci -> brokerService.updateBrokerConfigByName( getCluster(clusterName), id, name, bci.getValue())) .map(ResponseEntity::ok) - ).doOnEach(sig -> auditService.audit(context, sig)); + ).doOnEach(sig -> audit(context, sig)); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClustersController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClustersController.java index 20c64ed01f4..7b12b30644f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClustersController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClustersController.java @@ -6,8 +6,6 @@ import com.provectus.kafka.ui.model.ClusterStatsDTO; import com.provectus.kafka.ui.model.rbac.AccessContext; import com.provectus.kafka.ui.service.ClusterService; -import com.provectus.kafka.ui.service.audit.AuditService; -import com.provectus.kafka.ui.service.rbac.AccessControlService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.http.ResponseEntity; @@ -21,8 +19,6 @@ @Slf4j public class ClustersController extends AbstractController implements ClustersApi { private final ClusterService clusterService; - private final AccessControlService accessControlService; - private final AuditService auditService; @Override public Mono>> getClusters(ServerWebExchange exchange) { @@ -40,13 +36,13 @@ public Mono> getClusterMetrics(String clusterN .operationName("getClusterMetrics") .build(); - return accessControlService.validateAccess(context) + return validateAccess(context) .then( clusterService.getClusterMetrics(getCluster(clusterName)) .map(ResponseEntity::ok) .onErrorReturn(ResponseEntity.notFound().build()) ) - .doOnEach(sig -> auditService.audit(context, sig)); + .doOnEach(sig -> audit(context, sig)); } @Override @@ -57,13 +53,13 @@ public Mono> getClusterStats(String clusterName, .operationName("getClusterStats") .build(); - return accessControlService.validateAccess(context) + return validateAccess(context) .then( clusterService.getClusterStats(getCluster(clusterName)) .map(ResponseEntity::ok) .onErrorReturn(ResponseEntity.notFound().build()) ) - .doOnEach(sig -> auditService.audit(context, sig)); + .doOnEach(sig -> audit(context, sig)); } @Override @@ -75,8 +71,8 @@ public Mono> updateClusterInfo(String clusterName, .operationName("updateClusterInfo") .build(); - return accessControlService.validateAccess(context) + return validateAccess(context) .then(clusterService.updateCluster(getCluster(clusterName)).map(ResponseEntity::ok)) - .doOnEach(sig -> auditService.audit(context, sig)); + .doOnEach(sig -> audit(context, sig)); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java index 4f4574725f2..aac43855afe 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java @@ -19,8 +19,6 @@ import com.provectus.kafka.ui.model.rbac.permission.TopicAction; import com.provectus.kafka.ui.service.ConsumerGroupService; import com.provectus.kafka.ui.service.OffsetsResetService; -import com.provectus.kafka.ui.service.audit.AuditService; -import com.provectus.kafka.ui.service.rbac.AccessControlService; import java.util.Map; import java.util.Optional; import java.util.function.Supplier; @@ -42,8 +40,6 @@ public class ConsumerGroupsController extends AbstractController implements Cons private final ConsumerGroupService consumerGroupService; private final OffsetsResetService offsetsResetService; - private final AccessControlService accessControlService; - private final AuditService auditService; @Value("${consumer.groups.page.size:25}") private int defaultConsumerGroupsPageSize; @@ -59,9 +55,9 @@ public Mono> deleteConsumerGroup(String clusterName, .operationName("deleteConsumerGroup") .build(); - return accessControlService.validateAccess(context) + return validateAccess(context) .then(consumerGroupService.deleteConsumerGroupById(getCluster(clusterName), id)) - .doOnEach(sig -> auditService.audit(context, sig)) + .doOnEach(sig -> audit(context, sig)) .thenReturn(ResponseEntity.ok().build()); } @@ -76,11 +72,11 @@ public Mono> getConsumerGroup(String clu .operationName("getConsumerGroup") .build(); - return accessControlService.validateAccess(context) + return validateAccess(context) .then(consumerGroupService.getConsumerGroupDetail(getCluster(clusterName), consumerGroupId) .map(ConsumerGroupMapper::toDetailsDto) .map(ResponseEntity::ok)) - .doOnEach(sig -> auditService.audit(context, sig)); + .doOnEach(sig -> audit(context, sig)); } @Override @@ -104,9 +100,9 @@ public Mono>> getTopicConsumerGroups(Strin .map(ResponseEntity::ok) .switchIfEmpty(Mono.just(ResponseEntity.notFound().build())); - return accessControlService.validateAccess(context) + return validateAccess(context) .then(job) - .doOnEach(sig -> auditService.audit(context, sig)); + .doOnEach(sig -> audit(context, sig)); } @Override @@ -125,7 +121,7 @@ public Mono> getConsumerGroupsPage .operationName("getConsumerGroupsPage") .build(); - return accessControlService.validateAccess(context).then( + return validateAccess(context).then( consumerGroupService.getConsumerGroupsPage( getCluster(clusterName), Optional.ofNullable(page).filter(i -> i > 0).orElse(1), @@ -136,7 +132,7 @@ public Mono> getConsumerGroupsPage ) .map(this::convertPage) .map(ResponseEntity::ok) - ).doOnEach(sig -> auditService.audit(context, sig)); + ).doOnEach(sig -> audit(context, sig)); } @Override @@ -191,9 +187,9 @@ public Mono> resetConsumerGroupOffsets(String clusterName, } }; - return accessControlService.validateAccess(context) + return validateAccess(context) .then(mono.get()) - .doOnEach(sig -> auditService.audit(context, sig)); + .doOnEach(sig -> audit(context, sig)); }).thenReturn(ResponseEntity.ok().build()); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KafkaConnectController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KafkaConnectController.java index 8f08472fa1d..9d2376c22f0 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KafkaConnectController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KafkaConnectController.java @@ -18,8 +18,6 @@ import com.provectus.kafka.ui.model.rbac.AccessContext; import com.provectus.kafka.ui.model.rbac.permission.ConnectAction; import com.provectus.kafka.ui.service.KafkaConnectService; -import com.provectus.kafka.ui.service.audit.AuditService; -import com.provectus.kafka.ui.service.rbac.AccessControlService; import java.util.Comparator; import java.util.Map; import java.util.Set; @@ -40,8 +38,6 @@ public class KafkaConnectController extends AbstractController implements KafkaC = Set.of(RESTART, RESTART_FAILED_TASKS, RESTART_ALL_TASKS); private final KafkaConnectService kafkaConnectService; - private final AccessControlService accessControlService; - private final AuditService auditService; @Override public Mono>> getConnects(String clusterName, @@ -64,9 +60,9 @@ public Mono>> getConnectors(String clusterName, Stri .operationName("getConnectors") .build(); - return accessControlService.validateAccess(context) + return validateAccess(context) .thenReturn(ResponseEntity.ok(kafkaConnectService.getConnectorNames(getCluster(clusterName), connectName))) - .doOnEach(sig -> auditService.audit(context, sig)); + .doOnEach(sig -> audit(context, sig)); } @Override @@ -81,10 +77,10 @@ public Mono> createConnector(String clusterName, St .operationName("createConnector") .build(); - return accessControlService.validateAccess(context).then( + return validateAccess(context).then( kafkaConnectService.createConnector(getCluster(clusterName), connectName, connector) .map(ResponseEntity::ok) - ).doOnEach(sig -> auditService.audit(context, sig)); + ).doOnEach(sig -> audit(context, sig)); } @Override @@ -100,10 +96,10 @@ public Mono> getConnector(String clusterName, Strin .operationName("getConnector") .build(); - return accessControlService.validateAccess(context).then( + return validateAccess(context).then( kafkaConnectService.getConnector(getCluster(clusterName), connectName, connectorName) .map(ResponseEntity::ok) - ).doOnEach(sig -> auditService.audit(context, sig)); + ).doOnEach(sig -> audit(context, sig)); } @Override @@ -119,10 +115,10 @@ public Mono> deleteConnector(String clusterName, String con .operationParams(Map.of("connectorName", connectName)) .build(); - return accessControlService.validateAccess(context).then( + return validateAccess(context).then( kafkaConnectService.deleteConnector(getCluster(clusterName), connectName, connectorName) .map(ResponseEntity::ok) - ).doOnEach(sig -> auditService.audit(context, sig)); + ).doOnEach(sig -> audit(context, sig)); } @@ -150,7 +146,7 @@ public Mono>> getAllConnectors( .sort(comparator); return Mono.just(ResponseEntity.ok(job)) - .doOnEach(sig -> auditService.audit(context, sig)); + .doOnEach(sig -> audit(context, sig)); } @Override @@ -166,11 +162,11 @@ public Mono>> getConnectorConfig(String clust .operationName("getConnectorConfig") .build(); - return accessControlService.validateAccess(context).then( + return validateAccess(context).then( kafkaConnectService .getConnectorConfig(getCluster(clusterName), connectName, connectorName) .map(ResponseEntity::ok) - ).doOnEach(sig -> auditService.audit(context, sig)); + ).doOnEach(sig -> audit(context, sig)); } @Override @@ -187,11 +183,11 @@ public Mono> setConnectorConfig(String clusterName, .operationParams(Map.of("connectorName", connectorName)) .build(); - return accessControlService.validateAccess(context).then( + return validateAccess(context).then( kafkaConnectService .setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody) .map(ResponseEntity::ok)) - .doOnEach(sig -> auditService.audit(context, sig)); + .doOnEach(sig -> audit(context, sig)); } @Override @@ -214,11 +210,11 @@ public Mono> updateConnectorState(String clusterName, Strin .operationParams(Map.of("connectorName", connectorName)) .build(); - return accessControlService.validateAccess(context).then( + return validateAccess(context).then( kafkaConnectService .updateConnectorState(getCluster(clusterName), connectName, connectorName, action) .map(ResponseEntity::ok) - ).doOnEach(sig -> auditService.audit(context, sig)); + ).doOnEach(sig -> audit(context, sig)); } @Override @@ -234,11 +230,11 @@ public Mono>> getConnectorTasks(String clusterName, .operationParams(Map.of("connectorName", connectorName)) .build(); - return accessControlService.validateAccess(context).thenReturn( + return validateAccess(context).thenReturn( ResponseEntity .ok(kafkaConnectService .getConnectorTasks(getCluster(clusterName), connectName, connectorName)) - ).doOnEach(sig -> auditService.audit(context, sig)); + ).doOnEach(sig -> audit(context, sig)); } @Override @@ -254,11 +250,11 @@ public Mono> restartConnectorTask(String clusterName, Strin .operationParams(Map.of("connectorName", connectorName)) .build(); - return accessControlService.validateAccess(context).then( + return validateAccess(context).then( kafkaConnectService .restartConnectorTask(getCluster(clusterName), connectName, connectorName, taskId) .map(ResponseEntity::ok) - ).doOnEach(sig -> auditService.audit(context, sig)); + ).doOnEach(sig -> audit(context, sig)); } @Override @@ -272,11 +268,11 @@ public Mono>> getConnectorPlugins( .operationName("getConnectorPlugins") .build(); - return accessControlService.validateAccess(context).then( + return validateAccess(context).then( Mono.just( ResponseEntity.ok( kafkaConnectService.getConnectorPlugins(getCluster(clusterName), connectName))) - ).doOnEach(sig -> auditService.audit(context, sig)); + ).doOnEach(sig -> audit(context, sig)); } @Override diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java index 312727fe792..c15f36488ec 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java @@ -9,9 +9,7 @@ import com.provectus.kafka.ui.model.KsqlTableResponseDTO; import com.provectus.kafka.ui.model.rbac.AccessContext; import com.provectus.kafka.ui.model.rbac.permission.KsqlAction; -import com.provectus.kafka.ui.service.audit.AuditService; import com.provectus.kafka.ui.service.ksql.KsqlServiceV2; -import com.provectus.kafka.ui.service.rbac.AccessControlService; import java.util.List; import java.util.Map; import java.util.Optional; @@ -29,8 +27,6 @@ public class KsqlController extends AbstractController implements KsqlApi { private final KsqlServiceV2 ksqlServiceV2; - private final AccessControlService accessControlService; - private final AuditService auditService; @Override public Mono> executeKsql(String clusterName, @@ -44,13 +40,13 @@ public Mono> executeKsql(String cluster .operationName("executeKsql") .operationParams(command) .build(); - return accessControlService.validateAccess(context).thenReturn( + return validateAccess(context).thenReturn( new KsqlCommandV2ResponseDTO().pipeId( ksqlServiceV2.registerCommand( getCluster(clusterName), command.getKsql(), Optional.ofNullable(command.getStreamsProperties()).orElse(Map.of())))) - .doOnEach(sig -> auditService.audit(context, sig)); + .doOnEach(sig -> audit(context, sig)); } ) .map(ResponseEntity::ok); @@ -66,7 +62,7 @@ public Mono>> openKsqlResponsePipe(String c .operationName("openKsqlResponsePipe") .build(); - return accessControlService.validateAccess(context).thenReturn( + return validateAccess(context).thenReturn( ResponseEntity.ok(ksqlServiceV2.execute(pipeId) .map(table -> new KsqlResponseDTO() .table( @@ -86,9 +82,9 @@ public Mono>> listStreams(String c .operationName("listStreams") .build(); - return accessControlService.validateAccess(context) + return validateAccess(context) .thenReturn(ResponseEntity.ok(ksqlServiceV2.listStreams(getCluster(clusterName)))) - .doOnEach(sig -> auditService.audit(context, sig)); + .doOnEach(sig -> audit(context, sig)); } @Override @@ -100,8 +96,8 @@ public Mono>> listTables(String clu .operationName("listTables") .build(); - return accessControlService.validateAccess(context) + return validateAccess(context) .thenReturn(ResponseEntity.ok(ksqlServiceV2.listTables(getCluster(clusterName)))) - .doOnEach(sig -> auditService.audit(context, sig)); + .doOnEach(sig -> audit(context, sig)); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java index 00eae8a7c71..32d341e6134 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java @@ -24,8 +24,6 @@ import com.provectus.kafka.ui.model.rbac.permission.TopicAction; import com.provectus.kafka.ui.service.DeserializationService; import com.provectus.kafka.ui.service.MessagesService; -import com.provectus.kafka.ui.service.audit.AuditService; -import com.provectus.kafka.ui.service.rbac.AccessControlService; import java.util.List; import java.util.Map; import java.util.Optional; @@ -49,8 +47,6 @@ public class MessagesController extends AbstractController implements MessagesAp private final MessagesService messagesService; private final DeserializationService deserializationService; - private final AccessControlService accessControlService; - private final AuditService auditService; @Override public Mono> deleteTopicMessages( @@ -63,13 +59,13 @@ public Mono> deleteTopicMessages( .topicActions(MESSAGES_DELETE) .build(); - return accessControlService.validateAccess(context).>then( + return validateAccess(context).>then( messagesService.deleteTopicMessages( getCluster(clusterName), topicName, Optional.ofNullable(partitions).orElse(List.of()) ).thenReturn(ResponseEntity.ok().build()) - ).doOnEach(sig -> auditService.audit(context, sig)); + ).doOnEach(sig -> audit(context, sig)); } @Override @@ -120,9 +116,9 @@ public Mono>> getTopicMessages(String ); var context = contextBuilder.build(); - return accessControlService.validateAccess(context) + return validateAccess(context) .then(job) - .doOnEach(sig -> auditService.audit(context, sig)); + .doOnEach(sig -> audit(context, sig)); } @Override @@ -137,11 +133,11 @@ public Mono> sendTopicMessages( .operationName("sendTopicMessages") .build(); - return accessControlService.validateAccess(context).then( + return validateAccess(context).then( createTopicMessage.flatMap(msg -> messagesService.sendMessage(getCluster(clusterName), topicName, msg).then() ).map(ResponseEntity::ok) - ).doOnEach(sig -> auditService.audit(context, sig)); + ).doOnEach(sig -> audit(context, sig)); } /** @@ -192,7 +188,7 @@ public Mono> getSerdes(String clusterNam ? deserializationService.getSerdesForSerialize(getCluster(clusterName), topicName, VALUE) : deserializationService.getSerdesForDeserialize(getCluster(clusterName), topicName, VALUE)); - return accessControlService.validateAccess(context).then( + return validateAccess(context).then( Mono.just(dto) .subscribeOn(Schedulers.boundedElastic()) .map(ResponseEntity::ok) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/SchemasController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/SchemasController.java index 1c6a07abc20..a6cee97fd3a 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/SchemasController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/SchemasController.java @@ -13,8 +13,6 @@ import com.provectus.kafka.ui.model.rbac.AccessContext; import com.provectus.kafka.ui.model.rbac.permission.SchemaAction; import com.provectus.kafka.ui.service.SchemaRegistryService; -import com.provectus.kafka.ui.service.audit.AuditService; -import com.provectus.kafka.ui.service.rbac.AccessControlService; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -38,8 +36,6 @@ public class SchemasController extends AbstractController implements SchemasApi private final KafkaSrMapper kafkaSrMapper = new KafkaSrMapperImpl(); private final SchemaRegistryService schemaRegistryService; - private final AccessControlService accessControlService; - private final AuditService auditService; @Override protected KafkaCluster getCluster(String clusterName) { @@ -61,7 +57,7 @@ public Mono> checkSchemaCompatibil .operationName("checkSchemaCompatibility") .build(); - return accessControlService.validateAccess(context).then( + return validateAccess(context).then( newSchemaSubjectMono.flatMap(subjectDTO -> schemaRegistryService.checksSchemaCompatibility( getCluster(clusterName), @@ -70,7 +66,7 @@ public Mono> checkSchemaCompatibil )) .map(kafkaSrMapper::toDto) .map(ResponseEntity::ok) - ).doOnEach(sig -> auditService.audit(context, sig)); + ).doOnEach(sig -> audit(context, sig)); } @Override @@ -83,7 +79,7 @@ public Mono> createNewSchema( .operationName("createNewSchema") .build(); - return accessControlService.validateAccess(context).then( + return validateAccess(context).then( newSchemaSubjectMono.flatMap(newSubject -> schemaRegistryService.registerNewSchema( getCluster(clusterName), @@ -92,7 +88,7 @@ public Mono> createNewSchema( ) ).map(kafkaSrMapper::toDto) .map(ResponseEntity::ok) - ).doOnEach(sig -> auditService.audit(context, sig)); + ).doOnEach(sig -> audit(context, sig)); } @Override @@ -105,9 +101,9 @@ public Mono> deleteLatestSchema( .operationName("deleteLatestSchema") .build(); - return accessControlService.validateAccess(context).then( + return validateAccess(context).then( schemaRegistryService.deleteLatestSchemaSubject(getCluster(clusterName), subject) - .doOnEach(sig -> auditService.audit(context, sig)) + .doOnEach(sig -> audit(context, sig)) .thenReturn(ResponseEntity.ok().build()) ); } @@ -122,9 +118,9 @@ public Mono> deleteSchema( .operationName("deleteSchema") .build(); - return accessControlService.validateAccess(context).then( + return validateAccess(context).then( schemaRegistryService.deleteSchemaSubjectEntirely(getCluster(clusterName), subject) - .doOnEach(sig -> auditService.audit(context, sig)) + .doOnEach(sig -> audit(context, sig)) .thenReturn(ResponseEntity.ok().build()) ); } @@ -139,9 +135,9 @@ public Mono> deleteSchemaByVersion( .operationName("deleteSchemaByVersion") .build(); - return accessControlService.validateAccess(context).then( + return validateAccess(context).then( schemaRegistryService.deleteSchemaSubjectByVersion(getCluster(clusterName), subjectName, version) - .doOnEach(sig -> auditService.audit(context, sig)) + .doOnEach(sig -> audit(context, sig)) .thenReturn(ResponseEntity.ok().build()) ); } @@ -160,9 +156,9 @@ public Mono>> getAllVersionsBySubject( schemaRegistryService.getAllVersionsBySubject(getCluster(clusterName), subjectName) .map(kafkaSrMapper::toDto); - return accessControlService.validateAccess(context) + return validateAccess(context) .thenReturn(ResponseEntity.ok(schemas)) - .doOnEach(sig -> auditService.audit(context, sig)); + .doOnEach(sig -> audit(context, sig)); } @Override @@ -185,11 +181,11 @@ public Mono> getLatestSchema(String clusterName .operationName("getLatestSchema") .build(); - return accessControlService.validateAccess(context).then( + return validateAccess(context).then( schemaRegistryService.getLatestSchemaVersionBySubject(getCluster(clusterName), subject) .map(kafkaSrMapper::toDto) .map(ResponseEntity::ok) - ).doOnEach(sig -> auditService.audit(context, sig)); + ).doOnEach(sig -> audit(context, sig)); } @Override @@ -203,12 +199,12 @@ public Mono> getSchemaByVersion( .operationParams(Map.of("subject", subject, "version", version)) .build(); - return accessControlService.validateAccess(context).then( + return validateAccess(context).then( schemaRegistryService.getSchemaSubjectByVersion( getCluster(clusterName), subject, version) .map(kafkaSrMapper::toDto) .map(ResponseEntity::ok) - ).doOnEach(sig -> auditService.audit(context, sig)); + ).doOnEach(sig -> audit(context, sig)); } @Override @@ -244,7 +240,7 @@ public Mono> getSchemas(String cluster .map(subjs -> subjs.stream().map(kafkaSrMapper::toDto).toList()) .map(subjs -> new SchemaSubjectsResponseDTO().pageCount(totalPages).schemas(subjs)); }).map(ResponseEntity::ok) - .doOnEach(sig -> auditService.audit(context, sig)); + .doOnEach(sig -> audit(context, sig)); } @Override @@ -257,14 +253,14 @@ public Mono> updateGlobalSchemaCompatibilityLevel( .operationName("updateGlobalSchemaCompatibilityLevel") .build(); - return accessControlService.validateAccess(context).then( + return validateAccess(context).then( compatibilityLevelMono .flatMap(compatibilityLevelDTO -> schemaRegistryService.updateGlobalSchemaCompatibility( getCluster(clusterName), kafkaSrMapper.fromDto(compatibilityLevelDTO.getCompatibility()) )) - .doOnEach(sig -> auditService.audit(context, sig)) + .doOnEach(sig -> audit(context, sig)) .thenReturn(ResponseEntity.ok().build()) ); } @@ -280,7 +276,7 @@ public Mono> updateSchemaCompatibilityLevel( .operationParams(Map.of("subject", subject)) .build(); - return accessControlService.validateAccess(context).then( + return validateAccess(context).then( compatibilityLevelMono .flatMap(compatibilityLevelDTO -> schemaRegistryService.updateSchemaCompatibility( @@ -288,7 +284,7 @@ public Mono> updateSchemaCompatibilityLevel( subject, kafkaSrMapper.fromDto(compatibilityLevelDTO.getCompatibility()) )) - .doOnEach(sig -> auditService.audit(context, sig)) + .doOnEach(sig -> audit(context, sig)) .thenReturn(ResponseEntity.ok().build()) ); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java index 5ec0fbc642e..d422a2cbc68 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java @@ -27,8 +27,6 @@ import com.provectus.kafka.ui.model.rbac.AccessContext; import com.provectus.kafka.ui.service.TopicsService; import com.provectus.kafka.ui.service.analyze.TopicAnalysisService; -import com.provectus.kafka.ui.service.audit.AuditService; -import com.provectus.kafka.ui.service.rbac.AccessControlService; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -53,8 +51,6 @@ public class TopicsController extends AbstractController implements TopicsApi { private final TopicsService topicsService; private final TopicAnalysisService topicAnalysisService; private final ClusterMapper clusterMapper; - private final AccessControlService accessControlService; - private final AuditService auditService; @Override public Mono> createTopic( @@ -67,12 +63,12 @@ public Mono> createTopic( .operationParams(topicCreation) .build(); - return accessControlService.validateAccess(context) + return validateAccess(context) .then(topicsService.createTopic(getCluster(clusterName), topicCreation)) .map(clusterMapper::toTopic) .map(s -> new ResponseEntity<>(s, HttpStatus.OK)) .switchIfEmpty(Mono.just(ResponseEntity.notFound().build())) - .doOnEach(sig -> auditService.audit(context, sig)); + .doOnEach(sig -> audit(context, sig)); }); } @@ -86,11 +82,11 @@ public Mono> recreateTopic(String clusterName, .operationName("recreateTopic") .build(); - return accessControlService.validateAccess(context).then( + return validateAccess(context).then( topicsService.recreateTopic(getCluster(clusterName), topicName) .map(clusterMapper::toTopic) .map(s -> new ResponseEntity<>(s, HttpStatus.CREATED)) - ).doOnEach(sig -> auditService.audit(context, sig)); + ).doOnEach(sig -> audit(context, sig)); } @Override @@ -105,11 +101,11 @@ public Mono> cloneTopic( .operationParams(Map.of("newTopicName", newTopicName)) .build(); - return accessControlService.validateAccess(context) + return validateAccess(context) .then(topicsService.cloneTopic(getCluster(clusterName), topicName, newTopicName) .map(clusterMapper::toTopic) .map(s -> new ResponseEntity<>(s, HttpStatus.CREATED)) - ).doOnEach(sig -> auditService.audit(context, sig)); + ).doOnEach(sig -> audit(context, sig)); } @Override @@ -123,11 +119,11 @@ public Mono> deleteTopic( .operationName("deleteTopic") .build(); - return accessControlService.validateAccess(context) + return validateAccess(context) .then( topicsService.deleteTopic(getCluster(clusterName), topicName) .thenReturn(ResponseEntity.ok().build()) - ).doOnEach(sig -> auditService.audit(context, sig)); + ).doOnEach(sig -> audit(context, sig)); } @@ -142,7 +138,7 @@ public Mono>> getTopicConfigs( .operationName("getTopicConfigs") .build(); - return accessControlService.validateAccess(context).then( + return validateAccess(context).then( topicsService.getTopicConfigs(getCluster(clusterName), topicName) .map(lst -> lst.stream() .map(InternalTopicConfig::from) @@ -150,7 +146,7 @@ public Mono>> getTopicConfigs( .collect(toList())) .map(Flux::fromIterable) .map(ResponseEntity::ok) - ).doOnEach(sig -> auditService.audit(context, sig)); + ).doOnEach(sig -> audit(context, sig)); } @Override @@ -164,11 +160,11 @@ public Mono> getTopicDetails( .operationName("getTopicDetails") .build(); - return accessControlService.validateAccess(context).then( + return validateAccess(context).then( topicsService.getTopicDetails(getCluster(clusterName), topicName) .map(clusterMapper::toTopicDetails) .map(ResponseEntity::ok) - ).doOnEach(sig -> auditService.audit(context, sig)); + ).doOnEach(sig -> audit(context, sig)); } @Override @@ -215,7 +211,7 @@ public Mono> getTopics(String clusterName, .pageCount(totalPages)); }) .map(ResponseEntity::ok) - .doOnEach(sig -> auditService.audit(context, sig)); + .doOnEach(sig -> audit(context, sig)); } @Override @@ -230,12 +226,12 @@ public Mono> updateTopic( .operationName("updateTopic") .build(); - return accessControlService.validateAccess(context).then( + return validateAccess(context).then( topicsService .updateTopic(getCluster(clusterName), topicName, topicUpdate) .map(clusterMapper::toTopic) .map(ResponseEntity::ok) - ).doOnEach(sig -> auditService.audit(context, sig)); + ).doOnEach(sig -> audit(context, sig)); } @Override @@ -250,11 +246,11 @@ public Mono> increaseTopicPartitio .topicActions(VIEW, EDIT) .build(); - return accessControlService.validateAccess(context).then( + return validateAccess(context).then( partitionsIncrease.flatMap(partitions -> topicsService.increaseTopicPartitions(getCluster(clusterName), topicName, partitions) ).map(ResponseEntity::ok) - ).doOnEach(sig -> auditService.audit(context, sig)); + ).doOnEach(sig -> audit(context, sig)); } @Override @@ -270,12 +266,12 @@ public Mono> changeReplicatio .operationName("changeReplicationFactor") .build(); - return accessControlService.validateAccess(context).then( + return validateAccess(context).then( replicationFactorChange .flatMap(rfc -> topicsService.changeReplicationFactor(getCluster(clusterName), topicName, rfc)) .map(ResponseEntity::ok) - ).doOnEach(sig -> auditService.audit(context, sig)); + ).doOnEach(sig -> audit(context, sig)); } @Override @@ -288,9 +284,9 @@ public Mono> analyzeTopic(String clusterName, String topicN .operationName("analyzeTopic") .build(); - return accessControlService.validateAccess(context).then( + return validateAccess(context).then( topicAnalysisService.analyze(getCluster(clusterName), topicName) - .doOnEach(sig -> auditService.audit(context, sig)) + .doOnEach(sig -> audit(context, sig)) .thenReturn(ResponseEntity.ok().build()) ); } @@ -305,9 +301,9 @@ public Mono> cancelTopicAnalysis(String clusterName, String .operationName("cancelTopicAnalysis") .build(); - return accessControlService.validateAccess(context) + return validateAccess(context) .then(Mono.fromRunnable(() -> topicAnalysisService.cancelAnalysis(getCluster(clusterName), topicName))) - .doOnEach(sig -> auditService.audit(context, sig)) + .doOnEach(sig -> audit(context, sig)) .thenReturn(ResponseEntity.ok().build()); } @@ -324,11 +320,11 @@ public Mono> getTopicAnalysis(String clusterNam .operationName("getTopicAnalysis") .build(); - return accessControlService.validateAccess(context) + return validateAccess(context) .thenReturn(topicAnalysisService.getTopicAnalysis(getCluster(clusterName), topicName) .map(ResponseEntity::ok) .orElseGet(() -> ResponseEntity.notFound().build())) - .doOnEach(sig -> auditService.audit(context, sig)); + .doOnEach(sig -> audit(context, sig)); } private Comparator getComparatorForTopic( diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SchemaRegistryPaginationTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SchemaRegistryPaginationTest.java index 295eeccbb3c..cba0f58fae7 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SchemaRegistryPaginationTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SchemaRegistryPaginationTest.java @@ -42,8 +42,9 @@ private void init(List subjects) { new SchemaRegistryService.SubjectWithCompatibilityLevel( new SchemaSubject().subject(a.getArgument(1)), Compatibility.FULL))); - this.controller = new SchemasController(schemaRegistryService, new AccessControlServiceMock().getMock(), - mock(AuditService.class)); + this.controller = new SchemasController(schemaRegistryService); + this.controller.setAccessControlService(new AccessControlServiceMock().getMock()); + this.controller.setAuditService(mock(AuditService.class)); this.controller.setClustersStorage(clustersStorage); } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java index 97f193022a5..3a6cebc8347 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java @@ -45,8 +45,8 @@ class TopicsServicePaginationTest { private final ClusterMapper clusterMapper = new ClusterMapperImpl(); private final AccessControlService accessControlService = new AccessControlServiceMock().getMock(); - private final TopicsController topicsController = new TopicsController( - topicsService, mock(TopicAnalysisService.class), clusterMapper, accessControlService, mock(AuditService.class)); + private final TopicsController topicsController = + new TopicsController(topicsService, mock(TopicAnalysisService.class), clusterMapper); private void init(Map topicsInCache) { @@ -59,6 +59,8 @@ private void init(Map topicsInCache) { List lst = a.getArgument(1); return Mono.just(lst.stream().map(topicsInCache::get).collect(Collectors.toList())); }); + topicsController.setAccessControlService(accessControlService); + topicsController.setAuditService(mock(AuditService.class)); topicsController.setClustersStorage(clustersStorage); }