Skip to content

Commit

Permalink
Merge pull request #524 from iExecBlockchainComputing/bugfix/remove-o…
Browse files Browse the repository at this point in the history
…ptional-from-getLastRelevantStatus

Refactor `Replicate#getLastRelevantStatus` to remove the `Optional` return type
  • Loading branch information
mcornaton authored Oct 4, 2022
2 parents 32aece6 + 7e35e70 commit 625d074
Show file tree
Hide file tree
Showing 12 changed files with 199 additions and 171 deletions.
25 changes: 7 additions & 18 deletions src/main/java/com/iexec/core/contribution/ContributionHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class ContributionHelper {
Expand All @@ -38,12 +37,9 @@ static int getContributedWeight(List<Replicate> replicates, String contribution)
int groupWeight = 0;
for (Replicate replicate : replicates) {

Optional<ReplicateStatus> lastRelevantStatus = replicate.getLastRelevantStatus();
if (lastRelevantStatus.isEmpty()) {
continue;
}
ReplicateStatus lastRelevantStatus = replicate.getLastRelevantStatus();

boolean isContributed = lastRelevantStatus.get().equals(ReplicateStatus.CONTRIBUTED);
boolean isContributed = lastRelevantStatus == ReplicateStatus.CONTRIBUTED;
boolean haveSameContribution = contribution.equals(replicate.getContributionHash());
boolean hasWeight = replicate.getWorkerWeight() > 0;

Expand All @@ -64,14 +60,11 @@ static int getPendingWeight(List<Replicate> replicates, long maxExecutionTime) {

for (Replicate replicate : replicates) {

Optional<ReplicateStatus> lastRelevantStatus = replicate.getLastRelevantStatus();
if (lastRelevantStatus.isEmpty()) {
continue;
}
ReplicateStatus lastRelevantStatus = replicate.getLastRelevantStatus();

boolean isCreatedLessThanOnePeriodAgo = !replicate.isCreatedMoreThanNPeriodsAgo(1, maxExecutionTime);
boolean isNotContributed = !lastRelevantStatus.get().equals(ReplicateStatus.CONTRIBUTED);
boolean isNotFailed = !lastRelevantStatus.get().equals(ReplicateStatus.FAILED);
boolean isNotContributed = lastRelevantStatus != ReplicateStatus.CONTRIBUTED;
boolean isNotFailed = lastRelevantStatus != ReplicateStatus.FAILED;
boolean hasWeight = replicate.getWorkerWeight() > 0;

if (isCreatedLessThanOnePeriodAgo && isNotContributed && isNotFailed && hasWeight) {
Expand All @@ -92,12 +85,8 @@ static Set<String> getDistinctContributions(List<Replicate> replicates) {

for (Replicate replicate : replicates) {

Optional<ReplicateStatus> lastRelevantStatus = replicate.getLastRelevantStatus();
if (lastRelevantStatus.isEmpty()) {
continue;
}

if (lastRelevantStatus.get().equals(ReplicateStatus.CONTRIBUTED)) {
ReplicateStatus lastRelevantStatus = replicate.getLastRelevantStatus();
if (lastRelevantStatus == ReplicateStatus.CONTRIBUTED) {
distinctContributions.add(replicate.getContributionHash());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.Optional;

import static com.iexec.common.replicate.ReplicateStatus.WORKER_LOST;
import static com.iexec.common.replicate.ReplicateStatus.getMissingStatuses;
Expand Down Expand Up @@ -60,16 +59,16 @@ void dectectOnchainCompletedWhenOffchainCompleting(List<TaskStatus> detectWhenOf
ChainContributionStatus onchainCompleted) {
for (Task task : taskService.findByCurrentStatus(detectWhenOffChainTaskStatuses)) {
for (Replicate replicate : replicatesService.getReplicates(task.getChainTaskId())) {
Optional<ReplicateStatus> lastRelevantStatus = replicate.getLastRelevantStatus();
if (lastRelevantStatus.isEmpty() || !lastRelevantStatus.get().equals(offchainCompleting)) {
ReplicateStatus lastRelevantStatus = replicate.getLastRelevantStatus();
if (lastRelevantStatus != offchainCompleting) {
continue;
}

boolean statusTrueOnChain = iexecHubService.isStatusTrueOnChain(task.getChainTaskId(), replicate.getWalletAddress(), onchainCompleted);

if (statusTrueOnChain) {
log.info("Detected confirmed missing update (replicate) [is:{}, should:{}, taskId:{}]",
lastRelevantStatus.get(), onchainCompleted, task.getChainTaskId());
lastRelevantStatus, onchainCompleted, task.getChainTaskId());
updateReplicateStatuses(task, replicate, offchainCompleted);
}
}
Expand All @@ -82,17 +81,17 @@ void dectectOnchainCompleted(List<TaskStatus> detectWhenOffChainTaskStatuses,
ChainContributionStatus onchainCompleted) {
for (Task task : taskService.findByCurrentStatus(detectWhenOffChainTaskStatuses)) {
for (Replicate replicate : replicatesService.getReplicates(task.getChainTaskId())) {
Optional<ReplicateStatus> lastRelevantStatus = replicate.getLastRelevantStatus();
ReplicateStatus lastRelevantStatus = replicate.getLastRelevantStatus();

if (lastRelevantStatus.isEmpty() || lastRelevantStatus.get().equals(offchainCompleted)) {
if (lastRelevantStatus == offchainCompleted) {
continue;
}

boolean statusTrueOnChain = iexecHubService.isStatusTrueOnChain(task.getChainTaskId(), replicate.getWalletAddress(), onchainCompleted);

if (statusTrueOnChain) {
log.info("Detected confirmed missing update (replicate) [is:{}, should:{}, taskId:{}]",
lastRelevantStatus.get(), onchainCompleted, task.getChainTaskId());
lastRelevantStatus, onchainCompleted, task.getChainTaskId());
updateReplicateStatuses(task, replicate, offchainCompleted);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.iexec.core.replicate;

import lombok.AllArgsConstructor;
import lombok.Getter;

/**
* Exception that can be thrown
* when searching for a {@link com.iexec.common.replicate.ReplicateStatus} fails.
*/
@AllArgsConstructor
@Getter
public class NoReplicateStatusException extends RuntimeException {
private final String chainTaskId;
}
58 changes: 28 additions & 30 deletions src/main/java/com/iexec/core/replicate/Replicate.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,31 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.iexec.common.chain.ChainReceipt;
import com.iexec.common.replicate.ReplicateStatus;
import com.iexec.common.replicate.ReplicateStatusCause;
import com.iexec.common.replicate.ReplicateStatusDetails;
import com.iexec.common.replicate.ReplicateStatusModifier;
import com.iexec.common.replicate.ReplicateStatusUpdate;

import com.iexec.common.replicate.*;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;

import static com.iexec.common.replicate.ReplicateStatus.*;
import static com.iexec.common.replicate.ReplicateStatusUpdate.*;
import static com.iexec.common.replicate.ReplicateStatus.CREATED;
import static com.iexec.common.replicate.ReplicateStatus.WORKER_LOST;
import static com.iexec.common.replicate.ReplicateStatusUpdate.poolManagerRequest;


@Data
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class Replicate {

private List<ReplicateStatusUpdate> statusUpdateList;
// FIXME: should be final
private List<ReplicateStatusUpdate> statusUpdateList = new ArrayList<>(
// a new replicate should only be created by the scheduler
List.of(poolManagerRequest(CREATED))
);
private String walletAddress;
private String resultLink;
private String chainCallbackData;
Expand All @@ -52,9 +55,6 @@ public class Replicate {
public Replicate(String walletAddress, String chainTaskId) {
this.chainTaskId = chainTaskId;
this.walletAddress = walletAddress;
this.statusUpdateList = new ArrayList<>();
// a new replicate should only be create by the scheduler
this.statusUpdateList.add(poolManagerRequest(CREATED));
this.contributionHash = "";
}

Expand All @@ -64,10 +64,10 @@ public ReplicateStatus getCurrentStatus() {
}

@JsonIgnore
public Optional<ReplicateStatus> getLastRelevantStatus() { // FIXME: remove Optional and add a no-args constructor
public ReplicateStatus getLastRelevantStatus() {
// ignore cases like: WORKER_LOST and RECOVERING

List<ReplicateStatus> statusList = getStatusUpdateList().stream()
List<ReplicateStatus> statusList = statusUpdateList.stream()
.map(ReplicateStatusUpdate::getStatus)
.collect(Collectors.toList());

Expand All @@ -77,21 +77,21 @@ public Optional<ReplicateStatus> getLastRelevantStatus() { // FIXME: remove Opt

for (int i = statusList.size() - 1; i >= 0; i--) {
if (!ignoredStatuses.contains(statusList.get(i))) {
return Optional.of(statusList.get(i));
return statusList.get(i);
}
}

return Optional.empty();
throw new NoReplicateStatusException(chainTaskId);
}

@JsonIgnore
public ReplicateStatus getLastButOneStatus() {
return this.getStatusUpdateList().get(this.getStatusUpdateList().size() - 2).getStatus();
return statusUpdateList.get(statusUpdateList.size() - 2).getStatus();
}

@JsonIgnore
private ReplicateStatusUpdate getLatestStatusUpdate() {
return this.getStatusUpdateList().get(this.getStatusUpdateList().size() - 1);
return statusUpdateList.get(statusUpdateList.size() - 1);
}

public boolean updateStatus(ReplicateStatus newStatus, ReplicateStatusModifier modifier) {
Expand All @@ -115,7 +115,7 @@ public boolean updateStatus(ReplicateStatusUpdate statusUpdate) {
}

public boolean containsStatus(ReplicateStatus replicateStatus) {
for (ReplicateStatusUpdate replicateStatusUpdate : this.getStatusUpdateList()) {
for (ReplicateStatusUpdate replicateStatusUpdate : statusUpdateList) {
if (replicateStatusUpdate.getStatus().equals(replicateStatus)) {
return true;
}
Expand All @@ -132,7 +132,7 @@ public boolean containsRevealedStatus() {
}

public boolean isCreatedMoreThanNPeriodsAgo(int numberPeriod, long maxExecutionTime) {
Date creationDate = this.getStatusUpdateList().get(0).getDate();
Date creationDate = statusUpdateList.get(0).getDate();
Date numberPeriodsAfterCreationDate = new Date(creationDate.getTime() + numberPeriod * maxExecutionTime);
Date now = new Date();

Expand All @@ -149,21 +149,19 @@ public boolean isBusyComputing() {
}

public boolean isRecoverable() {
Optional<ReplicateStatus> currentStatus = getLastRelevantStatus();
if (currentStatus.isEmpty()) return false;
return ReplicateStatus.isRecoverable(currentStatus.get());
ReplicateStatus currentStatus = getLastRelevantStatus();
return ReplicateStatus.isRecoverable(currentStatus);
}

public boolean isBeforeStatus(ReplicateStatus status) {
Optional<ReplicateStatus> currentStatus = getLastRelevantStatus();
if (currentStatus.isEmpty()) return false;
return currentStatus.get().ordinal() < status.ordinal();
ReplicateStatus currentStatus = getLastRelevantStatus();
return currentStatus.ordinal() < status.ordinal();
}

boolean isStatusBeforeWorkerLostEqualsTo(ReplicateStatus status) {
int size = getStatusUpdateList().size();
int size = statusUpdateList.size();
return size >= 2
&& getStatusUpdateList().get(size - 1).getStatus().equals(WORKER_LOST)
&& getStatusUpdateList().get(size - 2).getStatus().equals(status);
&& statusUpdateList.get(size - 1).getStatus().equals(WORKER_LOST)
&& statusUpdateList.get(size - 2).getStatus().equals(status);
}
}
43 changes: 12 additions & 31 deletions src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java
Original file line number Diff line number Diff line change
Expand Up @@ -345,12 +345,8 @@ private Optional<TaskNotificationType> recoverReplicateInContributionPhase(Task
String chainTaskId = task.getChainTaskId();
String walletAddress = replicate.getWalletAddress();

if (replicate.getLastRelevantStatus().isEmpty()) {
return Optional.empty();
}

boolean beforeContributing = replicate.isBeforeStatus(ReplicateStatus.CONTRIBUTING);
boolean didReplicateStartContributing = replicate.getLastRelevantStatus().get().equals(ReplicateStatus.CONTRIBUTING);
boolean didReplicateStartContributing = replicate.getLastRelevantStatus() == ReplicateStatus.CONTRIBUTING;
boolean didReplicateContributeOnChain = replicatesService.didReplicateContributeOnchain(chainTaskId, walletAddress);

if (beforeContributing) {
Expand All @@ -373,12 +369,8 @@ private Optional<TaskNotificationType> recoverReplicateInContributionPhase(Task
}

Replicate replicateWithLatestChanges = oReplicateWithLatestChanges.get();
if (replicateWithLatestChanges.getLastRelevantStatus().isEmpty()) {
return Optional.empty();
}

boolean didReplicateContribute = replicateWithLatestChanges.getLastRelevantStatus().get()
.equals(ReplicateStatus.CONTRIBUTED);
boolean didReplicateContribute = replicateWithLatestChanges.getLastRelevantStatus()
== ReplicateStatus.CONTRIBUTED;

if (didReplicateContribute) {
final Optional<ReplicatesList> oReplicatesList = replicatesService.getReplicatesList(chainTaskId);
Expand Down Expand Up @@ -408,12 +400,8 @@ private Optional<TaskNotificationType> recoverReplicateInRevealPhase(Task task,
String chainTaskId = task.getChainTaskId();
String walletAddress = replicate.getWalletAddress();

if (replicate.getLastRelevantStatus().isEmpty()) {
return Optional.empty();
}

boolean isInStatusContributed = replicate.getLastRelevantStatus().get().equals(ReplicateStatus.CONTRIBUTED);
boolean didReplicateStartRevealing = replicate.getLastRelevantStatus().get().equals(ReplicateStatus.REVEALING);
boolean isInStatusContributed = replicate.getLastRelevantStatus() == ReplicateStatus.CONTRIBUTED;
boolean didReplicateStartRevealing = replicate.getLastRelevantStatus() == ReplicateStatus.REVEALING;
boolean didReplicateRevealOnChain = replicatesService.didReplicateRevealOnchain(chainTaskId, walletAddress);

if (isInStatusContributed) {
Expand All @@ -437,15 +425,12 @@ private Optional<TaskNotificationType> recoverReplicateInRevealPhase(Task task,
return Optional.empty();
}
replicate = oReplicateWithLatestChanges.get();
if (replicate.getLastRelevantStatus().isEmpty()) {
return Optional.empty();
}

boolean didReplicateReveal = replicate.getLastRelevantStatus().get()
.equals(ReplicateStatus.REVEALED);
boolean didReplicateReveal = replicate.getLastRelevantStatus()
== ReplicateStatus.REVEALED;

boolean wasReplicateRequestedToUpload = replicate.getLastRelevantStatus().get()
.equals(ReplicateStatus.RESULT_UPLOAD_REQUESTED);
boolean wasReplicateRequestedToUpload = replicate.getLastRelevantStatus()
== ReplicateStatus.RESULT_UPLOAD_REQUESTED;

if (didReplicateReveal) {
return Optional.of(TaskNotificationType.PLEASE_WAIT);
Expand All @@ -470,14 +455,10 @@ private Optional<TaskNotificationType> recoverReplicateInResultUploadPhase(Task
String chainTaskId = task.getChainTaskId();
String walletAddress = replicate.getWalletAddress();

if (replicate.getLastRelevantStatus().isEmpty()) {
return Optional.empty();
}

boolean wasReplicateRequestedToUpload = replicate.getLastRelevantStatus().get().equals(ReplicateStatus.RESULT_UPLOAD_REQUESTED);
boolean didReplicateStartUploading = replicate.getLastRelevantStatus().get().equals(ReplicateStatus.RESULT_UPLOADING);
boolean wasReplicateRequestedToUpload = replicate.getLastRelevantStatus() == ReplicateStatus.RESULT_UPLOAD_REQUESTED;
boolean didReplicateStartUploading = replicate.getLastRelevantStatus() == ReplicateStatus.RESULT_UPLOADING;
boolean didReplicateUploadWithoutNotifying = replicatesService.isResultUploaded(task.getChainTaskId());
boolean hasReplicateAlreadyUploaded = replicate.getLastRelevantStatus().get().equals(ReplicateStatus.RESULT_UPLOADED);
boolean hasReplicateAlreadyUploaded = replicate.getLastRelevantStatus() == ReplicateStatus.RESULT_UPLOADED;

if (wasReplicateRequestedToUpload) {
return Optional.of(TaskNotificationType.PLEASE_UPLOAD);
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/iexec/core/replicate/ReplicatesList.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ public ReplicatesList(String chainTaskId, List<Replicate> replicates) {
public int getNbValidContributedWinners(String contributionHash) {
int nbValidWinners = 0;
for (Replicate replicate : replicates) {
Optional<ReplicateStatus> oStatus = replicate.getLastRelevantStatus();
if (oStatus.isPresent() && oStatus.get().equals(CONTRIBUTED)
ReplicateStatus status = replicate.getLastRelevantStatus();
if (status == CONTRIBUTED
&& contributionHash.equals(replicate.getContributionHash())) {
nbValidWinners++;
}
Expand All @@ -94,7 +94,7 @@ public int getNbReplicatesWithLastRelevantStatus(ReplicateStatus... listStatus)
int nbReplicates = 0;
for (Replicate replicate : replicates) {
for (ReplicateStatus status : listStatus) {
if (Objects.equals(replicate.getLastRelevantStatus().orElse(null), status)) {
if (replicate.getLastRelevantStatus() == status) {
nbReplicates++;
}
}
Expand Down
8 changes: 2 additions & 6 deletions src/main/java/com/iexec/core/replicate/ReplicatesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -608,12 +608,8 @@ public boolean didReplicateRevealOnchain(String chainTaskId, String walletAddres
}

public void setRevealTimeoutStatusIfNeeded(String chainTaskId, Replicate replicate) {
Optional<ReplicateStatus> oStatus = replicate.getLastRelevantStatus();
if (oStatus.isEmpty()) {
return;
}
ReplicateStatus status = oStatus.get();
if (status.equals(REVEALING) || status.equals(CONTRIBUTED)) {
ReplicateStatus status = replicate.getLastRelevantStatus();
if (status == REVEALING || status == CONTRIBUTED) {
ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.poolManagerRequest(FAILED, REVEAL_TIMEOUT);
updateReplicateStatus(chainTaskId, replicate.getWalletAddress(), statusUpdate);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,6 @@ void running2RunningFailed(Task task) {
boolean notAllReplicatesFailed = replicatesOfAliveWorkers
.stream()
.map(Replicate::getLastRelevantStatus)
.map(Optional::get)
.anyMatch(Predicate.not(ReplicateStatus::isFailedBeforeComputed));

if (notAllReplicatesFailed) {
Expand Down Expand Up @@ -498,8 +497,7 @@ void resultUploading2Uploaded(Task task) {

boolean isReplicateUploading = replicate.getCurrentStatus() == ReplicateStatus.RESULT_UPLOADING;
boolean isReplicateRecoveringToUpload = replicate.getCurrentStatus() == ReplicateStatus.RECOVERING &&
replicate.getLastRelevantStatus().isPresent() &&
replicate.getLastRelevantStatus().get() == ReplicateStatus.RESULT_UPLOADING;
replicate.getLastRelevantStatus() == ReplicateStatus.RESULT_UPLOADING;

if (!isReplicateUploading && !isReplicateRecoveringToUpload) {
requestUpload(task);
Expand Down
Loading

0 comments on commit 625d074

Please sign in to comment.