Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Makes sure the workspace activity checker reconciles the workspaces with their true statuses. #14574

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,15 @@ che.installer.registry.remote=NULL

# Period, in seconds, of the job that suspends inactive workspaces.
che.workspace.activity_check_scheduler_period_s=60

# Period, in seconds, of the activity table cleanup job. The activity table can contain invalid or
# stale data if an unforeseen error happens, such as a server crash at an unlucky point in time. The
# default is to run the cleanup job every hour.
che.workspace.activity_cleanup_scheduler_period_s=3600

# Delay, in seconds, after server startup before the first activity cleanup job runs.
che.workspace.activity_cleanup_scheduler_initial_delay_s=60

#
# Delay before first workspace idleness check job started to avoid
# mass suspend if ws master was unavailable for period close to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@
*/
package org.eclipse.che.api.workspace.activity;

import static java.util.stream.Collectors.toList;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.inject.Singleton;
import org.eclipse.che.api.core.ConflictException;
import org.eclipse.che.api.core.Page;
Expand Down Expand Up @@ -53,7 +54,7 @@ public List<String> findExpired(long timestamp) {
.stream()
.filter(a -> a.getExpiration() != null && a.getExpiration() < timestamp)
.map(WorkspaceActivity::getWorkspaceId)
.collect(Collectors.toList());
.collect(toList());
}

@Override
Expand Down Expand Up @@ -111,7 +112,7 @@ public Page<String> findInStatusSince(
}
})
.map(WorkspaceActivity::getWorkspaceId)
.collect(Collectors.toList());
.collect(toList());

int total = all.size();
int from = skipCount > total ? total : (int) skipCount;
Expand Down Expand Up @@ -149,6 +150,15 @@ public void createActivity(WorkspaceActivity activity) throws ConflictException
}
}

/**
 * Returns one page of the activity records held in the in-memory map.
 *
 * @param maxItems the maximum number of records to put into the page
 * @param skipCount the number of records to skip before the page starts
 * @return the page of records together with the current size of the map as the total count
 */
@Override
public Page<WorkspaceActivity> getAll(int maxItems, long skipCount) {
  // Collect the requested slice first, then read the map size, in the same order as before.
  List<WorkspaceActivity> pageItems =
      workspaceActivities.values().stream().skip(skipCount).limit(maxItems).collect(toList());
  int totalCount = workspaceActivities.size();

  return new Page<>(pageItems, skipCount, maxItems, totalCount);
}

/**
 * Tells whether the given nullable value is strictly greater than the threshold.
 *
 * @param value the value to test, may be {@code null}
 * @param threshold the threshold to compare against
 * @return {@code false} when {@code value} is {@code null} or not greater than {@code threshold},
 *     {@code true} otherwise
 */
private boolean isGreater(Long value, long threshold) {
  if (value == null) {
    return false;
  }
  return value > threshold;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,27 @@ public WorkspaceActivity findActivity(String workspaceId) throws ServerException
}
}

/**
 * Returns one page of all stored workspace activity records.
 *
 * <p>The total number of records is read with the {@code WorkspaceActivity.getAllCount} named
 * query and the page itself with the {@code WorkspaceActivity.getAll} named query.
 *
 * @param maxItems the maximum number of records in the returned page
 * @param skipCount the number of records to skip before the page starts
 * @return the requested page together with the total number of records
 * @throws ServerException when the persistence layer fails with a runtime exception or when
 *     {@code skipCount} does not fit into an {@code int}
 */
@Override
@Transactional(rollbackOn = ServerException.class)
public Page<WorkspaceActivity> getAll(int maxItems, long skipCount) throws ServerException {
  try {
    // The JPA query offset is an int. A plain cast would silently wrap a large skipCount into a
    // different (possibly valid) offset, so fail explicitly instead; the resulting
    // ArithmeticException is reported as a ServerException by the catch below.
    int firstResult = Math.toIntExact(skipCount);

    EntityManager em = managerProvider.get();
    long total =
        em.createNamedQuery("WorkspaceActivity.getAllCount", Long.class).getSingleResult();

    List<WorkspaceActivity> page =
        em.createNamedQuery("WorkspaceActivity.getAll", WorkspaceActivity.class)
            .setFirstResult(firstResult)
            .setMaxResults(maxItems)
            .getResultList();

    return new Page<>(page, skipCount, maxItems, total);
  } catch (RuntimeException e) {
    throw new ServerException(e.getLocalizedMessage(), e);
  }
}

@Override
@Transactional(rollbackOn = {ConflictException.class, ServerException.class})
public void createActivity(WorkspaceActivity activity) throws ConflictException, ServerException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@
"SELECT COUNT(a) FROM WorkspaceActivity a"
+ " WHERE a.status = org.eclipse.che.api.core.model.workspace.WorkspaceStatus.STARTING"
+ " AND a.lastStarting <= :time"),
@NamedQuery(name = "WorkspaceActivity.getAll", query = "SELECT a FROM WorkspaceActivity a"),
@NamedQuery(
name = "WorkspaceActivity.getAllCount",
query = "SELECT COUNT(a) FROM WorkspaceActivity a"),
})
public class WorkspaceActivity {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.time.Clock;
import org.eclipse.che.api.core.ConflictException;
import org.eclipse.che.api.core.NotFoundException;
import org.eclipse.che.api.core.Pages;
import org.eclipse.che.api.core.ServerException;
import org.eclipse.che.api.core.model.workspace.Workspace;
import org.eclipse.che.api.core.model.workspace.WorkspaceStatus;
Expand All @@ -32,9 +33,10 @@
import org.slf4j.LoggerFactory;

/**
* Is in charge of checking the validity of the workspace activity records. The sole important
* method is {@link #validate()} which is run on a schedule to periodically check the validity of
* the records and report the potential error conditions.
* Is in charge of checking the validity of the workspace activity records. The important methods
* are {@link #expire()} which is run on a schedule to periodically stop the expired workspaces and
* {@link #cleanup()} which will try to clean up and reconcile the possibly invalid activity
* records.
*
* @author Lukas Krejci
*/
Expand Down Expand Up @@ -83,18 +85,30 @@ public WorkspaceActivityChecker(
initialDelayParameterName = "che.workspace.activity_check_scheduler_delay_s",
delayParameterName = "che.workspace.activity_check_scheduler_period_s")
@VisibleForTesting
void validate() {
void expire() {
try {
stopAllExpired();
} catch (ServerException e) {
LOG.error(e.getLocalizedMessage(), e);
}
}

@ScheduleDelay(
initialDelayParameterName = "che.workspace.activity_cleanup_scheduler_initial_delay_s",
delayParameterName = "che.workspace.activity_cleanup_scheduler_period_s")
@VisibleForTesting
void cleanup() {
try {
checkActivityRecordValidity();
} catch (ServerException e) {
LOG.error(e.getLocalizedMessage(), e);
}

try {
reconcileActivityStatuses();
metlos marked this conversation as resolved.
Show resolved Hide resolved
} catch (ServerException e) {
LOG.error(e.getLocalizedMessage(), e);
}
}

private void stopAllExpired() throws ServerException {
Expand Down Expand Up @@ -153,6 +167,29 @@ private void checkActivityRecordValidity() throws ServerException {
}
}

/**
* Makes sure that any activity records are rectified if they do not reflect the true state of the
* workspace anymore.
*
* @throws ServerException or error
metlos marked this conversation as resolved.
Show resolved Hide resolved
*/
private void reconcileActivityStatuses() throws ServerException {
for (WorkspaceActivity a : Pages.iterateLazily(activityDao::getAll, 200)) {
WorkspaceStatus status = workspaceRuntimes.getStatus(a.getWorkspaceId());
if (a.getStatus() != status) {
if (LOG.isWarnEnabled()) {
metlos marked this conversation as resolved.
Show resolved Hide resolved
LOG.warn(
"Activity record for workspace {} was registering {} status while the workspace was {} in reality."
+ " Rectifying the activity record to reflect the true state of the workspace.",
a.getWorkspaceId(),
a.getStatus(),
status);
}
activityDao.setStatusChangeTime(a.getWorkspaceId(), status, clock.millis());
}
}
}

private void createMissingActivityRecord(String runningWsId, long idleTimeout)
throws ServerException {
LOG.warn(
Expand Down Expand Up @@ -337,7 +374,7 @@ private void rectifyExpirationTime(
/**
* Makes sure the activity of a running workspace has a last running time. The activity won't have
* a last running time very shortly after it was found running by the runtime before our event
* handler updated the activity record. If the schedule of the {@link #validate()} method precedes
* handler updated the activity record. If the schedule of the {@link #cleanup()} method precedes
* or coincides with the event handler we might not see the value. Otherwise this can
* theoretically also happen when the server is stopped at an unfortunate point in time while the
* workspace is starting and/or running and before the event handler had a chance of updating the
Expand All @@ -362,7 +399,7 @@ private boolean rectifyLastRunningTime(
LOG.warn(
"Workspace '{}' has been found running yet there is an activity on it newer than the"
+ " last running time. This should not happen. Resetting the last running time to"
+ " the newest activity time. The activity record is this: ",
+ " the newest activity time. The activity record is this: {}",
wsId,
activity.toString());
activityDao.setStatusChangeTime(wsId, WorkspaceStatus.RUNNING, latestActivityTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,13 @@ Page<String> findInStatusSince(
* @throws ServerException on other error
*/
void createActivity(WorkspaceActivity activity) throws ConflictException, ServerException;

/**
 * Returns a page of the current workspace activity records.
 *
 * @param maxItems the page size
 * @param skipCount the number of records to skip
 * @return a page of the workspace activity records
 * @throws ServerException on error
 */
Page<WorkspaceActivity> getAll(int maxItems, long skipCount) throws ServerException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@
*/
package org.eclipse.che.api.workspace.activity;

import static java.util.Collections.emptyList;
import static java.util.Collections.singleton;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -29,6 +32,7 @@
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import org.eclipse.che.api.core.Page;
import org.eclipse.che.api.core.model.workspace.WorkspaceStatus;
import org.eclipse.che.api.core.notification.EventService;
import org.eclipse.che.api.workspace.server.WorkspaceManager;
Expand All @@ -54,13 +58,22 @@ public class WorkspaceActivityCheckerTest {
@Mock private EventService eventService;

@BeforeMethod
public void setUp() {
public void setUp() throws Exception {
clock = new ManualClock();

WorkspaceActivityManager activityManager =
new WorkspaceActivityManager(
workspaceManager, workspaceActivityDao, eventService, DEFAULT_TIMEOUT, clock);

when(workspaceActivityDao.getAll(anyInt(), anyLong()))
.thenAnswer(
inv -> {
int maxItems = inv.getArgument(0);
long skipCount = inv.getArgument(1);

return new Page<WorkspaceActivity>(emptyList(), skipCount, maxItems, 0);
});

checker =
new WorkspaceActivityChecker(
workspaceActivityDao, workspaceManager, workspaceRuntimes, activityManager, clock);
Expand All @@ -70,7 +83,7 @@ public void setUp() {
public void shouldStopAllExpiredWorkspaces() throws Exception {
when(workspaceActivityDao.findExpired(anyLong())).thenReturn(Arrays.asList("1", "2", "3"));

checker.validate();
checker.expire();

verify(workspaceActivityDao, times(3)).removeExpiration(anyString());
verify(workspaceActivityDao).removeExpiration(eq("1"));
Expand All @@ -93,7 +106,7 @@ public void shouldRecreateMissingActivityRecord() throws Exception {

// when
clock.forward(Duration.of(1, ChronoUnit.SECONDS));
checker.validate();
checker.cleanup();

// then
ArgumentCaptor<WorkspaceActivity> captor = ArgumentCaptor.forClass(WorkspaceActivity.class);
Expand Down Expand Up @@ -124,7 +137,7 @@ public void shouldRestoreCreatedTimeOnInvalidActivityRecord() throws Exception {
.build());

// when
checker.validate();
checker.cleanup();

// then
verify(workspaceActivityDao).setCreatedTime(eq(id), eq(15L));
Expand All @@ -143,7 +156,7 @@ public void shouldRestoreLastRunningTimeOnInvalidActivityRecordUsingCreatedTime(

// when
clock.forward(Duration.of(1, ChronoUnit.SECONDS));
checker.validate();
checker.cleanup();

// then
verify(workspaceActivityDao, never()).setCreatedTime(eq(id), anyLong());
Expand All @@ -170,7 +183,7 @@ public void shouldRestoreLastRunningTimeOnInvalidActivityRecordUsingLastStarting

// when
clock.forward(Duration.of(1, ChronoUnit.SECONDS));
checker.validate();
checker.cleanup();

// then
verify(workspaceActivityDao).setCreatedTime(eq(id), eq(15L));
Expand All @@ -191,7 +204,7 @@ public void shouldRestoreExpirationTimeMoreThanASecondAfterRunning() throws Exce

// when
clock.forward(Duration.of(1500, ChronoUnit.MILLIS));
checker.validate();
checker.cleanup();

// then
verify(workspaceActivityDao).setExpirationTime(eq(id), eq(lastRunning + DEFAULT_TIMEOUT));
Expand All @@ -209,12 +222,45 @@ public void shouldNotRestoreExpirationTimeLessThanASecondAfterRunning() throws E

// when
clock.forward(Duration.of(900, ChronoUnit.MILLIS));
checker.validate();
checker.cleanup();

// then
verify(workspaceActivityDao, never()).setExpirationTime(anyString(), anyLong());
}

/**
 * An activity record claiming STARTING for a workspace that the runtimes report as STOPPED must
 * be rectified by the cleanup job.
 */
@Test
public void shouldRestoreTrueStateOfWorkspaceIfActivityDoesntReflectThat() throws Exception {
  // given: a single activity record that still claims the workspace is starting
  final String workspaceId = "1";
  final WorkspaceActivity staleRecord = new WorkspaceActivity();
  staleRecord.setCreated(clock.millis());
  staleRecord.setWorkspaceId(workspaceId);
  staleRecord.setStatus(WorkspaceStatus.STARTING);
  staleRecord.setLastStarting(clock.millis());

  // ... while the runtimes know the workspace is actually stopped
  when(workspaceRuntimes.getStatus(eq(workspaceId))).thenReturn(WorkspaceStatus.STOPPED);

  // the DAO serves the record on the first page only, every later page is empty
  doAnswer(
          invocation -> {
            int pageSize = invocation.getArgument(0);
            long skipped = invocation.getArgument(1);

            if (skipped >= 1) {
              return new Page<>(emptyList(), skipped, pageSize, 1);
            }
            return new Page<>(singleton(staleRecord), skipped, pageSize, 1);
          })
      .when(workspaceActivityDao)
      .getAll(anyInt(), anyLong());

  // when
  checker.cleanup();

  // then: the record is moved to the true status at the current clock time
  verify(workspaceActivityDao)
      .setStatusChangeTime(eq(workspaceId), eq(WorkspaceStatus.STOPPED), eq(clock.millis()));
}

private static final class ManualClock extends Clock {

private Instant instant;
Expand Down