Skip to content

Commit

Permalink
Merge pull request #918 from ctripcorp/bugfix/proxy_mem_leak
Browse files Browse the repository at this point in the history
fix proxy direct mem leak
  • Loading branch information
LanternLee authored Dec 10, 2024
2 parents ae59b36 + fe24ce1 commit 8545726
Show file tree
Hide file tree
Showing 13 changed files with 134 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.ctrip.xpipe.redis.proxy.exception;

import com.ctrip.xpipe.exception.XpipeRuntimeException;

public class WriteToClosedSessionException extends XpipeRuntimeException {

public WriteToClosedSessionException(String message){
super(message);
}

public WriteToClosedSessionException(String message, Throwable th){
super(message, th);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
import com.ctrip.xpipe.netty.AbstractNettyHandler;
import com.ctrip.xpipe.redis.proxy.Session;
import com.ctrip.xpipe.redis.proxy.Tunnel;
import com.ctrip.xpipe.redis.proxy.exception.ResourceIncorrectException;
import com.ctrip.xpipe.redis.proxy.exception.WriteToClosedSessionException;
import com.ctrip.xpipe.redis.proxy.session.state.SessionClosed;
import com.ctrip.xpipe.utils.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.ssl.NotSslRecordException;
import io.netty.util.ReferenceCountUtil;

import static io.netty.util.internal.StringUtil.NEWLINE;

Expand All @@ -23,6 +26,36 @@ public abstract class AbstractSessionNettyHandler extends AbstractNettyHandler {

protected Session session;

protected abstract void doMsgTransfer(ByteBuf msg);

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if(!(msg instanceof ByteBuf)) {
logger.error("[channelRead] InCorrect Type: {}", msg.getClass().getName());
ReferenceCountUtil.release(msg);
throw new ResourceIncorrectException("Unexpected type for read: {}" + msg.getClass().getName());
}

boolean sessionBroken = false;
try {
doMsgTransfer((ByteBuf) msg);
} catch (WriteToClosedSessionException e) {
logger.info("[forward][forward closed, close] {}", e.getMessage());
sessionBroken = true;
} catch (Throwable th) {
logger.info("[forward][fail, close]", th);
sessionBroken = true;
}

if (sessionBroken) {
ReferenceCountUtil.release(msg);
session.release();
return;
}

ctx.fireChannelRead(msg);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.info("[channelInactive]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import com.ctrip.xpipe.redis.proxy.Tunnel;
import com.ctrip.xpipe.redis.proxy.exception.ResourceIncorrectException;
import com.ctrip.xpipe.redis.proxy.exception.WriteToClosedSessionException;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

/**
* @author chen.zhu
Expand All @@ -18,14 +20,8 @@ public BackendSessionHandler(Tunnel tunnel) {
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if(!(msg instanceof ByteBuf)) {
logger.error("[channelRead] InCorrect Type: {}", msg.getClass().getName());
throw new ResourceIncorrectException("Unexpected type for read: {}" + msg.getClass().getName());
}
tunnel.forwardToFrontend((ByteBuf) msg);

ctx.fireChannelRead(msg);
protected void doMsgTransfer(ByteBuf msg) {
tunnel.forwardToFrontend(msg);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

import com.ctrip.xpipe.redis.proxy.Tunnel;
import com.ctrip.xpipe.redis.proxy.exception.ResourceIncorrectException;
import com.ctrip.xpipe.redis.proxy.exception.WriteToClosedSessionException;
import com.ctrip.xpipe.utils.ChannelUtil;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;


/**
Expand All @@ -20,20 +22,8 @@ public FrontendSessionNettyHandler(Tunnel tunnel) {
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {

if(msg instanceof ByteBuf) {
if(tunnel != null) {
tunnel.forwardToBackend((ByteBuf) msg);
} else {
logger.error("[doChannelRead] send non-proxy-protocol from channel {}: {} from channel: {}",
ChannelUtil.getDesc(ctx.channel()), formatByteBuf("RECEIVE", (ByteBuf) msg));
ctx.channel().close();
}
} else {
throw new ResourceIncorrectException("Unexpected type for read: " + msg.getClass().getName());
}
ctx.fireChannelRead(msg);
protected void doMsgTransfer(ByteBuf msg) {
tunnel.forwardToBackend(msg);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.ctrip.xpipe.redis.proxy.Tunnel;
import com.ctrip.xpipe.redis.proxy.model.SessionMeta;
import com.ctrip.xpipe.redis.proxy.session.state.SessionClosed;
import com.ctrip.xpipe.redis.proxy.session.state.SessionClosing;
import com.ctrip.xpipe.utils.ChannelUtil;
import com.ctrip.xpipe.utils.VisibleForTesting;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -130,6 +131,11 @@ public void doWrite(ByteBuf byteBuf) {
getChannel().writeAndFlush(byteBuf.retain(), getChannel().voidPromise());
}

protected boolean isClosed() {
SessionState sessionState = getSessionState();
return sessionState instanceof SessionClosing || sessionState instanceof SessionClosed;
}

protected void setSessionState(SessionState newState) {
if(!getSessionState().isValidNext(newState)) {
logger.error("[setSessionState] Set state failed, state relationship not match, old: {}, new: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.ReferenceCountUtil;

import java.util.Collections;
import java.util.LinkedList;
Expand Down Expand Up @@ -55,7 +56,7 @@ public class DefaultBackendSession extends AbstractSession implements BackendSes

private ProxyEndpointSelector selector;

private Queue<ByteBuf> sendAfterProtocol;
private volatile Queue<ByteBuf> sendAfterProtocol = new LinkedList<>();

private EventLoopGroup nioEventLoopGroup;

Expand Down Expand Up @@ -145,12 +146,18 @@ public void initChannel(SocketChannel ch) {
}

@Override
public void sendAfterProtocol(ByteBuf byteBuf) throws Exception {
if(sendAfterProtocol == null) {
sendAfterProtocol = new LinkedList<>();
public synchronized void sendAfterProtocol(ByteBuf byteBuf) throws Exception {
if (null == sendAfterProtocol) {
if (isClosed()) {
logger.info("[sendAfterProtocol][already closed] skip");
} else {
logger.info("[sendAfterProtocol][unexpected fail] close");
release();
}
} else {
if (logger.isDebugEnabled()) logger.debug("[sendAfterProtocol][{}] {}", getSessionId(), ByteBufUtil.prettyHexDump(byteBuf));
sendAfterProtocol.offer(byteBuf.retain());
}
logger.debug("[sendAfterProtocol] {}", ByteBufUtil.prettyHexDump(byteBuf));
sendAfterProtocol.offer(byteBuf.retain());
}

protected void onChannelEstablished(Channel channel) {
Expand Down Expand Up @@ -203,10 +210,29 @@ protected void doSetSessionState(SessionState newState) {
logger.info("[setSessionState][Backend] Session state change from {} to {} ({})", oldState, newState, getSessionMeta());
EventMonitor.DEFAULT.logEvent(SESSION_STATE_CHANGE, String.format("[Backend]%s->%s", oldState.toString(), newState.toString()),
Collections.singletonMap("channel", ChannelUtil.getDesc(getChannel())));
if (isClosed()) {
releaseByteBufSendAfterProtocol();
}
notifyObservers(new SessionStateChangeEvent(oldState, newState));
}
}



private void releaseByteBufSendAfterProtocol() {
if (null == sendAfterProtocol) return;

synchronized (this) {
if (null == sendAfterProtocol) return;
logger.info("[release][{}] release sendAfterProtocol", getSessionId());
while(!sendAfterProtocol.isEmpty()) {
ByteBuf byteBuf = sendAfterProtocol.poll();
ReferenceCountUtil.release(byteBuf);
}
sendAfterProtocol = null;
}
}

@Override
public SESSION_TYPE getSessionType() {
return SESSION_TYPE.BACKEND;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.ctrip.xpipe.redis.proxy.session.state;

import com.ctrip.xpipe.redis.proxy.Session;
import com.ctrip.xpipe.redis.proxy.exception.WriteToClosedSessionException;
import com.ctrip.xpipe.redis.proxy.session.SessionState;
import io.netty.buffer.ByteBuf;

Expand All @@ -27,7 +28,7 @@ protected SessionState doNextAfterFail() {

@Override
public void tryWrite(ByteBuf byteBuf) {
throw new UnsupportedOperationException("Session's been closed");
throw new WriteToClosedSessionException("Session's been closed");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.ctrip.xpipe.redis.proxy.session.state;

import com.ctrip.xpipe.redis.proxy.Session;
import com.ctrip.xpipe.redis.proxy.exception.WriteToClosedSessionException;
import com.ctrip.xpipe.redis.proxy.session.SessionState;
import io.netty.buffer.ByteBuf;

Expand All @@ -27,7 +28,7 @@ protected SessionState doNextAfterFail() {

@Override
public void tryWrite(ByteBuf byteBuf) {
throw new UnsupportedOperationException("No write, Session closing");
throw new WriteToClosedSessionException("No write, Session closing");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.ctrip.xpipe.redis.proxy.AbstractNettyTest;
import com.ctrip.xpipe.redis.proxy.Session;
import com.ctrip.xpipe.redis.proxy.tunnel.DefaultTunnel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
Expand All @@ -28,7 +29,12 @@ public class AbstractSessionNettyHandlerTest extends AbstractNettyTest {
@Mock
private Session session;

private AbstractSessionNettyHandler handler = new AbstractSessionNettyHandler(){};
private AbstractSessionNettyHandler handler = new AbstractSessionNettyHandler(){
@Override
protected void doMsgTransfer(ByteBuf msg) {

}
};

@Before
public void beforeAbstractSessionNettyHandlerTest() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.ctrip.xpipe.redis.proxy.handler;

import com.ctrip.xpipe.redis.proxy.AbstractNettyTest;
import com.ctrip.xpipe.redis.proxy.exception.WriteToClosedSessionException;
import com.ctrip.xpipe.redis.proxy.session.DefaultBackendSession;
import com.ctrip.xpipe.redis.proxy.session.DefaultFrontendSession;
import com.ctrip.xpipe.redis.proxy.tunnel.DefaultTunnel;
Expand Down Expand Up @@ -67,4 +68,15 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
verify(tunnel).forwardToFrontend(any());
}

@Test
public void testByteBufReleasedAfterPipelineBroken() {
Throwable th = new WriteToClosedSessionException("session closed");
doThrow(th).when(tunnel).forwardToFrontend(any());

ByteBuf byteBuf = Unpooled.copiedBuffer("test".getBytes());
channel.writeInbound(byteBuf);
Assert.assertEquals(0, byteBuf.refCnt());
Assert.assertFalse(channel.isOpen());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
import com.ctrip.xpipe.redis.core.proxy.parser.DefaultProxyConnectProtocolParser;
import com.ctrip.xpipe.redis.proxy.AbstractNettyTest;
import com.ctrip.xpipe.redis.proxy.exception.ResourceIncorrectException;
import com.ctrip.xpipe.redis.proxy.exception.WriteToClosedSessionException;
import com.ctrip.xpipe.redis.proxy.session.DefaultFrontendSession;
import com.ctrip.xpipe.redis.proxy.tunnel.DefaultTunnel;
import com.ctrip.xpipe.redis.proxy.tunnel.DefaultTunnelManager;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.ReferenceCountUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -104,4 +106,15 @@ public void testChannelRead3() {
verify(tunnel, never()).forwardToBackend(any());
}

@Test
public void testByteBufReleasedAfterPipelineBroken() {
Throwable th = new WriteToClosedSessionException("session closed");
doThrow(th).when(tunnel).forwardToBackend(any());

ByteBuf byteBuf = Unpooled.copiedBuffer("test".getBytes());
channel.writeInbound(byteBuf);
Assert.assertEquals(0, byteBuf.refCnt());
Assert.assertFalse(channel.isOpen());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.ctrip.xpipe.redis.proxy.AbstractRedisProxyServerTest;
import com.ctrip.xpipe.redis.proxy.Session;
import com.ctrip.xpipe.redis.proxy.exception.WriteToClosedSessionException;
import com.ctrip.xpipe.redis.proxy.session.SessionState;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
Expand Down Expand Up @@ -37,7 +38,7 @@ public void testDoNextAfterFail() {
Assert.assertEquals(new SessionClosed(frontend), sessionClosed.nextAfterFail());
}

@Test(expected = UnsupportedOperationException.class)
@Test(expected = WriteToClosedSessionException.class)
public void testTryWrite() {
ByteBuf byteBuf = new UnpooledByteBufAllocator(false).buffer();
byteBuf.setBytes(0, "+OK\r\n".getBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.ctrip.xpipe.redis.proxy.AbstractRedisProxyServerTest;
import com.ctrip.xpipe.redis.proxy.Session;
import com.ctrip.xpipe.redis.proxy.exception.WriteToClosedSessionException;
import com.ctrip.xpipe.redis.proxy.session.SessionState;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
Expand Down Expand Up @@ -37,7 +38,7 @@ public void testDoNextAfterFail() {
Assert.assertEquals(new SessionClosing(frontend), seesionClosing.nextAfterFail());
}

@Test(expected = UnsupportedOperationException.class)
@Test(expected = WriteToClosedSessionException.class)
public void testTryWrite() {
ByteBuf byteBuf = new UnpooledByteBufAllocator(false).buffer();
byteBuf.setBytes(0, "+OK\r\n".getBytes());
Expand Down

0 comments on commit 8545726

Please sign in to comment.