diff --git a/pom.xml b/pom.xml index b0d6443cb6..bcaa8ebe48 100644 --- a/pom.xml +++ b/pom.xml @@ -242,6 +242,18 @@ junit test + + io.fabric8 + kubernetes-server-mock + 5.12.1 + test + + + org.junit.jupiter + junit-jupiter-api + + + diff --git a/src/main/java/org/csanchez/jenkins/plugins/kubernetes/KubernetesClientProvider.java b/src/main/java/org/csanchez/jenkins/plugins/kubernetes/KubernetesClientProvider.java index 1533ece474..fd37a7ead0 100644 --- a/src/main/java/org/csanchez/jenkins/plugins/kubernetes/KubernetesClientProvider.java +++ b/src/main/java/org/csanchez/jenkins/plugins/kubernetes/KubernetesClientProvider.java @@ -10,6 +10,7 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; +import edu.umd.cs.findbugs.annotations.NonNull; import org.jenkinsci.plugins.kubernetes.auth.KubernetesAuthException; import org.kohsuke.accmod.Restricted; import org.kohsuke.accmod.restrictions.NoExternalUse; @@ -63,8 +64,15 @@ static KubernetesClient createClient(KubernetesCloud cloud) throws KubernetesAut return c.getClient(); } - private static int getValidity(KubernetesCloud cloud) { - Object cloudObjects[] = { cloud.getServerUrl(), cloud.getNamespace(), cloud.getServerCertificate(), + /** + * Compute the hash of connection properties of the given cloud. This hash can be used to determine if a cloud + * was updated and a new connection is needed. + * @param cloud cloud to compute validity hash for + * @return client validity hash code + */ + @Restricted(NoExternalUse.class) + public static int getValidity(@NonNull KubernetesCloud cloud) { + Object[] cloudObjects = { cloud.getServerUrl(), cloud.getNamespace(), cloud.getServerCertificate(), cloud.getCredentialsId(), cloud.isSkipTlsVerify(), cloud.getConnectTimeout(), cloud.getReadTimeout(), cloud.getMaxRequestsPerHostStr(), cloud.isUseJenkinsProxy() }; return Arrays.hashCode(cloudObjects); @@ -93,7 +101,13 @@ public static void invalidate(String displayName) { clients.invalidate(displayName); } - @Extension + @Restricted(NoExternalUse.class) // testing only + public static void invalidateAll() { + clients.invalidateAll(); + } + + // set ordinal to 1 so it runs ahead of Reaper + @Extension(ordinal = 1) public static class SaveableListenerImpl extends SaveableListener { @Override public void onChange(Saveable o, XmlFile file) { diff --git a/src/main/java/org/csanchez/jenkins/plugins/kubernetes/TaskListenerEventWatcher.java b/src/main/java/org/csanchez/jenkins/plugins/kubernetes/TaskListenerEventWatcher.java index d358f8b99b..33fe61101b 100644 --- a/src/main/java/org/csanchez/jenkins/plugins/kubernetes/TaskListenerEventWatcher.java +++ b/src/main/java/org/csanchez/jenkins/plugins/kubernetes/TaskListenerEventWatcher.java @@ -1,5 +1,6 @@ package org.csanchez.jenkins.plugins.kubernetes; +import edu.umd.cs.findbugs.annotations.NonNull; import hudson.Functions; import hudson.model.TaskListener; import io.fabric8.kubernetes.api.model.Event; @@ -19,18 +20,22 @@ class TaskListenerEventWatcher implements Watcher { private final String name; private final TaskListener listener; - public TaskListenerEventWatcher(String name, TaskListener listener) { + public TaskListenerEventWatcher(@NonNull String name, @NonNull TaskListener listener) { this.name = name; this.listener = listener; } @Override public void eventReceived(Action action, Event event) { - PrintStream logger = listener.getLogger(); - // Messages can have multiple lines - String[] lines = event.getMessage().split("\n"); - for (String line : lines) { - logger.printf("[%s][%s/%s][%s] %s%n", event.getType(), event.getInvolvedObject().getNamespace(), event.getInvolvedObject().getName(), event.getReason(), line); + // ignore bookmark actions + // event may be null if Error action + if (action != Action.BOOKMARK && event != null) { + PrintStream logger = listener.getLogger(); + // Messages can have multiple lines + String[] lines = event.getMessage().split("\n"); + for (String line : lines) { + logger.printf("[%s][%s/%s][%s] %s%n", event.getType(), event.getInvolvedObject().getNamespace(), event.getInvolvedObject().getName(), event.getReason(), line); + } } } diff --git a/src/main/java/org/csanchez/jenkins/plugins/kubernetes/pod/retention/Reaper.java b/src/main/java/org/csanchez/jenkins/plugins/kubernetes/pod/retention/Reaper.java index efca07af14..769c4d7a1b 100644 --- a/src/main/java/org/csanchez/jenkins/plugins/kubernetes/pod/retention/Reaper.java +++ b/src/main/java/org/csanchez/jenkins/plugins/kubernetes/pod/retention/Reaper.java @@ -16,41 +16,36 @@ package org.csanchez.jenkins.plugins.kubernetes.pod.retention; +import edu.umd.cs.findbugs.annotations.CheckForNull; import edu.umd.cs.findbugs.annotations.NonNull; import hudson.Extension; import hudson.ExtensionList; import hudson.ExtensionPoint; +import hudson.XmlFile; import hudson.model.Computer; import hudson.model.Node; +import hudson.model.Saveable; import hudson.model.TaskListener; import hudson.model.listeners.ItemListener; +import hudson.model.listeners.SaveableListener; import hudson.security.ACL; import hudson.security.ACLContext; -import hudson.slaves.Cloud; import hudson.slaves.ComputerListener; import hudson.slaves.EphemeralNode; -import io.fabric8.kubernetes.api.model.ContainerStateTerminated; -import io.fabric8.kubernetes.api.model.ContainerStateWaiting; -import io.fabric8.kubernetes.api.model.ContainerStatus; -import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.*; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; -import java.util.Optional; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; import io.fabric8.kubernetes.client.WatcherException; import jenkins.model.Jenkins; -import org.csanchez.jenkins.plugins.kubernetes.KubernetesCloud; -import org.csanchez.jenkins.plugins.kubernetes.KubernetesComputer; -import org.csanchez.jenkins.plugins.kubernetes.KubernetesSlave; -import org.csanchez.jenkins.plugins.kubernetes.PodUtils; +import org.csanchez.jenkins.plugins.kubernetes.*; import org.jenkinsci.plugins.kubernetes.auth.KubernetesAuthException; /** @@ -62,7 +57,7 @@ * unless some external process or garbage collection policy results in pod deletion. */ @Extension -public class Reaper extends ComputerListener implements Watcher { +public class Reaper extends ComputerListener { private static final Logger LOGGER = Logger.getLogger(Reaper.class.getName()); @@ -74,7 +69,7 @@ public class Reaper extends ComputerListener implements Watcher { public static class ReaperShutdownListener extends ItemListener { @Override public void onBeforeShutdown() { - Reaper.getInstance().closeWatch(); + Reaper.getInstance().closeAllWatchers(); } } @@ -88,12 +83,23 @@ public static Reaper getInstance() { */ private final AtomicBoolean activated = new AtomicBoolean(); - private Watch watch; + private final Map watchers = new ConcurrentHashMap<>(); @Override public void onOnline(Computer c, TaskListener listener) throws IOException, InterruptedException { if (c instanceof KubernetesComputer) { maybeActivate(); + + // ensure associated cloud is being watched. the watch may have been closed due to exception or + // failure to register on initial activation. + KubernetesSlave node = ((KubernetesComputer) c).getNode(); + if (node != null && !isWatchingCloud(node.getCloudName())) { + try { + watchCloud(node.getKubernetesCloud()); + } catch (IllegalStateException ise) { + LOGGER.log(Level.WARNING, ise, () -> "kubernetes cloud not found: " + node.getCloudName()); + } + } } } @@ -108,67 +114,117 @@ private void activate() { // First check all existing nodes to see if they still have active pods. // (We may have missed deletion events while Jenkins was shut off, // or pods may have been deleted before any Kubernetes agent was brought online.) - for (Node n : new ArrayList<>(Jenkins.get().getNodes())) { - if (!(n instanceof KubernetesSlave)) { - continue; - } - KubernetesSlave ks = (KubernetesSlave) n; - if (ks.getLauncher().isLaunchSupported()) { - // Being launched, don't touch it. - continue; - } - String ns = ks.getNamespace(); - String name = ks.getPodName(); - try { - // TODO more efficient to do a single (or paged) list request, but tricky since there may be multiple clouds, - // and even within a single cloud an agent pod is permitted to use a nondefault namespace, - // yet we do not want to do an unnamespaced pod list for RBAC reasons. - // Could use a hybrid approach: first list all pods in the configured namespace for all clouds; - // then go back and individually check any unmatched agents with their configured namespace. - if (ks.getKubernetesCloud().connect().pods().inNamespace(ns).withName(name).get() == null) { - LOGGER.info(() -> ns + "/" + name + " seems to have been deleted, so removing corresponding Jenkins agent"); - Jenkins.get().removeNode(ks); - } else { - LOGGER.fine(() -> ns + "/" + name + " still seems to exist, OK"); + reapAgents(); + + // Now set up a watch for any subsequent pod deletions. + watchClouds(); + } + + /** + * Remove any {@link KubernetesSlave} nodes that reference Pods that don't exist. + */ + private void reapAgents() { + Jenkins jenkins = Jenkins.getInstanceOrNull(); + if (jenkins != null) { + for (Node n : new ArrayList<>(jenkins.getNodes())) { + if (!(n instanceof KubernetesSlave)) { + continue; + } + KubernetesSlave ks = (KubernetesSlave) n; + if (ks.getLauncher().isLaunchSupported()) { + // Being launched, don't touch it. + continue; + } + String ns = ks.getNamespace(); + String name = ks.getPodName(); + try { + // TODO more efficient to do a single (or paged) list request, but tricky since there may be multiple clouds, + // and even within a single cloud an agent pod is permitted to use a nondefault namespace, + // yet we do not want to do an unnamespaced pod list for RBAC reasons. + // Could use a hybrid approach: first list all pods in the configured namespace for all clouds; + // then go back and individually check any unmatched agents with their configured namespace. + KubernetesCloud cloud = ks.getKubernetesCloud(); + if (cloud.connect().pods().inNamespace(ns).withName(name).get() == null) { + LOGGER.info(() -> ns + "/" + name + " seems to have been deleted, so removing corresponding Jenkins agent"); + jenkins.removeNode(ks); + } else { + LOGGER.fine(() -> ns + "/" + name + " still seems to exist, OK"); + } + } catch (Exception x) { + LOGGER.log(Level.WARNING, x, () -> "failed to do initial reap check for " + ns + "/" + name); } - } catch (Exception x) { - LOGGER.log(Level.WARNING, "failed to do initial reap check for " + ns + "/" + name, x); } } - // Now set up a watch for any subsequent pod deletions. - for (Cloud c : Jenkins.get().clouds) { - if (!(c instanceof KubernetesCloud)) { - continue; + } + + /** + * Create watchers for each configured {@link KubernetesCloud} in Jenkins and remove any existing watchers + * for clouds that have been removed. If a {@link KubernetesCloud} client configuration property has been + * updated a new watcher will be created to replace the existing one. + */ + private void watchClouds() { + Jenkins jenkins = Jenkins.getInstanceOrNull(); + if (jenkins != null) { + Set cloudNames = new HashSet<>(this.watchers.keySet()); + for (KubernetesCloud kc : jenkins.clouds.getAll(KubernetesCloud.class)) { + watchCloud(kc); + cloudNames.remove(kc.name); } - KubernetesCloud kc = (KubernetesCloud) c; + + // close any cloud watchers that have been removed + cloudNames.stream() + .map(this.watchers::get) + .filter(Objects::nonNull) + .forEach(cpw -> { + LOGGER.info(() -> "stopping pod watcher for deleted kubernetes cloud " + cpw.cloudName); + cpw.stop(); + }); + } + } + + /** + * Register {@link CloudPodWatcher} for the given cloud if one does not exist or if the existing watcher + * is no longer valid. + * @param kc kubernetes cloud to watch + */ + private void watchCloud(@NonNull KubernetesCloud kc) { + // can't use ConcurrentHashMap#computeIfAbsent because CloudPodWatcher will remove itself from the watchers + // map on close. If an error occurs when creating the watch it would create a deadlock situation. + CloudPodWatcher watcher = new CloudPodWatcher(kc); + if (!isCloudPodWatcherActive(watcher)) { try { KubernetesClient client = kc.connect(); - watch = client.pods().inNamespace(client.getNamespace()).watch(this); + watcher.watch = client.pods().inNamespace(client.getNamespace()).watch(watcher); + CloudPodWatcher old = watchers.put(kc.name, watcher); + // if another watch slipped in then make sure it stopped + if (old != null) { + old.stop(); + } + LOGGER.info(() -> "set up watcher on " + kc.getDisplayName()); } catch (Exception x) { - LOGGER.log(Level.WARNING, "failed to set up watcher on " + kc.getDisplayName(), x); + LOGGER.log(Level.WARNING, x, () -> "failed to set up watcher on " + kc.getDisplayName()); } } } - @Override - public void eventReceived(Watcher.Action action, Pod pod) { - String ns = pod.getMetadata().getNamespace(); - String name = pod.getMetadata().getName(); - Jenkins jenkins = Jenkins.getInstanceOrNull(); - if (jenkins == null) { - return; - } - Optional optionalNode = resolveNode(jenkins, ns, name); - if (!optionalNode.isPresent()) { - return; - } - ExtensionList.lookup(Listener.class).forEach(listener -> { - try { - listener.onEvent(action, optionalNode.get(), pod); - } catch (Exception x) { - LOGGER.log(Level.WARNING, "Listener " + listener + " failed for " + ns + "/" + name, x); - } - }); + /** + * Check if the cloud is watched for Pod events. + * @param name cloud name + * @return true if a watcher has been registered for the given cloud + */ + boolean isWatchingCloud(String name) { + return watchers.get(name) != null; + } + + /** + * Check if the given cloud pod watcher exists and is still valid. Watchers may become invalid + * of the kubernetes client configuration changes. + * @param watcher watcher to check + * @return true if the provided watcher already exists and is valid, false otherwise + */ + private boolean isCloudPodWatcherActive(@NonNull CloudPodWatcher watcher) { + CloudPodWatcher existing = watchers.get(watcher.cloudName); + return existing != null && existing.clientValidity == watcher.clientValidity; } private static Optional resolveNode(@NonNull Jenkins jenkins, String namespace, String name) { @@ -179,18 +235,96 @@ private static Optional resolveNode(@NonNull Jenkins jenkins, S .findFirst(); } - @Override - public void onClose(WatcherException cause) { - // TODO ignore, or do we need to manually reattach the watcher? - // AllContainersRunningPodWatcher is not reattached, but this is expected to be short-lived, - // useful only until the containers of a single pod start running. - // (At least when using kubernetes-client/java, the connection gets closed after 2m on GKE - // and you need to rerun the watch. Does the fabric8io client wrap this?) + /** + * Stop all watchers + */ + private void closeAllWatchers() { + // on close each watcher should remove itself from the watchers map (see CloudPodWatcher#onClose) + watchers.values().forEach(CloudPodWatcher::stop); } - private void closeWatch() { - if (watch != null) { - watch.close(); + /** + * Kubernetes pod event watcher for a Kubernetes Cloud. Notifies {@link Listener} + * extensions on Pod events. The default Kubernetes client watch manager will + * attempt to reconnect on connection errors. If the watch api returns "410 Gone" + * then the Watch will close itself with a WatchException and this watcher will + * deregister itself. + */ + private class CloudPodWatcher implements Watcher { + private final String cloudName; + private final int clientValidity; + @CheckForNull + private Watch watch; + + CloudPodWatcher(@NonNull KubernetesCloud cloud) { + this.cloudName = cloud.name; + this.clientValidity = KubernetesClientProvider.getValidity(cloud); + } + + @Override + public void eventReceived(Action action, Pod pod) { + // don't send bookmark event to listeners as they don't represent change in pod state + if (action == Action.BOOKMARK) { + // TODO future enhancement might be to keep track of bookmarks for better reconnect behavior. Would + // likely have to track based on cloud address/namespace in case cloud was renamed or namespace + // is changed. + return; + } + + // If there was a non-success http response code from watch request + // or the api returned a Status object the watch manager notifies with + // an error action and null resource. + if (action == Action.ERROR && pod == null) { + return; + } + + Jenkins jenkins = Jenkins.getInstanceOrNull(); + if (jenkins == null) { + return; + } + + String ns = pod.getMetadata().getNamespace(); + String name = pod.getMetadata().getName(); + Optional optionalNode = resolveNode(jenkins, ns, name); + if (!optionalNode.isPresent()) { + return; + } + + ExtensionList.lookup(Listener.class).forEach(listener -> { // TODO 2.324+ jenkins.util.Listeners + try { + listener.onEvent(action, optionalNode.get(), pod); + } catch (Exception x) { + LOGGER.log(Level.WARNING, "Listener " + listener + " failed for " + ns + "/" + name, x); + } + }); + } + + /** + * Close the associated {@link Watch} handle. This should be used shutdown/stop the watch. It will cause the + * watch manager to call this classes {@link #onClose()} method. + */ + void stop() { + if (watch != null) { + this.watch.close(); + } + } + + @Override + public void onClose() { + LOGGER.fine(() -> cloudName + " watcher closed"); + // remove self from watchers list + Reaper.this.watchers.remove(cloudName, this); + } + + @Override + public void onClose(WatcherException e) { + // usually triggered because of "410 Gone" responses + // https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses + // "Gone" may be returned if the resource version requested is older than the server + // has retained. + LOGGER.log(Level.WARNING, e, () -> cloudName + " watcher closed with exception"); + // remove self from watchers list + Reaper.this.watchers.remove(cloudName, this); } } @@ -211,7 +345,7 @@ public interface Listener extends ExtensionPoint { public static class RemoveAgentOnPodDeleted implements Listener { @Override public void onEvent(@NonNull Watcher.Action action, @NonNull KubernetesSlave node, @NonNull Pod pod) throws IOException { - if (action != Action.DELETED) { + if (action != Watcher.Action.DELETED) { return; } String ns = pod.getMetadata().getNamespace(); @@ -226,8 +360,8 @@ public void onEvent(@NonNull Watcher.Action action, @NonNull KubernetesSlave nod @Extension public static class TerminateAgentOnContainerTerminated implements Listener { @Override - public void onEvent(@NonNull Action action, @NonNull KubernetesSlave node, @NonNull Pod pod) throws IOException, InterruptedException { - if (action != Action.MODIFIED) { + public void onEvent(@NonNull Watcher.Action action, @NonNull KubernetesSlave node, @NonNull Pod pod) throws IOException, InterruptedException { + if (action != Watcher.Action.MODIFIED) { return; } List terminatedContainers = PodUtils.getTerminatedContainers(pod); @@ -251,8 +385,8 @@ public void onEvent(@NonNull Action action, @NonNull KubernetesSlave node, @NonN @Extension public static class TerminateAgentOnPodFailed implements Listener { @Override - public void onEvent(@NonNull Action action, @NonNull KubernetesSlave node, @NonNull Pod pod) throws IOException, InterruptedException { - if (action != Action.MODIFIED) { + public void onEvent(@NonNull Watcher.Action action, @NonNull KubernetesSlave node, @NonNull Pod pod) throws IOException, InterruptedException { + if (action != Watcher.Action.MODIFIED) { return; } if ("Failed".equals(pod.getStatus().getPhase())) { @@ -283,7 +417,11 @@ private static void logLastLinesThenTerminateNode(KubernetesSlave node, Pod pod, public static class TerminateAgentOnImagePullBackOff implements Listener { @Override - public void onEvent(@NonNull Action action, @NonNull KubernetesSlave node, @NonNull Pod pod) throws IOException, InterruptedException { + public void onEvent(@NonNull Watcher.Action action, @NonNull KubernetesSlave node, @NonNull Pod pod) throws IOException, InterruptedException { + if (action != Watcher.Action.MODIFIED) { + return; + } + List backOffContainers = PodUtils.getContainers(pod, cs -> { ContainerStateWaiting waiting = cs.getState().getWaiting(); return waiting != null && waiting.getMessage() != null && waiting.getMessage().contains("Back-off pulling image"); @@ -301,4 +439,21 @@ public void onEvent(@NonNull Action action, @NonNull KubernetesSlave node, @NonN node.terminate(); } } + + /** + * {@link SaveableListener} that will update cloud watchers when Jenkins configuration is updated. + */ + @Extension + public static class ReaperSaveableListener extends SaveableListener { + @Override + public void onChange(Saveable o, XmlFile file) { + if (o instanceof Jenkins) { + Reaper reaper = Reaper.getInstance(); + // only update if reaper has been activated to avoid hitting api server if not in use + if (reaper.activated.get()) { + Reaper.getInstance().watchClouds(); + } + } + } + } } diff --git a/src/test/java/org/csanchez/jenkins/plugins/kubernetes/KubernetesClientProviderTest.java b/src/test/java/org/csanchez/jenkins/plugins/kubernetes/KubernetesClientProviderTest.java new file mode 100644 index 0000000000..3254c9281e --- /dev/null +++ b/src/test/java/org/csanchez/jenkins/plugins/kubernetes/KubernetesClientProviderTest.java @@ -0,0 +1,90 @@ +/* + * The MIT License + * + * Copyright (c) 2016, CloudBees, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package org.csanchez.jenkins.plugins.kubernetes; + +import static org.junit.Assert.assertEquals; + +import java.util.function.Consumer; + +import org.csanchez.jenkins.plugins.kubernetes.pod.retention.Always; +import org.junit.Assert; +import org.junit.Test; + +public class KubernetesClientProviderTest { + + @Test + public void testGetValidity() { + KubernetesCloud cloud = new KubernetesCloud("foo"); + // changes to these properties should trigger different validity value + checkValidityChanges(cloud, + c -> c.setServerUrl("https://server:443"), + c -> c.setNamespace("blue"), + c -> c.setServerCertificate("cert"), + c -> c.setCredentialsId("secret"), + c -> c.setSkipTlsVerify(true), + c -> c.setConnectTimeout(46), + c -> c.setReadTimeout(43), + c -> c.setMaxRequestsPerHost(47), + c -> c.setUseJenkinsProxy(true) + ); + + // changes to these properties should not trigger different validity value + checkValidityDoesNotChange(cloud, + c -> c.setPodLabels(PodLabel.listOf("foo", "bar")), + c -> c.setJenkinsUrl("https://localhost:8081"), + c -> c.setJenkinsTunnel("https://jenkins.cluster.svc"), + c -> c.setPodRetention(new Always()), + c -> c.setWebSocket(true), + c -> c.setRetentionTimeout(52), + c -> c.setDirectConnection(true) + ); + + // verify stability + assertEquals(KubernetesClientProvider.getValidity(cloud), KubernetesClientProvider.getValidity(cloud)); + } + + private void checkValidityChanges(KubernetesCloud cloud, Consumer... mutations) { + checkValidity(cloud, Assert::assertNotEquals, mutations); + } + + private void checkValidityDoesNotChange(KubernetesCloud cloud, Consumer... mutations) { + checkValidity(cloud, Assert::assertEquals, mutations); + } + + private void checkValidity(KubernetesCloud cloud, ValidityAssertion validityAssertion, Consumer... mutations) { + int v = KubernetesClientProvider.getValidity(cloud); + int count = 1; + for (Consumer mut : mutations) { + mut.accept(cloud); + int after = KubernetesClientProvider.getValidity(cloud); + validityAssertion.doAssert("change #" + count++ + " of " + mutations.length, v, after); + v = after; + } + } + + interface ValidityAssertion { + void doAssert(String message, int before, int after); + } + +} \ No newline at end of file diff --git a/src/test/java/org/csanchez/jenkins/plugins/kubernetes/TaskListenerEventWatcherTest.java b/src/test/java/org/csanchez/jenkins/plugins/kubernetes/TaskListenerEventWatcherTest.java new file mode 100644 index 0000000000..515c437878 --- /dev/null +++ b/src/test/java/org/csanchez/jenkins/plugins/kubernetes/TaskListenerEventWatcherTest.java @@ -0,0 +1,87 @@ +/* + * The MIT License + * + * Copyright (c) 2016, CloudBees, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package org.csanchez.jenkins.plugins.kubernetes; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.*; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.io.UnsupportedEncodingException; + +import hudson.model.TaskListener; +import io.fabric8.kubernetes.api.model.Event; +import io.fabric8.kubernetes.api.model.ObjectReference; +import io.fabric8.kubernetes.client.Watcher; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class TaskListenerEventWatcherTest { + + private @Mock TaskListener listener; + private TaskListenerEventWatcher watcher; + + @Before + public void setup() { + watcher = new TaskListenerEventWatcher("foo", listener); + } + + @Test + public void ignoreBookmarkAction() { + watcher.eventReceived(Watcher.Action.BOOKMARK, new Event()); + verifyNoInteractions(listener); + } + + @Test + public void ignoreErrorAction() { + watcher.eventReceived(Watcher.Action.ERROR, null); + verifyNoInteractions(listener); + } + + @Test + public void logEventMessage() throws UnsupportedEncodingException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + PrintStream ps = new PrintStream(bos); + when(listener.getLogger()).thenReturn(ps); + Event event = new Event(); + event.setMessage("cat\ndog"); + event.setReason("because"); + event.setType("Update"); + ObjectReference involvedObj = new ObjectReference(); + involvedObj.setName("foo-123"); + involvedObj.setNamespace("bar"); + event.setInvolvedObject(involvedObj); + + watcher.eventReceived(Watcher.Action.ADDED, event); + + verify(listener).getLogger(); + ps.flush(); + String output = bos.toString("UTF-8"); + assertEquals("[Update][bar/foo-123][because] cat\n[Update][bar/foo-123][because] dog\n", output); + } +} diff --git a/src/test/java/org/csanchez/jenkins/plugins/kubernetes/pod/retention/ReaperTest.java b/src/test/java/org/csanchez/jenkins/plugins/kubernetes/pod/retention/ReaperTest.java new file mode 100644 index 0000000000..3f7f65bfe0 --- /dev/null +++ b/src/test/java/org/csanchez/jenkins/plugins/kubernetes/pod/retention/ReaperTest.java @@ -0,0 +1,709 @@ +/* + * The MIT License + * + * Copyright (c) 2016, CloudBees, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package org.csanchez.jenkins.plugins.kubernetes.pod.retention; + + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import edu.umd.cs.findbugs.annotations.NonNull; +import hudson.Extension; +import hudson.model.TaskListener; +import hudson.slaves.ComputerLauncher; +import io.fabric8.kubernetes.api.model.*; +import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.server.mock.KubernetesServer; +import jenkins.model.Jenkins; +import okhttp3.mockwebserver.RecordedRequest; +import org.csanchez.jenkins.plugins.kubernetes.PodTemplate; +import org.csanchez.jenkins.plugins.kubernetes.*; +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExternalResource; +import org.jvnet.hudson.test.JenkinsRule; + +public class ReaperTest { + + @Rule + public JenkinsRule j = new JenkinsRule(); + + @Rule + public KubernetesServer server = new KubernetesServer(); + + @Rule + public CapturingReaperListener listener = new CapturingReaperListener(); + + @After + public void tearDown() { + KubernetesClientProvider.invalidateAll(); + } + + @Test + public void testMaybeActivate() throws IOException, InterruptedException { + KubernetesCloud cloud = addCloud("k8s", "foo"); + String watchPodsPath = "/api/v1/namespaces/foo/pods?allowWatchBookmarks=true&watch=true"; + server.expect().withPath(watchPodsPath).andReturnChunked(200).always(); + + // add node that does not exist in k8s so it get's removed + KubernetesSlave podNotRunning = addNode(cloud, "k8s-node-123", "k8s-node"); + assertEquals("node added to jenkins", j.jenkins.getNodes().size(), 1); + + // activate reaper + Reaper r = Reaper.getInstance(); + r.maybeActivate(); + + // k8s node which no pod should be deleted on activation + assertEquals("node removed from jenkins", j.jenkins.getNodes().size(), 0); + + // watch was created + assertTrue(r.isWatchingCloud(cloud.name)); + + kubeClientRequests() + // expect pod not running to be removed + .assertRequestCount("/api/v1/namespaces/foo/pods/k8s-node-123", 1) + // watch to be created + .assertRequestCountAtLeast(watchPodsPath, 1); + + // create new node to verify activate is not run again + KubernetesSlave newNode = addNode(cloud, "new-123", "new"); + j.jenkins.addNode(newNode); + assertEquals("node added to jenkins", j.jenkins.getNodes().size(), 1); + // call again should not add any more calls + r.maybeActivate(); + + kubeClientRequests() + // expect not to be called + .assertRequestCount("/api/v1/namespaces/foo/pods/new-123", 0); + assertEquals("node not removed from jenkins", j.jenkins.getNodes().size(), 1); + } + + @Test + public void testWatchFailOnActivate() throws IOException, InterruptedException { + KubernetesCloud cloud = addCloud("k8s", "foo"); + // activate reaper + Reaper r = Reaper.getInstance(); + r.maybeActivate(); + + // expect watch to be attempted + kubeClientRequests() + .assertRequestCountAtLeast("/api/v1/namespaces/foo/pods?allowWatchBookmarks=true&watch=true", 1); + // watch failed to register + assertFalse(r.isWatchingCloud(cloud.name)); + } + + @Test + public void testActivateOnNewComputer() throws IOException, InterruptedException { + server.expect() + .withPath("/api/v1/namespaces/foo/pods?allowWatchBookmarks=true&watch=true") + .andReturnChunked(200) + .always(); + + // initiate reaper + Reaper r = Reaper.getInstance(); + + // add new cloud + KubernetesCloud cloud = addCloud("k8s", "foo"); + KubernetesSlave n2 = addNode(cloud, "p1-123", "p1"); + TaskListener tl = mock(TaskListener.class); + KubernetesComputer kc = new KubernetesComputer(n2); + + // should not be watching the newly created cloud at this point + assertFalse("should not be watching cloud", r.isWatchingCloud(cloud.name)); + + // fire compute on-line event + r.onOnline(kc, tl); + + // expect new cloud registered + assertTrue("should be watching cloud", r.isWatchingCloud(cloud.name)); + kubeClientRequests() + .assertRequestCountAtLeast("/api/v1/namespaces/foo/pods?allowWatchBookmarks=true&watch=true", 1); + } + + @Test(timeout = 10_000) + public void testReconnectOnNewComputer() throws InterruptedException, IOException { + KubernetesCloud cloud = addCloud("k8s", "foo"); + String watchPodsPath = "/api/v1/namespaces/foo/pods?allowWatchBookmarks=true&watch=true"; + server.expect().withPath(watchPodsPath).andReturnChunked(200).once(); + // trigger HTTP_GONE status which should result in Watcher#onClose(Exception) + server.expect().withPath(watchPodsPath).andReturnChunked(410).once(); + // after Gone error, should reconnect fresh + server.expect().withPath(watchPodsPath).andReturnChunked(200).always(); + + // activate reaper + Reaper r = Reaper.getInstance(); + r.maybeActivate(); + + waitForKubeClientRequests(2) + .assertRequestCount(watchPodsPath, 2); + + // error status event should be filtered out + listener.expectNoEvents(); + + // wait until watch is removed + while (r.isWatchingCloud(cloud.name)) { + Thread.sleep(250L); + } + + // launch computer + KubernetesSlave n2 = addNode(cloud, "p1-123", "p1"); + TaskListener tl = mock(TaskListener.class); + KubernetesComputer kc = new KubernetesComputer(n2); + r.onOnline(kc, tl); + + // should have started new watch + assertTrue("watcher is restarted", r.isWatchingCloud(cloud.name)); + } + + @Test(timeout = 10_000) + public void testAddWatchWhenCloudAdded() throws InterruptedException, IOException { + String watchPodsPath = "/api/v1/namespaces/foo/pods?allowWatchBookmarks=true&watch=true"; + server.expect().withPath(watchPodsPath).andReturnChunked(200).always(); + + // activate reaper + Reaper r = Reaper.getInstance(); + r.maybeActivate(); + + String cloudName = "k8s"; + assertFalse("should not be watching cloud", r.isWatchingCloud(cloudName)); + + KubernetesCloud cloud = addCloud(cloudName, "foo"); + + // invalidate client + j.jenkins.clouds.add(cloud); + + // watch is added + assertTrue("should be watching cloud", r.isWatchingCloud(cloud.name)); + kubeClientRequests() + .assertRequestCountAtLeast(watchPodsPath, 1); + } + + @Test(timeout = 10_000) + public void testRemoveWatchWhenCloudRemoved() throws InterruptedException, IOException { + KubernetesCloud cloud = addCloud("k8s", "foo"); + String watchPodsPath = "/api/v1/namespaces/foo/pods?allowWatchBookmarks=true&watch=true"; + server.expect().withPath(watchPodsPath).andReturnChunked(200).always(); + + // activate reaper + Reaper r = Reaper.getInstance(); + r.maybeActivate(); + + // should not be watching the newly created cloud at this point + assertTrue("should be watching cloud", r.isWatchingCloud(cloud.name)); + + // invalidate client + j.jenkins.clouds.remove(cloud); + + // watch is removed + assertFalse("should not be watching cloud", r.isWatchingCloud(cloud.name)); + } + + @Test(timeout = 10_000) + public void testReplaceWatchWhenCloudUpdated() throws InterruptedException, IOException { + KubernetesCloud cloud = addCloud("k8s", "foo"); + Pod node123 = new PodBuilder() + .withNewStatus() + .endStatus() + .withNewMetadata() + .withName("node-123") + .withNamespace("bar") + .endMetadata() + .build(); + + String watchFooPodsPath = "/api/v1/namespaces/foo/pods?allowWatchBookmarks=true&watch=true"; + server.expect().withPath(watchFooPodsPath).andReturnChunked(200).always(); + + String watchBarPodsPath = "/api/v1/namespaces/bar/pods?allowWatchBookmarks=true&watch=true"; + server.expect().withPath(watchBarPodsPath).andReturnChunked(200).once(); + server.expect().withPath(watchBarPodsPath).andReturnChunked(200, new WatchEvent(node123, "MODIFIED")).always(); + // don't remove pod on activate + server.expect().withPath("/api/v1/namespaces/bar/pods/node-123").andReturn(200, node123).once(); + + // activate reaper + Reaper r = Reaper.getInstance(); + r.maybeActivate(); + + // should not be watching the newly created cloud at this point + assertTrue("should be watching cloud", r.isWatchingCloud(cloud.name)); + + // invalidate client + cloud.setNamespace("bar"); + j.jenkins.save(); + + KubernetesSlave node = addNode(cloud, "node-123", "node"); + + // watch is still active + assertTrue("should be watching cloud", r.isWatchingCloud(cloud.name)); + + listener.waitForEvents() + .expectEvent(Watcher.Action.MODIFIED, node); + kubeClientRequests() + .assertRequestCountAtLeast(watchBarPodsPath, 1); + } + + @Test(timeout = 10_000) + public void testStopWatchingOnCloseException() throws InterruptedException { + KubernetesCloud cloud = addCloud("k8s", "foo"); + String watchPodsPath = "/api/v1/namespaces/foo/pods?allowWatchBookmarks=true&watch=true"; + server.expect().withPath(watchPodsPath).andReturnChunked(200).once(); + // trigger HTTP_GONE status which should result in Watcher#onClose(Exception) + server.expect().withPath(watchPodsPath).andReturnChunked(410).once(); + + // activate reaper + Reaper r = Reaper.getInstance(); + r.maybeActivate(); + + waitForKubeClientRequests(2) + .assertRequestCount(watchPodsPath, 2); + + // error status event should be filtered out + listener.expectNoEvents(); + + // watch is removed + while (r.isWatchingCloud(cloud.name)) { + Thread.sleep(250L); + } + assertFalse(r.isWatchingCloud(cloud.name)); + } + + @Test(timeout = 10_000) + public void testKeepWatchingOnKubernetesApiServerError() throws InterruptedException { + KubernetesCloud cloud = addCloud("k8s", "foo"); + String watchPodsPath = "/api/v1/namespaces/foo/pods?allowWatchBookmarks=true&watch=true"; + server.expect().withPath(watchPodsPath).andReturnChunked(200).once(); + // trigger error action event + server.expect().withPath(watchPodsPath).andReturnChunked(500).once(); + server.expect().withPath(watchPodsPath).andReturnChunked(200).always(); + + // activate reaper + Reaper r = Reaper.getInstance(); + r.maybeActivate(); + + waitForKubeClientRequests(3) + .assertRequestCount(watchPodsPath, 3); + + // error status event should be filtered out + listener.expectNoEvents(); + + // watch is still active + assertTrue(r.isWatchingCloud(cloud.name)); + } + + @Test(timeout = 10_000) + public void testKeepWatchingOnStatusWatchEvent() throws InterruptedException { + KubernetesCloud cloud = addCloud("k8s", "foo"); + String watchPodsPath = "/api/v1/namespaces/foo/pods?allowWatchBookmarks=true&watch=true"; + server.expect().withPath(watchPodsPath).andReturnChunked(200).once(); + Status status = new Status(); + status.setStatus("Unknown"); + status.setReason("Some reason"); + status.setCode(200); + server.expect().withPath(watchPodsPath) + .andReturnChunked(200, new WatchEvent(status, "ERROR")) + .once(); + + // activate reaper + Reaper r = Reaper.getInstance(); + r.maybeActivate(); + + waitForKubeClientRequests(2) + .assertRequestCount(watchPodsPath, 2); + + // error status event should be filtered out + listener.expectNoEvents(); + + // watch is still active + assertTrue(r.isWatchingCloud(cloud.name)); + } + + @Test + public void testCloseWatchersOnShutdown() throws InterruptedException { + String watchPodsPath = "/api/v1/namespaces/foo/pods?allowWatchBookmarks=true&watch=true"; + server.expect().withPath(watchPodsPath).andReturnChunked(200).always(); + + // add more clouds to make sure they are all closed + KubernetesCloud cloud = addCloud("k8s", "foo"); + KubernetesCloud cloud2 = addCloud("c2", "foo"); + KubernetesCloud cloud3 = addCloud("c3", "foo"); + + // activate reaper + Reaper r = Reaper.getInstance(); + r.maybeActivate(); + + // watching cloud + assertTrue(r.isWatchingCloud(cloud.name)); + assertTrue(r.isWatchingCloud(cloud2.name)); + assertTrue(r.isWatchingCloud(cloud3.name)); + + // trigger shutdown listener + new Reaper.ReaperShutdownListener().onBeforeShutdown(); + + // watchers removed + assertFalse(r.isWatchingCloud(cloud.name)); + assertFalse(r.isWatchingCloud(cloud2.name)); + assertFalse(r.isWatchingCloud(cloud3.name)); + } + + @Test(timeout = 10_000) + public void testDeleteNodeOnPodDelete() throws IOException, InterruptedException { + KubernetesCloud cloud = addCloud("k8s", "foo"); + KubernetesSlave node = addNode(cloud, "node-123", "node"); + Pod node123 = createPod(node); + + String watchPodsPath = "/api/v1/namespaces/foo/pods?allowWatchBookmarks=true&watch=true"; + server.expect().withPath(watchPodsPath).andReturnChunked(200).once(); + server.expect().withPath(watchPodsPath).andReturnChunked(200, new WatchEvent(node123, "DELETED")).once(); + server.expect().withPath(watchPodsPath).andReturnChunked(200, new WatchEvent(node123, "BOOKMARK")).always(); + // don't remove pod on activate + server.expect().withPath("/api/v1/namespaces/foo/pods/node-123").andReturn(200, node123).once(); + + // activate reaper + Reaper r = Reaper.getInstance(); + r.maybeActivate(); + + // verify node is still registered + assertEquals("jenkins nodes", j.jenkins.getNodes().size(), 1); + + // wait for the delete event to be processed + waitForKubeClientRequests(6) + .assertRequestCountAtLeast(watchPodsPath, 3); + + // verify listener got notified + listener.expectEvent(Watcher.Action.DELETED, node); + + // expect node to be removed + assertEquals("jenkins nodes", j.jenkins.getNodes().size(), 0); + } + + @Test(timeout = 10_000) + public void testTerminateAgentOnContainerTerminated() throws IOException, InterruptedException { + KubernetesCloud cloud = addCloud("k8s", "foo"); + KubernetesSlave node = addNode(cloud, "node-123", "node"); + Pod node123 = withContainerStatusTerminated(createPod(node)); + + String watchPodsPath = "/api/v1/namespaces/foo/pods?allowWatchBookmarks=true&watch=true"; + server.expect().withPath(watchPodsPath).andReturnChunked(200).once(); + server.expect().withPath(watchPodsPath).andReturnChunked(200, new WatchEvent(node123, "MODIFIED")).once(); + server.expect().withPath(watchPodsPath).andReturnChunked(200, new WatchEvent(node123, "BOOKMARK")).always(); + // don't remove pod on activate + server.expect().withPath("/api/v1/namespaces/foo/pods/node-123").andReturn(200, node123).once(); + + // activate reaper + Reaper r = Reaper.getInstance(); + r.maybeActivate(); + + // verify node is still registered + assertEquals("jenkins nodes", j.jenkins.getNodes().size(), 1); + + // wait for the delete event to be processed + waitForKubeClientRequests(6) + .assertRequestCountAtLeast(watchPodsPath, 3); + + // verify listener got notified + listener.waitForEvents() + .expectEvent(Watcher.Action.MODIFIED, node); + + // expect node to be terminated + verify(node).terminate(); + // verify node is still registered (will be removed when pod deleted) + assertEquals("jenkins nodes", j.jenkins.getNodes().size(), 1); + } + + @Test(timeout = 10_000) + public void testTerminateAgentOnPodFailed() throws IOException, InterruptedException { + System.out.println(server.getKubernetesMockServer().getPort()); + KubernetesCloud cloud = addCloud("k8s", "foo"); + KubernetesSlave node = addNode(cloud, "node-123", "node"); + Pod node123 = createPod(node); + node123.getStatus().setPhase("Failed"); + + String watchPodsPath = "/api/v1/namespaces/foo/pods?allowWatchBookmarks=true&watch=true"; + server.expect().withPath(watchPodsPath).andReturnChunked(200).once(); + server.expect().withPath(watchPodsPath).andReturnChunked(200, new WatchEvent(node123, "MODIFIED")).once(); + server.expect().withPath(watchPodsPath).andReturnChunked(200, new WatchEvent(node123, "BOOKMARK")).always(); + // don't remove pod on activate + server.expect().withPath("/api/v1/namespaces/foo/pods/node-123").andReturn(200, node123).once(); + + // activate reaper + Reaper r = Reaper.getInstance(); + r.maybeActivate(); + + // verify node is still registered + assertEquals("jenkins nodes", j.jenkins.getNodes().size(), 1); + + // verify listener got notified + listener.waitForEvents() + .expectEvent(Watcher.Action.MODIFIED, node); + + // expect node to be terminated + verify(node).terminate(); + // verify node is still registered (will be removed when pod deleted) + assertEquals("jenkins nodes", j.jenkins.getNodes().size(), 1); + } + + @Test(timeout = 10_000) + public void testTerminateAgentOnImagePullBackoff() throws IOException, InterruptedException { + KubernetesCloud cloud = addCloud("k8s", "foo"); + KubernetesSlave node = addNode(cloud, "node-123", "node"); + Pod node123 = withContainerImagePullBackoff(createPod(node)); + + String watchPodsPath = "/api/v1/namespaces/foo/pods?allowWatchBookmarks=true&watch=true"; + server.expect().withPath(watchPodsPath).andReturnChunked(200).once(); + server.expect().withPath(watchPodsPath).andReturnChunked(200, new WatchEvent(node123, "MODIFIED")).once(); + server.expect().withPath(watchPodsPath).andReturnChunked(200, new WatchEvent(node123, "BOOKMARK")).always(); + // don't remove pod on activate + server.expect().withPath("/api/v1/namespaces/foo/pods/node-123").andReturn(200, node123).once(); + + // activate reaper + Reaper r = Reaper.getInstance(); + r.maybeActivate(); + + // verify node is still registered + assertEquals("jenkins nodes", j.jenkins.getNodes().size(), 1); + + // wait for the delete event to be processed + waitForKubeClientRequests(6) + .assertRequestCountAtLeast(watchPodsPath, 3); + + // verify listener got notified + listener.expectEvent(Watcher.Action.MODIFIED, node); + + // expect node to be terminated + verify(node).terminate(); + // verify node is still registered (will be removed when pod deleted) + assertEquals("jenkins nodes", j.jenkins.getNodes().size(), 1); + } + + private Pod withContainerImagePullBackoff(Pod pod) { + ContainerStatus status = new ContainerStatus(); + ContainerState state = new ContainerState(); + ContainerStateWaiting waiting = new ContainerStateWaiting(); + waiting.setMessage("something Back-off pulling image something"); + state.setWaiting(waiting); + status.setState(state); + pod.getStatus().getContainerStatuses().add(status); + return pod; + } + + private Pod withContainerStatusTerminated(Pod pod) { + ContainerStatus status = new ContainerStatus(); + ContainerState state = new ContainerState(); + ContainerStateTerminated terminated = new ContainerStateTerminated(); + terminated.setExitCode(123); + terminated.setReason("because"); + state.setTerminated(terminated); + status.setState(state); + pod.getStatus().getContainerStatuses().add(status); + return pod; + } + + private Pod createPod(KubernetesSlave node) { + Pod pod = new Pod(); + ObjectMeta meta = new ObjectMeta(); + meta.setNamespace(node.getNamespace()); + meta.setName(node.getPodName()); + pod.setMetadata(meta); + pod.setSpec(new PodSpec()); + pod.setStatus(new PodStatus()); + return pod; + } + + private KubernetesSlave addNode(KubernetesCloud cld, String podName, String nodeName) throws IOException { + KubernetesSlave node = mock(KubernetesSlave.class); + when(node.getNodeName()).thenReturn(nodeName); + when(node.getNamespace()).thenReturn(cld.getNamespace()); + when(node.getPodName()).thenReturn(podName); + when(node.getKubernetesCloud()).thenReturn(cld); + when(node.getCloudName()).thenReturn(cld.name); + when(node.getNumExecutors()).thenReturn(1); + PodTemplate podTemplate = new PodTemplate(); + when(node.getTemplate()).thenReturn(podTemplate); + ComputerLauncher launcher = mock(ComputerLauncher.class); + when(node.getLauncher()).thenReturn(launcher); + j.jenkins.addNode(node); + return node; + } + + private KubernetesCloud addCloud(String name, String namespace) { + KubernetesCloud c = new KubernetesCloud(name); + c.setServerUrl(server.getClient().getMasterUrl().toString()); + c.setNamespace(namespace); + c.setSkipTlsVerify(true); + j.jenkins.clouds.add(c); + return c; + } + + /** + * Get all the requests made to the kube client so far. Drains the captured requests. Next + * call to this method will only return newly captured requests. + * @return captured kube client requests so far + * @throws InterruptedException interrupted exception + */ + private CapturedRequests kubeClientRequests() throws InterruptedException { + int count = server.getKubernetesMockServer().getRequestCount(); + List requests = new LinkedList<>(); + while (count-- > 0) { + RecordedRequest rr = server.getKubernetesMockServer().takeRequest(1, TimeUnit.SECONDS); + if (rr != null) { + requests.add(rr); + } + } + return new CapturedRequests(requests); + } + + /** + * Wait until the specified number of kube client requests are captured. + * @param count number of requests to wait for + * @return captured requests + * @throws InterruptedException interrupted exception + */ + private CapturedRequests waitForKubeClientRequests(int count) throws InterruptedException { + List requests = new LinkedList<>(); + while (count-- > 0) { + requests.add(server.getKubernetesMockServer().takeRequest()); + } + return new CapturedRequests(requests); + } + + /** + * Wait until the specified request is captured. + * @param path number of requests to wait for + * @return captured requests + * @throws InterruptedException interrupted exception + */ + private CapturedRequests waitForKubeClientRequests(String path) throws InterruptedException { + List requests = new LinkedList<>(); + while (true) { + RecordedRequest rr = server.getKubernetesMockServer().takeRequest(); + requests.add(rr); + if (rr.getPath().equals(path)) { + return new CapturedRequests(requests); + } + } + } + + private static class CapturedRequests { + + private final Map countByPath; + + CapturedRequests(List requests) { + this.countByPath = requests.stream().collect(Collectors.groupingBy(RecordedRequest::getPath, Collectors.counting())); + } + + CapturedRequests assertRequestCount(String path, long count) { + assertEquals(path + " count", count, (long) countByPath.getOrDefault(path, 0L)); + return this; + } + + CapturedRequests assertRequestCountAtLeast(String path, long count) { + assertThat(path + " count at least", countByPath.getOrDefault(path, 0L), Matchers.greaterThanOrEqualTo(count)); + return this; + } + } + + @Extension + public static class CapturingReaperListener extends ExternalResource implements Reaper.Listener { + + private static final List CAPTURED_EVENTS = new LinkedList<>(); + + @Override + public synchronized void onEvent(@NonNull Watcher.Action action, @NonNull KubernetesSlave node, @NonNull Pod pod) throws IOException, InterruptedException { + CAPTURED_EVENTS.add(new ReaperListenerWatchEvent(action, node, pod)); + notifyAll(); + } + + /** + * Test should use {@link #waitForEvents()}, not this method + */ + private synchronized CapturingReaperListener waitForEventsOnJenkinsExtensionInstance() throws InterruptedException { + while (CAPTURED_EVENTS.isEmpty()) { + wait(); + } + return this; + } + + /** + * Tests should use this method to wait for events to be processed by the Reaper cloud watcher. + * @return jenkins extension instance + * @throws InterruptedException if wait was interrupted + */ + public CapturingReaperListener waitForEvents() throws InterruptedException { + // find the instance that Jenkins created and wait on that one + CapturingReaperListener l = Jenkins.get().getExtensionList(Reaper.Listener.class).get(CapturingReaperListener.class); + if (l == null) { + throw new RuntimeException("CapturingReaperListener not registered in Jenkins"); + } + + return l.waitForEventsOnJenkinsExtensionInstance(); + } + + /** + * Verify that the watcher received an event of the given action and target node. + * @param action action to match + * @param node target node + */ + public synchronized void expectEvent(Watcher.Action action, KubernetesSlave node) { + boolean found = CAPTURED_EVENTS.stream().anyMatch(e -> e.action == action && e.node == node); + assertTrue("expected event: " + action + ", " + node, found); + } + + /** + * Expect not event to have been received. + */ + public synchronized void expectNoEvents() { + assertEquals("no watcher events", 0, CAPTURED_EVENTS.size()); + } + + @Override + protected void after() { + CAPTURED_EVENTS.clear(); + } + } + + private static class ReaperListenerWatchEvent { + final Watcher.Action action; + final KubernetesSlave node; + final Pod pod; + + private ReaperListenerWatchEvent(Watcher.Action action, KubernetesSlave node, Pod pod) { + this.action = action; + this.node = node; + this.pod = pod; + } + + @Override + public String toString() { + return "[" + action + ", " + node + ", " + pod + "]"; + } + } +}