Skip to content

Commit

Permalink
Add support for QuicStreamFrame (java-native-access#62)
Browse files Browse the repository at this point in the history
Motivation:

For a lot of use-cases just using ByteBuf on top of the QuicStreamChannel is good enough. That said there are situations where a user may want to have some more control about when the FIN is sent etc. For this cases we should directly support QuicStreamFrame which basically wraps a ByteBuf but also allows to specify if a FIN flag should be send or not. Beside this the user may want to also receive QuicStreamFrame's directly and so have fine grained control

Modifications:

- Add QuicStreamFrame and a default implementation
- Add support to write QuicStreamFrame
- Add support to directly read QuichStreamFrame and fire these through the pipeline. This needs to be enabled by the user via a configuration option and is not the default.
- Add new ChannelOption and setter / getter to enable reading frames to QuicStreamChannel
- Let QuicStreamChannel.config() return QuicStreamChannelConfig
- Add unit tests

Result:

More flexible usage of QUIC codec possible
  • Loading branch information
normanmaurer authored Nov 26, 2020
1 parent cb206a5 commit 61ac022
Show file tree
Hide file tree
Showing 9 changed files with 421 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

final class DefaultQuicStreamChannelConfig extends DefaultChannelConfig implements QuicStreamChannelConfig {
private volatile boolean allowHalfClosure;
private volatile boolean readFrames;

DefaultQuicStreamChannelConfig(QuicStreamChannel channel) {
super(channel);
Expand All @@ -34,7 +35,7 @@ final class DefaultQuicStreamChannelConfig extends DefaultChannelConfig implemen
@Override
public Map<ChannelOption<?>, Object> getOptions() {
if (isHalfClosureSupported()) {
return getOptions(super.getOptions(), ChannelOption.ALLOW_HALF_CLOSURE);
return getOptions(super.getOptions(), ChannelOption.ALLOW_HALF_CLOSURE, QuicChannelOption.READ_FRAMES);
}
return super.getOptions();
}
Expand All @@ -45,7 +46,9 @@ public <T> T getOption(ChannelOption<T> option) {
if (option == ChannelOption.ALLOW_HALF_CLOSURE) {
return (T) Boolean.valueOf(isAllowHalfClosure());
}

if (option == QuicChannelOption.READ_FRAMES) {
return (T) Boolean.valueOf(isReadFrames());
}
return super.getOption(option);
}

Expand All @@ -59,9 +62,22 @@ public <T> boolean setOption(ChannelOption<T> option, T value) {
return true;
}
return false;
} else {
return super.setOption(option, value);
}
if (option == QuicChannelOption.READ_FRAMES) {
setReadFrames((Boolean) value);
}
return super.setOption(option, value);
}

@Override
public QuicStreamChannelConfig setReadFrames(boolean readFrames) {
this.readFrames = readFrames;
return this;
}

@Override
public boolean isReadFrames() {
return readFrames;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.incubator.codec.quic;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.DefaultByteBufHolder;

public final class DefaultQuicStreamFrame extends DefaultByteBufHolder implements QuicStreamFrame {

private final boolean fin;

public DefaultQuicStreamFrame(ByteBuf data, boolean fin) {
super(data);
this.fin = fin;
}

@Override
public boolean hasFin() {
return fin;
}

@Override
public QuicStreamFrame copy() {
return new DefaultQuicStreamFrame(content().copy(), fin);
}

@Override
public QuicStreamFrame duplicate() {
return new DefaultQuicStreamFrame(content().duplicate(), fin);
}

@Override
public QuicStreamFrame retainedDuplicate() {
return new DefaultQuicStreamFrame(content().retainedDuplicate(), fin);
}

@Override
public QuicStreamFrame replace(ByteBuf content) {
return new DefaultQuicStreamFrame(content, fin);
}

@Override
public QuicStreamFrame retain() {
super.retain();
return this;
}

@Override
public QuicStreamFrame retain(int increment) {
super.retain(increment);
return this;
}

@Override
public QuicStreamFrame touch() {
super.touch();
return this;
}

@Override
public QuicStreamFrame touch(Object hint) {
super.touch(hint);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ public final class QuicChannelOption<T> extends ChannelOption<T> {
public static final ChannelOption<String> PEER_CERT_SERVER_NAME =
valueOf(QuicChannelOption.class, "PEER_CERT_SERVER_NAME");

/**
* If set to {@code true} the {@link QuicStreamChannel} will read {@link QuicStreamFrame}s and fire it through
* the pipeline, if {@code false} it will read {@link io.netty.buffer.ByteBuf} and translate the FIN flag to
* events.
*/
public static final ChannelOption<Boolean> READ_FRAMES =
valueOf(QuicChannelOption.class, "READ_FRAMES");

@SuppressWarnings({ "deprecation" })
private QuicChannelOption() {
super(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,7 @@ public interface QuicStreamChannel extends DuplexChannel {

@Override
QuicChannel parent();

@Override
QuicStreamChannelConfig config();
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@
* {@link DuplexChannelConfig} for QUIC streams.
*/
public interface QuicStreamChannelConfig extends DuplexChannelConfig {
/**
* Set this to {@code true} if the {@link QuicStreamChannel} should read {@link QuicStreamFrame}s and fire these
* through the {@link io.netty.channel.ChannelPipeline}, {@code false} if it uses {@link io.netty.buffer.ByteBuf}.
*/
QuicStreamChannelConfig setReadFrames(boolean readFrames);

/**
* Returns {@code true} if the {@link QuicStreamChannel} will read {@link QuicStreamFrame}s and fire these through
* the {@link io.netty.channel.ChannelPipeline}, {@code false} if it uses {@link io.netty.buffer.ByteBuf}.
*/
boolean isReadFrames();

@Override
QuicStreamChannelConfig setAllowHalfClosure(boolean allowHalfClosure);
Expand Down
55 changes: 55 additions & 0 deletions src/main/java/io/netty/incubator/codec/quic/QuicStreamFrame.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.incubator.codec.quic;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;

/**
* A QUIC STREAM_FRAME.
*/
public interface QuicStreamFrame extends ByteBufHolder {

/**
* Returns {@code true} if the frame has the FIN set, which means it notifies the remote peer that
* there will be no more writing happen. {@code false} otherwise.
*/
boolean hasFin();

@Override
QuicStreamFrame copy();

@Override
QuicStreamFrame duplicate();

@Override
QuicStreamFrame retainedDuplicate();

@Override
QuicStreamFrame replace(ByteBuf content);

@Override
QuicStreamFrame retain();

@Override
QuicStreamFrame retain(int increment);

@Override
QuicStreamFrame touch();

@Override
QuicStreamFrame touch(Object hint);
}
46 changes: 37 additions & 9 deletions src/main/java/io/netty/incubator/codec/quic/QuicheQuicChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,21 @@ enum StreamRecvResult {
OK
}

enum StreamSendResult {
/**
* Nothing more to sent and no FIN flag
*/
DONE,
/**
* FIN flag sent.
*/
FIN,
/**
* No more space, need to retry
*/
NO_SPACE
}

private static final class CloseData implements ChannelFutureListener {
final boolean applicationClose;
final int err;
Expand Down Expand Up @@ -460,17 +475,27 @@ private void streamShutdown0(long streamId, boolean read, boolean write, int err
Quiche.notifyPromise(res, promise);
}

boolean streamSendMultiple(long streamId, ByteBufAllocator allocator, ChannelOutboundBuffer streamOutboundBuffer)
throws Exception {
StreamSendResult streamSendMultiple(long streamId, ByteBufAllocator allocator,
ChannelOutboundBuffer streamOutboundBuffer) throws Exception {
boolean sendSomething = false;
try {
for (;;) {
ByteBuf buffer = (ByteBuf) streamOutboundBuffer.current();
if (buffer == null) {
Object current = streamOutboundBuffer.current();
if (current == null) {
break;
}
final ByteBuf buffer;
final boolean fin;
if (current instanceof ByteBuf) {
buffer = (ByteBuf) current;
fin = false;
} else {
QuicStreamFrame streamFrame = (QuicStreamFrame) current;
buffer = streamFrame.content();
fin = streamFrame.hasFin();
}
int readable = buffer.readableBytes();
if (readable == 0) {
if (readable == 0 && !fin) {
// Skip empty buffers.
streamOutboundBuffer.remove();
continue;
Expand All @@ -481,22 +506,25 @@ boolean streamSendMultiple(long streamId, ByteBufAllocator allocator, ChannelOut
ByteBuf tmpBuffer = allocator.directBuffer(readable);
try {
tmpBuffer.writeBytes(buffer, buffer.readerIndex(), readable);
res = streamSend(streamId, tmpBuffer, false);
res = streamSend(streamId, tmpBuffer, fin);
} finally {
tmpBuffer.release();
}
} else {
res = streamSend(streamId, buffer, false);
res = streamSend(streamId, buffer, fin);
}

if (Quiche.throwIfError(res) || res == 0) {
// stream has no capacity left stop trying to send.
return false;
return StreamSendResult.NO_SPACE;
}
streamOutboundBuffer.removeBytes(res);
sendSomething = true;
if (fin) {
return StreamSendResult.FIN;
}
}
return true;
return StreamSendResult.DONE;
} finally {
if (sendSomething) {
// As we called quiche_conn_stream_send(...) we need to ensure we will call quiche_conn_send(...) either
Expand Down
Loading

0 comments on commit 61ac022

Please sign in to comment.