From 3892e889ad145cedab598bbeafdaff01922abc59 Mon Sep 17 00:00:00 2001 From: Jeremy Bernard Date: Fri, 15 Mar 2024 08:53:22 +0100 Subject: [PATCH 1/2] Move task metrics from `TaskUpdateManager` to `TaskService` --- .../java/com/iexec/core/task/TaskService.java | 110 ++++++++++++++--- .../core/task/update/TaskUpdateManager.java | 90 +------------- .../com/iexec/core/task/TaskServiceTests.java | 116 +++++++++++++++--- .../task/update/TaskUpdateManagerTest.java | 104 ++-------------- 4 files changed, 206 insertions(+), 214 deletions(-) diff --git a/src/main/java/com/iexec/core/task/TaskService.java b/src/main/java/com/iexec/core/task/TaskService.java index 5d9afbc0..286b0f4d 100644 --- a/src/main/java/com/iexec/core/task/TaskService.java +++ b/src/main/java/com/iexec/core/task/TaskService.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2023 IEXEC BLOCKCHAIN TECH + * Copyright 2020-2024 IEXEC BLOCKCHAIN TECH * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,10 +21,15 @@ import com.iexec.commons.poco.tee.TeeUtils; import com.iexec.core.chain.IexecHubService; import com.iexec.core.replicate.ReplicatesList; +import com.iexec.core.task.event.TaskCreatedEvent; +import com.iexec.core.task.event.TaskStatusesCountUpdatedEvent; import com.mongodb.client.result.UpdateResult; import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.Metrics; import lombok.extern.slf4j.Slf4j; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.event.EventListener; import org.springframework.dao.DuplicateKeyException; import org.springframework.data.domain.Sort; import org.springframework.data.mongodb.core.MongoTemplate; @@ -34,10 +39,11 @@ import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; -import java.util.Arrays; -import java.util.Date; -import java.util.List; -import java.util.Optional; +import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; import java.util.stream.Collectors; import static com.iexec.core.task.TaskStatus.*; @@ -48,23 +54,60 @@ public class TaskService { private static final String CHAIN_TASK_ID_FIELD = "chainTaskId"; public static final String METRIC_TASKS_COMPLETED_COUNT = "iexec.core.tasks.completed"; + public static final String METRIC_TASKS_STATUSES_COUNT = "iexec.core.tasks.count"; private final MongoTemplate mongoTemplate; private final TaskRepository taskRepository; private final IexecHubService iexecHubService; + private final ApplicationEventPublisher applicationEventPublisher; private final Counter completedTasksCounter; + private final LinkedHashMap currentTaskStatusesCount; public TaskService(MongoTemplate mongoTemplate, TaskRepository taskRepository, - IexecHubService iexecHubService) { + IexecHubService iexecHubService, + ApplicationEventPublisher applicationEventPublisher) { this.mongoTemplate = mongoTemplate; this.taskRepository = taskRepository; this.iexecHubService = iexecHubService; + this.applicationEventPublisher = applicationEventPublisher; + + this.currentTaskStatusesCount = Arrays.stream(TaskStatus.values()) + .collect(Collectors.toMap( + Function.identity(), + status -> new AtomicLong(), + (a, b) -> b, + LinkedHashMap::new)); + + for (TaskStatus status : TaskStatus.values()) { + Gauge.builder(METRIC_TASKS_STATUSES_COUNT, () -> currentTaskStatusesCount.get(status).get()) + .tags( + "period", "current", + "status", status.name() + ).register(Metrics.globalRegistry); + } this.completedTasksCounter = Metrics.counter(METRIC_TASKS_COMPLETED_COUNT); } @PostConstruct void init() { completedTasksCounter.increment(findByCurrentStatus(TaskStatus.COMPLETED).size()); + final ExecutorService taskStatusesCountExecutor = Executors.newSingleThreadExecutor(); + taskStatusesCountExecutor.submit(this::initializeCurrentTaskStatusesCount); + taskStatusesCountExecutor.shutdown(); + } + + /** + * The following could take a bit of time, depending on how many tasks are in DB. + * It is expected to take ~1.7s for 1,000,000 tasks and to be linear (so, ~17s for 10,000,000 tasks). + * As we use AtomicLongs, the final count should be accurate - no race conditions to expect, + * even though new deals are detected during the count. + */ + private void initializeCurrentTaskStatusesCount() { + currentTaskStatusesCount + .entrySet() + .parallelStream() + .forEach(entry -> entry.getValue().addAndGet(countByCurrentStatus(entry.getKey()))); + publishTaskStatusesCountUpdate(); } /** @@ -103,14 +146,12 @@ public Optional addTask( newTask.setContributionDeadline(contributionDeadline); try { newTask = taskRepository.save(newTask); - log.info("Added new task [chainDealId:{}, taskIndex:{}, imageName:{}, " + - "commandLine:{}, trust:{}, chainTaskId:{}]", chainDealId, - taskIndex, imageName, commandLine, trust, newTask.getChainTaskId()); + log.info("Added new task [chainDealId:{}, taskIndex:{}, imageName:{}, commandLine:{}, trust:{}, chainTaskId:{}]", + chainDealId, taskIndex, imageName, commandLine, trust, newTask.getChainTaskId()); return Optional.of(newTask); } catch (DuplicateKeyException e) { - log.info("Task already added [chainDealId:{}, taskIndex:{}, " + - "imageName:{}, commandLine:{}, trust:{}]", chainDealId, - taskIndex, imageName, commandLine, trust); + log.info("Task already added [chainDealId:{}, taskIndex:{}, imageName:{}, commandLine:{}, trust:{}]", + chainDealId, taskIndex, imageName, commandLine, trust); return Optional.empty(); } } @@ -135,7 +176,7 @@ public Optional updateTask(Task task) { return optionalTask; } - public Optional updateTaskStatus(Task task, List statusChanges) { + public long updateTaskStatus(Task task, TaskStatus currentStatus, List statusChanges) { Update update = Update.update("currentStatus", task.getCurrentStatus()); update.push("dateStatusList").each(statusChanges); UpdateResult result = mongoTemplate.updateFirst( @@ -143,7 +184,8 @@ public Optional updateTaskStatus(Task task, List statusC update, Task.class); log.debug("Updated chainTaskId [chainTaskId:{}, result:{}]", task.getChainTaskId(), result); - return getTaskByChainTaskId(task.getChainTaskId()); + updateMetricsAfterStatusUpdate(currentStatus, task.getCurrentStatus()); + return result.getModifiedCount(); } public void updateTask(String chainTaskId, Update update) { @@ -272,19 +314,23 @@ public Date getTaskFinalDeadline(String chainTaskId) { } public boolean isConsensusReached(ReplicatesList replicatesList) { - Optional optional = iexecHubService.getChainTask(replicatesList.getChainTaskId()); - if (optional.isEmpty()) { + final ChainTask chainTask = iexecHubService.getChainTask(replicatesList.getChainTaskId()).orElse(null); + if (chainTask == null) { + log.error("Consensus not reached, task not found on-chain [chainTaskId:{}]", replicatesList.getChainTaskId()); return false; } - final ChainTask chainTask = optional.get(); - boolean isChainTaskRevealing = chainTask.getStatus().equals(ChainTaskStatus.REVEALING); + boolean isChainTaskRevealing = chainTask.getStatus() == ChainTaskStatus.REVEALING; if (!isChainTaskRevealing) { + log.debug("Consensus not reached, on-chain task is not in REVEALING status [chainTaskId:{}]", + replicatesList.getChainTaskId()); return false; } int onChainWinners = chainTask.getWinnerCounter(); int offChainWinners = replicatesList.getNbValidContributedWinners(chainTask.getConsensusValue()); + log.debug("Returning off-chain and on-chain winners [offChainWinners:{}, onChainWinners:{}]", + offChainWinners, onChainWinners); return offChainWinners >= onChainWinners; } @@ -295,4 +341,32 @@ public long getCompletedTasksCount() { public long countByCurrentStatus(TaskStatus status) { return taskRepository.countByCurrentStatus(status); } + + void updateMetricsAfterStatusUpdate(TaskStatus previousStatus, TaskStatus newStatus) { + currentTaskStatusesCount.get(previousStatus).decrementAndGet(); + currentTaskStatusesCount.get(newStatus).incrementAndGet(); + publishTaskStatusesCountUpdate(); + } + + @EventListener(TaskCreatedEvent.class) + void onTaskCreatedEvent() { + currentTaskStatusesCount.get(RECEIVED).incrementAndGet(); + publishTaskStatusesCountUpdate(); + } + + private void publishTaskStatusesCountUpdate() { + // Copying the map here ensures the original values can't be updated from outside this class. + // As this data should be read only, no need for any atomic class. + final LinkedHashMap currentTaskStatusesCountToPublish = currentTaskStatusesCount + .entrySet() + .stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + entrySet -> entrySet.getValue().get(), + (a, b) -> b, + LinkedHashMap::new + )); + final TaskStatusesCountUpdatedEvent event = new TaskStatusesCountUpdatedEvent(currentTaskStatusesCountToPublish); + applicationEventPublisher.publishEvent(event); + } } diff --git a/src/main/java/com/iexec/core/task/update/TaskUpdateManager.java b/src/main/java/com/iexec/core/task/update/TaskUpdateManager.java index c8ae530a..f0d9b45c 100644 --- a/src/main/java/com/iexec/core/task/update/TaskUpdateManager.java +++ b/src/main/java/com/iexec/core/task/update/TaskUpdateManager.java @@ -35,21 +35,15 @@ import com.iexec.core.task.event.*; import com.iexec.core.worker.Worker; import com.iexec.core.worker.WorkerService; -import io.micrometer.core.instrument.Gauge; -import io.micrometer.core.instrument.Metrics; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationEventPublisher; -import org.springframework.context.event.EventListener; import org.springframework.data.mongodb.core.query.Update; import org.springframework.stereotype.Service; -import javax.annotation.PostConstruct; -import java.util.*; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Function; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Optional; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -59,7 +53,6 @@ @Service @Slf4j class TaskUpdateManager { - public static final String METRIC_TASKS_STATUSES_COUNT = "iexec.core.tasks.count"; private final TaskService taskService; private final IexecHubService iexecHubService; @@ -69,8 +62,6 @@ class TaskUpdateManager { private final BlockchainAdapterService blockchainAdapterService; private final SmsService smsService; - private final LinkedHashMap currentTaskStatusesCount; - public TaskUpdateManager(TaskService taskService, IexecHubService iexecHubService, ReplicatesService replicatesService, @@ -85,46 +76,6 @@ public TaskUpdateManager(TaskService taskService, this.workerService = workerService; this.blockchainAdapterService = blockchainAdapterService; this.smsService = smsService; - - this.currentTaskStatusesCount = Arrays.stream(TaskStatus.values()) - .collect(Collectors.toMap( - Function.identity(), - status -> new AtomicLong(), - (a, b) -> b, - LinkedHashMap::new)); - - for (TaskStatus status : TaskStatus.values()) { - Gauge.builder(METRIC_TASKS_STATUSES_COUNT, () -> currentTaskStatusesCount.get(status).get()) - .tags( - "period", "current", - "status", status.name() - ).register(Metrics.globalRegistry); - } - } - - @PostConstruct - Future init() { - final ExecutorService taskStatusesCountExecutor = Executors.newSingleThreadExecutor(); - final Future future = taskStatusesCountExecutor.submit( - this::initializeCurrentTaskStatusesCount, - null // Trick to get a `Future` instead of a `Future` - ); - taskStatusesCountExecutor.shutdown(); - return future; - } - - /** - * The following could take a bit of time, depending on how many tasks are in DB. - * It is expected to take ~1.7s for 1,000,000 tasks and to be linear (so, ~17s for 10,000,000 tasks). - * As we use AtomicLongs, the final count should be accurate - no race conditions to expect, - * even though new deals are detected during the count. - */ - private void initializeCurrentTaskStatusesCount() { - currentTaskStatusesCount - .entrySet() - .parallelStream() - .forEach(entry -> entry.getValue().addAndGet(taskService.countByCurrentStatus(entry.getKey()))); - publishTaskStatusesCountUpdate(); } void updateTask(String chainTaskId) { @@ -243,10 +194,9 @@ void updateTaskStatusAndSave(Task task, TaskStatus newStatus, ChainReceipt chain * @param statusChanges List of changes */ void saveTask(Task task, TaskStatus currentStatus, List statusChanges) { - Optional savedTask = taskService.updateTaskStatus(task, statusChanges); + long updatedTaskCount = taskService.updateTaskStatus(task, currentStatus, statusChanges); // `savedTask.isPresent()` should always be true if the task exists in the repository. - if (savedTask.isPresent()) { - updateMetricsAfterStatusUpdate(currentStatus, task.getCurrentStatus()); + if (updatedTaskCount != 0L) { log.info("UpdateTaskStatus succeeded [chainTaskId:{}, currentStatus:{}, newStatus:{}]", task.getChainTaskId(), currentStatus, task.getCurrentStatus()); } else { @@ -762,32 +712,4 @@ void emitError(Task task, TaskStatus expectedStatus, String methodName) { log.error("Cannot initialize task [chainTaskId:{}, currentStatus:{}, expectedStatus:{}, method:{}]", task.getChainTaskId(), task.getCurrentStatus(), expectedStatus, methodName); } - - void updateMetricsAfterStatusUpdate(TaskStatus previousStatus, TaskStatus newStatus) { - currentTaskStatusesCount.get(previousStatus).decrementAndGet(); - currentTaskStatusesCount.get(newStatus).incrementAndGet(); - publishTaskStatusesCountUpdate(); - } - - @EventListener(TaskCreatedEvent.class) - void onTaskCreatedEvent() { - currentTaskStatusesCount.get(RECEIVED).incrementAndGet(); - publishTaskStatusesCountUpdate(); - } - - private void publishTaskStatusesCountUpdate() { - // Copying the map here ensures the original values can't be updated from outside this class. - // As this data should be read only, no need for any atomic class. - final LinkedHashMap currentTaskStatusesCountToPublish = currentTaskStatusesCount - .entrySet() - .stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - entrySet -> entrySet.getValue().get(), - (a, b) -> b, - LinkedHashMap::new - )); - final TaskStatusesCountUpdatedEvent event = new TaskStatusesCountUpdatedEvent(currentTaskStatusesCountToPublish); - applicationEventPublisher.publishEvent(event); - } } diff --git a/src/test/java/com/iexec/core/task/TaskServiceTests.java b/src/test/java/com/iexec/core/task/TaskServiceTests.java index d54b49a5..ac63b9ce 100644 --- a/src/test/java/com/iexec/core/task/TaskServiceTests.java +++ b/src/test/java/com/iexec/core/task/TaskServiceTests.java @@ -21,8 +21,9 @@ import com.iexec.commons.poco.chain.ChainUtils; import com.iexec.core.chain.IexecHubService; import com.iexec.core.replicate.ReplicatesList; -import com.iexec.core.replicate.ReplicatesService; +import com.iexec.core.task.event.TaskStatusesCountUpdatedEvent; import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import org.junit.jupiter.api.AfterEach; @@ -34,31 +35,33 @@ import org.mockito.MockitoAnnotations; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.dao.DuplicateKeyException; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.test.context.DynamicPropertyRegistry; import org.springframework.test.context.DynamicPropertySource; +import org.springframework.test.util.ReflectionTestUtils; import org.testcontainers.containers.MongoDBContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; +import org.web3j.utils.Numeric; +import java.math.BigInteger; import java.time.Instant; import java.time.temporal.ChronoUnit; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Optional; +import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import static com.iexec.core.task.TaskStatus.COMPLETED; -import static com.iexec.core.task.TaskStatus.INITIALIZED; +import static com.iexec.core.task.TaskService.METRIC_TASKS_STATUSES_COUNT; +import static com.iexec.core.task.TaskStatus.*; import static com.iexec.core.task.TaskTestsUtils.*; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; @DataMongoTest @Testcontainers @@ -82,8 +85,7 @@ static void registerProperties(DynamicPropertyRegistry registry) { private TaskRepository taskRepository; @Mock - private ReplicatesService replicatesService; - + private ApplicationEventPublisher applicationEventPublisher; @Mock private IexecHubService iexecHubService; @@ -97,9 +99,8 @@ static void initRegistry() { @BeforeEach void init() { MockitoAnnotations.openMocks(this); - taskService = new TaskService(mongoTemplate, taskRepository, iexecHubService); - taskService.init(); taskRepository.deleteAll(); + taskService = new TaskService(mongoTemplate, taskRepository, iexecHubService, applicationEventPublisher); } @AfterEach @@ -344,7 +345,6 @@ void shouldConsensusNotBeReachedAsUnknownTask() { .isFalse(); Mockito.verify(iexecHubService).getChainTask(any()); - Mockito.verifyNoInteractions(replicatesService); } @Test @@ -362,7 +362,6 @@ void shouldConsensusNotBeReachedAsNotRevealing() { .isFalse(); Mockito.verify(iexecHubService).getChainTask(any()); - Mockito.verifyNoInteractions(replicatesService); } @Test @@ -406,7 +405,7 @@ void shouldConsensusBeReached() { } // endregion - // region getCompletedTasksCount + // region metrics @Test void shouldGet0CompletedTasksCountWhenNoTaskCompleted() { final long completedTasksCount = taskService.getCompletedTasksCount(); @@ -425,5 +424,92 @@ void shouldGet3CompletedTasksCount() { final long completedTasksCount = taskService.getCompletedTasksCount(); assertThat(completedTasksCount).isEqualTo(3); } + + @Test + void shouldBuildGaugesAndFireEvent() { + final List tasks = new ArrayList<>(); + BigInteger taskId = BigInteger.ZERO; + for (final TaskStatus status : TaskStatus.values()) { + // Give a unique initial count for each status + taskId = taskId.add(BigInteger.ONE); + final Task task = new Task(Numeric.toHexStringWithPrefix(taskId), 0, "", "", 0, 0, "0x0"); + task.setChainTaskId(Numeric.toHexStringWithPrefix(taskId)); + task.changeStatus(status); + tasks.add(task); + } + taskRepository.saveAll(tasks); + + taskService.init(); + + verify(applicationEventPublisher, timeout(1000L)).publishEvent(any(TaskStatusesCountUpdatedEvent.class)); + for (final TaskStatus status : TaskStatus.values()) { + final Gauge gauge = getCurrentTasksCountGauge(status); + assertThat(gauge).isNotNull() + // Check the gauge value is equal to the unique count for each status + .extracting(Gauge::value) + .isEqualTo(1.0); + } + } + // endregion + + // region onTaskCreatedEvent + @Test + void shouldIncrementCurrentReceivedGaugeWhenTaskReceived() { + taskService.init(); + final Gauge currentReceivedTasks = getCurrentTasksCountGauge(RECEIVED); + assertThat(currentReceivedTasks.value()).isZero(); + taskService.onTaskCreatedEvent(); + assertThat(currentReceivedTasks.value()).isOne(); + } + + @Test + void shouldUpdateCurrentReceivedCountAndFireEvent() { + final LinkedHashMap currentTaskStatusesCount = new LinkedHashMap<>(); + currentTaskStatusesCount.put(RECEIVED, new AtomicLong(0L)); + ReflectionTestUtils.setField(taskService, "currentTaskStatusesCount", currentTaskStatusesCount); + + taskService.onTaskCreatedEvent(); + + assertThat(currentTaskStatusesCount.get(RECEIVED).get()).isOne(); + verify(applicationEventPublisher).publishEvent(any(TaskStatusesCountUpdatedEvent.class)); + } // endregion + + // region updateMetricsAfterStatusUpdate + @Test + void shouldUpdateMetricsAfterStatusUpdate() throws ExecutionException, InterruptedException { + Task receivedTask = new Task("", "", 0); + taskRepository.save(receivedTask); + + // Init gauges + taskService.init(); + // Called a first time during init + verify(applicationEventPublisher, timeout(1000L)).publishEvent(any(TaskStatusesCountUpdatedEvent.class)); + + final Gauge currentReceivedTasks = getCurrentTasksCountGauge(RECEIVED); + final Gauge currentInitializingTasks = getCurrentTasksCountGauge(INITIALIZING); + + assertThat(currentReceivedTasks.value()).isOne(); + assertThat(currentInitializingTasks.value()).isZero(); + + taskService.updateMetricsAfterStatusUpdate(RECEIVED, INITIALIZING); + + assertThat(currentReceivedTasks.value()).isZero(); + assertThat(currentInitializingTasks.value()).isOne(); + // Called a second time during update + verify(applicationEventPublisher, times(2)).publishEvent(any(TaskStatusesCountUpdatedEvent.class)); + } + // endregion + + // region util + Gauge getCurrentTasksCountGauge(TaskStatus status) { + return Metrics.globalRegistry + .find(METRIC_TASKS_STATUSES_COUNT) + .tags( + "period", "current", + "status", status.name() + ).gauge(); + } + // endregion + } diff --git a/src/test/java/com/iexec/core/task/update/TaskUpdateManagerTest.java b/src/test/java/com/iexec/core/task/update/TaskUpdateManagerTest.java index 8b889280..edb10763 100644 --- a/src/test/java/com/iexec/core/task/update/TaskUpdateManagerTest.java +++ b/src/test/java/com/iexec/core/task/update/TaskUpdateManagerTest.java @@ -38,10 +38,8 @@ import com.iexec.core.task.TaskService; import com.iexec.core.task.TaskStatus; import com.iexec.core.task.event.PleaseUploadEvent; -import com.iexec.core.task.event.TaskStatusesCountUpdatedEvent; import com.iexec.core.worker.Worker; import com.iexec.core.worker.WorkerService; -import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import org.junit.jupiter.api.AfterEach; @@ -53,20 +51,19 @@ import org.springframework.boot.test.system.CapturedOutput; import org.springframework.boot.test.system.OutputCaptureExtension; import org.springframework.context.ApplicationEventPublisher; -import org.springframework.test.util.ReflectionTestUtils; import java.math.BigInteger; import java.time.Instant; import java.time.temporal.ChronoUnit; -import java.util.*; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicLong; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; import static com.iexec.common.replicate.ReplicateStatus.RESULT_UPLOAD_REQUESTED; import static com.iexec.core.task.TaskStatus.*; import static com.iexec.core.task.TaskTestsUtils.*; -import static com.iexec.core.task.update.TaskUpdateManager.METRIC_TASKS_STATUSES_COUNT; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; @@ -131,28 +128,6 @@ void afterEach() { Metrics.globalRegistry.clear(); } - // region init - @Test - void shouldBuildGaugesAndFireEvent() throws ExecutionException, InterruptedException { - for (final TaskStatus status : TaskStatus.values()) { - // Give a unique initial count for each status - when(taskService.countByCurrentStatus(status)).thenReturn((long) status.ordinal()); - } - - taskUpdateManager.init().get(); - - for (final TaskStatus status : TaskStatus.values()) { - final Gauge gauge = getCurrentTasksCountGauge(status); - assertThat(gauge).isNotNull() - // Check the gauge value is equal to the unique count for each status - .extracting(Gauge::value) - .isEqualTo(((double) status.ordinal())); - } - - verify(applicationEventPublisher, times(1)).publishEvent(any(TaskStatusesCountUpdatedEvent.class)); - } - // endregion - // region consensusReached2Reopening @Test @@ -455,7 +430,7 @@ void shouldUpdateReceived2Initializing2InitializedOnStandard() { assertThat(task.getEnclaveChallenge()).isEqualTo(BytesUtils.EMPTY_ADDRESS); assertThat(task.getSmsUrl()).isNull(); verify(smsService, times(0)).getVerifiedSmsUrl(anyString(), anyString()); - verify(taskService, times(2)).updateTaskStatus(any(), any()); // INITIALIZING & INITIALIZED + verify(taskService, times(2)).updateTaskStatus(any(), any(), any()); // INITIALIZING & INITIALIZED } @Test @@ -491,7 +466,7 @@ void shouldUpdateReceived2Initializing2InitializedOnTee() { assertThat(task.getEnclaveChallenge()).isEqualTo(BytesUtils.EMPTY_ADDRESS); assertThat(task.getSmsUrl()).isEqualTo(smsUrl); verify(smsService, times(1)).getVerifiedSmsUrl(CHAIN_TASK_ID, tag); - verify(taskService, times(2)).updateTaskStatus(any(), any()); // INITIALIZING & INITIALIZED + verify(taskService, times(2)).updateTaskStatus(any(), any(), any()); // INITIALIZING & INITIALIZED } @Test @@ -527,7 +502,7 @@ void shouldNotUpdateReceived2Initializing2InitializedOnTeeSinceCannotRetrieveSms assertThat(task.getSmsUrl()).isNull(); verify(smsService, times(1)).getVerifiedSmsUrl(CHAIN_TASK_ID, tag); verify(smsService, times(0)).getEnclaveChallenge(anyString(), anyString()); - verify(taskService, times(1)).updateTaskStatus(any(), any()); // INITIALIZE_FAILED & FAILED + verify(taskService, times(1)).updateTaskStatus(any(), any(), any()); // INITIALIZE_FAILED & FAILED } // endregion @@ -2019,62 +1994,6 @@ void shouldTriggerUpdateTaskAsynchronously() { // endregion - // region onTaskCreatedEvent - @Test - void shouldIncrementCurrentReceivedGaugeWhenTaskReceived() { - when(taskService.countByCurrentStatus(RECEIVED)).thenReturn(0L); - - // Init gauges - taskUpdateManager.init(); - - final Gauge currentReceivedTasks = getCurrentTasksCountGauge(RECEIVED); - assertThat(currentReceivedTasks.value()).isZero(); - - taskUpdateManager.onTaskCreatedEvent(); - - assertThat(currentReceivedTasks.value()).isOne(); - } - // endregion - - // region updateMetricsAfterStatusUpdate - @Test - void shouldUpdateMetricsAfterStatusUpdate() throws ExecutionException, InterruptedException { - when(taskService.countByCurrentStatus(RECEIVED)).thenReturn(1L); - when(taskService.countByCurrentStatus(INITIALIZING)).thenReturn(0L); - - // Init gauges - taskUpdateManager.init().get(); - - final Gauge currentReceivedTasks = getCurrentTasksCountGauge(RECEIVED); - final Gauge currentInitializingTasks = getCurrentTasksCountGauge(INITIALIZING); - - assertThat(currentReceivedTasks.value()).isOne(); - assertThat(currentInitializingTasks.value()).isZero(); - - taskUpdateManager.updateMetricsAfterStatusUpdate(RECEIVED, INITIALIZING); - - assertThat(currentReceivedTasks.value()).isZero(); - assertThat(currentInitializingTasks.value()).isOne(); - // Called a first time during init, then a second time during update - verify(applicationEventPublisher, times(2)).publishEvent(any(TaskStatusesCountUpdatedEvent.class)); - } - // endregion - - // region onTaskCreatedEvent - @Test - void shouldUpdateCurrentReceivedCountAndFireEvent() { - final AtomicLong receivedCount = - (AtomicLong) ((LinkedHashMap) ReflectionTestUtils.getField(taskUpdateManager, "currentTaskStatusesCount")) - .get(RECEIVED); - final long initialCount = receivedCount.get(); - - taskUpdateManager.onTaskCreatedEvent(); - - assertThat(receivedCount.get() - initialCount).isOne(); - verify(applicationEventPublisher, times(1)).publishEvent(any(TaskStatusesCountUpdatedEvent.class)); - } - // endregion - // region utils private void mockTaskDescriptionFromTask(Task task) { final TaskDescription taskDescription = TaskDescription.builder() @@ -2085,14 +2004,5 @@ private void mockTaskDescriptionFromTask(Task task) { .build(); when(iexecHubService.getTaskDescription(task.getChainTaskId())).thenReturn(taskDescription); } - - Gauge getCurrentTasksCountGauge(TaskStatus status) { - return Metrics.globalRegistry - .find(METRIC_TASKS_STATUSES_COUNT) - .tags( - "period", "current", - "status", status.name() - ).gauge(); - } // endregion } From 7b5dc0e5395ed754ac8340e38d8e83cf643b2ae3 Mon Sep 17 00:00:00 2001 From: Jeremy Bernard Date: Fri, 15 Mar 2024 11:46:11 +0100 Subject: [PATCH 2/2] Upate CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 66760443..552795c7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ All notable changes to this project will be documented in this file. - Check result has been uploaded for TEE tasks. (#672) - Check for consensus early if a worker has already `CONTRIBUTED` when the task is updated to `RUNNING`. (#673) - Always provide a `WorkerpoolAuthorization` to a worker during its recovery. (#674) +- Move task metrics from `TaskUpdateManager` to `TaskService`. (#676) ### Quality