diff --git a/CHANGELOG.md b/CHANGELOG.md index 400a91f24..1c7e96a73 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ All notable changes to this project will be documented in this file. +## [[7.3.1]](https://github.com/iExecBlockchainComputing/iexec-core/releases/tag/v7.3.1) 2023-02-17 + +* Subscribe only to deal events targeting a specific workerpool. + ## [[7.3.0]](https://github.com/iExecBlockchainComputing/iexec-core/releases/tag/v7.3.0) 2022-12-18 * Add endpoint to allow health checks. diff --git a/README.md b/README.md index afe03609b..89a77d901 100644 --- a/README.md +++ b/README.md @@ -57,6 +57,12 @@ You can configure the _iExec Core Scheduler_ with the following properties: | `IEXEC_LOGS_PURGE_RATE_IN_DAYS` | Interval in days between 2 executions of the purge mechanism. | Positive integer | `1` | | `IEXEC_LOGS_AVAILABILITY_PERIOD_IN_DAYS` | Number of days to keep logs of past tasks. | Positive integer | `3` | +If it is not the first startup of the _iExec Core Scheduler_ and if it received deals previously, +the _MongoDB_ instance will contain a __configuration Collection__ in the __iexec Database__. +The value stored in this document takes the precedence over the `IEXEC_START_BLOCK_NUMBER` configuration parameter. +To enforce deal observation starting from the `IEXEC_START_BLOCK_NUMBER` value, the aforementioned document has to be deleted in the _MongoDB_. +All deals prior to the `IEXEC_START_BLOCK_NUMBER` will then be ignored. + A more exhaustive documentation is available on [the official documentation of iExec](https://docs.iex.ec/). ## Health checks diff --git a/build.gradle b/build.gradle index e05fb628b..1b345c91e 100644 --- a/build.gradle +++ b/build.gradle @@ -110,9 +110,6 @@ dependencies { testImplementation 'org.testcontainers:testcontainers:1.16.2' testImplementation 'org.testcontainers:junit-jupiter:1.16.2' testImplementation 'org.testcontainers:mongodb:1.16.2' - - // slf4j-test - testImplementation 'uk.org.lidalia:slf4j-test:1.2.0' } jar { @@ -126,11 +123,6 @@ springBoot { test { useJUnitPlatform() - - // To avoid warnings on multiple SLF4J bindings, - // let's remove the Logback implementation - // and keep only SLF4j Test. - classpath = classpath.filter {!it.name.contains('logback-classic')} } task itest { diff --git a/gradle.properties b/gradle.properties index 70668135b..e62901e8c 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=7.3.0 +version=7.3.1 iexecCommonVersion=6.2.0 iexecBlockchainAdapterVersion=7.3.0 iexecResultVersion=7.3.0 diff --git a/src/main/java/com/iexec/core/chain/DealEvent.java b/src/main/java/com/iexec/core/chain/DealEvent.java index 78f304be9..afa9be86d 100644 --- a/src/main/java/com/iexec/core/chain/DealEvent.java +++ b/src/main/java/com/iexec/core/chain/DealEvent.java @@ -18,30 +18,20 @@ import com.iexec.common.contract.generated.IexecHubContract; import com.iexec.common.utils.BytesUtils; -import lombok.Builder; -import lombok.Data; import lombok.Getter; -import lombok.NoArgsConstructor; import java.math.BigInteger; -@Data @Getter -@NoArgsConstructor -@Builder public class DealEvent { - private String chainDealId; - private BigInteger blockNumber; - - public DealEvent(String chainDealId, BigInteger blockNumber) { - this.chainDealId = chainDealId; - this.blockNumber = blockNumber; - } + private final String chainDealId; + private final BigInteger blockNumber; public DealEvent(IexecHubContract.SchedulerNoticeEventResponse schedulerNoticeEventResponse) { this.chainDealId = BytesUtils.bytesToString(schedulerNoticeEventResponse.dealid); - this.blockNumber = schedulerNoticeEventResponse.log.getBlockNumber(); + this.blockNumber = schedulerNoticeEventResponse.log.getBlockNumber() != null ? + schedulerNoticeEventResponse.log.getBlockNumber() : BigInteger.ZERO; } } diff --git a/src/main/java/com/iexec/core/chain/DealWatcherService.java b/src/main/java/com/iexec/core/chain/DealWatcherService.java index c1105eaab..61e242b8e 100644 --- a/src/main/java/com/iexec/core/chain/DealWatcherService.java +++ b/src/main/java/com/iexec/core/chain/DealWatcherService.java @@ -17,28 +17,37 @@ package com.iexec.core.chain; import com.iexec.common.chain.ChainDeal; +import com.iexec.common.contract.generated.IexecHubContract; import com.iexec.common.utils.BytesUtils; import com.iexec.core.configuration.ConfigurationService; import com.iexec.core.task.Task; import com.iexec.core.task.TaskService; import com.iexec.core.task.event.TaskCreatedEvent; import io.reactivex.disposables.Disposable; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; +import org.web3j.abi.EventEncoder; +import org.web3j.protocol.core.DefaultBlockParameter; +import org.web3j.protocol.core.DefaultBlockParameterName; +import org.web3j.protocol.core.methods.request.EthFilter; +import org.web3j.utils.Numeric; import java.math.BigInteger; import java.util.Optional; +import static com.iexec.common.contract.generated.IexecHubContract.SCHEDULERNOTICE_EVENT; + @Slf4j @Service public class DealWatcherService { + private final ChainConfig chainConfig; private final IexecHubService iexecHubService; private final ConfigurationService configurationService; private final ApplicationEventPublisher applicationEventPublisher; @@ -46,13 +55,22 @@ public class DealWatcherService { private final Web3jService web3jService; // internal variables private Disposable dealEventSubscriptionReplay; + @Getter + private BigInteger latestBlockNumberWithDeal = BigInteger.ZERO; + @Getter + private long dealEventsCount = 0; + @Getter + private long dealsCount = 0; + @Getter + private long replayDealsCount = 0; - @Autowired - public DealWatcherService(IexecHubService iexecHubService, + public DealWatcherService(ChainConfig chainConfig, + IexecHubService iexecHubService, ConfigurationService configurationService, ApplicationEventPublisher applicationEventPublisher, TaskService taskService, Web3jService web3jService) { + this.chainConfig = chainConfig; this.iexecHubService = iexecHubService; this.configurationService = configurationService; this.applicationEventPublisher = applicationEventPublisher; @@ -80,8 +98,10 @@ public void run() { */ Disposable subscribeToDealEventFromOneBlockToLatest(BigInteger from) { log.info("Watcher DealEvent started [from:{}, to:{}]", from, "latest"); - return iexecHubService.getDealEventObservableToLatest(from) - .subscribe(dealEvent -> dealEvent.ifPresent(this::onDealEvent)); + EthFilter filter = createDealEventFilter(from, null); + return iexecHubService.getDealEventObservable(filter) + .map(this::schedulerNoticeToDealEvent) + .subscribe(dealEvent -> dealEvent.ifPresent(event -> onDealEvent(event, "start"))); } /** @@ -90,16 +110,20 @@ Disposable subscribeToDealEventFromOneBlockToLatest(BigInteger from) { * * @param dealEvent */ - private void onDealEvent(DealEvent dealEvent) { + private void onDealEvent(DealEvent dealEvent, String watcher) { + if ("replay".equals(watcher)) { + replayDealsCount++; + } else { + dealsCount++; + } String dealId = dealEvent.getChainDealId(); BigInteger dealBlock = dealEvent.getBlockNumber(); - log.info("Received deal [dealId:{}, block:{}]", dealId, - dealBlock); - if (dealBlock == null || dealBlock.equals(BigInteger.ZERO)){ + log.info("Received deal [dealId:{}, block:{}, watcher: {}]", + dealId, dealBlock, watcher); + if (dealBlock.equals(BigInteger.ZERO)) { log.warn("Deal block number is empty, fetching later blockchain " + "events will be more expensive [chainDealId:{}, dealBlock:{}, " + "lastBlock:{}]", dealId, dealBlock, web3jService.getLatestBlockNumber()); - dealEvent.setBlockNumber(BigInteger.ZERO); } this.handleDeal(dealEvent); if (configurationService.getLastSeenBlockWithDeal().compareTo(dealBlock) < 0) { @@ -175,7 +199,37 @@ void replayDealEvent() { private Disposable subscribeToDealEventInRange(BigInteger from, BigInteger to) { log.info("Replay Watcher DealEvent started [from:{}, to:{}]", from, (to == null) ? "latest" : to); - return iexecHubService.getDealEventObservable(from, to) - .subscribe(dealEvent -> dealEvent.ifPresent(this::onDealEvent)); + EthFilter filter = createDealEventFilter(from, to); + return iexecHubService.getDealEventObservable(filter) + .map(this::schedulerNoticeToDealEvent) + .subscribe(dealEvent -> dealEvent.ifPresent(event -> onDealEvent(event, "replay"))); + } + + EthFilter createDealEventFilter(BigInteger from, BigInteger to) { + final DefaultBlockParameter fromBlock = DefaultBlockParameter.valueOf(from); + final DefaultBlockParameter toBlock = to == null + ? DefaultBlockParameterName.LATEST + : DefaultBlockParameter.valueOf(to); + final EthFilter filter = new EthFilter(fromBlock, toBlock, chainConfig.getHubAddress()); + final BigInteger poolAddressBigInt = Numeric.toBigInt(chainConfig.getPoolAddress()); + filter.addSingleTopic(EventEncoder.encode(SCHEDULERNOTICE_EVENT)); + filter.addSingleTopic(Numeric.toHexStringWithPrefixZeroPadded(poolAddressBigInt, 64)); + return filter; + } + + Optional schedulerNoticeToDealEvent(IexecHubContract.SchedulerNoticeEventResponse schedulerNotice) { + dealEventsCount++; + BigInteger noticeBlockNumber = schedulerNotice.log.getBlockNumber(); + if (latestBlockNumberWithDeal.compareTo(noticeBlockNumber) < 0) { + latestBlockNumberWithDeal = noticeBlockNumber; + } + log.info("Received new deal [blockNumber:{}, chainDealId:{}, dealEventsCount:{}]", + schedulerNotice.log.getBlockNumber(), BytesUtils.bytesToString(schedulerNotice.dealid), dealEventsCount); + if (schedulerNotice.workerpool.equalsIgnoreCase(chainConfig.getPoolAddress())) { + return Optional.of(new DealEvent(schedulerNotice)); + } + log.warn("This deal event should not have been received [blockNumber:{}, chainDealId:{}, dealEventsCount:{}]", + schedulerNotice.log.getBlockNumber(), BytesUtils.bytesToString(schedulerNotice.dealid), dealEventsCount); + return Optional.empty(); } } diff --git a/src/main/java/com/iexec/core/chain/IexecHubService.java b/src/main/java/com/iexec/core/chain/IexecHubService.java index 97f362f02..0dfe9b62d 100644 --- a/src/main/java/com/iexec/core/chain/IexecHubService.java +++ b/src/main/java/com/iexec/core/chain/IexecHubService.java @@ -24,9 +24,8 @@ import org.apache.commons.lang3.tuple.Pair; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import org.web3j.protocol.core.DefaultBlockParameter; -import org.web3j.protocol.core.DefaultBlockParameterName; import org.web3j.protocol.core.RemoteCall; +import org.web3j.protocol.core.methods.request.EthFilter; import org.web3j.protocol.core.methods.response.BaseEventResponse; import org.web3j.protocol.core.methods.response.TransactionReceipt; @@ -52,7 +51,6 @@ public class IexecHubService extends IexecHubAbstractService { private final ThreadPoolExecutor executor; private final CredentialsService credentialsService; private final Web3jService web3jService; - private final String poolAddress; @Autowired public IexecHubService(CredentialsService credentialsService, @@ -65,7 +63,6 @@ public IexecHubService(CredentialsService credentialsService, this.credentialsService = credentialsService; this.web3jService = web3jService; this.executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); - this.poolAddress = chainConfig.getPoolAddress(); if (!hasEnoughGas()) { System.exit(0); } @@ -366,24 +363,8 @@ private long getWaitingTransactionCount() { return executor.getTaskCount() - executor.getCompletedTaskCount(); } - - Flowable> getDealEventObservableToLatest(BigInteger from) { - return getDealEventObservable(from, null); - } - - Flowable> getDealEventObservable(BigInteger from, BigInteger to) { - DefaultBlockParameter fromBlock = DefaultBlockParameter.valueOf(from); - DefaultBlockParameter toBlock = DefaultBlockParameterName.LATEST; - if (to != null) { - toBlock = DefaultBlockParameter.valueOf(to); - } - return getHubContract().schedulerNoticeEventFlowable(fromBlock, toBlock).map(schedulerNotice -> { - - if (schedulerNotice.workerpool.equalsIgnoreCase(poolAddress)) { - return Optional.of(new DealEvent(schedulerNotice)); - } - return Optional.empty(); - }); + Flowable getDealEventObservable(EthFilter filter) { + return getHubContract().schedulerNoticeEventFlowable(filter); } public boolean hasEnoughGas() { diff --git a/src/main/java/com/iexec/core/metric/MetricController.java b/src/main/java/com/iexec/core/metric/MetricController.java index 5c9c7e6f2..215758f69 100644 --- a/src/main/java/com/iexec/core/metric/MetricController.java +++ b/src/main/java/com/iexec/core/metric/MetricController.java @@ -16,10 +16,8 @@ package com.iexec.core.metric; - import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import static org.springframework.http.ResponseEntity.ok; @@ -27,15 +25,13 @@ @RestController public class MetricController { - - private MetricService metricService; + private final MetricService metricService; public MetricController(MetricService metricService) { this.metricService = metricService; } - - @RequestMapping(method = RequestMethod.GET, path = "/metrics") + @GetMapping("/metrics") public ResponseEntity getPlatformMetric() { return ok(metricService.getPlatformMetrics()); } diff --git a/src/main/java/com/iexec/core/metric/MetricService.java b/src/main/java/com/iexec/core/metric/MetricService.java index 3e3a31e86..b297d5a81 100644 --- a/src/main/java/com/iexec/core/metric/MetricService.java +++ b/src/main/java/com/iexec/core/metric/MetricService.java @@ -16,6 +16,7 @@ package com.iexec.core.metric; +import com.iexec.core.chain.DealWatcherService; import com.iexec.core.task.TaskService; import com.iexec.core.task.TaskStatus; import com.iexec.core.worker.WorkerService; @@ -24,12 +25,14 @@ @Service public class MetricService { - + private final DealWatcherService dealWatcherService; private final WorkerService workerService; private final TaskService taskService; - public MetricService(WorkerService workerService, + public MetricService(DealWatcherService dealWatcherService, + WorkerService workerService, TaskService taskService) { + this.dealWatcherService = dealWatcherService; this.workerService = workerService; this.taskService = taskService; } @@ -42,6 +45,10 @@ public PlatformMetric getPlatformMetrics() { .aliveTotalGpu(workerService.getAliveTotalGpu()) .aliveAvailableGpu(workerService.getAliveAvailableGpu()) .completedTasks(taskService.findByCurrentStatus(TaskStatus.COMPLETED).size()) + .dealEventsCount(dealWatcherService.getDealEventsCount()) + .dealsCount(dealWatcherService.getDealsCount()) + .replayDealsCount(dealWatcherService.getReplayDealsCount()) + .latestBlockNumberWithDeal(dealWatcherService.getLatestBlockNumberWithDeal()) .build(); } diff --git a/src/main/java/com/iexec/core/metric/PlatformMetric.java b/src/main/java/com/iexec/core/metric/PlatformMetric.java index 675331fd8..77d9c0c61 100644 --- a/src/main/java/com/iexec/core/metric/PlatformMetric.java +++ b/src/main/java/com/iexec/core/metric/PlatformMetric.java @@ -20,6 +20,8 @@ import lombok.Builder; import lombok.Data; +import java.math.BigInteger; + @Data @AllArgsConstructor @Builder @@ -31,5 +33,9 @@ public class PlatformMetric { private int aliveTotalGpu; private int aliveAvailableGpu; private int completedTasks; + private long dealEventsCount; + private long dealsCount; + private long replayDealsCount; + private BigInteger latestBlockNumberWithDeal; } diff --git a/src/test/java/com/iexec/core/chain/DealWatcherServiceTests.java b/src/test/java/com/iexec/core/chain/DealWatcherServiceTests.java index f6d9c893d..c86a48816 100644 --- a/src/test/java/com/iexec/core/chain/DealWatcherServiceTests.java +++ b/src/test/java/com/iexec/core/chain/DealWatcherServiceTests.java @@ -20,6 +20,8 @@ import com.iexec.common.chain.ChainCategory; import com.iexec.common.chain.ChainDeal; import com.iexec.common.chain.DealParams; +import com.iexec.common.contract.generated.IexecHubContract; +import com.iexec.common.utils.BytesUtils; import com.iexec.core.configuration.ConfigurationService; import com.iexec.core.task.Task; import com.iexec.core.task.TaskService; @@ -29,6 +31,7 @@ import org.junit.jupiter.api.Test; import org.mockito.*; import org.springframework.context.ApplicationEventPublisher; +import org.web3j.protocol.core.methods.response.Log; import java.math.BigInteger; import java.util.Optional; @@ -38,6 +41,8 @@ class DealWatcherServiceTests { + @Mock + private ChainConfig chainConfig; @Mock private IexecHubService iexecHubService; @@ -53,41 +58,44 @@ class DealWatcherServiceTests { @InjectMocks private DealWatcherService dealWatcherService; + private IexecHubContract.SchedulerNoticeEventResponse createSchedulerNotice(BigInteger noticeBlockNumber) { + IexecHubContract.SchedulerNoticeEventResponse schedulerNotice = new IexecHubContract.SchedulerNoticeEventResponse(); + schedulerNotice.workerpool = "0x1"; + schedulerNotice.dealid = "chainDealId".getBytes(); + Log schedulerNoticeLog = new Log(); + schedulerNoticeLog.setBlockNumber(noticeBlockNumber.toString()); + schedulerNotice.log = schedulerNoticeLog; + return schedulerNotice; + } + @BeforeEach void init() { MockitoAnnotations.openMocks(this); + when(chainConfig.getHubAddress()).thenReturn("hubAddress"); + when(chainConfig.getPoolAddress()).thenReturn("0x1"); } @Test void shouldRun() { BigInteger blockNumber = BigInteger.TEN; when(configurationService.getLastSeenBlockWithDeal()).thenReturn(blockNumber); - when(iexecHubService.getDealEventObservableToLatest(blockNumber)) - .thenReturn(Flowable.just(Optional.empty())); - + when(iexecHubService.getDealEventObservable(any())).thenReturn(Flowable.empty()); dealWatcherService.run(); - - Mockito.verify(iexecHubService, Mockito.times(1)) - .getDealEventObservableToLatest(blockNumber); + verify(iexecHubService).getDealEventObservable(any()); } @Test void shouldUpdateLastSeenBlockWhenOneDeal() { BigInteger from = BigInteger.valueOf(0); BigInteger blockOfDeal = BigInteger.valueOf(3); - Optional dealEvent = Optional.of(DealEvent - .builder() - .chainDealId("chainDealId") - .blockNumber(blockOfDeal) - .build()); + IexecHubContract.SchedulerNoticeEventResponse schedulerNotice = createSchedulerNotice(blockOfDeal); when(configurationService.getLastSeenBlockWithDeal()).thenReturn(from); - when(iexecHubService.getDealEventObservableToLatest(from)).thenReturn(Flowable.just(dealEvent)); + when(iexecHubService.getDealEventObservable(any())).thenReturn(Flowable.just(schedulerNotice)); dealWatcherService.subscribeToDealEventFromOneBlockToLatest(from); - Mockito.verify(configurationService, Mockito.times(1)) - .setLastSeenBlockWithDeal(blockOfDeal); + verify(configurationService).setLastSeenBlockWithDeal(blockOfDeal); } @Test @@ -108,31 +116,24 @@ void shouldUpdateLastSeenBlockWhenOneDealAndCreateTask() { BigInteger from = BigInteger.valueOf(0); BigInteger blockOfDeal = BigInteger.valueOf(3); - Optional dealEvent = Optional.of(DealEvent.builder() - .chainDealId("chainDealId") - .blockNumber(blockOfDeal) - .build()); + IexecHubContract.SchedulerNoticeEventResponse schedulerNotice = createSchedulerNotice(blockOfDeal); Task task = new Task(); - when(iexecHubService.getDealEventObservableToLatest(from)).thenReturn(Flowable.just(dealEvent)); - when(iexecHubService.getChainDeal(dealEvent.get().getChainDealId())).thenReturn(Optional.of(chainDeal)); - when(taskService.addTask(any(), Mockito.anyInt(), anyLong(), any(), any(), Mockito.anyInt(), anyLong(), any(), any(), any())) + when(iexecHubService.getDealEventObservable(any())).thenReturn(Flowable.just(schedulerNotice)); + when(iexecHubService.getChainDeal(BytesUtils.bytesToString(schedulerNotice.dealid))).thenReturn(Optional.of(chainDeal)); + when(iexecHubService.isBeforeContributionDeadline(chainDeal)).thenReturn(true); + when(taskService.addTask(any(), anyInt(), anyLong(), any(), any(), anyInt(), anyLong(), any(), any(), any())) .thenReturn(Optional.of(task)); when(configurationService.getLastSeenBlockWithDeal()).thenReturn(from); - when(iexecHubService.isBeforeContributionDeadline(chainDeal)).thenReturn(true); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(TaskCreatedEvent.class); dealWatcherService.subscribeToDealEventFromOneBlockToLatest(from); - Mockito.verify(configurationService, Mockito.times(1)) - .setLastSeenBlockWithDeal(blockOfDeal); - Mockito.verify(applicationEventPublisher, Mockito.times(1)) - .publishEvent(Mockito.any(TaskCreatedEvent.class)); - - Mockito.verify(applicationEventPublisher, Mockito.times(1)) - .publishEvent(argumentCaptor.capture()); + verify(configurationService).setLastSeenBlockWithDeal(blockOfDeal); + verify(applicationEventPublisher).publishEvent(any(TaskCreatedEvent.class)); + verify(applicationEventPublisher).publishEvent(argumentCaptor.capture()); assertThat(argumentCaptor.getValue()).isEqualTo(new TaskCreatedEvent(task.getChainTaskId())); } @@ -149,165 +150,112 @@ void shouldUpdateLastSeenBlockWhenOneDealAndNotCreateTaskSinceDealIsExpired() { BigInteger from = BigInteger.valueOf(0); BigInteger blockOfDeal = BigInteger.valueOf(3); - Optional dealEvent = Optional.of(DealEvent.builder() - .chainDealId("chainDealId") - .blockNumber(blockOfDeal) - .build()); - - when(iexecHubService.getDealEventObservableToLatest(from)) - .thenReturn(Flowable.just(dealEvent)); - when(iexecHubService.getChainDeal(anyString())) - .thenReturn(Optional.of(chainDeal)); - when(iexecHubService.isBeforeContributionDeadline(chainDeal)) - .thenReturn(false); + IexecHubContract.SchedulerNoticeEventResponse schedulerNotice = createSchedulerNotice(blockOfDeal); + + when(iexecHubService.getDealEventObservable(any())).thenReturn(Flowable.just(schedulerNotice)); + when(iexecHubService.getChainDeal(BytesUtils.bytesToString(schedulerNotice.dealid))).thenReturn(Optional.of(chainDeal)); + when(iexecHubService.isBeforeContributionDeadline(chainDeal)).thenReturn(false); when(configurationService.getLastSeenBlockWithDeal()).thenReturn(from); dealWatcherService.subscribeToDealEventFromOneBlockToLatest(from); - verify(configurationService, times(1)) - .setLastSeenBlockWithDeal(blockOfDeal); - verify(applicationEventPublisher, never()) - .publishEvent(any()); - verify(taskService, never()) - .addTask(anyString(), anyInt(), anyLong(), - anyString(), anyString(), anyInt(), anyLong(), - anyString(), any(), any()); + verify(configurationService).setLastSeenBlockWithDeal(blockOfDeal); + verifyNoInteractions(taskService, applicationEventPublisher); } @Test void shouldUpdateLastSeenBlockWhenOneDealAndNotCreateTaskSinceBotSizeIsZero() { BigInteger from = BigInteger.valueOf(0); BigInteger blockOfDeal = BigInteger.valueOf(3); - Optional dealEvent = Optional.of(DealEvent.builder() - .chainDealId("chainDealId") - .blockNumber(blockOfDeal) - .build()); + IexecHubContract.SchedulerNoticeEventResponse schedulerNotice = createSchedulerNotice(blockOfDeal); ChainDeal chainDeal = ChainDeal.builder() .botFirst(BigInteger.valueOf(0)) .botSize(BigInteger.valueOf(0)) .build(); - when(iexecHubService.getDealEventObservableToLatest(from)).thenReturn(Flowable.just(dealEvent)); - when(iexecHubService.getChainDeal(dealEvent.get().getChainDealId())).thenReturn(Optional.of(chainDeal)); + when(iexecHubService.getDealEventObservable(any())).thenReturn(Flowable.just(schedulerNotice)); + when(iexecHubService.getChainDeal(BytesUtils.bytesToString(schedulerNotice.dealid))).thenReturn(Optional.of(chainDeal)); when(configurationService.getLastSeenBlockWithDeal()).thenReturn(from); dealWatcherService.subscribeToDealEventFromOneBlockToLatest(from); - Mockito.verify(configurationService, Mockito.times(1)) - .setLastSeenBlockWithDeal(blockOfDeal); - Mockito.verify(applicationEventPublisher, Mockito.times(0)) - .publishEvent(any()); + verify(configurationService).setLastSeenBlockWithDeal(blockOfDeal); + verifyNoInteractions(taskService, applicationEventPublisher); } @Test void shouldUpdateLastSeenBlockWhenOneDealButNotCreateTaskSinceExceptionThrown() { BigInteger from = BigInteger.valueOf(0); BigInteger blockOfDeal = BigInteger.valueOf(3); - Optional dealEvent = Optional.of(DealEvent.builder() - .chainDealId("chainDealId") - .blockNumber(blockOfDeal) - .build()); + IexecHubContract.SchedulerNoticeEventResponse schedulerNotice = createSchedulerNotice(blockOfDeal); ChainDeal chainDeal = ChainDeal.builder() .botFirst(BigInteger.valueOf(0)) .botSize(BigInteger.valueOf(1)) .build(); - - when(iexecHubService.getDealEventObservableToLatest(from)).thenReturn(Flowable.just(dealEvent)); - when(iexecHubService.getChainDeal(dealEvent.get().getChainDealId())).thenReturn(Optional.of(chainDeal)); + when(iexecHubService.getDealEventObservable(any())).thenReturn(Flowable.just(schedulerNotice)); + when(iexecHubService.getChainDeal(BytesUtils.bytesToString(schedulerNotice.dealid))).thenReturn(Optional.of(chainDeal)); when(configurationService.getLastSeenBlockWithDeal()).thenReturn(from); dealWatcherService.subscribeToDealEventFromOneBlockToLatest(from); - Mockito.verify(configurationService, Mockito.times(1)) - .setLastSeenBlockWithDeal(blockOfDeal); - Mockito.verify(applicationEventPublisher, Mockito.times(0)) - .publishEvent(any()); + verify(configurationService).setLastSeenBlockWithDeal(blockOfDeal); + verifyNoInteractions(taskService, applicationEventPublisher); } @Test void shouldUpdateLastSeenBlockTwiceWhenTwoDeals() { BigInteger from = BigInteger.valueOf(0); BigInteger blockOfDeal1 = BigInteger.valueOf(3); - Optional dealEvent1 = Optional.of(DealEvent - .builder() - .chainDealId("chainDealId1") - .blockNumber(blockOfDeal1) - .build()); BigInteger blockOfDeal2 = BigInteger.valueOf(5); - Optional dealEvent2 = Optional.of(DealEvent - .builder() - .chainDealId("chainDealId2") - .blockNumber(blockOfDeal2) - .build()); + IexecHubContract.SchedulerNoticeEventResponse schedulerNotice1 = createSchedulerNotice(blockOfDeal1); + IexecHubContract.SchedulerNoticeEventResponse schedulerNotice2 = createSchedulerNotice(blockOfDeal2); when(configurationService.getLastSeenBlockWithDeal()).thenReturn(from); - when(iexecHubService.getDealEventObservableToLatest(from)).thenReturn(Flowable.just(dealEvent1, dealEvent2)); + when(iexecHubService.getDealEventObservable(any())).thenReturn(Flowable.just(schedulerNotice1, schedulerNotice2)); dealWatcherService.subscribeToDealEventFromOneBlockToLatest(from); - Mockito.verify(configurationService, Mockito.times(1)) - .setLastSeenBlockWithDeal(blockOfDeal1); - Mockito.verify(configurationService, Mockito.times(1)) - .setLastSeenBlockWithDeal(blockOfDeal2); + verify(configurationService).setLastSeenBlockWithDeal(blockOfDeal1); + verify(configurationService).setLastSeenBlockWithDeal(blockOfDeal2); } @Test - void shouldNOtUpdateLastSeenBlockWhenReceivingOldMissedDeal() { + void shouldNotUpdateLastSeenBlockWhenReceivingOldMissedDeal() { BigInteger from = BigInteger.valueOf(5); - BigInteger blockOfDeal1 = BigInteger.valueOf(3); - Optional dealEvent1 = Optional.of(DealEvent - .builder() - .chainDealId("chainDealId1") - .blockNumber(blockOfDeal1) - .build()); + BigInteger blockOfDeal = BigInteger.valueOf(3); + IexecHubContract.SchedulerNoticeEventResponse schedulerNotice = createSchedulerNotice(blockOfDeal); when(configurationService.getLastSeenBlockWithDeal()).thenReturn(from); - when(iexecHubService.getDealEventObservableToLatest(from)).thenReturn(Flowable.just(dealEvent1)); + when(iexecHubService.getDealEventObservable(any())).thenReturn(Flowable.just(schedulerNotice)); dealWatcherService.subscribeToDealEventFromOneBlockToLatest(from); - Mockito.verify(configurationService, Mockito.times(0)) - .setLastSeenBlockWithDeal(blockOfDeal1); + verify(configurationService).getLastSeenBlockWithDeal(); + verify(configurationService, never()).setLastSeenBlockWithDeal(blockOfDeal); } @Test void shouldReplayAllEventInRange() { - BigInteger blockOfDeal1 = BigInteger.valueOf(3); - Optional dealEvent1 = Optional.of(DealEvent - .builder() - .chainDealId("chainDealId1") - .blockNumber(blockOfDeal1) - .build()); + BigInteger blockOfDeal = BigInteger.valueOf(3); + IexecHubContract.SchedulerNoticeEventResponse schedulerNotice = createSchedulerNotice(blockOfDeal); when(configurationService.getLastSeenBlockWithDeal()).thenReturn(BigInteger.TEN); when(configurationService.getFromReplay()).thenReturn(BigInteger.ZERO); - when(iexecHubService.getDealEventObservable(any(), any())).thenReturn(Flowable.just(dealEvent1)); + when(iexecHubService.getDealEventObservable(any())).thenReturn(Flowable.just(schedulerNotice)); dealWatcherService.replayDealEvent(); - Mockito.verify(iexecHubService, Mockito.times(1)) - .getChainDeal(any()); + verify(iexecHubService).getChainDeal(any()); } @Test void shouldNotReplayIfFromReplayEqualsLastSeenBlock() { - BigInteger blockOfDeal1 = BigInteger.valueOf(3); - Optional dealEvent1 = Optional.of(DealEvent - .builder() - .chainDealId("chainDealId1") - .blockNumber(blockOfDeal1) - .build()); - when(configurationService.getLastSeenBlockWithDeal()).thenReturn(BigInteger.ZERO); when(configurationService.getFromReplay()).thenReturn(BigInteger.ZERO); - when(iexecHubService.getDealEventObservable(any(), any())).thenReturn(Flowable.just(dealEvent1)); - dealWatcherService.replayDealEvent(); - - Mockito.verify(iexecHubService, Mockito.times(0)) - .getChainDeal(any()); + verifyNoInteractions(iexecHubService); } } diff --git a/src/test/java/com/iexec/core/metric/MetricServiceTests.java b/src/test/java/com/iexec/core/metric/MetricServiceTests.java index d0998fd9f..bd6ccfd0e 100644 --- a/src/test/java/com/iexec/core/metric/MetricServiceTests.java +++ b/src/test/java/com/iexec/core/metric/MetricServiceTests.java @@ -16,6 +16,7 @@ package com.iexec.core.metric; +import com.iexec.core.chain.DealWatcherService; import com.iexec.core.task.TaskService; import com.iexec.core.task.TaskStatus; import com.iexec.core.worker.Worker; @@ -26,6 +27,7 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import java.math.BigInteger; import java.util.List; import static org.assertj.core.api.Assertions.assertThat; @@ -33,6 +35,8 @@ class MetricServiceTests { + @Mock + private DealWatcherService dealWatcherService; @Mock private WorkerService workerService; @Mock @@ -56,6 +60,10 @@ void shouldGetPlatformMetrics() { when(workerService.getAliveAvailableGpu()).thenReturn(1); when(taskService.findByCurrentStatus(TaskStatus.COMPLETED)) .thenReturn(List.of()); + when(dealWatcherService.getDealEventsCount()).thenReturn(10L); + when(dealWatcherService.getDealsCount()).thenReturn(8L); + when(dealWatcherService.getReplayDealsCount()).thenReturn(2L); + when(dealWatcherService.getLatestBlockNumberWithDeal()).thenReturn(BigInteger.valueOf(255L)); PlatformMetric metric = metricService.getPlatformMetrics(); assertThat(metric.getAliveWorkers()).isEqualTo(aliveWorkers.size()); @@ -64,6 +72,10 @@ void shouldGetPlatformMetrics() { assertThat(metric.getAliveTotalGpu()).isEqualTo(1); assertThat(metric.getAliveAvailableGpu()).isEqualTo(1); assertThat(metric.getCompletedTasks()).isZero(); + assertThat(metric.getDealEventsCount()).isEqualTo(10); + assertThat(metric.getDealsCount()).isEqualTo(8); + assertThat(metric.getReplayDealsCount()).isEqualTo(2); + assertThat(metric.getLatestBlockNumberWithDeal()).isEqualTo(255); } } diff --git a/src/test/java/com/iexec/core/task/listener/TaskListenerTest.java b/src/test/java/com/iexec/core/task/listener/TaskListenerTest.java index c4963f322..0dc3c4060 100644 --- a/src/test/java/com/iexec/core/task/listener/TaskListenerTest.java +++ b/src/test/java/com/iexec/core/task/listener/TaskListenerTest.java @@ -28,10 +28,10 @@ import com.iexec.core.worker.WorkerService; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.*; -import uk.org.lidalia.slf4jtest.LoggingEvent; -import uk.org.lidalia.slf4jtest.TestLogger; -import uk.org.lidalia.slf4jtest.TestLoggerFactory; +import org.springframework.boot.test.system.CapturedOutput; +import org.springframework.boot.test.system.OutputCaptureExtension; import java.util.Collections; import java.util.List; @@ -39,6 +39,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.*; +@ExtendWith(OutputCaptureExtension.class) class TaskListenerTest { private static final String CHAIN_TASK_ID = "chainTaskId"; @@ -133,14 +134,10 @@ void shouldSendTaskNotificationOnPleaseUploadEvent() { } @Test - void onResultUploadTimeoutEvent() { - TestLogger logger = TestLoggerFactory.getTestLogger(TaskListeners.class); - logger.clear(); - + void onResultUploadTimeoutEvent(CapturedOutput output) { taskListeners.onResultUploadTimeoutEvent(new ResultUploadTimeoutEvent(CHAIN_TASK_ID)); - - assertThat(logger.getLoggingEvents()) - .isEqualTo(Collections.singletonList(LoggingEvent.info("Received ResultUploadTimeoutEvent [chainTaskId:{}] ", CHAIN_TASK_ID))); + assertThat(output.getOut()) + .contains("Received ResultUploadTimeoutEvent [chainTaskId:" + CHAIN_TASK_ID + "]"); } /**