Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(registry): regexp filter on schema registry subjects #1505

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions application.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,9 @@ akhq:
# Regexp list to filter consumer groups visible for group
consumer-groups-filter-regexp:
- "consumer.*"
# Regexp list to filter schemas visible for group
subjects-filter-regexp:
- "subject.*"
topic-reader: # unique key
name: topic-reader # Other group
roles:
Expand Down
3 changes: 2 additions & 1 deletion docs/docs/configuration/authentifications/groups.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ Define groups with specific roles for your users
* `attributes.connects-filter-regexp`: Regexp list to filter Connect tasks available for current group
* `attributes.consumer-groups-filter-regexp`: Regexp list to filter Consumer Groups available for current group
* `attributes.acls-filter-regexp`: Regexp list to filter acls available for current group
* `attributes.subjects-filter-regexp`: Regexp list to filter schema registry subjects available for current group

::: warning
`topics-filter-regexp`, `connects-filter-regexp`, `consumer-groups-filter-regexp` and `acls-filter-regexp` are only used when listing resources.
`topics-filter-regexp`, `connects-filter-regexp`, `consumer-groups-filter-regexp`, `acls-filter-regexp` and `subjects-filter-regexp` are only used when listing resources.
If you have `topics/create` or `connect/create` roles and you try to create a resource that doesn't follow the regexp, that resource **WILL** be created.
:::

Expand Down
48 changes: 45 additions & 3 deletions src/main/java/org/akhq/repositories/SchemaRegistryRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,20 @@
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import io.micronaut.context.ApplicationContext;
import io.micronaut.security.authentication.Authentication;
import io.micronaut.security.utils.SecurityService;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.akhq.configs.Connection;
import org.akhq.configs.SchemaRegistryType;
import org.akhq.models.Schema;
import org.akhq.modules.KafkaModule;
import org.akhq.utils.DefaultGroupUtils;
import org.akhq.utils.PagedList;
import org.akhq.utils.Pagination;
import org.apache.kafka.common.serialization.Deserializer;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.*;
Expand All @@ -34,6 +38,13 @@ public class SchemaRegistryRepository extends AbstractRepository {

@Inject
private KafkaModule kafkaModule;

@Inject
private ApplicationContext applicationContext;

@Inject
private DefaultGroupUtils defaultGroupUtils;

private final Map<String, Deserializer> kafkaAvroDeserializers = new HashMap<>();
private final Map<String, Deserializer> kafkaJsonDeserializers = new HashMap<>();
private final Map<String, Deserializer> kafkaProtoDeserializers = new HashMap<>();
Expand Down Expand Up @@ -89,7 +100,7 @@ public List<String> all(String clusterId, Optional<String> search) throws IOExc
return maybeRegistryRestClient.get()
.getAllSubjects()
.stream()
.filter(s -> isSearchMatch(search, s))
.filter(s -> isSearchMatch(search, s) && isMatchRegex(getSubjectsFilterRegex(), s))
.sorted(Comparator.comparing(String::toLowerCase))
.collect(Collectors.toList());
}
Expand Down Expand Up @@ -313,4 +324,35 @@ public SchemaRegistryType getSchemaRegistryType(String clusterId) {
static {
JacksonMapper.INSTANCE.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}


private Optional<List<String>> getSubjectsFilterRegex() {

List<String> subjectsFilterRegex = new ArrayList<>();

if (applicationContext.containsBean(SecurityService.class)) {
SecurityService securityService = applicationContext.getBean(SecurityService.class);
Optional<Authentication> authentication = securityService.getAuthentication();
if (authentication.isPresent()) {
Authentication auth = authentication.get();
subjectsFilterRegex.addAll(getSubjectsFilterRegexFromAttributes(auth.getAttributes()));
}
}
// get schemas filter regex for default groups
subjectsFilterRegex.addAll(getSubjectsFilterRegexFromAttributes(
defaultGroupUtils.getDefaultAttributes()
));

return Optional.of(subjectsFilterRegex);
}

@SuppressWarnings("unchecked")
private List<String> getSubjectsFilterRegexFromAttributes(Map<String, Object> attributes) {
if (attributes.get("subjectsFilterRegexp") != null) {
if (attributes.get("subjectsFilterRegexp") instanceof List) {
return (List<String>)attributes.get("subjectsFilterRegexp");
}
}
return new ArrayList<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.micronaut.context.ApplicationContext;
import io.micronaut.security.authentication.Authentication;
import io.micronaut.security.authentication.ServerAuthentication;
import io.micronaut.security.utils.DefaultSecurityService;
import io.micronaut.security.utils.SecurityService;
import org.akhq.AbstractTest;
import org.akhq.KafkaTestCluster;
import org.akhq.models.Schema;
Expand All @@ -13,18 +18,30 @@
import org.junit.jupiter.api.Test;

import jakarta.inject.Inject;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.when;

public class SchemaRegistryRepositoryTest extends AbstractTest {
@Inject
@InjectMocks
private SchemaRegistryRepository repository;

@Mock
ApplicationContext applicationContext;

public final static String SUBJECT_1 = "SCHEMA_1";
public final static org.apache.avro.Schema SCHEMA_1_V1 = SchemaBuilder
.record("schema1").namespace("org.akhq")
Expand Down Expand Up @@ -60,6 +77,11 @@ public class SchemaRegistryRepositoryTest extends AbstractTest {
public final static String SUBJECT_4 = "SCHEMA_4";
public final static String SCHEMA_4 = "{\"name\":\"Schema4\",\"namespace\":\"org.akhq\",\"type\":\"record\",\"fields\":[{\"name\":\"name\",\"type\":[\"null\",\"string\"]},{\"name\":\"schema3\",\"type\":\"Schema3\"}]}";

@BeforeEach
void initMocks() {
MockitoAnnotations.initMocks(this);
}

@BeforeEach
void cleanup() {
try {
Expand All @@ -81,6 +103,17 @@ void getAll() throws IOException, RestClientException, ExecutionException, Inter
assertEquals(3, all.size());
}

@Test
void getAllWithSubjectsRegex() throws IOException, RestClientException, ExecutionException, InterruptedException {
mockApplicationContext();
PagedList<Schema> all = repository.list(
KafkaTestCluster.CLUSTER_ID,
new Pagination(100, URIBuilder.empty(), 1),
Optional.empty()
);
assertEquals(1, all.size());
}

@Test
void getAllSearch() throws IOException, RestClientException, ExecutionException, InterruptedException {
PagedList<Schema> all = repository.list(
Expand Down Expand Up @@ -168,4 +201,12 @@ void delete() throws IOException, RestClientException, ExecutionException, Inter
void getDefaultConfig() throws IOException, RestClientException {
assertEquals(Schema.Config.CompatibilityLevelConfig.BACKWARD, repository.getDefaultConfig(KafkaTestCluster.CLUSTER_ID).getCompatibilityLevel());
}

private void mockApplicationContext() {
Authentication auth = new ServerAuthentication("test", List.of(), Collections.singletonMap("subjectsFilterRegexp", new ArrayList<>(Arrays.asList("stream-count.*"))));
DefaultSecurityService securityService = Mockito.mock(DefaultSecurityService.class);
when(securityService.getAuthentication()).thenReturn(Optional.of(auth));
when(applicationContext.containsBean(SecurityService.class)).thenReturn(true);
when(applicationContext.getBean(SecurityService.class)).thenReturn(securityService);
}
}