Skip to content

Commit

Permalink
BE: Controllers structure minor refactr (#4110)
Browse files Browse the repository at this point in the history
Audit & access controll services moved to AbstractController
  • Loading branch information
iliax authored Aug 11, 2023
1 parent d915de4 commit fa9547b
Show file tree
Hide file tree
Showing 13 changed files with 167 additions and 180 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,46 @@

import com.provectus.kafka.ui.exception.ClusterNotFoundException;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.service.ClustersStorage;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import org.springframework.beans.factory.annotation.Autowired;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;

public abstract class AbstractController {

private ClustersStorage clustersStorage;
protected ClustersStorage clustersStorage;
protected AccessControlService accessControlService;
protected AuditService auditService;

protected KafkaCluster getCluster(String name) {
return clustersStorage.getClusterByName(name)
.orElseThrow(() -> new ClusterNotFoundException(
String.format("Cluster with name '%s' not found", name)));
}

protected Mono<Void> validateAccess(AccessContext context) {
return accessControlService.validateAccess(context);
}

protected void audit(AccessContext acxt, Signal<?> sig) {
auditService.audit(acxt, sig);
}

@Autowired
public void setClustersStorage(ClustersStorage clustersStorage) {
this.clustersStorage = clustersStorage;
}

@Autowired
public void setAccessControlService(AccessControlService accessControlService) {
this.accessControlService = accessControlService;
}

@Autowired
public void setAuditService(AuditService auditService) {
this.auditService = auditService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.AclAction;
import com.provectus.kafka.ui.service.acl.AclsService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.resource.PatternType;
Expand All @@ -29,8 +27,6 @@
public class AclsController extends AbstractController implements AclsApi {

private final AclsService aclsService;
private final AccessControlService accessControlService;
private final AuditService auditService;

@Override
public Mono<ResponseEntity<Void>> createAcl(String clusterName, Mono<KafkaAclDTO> kafkaAclDto,
Expand All @@ -41,11 +37,11 @@ public Mono<ResponseEntity<Void>> createAcl(String clusterName, Mono<KafkaAclDTO
.operationName("createAcl")
.build();

return accessControlService.validateAccess(context)
return validateAccess(context)
.then(kafkaAclDto)
.map(ClusterMapper::toAclBinding)
.flatMap(binding -> aclsService.createAcl(getCluster(clusterName), binding))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}

Expand All @@ -58,11 +54,11 @@ public Mono<ResponseEntity<Void>> deleteAcl(String clusterName, Mono<KafkaAclDTO
.operationName("deleteAcl")
.build();

return accessControlService.validateAccess(context)
return validateAccess(context)
.then(kafkaAclDto)
.map(ClusterMapper::toAclBinding)
.flatMap(binding -> aclsService.deleteAcl(getCluster(clusterName), binding))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}

Expand All @@ -88,12 +84,12 @@ public Mono<ResponseEntity<Flux<KafkaAclDTO>>> listAcls(String clusterName,

var filter = new ResourcePatternFilter(resourceType, resourceName, namePatternType);

return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
Mono.just(
ResponseEntity.ok(
aclsService.listAcls(getCluster(clusterName), filter)
.map(ClusterMapper::toKafkaAclDto)))
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}

@Override
Expand All @@ -104,11 +100,11 @@ public Mono<ResponseEntity<String>> getAclAsCsv(String clusterName, ServerWebExc
.operationName("getAclAsCsv")
.build();

return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
aclsService.getAclAsCsvString(getCluster(clusterName))
.map(ResponseEntity::ok)
.flatMap(Mono::just)
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
);
}

Expand All @@ -120,10 +116,10 @@ public Mono<ResponseEntity<Void>> syncAclsCsv(String clusterName, Mono<String> c
.operationName("syncAclsCsv")
.build();

return accessControlService.validateAccess(context)
return validateAccess(context)
.then(csvMono)
.flatMap(csv -> aclsService.syncAclWithAclCsv(getCluster(clusterName), csv))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}

Expand All @@ -137,10 +133,10 @@ public Mono<ResponseEntity<Void>> createConsumerAcl(String clusterName,
.operationName("createConsumerAcl")
.build();

return accessControlService.validateAccess(context)
return validateAccess(context)
.then(createConsumerAclDto)
.flatMap(req -> aclsService.createConsumerAcl(getCluster(clusterName), req))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}

Expand All @@ -154,10 +150,10 @@ public Mono<ResponseEntity<Void>> createProducerAcl(String clusterName,
.operationName("createProducerAcl")
.build();

return accessControlService.validateAccess(context)
return validateAccess(context)
.then(createProducerAclDto)
.flatMap(req -> aclsService.createProducerAcl(getCluster(clusterName), req))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}

Expand All @@ -171,10 +167,10 @@ public Mono<ResponseEntity<Void>> createStreamAppAcl(String clusterName,
.operationName("createStreamAppAcl")
.build();

return accessControlService.validateAccess(context)
return validateAccess(context)
.then(createStreamAppAclDto)
.flatMap(req -> aclsService.createStreamAppAcl(getCluster(clusterName), req))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.service.ApplicationInfoService;
import com.provectus.kafka.ui.service.KafkaClusterFactory;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import com.provectus.kafka.ui.util.ApplicationRestarter;
import com.provectus.kafka.ui.util.DynamicConfigOperations;
import com.provectus.kafka.ui.util.DynamicConfigOperations.PropertiesStructure;
Expand All @@ -39,7 +37,7 @@
@Slf4j
@RestController
@RequiredArgsConstructor
public class ApplicationConfigController implements ApplicationConfigApi {
public class ApplicationConfigController extends AbstractController implements ApplicationConfigApi {

private static final PropertiesMapper MAPPER = Mappers.getMapper(PropertiesMapper.class);

Expand All @@ -51,12 +49,10 @@ interface PropertiesMapper {
ApplicationConfigPropertiesDTO toDto(PropertiesStructure propertiesStructure);
}

private final AccessControlService accessControlService;
private final DynamicConfigOperations dynamicConfigOperations;
private final ApplicationRestarter restarter;
private final KafkaClusterFactory kafkaClusterFactory;
private final ApplicationInfoService applicationInfoService;
private final AuditService auditService;

@Override
public Mono<ResponseEntity<ApplicationInfoDTO>> getApplicationInfo(ServerWebExchange exchange) {
Expand All @@ -69,12 +65,12 @@ public Mono<ResponseEntity<ApplicationConfigDTO>> getCurrentConfig(ServerWebExch
.applicationConfigActions(VIEW)
.operationName("getCurrentConfig")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(Mono.fromSupplier(() -> ResponseEntity.ok(
new ApplicationConfigDTO()
.properties(MAPPER.toDto(dynamicConfigOperations.getCurrentProperties()))
)))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}

@Override
Expand All @@ -84,14 +80,14 @@ public Mono<ResponseEntity<Void>> restartWithConfig(Mono<RestartRequestDTO> rest
.applicationConfigActions(EDIT)
.operationName("restartWithConfig")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(restartRequestDto)
.<ResponseEntity<Void>>map(dto -> {
dynamicConfigOperations.persist(MAPPER.fromDto(dto.getConfig().getProperties()));
restarter.requestRestart();
return ResponseEntity.ok().build();
})
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}

@Override
Expand All @@ -101,13 +97,13 @@ public Mono<ResponseEntity<UploadedFileInfoDTO>> uploadConfigRelatedFile(Flux<Pa
.applicationConfigActions(EDIT)
.operationName("uploadConfigRelatedFile")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(fileFlux.single())
.flatMap(file ->
dynamicConfigOperations.uploadConfigRelatedFile((FilePart) file)
.map(path -> new UploadedFileInfoDTO().location(path.toString()))
.map(ResponseEntity::ok))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}

@Override
Expand All @@ -117,7 +113,7 @@ public Mono<ResponseEntity<ApplicationConfigValidationDTO>> validateConfig(Mono<
.applicationConfigActions(EDIT)
.operationName("validateConfig")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(configDto)
.flatMap(config -> {
PropertiesStructure propertiesStructure = MAPPER.fromDto(config.getProperties());
Expand All @@ -126,7 +122,7 @@ public Mono<ResponseEntity<ApplicationConfigValidationDTO>> validateConfig(Mono<
.map(validations -> new ApplicationConfigValidationDTO().clusters(validations));
})
.map(ResponseEntity::ok)
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}

private Mono<Map<String, ClusterConfigValidationDTO>> validateClustersConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
import com.provectus.kafka.ui.service.BrokerService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
Expand All @@ -31,9 +29,6 @@ public class BrokersController extends AbstractController implements BrokersApi
private final BrokerService brokerService;
private final ClusterMapper clusterMapper;

private final AuditService auditService;
private final AccessControlService accessControlService;

@Override
public Mono<ResponseEntity<Flux<BrokerDTO>>> getBrokers(String clusterName,
ServerWebExchange exchange) {
Expand All @@ -43,9 +38,9 @@ public Mono<ResponseEntity<Flux<BrokerDTO>>> getBrokers(String clusterName,
.build();

var job = brokerService.getBrokers(getCluster(clusterName)).map(clusterMapper::toBrokerDto);
return accessControlService.validateAccess(context)
return validateAccess(context)
.thenReturn(ResponseEntity.ok(job))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}

@Override
Expand All @@ -57,14 +52,14 @@ public Mono<ResponseEntity<BrokerMetricsDTO>> getBrokersMetrics(String clusterNa
.operationParams(Map.of("id", id))
.build();

return accessControlService.validateAccess(context)
return validateAccess(context)
.then(
brokerService.getBrokerMetrics(getCluster(clusterName), id)
.map(clusterMapper::toBrokerMetrics)
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.notFound().build())
)
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}

@Override
Expand All @@ -80,10 +75,10 @@ public Mono<ResponseEntity<Flux<BrokersLogdirsDTO>>> getAllBrokersLogdirs(String
.operationParams(Map.of("brokerIds", brokerIds))
.build();

return accessControlService.validateAccess(context)
return validateAccess(context)
.thenReturn(ResponseEntity.ok(
brokerService.getAllBrokersLogdirs(getCluster(clusterName), brokerIds)))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}

@Override
Expand All @@ -97,11 +92,11 @@ public Mono<ResponseEntity<Flux<BrokerConfigDTO>>> getBrokerConfig(String cluste
.operationParams(Map.of("brokerId", id))
.build();

return accessControlService.validateAccess(context).thenReturn(
return validateAccess(context).thenReturn(
ResponseEntity.ok(
brokerService.getBrokerConfig(getCluster(clusterName), id)
.map(clusterMapper::toBrokerConfig))
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}

@Override
Expand All @@ -116,11 +111,11 @@ public Mono<ResponseEntity<Void>> updateBrokerTopicPartitionLogDir(String cluste
.operationParams(Map.of("brokerId", id))
.build();

return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
brokerLogdir
.flatMap(bld -> brokerService.updateBrokerLogDir(getCluster(clusterName), id, bld))
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}

@Override
Expand All @@ -136,11 +131,11 @@ public Mono<ResponseEntity<Void>> updateBrokerConfigByName(String clusterName,
.operationParams(Map.of("brokerId", id))
.build();

return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
brokerConfig
.flatMap(bci -> brokerService.updateBrokerConfigByName(
getCluster(clusterName), id, name, bci.getValue()))
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
}
Loading

0 comments on commit fa9547b

Please sign in to comment.