From 608fb555496e9ad0119fe6a696c317c95679d2dc Mon Sep 17 00:00:00 2001 From: wxm <115806199+youfanx@users.noreply.github.com> Date: Wed, 4 Dec 2024 16:17:14 +0800 Subject: [PATCH] up --- .../src/main/java/org/rx/core/ObjectPool.java | 70 +++++++++++-------- .../rx/net/rpc/protocol/MethodMessage.java | 10 +++ .../java/org/rx/net/transport/TcpServer.java | 4 +- 3 files changed, 52 insertions(+), 32 deletions(-) diff --git a/rxlib/src/main/java/org/rx/core/ObjectPool.java b/rxlib/src/main/java/org/rx/core/ObjectPool.java index aea7d6f43..4c0da3247 100644 --- a/rxlib/src/main/java/org/rx/core/ObjectPool.java +++ b/rxlib/src/main/java/org/rx/core/ObjectPool.java @@ -9,6 +9,7 @@ import org.rx.util.function.Func; import org.rx.util.function.PredicateFunc; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; @@ -51,7 +52,8 @@ public synchronized boolean isLeaked(long threshold) { final PredicateFunc validateHandler; final BiAction passivateHandler; final ConcurrentLinkedDeque> stack = new ConcurrentLinkedDeque<>(); - final Map, ObjectConf> conf = new ConcurrentHashMap<>(); + final ConcurrentHashMap, ObjectConf> conf = new ConcurrentHashMap<>(); + // final HashMap, ObjectConf> conf = new HashMap<>(); final AtomicInteger size = new AtomicInteger(); @Getter final int minSize; @@ -152,14 +154,16 @@ IdentityWrapper doCreate() { } IdentityWrapper wrapper = new IdentityWrapper<>(createHandler.get()); - if (!stack.offer(wrapper)) { - log.error("ObjPool create object fail: Offer stack fail"); - return null; - } - ObjectConf c = new ObjectConf(); - c.setBorrowed(true); - if (conf.putIfAbsent(wrapper, c) != null) { - throw new InvalidException("create object fail, object '{}' has already in this pool", wrapper); + synchronized (conf) { + if (!stack.offer(wrapper)) { + log.error("ObjPool create object fail: Offer stack fail"); + return null; + } + ObjectConf c = new ObjectConf(); + c.setBorrowed(true); + if (conf.putIfAbsent(wrapper, c) != null) { + throw new InvalidException("create object fail, object '{}' has already in this pool", wrapper); + } } size.incrementAndGet(); @@ -173,14 +177,16 @@ IdentityWrapper doCreate() { boolean doRetire(IdentityWrapper wrapper, int action) { boolean ok; - ok = stack.remove(wrapper); + synchronized (conf) { + ok = stack.remove(wrapper); // if (ok) { - ObjectConf c = conf.remove(wrapper); - if (c != null) { - size.decrementAndGet(); + ObjectConf c = conf.remove(wrapper); + if (c != null) { + size.decrementAndGet(); - if (!c.isBorrowed()) { - tryClose(wrapper); + if (!c.isBorrowed()) { + tryClose(wrapper); + } } } // } @@ -191,9 +197,11 @@ boolean doRetire(IdentityWrapper wrapper, int action) { IdentityWrapper doPoll() { IdentityWrapper wrapper; ObjectConf c; - while ((wrapper = stack.poll()) != null && (c = conf.get(wrapper)) != null && !c.isBorrowed()) { - c.setBorrowed(true); - return wrapper; + synchronized (conf) { + while ((wrapper = stack.poll()) != null && (c = conf.get(wrapper)) != null && !c.isBorrowed()) { + c.setBorrowed(true); + return wrapper; + } } return null; } @@ -220,19 +228,21 @@ public void recycle(@NonNull T obj) { return; } - ObjectConf c = conf.get(wrapper); - if (c == null) { - throw new InvalidException("Object '{}' not belong to this pool", wrapper); - } - if (!c.isBorrowed()) { - throw new InvalidException("Object '{}' has already in this pool", wrapper); - } - c.setBorrowed(false); - if ( + synchronized (conf) { + ObjectConf c = conf.get(wrapper); + if (c == null) { + throw new InvalidException("Object '{}' not belong to this pool", wrapper); + } + if (!c.isBorrowed()) { + throw new InvalidException("Object '{}' has already in this pool", wrapper); + } + c.setBorrowed(false); + if ( // size() > maxSize || //Not required - !stack.offer(wrapper)) { - doRetire(wrapper, 2); - return; + !stack.offer(wrapper)) { + doRetire(wrapper, 2); + return; + } } if (passivateHandler != null) { diff --git a/rxlib/src/main/java/org/rx/net/rpc/protocol/MethodMessage.java b/rxlib/src/main/java/org/rx/net/rpc/protocol/MethodMessage.java index 43bb6752d..95ae12810 100644 --- a/rxlib/src/main/java/org/rx/net/rpc/protocol/MethodMessage.java +++ b/rxlib/src/main/java/org/rx/net/rpc/protocol/MethodMessage.java @@ -3,6 +3,7 @@ import lombok.RequiredArgsConstructor; import java.io.Serializable; +import java.util.Arrays; @RequiredArgsConstructor public class MethodMessage implements Serializable { @@ -13,4 +14,13 @@ public class MethodMessage implements Serializable { public final String traceId; public Object returnValue; public String errorMessage; + + @Override + public String toString() { + return "MethodMessage[" + id + "]{" + + "methodName='" + methodName + '\'' + + ", parameters=" + Arrays.toString(parameters) + + ", returnValue='" + returnValue + '\'' + + '}'; + } } diff --git a/rxlib/src/main/java/org/rx/net/transport/TcpServer.java b/rxlib/src/main/java/org/rx/net/transport/TcpServer.java index 3f3e4e95e..9f4fb9569 100644 --- a/rxlib/src/main/java/org/rx/net/transport/TcpServer.java +++ b/rxlib/src/main/java/org/rx/net/transport/TcpServer.java @@ -57,7 +57,7 @@ public void send(Serializable pack) { } channel.writeAndFlush(pack); - log.debug("serverWrite {} {}", channel.remoteAddress(), pack.getClass()); + log.debug("serverWrite {} {}", channel.remoteAddress(), pack); } @Override @@ -93,7 +93,7 @@ public void channelActive(ChannelHandlerContext ctx) { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { Channel channel = ctx.channel(); - log.debug("serverRead {} {}", channel.remoteAddress(), msg.getClass()); + log.debug("serverRead {} {}", channel.remoteAddress(), msg); Serializable pack; if ((pack = as(msg, Serializable.class)) == null) {