From a2c0f0b51e3a52ef424ffef64538199c4d2ae51a Mon Sep 17 00:00:00 2001 From: wxm <115806199+youfanx@users.noreply.github.com> Date: Thu, 1 Aug 2024 17:48:51 +0800 Subject: [PATCH] dd --- .../java/org/rx/exception/TraceHandler.java | 253 ++++++++++-------- .../java/org/rx/spring/BaseInterceptor.java | 2 +- 2 files changed, 139 insertions(+), 116 deletions(-) diff --git a/rxlib/src/main/java/org/rx/exception/TraceHandler.java b/rxlib/src/main/java/org/rx/exception/TraceHandler.java index 888ddece..0c5d8eff 100644 --- a/rxlib/src/main/java/org/rx/exception/TraceHandler.java +++ b/rxlib/src/main/java/org/rx/exception/TraceHandler.java @@ -7,6 +7,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils; import org.rx.annotation.DbColumn; import org.rx.annotation.Subscribe; +import org.rx.bean.CircularBlockingQueue; import org.rx.bean.DateTime; import org.rx.bean.ProceedEventArgs; import org.rx.codec.CodecUtil; @@ -98,6 +99,7 @@ public static Object[] getMessageCandidate(Object... args) { return args; } + final CircularBlockingQueue queue = new CircularBlockingQueue<>(RxConfig.INSTANCE.getTrace().getWriteQueueLength()); ScheduledFuture future; private TraceHandler() { @@ -105,6 +107,16 @@ private TraceHandler() { Thread.setDefaultUncaughtExceptionHandler(this); EntityDatabase db = EntityDatabase.DEFAULT; db.createMapping(ExceptionEntity.class, MethodEntity.class, MetricsEntity.class, ThreadEntity.class); + + queue.onConsume.combine((s, e) -> { + RxConfig.TraceConfig conf = RxConfig.INSTANCE.getTrace(); + if (conf.getKeepDays() <= 0) { + return; + } + + Reflects.invokeMethod(this, "innerSave", e.getValue()); + }); + queue.setConsumePeriod(RxConfig.INSTANCE.getTrace().getFlushQueuePeriod()); } catch (Throwable e) { log.error("RxMeta init error", e); } @@ -112,6 +124,9 @@ private TraceHandler() { @Subscribe(topicClass = RxConfig.class) void onChanged(ObjectChangedEvent event) { + RxConfig.TraceConfig trace = RxConfig.INSTANCE.getTrace(); + queue.setCapacity(trace.getWriteQueueLength()); + ObjectChangeTracker.ChangedValue changedValue = event.getChangedMap().get(getWithoutPrefix(TRACE_KEEP_DAYS)); if (changedValue == null) { return; @@ -170,89 +185,58 @@ public void uncaughtException(Thread t, Throwable e) { } } - public void saveThreadTrace(Linq snapshot) { - RxConfig.TraceConfig conf = RxConfig.INSTANCE.getTrace(); - if (conf.getKeepDays() <= 0) { - return; - } - - Tasks.run(() -> { - EntityDatabase db = EntityDatabase.DEFAULT; - db.begin(); - try { - for (ThreadEntity t : snapshot) { - db.save(t, true); - } - db.commit(); - } catch (Throwable ex) { - log.error("dbTrace", ex); - db.rollback(); - } - }); - } - - public Linq queryThreadTrace(Long snapshotId, Date startTime, Date endTime) { - EntityQueryLambda q = new EntityQueryLambda<>(ThreadEntity.class); - if (snapshotId != null) { - q.eq(ThreadEntity::getSnapshotId, snapshotId); - } - if (startTime != null) { - q.ge(ThreadEntity::getSnapshotTime, startTime); - } - if (endTime != null) { - q.lt(ThreadEntity::getSnapshotTime, endTime); - } - EntityDatabase db = EntityDatabase.DEFAULT; - return Linq.from(db.findBy(q)); + public void saveExceptionTrace(Thread t, String msg, Throwable e) { + queue.offer(new Object[]{t.getName(), msg, e}); } - public void saveExceptionTrace(Thread t, String msg, Throwable e) { + void innerSave(String thread, String msg, Throwable e) { RxConfig.TraceConfig conf = RxConfig.INSTANCE.getTrace(); - if (conf.getKeepDays() <= 0) { - return; - } - String stackTrace = ExceptionUtils.getStackTrace(e); + int eMsgFlag = stackTrace.indexOf("\n"); + String eMsg = stackTrace.substring(0, eMsgFlag); + stackTrace = stackTrace.substring(eMsgFlag + 2); + msg = eMsg + msg; + long pk = CodecUtil.hash64(stackTrace); - Tasks.nextPool().runSerial(() -> { - EntityDatabase db = EntityDatabase.DEFAULT; - db.begin(); - try { - ExceptionEntity entity = db.findById(ExceptionEntity.class, pk); - boolean doInsert = entity == null; - if (doInsert) { - entity = new ExceptionEntity(); - entity.setId(pk); - InvalidException invalidException = as(e, InvalidException.class); - ExceptionLevel level = invalidException != null && invalidException.getLevel() != null ? invalidException.getLevel() - : ExceptionLevel.SYSTEM; - entity.setLevel(level); - entity.setMessages(new ConcurrentLinkedQueue<>()); - entity.setStackTrace(stackTrace); - } - Queue queue = entity.getMessages(); - if (queue.size() > conf.getErrorMessageSize()) { - queue.poll(); - } - StringBuilder b = new StringBuilder(); - b.appendMessageFormat("{}\t{}", DateTime.now().toDateTimeString(), msg); - Map ctxMap = Sys.getMDCCtxMap(); - if (!ctxMap.isEmpty()) { - b.appendMessageFormat("\nMDC:\t{}", ctxMap); - } - queue.offer(b.toString()); - entity.occurCount++; - entity.setAppName(RxConfig.INSTANCE.getId()); - entity.setThreadName(t.getName()); - entity.setModifyTime(DateTime.now()); - db.save(entity, doInsert); - db.commit(); - } catch (Throwable ex) { - log.error("dbTrace", ex); - db.rollback(); +// Tasks.nextPool().runSerial(() -> { + EntityDatabase db = EntityDatabase.DEFAULT; + db.begin(); + try { + ExceptionEntity entity = db.findById(ExceptionEntity.class, pk); + boolean doInsert = entity == null; + if (doInsert) { + entity = new ExceptionEntity(); + entity.setId(pk); + InvalidException invalidException = as(e, InvalidException.class); + ExceptionLevel level = invalidException != null && invalidException.getLevel() != null ? invalidException.getLevel() + : ExceptionLevel.SYSTEM; + entity.setLevel(level); + entity.setMessages(new ConcurrentLinkedQueue<>()); + entity.setStackTrace(stackTrace); } - return null; - }, pk); + Queue queue = entity.getMessages(); + if (queue.size() > conf.getErrorMessageSize()) { + queue.poll(); + } + StringBuilder b = new StringBuilder(); + b.appendMessageFormat("{}\t{}", DateTime.now().toDateTimeString(), msg); + Map ctxMap = Sys.getMDCCtxMap(); + if (!ctxMap.isEmpty()) { + b.appendMessageFormat("\nMDC:\t{}", ctxMap); + } + queue.offer(b.toString()); + entity.occurCount++; + entity.setAppName(RxConfig.INSTANCE.getId()); + entity.setThreadName(thread); + entity.setModifyTime(DateTime.now()); + db.save(entity, doInsert); + db.commit(); + } catch (Throwable ex) { + log.error("dbTrace", ex); + db.rollback(); + } +// return null; +// }, pk); } public List queryExceptionTraces(Date startTime, Date endTime, ExceptionLevel level, String keyword, @@ -286,51 +270,54 @@ public List queryExceptionTraces(Date startTime, Date endTime, return db.findBy(q); } - public void saveMethodTrace(ProceedEventArgs pe, String methodName) { + public void saveMethodTrace(Thread t, Class declaringType, String methodName, Object[] parameters, + Object returnValue, Throwable e, long elapsedMicros) { + queue.offer(new Object[]{t, declaringType, methodName, parameters, returnValue, e, elapsedMicros}); + } + + void innerSave(String thread, Class declaringType, String methodName, Object[] parameters, + Object returnValue, Throwable error, long elapsedNanos) { RxConfig.TraceConfig conf = RxConfig.INSTANCE.getTrace(); long elapsedMicros; - if (conf.getKeepDays() <= 0 || (elapsedMicros = pe.getElapsedNanos() / 1000L) < conf.getSlowMethodElapsedMicros()) { + if ((elapsedMicros = elapsedNanos / 1000L) < conf.getSlowMethodElapsedMicros()) { return; } - Object[] parameters = pe.getParameters(); - Throwable error = pe.getError(); - Object returnValue = pe.getReturnValue(); - String fullName = String.format("%s.%s(%s)", pe.getDeclaringType().getName(), methodName, parameters == null ? 0 : parameters.length); + String fullName = String.format("%s.%s(%s)", declaringType.getName(), methodName, parameters == null ? 0 : parameters.length); long pk = CodecUtil.hash64(fullName); - Tasks.nextPool().runSerial(() -> { - EntityDatabase db = EntityDatabase.DEFAULT; - db.begin(); - try { - MethodEntity entity = db.findById(MethodEntity.class, pk); - boolean doInsert = entity == null; - if (doInsert) { - entity = new MethodEntity(); - entity.setId(pk); - entity.setMethodName(fullName); - } - if (parameters != null) { - entity.setParameters(toJsonString(parameters)); - } - if (error != null) { - entity.setReturnValue(ExceptionUtils.getStackTrace(error)); - } else if (returnValue != null) { - entity.setReturnValue(toJsonString(returnValue)); - } - entity.setMDC(Sys.getMDCCtxMap()); - entity.elapsedMicros = Math.max(entity.elapsedMicros, elapsedMicros); - entity.occurCount++; - entity.setAppName(RxConfig.INSTANCE.getId()); - entity.setThreadName(Thread.currentThread().getName()); - entity.setModifyTime(DateTime.now()); - db.save(entity, doInsert); - db.commit(); - } catch (Throwable e) { - log.error("dbTrace", e); - db.rollback(); +// Tasks.nextPool().runSerial(() -> { + EntityDatabase db = EntityDatabase.DEFAULT; + db.begin(); + try { + MethodEntity entity = db.findById(MethodEntity.class, pk); + boolean doInsert = entity == null; + if (doInsert) { + entity = new MethodEntity(); + entity.setId(pk); + entity.setMethodName(fullName); } - return null; - }, pk); + if (parameters != null) { + entity.setParameters(toJsonString(parameters)); + } + if (error != null) { + entity.setReturnValue(ExceptionUtils.getStackTrace(error)); + } else if (returnValue != null) { + entity.setReturnValue(toJsonString(returnValue)); + } + entity.setMDC(Sys.getMDCCtxMap()); + entity.elapsedMicros = Math.max(entity.elapsedMicros, elapsedMicros); + entity.occurCount++; + entity.setAppName(RxConfig.INSTANCE.getId()); + entity.setThreadName(thread); + entity.setModifyTime(DateTime.now()); + db.save(entity, doInsert); + db.commit(); + } catch (Throwable e) { + log.error("dbTrace", e); + db.rollback(); + } +// return null; +// }, pk); } public List queryMethodTraces(String methodNamePrefix, Boolean methodOccurMost, Integer limit) { @@ -354,6 +341,42 @@ public List queryMethodTraces(String methodNamePrefix, Boolean met return db.findBy(q); } + public void saveThreadTrace(Linq snapshot) { + RxConfig.TraceConfig conf = RxConfig.INSTANCE.getTrace(); + if (conf.getKeepDays() <= 0) { + return; + } + + Tasks.run(() -> { + EntityDatabase db = EntityDatabase.DEFAULT; + db.begin(); + try { + for (ThreadEntity t : snapshot) { + db.save(t, true); + } + db.commit(); + } catch (Throwable ex) { + log.error("dbTrace", ex); + db.rollback(); + } + }); + } + + public Linq queryThreadTrace(Long snapshotId, Date startTime, Date endTime) { + EntityQueryLambda q = new EntityQueryLambda<>(ThreadEntity.class); + if (snapshotId != null) { + q.eq(ThreadEntity::getSnapshotId, snapshotId); + } + if (startTime != null) { + q.ge(ThreadEntity::getSnapshotTime, startTime); + } + if (endTime != null) { + q.lt(ThreadEntity::getSnapshotTime, endTime); + } + EntityDatabase db = EntityDatabase.DEFAULT; + return Linq.from(db.findBy(q)); + } + public void saveMetric(String name, String message) { log.info("saveMetric {} {}", name, message); String stackTrace = Reflects.getStackTrace(Thread.currentThread()); diff --git a/rxlib/src/main/java/org/rx/spring/BaseInterceptor.java b/rxlib/src/main/java/org/rx/spring/BaseInterceptor.java index eb4c88c3..5547e32b 100644 --- a/rxlib/src/main/java/org/rx/spring/BaseInterceptor.java +++ b/rxlib/src/main/java/org/rx/spring/BaseInterceptor.java @@ -73,7 +73,7 @@ public Object doAround(ProceedingJoinPoint joinPoint) throws Throwable { throw pe.getError(); } } finally { - TraceHandler.INSTANCE.saveMethodTrace(pe.getDeclaringType(), signature.getName(), pe.getParameters(), pe.getReturnValue(), pe.getError(), pe.getElapsedNanos()); + TraceHandler.INSTANCE.saveMethodTrace(Thread.currentThread(), pe.getDeclaringType(), signature.getName(), pe.getParameters(), pe.getReturnValue(), pe.getError(), pe.getElapsedNanos()); onLog(signature, pe, paramSnapshot); raiseEvent(onProceed, pe); }