From 0bfe531b9bdeb179d6f299b8fa080be636ba9e48 Mon Sep 17 00:00:00 2001 From: wxm <115806199+youfanx@users.noreply.github.com> Date: Fri, 16 Aug 2024 11:30:27 +0800 Subject: [PATCH] remoting server --- rxlib/src/main/java/org/rx/core/Reflects.java | 6 +- rxlib/src/main/java/org/rx/core/RxConfig.java | 6 +- rxlib/src/main/java/org/rx/core/Sys.java | 160 +++++++++++++++++- .../main/java/org/rx/net/http/RestClient.java | 2 +- .../main/java/org/rx/net/rpc/Remoting.java | 37 ++-- .../java/org/rx/spring/BaseInterceptor.java | 8 +- 6 files changed, 178 insertions(+), 41 deletions(-) diff --git a/rxlib/src/main/java/org/rx/core/Reflects.java b/rxlib/src/main/java/org/rx/core/Reflects.java index 4de4dbb7..99ad7181 100644 --- a/rxlib/src/main/java/org/rx/core/Reflects.java +++ b/rxlib/src/main/java/org/rx/core/Reflects.java @@ -206,8 +206,8 @@ static SerializedLambda getLambda(BiFunc func) { return lambda; } - static final int APPEND_TO_COLLECTION = 1; - static final int WRITE_QUIETLY = 1 << 1; + static final byte APPEND_TO_COLLECTION = 1; + static final byte WRITE_QUIETLY = 1 << 1; /** * @param instance @@ -216,7 +216,7 @@ static SerializedLambda getLambda(BiFunc func) { * @param flags 1 append to Collection * 1 << 1 write quietly */ - public static void writeFieldByPath(Object instance, String fieldPath, Object value, int flags) { + public static void writeFieldByPath(Object instance, String fieldPath, Object value, byte flags) { final String c = "."; int wPathOffset = 0, i; String fieldName; diff --git a/rxlib/src/main/java/org/rx/core/RxConfig.java b/rxlib/src/main/java/org/rx/core/RxConfig.java index 2df008c2..7e199687 100644 --- a/rxlib/src/main/java/org/rx/core/RxConfig.java +++ b/rxlib/src/main/java/org/rx/core/RxConfig.java @@ -220,7 +220,7 @@ public int getIntId() { private RxConfig() { } - public void refreshFrom(Environment env, int flags) { + public void refreshFrom(Environment env, byte flags) { Map rsProps = Linq.from(Reflects.getFieldMap(RxConfig.ConfigNames.class).values()).select(p -> { String k = (String) p.get(null); return new AbstractMap.SimpleEntry<>(k, env.getProperty(k)); @@ -229,10 +229,10 @@ public void refreshFrom(Environment env, int flags) { } public void refreshFrom(Map props) { - refreshFrom(props, 0); + refreshFrom(props, (byte) 0); } - public void refreshFrom(Map props, int flags) { + public void refreshFrom(Map props, byte flags) { Linq.from(Reflects.getFieldMap(ConfigNames.class).values()).select(p -> p.get(null)).join(props.entrySet(), (p, x) -> eq(p, x.getKey()), (p, x) -> { Reflects.writeFieldByPath(this, ConfigNames.getWithoutPrefix(x.getKey()), x.getValue(), flags); return null; diff --git a/rxlib/src/main/java/org/rx/core/Sys.java b/rxlib/src/main/java/org/rx/core/Sys.java index 294ab774..fa3c4d01 100644 --- a/rxlib/src/main/java/org/rx/core/Sys.java +++ b/rxlib/src/main/java/org/rx/core/Sys.java @@ -6,10 +6,7 @@ import com.sun.management.HotSpotDiagnosticMXBean; import com.sun.management.OperatingSystemMXBean; import io.netty.util.Timeout; -import lombok.Getter; -import lombok.NonNull; -import lombok.RequiredArgsConstructor; -import lombok.SneakyThrows; +import lombok.*; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.IterableUtils; @@ -27,6 +24,7 @@ import org.rx.util.Lazy; import org.rx.util.function.BiAction; import org.rx.util.function.BiFunc; +import org.rx.util.function.Func; import org.rx.util.function.TripleFunc; import org.slf4j.MDC; import org.slf4j.spi.MDCAdapter; @@ -449,6 +447,160 @@ public static void log(@NonNull ProceedEventArgs eventArgs, @NonNull BiAction extends Func { + default boolean isVoid() { + return false; + } + } + + public interface CallLogBuilder { + String buildParamSnapshot(Class declaringType, String methodName, Object[] parameters); + + String buildLog(Class declaringType, String methodName, Object[] parameters, String paramSnapshot, Object returnValue, Throwable error, long elapsedNanos); + } + + public static class DefaultCallLogBuilder implements CallLogBuilder { + public static final byte HTTP_KEYWORD_FLAG = 1, LOG_MDC_FLAG = 1 << 1; + @Getter + @Setter + byte flags; + + public DefaultCallLogBuilder() { + this((byte) 0); + } + + public DefaultCallLogBuilder(byte flags) { + this.flags = flags; + } + + @Override + public String buildParamSnapshot(Class declaringType, String methodName, Object[] args) { + if (Arrays.isEmpty(args)) { + return "{}"; + } + List list = Linq.from(args).select(p -> shortArg(declaringType, methodName, p)).toList(); + return toJsonString(list.size() == 1 ? list.get(0) : list); + } + + protected Object shortArg(Class declaringType, String methodName, Object arg) { + if (arg instanceof byte[]) { + byte[] b = (byte[]) arg; + if (b.length > MAX_FIELD_SIZE) { + return "[BigBytes]"; + } + } + if (arg instanceof String) { + String s = (String) arg; + if (s.length() > MAX_FIELD_SIZE) { + return "[BigString]"; + } + } + return arg; + } + + @Override + public String buildLog(Class declaringType, String methodName, Object[] parameters, String paramSnapshot, Object returnValue, Throwable error, long elapsedNanos) { + StringBuilder msg = new StringBuilder(Constants.HEAP_BUF_SIZE); + if ((flags & HTTP_KEYWORD_FLAG) == HTTP_KEYWORD_FLAG) { + msg.appendLine("Call:\t%s", methodName) + .appendLine("Request:\t%s", paramSnapshot) + .appendLine("Response:\t%s\tElapsed=%s", toJsonString(shortArg(declaringType, methodName, returnValue)), formatNanosElapsed(elapsedNanos)); + } else { + msg.appendLine("Call:\t%s", methodName) + .appendLine("Parameters:\t%s", paramSnapshot) + .appendLine("ReturnValue:\t%s\tElapsed=%s", toJsonString(shortArg(declaringType, methodName, returnValue)), formatNanosElapsed(elapsedNanos)); + } + if (error != null) { + msg.appendLine("Error:\t%s", error.getMessage()); + } + + if ((flags & LOG_MDC_FLAG) == LOG_MDC_FLAG) { + Map mappedDiagnosticCtx = getMDCCtxMap(); + boolean first = true; + for (Map.Entry entry : mappedDiagnosticCtx.entrySet()) { + if (first) { + msg.append("MDC:\t"); + first = false; + } + msg.appendFormat("%s=%s ", entry.getKey(), entry.getValue()); + } + if (!first) { + msg.appendLine(); + } + } + return msg.toString(); + } + } + + static final int MAX_FIELD_SIZE = 1024 * 4; + public static final CallLogBuilder DEFAULT_LOG_BUILDER = new DefaultCallLogBuilder(); + public static final CallLogBuilder HTTP_LOG_BUILDER = new DefaultCallLogBuilder(DefaultCallLogBuilder.HTTP_KEYWORD_FLAG); + + public static T callLog(Class declaringType, String methodName, Object[] parameters, ProceedFunc proceed) { + return callLog(declaringType, methodName, parameters, proceed, DEFAULT_LOG_BUILDER, null); + } + + @SneakyThrows + public static T callLog(@NonNull Class declaringType, @NonNull String methodName, Object[] parameters, @NonNull ProceedFunc proceed, @NonNull CallLogBuilder builder, LogStrategy strategy) { + Object returnValue = null; + Throwable error = null; + String paramSnapshot = builder.buildParamSnapshot(declaringType, methodName, parameters); + + long start = System.nanoTime(); + try { + T retVal = proceed.invoke(); + returnValue = retVal; + return retVal; + } catch (Throwable e) { + throw error = e; + } finally { + long elapsedNanos = System.nanoTime() - start; + try { + TraceHandler.INSTANCE.saveMethodTrace(Thread.currentThread(), declaringType.getName(), methodName, parameters, returnValue, error, elapsedNanos); + + RxConfig conf = RxConfig.INSTANCE; + if (strategy == null) { + strategy = conf.logStrategy; + } + if (strategy == null) { + strategy = error != null ? LogStrategy.WRITE_ON_ERROR : LogStrategy.WRITE_ON_NULL; + } + + boolean doWrite; + switch (strategy) { + case WRITE_ON_NULL: + doWrite = error != null + || (!proceed.isVoid() && returnValue == null) + || (!Arrays.isEmpty(parameters) && Arrays.contains(parameters, null)); + break; + case WRITE_ON_ERROR: + doWrite = error != null; + break; + case ALWAYS: + doWrite = true; + break; + default: + doWrite = false; + break; + } + if (doWrite && !CollectionUtils.isEmpty(conf.logTypeWhitelist)) { + doWrite = Linq.from(conf.logTypeWhitelist).any(p -> declaringType.getName().startsWith(p)); + } + if (doWrite) { + String msg = builder.buildLog(declaringType, methodName, parameters, paramSnapshot, returnValue, error, elapsedNanos); + if (error != null) { + TraceHandler.INSTANCE.log(msg, error); + } else { + org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(declaringType); + log.info(msg); + } + } + } catch (Throwable e) { + log.warn("callLog", e); + } + } + } + public static Map getMDCCtxMap() { MDCAdapter mdc = MDC.getMDCAdapter(); if (mdc == null) { diff --git a/rxlib/src/main/java/org/rx/net/http/RestClient.java b/rxlib/src/main/java/org/rx/net/http/RestClient.java index 1adda778..49d483aa 100644 --- a/rxlib/src/main/java/org/rx/net/http/RestClient.java +++ b/rxlib/src/main/java/org/rx/net/http/RestClient.java @@ -97,7 +97,7 @@ public String invoke() { } }; } - responseText = Sys.callLog(contract, String.format("%s\t%s", doPost ? "POST" : "GET", reqUrl), args, proceed); + responseText = Sys.callLog(contract, String.format("%s\t%s", doPost ? "POST" : "GET", reqUrl), args, proceed, Sys.HTTP_LOG_BUILDER, null); if (checkResponse != null && !checkResponse.invoke(responseText)) { throw new InvalidException("Response status error"); } diff --git a/rxlib/src/main/java/org/rx/net/rpc/Remoting.java b/rxlib/src/main/java/org/rx/net/rpc/Remoting.java index 0134447c..25d73426 100644 --- a/rxlib/src/main/java/org/rx/net/rpc/Remoting.java +++ b/rxlib/src/main/java/org/rx/net/rpc/Remoting.java @@ -409,33 +409,24 @@ public static TcpServer register(@NonNull Object contractInstance, @NonNull RpcS } MethodMessage pack = (MethodMessage) e.getValue(); - ProceedEventArgs args = new ProceedEventArgs(contractInstance.getClass(), pack.parameters, false); try { - pack.returnValue = RemotingContext.invoke(() -> args.proceed(() -> { - String tn = RxConfig.INSTANCE.getThreadPool().getTraceName(); - if (tn != null) { - ThreadPool.startTrace(pack.traceId, true); - } - try { - return Reflects.invokeMethod(contractInstance, pack.methodName, pack.parameters); - } finally { - ThreadPool.endTrace(); - } - }), s, e.getClient()); + pack.returnValue = Sys.callLog(contractInstance.getClass(), + String.format("Server %s.%s [%s]-> %s", contractInstance.getClass().getSimpleName(), pack.methodName, + s.getConfig().getListenPort(), Sockets.toString(e.getClient().getRemoteEndpoint())), + pack.parameters, () -> RemotingContext.invoke(() -> { + String tn = RxConfig.INSTANCE.getThreadPool().getTraceName(); + if (tn != null) { + ThreadPool.startTrace(pack.traceId, true); + } + try { + return Reflects.invokeMethod(contractInstance, pack.methodName, pack.parameters); + } finally { + ThreadPool.endTrace(); + } + }, s, e.getClient())); } catch (Throwable ex) { Throwable cause = ifNull(ex.getCause(), ex); - args.setError(ex); pack.errorMessage = String.format("%s %s", cause.getClass().getSimpleName(), cause.getMessage()); - } finally { - log(args, msg -> { - msg.appendLine("Server invoke %s.%s [%s]-> %s", contractInstance.getClass().getSimpleName(), pack.methodName, - s.getConfig().getListenPort(), Sockets.toString(e.getClient().getRemoteEndpoint())); - msg.appendLine("Request:\t%s", toJsonString(args.getParameters())) - .appendLine("Response:\t%s", toJsonString(args.getReturnValue())); - if (args.getError() != null) { - msg.appendLine("Error:\t%s", pack.errorMessage); - } - }); } Arrays.fill(pack.parameters, null); e.getClient().send(pack); diff --git a/rxlib/src/main/java/org/rx/spring/BaseInterceptor.java b/rxlib/src/main/java/org/rx/spring/BaseInterceptor.java index 798fe730..46a06c6d 100644 --- a/rxlib/src/main/java/org/rx/spring/BaseInterceptor.java +++ b/rxlib/src/main/java/org/rx/spring/BaseInterceptor.java @@ -6,21 +6,15 @@ import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.Signature; import org.aspectj.lang.reflect.MethodSignature; -import org.rx.bean.FlagsEnum; import org.rx.core.*; import static org.rx.core.Extends.as; import static org.rx.core.Sys.*; -public abstract class BaseInterceptor implements EventPublisher { +public abstract class BaseInterceptor { static final FastThreadLocal idempotent = new FastThreadLocal<>(); protected CallLogBuilder logBuilder = Sys.DEFAULT_LOG_BUILDER; - @Override - public FlagsEnum eventFlags() { - return EventFlags.DYNAMIC_ATTACH.flags(EventFlags.QUIETLY); - } - protected final void enableTrace(String traceName) { if (traceName == null) { traceName = Constants.DEFAULT_TRACE_NAME;