Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check if Worker can still accept more work right before giving it a new replicate #644

Merged
merged 8 commits into from
Dec 14, 2023
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

All notable changes to this project will be documented in this file.

## [[8.2.3]](https://github.com/iExecBlockchainComputing/iexec-core/releases/tag/v8.2.3) 2023-12-14

### Bug Fixes

- Check if Worker can still accept more work right before giving it a new replicate. (#644)

## [[8.2.2]](https://github.com/iExecBlockchainComputing/iexec-core/releases/tag/v8.2.2) 2023-12-13

### Bug Fixes
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=8.2.2
version=8.2.3
iexecCommonVersion=8.3.0
iexecCommonsPocoVersion=3.1.0
iexecBlockchainAdapterVersion=8.2.0
Expand Down
101 changes: 53 additions & 48 deletions src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,9 @@ public ReplicateSupplyService(ReplicatesService replicatesService,
*/
@Retryable(value = {OptimisticLockingFailureException.class}, maxAttempts = 5)
Optional<ReplicateTaskSummary> getAvailableReplicateTaskSummary(long workerLastBlock, String walletAddress) {
// return empty if max computing task is reached or if the worker is not found
if (!workerService.canAcceptMoreWorks(walletAddress)) {
return Optional.empty();
}

// return empty if the worker is not sync
//TODO Check if worker node is sync
boolean isWorkerLastBlockAvailable = workerLastBlock > 0;
final boolean isWorkerLastBlockAvailable = workerLastBlock > 0;
if (!isWorkerLastBlockAvailable) {
return Optional.empty();
}
Expand All @@ -102,13 +97,16 @@ Optional<ReplicateTaskSummary> getAvailableReplicateTaskSummary(long workerLastB
return Optional.empty();
}

// TODO : Remove this, the optional can never be empty
// This is covered in workerService.canAcceptMoreWorks
Optional<Worker> optional = workerService.getWorker(walletAddress);
final Optional<Worker> optional = workerService.getWorker(walletAddress);
if (optional.isEmpty()) {
return Optional.empty();
}
Worker worker = optional.get();
final Worker worker = optional.get();

// return empty if max computing task is reached or if the worker is not found
if (!workerService.canAcceptMoreWorks(worker)) {
return Optional.empty();
}

return getReplicateTaskSummaryForAnyAvailableTask(
walletAddress,
Expand Down Expand Up @@ -161,8 +159,8 @@ private Optional<ReplicateTaskSummary> getReplicateTaskSummary(Task task, String
chainTaskId,
task.getEnclaveChallenge());
ReplicateTaskSummaryBuilder replicateTaskSummary = ReplicateTaskSummary.builder()
.workerpoolAuthorization(authorization);
if(task.isTeeTask()){
.workerpoolAuthorization(authorization);
if (task.isTeeTask()) {
replicateTaskSummary.smsUrl(task.getSmsUrl());
}
return Optional.of(replicateTaskSummary.build());
Expand All @@ -173,7 +171,7 @@ private Optional<ReplicateTaskSummary> getReplicateTaskSummary(Task task, String
* tries to accept the task - i.e. create a new {@link Replicate}
* for that task on that worker.
*
* @param task {@link Task} needing at least one new {@link Replicate}.
* @param task {@link Task} needing at least one new {@link Replicate}.
* @param walletAddress Wallet address of a worker looking for new {@link Task}.
* @return {@literal true} if the task has been accepted,
* {@literal false} otherwise.
Expand All @@ -184,22 +182,6 @@ private boolean acceptOrRejectTask(Task task, String walletAddress) {
}

final String chainTaskId = task.getChainTaskId();
final Optional<ReplicatesList> oReplicatesList = replicatesService.getReplicatesList(chainTaskId);
// Check is only here to prevent
// "`Optional.get()` without `isPresent()` warning".
// This case should not happen.
if (oReplicatesList.isEmpty()) {
return false;
}

final ReplicatesList replicatesList = oReplicatesList.get();

final boolean hasWorkerAlreadyParticipated =
replicatesList.hasWorkerAlreadyParticipated(walletAddress);
if (hasWorkerAlreadyParticipated) {
return false;
}

final Lock lock = taskAccessForNewReplicateLocks
.computeIfAbsent(chainTaskId, k -> new ReentrantLock());
if (!lock.tryLock()) {
Expand All @@ -209,33 +191,56 @@ private boolean acceptOrRejectTask(Task task, String walletAddress) {
}

try {
final boolean taskNeedsMoreContributions = ConsensusHelper.doesTaskNeedMoreContributionsForConsensus(
chainTaskId,
replicatesList.getReplicates(),
task.getTrust(),
task.getMaxExecutionTime());

if (!taskNeedsMoreContributions
|| taskService.isConsensusReached(replicatesList)) {
return false;
}

replicatesService.addNewReplicate(chainTaskId, walletAddress);
workerService.addChainTaskIdToWorker(chainTaskId, walletAddress);
return replicatesService.getReplicatesList(chainTaskId)
.map(replicatesList -> acceptOrRejectTask(task, walletAddress, replicatesList))
.orElse(false);
} finally {
// We should always unlock the task
// so that it could be taken by another replicate
// if there's any issue.
lock.unlock();
}
}

return true;
/**
* Given a {@link Task}, a {@code walletAddress} of a worker and a {@link ReplicatesList},
* tries to accept the task - i.e. create a new {@link Replicate}
* for that task on that worker.
*
* @param task {@link Task} needing at least one new {@link Replicate}.
* @param walletAddress Wallet address of a worker looking for new {@link Task}.
* @param replicatesList Replicates of given {@link Task}.
* @return {@literal true} if the task has been accepted,
* {@literal false} otherwise.
*/
boolean acceptOrRejectTask(Task task, String walletAddress, ReplicatesList replicatesList) {
final boolean hasWorkerAlreadyParticipated =
replicatesList.hasWorkerAlreadyParticipated(walletAddress);
if (hasWorkerAlreadyParticipated) {
return false;
}

final String chainTaskId = replicatesList.getChainTaskId();
final boolean taskNeedsMoreContributions = ConsensusHelper.doesTaskNeedMoreContributionsForConsensus(
chainTaskId,
replicatesList.getReplicates(),
task.getTrust(),
task.getMaxExecutionTime());

if (!taskNeedsMoreContributions
|| taskService.isConsensusReached(replicatesList)) {
return false;
}

return workerService.addChainTaskIdToWorker(chainTaskId, walletAddress)
.map(worker -> replicatesService.addNewReplicate(replicatesList, walletAddress))
.orElse(false);
}

/**
* Get notifications missed by the worker during the time it was absent.
*
* @param blockNumber last seen blocknumber by the worker
*
* @param blockNumber last seen blocknumber by the worker
* @param walletAddress of the worker
* @return list of missed notifications. Can be empty if no notification is found
*/
Expand Down Expand Up @@ -264,7 +269,7 @@ public List<TaskNotification> getMissedTaskNotifications(long blockNumber, Strin
continue;
}
TaskNotificationExtra taskNotificationExtra =
getTaskNotificationExtra(task, taskNotificationType.get(), walletAddress, enclaveChallenge);
getTaskNotificationExtra(task, taskNotificationType.get(), walletAddress, enclaveChallenge);

TaskNotification taskNotification = TaskNotification.builder()
.chainTaskId(chainTaskId)
Expand All @@ -286,7 +291,7 @@ public List<TaskNotification> getMissedTaskNotifications(long blockNumber, Strin
private TaskNotificationExtra getTaskNotificationExtra(Task task, TaskNotificationType taskNotificationType, String walletAddress, String enclaveChallenge) {
TaskNotificationExtra taskNotificationExtra = TaskNotificationExtra.builder().build();

switch (taskNotificationType){
switch (taskNotificationType) {
case PLEASE_CONTRIBUTE:
WorkerpoolAuthorization authorization = signatureService.createAuthorization(
walletAddress, task.getChainTaskId(), enclaveChallenge);
Expand All @@ -312,7 +317,7 @@ public Optional<TaskNotificationType> getTaskNotificationType(Task task, Replica
// CONTRIBUTION_TIMEOUT or CONSENSUS_REACHED without contribution
if (task.getCurrentStatus().equals(TaskStatus.CONTRIBUTION_TIMEOUT)
|| (task.getCurrentStatus().equals(TaskStatus.CONSENSUS_REACHED)
&& !replicate.containsContributedStatus())) {
&& !replicate.containsContributedStatus())) {
return Optional.of(TaskNotificationType.PLEASE_ABORT);
}

Expand Down
25 changes: 12 additions & 13 deletions src/main/java/com/iexec/core/replicate/ReplicatesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,21 @@ public ReplicatesService(ReplicatesRepository replicatesRepository,
this.taskLogsService = taskLogsService;
}

public void addNewReplicate(String chainTaskId, String walletAddress) {
if (getReplicate(chainTaskId, walletAddress).isEmpty()) {
Optional<ReplicatesList> optional = getReplicatesList(chainTaskId);
if (optional.isPresent()) {
ReplicatesList replicatesList = optional.get();
Replicate replicate = new Replicate(walletAddress, chainTaskId);
replicate.setWorkerWeight(iexecHubService.getWorkerWeight(walletAddress));// workerWeight value for pendingWeight estimate
replicatesList.getReplicates().add(replicate);

replicatesRepository.save(replicatesList);
log.info("New replicate saved [chainTaskId:{}, walletAddress:{}]", chainTaskId, walletAddress);
}
public boolean addNewReplicate(ReplicatesList replicatesList, String walletAddress) {
final String chainTaskId = replicatesList.getChainTaskId();
if (replicatesList.getReplicateOfWorker(walletAddress).isEmpty()) {
Replicate replicate = new Replicate(walletAddress, chainTaskId);
replicate.setWorkerWeight(iexecHubService.getWorkerWeight(walletAddress));// workerWeight value for pendingWeight estimate
replicatesList.getReplicates().add(replicate);

replicatesRepository.save(replicatesList);
log.info("New replicate saved [chainTaskId:{}, walletAddress:{}]", chainTaskId, walletAddress);
} else {
log.error("Replicate already saved [chainTaskId:{}, walletAddress:{}]", chainTaskId, walletAddress);
return false;
}

return true;
}

public synchronized void createEmptyReplicateList(String chainTaskId) {
Expand Down Expand Up @@ -635,4 +634,4 @@ public void setRevealTimeoutStatusIfNeeded(String chainTaskId, Replicate replica
updateReplicateStatus(chainTaskId, replicate.getWalletAddress(), statusUpdate);
}
}
}
}
41 changes: 21 additions & 20 deletions src/main/java/com/iexec/core/worker/WorkerService.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ public Optional<Worker> getWorker(String walletAddress) {
return workerRepository.findByWalletAddress(walletAddress);
}

public boolean isAllowedToJoin(String workerAddress){
public boolean isAllowedToJoin(String workerAddress) {
List<String> whitelist = workerConfiguration.getWhitelist();
// if the whitelist is empty, there is no restriction on the workers
if (whitelist.isEmpty()){
if (whitelist.isEmpty()) {
return true;
}
return whitelist.contains(workerAddress);
Expand Down Expand Up @@ -133,19 +133,13 @@ public List<Worker> getAliveWorkers() {
return workerRepository.findByWalletAddressIn(aliveWorkers);
}

public boolean canAcceptMoreWorks(String walletAddress) {
Optional<Worker> optionalWorker = getWorker(walletAddress);
if (optionalWorker.isEmpty()){
return false;
}

Worker worker = optionalWorker.get();
public boolean canAcceptMoreWorks(Worker worker) {
int workerMaxNbTasks = worker.getMaxNbTasks();
int runningReplicateNb = worker.getComputingChainTaskIds().size();

if (runningReplicateNb >= workerMaxNbTasks) {
log.debug("Worker asking for too many replicates [walletAddress:{}, runningReplicateNb:{}, workerMaxNbTasks:{}]",
walletAddress, runningReplicateNb, workerMaxNbTasks);
worker.getWalletAddress(), runningReplicateNb, workerMaxNbTasks);
return false;
}

Expand All @@ -154,44 +148,44 @@ public boolean canAcceptMoreWorks(String walletAddress) {

public int getAliveAvailableCpu() {
int availableCpus = 0;
for (Worker worker: getAliveWorkers()) {
for (Worker worker : getAliveWorkers()) {
if (worker.isGpuEnabled()) {
continue;
}

int workerCpuNb = worker.getCpuNb();
int computingReplicateNb = worker.getComputingChainTaskIds().size();
int availableCpu = workerCpuNb - computingReplicateNb;
availableCpus+= availableCpu;
availableCpus += availableCpu;
}
return availableCpus;
}

public int getAliveTotalCpu() {
int totalCpus = 0;
for (Worker worker: getAliveWorkers()){
if(worker.isGpuEnabled()) {
for (Worker worker : getAliveWorkers()) {
if (worker.isGpuEnabled()) {
continue;
}
totalCpus+= worker.getCpuNb();
totalCpus += worker.getCpuNb();
}
return totalCpus;
}

// We suppose for now that 1 Gpu enabled worker has only one GPU
public int getAliveTotalGpu() {
int totalGpus = 0;
for(Worker worker: getAliveWorkers()) {
for (Worker worker : getAliveWorkers()) {
if (worker.isGpuEnabled()) {
totalGpus++;
}
}
return totalGpus;
}

public int getAliveAvailableGpu () {
public int getAliveAvailableGpu() {
int availableGpus = getAliveTotalGpu();
for (Worker worker: getAliveWorkers()) {
for (Worker worker : getAliveWorkers()) {
if (worker.isGpuEnabled()) {
boolean isWorking = !worker.getComputingChainTaskIds().isEmpty();
if (isWorking) {
Expand Down Expand Up @@ -246,13 +240,20 @@ public Optional<Worker> addChainTaskIdToWorker(String chainTaskId, String wallet
}

private Optional<Worker> addChainTaskIdToWorkerWithoutThreadSafety(String chainTaskId, String walletAddress) {
Optional<Worker> optional = workerRepository.findByWalletAddress(walletAddress);
final Optional<Worker> optional = workerRepository.findByWalletAddress(walletAddress);
if (optional.isPresent()) {
Worker worker = optional.get();
final Worker worker = optional.get();
if (!canAcceptMoreWorks(worker)) {
log.warn("Can't add chainTaskId to worker when already full [chainTaskId:{}, workerName:{}]",
chainTaskId, walletAddress);
return Optional.empty();
}
worker.addChainTaskId(chainTaskId);
log.info("Added chainTaskId to worker [chainTaskId:{}, workerName:{}]", chainTaskId, walletAddress);
return Optional.of(workerRepository.save(worker));
}
log.warn("Can't add chainTaskId to worker when unknown worker [chainTaskId:{}, workerName:{}]",
chainTaskId, walletAddress);
return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ void shouldCreateNewReplicate() {
ReplicatesList replicatesList = new ReplicatesList(CHAIN_TASK_ID, list);
when(replicatesRepository.findByChainTaskId(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList));
when(replicatesRepository.save(any())).thenReturn(replicatesList);
replicatesService.addNewReplicate(CHAIN_TASK_ID, WALLET_WORKER_3);
replicatesService.addNewReplicate(replicatesList, WALLET_WORKER_3);
Mockito.verify(replicatesRepository, Mockito.times(1))
.save(any());
}
Expand All @@ -103,11 +103,11 @@ void shouldNotCreateNewReplicate() {
when(replicatesRepository.findByChainTaskId(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList));
when(replicatesRepository.save(any())).thenReturn(replicatesList);

replicatesService.addNewReplicate(CHAIN_TASK_ID, WALLET_WORKER_1);
replicatesService.addNewReplicate(replicatesList, WALLET_WORKER_1);
Mockito.verify(replicatesRepository, Mockito.times(0))
.save(any());

replicatesService.addNewReplicate(CHAIN_TASK_ID, WALLET_WORKER_2);
replicatesService.addNewReplicate(replicatesList, WALLET_WORKER_2);
Mockito.verify(replicatesRepository, Mockito.times(0))
.save(any());
}
Expand Down Expand Up @@ -1505,4 +1505,4 @@ void computeUpdateReplicateStatusArgsResultUploadFailed() {
.build());
}

}
}
Loading