Skip to content

Commit

Permalink
#1629 Added injection of exception handler into shared informer.
Browse files Browse the repository at this point in the history
  • Loading branch information
cbuschka committed Nov 18, 2021
1 parent f383ab4 commit eb8e28b
Show file tree
Hide file tree
Showing 8 changed files with 559 additions and 354 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.google.gson.reflect.TypeToken;
import io.kubernetes.client.common.KubernetesListObject;
import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.informer.cache.Cache;
import io.kubernetes.client.informer.impl.DefaultSharedIndexInformer;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
Expand All @@ -32,6 +33,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import okhttp3.Call;
import org.apache.commons.collections4.MapUtils;

Expand Down Expand Up @@ -140,8 +142,19 @@ SharedIndexInformer<ApiType> sharedIndexInformerFor(
ListerWatcher<ApiType, ApiListType> listerWatcher,
Class<ApiType> apiTypeClass,
long resyncPeriodInMillis) {
return sharedIndexInformerFor(listerWatcher, apiTypeClass, resyncPeriodInMillis, null);
}

public synchronized <ApiType extends KubernetesObject, ApiListType extends KubernetesListObject>
SharedIndexInformer<ApiType> sharedIndexInformerFor(
ListerWatcher<ApiType, ApiListType> listerWatcher,
Class<ApiType> apiTypeClass,
long resyncPeriodInMillis,
BiConsumer<Class<ApiType>, Throwable> exceptionHandler) {

SharedIndexInformer<ApiType> informer =
new DefaultSharedIndexInformer<>(apiTypeClass, listerWatcher, resyncPeriodInMillis);
new DefaultSharedIndexInformer<>(
apiTypeClass, listerWatcher, resyncPeriodInMillis, new Cache<>(), exceptionHandler);
this.informers.putIfAbsent(TypeToken.get(apiTypeClass).getType(), informer);
return informer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.commons.lang3.tuple.MutablePair;
Expand Down Expand Up @@ -65,19 +66,34 @@ public class Controller<

private ScheduledFuture reflectorFuture;

/* visible for testing */ BiConsumer<Class<ApiType>, Throwable> exceptionHandler;

public Controller(
Class<ApiType> apiTypeClass,
DeltaFIFO queue,
ListerWatcher<ApiType, ApiListType> listerWatcher,
Consumer<Deque<MutablePair<DeltaFIFO.DeltaType, KubernetesObject>>> processFunc,
Supplier<Boolean> resyncFunc,
long fullResyncPeriod) {
this(apiTypeClass, queue, listerWatcher, processFunc, resyncFunc, fullResyncPeriod, null);
}

public Controller(
Class<ApiType> apiTypeClass,
DeltaFIFO queue,
ListerWatcher<ApiType, ApiListType> listerWatcher,
Consumer<Deque<MutablePair<DeltaFIFO.DeltaType, KubernetesObject>>> processFunc,
Supplier<Boolean> resyncFunc,
long fullResyncPeriod,
BiConsumer<Class<ApiType>, Throwable> exceptionHandler) {

this.queue = queue;
this.listerWatcher = listerWatcher;
this.apiTypeClass = apiTypeClass;
this.processFunc = processFunc;
this.resyncFunc = resyncFunc;
this.fullResyncPeriod = fullResyncPeriod;
this.exceptionHandler = exceptionHandler;

// starts one daemon thread for reflector
this.reflectExecutor =
Expand Down Expand Up @@ -113,7 +129,7 @@ public void run() {

synchronized (this) {
// TODO(yue9944882): proper naming for reflector
reflector = new ReflectorRunnable<ApiType, ApiListType>(apiTypeClass, listerWatcher, queue);
reflector = newReflector();
try {
reflectorFuture =
reflectExecutor.scheduleWithFixedDelay(
Expand All @@ -130,6 +146,10 @@ public void run() {
this.processLoop();
}

/* visible for testing */ ReflectorRunnable<ApiType, ApiListType> newReflector() {
return new ReflectorRunnable<>(apiTypeClass, listerWatcher, queue, exceptionHandler);
}

/** stops the resync thread pool firstly, then stop the reflector */
public void stop() {
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,22 +58,25 @@ public class ReflectorRunnable<

private AtomicBoolean isActive = new AtomicBoolean(true);

private final BiConsumer<Class<ApiType>, Throwable> exceptionHandler;
/* visible for testing */ final BiConsumer<Class<ApiType>, Throwable> exceptionHandler;

public ReflectorRunnable(
Class<ApiType> apiTypeClass, ListerWatcher listerWatcher, DeltaFIFO store) {
this(apiTypeClass, listerWatcher, store, ReflectorRunnable::defaultWatchErrorHandler);
Class<ApiType> apiTypeClass,
ListerWatcher<ApiType, ApiListType> listerWatcher,
DeltaFIFO store) {
this(apiTypeClass, listerWatcher, store, null);
}

public ReflectorRunnable(
Class<ApiType> apiTypeClass,
ListerWatcher listerWatcher,
ListerWatcher<ApiType, ApiListType> listerWatcher,
DeltaFIFO store,
BiConsumer<Class<ApiType>, Throwable> exceptionHandler) {
this.listerWatcher = listerWatcher;
this.store = store;
this.apiTypeClass = apiTypeClass;
this.exceptionHandler = exceptionHandler;
this.exceptionHandler =
exceptionHandler == null ? ReflectorRunnable::defaultWatchErrorHandler : exceptionHandler;
}

/**
Expand Down Expand Up @@ -277,7 +280,7 @@ private void watchHandler(Watchable<ApiType> watch) {
}
}

private static <ApiType extends KubernetesObject> void defaultWatchErrorHandler(
static <ApiType extends KubernetesObject> void defaultWatchErrorHandler(
Class<ApiType> watchingApiTypeClass, Throwable t) {
log.error(String.format("%s#Reflector loop failed unexpectedly", watchingApiTypeClass), t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.tuple.MutablePair;
Expand Down Expand Up @@ -82,7 +83,28 @@ public DefaultSharedIndexInformer(
new DeltaFIFO(
(Function<KubernetesObject, String>) cache.getKeyFunc(),
(Cache<KubernetesObject>) cache),
cache);
cache,
null);
}

public DefaultSharedIndexInformer(
Class<ApiType> apiTypeClass,
ListerWatcher<ApiType, ApiListType> listerWatcher,
long resyncPeriod,
Cache<ApiType> cache,
BiConsumer<Class<ApiType>, Throwable> exceptionHandler) {
this(
apiTypeClass,
listerWatcher,
resyncPeriod,
// down-casting should be safe here because one delta FIFO instance only serves one
// resource
// type
new DeltaFIFO(
(Function<KubernetesObject, String>) cache.getKeyFunc(),
(Cache<KubernetesObject>) cache),
cache,
exceptionHandler);
}

public DefaultSharedIndexInformer(
Expand All @@ -91,19 +113,31 @@ public DefaultSharedIndexInformer(
long resyncPeriod,
DeltaFIFO deltaFIFO,
Indexer<ApiType> indexer) {
this(apiTypeClass, listerWatcher, resyncPeriod, deltaFIFO, indexer, null);
}

public DefaultSharedIndexInformer(
Class<ApiType> apiTypeClass,
ListerWatcher<ApiType, ApiListType> listerWatcher,
long resyncPeriod,
DeltaFIFO deltaFIFO,
Indexer<ApiType> indexer,
BiConsumer<Class<ApiType>, Throwable> exceptionHandler) {

this.resyncCheckPeriodMillis = resyncPeriod;
this.defaultEventHandlerResyncPeriod = resyncPeriod;

this.processor = new SharedProcessor<>();
this.indexer = indexer;
this.controller =
new Controller<ApiType, ApiListType>(
new Controller<>(
apiTypeClass,
deltaFIFO,
listerWatcher,
this::handleDeltas,
processor::shouldResync,
resyncCheckPeriodMillis);
resyncCheckPeriodMillis,
exceptionHandler);

controllerThread =
new Thread(controller::run, "informer-controller-" + apiTypeClass.getSimpleName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import static org.junit.Assert.*;

import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.informer.EventType;
import io.kubernetes.client.informer.ListerWatcher;
import io.kubernetes.client.openapi.models.V1ListMeta;
Expand All @@ -23,13 +24,36 @@
import io.kubernetes.client.util.Watch;
import java.time.Duration;
import java.util.Arrays;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.commons.lang3.tuple.MutablePair;
import org.awaitility.Awaitility;
import org.hamcrest.core.IsEqual;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;

public class ControllerTest {

@Rule public MockitoRule mockitoRule = MockitoJUnit.rule();

private static final Class<V1Pod> anyApiTypeClass = V1Pod.class;
private static final long anyFullResyncPeriod = 1000L;

@Mock private DeltaFIFO deltaFIFOMock;
@Mock private ListerWatcher<V1Pod, V1PodList> listerWatcherMock;

@Mock
private Consumer<Deque<MutablePair<DeltaFIFO.DeltaType, KubernetesObject>>> popProcessFuncMock;

@Mock private Supplier<Boolean> resyncFuncMock;
@Mock private BiConsumer<Class<V1Pod>, Throwable> exceptionHandlerMock;

@Test
public void testControllerProcessDeltas() {

Expand Down Expand Up @@ -75,4 +99,37 @@ public void testControllerProcessDeltas() {
controller.stop();
}
}

@Test
public void testReflectorIsConstructedWithExeptionHandler() {
Controller<V1Pod, V1PodList> controller =
new Controller<>(
anyApiTypeClass,
deltaFIFOMock,
listerWatcherMock,
popProcessFuncMock,
resyncFuncMock,
anyFullResyncPeriod,
exceptionHandlerMock);
assertSame(exceptionHandlerMock, controller.exceptionHandler);

ReflectorRunnable<V1Pod, V1PodList> reflector = controller.newReflector();

assertSame(exceptionHandlerMock, reflector.exceptionHandler);
}

@Test
public void testControllerHasNoExceptionHandlerPerDefault() {

Controller<V1Pod, V1PodList> controller =
new Controller<>(
anyApiTypeClass,
deltaFIFOMock,
listerWatcherMock,
popProcessFuncMock,
resyncFuncMock,
anyFullResyncPeriod);

assertNull(controller.exceptionHandler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
*/
package io.kubernetes.client.informer.cache;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand All @@ -35,6 +34,7 @@
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.awaitility.Awaitility;
import org.hamcrest.core.IsEqual;
import org.junit.Test;
Expand All @@ -45,10 +45,14 @@
@RunWith(MockitoJUnitRunner.class)
public class ReflectorRunnableTest {

private static final Class<V1Pod> anyApiType = V1Pod.class;

@Mock private DeltaFIFO deltaFIFO;

@Mock private ListerWatcher<V1Pod, V1PodList> listerWatcher;

@Mock private BiConsumer<Class<V1Pod>, Throwable> exceptionHandler;

@Test
public void testReflectorRunOnce() throws ApiException {
String mockResourceVersion = "1000";
Expand Down Expand Up @@ -343,4 +347,20 @@ public void testReflectorListShouldHandleExpiredResourceVersionFromWatchHandler(
reflectorRunnable.stop();
}
}

@Test
public void testDefaultExceptionHandlerSetPerDefault() {
ReflectorRunnable<V1Pod, V1PodList> reflector =
new ReflectorRunnable<>(anyApiType, listerWatcher, deltaFIFO);

assertNotNull(reflector.exceptionHandler);
}

@Test
public void testGivemExceptionHandlerSet() {
ReflectorRunnable<V1Pod, V1PodList> reflector =
new ReflectorRunnable<>(anyApiType, listerWatcher, deltaFIFO, exceptionHandler);

assertSame(exceptionHandler, reflector.exceptionHandler);
}
}
Loading

0 comments on commit eb8e28b

Please sign in to comment.