Skip to content

Commit

Permalink
Merge pull request #2284 from HubSpot/missing_task_data
Browse files: browse the repository at this point in the history
Handle tasks which no longer have data in zk
  • Loading branch information
ssalinas authored Apr 7, 2022
2 parents: eadde5c + 5f99b3e; commit ae5c512
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 65 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -787,7 +787,7 @@ private CompletableFuture<StatusUpdateResult> handleStatusUpdateAsync(
);
}
if (result == StatusUpdateResult.KILL_TASK) {
LOG.info(
LOG.warn(
"Killing a task {} which Singularity has no remaining active state for. It will be given 1 minute to shut down gracefully",
status.getTaskId().getValue()
);
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -308,7 +308,7 @@ private StatusUpdateResult unsafeProcessStatusUpdate(
long now = System.currentTimeMillis();
long delta = now - timestamp;

LOG.debug(
LOG.info(
"Update: task {} is now {} ({}) at {} (delta: {})",
taskId,
status.getState(),
Expand Down Expand Up @@ -507,16 +507,39 @@ private StatusUpdateResult unsafeProcessStatusUpdate(
} else {
return StatusUpdateResult.KILL_TASK;
}
} else if (
}

// If a task is missing data in Singularity there is not much we can do to recover it
Optional<SingularityTask> maybeTask = taskManager.getTask(taskIdObj);
if (!maybeTask.isPresent()) {
LOG.warn("Missing task data for {}, trying to recover", taskId);
maybeTask = taskManager.tryRepairTask(taskIdObj);
}
if (!maybeTask.isPresent()) {
if (taskState.isDone()) {
saveNewTaskStatusHolder(taskIdObj, newTaskStatusHolder, taskState);
return StatusUpdateResult.DONE;
} else {
final String message = String.format(
"Task %s is active but is missing task data, killing task",
taskId
);
exceptionNotifier.notify(message);
LOG.error(message);
return StatusUpdateResult.KILL_TASK;
}
}

SingularityTask task = maybeTask.get();

if (
isDuplicateOrIgnorableStatusUpdate(previousTaskStatusHolder, newTaskStatusHolder)
) {
LOG.trace("Ignoring status update {} to {}", taskState, taskIdObj);
saveNewTaskStatusHolder(taskIdObj, newTaskStatusHolder, taskState);
return StatusUpdateResult.IGNORED;
}

Optional<SingularityTask> task = taskManager.getTask(taskIdObj);

if (status.getState() == TaskState.TASK_LOST) {
boolean isMesosFailure =
status.getReason() == Reason.REASON_INVALID_OFFERS ||
Expand All @@ -526,14 +549,12 @@ private StatusUpdateResult unsafeProcessStatusUpdate(
status.getReason() == Reason.REASON_MASTER_DISCONNECTED ||
status.getReason() == Reason.REASON_AGENT_DISCONNECTED;

RequestType requestType = task.isPresent()
? task.get().getTaskRequest().getRequest().getRequestType()
: null;
RequestType requestType = task.getTaskRequest().getRequest().getRequestType();
boolean isRelaunchable = requestType != null && !requestType.isLongRunning();

if (isMesosFailure && isRelaunchable) {
LOG.info("Relaunching lost task {}", task);
relaunchTask(task.get());
relaunchTask(task);
}
lostTasksMeter.mark();
if (configuration.getDisasterDetection().isEnabled()) {
Expand All @@ -542,67 +563,40 @@ private StatusUpdateResult unsafeProcessStatusUpdate(
}

if (!taskState.isDone()) {
if (!task.isPresent()) {
task = taskManager.tryRepairTask(taskIdObj);
}
if (task.isPresent()) {
final Optional<SingularityPendingDeploy> pendingDeploy = deployManager.getPendingDeploy(
taskIdObj.getRequestId()
);
final Optional<SingularityPendingDeploy> pendingDeploy = deployManager.getPendingDeploy(
taskIdObj.getRequestId()
);

Optional<SingularityRequestWithState> requestWithState = Optional.empty();
Optional<SingularityRequestWithState> requestWithState = Optional.empty();

if (taskState == ExtendedTaskState.TASK_RUNNING) {
requestWithState = requestManager.getRequest(taskIdObj.getRequestId());
healthchecker.enqueueHealthcheck(task.get(), pendingDeploy, requestWithState);
}
if (taskState == ExtendedTaskState.TASK_RUNNING) {
requestWithState = requestManager.getRequest(taskIdObj.getRequestId());
healthchecker.enqueueHealthcheck(task, pendingDeploy, requestWithState);
}

if (
!pendingDeploy.isPresent() ||
!pendingDeploy
.get()
.getDeployMarker()
.getDeployId()
.equals(taskIdObj.getDeployId())
) {
if (!requestWithState.isPresent()) {
requestWithState = requestManager.getRequest(taskIdObj.getRequestId());
}
newTaskChecker.enqueueNewTaskCheck(task.get(), requestWithState, healthchecker);
if (
!pendingDeploy.isPresent() ||
!pendingDeploy
.get()
.getDeployMarker()
.getDeployId()
.equals(taskIdObj.getDeployId())
) {
if (!requestWithState.isPresent()) {
requestWithState = requestManager.getRequest(taskIdObj.getRequestId());
}
} else {
final String message = String.format(
"Task %s is active but is missing task data",
taskId
);
taskManager.createTaskCleanup(
new SingularityTaskCleanup(
Optional.empty(),
TaskCleanupType.UNHEALTHY_NEW_TASK,
System.currentTimeMillis(),
taskIdObj,
Optional.of(
"Task is active but had no task data. Unable to continue running"
),
Optional.empty(),
Optional.empty()
)
);
exceptionNotifier.notify(message);
LOG.error(message);
newTaskChecker.enqueueNewTaskCheck(task, requestWithState, healthchecker);
}
}

final Optional<String> statusMessage = getStatusMessage(status, task);
final Optional<String> statusMessage = getStatusMessage(status, Optional.of(task));

final SingularityTaskHistoryUpdate taskUpdate = new SingularityTaskHistoryUpdate(
taskIdObj,
timestamp,
taskState,
statusMessage,
status.hasReason()
? Optional.of(status.getReason().name())
: Optional.<String>empty()
status.hasReason() ? Optional.of(status.getReason().name()) : Optional.empty()
);
final SingularityCreateResult taskHistoryUpdateCreateResult = taskManager.saveTaskHistoryUpdate(
taskUpdate
Expand All @@ -621,7 +615,7 @@ private StatusUpdateResult unsafeProcessStatusUpdate(
taskIdObj,
taskState,
taskHistoryUpdateCreateResult,
task,
Optional.of(task),
timestamp
);
}
Expand Down Expand Up @@ -665,8 +659,11 @@ public boolean hasRoomForMoreUpdates() {
public CompletableFuture<StatusUpdateResult> processStatusUpdateAsync(
Protos.TaskStatus status
) {
LOG.info("Creating status update -- task: " + status.getTaskId());
return CompletableFuture.supplyAsync(
() -> {
LOG.info("Starting status update -- task: " + status.getTaskId());

final String taskId = status.getTaskId().getValue();
final Optional<SingularityTaskId> maybeTaskId = getTaskId(taskId);

Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -2622,14 +2622,23 @@ public void testTaskOddities() {

Assertions.assertTrue(taskManager.isActiveTask(taskOne.getTaskId()));

statusUpdate(taskOne, TaskState.TASK_RUNNING);
List<SingularityTaskHistoryUpdate> updates = taskManager.getTaskHistoryUpdates(
taskOne.getTaskId()
);
Assertions.assertEquals(2, updates.size());
Assertions.assertTrue(
updates.stream().anyMatch(t -> t.getTaskState() == ExtendedTaskState.TASK_CLEANING)
// assert that we sent back a status that was not DONE for the status update (e.g. KILL_TASK)
Assertions.assertFalse(
sms
.statusUpdate(
TaskStatus
.newBuilder()
.setTaskId(MesosProtosUtils.toTaskId(taskOne.getMesosTask().getTaskId()))
.setAgentId(MesosProtosUtils.toAgentId(taskOne.getAgentId()))
.setState(TaskState.TASK_RUNNING)
.build()
)
.join()
);

// Task should get killed because we cannot recover any state
statusUpdate(taskOne, TaskState.TASK_KILLED);
Assertions.assertFalse(taskManager.isActiveTask(taskOne.getTaskId()));
}

@Test
Expand Down

0 comments on commit ae5c512

Please sign in to comment.