From 1fbe8755bd2c88e7965e5e83849f3ca8aa7bf176 Mon Sep 17 00:00:00 2001 From: TomlongTK Date: Fri, 22 Mar 2024 14:36:36 +0800 Subject: [PATCH 1/7] For HTTP/1 unary mode, use Content-Length instead of chunk --- .../AbstractServerHttpChannelObserver.java | 10 ++- .../http12/h1/Http1ServerChannelObserver.java | 89 ++++++++++++++++++- .../h1/Http1ServerStreamChannelObserver.java | 15 ++++ 3 files changed, 110 insertions(+), 4 deletions(-) diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java index a4086cc08f2..c3aff30eba5 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java @@ -34,11 +34,11 @@ public abstract class AbstractServerHttpChannelObserver implements CustomizableH private final HttpChannel httpChannel; - private boolean headerSent; + protected boolean headerSent; private HttpMessageEncoder responseEncoder; - public AbstractServerHttpChannelObserver(HttpChannel httpChannel) { + protected AbstractServerHttpChannelObserver(HttpChannel httpChannel) { this.httpChannel = httpChannel; } @@ -73,6 +73,10 @@ protected TrailersCustomizer getTrailersCustomizer() { return trailersCustomizer; } + protected ErrorResponseCustomizer getErrorResponseCustomizer() { + return errorResponseCustomizer; + } + @Override public void onNext(Object data) { try { @@ -147,7 +151,7 @@ public HttpChannel getHttpChannel() { return httpChannel; } - private void doSendHeaders(String statusCode, Map> additionalHeaders) { + protected void doSendHeaders(String statusCode, Map> additionalHeaders) { HttpMetadata httpMetadata = encodeHttpMetadata(); HttpHeaders headers = httpMetadata.headers(); headers.set(HttpHeaderNames.STATUS.getName(), statusCode); diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerChannelObserver.java index 6d92b865541..13dd7f8af61 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerChannelObserver.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerChannelObserver.java @@ -17,12 +17,25 @@ package org.apache.dubbo.remoting.http12.h1; import org.apache.dubbo.remoting.http12.AbstractServerHttpChannelObserver; +import org.apache.dubbo.remoting.http12.exception.EncodeException; +import org.apache.dubbo.remoting.http12.exception.HttpResultPayloadException; +import org.apache.dubbo.remoting.http12.exception.HttpStatusException; +import org.apache.dubbo.remoting.http12.ErrorResponse; import org.apache.dubbo.remoting.http12.HttpChannel; import org.apache.dubbo.remoting.http12.HttpChannelObserver; import org.apache.dubbo.remoting.http12.HttpHeaderNames; import org.apache.dubbo.remoting.http12.HttpHeaders; import org.apache.dubbo.remoting.http12.HttpMetadata; import org.apache.dubbo.remoting.http12.HttpOutputMessage; +import org.apache.dubbo.remoting.http12.HttpResult; +import org.apache.dubbo.remoting.http12.HttpStatus; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import io.netty.buffer.ByteBufOutputStream; public class Http1ServerChannelObserver extends AbstractServerHttpChannelObserver implements HttpChannelObserver { @@ -31,10 +44,69 @@ public Http1ServerChannelObserver(HttpChannel httpChannel) { super(httpChannel); } + @Override + public void onNext(Object data) { + if (supportChunk()) { + super.onNext(data); + return; + } + try { + String status = HttpStatus.OK.getStatusString(); + Map> additionalHeaders = null; + if (data instanceof HttpResult) { + HttpResult result = (HttpResult) data; + data = result.getBody(); + status = String.valueOf(result.getStatus()); + additionalHeaders = result.getHeaders(); + } + HttpOutputMessage outputMessage = encodeHttpOutputMessage(data); + preOutputMessage(outputMessage); + getResponseEncoder().encode(outputMessage.getBody(), data); + if (!headerSent) { + doSendHeaders(status, buildAdditionalHeaders(outputMessage, additionalHeaders)); + } + getHttpChannel().writeMessage(outputMessage); + postOutputMessage(outputMessage); + } catch (Throwable e) { + onError(e); + } + } + + @Override + public void onError(Throwable throwable) { + if (supportChunk()) { + super.onError(throwable); + return; + } + if (throwable instanceof HttpResultPayloadException) { + onNext(((HttpResultPayloadException) throwable).getResult()); + return; + } + int httpStatusCode = HttpStatus.INTERNAL_SERVER_ERROR.getCode(); + if (throwable instanceof HttpStatusException) { + httpStatusCode = ((HttpStatusException) throwable).getStatusCode(); + } + try { + ErrorResponse errorResponse = new ErrorResponse(); + errorResponse.setStatus(String.valueOf(httpStatusCode)); + errorResponse.setMessage(throwable.getMessage()); + getErrorResponseCustomizer().accept(errorResponse, throwable); + HttpOutputMessage httpOutputMessage = encodeHttpOutputMessage(errorResponse); + getResponseEncoder().encode(httpOutputMessage.getBody(), errorResponse); + if (!headerSent) { + doSendHeaders(String.valueOf(httpStatusCode), buildAdditionalHeaders(httpOutputMessage, null)); + } + getHttpChannel().writeMessage(httpOutputMessage); + } catch (Throwable ex) { + throwable = new EncodeException(ex); + } finally { + doOnCompleted(throwable); + } + } + @Override protected HttpMetadata encodeHttpMetadata() { HttpHeaders httpHeaders = new HttpHeaders(); - httpHeaders.set(HttpHeaderNames.TRANSFER_ENCODING.getName(), "chunked"); return new Http1Metadata(httpHeaders); } @@ -43,4 +115,19 @@ protected void doOnCompleted(Throwable throwable) { super.doOnCompleted(throwable); this.getHttpChannel().writeMessage(HttpOutputMessage.EMPTY_MESSAGE); } + + private Map> buildAdditionalHeaders( + HttpOutputMessage outputMessage, Map> additionalHeaders) { + int contentLength = ((ByteBufOutputStream) outputMessage.getBody()).writtenBytes(); + if (additionalHeaders == null) { + additionalHeaders = new HashMap<>(); + } + additionalHeaders.put( + HttpHeaderNames.CONTENT_LENGTH.getName(), Collections.singletonList(String.valueOf(contentLength))); + return additionalHeaders; + } + + protected boolean supportChunk() { + return false; + } } diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerStreamChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerStreamChannelObserver.java index e5ee7aff442..01a0f69e7ca 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerStreamChannelObserver.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerStreamChannelObserver.java @@ -17,6 +17,9 @@ package org.apache.dubbo.remoting.http12.h1; import org.apache.dubbo.remoting.http12.HttpChannel; +import org.apache.dubbo.remoting.http12.HttpHeaderNames; +import org.apache.dubbo.remoting.http12.HttpHeaders; +import org.apache.dubbo.remoting.http12.HttpMetadata; import org.apache.dubbo.remoting.http12.HttpOutputMessage; import java.io.IOException; @@ -33,6 +36,13 @@ public Http1ServerStreamChannelObserver(HttpChannel httpChannel) { super(httpChannel); } + @Override + protected HttpMetadata encodeHttpMetadata() { + HttpHeaders httpHeaders = new HttpHeaders(); + httpHeaders.set(HttpHeaderNames.TRANSFER_ENCODING.getName(), "chunked"); + return new Http1Metadata(httpHeaders); + } + @Override protected void preOutputMessage(HttpOutputMessage httpMessage) throws IOException { HttpOutputMessage httpOutputMessage = this.getHttpChannel().newOutputMessage(); @@ -48,4 +58,9 @@ protected void postOutputMessage(HttpOutputMessage httpMessage) throws IOExcepti httpOutputMessage.getBody().write(SERVER_SENT_EVENT_LF_BYTES, 0, SERVER_SENT_EVENT_LF_BYTES.length); this.getHttpChannel().writeMessage(httpOutputMessage); } + + @Override + protected boolean supportChunk() { + return true; + } } From 26cb2b3bc2eb94469a6145b2db63673a1f92bccf Mon Sep 17 00:00:00 2001 From: TomlongTK Date: Fri, 22 Mar 2024 14:45:13 +0800 Subject: [PATCH 2/7] Fix format issue --- .../remoting/http12/h1/Http1ServerChannelObserver.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerChannelObserver.java index 13dd7f8af61..a5d3712afcc 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerChannelObserver.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerChannelObserver.java @@ -17,9 +17,6 @@ package org.apache.dubbo.remoting.http12.h1; import org.apache.dubbo.remoting.http12.AbstractServerHttpChannelObserver; -import org.apache.dubbo.remoting.http12.exception.EncodeException; -import org.apache.dubbo.remoting.http12.exception.HttpResultPayloadException; -import org.apache.dubbo.remoting.http12.exception.HttpStatusException; import org.apache.dubbo.remoting.http12.ErrorResponse; import org.apache.dubbo.remoting.http12.HttpChannel; import org.apache.dubbo.remoting.http12.HttpChannelObserver; @@ -29,6 +26,9 @@ import org.apache.dubbo.remoting.http12.HttpOutputMessage; import org.apache.dubbo.remoting.http12.HttpResult; import org.apache.dubbo.remoting.http12.HttpStatus; +import org.apache.dubbo.remoting.http12.exception.EncodeException; +import org.apache.dubbo.remoting.http12.exception.HttpResultPayloadException; +import org.apache.dubbo.remoting.http12.exception.HttpStatusException; import java.util.Collections; import java.util.HashMap; From f78659a00737922a7fc4e81b9a30143fe97d621a Mon Sep 17 00:00:00 2001 From: TomlongTK <18779116352@163.com> Date: Mon, 25 Mar 2024 15:23:59 +0800 Subject: [PATCH 3/7] Http1 unary --- .../http12/h1/Http1ServerChannelObserver.java | 89 --------------- .../h1/Http1ServerStreamChannelObserver.java | 5 - .../h1/Http1ServerUnaryChannelObserver.java | 104 ++++++++++++++++++ .../DefaultHttp11ServerTransportListener.java | 5 +- 4 files changed, 107 insertions(+), 96 deletions(-) create mode 100644 dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerUnaryChannelObserver.java diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerChannelObserver.java index a5d3712afcc..e62fbeb8040 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerChannelObserver.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerChannelObserver.java @@ -17,25 +17,11 @@ package org.apache.dubbo.remoting.http12.h1; import org.apache.dubbo.remoting.http12.AbstractServerHttpChannelObserver; -import org.apache.dubbo.remoting.http12.ErrorResponse; import org.apache.dubbo.remoting.http12.HttpChannel; import org.apache.dubbo.remoting.http12.HttpChannelObserver; -import org.apache.dubbo.remoting.http12.HttpHeaderNames; import org.apache.dubbo.remoting.http12.HttpHeaders; import org.apache.dubbo.remoting.http12.HttpMetadata; import org.apache.dubbo.remoting.http12.HttpOutputMessage; -import org.apache.dubbo.remoting.http12.HttpResult; -import org.apache.dubbo.remoting.http12.HttpStatus; -import org.apache.dubbo.remoting.http12.exception.EncodeException; -import org.apache.dubbo.remoting.http12.exception.HttpResultPayloadException; -import org.apache.dubbo.remoting.http12.exception.HttpStatusException; - -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import io.netty.buffer.ByteBufOutputStream; public class Http1ServerChannelObserver extends AbstractServerHttpChannelObserver implements HttpChannelObserver { @@ -44,66 +30,6 @@ public Http1ServerChannelObserver(HttpChannel httpChannel) { super(httpChannel); } - @Override - public void onNext(Object data) { - if (supportChunk()) { - super.onNext(data); - return; - } - try { - String status = HttpStatus.OK.getStatusString(); - Map> additionalHeaders = null; - if (data instanceof HttpResult) { - HttpResult result = (HttpResult) data; - data = result.getBody(); - status = String.valueOf(result.getStatus()); - additionalHeaders = result.getHeaders(); - } - HttpOutputMessage outputMessage = encodeHttpOutputMessage(data); - preOutputMessage(outputMessage); - getResponseEncoder().encode(outputMessage.getBody(), data); - if (!headerSent) { - doSendHeaders(status, buildAdditionalHeaders(outputMessage, additionalHeaders)); - } - getHttpChannel().writeMessage(outputMessage); - postOutputMessage(outputMessage); - } catch (Throwable e) { - onError(e); - } - } - - @Override - public void onError(Throwable throwable) { - if (supportChunk()) { - super.onError(throwable); - return; - } - if (throwable instanceof HttpResultPayloadException) { - onNext(((HttpResultPayloadException) throwable).getResult()); - return; - } - int httpStatusCode = HttpStatus.INTERNAL_SERVER_ERROR.getCode(); - if (throwable instanceof HttpStatusException) { - httpStatusCode = ((HttpStatusException) throwable).getStatusCode(); - } - try { - ErrorResponse errorResponse = new ErrorResponse(); - errorResponse.setStatus(String.valueOf(httpStatusCode)); - errorResponse.setMessage(throwable.getMessage()); - getErrorResponseCustomizer().accept(errorResponse, throwable); - HttpOutputMessage httpOutputMessage = encodeHttpOutputMessage(errorResponse); - getResponseEncoder().encode(httpOutputMessage.getBody(), errorResponse); - if (!headerSent) { - doSendHeaders(String.valueOf(httpStatusCode), buildAdditionalHeaders(httpOutputMessage, null)); - } - getHttpChannel().writeMessage(httpOutputMessage); - } catch (Throwable ex) { - throwable = new EncodeException(ex); - } finally { - doOnCompleted(throwable); - } - } - @Override protected HttpMetadata encodeHttpMetadata() { HttpHeaders httpHeaders = new HttpHeaders(); @@ -115,19 +41,4 @@ protected void doOnCompleted(Throwable throwable) { super.doOnCompleted(throwable); this.getHttpChannel().writeMessage(HttpOutputMessage.EMPTY_MESSAGE); } - - private Map> buildAdditionalHeaders( - HttpOutputMessage outputMessage, Map> additionalHeaders) { - int contentLength = ((ByteBufOutputStream) outputMessage.getBody()).writtenBytes(); - if (additionalHeaders == null) { - additionalHeaders = new HashMap<>(); - } - additionalHeaders.put( - HttpHeaderNames.CONTENT_LENGTH.getName(), Collections.singletonList(String.valueOf(contentLength))); - return additionalHeaders; - } - - protected boolean supportChunk() { - return false; - } } diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerStreamChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerStreamChannelObserver.java index 01a0f69e7ca..8c3af9d6842 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerStreamChannelObserver.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerStreamChannelObserver.java @@ -58,9 +58,4 @@ protected void postOutputMessage(HttpOutputMessage httpMessage) throws IOExcepti httpOutputMessage.getBody().write(SERVER_SENT_EVENT_LF_BYTES, 0, SERVER_SENT_EVENT_LF_BYTES.length); this.getHttpChannel().writeMessage(httpOutputMessage); } - - @Override - protected boolean supportChunk() { - return true; - } } diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerUnaryChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerUnaryChannelObserver.java new file mode 100644 index 00000000000..a95ccdf82bc --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerUnaryChannelObserver.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.http12.h1; + +import org.apache.dubbo.remoting.http12.ErrorResponse; +import org.apache.dubbo.remoting.http12.HttpChannel; +import org.apache.dubbo.remoting.http12.HttpHeaderNames; +import org.apache.dubbo.remoting.http12.HttpOutputMessage; +import org.apache.dubbo.remoting.http12.HttpResult; +import org.apache.dubbo.remoting.http12.HttpStatus; +import org.apache.dubbo.remoting.http12.exception.EncodeException; +import org.apache.dubbo.remoting.http12.exception.HttpResultPayloadException; +import org.apache.dubbo.remoting.http12.exception.HttpStatusException; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import io.netty.buffer.ByteBufOutputStream; + +public class Http1ServerUnaryChannelObserver extends Http1ServerChannelObserver { + + public Http1ServerUnaryChannelObserver(HttpChannel httpChannel) { + super(httpChannel); + } + + @Override + public void onNext(Object data) { + try { + String status = HttpStatus.OK.getStatusString(); + Map> additionalHeaders = null; + if (data instanceof HttpResult) { + HttpResult result = (HttpResult) data; + data = result.getBody(); + status = String.valueOf(result.getStatus()); + additionalHeaders = result.getHeaders(); + } + HttpOutputMessage outputMessage = encodeHttpOutputMessage(data); + preOutputMessage(outputMessage); + getResponseEncoder().encode(outputMessage.getBody(), data); + if (!headerSent) { + doSendHeaders(status, buildAdditionalHeaders(outputMessage, additionalHeaders)); + } + getHttpChannel().writeMessage(outputMessage); + postOutputMessage(outputMessage); + } catch (Throwable e) { + onError(e); + } + } + + @Override + public void onError(Throwable throwable) { + if (throwable instanceof HttpResultPayloadException) { + onNext(((HttpResultPayloadException) throwable).getResult()); + return; + } + int httpStatusCode = HttpStatus.INTERNAL_SERVER_ERROR.getCode(); + if (throwable instanceof HttpStatusException) { + httpStatusCode = ((HttpStatusException) throwable).getStatusCode(); + } + try { + ErrorResponse errorResponse = new ErrorResponse(); + errorResponse.setStatus(String.valueOf(httpStatusCode)); + errorResponse.setMessage(throwable.getMessage()); + getErrorResponseCustomizer().accept(errorResponse, throwable); + HttpOutputMessage httpOutputMessage = encodeHttpOutputMessage(errorResponse); + getResponseEncoder().encode(httpOutputMessage.getBody(), errorResponse); + if (!headerSent) { + doSendHeaders(String.valueOf(httpStatusCode), buildAdditionalHeaders(httpOutputMessage, null)); + } + getHttpChannel().writeMessage(httpOutputMessage); + } catch (Throwable ex) { + throwable = new EncodeException(ex); + } finally { + doOnCompleted(throwable); + } + } + + private Map> buildAdditionalHeaders( + HttpOutputMessage outputMessage, Map> additionalHeaders) { + int contentLength = ((ByteBufOutputStream) outputMessage.getBody()).writtenBytes(); + if (additionalHeaders == null) { + additionalHeaders = new HashMap<>(); + } + additionalHeaders.put( + HttpHeaderNames.CONTENT_LENGTH.getName(), Collections.singletonList(String.valueOf(contentLength))); + return additionalHeaders; + } +} diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java index 11f3012c22a..99a06a15621 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java @@ -27,6 +27,7 @@ import org.apache.dubbo.remoting.http12.h1.Http1ServerChannelObserver; import org.apache.dubbo.remoting.http12.h1.Http1ServerStreamChannelObserver; import org.apache.dubbo.remoting.http12.h1.Http1ServerTransportListener; +import org.apache.dubbo.remoting.http12.h1.Http1ServerUnaryChannelObserver; import org.apache.dubbo.remoting.http12.message.DefaultListeningDecoder; import org.apache.dubbo.remoting.http12.message.MediaType; import org.apache.dubbo.remoting.http12.message.codec.JsonCodec; @@ -58,8 +59,6 @@ public DefaultHttp11ServerTransportListener(HttpChannel httpChannel, URL url, Fr executorSupport = ExecutorRepository.getInstance(url.getOrDefaultApplicationModel()) .getExecutorSupport(url); this.httpChannel = httpChannel; - serverChannelObserver = new Http1ServerChannelObserver(httpChannel); - serverChannelObserver.setResponseEncoder(JsonCodec.INSTANCE); } @Override @@ -85,6 +84,8 @@ private ServerCallListener startListener( RpcInvocation invocation, MethodDescriptor methodDescriptor, Invoker invoker) { switch (methodDescriptor.getRpcType()) { case UNARY: + serverChannelObserver = new Http1ServerUnaryChannelObserver(httpChannel); + serverChannelObserver.setResponseEncoder(JsonCodec.INSTANCE); return new AutoCompleteUnaryServerCallListener(invocation, invoker, serverChannelObserver); case SERVER_STREAM: serverChannelObserver = new Http1ServerStreamChannelObserver(httpChannel); From 52acf1e77829ba18c02afdb7fee98d9e856cdd5b Mon Sep 17 00:00:00 2001 From: TomlongTK Date: Tue, 26 Mar 2024 14:25:21 +0800 Subject: [PATCH 4/7] Fix unit test --- .../tri/h12/http1/DefaultHttp11ServerTransportListener.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java index 99a06a15621..950a2d93506 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java @@ -59,6 +59,8 @@ public DefaultHttp11ServerTransportListener(HttpChannel httpChannel, URL url, Fr executorSupport = ExecutorRepository.getInstance(url.getOrDefaultApplicationModel()) .getExecutorSupport(url); this.httpChannel = httpChannel; + serverChannelObserver = new Http1ServerUnaryChannelObserver(httpChannel); + serverChannelObserver.setResponseEncoder(JsonCodec.INSTANCE); } @Override @@ -84,8 +86,6 @@ private ServerCallListener startListener( RpcInvocation invocation, MethodDescriptor methodDescriptor, Invoker invoker) { switch (methodDescriptor.getRpcType()) { case UNARY: - serverChannelObserver = new Http1ServerUnaryChannelObserver(httpChannel); - serverChannelObserver.setResponseEncoder(JsonCodec.INSTANCE); return new AutoCompleteUnaryServerCallListener(invocation, invoker, serverChannelObserver); case SERVER_STREAM: serverChannelObserver = new Http1ServerStreamChannelObserver(httpChannel); From e6aeb99dd1f0e9fce7b0482e8a73a8c4b59dee23 Mon Sep 17 00:00:00 2001 From: TomlongTK Date: Wed, 27 Mar 2024 19:43:04 +0800 Subject: [PATCH 5/7] refine --- .../AbstractServerHttpChannelObserver.java | 115 +++++++++++------- .../h1/Http1ServerUnaryChannelObserver.java | 84 +++---------- 2 files changed, 93 insertions(+), 106 deletions(-) diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java index c3aff30eba5..10ab093b88c 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java @@ -21,9 +21,6 @@ import org.apache.dubbo.remoting.http12.exception.HttpStatusException; import org.apache.dubbo.remoting.http12.message.HttpMessageEncoder; -import java.util.List; -import java.util.Map; - public abstract class AbstractServerHttpChannelObserver implements CustomizableHttpChannelObserver { private HeadersCustomizer headersCustomizer = HeadersCustomizer.NO_OP; @@ -34,7 +31,7 @@ public abstract class AbstractServerHttpChannelObserver implements CustomizableH private final HttpChannel httpChannel; - protected boolean headerSent; + private boolean headerSent; private HttpMessageEncoder responseEncoder; @@ -73,32 +70,24 @@ protected TrailersCustomizer getTrailersCustomizer() { return trailersCustomizer; } - protected ErrorResponseCustomizer getErrorResponseCustomizer() { - return errorResponseCustomizer; - } - @Override public void onNext(Object data) { try { - if (data instanceof HttpResult) { - HttpResult result = (HttpResult) data; - if (!headerSent) { - doSendHeaders(String.valueOf(result.getStatus()), result.getHeaders()); - } - data = result.getBody(); - } else if (!headerSent) { - doSendHeaders(HttpStatus.OK.getStatusString(), null); - } - HttpOutputMessage outputMessage = encodeHttpOutputMessage(data); - preOutputMessage(outputMessage); - responseEncoder.encode(outputMessage.getBody(), data); - getHttpChannel().writeMessage(outputMessage); - postOutputMessage(outputMessage); + doOnNext(data); } catch (Throwable e) { onError(e); } } + protected void doOnNext(Object data) throws Throwable { + if (!headerSent) { + HttpMetadata httpMetadata = buildMetadata(httpStatusCode(data, false)); + sendHeader(httpMetadata, data); + } + HttpOutputMessage outputMessage = encodeData(data); + sendData(outputMessage); + } + protected void preOutputMessage(HttpOutputMessage outputMessage) throws Throwable {} protected void postOutputMessage(HttpOutputMessage outputMessage) throws Throwable {} @@ -119,21 +108,8 @@ public void onError(Throwable throwable) { onNext(((HttpResultPayloadException) throwable).getResult()); return; } - int httpStatusCode = HttpStatus.INTERNAL_SERVER_ERROR.getCode(); - if (throwable instanceof HttpStatusException) { - httpStatusCode = ((HttpStatusException) throwable).getStatusCode(); - } - if (!headerSent) { - doSendHeaders(String.valueOf(httpStatusCode), null); - } try { - ErrorResponse errorResponse = new ErrorResponse(); - errorResponse.setStatus(String.valueOf(httpStatusCode)); - errorResponse.setMessage(throwable.getMessage()); - errorResponseCustomizer.accept(errorResponse, throwable); - HttpOutputMessage httpOutputMessage = encodeHttpOutputMessage(errorResponse); - responseEncoder.encode(httpOutputMessage.getBody(), errorResponse); - getHttpChannel().writeMessage(httpOutputMessage); + doOnError(throwable); } catch (Throwable ex) { throwable = new EncodeException(ex); } finally { @@ -141,6 +117,16 @@ public void onError(Throwable throwable) { } } + protected void doOnError(Throwable throwable) throws Throwable { + String httpStatusCode = httpStatusCode(throwable, true); + if (!headerSent) { + HttpMetadata httpMetadata = buildMetadata(httpStatusCode); + sendHeader(httpMetadata, null); + } + HttpOutputMessage httpOutputMessage = encodeData(buildErrorResponse(httpStatusCode, throwable)); + sendData(httpOutputMessage); + } + @Override public void onCompleted() { doOnCompleted(null); @@ -151,17 +137,62 @@ public HttpChannel getHttpChannel() { return httpChannel; } - protected void doSendHeaders(String statusCode, Map> additionalHeaders) { + protected HttpOutputMessage encodeData(Object data) throws Throwable { + if (data instanceof HttpResult) { + data = ((HttpResult) data).getBody(); + } + HttpOutputMessage outputMessage = encodeHttpOutputMessage(data); + preOutputMessage(outputMessage); + responseEncoder.encode(outputMessage.getBody(), data); + return outputMessage; + } + + protected void sendData(HttpOutputMessage outputMessage) throws Throwable { + getHttpChannel().writeMessage(outputMessage); + postOutputMessage(outputMessage); + } + + protected void sendHeader(HttpMetadata httpMetadata, Object data) { + if (data instanceof HttpResult) { + HttpResult result = (HttpResult) data; + if (result.getHeaders() != null) { + httpMetadata.headers().putAll(result.getHeaders()); + } + } + getHttpChannel().writeHeader(httpMetadata); + headerSent = true; + } + + protected String httpStatusCode(Object data, boolean isError) { + String httpStatusCode = HttpStatus.OK.getStatusString(); + if (data instanceof HttpResult) { + httpStatusCode = String.valueOf(((HttpResult) data).getStatus()); + } else if (isError) { + Throwable throwable = (Throwable) data; + int errorCode = HttpStatus.INTERNAL_SERVER_ERROR.getCode(); + if (throwable instanceof HttpStatusException) { + errorCode = ((HttpStatusException) throwable).getStatusCode(); + } + httpStatusCode = String.valueOf(errorCode); + } + return httpStatusCode; + } + + protected ErrorResponse buildErrorResponse(String statusCode, Throwable throwable) { + ErrorResponse errorResponse = new ErrorResponse(); + errorResponse.setStatus(statusCode); + errorResponse.setMessage(throwable.getMessage()); + errorResponseCustomizer.accept(errorResponse, throwable); + return errorResponse; + } + + protected HttpMetadata buildMetadata(String statusCode) { HttpMetadata httpMetadata = encodeHttpMetadata(); HttpHeaders headers = httpMetadata.headers(); headers.set(HttpHeaderNames.STATUS.getName(), statusCode); headers.set(HttpHeaderNames.CONTENT_TYPE.getName(), responseEncoder.contentType()); headersCustomizer.accept(headers); - if (additionalHeaders != null) { - headers.putAll(additionalHeaders); - } - getHttpChannel().writeHeader(httpMetadata); - headerSent = true; + return httpMetadata; } protected void doOnCompleted(Throwable throwable) { diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerUnaryChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerUnaryChannelObserver.java index a95ccdf82bc..cf89ad15838 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerUnaryChannelObserver.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerUnaryChannelObserver.java @@ -16,20 +16,12 @@ */ package org.apache.dubbo.remoting.http12.h1; -import org.apache.dubbo.remoting.http12.ErrorResponse; import org.apache.dubbo.remoting.http12.HttpChannel; import org.apache.dubbo.remoting.http12.HttpHeaderNames; +import org.apache.dubbo.remoting.http12.HttpMetadata; import org.apache.dubbo.remoting.http12.HttpOutputMessage; -import org.apache.dubbo.remoting.http12.HttpResult; -import org.apache.dubbo.remoting.http12.HttpStatus; -import org.apache.dubbo.remoting.http12.exception.EncodeException; -import org.apache.dubbo.remoting.http12.exception.HttpResultPayloadException; -import org.apache.dubbo.remoting.http12.exception.HttpStatusException; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.io.OutputStream; import io.netty.buffer.ByteBufOutputStream; @@ -40,65 +32,29 @@ public Http1ServerUnaryChannelObserver(HttpChannel httpChannel) { } @Override - public void onNext(Object data) { - try { - String status = HttpStatus.OK.getStatusString(); - Map> additionalHeaders = null; - if (data instanceof HttpResult) { - HttpResult result = (HttpResult) data; - data = result.getBody(); - status = String.valueOf(result.getStatus()); - additionalHeaders = result.getHeaders(); - } - HttpOutputMessage outputMessage = encodeHttpOutputMessage(data); - preOutputMessage(outputMessage); - getResponseEncoder().encode(outputMessage.getBody(), data); - if (!headerSent) { - doSendHeaders(status, buildAdditionalHeaders(outputMessage, additionalHeaders)); - } - getHttpChannel().writeMessage(outputMessage); - postOutputMessage(outputMessage); - } catch (Throwable e) { - onError(e); - } + protected void doOnNext(Object data) throws Throwable { + HttpOutputMessage httpOutputMessage = encodeData(data); + HttpMetadata httpMetadata = buildMetadata(httpStatusCode(data, false)); + preMetadata(httpMetadata, httpOutputMessage); + sendHeader(httpMetadata, data); + sendData(httpOutputMessage); } @Override - public void onError(Throwable throwable) { - if (throwable instanceof HttpResultPayloadException) { - onNext(((HttpResultPayloadException) throwable).getResult()); - return; - } - int httpStatusCode = HttpStatus.INTERNAL_SERVER_ERROR.getCode(); - if (throwable instanceof HttpStatusException) { - httpStatusCode = ((HttpStatusException) throwable).getStatusCode(); - } - try { - ErrorResponse errorResponse = new ErrorResponse(); - errorResponse.setStatus(String.valueOf(httpStatusCode)); - errorResponse.setMessage(throwable.getMessage()); - getErrorResponseCustomizer().accept(errorResponse, throwable); - HttpOutputMessage httpOutputMessage = encodeHttpOutputMessage(errorResponse); - getResponseEncoder().encode(httpOutputMessage.getBody(), errorResponse); - if (!headerSent) { - doSendHeaders(String.valueOf(httpStatusCode), buildAdditionalHeaders(httpOutputMessage, null)); - } - getHttpChannel().writeMessage(httpOutputMessage); - } catch (Throwable ex) { - throwable = new EncodeException(ex); - } finally { - doOnCompleted(throwable); - } + protected void doOnError(Throwable throwable) throws Throwable { + String httpStatusCode = httpStatusCode(throwable, true); + HttpOutputMessage httpOutputMessage = encodeData(buildErrorResponse(httpStatusCode, throwable)); + HttpMetadata httpMetadata = buildMetadata(httpStatusCode); + preMetadata(httpMetadata, httpOutputMessage); + sendHeader(httpMetadata, null); + sendData(httpOutputMessage); } - private Map> buildAdditionalHeaders( - HttpOutputMessage outputMessage, Map> additionalHeaders) { - int contentLength = ((ByteBufOutputStream) outputMessage.getBody()).writtenBytes(); - if (additionalHeaders == null) { - additionalHeaders = new HashMap<>(); + private void preMetadata(HttpMetadata httpMetadata, HttpOutputMessage outputMessage) { + OutputStream body = outputMessage.getBody(); + if (body instanceof ByteBufOutputStream) { + int contentLength = ((ByteBufOutputStream) body).writtenBytes(); + httpMetadata.headers().set(HttpHeaderNames.CONTENT_LENGTH.getName(), String.valueOf(contentLength)); } - additionalHeaders.put( - HttpHeaderNames.CONTENT_LENGTH.getName(), Collections.singletonList(String.valueOf(contentLength))); - return additionalHeaders; } } From e37f8b361a1356474d1505e91d1a713bbb4c4c2e Mon Sep 17 00:00:00 2001 From: Sean Yang Date: Tue, 2 Apr 2024 09:37:13 +0800 Subject: [PATCH 6/7] refine --- .../AbstractServerHttpChannelObserver.java | 154 ++++++++---------- .../h1/Http1ServerUnaryChannelObserver.java | 22 ++- 2 files changed, 81 insertions(+), 95 deletions(-) diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java index 10ab093b88c..6621e7f3929 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java @@ -23,28 +23,25 @@ public abstract class AbstractServerHttpChannelObserver implements CustomizableHttpChannelObserver { + private final HttpChannel httpChannel; + private HeadersCustomizer headersCustomizer = HeadersCustomizer.NO_OP; private TrailersCustomizer trailersCustomizer = TrailersCustomizer.NO_OP; private ErrorResponseCustomizer errorResponseCustomizer = ErrorResponseCustomizer.NO_OP; - private final HttpChannel httpChannel; + private HttpMessageEncoder responseEncoder; private boolean headerSent; - private HttpMessageEncoder responseEncoder; - protected AbstractServerHttpChannelObserver(HttpChannel httpChannel) { this.httpChannel = httpChannel; } - public void setResponseEncoder(HttpMessageEncoder responseEncoder) { - this.responseEncoder = responseEncoder; - } - - public HttpMessageEncoder getResponseEncoder() { - return responseEncoder; + @Override + public HttpChannel getHttpChannel() { + return httpChannel; } @Override @@ -62,16 +59,16 @@ public void setErrorResponseCustomizer(ErrorResponseCustomizer errorResponseCust this.errorResponseCustomizer = errorResponseCustomizer; } - protected HeadersCustomizer getHeadersCustomizer() { - return headersCustomizer; + public HttpMessageEncoder getResponseEncoder() { + return responseEncoder; } - protected TrailersCustomizer getTrailersCustomizer() { - return trailersCustomizer; + public void setResponseEncoder(HttpMessageEncoder responseEncoder) { + this.responseEncoder = responseEncoder; } @Override - public void onNext(Object data) { + public final void onNext(Object data) { try { doOnNext(data); } catch (Throwable e) { @@ -81,29 +78,13 @@ public void onNext(Object data) { protected void doOnNext(Object data) throws Throwable { if (!headerSent) { - HttpMetadata httpMetadata = buildMetadata(httpStatusCode(data, false)); - sendHeader(httpMetadata, data); + sendHeader(buildMetadata(resolveStatusCode(data), data, null)); } - HttpOutputMessage outputMessage = encodeData(data); - sendData(outputMessage); - } - - protected void preOutputMessage(HttpOutputMessage outputMessage) throws Throwable {} - - protected void postOutputMessage(HttpOutputMessage outputMessage) throws Throwable {} - - protected abstract HttpMetadata encodeHttpMetadata(); - - protected HttpOutputMessage encodeHttpOutputMessage(Object data) { - return getHttpChannel().newOutputMessage(); - } - - protected HttpMetadata encodeTrailers(Throwable throwable) { - return null; + sendMessage(buildMessage(data)); } @Override - public void onError(Throwable throwable) { + public final void onError(Throwable throwable) { if (throwable instanceof HttpResultPayloadException) { onNext(((HttpResultPayloadException) throwable).getResult()); return; @@ -118,67 +99,56 @@ public void onError(Throwable throwable) { } protected void doOnError(Throwable throwable) throws Throwable { - String httpStatusCode = httpStatusCode(throwable, true); + String statusCode = resolveStatusCode(throwable); + Object data = buildErrorResponse(statusCode, throwable); if (!headerSent) { - HttpMetadata httpMetadata = buildMetadata(httpStatusCode); - sendHeader(httpMetadata, null); + sendHeader(buildMetadata(statusCode, data, null)); } - HttpOutputMessage httpOutputMessage = encodeData(buildErrorResponse(httpStatusCode, throwable)); - sendData(httpOutputMessage); + sendMessage(buildMessage(data)); } @Override - public void onCompleted() { + public final void onCompleted() { doOnCompleted(null); } - @Override - public HttpChannel getHttpChannel() { - return httpChannel; + protected void doOnCompleted(Throwable throwable) { + HttpMetadata httpMetadata = encodeTrailers(throwable); + if (httpMetadata == null) { + return; + } + trailersCustomizer.accept(httpMetadata.headers(), throwable); + getHttpChannel().writeHeader(httpMetadata); } - protected HttpOutputMessage encodeData(Object data) throws Throwable { - if (data instanceof HttpResult) { - data = ((HttpResult) data).getBody(); - } - HttpOutputMessage outputMessage = encodeHttpOutputMessage(data); - preOutputMessage(outputMessage); - responseEncoder.encode(outputMessage.getBody(), data); - return outputMessage; + protected HttpMetadata encodeTrailers(Throwable throwable) { + return null; } - protected void sendData(HttpOutputMessage outputMessage) throws Throwable { - getHttpChannel().writeMessage(outputMessage); - postOutputMessage(outputMessage); + protected HttpOutputMessage encodeHttpOutputMessage(Object data) { + return getHttpChannel().newOutputMessage(); } - protected void sendHeader(HttpMetadata httpMetadata, Object data) { - if (data instanceof HttpResult) { - HttpResult result = (HttpResult) data; - if (result.getHeaders() != null) { - httpMetadata.headers().putAll(result.getHeaders()); - } - } - getHttpChannel().writeHeader(httpMetadata); - headerSent = true; + protected abstract HttpMetadata encodeHttpMetadata(); + + protected void preOutputMessage(HttpOutputMessage outputMessage) throws Throwable {} + + protected void postOutputMessage(HttpOutputMessage outputMessage) throws Throwable {} + + protected void preMetadata(HttpMetadata httpMetadata, HttpOutputMessage outputMessage) {} + + protected final String resolveStatusCode(Object data) { + return String.valueOf(data instanceof HttpResult ? ((HttpResult) data).getStatus() : HttpStatus.OK); } - protected String httpStatusCode(Object data, boolean isError) { - String httpStatusCode = HttpStatus.OK.getStatusString(); - if (data instanceof HttpResult) { - httpStatusCode = String.valueOf(((HttpResult) data).getStatus()); - } else if (isError) { - Throwable throwable = (Throwable) data; - int errorCode = HttpStatus.INTERNAL_SERVER_ERROR.getCode(); - if (throwable instanceof HttpStatusException) { - errorCode = ((HttpStatusException) throwable).getStatusCode(); - } - httpStatusCode = String.valueOf(errorCode); - } - return httpStatusCode; + protected final String resolveStatusCode(Throwable throwable) { + return String.valueOf( + throwable instanceof HttpStatusException + ? ((HttpStatusException) throwable).getStatusCode() + : HttpStatus.INTERNAL_SERVER_ERROR.getCode()); } - protected ErrorResponse buildErrorResponse(String statusCode, Throwable throwable) { + protected final ErrorResponse buildErrorResponse(String statusCode, Throwable throwable) { ErrorResponse errorResponse = new ErrorResponse(); errorResponse.setStatus(statusCode); errorResponse.setMessage(throwable.getMessage()); @@ -186,21 +156,39 @@ protected ErrorResponse buildErrorResponse(String statusCode, Throwable throwabl return errorResponse; } - protected HttpMetadata buildMetadata(String statusCode) { + protected final HttpOutputMessage buildMessage(Object data) throws Throwable { + if (data instanceof HttpResult) { + data = ((HttpResult) data).getBody(); + } + HttpOutputMessage outputMessage = encodeHttpOutputMessage(data); + preOutputMessage(outputMessage); + responseEncoder.encode(outputMessage.getBody(), data); + return outputMessage; + } + + protected final void sendMessage(HttpOutputMessage outputMessage) throws Throwable { + getHttpChannel().writeMessage(outputMessage); + postOutputMessage(outputMessage); + } + + protected final HttpMetadata buildMetadata(String statusCode, Object data, HttpOutputMessage httpOutputMessage) { HttpMetadata httpMetadata = encodeHttpMetadata(); HttpHeaders headers = httpMetadata.headers(); headers.set(HttpHeaderNames.STATUS.getName(), statusCode); headers.set(HttpHeaderNames.CONTENT_TYPE.getName(), responseEncoder.contentType()); + if (data instanceof HttpResult) { + HttpResult result = (HttpResult) data; + if (result.getHeaders() != null) { + headers.putAll(result.getHeaders()); + } + } + preMetadata(httpMetadata, httpOutputMessage); headersCustomizer.accept(headers); return httpMetadata; } - protected void doOnCompleted(Throwable throwable) { - HttpMetadata httpMetadata = encodeTrailers(throwable); - if (httpMetadata == null) { - return; - } - trailersCustomizer.accept(httpMetadata.headers(), throwable); + protected final void sendHeader(HttpMetadata httpMetadata) { getHttpChannel().writeHeader(httpMetadata); + headerSent = true; } } diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerUnaryChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerUnaryChannelObserver.java index cf89ad15838..8cb2a36edd2 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerUnaryChannelObserver.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerUnaryChannelObserver.java @@ -33,24 +33,22 @@ public Http1ServerUnaryChannelObserver(HttpChannel httpChannel) { @Override protected void doOnNext(Object data) throws Throwable { - HttpOutputMessage httpOutputMessage = encodeData(data); - HttpMetadata httpMetadata = buildMetadata(httpStatusCode(data, false)); - preMetadata(httpMetadata, httpOutputMessage); - sendHeader(httpMetadata, data); - sendData(httpOutputMessage); + HttpOutputMessage httpOutputMessage = buildMessage(data); + sendHeader(buildMetadata(resolveStatusCode(data), data, httpOutputMessage)); + sendMessage(httpOutputMessage); } @Override protected void doOnError(Throwable throwable) throws Throwable { - String httpStatusCode = httpStatusCode(throwable, true); - HttpOutputMessage httpOutputMessage = encodeData(buildErrorResponse(httpStatusCode, throwable)); - HttpMetadata httpMetadata = buildMetadata(httpStatusCode); - preMetadata(httpMetadata, httpOutputMessage); - sendHeader(httpMetadata, null); - sendData(httpOutputMessage); + String statusCode = resolveStatusCode(throwable); + Object data = buildErrorResponse(statusCode, throwable); + HttpOutputMessage httpOutputMessage = buildMessage(data); + sendHeader(buildMetadata(statusCode, data, httpOutputMessage)); + sendMessage(httpOutputMessage); } - private void preMetadata(HttpMetadata httpMetadata, HttpOutputMessage outputMessage) { + @Override + protected void preMetadata(HttpMetadata httpMetadata, HttpOutputMessage outputMessage) { OutputStream body = outputMessage.getBody(); if (body instanceof ByteBufOutputStream) { int contentLength = ((ByteBufOutputStream) body).writtenBytes(); From cdc214c764340fb5cfee553f2a3e6e6b7db15d37 Mon Sep 17 00:00:00 2001 From: TomlongTK Date: Tue, 2 Apr 2024 17:51:06 +0800 Subject: [PATCH 7/7] Fix status --- .../http12/AbstractServerHttpChannelObserver.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java index 6621e7f3929..515afb2d503 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java @@ -138,14 +138,15 @@ protected void postOutputMessage(HttpOutputMessage outputMessage) throws Throwab protected void preMetadata(HttpMetadata httpMetadata, HttpOutputMessage outputMessage) {} protected final String resolveStatusCode(Object data) { - return String.valueOf(data instanceof HttpResult ? ((HttpResult) data).getStatus() : HttpStatus.OK); + return data instanceof HttpResult + ? String.valueOf(((HttpResult) data).getStatus()) + : HttpStatus.OK.getStatusString(); } protected final String resolveStatusCode(Throwable throwable) { - return String.valueOf( - throwable instanceof HttpStatusException - ? ((HttpStatusException) throwable).getStatusCode() - : HttpStatus.INTERNAL_SERVER_ERROR.getCode()); + return throwable instanceof HttpStatusException + ? String.valueOf(((HttpStatusException) throwable).getStatusCode()) + : HttpStatus.INTERNAL_SERVER_ERROR.getStatusString(); } protected final ErrorResponse buildErrorResponse(String statusCode, Throwable throwable) {