Skip to content

Commit

Permalink
Move task metrics from TaskUpdateManager to TaskService (#676)
Browse files Browse the repository at this point in the history
  • Loading branch information
jbern0rd authored Mar 15, 2024
1 parent 475a2e2 commit ccd544a
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 214 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ All notable changes to this project will be documented in this file.
- Check result has been uploaded for TEE tasks. (#672)
- Check for consensus early if a worker has already `CONTRIBUTED` when the task is updated to `RUNNING`. (#673)
- Always provide a `WorkerpoolAuthorization` to a worker during its recovery. (#674)
- Move task metrics from `TaskUpdateManager` to `TaskService`. (#676)

### Quality

Expand Down
110 changes: 92 additions & 18 deletions src/main/java/com/iexec/core/task/TaskService.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 IEXEC BLOCKCHAIN TECH
* Copyright 2020-2024 IEXEC BLOCKCHAIN TECH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,10 +21,15 @@
import com.iexec.commons.poco.tee.TeeUtils;
import com.iexec.core.chain.IexecHubService;
import com.iexec.core.replicate.ReplicatesList;
import com.iexec.core.task.event.TaskCreatedEvent;
import com.iexec.core.task.event.TaskStatusesCountUpdatedEvent;
import com.mongodb.client.result.UpdateResult;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Metrics;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoTemplate;
Expand All @@ -34,10 +39,11 @@
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;

import static com.iexec.core.task.TaskStatus.*;
Expand All @@ -48,23 +54,60 @@ public class TaskService {

// Name of the task document field holding the chain task id; used as the query criterion when updating a task.
private static final String CHAIN_TASK_ID_FIELD = "chainTaskId";
// Metric name of the completed tasks counter.
public static final String METRIC_TASKS_COMPLETED_COUNT = "iexec.core.tasks.completed";
// Metric name shared by the per-status gauges; each gauge is distinguished by its "status" tag.
public static final String METRIC_TASKS_STATUSES_COUNT = "iexec.core.tasks.count";
private final MongoTemplate mongoTemplate;
private final TaskRepository taskRepository;
private final IexecHubService iexecHubService;
private final ApplicationEventPublisher applicationEventPublisher;
private final Counter completedTasksCounter;
// Number of tasks per status; populated in the constructor with one AtomicLong per TaskStatus value.
private final LinkedHashMap<TaskStatus, AtomicLong> currentTaskStatusesCount;

/**
 * Stores the collaborators and registers the task metrics.
 * <p>
 * One zero-initialised {@link AtomicLong} is created per {@link TaskStatus} value, and one gauge per
 * status is registered on the global registry under {@code METRIC_TASKS_STATUSES_COUNT}, tagged with
 * {@code period=current} and the status name. The completed tasks counter is created last.
 */
public TaskService(MongoTemplate mongoTemplate,
TaskRepository taskRepository,
IexecHubService iexecHubService) {
IexecHubService iexecHubService,
ApplicationEventPublisher applicationEventPublisher) {
this.mongoTemplate = mongoTemplate;
this.taskRepository = taskRepository;
this.iexecHubService = iexecHubService;
this.applicationEventPublisher = applicationEventPublisher;

// LinkedHashMap keeps the entries in TaskStatus.values() order.
// The merge function is required by this toMap overload but is never hit: enum values are distinct keys.
this.currentTaskStatusesCount = Arrays.stream(TaskStatus.values())
.collect(Collectors.toMap(
Function.identity(),
status -> new AtomicLong(),
(a, b) -> b,
LinkedHashMap::new));

// Each gauge is given a supplier reading the counter of its own status.
for (TaskStatus status : TaskStatus.values()) {
Gauge.builder(METRIC_TASKS_STATUSES_COUNT, () -> currentTaskStatusesCount.get(status).get())
.tags(
"period", "current",
"status", status.name()
).register(Metrics.globalRegistry);
}
this.completedTasksCounter = Metrics.counter(METRIC_TASKS_COMPLETED_COUNT);
}

/**
 * Seeds the task metrics from the database once the bean has been constructed.
 * <p>
 * The completed tasks counter is seeded synchronously. The per-status counts are computed on a
 * dedicated single-thread executor, which is shut down as soon as the job has been handed over.
 */
@PostConstruct
void init() {
    // Ask the repository for a count instead of loading every COMPLETED task only to call size() on the list.
    completedTasksCounter.increment(countByCurrentStatus(TaskStatus.COMPLETED));
    final ExecutorService taskStatusesCountExecutor = Executors.newSingleThreadExecutor();
    taskStatusesCountExecutor.execute(() -> {
        try {
            initializeCurrentTaskStatusesCount();
        } catch (RuntimeException e) {
            // The job has no caller to report to, so a failure must be logged here or it is lost.
            log.error("Failed to initialize current task statuses count", e);
        }
    });
    // shutdown() does not cancel the queued job: it runs to completion, then the worker thread is released.
    taskStatusesCountExecutor.shutdown();
}

/**
 * Adds the number of tasks stored for each status to the matching counter, then publishes the counts.
 * <p>
 * This can take some time depending on how many tasks are in DB: the figure reported so far is
 * ~1.7s for 1,000,000 tasks, growing linearly (so ~17s for 10,000,000 tasks).
 * Counts are added to the counters with {@code addAndGet} rather than written over them, so
 * increments and decrements applied while the count is running are not discarded.
 */
private void initializeCurrentTaskStatusesCount() {
    currentTaskStatusesCount.entrySet().parallelStream().forEach(statusAndCounter -> {
        final long storedTasks = countByCurrentStatus(statusAndCounter.getKey());
        statusAndCounter.getValue().addAndGet(storedTasks);
    });
    publishTaskStatusesCountUpdate();
}

/**
Expand Down Expand Up @@ -103,14 +146,12 @@ public Optional<Task> addTask(
newTask.setContributionDeadline(contributionDeadline);
try {
newTask = taskRepository.save(newTask);
log.info("Added new task [chainDealId:{}, taskIndex:{}, imageName:{}, " +
"commandLine:{}, trust:{}, chainTaskId:{}]", chainDealId,
taskIndex, imageName, commandLine, trust, newTask.getChainTaskId());
log.info("Added new task [chainDealId:{}, taskIndex:{}, imageName:{}, commandLine:{}, trust:{}, chainTaskId:{}]",
chainDealId, taskIndex, imageName, commandLine, trust, newTask.getChainTaskId());
return Optional.of(newTask);
} catch (DuplicateKeyException e) {
log.info("Task already added [chainDealId:{}, taskIndex:{}, " +
"imageName:{}, commandLine:{}, trust:{}]", chainDealId,
taskIndex, imageName, commandLine, trust);
log.info("Task already added [chainDealId:{}, taskIndex:{}, imageName:{}, commandLine:{}, trust:{}]",
chainDealId, taskIndex, imageName, commandLine, trust);
return Optional.empty();
}
}
Expand All @@ -135,15 +176,16 @@ public Optional<Task> updateTask(Task task) {
return optionalTask;
}

/**
 * Writes the task's current status to the database, appends the given status changes to its
 * {@code dateStatusList}, then updates the task status metrics.
 *
 * @param task          task whose chain task id selects the document and whose current status is written
 * @param currentStatus status passed to the metrics update as the previous status
 * @param statusChanges status changes pushed onto {@code dateStatusList}
 */
public Optional<Task> updateTaskStatus(Task task, List<TaskStatusChange> statusChanges) {
public long updateTaskStatus(Task task, TaskStatus currentStatus, List<TaskStatusChange> statusChanges) {
// Status change and history append are sent as a single update on the first matching document.
Update update = Update.update("currentStatus", task.getCurrentStatus());
update.push("dateStatusList").each(statusChanges);
UpdateResult result = mongoTemplate.updateFirst(
Query.query(Criteria.where(CHAIN_TASK_ID_FIELD).is(task.getChainTaskId())),
update,
Task.class);
log.debug("Updated chainTaskId [chainTaskId:{}, result:{}]", task.getChainTaskId(), result);
return getTaskByChainTaskId(task.getChainTaskId());
// NOTE(review): the metrics are updated without looking at `result`. If no document was modified,
// the per-status counters drift from the database. Consider guarding this call with
// `result.getModifiedCount() > 0` - TODO confirm against callers.
updateMetricsAfterStatusUpdate(currentStatus, task.getCurrentStatus());
return result.getModifiedCount();
}

public void updateTask(String chainTaskId, Update update) {
Expand Down Expand Up @@ -272,19 +314,23 @@ public Date getTaskFinalDeadline(String chainTaskId) {
}

/**
 * Tells whether the replicates known off-chain account for the consensus recorded on-chain.
 *
 * @param replicatesList replicates of the task; its chain task id is used to fetch the on-chain task
 * @return {@code false} if the task is not found on-chain or is not in REVEALING status; otherwise
 *         whether the off-chain winners are at least as many as the on-chain winners
 */
public boolean isConsensusReached(ReplicatesList replicatesList) {
Optional<ChainTask> optional = iexecHubService.getChainTask(replicatesList.getChainTaskId());
if (optional.isEmpty()) {
final ChainTask chainTask = iexecHubService.getChainTask(replicatesList.getChainTaskId()).orElse(null);
if (chainTask == null) {
log.error("Consensus not reached, task not found on-chain [chainTaskId:{}]", replicatesList.getChainTaskId());
return false;
}

final ChainTask chainTask = optional.get();
boolean isChainTaskRevealing = chainTask.getStatus().equals(ChainTaskStatus.REVEALING);
boolean isChainTaskRevealing = chainTask.getStatus() == ChainTaskStatus.REVEALING;
if (!isChainTaskRevealing) {
log.debug("Consensus not reached, on-chain task is not in REVEALING status [chainTaskId:{}]",
replicatesList.getChainTaskId());
return false;
}

// Off-chain winners are the valid contributions matching the consensus value stored on-chain.
int onChainWinners = chainTask.getWinnerCounter();
int offChainWinners = replicatesList.getNbValidContributedWinners(chainTask.getConsensusValue());
log.debug("Returning off-chain and on-chain winners [offChainWinners:{}, onChainWinners:{}]",
offChainWinners, onChainWinners);
return offChainWinners >= onChainWinners;
}

Expand All @@ -295,4 +341,32 @@ public long getCompletedTasksCount() {
/**
 * Counts the tasks whose current status is the given one.
 *
 * @param status status to look for
 * @return number of matching tasks, as returned by the task repository
 */
public long countByCurrentStatus(TaskStatus status) {
    final long matchingTasks = taskRepository.countByCurrentStatus(status);
    return matchingTasks;
}

/**
 * Moves one task from the counter of its previous status to the counter of its new status,
 * then publishes the updated counts.
 *
 * @param previousStatus status whose counter is decremented
 * @param newStatus      status whose counter is incremented
 */
void updateMetricsAfterStatusUpdate(TaskStatus previousStatus, TaskStatus newStatus) {
    final AtomicLong leftStatusCounter = currentTaskStatusesCount.get(previousStatus);
    leftStatusCounter.decrementAndGet();
    final AtomicLong enteredStatusCounter = currentTaskStatusesCount.get(newStatus);
    enteredStatusCounter.incrementAndGet();
    publishTaskStatusesCountUpdate();
}

/**
 * Counts one more task in the RECEIVED status each time a {@code TaskCreatedEvent} is received,
 * then publishes the updated counts.
 */
@EventListener(TaskCreatedEvent.class)
void onTaskCreatedEvent() {
    final AtomicLong receivedCounter = currentTaskStatusesCount.get(RECEIVED);
    receivedCounter.incrementAndGet();
    publishTaskStatusesCountUpdate();
}

/**
 * Publishes a {@code TaskStatusesCountUpdatedEvent} carrying the current count of each status.
 */
private void publishTaskStatusesCountUpdate() {
    // The event gets its own map of plain Long values, in the same order as the counters:
    // listeners can neither reach the live AtomicLong instances nor alter this class's map.
    final LinkedHashMap<TaskStatus, Long> countsSnapshot = new LinkedHashMap<>();
    for (final Map.Entry<TaskStatus, AtomicLong> statusAndCounter : currentTaskStatusesCount.entrySet()) {
        countsSnapshot.put(statusAndCounter.getKey(), statusAndCounter.getValue().get());
    }
    applicationEventPublisher.publishEvent(new TaskStatusesCountUpdatedEvent(countsSnapshot));
}
}
90 changes: 6 additions & 84 deletions src/main/java/com/iexec/core/task/update/TaskUpdateManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,15 @@
import com.iexec.core.task.event.*;
import com.iexec.core.worker.Worker;
import com.iexec.core.worker.WorkerService;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Metrics;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand All @@ -59,7 +53,6 @@
@Service
@Slf4j
class TaskUpdateManager {
public static final String METRIC_TASKS_STATUSES_COUNT = "iexec.core.tasks.count";

private final TaskService taskService;
private final IexecHubService iexecHubService;
Expand All @@ -69,8 +62,6 @@ class TaskUpdateManager {
private final BlockchainAdapterService blockchainAdapterService;
private final SmsService smsService;

private final LinkedHashMap<TaskStatus, AtomicLong> currentTaskStatusesCount;

public TaskUpdateManager(TaskService taskService,
IexecHubService iexecHubService,
ReplicatesService replicatesService,
Expand All @@ -85,46 +76,6 @@ public TaskUpdateManager(TaskService taskService,
this.workerService = workerService;
this.blockchainAdapterService = blockchainAdapterService;
this.smsService = smsService;

this.currentTaskStatusesCount = Arrays.stream(TaskStatus.values())
.collect(Collectors.toMap(
Function.identity(),
status -> new AtomicLong(),
(a, b) -> b,
LinkedHashMap::new));

for (TaskStatus status : TaskStatus.values()) {
Gauge.builder(METRIC_TASKS_STATUSES_COUNT, () -> currentTaskStatusesCount.get(status).get())
.tags(
"period", "current",
"status", status.name()
).register(Metrics.globalRegistry);
}
}

@PostConstruct
Future<Void> init() {
final ExecutorService taskStatusesCountExecutor = Executors.newSingleThreadExecutor();
final Future<Void> future = taskStatusesCountExecutor.submit(
this::initializeCurrentTaskStatusesCount,
null // Trick to get a `Future<Void>` instead of a `Future<?>`
);
taskStatusesCountExecutor.shutdown();
return future;
}

/**
* The following could take a bit of time, depending on how many tasks are in DB.
* It is expected to take ~1.7s for 1,000,000 tasks and to be linear (so, ~17s for 10,000,000 tasks).
* As we use AtomicLongs, the final count should be accurate - no race conditions to expect,
* even though new deals are detected during the count.
*/
private void initializeCurrentTaskStatusesCount() {
currentTaskStatusesCount
.entrySet()
.parallelStream()
.forEach(entry -> entry.getValue().addAndGet(taskService.countByCurrentStatus(entry.getKey())));
publishTaskStatusesCountUpdate();
}

void updateTask(String chainTaskId) {
Expand Down Expand Up @@ -243,10 +194,9 @@ void updateTaskStatusAndSave(Task task, TaskStatus newStatus, ChainReceipt chain
* @param statusChanges List of changes
*/
void saveTask(Task task, TaskStatus currentStatus, List<TaskStatusChange> statusChanges) {
Optional<Task> savedTask = taskService.updateTaskStatus(task, statusChanges);
long updatedTaskCount = taskService.updateTaskStatus(task, currentStatus, statusChanges);
// `savedTask.isPresent()` should always be true if the task exists in the repository.
if (savedTask.isPresent()) {
updateMetricsAfterStatusUpdate(currentStatus, task.getCurrentStatus());
if (updatedTaskCount != 0L) {
log.info("UpdateTaskStatus succeeded [chainTaskId:{}, currentStatus:{}, newStatus:{}]",
task.getChainTaskId(), currentStatus, task.getCurrentStatus());
} else {
Expand Down Expand Up @@ -762,32 +712,4 @@ void emitError(Task task, TaskStatus expectedStatus, String methodName) {
log.error("Cannot initialize task [chainTaskId:{}, currentStatus:{}, expectedStatus:{}, method:{}]",
task.getChainTaskId(), task.getCurrentStatus(), expectedStatus, methodName);
}

void updateMetricsAfterStatusUpdate(TaskStatus previousStatus, TaskStatus newStatus) {
currentTaskStatusesCount.get(previousStatus).decrementAndGet();
currentTaskStatusesCount.get(newStatus).incrementAndGet();
publishTaskStatusesCountUpdate();
}

@EventListener(TaskCreatedEvent.class)
void onTaskCreatedEvent() {
currentTaskStatusesCount.get(RECEIVED).incrementAndGet();
publishTaskStatusesCountUpdate();
}

private void publishTaskStatusesCountUpdate() {
// Copying the map here ensures the original values can't be updated from outside this class.
// As this data should be read only, no need for any atomic class.
final LinkedHashMap<TaskStatus, Long> currentTaskStatusesCountToPublish = currentTaskStatusesCount
.entrySet()
.stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entrySet -> entrySet.getValue().get(),
(a, b) -> b,
LinkedHashMap::new
));
final TaskStatusesCountUpdatedEvent event = new TaskStatusesCountUpdatedEvent(currentTaskStatusesCountToPublish);
applicationEventPublisher.publishEvent(event);
}
}
Loading

0 comments on commit ccd544a

Please sign in to comment.