From c34174aa323965611a43aa6c2d84a99e25170269 Mon Sep 17 00:00:00 2001 From: wxm <115806199+youfanx@users.noreply.github.com> Date: Tue, 6 Aug 2024 15:12:18 +0800 Subject: [PATCH] up t t --- rxlib/src/main/java/org/rx/core/Extends.java | 4 +-- .../src/main/java/org/rx/core/ThreadPool.java | 30 ++++++++++++------- .../src/main/java/org/rx/core/WheelTimer.java | 17 ++++++++--- 3 files changed, 34 insertions(+), 17 deletions(-) diff --git a/rxlib/src/main/java/org/rx/core/Extends.java b/rxlib/src/main/java/org/rx/core/Extends.java index 0ec00585..90f2edf1 100644 --- a/rxlib/src/main/java/org/rx/core/Extends.java +++ b/rxlib/src/main/java/org/rx/core/Extends.java @@ -193,7 +193,7 @@ static void each(Iterable iterable, BiAction fn, boolean throwOnNext, } TraceHandler.INSTANCE.log("each", e); } - if (!ThreadPool.asyncContinueFlag(true)) { + if (!ThreadPool.continueFlag(true)) { break; } if (interruptedFlag < 0) { @@ -212,7 +212,7 @@ static void each(Iterable iterable, BiAction fn, boolean throwOnNext, //CircuitBreakingException static void circuitContinue(boolean flag) { - ThreadPool.ASYNC_CONTINUE.set(flag); + ThreadPool.CONTINUE_FLAG.set(flag); } @SneakyThrows diff --git a/rxlib/src/main/java/org/rx/core/ThreadPool.java b/rxlib/src/main/java/org/rx/core/ThreadPool.java index fd5f2708..88e14926 100644 --- a/rxlib/src/main/java/org/rx/core/ThreadPool.java +++ b/rxlib/src/main/java/org/rx/core/ThreadPool.java @@ -144,7 +144,7 @@ static Task adapt(Callable fn) { static Task adapt(Callable fn, FlagsEnum flags, Object id) { Task t = as(fn); - return t != null && t.id == id ? t : new Task<>(fn::call, flags, id); + return t != null && t.id == id ? t : new Task<>(fn, flags, id); } static Task adapt(Runnable fn) { @@ -163,14 +163,14 @@ static Task as(Object fn) { return fn instanceof Task ? (Task) fn : null; } - final Func fn; + final Callable fn; final FlagsEnum flags; final Object id; final InternalThreadLocalMap parent; final String traceId; final StackTraceElement[] stackTrace; - private Task(Func fn, FlagsEnum flags, Object id) { + private Task(Callable fn, FlagsEnum flags, Object id) { if (flags == null) { flags = RunFlag.NONE.flags(); } @@ -178,7 +178,14 @@ private Task(Func fn, FlagsEnum flags, Object id) { if (conf.threadPool.traceName != null) { flags.add(RunFlag.THREAD_TRACE); } - if (conf.trace.slowMethodElapsedMicros > 0 && ThreadLocalRandom.current().nextInt(0, 100) < conf.threadPool.slowMethodSamplingPercent) { + Object ctxST = CTX_STACK_TRACE.getIfExists(); + if (ctxST != null) { + if (ctxST instanceof StackTraceElement[]) { + stackTrace = (StackTraceElement[]) ctxST; + } else { + stackTrace = null; + } + } else if (conf.trace.slowMethodElapsedMicros > 0 && ThreadLocalRandom.current().nextInt(0, 100) < conf.threadPool.slowMethodSamplingPercent) { stackTrace = new Throwable().getStackTrace(); } else { stackTrace = null; @@ -199,7 +206,7 @@ public T call() { Throwable ex = null; long s = System.nanoTime(); try { - r = fn.invoke(); + r = fn.call(); } catch (Throwable e) { TraceHandler.INSTANCE.log(toString(), ex = e); throw e; @@ -217,7 +224,7 @@ public T call() { } try { - return fn.invoke(); + return fn.call(); } catch (Throwable e) { TraceHandler.INSTANCE.log(toString(), e); throw e; @@ -260,8 +267,9 @@ public FutureTaskAdapter(Runnable runnable, T result) { public static final Delegate> onTraceIdChanged = Delegate.create(); static final ThreadLocal> CTX_PARENT_TRACE_ID = new InheritableThreadLocal<>(); static final ThreadLocal CTX_TRACE_ID = new InheritableThreadLocal<>(); - static final FastThreadLocal ASYNC_CONTINUE = new FastThreadLocal<>(); - static final FastThreadLocal COMPLETION_RETURNED_VALUE = new FastThreadLocal<>(); + static final FastThreadLocal CTX_STACK_TRACE = new FastThreadLocal<>(); + static final FastThreadLocal CONTINUE_FLAG = new FastThreadLocal<>(); + private static final FastThreadLocal COMPLETION_RETURNED_VALUE = new FastThreadLocal<>(); static final String POOL_NAME_PREFIX = "℞Threads-"; static final IntWaterMark DEFAULT_CPU_WATER_MARK = new IntWaterMark(RxConfig.INSTANCE.threadPool.lowCpuWaterMark, RxConfig.INSTANCE.threadPool.highCpuWaterMark); @@ -338,9 +346,9 @@ static ThreadFactory newThreadFactory(String name) { ); } - static boolean asyncContinueFlag(boolean def) { - Boolean ac = ASYNC_CONTINUE.getIfExists(); - ASYNC_CONTINUE.remove(); + static boolean continueFlag(boolean def) { + Boolean ac = CONTINUE_FLAG.getIfExists(); + CONTINUE_FLAG.remove(); if (ac == null) { return def; } diff --git a/rxlib/src/main/java/org/rx/core/WheelTimer.java b/rxlib/src/main/java/org/rx/core/WheelTimer.java index a70f6abb..29d523fb 100644 --- a/rxlib/src/main/java/org/rx/core/WheelTimer.java +++ b/rxlib/src/main/java/org/rx/core/WheelTimer.java @@ -4,6 +4,7 @@ import io.netty.util.Timeout; import io.netty.util.Timer; import io.netty.util.TimerTask; +import io.netty.util.internal.ThreadLocalRandom; import lombok.*; import org.rx.bean.$; import org.rx.bean.FlagsEnum; @@ -45,20 +46,26 @@ class Task implements TimerTask, TimeoutFuture { final Object id; final LongUnaryOperator nextDelayFn; final String traceId; + final StackTraceElement[] stackTrace; long delay; long expiredTime; volatile Timeout timeout; volatile Future future; long p0, p1; - int p2; Task(Func fn, FlagsEnum flags, Object id, LongUnaryOperator nextDelayFn) { if (flags == null) { flags = TimeoutFlag.NONE.flags(); } - if (RxConfig.INSTANCE.threadPool.traceName != null) { + RxConfig conf = RxConfig.INSTANCE; + if (conf.threadPool.traceName != null) { flags.add(TimeoutFlag.THREAD_TRACE); } + if (conf.trace.slowMethodElapsedMicros > 0 && ThreadLocalRandom.current().nextInt(0, 100) < conf.threadPool.slowMethodSamplingPercent) { + stackTrace = new Throwable().getStackTrace(); + } else { + stackTrace = null; + } this.fn = fn; this.flags = flags; @@ -74,13 +81,14 @@ public synchronized void run(Timeout timeout) throws Exception { if (traceFlag) { ThreadPool.startTrace(traceId); } + ThreadPool.CTX_STACK_TRACE.set(stackTrace != null ? stackTrace : Boolean.TRUE); try { future = executor.submit(() -> { boolean doContinue = flags.has(TimeoutFlag.PERIOD); try { return fn.get(); } finally { - if (ThreadPool.asyncContinueFlag(doContinue)) { + if (ThreadPool.continueFlag(doContinue)) { newTimeout(this, delay, timeout.timer()); } else if (id != null) { holder.remove(id); @@ -88,6 +96,7 @@ public synchronized void run(Timeout timeout) throws Exception { } }); } finally { + ThreadPool.CTX_STACK_TRACE.remove(); if (traceFlag) { ThreadPool.endTrace(); } @@ -98,7 +107,7 @@ public synchronized void run(Timeout timeout) throws Exception { @Override public String toString() { String hc = id != null ? id.toString() : Integer.toHexString(hashCode()); - return String.format("WheelTask-%s[%s]", hc, flags.getValue()); + return String.format("TimeTask-%s[%s]", hc, flags.getValue()); } @Override