Skip to content

Commit

Permalink
dd
Browse files Browse the repository at this point in the history
  • Loading branch information
youfanx committed Aug 1, 2024
1 parent d7675a8 commit a2c0f0b
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 116 deletions.
253 changes: 138 additions & 115 deletions rxlib/src/main/java/org/rx/exception/TraceHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.rx.annotation.DbColumn;
import org.rx.annotation.Subscribe;
import org.rx.bean.CircularBlockingQueue;
import org.rx.bean.DateTime;
import org.rx.bean.ProceedEventArgs;
import org.rx.codec.CodecUtil;
Expand Down Expand Up @@ -98,20 +99,34 @@ public static Object[] getMessageCandidate(Object... args) {
return args;
}

final CircularBlockingQueue<Object[]> queue = new CircularBlockingQueue<>(RxConfig.INSTANCE.getTrace().getWriteQueueLength());
ScheduledFuture<?> future;

private TraceHandler() {
try {
Thread.setDefaultUncaughtExceptionHandler(this);
EntityDatabase db = EntityDatabase.DEFAULT;
db.createMapping(ExceptionEntity.class, MethodEntity.class, MetricsEntity.class, ThreadEntity.class);

queue.onConsume.combine((s, e) -> {
RxConfig.TraceConfig conf = RxConfig.INSTANCE.getTrace();
if (conf.getKeepDays() <= 0) {
return;
}

Reflects.invokeMethod(this, "innerSave", e.getValue());
});
queue.setConsumePeriod(RxConfig.INSTANCE.getTrace().getFlushQueuePeriod());
} catch (Throwable e) {
log.error("RxMeta init error", e);
}
}

@Subscribe(topicClass = RxConfig.class)
void onChanged(ObjectChangedEvent event) {
RxConfig.TraceConfig trace = RxConfig.INSTANCE.getTrace();
queue.setCapacity(trace.getWriteQueueLength());

ObjectChangeTracker.ChangedValue changedValue = event.getChangedMap().get(getWithoutPrefix(TRACE_KEEP_DAYS));
if (changedValue == null) {
return;
Expand Down Expand Up @@ -170,89 +185,58 @@ public void uncaughtException(Thread t, Throwable e) {
}
}

public void saveThreadTrace(Linq<ThreadEntity> snapshot) {
RxConfig.TraceConfig conf = RxConfig.INSTANCE.getTrace();
if (conf.getKeepDays() <= 0) {
return;
}

Tasks.run(() -> {
EntityDatabase db = EntityDatabase.DEFAULT;
db.begin();
try {
for (ThreadEntity t : snapshot) {
db.save(t, true);
}
db.commit();
} catch (Throwable ex) {
log.error("dbTrace", ex);
db.rollback();
}
});
}

public Linq<ThreadEntity> queryThreadTrace(Long snapshotId, Date startTime, Date endTime) {
EntityQueryLambda<ThreadEntity> q = new EntityQueryLambda<>(ThreadEntity.class);
if (snapshotId != null) {
q.eq(ThreadEntity::getSnapshotId, snapshotId);
}
if (startTime != null) {
q.ge(ThreadEntity::getSnapshotTime, startTime);
}
if (endTime != null) {
q.lt(ThreadEntity::getSnapshotTime, endTime);
}
EntityDatabase db = EntityDatabase.DEFAULT;
return Linq.from(db.findBy(q));
public void saveExceptionTrace(Thread t, String msg, Throwable e) {
queue.offer(new Object[]{t.getName(), msg, e});
}

public void saveExceptionTrace(Thread t, String msg, Throwable e) {
void innerSave(String thread, String msg, Throwable e) {
RxConfig.TraceConfig conf = RxConfig.INSTANCE.getTrace();
if (conf.getKeepDays() <= 0) {
return;
}

String stackTrace = ExceptionUtils.getStackTrace(e);
int eMsgFlag = stackTrace.indexOf("\n");
String eMsg = stackTrace.substring(0, eMsgFlag);
stackTrace = stackTrace.substring(eMsgFlag + 2);
msg = eMsg + msg;

long pk = CodecUtil.hash64(stackTrace);
Tasks.nextPool().runSerial(() -> {
EntityDatabase db = EntityDatabase.DEFAULT;
db.begin();
try {
ExceptionEntity entity = db.findById(ExceptionEntity.class, pk);
boolean doInsert = entity == null;
if (doInsert) {
entity = new ExceptionEntity();
entity.setId(pk);
InvalidException invalidException = as(e, InvalidException.class);
ExceptionLevel level = invalidException != null && invalidException.getLevel() != null ? invalidException.getLevel()
: ExceptionLevel.SYSTEM;
entity.setLevel(level);
entity.setMessages(new ConcurrentLinkedQueue<>());
entity.setStackTrace(stackTrace);
}
Queue<String> queue = entity.getMessages();
if (queue.size() > conf.getErrorMessageSize()) {
queue.poll();
}
StringBuilder b = new StringBuilder();
b.appendMessageFormat("{}\t{}", DateTime.now().toDateTimeString(), msg);
Map<String, String> ctxMap = Sys.getMDCCtxMap();
if (!ctxMap.isEmpty()) {
b.appendMessageFormat("\nMDC:\t{}", ctxMap);
}
queue.offer(b.toString());
entity.occurCount++;
entity.setAppName(RxConfig.INSTANCE.getId());
entity.setThreadName(t.getName());
entity.setModifyTime(DateTime.now());
db.save(entity, doInsert);
db.commit();
} catch (Throwable ex) {
log.error("dbTrace", ex);
db.rollback();
// Tasks.nextPool().runSerial(() -> {
EntityDatabase db = EntityDatabase.DEFAULT;
db.begin();
try {
ExceptionEntity entity = db.findById(ExceptionEntity.class, pk);
boolean doInsert = entity == null;
if (doInsert) {
entity = new ExceptionEntity();
entity.setId(pk);
InvalidException invalidException = as(e, InvalidException.class);
ExceptionLevel level = invalidException != null && invalidException.getLevel() != null ? invalidException.getLevel()
: ExceptionLevel.SYSTEM;
entity.setLevel(level);
entity.setMessages(new ConcurrentLinkedQueue<>());
entity.setStackTrace(stackTrace);
}
return null;
}, pk);
Queue<String> queue = entity.getMessages();
if (queue.size() > conf.getErrorMessageSize()) {
queue.poll();
}
StringBuilder b = new StringBuilder();
b.appendMessageFormat("{}\t{}", DateTime.now().toDateTimeString(), msg);
Map<String, String> ctxMap = Sys.getMDCCtxMap();
if (!ctxMap.isEmpty()) {
b.appendMessageFormat("\nMDC:\t{}", ctxMap);
}
queue.offer(b.toString());
entity.occurCount++;
entity.setAppName(RxConfig.INSTANCE.getId());
entity.setThreadName(thread);
entity.setModifyTime(DateTime.now());
db.save(entity, doInsert);
db.commit();
} catch (Throwable ex) {
log.error("dbTrace", ex);
db.rollback();
}
// return null;
// }, pk);
}

public List<ExceptionEntity> queryExceptionTraces(Date startTime, Date endTime, ExceptionLevel level, String keyword,
Expand Down Expand Up @@ -286,51 +270,54 @@ public List<ExceptionEntity> queryExceptionTraces(Date startTime, Date endTime,
return db.findBy(q);
}

public void saveMethodTrace(ProceedEventArgs pe, String methodName) {
public void saveMethodTrace(Thread t, Class<?> declaringType, String methodName, Object[] parameters,
Object returnValue, Throwable e, long elapsedMicros) {
queue.offer(new Object[]{t, declaringType, methodName, parameters, returnValue, e, elapsedMicros});
}

void innerSave(String thread, Class<?> declaringType, String methodName, Object[] parameters,
Object returnValue, Throwable error, long elapsedNanos) {
RxConfig.TraceConfig conf = RxConfig.INSTANCE.getTrace();
long elapsedMicros;
if (conf.getKeepDays() <= 0 || (elapsedMicros = pe.getElapsedNanos() / 1000L) < conf.getSlowMethodElapsedMicros()) {
if ((elapsedMicros = elapsedNanos / 1000L) < conf.getSlowMethodElapsedMicros()) {
return;
}

Object[] parameters = pe.getParameters();
Throwable error = pe.getError();
Object returnValue = pe.getReturnValue();
String fullName = String.format("%s.%s(%s)", pe.getDeclaringType().getName(), methodName, parameters == null ? 0 : parameters.length);
String fullName = String.format("%s.%s(%s)", declaringType.getName(), methodName, parameters == null ? 0 : parameters.length);
long pk = CodecUtil.hash64(fullName);
Tasks.nextPool().runSerial(() -> {
EntityDatabase db = EntityDatabase.DEFAULT;
db.begin();
try {
MethodEntity entity = db.findById(MethodEntity.class, pk);
boolean doInsert = entity == null;
if (doInsert) {
entity = new MethodEntity();
entity.setId(pk);
entity.setMethodName(fullName);
}
if (parameters != null) {
entity.setParameters(toJsonString(parameters));
}
if (error != null) {
entity.setReturnValue(ExceptionUtils.getStackTrace(error));
} else if (returnValue != null) {
entity.setReturnValue(toJsonString(returnValue));
}
entity.setMDC(Sys.getMDCCtxMap());
entity.elapsedMicros = Math.max(entity.elapsedMicros, elapsedMicros);
entity.occurCount++;
entity.setAppName(RxConfig.INSTANCE.getId());
entity.setThreadName(Thread.currentThread().getName());
entity.setModifyTime(DateTime.now());
db.save(entity, doInsert);
db.commit();
} catch (Throwable e) {
log.error("dbTrace", e);
db.rollback();
// Tasks.nextPool().runSerial(() -> {
EntityDatabase db = EntityDatabase.DEFAULT;
db.begin();
try {
MethodEntity entity = db.findById(MethodEntity.class, pk);
boolean doInsert = entity == null;
if (doInsert) {
entity = new MethodEntity();
entity.setId(pk);
entity.setMethodName(fullName);
}
return null;
}, pk);
if (parameters != null) {
entity.setParameters(toJsonString(parameters));
}
if (error != null) {
entity.setReturnValue(ExceptionUtils.getStackTrace(error));
} else if (returnValue != null) {
entity.setReturnValue(toJsonString(returnValue));
}
entity.setMDC(Sys.getMDCCtxMap());
entity.elapsedMicros = Math.max(entity.elapsedMicros, elapsedMicros);
entity.occurCount++;
entity.setAppName(RxConfig.INSTANCE.getId());
entity.setThreadName(thread);
entity.setModifyTime(DateTime.now());
db.save(entity, doInsert);
db.commit();
} catch (Throwable e) {
log.error("dbTrace", e);
db.rollback();
}
// return null;
// }, pk);
}

public List<MethodEntity> queryMethodTraces(String methodNamePrefix, Boolean methodOccurMost, Integer limit) {
Expand All @@ -354,6 +341,42 @@ public List<MethodEntity> queryMethodTraces(String methodNamePrefix, Boolean met
return db.findBy(q);
}

public void saveThreadTrace(Linq<ThreadEntity> snapshot) {
RxConfig.TraceConfig conf = RxConfig.INSTANCE.getTrace();
if (conf.getKeepDays() <= 0) {
return;
}

Tasks.run(() -> {
EntityDatabase db = EntityDatabase.DEFAULT;
db.begin();
try {
for (ThreadEntity t : snapshot) {
db.save(t, true);
}
db.commit();
} catch (Throwable ex) {
log.error("dbTrace", ex);
db.rollback();
}
});
}

public Linq<ThreadEntity> queryThreadTrace(Long snapshotId, Date startTime, Date endTime) {
EntityQueryLambda<ThreadEntity> q = new EntityQueryLambda<>(ThreadEntity.class);
if (snapshotId != null) {
q.eq(ThreadEntity::getSnapshotId, snapshotId);
}
if (startTime != null) {
q.ge(ThreadEntity::getSnapshotTime, startTime);
}
if (endTime != null) {
q.lt(ThreadEntity::getSnapshotTime, endTime);
}
EntityDatabase db = EntityDatabase.DEFAULT;
return Linq.from(db.findBy(q));
}

public void saveMetric(String name, String message) {
log.info("saveMetric {} {}", name, message);
String stackTrace = Reflects.getStackTrace(Thread.currentThread());
Expand Down
2 changes: 1 addition & 1 deletion rxlib/src/main/java/org/rx/spring/BaseInterceptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public Object doAround(ProceedingJoinPoint joinPoint) throws Throwable {
throw pe.getError();
}
} finally {
TraceHandler.INSTANCE.saveMethodTrace(pe.getDeclaringType(), signature.getName(), pe.getParameters(), pe.getReturnValue(), pe.getError(), pe.getElapsedNanos());
TraceHandler.INSTANCE.saveMethodTrace(Thread.currentThread(), pe.getDeclaringType(), signature.getName(), pe.getParameters(), pe.getReturnValue(), pe.getError(), pe.getElapsedNanos());
onLog(signature, pe, paramSnapshot);
raiseEvent(onProceed, pe);
}
Expand Down

0 comments on commit a2c0f0b

Please sign in to comment.