diff --git a/CHANGELOG.md b/CHANGELOG.md index dcaba42b..5c09ce46 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ All notable changes to this project will be documented in this file. - Use less MongoDB calls when updating a task to a final status. (#649) - Save contribution and result updload replicate data when `CONTRIBUTE_AND_FINALIZE_DONE`. (#651) - Fix potential `NullPointerException` during first worker replicate request. (#652) +- Fix missed replicate status update detectors to avoid false positives by mixing `CONTRIBUTE-REVEAL-FINALIZE` and `CONTRIBUTE_AND_FINALIZE` workflows. (#653) ### Dependency Upgrades diff --git a/src/main/java/com/iexec/core/detector/replicate/ContributionAndFinalizationUnnotifiedDetector.java b/src/main/java/com/iexec/core/detector/replicate/ContributionAndFinalizationUnnotifiedDetector.java index d4eabe5d..47d0eb5e 100644 --- a/src/main/java/com/iexec/core/detector/replicate/ContributionAndFinalizationUnnotifiedDetector.java +++ b/src/main/java/com/iexec/core/detector/replicate/ContributionAndFinalizationUnnotifiedDetector.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2023 IEXEC BLOCKCHAIN TECH + * Copyright 2023-2024 IEXEC BLOCKCHAIN TECH * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/com/iexec/core/detector/replicate/ContributionUnnotifiedDetector.java b/src/main/java/com/iexec/core/detector/replicate/ContributionUnnotifiedDetector.java index 19c0f988..a52dce61 100644 --- a/src/main/java/com/iexec/core/detector/replicate/ContributionUnnotifiedDetector.java +++ b/src/main/java/com/iexec/core/detector/replicate/ContributionUnnotifiedDetector.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2023 IEXEC BLOCKCHAIN TECH + * Copyright 2020-2024 IEXEC BLOCKCHAIN TECH * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/com/iexec/core/detector/replicate/RevealUnnotifiedDetector.java b/src/main/java/com/iexec/core/detector/replicate/RevealUnnotifiedDetector.java index cfd8cc3b..b495730f 100644 --- a/src/main/java/com/iexec/core/detector/replicate/RevealUnnotifiedDetector.java +++ b/src/main/java/com/iexec/core/detector/replicate/RevealUnnotifiedDetector.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2023 IEXEC BLOCKCHAIN TECH + * Copyright 2020-2024 IEXEC BLOCKCHAIN TECH * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/com/iexec/core/detector/replicate/UnnotifiedAbstractDetector.java b/src/main/java/com/iexec/core/detector/replicate/UnnotifiedAbstractDetector.java index 14c538a8..c81f3906 100644 --- a/src/main/java/com/iexec/core/detector/replicate/UnnotifiedAbstractDetector.java +++ b/src/main/java/com/iexec/core/detector/replicate/UnnotifiedAbstractDetector.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2023 IEXEC BLOCKCHAIN TECH + * Copyright 2020-2024 IEXEC BLOCKCHAIN TECH * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,8 +30,7 @@ import java.util.List; -import static com.iexec.common.replicate.ReplicateStatus.WORKER_LOST; -import static com.iexec.common.replicate.ReplicateStatus.getMissingStatuses; +import static com.iexec.common.replicate.ReplicateStatus.*; @Slf4j public abstract class UnnotifiedAbstractDetector { @@ -95,21 +94,11 @@ void detectOnchainDoneWhenOffchainOngoing() { this.onchainDone, this.offchainOngoing, this.detectorRate); for (Task task : taskService.findByCurrentStatus(detectWhenOffChainTaskStatuses)) { - for (Replicate replicate : replicatesService.getReplicates(task.getChainTaskId())) { - final ReplicateStatus lastRelevantStatus = replicate.getLastRelevantStatus(); - if (lastRelevantStatus != offchainOngoing) { - continue; - } - - final boolean statusTrueOnChain = detectStatusReachedOnChain( - task.getChainTaskId(), replicate.getWalletAddress()); - - if (statusTrueOnChain) { - log.info("Detected confirmed missing update (replicate) [is:{}, should:{}, taskId:{}]", - lastRelevantStatus, onchainDone, task.getChainTaskId()); - updateReplicateStatuses(task, replicate); - } - } + replicatesService.getReplicates(task.getChainTaskId()).stream() + .filter(replicate -> replicate.getLastRelevantStatus() == offchainOngoing) + .filter(this::checkDetectionIsValid) + .filter(this::detectStatusReachedOnChain) + .forEach(replicate -> updateReplicateStatuses(task, replicate)); } } @@ -123,33 +112,40 @@ void detectOnchainDoneWhenOffchainOngoing() { public void detectOnchainDone() { log.debug("Detect onchain {} [retryIn:{}]", onchainDone, this.detectorRate * LESS_OFTEN_DETECTOR_FREQUENCY); for (Task task : taskService.findByCurrentStatus(detectWhenOffChainTaskStatuses)) { - for (Replicate replicate : replicatesService.getReplicates(task.getChainTaskId())) { - final ReplicateStatus lastRelevantStatus = replicate.getLastRelevantStatus(); - - if (lastRelevantStatus == offchainDone) { - continue; - } - - final boolean statusTrueOnChain = detectStatusReachedOnChain( - task.getChainTaskId(), replicate.getWalletAddress()); - - if (statusTrueOnChain) { - log.info("Detected confirmed missing update (replicate) [is:{}, should:{}, taskId:{}]", - lastRelevantStatus, onchainDone, task.getChainTaskId()); - updateReplicateStatuses(task, replicate); - } - } + replicatesService.getReplicates(task.getChainTaskId()).stream() + .filter(replicate -> replicate.getLastRelevantStatus() != offchainDone) + .filter(this::checkDetectionIsValid) + .filter(this::detectStatusReachedOnChain) + .forEach(replicate -> updateReplicateStatuses(task, replicate)); } } + /** + * Checks replicate eligibility to a detection against an {@code offchainDone} status. + *

+ * All replicates are eligible to detection against {@code REVEALED}, but not against {@code CONTRIBUTED} or + * {@code CONTRIBUTE_AND_FINALIZED}. + * + * @param replicate The replicate to check + * @return {@literal true} if the replicate is eligible, {@literal false} otherwise + */ + private boolean checkDetectionIsValid(Replicate replicate) { + final boolean isEligibleToContributeAndFinalize = iexecHubService.getTaskDescription(replicate.getChainTaskId()) + .isEligibleToContributeAndFinalize(); + return offchainDone == REVEALED + || (!isEligibleToContributeAndFinalize && offchainDone == CONTRIBUTED) + || (isEligibleToContributeAndFinalize && offchainDone == CONTRIBUTE_AND_FINALIZE_DONE); + } + /** * Checks if {@code onchainDone} status has been reached on blockchain network. * - * @param chainTaskId ID of on-chain task - * @param walletAddress Address of a worker working on the current task. - * @return + * @param replicate Replicate whose on-chain status will be checked + * @return {@literal true} if given status has been found on-chain, {@literal false} otherwise. */ - private boolean detectStatusReachedOnChain(String chainTaskId, String walletAddress) { + private boolean detectStatusReachedOnChain(Replicate replicate) { + final String chainTaskId = replicate.getChainTaskId(); + final String walletAddress = replicate.getWalletAddress(); switch (onchainDone) { case CONTRIBUTED: return iexecHubService.isContributed(chainTaskId, walletAddress); @@ -171,14 +167,13 @@ private boolean detectStatusReachedOnChain(String chainTaskId, String walletAddr private void updateReplicateStatuses(Task task, Replicate replicate) { final String chainTaskId = task.getChainTaskId(); final long initBlockNumber = task.getInitializationBlockNumber(); - - final ReplicateStatus retrieveFrom = replicate.getCurrentStatus().equals(WORKER_LOST) - ? replicate.getLastButOneStatus() - : replicate.getCurrentStatus(); - final List statusesToUpdate = getMissingStatuses(retrieveFrom, offchainDone); - + final ReplicateStatus lastRelevantStatus = replicate.getLastRelevantStatus(); + final List statusesToUpdate = getMissingStatuses(lastRelevantStatus, offchainDone); final String wallet = replicate.getWalletAddress(); + log.info("Detected confirmed missing update (replicate) [is:{}, should:{}, taskId:{}]", + lastRelevantStatus, onchainDone, task.getChainTaskId()); + for (ReplicateStatus statusToUpdate : statusesToUpdate) { // add details to the update if needed ReplicateStatusDetails details = null; diff --git a/src/test/java/com/iexec/core/detector/replicate/ContributionAndFinalizationUnnotifiedDetectorTests.java b/src/test/java/com/iexec/core/detector/replicate/ContributionAndFinalizationUnnotifiedDetectorTests.java index 3da2f41e..03c0fc58 100644 --- a/src/test/java/com/iexec/core/detector/replicate/ContributionAndFinalizationUnnotifiedDetectorTests.java +++ b/src/test/java/com/iexec/core/detector/replicate/ContributionAndFinalizationUnnotifiedDetectorTests.java @@ -1,9 +1,27 @@ +/* + * Copyright 2023-2024 IEXEC BLOCKCHAIN TECH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.iexec.core.detector.replicate; import com.iexec.common.replicate.ReplicateStatus; import com.iexec.common.replicate.ReplicateStatusDetails; import com.iexec.common.replicate.ReplicateStatusUpdate; import com.iexec.commons.poco.chain.ChainReceipt; +import com.iexec.commons.poco.task.TaskDescription; +import com.iexec.commons.poco.utils.BytesUtils; import com.iexec.core.chain.IexecHubService; import com.iexec.core.chain.Web3jService; import com.iexec.core.configuration.CronConfiguration; @@ -14,15 +32,18 @@ import com.iexec.core.task.TaskStatus; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import org.mockito.*; +import org.springframework.test.util.ReflectionTestUtils; -import java.util.Arrays; +import java.math.BigInteger; import java.util.Collections; import static com.iexec.common.replicate.ReplicateStatus.*; import static com.iexec.common.replicate.ReplicateStatusModifier.WORKER; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.when; class ContributionAndFinalizationUnnotifiedDetectorTests { @@ -51,9 +72,23 @@ class ContributionAndFinalizationUnnotifiedDetectorTests { @BeforeEach void init() { MockitoAnnotations.openMocks(this); + ReflectionTestUtils.setField(detector, "detectorRate", 1000); + when(iexecHubService.getTaskDescription(anyString())).thenReturn(TaskDescription.builder() + .trust(BigInteger.ONE) + .isTeeTask(true) + .callback(BytesUtils.EMPTY_ADDRESS) + .build()); + } + + private Replicate getReplicateWithStatus(ReplicateStatus replicateStatus) { + Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID); + ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder() + .modifier(WORKER).status(replicateStatus).build(); + replicate.setStatusUpdateList(Collections.singletonList(statusUpdate)); + return replicate; } - // region Detector aggregator + // region detectOnChainChanges /** * When running {@link ContributionAndFinalizationUnnotifiedDetector#detectOnChainChanges} 10 times, @@ -68,10 +103,7 @@ void shouldDetectBothChangesOnChain() { Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build(); when(taskService.findByCurrentStatus(TaskStatus.getWaitingContributionStatuses())).thenReturn(Collections.singletonList(task)); - Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID); - ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder().status(CONTRIBUTE_AND_FINALIZE_ONGOING).modifier(WORKER).build(); - replicate.setStatusUpdateList(Collections.singletonList(statusUpdate)); - + Replicate replicate = getReplicateWithStatus(CONTRIBUTE_AND_FINALIZE_ONGOING); when(replicatesService.getReplicates(CHAIN_TASK_ID)).thenReturn(Collections.singletonList(replicate)); when(iexecHubService.isRevealed(CHAIN_TASK_ID, WALLET_ADDRESS)).thenReturn(true); when(web3jService.getLatestBlockNumber()).thenReturn(11L); @@ -97,17 +129,14 @@ void shouldDetectBothChangesOnChain() { // endregion - //region detectOnchainDoneWhenOffchainOngoing (ContributeAndFinalizeOngoing) + // region detectOnchainDoneWhenOffchainOngoing (ContributeAndFinalizeOngoing) @Test - void shouldDetectUnNotifiedContributeAndFinalizeDoneAfterContributeAndFinalizeOngoing() { + void shouldDetectMissedUpdateSinceOffChainOngoing() { Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build(); - when(taskService.findByCurrentStatus(Arrays.asList(TaskStatus.INITIALIZED, TaskStatus.RUNNING))).thenReturn(Collections.singletonList(task)); - - Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID); - ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder().status(CONTRIBUTE_AND_FINALIZE_ONGOING).build(); - replicate.setStatusUpdateList(Collections.singletonList(statusUpdate)); + when(taskService.findByCurrentStatus(TaskStatus.getWaitingContributionStatuses())).thenReturn(Collections.singletonList(task)); + Replicate replicate = getReplicateWithStatus(CONTRIBUTE_AND_FINALIZE_ONGOING); when(replicatesService.getReplicates(CHAIN_TASK_ID)).thenReturn(Collections.singletonList(replicate)); when(iexecHubService.isRevealed(CHAIN_TASK_ID, WALLET_ADDRESS)).thenReturn(true); when(web3jService.getLatestBlockNumber()).thenReturn(11L); @@ -130,53 +159,45 @@ void shouldDetectUnNotifiedContributeAndFinalizeDoneAfterContributeAndFinalizeOn } @Test - void shouldDetectUnNotifiedContributeAndFinalizeDoneSinceBeforeContributeAndFinalizeOngoing() { + void shouldNotDetectMissedUpdateSinceNotOffChainOngoing() { Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build(); - when(taskService.findByCurrentStatus(Arrays.asList(TaskStatus.INITIALIZED, TaskStatus.RUNNING))).thenReturn(Collections.singletonList(task)); - - Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID); - ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder().status(COMPUTED).build(); - replicate.setStatusUpdateList(Collections.singletonList(statusUpdate)); + when(taskService.findByCurrentStatus(TaskStatus.getWaitingContributionStatuses())).thenReturn(Collections.singletonList(task)); + Replicate replicate = getReplicateWithStatus(COMPUTED); when(replicatesService.getReplicates(CHAIN_TASK_ID)).thenReturn(Collections.singletonList(replicate)); when(iexecHubService.isRevealed(CHAIN_TASK_ID, WALLET_ADDRESS)).thenReturn(true); detector.detectOnchainDoneWhenOffchainOngoing(); - Mockito.verify(replicatesService, Mockito.times(0)) + Mockito.verify(replicatesService, never()) .updateReplicateStatus(any(), any(), any(), any(ReplicateStatusDetails.class)); } @Test - void shouldNotDetectUnNotifiedContributeAndFinalizeDoneSinceNotFinalizedOnChain() { + void shouldNotDetectMissedUpdateSinceNotOnChainDone() { Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build(); - when(taskService.findByCurrentStatus(Arrays.asList(TaskStatus.INITIALIZED, TaskStatus.RUNNING))).thenReturn(Collections.singletonList(task)); - - Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID); - ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder().status(CONTRIBUTE_AND_FINALIZE_ONGOING).build(); - replicate.setStatusUpdateList(Collections.singletonList(statusUpdate)); + when(taskService.findByCurrentStatus(TaskStatus.getWaitingContributionStatuses())).thenReturn(Collections.singletonList(task)); + Replicate replicate = getReplicateWithStatus(CONTRIBUTE_AND_FINALIZE_ONGOING); when(replicatesService.getReplicates(CHAIN_TASK_ID)).thenReturn(Collections.singletonList(replicate)); when(iexecHubService.isRevealed(CHAIN_TASK_ID, WALLET_ADDRESS)).thenReturn(false); detector.detectOnchainDoneWhenOffchainOngoing(); - Mockito.verify(replicatesService, Mockito.times(0)) + Mockito.verify(replicatesService, never()) .updateReplicateStatus(any(), any(), any(), any(ReplicateStatusDetails.class)); } // endregion - //region detectOnchainDone (REVEALED) + // region detectOnchainDone (REVEALED) - @Test - void shouldDetectUnNotifiedContributeAndFinalizeOngoing() { + @ParameterizedTest + @EnumSource(value = ReplicateStatus.class, names = {"COMPUTED", "CONTRIBUTE_AND_FINALIZE_ONGOING"}) + void shouldDetectMissedUpdateSinceOnChainDoneNotOffChainDone(ReplicateStatus replicateStatus) { Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build(); - when(taskService.findByCurrentStatus(Arrays.asList(TaskStatus.INITIALIZED, TaskStatus.RUNNING))).thenReturn(Collections.singletonList(task)); - - Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID); - ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder().status(CONTRIBUTE_AND_FINALIZE_ONGOING).build(); - replicate.setStatusUpdateList(Collections.singletonList(statusUpdate)); + when(taskService.findByCurrentStatus(TaskStatus.getWaitingContributionStatuses())).thenReturn(Collections.singletonList(task)); + Replicate replicate = getReplicateWithStatus(replicateStatus); when(replicatesService.getReplicates(CHAIN_TASK_ID)).thenReturn(Collections.singletonList(replicate)); when(iexecHubService.isRevealed(CHAIN_TASK_ID, WALLET_ADDRESS)).thenReturn(true); when(web3jService.getLatestBlockNumber()).thenReturn(11L); @@ -197,19 +218,38 @@ void shouldDetectUnNotifiedContributeAndFinalizeOngoing() { } @Test - void shouldNotDetectUnNotifiedContributedSinceContributeAndFinalizeDone() { + void shouldNotDetectMissedUpdateSinceOnChainDoneAndOffChainDone() { Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build(); - when(taskService.findByCurrentStatus(Arrays.asList(TaskStatus.INITIALIZED, TaskStatus.RUNNING))).thenReturn(Collections.singletonList(task)); - - Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID); - ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder().status(CONTRIBUTE_AND_FINALIZE_DONE).build(); - replicate.setStatusUpdateList(Collections.singletonList(statusUpdate)); + when(taskService.findByCurrentStatus(TaskStatus.getWaitingContributionStatuses())).thenReturn(Collections.singletonList(task)); + Replicate replicate = getReplicateWithStatus(CONTRIBUTE_AND_FINALIZE_DONE); when(replicatesService.getReplicates(CHAIN_TASK_ID)).thenReturn(Collections.singletonList(replicate)); when(iexecHubService.isRevealed(CHAIN_TASK_ID, WALLET_ADDRESS)).thenReturn(true); detector.detectOnchainDone(); - Mockito.verify(replicatesService, Mockito.times(0)) + Mockito.verify(replicatesService, never()) + .updateReplicateStatus(any(), any(), any(), any(ReplicateStatusDetails.class)); + } + + @Test + void shouldNotDetectMissedUpdateSinceOnChainDoneAndNotEligibleToContributeAndFinalize() { + when(iexecHubService.getTaskDescription(CHAIN_TASK_ID)).thenReturn( + TaskDescription.builder().trust(BigInteger.ONE).isTeeTask(true).callback("0x2").build()); + Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build(); + when(taskService.findByCurrentStatus(TaskStatus.getWaitingContributionStatuses())).thenReturn(Collections.singletonList(task)); + + Replicate replicate = getReplicateWithStatus(CONTRIBUTING); + when(replicatesService.getReplicates(any())).thenReturn(Collections.singletonList(replicate)); + when(iexecHubService.isContributed(any(), any())).thenReturn(true); + when(web3jService.getLatestBlockNumber()).thenReturn(11L); + when(iexecHubService.getContributionBlock(anyString(), anyString(), anyLong())).thenReturn(ChainReceipt.builder() + .blockNumber(10L) + .txHash("0xabcef") + .build()); + + detector.detectOnchainDone(); + + Mockito.verify(replicatesService, never()) .updateReplicateStatus(any(), any(), any(), any(ReplicateStatusDetails.class)); } diff --git a/src/test/java/com/iexec/core/detector/replicate/ContributionUnnotifiedDetectorTests.java b/src/test/java/com/iexec/core/detector/replicate/ContributionUnnotifiedDetectorTests.java index fc8ec478..3a999698 100644 --- a/src/test/java/com/iexec/core/detector/replicate/ContributionUnnotifiedDetectorTests.java +++ b/src/test/java/com/iexec/core/detector/replicate/ContributionUnnotifiedDetectorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2023 IEXEC BLOCKCHAIN TECH + * Copyright 2020-2024 IEXEC BLOCKCHAIN TECH * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,8 @@ import com.iexec.common.replicate.ReplicateStatusDetails; import com.iexec.common.replicate.ReplicateStatusUpdate; import com.iexec.commons.poco.chain.ChainReceipt; +import com.iexec.commons.poco.task.TaskDescription; +import com.iexec.commons.poco.utils.BytesUtils; import com.iexec.core.chain.IexecHubService; import com.iexec.core.chain.Web3jService; import com.iexec.core.configuration.CronConfiguration; @@ -30,21 +32,24 @@ import com.iexec.core.task.TaskStatus; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import org.mockito.*; +import org.springframework.test.util.ReflectionTestUtils; -import java.util.Arrays; +import java.math.BigInteger; import java.util.Collections; import static com.iexec.common.replicate.ReplicateStatus.*; import static com.iexec.common.replicate.ReplicateStatusModifier.WORKER; import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.when; class ContributionUnnotifiedDetectorTests { private final static String CHAIN_TASK_ID = "chainTaskId"; private final static String WALLET_ADDRESS = "0x1"; - private final static int DETECTOR_PERIOD = 1000; @Mock private TaskService taskService; @@ -68,9 +73,23 @@ class ContributionUnnotifiedDetectorTests { @BeforeEach void init() { MockitoAnnotations.openMocks(this); + ReflectionTestUtils.setField(contributionDetector, "detectorRate", 1000); + when(iexecHubService.getTaskDescription(anyString())).thenReturn(TaskDescription.builder() + .trust(BigInteger.ONE) + .isTeeTask(true) + .callback("0x1") + .build()); + } + + private Replicate getReplicateWithStatus(ReplicateStatus replicateStatus) { + Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID); + ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder() + .modifier(WORKER).status(replicateStatus).build(); + replicate.setStatusUpdateList(Collections.singletonList(statusUpdate)); + return replicate; } - // region Detector aggregator + // region detectOnChainChanges /** * When running {@link ContributionUnnotifiedDetector#detectOnChainChanges} 10 times, @@ -85,10 +104,7 @@ void shouldDetectBothChangesOnChain() { Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build(); when(taskService.findByCurrentStatus(TaskStatus.getWaitingContributionStatuses())).thenReturn(Collections.singletonList(task)); - Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID); - ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder().status(CONTRIBUTING).modifier(WORKER).build(); - replicate.setStatusUpdateList(Collections.singletonList(statusUpdate)); - + Replicate replicate = getReplicateWithStatus(CONTRIBUTING); when(replicatesService.getReplicates(any())).thenReturn(Collections.singletonList(replicate)); when(iexecHubService.isContributed(any(), any())).thenReturn(true); when(web3jService.getLatestBlockNumber()).thenReturn(11L); @@ -107,18 +123,14 @@ void shouldDetectBothChangesOnChain() { // endregion - //region detectOnchainDoneWhenOffchainOngoing (CONTRIBUTING) + // region detectOnchainDoneWhenOffchainOngoing (CONTRIBUTING) @Test - void shouldDetectUnNotifiedContributedAfterContributing() { + void shouldDetectMissedUpdateSinceOffChainOngoing() { Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build(); - when(taskService.findByCurrentStatus(Arrays.asList(TaskStatus.INITIALIZED, TaskStatus.RUNNING))).thenReturn(Collections.singletonList(task)); - - Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID); - ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder().status(CONTRIBUTING).build(); - replicate.setStatusUpdateList(Collections.singletonList(statusUpdate)); + when(taskService.findByCurrentStatus(TaskStatus.getWaitingContributionStatuses())).thenReturn(Collections.singletonList(task)); - when(cronConfiguration.getContribute()).thenReturn(DETECTOR_PERIOD); + Replicate replicate = getReplicateWithStatus(CONTRIBUTING); when(replicatesService.getReplicates(any())).thenReturn(Collections.singletonList(replicate)); when(iexecHubService.isContributed(any(), any())).thenReturn(true); when(web3jService.getLatestBlockNumber()).thenReturn(11L); @@ -132,39 +144,31 @@ void shouldDetectUnNotifiedContributedAfterContributing() { } @Test - void shouldDetectUnNotifiedContributedAfterContributingSinceBeforeContributing() { + void shouldNotDetectMissedUpdateSinceNotOffChainOngoing() { Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build(); - when(taskService.findByCurrentStatus(Arrays.asList(TaskStatus.INITIALIZED, TaskStatus.RUNNING))).thenReturn(Collections.singletonList(task)); - - Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID); - ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder().status(COMPUTED).build(); - replicate.setStatusUpdateList(Collections.singletonList(statusUpdate)); + when(taskService.findByCurrentStatus(TaskStatus.getWaitingContributionStatuses())).thenReturn(Collections.singletonList(task)); - when(cronConfiguration.getContribute()).thenReturn(DETECTOR_PERIOD); + Replicate replicate = getReplicateWithStatus(COMPUTED); when(replicatesService.getReplicates(any())).thenReturn(Collections.singletonList(replicate)); when(iexecHubService.isContributed(any(), any())).thenReturn(true); contributionDetector.detectOnchainDoneWhenOffchainOngoing(); - Mockito.verify(replicatesService, Mockito.times(0)) + Mockito.verify(replicatesService, never()) .updateReplicateStatus(any(), any(), any(), any(ReplicateStatusDetails.class)); } @Test - void shouldNotDetectUnNotifiedContributedAfterContributingSinceNotContributedOnChain() { + void shouldNotDetectMissedUpdateSinceNotOnChainDone() { Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build(); - when(taskService.findByCurrentStatus(Arrays.asList(TaskStatus.INITIALIZED, TaskStatus.RUNNING))).thenReturn(Collections.singletonList(task)); - - Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID); - ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder().status(CONTRIBUTING).build(); - replicate.setStatusUpdateList(Collections.singletonList(statusUpdate)); + when(taskService.findByCurrentStatus(TaskStatus.getWaitingContributionStatuses())).thenReturn(Collections.singletonList(task)); - // when(cronConfiguration.getContribute()).thenReturn(DETECTOR_PERIOD); + Replicate replicate = getReplicateWithStatus(CONTRIBUTING); when(replicatesService.getReplicates(any())).thenReturn(Collections.singletonList(replicate)); when(iexecHubService.isContributed(any(), any())).thenReturn(false); contributionDetector.detectOnchainDoneWhenOffchainOngoing(); - Mockito.verify(replicatesService, Mockito.times(0)) + Mockito.verify(replicatesService, never()) .updateReplicateStatus(any(), any(), any(), any(ReplicateStatusDetails.class)); } @@ -172,16 +176,13 @@ void shouldNotDetectUnNotifiedContributedAfterContributingSinceNotContributedOnC // region detectOnchainDone (CONTRIBUTED) - @Test - void shouldDetectUnNotifiedContributed1() { + @ParameterizedTest + @EnumSource(value = ReplicateStatus.class, names = {"COMPUTED", "CONTRIBUTING"}) + void shouldDetectMissedUpdateSinceOnChainDoneNotOffChainDone(ReplicateStatus replicateStatus) { Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build(); - when(taskService.findByCurrentStatus(Arrays.asList(TaskStatus.INITIALIZED, TaskStatus.RUNNING))).thenReturn(Collections.singletonList(task)); - - Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID); - ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder().status(CONTRIBUTING).build(); - replicate.setStatusUpdateList(Collections.singletonList(statusUpdate)); + when(taskService.findByCurrentStatus(TaskStatus.getWaitingContributionStatuses())).thenReturn(Collections.singletonList(task)); - when(cronConfiguration.getContribute()).thenReturn(DETECTOR_PERIOD); + Replicate replicate = getReplicateWithStatus(replicateStatus); when(replicatesService.getReplicates(any())).thenReturn(Collections.singletonList(replicate)); when(iexecHubService.isContributed(any(), any())).thenReturn(true); when(web3jService.getLatestBlockNumber()).thenReturn(11L); @@ -197,44 +198,38 @@ void shouldDetectUnNotifiedContributed1() { } @Test - void shouldDetectUnNotifiedContributed2() { + void shouldNotDetectMissedUpdateSinceOnChainDoneAndOffChainDone() { Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build(); - when(taskService.findByCurrentStatus(Arrays.asList(TaskStatus.INITIALIZED, TaskStatus.RUNNING))).thenReturn(Collections.singletonList(task)); - - Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID); - ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder().status(CONTRIBUTING).build(); - replicate.setStatusUpdateList(Collections.singletonList(statusUpdate)); + when(taskService.findByCurrentStatus(TaskStatus.getWaitingContributionStatuses())).thenReturn(Collections.singletonList(task)); - when(cronConfiguration.getContribute()).thenReturn(DETECTOR_PERIOD); + Replicate replicate = getReplicateWithStatus(CONTRIBUTED); when(replicatesService.getReplicates(any())).thenReturn(Collections.singletonList(replicate)); when(iexecHubService.isContributed(any(), any())).thenReturn(true); - when(web3jService.getLatestBlockNumber()).thenReturn(11L); - when(iexecHubService.getContributionBlock(anyString(), anyString(), anyLong())).thenReturn(ChainReceipt.builder() - .blockNumber(10L) - .txHash("0xabcef") - .build()); - contributionDetector.detectOnchainDone(); - Mockito.verify(replicatesService, Mockito.times(1))//Missed CONTRIBUTING & CONTRIBUTED + Mockito.verify(replicatesService, never()) .updateReplicateStatus(any(), any(), any(), any(ReplicateStatusDetails.class)); } @Test - void shouldNotDetectUnNotifiedContributedSinceContributed() { + void shouldNotDetectMissedUpdateSinceOnChainDoneAndEligibleToContributeAndFinalize() { + when(iexecHubService.getTaskDescription(CHAIN_TASK_ID)).thenReturn( + TaskDescription.builder().trust(BigInteger.ONE).isTeeTask(true).callback(BytesUtils.EMPTY_ADDRESS).build()); Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build(); - when(taskService.findByCurrentStatus(Arrays.asList(TaskStatus.INITIALIZED, TaskStatus.RUNNING))).thenReturn(Collections.singletonList(task)); - - Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID); - ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder().status(CONTRIBUTED).build(); - replicate.setStatusUpdateList(Collections.singletonList(statusUpdate)); + when(taskService.findByCurrentStatus(TaskStatus.getWaitingContributionStatuses())).thenReturn(Collections.singletonList(task)); - when(cronConfiguration.getContribute()).thenReturn(DETECTOR_PERIOD); + Replicate replicate = getReplicateWithStatus(CONTRIBUTING); when(replicatesService.getReplicates(any())).thenReturn(Collections.singletonList(replicate)); when(iexecHubService.isContributed(any(), any())).thenReturn(true); + when(web3jService.getLatestBlockNumber()).thenReturn(11L); + when(iexecHubService.getContributionBlock(anyString(), anyString(), anyLong())).thenReturn(ChainReceipt.builder() + .blockNumber(10L) + .txHash("0xabcef") + .build()); + contributionDetector.detectOnchainDone(); - Mockito.verify(replicatesService, Mockito.times(0)) + Mockito.verify(replicatesService, never()) .updateReplicateStatus(any(), any(), any(), any(ReplicateStatusDetails.class)); } diff --git a/src/test/java/com/iexec/core/detector/replicate/RevealUnnotifiedDetectorTests.java b/src/test/java/com/iexec/core/detector/replicate/RevealUnnotifiedDetectorTests.java index 5809dc75..5b48741f 100644 --- a/src/test/java/com/iexec/core/detector/replicate/RevealUnnotifiedDetectorTests.java +++ b/src/test/java/com/iexec/core/detector/replicate/RevealUnnotifiedDetectorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2023 IEXEC BLOCKCHAIN TECH + * Copyright 2020-2024 IEXEC BLOCKCHAIN TECH * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import com.iexec.common.replicate.ReplicateStatusDetails; import com.iexec.common.replicate.ReplicateStatusUpdate; import com.iexec.commons.poco.chain.ChainReceipt; +import com.iexec.commons.poco.task.TaskDescription; import com.iexec.core.chain.IexecHubService; import com.iexec.core.chain.Web3jService; import com.iexec.core.configuration.CronConfiguration; @@ -30,20 +31,23 @@ import com.iexec.core.task.TaskStatus; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import org.mockito.*; +import org.springframework.test.util.ReflectionTestUtils; import java.util.Collections; import static com.iexec.common.replicate.ReplicateStatus.*; import static com.iexec.common.replicate.ReplicateStatusModifier.WORKER; import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.when; class RevealUnnotifiedDetectorTests { private final static String CHAIN_TASK_ID = "chainTaskId"; private final static String WALLET_ADDRESS = "0x1"; - private final static int DETECTOR_PERIOD = 1000; @Mock private TaskService taskService; @@ -67,9 +71,19 @@ class RevealUnnotifiedDetectorTests { @BeforeEach void init() { MockitoAnnotations.openMocks(this); + ReflectionTestUtils.setField(revealDetector, "detectorRate", 1000); + when(iexecHubService.getTaskDescription(anyString())).thenReturn(TaskDescription.builder().build()); } - // region Detector aggregator + private Replicate getReplicateWithStatus(ReplicateStatus replicateStatus) { + Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID); + ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder() + .modifier(WORKER).status(replicateStatus).build(); + replicate.setStatusUpdateList(Collections.singletonList(statusUpdate)); + return replicate; + } + + // region detectOnChainChanges /** * When running {@link RevealUnnotifiedDetector#detectOnChainChanges} 10 times, @@ -84,10 +98,7 @@ void shouldDetectBothChangesOnChain() { Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build(); when(taskService.findByCurrentStatus(TaskStatus.getWaitingRevealStatuses())).thenReturn(Collections.singletonList(task)); - Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID); - ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder().status(REVEALING).modifier(WORKER).build(); - replicate.setStatusUpdateList(Collections.singletonList(statusUpdate)); - + Replicate replicate = getReplicateWithStatus(REVEALING); when(replicatesService.getReplicates(any())).thenReturn(Collections.singletonList(replicate)); when(iexecHubService.isRevealed(any(), any())).thenReturn(true); when(web3jService.getLatestBlockNumber()).thenReturn(11L); @@ -106,18 +117,14 @@ void shouldDetectBothChangesOnChain() { // endregion - //region detectOnchainDoneWhenOffchainOngoing (REVEALING) + // region detectOnchainDoneWhenOffchainOngoing (REVEALING) @Test - void shouldDetectUnNotifiedRevealedAfterRevealing() { + void shouldDetectMissedUpdateSinceOffChainOngoing() { Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build(); when(taskService.findByCurrentStatus(TaskStatus.getWaitingRevealStatuses())).thenReturn(Collections.singletonList(task)); - Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID); - ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder().status(REVEALING).modifier(WORKER).build(); - replicate.setStatusUpdateList(Collections.singletonList(statusUpdate)); - - when(cronConfiguration.getReveal()).thenReturn(DETECTOR_PERIOD); + Replicate replicate = getReplicateWithStatus(REVEALING); when(replicatesService.getReplicates(any())).thenReturn(Collections.singletonList(replicate)); when(iexecHubService.isRevealed(any(), any())).thenReturn(true); when(web3jService.getLatestBlockNumber()).thenReturn(11L); @@ -133,37 +140,30 @@ void shouldDetectUnNotifiedRevealedAfterRevealing() { } @Test - void shouldDetectUnNotifiedRevealedAfterRevealingSinceBeforeRevealing() { + void shouldNotDetectMissedUpdateSinceNotOffChainOngoing() { Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build(); when(taskService.findByCurrentStatus(TaskStatus.getWaitingRevealStatuses())).thenReturn(Collections.singletonList(task)); - Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID); - ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder().modifier(WORKER).status(CONTRIBUTING).build(); - replicate.setStatusUpdateList(Collections.singletonList(statusUpdate)); - - when(cronConfiguration.getReveal()).thenReturn(DETECTOR_PERIOD); + Replicate replicate = getReplicateWithStatus(CONTRIBUTING); when(replicatesService.getReplicates(any())).thenReturn(Collections.singletonList(replicate)); when(iexecHubService.isRevealed(any(), any())).thenReturn(true); revealDetector.detectOnchainDoneWhenOffchainOngoing(); - Mockito.verify(replicatesService, Mockito.times(0)) + Mockito.verify(replicatesService, never()) .updateReplicateStatus(any(), any(), any(), any(ReplicateStatusDetails.class)); } @Test - void shouldNotDetectUnNotifiedRevealedAfterRevealingSinceNotRevealedOnChain() { + void shouldNotDetectMissedUpdateSinceNotOnChainDone() { Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build(); when(taskService.findByCurrentStatus(TaskStatus.getWaitingRevealStatuses())).thenReturn(Collections.singletonList(task)); - Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID); - ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder().modifier(WORKER).status(REVEALING).build(); - replicate.setStatusUpdateList(Collections.singletonList(statusUpdate)); - when(cronConfiguration.getReveal()).thenReturn(DETECTOR_PERIOD); + Replicate replicate = getReplicateWithStatus(REVEALING); when(replicatesService.getReplicates(any())).thenReturn(Collections.singletonList(replicate)); when(iexecHubService.isRevealed(any(), any())).thenReturn(false); revealDetector.detectOnchainDoneWhenOffchainOngoing(); - Mockito.verify(replicatesService, Mockito.times(0)) + Mockito.verify(replicatesService, never()) .updateReplicateStatus(any(), any(), any(), any(ReplicateStatusDetails.class)); } @@ -171,16 +171,13 @@ void shouldNotDetectUnNotifiedRevealedAfterRevealingSinceNotRevealedOnChain() { // region detectOnchainDone (REVEALED) - @Test - void shouldDetectUnNotifiedRevealed1() { + @ParameterizedTest + @EnumSource(value = ReplicateStatus.class, names = {"CONTRIBUTED", "REVEALING"}) + void shouldDetectMissedUpdateSinceOnChainDoneNotOffChainDone(ReplicateStatus replicateStatus) { Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build(); when(taskService.findByCurrentStatus(TaskStatus.getWaitingRevealStatuses())).thenReturn(Collections.singletonList(task)); - Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID); - ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder().modifier(WORKER).status(REVEALING).build(); - replicate.setStatusUpdateList(Collections.singletonList(statusUpdate)); - - when(cronConfiguration.getReveal()).thenReturn(DETECTOR_PERIOD); + Replicate replicate = getReplicateWithStatus(replicateStatus); when(replicatesService.getReplicates(any())).thenReturn(Collections.singletonList(replicate)); when(iexecHubService.isRevealed(any(), any())).thenReturn(true); when(web3jService.getLatestBlockNumber()).thenReturn(11L); @@ -196,49 +193,16 @@ void shouldDetectUnNotifiedRevealed1() { } @Test - void shouldDetectUnNotifiedRevealed2() { - Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build(); - when(taskService.findByCurrentStatus(TaskStatus.getWaitingRevealStatuses())).thenReturn(Collections.singletonList(task)); - - Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID); - ReplicateStatusDetails details = new ReplicateStatusDetails(10L); - ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder() - .modifier(WORKER) - .status(CONTRIBUTED) - .details(details) - .build(); - replicate.setStatusUpdateList(Collections.singletonList(statusUpdate)); - - when(cronConfiguration.getReveal()).thenReturn(DETECTOR_PERIOD); - when(replicatesService.getReplicates(any())).thenReturn(Collections.singletonList(replicate)); - when(iexecHubService.isRevealed(any(), any())).thenReturn(true); - when(web3jService.getLatestBlockNumber()).thenReturn(11L); - when(iexecHubService.getRevealBlock(anyString(), anyString(), anyLong())).thenReturn(ChainReceipt.builder() - .blockNumber(10L) - .txHash("0xabcef") - .build()); - - revealDetector.detectOnchainDone(); - - Mockito.verify(replicatesService, Mockito.times(1))//Missed REVEALING & REVEALED - .updateReplicateStatus(any(), any(), any(), any(ReplicateStatusDetails.class)); - } - - @Test - void shouldNotDetectUnNotifiedRevealedSinceRevealed() { + void shouldNotDetectMissedUpdateSinceOnChainDoneAndOffChainDone() { Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build(); when(taskService.findByCurrentStatus(TaskStatus.getWaitingRevealStatuses())).thenReturn(Collections.singletonList(task)); - Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID); - ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder().modifier(WORKER).status(REVEALED).build(); - replicate.setStatusUpdateList(Collections.singletonList(statusUpdate)); - - when(cronConfiguration.getReveal()).thenReturn(DETECTOR_PERIOD); + Replicate replicate = getReplicateWithStatus(REVEALED); when(replicatesService.getReplicates(any())).thenReturn(Collections.singletonList(replicate)); when(iexecHubService.isRevealed(any(), any())).thenReturn(true); revealDetector.detectOnchainDone(); - Mockito.verify(replicatesService, Mockito.times(0)) + Mockito.verify(replicatesService, never()) .updateReplicateStatus(any(), any(), any(), any(ReplicateStatusDetails.class)); }