Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix missed replicate status update detectors to avoid false positives #653

Merged
merged 2 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ All notable changes to this project will be documented in this file.
- Use less MongoDB calls when updating a task to a final status. (#649)
- Save contribution and result updload replicate data when `CONTRIBUTE_AND_FINALIZE_DONE`. (#651)
- Fix potential `NullPointerException` during first worker replicate request. (#652)
- Fix missed replicate status update detectors to avoid false positives by mixing `CONTRIBUTE-REVEAL-FINALIZE` and `CONTRIBUTE_AND_FINALIZE` workflows. (#653)

### Dependency Upgrades

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 IEXEC BLOCKCHAIN TECH
* Copyright 2023-2024 IEXEC BLOCKCHAIN TECH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 IEXEC BLOCKCHAIN TECH
* Copyright 2020-2024 IEXEC BLOCKCHAIN TECH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 IEXEC BLOCKCHAIN TECH
* Copyright 2020-2024 IEXEC BLOCKCHAIN TECH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 IEXEC BLOCKCHAIN TECH
* Copyright 2020-2024 IEXEC BLOCKCHAIN TECH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,8 +30,7 @@

import java.util.List;

import static com.iexec.common.replicate.ReplicateStatus.WORKER_LOST;
import static com.iexec.common.replicate.ReplicateStatus.getMissingStatuses;
import static com.iexec.common.replicate.ReplicateStatus.*;

@Slf4j
public abstract class UnnotifiedAbstractDetector {
Expand Down Expand Up @@ -95,21 +94,11 @@ void detectOnchainDoneWhenOffchainOngoing() {
this.onchainDone, this.offchainOngoing, this.detectorRate);

for (Task task : taskService.findByCurrentStatus(detectWhenOffChainTaskStatuses)) {
for (Replicate replicate : replicatesService.getReplicates(task.getChainTaskId())) {
final ReplicateStatus lastRelevantStatus = replicate.getLastRelevantStatus();
if (lastRelevantStatus != offchainOngoing) {
continue;
}

final boolean statusTrueOnChain = detectStatusReachedOnChain(
task.getChainTaskId(), replicate.getWalletAddress());

if (statusTrueOnChain) {
log.info("Detected confirmed missing update (replicate) [is:{}, should:{}, taskId:{}]",
lastRelevantStatus, onchainDone, task.getChainTaskId());
updateReplicateStatuses(task, replicate);
}
}
replicatesService.getReplicates(task.getChainTaskId()).stream()
.filter(replicate -> replicate.getLastRelevantStatus() == offchainOngoing)
.filter(this::checkDetectionIsValid)
.filter(this::detectStatusReachedOnChain)
.forEach(replicate -> updateReplicateStatuses(task, replicate));
}
}

Expand All @@ -123,33 +112,40 @@ void detectOnchainDoneWhenOffchainOngoing() {
public void detectOnchainDone() {
log.debug("Detect onchain {} [retryIn:{}]", onchainDone, this.detectorRate * LESS_OFTEN_DETECTOR_FREQUENCY);
for (Task task : taskService.findByCurrentStatus(detectWhenOffChainTaskStatuses)) {
for (Replicate replicate : replicatesService.getReplicates(task.getChainTaskId())) {
final ReplicateStatus lastRelevantStatus = replicate.getLastRelevantStatus();

if (lastRelevantStatus == offchainDone) {
continue;
}

final boolean statusTrueOnChain = detectStatusReachedOnChain(
task.getChainTaskId(), replicate.getWalletAddress());

if (statusTrueOnChain) {
log.info("Detected confirmed missing update (replicate) [is:{}, should:{}, taskId:{}]",
lastRelevantStatus, onchainDone, task.getChainTaskId());
updateReplicateStatuses(task, replicate);
}
}
replicatesService.getReplicates(task.getChainTaskId()).stream()
.filter(replicate -> replicate.getLastRelevantStatus() != offchainDone)
.filter(this::checkDetectionIsValid)
.filter(this::detectStatusReachedOnChain)
.forEach(replicate -> updateReplicateStatuses(task, replicate));
}
}

/**
* Checks replicate eligibility to a detection against an {@code offchainDone} status.
* <p>
* All replicates are eligible to detection against {@code REVEALED}, but not against {@code CONTRIBUTED} or
* {@code CONTRIBUTE_AND_FINALIZED}.
*
* @param replicate The replicate to check
* @return {@literal true} if the replicate is eligible, {@literal false} otherwise
*/
private boolean checkDetectionIsValid(Replicate replicate) {
final boolean isEligibleToContributeAndFinalize = iexecHubService.getTaskDescription(replicate.getChainTaskId())
.isEligibleToContributeAndFinalize();
return offchainDone == REVEALED
|| (!isEligibleToContributeAndFinalize && offchainDone == CONTRIBUTED)
|| (isEligibleToContributeAndFinalize && offchainDone == CONTRIBUTE_AND_FINALIZE_DONE);
}

/**
* Checks if {@code onchainDone} status has been reached on blockchain network.
*
* @param chainTaskId ID of on-chain task
* @param walletAddress Address of a worker working on the current task.
* @return
* @param replicate Replicate whose on-chain status will be checked
* @return {@literal true} if given status has been found on-chain, {@literal false} otherwise.
*/
private boolean detectStatusReachedOnChain(String chainTaskId, String walletAddress) {
private boolean detectStatusReachedOnChain(Replicate replicate) {
final String chainTaskId = replicate.getChainTaskId();
final String walletAddress = replicate.getWalletAddress();
switch (onchainDone) {
case CONTRIBUTED:
return iexecHubService.isContributed(chainTaskId, walletAddress);
Expand All @@ -171,14 +167,13 @@ private boolean detectStatusReachedOnChain(String chainTaskId, String walletAddr
private void updateReplicateStatuses(Task task, Replicate replicate) {
final String chainTaskId = task.getChainTaskId();
final long initBlockNumber = task.getInitializationBlockNumber();

final ReplicateStatus retrieveFrom = replicate.getCurrentStatus().equals(WORKER_LOST)
? replicate.getLastButOneStatus()
: replicate.getCurrentStatus();
final List<ReplicateStatus> statusesToUpdate = getMissingStatuses(retrieveFrom, offchainDone);

final ReplicateStatus lastRelevantStatus = replicate.getLastRelevantStatus();
final List<ReplicateStatus> statusesToUpdate = getMissingStatuses(lastRelevantStatus, offchainDone);
final String wallet = replicate.getWalletAddress();

log.info("Detected confirmed missing update (replicate) [is:{}, should:{}, taskId:{}]",
lastRelevantStatus, onchainDone, task.getChainTaskId());

for (ReplicateStatus statusToUpdate : statusesToUpdate) {
// add details to the update if needed
ReplicateStatusDetails details = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,27 @@
/*
* Copyright 2023-2024 IEXEC BLOCKCHAIN TECH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.iexec.core.detector.replicate;

import com.iexec.common.replicate.ReplicateStatus;
import com.iexec.common.replicate.ReplicateStatusDetails;
import com.iexec.common.replicate.ReplicateStatusUpdate;
import com.iexec.commons.poco.chain.ChainReceipt;
import com.iexec.commons.poco.task.TaskDescription;
import com.iexec.commons.poco.utils.BytesUtils;
import com.iexec.core.chain.IexecHubService;
import com.iexec.core.chain.Web3jService;
import com.iexec.core.configuration.CronConfiguration;
Expand All @@ -14,15 +32,18 @@
import com.iexec.core.task.TaskStatus;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.*;
import org.springframework.test.util.ReflectionTestUtils;

import java.util.Arrays;
import java.math.BigInteger;
import java.util.Collections;

import static com.iexec.common.replicate.ReplicateStatus.*;
import static com.iexec.common.replicate.ReplicateStatusModifier.WORKER;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.when;

class ContributionAndFinalizationUnnotifiedDetectorTests {
Expand Down Expand Up @@ -51,9 +72,23 @@ class ContributionAndFinalizationUnnotifiedDetectorTests {
@BeforeEach
void init() {
MockitoAnnotations.openMocks(this);
ReflectionTestUtils.setField(detector, "detectorRate", 1000);
when(iexecHubService.getTaskDescription(anyString())).thenReturn(TaskDescription.builder()
.trust(BigInteger.ONE)
.isTeeTask(true)
.callback(BytesUtils.EMPTY_ADDRESS)
.build());
}

private Replicate getReplicateWithStatus(ReplicateStatus replicateStatus) {
Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID);
ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder()
.modifier(WORKER).status(replicateStatus).build();
replicate.setStatusUpdateList(Collections.singletonList(statusUpdate));
return replicate;
}

// region Detector aggregator
// region detectOnChainChanges

/**
* When running {@link ContributionAndFinalizationUnnotifiedDetector#detectOnChainChanges} 10 times,
Expand All @@ -68,10 +103,7 @@ void shouldDetectBothChangesOnChain() {
Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build();
when(taskService.findByCurrentStatus(TaskStatus.getWaitingContributionStatuses())).thenReturn(Collections.singletonList(task));

Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID);
ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder().status(CONTRIBUTE_AND_FINALIZE_ONGOING).modifier(WORKER).build();
replicate.setStatusUpdateList(Collections.singletonList(statusUpdate));

Replicate replicate = getReplicateWithStatus(CONTRIBUTE_AND_FINALIZE_ONGOING);
when(replicatesService.getReplicates(CHAIN_TASK_ID)).thenReturn(Collections.singletonList(replicate));
when(iexecHubService.isRevealed(CHAIN_TASK_ID, WALLET_ADDRESS)).thenReturn(true);
when(web3jService.getLatestBlockNumber()).thenReturn(11L);
Expand All @@ -97,17 +129,14 @@ void shouldDetectBothChangesOnChain() {

// endregion

//region detectOnchainDoneWhenOffchainOngoing (ContributeAndFinalizeOngoing)
// region detectOnchainDoneWhenOffchainOngoing (ContributeAndFinalizeOngoing)

@Test
void shouldDetectUnNotifiedContributeAndFinalizeDoneAfterContributeAndFinalizeOngoing() {
void shouldDetectMissedUpdateSinceOffChainOngoing() {
Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build();
when(taskService.findByCurrentStatus(Arrays.asList(TaskStatus.INITIALIZED, TaskStatus.RUNNING))).thenReturn(Collections.singletonList(task));

Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID);
ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder().status(CONTRIBUTE_AND_FINALIZE_ONGOING).build();
replicate.setStatusUpdateList(Collections.singletonList(statusUpdate));
when(taskService.findByCurrentStatus(TaskStatus.getWaitingContributionStatuses())).thenReturn(Collections.singletonList(task));

Replicate replicate = getReplicateWithStatus(CONTRIBUTE_AND_FINALIZE_ONGOING);
when(replicatesService.getReplicates(CHAIN_TASK_ID)).thenReturn(Collections.singletonList(replicate));
when(iexecHubService.isRevealed(CHAIN_TASK_ID, WALLET_ADDRESS)).thenReturn(true);
when(web3jService.getLatestBlockNumber()).thenReturn(11L);
Expand All @@ -130,53 +159,45 @@ void shouldDetectUnNotifiedContributeAndFinalizeDoneAfterContributeAndFinalizeOn
}

@Test
void shouldDetectUnNotifiedContributeAndFinalizeDoneSinceBeforeContributeAndFinalizeOngoing() {
void shouldNotDetectMissedUpdateSinceNotOffChainOngoing() {
Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build();
when(taskService.findByCurrentStatus(Arrays.asList(TaskStatus.INITIALIZED, TaskStatus.RUNNING))).thenReturn(Collections.singletonList(task));

Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID);
ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder().status(COMPUTED).build();
replicate.setStatusUpdateList(Collections.singletonList(statusUpdate));
when(taskService.findByCurrentStatus(TaskStatus.getWaitingContributionStatuses())).thenReturn(Collections.singletonList(task));

Replicate replicate = getReplicateWithStatus(COMPUTED);
when(replicatesService.getReplicates(CHAIN_TASK_ID)).thenReturn(Collections.singletonList(replicate));
when(iexecHubService.isRevealed(CHAIN_TASK_ID, WALLET_ADDRESS)).thenReturn(true);

detector.detectOnchainDoneWhenOffchainOngoing();

Mockito.verify(replicatesService, Mockito.times(0))
Mockito.verify(replicatesService, never())
.updateReplicateStatus(any(), any(), any(), any(ReplicateStatusDetails.class));
}

@Test
void shouldNotDetectUnNotifiedContributeAndFinalizeDoneSinceNotFinalizedOnChain() {
void shouldNotDetectMissedUpdateSinceNotOnChainDone() {
Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build();
when(taskService.findByCurrentStatus(Arrays.asList(TaskStatus.INITIALIZED, TaskStatus.RUNNING))).thenReturn(Collections.singletonList(task));

Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID);
ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder().status(CONTRIBUTE_AND_FINALIZE_ONGOING).build();
replicate.setStatusUpdateList(Collections.singletonList(statusUpdate));
when(taskService.findByCurrentStatus(TaskStatus.getWaitingContributionStatuses())).thenReturn(Collections.singletonList(task));

Replicate replicate = getReplicateWithStatus(CONTRIBUTE_AND_FINALIZE_ONGOING);
when(replicatesService.getReplicates(CHAIN_TASK_ID)).thenReturn(Collections.singletonList(replicate));
when(iexecHubService.isRevealed(CHAIN_TASK_ID, WALLET_ADDRESS)).thenReturn(false);
detector.detectOnchainDoneWhenOffchainOngoing();

Mockito.verify(replicatesService, Mockito.times(0))
Mockito.verify(replicatesService, never())
.updateReplicateStatus(any(), any(), any(), any(ReplicateStatusDetails.class));
}

// endregion

//region detectOnchainDone (REVEALED)
// region detectOnchainDone (REVEALED)

@Test
void shouldDetectUnNotifiedContributeAndFinalizeOngoing() {
@ParameterizedTest
@EnumSource(value = ReplicateStatus.class, names = {"COMPUTED", "CONTRIBUTE_AND_FINALIZE_ONGOING"})
void shouldDetectMissedUpdateSinceOnChainDoneNotOffChainDone(ReplicateStatus replicateStatus) {
Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build();
when(taskService.findByCurrentStatus(Arrays.asList(TaskStatus.INITIALIZED, TaskStatus.RUNNING))).thenReturn(Collections.singletonList(task));

Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID);
ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder().status(CONTRIBUTE_AND_FINALIZE_ONGOING).build();
replicate.setStatusUpdateList(Collections.singletonList(statusUpdate));
when(taskService.findByCurrentStatus(TaskStatus.getWaitingContributionStatuses())).thenReturn(Collections.singletonList(task));

Replicate replicate = getReplicateWithStatus(replicateStatus);
when(replicatesService.getReplicates(CHAIN_TASK_ID)).thenReturn(Collections.singletonList(replicate));
when(iexecHubService.isRevealed(CHAIN_TASK_ID, WALLET_ADDRESS)).thenReturn(true);
when(web3jService.getLatestBlockNumber()).thenReturn(11L);
Expand All @@ -197,19 +218,38 @@ void shouldDetectUnNotifiedContributeAndFinalizeOngoing() {
}

@Test
void shouldNotDetectUnNotifiedContributedSinceContributeAndFinalizeDone() {
void shouldNotDetectMissedUpdateSinceOnChainDoneAndOffChainDone() {
Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build();
when(taskService.findByCurrentStatus(Arrays.asList(TaskStatus.INITIALIZED, TaskStatus.RUNNING))).thenReturn(Collections.singletonList(task));

Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID);
ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder().status(CONTRIBUTE_AND_FINALIZE_DONE).build();
replicate.setStatusUpdateList(Collections.singletonList(statusUpdate));
when(taskService.findByCurrentStatus(TaskStatus.getWaitingContributionStatuses())).thenReturn(Collections.singletonList(task));

Replicate replicate = getReplicateWithStatus(CONTRIBUTE_AND_FINALIZE_DONE);
when(replicatesService.getReplicates(CHAIN_TASK_ID)).thenReturn(Collections.singletonList(replicate));
when(iexecHubService.isRevealed(CHAIN_TASK_ID, WALLET_ADDRESS)).thenReturn(true);
detector.detectOnchainDone();

Mockito.verify(replicatesService, Mockito.times(0))
Mockito.verify(replicatesService, never())
.updateReplicateStatus(any(), any(), any(), any(ReplicateStatusDetails.class));
}

@Test
void shouldNotDetectMissedUpdateSinceOnChainDoneAndNotEligibleToContributeAndFinalize() {
when(iexecHubService.getTaskDescription(CHAIN_TASK_ID)).thenReturn(
TaskDescription.builder().trust(BigInteger.ONE).isTeeTask(true).callback("0x2").build());
Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build();
when(taskService.findByCurrentStatus(TaskStatus.getWaitingContributionStatuses())).thenReturn(Collections.singletonList(task));

Replicate replicate = getReplicateWithStatus(CONTRIBUTING);
when(replicatesService.getReplicates(any())).thenReturn(Collections.singletonList(replicate));
when(iexecHubService.isContributed(any(), any())).thenReturn(true);
when(web3jService.getLatestBlockNumber()).thenReturn(11L);
when(iexecHubService.getContributionBlock(anyString(), anyString(), anyLong())).thenReturn(ChainReceipt.builder()
.blockNumber(10L)
.txHash("0xabcef")
.build());

detector.detectOnchainDone();

Mockito.verify(replicatesService, never())
.updateReplicateStatus(any(), any(), any(), any(ReplicateStatusDetails.class));
}

Expand Down
Loading