Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

For HTTP/1 unary mode, use Content-Length instead of chunk #13979

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -21,33 +21,27 @@
import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
import org.apache.dubbo.remoting.http12.message.HttpMessageEncoder;

import java.util.List;
import java.util.Map;

public abstract class AbstractServerHttpChannelObserver implements CustomizableHttpChannelObserver<Object> {

private final HttpChannel httpChannel;

private HeadersCustomizer headersCustomizer = HeadersCustomizer.NO_OP;

private TrailersCustomizer trailersCustomizer = TrailersCustomizer.NO_OP;

private ErrorResponseCustomizer errorResponseCustomizer = ErrorResponseCustomizer.NO_OP;

private final HttpChannel httpChannel;
private HttpMessageEncoder responseEncoder;

private boolean headerSent;

private HttpMessageEncoder responseEncoder;

public AbstractServerHttpChannelObserver(HttpChannel httpChannel) {
protected AbstractServerHttpChannelObserver(HttpChannel httpChannel) {
this.httpChannel = httpChannel;
}

public void setResponseEncoder(HttpMessageEncoder responseEncoder) {
this.responseEncoder = responseEncoder;
}

public HttpMessageEncoder getResponseEncoder() {
return responseEncoder;
@Override
public HttpChannel getHttpChannel() {
return httpChannel;
}

@Override
Expand All @@ -65,107 +59,137 @@ public void setErrorResponseCustomizer(ErrorResponseCustomizer errorResponseCust
this.errorResponseCustomizer = errorResponseCustomizer;
}

protected HeadersCustomizer getHeadersCustomizer() {
return headersCustomizer;
public HttpMessageEncoder getResponseEncoder() {
return responseEncoder;
}

protected TrailersCustomizer getTrailersCustomizer() {
return trailersCustomizer;
public void setResponseEncoder(HttpMessageEncoder responseEncoder) {
this.responseEncoder = responseEncoder;
}

@Override
public void onNext(Object data) {
public final void onNext(Object data) {
try {
if (data instanceof HttpResult) {
HttpResult<?> result = (HttpResult<?>) data;
if (!headerSent) {
doSendHeaders(String.valueOf(result.getStatus()), result.getHeaders());
}
data = result.getBody();
} else if (!headerSent) {
doSendHeaders(HttpStatus.OK.getStatusString(), null);
}
HttpOutputMessage outputMessage = encodeHttpOutputMessage(data);
preOutputMessage(outputMessage);
responseEncoder.encode(outputMessage.getBody(), data);
getHttpChannel().writeMessage(outputMessage);
postOutputMessage(outputMessage);
doOnNext(data);
} catch (Throwable e) {
onError(e);
}
}

protected void preOutputMessage(HttpOutputMessage outputMessage) throws Throwable {}

protected void postOutputMessage(HttpOutputMessage outputMessage) throws Throwable {}

protected abstract HttpMetadata encodeHttpMetadata();

protected HttpOutputMessage encodeHttpOutputMessage(Object data) {
return getHttpChannel().newOutputMessage();
}

protected HttpMetadata encodeTrailers(Throwable throwable) {
return null;
protected void doOnNext(Object data) throws Throwable {
if (!headerSent) {
sendHeader(buildMetadata(resolveStatusCode(data), data, null));
}
sendMessage(buildMessage(data));
}

@Override
public void onError(Throwable throwable) {
public final void onError(Throwable throwable) {
if (throwable instanceof HttpResultPayloadException) {
onNext(((HttpResultPayloadException) throwable).getResult());
return;
}
int httpStatusCode = HttpStatus.INTERNAL_SERVER_ERROR.getCode();
if (throwable instanceof HttpStatusException) {
httpStatusCode = ((HttpStatusException) throwable).getStatusCode();
}
if (!headerSent) {
doSendHeaders(String.valueOf(httpStatusCode), null);
}
try {
ErrorResponse errorResponse = new ErrorResponse();
errorResponse.setStatus(String.valueOf(httpStatusCode));
errorResponse.setMessage(throwable.getMessage());
errorResponseCustomizer.accept(errorResponse, throwable);
HttpOutputMessage httpOutputMessage = encodeHttpOutputMessage(errorResponse);
responseEncoder.encode(httpOutputMessage.getBody(), errorResponse);
getHttpChannel().writeMessage(httpOutputMessage);
doOnError(throwable);
} catch (Throwable ex) {
throwable = new EncodeException(ex);
} finally {
doOnCompleted(throwable);
}
}

protected void doOnError(Throwable throwable) throws Throwable {
String statusCode = resolveStatusCode(throwable);
Object data = buildErrorResponse(statusCode, throwable);
if (!headerSent) {
sendHeader(buildMetadata(statusCode, data, null));
}
sendMessage(buildMessage(data));
}

@Override
public void onCompleted() {
public final void onCompleted() {
doOnCompleted(null);
}

@Override
public HttpChannel getHttpChannel() {
return httpChannel;
protected void doOnCompleted(Throwable throwable) {
HttpMetadata httpMetadata = encodeTrailers(throwable);
if (httpMetadata == null) {
return;
}
trailersCustomizer.accept(httpMetadata.headers(), throwable);
getHttpChannel().writeHeader(httpMetadata);
}

protected HttpMetadata encodeTrailers(Throwable throwable) {
return null;
}

private void doSendHeaders(String statusCode, Map<String, List<String>> additionalHeaders) {
protected HttpOutputMessage encodeHttpOutputMessage(Object data) {
return getHttpChannel().newOutputMessage();
}

protected abstract HttpMetadata encodeHttpMetadata();

protected void preOutputMessage(HttpOutputMessage outputMessage) throws Throwable {}

protected void postOutputMessage(HttpOutputMessage outputMessage) throws Throwable {}

protected void preMetadata(HttpMetadata httpMetadata, HttpOutputMessage outputMessage) {}

protected final String resolveStatusCode(Object data) {
return data instanceof HttpResult
? String.valueOf(((HttpResult<?>) data).getStatus())
: HttpStatus.OK.getStatusString();
}

protected final String resolveStatusCode(Throwable throwable) {
return throwable instanceof HttpStatusException
? String.valueOf(((HttpStatusException) throwable).getStatusCode())
: HttpStatus.INTERNAL_SERVER_ERROR.getStatusString();
}

protected final ErrorResponse buildErrorResponse(String statusCode, Throwable throwable) {
ErrorResponse errorResponse = new ErrorResponse();
errorResponse.setStatus(statusCode);
errorResponse.setMessage(throwable.getMessage());
errorResponseCustomizer.accept(errorResponse, throwable);
return errorResponse;
}

protected final HttpOutputMessage buildMessage(Object data) throws Throwable {
if (data instanceof HttpResult) {
data = ((HttpResult<?>) data).getBody();
}
HttpOutputMessage outputMessage = encodeHttpOutputMessage(data);
preOutputMessage(outputMessage);
responseEncoder.encode(outputMessage.getBody(), data);
return outputMessage;
}

protected final void sendMessage(HttpOutputMessage outputMessage) throws Throwable {
getHttpChannel().writeMessage(outputMessage);
postOutputMessage(outputMessage);
}

protected final HttpMetadata buildMetadata(String statusCode, Object data, HttpOutputMessage httpOutputMessage) {
HttpMetadata httpMetadata = encodeHttpMetadata();
HttpHeaders headers = httpMetadata.headers();
headers.set(HttpHeaderNames.STATUS.getName(), statusCode);
headers.set(HttpHeaderNames.CONTENT_TYPE.getName(), responseEncoder.contentType());
headersCustomizer.accept(headers);
if (additionalHeaders != null) {
headers.putAll(additionalHeaders);
if (data instanceof HttpResult) {
HttpResult<?> result = (HttpResult<?>) data;
if (result.getHeaders() != null) {
headers.putAll(result.getHeaders());
}
}
getHttpChannel().writeHeader(httpMetadata);
headerSent = true;
preMetadata(httpMetadata, httpOutputMessage);
headersCustomizer.accept(headers);
return httpMetadata;
}

protected void doOnCompleted(Throwable throwable) {
HttpMetadata httpMetadata = encodeTrailers(throwable);
if (httpMetadata == null) {
return;
}
trailersCustomizer.accept(httpMetadata.headers(), throwable);
protected final void sendHeader(HttpMetadata httpMetadata) {
getHttpChannel().writeHeader(httpMetadata);
headerSent = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.apache.dubbo.remoting.http12.AbstractServerHttpChannelObserver;
import org.apache.dubbo.remoting.http12.HttpChannel;
import org.apache.dubbo.remoting.http12.HttpChannelObserver;
import org.apache.dubbo.remoting.http12.HttpHeaderNames;
import org.apache.dubbo.remoting.http12.HttpHeaders;
import org.apache.dubbo.remoting.http12.HttpMetadata;
import org.apache.dubbo.remoting.http12.HttpOutputMessage;
Expand All @@ -34,7 +33,6 @@ public Http1ServerChannelObserver(HttpChannel httpChannel) {
@Override
protected HttpMetadata encodeHttpMetadata() {
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.set(HttpHeaderNames.TRANSFER_ENCODING.getName(), "chunked");
return new Http1Metadata(httpHeaders);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package org.apache.dubbo.remoting.http12.h1;

import org.apache.dubbo.remoting.http12.HttpChannel;
import org.apache.dubbo.remoting.http12.HttpHeaderNames;
import org.apache.dubbo.remoting.http12.HttpHeaders;
import org.apache.dubbo.remoting.http12.HttpMetadata;
import org.apache.dubbo.remoting.http12.HttpOutputMessage;

import java.io.IOException;
Expand All @@ -33,6 +36,13 @@ public Http1ServerStreamChannelObserver(HttpChannel httpChannel) {
super(httpChannel);
}

@Override
protected HttpMetadata encodeHttpMetadata() {
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.set(HttpHeaderNames.TRANSFER_ENCODING.getName(), "chunked");
return new Http1Metadata(httpHeaders);
}

@Override
protected void preOutputMessage(HttpOutputMessage httpMessage) throws IOException {
HttpOutputMessage httpOutputMessage = this.getHttpChannel().newOutputMessage();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.remoting.http12.h1;

import org.apache.dubbo.remoting.http12.HttpChannel;
import org.apache.dubbo.remoting.http12.HttpHeaderNames;
import org.apache.dubbo.remoting.http12.HttpMetadata;
import org.apache.dubbo.remoting.http12.HttpOutputMessage;

import java.io.OutputStream;

import io.netty.buffer.ByteBufOutputStream;

public class Http1ServerUnaryChannelObserver extends Http1ServerChannelObserver {

public Http1ServerUnaryChannelObserver(HttpChannel httpChannel) {
super(httpChannel);
}

@Override
protected void doOnNext(Object data) throws Throwable {
HttpOutputMessage httpOutputMessage = buildMessage(data);
sendHeader(buildMetadata(resolveStatusCode(data), data, httpOutputMessage));
sendMessage(httpOutputMessage);
}

@Override
protected void doOnError(Throwable throwable) throws Throwable {
String statusCode = resolveStatusCode(throwable);
Object data = buildErrorResponse(statusCode, throwable);
HttpOutputMessage httpOutputMessage = buildMessage(data);
sendHeader(buildMetadata(statusCode, data, httpOutputMessage));
sendMessage(httpOutputMessage);
}

@Override
protected void preMetadata(HttpMetadata httpMetadata, HttpOutputMessage outputMessage) {
OutputStream body = outputMessage.getBody();
if (body instanceof ByteBufOutputStream) {
int contentLength = ((ByteBufOutputStream) body).writtenBytes();
httpMetadata.headers().set(HttpHeaderNames.CONTENT_LENGTH.getName(), String.valueOf(contentLength));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.dubbo.remoting.http12.h1.Http1ServerChannelObserver;
import org.apache.dubbo.remoting.http12.h1.Http1ServerStreamChannelObserver;
import org.apache.dubbo.remoting.http12.h1.Http1ServerTransportListener;
import org.apache.dubbo.remoting.http12.h1.Http1ServerUnaryChannelObserver;
import org.apache.dubbo.remoting.http12.message.DefaultListeningDecoder;
import org.apache.dubbo.remoting.http12.message.MediaType;
import org.apache.dubbo.remoting.http12.message.codec.JsonCodec;
Expand Down Expand Up @@ -58,7 +59,7 @@ public DefaultHttp11ServerTransportListener(HttpChannel httpChannel, URL url, Fr
executorSupport = ExecutorRepository.getInstance(url.getOrDefaultApplicationModel())
.getExecutorSupport(url);
this.httpChannel = httpChannel;
serverChannelObserver = new Http1ServerChannelObserver(httpChannel);
serverChannelObserver = new Http1ServerUnaryChannelObserver(httpChannel);
serverChannelObserver.setResponseEncoder(JsonCodec.INSTANCE);
}

Expand Down
Loading