Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

watch PVC WaitForFirstConsumer event to avoid deadlock #14239

Merged
merged 4 commits into from
Aug 16, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import static java.util.stream.Collectors.toSet;

import io.fabric8.kubernetes.api.model.DoneablePersistentVolumeClaim;
import io.fabric8.kubernetes.api.model.Event;
import io.fabric8.kubernetes.api.model.PersistentVolumeClaim;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
Expand All @@ -27,11 +28,12 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import org.eclipse.che.api.workspace.server.spi.InfrastructureException;
import org.eclipse.che.api.workspace.server.spi.InternalInfrastructureException;
import org.eclipse.che.workspace.infrastructure.kubernetes.KubernetesClientFactory;
import org.eclipse.che.workspace.infrastructure.kubernetes.KubernetesInfrastructureException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Defines an internal API for managing {@link PersistentVolumeClaim} instances in {@link
Expand All @@ -41,7 +43,12 @@
*/
public class KubernetesPersistentVolumeClaims {

public static final String PVC_BOUND_PHASE = "Bound";
private static final Logger LOG = LoggerFactory.getLogger(KubernetesPersistentVolumeClaims.class);

private static final String PVC_BOUND_PHASE = "Bound";
private static final String PVC_EVENT_REASON_FIELD_KEY = "reason";
private static final String PVC_EVENT_WAIT_CONSUMER_REASON = "WaitForFirstConsumer";
private static final String PVC_EVENT_UID_FIELD_KEY = "involvedObject.uid";

private final String namespace;
private final String workspaceId;
Expand Down Expand Up @@ -151,36 +158,18 @@ public void delete(Map<String, String> labels) throws InfrastructureException {
}

/**
* Waits until persistent volume claim state is bound.
* Waits until persistent volume claim state is bound. If used k8s Storage Class has
* 'volumeBindingMode: WaitForFirstConsumer', we don't wait to avoid deadlock.
*
* @param name name of persistent volume claim that should be watched
* @param timeoutMillis waiting timeout in milliseconds
* @return persistent volume claim that satisfies the specified predicate
* @return persistent volume claim that is bound or in waiting for consumer state
* @throws InfrastructureException when specified timeout is reached
* @throws InfrastructureException when {@link Thread} is interrupted while waiting
* @throws InfrastructureException when any other exception occurs
*/
public PersistentVolumeClaim waitBound(String name, long timeoutMillis)
throws InfrastructureException {
return wait(name, timeoutMillis, pvc -> pvc.getStatus().getPhase().equals(PVC_BOUND_PHASE));
}

/**
* Waits until persistent volume claim state will suit for specified predicate.
*
* @param name name of persistent volume claim that should be watched
* @param timeoutMillis waiting timeout in milliseconds
* @param predicate predicate to perform state check
* @return persistent volume claim that satisfies the specified predicate
* @throws InfrastructureException when specified timeout is reached
* @throws InfrastructureException when {@link Thread} is interrupted while waiting
* @throws InfrastructureException when any other exception occurs
*/
public PersistentVolumeClaim wait(
String name, long timeoutMillis, Predicate<PersistentVolumeClaim> predicate)
throws InfrastructureException {
CompletableFuture<PersistentVolumeClaim> future = new CompletableFuture<>();
Watch watch = null;
try {
Resource<PersistentVolumeClaim, DoneablePersistentVolumeClaim> pvcResource =
clientFactory
Expand All @@ -189,29 +178,15 @@ public PersistentVolumeClaim wait(
.inNamespace(namespace)
.withName(name);

watch =
pvcResource.watch(
new Watcher<PersistentVolumeClaim>() {
@Override
public void eventReceived(Action action, PersistentVolumeClaim pvc) {
if (predicate.test(pvc)) {
future.complete(pvc);
}
}

@Override
public void onClose(KubernetesClientException cause) {
future.completeExceptionally(
new InfrastructureException(
"Waiting for persistent volume claim '" + name + "' was interrupted"));
}
});

PersistentVolumeClaim actualPvc = pvcResource.get();
if (predicate.test(actualPvc)) {
if (actualPvc.getStatus().getPhase().equals(PVC_BOUND_PHASE)) {
return actualPvc;
}
try {

CompletableFuture<PersistentVolumeClaim> future = new CompletableFuture<>();
// any of these watchers can finish the operation resolving the future
try (Watch boundWatcher = pvcIsBoundWatcher(future, pvcResource);
Watch waitingWatcher = pvcIsWaitingForConsumerWatcher(future, actualPvc)) {
return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
// May happen only if WebSocket Connection is closed before needed event received.
Expand All @@ -231,10 +206,69 @@ public void onClose(KubernetesClientException cause) {
}
} catch (KubernetesClientException e) {
throw new KubernetesInfrastructureException(e);
} finally {
if (watch != null) {
watch.close();
}
}
}

private Watch pvcIsBoundWatcher(
CompletableFuture<PersistentVolumeClaim> future,
Resource<PersistentVolumeClaim, DoneablePersistentVolumeClaim> pvcResource) {
return pvcResource.watch(
new Watcher<PersistentVolumeClaim>() {
@Override
public void eventReceived(Action action, PersistentVolumeClaim pvc) {
if (pvc.getStatus().getPhase().equals(PVC_BOUND_PHASE)) {
LOG.debug("pvc '" + pvc.getMetadata().getName() + "' is bound");
future.complete(pvc);
}
}

@Override
public void onClose(KubernetesClientException cause) {
safelyFinishFutureOnClose(cause, future, pvcResource.get().getMetadata().getName());
}
});
}

/**
* Creates and returns {@link Watch} that watches for 'WaitForFirstConsumer' events on given PVC.
*/
private Watch pvcIsWaitingForConsumerWatcher(
CompletableFuture<PersistentVolumeClaim> future, PersistentVolumeClaim actualPvc)
throws InfrastructureException {
return clientFactory
.create(workspaceId)
.events()
sleshchenko marked this conversation as resolved.
Show resolved Hide resolved
.inNamespace(namespace)
.withField(PVC_EVENT_REASON_FIELD_KEY, PVC_EVENT_WAIT_CONSUMER_REASON)
.withField(PVC_EVENT_UID_FIELD_KEY, actualPvc.getMetadata().getUid())
.watch(
new Watcher<Event>() {
@Override
public void eventReceived(Action action, Event resource) {
LOG.debug(
"PVC '"
+ actualPvc.getMetadata().getName()
+ "' is waiting for first consumer. Don't wait to bound to avoid deadlock.");
future.complete(actualPvc);
}

@Override
public void onClose(KubernetesClientException cause) {
sparkoo marked this conversation as resolved.
Show resolved Hide resolved
safelyFinishFutureOnClose(cause, future, actualPvc.getMetadata().getName());
}
});
}

private void safelyFinishFutureOnClose(
KubernetesClientException cause,
CompletableFuture<PersistentVolumeClaim> future,
String pvcName) {
if (cause != null) {
future.completeExceptionally(
new InfrastructureException(
"Waiting for persistent volume claim '" + pvcName + "' was interrupted"));
} else if (!future.isDone()) {
future.cancel(true);
}
}
}