Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move task metrics from TaskUpdateManager to TaskService #676

Merged
merged 3 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ All notable changes to this project will be documented in this file.
- Check result has been uploaded for TEE tasks. (#672)
- Check for consensus early if a worker has already `CONTRIBUTED` when the task is updated to `RUNNING`. (#673)
- Always provide a `WorkerpoolAuthorization` to a worker during its recovery. (#674)
- Move task metrics from `TaskUpdateManager` to `TaskService`. (#676)

### Quality

Expand Down
110 changes: 92 additions & 18 deletions src/main/java/com/iexec/core/task/TaskService.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 IEXEC BLOCKCHAIN TECH
* Copyright 2020-2024 IEXEC BLOCKCHAIN TECH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,10 +21,15 @@
import com.iexec.commons.poco.tee.TeeUtils;
import com.iexec.core.chain.IexecHubService;
import com.iexec.core.replicate.ReplicatesList;
import com.iexec.core.task.event.TaskCreatedEvent;
import com.iexec.core.task.event.TaskStatusesCountUpdatedEvent;
import com.mongodb.client.result.UpdateResult;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Metrics;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoTemplate;
Expand All @@ -34,10 +39,11 @@
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;

import static com.iexec.core.task.TaskStatus.*;
Expand All @@ -48,23 +54,60 @@ public class TaskService {

private static final String CHAIN_TASK_ID_FIELD = "chainTaskId";
public static final String METRIC_TASKS_COMPLETED_COUNT = "iexec.core.tasks.completed";
public static final String METRIC_TASKS_STATUSES_COUNT = "iexec.core.tasks.count";
private final MongoTemplate mongoTemplate;
private final TaskRepository taskRepository;
private final IexecHubService iexecHubService;
private final ApplicationEventPublisher applicationEventPublisher;
private final Counter completedTasksCounter;
private final LinkedHashMap<TaskStatus, AtomicLong> currentTaskStatusesCount;

public TaskService(MongoTemplate mongoTemplate,
TaskRepository taskRepository,
IexecHubService iexecHubService) {
IexecHubService iexecHubService,
ApplicationEventPublisher applicationEventPublisher) {
this.mongoTemplate = mongoTemplate;
this.taskRepository = taskRepository;
this.iexecHubService = iexecHubService;
this.applicationEventPublisher = applicationEventPublisher;

this.currentTaskStatusesCount = Arrays.stream(TaskStatus.values())
.collect(Collectors.toMap(
Function.identity(),
status -> new AtomicLong(),
(a, b) -> b,
LinkedHashMap::new));

for (TaskStatus status : TaskStatus.values()) {
Gauge.builder(METRIC_TASKS_STATUSES_COUNT, () -> currentTaskStatusesCount.get(status).get())
.tags(
"period", "current",
"status", status.name()
).register(Metrics.globalRegistry);
}
this.completedTasksCounter = Metrics.counter(METRIC_TASKS_COMPLETED_COUNT);
}

@PostConstruct
void init() {
completedTasksCounter.increment(findByCurrentStatus(TaskStatus.COMPLETED).size());
final ExecutorService taskStatusesCountExecutor = Executors.newSingleThreadExecutor();
taskStatusesCountExecutor.submit(this::initializeCurrentTaskStatusesCount);
taskStatusesCountExecutor.shutdown();
}

/**
* The following could take a bit of time, depending on how many tasks are in DB.
* It is expected to take ~1.7s for 1,000,000 tasks and to be linear (so, ~17s for 10,000,000 tasks).
* As we use AtomicLongs, the final count should be accurate - no race conditions to expect,
* even though new deals are detected during the count.
*/
private void initializeCurrentTaskStatusesCount() {
currentTaskStatusesCount
.entrySet()
.parallelStream()
.forEach(entry -> entry.getValue().addAndGet(countByCurrentStatus(entry.getKey())));
publishTaskStatusesCountUpdate();
}

/**
Expand Down Expand Up @@ -103,14 +146,12 @@ public Optional<Task> addTask(
newTask.setContributionDeadline(contributionDeadline);
try {
newTask = taskRepository.save(newTask);
log.info("Added new task [chainDealId:{}, taskIndex:{}, imageName:{}, " +
"commandLine:{}, trust:{}, chainTaskId:{}]", chainDealId,
taskIndex, imageName, commandLine, trust, newTask.getChainTaskId());
log.info("Added new task [chainDealId:{}, taskIndex:{}, imageName:{}, commandLine:{}, trust:{}, chainTaskId:{}]",
chainDealId, taskIndex, imageName, commandLine, trust, newTask.getChainTaskId());
return Optional.of(newTask);
} catch (DuplicateKeyException e) {
log.info("Task already added [chainDealId:{}, taskIndex:{}, " +
"imageName:{}, commandLine:{}, trust:{}]", chainDealId,
taskIndex, imageName, commandLine, trust);
log.info("Task already added [chainDealId:{}, taskIndex:{}, imageName:{}, commandLine:{}, trust:{}]",
chainDealId, taskIndex, imageName, commandLine, trust);
return Optional.empty();
}
}
Expand All @@ -135,15 +176,16 @@ public Optional<Task> updateTask(Task task) {
return optionalTask;
}

public Optional<Task> updateTaskStatus(Task task, List<TaskStatusChange> statusChanges) {
public long updateTaskStatus(Task task, TaskStatus currentStatus, List<TaskStatusChange> statusChanges) {
Update update = Update.update("currentStatus", task.getCurrentStatus());
update.push("dateStatusList").each(statusChanges);
UpdateResult result = mongoTemplate.updateFirst(
Query.query(Criteria.where(CHAIN_TASK_ID_FIELD).is(task.getChainTaskId())),
update,
Task.class);
log.debug("Updated chainTaskId [chainTaskId:{}, result:{}]", task.getChainTaskId(), result);
return getTaskByChainTaskId(task.getChainTaskId());
updateMetricsAfterStatusUpdate(currentStatus, task.getCurrentStatus());
return result.getModifiedCount();
}

public void updateTask(String chainTaskId, Update update) {
Expand Down Expand Up @@ -272,19 +314,23 @@ public Date getTaskFinalDeadline(String chainTaskId) {
}

public boolean isConsensusReached(ReplicatesList replicatesList) {
Optional<ChainTask> optional = iexecHubService.getChainTask(replicatesList.getChainTaskId());
if (optional.isEmpty()) {
final ChainTask chainTask = iexecHubService.getChainTask(replicatesList.getChainTaskId()).orElse(null);
if (chainTask == null) {
log.error("Consensus not reached, task not found on-chain [chainTaskId:{}]", replicatesList.getChainTaskId());
return false;
}

final ChainTask chainTask = optional.get();
boolean isChainTaskRevealing = chainTask.getStatus().equals(ChainTaskStatus.REVEALING);
boolean isChainTaskRevealing = chainTask.getStatus() == ChainTaskStatus.REVEALING;
if (!isChainTaskRevealing) {
log.debug("Consensus not reached, on-chain task is not in REVEALING status [chainTaskId:{}]",
replicatesList.getChainTaskId());
return false;
}

int onChainWinners = chainTask.getWinnerCounter();
int offChainWinners = replicatesList.getNbValidContributedWinners(chainTask.getConsensusValue());
log.debug("Returning off-chain and on-chain winners [offChainWinners:{}, onChainWinners:{}]",
offChainWinners, onChainWinners);
return offChainWinners >= onChainWinners;
}

Expand All @@ -295,4 +341,32 @@ public long getCompletedTasksCount() {
public long countByCurrentStatus(TaskStatus status) {
return taskRepository.countByCurrentStatus(status);
}

void updateMetricsAfterStatusUpdate(TaskStatus previousStatus, TaskStatus newStatus) {
currentTaskStatusesCount.get(previousStatus).decrementAndGet();
currentTaskStatusesCount.get(newStatus).incrementAndGet();
publishTaskStatusesCountUpdate();
}

@EventListener(TaskCreatedEvent.class)
void onTaskCreatedEvent() {
currentTaskStatusesCount.get(RECEIVED).incrementAndGet();
publishTaskStatusesCountUpdate();
}

private void publishTaskStatusesCountUpdate() {
// Copying the map here ensures the original values can't be updated from outside this class.
// As this data should be read only, no need for any atomic class.
final LinkedHashMap<TaskStatus, Long> currentTaskStatusesCountToPublish = currentTaskStatusesCount
.entrySet()
.stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entrySet -> entrySet.getValue().get(),
(a, b) -> b,
LinkedHashMap::new
));
final TaskStatusesCountUpdatedEvent event = new TaskStatusesCountUpdatedEvent(currentTaskStatusesCountToPublish);
applicationEventPublisher.publishEvent(event);
}
}
90 changes: 6 additions & 84 deletions src/main/java/com/iexec/core/task/update/TaskUpdateManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,15 @@
import com.iexec.core.task.event.*;
import com.iexec.core.worker.Worker;
import com.iexec.core.worker.WorkerService;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Metrics;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand All @@ -59,7 +53,6 @@
@Service
@Slf4j
class TaskUpdateManager {
public static final String METRIC_TASKS_STATUSES_COUNT = "iexec.core.tasks.count";

private final TaskService taskService;
private final IexecHubService iexecHubService;
Expand All @@ -69,8 +62,6 @@ class TaskUpdateManager {
private final BlockchainAdapterService blockchainAdapterService;
private final SmsService smsService;

private final LinkedHashMap<TaskStatus, AtomicLong> currentTaskStatusesCount;

public TaskUpdateManager(TaskService taskService,
IexecHubService iexecHubService,
ReplicatesService replicatesService,
Expand All @@ -85,46 +76,6 @@ public TaskUpdateManager(TaskService taskService,
this.workerService = workerService;
this.blockchainAdapterService = blockchainAdapterService;
this.smsService = smsService;

this.currentTaskStatusesCount = Arrays.stream(TaskStatus.values())
.collect(Collectors.toMap(
Function.identity(),
status -> new AtomicLong(),
(a, b) -> b,
LinkedHashMap::new));

for (TaskStatus status : TaskStatus.values()) {
Gauge.builder(METRIC_TASKS_STATUSES_COUNT, () -> currentTaskStatusesCount.get(status).get())
.tags(
"period", "current",
"status", status.name()
).register(Metrics.globalRegistry);
}
}

@PostConstruct
Future<Void> init() {
final ExecutorService taskStatusesCountExecutor = Executors.newSingleThreadExecutor();
final Future<Void> future = taskStatusesCountExecutor.submit(
this::initializeCurrentTaskStatusesCount,
null // Trick to get a `Future<Void>` instead of a `Future<?>`
);
taskStatusesCountExecutor.shutdown();
return future;
}

/**
* The following could take a bit of time, depending on how many tasks are in DB.
* It is expected to take ~1.7s for 1,000,000 tasks and to be linear (so, ~17s for 10,000,000 tasks).
* As we use AtomicLongs, the final count should be accurate - no race conditions to expect,
* even though new deals are detected during the count.
*/
private void initializeCurrentTaskStatusesCount() {
currentTaskStatusesCount
.entrySet()
.parallelStream()
.forEach(entry -> entry.getValue().addAndGet(taskService.countByCurrentStatus(entry.getKey())));
publishTaskStatusesCountUpdate();
}

void updateTask(String chainTaskId) {
Expand Down Expand Up @@ -243,10 +194,9 @@ void updateTaskStatusAndSave(Task task, TaskStatus newStatus, ChainReceipt chain
* @param statusChanges List of changes
*/
void saveTask(Task task, TaskStatus currentStatus, List<TaskStatusChange> statusChanges) {
Optional<Task> savedTask = taskService.updateTaskStatus(task, statusChanges);
long updatedTaskCount = taskService.updateTaskStatus(task, currentStatus, statusChanges);
// `savedTask.isPresent()` should always be true if the task exists in the repository.
if (savedTask.isPresent()) {
updateMetricsAfterStatusUpdate(currentStatus, task.getCurrentStatus());
if (updatedTaskCount != 0L) {
log.info("UpdateTaskStatus succeeded [chainTaskId:{}, currentStatus:{}, newStatus:{}]",
task.getChainTaskId(), currentStatus, task.getCurrentStatus());
} else {
Expand Down Expand Up @@ -762,32 +712,4 @@ void emitError(Task task, TaskStatus expectedStatus, String methodName) {
log.error("Cannot initialize task [chainTaskId:{}, currentStatus:{}, expectedStatus:{}, method:{}]",
task.getChainTaskId(), task.getCurrentStatus(), expectedStatus, methodName);
}

void updateMetricsAfterStatusUpdate(TaskStatus previousStatus, TaskStatus newStatus) {
currentTaskStatusesCount.get(previousStatus).decrementAndGet();
currentTaskStatusesCount.get(newStatus).incrementAndGet();
publishTaskStatusesCountUpdate();
}

@EventListener(TaskCreatedEvent.class)
void onTaskCreatedEvent() {
currentTaskStatusesCount.get(RECEIVED).incrementAndGet();
publishTaskStatusesCountUpdate();
}

private void publishTaskStatusesCountUpdate() {
// Copying the map here ensures the original values can't be updated from outside this class.
// As this data should be read only, no need for any atomic class.
final LinkedHashMap<TaskStatus, Long> currentTaskStatusesCountToPublish = currentTaskStatusesCount
.entrySet()
.stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entrySet -> entrySet.getValue().get(),
(a, b) -> b,
LinkedHashMap::new
));
final TaskStatusesCountUpdatedEvent event = new TaskStatusesCountUpdatedEvent(currentTaskStatusesCountToPublish);
applicationEventPublisher.publishEvent(event);
}
}
Loading