Skip to content

Commit

Permalink
up
Browse files Browse the repository at this point in the history
  • Loading branch information
youfanx committed Dec 4, 2024
1 parent e84a559 commit 608fb55
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 32 deletions.
70 changes: 40 additions & 30 deletions rxlib/src/main/java/org/rx/core/ObjectPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.rx.util.function.Func;
import org.rx.util.function.PredicateFunc;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
Expand Down Expand Up @@ -51,7 +52,8 @@ public synchronized boolean isLeaked(long threshold) {
final PredicateFunc<T> validateHandler;
final BiAction<T> passivateHandler;
final ConcurrentLinkedDeque<IdentityWrapper<T>> stack = new ConcurrentLinkedDeque<>();
final Map<IdentityWrapper<T>, ObjectConf> conf = new ConcurrentHashMap<>();
final ConcurrentHashMap<IdentityWrapper<T>, ObjectConf> conf = new ConcurrentHashMap<>();
// final HashMap<IdentityWrapper<T>, ObjectConf> conf = new HashMap<>();
final AtomicInteger size = new AtomicInteger();
@Getter
final int minSize;
Expand Down Expand Up @@ -152,14 +154,16 @@ IdentityWrapper<T> doCreate() {
}

IdentityWrapper<T> wrapper = new IdentityWrapper<>(createHandler.get());
if (!stack.offer(wrapper)) {
log.error("ObjPool create object fail: Offer stack fail");
return null;
}
ObjectConf c = new ObjectConf();
c.setBorrowed(true);
if (conf.putIfAbsent(wrapper, c) != null) {
throw new InvalidException("create object fail, object '{}' has already in this pool", wrapper);
synchronized (conf) {
if (!stack.offer(wrapper)) {
log.error("ObjPool create object fail: Offer stack fail");
return null;
}
ObjectConf c = new ObjectConf();
c.setBorrowed(true);
if (conf.putIfAbsent(wrapper, c) != null) {
throw new InvalidException("create object fail, object '{}' has already in this pool", wrapper);
}
}
size.incrementAndGet();

Expand All @@ -173,14 +177,16 @@ IdentityWrapper<T> doCreate() {
boolean doRetire(IdentityWrapper<T> wrapper, int action) {
boolean ok;

ok = stack.remove(wrapper);
synchronized (conf) {
ok = stack.remove(wrapper);
// if (ok) {
ObjectConf c = conf.remove(wrapper);
if (c != null) {
size.decrementAndGet();
ObjectConf c = conf.remove(wrapper);
if (c != null) {
size.decrementAndGet();

if (!c.isBorrowed()) {
tryClose(wrapper);
if (!c.isBorrowed()) {
tryClose(wrapper);
}
}
}
// }
Expand All @@ -191,9 +197,11 @@ boolean doRetire(IdentityWrapper<T> wrapper, int action) {
IdentityWrapper<T> doPoll() {
IdentityWrapper<T> wrapper;
ObjectConf c;
while ((wrapper = stack.poll()) != null && (c = conf.get(wrapper)) != null && !c.isBorrowed()) {
c.setBorrowed(true);
return wrapper;
synchronized (conf) {
while ((wrapper = stack.poll()) != null && (c = conf.get(wrapper)) != null && !c.isBorrowed()) {
c.setBorrowed(true);
return wrapper;
}
}
return null;
}
Expand All @@ -220,19 +228,21 @@ public void recycle(@NonNull T obj) {
return;
}

ObjectConf c = conf.get(wrapper);
if (c == null) {
throw new InvalidException("Object '{}' not belong to this pool", wrapper);
}
if (!c.isBorrowed()) {
throw new InvalidException("Object '{}' has already in this pool", wrapper);
}
c.setBorrowed(false);
if (
synchronized (conf) {
ObjectConf c = conf.get(wrapper);
if (c == null) {
throw new InvalidException("Object '{}' not belong to this pool", wrapper);
}
if (!c.isBorrowed()) {
throw new InvalidException("Object '{}' has already in this pool", wrapper);
}
c.setBorrowed(false);
if (
// size() > maxSize || //Not required
!stack.offer(wrapper)) {
doRetire(wrapper, 2);
return;
!stack.offer(wrapper)) {
doRetire(wrapper, 2);
return;
}
}

if (passivateHandler != null) {
Expand Down
10 changes: 10 additions & 0 deletions rxlib/src/main/java/org/rx/net/rpc/protocol/MethodMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import lombok.RequiredArgsConstructor;

import java.io.Serializable;
import java.util.Arrays;

@RequiredArgsConstructor
public class MethodMessage implements Serializable {
Expand All @@ -13,4 +14,13 @@ public class MethodMessage implements Serializable {
public final String traceId;
public Object returnValue;
public String errorMessage;

@Override
public String toString() {
return "MethodMessage[" + id + "]{" +
"methodName='" + methodName + '\'' +
", parameters=" + Arrays.toString(parameters) +
", returnValue='" + returnValue + '\'' +
'}';
}
}
4 changes: 2 additions & 2 deletions rxlib/src/main/java/org/rx/net/transport/TcpServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void send(Serializable pack) {
}

channel.writeAndFlush(pack);
log.debug("serverWrite {} {}", channel.remoteAddress(), pack.getClass());
log.debug("serverWrite {} {}", channel.remoteAddress(), pack);
}

@Override
Expand Down Expand Up @@ -93,7 +93,7 @@ public void channelActive(ChannelHandlerContext ctx) {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
Channel channel = ctx.channel();
log.debug("serverRead {} {}", channel.remoteAddress(), msg.getClass());
log.debug("serverRead {} {}", channel.remoteAddress(), msg);

Serializable pack;
if ((pack = as(msg, Serializable.class)) == null) {
Expand Down

0 comments on commit 608fb55

Please sign in to comment.