Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workarounds for blocking threads in IntelliJ libs #2203

Merged
merged 2 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
// Copyright 2000-2023 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.util.concurrency;

import com.intellij.openapi.diagnostic.Logger;
import com.intellij.openapi.progress.ProcessCanceledException;
import com.intellij.openapi.util.LowMemoryWatcherManager;
import com.intellij.util.IncorrectOperationException;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.TestOnly;

import java.util.List;
import java.util.concurrent.*;
import java.util.function.BiConsumer;

import static com.intellij.util.concurrency.AppExecutorUtil.propagateContextOrCancellation;

/**
* A ThreadPoolExecutor which also implements {@link ScheduledExecutorService} by awaiting scheduled tasks in a separate thread
* and then executing them in the owned ThreadPoolExecutor.
* Unlike the existing {@link ScheduledThreadPoolExecutor}, this pool is unbounded.
* @see AppExecutorUtil#getAppScheduledExecutorService()
*/
@ApiStatus.Internal
public final class AppScheduledExecutorService extends SchedulingWrapper {
static final String POOLED_THREAD_PREFIX = "ApplicationImpl pooled thread ";
private final @NotNull String myName;
private final LowMemoryWatcherManager myLowMemoryWatcherManager;

private static final class Holder {
private static final AppScheduledExecutorService INSTANCE = new AppScheduledExecutorService("Global instance", 1, TimeUnit.MINUTES);
}

static @NotNull ScheduledExecutorService getInstance() {
return Holder.INSTANCE;
}

AppScheduledExecutorService(@NotNull String name, long keepAliveTime, @NotNull TimeUnit unit) {
super(new BackendThreadPoolExecutor(new MyThreadFactory(), keepAliveTime, unit), new AppDelayQueue());
myName = name;
myLowMemoryWatcherManager = new LowMemoryWatcherManager(this);
}

private MyThreadFactory getCountingThreadFactory() {
return (MyThreadFactory)((BackendThreadPoolExecutor)backendExecutorService).getThreadFactory();
}

private static final class MyThreadFactory extends CountingThreadFactory {
private BiConsumer<? super Thread, ? super Runnable> newThreadListener;
private final ThreadFactory myThreadFactory = Executors.privilegedThreadFactory();

@Override
public @NotNull Thread newThread(final @NotNull Runnable r) {
Thread thread = myThreadFactory.newThread(r);
thread.setDaemon(true);
thread.setName(POOLED_THREAD_PREFIX + counter.incrementAndGet());

thread.setPriority(Thread.NORM_PRIORITY - 1);

BiConsumer<? super Thread, ? super Runnable> listener = newThreadListener;
if (listener != null) {
listener.accept(thread, r);
}
return thread;
}

void setNewThreadListener(@NotNull BiConsumer<? super Thread, ? super Runnable> threadListener) {
if (newThreadListener != null) throw new IllegalStateException("Listener was already set: " + newThreadListener);
newThreadListener = threadListener;
}
}

public void setNewThreadListener(@NotNull BiConsumer<? super Thread, ? super Runnable> threadListener) {
getCountingThreadFactory().setNewThreadListener(threadListener);
}

@Override
public @NotNull List<Runnable> shutdownNow() {
return notAllowedMethodCall();
}

@Override
public void shutdown() {
notAllowedMethodCall();
}

static List<Runnable> notAllowedMethodCall() {
throw new IncorrectOperationException("You must not call this method on the global app pool");
}

@Override
void onDelayQueuePurgedOnShutdown() {
((BackendThreadPoolExecutor)backendExecutorService).superShutdown();
}

void shutdownAppScheduledExecutorService() {
// LowMemoryWatcher starts background threads so stop it now to avoid RejectedExecutionException
myLowMemoryWatcherManager.shutdown();
doShutdown();
delayQueue.shutdown(new SchedulingWrapper.MyScheduledFutureTask<Void>(()->{}, null, 0) {
@Override
boolean executeMeInBackendExecutor() {
set(null);
return false;
}
});
}

@Override
public boolean awaitTermination(long timeout, @NotNull TimeUnit unit) throws InterruptedException {
// let this task to bubble through the AppDelayQueue global queue to make sure there are no in-flight tasks leaked on stack of com.intellij.util.concurrency.AppDelayQueue.transferrerThread
long deadline = System.nanoTime() + unit.toNanos(timeout);
if (!delayQueue.awaitTermination(deadline - System.nanoTime(), TimeUnit.NANOSECONDS)) {
return false;
}

return super.awaitTermination(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
}

@TestOnly
public @NotNull String statistics() {
return myName + " threads created counter = " + getCountingThreadFactory().counter;
}

@TestOnly
public String dumpQueue() {
return delayQueue.toString();
}

public int getBackendPoolExecutorSize() {
return ((BackendThreadPoolExecutor)backendExecutorService).getPoolSize();
}

@TestOnly
void setBackendPoolCorePoolSize(int size) {
((BackendThreadPoolExecutor)backendExecutorService).superSetCorePoolSize(size);
}

static final class BackendThreadPoolExecutor extends ThreadPoolExecutor {

BackendThreadPoolExecutor(@NotNull ThreadFactory factory,
long keepAliveTime,
@NotNull TimeUnit unit) {
super(1, Integer.MAX_VALUE, keepAliveTime, unit, new SynchronousQueue<>(), factory);
}

@Override
public void execute(@NotNull Runnable command) {
super.execute(capturePropagationAndCancellationContext(command));
}

@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return newTaskFor(Executors.callable(runnable, value));
}

@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return capturePropagationAndCancellationContext(callable);
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
if (t != null && !(t instanceof ProcessCanceledException)) {
Logger.getInstance(SchedulingWrapper.class).error("Worker exited due to exception", t);
}
}

private void superShutdown() {
super.shutdown();
}

private @NotNull List<Runnable> superShutdownNow() {
return super.shutdownNow();
}

// stub out sensitive methods
@Override
public void shutdown() {
notAllowedMethodCall();
}

@Override
public @NotNull List<Runnable> shutdownNow() {
return notAllowedMethodCall();
}

@Override
public void setCorePoolSize(int corePoolSize) {
notAllowedMethodCall();
}

private void superSetCorePoolSize(int corePoolSize) {
super.setCorePoolSize(corePoolSize);
}

@Override
public void allowCoreThreadTimeOut(boolean value) {
notAllowedMethodCall();
}

@Override
public void setMaximumPoolSize(int maximumPoolSize) {
notAllowedMethodCall();
}

@Override
public void setKeepAliveTime(long time, TimeUnit unit) {
notAllowedMethodCall();
}

void superSetKeepAliveTime(long time, TimeUnit unit) {
super.setKeepAliveTime(time, unit);
}

@Override
public void setThreadFactory(@NotNull ThreadFactory threadFactory) {
notAllowedMethodCall();
}
}

public static @NotNull Thread getPeriodicTasksThread() {
return Holder.INSTANCE.delayQueue.getThread();
}

@TestOnly
void waitForLowMemoryWatcherManagerInit(int timeout, @NotNull TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
myLowMemoryWatcherManager.waitForInitComplete(timeout, unit);
}

public static @NotNull Runnable capturePropagationAndCancellationContext(@NotNull Runnable command) {
if (!propagateContextOrCancellation()) {
return command;
}
return Propagation.capturePropagationAndCancellationContext(command);
}

public static <T> @NotNull FutureTask<T> capturePropagationAndCancellationContext(@NotNull Callable<T> callable) {
if (!propagateContextOrCancellation()) {
return new FutureTask<>(callable);
}
return Propagation.capturePropagationAndCancellationContext(callable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.google.devtools.ksp.processing.kspJvmArgParserHelp
import java.io.File
import java.net.URLClassLoader
import java.util.ServiceLoader
import kotlin.system.exitProcess

class KSPJvmMain {
companion object {
Expand Down Expand Up @@ -45,5 +46,6 @@ internal fun runWithArgs(args: Array<String>, parse: (Array<String>) -> Pair<KSP
)
.toList() as List<SymbolProcessorProvider>

KotlinSymbolProcessing(config, processorProviders, logger).execute()
val exitCode = KotlinSymbolProcessing(config, processorProviders, logger).execute()
exitProcess(exitCode.code)
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class KotlinSymbolProcessing(
val logger: KSPLogger
) {
enum class ExitCode(
@Suppress("UNUSED_PARAMETER") code: Int
val code: Int
) {
OK(0),

Expand Down
Loading