Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: ThreadWorkerPool支持newVirtualThreadPerTaskExecutor方式使用虚拟线程 #71

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.tencent.trpc.core.management;

/*
* Tencent is pleased to support the open source community by making tRPC available.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

帮加在package上面吧

*
* Copyright (C) 2023 THL A29 Limited, a Tencent company.
* All rights reserved.
*
* If you have downloaded a copy of the tRPC source code from Tencent,
* please note that tRPC source code is licensed under the Apache 2.0 License,
* A copy of the Apache 2.0 License can be found in the LICENSE file.
*/

import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import java.util.concurrent.atomic.AtomicInteger;

public abstract class AbstractThreadPoolMXBean implements ThreadPoolMXBean {

private static final AtomicInteger threadPoolIndex = new AtomicInteger(1);

private final String objectName;

public AbstractThreadPoolMXBean() {
this.objectName = WorkerPoolType.THREAD.getName() + BAR + threadPoolIndex.getAndIncrement();
}

@Override
public String getType() {
return WorkerPoolType.THREAD.getName();
}

@Override
public ObjectName getObjectName() {
try {
return new ObjectName(WORKER_POOL_MXBEAN_DOMAIN_TYPE + ",name=" + objectName);
} catch (MalformedObjectNameException e) {
throw new IllegalArgumentException(e);

Check warning on line 38 in trpc-core/src/main/java/com/tencent/trpc/core/management/AbstractThreadPoolMXBean.java

View check run for this annotation

Codecov / codecov/patch

trpc-core/src/main/java/com/tencent/trpc/core/management/AbstractThreadPoolMXBean.java#L37-L38

Added lines #L37 - L38 were not covered by tests
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.tencent.trpc.core.management;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

五个新文件都需要加一下版权说明


import com.tencent.trpc.core.logger.Logger;
import com.tencent.trpc.core.logger.LoggerFactory;

/**
* Implementation of ThreadPoolMXBean for ThreadPerTaskExecutor using ThreadPerTaskExecutorWrapper.
* <p>
* JEP 444 recommends using JFR to monitor virtual threads.
*/
public class ThreadPerTaskExecutorMXBeanImpl extends AbstractThreadPoolMXBean {

protected static final Logger logger = LoggerFactory.getLogger(ThreadPerTaskExecutorMXBeanImpl.class);

private final ThreadPerTaskExecutorWrapper wrapper;

public ThreadPerTaskExecutorMXBeanImpl(ThreadPerTaskExecutorWrapper wrapper) {
this.wrapper = wrapper;
}

private long totalTaskCount() {
return wrapper.getSubmittedTaskCount();
}

private long completedTaskCount() {
return wrapper.getCompletedTaskCount();
}

private long executingTaskCount() {
return totalTaskCount() - completedTaskCount();
}

@Override
public long getTaskCount() {
return executingTaskCount();
}

@Override
public long getCompletedTaskCount() {
return completedTaskCount();
}

@Override
public int getCorePoolSize() {
return 0;
}

@Override
public int getMaximumPoolSize() {
return Integer.MAX_VALUE;
}

@Override
public int getPoolSize() {
return (int) executingTaskCount();
}

@Override
public int getActiveThreadCount() {
return (int) executingTaskCount();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package com.tencent.trpc.core.management;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

public class ThreadPerTaskExecutorWrapper implements ExecutorService {
private final ExecutorService executorService;
private final AtomicLong submittedTaskCount = new AtomicLong();
private final AtomicLong completedTaskCount = new AtomicLong();

private ThreadPerTaskExecutorWrapper(ExecutorService executorService) {
this.executorService = executorService;
}

public static ThreadPerTaskExecutorWrapper wrap(ExecutorService executorService) {
return new ThreadPerTaskExecutorWrapper(executorService);
}

public long getSubmittedTaskCount() {
return submittedTaskCount.get();
}

public long getCompletedTaskCount() {
return completedTaskCount.get();
}

private TaskCountingRunnable wrap(Runnable task) {

Check warning on line 35 in trpc-core/src/main/java/com/tencent/trpc/core/management/ThreadPerTaskExecutorWrapper.java

View workflow job for this annotation

GitHub Actions / checkstyle

[checkstyle] reported by reviewdog 🐶 所有重载方法应彼此相邻放置。上一个重载方法位于 '23' 行。 Raw Output: /github/workspace/./trpc-core/src/main/java/com/tencent/trpc/core/management/ThreadPerTaskExecutorWrapper.java:35:5: warning: 所有重载方法应彼此相邻放置。上一个重载方法位于 '23' 行。 (com.puppycrawl.tools.checkstyle.checks.coding.OverloadMethodsDeclarationOrderCheck)
return new TaskCountingRunnable(task);
}

private <T> TaskCountingCallable<T> wrap(Callable<T> task) {
return new TaskCountingCallable<>(task);
}

private class TaskCountingRunnable implements Runnable {
private final Runnable task;

public TaskCountingRunnable(Runnable task) {
this.task = task;
}

@Override
public void run() {
submittedTaskCount.incrementAndGet();
try {
task.run();
} finally {
completedTaskCount.incrementAndGet();
}
}
}

private class TaskCountingCallable<T> implements Callable<T> {
private final Callable<T> task;

public TaskCountingCallable(Callable<T> task) {
this.task = task;
}

@Override
public T call() throws Exception {
submittedTaskCount.incrementAndGet();
try {
return task.call();
} finally {
completedTaskCount.incrementAndGet();
}
}
}

@Override
public void shutdown() {
executorService.shutdown();
}

@Override
public List<Runnable> shutdownNow() {
return executorService.shutdownNow();
}

@Override
public boolean isShutdown() {
return executorService.isShutdown();
}

@Override
public boolean isTerminated() {
return executorService.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return executorService.awaitTermination(timeout, unit);
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return executorService.submit(wrap(task));
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
return executorService.submit(wrap(task), result);
}

@Override
public Future<?> submit(Runnable task) {
return executorService.submit(wrap(task));
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
List<TaskCountingCallable<T>> wrappedTasks = tasks.stream()
.map(this::wrap)
.collect(Collectors.toList());
return executorService.invokeAll(wrappedTasks);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {

Check warning on line 128 in trpc-core/src/main/java/com/tencent/trpc/core/management/ThreadPerTaskExecutorWrapper.java

View workflow job for this annotation

GitHub Actions / checkstyle

[checkstyle] reported by reviewdog 🐶 本行字符数 140个,最多:120个。 Raw Output: /github/workspace/./trpc-core/src/main/java/com/tencent/trpc/core/management/ThreadPerTaskExecutorWrapper.java:128:0: warning: 本行字符数 140个,最多:120个。 (com.puppycrawl.tools.checkstyle.checks.sizes.LineLengthCheck)
List<TaskCountingCallable<T>> wrappedTasks = tasks.stream()
.map(this::wrap)
.collect(Collectors.toList());
return executorService.invokeAll(wrappedTasks, timeout, unit);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
List<TaskCountingCallable<T>> wrappedTasks = tasks.stream()
.map(this::wrap)
.collect(Collectors.toList());
return executorService.invokeAny(wrappedTasks);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {

Check warning on line 144 in trpc-core/src/main/java/com/tencent/trpc/core/management/ThreadPerTaskExecutorWrapper.java

View workflow job for this annotation

GitHub Actions / checkstyle

[checkstyle] reported by reviewdog 🐶 本行字符数 164个,最多:120个。 Raw Output: /github/workspace/./trpc-core/src/main/java/com/tencent/trpc/core/management/ThreadPerTaskExecutorWrapper.java:144:0: warning: 本行字符数 164个,最多:120个。 (com.puppycrawl.tools.checkstyle.checks.sizes.LineLengthCheck)
List<TaskCountingCallable<T>> wrappedTasks = tasks.stream()
.map(this::wrap)
.collect(Collectors.toList());
return executorService.invokeAny(wrappedTasks, timeout, unit);
}

@Override
public void execute(Runnable command) {
executorService.execute(wrap(command));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,14 @@

import java.util.Objects;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

public class ThreadPoolMXBeanImpl implements ThreadPoolMXBean {
public class ThreadPoolMXBeanImpl extends AbstractThreadPoolMXBean {

private static final AtomicInteger threadPoolIndex = new AtomicInteger(1);

private final ThreadPoolExecutor threadPool;

private final String objectName;

public ThreadPoolMXBeanImpl(ThreadPoolExecutor threadPool) {
this.threadPool = Objects.requireNonNull(threadPool, "threadPool is null");
this.objectName = WorkerPoolType.THREAD.getName() + BAR + threadPoolIndex.getAndIncrement();
}

@Override
public String getType() {
return WorkerPoolType.THREAD.getName();
}

@Override
Expand Down Expand Up @@ -65,13 +53,5 @@ public int getMaximumPoolSize() {
return threadPool.getMaximumPoolSize();
}

@Override
public ObjectName getObjectName() {
try {
return new ObjectName(WORKER_POOL_MXBEAN_DOMAIN_TYPE + ",name=" + objectName);
} catch (MalformedObjectNameException e) {
throw new IllegalArgumentException(e);
}
}

}
Loading
Loading