diff --git a/infrastructures/kubernetes/src/main/java/org/eclipse/che/workspace/infrastructure/kubernetes/namespace/KubernetesPersistentVolumeClaims.java b/infrastructures/kubernetes/src/main/java/org/eclipse/che/workspace/infrastructure/kubernetes/namespace/KubernetesPersistentVolumeClaims.java index d4c83fcd70e..9afdb8c017e 100644 --- a/infrastructures/kubernetes/src/main/java/org/eclipse/che/workspace/infrastructure/kubernetes/namespace/KubernetesPersistentVolumeClaims.java +++ b/infrastructures/kubernetes/src/main/java/org/eclipse/che/workspace/infrastructure/kubernetes/namespace/KubernetesPersistentVolumeClaims.java @@ -14,6 +14,7 @@ import static java.util.stream.Collectors.toSet; import io.fabric8.kubernetes.api.model.DoneablePersistentVolumeClaim; +import io.fabric8.kubernetes.api.model.Event; import io.fabric8.kubernetes.api.model.PersistentVolumeClaim; import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.Watch; @@ -27,11 +28,12 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.function.Predicate; import org.eclipse.che.api.workspace.server.spi.InfrastructureException; import org.eclipse.che.api.workspace.server.spi.InternalInfrastructureException; import org.eclipse.che.workspace.infrastructure.kubernetes.KubernetesClientFactory; import org.eclipse.che.workspace.infrastructure.kubernetes.KubernetesInfrastructureException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Defines an internal API for managing {@link PersistentVolumeClaim} instances in {@link @@ -41,7 +43,12 @@ */ public class KubernetesPersistentVolumeClaims { - public static final String PVC_BOUND_PHASE = "Bound"; + private static final Logger LOG = LoggerFactory.getLogger(KubernetesPersistentVolumeClaims.class); + + private static final String PVC_BOUND_PHASE = "Bound"; + private static final String PVC_EVENT_REASON_FIELD_KEY = "reason"; + private static final String PVC_EVENT_WAIT_CONSUMER_REASON = "WaitForFirstConsumer"; + private static final String PVC_EVENT_UID_FIELD_KEY = "involvedObject.uid"; private final String namespace; private final String workspaceId; @@ -151,36 +158,18 @@ public void delete(Map labels) throws InfrastructureException { } /** - * Waits until persistent volume claim state is bound. + * Waits until persistent volume claim state is bound. If used k8s Storage Class has + * 'volumeBindingMode: WaitForFirstConsumer', we don't wait to avoid deadlock. * * @param name name of persistent volume claim that should be watched * @param timeoutMillis waiting timeout in milliseconds - * @return persistent volume claim that satisfies the specified predicate + * @return persistent volume claim that is bound or in waiting for consumer state * @throws InfrastructureException when specified timeout is reached * @throws InfrastructureException when {@link Thread} is interrupted while waiting * @throws InfrastructureException when any other exception occurs */ public PersistentVolumeClaim waitBound(String name, long timeoutMillis) throws InfrastructureException { - return wait(name, timeoutMillis, pvc -> pvc.getStatus().getPhase().equals(PVC_BOUND_PHASE)); - } - - /** - * Waits until persistent volume claim state will suit for specified predicate. - * - * @param name name of persistent volume claim that should be watched - * @param timeoutMillis waiting timeout in milliseconds - * @param predicate predicate to perform state check - * @return persistent volume claim that satisfies the specified predicate - * @throws InfrastructureException when specified timeout is reached - * @throws InfrastructureException when {@link Thread} is interrupted while waiting - * @throws InfrastructureException when any other exception occurs - */ - public PersistentVolumeClaim wait( - String name, long timeoutMillis, Predicate predicate) - throws InfrastructureException { - CompletableFuture future = new CompletableFuture<>(); - Watch watch = null; try { Resource pvcResource = clientFactory @@ -189,29 +178,15 @@ public PersistentVolumeClaim wait( .inNamespace(namespace) .withName(name); - watch = - pvcResource.watch( - new Watcher() { - @Override - public void eventReceived(Action action, PersistentVolumeClaim pvc) { - if (predicate.test(pvc)) { - future.complete(pvc); - } - } - - @Override - public void onClose(KubernetesClientException cause) { - future.completeExceptionally( - new InfrastructureException( - "Waiting for persistent volume claim '" + name + "' was interrupted")); - } - }); - PersistentVolumeClaim actualPvc = pvcResource.get(); - if (predicate.test(actualPvc)) { + if (actualPvc.getStatus().getPhase().equals(PVC_BOUND_PHASE)) { return actualPvc; } - try { + + CompletableFuture future = new CompletableFuture<>(); + // any of these watchers can finish the operation resolving the future + try (Watch boundWatcher = pvcIsBoundWatcher(future, pvcResource); + Watch waitingWatcher = pvcIsWaitingForConsumerWatcher(future, actualPvc)) { return future.get(timeoutMillis, TimeUnit.MILLISECONDS); } catch (ExecutionException e) { // May happen only if WebSocket Connection is closed before needed event received. @@ -231,10 +206,69 @@ public void onClose(KubernetesClientException cause) { } } catch (KubernetesClientException e) { throw new KubernetesInfrastructureException(e); - } finally { - if (watch != null) { - watch.close(); - } + } + } + + private Watch pvcIsBoundWatcher( + CompletableFuture future, + Resource pvcResource) { + return pvcResource.watch( + new Watcher() { + @Override + public void eventReceived(Action action, PersistentVolumeClaim pvc) { + if (pvc.getStatus().getPhase().equals(PVC_BOUND_PHASE)) { + LOG.debug("pvc '" + pvc.getMetadata().getName() + "' is bound"); + future.complete(pvc); + } + } + + @Override + public void onClose(KubernetesClientException cause) { + safelyFinishFutureOnClose(cause, future, pvcResource.get().getMetadata().getName()); + } + }); + } + + /** + * Creates and returns {@link Watch} that watches for 'WaitForFirstConsumer' events on given PVC. + */ + private Watch pvcIsWaitingForConsumerWatcher( + CompletableFuture future, PersistentVolumeClaim actualPvc) + throws InfrastructureException { + return clientFactory + .create(workspaceId) + .events() + .inNamespace(namespace) + .withField(PVC_EVENT_REASON_FIELD_KEY, PVC_EVENT_WAIT_CONSUMER_REASON) + .withField(PVC_EVENT_UID_FIELD_KEY, actualPvc.getMetadata().getUid()) + .watch( + new Watcher() { + @Override + public void eventReceived(Action action, Event resource) { + LOG.debug( + "PVC '" + + actualPvc.getMetadata().getName() + + "' is waiting for first consumer. Don't wait to bound to avoid deadlock."); + future.complete(actualPvc); + } + + @Override + public void onClose(KubernetesClientException cause) { + safelyFinishFutureOnClose(cause, future, actualPvc.getMetadata().getName()); + } + }); + } + + private void safelyFinishFutureOnClose( + KubernetesClientException cause, + CompletableFuture future, + String pvcName) { + if (cause != null) { + future.completeExceptionally( + new InfrastructureException( + "Waiting for persistent volume claim '" + pvcName + "' was interrupted")); + } else if (!future.isDone()) { + future.cancel(true); } } }