ThreadPool (optimal thread count) & async trace propagation & timers
- Executors.newCachedThreadPool(): no queue buffering; every task gets a new thread. Under high CPU load the extra threads add context-switch overhead and memory footprint, and excessive memory use can in turn trigger GC, making performance drop sharply.
- Executors.newFixedThreadPool(16): a fixed number of threads, but throughput drops when tasks spend long periods waiting (IO wait / blocked time). When too many tasks pile up, the unbounded LinkedBlockingQueue can cause OOM.
- new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(10000)): a bounded LinkedBlockingQueue avoids OOM, but the throughput drop from long task wait times (IO wait / blocked time) remains.
Optimal thread count = number of CPU threads × (1 + CPU wait time / CPU compute time). Because tasks differ, the wait and compute times cannot be determined up front, so take a different approach: when the queue is full and CPU usage is below 40%, the pool dynamically adds threads to raise throughput; when CPU usage is above 60%, it dynamically removes threads to limit thread creation. When the pool's threads cannot keep up, tasks are buffered in the queue; when the queue is also full, the submitting thread is blocked to slow down task production and balance production against consumption. A simplified sketch of this watermark-driven resizing follows below.
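Since wait and compute times cannot be measured ahead of time, the resize decision can instead be driven by sampled CPU load. The sketch below illustrates the watermark idea against a plain JDK ThreadPoolExecutor; the `CpuWatermarkResizer` class, its thresholds, and its sampling interval are made up for this example and are not rxlib's internal implementation (rxlib's ThreadPool does this for you via IntWaterMark).

```java
import com.sun.management.OperatingSystemMXBean;
import java.lang.management.ManagementFactory;
import java.util.concurrent.*;

// Hypothetical sketch of watermark-based resizing; not rxlib's actual code.
public class CpuWatermarkResizer {
    private final ThreadPoolExecutor pool;
    private final OperatingSystemMXBean os =
            (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();

    public CpuWatermarkResizer(ThreadPoolExecutor pool) {
        this.pool = pool;
        Executors.newSingleThreadScheduledExecutor()
                .scheduleAtFixedRate(this::resize, 1, 1, TimeUnit.SECONDS);
    }

    private void resize() {
        double cpu = os.getSystemCpuLoad() * 100;                    // 0..100%
        boolean queueFull = pool.getQueue().remainingCapacity() == 0;
        if (queueFull && cpu < 40) {
            // backlog but CPU mostly waiting: grow to raise throughput
            pool.setMaximumPoolSize(pool.getMaximumPoolSize() + 1); // raise max before core
            pool.setCorePoolSize(pool.getCorePoolSize() + 1);
        } else if (cpu > 60 && pool.getCorePoolSize() > 1) {
            // CPU saturated: shrink to cut context-switch and memory overhead
            pool.setCorePoolSize(pool.getCorePoolSize() - 1);
            pool.setMaximumPoolSize(Math.max(pool.getCorePoolSize(), pool.getMaximumPoolSize() - 1));
        }
    }
}
```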
@SneakyThrows
@Test
public void threadPool() {
ThreadPool pool = Tasks.pool();
//RunFlag.SINGLE: single execution per taskId; while one thread is running the task, other submissions are skipped.
//RunFlag.SYNCHRONIZED: serialized execution per taskId; while one thread is running the task, other submissions wait.
//RunFlag.TRANSFER: block the calling thread until the task is executed or placed into the queue.
//RunFlag.PRIORITY: if no thread and no queue slot is available, create a new thread and execute immediately.
//RunFlag.INHERIT_THREAD_LOCALS: child threads inherit the parent thread's FastThreadLocal values.
//RunFlag.THREAD_TRACE: enable trace; works with the timer and CompletableFuture.
AtomicInteger c = new AtomicInteger();
for (int i = 0; i < 5; i++) {
int x = i;
Future<Void> f1 = pool.run(() -> {
log.info("exec SINGLE begin {}", x);
c.incrementAndGet();
sleep(oneSecond);
wait.set();
log.info("exec SINGLE end {}", x);
}, c, RunFlag.SINGLE.flags());
}
wait.waitOne();
wait.reset();
assert c.get() == 1;
for (int i = 0; i < 5; i++) {
int x = i;
Future<Void> f1 = pool.run(() -> {
log.info("exec SYNCHRONIZED begin {}", x);
c.incrementAndGet();
sleep(oneSecond);
log.info("exec SYNCHRONIZED end {}", x);
}, c, RunFlag.SYNCHRONIZED.flags());
}
sleep(8000);
assert c.get() == 6;
c.set(0);
for (int i = 0; i < 5; i++) {
int x = i;
CompletableFuture<Void> f1 = pool.runAsync(() -> {
log.info("exec SINGLE begin {}", x);
c.incrementAndGet();
sleep(oneSecond);
wait.set();
log.info("exec SINGLE end {}", x);
}, c, RunFlag.SINGLE.flags());
f1.whenCompleteAsync((r, e) -> log.info("exec SINGLE uni"));
}
wait.waitOne();
wait.reset();
assert c.get() == 1;
for (int i = 0; i < 5; i++) {
int x = i;
CompletableFuture<Void> f1 = pool.runAsync(() -> {
log.info("exec SYNCHRONIZED begin {}", x);
c.incrementAndGet();
sleep(oneSecond);
log.info("exec SYNCHRONIZED end {}", x);
}, c, RunFlag.SYNCHRONIZED.flags());
f1.whenCompleteAsync((r, e) -> log.info("exec SYNCHRONIZED uni"));
}
sleep(8000);
assert c.get() == 6;
pool.runAsync(() -> System.out.println("runAsync"))
.whenCompleteAsync((r, e) -> System.out.println("whenCompleteAsync"))
.join();
List<Func<Integer>> tasks = new ArrayList<>();
for (int i = 0; i < 5; i++) {
int x = i;
tasks.add(() -> {
log.info("TASK begin {}", x);
sleep(oneSecond);
log.info("TASK end {}", x);
return x + 100;
});
}
List<Future<Integer>> futures = pool.runAll(tasks, 0);
for (Future<Integer> future : futures) {
System.out.println(future.get());
}
ThreadPool.MultiTaskFuture<Integer, Integer> anyMf = pool.runAnyAsync(tasks);
anyMf.getFuture().whenCompleteAsync((r, e) -> log.info("ANY TASK MAIN uni"));
for (CompletableFuture<Integer> sf : anyMf.getSubFutures()) {
sf.whenCompleteAsync((r, e) -> log.info("ANY TASK uni {}", r));
}
for (CompletableFuture<Integer> sf : anyMf.getSubFutures()) {
sf.join();
}
log.info("wait ANY TASK");
anyMf.getFuture().get();
ThreadPool.MultiTaskFuture<Void, Integer> mf = pool.runAllAsync(tasks);
mf.getFuture().whenCompleteAsync((r, e) -> log.info("ALL TASK MAIN uni"));
for (CompletableFuture<Integer> sf : mf.getSubFutures()) {
sf.whenCompleteAsync((r, e) -> log.info("ALL TASK uni {}", r));
}
for (CompletableFuture<Integer> sf : mf.getSubFutures()) {
sf.join();
}
log.info("wait ALL TASK");
mf.getFuture().get();
}
@Test
public void threadPoolAutosize() {
//LinkedTransferQueue is CAS-based and outperforms LinkedBlockingQueue in most scenarios.
//Rejection policy: once threads and queue are both full, the calling thread blocks until the task can be enqueued, balancing production and consumption (see the JDK-only sketch after this test).
//Supports netty FastThreadLocal.
long delayMillis = 5000;
ExecutorService pool = new ThreadPool(1, 1, new IntWaterMark(20, 40), "DEV");
for (int i = 0; i < 100; i++) {
int x = i;
pool.execute(() -> {
log.info("exec {} begin..", x);
sleep(delayMillis);
log.info("exec {} end..", x);
});
}
}
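The test above relies on ThreadPool blocking the submitting thread once threads and queue are both saturated. With a plain JDK pool, a similar back-pressure effect can be approximated with a RejectedExecutionHandler that re-inserts into the bounded queue; this is only a sketch under that assumption, not rxlib's implementation:

```java
// Caller-blocking rejection policy for a plain JDK pool (assumes java.util.concurrent.*).
RejectedExecutionHandler callerBlocks = (task, executor) -> {
    try {
        // block the producer until the bounded queue has room,
        // balancing production and consumption
        executor.getQueue().put(task);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RejectedExecutionException("interrupted while enqueueing", e);
    }
};
ExecutorService jdkPool = new ThreadPoolExecutor(
        4, 4, 0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>(10_000), callerBlocks);
```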
zipkin does not support asynchronous trace; rxlib adds async trace support covering Executor (ThreadPool), ScheduledExecutorService (WheelTimer), the CompletableFuture.xxAsync() methods, and parallelStream(). The sketch below shows the capture/restore mechanism this kind of propagation is built on.
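Under the hood, async trace propagation boils down to capturing the current traceId on the submitting thread and restoring it on the worker thread before the task body runs. The minimal sketch below shows that pattern with a plain ThreadLocal; the TracingExecutor class is hypothetical, rxlib wires the equivalent logic into ThreadPool, WheelTimer, CompletableFuture and parallelStream for you.

```java
import java.util.concurrent.ExecutorService;

// Minimal, hypothetical sketch of trace propagation across a pool boundary.
class TracingExecutor {
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    static Runnable wrap(Runnable task) {
        String traceId = TRACE_ID.get();   // captured on the submitting thread
        return () -> {
            String prev = TRACE_ID.get();
            TRACE_ID.set(traceId);         // restored on the worker thread
            try {
                task.run();
            } finally {
                TRACE_ID.set(prev);        // don't leak into pooled threads
            }
        };
    }

    static void execute(ExecutorService pool, Runnable task) {
        pool.execute(wrap(task));
    }
}
```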
@SneakyThrows
@Test
public void inheritThreadLocal() {
//Thread trace: async trace support covering Executor (ThreadPool), ScheduledExecutorService (WheelTimer), the CompletableFuture.xxAsync() methods, and parallelStream().
RxConfig.INSTANCE.getThreadPool().setTraceName("rx-traceId");
ThreadPool.traceIdGenerator = () -> UUID.randomUUID().toString().replace("-", "");
ThreadPool.traceIdChangedHandler = t -> MDC.put("rx-traceId", t);
ThreadPool pool = new ThreadPool(3, 1, new IntWaterMark(20, 40), "DEV");
//When the pool has no idle thread and the task goes into the queue, the correct traceId is still carried when the queued task eventually runs.
ThreadPool.startTrace(null);
for (int i = 0; i < 2; i++) {
int finalI = i;
pool.run(() -> {
log.info("TRACE DELAY-1 {}", finalI);
pool.run(() -> {
log.info("TRACE DELAY-1_1 {}", finalI);
sleep(oneSecond);
});
sleep(oneSecond);
});
log.info("TRACE DELAY MAIN {}", finalI);
pool.run(() -> {
log.info("TRACE DELAY-2 {}", finalI);
sleep(oneSecond);
});
}
ThreadPool.endTrace();
sleep(8000);
//WheelTimer (ScheduledExecutorService) async trace
WheelTimer timer = Tasks.timer();
ThreadPool.startTrace(null);
for (int i = 0; i < 2; i++) {
int finalI = i;
timer.setTimeout(() -> {
log.info("TRACE TIMER {}", finalI);
sleep(oneSecond);
}, oneSecond);
log.info("TRACE TIMER MAIN {}", finalI);
}
ThreadPool.endTrace();
sleep(4000);
//CompletableFuture.xxAsync methods pick up the correct trace
ThreadPool.startTrace(null);
for (int i = 0; i < 2; i++) {
int finalI = i;
pool.runAsync(() -> {
log.info("TRACE ASYNC-1 {}", finalI);
pool.runAsync(() -> {
log.info("TRACE ASYNC-1_1 {}", finalI);
sleep(oneSecond);
}).whenCompleteAsync((r, e) -> log.info("TRACE ASYNC-1_1 uni {}", r));
sleep(oneSecond);
}).whenCompleteAsync((r, e) -> log.info("TRACE ASYNC-1 uni {}", r));
log.info("TRACE ASYNC MAIN {}", finalI);
pool.runAsync(() -> {
log.info("TRACE ASYNC-2 {}", finalI);
sleep(oneSecond);
}).whenCompleteAsync((r, e) -> log.info("TRACE ASYNC-2 uni {}", r));
}
ThreadPool.endTrace();
sleep(10000);
//netty FastThreadLocal inheritance
FastThreadLocal<Integer> ftl = new FastThreadLocal<>();
ftl.set(64);
pool.run(() -> {
assert ftl.get() == 64;
log.info("Inherit ok 1");
}, null, RunFlag.INHERIT_FAST_THREAD_LOCALS.flags());
pool.runAsync(() -> {
assert ftl.get() == 64;
log.info("Inherit ok 2");
}, null, RunFlag.INHERIT_FAST_THREAD_LOCALS.flags());
sleep(2000);
//parallelStream
ThreadPool.startTrace(null);
Arrays.toList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9).parallelStream().map(p -> {
//todo
Arrays.toList("a", "b", "c").parallelStream().map(x -> {
log.info("parallelStream {} -> {}", p, x);
return x.toString();
}).collect(Collectors.toList());
log.info("parallelStream {}", p);
return p.toString();
}).collect(Collectors.toList());
ThreadPool.endTrace();
}
The JDK ScheduledExecutorService only ever creates coreSize threads. Set coreSize too small and throughput suffers; set it too large and the extra threads add context-switch overhead and memory footprint, and excessive memory use can in turn trigger GC. Moreover, once tasks with long blocking waits exhaust the coreSize threads, subsequent tasks pile up and cannot run on time. Rxlib's ScheduledThreadPool, like the thread pool above, adjusts coreSize dynamically based on CPU load to solve these pain points.
WheelTimer is not perfectly precise, but it consumes only one thread and far less memory. A single-threaded HashedWheelTimer on its own would amplify the blocking-wait problem, so combined with rxlib's dynamic thread pool the WheelTimer only schedules tasks and hands execution off to the pool asynchronously, addressing all of the pain points above; see the sketch below.
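rxlib's WheelTimer already performs this scheduling/execution split internally. For comparison, the same pattern built directly on Netty's raw HashedWheelTimer handing work off to the rxlib pool would look roughly like this (a sketch, assuming io.netty.util.HashedWheelTimer and an @Slf4j logger in scope):

```java
// The single wheel thread only fires timeouts; blocking work runs on the pool,
// so slow tasks never stall the wheel.
HashedWheelTimer wheel = new HashedWheelTimer();
ExecutorService workers = Tasks.pool();

wheel.newTimeout(timeout -> workers.execute(() -> {
    log.info("fired after 1s, executed on {}", Thread.currentThread().getName());
}), 1, TimeUnit.SECONDS);
```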
@SneakyThrows
@Test
public void timer() {
WheelTimer timer = Tasks.timer();
//TimeoutFlag.SINGLE: single execution per taskId; while one execution is running, other submissions are skipped.
//TimeoutFlag.REPLACE: per taskId, cancel any execution that is already running or waiting and run only the current one.
//TimeoutFlag.PERIOD: repeats periodically; exceptions do not stop it until asyncContinue(false) or the next delay is -1.
//TimeoutFlag.THREAD_TRACE: enable trace
AtomicInteger c = new AtomicInteger();
for (int i = 0; i < 5; i++) {
int finalI = i;
timer.setTimeout(() -> {
log.info("exec SINGLE plus by {}", finalI);
assert finalI == 0;
c.incrementAndGet();
sleep(oneSecond);
wait.set();
}, oneSecond, c, TimeoutFlag.SINGLE.flags());
}
wait.waitOne();
wait.reset();
assert c.get() == 1;
log.info("exec SINGLE flag ok..");
for (int i = 0; i < 5; i++) {
int finalI = i;
timer.setTimeout(() -> {
log.info("exec REPLACE plus by {}", finalI);
assert finalI == 4;
c.incrementAndGet();
sleep(oneSecond);
wait.set();
}, oneSecond, c, TimeoutFlag.REPLACE.flags());
}
wait.waitOne();
wait.reset();
assert c.get() == 2;
log.info("exec REPLACE flag ok..");
TimeoutFuture<Integer> f = timer.setTimeout(() -> {
log.info("exec PERIOD");
int i = c.incrementAndGet();
if (i > 10) {
asyncContinue(false);
return null;
}
if (i == 4) {
throw new InvalidException("Will exec next");
}
asyncContinue(true);
return i;
}, oneSecond, c, TimeoutFlag.PERIOD.flags());
sleep(8000);
f.cancel();
log.info("exec PERIOD flag ok and last value={}", f.get());
assert f.get() == 9;
c.set(0);
timer.setTimeout(() -> {
log.info("exec nextDelayFn");
c.incrementAndGet();
asyncContinue(true);
}, d -> d > 1000 ? -1 : Math.max(d, 100) * 2);
sleep(5000);
log.info("exec nextDelayFn ok");
assert c.get() == 4;
//wrap as a ScheduledExecutorService
ScheduledExecutorService ses = timer;
ScheduledFuture<Integer> f1 = ses.schedule(() -> 1024, oneSecond, TimeUnit.MILLISECONDS);
long start = System.currentTimeMillis();
assert f1.get() == 1024;
log.info("schedule wait {}ms", (System.currentTimeMillis() - start));
log.info("scheduleAtFixedRate step 1");
ScheduledFuture<?> f2 = ses.scheduleAtFixedRate(() -> log.info("scheduleAtFixedRate step 2"), 500, oneSecond, TimeUnit.MILLISECONDS);
log.info("scheduleAtFixedRate delay {}ms", f2.getDelay(TimeUnit.MILLISECONDS));
sleep(5000);
f2.cancel(true);
log.info("scheduleAtFixedRate delay {}ms", f2.getDelay(TimeUnit.MILLISECONDS));
}