Skip to content

Commit

Permalink
up t t
Browse files Browse the repository at this point in the history
  • Loading branch information
youfanx committed Aug 6, 2024
1 parent 6cc68dd commit c34174a
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 17 deletions.
4 changes: 2 additions & 2 deletions rxlib/src/main/java/org/rx/core/Extends.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ static <T> void each(Iterable<T> iterable, BiAction<T> fn, boolean throwOnNext,
}
TraceHandler.INSTANCE.log("each", e);
}
if (!ThreadPool.asyncContinueFlag(true)) {
if (!ThreadPool.continueFlag(true)) {
break;
}
if (interruptedFlag < 0) {
Expand All @@ -212,7 +212,7 @@ static <T> void each(Iterable<T> iterable, BiAction<T> fn, boolean throwOnNext,

//CircuitBreakingException
static void circuitContinue(boolean flag) {
ThreadPool.ASYNC_CONTINUE.set(flag);
ThreadPool.CONTINUE_FLAG.set(flag);
}

@SneakyThrows
Expand Down
30 changes: 19 additions & 11 deletions rxlib/src/main/java/org/rx/core/ThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ static <T> Task<T> adapt(Callable<T> fn) {

static <T> Task<T> adapt(Callable<T> fn, FlagsEnum<RunFlag> flags, Object id) {
Task<T> t = as(fn);
return t != null && t.id == id ? t : new Task<>(fn::call, flags, id);
return t != null && t.id == id ? t : new Task<>(fn, flags, id);
}

static <T> Task<T> adapt(Runnable fn) {
Expand All @@ -163,22 +163,29 @@ static <T> Task<T> as(Object fn) {
return fn instanceof Task ? (Task<T>) fn : null;
}

final Func<T> fn;
final Callable<T> fn;
final FlagsEnum<RunFlag> flags;
final Object id;
final InternalThreadLocalMap parent;
final String traceId;
final StackTraceElement[] stackTrace;

private Task(Func<T> fn, FlagsEnum<RunFlag> flags, Object id) {
private Task(Callable<T> fn, FlagsEnum<RunFlag> flags, Object id) {
if (flags == null) {
flags = RunFlag.NONE.flags();
}
RxConfig conf = RxConfig.INSTANCE;
if (conf.threadPool.traceName != null) {
flags.add(RunFlag.THREAD_TRACE);
}
if (conf.trace.slowMethodElapsedMicros > 0 && ThreadLocalRandom.current().nextInt(0, 100) < conf.threadPool.slowMethodSamplingPercent) {
Object ctxST = CTX_STACK_TRACE.getIfExists();
if (ctxST != null) {
if (ctxST instanceof StackTraceElement[]) {
stackTrace = (StackTraceElement[]) ctxST;
} else {
stackTrace = null;
}
} else if (conf.trace.slowMethodElapsedMicros > 0 && ThreadLocalRandom.current().nextInt(0, 100) < conf.threadPool.slowMethodSamplingPercent) {
stackTrace = new Throwable().getStackTrace();
} else {
stackTrace = null;
Expand All @@ -199,7 +206,7 @@ public T call() {
Throwable ex = null;
long s = System.nanoTime();
try {
r = fn.invoke();
r = fn.call();
} catch (Throwable e) {
TraceHandler.INSTANCE.log(toString(), ex = e);
throw e;
Expand All @@ -217,7 +224,7 @@ public T call() {
}

try {
return fn.invoke();
return fn.call();
} catch (Throwable e) {
TraceHandler.INSTANCE.log(toString(), e);
throw e;
Expand Down Expand Up @@ -260,8 +267,9 @@ public FutureTaskAdapter(Runnable runnable, T result) {
public static final Delegate<EventPublisher.StaticEventPublisher, NEventArgs<String>> onTraceIdChanged = Delegate.create();
static final ThreadLocal<Queue<String>> CTX_PARENT_TRACE_ID = new InheritableThreadLocal<>();
static final ThreadLocal<String> CTX_TRACE_ID = new InheritableThreadLocal<>();
static final FastThreadLocal<Boolean> ASYNC_CONTINUE = new FastThreadLocal<>();
static final FastThreadLocal<Object> COMPLETION_RETURNED_VALUE = new FastThreadLocal<>();
static final FastThreadLocal<Object> CTX_STACK_TRACE = new FastThreadLocal<>();
static final FastThreadLocal<Boolean> CONTINUE_FLAG = new FastThreadLocal<>();
private static final FastThreadLocal<Object> COMPLETION_RETURNED_VALUE = new FastThreadLocal<>();
static final String POOL_NAME_PREFIX = "℞Threads-";
static final IntWaterMark DEFAULT_CPU_WATER_MARK = new IntWaterMark(RxConfig.INSTANCE.threadPool.lowCpuWaterMark,
RxConfig.INSTANCE.threadPool.highCpuWaterMark);
Expand Down Expand Up @@ -338,9 +346,9 @@ static ThreadFactory newThreadFactory(String name) {
);
}

static boolean asyncContinueFlag(boolean def) {
Boolean ac = ASYNC_CONTINUE.getIfExists();
ASYNC_CONTINUE.remove();
static boolean continueFlag(boolean def) {
Boolean ac = CONTINUE_FLAG.getIfExists();
CONTINUE_FLAG.remove();
if (ac == null) {
return def;
}
Expand Down
17 changes: 13 additions & 4 deletions rxlib/src/main/java/org/rx/core/WheelTimer.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import io.netty.util.internal.ThreadLocalRandom;
import lombok.*;
import org.rx.bean.$;
import org.rx.bean.FlagsEnum;
Expand Down Expand Up @@ -45,20 +46,26 @@ class Task<T> implements TimerTask, TimeoutFuture<T> {
final Object id;
final LongUnaryOperator nextDelayFn;
final String traceId;
final StackTraceElement[] stackTrace;
long delay;
long expiredTime;
volatile Timeout timeout;
volatile Future<T> future;
long p0, p1;
int p2;

Task(Func<T> fn, FlagsEnum<TimeoutFlag> flags, Object id, LongUnaryOperator nextDelayFn) {
if (flags == null) {
flags = TimeoutFlag.NONE.flags();
}
if (RxConfig.INSTANCE.threadPool.traceName != null) {
RxConfig conf = RxConfig.INSTANCE;
if (conf.threadPool.traceName != null) {
flags.add(TimeoutFlag.THREAD_TRACE);
}
if (conf.trace.slowMethodElapsedMicros > 0 && ThreadLocalRandom.current().nextInt(0, 100) < conf.threadPool.slowMethodSamplingPercent) {
stackTrace = new Throwable().getStackTrace();
} else {
stackTrace = null;
}

this.fn = fn;
this.flags = flags;
Expand All @@ -74,20 +81,22 @@ public synchronized void run(Timeout timeout) throws Exception {
if (traceFlag) {
ThreadPool.startTrace(traceId);
}
ThreadPool.CTX_STACK_TRACE.set(stackTrace != null ? stackTrace : Boolean.TRUE);
try {
future = executor.submit(() -> {
boolean doContinue = flags.has(TimeoutFlag.PERIOD);
try {
return fn.get();
} finally {
if (ThreadPool.asyncContinueFlag(doContinue)) {
if (ThreadPool.continueFlag(doContinue)) {
newTimeout(this, delay, timeout.timer());
} else if (id != null) {
holder.remove(id);
}
}
});
} finally {
ThreadPool.CTX_STACK_TRACE.remove();
if (traceFlag) {
ThreadPool.endTrace();
}
Expand All @@ -98,7 +107,7 @@ public synchronized void run(Timeout timeout) throws Exception {
@Override
public String toString() {
String hc = id != null ? id.toString() : Integer.toHexString(hashCode());
return String.format("WheelTask-%s[%s]", hc, flags.getValue());
return String.format("TimeTask-%s[%s]", hc, flags.getValue());
}

@Override
Expand Down

0 comments on commit c34174a

Please sign in to comment.