Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Support namespace-scoped informer #1409

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,12 @@
<version>4.0.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.18.1</version>
<scope>test</scope>
</dependency>

</dependencies>
</dependencyManagement>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.core.Ordered;
import org.springframework.core.ResolvableType;
import org.springframework.stereotype.Component;

/**
* The type Kubernetes informer factory processor which basically does the following things:
Expand All @@ -41,7 +41,6 @@
* io.kubernetes.client.spring.extended.controller.annotation.KubernetesInformers}, instantiates and
* injects informers to spring context with the underlying constructing process hidden from users.
*/
@Component
public class KubernetesInformerFactoryProcessor
implements BeanDefinitionRegistryPostProcessor, Ordered {

Expand All @@ -55,6 +54,7 @@ public class KubernetesInformerFactoryProcessor
private final ApiClient apiClient;
private final SharedInformerFactory sharedInformerFactory;

@Autowired
public KubernetesInformerFactoryProcessor(
ApiClient apiClient, SharedInformerFactory sharedInformerFactory) {
this.apiClient = apiClient;
Expand Down Expand Up @@ -85,7 +85,10 @@ public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory)
apiClient);
SharedIndexInformer sharedIndexInformer =
sharedInformerFactory.sharedIndexInformerFor(
api, kubernetesInformer.apiTypeClass(), kubernetesInformer.resyncPeriodMillis());
api,
kubernetesInformer.apiTypeClass(),
kubernetesInformer.resyncPeriodMillis(),
kubernetesInformer.namespace());
ResolvableType informerType =
ResolvableType.forClassWithGenerics(
SharedInformer.class, kubernetesInformer.apiTypeClass());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;

/**
* Scans and processes {@link
Expand All @@ -35,7 +34,6 @@
* <p>It will create a {@link io.kubernetes.client.extended.controller.Controller} for every
* reconciler instances registered in the spring bean-factory.
*/
@Component
public class KubernetesReconcilerProcessor implements BeanFactoryPostProcessor, Ordered {

private static final Logger log = LoggerFactory.getLogger(KubernetesReconcilerProcessor.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.openapi.models.V1Namespace;
import io.kubernetes.client.openapi.models.V1NamespaceList;
import io.kubernetes.client.util.Namespaces;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
Expand Down Expand Up @@ -58,4 +59,11 @@
* @return the long
*/
long resyncPeriodMillis() default 0;

/**
* Target namespace to list-watch, by default it will be cluster-scoped.
*
* @return the string
*/
String namespace() default Namespaces.NAMESPACE_ALL;
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,21 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
@Import(KubernetesInformerCreatorTest.TestConfig.class)
@SpringBootTest(classes = {KubernetesInformerCreatorTest.App.class})
public class KubernetesInformerCreatorTest {

@Rule public WireMockRule wireMockRule = new WireMockRule(8188);

@TestConfiguration
static class TestConfig {
@SpringBootApplication
@EnableAutoConfiguration
static class App {

@Bean
public ApiClient testingApiClient() {
Expand All @@ -70,12 +70,6 @@ public SharedInformerFactory sharedInformerFactory() {
return new TestSharedInformerFactory();
}

@Bean
public KubernetesInformerConfigurer kubernetesInformerConfigurer(
ApiClient apiClient, SharedInformerFactory sharedInformerFactory) {
return new KubernetesInformerConfigurer(apiClient, sharedInformerFactory);
}

@KubernetesInformers({
@KubernetesInformer(
apiTypeClass = V1Pod.class,
Expand All @@ -85,6 +79,7 @@ public KubernetesInformerConfigurer kubernetesInformerConfigurer(
@KubernetesInformer(
apiTypeClass = V1ConfigMap.class,
apiListTypeClass = V1ConfigMapList.class,
namespace = "default",
groupVersionResource =
@GroupVersionResource(
apiGroup = "",
Expand Down Expand Up @@ -138,7 +133,7 @@ public void testInformerInjection() throws InterruptedException {
.willReturn(aResponse().withStatus(200).withBody("{}")));

wireMockRule.stubFor(
get(urlMatching("^/api/v1/configmaps.*"))
get(urlMatching("^/api/v1/namespaces/default/configmaps.*"))
.withQueryParam("watch", equalTo("false"))
.willReturn(
aResponse()
Expand All @@ -150,7 +145,7 @@ public void testInformerInjection() throws InterruptedException {
.metadata(new V1ListMeta().resourceVersion("0"))
.items(Arrays.asList(bar1))))));
wireMockRule.stubFor(
get(urlMatching("^/api/v1/configmaps.*"))
get(urlMatching("^/api/v1/namespaces/default/configmaps.*"))
.withQueryParam("watch", equalTo("true"))
.willReturn(aResponse().withStatus(200).withBody("{}")));

Expand All @@ -165,10 +160,10 @@ public void testInformerInjection() throws InterruptedException {
getRequestedFor(urlPathEqualTo("/api/v1/pods")).withQueryParam("watch", equalTo("true")));
verify(
1,
getRequestedFor(urlPathEqualTo("/api/v1/configmaps"))
getRequestedFor(urlPathEqualTo("/api/v1/namespaces/default/configmaps"))
.withQueryParam("watch", equalTo("false")));
verify(
getRequestedFor(urlPathEqualTo("/api/v1/configmaps"))
getRequestedFor(urlPathEqualTo("/api/v1/namespaces/default/configmaps"))
.withQueryParam("watch", equalTo("true")));

assertEquals(1, podLister.list().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
*/
package io.kubernetes.client.spring.extended.controller;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import static org.junit.Assert.*;

import com.github.tomakehurst.wiremock.junit.WireMockRule;
import io.kubernetes.client.common.KubernetesObject;
Expand All @@ -29,18 +27,24 @@
import io.kubernetes.client.informer.cache.DeltaFIFO;
import io.kubernetes.client.informer.cache.Lister;
import io.kubernetes.client.informer.impl.DefaultSharedIndexInformer;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.models.V1ConfigMap;
import io.kubernetes.client.openapi.models.V1ConfigMapList;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1PodList;
import io.kubernetes.client.spring.extended.controller.annotation.AddWatchEventFilter;
import io.kubernetes.client.spring.extended.controller.annotation.DeleteWatchEventFilter;
import io.kubernetes.client.spring.extended.controller.annotation.GroupVersionResource;
import io.kubernetes.client.spring.extended.controller.annotation.KubernetesInformer;
import io.kubernetes.client.spring.extended.controller.annotation.KubernetesInformers;
import io.kubernetes.client.spring.extended.controller.annotation.KubernetesReconciler;
import io.kubernetes.client.spring.extended.controller.annotation.KubernetesReconcilerReadyFunc;
import io.kubernetes.client.spring.extended.controller.annotation.KubernetesReconcilerWatch;
import io.kubernetes.client.spring.extended.controller.annotation.KubernetesReconcilerWatches;
import io.kubernetes.client.spring.extended.controller.annotation.UpdateWatchEventFilter;
import io.kubernetes.client.spring.extended.controller.factory.KubernetesControllerFactory;
import io.kubernetes.client.util.ClientBuilder;
import java.util.LinkedList;
import java.util.function.Function;
import javax.annotation.Resource;
Expand All @@ -49,21 +53,48 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
@Import(KubernetesInformerCreatorTest.TestConfig.class)
@SpringBootTest(classes = {KubernetesReconcilerCreatorTest.App.class})
public class KubernetesReconcilerCreatorTest {

@Rule public WireMockRule wireMockRule = new WireMockRule(8189);

@TestConfiguration
static class TestConfig {
@SpringBootApplication
@EnableAutoConfiguration
static class App {
@Bean
public ApiClient testingApiClient() {
ApiClient apiClient = new ClientBuilder().setBasePath("http://localhost:" + 8188).build();
return apiClient;
}

@Bean
public SharedInformerFactory sharedInformerFactory() {
return new KubernetesInformerCreatorTest.App.TestSharedInformerFactory();
}

@KubernetesInformers({
@KubernetesInformer(
apiTypeClass = V1Pod.class,
apiListTypeClass = V1PodList.class,
groupVersionResource =
@GroupVersionResource(apiGroup = "", apiVersion = "v1", resourcePlural = "pods")),
@KubernetesInformer(
apiTypeClass = V1ConfigMap.class,
apiListTypeClass = V1ConfigMapList.class,
groupVersionResource =
@GroupVersionResource(
apiGroup = "",
apiVersion = "v1",
resourcePlural = "configmaps")),
})
static class TestSharedInformerFactory extends SharedInformerFactory {}

@Bean
public TestReconciler testReconciler() {
Expand Down

This file was deleted.

5 changes: 5 additions & 0 deletions util/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>

</dependencies>
<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.util.CallGenerator;
import io.kubernetes.client.util.CallGeneratorParams;
import io.kubernetes.client.util.Namespaces;
import io.kubernetes.client.util.Watch;
import io.kubernetes.client.util.Watchable;
import io.kubernetes.client.util.generic.GenericKubernetesApi;
Expand Down Expand Up @@ -162,7 +163,31 @@ SharedIndexInformer<ApiType> sharedIndexInformerFor(
GenericKubernetesApi<ApiType, ApiListType> genericKubernetesApi,
Class<ApiType> apiTypeClass,
long resyncPeriodInMillis) {
ListerWatcher<ApiType, ApiListType> listerWatcher = listerWatcherFor(genericKubernetesApi);
return sharedIndexInformerFor(
genericKubernetesApi, apiTypeClass, resyncPeriodInMillis, Namespaces.NAMESPACE_ALL);
}

/**
* Working the same as {@link SharedInformerFactory#sharedIndexInformerFor} above.
*
* <p>Constructs and returns a shared index informer for a specific namespace.
*
* @param <ApiType> the type parameter
* @param <ApiListType> the type parameter
* @param genericKubernetesApi the generic kubernetes api
* @param apiTypeClass the api type class
* @param resyncPeriodInMillis the resync period in millis
* @param namespace the target namespace
* @return the shared index informer
*/
public synchronized <ApiType extends KubernetesObject, ApiListType extends KubernetesListObject>
SharedIndexInformer<ApiType> sharedIndexInformerFor(
GenericKubernetesApi<ApiType, ApiListType> genericKubernetesApi,
Class<ApiType> apiTypeClass,
long resyncPeriodInMillis,
String namespace) {
ListerWatcher<ApiType, ApiListType> listerWatcher =
listerWatcherFor(genericKubernetesApi, namespace);
return sharedIndexInformerFor(listerWatcher, apiTypeClass, resyncPeriodInMillis);
}

Expand Down Expand Up @@ -197,28 +222,46 @@ public Watch<ApiType> watch(CallGeneratorParams params) throws ApiException {

private <ApiType extends KubernetesObject, ApiListType extends KubernetesListObject>
ListerWatcher<ApiType, ApiListType> listerWatcherFor(
GenericKubernetesApi<ApiType, ApiListType> genericKubernetesApi) {
GenericKubernetesApi<ApiType, ApiListType> genericKubernetesApi, String namespace) {
if (apiClient.getReadTimeout() > 0) {
// set read timeout zero to ensure client doesn't time out
apiClient.setReadTimeout(0);
}
// TODO: it seems read timeout is determined by genericKubernetesApi instead of above apiClient.
return new ListerWatcher<ApiType, ApiListType>() {
public ApiListType list(CallGeneratorParams params) throws ApiException {
return genericKubernetesApi
.list(
new ListOptions() {
{
setResourceVersion(params.resourceVersion);
setTimeoutSeconds(params.timeoutSeconds);
}
})
.throwsApiException()
.getObject();
if (Namespaces.NAMESPACE_ALL.equals(namespace)) {
return genericKubernetesApi
.list(
new ListOptions() {
{
setResourceVersion(params.resourceVersion);
setTimeoutSeconds(params.timeoutSeconds);
}
})
.throwsApiException()
.getObject();
} else {
return genericKubernetesApi
.list(
namespace,
new ListOptions() {
{
setResourceVersion(params.resourceVersion);
setTimeoutSeconds(params.timeoutSeconds);
}
})
.throwsApiException()
.getObject();
}
}

public Watchable<ApiType> watch(CallGeneratorParams params) throws ApiException {
return genericKubernetesApi.watch();
if (Namespaces.NAMESPACE_ALL.equals(namespace)) {
return genericKubernetesApi.watch();
} else {
return genericKubernetesApi.watch(namespace);
}
}
};
}
Expand Down
Loading