Skip to content

Commit

Permalink
[INLONG-9784][Audit] Optimize sending memory management when the audi…
Browse files Browse the repository at this point in the history
…t-proxy config is null (#9786)
  • Loading branch information
doleyzi authored Mar 6, 2024
1 parent 293ab1a commit 55f6395
Showing 1 changed file with 12 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,17 @@ public SenderGroup(SenderManager senderManager) {
* @return
*/
public SenderResult send(ByteBuf dataBuf) {
if (dataBuf == null) {
return new SenderResult("dataBuf is null", 0, false);
}
LinkedBlockingQueue<SenderChannel> channels = channelGroups.get(mIndex);
SenderChannel channel = null;
boolean dataBufReleased = false;
try {
if (channels.size() <= 0) {
LOG.error("channels is empty");
dataBuf.release();
dataBufReleased = true;
return new SenderResult("channels is empty", 0, false);
}
boolean isOk = false;
Expand Down Expand Up @@ -133,6 +138,7 @@ public SenderResult send(ByteBuf dataBuf) {
if (channel == null) {
LOG.error("can not get a channel");
dataBuf.release();
dataBufReleased = true;
return new SenderResult("can not get a channel", 0, false);
}

Expand All @@ -145,8 +151,10 @@ public SenderResult send(ByteBuf dataBuf) {
}
t = channel.getChannel().writeAndFlush(dataBuf).sync().await();
}
dataBufReleased = true;
} else {
dataBuf.release();
dataBufReleased = true;
}
return new SenderResult(channel.getAddr().getHostString(), channel.getAddr().getPort(), t.isSuccess());
} catch (Throwable ex) {
Expand All @@ -155,8 +163,12 @@ public SenderResult send(ByteBuf dataBuf) {
return new SenderResult("127.0.0.1", 0, false);
} finally {
if (channel != null) {
channel.release();
channels.offer(channel);
}
if (!dataBufReleased) {
dataBuf.release();
}
}
}

Expand Down

0 comments on commit 55f6395

Please sign in to comment.