Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
panxiaojun233 committed Dec 28, 2020
2 parents f14bba8 + 16b5b89 commit 3f6c810
Show file tree
Hide file tree
Showing 11 changed files with 154 additions and 263 deletions.
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
package org.apache.dubbo.remoting.netty4;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2FrameListener;
import io.netty.handler.codec.http2.Http2FrameLogger;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import static io.netty.handler.logging.LogLevel.DEBUG;
import static io.netty.handler.logging.LogLevel.INFO;

public abstract class Http2WireProtocol implements WireProtocol {
public static final Http2FrameLogger CLIENT_LOGGER = new Http2FrameLogger(DEBUG, "H2_CLIENT");
public static final Http2FrameLogger SERVER_LOGGER = new Http2FrameLogger(DEBUG, "H2_SERVER");
public static final Http2FrameLogger SERVER_LOGGER = new Http2FrameLogger(INFO, "H2_SERVER");
private static final Set<Http2SessionHandler> handlers = ConcurrentHashMap.newKeySet();
private final ProtocolDetector detector = new Http2ProtocolDetector();

Expand Down
4 changes: 0 additions & 4 deletions dubbo-rpc/dubbo-rpc-triple/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@
<artifactId>dubbo-rpc-api</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.apache.dubbo.rpc.protocol.tri;

/**
* See https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
*/
enum GrpcStatus {
OK(0),
NOT_FOUND(5),
UNIMPLEMENTED(12),
INTERNAL(13);

final int code;

GrpcStatus(int code){
this.code=code;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,84 +2,75 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Stream;

public class Http2Request {
private int streamId;
private ByteBuf commulation;
private final int streamId;
private final String path;
private volatile Http2Headers headers;
private CompositeByteBuf pending;
private Http2Stream http2Stream;
private Http2Connection.PropertyKey streamKey;
private String marshaller;
private ByteBufAllocator allocator;
private byte[] content;
private int bytesToRead;
private final ByteBufAllocator alloc;

public Http2Request(int streamId, Http2Stream http2Stream
, Http2Headers headers
//, AbstractHttp2CodecHandler http2CodecHandler
, Http2Connection.PropertyKey streamKey, String marshaller, ByteBufAllocator allocator) {
public Http2Request(int streamId, String path, Http2Stream http2Stream, Http2Headers headers,
Http2Connection.PropertyKey streamKey, ByteBufAllocator allocator) {
this.streamId = streamId;
this.path = path;
this.http2Stream = http2Stream;
this.headers = headers;
//this.http2CodecHandler = http2CodecHandler;
this.alloc=allocator;
this.streamKey = streamKey;
this.marshaller = marshaller;
this.allocator = allocator;
this.commulation = allocator.buffer();
this.pending = allocator.compositeBuffer();
}

public Http2Headers getHeaders() {
return headers;
}

public ByteBuf getData() {
return commulation;
return pending;
}

public int getStreamId() {
return streamId;
}

public byte[] content() {
if (content != null) {
return content;
}
this.content = new byte[commulation.readableBytes()];
commulation.readBytes(content);
commulation.release();
return content;
public void appendData(ByteBuf data) {
pending.addComponent(true, data);
}

public void cumulate(ByteBuf byteBuf) {
commulation = cumulate(allocator, commulation, byteBuf);
}
public ByteBuf getAvailableTrunk(){

public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
final ByteBuf buffer;
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
// Expand cumulation (by replace it) when either there is not more room in the buffer
// or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
// duplicate().retain() or if its read-only.
//
// See:
// - https://github.com/netty/netty/issues/2327
// - https://github.com/netty/netty/issues/1764
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
buffer.writeBytes(in);
return buffer;
}
int type = pending.readUnsignedByte();
// if ((type & RESERVED_MASK) != 0) {
// throw Status.INTERNAL.withDescription(
// "gRPC frame header malformed: reserved bits not zero")
// .asRuntimeException();
// }
// compressedFlag = (type & COMPRESSED_FLAG_MASK) != 0;

private ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
ByteBuf oldCumulation = cumulation;
cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
cumulation.writeBytes(oldCumulation);
oldCumulation.release();
return cumulation;
// Update the required length to include the length of the frame.
this.bytesToRead = pending.readInt();
/* if (requiredLength < 0 || requiredLength > maxInboundMessageSize) {
throw Status.RESOURCE_EXHAUSTED.withDescription(
String.format("gRPC message exceeds maximum size %d: %d",
maxInboundMessageSize, requiredLength))
.asRuntimeException();
}*/
return tryRead();

}
private ByteBuf tryRead(){
if(bytesToRead>0&&pending.readableBytes()>=bytesToRead){
final ByteBuf ready = alloc.buffer();
pending.readBytes(ready,bytesToRead);
return ready;
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
package org.apache.dubbo.rpc.protocol.tri;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class Marshaller {

public static Marshaller marshaller = new Marshaller();

public Object unmarshaller(Class<?> requestClass, ByteBuf in) {
in.readByte();
final int len = in.readInt();
System.out.println(len);
final Parser<?> parser = ProtoUtil.getParser(requestClass);
Object result = null;
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.apache.dubbo.rpc.protocol.tri;

public interface TripleConstant {
String STATUS_KEY = "grpc-status";
String MESSAGE_KEY = "grpc-message";
String CONTENT_TYPE_KEY = "content-type";
String CONTENT_PROTO = "application/grpc+proto";

String APPLICATION_GRPC = "application/grpc";

}
Loading

0 comments on commit 3f6c810

Please sign in to comment.