Skip to content

Commit

Permalink
Fix memory leak (apache#14127)
Browse files Browse the repository at this point in the history
  • Loading branch information
finefuture authored May 8, 2024
1 parent 1686383 commit 487fa4f
Show file tree
Hide file tree
Showing 10 changed files with 86 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,13 @@ protected final HttpOutputMessage buildMessage(Object data) throws Throwable {
data = ((HttpResult<?>) data).getBody();
}
HttpOutputMessage outputMessage = encodeHttpOutputMessage(data);
preOutputMessage(outputMessage);
responseEncoder.encode(outputMessage.getBody(), data);
try {
preOutputMessage(outputMessage);
responseEncoder.encode(outputMessage.getBody(), data);
} catch (Throwable t) {
outputMessage.close();
throw t;
}
return outputMessage;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,15 @@
*/
package org.apache.dubbo.remoting.http12;

import java.io.IOException;
import java.io.InputStream;

public interface HttpInputMessage {
public interface HttpInputMessage extends AutoCloseable {

InputStream getBody();

@Override
default void close() throws IOException {
getBody().close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
package org.apache.dubbo.remoting.http12;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public interface HttpOutputMessage {
public interface HttpOutputMessage extends AutoCloseable {

HttpOutputMessage EMPTY_MESSAGE = new HttpOutputMessage() {

Expand All @@ -32,4 +33,9 @@ public OutputStream getBody() {
};

OutputStream getBody();

@Override
default void close() throws IOException {
getBody().close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.dubbo.remoting.http12.HttpInputMessage;
import org.apache.dubbo.remoting.http12.RequestMetadata;

import java.io.IOException;
import java.io.InputStream;

public class DefaultHttp1Request implements Http1Request {
Expand Down Expand Up @@ -52,4 +53,9 @@ public String method() {
public String path() {
return httpMetadata.path();
}

@Override
public void close() throws IOException {
httpInputMessage.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.dubbo.remoting.http12.HttpInputMessage;
import org.apache.dubbo.remoting.http12.HttpMetadata;

import java.io.IOException;
import java.io.InputStream;

public class DefaultHttp1Response implements HttpMetadata, HttpInputMessage {
Expand All @@ -42,4 +43,9 @@ public InputStream getBody() {
public HttpHeaders headers() {
return httpMetadata.headers();
}

@Override
public void close() throws IOException {
httpInputMessage.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@

import org.apache.dubbo.remoting.http12.HttpOutputMessage;

import java.io.IOException;
import java.io.OutputStream;

import io.netty.buffer.ByteBufOutputStream;

public class Http1OutputMessage implements HttpOutputMessage {

private final OutputStream outputStream;
Expand All @@ -32,4 +35,12 @@ public Http1OutputMessage(OutputStream outputStream) {
public OutputStream getBody() {
return outputStream;
}

@Override
public void close() throws IOException {
if (outputStream instanceof ByteBufOutputStream) {
((ByteBufOutputStream) outputStream).buffer().release();
}
outputStream.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
*/
package org.apache.dubbo.remoting.http12.h2;

import java.io.IOException;
import java.io.OutputStream;

import io.netty.buffer.ByteBufOutputStream;

public class Http2OutputMessageFrame implements Http2OutputMessage {

private final OutputStream body;
Expand All @@ -42,6 +45,14 @@ public OutputStream getBody() {
return body;
}

@Override
public void close() throws IOException {
if (body instanceof ByteBufOutputStream) {
((ByteBufOutputStream) body).buffer().release();
}
body.close();
}

@Override
public boolean isEndStream() {
return endStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ public void onData(MESSAGE message) {
doOnData(message);
} catch (Throwable t) {
logError(t);
onError(t);
onError(message, t);
} finally {
onFinally(message);
}
});
}
Expand Down Expand Up @@ -184,6 +186,18 @@ protected void onError(Throwable throwable) {
throw new HttpStatusException(HttpStatus.INTERNAL_SERVER_ERROR.getCode(), throwable);
}

protected void onError(MESSAGE message, Throwable throwable) {
onError(throwable);
}

protected void onFinally(MESSAGE message) {
try {
message.close();
} catch (Exception e) {
onError(e);
}
}

protected RpcInvocation buildRpcInvocation(RpcInvocationBuildContext context) {
MethodDescriptor methodDescriptor = context.getMethodDescriptor();
if (methodDescriptor == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.dubbo.remoting.http12.exception.UnimplementedException;
import org.apache.dubbo.remoting.http12.h2.H2StreamChannel;
import org.apache.dubbo.remoting.http12.h2.Http2Header;
import org.apache.dubbo.remoting.http12.h2.Http2InputMessage;
import org.apache.dubbo.remoting.http12.h2.Http2TransportListener;
import org.apache.dubbo.remoting.http12.message.MethodMetadata;
import org.apache.dubbo.remoting.http12.message.StreamingDecoder;
Expand Down Expand Up @@ -122,6 +123,19 @@ protected RpcInvocation onBuildRpcInvocationCompletion(RpcInvocation invocation)
return invocation;
}

@Override
protected void onError(Http2InputMessage message, Throwable throwable) {
try {
message.close();
} catch (Exception e) {
throwable.addSuppressed(e);
}
onError(throwable);
}

@Override
protected void onFinally(Http2InputMessage message) {}

@Override
protected GrpcStreamingDecoder getStreamingDecoder() {
return (GrpcStreamingDecoder) super.getStreamingDecoder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ protected Executor initializeExecutor(Http2Header metadata) {
return new SerializingExecutor(executorSupport.getExecutor(metadata));
}

@Override
protected void doOnMetadata(Http2Header metadata) {
if (metadata.isEndStream()) {
if (!HttpMethods.supportBody(metadata.method())) {
Expand Down Expand Up @@ -164,7 +165,7 @@ protected void onMetadataCompletion(Http2Header metadata) {
@Override
protected void onDataCompletion(Http2InputMessage message) {
if (message.isEndStream()) {
serverCallListener.onComplete();
getStreamingDecoder().close();
}
}

Expand Down

0 comments on commit 487fa4f

Please sign in to comment.