Skip to content

Commit

Permalink
Merge branch 'main' into upgrade_to_powsybl_dependencies_2024.4.0
Browse files Browse the repository at this point in the history
  • Loading branch information
FranckLecuyer authored Jan 8, 2025
2 parents 52670ce + 8acdae0 commit f6ea175
Show file tree
Hide file tree
Showing 12 changed files with 244 additions and 104 deletions.
11 changes: 5 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

<groupId>org.gridsuite</groupId>
<artifactId>gridsuite-network-modification-server</artifactId>
<version>2.11.0-SNAPSHOT</version>
<version>2.12.0-SNAPSHOT</version>

<packaging>jar</packaging>
<name>Network modification server</name>
Expand Down Expand Up @@ -159,6 +159,10 @@
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

<!-- elasticsearch -->
<dependency>
Expand All @@ -173,11 +177,6 @@
<artifactId>powsybl-config-classic</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2024, RTE (http://www.rte-france.com)
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

package org.gridsuite.modification.server;

/**
* @author Joris Mancini <joris.mancini_externe at rte-france.com>
*/
public class BuildException extends RuntimeException {
public BuildException(String message, Throwable e) {
super(message, e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
import com.powsybl.iidm.network.Network;
import com.powsybl.network.store.client.NetworkStoreService;
import com.powsybl.network.store.client.PreloadingStrategy;

import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -30,6 +28,7 @@
import org.gridsuite.modification.server.elasticsearch.EquipmentInfosService;
import org.gridsuite.modification.server.impacts.AbstractBaseImpact;
import org.gridsuite.modification.server.service.FilterService;
import org.gridsuite.modification.server.service.LargeNetworkModificationExecutionService;
import org.gridsuite.modification.server.service.NetworkModificationObserver;
import org.gridsuite.modification.server.service.ReportService;
import org.slf4j.Logger;
Expand All @@ -39,9 +38,6 @@

import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @author Slimane Amar <slimane.amar at rte-france.com>
Expand All @@ -58,7 +54,7 @@ public class NetworkModificationApplicator {

@Getter private final FilterService filterService;

private final ExecutorService applicationExecutor;
private final LargeNetworkModificationExecutionService largeNetworkModificationExecutionService;

private final NetworkModificationObserver networkModificationObserver;

Expand All @@ -68,18 +64,18 @@ public class NetworkModificationApplicator {

public NetworkModificationApplicator(NetworkStoreService networkStoreService, EquipmentInfosService equipmentInfosService,
ReportService reportService, FilterService filterService,
@Value("${max-large-concurrent-applications}") int maxConcurrentApplications,
NetworkModificationObserver networkModificationObserver) {
NetworkModificationObserver networkModificationObserver,
LargeNetworkModificationExecutionService largeNetworkModificationExecutionService) {
this.networkStoreService = networkStoreService;
this.equipmentInfosService = equipmentInfosService;
this.reportService = reportService;
this.filterService = filterService;
this.applicationExecutor = Executors.newFixedThreadPool(maxConcurrentApplications);
this.networkModificationObserver = networkModificationObserver;
this.largeNetworkModificationExecutionService = largeNetworkModificationExecutionService;

}

/* This method is used when creating, inserting, moving or duplicating modifications
/* This method is used for incremental modifications
* Since there is no queue for these operations and they can be memory consuming when the preloading strategy is large
* (for example for VOLTAGE_INIT_MODIFICATION),
* we limit the number of concurrent applications of these modifications to avoid out of memory issues.
Expand All @@ -97,15 +93,16 @@ public NetworkModificationResult applyModifications(List<ModificationInfos> modi
.map(ModificationType::getStrategy)
.orElse(PreloadingStrategy.NONE);
if (preloadingStrategy == PreloadingStrategy.ALL_COLLECTIONS_NEEDED_FOR_BUS_VIEW) {
CompletableFuture<NetworkModificationResult> future = CompletableFuture.supplyAsync(() -> processApplication(modificationInfosList, networkInfos, reportInfos), applicationExecutor);
return future.join();
return largeNetworkModificationExecutionService
.supplyAsync(() -> apply(modificationInfosList, networkInfos, reportInfos))
.join();
} else {
return processApplication(modificationInfosList, networkInfos, reportInfos);
return apply(modificationInfosList, networkInfos, reportInfos);
}
}

// used for creating, inserting, moving or duplicating modifications
private NetworkModificationResult processApplication(List<ModificationInfos> modificationInfosList, NetworkInfos networkInfos, ReportInfos reportInfos) {
// This method is used for incremental modifications
private NetworkModificationResult apply(List<ModificationInfos> modificationInfosList, NetworkInfos networkInfos, ReportInfos reportInfos) {
NetworkStoreListener listener = NetworkStoreListener.create(networkInfos.getNetwork(), networkInfos.getNetworkUuuid(), networkStoreService, equipmentInfosService, collectionThreshold);
ApplicationStatus groupApplicationStatus = apply(modificationInfosList, listener.getNetwork(), reportInfos);
List<AbstractBaseImpact> networkImpacts = listener.flushNetworkModifications();
Expand Down Expand Up @@ -134,15 +131,16 @@ public NetworkModificationResult applyModifications(List<Pair<ReportInfos, List<
.map(ModificationType::getStrategy)
.orElse(PreloadingStrategy.NONE);
if (preloadingStrategy == PreloadingStrategy.ALL_COLLECTIONS_NEEDED_FOR_BUS_VIEW) {
CompletableFuture<NetworkModificationResult> future = CompletableFuture.supplyAsync(() -> processApplication(modificationInfosGroups, networkInfos), applicationExecutor);
return future.join();
return largeNetworkModificationExecutionService
.supplyAsync(() -> apply(modificationInfosGroups, networkInfos))
.join();
} else {
return processApplication(modificationInfosGroups, networkInfos);
return apply(modificationInfosGroups, networkInfos);
}
}

// used for building a variant
private NetworkModificationResult processApplication(List<Pair<ReportInfos, List<ModificationInfos>>> modificationInfosGroups, NetworkInfos networkInfos) {
// This method is used when building a variant
private NetworkModificationResult apply(List<Pair<ReportInfos, List<ModificationInfos>>> modificationInfosGroups, NetworkInfos networkInfos) {
NetworkStoreListener listener = NetworkStoreListener.create(networkInfos.getNetwork(), networkInfos.getNetworkUuuid(), networkStoreService, equipmentInfosService, collectionThreshold);
List<ApplicationStatus> groupsApplicationStatuses =
modificationInfosGroups.stream()
Expand Down Expand Up @@ -233,9 +231,4 @@ public static ApplicationStatus getApplicationStatus(ReportNode reportNode) {
throw new IllegalArgumentException(String.format("Report severity '%s' unknown !", severity.getValue()));
}
}

@PreDestroy
public void shutdown() {
applicationExecutor.shutdown();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import lombok.NonNull;
import org.gridsuite.modification.server.BuildException;
import org.gridsuite.modification.server.dto.BuildInfos;
import org.gridsuite.modification.server.dto.NetworkModificationResult;
import org.slf4j.Logger;
Expand Down Expand Up @@ -46,8 +47,6 @@ public class BuildWorkerService {

private final BuildStoppedPublisherService stoppedPublisherService;

private final BuildFailedPublisherService failedPublisherService;

private final Map<String, CompletableFuture<NetworkModificationResult>> futures = new ConcurrentHashMap<>();

private final Map<String, BuildCancelContext> cancelBuildRequests = new ConcurrentHashMap<>();
Expand All @@ -61,12 +60,10 @@ public class BuildWorkerService {

public BuildWorkerService(@NonNull NetworkModificationService networkModificationService,
@NonNull ObjectMapper objectMapper,
@NonNull BuildStoppedPublisherService stoppedPublisherService,
@NonNull BuildFailedPublisherService failedPublisherService) {
@NonNull BuildStoppedPublisherService stoppedPublisherService) {
this.networkModificationService = networkModificationService;
this.objectMapper = objectMapper;
this.stoppedPublisherService = stoppedPublisherService;
this.failedPublisherService = failedPublisherService;
}

private CompletableFuture<NetworkModificationResult> execBuildVariant(BuildExecContext execContext, BuildInfos buildInfos) {
Expand Down Expand Up @@ -98,11 +95,11 @@ private CompletableFuture<NetworkModificationResult> execBuildVariant(BuildExecC
@Bean
public Consumer<Message<String>> consumeBuild() {
return message -> {
BuildExecContext execContext = null;
BuildExecContext execContext;
try {
execContext = BuildExecContext.fromMessage(message, objectMapper);
} catch (Exception e) {
LOGGER.error("Error retrieving message in consumeBuild", e);
throw new BuildException("Failed to read build message", e);
}
startBuild(Objects.requireNonNull(execContext));
};
Expand All @@ -113,7 +110,7 @@ private void startBuild(BuildExecContext execContext) {
BuildInfos buildInfos = execContext.getBuildInfos();
CompletableFuture<NetworkModificationResult> future = execBuildVariant(execContext, buildInfos);
NetworkModificationResult result;
if (future != null && (result = future.get()) != null) { // result available
if (future != null && (result = future.join()) != null) { // result available
notificationService.emitBuildResultMessage(result, execContext.getReceiver());
LOGGER.info("Build complete on node '{}'", execContext.getReceiver());
} else { // result not available : stop build request
Expand All @@ -123,13 +120,8 @@ private void startBuild(BuildExecContext execContext) {
}
} catch (CancellationException e) {
stoppedPublisherService.publishCancel(execContext.getReceiver(), CANCEL_MESSAGE);
} catch (InterruptedException e) {
LOGGER.error(FAIL_MESSAGE, e);
failedPublisherService.publishFail(execContext.getReceiver(), FAIL_MESSAGE + " : " + e.getMessage());
Thread.currentThread().interrupt();
} catch (Exception e) {
LOGGER.error(FAIL_MESSAGE, e);
failedPublisherService.publishFail(execContext.getReceiver(), FAIL_MESSAGE + " : " + e.getMessage());
throw new BuildException("Node build failed", e);
} finally {
futures.remove(execContext.getReceiver());
cancelBuildRequests.remove(execContext.getReceiver());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Copyright (c) 2024, RTE (http://www.rte-france.com)
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

package org.gridsuite.modification.server.service;

import jakarta.annotation.PreDestroy;
import lombok.NonNull;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Supplier;

/**
* @author Slimane Amar <slimane.amar at rte-france.com>
*/
@Service
public class LargeNetworkModificationExecutionService {

private ThreadPoolExecutor executorService;

public LargeNetworkModificationExecutionService(@Value("${max-large-concurrent-applications}") int maxConcurrentLargeModifications,
@NonNull NetworkModificationObserver networkModificationObserver) {
executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(maxConcurrentLargeModifications);
networkModificationObserver.createThreadPoolMetric(executorService);
}

@PreDestroy
private void preDestroy() {
executorService.shutdown();
}

public <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return CompletableFuture.supplyAsync(supplier, executorService);
}
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,31 @@
package org.gridsuite.modification.server.service;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import lombok.NonNull;
import org.gridsuite.modification.ModificationType;
import org.springframework.stereotype.Service;

import java.util.concurrent.ThreadPoolExecutor;

@Service
public class NetworkModificationObserver {
protected static final String OBSERVATION_PREFIX = "app.network-modification.";
protected static final String MODIFICATION_TYPE_TAG_NAME = "modification_type";
private static final String OBSERVATION_PREFIX = "app.network-modification.";
private static final String MODIFICATION_TYPE_TAG_NAME = "modification_type";

private static final String TASK_TYPE_TAG_NAME = "type";
private static final String TASK_TYPE_TAG_VALUE_CURRENT = "current";
private static final String TASK_TYPE_TAG_VALUE_PENDING = "pending";
private static final String TASK_POOL_METER_NAME_PREFIX = OBSERVATION_PREFIX + "tasks.pool.";

private final ObservationRegistry observationRegistry;
private final MeterRegistry meterRegistry;

public NetworkModificationObserver(@NonNull ObservationRegistry observationRegistry) {
public NetworkModificationObserver(@NonNull ObservationRegistry observationRegistry, @NonNull MeterRegistry meterRegistry) {
this.observationRegistry = observationRegistry;
this.meterRegistry = meterRegistry;
}

public <E extends Throwable> void observe(String name, ModificationType modificationType, Observation.CheckedRunnable<E> runnable) throws E {
Expand All @@ -26,4 +37,14 @@ private Observation createObservation(String name, ModificationType modification
.lowCardinalityKeyValue(MODIFICATION_TYPE_TAG_NAME, modificationType.name());
}

public void createThreadPoolMetric(ThreadPoolExecutor threadPoolExecutor) {
Gauge.builder(TASK_POOL_METER_NAME_PREFIX + TASK_TYPE_TAG_VALUE_CURRENT, threadPoolExecutor, ThreadPoolExecutor::getActiveCount)
.description("The number of active large network modification tasks in the thread pool")
.tag(TASK_TYPE_TAG_NAME, TASK_TYPE_TAG_VALUE_CURRENT)
.register(meterRegistry);
Gauge.builder(TASK_POOL_METER_NAME_PREFIX + TASK_TYPE_TAG_VALUE_PENDING, threadPoolExecutor, executor -> executor.getQueue().size())
.description("The number of pending large network modification tasks in the thread pool")
.tag(TASK_TYPE_TAG_NAME, TASK_TYPE_TAG_VALUE_PENDING)
.register(meterRegistry);
}
}
16 changes: 13 additions & 3 deletions src/main/resources/config/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ spring:
group: buildGroup
consumer:
concurrency: 2
max-attempts: 1
publishBuild-out-0:
destination: ${powsybl-ws.rabbitmq.destination.prefix:}build.run
publishResultBuild-out-0:
Expand All @@ -31,9 +32,18 @@ spring:
destination: ${powsybl-ws.rabbitmq.destination.prefix:}build.cancel
publishStoppedBuild-out-0:
destination: ${powsybl-ws.rabbitmq.destination.prefix:}build.stopped
publishFailedBuild-out-0:
destination: ${powsybl-ws.rabbitmq.destination.prefix:}build.failed
output-bindings: publishBuild-out-0;publishResultBuild-out-0;publishCancelBuild-out-0;publishStoppedBuild-out-0;publishFailedBuild-out-0
output-bindings: publishBuild-out-0;publishResultBuild-out-0;publishCancelBuild-out-0;publishStoppedBuild-out-0
rabbit:
bindings:
consumeBuild-in-0:
consumer:
auto-bind-dlq: true
dead-letter-exchange: ${powsybl-ws.rabbitmq.destination.prefix:}build.run.dlx
dead-letter-queue-name: ${powsybl-ws.rabbitmq.destination.prefix:}build.run.dlx.dlq
dead-letter-exchange-type: topic
quorum:
enabled: true
delivery-limit: 2

powsybl-ws:
database:
Expand Down
Loading

0 comments on commit f6ea175

Please sign in to comment.