Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support passing detect context to help build netty handlers #12460

Merged
merged 1 commit into from
Jun 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,20 @@ public QosDetector(FrameworkModel frameworkModel) {
@Override
public Result detect(ChannelBuffer in) {
if(!QosEnableFlag) {
return Result.UNRECOGNIZED;
return Result.unrecognized();
}
Result h1Res = qosHTTP1Detector.detect(in);
if(h1Res.equals(Result.RECOGNIZED)) {
if(h1Res.equals(Result.recognized())) {
return h1Res;
}
Result telRes = telnetDetector.detect(in);
if(telRes.equals(Result.RECOGNIZED)) {
if(telRes.equals(Result.recognized())) {
return telRes;
}
if(h1Res.equals(Result.NEED_MORE_DATA) || telRes.equals(Result.NEED_MORE_DATA)) {
return Result.NEED_MORE_DATA;
if(h1Res.equals(Result.needMoreData()) || telRes.equals(Result.needMoreData())) {
return Result.needMoreData();
}
return Result.UNRECOGNIZED;
return Result.unrecognized();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ private static boolean isHttp(int magic) {
@Override
public Result detect(ChannelBuffer in) {
if (in.readableBytes() < 2) {
return Result.NEED_MORE_DATA;
return Result.needMoreData();
}
final int magic = in.getByte(in.readerIndex());
// h2 starts with "PR"
if (isHttp(magic) && in.getByte(in.readerIndex()+1) != 'R' ){
return Result.RECOGNIZED;
return Result.recognized();
}
return Result.UNRECOGNIZED;
return Result.unrecognized();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,20 @@ public TelnetDetector(FrameworkModel frameworkModel) {
@Override
public Result detect(ChannelBuffer in) {
if (in.readableBytes() >= MaxSize) {
return Result.UNRECOGNIZED;
return Result.unrecognized();
}
Result resCommand = commandDetect(in);
if (resCommand.equals(Result.RECOGNIZED)) {
if (resCommand.equals(Result.recognized())) {
return resCommand;
}
Result resAyt = telnetAytDetect(in);
if (resAyt.equals(Result.RECOGNIZED)) {
if (resAyt.equals(Result.recognized())) {
return resAyt;
}
if (resAyt.equals(Result.UNRECOGNIZED) && resCommand.equals(Result.UNRECOGNIZED)) {
return Result.UNRECOGNIZED;
if (resAyt.equals(Result.unrecognized()) && resCommand.equals(Result.unrecognized())) {
return Result.unrecognized();
}
return Result.NEED_MORE_DATA;
return Result.needMoreData();
}

private Result commandDetect(ChannelBuffer in) {
Expand All @@ -75,26 +75,26 @@ private Result commandDetect(ChannelBuffer in) {
s = s.trim();
CommandContext commandContext = TelnetCommandDecoder.decode(s);
if (frameworkModel.getExtensionLoader(BaseCommand.class).hasExtension(commandContext.getCommandName())) {
return Result.RECOGNIZED;
return Result.recognized();
}
return Result.UNRECOGNIZED;
return Result.unrecognized();
}

private Result telnetAytDetect(ChannelBuffer in) {
// detect if remote channel send a telnet ayt command to server
int prefaceLen = AytPreface.readableBytes();
int bytesRead = min(in.readableBytes(), prefaceLen);
if (bytesRead == 0 || !ChannelBuffers.prefixEquals(in, AytPreface, bytesRead)) {
return Result.UNRECOGNIZED;
return Result.unrecognized();
}
if (bytesRead == prefaceLen) {
// we need to consume preface because it's not a qos command
// consume and remember to mark, pu server handler reset reader index
in.readBytes(AytPreface.readableBytes());
in.markReaderIndex();
return Result.RECOGNIZED;
return Result.recognized();
}
return Result.NEED_MORE_DATA;
return Result.needMoreData();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,59 @@

import org.apache.dubbo.remoting.buffer.ChannelBuffer;

import java.util.HashMap;
import java.util.Map;


/**
* Determine incoming bytes belong to the specific protocol.
*
*/
public interface ProtocolDetector {

Result detect(ChannelBuffer in);

enum Result {
class Result {

private final Flag flag;

private final Map<String, String> detectContext = new HashMap<>(4);

private Result(Flag flag) {
this.flag = flag;
}

public void setAttribute(String key, String value) {
this.detectContext.put(key, value);
}

public String getAttribute(String key) {
return this.detectContext.get(key);
}

public void removeAttribute(String key) {
this.detectContext.remove(key);
}

public Flag flag() {
return flag;
}

public static Result recognized(){
return new Result(Flag.RECOGNIZED);
}


public static Result unrecognized(){
return new Result(Flag.UNRECOGNIZED);
}


public static Result needMoreData(){
return new Result(Flag.NEED_MORE_DATA);
}
}

enum Flag {
RECOGNIZED, UNRECOGNIZED, NEED_MORE_DATA
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
package org.apache.dubbo.remoting.api.pu;

import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.api.ProtocolDetector;

import java.util.List;

public interface ChannelOperator {
void configChannelHandler(List<ChannelHandler> handlerList);

ProtocolDetector.Result detectResult();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.dubbo.remoting.Codec;
import org.apache.dubbo.remoting.Codec2;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.api.ProtocolDetector;
import org.apache.dubbo.remoting.api.pu.ChannelHandlerPretender;
import org.apache.dubbo.remoting.api.pu.ChannelOperator;
import org.apache.dubbo.remoting.api.pu.DefaultCodec;
Expand All @@ -36,6 +37,8 @@ public class NettyConfigOperator implements ChannelOperator {
private final Channel channel;
private ChannelHandler handler;

private ProtocolDetector.Result detectResult;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where will this be used? I don't seem to see a place to use it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for the triple protocol to support both http1 and http2


public NettyConfigOperator(NettyChannel channel, ChannelHandler handler) {
this.channel = channel;
this.handler = handler;
Expand Down Expand Up @@ -91,6 +94,15 @@ public void configChannelHandler(List<ChannelHandler> handlerList) {
}
}

public void setDetectResult(ProtocolDetector.Result detectResult) {
this.detectResult = detectResult;
}

@Override
public ProtocolDetector.Result detectResult() {
return detectResult;
}

private boolean isClientSide(Channel channel) {
return channel.getUrl().getSide("").equalsIgnoreCase(CommonConstants.CONSUMER);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
ChannelBuffer buf = new NettyBackedChannelBuffer(in);
final ProtocolDetector.Result result = protocol.detector().detect(buf);
in.resetReaderIndex();
switch (result) {
switch (result.flag()) {
case UNRECOGNIZED:
continue;
case RECOGNIZED:
Expand All @@ -117,6 +117,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
URL localURL = this.urlMapper.getOrDefault(protocolName, url);
channel.setUrl(localURL);
NettyConfigOperator operator = new NettyConfigOperator(channel, localHandler);
operator.setDetectResult(result);
protocol.configServerProtocolHandler(url, operator);
ctx.pipeline().remove(this);
case NEED_MORE_DATA:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ public Result detect(ChannelBuffer in) {
int bytesRead = min(in.readableBytes(), prefaceLen);

if (bytesRead ==0 || !ChannelBuffers.prefixEquals(in, Preface, bytesRead)) {
return Result.UNRECOGNIZED;
return Result.unrecognized();
}
if (bytesRead == prefaceLen) {
return Result.RECOGNIZED;
return Result.recognized();
}

return Result.NEED_MORE_DATA;
return Result.needMoreData();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ public Result detect(ChannelBuffer in) {

// If the input so far doesn't match the preface, break the connection.
if (bytesRead == 0 || !ChannelBuffers.prefixEquals(in, clientPrefaceString, bytesRead)) {
return Result.UNRECOGNIZED;
return Result.unrecognized();
}
if (bytesRead == prefaceLen) {
return Result.RECOGNIZED;
return Result.recognized();
}
return Result.NEED_MORE_DATA;
return Result.needMoreData();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,16 @@ void testDetect() {
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
ChannelBuffer in = new ByteBufferBackedChannelBuffer(byteBuf.nioBuffer());
ProtocolDetector.Result result = detector.detect(in);
Assertions.assertEquals(result, ProtocolDetector.Result.UNRECOGNIZED);
Assertions.assertEquals(result.flag(), ProtocolDetector.Result.unrecognized().flag());

byteBuf.writeBytes(connectionPrefaceBuf);
result = detector.detect(new ByteBufferBackedChannelBuffer(byteBuf.nioBuffer()));
Assertions.assertEquals(result, ProtocolDetector.Result.RECOGNIZED);
Assertions.assertEquals(result.flag(), ProtocolDetector.Result.recognized().flag());

byteBuf.clear();
byteBuf.writeBytes(connectionPrefaceBuf, 0, 1);
result = detector.detect(new ByteBufferBackedChannelBuffer(byteBuf.nioBuffer()));
Assertions.assertEquals(result, ProtocolDetector.Result.NEED_MORE_DATA);
Assertions.assertEquals(result.flag(), ProtocolDetector.Result.needMoreData().flag());

}
}