Skip to content

Commit

Permalink
Always provide a WorkerpoolAuthorization to a worker during its rec…
Browse files Browse the repository at this point in the history
…overy (#674)
  • Loading branch information
jbern0rd authored Mar 14, 2024
1 parent d76af8b commit 038f984
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ All notable changes to this project will be documented in this file.
- Keep a single `updateReplicateStatus` method in `ReplicatesService`. (#670)
- Check result has been uploaded for TEE tasks. (#672)
- Check for consensus early if a worker has already `CONTRIBUTED` when the task is updated to `RUNNING`. (#673)
- Always provide a `WorkerpoolAuthorization` to a worker during its recovery. (#674)

### Quality

Expand Down
46 changes: 26 additions & 20 deletions src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.iexec.core.task.update.TaskUpdateRequestManager;
import com.iexec.core.worker.Worker;
import com.iexec.core.worker.WorkerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;
Expand All @@ -47,7 +48,7 @@

import static com.iexec.common.replicate.ReplicateStatus.*;


@Slf4j
@Service
public class ReplicateSupplyService implements Purgeable {

Expand Down Expand Up @@ -251,51 +252,56 @@ public List<TaskNotification> getMissedTaskNotifications(long blockNumber, Strin
for (Task task : tasksWithWorkerParticipation) {
String chainTaskId = task.getChainTaskId();

Optional<Replicate> oReplicate = replicatesService.getReplicate(chainTaskId, walletAddress);
if (oReplicate.isEmpty()) {
final Replicate replicate = replicatesService.getReplicate(chainTaskId, walletAddress).orElse(null);
if (replicate == null) {
log.debug("no replicate [chainTaskId:{}, walletAddress:{}]", chainTaskId, walletAddress);
continue;
}
Replicate replicate = oReplicate.get();
boolean isRecoverable = replicate.isRecoverable();
final boolean isRecoverable = replicate.isRecoverable();
if (!isRecoverable) {
log.debug("not recoverable [chainTaskId:{}, walletAddress:{}]", chainTaskId, walletAddress);
continue;
}
String enclaveChallenge = task.getEnclaveChallenge();
final String enclaveChallenge = task.getEnclaveChallenge();
if (task.isTeeTask() && enclaveChallenge.isEmpty()) {
log.debug("empty enclave challenge [chainTaskId:{}, walletAddress:{}]", chainTaskId, walletAddress);
continue;
}
Optional<TaskNotificationType> taskNotificationType = getTaskNotificationType(task, replicate, blockNumber);
if (taskNotificationType.isEmpty()) {
final TaskNotificationType taskNotificationType = getTaskNotificationType(task, replicate, blockNumber).orElse(null);
if (taskNotificationType == null) {
log.debug("no task notification [chainTaskId:{}, walletAddress:{}]", chainTaskId, walletAddress);
continue;
}
TaskNotificationExtra taskNotificationExtra =
getTaskNotificationExtra(task, taskNotificationType.get(), walletAddress, enclaveChallenge);
log.debug("task notification type {}", taskNotificationType);

final TaskNotificationExtra taskNotificationExtra =
getTaskNotificationExtra(task, taskNotificationType, walletAddress);

TaskNotification taskNotification = TaskNotification.builder()
final TaskNotification taskNotification = TaskNotification.builder()
.chainTaskId(chainTaskId)
.workersAddress(Collections.singletonList(walletAddress))
.taskNotificationType(taskNotificationType.get())
.taskNotificationType(taskNotificationType)
.taskNotificationExtra(taskNotificationExtra)
.build();

// change replicate status
ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.poolManagerRequest(RECOVERING);
replicatesService.updateReplicateStatus(chainTaskId, walletAddress, statusUpdate);

replicatesService.updateReplicateStatus(
chainTaskId, walletAddress, ReplicateStatusUpdate.poolManagerRequest(RECOVERING));
taskNotifications.add(taskNotification);
}

return taskNotifications;
}

private TaskNotificationExtra getTaskNotificationExtra(Task task, TaskNotificationType taskNotificationType, String walletAddress, String enclaveChallenge) {
TaskNotificationExtra taskNotificationExtra = TaskNotificationExtra.builder().build();
private TaskNotificationExtra getTaskNotificationExtra(Task task, TaskNotificationType taskNotificationType, String walletAddress) {
final WorkerpoolAuthorization authorization = signatureService.createAuthorization(
walletAddress, task.getChainTaskId(), task.getEnclaveChallenge());
final TaskNotificationExtra taskNotificationExtra = TaskNotificationExtra.builder()
.workerpoolAuthorization(authorization)
.build();

switch (taskNotificationType) {
case PLEASE_CONTRIBUTE:
WorkerpoolAuthorization authorization = signatureService.createAuthorization(
walletAddress, task.getChainTaskId(), enclaveChallenge);
taskNotificationExtra.setWorkerpoolAuthorization(authorization);
break;
case PLEASE_REVEAL:
taskNotificationExtra.setBlockNumber(task.getConsensusReachedBlockNumber());
Expand Down

0 comments on commit 038f984

Please sign in to comment.