Skip to content

Commit

Permalink
feature: support namespace-scoped informer
Browse files Browse the repository at this point in the history
  • Loading branch information
yue9944882 committed Dec 3, 2020
1 parent 3cf16a9 commit 17ddba6
Show file tree
Hide file tree
Showing 13 changed files with 209 additions and 88 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,12 @@
<version>4.0.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.18.1</version>
<scope>test</scope>
</dependency>

</dependencies>
</dependencyManagement>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.core.Ordered;
import org.springframework.core.ResolvableType;
import org.springframework.stereotype.Component;

/**
* The type Kubernetes informer factory processor which basically does the following things:
Expand All @@ -41,7 +41,6 @@
* io.kubernetes.client.spring.extended.controller.annotation.KubernetesInformers}, instantiates and
* injects informers to spring context with the underlying constructing process hidden from users.
*/
@Component
public class KubernetesInformerFactoryProcessor
implements BeanDefinitionRegistryPostProcessor, Ordered {

Expand All @@ -55,6 +54,7 @@ public class KubernetesInformerFactoryProcessor
private final ApiClient apiClient;
private final SharedInformerFactory sharedInformerFactory;

@Autowired
public KubernetesInformerFactoryProcessor(
ApiClient apiClient, SharedInformerFactory sharedInformerFactory) {
this.apiClient = apiClient;
Expand Down Expand Up @@ -85,7 +85,10 @@ public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory)
apiClient);
SharedIndexInformer sharedIndexInformer =
sharedInformerFactory.sharedIndexInformerFor(
api, kubernetesInformer.apiTypeClass(), kubernetesInformer.resyncPeriodMillis());
api,
kubernetesInformer.apiTypeClass(),
kubernetesInformer.resyncPeriodMillis(),
kubernetesInformer.namespace());
ResolvableType informerType =
ResolvableType.forClassWithGenerics(
SharedInformer.class, kubernetesInformer.apiTypeClass());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;

/**
* Scans and processes {@link
Expand All @@ -35,7 +34,6 @@
* <p>It will create a {@link io.kubernetes.client.extended.controller.Controller} for every
* reconciler instances registered in the spring bean-factory.
*/
@Component
public class KubernetesReconcilerProcessor implements BeanFactoryPostProcessor, Ordered {

private static final Logger log = LoggerFactory.getLogger(KubernetesReconcilerProcessor.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.openapi.models.V1Namespace;
import io.kubernetes.client.openapi.models.V1NamespaceList;
import io.kubernetes.client.util.Namespaces;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
Expand Down Expand Up @@ -58,4 +59,11 @@
* @return the long
*/
long resyncPeriodMillis() default 0;

/**
* Target namespace to list-watch, by default it will be cluster-scoped.
*
* @return the string
*/
String namespace() default Namespaces.NAMESPACE_ALL;
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,21 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
@Import(KubernetesInformerCreatorTest.TestConfig.class)
@SpringBootTest(classes = {KubernetesInformerCreatorTest.App.class})
public class KubernetesInformerCreatorTest {

@Rule public WireMockRule wireMockRule = new WireMockRule(8188);

@TestConfiguration
static class TestConfig {
@SpringBootApplication
@EnableAutoConfiguration
static class App {

@Bean
public ApiClient testingApiClient() {
Expand All @@ -70,12 +70,6 @@ public SharedInformerFactory sharedInformerFactory() {
return new TestSharedInformerFactory();
}

@Bean
public KubernetesInformerConfigurer kubernetesInformerConfigurer(
ApiClient apiClient, SharedInformerFactory sharedInformerFactory) {
return new KubernetesInformerConfigurer(apiClient, sharedInformerFactory);
}

@KubernetesInformers({
@KubernetesInformer(
apiTypeClass = V1Pod.class,
Expand All @@ -85,6 +79,7 @@ public KubernetesInformerConfigurer kubernetesInformerConfigurer(
@KubernetesInformer(
apiTypeClass = V1ConfigMap.class,
apiListTypeClass = V1ConfigMapList.class,
namespace = "default",
groupVersionResource =
@GroupVersionResource(
apiGroup = "",
Expand Down Expand Up @@ -138,7 +133,7 @@ public void testInformerInjection() throws InterruptedException {
.willReturn(aResponse().withStatus(200).withBody("{}")));

wireMockRule.stubFor(
get(urlMatching("^/api/v1/configmaps.*"))
get(urlMatching("^/api/v1/namespaces/default/configmaps.*"))
.withQueryParam("watch", equalTo("false"))
.willReturn(
aResponse()
Expand All @@ -150,7 +145,7 @@ public void testInformerInjection() throws InterruptedException {
.metadata(new V1ListMeta().resourceVersion("0"))
.items(Arrays.asList(bar1))))));
wireMockRule.stubFor(
get(urlMatching("^/api/v1/configmaps.*"))
get(urlMatching("^/api/v1/namespaces/default/configmaps.*"))
.withQueryParam("watch", equalTo("true"))
.willReturn(aResponse().withStatus(200).withBody("{}")));

Expand All @@ -165,10 +160,10 @@ public void testInformerInjection() throws InterruptedException {
getRequestedFor(urlPathEqualTo("/api/v1/pods")).withQueryParam("watch", equalTo("true")));
verify(
1,
getRequestedFor(urlPathEqualTo("/api/v1/configmaps"))
getRequestedFor(urlPathEqualTo("/api/v1/namespaces/default/configmaps"))
.withQueryParam("watch", equalTo("false")));
verify(
getRequestedFor(urlPathEqualTo("/api/v1/configmaps"))
getRequestedFor(urlPathEqualTo("/api/v1/namespaces/default/configmaps"))
.withQueryParam("watch", equalTo("true")));

assertEquals(1, podLister.list().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
*/
package io.kubernetes.client.spring.extended.controller;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import static org.junit.Assert.*;

import com.github.tomakehurst.wiremock.junit.WireMockRule;
import io.kubernetes.client.common.KubernetesObject;
Expand All @@ -29,18 +27,24 @@
import io.kubernetes.client.informer.cache.DeltaFIFO;
import io.kubernetes.client.informer.cache.Lister;
import io.kubernetes.client.informer.impl.DefaultSharedIndexInformer;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.models.V1ConfigMap;
import io.kubernetes.client.openapi.models.V1ConfigMapList;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1PodList;
import io.kubernetes.client.spring.extended.controller.annotation.AddWatchEventFilter;
import io.kubernetes.client.spring.extended.controller.annotation.DeleteWatchEventFilter;
import io.kubernetes.client.spring.extended.controller.annotation.GroupVersionResource;
import io.kubernetes.client.spring.extended.controller.annotation.KubernetesInformer;
import io.kubernetes.client.spring.extended.controller.annotation.KubernetesInformers;
import io.kubernetes.client.spring.extended.controller.annotation.KubernetesReconciler;
import io.kubernetes.client.spring.extended.controller.annotation.KubernetesReconcilerReadyFunc;
import io.kubernetes.client.spring.extended.controller.annotation.KubernetesReconcilerWatch;
import io.kubernetes.client.spring.extended.controller.annotation.KubernetesReconcilerWatches;
import io.kubernetes.client.spring.extended.controller.annotation.UpdateWatchEventFilter;
import io.kubernetes.client.spring.extended.controller.factory.KubernetesControllerFactory;
import io.kubernetes.client.util.ClientBuilder;
import java.util.LinkedList;
import java.util.function.Function;
import javax.annotation.Resource;
Expand All @@ -49,21 +53,48 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
@Import(KubernetesInformerCreatorTest.TestConfig.class)
@SpringBootTest(classes = {KubernetesReconcilerCreatorTest.App.class})
public class KubernetesReconcilerCreatorTest {

@Rule public WireMockRule wireMockRule = new WireMockRule(8189);

@TestConfiguration
static class TestConfig {
@SpringBootApplication
@EnableAutoConfiguration
static class App {
@Bean
public ApiClient testingApiClient() {
ApiClient apiClient = new ClientBuilder().setBasePath("http://localhost:" + 8188).build();
return apiClient;
}

@Bean
public SharedInformerFactory sharedInformerFactory() {
return new KubernetesInformerCreatorTest.App.TestSharedInformerFactory();
}

@KubernetesInformers({
@KubernetesInformer(
apiTypeClass = V1Pod.class,
apiListTypeClass = V1PodList.class,
groupVersionResource =
@GroupVersionResource(apiGroup = "", apiVersion = "v1", resourcePlural = "pods")),
@KubernetesInformer(
apiTypeClass = V1ConfigMap.class,
apiListTypeClass = V1ConfigMapList.class,
groupVersionResource =
@GroupVersionResource(
apiGroup = "",
apiVersion = "v1",
resourcePlural = "configmaps")),
})
static class TestSharedInformerFactory extends SharedInformerFactory {}

@Bean
public TestReconciler testReconciler() {
Expand Down

This file was deleted.

5 changes: 5 additions & 0 deletions util/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>

</dependencies>
<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.util.CallGenerator;
import io.kubernetes.client.util.CallGeneratorParams;
import io.kubernetes.client.util.Watch;
import io.kubernetes.client.util.Watchable;
import io.kubernetes.client.util.*;
import io.kubernetes.client.util.generic.GenericKubernetesApi;
import io.kubernetes.client.util.generic.options.ListOptions;
import java.lang.reflect.Type;
Expand Down Expand Up @@ -162,7 +159,31 @@ SharedIndexInformer<ApiType> sharedIndexInformerFor(
GenericKubernetesApi<ApiType, ApiListType> genericKubernetesApi,
Class<ApiType> apiTypeClass,
long resyncPeriodInMillis) {
ListerWatcher<ApiType, ApiListType> listerWatcher = listerWatcherFor(genericKubernetesApi);
return sharedIndexInformerFor(
genericKubernetesApi, apiTypeClass, resyncPeriodInMillis, Namespaces.NAMESPACE_ALL);
}

/**
* Working the same as {@link SharedInformerFactory#sharedIndexInformerFor} above.
*
* <p>Constructs and returns a shared index informer for a specific namespace.
*
* @param <ApiType> the type parameter
* @param <ApiListType> the type parameter
* @param genericKubernetesApi the generic kubernetes api
* @param apiTypeClass the api type class
* @param resyncPeriodInMillis the resync period in millis
* @param namespace the target namespace
* @return the shared index informer
*/
public synchronized <ApiType extends KubernetesObject, ApiListType extends KubernetesListObject>
SharedIndexInformer<ApiType> sharedIndexInformerFor(
GenericKubernetesApi<ApiType, ApiListType> genericKubernetesApi,
Class<ApiType> apiTypeClass,
long resyncPeriodInMillis,
String namespace) {
ListerWatcher<ApiType, ApiListType> listerWatcher =
listerWatcherFor(genericKubernetesApi, namespace);
return sharedIndexInformerFor(listerWatcher, apiTypeClass, resyncPeriodInMillis);
}

Expand Down Expand Up @@ -197,28 +218,46 @@ public Watch<ApiType> watch(CallGeneratorParams params) throws ApiException {

private <ApiType extends KubernetesObject, ApiListType extends KubernetesListObject>
ListerWatcher<ApiType, ApiListType> listerWatcherFor(
GenericKubernetesApi<ApiType, ApiListType> genericKubernetesApi) {
GenericKubernetesApi<ApiType, ApiListType> genericKubernetesApi, String namespace) {
if (apiClient.getReadTimeout() > 0) {
// set read timeout zero to ensure client doesn't time out
apiClient.setReadTimeout(0);
}
// TODO: it seems read timeout is determined by genericKubernetesApi instead of above apiClient.
return new ListerWatcher<ApiType, ApiListType>() {
public ApiListType list(CallGeneratorParams params) throws ApiException {
return genericKubernetesApi
.list(
new ListOptions() {
{
setResourceVersion(params.resourceVersion);
setTimeoutSeconds(params.timeoutSeconds);
}
})
.throwsApiException()
.getObject();
if (Namespaces.NAMESPACE_ALL.equals(namespace)) {
return genericKubernetesApi
.list(
new ListOptions() {
{
setResourceVersion(params.resourceVersion);
setTimeoutSeconds(params.timeoutSeconds);
}
})
.throwsApiException()
.getObject();
} else {
return genericKubernetesApi
.list(
namespace,
new ListOptions() {
{
setResourceVersion(params.resourceVersion);
setTimeoutSeconds(params.timeoutSeconds);
}
})
.throwsApiException()
.getObject();
}
}

public Watchable<ApiType> watch(CallGeneratorParams params) throws ApiException {
return genericKubernetesApi.watch();
if (Namespaces.NAMESPACE_ALL.equals(namespace)) {
return genericKubernetesApi.watch();
} else {
return genericKubernetesApi.watch(namespace);
}
}
};
}
Expand Down
Loading

0 comments on commit 17ddba6

Please sign in to comment.