From 7ff08c7d54e73581f1b0cd80e720740dc1742bd0 Mon Sep 17 00:00:00 2001 From: Maxence Cornaton Date: Wed, 13 Dec 2023 15:07:44 +0100 Subject: [PATCH 1/8] Check if Worker can still accept more work right before giving it new replicate --- CHANGELOG.md | 6 + gradle.properties | 2 +- .../replicate/ReplicateSupplyService.java | 36 +++--- .../core/replicate/ReplicatesService.java | 27 +++-- .../com/iexec/core/worker/WorkerService.java | 41 ++++--- .../ReplicateSupplyServiceTests.java | 103 ++++++++++++------ .../WorkerServiceRealRepositoryTests.java | 7 +- .../iexec/core/worker/WorkerServiceTests.java | 26 +++-- 8 files changed, 153 insertions(+), 95 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f25c7d33c..bf57d3b10 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,12 @@ All notable changes to this project will be documented in this file. +## [[8.2.3]](https://github.com/iExecBlockchainComputing/iexec-core/releases/tag/v8.2.3) 2023-12-13 + +### Bug Fixes + +- Check if Worker can still accept more work right before giving it new replicate. (#644) + ## [[8.2.2]](https://github.com/iExecBlockchainComputing/iexec-core/releases/tag/v8.2.2) 2023-12-13 ### Bug Fixes diff --git a/gradle.properties b/gradle.properties index bbad4cdc7..d3c77ac17 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=8.2.2 +version=8.2.3 iexecCommonVersion=8.3.0 iexecCommonsPocoVersion=3.1.0 iexecBlockchainAdapterVersion=8.2.0 diff --git a/src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java b/src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java index d1d5d3707..1b5331fc6 100644 --- a/src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java +++ b/src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java @@ -86,14 +86,9 @@ public ReplicateSupplyService(ReplicatesService replicatesService, */ @Retryable(value = {OptimisticLockingFailureException.class}, maxAttempts = 5) Optional getAvailableReplicateTaskSummary(long workerLastBlock, String walletAddress) { - // return empty if max computing task is reached or if the worker is not found - if (!workerService.canAcceptMoreWorks(walletAddress)) { - return Optional.empty(); - } - // return empty if the worker is not sync //TODO Check if worker node is sync - boolean isWorkerLastBlockAvailable = workerLastBlock > 0; + final boolean isWorkerLastBlockAvailable = workerLastBlock > 0; if (!isWorkerLastBlockAvailable) { return Optional.empty(); } @@ -104,11 +99,16 @@ Optional getAvailableReplicateTaskSummary(long workerLastB // TODO : Remove this, the optional can never be empty // This is covered in workerService.canAcceptMoreWorks - Optional optional = workerService.getWorker(walletAddress); + final Optional optional = workerService.getWorker(walletAddress); if (optional.isEmpty()) { return Optional.empty(); } - Worker worker = optional.get(); + final Worker worker = optional.get(); + + // return empty if max computing task is reached or if the worker is not found + if (!workerService.canAcceptMoreWorks(worker)) { + return Optional.empty(); + } return getReplicateTaskSummaryForAnyAvailableTask( walletAddress, @@ -161,8 +161,8 @@ private Optional getReplicateTaskSummary(Task task, String chainTaskId, task.getEnclaveChallenge()); ReplicateTaskSummaryBuilder replicateTaskSummary = ReplicateTaskSummary.builder() - .workerpoolAuthorization(authorization); - if(task.isTeeTask()){ + .workerpoolAuthorization(authorization); + if (task.isTeeTask()) { replicateTaskSummary.smsUrl(task.getSmsUrl()); } return Optional.of(replicateTaskSummary.build()); @@ -173,7 +173,7 @@ private Optional getReplicateTaskSummary(Task task, String * tries to accept the task - i.e. create a new {@link Replicate} * for that task on that worker. * - * @param task {@link Task} needing at least one new {@link Replicate}. + * @param task {@link Task} needing at least one new {@link Replicate}. * @param walletAddress Wallet address of a worker looking for new {@link Task}. * @return {@literal true} if the task has been accepted, * {@literal false} otherwise. @@ -220,8 +220,8 @@ private boolean acceptOrRejectTask(Task task, String walletAddress) { return false; } - replicatesService.addNewReplicate(chainTaskId, walletAddress); - workerService.addChainTaskIdToWorker(chainTaskId, walletAddress); + workerService.addChainTaskIdToWorker(chainTaskId, walletAddress) + .ifPresent(worker -> replicatesService.addNewReplicate(chainTaskId, walletAddress)); } finally { // We should always unlock the task // so that it could be taken by another replicate @@ -234,8 +234,8 @@ private boolean acceptOrRejectTask(Task task, String walletAddress) { /** * Get notifications missed by the worker during the time it was absent. - * - * @param blockNumber last seen blocknumber by the worker + * + * @param blockNumber last seen blocknumber by the worker * @param walletAddress of the worker * @return list of missed notifications. Can be empty if no notification is found */ @@ -264,7 +264,7 @@ public List getMissedTaskNotifications(long blockNumber, Strin continue; } TaskNotificationExtra taskNotificationExtra = - getTaskNotificationExtra(task, taskNotificationType.get(), walletAddress, enclaveChallenge); + getTaskNotificationExtra(task, taskNotificationType.get(), walletAddress, enclaveChallenge); TaskNotification taskNotification = TaskNotification.builder() .chainTaskId(chainTaskId) @@ -286,7 +286,7 @@ public List getMissedTaskNotifications(long blockNumber, Strin private TaskNotificationExtra getTaskNotificationExtra(Task task, TaskNotificationType taskNotificationType, String walletAddress, String enclaveChallenge) { TaskNotificationExtra taskNotificationExtra = TaskNotificationExtra.builder().build(); - switch (taskNotificationType){ + switch (taskNotificationType) { case PLEASE_CONTRIBUTE: WorkerpoolAuthorization authorization = signatureService.createAuthorization( walletAddress, task.getChainTaskId(), enclaveChallenge); @@ -312,7 +312,7 @@ public Optional getTaskNotificationType(Task task, Replica // CONTRIBUTION_TIMEOUT or CONSENSUS_REACHED without contribution if (task.getCurrentStatus().equals(TaskStatus.CONTRIBUTION_TIMEOUT) || (task.getCurrentStatus().equals(TaskStatus.CONSENSUS_REACHED) - && !replicate.containsContributedStatus())) { + && !replicate.containsContributedStatus())) { return Optional.of(TaskNotificationType.PLEASE_ABORT); } diff --git a/src/main/java/com/iexec/core/replicate/ReplicatesService.java b/src/main/java/com/iexec/core/replicate/ReplicatesService.java index ea793ceab..909779904 100644 --- a/src/main/java/com/iexec/core/replicate/ReplicatesService.java +++ b/src/main/java/com/iexec/core/replicate/ReplicatesService.java @@ -72,17 +72,20 @@ public ReplicatesService(ReplicatesRepository replicatesRepository, } public void addNewReplicate(String chainTaskId, String walletAddress) { - if (getReplicate(chainTaskId, walletAddress).isEmpty()) { - Optional optional = getReplicatesList(chainTaskId); - if (optional.isPresent()) { - ReplicatesList replicatesList = optional.get(); - Replicate replicate = new Replicate(walletAddress, chainTaskId); - replicate.setWorkerWeight(iexecHubService.getWorkerWeight(walletAddress));// workerWeight value for pendingWeight estimate - replicatesList.getReplicates().add(replicate); - - replicatesRepository.save(replicatesList); - log.info("New replicate saved [chainTaskId:{}, walletAddress:{}]", chainTaskId, walletAddress); - } + final Optional oReplicatesList = getReplicatesList(chainTaskId); + if (oReplicatesList.isEmpty()) { + log.warn("Can't add replicate to unknown ReplicatesList [chainTaskId:{}, workerName:{}]", chainTaskId, walletAddress); + return; + } + + final ReplicatesList replicatesList = oReplicatesList.get(); + if (replicatesList.getReplicateOfWorker(walletAddress).isEmpty()) { + Replicate replicate = new Replicate(walletAddress, chainTaskId); + replicate.setWorkerWeight(iexecHubService.getWorkerWeight(walletAddress));// workerWeight value for pendingWeight estimate + replicatesList.getReplicates().add(replicate); + + replicatesRepository.save(replicatesList); + log.info("New replicate saved [chainTaskId:{}, walletAddress:{}]", chainTaskId, walletAddress); } else { log.error("Replicate already saved [chainTaskId:{}, walletAddress:{}]", chainTaskId, walletAddress); } @@ -635,4 +638,4 @@ public void setRevealTimeoutStatusIfNeeded(String chainTaskId, Replicate replica updateReplicateStatus(chainTaskId, replicate.getWalletAddress(), statusUpdate); } } -} \ No newline at end of file +} diff --git a/src/main/java/com/iexec/core/worker/WorkerService.java b/src/main/java/com/iexec/core/worker/WorkerService.java index ee92158db..7b0d6fe4d 100644 --- a/src/main/java/com/iexec/core/worker/WorkerService.java +++ b/src/main/java/com/iexec/core/worker/WorkerService.java @@ -69,10 +69,10 @@ public Optional getWorker(String walletAddress) { return workerRepository.findByWalletAddress(walletAddress); } - public boolean isAllowedToJoin(String workerAddress){ + public boolean isAllowedToJoin(String workerAddress) { List whitelist = workerConfiguration.getWhitelist(); // if the whitelist is empty, there is no restriction on the workers - if (whitelist.isEmpty()){ + if (whitelist.isEmpty()) { return true; } return whitelist.contains(workerAddress); @@ -135,17 +135,17 @@ public List getAliveWorkers() { public boolean canAcceptMoreWorks(String walletAddress) { Optional optionalWorker = getWorker(walletAddress); - if (optionalWorker.isEmpty()){ - return false; - } + return optionalWorker.filter(this::canAcceptMoreWorks).isPresent(); + + } - Worker worker = optionalWorker.get(); + public boolean canAcceptMoreWorks(Worker worker) { int workerMaxNbTasks = worker.getMaxNbTasks(); int runningReplicateNb = worker.getComputingChainTaskIds().size(); if (runningReplicateNb >= workerMaxNbTasks) { log.debug("Worker asking for too many replicates [walletAddress:{}, runningReplicateNb:{}, workerMaxNbTasks:{}]", - walletAddress, runningReplicateNb, workerMaxNbTasks); + worker.getWalletAddress(), runningReplicateNb, workerMaxNbTasks); return false; } @@ -154,7 +154,7 @@ public boolean canAcceptMoreWorks(String walletAddress) { public int getAliveAvailableCpu() { int availableCpus = 0; - for (Worker worker: getAliveWorkers()) { + for (Worker worker : getAliveWorkers()) { if (worker.isGpuEnabled()) { continue; } @@ -162,18 +162,18 @@ public int getAliveAvailableCpu() { int workerCpuNb = worker.getCpuNb(); int computingReplicateNb = worker.getComputingChainTaskIds().size(); int availableCpu = workerCpuNb - computingReplicateNb; - availableCpus+= availableCpu; + availableCpus += availableCpu; } return availableCpus; } public int getAliveTotalCpu() { int totalCpus = 0; - for (Worker worker: getAliveWorkers()){ - if(worker.isGpuEnabled()) { + for (Worker worker : getAliveWorkers()) { + if (worker.isGpuEnabled()) { continue; } - totalCpus+= worker.getCpuNb(); + totalCpus += worker.getCpuNb(); } return totalCpus; } @@ -181,7 +181,7 @@ public int getAliveTotalCpu() { // We suppose for now that 1 Gpu enabled worker has only one GPU public int getAliveTotalGpu() { int totalGpus = 0; - for(Worker worker: getAliveWorkers()) { + for (Worker worker : getAliveWorkers()) { if (worker.isGpuEnabled()) { totalGpus++; } @@ -189,9 +189,9 @@ public int getAliveTotalGpu() { return totalGpus; } - public int getAliveAvailableGpu () { + public int getAliveAvailableGpu() { int availableGpus = getAliveTotalGpu(); - for (Worker worker: getAliveWorkers()) { + for (Worker worker : getAliveWorkers()) { if (worker.isGpuEnabled()) { boolean isWorking = !worker.getComputingChainTaskIds().isEmpty(); if (isWorking) { @@ -246,13 +246,20 @@ public Optional addChainTaskIdToWorker(String chainTaskId, String wallet } private Optional addChainTaskIdToWorkerWithoutThreadSafety(String chainTaskId, String walletAddress) { - Optional optional = workerRepository.findByWalletAddress(walletAddress); + final Optional optional = workerRepository.findByWalletAddress(walletAddress); if (optional.isPresent()) { - Worker worker = optional.get(); + final Worker worker = optional.get(); + if (!canAcceptMoreWorks(worker)) { + log.warn("Can't add chainTaskId to worker when already full [chainTaskId:{}, workerName:{}]", + chainTaskId, walletAddress); + return Optional.empty(); + } worker.addChainTaskId(chainTaskId); log.info("Added chainTaskId to worker [chainTaskId:{}, workerName:{}]", chainTaskId, walletAddress); return Optional.of(workerRepository.save(worker)); } + log.warn("Can't add chainTaskId to worker when unknown worker [chainTaskId:{}, workerName:{}]", + chainTaskId, walletAddress); return Optional.empty(); } diff --git a/src/test/java/com/iexec/core/replicate/ReplicateSupplyServiceTests.java b/src/test/java/com/iexec/core/replicate/ReplicateSupplyServiceTests.java index 2a28f281c..7e86ffd0d 100644 --- a/src/test/java/com/iexec/core/replicate/ReplicateSupplyServiceTests.java +++ b/src/test/java/com/iexec/core/replicate/ReplicateSupplyServiceTests.java @@ -58,7 +58,7 @@ class ReplicateSupplyServiceTests { private final static String WALLET_WORKER_1 = "0x1a69b2eb604db8eba185df03ea4f5288dcbbd248"; private final static String WALLET_WORKER_2 = "0xdcfeffee1443fbf9277e6fa3b50cf3b38f7101af"; - private final static String CHAIN_TASK_ID = "0x65bc5e94ed1486b940bd6cc0013c418efad58a0a52a3d08cee89faaa21970426"; + private final static String CHAIN_TASK_ID = "0x65bc5e94ed1486b940bd6cc0013c418efad58a0a52a3d08cee89faaa21970426"; private final static String CHAIN_TASK_ID_2 = "0xc536af16737e02bb28100452a932056d499be3c462619751a9ed36515de64d50"; private final static String DAPP_NAME = "dappName"; @@ -69,12 +69,18 @@ class ReplicateSupplyServiceTests { private final static long maxExecutionTime = 60000; long workerLastBlock = 12; - @Mock private ReplicatesService replicatesService; - @Mock private SignatureService signatureService; - @Mock private TaskService taskService; - @Mock private TaskUpdateRequestManager taskUpdateRequestManager; - @Mock private WorkerService workerService; - @Mock private Web3jService web3jService; + @Mock + private ReplicatesService replicatesService; + @Mock + private SignatureService signatureService; + @Mock + private TaskService taskService; + @Mock + private TaskUpdateRequestManager taskUpdateRequestManager; + @Mock + private WorkerService workerService; + @Mock + private Web3jService web3jService; @Spy @InjectMocks @@ -97,11 +103,12 @@ void workerCanWorkAndHasGas(String workerAddress) { // in getAuthOfAvailableReplicate method @Test void shouldNotGetAnyReplicateSinceWorkerDoesNotExist() { + workerCanWorkAndHasGas(WALLET_WORKER_1); when(workerService.getWorker(Mockito.anyString())).thenReturn(Optional.empty()); Optional replicateTaskSummary = replicateSupplyService.getAvailableReplicateTaskSummary(workerLastBlock, WALLET_WORKER_1); assertThat(replicateTaskSummary).isEmpty(); - Mockito.verifyNoInteractions(web3jService, taskService, taskUpdateRequestManager, replicatesService, signatureService); + Mockito.verifyNoInteractions(taskService, taskUpdateRequestManager, replicatesService, signatureService); } @Test @@ -130,6 +137,7 @@ void shouldNotGetReplicateSinceNoReplicatesList() { .id("1") .walletAddress(WALLET_WORKER_2) .cpuNb(4) + .maxNbTasks(3) .teeEnabled(false) .build(); @@ -160,6 +168,7 @@ void shouldNotGetReplicateSinceConsensusReachedOnChain() { .id("1") .walletAddress(WALLET_WORKER_2) .cpuNb(4) + .maxNbTasks(3) .teeEnabled(false) .build(); final Replicate replicate = new Replicate(WALLET_WORKER_2, CHAIN_TASK_ID); @@ -180,6 +189,7 @@ void shouldNotGetReplicateSinceConsensusReachedOnChain() { when(taskService.getPrioritizedInitializedOrRunningTask(true, Collections.emptyList())) .thenReturn(Optional.of(runningTask)); when(workerService.getWorker(WALLET_WORKER_2)).thenReturn(Optional.of(worker)); + when(workerService.canAcceptMoreWorks(worker)).thenReturn(true); when(replicatesService.getReplicatesList(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList)); when(taskService.isConsensusReached(replicatesList)).thenReturn(true); when(replicatesList.hasWorkerAlreadyParticipated(WALLET_WORKER_2)).thenReturn(false); @@ -220,6 +230,7 @@ void shouldNotGetAnyReplicateSinceWorkerAlreadyParticipated() { .id("1") .walletAddress(WALLET_WORKER_1) .cpuNb(2) + .maxNbTasks(1) .build(); Task runningTask = new Task(DAPP_NAME, COMMAND_LINE, 5, CHAIN_TASK_ID); @@ -256,6 +267,7 @@ void shouldNotGetReplicateSinceDoesNotNeedMoreContributionsForConsensus() { .id("1") .walletAddress(WALLET_WORKER_2) .cpuNb(2) + .maxNbTasks(1) .build(); int trust = 5; @@ -277,11 +289,14 @@ void shouldNotGetReplicateSinceDoesNotNeedMoreContributionsForConsensus() { ); // Try to see if a replicate of the task can be scheduled on worker2 - workerCanWorkAndHasGas(WALLET_WORKER_2); + when(web3jService.hasEnoughGas(WALLET_WORKER_2)).thenReturn(true); when(taskService.getPrioritizedInitializedOrRunningTask(true, Collections.emptyList())) .thenReturn(Optional.of(runningTask)); when(workerService.getWorker(WALLET_WORKER_2)).thenReturn(Optional.of(existingWorker)); + when(workerService.canAcceptMoreWorks(existingWorker)).thenReturn(true); when(replicatesService.getReplicatesList(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList)); + when(workerService.addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1)) + .thenReturn(Optional.of(existingWorker)); when(replicatesList.hasWorkerAlreadyParticipated(WALLET_WORKER_1)).thenReturn(false); Optional replicateTaskSummary = replicateSupplyService.getAvailableReplicateTaskSummary(workerLastBlock, WALLET_WORKER_2); @@ -335,6 +350,7 @@ void shouldGetOnlyOneReplicateSinceOtherOneReachedConsensusDeadline() { .id("1") .walletAddress(WALLET_WORKER_1) .cpuNb(4) + .maxNbTasks(3) .teeEnabled(false) .build(); @@ -361,9 +377,12 @@ void shouldGetOnlyOneReplicateSinceOtherOneReachedConsensusDeadline() { when(taskService.getPrioritizedInitializedOrRunningTask(true, Collections.emptyList())) .thenReturn(Optional.of(task1)); when(workerService.getWorker(WALLET_WORKER_1)).thenReturn(Optional.of(existingWorker)); + when(workerService.canAcceptMoreWorks(existingWorker)).thenReturn(true); when(replicatesService.getReplicatesList(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList)); when(signatureService.createAuthorization(WALLET_WORKER_1, CHAIN_TASK_ID, BytesUtils.EMPTY_ADDRESS)) .thenReturn(WorkerpoolAuthorization.builder().chainTaskId(CHAIN_TASK_ID).build()); + when(workerService.addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1)) + .thenReturn(Optional.of(existingWorker)); final Optional replicateTaskSummary = replicateSupplyService.getAvailableReplicateTaskSummary(workerLastBlock, WALLET_WORKER_1); @@ -382,6 +401,7 @@ void shouldNotGetReplicateWhenTaskAlreadyAccessed() { .id("1") .walletAddress(WALLET_WORKER_1) .cpuNb(2) + .maxNbTasks(1) .teeEnabled(false) .build(); @@ -413,6 +433,7 @@ void shouldGetReplicateWithNoTee() { .id("1") .walletAddress(WALLET_WORKER_1) .cpuNb(2) + .maxNbTasks(1) .teeEnabled(false) .build(); @@ -432,9 +453,12 @@ void shouldGetReplicateWithNoTee() { .thenReturn(Optional.of(runningTask)); when(workerService.getWorker(WALLET_WORKER_1)).thenReturn(Optional.of(existingWorker)); when(replicatesService.getReplicatesList(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList)); + when(workerService.canAcceptMoreWorks(existingWorker)).thenReturn(true); when(signatureService.createAuthorization(WALLET_WORKER_1, CHAIN_TASK_ID, BytesUtils.EMPTY_ADDRESS)) .thenReturn(new WorkerpoolAuthorization()); when(replicatesList.hasWorkerAlreadyParticipated(WALLET_WORKER_1)).thenReturn(false); + when(workerService.addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1)) + .thenReturn(Optional.of(existingWorker)); Optional replicateTaskSummary = replicateSupplyService.getAvailableReplicateTaskSummary(workerLastBlock, WALLET_WORKER_1); @@ -452,6 +476,7 @@ void shouldGetReplicateWithTee() { .id("1") .walletAddress(WALLET_WORKER_1) .cpuNb(2) + .maxNbTasks(1) .teeEnabled(true) .build(); @@ -470,9 +495,12 @@ void shouldGetReplicateWithTee() { when(taskService.getPrioritizedInitializedOrRunningTask(false, Collections.emptyList())) .thenReturn(Optional.of(runningTask)); when(workerService.getWorker(WALLET_WORKER_1)).thenReturn(Optional.of(existingWorker)); + when(workerService.canAcceptMoreWorks(existingWorker)).thenReturn(true); when(replicatesService.getReplicatesList(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList)); when(signatureService.createAuthorization(WALLET_WORKER_1, CHAIN_TASK_ID, ENCLAVE_CHALLENGE)) .thenReturn(new WorkerpoolAuthorization()); + when(workerService.addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1)) + .thenReturn(Optional.of(existingWorker)); when(replicatesList.hasWorkerAlreadyParticipated(WALLET_WORKER_1)).thenReturn(false); Optional replicateTaskSummary = @@ -491,6 +519,7 @@ void shouldTeeNeededTaskNotBeGivenToTeeDisabledWorker() { .id("1") .walletAddress(WALLET_WORKER_1) .cpuNb(2) + .maxNbTasks(1) .teeEnabled(false) .build(); @@ -519,6 +548,7 @@ void shouldTeeNeededTaskBeGivenToTeeEnabledWorker() { .id("1") .walletAddress(WALLET_WORKER_1) .cpuNb(2) + .maxNbTasks(1) .teeEnabled(true) .build(); @@ -537,9 +567,12 @@ void shouldTeeNeededTaskBeGivenToTeeEnabledWorker() { when(taskService.getPrioritizedInitializedOrRunningTask(false, Collections.emptyList())) .thenReturn(Optional.of(runningTask)); when(workerService.getWorker(WALLET_WORKER_1)).thenReturn(Optional.of(existingWorker)); + when(workerService.canAcceptMoreWorks(existingWorker)).thenReturn(true); when(replicatesService.getReplicatesList(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList)); when(signatureService.createAuthorization(WALLET_WORKER_1, CHAIN_TASK_ID, ENCLAVE_CHALLENGE)) .thenReturn(new WorkerpoolAuthorization()); + when(workerService.addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1)) + .thenReturn(Optional.of(existingWorker)); when(replicatesList.hasWorkerAlreadyParticipated(WALLET_WORKER_1)).thenReturn(false); Optional replicateTaskSummary = @@ -559,6 +592,7 @@ void shouldTeeNeededTaskBeGivenToTeeEnabledWorker() { */ private void assertTaskAccessForNewReplicateNotDeadLocking(String chainTaskId) { final Lock lock = replicateSupplyService.taskAccessForNewReplicateLocks.get(chainTaskId); + System.out.println("Task: " + chainTaskId + " ; lock : " + lock); final Boolean successfulLock = CompletableFuture.supplyAsync(() -> { final boolean locked = lock.tryLock(); if (!locked) { @@ -574,7 +608,8 @@ private void assertTaskAccessForNewReplicateNotDeadLocking(String chainTaskId) { private void assertTaskAccessForNewReplicateLockNeverUsed(String chainTaskId) { final Lock lock = replicateSupplyService.taskAccessForNewReplicateLocks.get(chainTaskId); - assertThat(lock).isNull();; + assertThat(lock).isNull(); + ; } // Tests on getMissedTaskNotifications() @@ -612,12 +647,12 @@ void shouldNotGetInterruptedReplicateSinceEnclaveChallengeNeededButNotGenerated( assertThat(taskNotifications).isEmpty(); Mockito.verify(replicatesService, times(0)) - .updateReplicateStatus(any(), any(), any(), any(ReplicateStatusDetails.class)); + .updateReplicateStatus(any(), any(), any(), any(ReplicateStatusDetails.class)); } @Test - // CREATED, ..., CAN_CONTRIBUTE => RecoveryAction.CONTRIBUTE + // CREATED, ..., CAN_CONTRIBUTE => RecoveryAction.CONTRIBUTE void shouldTellReplicateToContributeWhenComputing() { List ids = List.of(CHAIN_TASK_ID); List taskList = getStubTaskList(TaskStatus.RUNNING); @@ -641,7 +676,7 @@ void shouldTellReplicateToContributeWhenComputing() { } @Test - // CONTRIBUTING + !onChain => RecoveryAction.CONTRIBUTE + // CONTRIBUTING + !onChain => RecoveryAction.CONTRIBUTE void shouldTellReplicateToContributeSinceNotDoneOnchain() { List ids = List.of(CHAIN_TASK_ID); List taskList = getStubTaskList(TaskStatus.RUNNING); @@ -668,8 +703,8 @@ void shouldTellReplicateToContributeSinceNotDoneOnchain() { } @Test - // CONTRIBUTING + done onChain => updateStatus to CONTRIBUTED - // Task not in CONSENSUS_REACHED => RecoveryAction.WAIT + // CONTRIBUTING + done onChain => updateStatus to CONTRIBUTED + // Task not in CONSENSUS_REACHED => RecoveryAction.WAIT void shouldTellReplicateToWaitSinceContributedOnchain() { long blockNumber = 3; // ChainReceipt chainReceipt = new ChainReceipt(blockNumber, ""); @@ -707,8 +742,8 @@ void shouldTellReplicateToWaitSinceContributedOnchain() { } @Test - // CONTRIBUTING + done onChain => updateStatus to CONTRIBUTED - // Task in CONSENSUS_REACHED => RecoveryAction.REVEAL + // CONTRIBUTING + done onChain => updateStatus to CONTRIBUTED + // Task in CONSENSUS_REACHED => RecoveryAction.REVEAL void shouldTellReplicateToRevealSinceConsensusReached() { long blockNumber = 3; // ChainReceipt chainReceipt = new ChainReceipt(blockNumber, ""); @@ -746,7 +781,7 @@ void shouldTellReplicateToRevealSinceConsensusReached() { } @Test - // any status + Task in CONTRIBUTION_TIMEOUT => RecoveryAction.ABORT_CONTRIBUTION_TIMEOUT + // any status + Task in CONTRIBUTION_TIMEOUT => RecoveryAction.ABORT_CONTRIBUTION_TIMEOUT void shouldTellReplicateToAbortSinceContributionTimeout() { long blockNumber = 3; List ids = List.of(CHAIN_TASK_ID); @@ -773,7 +808,7 @@ void shouldTellReplicateToAbortSinceContributionTimeout() { } @Test - // !CONTRIBUTED + Task in CONSENSUS_REACHED => RecoveryAction.ABORT_CONSENSUS_REACHED + // !CONTRIBUTED + Task in CONSENSUS_REACHED => RecoveryAction.ABORT_CONSENSUS_REACHED void shouldTellReplicateToWaitSinceConsensusReachedAndItDidNotContribute() { long blockNumber = 3; List ids = List.of(CHAIN_TASK_ID); @@ -801,7 +836,7 @@ void shouldTellReplicateToWaitSinceConsensusReachedAndItDidNotContribute() { } @Test - // CONTRIBUTED + Task in REVEAL phase => RecoveryAction.REVEAL + // CONTRIBUTED + Task in REVEAL phase => RecoveryAction.REVEAL void shouldTellReplicateToRevealSinceContributed() { List ids = List.of(CHAIN_TASK_ID); List taskList = getStubTaskList(TaskStatus.AT_LEAST_ONE_REVEALED); @@ -825,7 +860,7 @@ void shouldTellReplicateToRevealSinceContributed() { } @Test - // REVEALING + !onChain => RecoveryAction.REVEAL + // REVEALING + !onChain => RecoveryAction.REVEAL void shouldTellReplicateToRevealSinceNotDoneOnchain() { List ids = List.of(CHAIN_TASK_ID); List taskList = getStubTaskList(TaskStatus.AT_LEAST_ONE_REVEALED); @@ -852,8 +887,8 @@ void shouldTellReplicateToRevealSinceNotDoneOnchain() { } @Test - // REVEALING + done onChain => updateStatus to REVEALED - // no RESULT_UPLOAD_REQUESTED => RecoveryAction.WAIT + // REVEALING + done onChain => updateStatus to REVEALED + // no RESULT_UPLOAD_REQUESTED => RecoveryAction.WAIT void shouldTellReplicateToWaitSinceRevealed() { long blockNumber = 3; // ChainReceipt chainReceipt = new ChainReceipt(blockNumber, ""); @@ -891,8 +926,8 @@ void shouldTellReplicateToWaitSinceRevealed() { } @Test - // REVEALING + done onChain => updateStatus to REVEALED - // RESULT_UPLOAD_REQUESTED => RecoveryAction.UPLOAD_RESULT + // REVEALING + done onChain => updateStatus to REVEALED + // RESULT_UPLOAD_REQUESTED => RecoveryAction.UPLOAD_RESULT void shouldTellReplicateToUploadResultSinceRequestedAfterRevealing() { long blockNumber = 3; // ChainReceipt chainReceipt = new ChainReceipt(blockNumber, ""); @@ -929,7 +964,7 @@ void shouldTellReplicateToUploadResultSinceRequestedAfterRevealing() { } @Test - // RESULT_UPLOAD_REQUESTED => RecoveryAction.UPLOAD_RESULT + // RESULT_UPLOAD_REQUESTED => RecoveryAction.UPLOAD_RESULT void shouldTellReplicateToUploadResultSinceRequested() { List ids = List.of(CHAIN_TASK_ID); List taskList = getStubTaskList(TaskStatus.RESULT_UPLOADING); @@ -953,7 +988,7 @@ void shouldTellReplicateToUploadResultSinceRequested() { } @Test - // RESULT_UPLOADING + not done yet => RecoveryAction.UPLOAD_RESULT + // RESULT_UPLOADING + not done yet => RecoveryAction.UPLOAD_RESULT void shouldTellReplicateToUploadResultSinceNotDoneYet() { List ids = List.of(CHAIN_TASK_ID); List taskList = getStubTaskList(TaskStatus.RESULT_UPLOADING); @@ -979,8 +1014,8 @@ void shouldTellReplicateToUploadResultSinceNotDoneYet() { } @Test - // RESULT_UPLOADING + done => update to ReplicateStatus.RESULT_UPLOADED - // RecoveryAction.WAIT + // RESULT_UPLOADING + done => update to ReplicateStatus.RESULT_UPLOADED + // RecoveryAction.WAIT void shouldTellReplicateToWaitSinceDetectedResultUpload() { List ids = List.of(CHAIN_TASK_ID); List taskList = getStubTaskList(TaskStatus.RESULT_UPLOADING); @@ -1009,7 +1044,7 @@ void shouldTellReplicateToWaitSinceDetectedResultUpload() { } @Test - // RESULT_UPLOADED => RecoveryAction.WAIT + // RESULT_UPLOADED => RecoveryAction.WAIT void shouldTellReplicateToWaitSinceItUploadedResult() { List ids = List.of(CHAIN_TASK_ID); List taskList = getStubTaskList(TaskStatus.RESULT_UPLOADING); @@ -1038,7 +1073,7 @@ void shouldTellReplicateToWaitSinceItUploadedResult() { } @Test - // REVEALED + Task in completion phase => RecoveryAction.WAIT + // REVEALED + Task in completion phase => RecoveryAction.WAIT void shouldTellReplicateToWaitForCompletionSinceItRevealed() { List ids = List.of(CHAIN_TASK_ID); List taskList = getStubTaskList(TaskStatus.FINALIZING); @@ -1064,7 +1099,7 @@ void shouldTellReplicateToWaitForCompletionSinceItRevealed() { } @Test - // REVEALED + RESULT_UPLOADED + Task in completion phase => RecoveryAction.WAIT + // REVEALED + RESULT_UPLOADED + Task in completion phase => RecoveryAction.WAIT void shouldTellReplicateToWaitForCompletionSinceItRevealedAndUploaded() { List ids = List.of(CHAIN_TASK_ID); List taskList = getStubTaskList(TaskStatus.FINALIZING); @@ -1122,7 +1157,7 @@ void shouldTellReplicateToCompleteSinceItRevealed() { } @Test - // !REVEALED + Task in completion phase => null / nothing + // !REVEALED + Task in completion phase => null / nothing void shouldNotTellReplicateToWaitForCompletionSinceItDidNotReveal() { List ids = List.of(CHAIN_TASK_ID); List taskList = getStubTaskList(TaskStatus.FINALIZING); @@ -1215,4 +1250,4 @@ Optional getStubReplicate(ReplicateStatus status) { WorkerpoolAuthorization getStubAuth() { return new WorkerpoolAuthorization(); } -} \ No newline at end of file +} diff --git a/src/test/java/com/iexec/core/worker/WorkerServiceRealRepositoryTests.java b/src/test/java/com/iexec/core/worker/WorkerServiceRealRepositoryTests.java index 989206c89..cab83cbad 100644 --- a/src/test/java/com/iexec/core/worker/WorkerServiceRealRepositoryTests.java +++ b/src/test/java/com/iexec/core/worker/WorkerServiceRealRepositoryTests.java @@ -37,7 +37,9 @@ import java.util.Date; import java.util.List; import java.util.Optional; -import java.util.concurrent.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -74,13 +76,14 @@ void init() { */ @Test void addMultipleTaskIds() { + final int nThreads = 10; workerService.addWorker( Worker.builder() .walletAddress(WALLET_WORKER_1) + .maxNbTasks(nThreads) .build() ); - final int nThreads = 10; final ExecutorService executor = Executors.newFixedThreadPool(nThreads); final List>> futures = IntStream.range(0, nThreads) diff --git a/src/test/java/com/iexec/core/worker/WorkerServiceTests.java b/src/test/java/com/iexec/core/worker/WorkerServiceTests.java index 9d7a2201c..13d74472e 100644 --- a/src/test/java/com/iexec/core/worker/WorkerServiceTests.java +++ b/src/test/java/com/iexec/core/worker/WorkerServiceTests.java @@ -19,7 +19,10 @@ import com.iexec.core.configuration.WorkerConfiguration; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.*; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; import java.text.ParseException; import java.text.SimpleDateFormat; @@ -194,7 +197,7 @@ void shouldWorkerBeAllowedToAskReplicateSinceFirstTime() { void shouldWorkerNotBeAllowedToAskReplicateSinceTooSoon() { String wallet = "wallet"; workerService.getWorkerStatsMap().computeIfAbsent(wallet, WorkerService.WorkerStats::new) - .setLastReplicateDemandDate(Date.from(Instant.now().minusSeconds(1))); + .setLastReplicateDemandDate(Date.from(Instant.now().minusSeconds(1))); when(workerConfiguration.getAskForReplicatePeriod()).thenReturn(5000L); assertThat(workerService.isWorkerAllowedToAskReplicate(wallet)).isFalse(); @@ -219,7 +222,7 @@ void shouldUpdateLastReplicateDemand() { // addChainTaskIdToWorker @Test - void shouldAddTaskIdToWorker(){ + void shouldAddTaskIdToWorker() { String workerName = "worker1"; String walletAddress = "0x1a69b2eb604db8eba185df03ea4f5288dcbbd248"; Worker existingWorker = Worker.builder() @@ -229,6 +232,7 @@ void shouldAddTaskIdToWorker(){ .os("Linux") .cpu("x86") .cpuNb(8) + .maxNbTasks(7) .participatingChainTaskIds(new ArrayList<>(Arrays.asList("task1", "task2"))) .computingChainTaskIds(new ArrayList<>(Arrays.asList("task1", "task2"))) .build(); @@ -246,7 +250,7 @@ void shouldAddTaskIdToWorker(){ } @Test - void shouldNotAddTaskIdToWorker(){ + void shouldNotAddTaskIdToWorker() { when(workerRepository.findByWalletAddress(Mockito.anyString())).thenReturn(Optional.empty()); Optional addedWorker = workerService.addChainTaskIdToWorker("task1", "0x1a69b2eb604db8eba185df03ea4f5288dcbbd248"); assertThat(addedWorker).isEmpty(); @@ -294,11 +298,11 @@ void shouldNotGetComputingTaskIdsSinceNoWorker() { assertThat(workerService.getComputingTaskIds(wallet)).isEmpty(); } - + // removeChainTaskIdFromWorker @Test - void shouldRemoveTaskIdFromWorker(){ + void shouldRemoveTaskIdFromWorker() { String workerName = "worker1"; String walletAddress = "0x1a69b2eb604db8eba185df03ea4f5288dcbbd248"; Worker existingWorker = Worker.builder() @@ -325,14 +329,14 @@ void shouldRemoveTaskIdFromWorker(){ } @Test - void shouldNotRemoveTaskIdWorkerNotFound(){ + void shouldNotRemoveTaskIdWorkerNotFound() { when(workerRepository.findByWalletAddress(Mockito.anyString())).thenReturn(Optional.empty()); Optional addedWorker = workerService.removeChainTaskIdFromWorker("task1", "0x1a69b2eb604db8eba185df03ea4f5288dcbbd248"); assertThat(addedWorker).isEmpty(); } @Test - void shouldNotRemoveAnythingSinceTaskIdNotFound(){ + void shouldNotRemoveAnythingSinceTaskIdNotFound() { String workerName = "worker1"; String walletAddress = "0x1a69b2eb604db8eba185df03ea4f5288dcbbd248"; List participatingIds = new ArrayList<>(Arrays.asList("task1", "task2")); @@ -362,7 +366,7 @@ void shouldNotRemoveAnythingSinceTaskIdNotFound(){ } @Test - void shouldRemoveComputedChainTaskIdFromWorker(){ + void shouldRemoveComputedChainTaskIdFromWorker() { String workerName = "worker1"; String walletAddress = "0x1a69b2eb604db8eba185df03ea4f5288dcbbd248"; List participatingIds = new ArrayList<>(Arrays.asList("task1", "task2")); @@ -392,7 +396,7 @@ void shouldRemoveComputedChainTaskIdFromWorker(){ } @Test - void shouldNotRemoveComputedChainTaskIdFromWorkerSinceWorkerNotFound(){ + void shouldNotRemoveComputedChainTaskIdFromWorkerSinceWorkerNotFound() { String workerName = "worker1"; String walletAddress = "0x1a69b2eb604db8eba185df03ea4f5288dcbbd248"; List participatingIds = new ArrayList<>(Arrays.asList("task1", "task2")); @@ -416,7 +420,7 @@ void shouldNotRemoveComputedChainTaskIdFromWorkerSinceWorkerNotFound(){ } @Test - void shouldNotRemoveComputedChainTaskIdFromWorkerSinceChainTaskIdNotFound(){ + void shouldNotRemoveComputedChainTaskIdFromWorkerSinceChainTaskIdNotFound() { String workerName = "worker1"; String walletAddress = "0x1a69b2eb604db8eba185df03ea4f5288dcbbd248"; List participatingIds = new ArrayList<>(Arrays.asList("task1", "task2")); From be52571bc24b0e15c5ec967681ad087e388f4902 Mon Sep 17 00:00:00 2001 From: Maxence Cornaton Date: Wed, 13 Dec 2023 15:34:11 +0100 Subject: [PATCH 2/8] Add new replicate to `ReplicatesList` without querying it twice --- CHANGELOG.md | 2 +- .../replicate/ReplicateSupplyService.java | 39 +++++++++---------- .../core/replicate/ReplicatesService.java | 11 ++---- .../core/replicate/ReplicateServiceTests.java | 8 ++-- .../ReplicateSupplyServiceTests.java | 18 ++++++--- 5 files changed, 40 insertions(+), 38 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bf57d3b10..76dae2082 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ All notable changes to this project will be documented in this file. ### Bug Fixes -- Check if Worker can still accept more work right before giving it new replicate. (#644) +- Check if Worker can still accept more work right before giving it a new replicate. (#644) ## [[8.2.2]](https://github.com/iExecBlockchainComputing/iexec-core/releases/tag/v8.2.2) 2023-12-13 diff --git a/src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java b/src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java index 1b5331fc6..730195685 100644 --- a/src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java +++ b/src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java @@ -184,22 +184,6 @@ private boolean acceptOrRejectTask(Task task, String walletAddress) { } final String chainTaskId = task.getChainTaskId(); - final Optional oReplicatesList = replicatesService.getReplicatesList(chainTaskId); - // Check is only here to prevent - // "`Optional.get()` without `isPresent()` warning". - // This case should not happen. - if (oReplicatesList.isEmpty()) { - return false; - } - - final ReplicatesList replicatesList = oReplicatesList.get(); - - final boolean hasWorkerAlreadyParticipated = - replicatesList.hasWorkerAlreadyParticipated(walletAddress); - if (hasWorkerAlreadyParticipated) { - return false; - } - final Lock lock = taskAccessForNewReplicateLocks .computeIfAbsent(chainTaskId, k -> new ReentrantLock()); if (!lock.tryLock()) { @@ -209,6 +193,22 @@ private boolean acceptOrRejectTask(Task task, String walletAddress) { } try { + final Optional oReplicatesList = replicatesService.getReplicatesList(chainTaskId); + // Check is only here to prevent + // "`Optional.get()` without `isPresent()` warning". + // This case should not happen. + if (oReplicatesList.isEmpty()) { + return false; + } + + final ReplicatesList replicatesList = oReplicatesList.get(); + + final boolean hasWorkerAlreadyParticipated = + replicatesList.hasWorkerAlreadyParticipated(walletAddress); + if (hasWorkerAlreadyParticipated) { + return false; + } + final boolean taskNeedsMoreContributions = ConsensusHelper.doesTaskNeedMoreContributionsForConsensus( chainTaskId, replicatesList.getReplicates(), @@ -220,16 +220,15 @@ private boolean acceptOrRejectTask(Task task, String walletAddress) { return false; } - workerService.addChainTaskIdToWorker(chainTaskId, walletAddress) - .ifPresent(worker -> replicatesService.addNewReplicate(chainTaskId, walletAddress)); + return workerService.addChainTaskIdToWorker(chainTaskId, walletAddress) + .map(worker -> replicatesService.addNewReplicate(replicatesList, walletAddress)) + .orElse(false); } finally { // We should always unlock the task // so that it could be taken by another replicate // if there's any issue. lock.unlock(); } - - return true; } /** diff --git a/src/main/java/com/iexec/core/replicate/ReplicatesService.java b/src/main/java/com/iexec/core/replicate/ReplicatesService.java index 909779904..505c01676 100644 --- a/src/main/java/com/iexec/core/replicate/ReplicatesService.java +++ b/src/main/java/com/iexec/core/replicate/ReplicatesService.java @@ -71,14 +71,8 @@ public ReplicatesService(ReplicatesRepository replicatesRepository, this.taskLogsService = taskLogsService; } - public void addNewReplicate(String chainTaskId, String walletAddress) { - final Optional oReplicatesList = getReplicatesList(chainTaskId); - if (oReplicatesList.isEmpty()) { - log.warn("Can't add replicate to unknown ReplicatesList [chainTaskId:{}, workerName:{}]", chainTaskId, walletAddress); - return; - } - - final ReplicatesList replicatesList = oReplicatesList.get(); + public boolean addNewReplicate(ReplicatesList replicatesList, String walletAddress) { + final String chainTaskId = replicatesList.getChainTaskId(); if (replicatesList.getReplicateOfWorker(walletAddress).isEmpty()) { Replicate replicate = new Replicate(walletAddress, chainTaskId); replicate.setWorkerWeight(iexecHubService.getWorkerWeight(walletAddress));// workerWeight value for pendingWeight estimate @@ -90,6 +84,7 @@ public void addNewReplicate(String chainTaskId, String walletAddress) { log.error("Replicate already saved [chainTaskId:{}, walletAddress:{}]", chainTaskId, walletAddress); } + return true; } public synchronized void createEmptyReplicateList(String chainTaskId) { diff --git a/src/test/java/com/iexec/core/replicate/ReplicateServiceTests.java b/src/test/java/com/iexec/core/replicate/ReplicateServiceTests.java index 376213918..790c56dbd 100644 --- a/src/test/java/com/iexec/core/replicate/ReplicateServiceTests.java +++ b/src/test/java/com/iexec/core/replicate/ReplicateServiceTests.java @@ -83,7 +83,7 @@ void shouldCreateNewReplicate() { ReplicatesList replicatesList = new ReplicatesList(CHAIN_TASK_ID, list); when(replicatesRepository.findByChainTaskId(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList)); when(replicatesRepository.save(any())).thenReturn(replicatesList); - replicatesService.addNewReplicate(CHAIN_TASK_ID, WALLET_WORKER_3); + replicatesService.addNewReplicate(replicatesList, WALLET_WORKER_3); Mockito.verify(replicatesRepository, Mockito.times(1)) .save(any()); } @@ -103,11 +103,11 @@ void shouldNotCreateNewReplicate() { when(replicatesRepository.findByChainTaskId(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList)); when(replicatesRepository.save(any())).thenReturn(replicatesList); - replicatesService.addNewReplicate(CHAIN_TASK_ID, WALLET_WORKER_1); + replicatesService.addNewReplicate(replicatesList, WALLET_WORKER_1); Mockito.verify(replicatesRepository, Mockito.times(0)) .save(any()); - replicatesService.addNewReplicate(CHAIN_TASK_ID, WALLET_WORKER_2); + replicatesService.addNewReplicate(replicatesList, WALLET_WORKER_2); Mockito.verify(replicatesRepository, Mockito.times(0)) .save(any()); } @@ -1505,4 +1505,4 @@ void computeUpdateReplicateStatusArgsResultUploadFailed() { .build()); } -} \ No newline at end of file +} diff --git a/src/test/java/com/iexec/core/replicate/ReplicateSupplyServiceTests.java b/src/test/java/com/iexec/core/replicate/ReplicateSupplyServiceTests.java index 7e86ffd0d..bf1eb4dee 100644 --- a/src/test/java/com/iexec/core/replicate/ReplicateSupplyServiceTests.java +++ b/src/test/java/com/iexec/core/replicate/ReplicateSupplyServiceTests.java @@ -338,7 +338,7 @@ void shouldNotGetReplicateSinceEnclaveChallengeNeededButNotGenerated() { assertThat(replicateTaskSummary).isEmpty(); - Mockito.verify(replicatesService, Mockito.never()).addNewReplicate(CHAIN_TASK_ID, WALLET_WORKER_1); + Mockito.verify(replicatesService, Mockito.never()).addNewReplicate(replicatesList, WALLET_WORKER_1); Mockito.verify(workerService, Mockito.never()).addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1); Mockito.verifyNoInteractions(signatureService); assertTaskAccessForNewReplicateLockNeverUsed(CHAIN_TASK_ID); @@ -383,13 +383,15 @@ void shouldGetOnlyOneReplicateSinceOtherOneReachedConsensusDeadline() { .thenReturn(WorkerpoolAuthorization.builder().chainTaskId(CHAIN_TASK_ID).build()); when(workerService.addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1)) .thenReturn(Optional.of(existingWorker)); + when(replicatesService.addNewReplicate(replicatesList, WALLET_WORKER_1)) + .thenReturn(true); final Optional replicateTaskSummary = replicateSupplyService.getAvailableReplicateTaskSummary(workerLastBlock, WALLET_WORKER_1); assertThat(replicateTaskSummary).isPresent(); assertThat(replicateTaskSummary.get().getWorkerpoolAuthorization().getChainTaskId()).isEqualTo(CHAIN_TASK_ID); - Mockito.verify(replicatesService).addNewReplicate(CHAIN_TASK_ID, WALLET_WORKER_1); + Mockito.verify(replicatesService).addNewReplicate(replicatesList, WALLET_WORKER_1); Mockito.verify(workerService).addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1); Mockito.verify(signatureService, times(0)).createAuthorization(any(), eq(CHAIN_TASK_ID_2), any()); assertTaskAccessForNewReplicateNotDeadLocking(CHAIN_TASK_ID); @@ -459,12 +461,14 @@ void shouldGetReplicateWithNoTee() { when(replicatesList.hasWorkerAlreadyParticipated(WALLET_WORKER_1)).thenReturn(false); when(workerService.addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1)) .thenReturn(Optional.of(existingWorker)); + when(replicatesService.addNewReplicate(replicatesList, WALLET_WORKER_1)) + .thenReturn(true); Optional replicateTaskSummary = replicateSupplyService.getAvailableReplicateTaskSummary(workerLastBlock, WALLET_WORKER_1); assertThat(replicateTaskSummary).isPresent(); - Mockito.verify(replicatesService).addNewReplicate(CHAIN_TASK_ID, WALLET_WORKER_1); + Mockito.verify(replicatesService).addNewReplicate(replicatesList, WALLET_WORKER_1); Mockito.verify(workerService).addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1); Mockito.verify(signatureService).createAuthorization(WALLET_WORKER_1, CHAIN_TASK_ID, BytesUtils.EMPTY_ADDRESS); assertTaskAccessForNewReplicateNotDeadLocking(CHAIN_TASK_ID); @@ -501,6 +505,8 @@ void shouldGetReplicateWithTee() { .thenReturn(new WorkerpoolAuthorization()); when(workerService.addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1)) .thenReturn(Optional.of(existingWorker)); + when(replicatesService.addNewReplicate(replicatesList, WALLET_WORKER_1)) + .thenReturn(true); when(replicatesList.hasWorkerAlreadyParticipated(WALLET_WORKER_1)).thenReturn(false); Optional replicateTaskSummary = @@ -508,7 +514,7 @@ void shouldGetReplicateWithTee() { assertThat(replicateTaskSummary).isPresent(); - Mockito.verify(replicatesService).addNewReplicate(CHAIN_TASK_ID, WALLET_WORKER_1); + Mockito.verify(replicatesService).addNewReplicate(replicatesList, WALLET_WORKER_1); Mockito.verify(workerService).addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1); assertTaskAccessForNewReplicateNotDeadLocking(CHAIN_TASK_ID); } @@ -573,6 +579,8 @@ void shouldTeeNeededTaskBeGivenToTeeEnabledWorker() { .thenReturn(new WorkerpoolAuthorization()); when(workerService.addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1)) .thenReturn(Optional.of(existingWorker)); + when(replicatesService.addNewReplicate(replicatesList, WALLET_WORKER_1)) + .thenReturn(true); when(replicatesList.hasWorkerAlreadyParticipated(WALLET_WORKER_1)).thenReturn(false); Optional replicateTaskSummary = @@ -580,7 +588,7 @@ void shouldTeeNeededTaskBeGivenToTeeEnabledWorker() { assertThat(replicateTaskSummary).isPresent(); - Mockito.verify(replicatesService).addNewReplicate(CHAIN_TASK_ID, WALLET_WORKER_1); + Mockito.verify(replicatesService).addNewReplicate(replicatesList, WALLET_WORKER_1); Mockito.verify(workerService).addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1); assertTaskAccessForNewReplicateNotDeadLocking(CHAIN_TASK_ID); } From 589cc16c893c3d8c10baed7b4479388353b22310 Mon Sep 17 00:00:00 2001 From: Maxence Cornaton Date: Wed, 13 Dec 2023 15:40:56 +0100 Subject: [PATCH 3/8] Improve code coverage --- .../iexec/core/worker/WorkerServiceTests.java | 25 ++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/src/test/java/com/iexec/core/worker/WorkerServiceTests.java b/src/test/java/com/iexec/core/worker/WorkerServiceTests.java index 13d74472e..81c4e6109 100644 --- a/src/test/java/com/iexec/core/worker/WorkerServiceTests.java +++ b/src/test/java/com/iexec/core/worker/WorkerServiceTests.java @@ -250,12 +250,35 @@ void shouldAddTaskIdToWorker() { } @Test - void shouldNotAddTaskIdToWorker() { + void shouldNotAddTaskIdToWorkerSinceUnknownWorker() { when(workerRepository.findByWalletAddress(Mockito.anyString())).thenReturn(Optional.empty()); Optional addedWorker = workerService.addChainTaskIdToWorker("task1", "0x1a69b2eb604db8eba185df03ea4f5288dcbbd248"); assertThat(addedWorker).isEmpty(); } + @Test + void shouldNotAddTaskIdToWorkerSinceCantAcceptMoreWorker() { + String workerName = "worker1"; + String walletAddress = "0x1a69b2eb604db8eba185df03ea4f5288dcbbd248"; + Worker existingWorker = Worker.builder() + .id("1") + .name(workerName) + .walletAddress(walletAddress) + .os("Linux") + .cpu("x86") + .cpuNb(3) + .maxNbTasks(2) + .participatingChainTaskIds(new ArrayList<>(Arrays.asList("task1", "task2"))) + .computingChainTaskIds(new ArrayList<>(Arrays.asList("task1", "task2"))) + .build(); + + when(workerRepository.findByWalletAddress(walletAddress)).thenReturn(Optional.of(existingWorker)); + when(workerRepository.save(existingWorker)).thenReturn(existingWorker); + + Optional addedWorker = workerService.addChainTaskIdToWorker("task3", walletAddress); + assertThat(addedWorker).isEmpty(); + } + // getChainTaskIds @Test From 1ba52a22ad7cbf1298ce080e87658b3c198fb4db Mon Sep 17 00:00:00 2001 From: Maxence Cornaton Date: Wed, 13 Dec 2023 16:45:33 +0100 Subject: [PATCH 4/8] Remove `canAcceptMoreWorks(String walletAddress)` method --- .../com/iexec/core/worker/WorkerService.java | 6 --- .../ReplicateSupplyServiceTests.java | 54 ++++++++++++------- .../iexec/core/worker/WorkerServiceTests.java | 16 +----- 3 files changed, 36 insertions(+), 40 deletions(-) diff --git a/src/main/java/com/iexec/core/worker/WorkerService.java b/src/main/java/com/iexec/core/worker/WorkerService.java index 7b0d6fe4d..bddb3ecd2 100644 --- a/src/main/java/com/iexec/core/worker/WorkerService.java +++ b/src/main/java/com/iexec/core/worker/WorkerService.java @@ -133,12 +133,6 @@ public List getAliveWorkers() { return workerRepository.findByWalletAddressIn(aliveWorkers); } - public boolean canAcceptMoreWorks(String walletAddress) { - Optional optionalWorker = getWorker(walletAddress); - return optionalWorker.filter(this::canAcceptMoreWorks).isPresent(); - - } - public boolean canAcceptMoreWorks(Worker worker) { int workerMaxNbTasks = worker.getMaxNbTasks(); int runningReplicateNb = worker.getComputingChainTaskIds().size(); diff --git a/src/test/java/com/iexec/core/replicate/ReplicateSupplyServiceTests.java b/src/test/java/com/iexec/core/replicate/ReplicateSupplyServiceTests.java index bf1eb4dee..0bd5d703d 100644 --- a/src/test/java/com/iexec/core/replicate/ReplicateSupplyServiceTests.java +++ b/src/test/java/com/iexec/core/replicate/ReplicateSupplyServiceTests.java @@ -91,11 +91,6 @@ void init() { MockitoAnnotations.openMocks(this); } - void workerCanWorkAndHasGas(String workerAddress) { - when(workerService.canAcceptMoreWorks(workerAddress)).thenReturn(true); - when(web3jService.hasEnoughGas(workerAddress)).thenReturn(true); - } - // Tests on getAuthOfAvailableReplicate() // If worker does not exist, canAcceptMoreWorks return false @@ -103,7 +98,7 @@ void workerCanWorkAndHasGas(String workerAddress) { // in getAuthOfAvailableReplicate method @Test void shouldNotGetAnyReplicateSinceWorkerDoesNotExist() { - workerCanWorkAndHasGas(WALLET_WORKER_1); + when(web3jService.hasEnoughGas(WALLET_WORKER_1)).thenReturn(true); when(workerService.getWorker(Mockito.anyString())).thenReturn(Optional.empty()); Optional replicateTaskSummary = replicateSupplyService.getAvailableReplicateTaskSummary(workerLastBlock, WALLET_WORKER_1); @@ -113,7 +108,7 @@ void shouldNotGetAnyReplicateSinceWorkerDoesNotExist() { @Test void shouldNotGetReplicateSinceWorkerLastBlockNotAvailable() { - workerCanWorkAndHasGas(WALLET_WORKER_1); + when(web3jService.hasEnoughGas(WALLET_WORKER_1)).thenReturn(true); Optional replicateTaskSummary = replicateSupplyService.getAvailableReplicateTaskSummary(0, WALLET_WORKER_1); assertThat(replicateTaskSummary).isEmpty(); @@ -122,7 +117,18 @@ void shouldNotGetReplicateSinceWorkerLastBlockNotAvailable() { @Test void shouldNotGetReplicateSinceNoRunningTask() { - workerCanWorkAndHasGas(WALLET_WORKER_1); + final Worker worker = Worker.builder() + .id("1") + .walletAddress(WALLET_WORKER_1) + .cpuNb(4) + .maxNbTasks(3) + .teeEnabled(false) + .build(); + + when(web3jService.hasEnoughGas(WALLET_WORKER_1)).thenReturn(true); + when(workerService.getWorker(WALLET_WORKER_1)) + .thenReturn(Optional.ofNullable(worker)); + when(taskService.getPrioritizedInitializedOrRunningTask(false, Collections.emptyList())).thenReturn(Optional.empty()); Optional replicateTaskSummary = replicateSupplyService.getAvailableReplicateTaskSummary(workerLastBlock, WALLET_WORKER_1); @@ -148,7 +154,7 @@ void shouldNotGetReplicateSinceNoReplicatesList() { runningTask.setContributionDeadline(DateTimeUtils.addMinutesToDate(new Date(), 60)); runningTask.setEnclaveChallenge(BytesUtils.EMPTY_ADDRESS); - workerCanWorkAndHasGas(WALLET_WORKER_2); + when(web3jService.hasEnoughGas(WALLET_WORKER_2)).thenReturn(true); when(taskService.getPrioritizedInitializedOrRunningTask(true, Collections.emptyList())) .thenReturn(Optional.of(runningTask)); when(workerService.getWorker(WALLET_WORKER_2)).thenReturn(Optional.of(worker)); @@ -185,7 +191,7 @@ void shouldNotGetReplicateSinceConsensusReachedOnChain() { runningTask.setContributionDeadline(DateTimeUtils.addMinutesToDate(new Date(), 60)); runningTask.setEnclaveChallenge(BytesUtils.EMPTY_ADDRESS); - workerCanWorkAndHasGas(WALLET_WORKER_2); + when(web3jService.hasEnoughGas(WALLET_WORKER_2)).thenReturn(true); when(taskService.getPrioritizedInitializedOrRunningTask(true, Collections.emptyList())) .thenReturn(Optional.of(runningTask)); when(workerService.getWorker(WALLET_WORKER_2)).thenReturn(Optional.of(worker)); @@ -206,7 +212,14 @@ void shouldNotGetReplicateSinceConsensusReachedOnChain() { @Test void shouldNotGetAnyReplicateSinceWorkerIsFull() { - when(workerService.canAcceptMoreWorks(WALLET_WORKER_1)).thenReturn(false); + final Worker worker = Worker.builder() + .walletAddress(WALLET_WORKER_1) + .cpuNb(2) + .maxNbTasks(1) + .build(); + when(workerService.getWorker(WALLET_WORKER_1)) + .thenReturn(Optional.of(worker)); + when(workerService.canAcceptMoreWorks(worker)).thenReturn(false); Optional replicateTaskSummary = replicateSupplyService.getAvailableReplicateTaskSummary(workerLastBlock, WALLET_WORKER_1); assertThat(replicateTaskSummary).isEmpty(); @@ -215,7 +228,6 @@ void shouldNotGetAnyReplicateSinceWorkerIsFull() { @Test void shouldNotGetAnyReplicateSinceWorkerDoesNotHaveEnoughGas() { - when(workerService.canAcceptMoreWorks(WALLET_WORKER_1)).thenReturn(true); when(web3jService.hasEnoughGas(WALLET_WORKER_1)).thenReturn(false); Optional replicateTaskSummary = replicateSupplyService.getAvailableReplicateTaskSummary(workerLastBlock, WALLET_WORKER_1); @@ -245,7 +257,8 @@ void shouldNotGetAnyReplicateSinceWorkerAlreadyParticipated() { Collections.singletonList(new Replicate(WALLET_WORKER_1, CHAIN_TASK_ID)) )); - workerCanWorkAndHasGas(WALLET_WORKER_1); + when(web3jService.hasEnoughGas(WALLET_WORKER_1)).thenReturn(true); + when(web3jService.hasEnoughGas(WALLET_WORKER_1)).thenReturn(true); when(taskService.getPrioritizedInitializedOrRunningTask(true, Collections.emptyList())) .thenReturn(Optional.of(runningTask)); when(workerService.getWorker(WALLET_WORKER_1)).thenReturn(Optional.of(existingWorker)); @@ -312,6 +325,7 @@ void shouldNotGetReplicateSinceEnclaveChallengeNeededButNotGenerated() { .id("1") .walletAddress(WALLET_WORKER_1) .cpuNb(2) + .maxNbTasks(1) .teeEnabled(true) .build(); @@ -326,7 +340,7 @@ void shouldNotGetReplicateSinceEnclaveChallengeNeededButNotGenerated() { new ReplicatesList(CHAIN_TASK_ID, Collections.emptyList()) ); - workerCanWorkAndHasGas(WALLET_WORKER_1); + when(web3jService.hasEnoughGas(WALLET_WORKER_1)).thenReturn(true); when(taskService.getPrioritizedInitializedOrRunningTask(false, Collections.emptyList())) .thenReturn(Optional.of(runningTask)); when(workerService.getWorker(WALLET_WORKER_1)).thenReturn(Optional.of(existingWorker)); @@ -371,7 +385,7 @@ void shouldGetOnlyOneReplicateSinceOtherOneReachedConsensusDeadline() { ReplicatesList replicatesList = new ReplicatesList(CHAIN_TASK_ID, Collections.emptyList()); - workerCanWorkAndHasGas(WALLET_WORKER_1); + when(web3jService.hasEnoughGas(WALLET_WORKER_1)).thenReturn(true); when(taskService.getPrioritizedInitializedOrRunningTask(true, List.of(CHAIN_TASK_ID))) .thenReturn(Optional.of(taskDeadlineReached)); when(taskService.getPrioritizedInitializedOrRunningTask(true, Collections.emptyList())) @@ -414,7 +428,7 @@ void shouldNotGetReplicateWhenTaskAlreadyAccessed() { runningTask.setContributionDeadline(DateTimeUtils.addMinutesToDate(new Date(), 60)); runningTask.setEnclaveChallenge(BytesUtils.EMPTY_ADDRESS); - workerCanWorkAndHasGas(WALLET_WORKER_1); + when(web3jService.hasEnoughGas(WALLET_WORKER_1)).thenReturn(true); when(taskService.getPrioritizedInitializedOrRunningTask(true, Collections.emptyList())) .thenReturn(Optional.of(runningTask)); when(workerService.getWorker(WALLET_WORKER_1)).thenReturn(Optional.of(existingWorker)); @@ -450,7 +464,7 @@ void shouldGetReplicateWithNoTee() { new ReplicatesList(CHAIN_TASK_ID, Collections.emptyList()) ); - workerCanWorkAndHasGas(WALLET_WORKER_1); + when(web3jService.hasEnoughGas(WALLET_WORKER_1)).thenReturn(true); when(taskService.getPrioritizedInitializedOrRunningTask(true, Collections.emptyList())) .thenReturn(Optional.of(runningTask)); when(workerService.getWorker(WALLET_WORKER_1)).thenReturn(Optional.of(existingWorker)); @@ -495,7 +509,7 @@ void shouldGetReplicateWithTee() { new ReplicatesList(CHAIN_TASK_ID, Collections.emptyList()) ); - workerCanWorkAndHasGas(WALLET_WORKER_1); + when(web3jService.hasEnoughGas(WALLET_WORKER_1)).thenReturn(true); when(taskService.getPrioritizedInitializedOrRunningTask(false, Collections.emptyList())) .thenReturn(Optional.of(runningTask)); when(workerService.getWorker(WALLET_WORKER_1)).thenReturn(Optional.of(existingWorker)); @@ -535,7 +549,7 @@ void shouldTeeNeededTaskNotBeGivenToTeeDisabledWorker() { runningTask.setTag(TEE_TAG); runningTask.setContributionDeadline(DateTimeUtils.addMinutesToDate(new Date(), 60)); - workerCanWorkAndHasGas(WALLET_WORKER_1); + when(web3jService.hasEnoughGas(WALLET_WORKER_1)).thenReturn(true); when(taskService.getPrioritizedInitializedOrRunningTask(true, Collections.emptyList())) .thenReturn(Optional.empty()); when(workerService.getWorker(WALLET_WORKER_1)).thenReturn(Optional.of(existingWorker)); @@ -569,7 +583,7 @@ void shouldTeeNeededTaskBeGivenToTeeEnabledWorker() { new ReplicatesList(CHAIN_TASK_ID, Collections.emptyList()) ); - workerCanWorkAndHasGas(WALLET_WORKER_1); + when(web3jService.hasEnoughGas(WALLET_WORKER_1)).thenReturn(true); when(taskService.getPrioritizedInitializedOrRunningTask(false, Collections.emptyList())) .thenReturn(Optional.of(runningTask)); when(workerService.getWorker(WALLET_WORKER_1)).thenReturn(Optional.of(existingWorker)); diff --git a/src/test/java/com/iexec/core/worker/WorkerServiceTests.java b/src/test/java/com/iexec/core/worker/WorkerServiceTests.java index 81c4e6109..c5d59fd59 100644 --- a/src/test/java/com/iexec/core/worker/WorkerServiceTests.java +++ b/src/test/java/com/iexec/core/worker/WorkerServiceTests.java @@ -522,19 +522,8 @@ void shouldAcceptMoreWorks() { 3, Arrays.asList("task1", "task2", "task3", "task4", "task5"), Arrays.asList("task1", "task3")); - when(workerRepository.findByWalletAddress(walletAddress)).thenReturn(Optional.of(worker)); - - assertThat(workerService.canAcceptMoreWorks(walletAddress)).isTrue(); - } - - @Test - void shouldNotAcceptMoreWorksSinceWorkerNotFound() { - String walletAddress = "0x1a69b2eb604db8eba185df03ea4f5288dcbbd248"; - - when(workerRepository.findByWalletAddress(Mockito.any())).thenReturn(Optional.empty()); - boolean canAccept = workerService.canAcceptMoreWorks(walletAddress); - assertThat(canAccept).isFalse(); + assertThat(workerService.canAcceptMoreWorks(worker)).isTrue(); } @Test @@ -545,9 +534,8 @@ void shouldNotAcceptMoreWorksSinceSaturatedCpus() { 2, Arrays.asList("task1", "task2", "task3", "task4"), Arrays.asList("task1", "task3")); - when(workerRepository.findByWalletAddress(Mockito.anyString())).thenReturn(Optional.of(worker)); - assertThat(workerService.canAcceptMoreWorks(walletAddress)).isFalse(); + assertThat(workerService.canAcceptMoreWorks(worker)).isFalse(); } List getDummyWorkers() { From 6e1986735fb499ef3d652d93887927348ab8c381 Mon Sep 17 00:00:00 2001 From: Maxence Cornaton Date: Wed, 13 Dec 2023 16:45:47 +0100 Subject: [PATCH 5/8] Return `false` when replicate already saved --- src/main/java/com/iexec/core/replicate/ReplicatesService.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/com/iexec/core/replicate/ReplicatesService.java b/src/main/java/com/iexec/core/replicate/ReplicatesService.java index 505c01676..04dd9995b 100644 --- a/src/main/java/com/iexec/core/replicate/ReplicatesService.java +++ b/src/main/java/com/iexec/core/replicate/ReplicatesService.java @@ -82,6 +82,7 @@ public boolean addNewReplicate(ReplicatesList replicatesList, String walletAddre log.info("New replicate saved [chainTaskId:{}, walletAddress:{}]", chainTaskId, walletAddress); } else { log.error("Replicate already saved [chainTaskId:{}, walletAddress:{}]", chainTaskId, walletAddress); + return false; } return true; From d0c0db7fd1a2a61d5135ed50783f50e61aecefc1 Mon Sep 17 00:00:00 2001 From: Maxence Cornaton Date: Thu, 14 Dec 2023 08:52:30 +0100 Subject: [PATCH 6/8] Remove out-dated TODO --- .../java/com/iexec/core/replicate/ReplicateSupplyService.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java b/src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java index 730195685..e3c2e3ba8 100644 --- a/src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java +++ b/src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java @@ -97,8 +97,6 @@ Optional getAvailableReplicateTaskSummary(long workerLastB return Optional.empty(); } - // TODO : Remove this, the optional can never be empty - // This is covered in workerService.canAcceptMoreWorks final Optional optional = workerService.getWorker(walletAddress); if (optional.isEmpty()) { return Optional.empty(); From caff7a75a410b490aa55eff377668e3e19bf94f6 Mon Sep 17 00:00:00 2001 From: Maxence Cornaton Date: Thu, 14 Dec 2023 08:52:49 +0100 Subject: [PATCH 7/8] Update release date --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 76dae2082..7cf60b9a9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ All notable changes to this project will be documented in this file. -## [[8.2.3]](https://github.com/iExecBlockchainComputing/iexec-core/releases/tag/v8.2.3) 2023-12-13 +## [[8.2.3]](https://github.com/iExecBlockchainComputing/iexec-core/releases/tag/v8.2.3) 2023-12-14 ### Bug Fixes From 5ddcf26f978f3a59c78fdc11f939c263c06ce419 Mon Sep 17 00:00:00 2001 From: Maxence Cornaton Date: Thu, 14 Dec 2023 08:58:11 +0100 Subject: [PATCH 8/8] Use functional style to improve reading of method --- .../replicate/ReplicateSupplyService.java | 66 +++++++++++-------- 1 file changed, 37 insertions(+), 29 deletions(-) diff --git a/src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java b/src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java index e3c2e3ba8..4da8738cf 100644 --- a/src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java +++ b/src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java @@ -191,35 +191,8 @@ private boolean acceptOrRejectTask(Task task, String walletAddress) { } try { - final Optional oReplicatesList = replicatesService.getReplicatesList(chainTaskId); - // Check is only here to prevent - // "`Optional.get()` without `isPresent()` warning". - // This case should not happen. - if (oReplicatesList.isEmpty()) { - return false; - } - - final ReplicatesList replicatesList = oReplicatesList.get(); - - final boolean hasWorkerAlreadyParticipated = - replicatesList.hasWorkerAlreadyParticipated(walletAddress); - if (hasWorkerAlreadyParticipated) { - return false; - } - - final boolean taskNeedsMoreContributions = ConsensusHelper.doesTaskNeedMoreContributionsForConsensus( - chainTaskId, - replicatesList.getReplicates(), - task.getTrust(), - task.getMaxExecutionTime()); - - if (!taskNeedsMoreContributions - || taskService.isConsensusReached(replicatesList)) { - return false; - } - - return workerService.addChainTaskIdToWorker(chainTaskId, walletAddress) - .map(worker -> replicatesService.addNewReplicate(replicatesList, walletAddress)) + return replicatesService.getReplicatesList(chainTaskId) + .map(replicatesList -> acceptOrRejectTask(task, walletAddress, replicatesList)) .orElse(false); } finally { // We should always unlock the task @@ -229,6 +202,41 @@ private boolean acceptOrRejectTask(Task task, String walletAddress) { } } + /** + * Given a {@link Task}, a {@code walletAddress} of a worker and a {@link ReplicatesList}, + * tries to accept the task - i.e. create a new {@link Replicate} + * for that task on that worker. + * + * @param task {@link Task} needing at least one new {@link Replicate}. + * @param walletAddress Wallet address of a worker looking for new {@link Task}. + * @param replicatesList Replicates of given {@link Task}. + * @return {@literal true} if the task has been accepted, + * {@literal false} otherwise. + */ + boolean acceptOrRejectTask(Task task, String walletAddress, ReplicatesList replicatesList) { + final boolean hasWorkerAlreadyParticipated = + replicatesList.hasWorkerAlreadyParticipated(walletAddress); + if (hasWorkerAlreadyParticipated) { + return false; + } + + final String chainTaskId = replicatesList.getChainTaskId(); + final boolean taskNeedsMoreContributions = ConsensusHelper.doesTaskNeedMoreContributionsForConsensus( + chainTaskId, + replicatesList.getReplicates(), + task.getTrust(), + task.getMaxExecutionTime()); + + if (!taskNeedsMoreContributions + || taskService.isConsensusReached(replicatesList)) { + return false; + } + + return workerService.addChainTaskIdToWorker(chainTaskId, walletAddress) + .map(worker -> replicatesService.addNewReplicate(replicatesList, walletAddress)) + .orElse(false); + } + /** * Get notifications missed by the worker during the time it was absent. *