Skip to content

Commit

Permalink
Merge pull request #358 from iExecBlockchainComputing/release/4.0.1
Browse files Browse the repository at this point in the history
Release/4.0.1
  • Loading branch information
jeremyjams authored Feb 25, 2020
2 parents 22aabc4 + e2ab909 commit 3f31f71
Show file tree
Hide file tree
Showing 16 changed files with 118 additions and 75 deletions.
10 changes: 8 additions & 2 deletions Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ pipeline {

stage('Upload Jars') {
when {
branch 'master'
anyOf{
branch 'master'
branch 'develop'
}
}
steps {
withCredentials([[$class: 'UsernamePasswordMultiBinding', credentialsId: 'nexus', usernameVariable: 'NEXUS_USER', passwordVariable: 'NEXUS_PASSWORD']]) {
Expand All @@ -32,7 +35,10 @@ pipeline {
}
stage('Build/Upload Docker image') {
when {
branch 'master'
anyOf{
branch 'master'
branch 'develop'
}
}
steps {
withCredentials([[$class: 'UsernamePasswordMultiBinding', credentialsId: 'nexus', usernameVariable: 'NEXUS_USER', passwordVariable: 'NEXUS_PASSWORD']]) {
Expand Down
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ build.dependsOn jacocoTestReport

def gitBranch = 'git name-rev --name-only HEAD'.execute().text.trim()
def isMasterBranch = gitBranch == "master"
def canUploadArchives = isMasterBranch && project.hasProperty("nexusUser") && project.hasProperty("nexusPassword")
def isDevelopBranch = gitBranch == "develop"
def canUploadArchives = (isMasterBranch || isDevelopBranch ) && project.hasProperty("nexusUser") && project.hasProperty("nexusPassword")
def gitShortCommit = 'git rev-parse --short HEAD'.execute().text.trim()
def isSnapshotVersion = project.version.contains("SNAPSHOT")

Expand Down
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
iexecCommonVersion=4.1.0-SNAPSHOT
iexecCommonVersion=4.0.1
nexusUser=fake
nexusPassword=fake
version=4.1.0-SNAPSHOT
version=4.0.1
64 changes: 42 additions & 22 deletions src/main/java/com/iexec/core/chain/IexecHubService.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.web3j.protocol.core.DefaultBlockParameter;
import org.web3j.protocol.core.DefaultBlockParameterName;
import org.web3j.protocol.core.RemoteCall;
import org.web3j.protocol.core.methods.response.BaseEventResponse;
import org.web3j.protocol.core.methods.response.TransactionReceipt;

import java.math.BigInteger;
Expand All @@ -23,7 +24,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

import static com.iexec.common.chain.ChainContributionStatus.*;
import static com.iexec.common.chain.ChainContributionStatus.CONTRIBUTED;
import static com.iexec.common.chain.ChainContributionStatus.REVEALED;
import static com.iexec.common.chain.ChainTaskStatus.ACTIVE;
import static com.iexec.common.chain.ChainTaskStatus.COMPLETED;
import static com.iexec.common.utils.BytesUtils.stringToBytes;
Expand All @@ -33,6 +35,8 @@
@Service
public class IexecHubService extends IexecHubAbstractService {

private final static int NB_BLOCKS_TO_WAIT_PER_TRY = 6;
private final static int MAX_TRY = 3;
private final ThreadPoolExecutor executor;
private final CredentialsService credentialsService;
private final Web3jService web3jService;
Expand All @@ -49,8 +53,25 @@ public IexecHubService(CredentialsService credentialsService,
this.poolAddress = chainConfig.getPoolAddress();
}

public boolean isStatusTrueOnChain(String chainTaskId, String walletAddress, ChainContributionStatus wishedStatus) {
public boolean repeatIsContributedTrue(String chainTaskId, String walletAddress) {
return web3jService.repeatCheck(NB_BLOCKS_TO_WAIT_PER_TRY, MAX_TRY, "isContributedTrue",
this::isContributedTrue, chainTaskId, walletAddress);
}

public boolean repeatIsRevealedTrue(String chainTaskId, String walletAddress) {
return web3jService.repeatCheck(NB_BLOCKS_TO_WAIT_PER_TRY, MAX_TRY, "isRevealedTrue",
this::isRevealedTrue, chainTaskId, walletAddress);
}

private boolean isContributedTrue(String... args) {
return this.isStatusTrueOnChain(args[0], args[1], CONTRIBUTED);
}

private boolean isRevealedTrue(String... args) {
return this.isStatusTrueOnChain(args[0], args[1], REVEALED);
}

public boolean isStatusTrueOnChain(String chainTaskId, String walletAddress, ChainContributionStatus wishedStatus) {
Optional<ChainContribution> optional = getChainContribution(chainTaskId, walletAddress);
if (!optional.isPresent()) {
return false;
Expand All @@ -60,23 +81,14 @@ public boolean isStatusTrueOnChain(String chainTaskId, String walletAddress, Cha
ChainContributionStatus chainStatus = chainContribution.getStatus();
switch (wishedStatus) {
case CONTRIBUTED:
if (chainStatus.equals(UNSET)) {
return false;
} else {
// has at least contributed
return chainStatus.equals(CONTRIBUTED) || chainStatus.equals(REVEALED);
}
// has at least contributed
return chainStatus.equals(CONTRIBUTED) || chainStatus.equals(REVEALED);
case REVEALED:
if (chainStatus.equals(CONTRIBUTED)) {
return false;
} else {
// has at least revealed
return chainStatus.equals(REVEALED);
}
// has at least revealed
return chainStatus.equals(REVEALED);
default:
break;
return false;
}
return false;
}

public boolean canInitialize(String chainDealId, int taskIndex) {
Expand Down Expand Up @@ -142,9 +154,7 @@ private Optional<Pair<String, ChainReceipt>> sendInitializeTransaction(String ch
initializeEvent = initializeEvents.get(0);
}

if (initializeEvent != null && initializeEvent.log != null &&
(!initializeEvent.log.getType().equals(PENDING_RECEIPT_STATUS)
|| isStatusValidOnChainAfterPendingReceipt(computedChainTaskId, ACTIVE, this::isTaskStatusValidOnChain))) {
if (isSuccessTx(computedChainTaskId, initializeEvent, ACTIVE)) {
String chainTaskId = BytesUtils.bytesToString(initializeEvent.taskid);

ChainReceipt chainReceipt = ChainUtils.buildChainReceipt(initializeEvent.log, chainTaskId, web3jService.getLatestBlockNumber());
Expand Down Expand Up @@ -218,9 +228,7 @@ private Optional<ChainReceipt> sendFinalizeTransaction(String chainTaskId, Strin
finalizeEvent = finalizeEvents.get(0);
}

if (finalizeEvent != null && finalizeEvent.log != null &&
(!finalizeEvent.log.getType().equals(PENDING_RECEIPT_STATUS)
|| isStatusValidOnChainAfterPendingReceipt(chainTaskId, COMPLETED, this::isTaskStatusValidOnChain))) {
if (isSuccessTx(chainTaskId, finalizeEvent, COMPLETED)) {
ChainReceipt chainReceipt = ChainUtils.buildChainReceipt(finalizeEvents.get(0).log, chainTaskId, web3jService.getLatestBlockNumber());

log.info("Finalized [chainTaskId:{}, resultLink:{}, callbackData:{}, shouldSendCallback:{}, gasUsed:{}]", chainTaskId,
Expand All @@ -232,6 +240,18 @@ private Optional<ChainReceipt> sendFinalizeTransaction(String chainTaskId, Strin
return Optional.empty();
}

private boolean isSuccessTx(String chainTaskId, BaseEventResponse txEvent, ChainTaskStatus pretendedStatus) {
if (txEvent == null || txEvent.log == null) {
return false;
}

if (txEvent.log.getType() == null || txEvent.log.getType().equals(PENDING_RECEIPT_STATUS)) {
return isStatusValidOnChainAfterPendingReceipt(chainTaskId, pretendedStatus, this::isTaskStatusValidOnChain);
}

return true;
}

public boolean canReopen(String chainTaskId) {
Optional<ChainTask> optional = getChainTask(chainTaskId);
if (!optional.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ public void detectOnchainContributedWhenOffchainContributing() {
public void detectOnchainContributed() {
log.debug("Detect onchain Contributed [retryIn:{}]",
coreConfigurationService.getUnnotifiedContributionDetectorPeriod() * DETECTOR_MULTIPLIER);

dectectOnchainCompleted(dectectWhenOffchainTaskStatuses, offchainCompleting, offchainCompleted, onchainCompleted);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public RevealUnnotifiedDetector(TaskService taskService,
@Scheduled(fixedRateString = "#{coreConfigurationService.unnotifiedRevealDetectorPeriod}")
public void detectOnchainRevealedWhenOffchainRevealed() {
log.debug("Detect onchain Revealed (when offchain Revealing) [retryIn:{}]",
coreConfigurationService.getUnnotifiedContributionDetectorPeriod());
coreConfigurationService.getUnnotifiedRevealDetectorPeriod());
dectectOnchainCompletedWhenOffchainCompleting(dectectWhenTaskStatuses, offchainCompleting, offchainCompleted, onchainCompleted);
}

Expand All @@ -60,7 +60,7 @@ public void detectOnchainRevealedWhenOffchainRevealed() {
@Scheduled(fixedRateString = "#{coreConfigurationService.unnotifiedRevealDetectorPeriod*" + DETECTOR_MULTIPLIER + "}")
public void detectOnchainRevealed() {
log.debug("Detect onchain Revealed [retryIn:{}]",
coreConfigurationService.getUnnotifiedContributionDetectorPeriod() * DETECTOR_MULTIPLIER);
coreConfigurationService.getUnnotifiedRevealDetectorPeriod() * DETECTOR_MULTIPLIER);
dectectOnchainCompleted(dectectWhenTaskStatuses, offchainCompleting, offchainCompleted, onchainCompleted);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.iexec.core.detector.replicate;

import com.iexec.common.chain.ChainContribution;
import com.iexec.common.chain.ChainContributionStatus;
import com.iexec.common.chain.ChainReceipt;
import com.iexec.common.replicate.ReplicateStatus;
Expand All @@ -11,15 +12,15 @@
import com.iexec.core.task.Task;
import com.iexec.core.task.TaskService;
import com.iexec.core.task.TaskStatus;
import org.springframework.stereotype.Service;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.Optional;

import static com.iexec.common.replicate.ReplicateStatus.WORKER_LOST;
import static com.iexec.common.replicate.ReplicateStatus.getMissingStatuses;

@Service
@Slf4j
public abstract class UnnotifiedAbstractDetector {


Expand All @@ -42,22 +43,18 @@ void dectectOnchainCompletedWhenOffchainCompleting(List<TaskStatus> dectectWhenO
ReplicateStatus offchainCompleting,
ReplicateStatus offchainCompleted,
ChainContributionStatus onchainCompleted) {

for (Task task : taskService.findByCurrentStatus(dectectWhenOffchainTaskStatuses)) {
for (Replicate replicate : replicatesService.getReplicates(task.getChainTaskId())) {
Optional<ReplicateStatus> lastRelevantStatus = replicate.getLastRelevantStatus();

if (!lastRelevantStatus.isPresent()) {
if (!lastRelevantStatus.isPresent() || !lastRelevantStatus.get().equals(offchainCompleting)) {
continue;
}

boolean isReplicateStatusCompleting = lastRelevantStatus.get().equals(offchainCompleting);

boolean isReplicateStatusCompletingAndStatusTrueOnchain = isReplicateStatusCompleting &&
iexecHubService.isStatusTrueOnChain(task.getChainTaskId(), replicate.getWalletAddress(),
onchainCompleted);
boolean statusTrueOnChain = iexecHubService.isStatusTrueOnChain(task.getChainTaskId(), replicate.getWalletAddress(), onchainCompleted);

if (isReplicateStatusCompletingAndStatusTrueOnchain) {
if (statusTrueOnChain) {
log.info("Detected confirmed missing update (replicate) [is:{}, should:{}, taskId:{}]",
lastRelevantStatus.get(), onchainCompleted, task.getChainTaskId());
updateReplicateStatuses(task.getChainTaskId(), replicate, offchainCompleted);
}
}
Expand All @@ -72,19 +69,32 @@ void dectectOnchainCompleted(List<TaskStatus> dectectWhenOffchainTaskStatuses,
for (Replicate replicate : replicatesService.getReplicates(task.getChainTaskId())) {
Optional<ReplicateStatus> lastRelevantStatus = replicate.getLastRelevantStatus();

if (!lastRelevantStatus.isPresent()) {
if (!lastRelevantStatus.isPresent() || lastRelevantStatus.get().equals(offchainCompleted)) {
continue;
}

boolean isNotOffChainCompleted = !lastRelevantStatus.get().equals(offchainCompleted);//avoid eth node call if already contributed
boolean statusTrueOnChain = iexecHubService.isStatusTrueOnChain(task.getChainTaskId(), replicate.getWalletAddress(), onchainCompleted);

if (isNotOffChainCompleted && iexecHubService.isStatusTrueOnChain(task.getChainTaskId(), replicate.getWalletAddress(), onchainCompleted)) {
if (statusTrueOnChain) {
log.info("Detected confirmed missing update (replicate) [is:{}, should:{}, taskId:{}]",
lastRelevantStatus.get(), onchainCompleted, task.getChainTaskId());
updateReplicateStatuses(task.getChainTaskId(), replicate, offchainCompleted);
}
}
}
}

/*
* Usage: printLogsUncertainMissingUpdate(task, replicate, lastRelevantStatus.get(), statusTrueOnChain);
* */
private void printLogsUncertainMissingUpdate(Task task, Replicate replicate, ReplicateStatus lastRelevantStatus, boolean statusTrueOnChain) {
ChainContributionStatus chainContributionStatus = iexecHubService.getChainContribution(task.getChainTaskId(), replicate.getWalletAddress())
.map(ChainContribution::getStatus)
.orElse(null);
log.info("Detected uncertain missing update (replicate) [off:{}, on:{}, statusTrueOnChain:{}, taskId:{}]",
lastRelevantStatus, chainContributionStatus, statusTrueOnChain, task.getChainTaskId());
}

/*
* This method should stay private. We need to insure that
* it is only called by the POOL_MANAGER.
Expand All @@ -110,7 +120,7 @@ private void updateReplicateStatuses(String chainTaskId, Replicate replicate, Re
// retrieve the contribution block for that wallet
ChainReceipt contributedBlock = iexecHubService.getContributionBlock(chainTaskId,
wallet, web3jService.getLatestBlockNumber());
long contributedBlockNumber = contributedBlock != null ? contributedBlock.getBlockNumber(): 0;
long contributedBlockNumber = contributedBlock != null ? contributedBlock.getBlockNumber() : 0;
replicatesService.updateReplicateStatus(chainTaskId, wallet,
statusToUpdate, new ReplicateStatusDetails(contributedBlockNumber));
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ public class FinalizedTaskDetector implements Detector {
private IexecHubService iexecHubService;

public FinalizedTaskDetector(TaskService taskService,
TaskExecutorEngine taskExecutorEngine,
IexecHubService iexecHubService) {
TaskExecutorEngine taskExecutorEngine,
IexecHubService iexecHubService) {
this.taskService = taskService;
this.taskExecutorEngine = taskExecutorEngine;
this.iexecHubService = iexecHubService;
Expand All @@ -40,6 +40,8 @@ public void detect() {
for (Task task : taskService.findByCurrentStatus(TaskStatus.FINALIZING)) {
Optional<ChainTask> chainTask = iexecHubService.getChainTask(task.getChainTaskId());
if (chainTask.isPresent() && chainTask.get().getStatus().equals(ChainTaskStatus.COMPLETED)) {
log.info("Detected confirmed missing update (task) [is:{}, should:{}, taskId:{}]",
TaskStatus.FINALIZING, TaskStatus.FINALIZED, task.getChainTaskId());
taskExecutorEngine.updateTask(task.getChainTaskId());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ public void detect() {
log.debug("Trying to detect initialized tasks");
for (Task task : taskService.findByCurrentStatus(TaskStatus.INITIALIZING)) {
Optional<ChainTask> chainTask = iexecHubService.getChainTask(task.getChainTaskId());
if(chainTask.isPresent() && !chainTask.get().getStatus().equals(ChainTaskStatus.UNSET)) {
if (chainTask.isPresent() && !chainTask.get().getStatus().equals(ChainTaskStatus.UNSET)) {
log.info("Detected confirmed missing update (task) [is:{}, should:{}, taskId:{}]",
TaskStatus.INITIALIZING, TaskStatus.INITIALIZED, task.getChainTaskId());
taskExecutorEngine.updateTask(task.getChainTaskId());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@ public void detect() {
log.debug("Trying to detect reopened tasks");
for (Task task : taskService.findByCurrentStatus(TaskStatus.REOPENING)) {
Optional<ChainTask> oChainTask = iexecHubService.getChainTask(task.getChainTaskId());
if (!oChainTask.isPresent()){
if (!oChainTask.isPresent()) {
continue;
}

ChainTask chainTask = oChainTask.get();
if (chainTask.getStatus().equals(ChainTaskStatus.ACTIVE)) {
log.info("Detected confirmed missing update (task) [is:{}, should:{}, taskId:{}]",
TaskStatus.REOPENING, TaskStatus.REOPENED, task.getChainTaskId());
taskExecutorEngine.updateTask(task.getChainTaskId());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ public void detect() {
//start finalize when needed
List<Task> notYetFinalizingTasks = taskService.findByCurrentStatus(TaskStatus.RESULT_UPLOADED);
for (Task task : notYetFinalizingTasks) {
log.info("UnstartedTxDetector should update RESULT_UPLOADED task to FINALIZING [chainTaskId:{}]",
task.getChainTaskId());
log.info("Detected confirmed missing update (task) [is:{}, should:{}, taskId:{}]",
TaskStatus.RESULT_UPLOADED, TaskStatus.FINALIZING, task.getChainTaskId());
taskExecutorEngine.updateTask(task.getChainTaskId());
}

//start initialize when needed
List<Task> notYetInitializingTasks = taskService.findByCurrentStatus(TaskStatus.RECEIVED);
for (Task task : notYetInitializingTasks) {
log.info("UnstartedTxDetector should update RECEIVED task to INITIALIZING [chainDealId:{}, taskIndex:{}]",
task.getChainDealId(), task.getTaskIndex());
log.info("Detected confirmed missing update (task) [is:{}, should:{}, taskId:{}]",
TaskStatus.RECEIVED, TaskStatus.INITIALIZING, task.getChainTaskId());
taskExecutorEngine.updateTask(task.getChainTaskId());
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/com/iexec/core/metric/MetricService.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public PlatformMetric getPlatformMetrics() {
.aliveWorkers(workerService.getAliveWorkers().size())
.aliveTotalCpu(workerService.getAliveTotalCpu())
.aliveAvailableCpu(workerService.getAliveAvailableCpu())
.aliveTotalGpu(workerService.getAliveTotalGpu())
.aliveAvailableGpu(workerService.getAliveAvailableGpu())
.completedTasks(taskService.findByCurrentStatus(TaskStatus.COMPLETED).size())
.build();
}
Expand Down
Loading

0 comments on commit 3f31f71

Please sign in to comment.