diff --git a/application.example.yml b/application.example.yml index 0e2a03df6..cfcfadecc 100644 --- a/application.example.yml +++ b/application.example.yml @@ -225,6 +225,9 @@ akhq: # Regexp list to filter consumer groups visible for group consumer-groups-filter-regexp: - "consumer.*" + # Regexp list to filter schemas visible for group + subjects-filter-regexp: + - "subject.*" topic-reader: # unique key name: topic-reader # Other group roles: diff --git a/docs/docs/configuration/authentifications/groups.md b/docs/docs/configuration/authentifications/groups.md index c34e4a01b..947870e07 100644 --- a/docs/docs/configuration/authentifications/groups.md +++ b/docs/docs/configuration/authentifications/groups.md @@ -13,9 +13,10 @@ Define groups with specific roles for your users * `attributes.connects-filter-regexp`: Regexp list to filter Connect tasks available for current group * `attributes.consumer-groups-filter-regexp`: Regexp list to filter Consumer Groups available for current group * `attributes.acls-filter-regexp`: Regexp list to filter acls available for current group + * `attributes.subjects-filter-regexp`: Regexp list to filter schema registry subjects available for current group ::: warning -`topics-filter-regexp`, `connects-filter-regexp`, `consumer-groups-filter-regexp` and `acls-filter-regexp` are only used when listing resources. +`topics-filter-regexp`, `connects-filter-regexp`, `consumer-groups-filter-regexp`, `acls-filter-regexp` and `subjects-filter-regexp` are only used when listing resources. If you have `topics/create` or `connect/create` roles and you try to create a resource that doesn't follow the regexp, that resource **WILL** be created. ::: diff --git a/src/main/java/org/akhq/repositories/SchemaRegistryRepository.java b/src/main/java/org/akhq/repositories/SchemaRegistryRepository.java index d7d9a81cf..8b512c387 100644 --- a/src/main/java/org/akhq/repositories/SchemaRegistryRepository.java +++ b/src/main/java/org/akhq/repositories/SchemaRegistryRepository.java @@ -12,16 +12,20 @@ import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer; import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer; +import io.micronaut.context.ApplicationContext; +import io.micronaut.security.authentication.Authentication; +import io.micronaut.security.utils.SecurityService; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; import org.akhq.configs.Connection; import org.akhq.configs.SchemaRegistryType; import org.akhq.models.Schema; import org.akhq.modules.KafkaModule; +import org.akhq.utils.DefaultGroupUtils; import org.akhq.utils.PagedList; import org.akhq.utils.Pagination; import org.apache.kafka.common.serialization.Deserializer; -import jakarta.inject.Inject; -import jakarta.inject.Singleton; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.util.*; @@ -34,6 +38,13 @@ public class SchemaRegistryRepository extends AbstractRepository { @Inject private KafkaModule kafkaModule; + + @Inject + private ApplicationContext applicationContext; + + @Inject + private DefaultGroupUtils defaultGroupUtils; + private final Map kafkaAvroDeserializers = new HashMap<>(); private final Map kafkaJsonDeserializers = new HashMap<>(); private final Map kafkaProtoDeserializers = new HashMap<>(); @@ -89,7 +100,7 @@ public List all(String clusterId, Optional search) throws IOExc return maybeRegistryRestClient.get() .getAllSubjects() .stream() - .filter(s -> isSearchMatch(search, s)) + .filter(s -> isSearchMatch(search, s) && isMatchRegex(getSubjectsFilterRegex(), s)) .sorted(Comparator.comparing(String::toLowerCase)) .collect(Collectors.toList()); } @@ -313,4 +324,35 @@ public SchemaRegistryType getSchemaRegistryType(String clusterId) { static { JacksonMapper.INSTANCE.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); } + + + private Optional> getSubjectsFilterRegex() { + + List subjectsFilterRegex = new ArrayList<>(); + + if (applicationContext.containsBean(SecurityService.class)) { + SecurityService securityService = applicationContext.getBean(SecurityService.class); + Optional authentication = securityService.getAuthentication(); + if (authentication.isPresent()) { + Authentication auth = authentication.get(); + subjectsFilterRegex.addAll(getSubjectsFilterRegexFromAttributes(auth.getAttributes())); + } + } + // get schemas filter regex for default groups + subjectsFilterRegex.addAll(getSubjectsFilterRegexFromAttributes( + defaultGroupUtils.getDefaultAttributes() + )); + + return Optional.of(subjectsFilterRegex); + } + + @SuppressWarnings("unchecked") + private List getSubjectsFilterRegexFromAttributes(Map attributes) { + if (attributes.get("subjectsFilterRegexp") != null) { + if (attributes.get("subjectsFilterRegexp") instanceof List) { + return (List)attributes.get("subjectsFilterRegexp"); + } + } + return new ArrayList<>(); + } } diff --git a/src/test/java/org/akhq/repositories/SchemaRegistryRepositoryTest.java b/src/test/java/org/akhq/repositories/SchemaRegistryRepositoryTest.java index 600477dc3..d42e921d1 100644 --- a/src/test/java/org/akhq/repositories/SchemaRegistryRepositoryTest.java +++ b/src/test/java/org/akhq/repositories/SchemaRegistryRepositoryTest.java @@ -2,6 +2,11 @@ import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; +import io.micronaut.context.ApplicationContext; +import io.micronaut.security.authentication.Authentication; +import io.micronaut.security.authentication.ServerAuthentication; +import io.micronaut.security.utils.DefaultSecurityService; +import io.micronaut.security.utils.SecurityService; import org.akhq.AbstractTest; import org.akhq.KafkaTestCluster; import org.akhq.models.Schema; @@ -13,18 +18,30 @@ import org.junit.jupiter.api.Test; import jakarta.inject.Inject; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutionException; import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.when; public class SchemaRegistryRepositoryTest extends AbstractTest { @Inject + @InjectMocks private SchemaRegistryRepository repository; + @Mock + ApplicationContext applicationContext; + public final static String SUBJECT_1 = "SCHEMA_1"; public final static org.apache.avro.Schema SCHEMA_1_V1 = SchemaBuilder .record("schema1").namespace("org.akhq") @@ -60,6 +77,11 @@ public class SchemaRegistryRepositoryTest extends AbstractTest { public final static String SUBJECT_4 = "SCHEMA_4"; public final static String SCHEMA_4 = "{\"name\":\"Schema4\",\"namespace\":\"org.akhq\",\"type\":\"record\",\"fields\":[{\"name\":\"name\",\"type\":[\"null\",\"string\"]},{\"name\":\"schema3\",\"type\":\"Schema3\"}]}"; + @BeforeEach + void initMocks() { + MockitoAnnotations.initMocks(this); + } + @BeforeEach void cleanup() { try { @@ -81,6 +103,17 @@ void getAll() throws IOException, RestClientException, ExecutionException, Inter assertEquals(3, all.size()); } + @Test + void getAllWithSubjectsRegex() throws IOException, RestClientException, ExecutionException, InterruptedException { + mockApplicationContext(); + PagedList all = repository.list( + KafkaTestCluster.CLUSTER_ID, + new Pagination(100, URIBuilder.empty(), 1), + Optional.empty() + ); + assertEquals(1, all.size()); + } + @Test void getAllSearch() throws IOException, RestClientException, ExecutionException, InterruptedException { PagedList all = repository.list( @@ -168,4 +201,12 @@ void delete() throws IOException, RestClientException, ExecutionException, Inter void getDefaultConfig() throws IOException, RestClientException { assertEquals(Schema.Config.CompatibilityLevelConfig.BACKWARD, repository.getDefaultConfig(KafkaTestCluster.CLUSTER_ID).getCompatibilityLevel()); } + + private void mockApplicationContext() { + Authentication auth = new ServerAuthentication("test", List.of(), Collections.singletonMap("subjectsFilterRegexp", new ArrayList<>(Arrays.asList("stream-count.*")))); + DefaultSecurityService securityService = Mockito.mock(DefaultSecurityService.class); + when(securityService.getAuthentication()).thenReturn(Optional.of(auth)); + when(applicationContext.containsBean(SecurityService.class)).thenReturn(true); + when(applicationContext.getBean(SecurityService.class)).thenReturn(securityService); + } }