Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Triple http limiting the size of the HTTP request and response #14246

Merged
merged 12 commits into from
Jun 11, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ public abstract class AbstractServerHttpChannelObserver implements CustomizableH

private boolean headerSent;

private boolean completed;

private boolean closed;

protected AbstractServerHttpChannelObserver(HttpChannel httpChannel) {
this.httpChannel = httpChannel;
}
Expand Down Expand Up @@ -69,6 +73,9 @@ public void setResponseEncoder(HttpMessageEncoder responseEncoder) {

@Override
public final void onNext(Object data) {
if (closed) {
return;
}
try {
doOnNext(data);
} catch (Throwable e) {
Expand All @@ -85,17 +92,20 @@ protected void doOnNext(Object data) throws Throwable {

@Override
public final void onError(Throwable throwable) {
if (closed) {
return;
}
if (throwable instanceof HttpResultPayloadException) {
onNext(((HttpResultPayloadException) throwable).getResult());
doOnCompleted(null);
onCompleted(null);
return;
}
try {
doOnError(throwable);
} catch (Throwable ex) {
throwable = new EncodeException(ex);
} finally {
doOnCompleted(throwable);
onCompleted(throwable);
}
}

Expand All @@ -110,7 +120,17 @@ protected void doOnError(Throwable throwable) throws Throwable {

@Override
public final void onCompleted() {
doOnCompleted(null);
if (closed) {
return;
}
onCompleted(null);
}

private void onCompleted(Throwable throwable) {
if (!completed) {
doOnCompleted(throwable);
completed = true;
}
}

protected void doOnCompleted(Throwable throwable) {
Expand Down Expand Up @@ -198,4 +218,13 @@ protected final void sendHeader(HttpMetadata httpMetadata) {
getHttpChannel().writeHeader(httpMetadata);
headerSent = true;
}

@Override
public void close() throws Exception {
closed();
}

protected final void closed() {
closed = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import org.apache.dubbo.common.stream.StreamObserver;

public interface HttpChannelObserver<T> extends StreamObserver<T> {
public interface HttpChannelObserver<T> extends StreamObserver<T>, AutoCloseable {

HttpChannel getHttpChannel();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
*/
package org.apache.dubbo.remoting.http12;

public interface HttpTransportListener<HEADER extends HttpMetadata, MESSAGE extends HttpInputMessage> {
public interface HttpTransportListener<HEADER extends HttpMetadata, MESSAGE extends HttpInputMessage>
extends AutoCloseable {

void onMetadata(HEADER metadata);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,14 +185,14 @@ public static String readPostValue(InterfaceHttpData item) {
}

public static HttpRequest.FileUpload readUpload(InterfaceHttpData item) {
return new DefaultFileUploadAdaptee((FileUpload) item);
return new DefaultFileUploadAdapter((FileUpload) item);
}

private static class DefaultFileUploadAdaptee implements HttpRequest.FileUpload {
private static class DefaultFileUploadAdapter implements HttpRequest.FileUpload {
private final FileUpload fu;
private InputStream inputStream;

DefaultFileUploadAdaptee(FileUpload fu) {
DefaultFileUploadAdapter(FileUpload fu) {
this.fu = fu;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.remoting.http12;

import org.apache.dubbo.remoting.http12.exception.HttpOverPayloadException;

import java.io.IOException;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;

public class LimitedByteBufOutputStream extends ByteBufOutputStream {

private final int capacity;

public LimitedByteBufOutputStream(ByteBuf byteBuf, int capacity) {
super(byteBuf);
this.capacity = capacity == 0 ? Integer.MAX_VALUE : capacity;
}

@Override
public void write(int b) throws IOException {
ensureCapacity(1);
super.write(b);
}

@Override
public void write(byte[] b) throws IOException {
ensureCapacity(b.length);
super.write(b);
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
ensureCapacity(len);
super.write(b, off, len);
}

private void ensureCapacity(int len) {
if (writtenBytes() + len > capacity) {
throw new HttpOverPayloadException("Response Entity Too Large");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.remoting.http12.exception;

public class HttpOverPayloadException extends HttpStatusException {

public HttpOverPayloadException(String message) {
super(500, message);
finefuture marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ public class Http2ServerChannelObserver extends AbstractServerHttpChannelObserve

private boolean autoRequestN = true;

private boolean closed = false;

public Http2ServerChannelObserver(H2StreamChannel h2StreamChannel) {
super(h2StreamChannel);
}
Expand Down Expand Up @@ -78,7 +76,7 @@ public CancellationContext getCancellationContext() {
public void cancel(Throwable throwable) {
if (throwable instanceof CancelStreamException) {
if (((CancelStreamException) throwable).isCancelByRemote()) {
closed = true;
closed();
}
}
this.cancellationContext.cancel(throwable);
Expand All @@ -89,22 +87,6 @@ public void cancel(Throwable throwable) {
getHttpChannel().writeResetFrame(errorCode);
}

@Override
public void doOnNext(Object data) throws Throwable {
if (closed) {
return;
}
super.doOnNext(data);
}

@Override
public void doOnError(Throwable throwable) throws Throwable {
if (closed) {
return;
}
super.doOnError(throwable);
}

@Override
public void request(int count) {
this.streamingDecoder.request(count);
Expand All @@ -120,8 +102,9 @@ public boolean isAutoRequestN() {
return autoRequestN;
}

public void onStreamClosed() {
closed = true;
@Override
public void close() throws Exception {
super.close();
streamingDecoder.onStreamClosed();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,4 @@
*/
package org.apache.dubbo.remoting.http12.h2;

public interface Http2TransportListener extends CancelableTransportListener<Http2Header, Http2InputMessage> {

void onStreamClosed();
}
public interface Http2TransportListener extends CancelableTransportListener<Http2Header, Http2InputMessage> {}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.dubbo.common.io.StreamUtils;
import org.apache.dubbo.remoting.http12.exception.DecodeException;
import org.apache.dubbo.remoting.http12.exception.EncodeException;
import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
import org.apache.dubbo.remoting.http12.message.HttpMessageCodec;
import org.apache.dubbo.remoting.http12.message.MediaType;

Expand Down Expand Up @@ -49,6 +50,8 @@ public void encode(OutputStream os, Object data, Charset charset) throws EncodeE
public Object decode(InputStream is, Class<?> targetType, Charset charset) throws DecodeException {
try {
return StreamUtils.readBytes(is);
} catch (HttpStatusException e) {
throw e;
} catch (Exception e) {
throw new DecodeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.dubbo.common.io.StreamUtils;
import org.apache.dubbo.remoting.http12.exception.DecodeException;
import org.apache.dubbo.remoting.http12.exception.EncodeException;
import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
import org.apache.dubbo.remoting.http12.message.HttpMessageCodec;
import org.apache.dubbo.remoting.http12.message.MediaType;

Expand Down Expand Up @@ -48,6 +49,8 @@ public Object decode(InputStream is, Class<?> targetType, Charset charset) throw
if (targetType == String.class) {
return StreamUtils.toString(is, charset);
}
} catch (HttpStatusException e) {
throw e;
} catch (Exception e) {
throw new DecodeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.dubbo.common.utils.JsonUtils;
import org.apache.dubbo.remoting.http12.exception.DecodeException;
import org.apache.dubbo.remoting.http12.exception.EncodeException;
import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
import org.apache.dubbo.remoting.http12.message.HttpMessageCodec;
import org.apache.dubbo.remoting.http12.message.MediaType;

Expand All @@ -35,6 +36,8 @@ public class JsonCodec implements HttpMessageCodec {
public void encode(OutputStream os, Object data, Charset charset) throws EncodeException {
try {
os.write(JsonUtils.toJson(data).getBytes(charset));
} catch (HttpStatusException e) {
throw e;
} catch (Throwable t) {
throw new EncodeException("Error encoding json", t);
}
Expand All @@ -43,6 +46,8 @@ public void encode(OutputStream os, Object data, Charset charset) throws EncodeE
public void encode(OutputStream os, Object[] data, Charset charset) throws EncodeException {
try {
os.write(JsonUtils.toJson(data).getBytes(charset));
} catch (HttpStatusException e) {
throw e;
} catch (Throwable t) {
throw new EncodeException("Error encoding json", t);
}
Expand All @@ -52,6 +57,8 @@ public void encode(OutputStream os, Object[] data, Charset charset) throws Encod
public Object decode(InputStream is, Class<?> targetType, Charset charset) throws DecodeException {
try {
return JsonUtils.toJavaObject(StreamUtils.toString(is, charset), targetType);
} catch (HttpStatusException e) {
throw e;
} catch (Throwable t) {
throw new DecodeException("Error decoding json", t);
}
Expand All @@ -78,7 +85,7 @@ public Object[] decode(InputStream is, Class<?>[] targetTypes, Charset charset)
return new Object[] {JsonUtils.convertObject(obj, targetTypes[0])};
}
throw new DecodeException("Json must be array");
} catch (DecodeException e) {
} catch (HttpStatusException e) {
throw e;
} catch (Throwable t) {
throw new DecodeException("Error decoding json", t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.dubbo.common.utils.MethodUtils;
import org.apache.dubbo.remoting.http12.exception.DecodeException;
import org.apache.dubbo.remoting.http12.exception.EncodeException;
import org.apache.dubbo.remoting.http12.exception.HttpStatusException;

import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -54,6 +55,8 @@ public Object decode(InputStream is, Class<?> targetType, Charset charset) throw
JsonFormat.parser().ignoringUnknownFields().merge(StreamUtils.toString(is, charset), newBuilder);
return newBuilder.build();
}
} catch (HttpStatusException e) {
throw e;
} catch (Throwable e) {
throw new DecodeException("Error decoding jsonPb", e);
}
Expand All @@ -67,6 +70,8 @@ public Object[] decode(InputStream is, Class<?>[] targetTypes, Charset charset)
// protobuf only support one parameter
return new Object[] {decode(is, targetTypes[0], charset)};
}
} catch (HttpStatusException e) {
throw e;
} catch (Throwable e) {
throw new DecodeException("Error decoding jsonPb", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.dubbo.common.io.StreamUtils;
import org.apache.dubbo.remoting.http12.exception.DecodeException;
import org.apache.dubbo.remoting.http12.exception.EncodeException;
import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
import org.apache.dubbo.remoting.http12.message.HttpMessageCodec;
import org.apache.dubbo.remoting.http12.message.MediaType;

Expand Down Expand Up @@ -51,6 +52,8 @@ public Object decode(InputStream is, Class<?> targetType, Charset charset) throw
if (targetType == String.class) {
return StreamUtils.toString(is, charset);
}
} catch (HttpStatusException e) {
throw e;
} catch (Exception e) {
throw new DecodeException(e);
}
Expand Down
Loading
Loading