Skip to content

Commit

Permalink
del pea
Browse files Browse the repository at this point in the history
  • Loading branch information
youfanx committed Aug 16, 2024
1 parent 0bfe531 commit 4aba741
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 197 deletions.
40 changes: 0 additions & 40 deletions rxlib/src/main/java/org/rx/bean/ProceedEventArgs.java

This file was deleted.

81 changes: 10 additions & 71 deletions rxlib/src/main/java/org/rx/core/Sys.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.rx.annotation.Subscribe;
import org.rx.bean.DynamicProxyBean;
import org.rx.bean.LogStrategy;
import org.rx.bean.ProceedEventArgs;
import org.rx.codec.CodecUtil;
import org.rx.exception.FallbackException;
import org.rx.exception.InvalidException;
Expand Down Expand Up @@ -380,73 +379,6 @@ public static void clearLogCtx() {
mdc.clear();
}

public static void logHttp(@NonNull ProceedEventArgs eventArgs, String url) {
RxConfig conf = RxConfig.INSTANCE;
eventArgs.setLogStrategy(conf.logStrategy);
eventArgs.setLogTypeWhitelist(conf.logTypeWhitelist);
log(eventArgs, msg -> {
msg.appendLine("Url:\t%s", url)
.appendLine("Request:\t%s", toJsonString(eventArgs.getParameters()))
.appendLine("Response:\t%s\tElapsed=%s", toJsonString(eventArgs.getReturnValue()), formatNanosElapsed(eventArgs.getElapsedNanos()));
if (eventArgs.getError() != null) {
msg.appendLine("Error:\t%s", eventArgs.getError());
}
});
}

@SneakyThrows
public static void log(@NonNull ProceedEventArgs eventArgs, @NonNull BiAction<StringBuilder> formatMessage) {
Map<String, String> mappedDiagnosticCtx = getMDCCtxMap();
boolean doWrite = !mappedDiagnosticCtx.isEmpty();
if (!doWrite) {
if (eventArgs.getLogStrategy() == null) {
eventArgs.setLogStrategy(eventArgs.getError() != null ? LogStrategy.WRITE_ON_ERROR : LogStrategy.WRITE_ON_NULL);
}
switch (eventArgs.getLogStrategy()) {
case WRITE_ON_NULL:
doWrite = eventArgs.getError() != null
|| (!eventArgs.isVoid() && eventArgs.getReturnValue() == null)
|| (!Arrays.isEmpty(eventArgs.getParameters()) && Arrays.contains(eventArgs.getParameters(), null));
break;
case WRITE_ON_ERROR:
if (eventArgs.getError() != null) {
doWrite = true;
}
break;
case ALWAYS:
doWrite = true;
break;
}
}
if (doWrite) {
Set<String> whitelist = eventArgs.getLogTypeWhitelist();
if (!CollectionUtils.isEmpty(whitelist)) {
doWrite = Linq.from(whitelist).any(p -> eventArgs.getDeclaringType().getName().startsWith(p));
}
}
if (doWrite) {
org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(eventArgs.getDeclaringType());
StringBuilder msg = new StringBuilder(Constants.HEAP_BUF_SIZE);
formatMessage.invoke(msg);
boolean first = true;
for (Map.Entry<String, String> entry : mappedDiagnosticCtx.entrySet()) {
if (first) {
msg.append("MDC:\t");
first = false;
}
msg.appendFormat("%s=%s ", entry.getKey(), entry.getValue());
}
if (!first) {
msg.appendLine();
}
if (eventArgs.getError() != null) {
TraceHandler.INSTANCE.log(msg.toString(), eventArgs.getError());
} else {
log.info(msg.toString());
}
}
}

public interface ProceedFunc<T> extends Func<T> {
default boolean isVoid() {
return false;
Expand Down Expand Up @@ -479,7 +411,7 @@ public String buildParamSnapshot(Class<?> declaringType, String methodName, Obje
return "{}";
}
List<Object> list = Linq.from(args).select(p -> shortArg(declaringType, methodName, p)).toList();
return toJsonString(list.size() == 1 ? list.get(0) : list);
return toString(list.size() == 1 ? list.get(0) : list);
}

protected Object shortArg(Class<?> declaringType, String methodName, Object arg) {
Expand All @@ -498,17 +430,24 @@ protected Object shortArg(Class<?> declaringType, String methodName, Object arg)
return arg;
}

protected String toString(Object arg) {
if (arg == null) {
return "null";
}
return toJsonString(arg);
}

@Override
public String buildLog(Class<?> declaringType, String methodName, Object[] parameters, String paramSnapshot, Object returnValue, Throwable error, long elapsedNanos) {
StringBuilder msg = new StringBuilder(Constants.HEAP_BUF_SIZE);
if ((flags & HTTP_KEYWORD_FLAG) == HTTP_KEYWORD_FLAG) {
msg.appendLine("Call:\t%s", methodName)
.appendLine("Request:\t%s", paramSnapshot)
.appendLine("Response:\t%s\tElapsed=%s", toJsonString(shortArg(declaringType, methodName, returnValue)), formatNanosElapsed(elapsedNanos));
.appendLine("Response:\t%s\tElapsed=%s", toString(shortArg(declaringType, methodName, returnValue)), formatNanosElapsed(elapsedNanos));
} else {
msg.appendLine("Call:\t%s", methodName)
.appendLine("Parameters:\t%s", paramSnapshot)
.appendLine("ReturnValue:\t%s\tElapsed=%s", toJsonString(shortArg(declaringType, methodName, returnValue)), formatNanosElapsed(elapsedNanos));
.appendLine("ReturnValue:\t%s\tElapsed=%s", toString(shortArg(declaringType, methodName, returnValue)), formatNanosElapsed(elapsedNanos));
}
if (error != null) {
msg.appendLine("Error:\t%s", error.getMessage());
Expand Down
61 changes: 28 additions & 33 deletions rxlib/src/main/java/org/rx/net/http/HttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import okio.BufferedSink;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.rx.bean.ProceedEventArgs;
import org.rx.bean.Tuple;
import org.rx.core.Arrays;
import org.rx.core.*;
Expand Down Expand Up @@ -44,7 +43,6 @@
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import static org.rx.core.Sys.logHttp;
import static org.rx.core.Sys.toJsonString;

@Slf4j
Expand Down Expand Up @@ -492,41 +490,38 @@ Request.Builder createRequest(String url) {

@SneakyThrows
synchronized ResponseContent invoke(String url, HttpMethod method, RequestContent content) {
ProceedEventArgs args = new ProceedEventArgs(this.getClass(), new Object[]{method.toString(), content instanceof JsonContent ? ((JsonContent) content).json : content.toString()}, false);
try {
Request.Builder request = createRequest(url);
RequestBody requestBody = content.toBody();
if (HttpMethod.GET.equals(method)) {
request.get();
} else if (HttpMethod.POST.equals(method)) {
request.post(requestBody);
} else if (HttpMethod.HEAD.equals(method)) {
request.head();
} else if (HttpMethod.PUT.equals(method)) {
request.put(requestBody);
} else if (HttpMethod.PATCH.equals(method)) {
request.patch(requestBody);
} else if (HttpMethod.DELETE.equals(method)) {
request.delete(requestBody);
} else {
throw new UnsupportedOperationException();
}
if (resContent != null) {
resContent.response.close();
}
return resContent = args.proceed(() -> {
Request.Builder request = createRequest(url);
RequestBody requestBody = content.toBody();
if (HttpMethod.GET.equals(method)) {
request.get();
} else if (HttpMethod.POST.equals(method)) {
request.post(requestBody);
} else if (HttpMethod.HEAD.equals(method)) {
request.head();
} else if (HttpMethod.PUT.equals(method)) {
request.put(requestBody);
} else if (HttpMethod.PATCH.equals(method)) {
request.patch(requestBody);
} else if (HttpMethod.DELETE.equals(method)) {
request.delete(requestBody);
} else {
throw new UnsupportedOperationException();
}
if (resContent != null) {
resContent.response.close();
}
if ((featureFlags & ENABLE_LOG_FLAG) == ENABLE_LOG_FLAG) {
resContent = Sys.callLog(this.getClass(), String.format("%s %s", method, url), new Object[]{content instanceof JsonContent ? ((JsonContent) content).json : content.toString()}, () -> {
ResponseContent rc = new ResponseContent(getClient().newCall(request.build()).execute());
rc.cachingStream = (featureFlags & CACHING_STREAM_FLAG) == CACHING_STREAM_FLAG;
return rc;
});
} catch (Throwable e) {
args.setError(e);
throw e;
} finally {
if ((featureFlags & ENABLE_LOG_FLAG) == ENABLE_LOG_FLAG) {
logHttp(args, url);
}
}, Sys.HTTP_LOG_BUILDER, null);
} else {
ResponseContent rc = new ResponseContent(getClient().newCall(request.build()).execute());
rc.cachingStream = (featureFlags & CACHING_STREAM_FLAG) == CACHING_STREAM_FLAG;
resContent = rc;
}
return resContent;
}

public ResponseContent head(@NonNull String url) {
Expand Down
2 changes: 1 addition & 1 deletion rxlib/src/main/java/org/rx/net/http/RestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public String invoke() {
}
};
}
responseText = Sys.callLog(contract, String.format("%s\t%s", doPost ? "POST" : "GET", reqUrl), args, proceed, Sys.HTTP_LOG_BUILDER, null);
responseText = Sys.callLog(contract, String.format("%s %s", doPost ? "POST" : "GET", reqUrl), args, proceed, Sys.HTTP_LOG_BUILDER, null);
if (checkResponse != null && !checkResponse.invoke(responseText)) {
throw new InvalidException("Response status error");
}
Expand Down
97 changes: 46 additions & 51 deletions rxlib/src/main/java/org/rx/net/rpc/Remoting.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.apache.commons.lang3.BooleanUtils;
import org.rx.bean.$;
import org.rx.bean.DynamicProxyBean;
import org.rx.bean.ProceedEventArgs;
import org.rx.core.*;
import org.rx.exception.TraceHandler;
import org.rx.net.Sockets;
Expand All @@ -33,10 +32,12 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import static org.rx.bean.$.$;
import static org.rx.core.Extends.*;
import static org.rx.core.Sys.*;
import static org.rx.core.Sys.proxy;
import static org.rx.core.Sys.toJsonString;

//snappy + protobuf
@Slf4j
Expand Down Expand Up @@ -184,27 +185,36 @@ public static <T> T createFacade(@NonNull Class<T> contract, @NonNull RpcClientC
}
}
StatefulTcpClient client = sync.v;
Map<Integer, ClientBean> waitBeans = null;
AtomicReference<Map<Integer, ClientBean>> waitBeans = new AtomicReference<>();

MethodMessage methodMessage = as(pack, MethodMessage.class);
ProceedEventArgs eventArgs = methodMessage != null ? new ProceedEventArgs(contract, methodMessage.parameters, false) : null;
boolean isMethodCall = methodMessage != null;
try {
client.send(pack);
if (eventArgs != null) {
waitBeans = getClientBeans(client);
waitBeans.put(clientBean.pack.id, clientBean);
if (!clientBean.syncRoot.waitOne(client.getConfig().getConnectTimeoutMillis())) {
if (!client.isConnected()) {
throw new ClientDisconnectedException(client);
}
if (clientBean.pack.returnValue == null) {
throw new TimeoutException(String.format("The method %s read timeout", clientBean.pack.methodName));
}
}
clientBean.syncRoot.reset();
}
if (clientBean.pack.errorMessage != null) {
throw new RemotingException(clientBean.pack.errorMessage);
if (isMethodCall) {
Sys.callLog(contract,
String.format("Client %s.%s [%s -> %s]", contract.getSimpleName(), methodMessage.methodName,
Sockets.toString(client.getLocalEndpoint()),
Sockets.toString(client.getConfig().getServerEndpoint())), methodMessage.parameters, () -> {
client.send(methodMessage);
Map<Integer, ClientBean> wb = getClientBeans(client);
waitBeans.set(wb);
wb.put(clientBean.pack.id, clientBean);
if (!clientBean.syncRoot.waitOne(client.getConfig().getConnectTimeoutMillis())) {
if (!client.isConnected()) {
throw new ClientDisconnectedException(client);
}
if (clientBean.pack.returnValue == null) {
throw new TimeoutException(String.format("The method %s read timeout", clientBean.pack.methodName));
}
}
clientBean.syncRoot.reset();
if (clientBean.pack.errorMessage != null) {
throw new RemotingException(clientBean.pack.errorMessage);
}
return clientBean.pack.returnValue;
});
} else {
client.send(pack);
}
} catch (ClientDisconnectedException e) {
if (!client.getConfig().isEnableReconnect()) {
Expand All @@ -213,39 +223,24 @@ public static <T> T createFacade(@NonNull Class<T> contract, @NonNull RpcClientC
throw e;
}

if (eventArgs == null) {
throw e;
}
waitBeans = getClientBeans(client);
waitBeans.put(clientBean.pack.id, clientBean);
if (!clientBean.syncRoot.waitOne(client.getConfig().getConnectTimeoutMillis())) {
if (clientBean.pack.returnValue == null) {
eventArgs.setError(e);
throw e;
if (isMethodCall) {
Map<Integer, ClientBean> wb = getClientBeans(client);
waitBeans.set(wb);
wb.put(clientBean.pack.id, clientBean);
if (!clientBean.syncRoot.waitOne(client.getConfig().getConnectTimeoutMillis())) {
if (clientBean.pack.returnValue == null) {
throw e;
}
}
clientBean.syncRoot.reset();
} else {
throw e;
}
clientBean.syncRoot.reset();
} catch (Throwable e) {
if (eventArgs != null) {
eventArgs.setError(e);
}
throw e;
} finally {
if (eventArgs != null) {
log(eventArgs, msg -> {
msg.appendLine("Client invoke %s.%s [%s -> %s]", contract.getSimpleName(), methodMessage.methodName,
Sockets.toString(client.getLocalEndpoint()),
Sockets.toString(client.getConfig().getServerEndpoint()));
msg.appendLine("Request:\t%s", toJsonString(methodMessage.parameters))
.appendLine("Response:\t%s", clientBean.pack == null ? "NULL" : toJsonString(clientBean.pack.returnValue));
if (eventArgs.getError() != null) {
msg.appendLine("Error:\t%s", eventArgs.getError().getMessage());
}
});
}
if (waitBeans != null) {
waitBeans.remove(clientBean.pack.id);
if (waitBeans.isEmpty()) {
Map<Integer, ClientBean> wb = waitBeans.get();
if (wb != null) {
wb.remove(clientBean.pack.id);
if (wb.isEmpty()) {
synchronized (sync) {
sync.v = pool.returnClient(client);
}
Expand Down Expand Up @@ -411,7 +406,7 @@ public static TcpServer register(@NonNull Object contractInstance, @NonNull RpcS
MethodMessage pack = (MethodMessage) e.getValue();
try {
pack.returnValue = Sys.callLog(contractInstance.getClass(),
String.format("Server %s.%s [%s]-> %s", contractInstance.getClass().getSimpleName(), pack.methodName,
String.format("Server %s.%s [%s -> %s]", contractInstance.getClass().getSimpleName(), pack.methodName,
s.getConfig().getListenPort(), Sockets.toString(e.getClient().getRemoteEndpoint())),
pack.parameters, () -> RemotingContext.invoke(() -> {
String tn = RxConfig.INSTANCE.getThreadPool().getTraceName();
Expand Down
Loading

0 comments on commit 4aba741

Please sign in to comment.