Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prediction refactor #305

Merged
merged 12 commits into from
Jun 18, 2019
52 changes: 52 additions & 0 deletions src/main/java/com/iexec/core/contribution/ConsensusService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.iexec.core.contribution;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class ConsensusService {

private PredictionService predictionService;

public ConsensusService(PredictionService predictionService) {
this.predictionService = predictionService;
}


/*
*
* Estimating pending workers are going to contribute to the best prediction
*
* Return true means a consensus is not possible now, a new worker is welcome to add more weight
* Return false means a consensus is possible now, no need to add new workers
*
*/
public boolean doesTaskNeedMoreContributionsForConsensus(String chainTaskId, int trust, long maxExecutionTime) {
trust = Math.max(trust, 1);//ensure trust equals 1

int bestPredictionWeight = predictionService.getBestPredictionWeight(chainTaskId, maxExecutionTime);
int worstPredictionsWeight = predictionService.getWorstPredictionsWeight(chainTaskId);

int allPredictionsWeight = worstPredictionsWeight + bestPredictionWeight;

boolean needsMoreContributions = !isConsensusPossibleNow(trust, bestPredictionWeight, allPredictionsWeight);

if (needsMoreContributions){
log.info("More contributions needed [chainTaskId:{}, trust:{}, bestPredictionWeight:{}, " +
"allPredictionsWeight:{}]", chainTaskId, trust, bestPredictionWeight, allPredictionsWeight);
}

return needsMoreContributions;
}

private boolean isConsensusPossibleNow(int trust, int pendingAndContributedBestPredictionWeight, int allPredictionsWeight) {
return pendingAndContributedBestPredictionWeight * trust > (1 + allPredictionsWeight) * (trust - 1);
}






}
99 changes: 99 additions & 0 deletions src/main/java/com/iexec/core/contribution/ContributionService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package com.iexec.core.contribution;

import com.iexec.common.replicate.ReplicateStatus;
import com.iexec.core.replicate.Replicate;
import com.iexec.core.replicate.ReplicatesService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

@Slf4j
@Service
public class ContributionService {

private final ReplicatesService replicatesService;

public ContributionService(ReplicatesService replicatesService) {
this.replicatesService = replicatesService;
}

/*
*
* Get weight of a contributed
*
* */
int getContributedWeight(String chainTaskId, String contribution) {
int groupWeight = 0;
for (Replicate replicate : replicatesService.getReplicates(chainTaskId)) {

Optional<ReplicateStatus> lastRelevantStatus = replicate.getLastRelevantStatus();
if (!lastRelevantStatus.isPresent()) {
continue;
}

boolean isContributed = lastRelevantStatus.get().equals(ReplicateStatus.CONTRIBUTED);
boolean haveSameContribution = contribution.equals(replicate.getContributionHash());
boolean hasWeight = replicate.getWorkerWeight() > 0;

if (isContributed && haveSameContribution && hasWeight) {
groupWeight = Math.max(groupWeight, 1) * replicate.getWorkerWeight();
}
}
return groupWeight;
}

/*
*
* Should exclude workers that have not CONTRIBUTED yet after t=date(CREATED)+1T
*
* */
int getPendingWeight(String chainTaskId, long maxExecutionTime) {
int pendingGroupWeight = 0;

for (Replicate replicate : replicatesService.getReplicates(chainTaskId)) {

Optional<ReplicateStatus> lastRelevantStatus = replicate.getLastRelevantStatus();
if (!lastRelevantStatus.isPresent()) {
continue;
}

boolean isCreatedLessThanOnePeriodAgo = !replicate.isCreatedMoreThanNPeriodsAgo(1, maxExecutionTime);
boolean isNotContributed = !lastRelevantStatus.get().equals(ReplicateStatus.CONTRIBUTED);
boolean isNotFailed = !lastRelevantStatus.get().equals(ReplicateStatus.FAILED);
boolean hasWeight = replicate.getWorkerWeight() > 0;

if (isCreatedLessThanOnePeriodAgo && isNotContributed && isNotFailed && hasWeight) {
pendingGroupWeight = Math.max(pendingGroupWeight, 1) * replicate.getWorkerWeight();
}
}
return pendingGroupWeight;
}

/*
*
* Retrieves distinct contributions
*
* */
Set<String> getDistinctContributions(String chainTaskId) {

Set<String> distinctContributions = new HashSet<>();

for (Replicate replicate : replicatesService.getReplicates(chainTaskId)) {

Optional<ReplicateStatus> lastRelevantStatus = replicate.getLastRelevantStatus();
if (!lastRelevantStatus.isPresent()) {
continue;
}

if (lastRelevantStatus.get().equals(ReplicateStatus.CONTRIBUTED)) {
distinctContributions.add(replicate.getContributionHash());
}
}
return distinctContributions;
}


}
15 changes: 15 additions & 0 deletions src/main/java/com/iexec/core/contribution/Prediction.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.iexec.core.contribution;

import lombok.Builder;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;

@Data
@Getter
@Setter
@Builder
public class Prediction {
private String contribution;
private int weight;
}
81 changes: 81 additions & 0 deletions src/main/java/com/iexec/core/contribution/PredictionService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package com.iexec.core.contribution;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.Set;

@Slf4j
@Service
public class PredictionService {

private ContributionService contributionService;

public PredictionService(ContributionService contributionService) {
this.contributionService = contributionService;
}

Prediction getContributedBestPrediction(String chainTaskId) {
Set<String> distinctContributions = contributionService.getDistinctContributions(chainTaskId);
Prediction bestPrediction = Prediction.builder().contribution("").weight(0).build();

for (String predictionContribution : distinctContributions) {
int predictionWeight = contributionService.getContributedWeight(chainTaskId, predictionContribution);

if (predictionWeight >= bestPrediction.getWeight()) {
bestPrediction.setContribution(predictionContribution);
bestPrediction.setWeight(predictionWeight);
}
}
return bestPrediction;
}

private int getContributedBestPredictionWeight(String chainTaskId) {
return this.getContributedBestPrediction(chainTaskId).getWeight();
}

/*
*
* Considering pending workers are going to contribute to the best prediction
* Counting pending and contributed
*
* */
int getBestPredictionWeight(String chainTaskId, long maxExecutionTime) {
int contributedBestPredictionWeight = getContributedBestPredictionWeight(chainTaskId);
int pendingWeight = contributionService.getPendingWeight(chainTaskId, maxExecutionTime);

int bestPredictionWeight;
if (pendingWeight == 0 && contributedBestPredictionWeight == 0) {
bestPredictionWeight = 0;
} else if (pendingWeight > 0 && contributedBestPredictionWeight == 0) {
bestPredictionWeight = pendingWeight;
} else if (pendingWeight == 0 && contributedBestPredictionWeight > 0) {
bestPredictionWeight = contributedBestPredictionWeight;
} else {
bestPredictionWeight = contributedBestPredictionWeight * pendingWeight;
}
return bestPredictionWeight;
}

/*
*
* Sum all prediction weights but exclude contributed best prediction weight
*
* */
int getWorstPredictionsWeight(String chainTaskId) {
Set<String> distinctContributions = contributionService.getDistinctContributions(chainTaskId);
String bestPredictionContribution = this.getContributedBestPrediction(chainTaskId).getContribution();

int allOtherPredictionsWeight = 0;

for (String contribution : distinctContributions) {
int predictionWeight = contributionService.getContributedWeight(chainTaskId, contribution);

if (!contribution.equals(bestPredictionContribution)) {
allOtherPredictionsWeight = allOtherPredictionsWeight + predictionWeight;
}
}
return allOtherPredictionsWeight;
}

}
7 changes: 2 additions & 5 deletions src/main/java/com/iexec/core/replicate/Replicate.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,7 @@
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.*;
import java.util.stream.Collectors;


Expand All @@ -27,6 +23,7 @@ public class Replicate {
private String chainTaskId;
private String contributionHash;
private int credibility;
private int workerWeight;

public Replicate(String walletAddress, String chainTaskId) {
this.chainTaskId = chainTaskId;
Expand Down
21 changes: 12 additions & 9 deletions src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import com.iexec.core.chain.SignatureService;
import com.iexec.core.chain.Web3jService;
import com.iexec.core.detector.task.ContributionTimeoutTaskDetector;
import com.iexec.core.contribution.ConsensusService;
import com.iexec.core.contribution.PredictionService;
import com.iexec.core.sms.SmsService;
import com.iexec.core.task.Task;
import com.iexec.core.task.TaskExecutorEngine;
Expand All @@ -23,10 +25,7 @@
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

Expand All @@ -42,6 +41,8 @@ public class ReplicateSupplyService {
private SmsService smsService;
private Web3jService web3jService;
private ContributionTimeoutTaskDetector contributionTimeoutTaskDetector;
private ConsensusService consensusService;
private PredictionService predictionService;

public ReplicateSupplyService(ReplicatesService replicatesService,
SignatureService signatureService,
Expand All @@ -50,7 +51,8 @@ public ReplicateSupplyService(ReplicatesService replicatesService,
WorkerService workerService,
SmsService smsService,
Web3jService web3jService,
ContributionTimeoutTaskDetector contributionTimeoutTaskDetector) {
ContributionTimeoutTaskDetector contributionTimeoutTaskDetector,
ConsensusService consensusService) {
this.replicatesService = replicatesService;
this.signatureService = signatureService;
this.taskExecutorEngine = taskExecutorEngine;
Expand All @@ -59,6 +61,8 @@ public ReplicateSupplyService(ReplicatesService replicatesService,
this.smsService = smsService;
this.web3jService = web3jService;
this.contributionTimeoutTaskDetector = contributionTimeoutTaskDetector;
this.consensusService = consensusService;
this.predictionService = predictionService;
}

/*
Expand Down Expand Up @@ -136,10 +140,9 @@ Optional<ContributionAuthorization> getAuthOfAvailableReplicate(long workerLastB
boolean isFewBlocksAfterInitialization = isFewBlocksAfterInitialization(task);
boolean hasWorkerAlreadyParticipated = replicatesService.hasWorkerAlreadyParticipated(
chainTaskId, walletAddress);
boolean moreReplicatesNeeded = replicatesService.moreReplicatesNeeded(chainTaskId,
task.getNumWorkersNeeded(), task.getMaxExecutionTime());

if (isFewBlocksAfterInitialization && !hasWorkerAlreadyParticipated && moreReplicatesNeeded) {
if (isFewBlocksAfterInitialization && !hasWorkerAlreadyParticipated
&& consensusService.doesTaskNeedMoreContributionsForConsensus(chainTaskId, task.getTrust(), task.getMaxExecutionTime())) {

String enclaveChallenge = smsService.getEnclaveChallenge(chainTaskId, doesTaskNeedTEE);
if (enclaveChallenge.isEmpty()) {
Expand All @@ -161,7 +164,6 @@ Optional<ContributionAuthorization> getAuthOfAvailableReplicate(long workerLastB
return Optional.empty();
}


private boolean isFewBlocksAfterInitialization(Task task) {
long coreLastBlock = web3jService.getLatestBlockNumber();
long initializationBlock = task.getInitializationBlockNumber();
Expand Down Expand Up @@ -438,4 +440,5 @@ private Optional<TaskNotificationType> recoverReplicateIfRevealed(Replicate repl

return Optional.empty();
}

}
21 changes: 5 additions & 16 deletions src/main/java/com/iexec/core/replicate/ReplicatesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ public void addNewReplicate(String chainTaskId, String walletAddress) {
Optional<ReplicatesList> optional = getReplicatesList(chainTaskId);
if (optional.isPresent()) {
ReplicatesList replicatesList = optional.get();
replicatesList.getReplicates().add(new Replicate(walletAddress, chainTaskId));
Replicate replicate = new Replicate(walletAddress, chainTaskId);
replicate.setWorkerWeight(iexecHubService.getWorkerWeight(walletAddress));// workerWeight value for pendingWeight estimate
replicatesList.getReplicates().add(replicate);

replicatesRepository.save(replicatesList);
log.info("New replicate saved [chainTaskId:{}, walletAddress:{}]", chainTaskId, walletAddress);
}
Expand Down Expand Up @@ -194,21 +197,6 @@ private boolean isStatusBeforeWorkerLostEqualsTo(Replicate replicate, ReplicateS
&& replicate.getStatusChangeList().get(size - 2).getStatus().equals(status);
}

public boolean moreReplicatesNeeded(String chainTaskId, int nbWorkersNeeded, long maxExecutionTime) {
int nbValidReplicates = 0;
for (Replicate replicate : getReplicates(chainTaskId)) {
//TODO think: When do we really need more replicates?
boolean isReplicateSuccessfullSoFar = ReplicateStatus.getSuccessStatuses().contains(replicate.getCurrentStatus());
boolean doesContributionTakesTooLong = !replicate.containsContributedStatus() &&
replicate.isCreatedMoreThanNPeriodsAgo(2, maxExecutionTime);

if (isReplicateSuccessfullSoFar && !doesContributionTakesTooLong) {
nbValidReplicates++;
}
}
return nbValidReplicates < nbWorkersNeeded;
}

public void updateReplicateStatus(String chainTaskId,
String walletAddress,
ReplicateStatus newStatus,
Expand Down Expand Up @@ -358,6 +346,7 @@ private Replicate getReplicateWithBlockchainUpdates(Replicate replicate, ChainCo
ChainContribution chainContribution = optional.get();
if (wishedChainStatus.equals(ChainContributionStatus.CONTRIBUTED)) {
replicate.setContributionHash(chainContribution.getResultHash());
replicate.setWorkerWeight(iexecHubService.getWorkerWeight(replicate.getWalletAddress()));//Should update weight on contributed
}
return replicate;
}
Expand Down
Loading