Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[4.x] - Remove non-virtual executor support #7324

Merged
merged 4 commits into from
Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,11 @@
package io.helidon.common.configurable;

import java.lang.System.Logger.Level;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -59,10 +57,6 @@ class ObserverManager {

private static final Map<ExecutorService, SupplierInfo> EXECUTOR_SERVICES = new ConcurrentHashMap<>();

// Defer building list until use to avoid loading problems if this JDK does not support ThreadPerTaskExecutor.
private static final LazyValue<List<ExecutorServiceSupplierObserver.MethodInvocation>> METRICS_RELATED_METHOD_INVOCATIONS =
LazyValue.create(() -> List.of(
MethodInvocationImpl.create("Thread count", "thread-count", "threadCount")));

private ObserverManager() {
}
Expand All @@ -73,33 +67,17 @@ private ObserverManager() {
* @param supplier the supplier of {@code ExecutorService} instances
* @param supplierCategory category of the supplier (e.g., scheduled, server)
* @param executorServiceCategory category of executor services the supplier creates (e.g., ad-hoc)
* @param useVirtualThreads whether virtual threads should be used
*/
static void registerSupplier(Supplier<? extends ExecutorService> supplier,
String supplierCategory,
String executorServiceCategory,
boolean useVirtualThreads) {
String executorServiceCategory) {
int supplierIndex = SUPPLIER_CATEGORY_NEXT_INDEX_VALUES.computeIfAbsent(supplierCategory, key -> new AtomicInteger())
.getAndIncrement();
SUPPLIERS.computeIfAbsent(supplier,
s -> SupplierInfo.create(s,
executorServiceCategory,
supplierCategory,
supplierIndex,
useVirtualThreads));
}

/**
* Registers a supplier which will never use thread-per-task thread pools.
*
* @param supplier the supplier of {@code ExecutorService} instances
* @param supplierCategory category of the supplier (e.g., server, scheduled)
* @param executorServiceCategory category of thread pools which the supplier provides
*/
static void registerSupplier(Supplier<? extends ExecutorService> supplier,
String supplierCategory,
String executorServiceCategory) {
registerSupplier(supplier, supplierCategory, executorServiceCategory, false);
supplierIndex));
}

/**
Expand Down Expand Up @@ -152,44 +130,31 @@ private static class SupplierInfo {
private final String executorServiceCategory;
private final String supplierCategory;
private final int supplierIndex;
private final boolean useVirtualThreads;
private final AtomicInteger nextThreadPoolIndex = new AtomicInteger(0);
private final List<ExecutorServiceSupplierObserver.SupplierObserverContext> observerContexts;

private static SupplierInfo create(Supplier<? extends ExecutorService> supplier,
String executorServiceCategory,
String supplierCategory,
int supplierIndex,
boolean useVirtualThreads) {
return new SupplierInfo(supplier, supplierCategory, executorServiceCategory, supplierIndex, useVirtualThreads);
int supplierIndex) {
return new SupplierInfo(supplier, supplierCategory, executorServiceCategory, supplierIndex);
}

private SupplierInfo(Supplier<? extends ExecutorService> supplier,
String supplierCategory,
String executorServiceCategory,
int supplierIndex,
boolean useVirtualThreads) {
int supplierIndex) {
this.supplier = supplier;
this.supplierCategory = supplierCategory;
this.executorServiceCategory = executorServiceCategory;
this.supplierIndex = supplierIndex;
this.useVirtualThreads = useVirtualThreads;
observerContexts = collectObserverContexts();
}

private List<ExecutorServiceSupplierObserver.SupplierObserverContext> collectObserverContexts() {
return OBSERVERS.get()
.stream()
.map(observer ->
useVirtualThreads
? observer.registerSupplier(supplier,
supplierIndex,
supplierCategory,
METRICS_RELATED_METHOD_INVOCATIONS.get())
: observer.registerSupplier(supplier,
supplierIndex,
supplierCategory))

.map(this::apply)
.collect(Collectors.toList());
}

Expand All @@ -205,57 +170,11 @@ void unregisterExecutorService(ExecutorService executorService) {
.forEach(observer -> observer.unregisterExecutorService(executorService));
EXECUTOR_SERVICES.remove(executorService);
}
}

/**
* Encapsulation of information needed to invoke methods on {@code ThreadPerTaskExecutor} and to create metrics from the
* returned values.
*/
private static class MethodInvocationImpl implements ExecutorServiceSupplierObserver.MethodInvocation {
private final String displayName;
private final String description;
private final Method method;
private final Class<?> type;

private static final LazyValue<ExecutorService> VIRTUAL_EXECUTOR_SERVICE = LazyValue
.create(Executors::newVirtualThreadPerTaskExecutor);

static MethodInvocationImpl create(String displayName, String description, String methodName) {
ExecutorService executorService = VIRTUAL_EXECUTOR_SERVICE.get();
Method method = null;
try {
method = executorService.getClass().getDeclaredMethod(methodName);
} catch (NoSuchMethodException e) {
throw new RuntimeException(e);
}
return new MethodInvocationImpl(displayName, description, method);
}

MethodInvocationImpl(String displayName, String description, Method method) {
this.displayName = displayName;
this.description = description;
this.method = method;
this.type = method.getReturnType();
}

@Override
public String displayName() {
return displayName;
}

@Override
public String description() {
return description;
}

@Override
public Method method() {
return method;
}

@Override
public Class<?> type() {
return type;
private ExecutorServiceSupplierObserver.SupplierObserverContext apply(ExecutorServiceSupplierObserver observer) {
return observer.registerSupplier(supplier,
supplierIndex,
supplierCategory);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ private ThreadPoolSupplier(Builder builder) {
this.growthRate = builder.growthRate;
this.rejectionHandler = builder.rejectionHandler == null ? DEFAULT_REJECTION_POLICY : builder.rejectionHandler;
this.useVirtualThreads = builder.useVirtualThreads;
ObserverManager.registerSupplier(this, name, "general", useVirtualThreads);
ObserverManager.registerSupplier(this, name, "general");
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Oracle and/or its affiliates.
* Copyright (c) 2022, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,8 +15,6 @@
*/
package io.helidon.common.configurable.spi;

import java.lang.reflect.Method;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

Expand Down Expand Up @@ -53,23 +51,6 @@ SupplierObserverContext registerSupplier(Supplier<? extends ExecutorService> sup
int supplierIndex,
String supplierCategory);

/**
* Makes a supplier known to the observer and returns a supplier context for the supplier to use for future interactions
* with the observer, using the {@link io.helidon.common.configurable.spi.ExecutorServiceSupplierObserver.MethodInvocation}
* abstraction for invoking methods to obtain metric values.
*
* @param supplier the executor service supplier registering with the observer
* @param supplierIndex unique index across suppliers with the same name
* @param supplierCategory the category of supplier registering (e.g., scheduled, server, thread-pool)
* @param methodInvocations method invocation information for retrieving interesting information from the supplier's
* executor services
*
* @return the {@code SupplierObserverContext} for the supplier
*/
SupplierObserverContext registerSupplier(Supplier<? extends ExecutorService> supplier,
int supplierIndex,
String supplierCategory,
List<MethodInvocation> methodInvocations);

/**
* Context with which suppliers (or their surrogates) interact with observers.
Expand All @@ -91,41 +72,4 @@ interface SupplierObserverContext {
*/
void unregisterExecutorService(ExecutorService executorService);
}

/**
* Information about method invocations to retrieve interesting (e.g., metrics) values from an executor service.
* <p>
* Used for dealing with {@code ThreadPerTaskExecutor} executor services which might not exist in every JDK we support.
* </p>
*/
interface MethodInvocation {

/**
* Returns a displayable name for the value.
*
* @return display name for the value
*/
String displayName();

/**
* Returns a brief description of the interesting value.
*
* @return description
*/
String description();

/**
* Returns the method to invoke to retrieve the value.
*
* @return {@code Method} which returns the value.
*/
Method method();

/**
* Returns the data type of the interesting value.
*
* @return the type
*/
Class<?> type();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Oracle and/or its affiliates.
* Copyright (c) 2022, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -49,14 +49,6 @@ public SupplierObserverContext registerSupplier(Supplier<? extends ExecutorServi
return supplierInfo.context;
}

@Override
public SupplierObserverContext registerSupplier(Supplier<? extends ExecutorService> supplier,
int supplierIndex,
String supplierCategory,
List<MethodInvocation> methodInvocations) {
return registerSupplier(supplier, supplierIndex, supplierCategory);
}

Map<Supplier<? extends ExecutorService>, SupplierInfo> suppliers() {
return suppliers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package io.helidon.metrics;

import java.lang.reflect.InvocationTargetException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -78,18 +77,6 @@ public SupplierObserverContext registerSupplier(Supplier<? extends ExecutorServi
return supplierInfo.context();
}

@Override
public SupplierObserverContext registerSupplier(Supplier<? extends ExecutorService> supplier,
int supplierIndex,
String supplierCategory,
List<MethodInvocation> methodInvocations) {
SupplierInfo supplierInfo = new SupplierInfoWithMethods(supplierCategory,
supplierIndex,
methodInvocations);
LOGGER.log(Level.FINE, () -> String.format("Metrics thread pool supplier registration: %s", supplierInfo));
return supplierInfo.context();
}

private class MetricsObserverContext implements ExecutorServiceSupplierObserver.SupplierObserverContext {

private final SupplierInfo supplierInfo;
Expand All @@ -107,10 +94,8 @@ public void registerExecutorService(ExecutorService executorService, int index)
supplierInfo.supplierCategory(),
supplierInfo.supplierIndex()));

if (executorService instanceof ThreadPoolExecutor) {
registerMetrics((ThreadPoolExecutor) executorService, index);
} else if (supplierInfo instanceof SupplierInfoWithMethods) {
registerMetrics(executorService, ((SupplierInfoWithMethods) supplierInfo).methodInvocations(), index);
if (executorService instanceof ThreadPoolExecutor tpe) {
registerMetrics(tpe, index);
}
}

Expand All @@ -124,27 +109,6 @@ private void registerMetrics(ThreadPoolExecutor threadPoolExecutor, int index) {
metricsIDs));
}

private void registerMetrics(ExecutorService executorService, List<MethodInvocation> methodInvocations, int index) {
methodInvocations.forEach(mi -> {
Metadata metadata = Metadata.builder()
.withName(METRIC_NAME_PREFIX + mi.displayName())
.withDescription(mi.description())
.withUnit(MetricUnits.NONE)
.build();
Tag[] tags = tags(supplierInfo.supplierCategory(),
supplierInfo.supplierIndex(),
index);
registry.get().gauge(metadata, () -> {
try {
return (Number) mi.method().invoke(executorService);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}, tags);
metricsIDs.add(new MetricID(metadata.getName(), tags));
});
}

@Override
public void unregisterExecutorService(ExecutorService executorService) {
metricsIDs.forEach(metricID -> registry.get().remove(metricID));
Expand Down Expand Up @@ -187,38 +151,6 @@ public String toString() {
}
}

/**
* Information about an executor service supplier that provides its own mechanisms for retrieving metrics.
* <p>
* This is for supporting Loom's {@code ThreadPerTaskExecutorService} when it is available without requiring it to be
* present at compile or runtime.
* </p>
*/
private class SupplierInfoWithMethods extends SupplierInfo {

private final List<MethodInvocation> methodInvocations;

private SupplierInfoWithMethods(String supplierCategory,
int supplierIndex,
List<MethodInvocation> methodInvocations) {
super(supplierCategory, supplierIndex);
this.methodInvocations = methodInvocations;
}

private List<MethodInvocation> methodInvocations() {
return methodInvocations;
}

@Override
public String toString() {
return new StringJoiner(", ", SupplierInfoWithMethods.class.getSimpleName() + "[", "]")
.add("supplierCategory='" + supplierCategory() + "'")
.add("supplierIndex=" + supplierIndex())
.add("methodInvocations=" + methodInvocations)
.toString();
}
}

private static class MetadataTemplates {

private static final Metadata ACTIVE_COUNT_METADATA = Metadata.builder()
Expand Down