Skip to content

Commit

Permalink
up ot
Browse files Browse the repository at this point in the history
  • Loading branch information
youfanx committed Aug 19, 2024
1 parent ee7a433 commit 137c13b
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 51 deletions.
4 changes: 4 additions & 0 deletions rxlib/src/main/java/org/rx/annotation/Subscribe.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.rx.annotation;

import org.springframework.core.annotation.AliasFor;

import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
Expand All @@ -11,8 +13,10 @@
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Subscribe {
@AliasFor("topic")
String value() default "";

@AliasFor("value")
String topic() default "";

Class<?> topicClass() default Object.class;
Expand Down
68 changes: 38 additions & 30 deletions rxlib/src/main/java/org/rx/core/Sys.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,32 @@ public boolean hasDiskUsageWarning() {
log.info("RxMeta {} {}_{}_{} @ {} & {}\n{}", JAVA_VERSION, OS_NAME, OS_VERSION, OS_ARCH,
new File(Strings.EMPTY).getAbsolutePath(), Sockets.getLocalAddresses(false), JSON.toJSONString(conf));

ObjectChangeTracker.DEFAULT.watch(conf, true)
.register(Sys.class)
.register(Tasks.class)
.register(TraceHandler.INSTANCE);
// ObjectChangeTracker.DEFAULT.watch(conf, true)
// .register(Sys.class)
// .register(Tasks.class)
// .register(TraceHandler.INSTANCE);
ObjectChangeTracker.DEFAULT.watch(conf).register(Sys.class);
onChanged(new ObjectChangedEvent(conf, Collections.emptyMap()));
}

@Subscribe(topicClass = RxConfig.class)
static void onChanged(ObjectChangedEvent event) {
Map<String, ObjectChangeTracker.ChangedValue> changedMap = event.getChangedMap();
// log.info("RxMeta Sys changed {}", changedMap);
Integer enableFlags = event.readValue(getWithoutPrefix(NTP_ENABLE_FLAGS));
if (enableFlags == null) {
return;
}
log.info("RxMeta {} changed {}", NTP_ENABLE_FLAGS, enableFlags);
if ((enableFlags & 1) == 1) {
NtpClock.scheduleTask();
}
if ((enableFlags & 2) == 2) {
Tasks.setTimeout(() -> {
log.info("TimeAdvice inject..");
NtpClock.transform();
}, 60000);
}
}

static void checkAdviceShare(boolean isInit) {
Expand Down Expand Up @@ -195,26 +217,6 @@ static long[] getAdviceShareTime() {
return !(v instanceof long[]) || (time = (long[]) v).length != 2 ? null : time;
}

@Subscribe(topicClass = RxConfig.class)
static void onChanged(ObjectChangedEvent event) {
Map<String, ObjectChangeTracker.ChangedValue> changedMap = event.getChangedMap();
// log.info("RxMeta Sys changed {}", changedMap);
Integer enableFlags = event.readValue(getWithoutPrefix(NTP_ENABLE_FLAGS));
if (enableFlags == null) {
return;
}
log.info("RxMeta {} changed {}", NTP_ENABLE_FLAGS, enableFlags);
if ((enableFlags & 1) == 1) {
NtpClock.scheduleTask();
}
if ((enableFlags & 2) == 2) {
Tasks.setTimeout(() -> {
log.info("TimeAdvice inject..");
NtpClock.transform();
}, 60000);
}
}

//region basic
public static Map<String, String> mainOptions(String[] args) {
Map<String, String> result = new HashMap<>();
Expand Down Expand Up @@ -526,12 +528,18 @@ public static <T> T callLog(@NonNull Class<?> declaringType, @NonNull String met
doWrite = Linq.from(conf.logTypeWhitelist).any(p -> declaringType.getName().startsWith(p));
}
if (doWrite) {
String msg = builder.buildLog(declaringType, methodName, parameters, paramSnapshot, returnValue, error, elapsedNanos);
if (error != null) {
TraceHandler.INSTANCE.log(msg, error);
} else {
org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(declaringType);
log.info(msg);
try {
String msg = builder.buildLog(declaringType, methodName, parameters, paramSnapshot, returnValue, error, elapsedNanos);
if (msg != null) {
if (error != null) {
TraceHandler.INSTANCE.log(msg, error);
} else {
org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(declaringType);
log.info(msg);
}
}
} catch (Throwable e) {
log.warn("buildLog", e);
}
}
} catch (Throwable e) {
Expand Down
7 changes: 5 additions & 2 deletions rxlib/src/main/java/org/rx/core/Tasks.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.concurrent.*;

import static org.rx.core.Extends.circuitContinue;
import static org.rx.core.RxConfig.ConfigNames.THREAD_POOL_REPLICAS;

//Java 11 ForkJoinPool.commonPool() has class loading issue
@Slf4j
Expand All @@ -31,7 +32,9 @@ public final class Tasks {
static int poolCount;

static {
onChanged(null);
ObjectChangeTracker.DEFAULT.register(Tasks.class);
onChanged(new ObjectChangedEvent(RxConfig.INSTANCE, Collections.emptyMap()));

executor = new AbstractExecutorService() {
@Getter
boolean shutdown;
Expand Down Expand Up @@ -109,7 +112,7 @@ static synchronized void onChanged(ObjectChangedEvent event) {
return;
}

log.info("RxMeta {} changed {} -> {}", RxConfig.ConfigNames.THREAD_POOL_REPLICAS, poolCount, newCount);
log.info("RxMeta {} changed {} -> {}", THREAD_POOL_REPLICAS, poolCount, newCount);
for (int i = 0; i < newCount; i++) {
nodes.add(0, new ThreadPool(String.format("N%s", i)));
}
Expand Down
26 changes: 11 additions & 15 deletions rxlib/src/main/java/org/rx/exception/TraceHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import static org.rx.core.Extends.as;
import static org.rx.core.RxConfig.ConfigNames.TRACE_KEEP_DAYS;
import static org.rx.core.RxConfig.ConfigNames.getWithoutPrefix;
import static org.rx.core.Sys.toJsonString;

@Slf4j
Expand Down Expand Up @@ -100,37 +99,34 @@ public static Object[] getMessageCandidate(Object... args) {
ScheduledFuture<?> future;

private TraceHandler() {
RxConfig conf = RxConfig.INSTANCE;
try {
Thread.setDefaultUncaughtExceptionHandler(this);
EntityDatabase db = EntityDatabase.DEFAULT;
db.createMapping(ExceptionEntity.class, MethodEntity.class, MetricsEntity.class, ThreadEntity.class);

queue.onConsume.combine((s, e) -> {
RxConfig.TraceConfig conf = RxConfig.INSTANCE.getTrace();
if (conf.getKeepDays() <= 0) {
RxConfig.TraceConfig c = RxConfig.INSTANCE.getTrace();
if (c.getKeepDays() <= 0) {
return;
}

Reflects.invokeMethod(this, "innerSave", e.getValue());
});
queue.setConsumePeriod(RxConfig.INSTANCE.getTrace().getFlushQueuePeriod());
queue.setConsumePeriod(conf.getTrace().getFlushQueuePeriod());
} catch (Throwable e) {
log.error("RxMeta init error", e);
log.error("RxMeta init db error", e);
}
ObjectChangeTracker.DEFAULT.register(this);
onChanged(new ObjectChangedEvent(conf, Collections.emptyMap()));
Thread.setDefaultUncaughtExceptionHandler(this);
}

@Subscribe(topicClass = RxConfig.class)
void onChanged(ObjectChangedEvent event) {
synchronized void onChanged(ObjectChangedEvent event) {
RxConfig.TraceConfig trace = RxConfig.INSTANCE.getTrace();
queue.setCapacity(trace.getWriteQueueLength());

ObjectChangeTracker.ChangedValue changedValue = event.getChangedMap().get(getWithoutPrefix(TRACE_KEEP_DAYS));
if (changedValue == null) {
return;
}

int keepDays = changedValue.newValue();
log.info("RxMeta {} changed {}", TRACE_KEEP_DAYS, changedValue);
int keepDays = trace.getKeepDays();
log.info("RxMeta {} changed {}", TRACE_KEEP_DAYS, keepDays);
if (keepDays > 0) {
if (future == null) {
future = Tasks.scheduleDaily(() -> {
Expand Down
4 changes: 0 additions & 4 deletions rxlib/src/main/java/org/rx/spring/Interceptors.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.rx.bean.Tuple;
import org.rx.core.*;
import org.rx.exception.ApplicationException;
import org.rx.exception.TraceHandler;
import org.rx.net.http.HttpClient;
import org.rx.util.Servlets;
import org.rx.util.Validator;
Expand Down Expand Up @@ -109,9 +108,6 @@ protected Tuple<HttpServletRequest, HttpServletResponse> httpEnv() {
@ResponseStatus(HttpStatus.OK)
@ResponseBody
public Object onException(Throwable e, HttpServletRequest request) {
if (!Boolean.TRUE.equals(request.getAttribute("_skipGlobalLog"))) {
TraceHandler.INSTANCE.log(request.getRequestURL().toString(), e);
}
String msg = null;
if (e instanceof MethodArgumentNotValidException) {
FieldError fieldError = ((MethodArgumentNotValidException) e).getBindingResult().getFieldError();
Expand Down

0 comments on commit 137c13b

Please sign in to comment.