Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
panxiaojun233 committed Dec 23, 2020
1 parent c8526b5 commit 47ac5a3
Show file tree
Hide file tree
Showing 21 changed files with 1,388 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@ public void configServerPipeline(ChannelHandlerContext ctx) {
.frameListener(frameListener())
.build();
p.addLast(handler);
configServerPipeline0(ctx);
}

abstract Http2FrameListener frameListener();
protected abstract Http2FrameListener frameListener();

protected abstract void configServerPipeline0(ChannelHandlerContext ctx);

@Override
public void configClientPipeline(ChannelHandlerContext ctx) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package org.apache.dubbo.rpc.protocol.grpc;

import io.netty.util.AsciiString;

public class GrpcElf {
public static final AsciiString GRPC_STATUS = AsciiString.cached("grpc-status");
public static final AsciiString GRPC_MESSAGE = AsciiString.cached("grpc-message");
public static final AsciiString GRPC_ENCODING = AsciiString.cached("grpc-encoding");
public static final AsciiString GRPC_TIMEOUT = AsciiString.cached("grpc-timeout");
public static final AsciiString GRPC_ACCEPT_ENCODING = AsciiString.cached("grpc-accept-encoding");

public static final AsciiString APPLICATION_GRPC = AsciiString.cached("application/grpc");
public static final AsciiString TEXT_MIME = AsciiString.cached("text/plain; encoding=utf-8");
public static final AsciiString GRPC_JSON = AsciiString.cached("application/grpc+json");
public static final AsciiString GRPC_PROTO = AsciiString.cached("application/grpc+proto");


/**
* Indicates whether or not the given value is a valid gRPC content-type.
*/
public static boolean isGrpcContentType(CharSequence contentType) {
if (contentType == null) {
return false;
}

if (APPLICATION_GRPC.length() > contentType.length()) {
return false;
}

if (APPLICATION_GRPC.contentEquals(contentType)) {
return true;
}
if (!AsciiString.of(contentType).startsWith(APPLICATION_GRPC)) {
// Not a gRPC content-type.
return false;
}

if (contentType.length() == APPLICATION_GRPC.length()) {
// The strings match exactly.
return true;
}

// The contentType matches, but is longer than the expected string.
// We need to support variations on the content-type (e.g. +proto, +json) as defined by the
// gRPC wire spec.
char nextChar = contentType.charAt(APPLICATION_GRPC.length());
return nextChar == '+' || nextChar == ';';
}

/**
* All error codes identified by the HTTP/2 spec. Used in GOAWAY and RST_STREAM frames.
*/
//public enum Http2Error {
// /**
// * Servers implementing a graceful shutdown of the connection will send {@code GOAWAY} with
// * {@code NO_ERROR}. In this case it is important to indicate to the application that the
// * request should be retried (i.e. {@link Status#UNAVAILABLE}).
// */
// NO_ERROR(0x0, Status.UNAVAILABLE),
// PROTOCOL_ERROR(0x1, Status.INTERNAL),
// INTERNAL_ERROR(0x2, Status.INTERNAL),
// FLOW_CONTROL_ERROR(0x3, Status.INTERNAL),
// SETTINGS_TIMEOUT(0x4, Status.INTERNAL),
// STREAM_CLOSED(0x5, Status.INTERNAL),
// FRAME_SIZE_ERROR(0x6, Status.INTERNAL),
// REFUSED_STREAM(0x7, Status.UNAVAILABLE),
// CANCEL(0x8, Status.CANCELLED),
// COMPRESSION_ERROR(0x9, Status.INTERNAL),
// CONNECT_ERROR(0xA, Status.INTERNAL),
// ENHANCE_YOUR_CALM(0xB, Status.RESOURCE_EXHAUSTED.withDescription("Bandwidth exhausted")),
// INADEQUATE_SECURITY(0xC, Status.PERMISSION_DENIED.withDescription("Permission denied as "
// + "protocol is not secure enough to call")),
// HTTP_1_1_REQUIRED(0xD, Status.UNKNOWN);
//
// // Populate a mapping of code to enum value for quick look-up.
// private static final Http2Error[] codeMap = buildHttp2CodeMap();
// private final int code;
// // Status is not guaranteed to be deeply immutable. Don't care though, since that's only true
// // when there are exceptions in the Status, which is not true here.
// @SuppressWarnings("ImmutableEnumChecker")
// private final Status status;
//
// Http2Error(int code, Status status) {
// this.code = code;
// this.status = status.augmentDescription("HTTP/2 error code: " + this.name());
// }
//
// private static Http2Error[] buildHttp2CodeMap() {
// Http2Error[] errors = Http2Error.values();
// int size = (int) errors[errors.length - 1].code() + 1;
// Http2Error[] http2CodeMap = new Http2Error[size];
// for (Http2Error error : errors) {
// int index = (int) error.code();
// http2CodeMap[index] = error;
// }
// return http2CodeMap;
// }
//
// /**
// * Looks up the HTTP/2 error code enum value for the specified code.
// *
// * @param code an HTTP/2 error code value.
// * @return the HTTP/2 error code enum or {@code null} if not found.
// */
// public static Http2Error forCode(long code) {
// if (code >= codeMap.length || code < 0) {
// return null;
// }
// return codeMap[(int) code];
// }
//
// /**
// * Looks up the {@link Status} from the given HTTP/2 error code. This is preferred over {@code
// * forCode(code).status()}, to more easily conform to HTTP/2:
// *
// * <blockquote>Unknown or unsupported error codes MUST NOT trigger any special behavior.
// * These MAY be treated by an implementation as being equivalent to INTERNAL_ERROR.</blockquote>
// *
// * @param code the HTTP/2 error code.
// * @return a {@link Status} representing the given error.
// */
// public static Status statusForCode(long code) {
// Http2Error error = forCode(code);
// if (error == null) {
// // This "forgets" the message of INTERNAL_ERROR while keeping the same status code.
// Status.Code statusCode = INTERNAL_ERROR.status().getCode();
// return Status.fromCodeValue(statusCode.value())
// .withDescription("Unrecognized HTTP/2 error code: " + code);
// }
//
// return error.status();
// }
//
// /**
// * Gets the code for this error used on the wire.
// */
// public long code() {
// return code;
// }
//
// /**
// * Gets the {@link Status} associated with this HTTP/2 code.
// */
// public Status status() {
// return status;
// }
//}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package org.apache.dubbo.rpc.protocol.grpc;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2FrameAdapter;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.util.AsciiString;
import io.netty.util.ReferenceCountUtil;
import org.apache.dubbo.remoting.netty4.DubboHttp2ConnectionHandler;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.RpcInvocation;

public class GrpcHttp2FrameListener extends Http2FrameAdapter {

protected Http2Connection.PropertyKey streamKey = null;

@Override
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream)
throws Http2Exception {
System.out.println("onDataRead:" + streamId);
final DubboHttp2ConnectionHandler connectionHandler = ctx.pipeline().get(DubboHttp2ConnectionHandler.class);
Http2Connection connection = connectionHandler.encoder().connection();
Http2Stream stream = connection.stream(streamId);
Http2Request request = stream == null ? null : (Http2Request) stream.getProperty(streamKey);

if (request == null || request.getStreamId() != streamId) {
System.out.println("received remote data from streamId:" + streamId + ", but not found payload.");
int processed = data.readableBytes() + padding;
return processed;
}

request.cumulate(data, ctx.alloc());
int processed = data.readableBytes() + padding;
if (endOfStream) {
Invocation invocation = buildInvocation(request.getHeaders(), request.getData());
ctx.pipeline().fireChannelRead(invocation);
}
return processed;
}

private Invocation buildInvocation(Http2Headers http2Headers, ByteBuf data) {
return new RpcInvocation();
}

@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
boolean endStream) throws Http2Exception {
System.out.println("onHeadersRead" + streamId);

final DubboHttp2ConnectionHandler connectionHandler = ctx.pipeline().get(DubboHttp2ConnectionHandler.class);
if (headers.path() == null) {
System.out.println("Expected path but is missing");
return;
}

final String path = headers.path().toString();
if (path.charAt(0) != '/') {
System.out.println("Expected path but is missing1");
return;
}

final CharSequence contentType = HttpUtil.getMimeType(headers.get(HttpHeaderNames.CONTENT_TYPE));
if (contentType == null) {
System.out.println("Expected path but is missing2");
return;
}

if (!GrpcElf.isGrpcContentType(contentType)) {
System.out.println("Expected path but is missing3");
return;
}

if (!HttpMethod.POST.asciiName().equals(headers.method())) {
System.out.println("Expected path but is missing4");
return;
}

String marshaller;
if (AsciiString.contentEquals(contentType, GrpcElf.APPLICATION_GRPC) || AsciiString.contentEquals(contentType, GrpcElf.GRPC_PROTO)) {
marshaller = "protobuf";
} else if (AsciiString.contentEquals(contentType, GrpcElf.GRPC_JSON)) {
marshaller = "protobuf-json";
} else {
System.out.println("Expected path but is missing5");
return;
}

Http2Connection connection = connectionHandler.encoder().connection();
Http2Stream http2Stream = connection.stream(streamId);
if (streamKey == null) {
streamKey = connection.newKey();
}
Http2Request request = new Http2Request(streamId, http2Stream, headers, streamKey, marshaller);
http2Stream.setProperty(streamKey, request);

if (endStream) {

}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.apache.dubbo.rpc.protocol.grpc;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2FrameListener;
import org.apache.dubbo.remoting.netty4.Http2WireProtocol;

public class GrpcHttp2Protocol extends Http2WireProtocol {

@Override
protected Http2FrameListener frameListener() {
return new GrpcHttp2FrameListener();
}

@Override
protected void configServerPipeline0(ChannelHandlerContext ctx) {
// response -> data header/data
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package org.apache.dubbo.rpc.protocol.grpc;

import java.util.HashMap;
import java.util.Map;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Stream;

public class Http2Request {
private int streamId;
private ByteBuf commulation;
private volatile Http2Headers headers;
private Http2Stream http2Stream;
private Http2Connection.PropertyKey streamKey;
private String marshaller;

public Http2Request(int streamId, Http2Stream http2Stream
, Http2Headers headers
//, AbstractHttp2CodecHandler http2CodecHandler
, Http2Connection.PropertyKey streamKey, String marshaller) {
this.http2Stream = http2Stream;
this.headers = headers;
//this.http2CodecHandler = http2CodecHandler;
this.streamKey = streamKey;
this.marshaller = marshaller;
}

public Http2Headers getHeaders() {
return headers;
}

public ByteBuf getData() {
return commulation;
}

public int getStreamId() {
return streamId;
}

public void cumulate(ByteBuf byteBuf, ByteBufAllocator allocator) {
commulation = cumulate(allocator, commulation, byteBuf);
}

public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
final ByteBuf buffer;
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
// Expand cumulation (by replace it) when either there is not more room in the buffer
// or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
// duplicate().retain() or if its read-only.
//
// See:
// - https://github.com/netty/netty/issues/2327
// - https://github.com/netty/netty/issues/1764
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
buffer.writeBytes(in);
return buffer;
}

private ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
ByteBuf oldCumulation = cumulation;
cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
cumulation.writeBytes(oldCumulation);
oldCumulation.release();
return cumulation;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
grpc=org.apache.dubbo.rpc.protocol.grpc.GrpcHttp2Protocol
Loading

0 comments on commit 47ac5a3

Please sign in to comment.