Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BE: Implement audit log level #4103

Merged
merged 4 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,13 @@ public static class AuditProperties {
Integer auditTopicsPartitions;
Boolean topicAuditEnabled;
Boolean consoleAuditEnabled;
LogLevel level;
Map<String, String> auditTopicProperties;

public enum LogLevel {
ALL,
ALTER_ONLY //default
}
}

@PostConstruct
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,25 @@
package com.provectus.kafka.ui.model.rbac.permission;

import java.util.Set;
import org.apache.commons.lang3.EnumUtils;
import org.jetbrains.annotations.Nullable;

public enum AclAction implements PermissibleAction {

VIEW,
EDIT;
EDIT

;

public static final Set<AclAction> ALTER_ACTIONS = Set.of(EDIT);

@Nullable
public static AclAction fromString(String name) {
return EnumUtils.getEnum(AclAction.class, name);
}

@Override
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.provectus.kafka.ui.model.rbac.permission;

import java.util.Set;
import org.apache.commons.lang3.EnumUtils;
import org.jetbrains.annotations.Nullable;

Expand All @@ -10,9 +11,15 @@ public enum ApplicationConfigAction implements PermissibleAction {

;

public static final Set<ApplicationConfigAction> ALTER_ACTIONS = Set.of(EDIT);

@Nullable
public static ApplicationConfigAction fromString(String name) {
return EnumUtils.getEnum(ApplicationConfigAction.class, name);
}

@Override
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
package com.provectus.kafka.ui.model.rbac.permission;

import java.util.Set;
import org.apache.commons.lang3.EnumUtils;
import org.jetbrains.annotations.Nullable;

public enum AuditAction implements PermissibleAction {

VIEW;
VIEW

;

private static final Set<AuditAction> ALTER_ACTIONS = Set.of();

@Nullable
public static AuditAction fromString(String name) {
return EnumUtils.getEnum(AuditAction.class, name);
}

@Override
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.provectus.kafka.ui.model.rbac.permission;

import java.util.Set;
import org.apache.commons.lang3.EnumUtils;
import org.jetbrains.annotations.Nullable;

Expand All @@ -10,9 +11,15 @@ public enum ClusterConfigAction implements PermissibleAction {

;

public static final Set<ClusterConfigAction> ALTER_ACTIONS = Set.of(EDIT);

@Nullable
public static ClusterConfigAction fromString(String name) {
return EnumUtils.getEnum(ClusterConfigAction.class, name);
}

@Override
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.provectus.kafka.ui.model.rbac.permission;

import java.util.Set;
import org.apache.commons.lang3.EnumUtils;
import org.jetbrains.annotations.Nullable;

Expand All @@ -12,9 +13,15 @@ public enum ConnectAction implements PermissibleAction {

;

public static final Set<ConnectAction> ALTER_ACTIONS = Set.of(CREATE, EDIT, RESTART);

@Nullable
public static ConnectAction fromString(String name) {
return EnumUtils.getEnum(ConnectAction.class, name);
}

@Override
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
package com.provectus.kafka.ui.model.rbac.permission;

import java.util.Set;
import org.apache.commons.lang3.EnumUtils;
import org.jetbrains.annotations.Nullable;

public enum ConsumerGroupAction implements PermissibleAction {

VIEW,
DELETE,

RESET_OFFSETS

;

public static final Set<ConsumerGroupAction> ALTER_ACTIONS = Set.of(DELETE, RESET_OFFSETS);

@Nullable
public static ConsumerGroupAction fromString(String name) {
return EnumUtils.getEnum(ConsumerGroupAction.class, name);
}

@Override
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
package com.provectus.kafka.ui.model.rbac.permission;

import java.util.Set;
import org.apache.commons.lang3.EnumUtils;
import org.jetbrains.annotations.Nullable;

public enum KsqlAction implements PermissibleAction {

EXECUTE;
EXECUTE

;

public static final Set<KsqlAction> ALTER_ACTIONS = Set.of(EXECUTE);

@Nullable
public static KsqlAction fromString(String name) {
return EnumUtils.getEnum(KsqlAction.class, name);
}

@Override
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,9 @@ public sealed interface PermissibleAction permits
ConsumerGroupAction, SchemaAction,
ConnectAction, ClusterConfigAction,
KsqlAction, TopicAction, AuditAction {

String name();

boolean isAlter();

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.provectus.kafka.ui.model.rbac.permission;

import java.util.Set;
import org.apache.commons.lang3.EnumUtils;
import org.jetbrains.annotations.Nullable;

Expand All @@ -13,9 +14,15 @@ public enum SchemaAction implements PermissibleAction {

;

public static final Set<SchemaAction> ALTER_ACTIONS = Set.of(CREATE, DELETE, EDIT, MODIFY_GLOBAL_COMPATIBILITY);

@Nullable
public static SchemaAction fromString(String name) {
return EnumUtils.getEnum(SchemaAction.class, name);
}

@Override
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.provectus.kafka.ui.model.rbac.permission;

import java.util.Set;
import org.apache.commons.lang3.EnumUtils;
import org.jetbrains.annotations.Nullable;

Expand All @@ -9,16 +10,21 @@ public enum TopicAction implements PermissibleAction {
CREATE,
EDIT,
DELETE,

MESSAGES_READ,
MESSAGES_PRODUCE,
MESSAGES_DELETE,

;

public static final Set<TopicAction> ALTER_ACTIONS = Set.of(CREATE, EDIT, DELETE, MESSAGES_PRODUCE, MESSAGES_DELETE);

@Nullable
public static TopicAction fromString(String name) {
return EnumUtils.getEnum(TopicAction.class, name);
}

@Override
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.Resource;
import com.provectus.kafka.ui.model.rbac.permission.PermissibleAction;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -33,33 +34,37 @@ String toJson() {
return MAPPER.writeValueAsString(this);
}

record AuditResource(String accessType, Resource type, @Nullable Object id) {
record AuditResource(String accessType, boolean alter, Resource type, @Nullable Object id) {

private static AuditResource create(PermissibleAction action, Resource type, @Nullable Object id) {
return new AuditResource(action.name(), action.isAlter(), type, id);
}

static List<AuditResource> getAccessedResources(AccessContext ctx) {
List<AuditResource> resources = new ArrayList<>();
ctx.getClusterConfigActions()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.CLUSTERCONFIG, null)));
.forEach(a -> resources.add(create(a, Resource.CLUSTERCONFIG, null)));
ctx.getTopicActions()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.TOPIC, nameId(ctx.getTopic()))));
.forEach(a -> resources.add(create(a, Resource.TOPIC, nameId(ctx.getTopic()))));
ctx.getConsumerGroupActions()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.CONSUMER, nameId(ctx.getConsumerGroup()))));
.forEach(a -> resources.add(create(a, Resource.CONSUMER, nameId(ctx.getConsumerGroup()))));
ctx.getConnectActions()
.forEach(a -> {
Map<String, String> resourceId = new LinkedHashMap<>();
resourceId.put("connect", ctx.getConnect());
if (ctx.getConnector() != null) {
resourceId.put("connector", ctx.getConnector());
}
resources.add(new AuditResource(a.name(), Resource.CONNECT, resourceId));
resources.add(create(a, Resource.CONNECT, resourceId));
});
ctx.getSchemaActions()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.SCHEMA, nameId(ctx.getSchema()))));
.forEach(a -> resources.add(create(a, Resource.SCHEMA, nameId(ctx.getSchema()))));
ctx.getKsqlActions()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.KSQL, null)));
.forEach(a -> resources.add(create(a, Resource.KSQL, null)));
ctx.getAclActions()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.ACL, null)));
.forEach(a -> resources.add(create(a, Resource.ACL, null)));
ctx.getAuditAction()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.AUDIT, null)));
.forEach(a -> resources.add(create(a, Resource.AUDIT, null)));
return resources;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.provectus.kafka.ui.service.audit;

import static com.provectus.kafka.ui.config.ClustersProperties.AuditProperties.LogLevel.ALTER_ONLY;
import static com.provectus.kafka.ui.service.MessagesService.createProducer;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -80,12 +81,13 @@ static Optional<AuditWriter> createAuditWriter(KafkaCluster cluster,
}
boolean topicAudit = Optional.ofNullable(auditProps.getTopicAuditEnabled()).orElse(false);
boolean consoleAudit = Optional.ofNullable(auditProps.getConsoleAuditEnabled()).orElse(false);
boolean alterLogOnly = Optional.ofNullable(auditProps.getLevel()).map(lvl -> lvl == ALTER_ONLY).orElse(true);
if (!topicAudit && !consoleAudit) {
return Optional.empty();
}
if (!topicAudit) {
log.info("Audit initialization finished for cluster '{}' (console only)", cluster.getName());
return Optional.of(consoleOnlyWriter(cluster));
return Optional.of(consoleOnlyWriter(cluster, alterLogOnly));
}
String auditTopicName = Optional.ofNullable(auditProps.getTopic()).orElse(DEFAULT_AUDIT_TOPIC_NAME);
boolean topicAuditCanBeDone = createTopicIfNeeded(cluster, acSupplier, auditTopicName, auditProps);
Expand All @@ -95,23 +97,24 @@ static Optional<AuditWriter> createAuditWriter(KafkaCluster cluster,
"Audit initialization finished for cluster '{}' (console only, topic audit init failed)",
cluster.getName()
);
return Optional.of(consoleOnlyWriter(cluster));
return Optional.of(consoleOnlyWriter(cluster, alterLogOnly));
}
return Optional.empty();
}
log.info("Audit initialization finished for cluster '{}'", cluster.getName());
return Optional.of(
new AuditWriter(
cluster.getName(),
alterLogOnly,
auditTopicName,
producerFactory.get(),
consoleAudit ? AUDIT_LOGGER : null
)
);
}

private static AuditWriter consoleOnlyWriter(KafkaCluster cluster) {
return new AuditWriter(cluster.getName(), null, null, AUDIT_LOGGER);
private static AuditWriter consoleOnlyWriter(KafkaCluster cluster, boolean alterLogOnly) {
return new AuditWriter(cluster.getName(), alterLogOnly, null, null, AUDIT_LOGGER);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

@Slf4j
record AuditWriter(String clusterName,
boolean logAlterOperationsOnly,
@Nullable String targetTopic,
@Nullable KafkaProducer<byte[], byte[]> producer,
@Nullable Logger consoleLogger) implements Closeable {
Expand All @@ -39,6 +40,10 @@ void write(AccessContext ctx, AuthenticatedUser user, @Nullable Throwable th) {
}

private void write(AuditRecord rec) {
if (logAlterOperationsOnly && rec.resources().stream().noneMatch(AuditResource::alter)) {
//we should only log alter operations, but this is read-only op
return;
}
String json = rec.toJson();
if (consoleLogger != null) {
consoleLogger.info(json);
Expand Down
Loading