diff --git a/servicetalk-buffer-api/src/main/java/io/servicetalk/buffer/api/BufferAllocator.java b/servicetalk-buffer-api/src/main/java/io/servicetalk/buffer/api/BufferAllocator.java index f7f4f3323c..21b076e73f 100644 --- a/servicetalk-buffer-api/src/main/java/io/servicetalk/buffer/api/BufferAllocator.java +++ b/servicetalk-buffer-api/src/main/java/io/servicetalk/buffer/api/BufferAllocator.java @@ -50,6 +50,15 @@ default Buffer newBuffer(boolean direct) { */ Buffer newBuffer(int initialCapacity); + /** + * Create a new buffer with the given initial capacity and given max capacity. + * + * @param initialCapacity the initial capacity of the buffer. + * @param maxCapacity the maximum capacity of the buffer. + * @return a new buffer. + */ + Buffer newBuffer(int initialCapacity, int maxCapacity); + /** * Create a new buffer with the given initial capacity. * diff --git a/servicetalk-buffer-api/src/main/java/io/servicetalk/buffer/api/ReadOnlyBufferAllocator.java b/servicetalk-buffer-api/src/main/java/io/servicetalk/buffer/api/ReadOnlyBufferAllocator.java index c1746591f8..745b4487b5 100644 --- a/servicetalk-buffer-api/src/main/java/io/servicetalk/buffer/api/ReadOnlyBufferAllocator.java +++ b/servicetalk-buffer-api/src/main/java/io/servicetalk/buffer/api/ReadOnlyBufferAllocator.java @@ -45,6 +45,11 @@ public Buffer newBuffer(int initialCapacity) { throw new UnsupportedOperationException(); } + @Override + public Buffer newBuffer(final int initialCapacity, final int maxCapacity) { + throw new UnsupportedOperationException(); + } + @Override public Buffer newBuffer(int initialCapacity, boolean direct) { throw new UnsupportedOperationException(); diff --git a/servicetalk-buffer-netty/src/main/java/io/servicetalk/buffer/netty/ServiceTalkBufferAllocator.java b/servicetalk-buffer-netty/src/main/java/io/servicetalk/buffer/netty/ServiceTalkBufferAllocator.java index cd3c447f27..f6bb7864bb 100644 --- a/servicetalk-buffer-netty/src/main/java/io/servicetalk/buffer/netty/ServiceTalkBufferAllocator.java +++ b/servicetalk-buffer-netty/src/main/java/io/servicetalk/buffer/netty/ServiceTalkBufferAllocator.java @@ -133,6 +133,11 @@ public Buffer newBuffer(int initialCapacity) { return new NettyBuffer<>(buffer(initialCapacity)); } + @Override + public Buffer newBuffer(final int initialCapacity, final int maxCapacity) { + return new NettyBuffer<>(buffer(initialCapacity, maxCapacity)); + } + @Override public Buffer newBuffer(int initialCapacity, boolean direct) { return new NettyBuffer<>(direct ? directBuffer(initialCapacity) : heapBuffer(initialCapacity)); diff --git a/servicetalk-encoding-api/build.gradle b/servicetalk-encoding-api/build.gradle new file mode 100644 index 0000000000..ed8ba395ce --- /dev/null +++ b/servicetalk-encoding-api/build.gradle @@ -0,0 +1,27 @@ +/* + * Copyright © 2020 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +apply plugin: "io.servicetalk.servicetalk-gradle-plugin-internal-library" + +dependencies { + api project(":servicetalk-buffer-api") + api project(":servicetalk-concurrent-api") + + implementation project(":servicetalk-annotations") + implementation project(":servicetalk-concurrent-internal") + implementation "com.google.code.findbugs:jsr305:$jsr305Version" + implementation "org.slf4j:slf4j-api:$slf4jVersion" +} diff --git a/servicetalk-encoding-api/src/main/java/io/servicetalk/encoding/api/AbstractContentCodec.java b/servicetalk-encoding-api/src/main/java/io/servicetalk/encoding/api/AbstractContentCodec.java new file mode 100644 index 0000000000..d44c9047bc --- /dev/null +++ b/servicetalk-encoding-api/src/main/java/io/servicetalk/encoding/api/AbstractContentCodec.java @@ -0,0 +1,55 @@ +/* + * Copyright © 2020 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.encoding.api; + +import java.util.Objects; + +abstract class AbstractContentCodec implements ContentCodec { + + private final CharSequence name; + + AbstractContentCodec(final CharSequence name) { + this.name = name; + } + + public final CharSequence name() { + return name; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final AbstractContentCodec that = (AbstractContentCodec) o; + return name.equals(that.name); + } + + @Override + public int hashCode() { + return Objects.hash(name); + } + + @Override + public String toString() { + return "ContentCodec{" + + "name=" + name + + '}'; + } +} diff --git a/servicetalk-encoding-api/src/main/java/io/servicetalk/encoding/api/AbstractZipContentCodec.java b/servicetalk-encoding-api/src/main/java/io/servicetalk/encoding/api/AbstractZipContentCodec.java new file mode 100644 index 0000000000..e2f9df0d4d --- /dev/null +++ b/servicetalk-encoding-api/src/main/java/io/servicetalk/encoding/api/AbstractZipContentCodec.java @@ -0,0 +1,582 @@ +/* + * Copyright © 2020 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.encoding.api; + +import io.servicetalk.buffer.api.Buffer; +import io.servicetalk.buffer.api.BufferAllocator; +import io.servicetalk.concurrent.PublisherSource; +import io.servicetalk.concurrent.api.Publisher; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.zip.CRC32; +import java.util.zip.DataFormatException; +import java.util.zip.Deflater; +import java.util.zip.DeflaterOutputStream; +import java.util.zip.Inflater; +import java.util.zip.InflaterInputStream; +import javax.annotation.Nullable; + +import static io.servicetalk.buffer.api.ReadOnlyBufferAllocators.DEFAULT_RO_ALLOCATOR; +import static io.servicetalk.concurrent.api.Single.succeeded; +import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverErrorFromSource; +import static java.util.Objects.requireNonNull; + +abstract class AbstractZipContentCodec extends AbstractContentCodec { + + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractZipContentCodec.class); + private static final Buffer END_OF_STREAM = DEFAULT_RO_ALLOCATOR.fromAscii(" "); + + protected final int chunkSize; + private final int maxPayloadSize; + + AbstractZipContentCodec(final CharSequence name, final int chunkSize, final int maxPayloadSize) { + super(name); + this.chunkSize = chunkSize; + this.maxPayloadSize = maxPayloadSize; + } + + abstract boolean supportsChecksum(); + + abstract Inflater newRawInflater(); + + abstract DeflaterOutputStream newDeflaterOutputStream(OutputStream out) throws IOException; + + abstract InflaterInputStream newInflaterInputStream(InputStream in); + + @Override + public final Buffer encode(final Buffer src, final int offset, final int length, + final BufferAllocator allocator) { + final Buffer dst = allocator.newBuffer(chunkSize); + DeflaterOutputStream output = null; + try { + output = newDeflaterOutputStream(Buffer.asOutputStream(dst)); + + if (src.hasArray()) { + output.write(src.array(), src.arrayOffset() + offset, length); + } else { + while (src.readableBytes() > 0) { + byte[] onHeap = new byte[Math.min(src.readableBytes(), chunkSize)]; + src.readBytes(onHeap); + output.write(onHeap); + } + } + + output.finish(); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + closeQuietly(output); + } + + return dst; + } + + @Override + public final Publisher encode(final Publisher from, + final BufferAllocator allocator) { + return from + .concat(succeeded(END_OF_STREAM)) + .liftSync(subscriber -> new PublisherSource.Subscriber() { + @Nullable + SwappableBufferOutputStream stream; + + @Nullable + DeflaterOutputStream output; + + private boolean headerWritten; + + @Override + public void onSubscribe(PublisherSource.Subscription subscription) { + try { + Buffer dst = allocator.newBuffer(chunkSize); + stream = new SwappableBufferOutputStream(dst); + // This will write header bytes on the stream, which will be consumed along with the first + // onNext part + output = newDeflaterOutputStream(stream); + } catch (IOException e) { + deliverErrorFromSource(subscriber, e); + return; + } + + subscriber.onSubscribe(subscription); + } + + @Override + public void onNext(Buffer next) { + assert output != null; + assert stream != null; + + // onNext will produce AT-MOST N items (as received) + // +1 for the encoding footer (ie. END_OF_STREAM) + try { + if (next == END_OF_STREAM) { + // ZIP footer is 10 bytes + Buffer dst = allocator.newBuffer(10); + stream.swap(dst); + output.finish(); + + subscriber.onNext(dst); + return; + } + + Buffer dst; + if (headerWritten) { + dst = allocator.newBuffer(chunkSize); + stream.swap(dst); + } else { + dst = stream.buffer; + } + + if (next.hasArray()) { + output.write(next.array(), next.arrayOffset() + next.readerIndex(), + next.readableBytes()); + } else { + while (next.readableBytes() > 0) { + byte[] onHeap = new byte[Math.min(next.readableBytes(), chunkSize)]; + next.readBytes(onHeap); + output.write(onHeap); + } + } + + output.flush(); + headerWritten = true; + subscriber.onNext(dst); + } catch (IOException e) { + onError(e); + } + } + + @Override + public void onError(Throwable t) { + closeQuietly(output); + subscriber.onError(t); + } + + @Override + public void onComplete() { + try { + if (output != null) { + output.close(); + } + } catch (IOException e) { + onError(e); + return; + } + + subscriber.onComplete(); + } + }); + } + + @Override + public final Buffer decode(final Buffer src, final int offset, final int length, final BufferAllocator allocator) { + final Buffer dst = allocator.newBuffer(chunkSize, maxPayloadSize); + InflaterInputStream input = null; + try { + input = newInflaterInputStream(Buffer.asInputStream(src)); + + int read = dst.setBytesUntilEndStream(0, input, chunkSize); + dst.writerIndex(read); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + closeQuietly(input); + } + + return dst; + } + + @Override + public final Publisher decode(final Publisher from, final BufferAllocator allocator) { + return from.liftSync(subscriber -> new PublisherSource.Subscriber() { + + @Nullable + Inflater inflater; + @Nullable + ZLibStreamDecoder streamDecoder; + @Nullable + PublisherSource.Subscription subscription; + + @Override + public void onSubscribe(final PublisherSource.Subscription subscription) { + inflater = newRawInflater(); + streamDecoder = new ZLibStreamDecoder(inflater, supportsChecksum(), maxPayloadSize); + this.subscription = subscription; + subscriber.onSubscribe(subscription); + } + + @Override + public void onNext(@Nullable final Buffer src) { + assert streamDecoder != null; + assert subscription != null; + assert src != null; + + // onNext will produce AT-MOST N items (as received) + try { + if (streamDecoder.isFinished()) { + throw new IllegalStateException("Stream encoder previously closed but more input arrived "); + } + + Buffer part = allocator.newBuffer(chunkSize); + streamDecoder.decode(src, part); + if (part.readableBytes() > 0) { + subscriber.onNext(part); + } + + // Not enough data to decompress, ask for more + subscription.request(1); + } catch (Exception e) { + onError(e); + } + } + + @Override + public void onError(final Throwable t) { + assert inflater != null; + + inflater.end(); + subscriber.onError(t); + } + + @Override + public void onComplete() { + assert inflater != null; + + inflater.end(); + subscriber.onComplete(); + } + }); + } + + private void closeQuietly(@Nullable final Closeable closeable) { + try { + if (closeable != null) { + closeable.close(); + } + } catch (IOException e) { + LOGGER.error("Unexpected IO exception while closing buffer streams", e); + } + } + + // Code forked from Netty's JdkZlibDecoder + static class ZLibStreamDecoder { + private static final int FHCRC = 0x02; + private static final int FEXTRA = 0x04; + private static final int FNAME = 0x08; + private static final int FCOMMENT = 0x10; + private static final int FRESERVED = 0xE0; + + @Nullable + private final CRC32 crc; + private final Inflater inflater; + private final int maxPayloadSize; + + private enum State { + HEADER_START, + HEADER_END, + FLG_READ, + XLEN_READ, + SKIP_FNAME, + SKIP_COMMENT, + PROCESS_FHCRC, + FOOTER_START, + } + + private State state = State.HEADER_START; + private int flags = -1; + private int xlen = -1; + + private int payloadSizeAcc; + private boolean finished; + + ZLibStreamDecoder(Inflater inflater, boolean supportsChksum, int maxPayloadSize) { + this.inflater = inflater; + this.maxPayloadSize = maxPayloadSize; + crc = supportsChksum ? new CRC32() : null; + } + + public boolean isFinished() { + return finished; + } + + @Nullable + protected void decode(Buffer in, Buffer out) throws Exception { + if (finished) { + // Skip data received after finished. + in.skipBytes(in.readableBytes()); + return; + } + + int readableBytes = in.readableBytes(); + if (readableBytes == 0) { + return; + } + + if (crc != null) { + switch (state) { + case FOOTER_START: + if (readGZIPFooter(in)) { + finished = true; + } + return; + default: + if (state != State.HEADER_END && !readGZIPHeader(in)) { + return; + } + } + // Some bytes may have been consumed, and so we must re-set the number of readable bytes. + readableBytes = in.readableBytes(); + } + + if (in.hasArray()) { + inflater.setInput(in.array(), in.arrayOffset() + in.readerIndex(), readableBytes); + } else { + byte[] array = new byte[readableBytes]; + in.getBytes(in.readerIndex(), array); + inflater.setInput(array); + } + + try { + boolean readFooter = false; + while (!inflater.needsInput()) { + byte[] outArray = out.array(); + int writerIndex = out.writerIndex(); + int outIndex = out.arrayOffset() + writerIndex; + int outputLength = inflater.inflate(outArray, outIndex, out.writableBytes()); + payloadSizeAcc += outputLength; + if (payloadSizeAcc > maxPayloadSize) { + throw new IllegalStateException("Max decompressed payload limit has been reached: " + + payloadSizeAcc + " (expected <= " + maxPayloadSize + ") bytes"); + } + + if (outputLength > 0) { + out.writerIndex(writerIndex + outputLength); + if (crc != null) { + crc.update(outArray, outIndex, outputLength); + } + } else { + if (inflater.needsDictionary()) { + throw new IOException( + "decompression failure, unable to set dictionary as non was specified"); + } + } + + if (inflater.finished()) { + if (crc == null) { + finished = true; // Do not decode anymore. + } else { + readFooter = true; + } + break; + } else { + out.ensureWritable(inflater.getRemaining() << 1); + } + } + + in.skipBytes(readableBytes - inflater.getRemaining()); + + if (readFooter) { + state = State.FOOTER_START; + if (readGZIPFooter(in)) { + finished = true; + inflater.end(); + } + } + } catch (DataFormatException e) { + throw new IOException("decompression failure", e); + } + } + + private boolean readGZIPHeader(Buffer in) throws IOException { + switch (state) { + case HEADER_START: + if (in.readableBytes() < 10) { + return false; + } + // read magic numbers + int magic0 = in.readByte(); + int magic1 = in.readByte(); + + if (magic0 != 31) { + throw new IOException("Input is not in the GZIP format"); + } + crc.update(magic0); + crc.update(magic1); + + int method = in.readUnsignedByte(); + if (method != Deflater.DEFLATED) { + throw new IOException("Unsupported compression method " + + method + " in the GZIP header"); + } + crc.update(method); + + flags = in.readUnsignedByte(); + crc.update(flags); + + if ((flags & FRESERVED) != 0) { + throw new IOException( + "Reserved flags are set in the GZIP header"); + } + + // mtime (int) + crc.update(in.readUnsignedByte()); + crc.update(in.readUnsignedByte()); + crc.update(in.readUnsignedByte()); + crc.update(in.readUnsignedByte()); + + crc.update(in.readUnsignedByte()); // extra flags + crc.update(in.readUnsignedByte()); // operating system + + state = State.FLG_READ; + // fall through + case FLG_READ: + if ((flags & FEXTRA) != 0) { + if (in.readableBytes() < 2) { + return false; + } + int xlen1 = in.readUnsignedByte(); + int xlen2 = in.readUnsignedByte(); + crc.update(xlen1); + crc.update(xlen2); + + xlen |= xlen1 << 8 | xlen2; + } + state = State.XLEN_READ; + // fall through + case XLEN_READ: + if (xlen != -1) { + if (in.readableBytes() < xlen) { + return false; + } + for (int i = 0; i < xlen; i++) { + crc.update(in.readUnsignedByte()); + } + } + state = State.SKIP_FNAME; + // fall through + case SKIP_FNAME: + if ((flags & FNAME) != 0) { + if (in.readableBytes() > 0) { + return false; + } + do { + int b = in.readUnsignedByte(); + crc.update(b); + if (b == 0x00) { + break; + } + } while (in.readableBytes() > 0); + } + state = State.SKIP_COMMENT; + // fall through + case SKIP_COMMENT: + if ((flags & FCOMMENT) != 0) { + if (in.readableBytes() > 0) { + return false; + } + do { + int b = in.readUnsignedByte(); + crc.update(b); + if (b == 0x00) { + break; + } + } while (in.readableBytes() > 0); + } + state = State.PROCESS_FHCRC; + // fall through + case PROCESS_FHCRC: + if ((flags & FHCRC) != 0) { + if (in.readableBytes() < 4) { + return false; + } + verifyCrc(in); + } + crc.reset(); + state = State.HEADER_END; + // fall through + case HEADER_END: + return true; + default: + throw new IllegalStateException(); + } + } + + private boolean readGZIPFooter(Buffer buf) throws IOException { + if (buf.readableBytes() < 8) { + return false; + } + + verifyCrc(buf); + + // read ISIZE and verify + int dataLength = 0; + for (int i = 0; i < 4; ++i) { + dataLength |= buf.readUnsignedByte() << i * 8; + } + int readLength = inflater.getTotalOut(); + if (dataLength != readLength) { + throw new IOException( + "Number of bytes mismatch. Expected: " + dataLength + ", Got: " + readLength); + } + return true; + } + + private void verifyCrc(Buffer in) throws IOException { + long crcValue = 0; + for (int i = 0; i < 4; ++i) { + crcValue |= (long) in.readUnsignedByte() << i * 8; + } + long readCrc = crc.getValue(); + if (crcValue != readCrc) { + throw new IOException( + "CRC value mismatch. Expected: " + crcValue + ", Got: " + readCrc); + } + } + } + + static class SwappableBufferOutputStream extends OutputStream { + private Buffer buffer; + + SwappableBufferOutputStream(final Buffer buffer) { + this.buffer = requireNonNull(buffer); + } + + private void swap(final Buffer buffer) { + this.buffer = requireNonNull(buffer); + } + + @Override + public void write(final int b) { + buffer.writeInt(b); + } + + @Override + public void write(byte[] b) { + buffer.writeBytes(b); + } + + @Override + public void write(byte[] b, int off, int len) { + buffer.writeBytes(b, off, len); + } + } +} diff --git a/servicetalk-encoding-api/src/main/java/io/servicetalk/encoding/api/ContentCodec.java b/servicetalk-encoding-api/src/main/java/io/servicetalk/encoding/api/ContentCodec.java new file mode 100644 index 0000000000..e27d90d8df --- /dev/null +++ b/servicetalk-encoding-api/src/main/java/io/servicetalk/encoding/api/ContentCodec.java @@ -0,0 +1,99 @@ +/* + * Copyright © 2020 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.encoding.api; + +import io.servicetalk.buffer.api.Buffer; +import io.servicetalk.buffer.api.BufferAllocator; +import io.servicetalk.concurrent.api.Publisher; + +/** + * API to support encode and decode of {@link Buffer}s. + *

+ * Implementations must provide thread safety semantics, since instances could be shared across threads. + */ +public interface ContentCodec { + + /** + * A unique textual representation for the coding. + * + * @return a unique textual representation for the coding. + */ + CharSequence name(); + + /** + * Take a {@link Buffer} and encode its contents resulting in a {@link Buffer} with the encoded contents. + * + * @param src the {@link Buffer} to encode + * @param allocator the {@link BufferAllocator} to use for allocating auxiliary buffers or the returned buffer + * @return {@link Buffer} the result buffer with the content encoded + */ + default Buffer encode(Buffer src, BufferAllocator allocator) { + return encode(src, src.readerIndex(), src.readableBytes(), allocator); + } + + /** + * Take a {@link Buffer} and encode its contents resulting in a {@link Buffer} with the encoded contents. + * + * @param src the {@link Buffer} to encode + * @param offset the offset of the source to start reading from + * @param length the total length available for reading + * @param allocator the {@link BufferAllocator} to use for allocating auxiliary buffers or the returned buffer + * @return {@link Buffer} the result buffer with the content encoded + */ + Buffer encode(Buffer src, int offset, int length, BufferAllocator allocator); + + /** + * Take a {@link Buffer} and decode its contents resulting in a {@link Buffer} with the decoded content. + * + * @param src the {@link Buffer} to decode + * @param allocator the {@link BufferAllocator} to use for allocating auxiliary buffers or the returned buffer + * @return {@link Buffer} the result buffer with the content decoded + */ + default Buffer decode(Buffer src, BufferAllocator allocator) { + return decode(src, src.readerIndex(), src.readableBytes(), allocator); + } + + /** + * Take a {@link Buffer} and decode its contents resulting in a {@link Buffer} with the decoded content. + * + * @param src the {@link Buffer} to decode + * @param offset the offset of the source to start reading from + * @param length the total length available for reading + * @param allocator the {@link BufferAllocator} to use for allocating auxiliary buffers or the returned buffer + * @return {@link Buffer} the result buffer with the content decoded + */ + Buffer decode(Buffer src, int offset, int length, BufferAllocator allocator); + + /** + * Take a {@link Publisher} of {@link Buffer} and encode its contents resulting in a + * {@link Publisher} of {@link Buffer} with the encoded contents. + * + * @param from the {@link Publisher} buffer to encode + * @param allocator the {@link BufferAllocator} to use for allocating auxiliary buffers or the returned buffer + * @return {@link Publisher} the result publisher with the buffers encoded + */ + Publisher encode(Publisher from, BufferAllocator allocator); + + /** + * Take a {@link Publisher} of {@link Buffer} and encode its contents resulting in a + * {@link Publisher} of {@link Buffer} with the decoded contents. + * + * @param from the {@link Publisher} to decoded + * @param allocator the {@link BufferAllocator} to use for allocating auxiliary buffers or the returned buffer + * @return {@link Publisher} the result publisher with the buffers decoded + */ + Publisher decode(Publisher from, BufferAllocator allocator); +} diff --git a/servicetalk-encoding-api/src/main/java/io/servicetalk/encoding/api/ContentCodecBuilder.java b/servicetalk-encoding-api/src/main/java/io/servicetalk/encoding/api/ContentCodecBuilder.java new file mode 100644 index 0000000000..669fedaa0e --- /dev/null +++ b/servicetalk-encoding-api/src/main/java/io/servicetalk/encoding/api/ContentCodecBuilder.java @@ -0,0 +1,39 @@ +/* + * Copyright © 2020 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.encoding.api; + +/** + * Builder for {@link ContentCodec}. + */ +public interface ContentCodecBuilder { + + /** + * Sets the maximum allowed decompressed payload size that the codec can process. + * This can help prevent malicious attempts to decode malformed payloads that can drain resources of the + * running instance. + * + * @param maxAllowedPayloadSize the maximum allowed payload size + * @return {@code this} + * @see Zip Bomb + */ + ContentCodecBuilder setMaxAllowedPayloadSize(int maxAllowedPayloadSize); + + /** + * Build and return an instance of the {@link ContentCodec} with the configuration of the builder. + * @return the {@link ContentCodec} with the configuration of the builder + */ + ContentCodec build(); +} diff --git a/servicetalk-encoding-api/src/main/java/io/servicetalk/encoding/api/ContentCodings.java b/servicetalk-encoding-api/src/main/java/io/servicetalk/encoding/api/ContentCodings.java new file mode 100644 index 0000000000..7c5174229b --- /dev/null +++ b/servicetalk-encoding-api/src/main/java/io/servicetalk/encoding/api/ContentCodings.java @@ -0,0 +1,78 @@ +/* + * Copyright © 2020 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.encoding.api; + +import io.servicetalk.encoding.api.DefaultContentCodecBuilder.DeflateContentCodecBuilder; +import io.servicetalk.encoding.api.DefaultContentCodecBuilder.GzipContentCodecBuilder; + +/** + * Common available encoding implementations. + */ +public final class ContentCodings { + + private static final ContentCodec IDENTITY = IdentityContentCodec.INSTANCE; + + private static final ContentCodec DEFAULT_GZIP = gzip().build(); + + private static final ContentCodec DEFAULT_DEFLATE = deflate().build(); + + private ContentCodings() { + } + + /** + * Returns the default, always supported 'identity' {@link ContentCodec}. + * @return the default, always supported 'identity' {@link ContentCodec} + */ + public static ContentCodec identity() { + return IDENTITY; + } + + /** + * Returns the default GZIP {@link ContentCodec}. + * @return default GZIP based {@link ContentCodec} + */ + public static ContentCodec gzipDefault() { + return DEFAULT_GZIP; + } + + /** + * Returns a GZIP based {@link ContentCodecBuilder} that allows building + * a customizable {@link ContentCodec}. + * @return a GZIP based {@link ContentCodecBuilder} that allows building + * a customizable GZIP {@link ContentCodec} + */ + public static ContentCodecBuilder gzip() { + return new GzipContentCodecBuilder(); + } + + /** + * Returns the default DEFLATE based {@link ContentCodec}. + * @return default DEFLATE based {@link ContentCodec} + */ + public static ContentCodec deflateDefault() { + return DEFAULT_DEFLATE; + } + + /** + * Returns a DEFLATE based {@link ContentCodecBuilder} that allows building + * a customizable {@link ContentCodec}. + * @return a DEFLATE based {@link ContentCodecBuilder} that allows building + * a customizable DEFLATE {@link ContentCodec} + */ + public static ContentCodecBuilder deflate() { + return new DeflateContentCodecBuilder(); + } +} diff --git a/servicetalk-encoding-api/src/main/java/io/servicetalk/encoding/api/DefaultContentCodecBuilder.java b/servicetalk-encoding-api/src/main/java/io/servicetalk/encoding/api/DefaultContentCodecBuilder.java new file mode 100644 index 0000000000..21b509918c --- /dev/null +++ b/servicetalk-encoding-api/src/main/java/io/servicetalk/encoding/api/DefaultContentCodecBuilder.java @@ -0,0 +1,52 @@ +/* + * Copyright © 2020 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.encoding.api; + +abstract class DefaultContentCodecBuilder implements ContentCodecBuilder { + + private static final int CHUNK_SIZE = 1 << 10; //1KiB + private static final int DEFAULT_MAX_ALLOWED_DECOMPRESSED_PAYLOAD = 16 << 20; //16MiB + + private int maxAllowedPayloadSize = DEFAULT_MAX_ALLOWED_DECOMPRESSED_PAYLOAD; + + protected int maxAllowedPayloadSize() { + return maxAllowedPayloadSize; + } + + @Override + public ContentCodecBuilder setMaxAllowedPayloadSize(final int maxAllowedPayloadSize) { + if (maxAllowedPayloadSize <= 0) { + throw new IllegalArgumentException("maxAllowedPayloadSize: " + maxAllowedPayloadSize + " (expected > 0)"); + } + + this.maxAllowedPayloadSize = maxAllowedPayloadSize; + return this; + } + + static final class GzipContentCodecBuilder extends DefaultContentCodecBuilder { + @Override + public ContentCodec build() { + return new GzipContentCodec(CHUNK_SIZE, maxAllowedPayloadSize()); + } + } + + static final class DeflateContentCodecBuilder extends DefaultContentCodecBuilder { + @Override + public ContentCodec build() { + return new DeflateContentCodec(CHUNK_SIZE, maxAllowedPayloadSize()); + } + } +} diff --git a/servicetalk-encoding-api/src/main/java/io/servicetalk/encoding/api/DeflateContentCodec.java b/servicetalk-encoding-api/src/main/java/io/servicetalk/encoding/api/DeflateContentCodec.java new file mode 100644 index 0000000000..796fb4a25d --- /dev/null +++ b/servicetalk-encoding-api/src/main/java/io/servicetalk/encoding/api/DeflateContentCodec.java @@ -0,0 +1,54 @@ +/* + * Copyright © 2020 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.encoding.api; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.zip.Deflater; +import java.util.zip.DeflaterOutputStream; +import java.util.zip.Inflater; +import java.util.zip.InflaterInputStream; + +final class DeflateContentCodec extends AbstractZipContentCodec { + + private static final CharSequence NAME = "deflate"; + + DeflateContentCodec(final int chunkSize, final int maxSize) { + super(NAME, chunkSize, maxSize); + } + + @Override + boolean supportsChecksum() { + return false; + } + + @Override + Inflater newRawInflater() { + return new Inflater(false); + } + + @Override + InflaterInputStream newInflaterInputStream(final InputStream in) { + return new InflaterInputStream(in); + } + + @Override + DeflaterOutputStream newDeflaterOutputStream(final OutputStream out) { + // TODO tk - Optimization, we could rely on the Deflater directly to avoid the intermediate + // copy on the stream buffer + return new DeflaterOutputStream(out, new Deflater(), chunkSize, true); + } +} diff --git a/servicetalk-encoding-api/src/main/java/io/servicetalk/encoding/api/GzipContentCodec.java b/servicetalk-encoding-api/src/main/java/io/servicetalk/encoding/api/GzipContentCodec.java new file mode 100644 index 0000000000..81befac43d --- /dev/null +++ b/servicetalk-encoding-api/src/main/java/io/servicetalk/encoding/api/GzipContentCodec.java @@ -0,0 +1,55 @@ +/* + * Copyright © 2020 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.encoding.api; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.zip.DeflaterOutputStream; +import java.util.zip.GZIPOutputStream; +import java.util.zip.Inflater; +import java.util.zip.InflaterInputStream; + +final class GzipContentCodec extends AbstractZipContentCodec { + + private static final CharSequence NAME = "gzip"; + + GzipContentCodec(final int chunkSize, final int maxSize) { + super(NAME, chunkSize, maxSize); + } + + @Override + boolean supportsChecksum() { + return true; + } + + @Override + Inflater newRawInflater() { + return new Inflater(true); + } + + @Override + InflaterInputStream newInflaterInputStream(final InputStream in) { + return new InflaterInputStream(in); + } + + @Override + DeflaterOutputStream newDeflaterOutputStream(final OutputStream out) throws IOException { + // TODO tk - Optimization, we could rely on the Deflater directly to avoid the intermediate + // copy on the stream buffer + return new GZIPOutputStream(out, chunkSize, true); + } +} diff --git a/servicetalk-encoding-api/src/main/java/io/servicetalk/encoding/api/IdentityContentCodec.java b/servicetalk-encoding-api/src/main/java/io/servicetalk/encoding/api/IdentityContentCodec.java new file mode 100644 index 0000000000..06553760e6 --- /dev/null +++ b/servicetalk-encoding-api/src/main/java/io/servicetalk/encoding/api/IdentityContentCodec.java @@ -0,0 +1,56 @@ +/* + * Copyright © 2020 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.encoding.api; + +import io.servicetalk.buffer.api.Buffer; +import io.servicetalk.buffer.api.BufferAllocator; +import io.servicetalk.concurrent.api.Publisher; + +/** + * NOOP Message encoding codec. + */ +final class IdentityContentCodec extends AbstractContentCodec { + + private static final CharSequence NAME = "identity"; + + public static final ContentCodec INSTANCE = new IdentityContentCodec(); + + private IdentityContentCodec() { + super(NAME); + } + + @Override + public Buffer encode(final Buffer src, final int offset, final int length, + final BufferAllocator allocator) { + return src; + } + + @Override + public Publisher encode(final Publisher from, + final BufferAllocator allocator) { + return from; + } + + @Override + public Buffer decode(final Buffer src, final int offset, final int length, final BufferAllocator allocator) { + return src; + } + + @Override + public Publisher decode(final Publisher from, final BufferAllocator allocator) { + return from; + } +} diff --git a/servicetalk-encoding-api/src/main/java/io/servicetalk/encoding/api/package-info.java b/servicetalk-encoding-api/src/main/java/io/servicetalk/encoding/api/package-info.java new file mode 100644 index 0000000000..a86799849f --- /dev/null +++ b/servicetalk-encoding-api/src/main/java/io/servicetalk/encoding/api/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright © 2020 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@ElementsAreNonnullByDefault +package io.servicetalk.encoding.api; + +import io.servicetalk.annotations.ElementsAreNonnullByDefault; diff --git a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/ZipGrpcMessageCodec.java b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/AbstractZipMessageCodec.java similarity index 86% rename from servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/ZipGrpcMessageCodec.java rename to servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/AbstractZipMessageCodec.java index d82c1fc15b..ae6183ca39 100644 --- a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/ZipGrpcMessageCodec.java +++ b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/AbstractZipMessageCodec.java @@ -33,21 +33,21 @@ import static io.servicetalk.buffer.api.Buffer.asOutputStream; import static java.lang.Math.min; -abstract class ZipGrpcMessageCodec implements GrpcMessageCodec { +abstract class AbstractZipMessageCodec implements MessageCodec { - private static final Logger LOGGER = LoggerFactory.getLogger(ZipGrpcMessageCodec.class); - private static final int ONE_KB = 1 << 10; + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractZipMessageCodec.class); + protected static final int ONE_KB = 1 << 10; - abstract DeflaterOutputStream newCodecOutputStream(OutputStream out) throws IOException; + abstract DeflaterOutputStream newDeflaterOutputStream(OutputStream out) throws IOException; - abstract InflaterInputStream newCodecInputStream(InputStream in) throws IOException; + abstract InflaterInputStream newInflaterInputStream(InputStream in) throws IOException; @Override public final Buffer encode(final Buffer src, final int offset, final int length, final BufferAllocator allocator) { final Buffer dst = allocator.newBuffer(ONE_KB); DeflaterOutputStream output = null; try { - output = newCodecOutputStream(asOutputStream(dst)); + output = newDeflaterOutputStream(asOutputStream(dst)); if (src.hasArray()) { output.write(src.array(), offset, length); @@ -74,7 +74,7 @@ public final Buffer decode(final Buffer src, final int offset, final int length, final Buffer dst = allocator.newBuffer(ONE_KB); InflaterInputStream input = null; try { - input = newCodecInputStream(asInputStream(src)); + input = newInflaterInputStream(asInputStream(src)); int read = dst.setBytesUntilEndStream(0, input, ONE_KB); dst.writerIndex(read); diff --git a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/DefaultGrpcClientMetadata.java b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/DefaultGrpcClientMetadata.java index 29be53ceba..c80cf9962a 100644 --- a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/DefaultGrpcClientMetadata.java +++ b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/DefaultGrpcClientMetadata.java @@ -17,7 +17,7 @@ import javax.annotation.Nullable; -import static io.servicetalk.grpc.api.GrpcMessageEncodings.none; +import static io.servicetalk.grpc.api.GrpcMessageEncodings.identity; /** * Default implementation for {@link DefaultGrpcClientMetadata}. @@ -51,7 +51,7 @@ protected DefaultGrpcClientMetadata(final String path, final GrpcMessageEncoding */ protected DefaultGrpcClientMetadata(final String path, @Nullable final GrpcExecutionStrategy strategy) { - this(path, strategy, none()); + this(path, strategy, identity()); } protected DefaultGrpcClientMetadata(final String path, diff --git a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/DefaultGrpcMessageEncoding.java b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/DefaultGrpcMessageEncoding.java index ea95dc2ae5..737d9e68e6 100644 --- a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/DefaultGrpcMessageEncoding.java +++ b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/DefaultGrpcMessageEncoding.java @@ -18,9 +18,9 @@ class DefaultGrpcMessageEncoding implements GrpcMessageEncoding { private final String encoding; - private final GrpcMessageCodec codec; + private final MessageCodec codec; - DefaultGrpcMessageEncoding(final String encoding, final GrpcMessageCodec messageCodec) { + DefaultGrpcMessageEncoding(final String encoding, final MessageCodec messageCodec) { this.encoding = encoding; this.codec = messageCodec; } @@ -31,7 +31,7 @@ public String name() { } @Override - public GrpcMessageCodec codec() { + public MessageCodec codec() { return codec; } diff --git a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/DeflateGrpcMessageCodec.java b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/DeflateMessageCodec.java similarity index 66% rename from servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/DeflateGrpcMessageCodec.java rename to servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/DeflateMessageCodec.java index 2322b1ab42..495483d9af 100644 --- a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/DeflateGrpcMessageCodec.java +++ b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/DeflateMessageCodec.java @@ -17,18 +17,21 @@ import java.io.InputStream; import java.io.OutputStream; +import java.util.zip.Deflater; import java.util.zip.DeflaterOutputStream; import java.util.zip.InflaterInputStream; -class DeflateGrpcMessageCodec extends ZipGrpcMessageCodec { +final class DeflateMessageCodec extends AbstractZipMessageCodec { @Override - DeflaterOutputStream newCodecOutputStream(final OutputStream out) { - return new DeflaterOutputStream(out); + DeflaterOutputStream newDeflaterOutputStream(final OutputStream out) { + // TODO tk - Optimization, we could rely on the Deflater directly to avoid the intermediate + // copy on the stream buffer + return new DeflaterOutputStream(out, new Deflater(), ONE_KB, true); } @Override - InflaterInputStream newCodecInputStream(final InputStream in) { + InflaterInputStream newInflaterInputStream(final InputStream in) { return new InflaterInputStream(in); } } diff --git a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcClientFactory.java b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcClientFactory.java index 380976d05b..2f6867992f 100644 --- a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcClientFactory.java +++ b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcClientFactory.java @@ -18,7 +18,7 @@ import java.util.Set; import javax.annotation.Nullable; -import static io.servicetalk.grpc.api.GrpcMessageEncodings.none; +import static io.servicetalk.grpc.api.GrpcMessageEncodings.identity; import static java.util.Collections.singleton; import static java.util.Collections.unmodifiableSet; import static java.util.Objects.requireNonNull; @@ -43,7 +43,7 @@ public abstract class GrpcClientFactory supportedEncodings = unmodifiableSet(singleton(none())); + private Set supportedEncodings = unmodifiableSet(singleton(identity())); /** * Create a new client that follows the specified gRPC @@ -107,7 +107,7 @@ final BlockingClient newBlockingClientForCallFactory(GrpcClientCallFactory clien /** * Sets the supported message encodings for this client factory. - * By default only {@link GrpcMessageEncodings#none()} is supported + * By default only {@link GrpcMessageEncodings#identity()} is supported * * @param supportedEncodings {@link GrpcMessageEncoding} supported encodings for this client. * @return {@code this} diff --git a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcMessageEncoding.java b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcMessageEncoding.java index 9d7638cd55..22e966214c 100644 --- a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcMessageEncoding.java +++ b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcMessageEncoding.java @@ -16,8 +16,8 @@ package io.servicetalk.grpc.api; /** - * Supported - * gRPC message encoding schemes. + * API for message + * coding schemes. */ public interface GrpcMessageEncoding { @@ -33,5 +33,5 @@ public interface GrpcMessageEncoding { * * @return a shared instance of the codec for that message-encoding */ - GrpcMessageCodec codec(); + MessageCodec codec(); } diff --git a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcMessageEncodings.java b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcMessageEncodings.java index 0e83e99687..e3d8eb8a2a 100644 --- a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcMessageEncodings.java +++ b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcMessageEncodings.java @@ -26,23 +26,23 @@ /** * Default available encoding implementations. - * Encoding {@link #none()} is always supported regardless of the client or server settings. + * Encoding {@link #identity()} is always supported regardless of the client or server settings. * * {@link #all()} is a set that includes all default encodings {@link #deflate()} and {@link #gzip()}. */ public final class GrpcMessageEncodings { - private static final GrpcMessageEncoding NONE = - new DefaultGrpcMessageEncoding("identity", new IdentityGrpcMessageCodec()); + private static final GrpcMessageEncoding IDENTITY = + new DefaultGrpcMessageEncoding("identity", new IdentityMessageCodec()); private static final GrpcMessageEncoding GZIP = - new DefaultGrpcMessageEncoding("gzip", new GzipGrpcMessageCodec()); + new DefaultGrpcMessageEncoding("gzip", new GzipMessageCodec()); private static final GrpcMessageEncoding DEFLATE = - new DefaultGrpcMessageEncoding("deflate", new DeflateGrpcMessageCodec()); + new DefaultGrpcMessageEncoding("deflate", new DeflateMessageCodec()); private static final Set ALL = - unmodifiableSet(new HashSet<>(asList(NONE, GZIP, DEFLATE))); + unmodifiableSet(new HashSet<>(asList(IDENTITY, GZIP, DEFLATE))); private GrpcMessageEncodings() { } @@ -51,8 +51,8 @@ private GrpcMessageEncodings() { * Returns the default, always supported 'identity' {@link GrpcMessageEncoding}. * @return the default, always supported 'identity' {@link GrpcMessageEncoding} */ - public static GrpcMessageEncoding none() { - return NONE; + public static GrpcMessageEncoding identity() { + return IDENTITY; } /** @@ -83,7 +83,7 @@ public static Set all() { * Returns a {@link GrpcMessageEncoding} that matches the {@code name}. * Returns {@code null} if {@code name} is {@code null} or empty. * If {@code name} is {@code 'identity'} this will always result in - * {@link GrpcMessageEncodings#NONE} regardless of its presence in the {@code allowedList}. + * {@link GrpcMessageEncodings#IDENTITY} regardless of its presence in the {@code allowedList}. * * @param allowedList the source list to find a matching encoding in * @param name the encoding name used for the matching predicate @@ -99,8 +99,8 @@ public static GrpcMessageEncoding encodingFor(final CollectiongRPC service. */ default Set supportedEncodings() { - return singleton(none()); + return singleton(identity()); } @Override diff --git a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcUtils.java b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcUtils.java index 9377d8ed57..8a29733fa8 100644 --- a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcUtils.java +++ b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcUtils.java @@ -47,7 +47,7 @@ import static io.servicetalk.concurrent.api.Publisher.empty; import static io.servicetalk.concurrent.api.Publisher.failed; import static io.servicetalk.grpc.api.GrpcMessageEncodings.encodingFor; -import static io.servicetalk.grpc.api.GrpcMessageEncodings.none; +import static io.servicetalk.grpc.api.GrpcMessageEncodings.identity; import static io.servicetalk.grpc.api.GrpcStatusCode.INTERNAL; import static io.servicetalk.grpc.api.GrpcStatusCode.UNIMPLEMENTED; import static io.servicetalk.http.api.CharSequences.newAsciiString; @@ -68,12 +68,12 @@ final class GrpcUtils { private static final CharSequence GRPC_STATUS_MESSAGE_TRAILER = newAsciiString("grpc-message"); // TODO (nkant): add project version private static final CharSequence GRPC_USER_AGENT = newAsciiString("grpc-service-talk/"); - private static final CharSequence IDENTITY = newAsciiString(none().name()); + private static final CharSequence IDENTITY = newAsciiString(identity().name()); private static final CharSequence GRPC_MESSAGE_ENCODING_KEY = newAsciiString("grpc-encoding"); private static final CharSequence GRPC_ACCEPT_ENCODING_KEY = newAsciiString("grpc-accept-encoding"); private static final CharSequence GRPC_STATUS_UNIMPLEMENTED = newAsciiString("grpc-status-unimplemented"); private static final GrpcStatus STATUS_OK = GrpcStatus.fromCodeValue(GrpcStatusCode.OK.value()); - private static final Set GRPC_ACCEPT_ENCODING_NONE = Collections.singleton(none()); + private static final Set GRPC_ACCEPT_ENCODING_NONE = Collections.singleton(identity()); private static final TrailersTransformer ENSURE_GRPC_STATUS_RECEIVED = new StatelessTrailersTransformer() { @Override @@ -245,7 +245,7 @@ static GrpcMessageEncoding readGrpcMessageEncoding(final HttpMetaData httpMetaDa final Set allowedEncodings) { final CharSequence encoding = httpMetaData.headers().get(GRPC_MESSAGE_ENCODING_KEY); if (encoding == null) { - return none(); + return identity(); } GrpcMessageEncoding enc = encodingFor(allowedEncodings, encoding.toString()); @@ -281,8 +281,8 @@ static GrpcMessageEncoding firstMatchingEncodingOrNone(final HttpMetaData httpMe final Set serverSupportedEncodings) { // Fast path, server has no encodings configured or has only None configured as encoding if (serverSupportedEncodings.isEmpty() || - (serverSupportedEncodings.size() == 1 && serverSupportedEncodings.contains(none()))) { - return none(); + (serverSupportedEncodings.size() == 1 && serverSupportedEncodings.contains(identity()))) { + return identity(); } Set clientSupportedEncodings = @@ -294,8 +294,8 @@ static GrpcMessageEncoding firstMatchingEncodingOrNone(final Set serverSupportedEncodings) { // Fast path, Client has no encodings configured, or has None as the only encoding configured if (clientSupportedEncodings == GRPC_ACCEPT_ENCODING_NONE || - (clientSupportedEncodings.size() == 1 && clientSupportedEncodings.contains(none()))) { - return none(); + (clientSupportedEncodings.size() == 1 && clientSupportedEncodings.contains(identity()))) { + return identity(); } /* @@ -307,12 +307,12 @@ static GrpcMessageEncoding firstMatchingEncodingOrNone(final Set encodings) { StringBuilder builder = new StringBuilder(); for (GrpcMessageEncoding enc : encodings) { - if (enc == none()) { + if (enc == identity()) { continue; } diff --git a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GzipGrpcMessageCodec.java b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GzipMessageCodec.java similarity index 69% rename from servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GzipGrpcMessageCodec.java rename to servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GzipMessageCodec.java index 5041a6cd98..3ff97cf94b 100644 --- a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GzipGrpcMessageCodec.java +++ b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GzipMessageCodec.java @@ -23,15 +23,17 @@ import java.util.zip.GZIPOutputStream; import java.util.zip.InflaterInputStream; -class GzipGrpcMessageCodec extends ZipGrpcMessageCodec { +final class GzipMessageCodec extends AbstractZipMessageCodec { @Override - InflaterInputStream newCodecInputStream(final InputStream in) throws IOException { + InflaterInputStream newInflaterInputStream(final InputStream in) throws IOException { return new GZIPInputStream(in); } @Override - DeflaterOutputStream newCodecOutputStream(final OutputStream out) throws IOException { - return new GZIPOutputStream(out); + DeflaterOutputStream newDeflaterOutputStream(final OutputStream out) throws IOException { + // TODO tk - Optimization, we could rely on the Deflater directly to avoid the intermediate + // copy on the stream buffer + return new GZIPOutputStream(out, ONE_KB, true); } } diff --git a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/IdentityGrpcMessageCodec.java b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/IdentityMessageCodec.java similarity index 91% rename from servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/IdentityGrpcMessageCodec.java rename to servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/IdentityMessageCodec.java index 78ec9668bf..bca1b29115 100644 --- a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/IdentityGrpcMessageCodec.java +++ b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/IdentityMessageCodec.java @@ -19,9 +19,9 @@ import io.servicetalk.buffer.api.BufferAllocator; /** - * NOOP Message encoding codec + * NOOP Message encoding codec. */ -final class IdentityGrpcMessageCodec implements GrpcMessageCodec { +final class IdentityMessageCodec implements MessageCodec { @Override public Buffer encode(final Buffer src, final int offset, int length, final BufferAllocator allocator) { diff --git a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcMessageCodec.java b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/MessageCodec.java similarity index 61% rename from servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcMessageCodec.java rename to servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/MessageCodec.java index 17062c3b21..fd7e5b483b 100644 --- a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcMessageCodec.java +++ b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/MessageCodec.java @@ -19,10 +19,21 @@ import io.servicetalk.buffer.api.BufferAllocator; /** - * Codec used to encode and decode gRPC messages. - * This instance is shared across all requests/responses therefore it must provide thread safety semantics. + * Codec used to encode and decode {@link Buffer} content. + * This instance is shared therefore it must provide thread safety semantics. */ -public interface GrpcMessageCodec { +public interface MessageCodec { + + /** + * Take a {@link Buffer} and encode its contents resulting in a {@link Buffer} with the encoded contents. + * + * @param src the {@link Buffer} to encode + * @param allocator the {@link BufferAllocator} to use for allocating auxiliary buffers or the returned buffer + * @return {@link Buffer} the result buffer with the content encoded + */ + default Buffer encode(Buffer src, BufferAllocator allocator) { + return encode(src, src.readerIndex(), src.readableBytes(), allocator); + } /** * Take a {@link Buffer} and encode its contents resulting in a {@link Buffer} with the encoded contents. @@ -35,6 +46,17 @@ public interface GrpcMessageCodec { */ Buffer encode(Buffer src, int offset, int length, BufferAllocator allocator); + /** + * Take a {@link Buffer} and decode its contents resulting in a {@link Buffer} with the decoded content. + * + * @param src the {@link Buffer} to decode + * @param allocator the {@link BufferAllocator} to use for allocating auxiliary buffers or the returned buffer + * @return {@link Buffer} the result buffer with the content decoded + */ + default Buffer decode(Buffer src, BufferAllocator allocator) { + return decode(src, src.readerIndex(), src.readableBytes(), allocator); + } + /** * Take a {@link Buffer} and decode its contents resulting in a {@link Buffer} with the decoded content. * diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcClientRequiresTrailersTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcClientRequiresTrailersTest.java index dc80564048..802b3aff71 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcClientRequiresTrailersTest.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcClientRequiresTrailersTest.java @@ -46,7 +46,7 @@ import static io.servicetalk.concurrent.api.Publisher.from; import static io.servicetalk.concurrent.api.Single.succeeded; import static io.servicetalk.grpc.api.GrpcExecutionStrategies.noOffloadsStrategy; -import static io.servicetalk.grpc.api.GrpcMessageEncodings.none; +import static io.servicetalk.grpc.api.GrpcMessageEncodings.identity; import static io.servicetalk.http.api.HttpApiConversions.toHttpService; import static io.servicetalk.http.api.HttpHeaderNames.CONTENT_TYPE; import static io.servicetalk.http.netty.HttpProtocolConfigs.h2Default; @@ -80,7 +80,7 @@ public GrpcClientRequiresTrailersTest(boolean streaming, boolean hasTrailers) th .version(request.version()) .setHeader(CONTENT_TYPE, "application/grpc") .payloadBody(from(TestResponse.newBuilder().setMessage("response").build()), - SERIALIZATION_PROVIDER.serializerFor(none(), TestResponse.class)); + SERIALIZATION_PROVIDER.serializerFor(identity(), TestResponse.class)); if (hasTrailers) { response.transform(new StatelessTrailersTransformer() { diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcClientValidatesContentTypeTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcClientValidatesContentTypeTest.java index 90f49640d9..979c9037c3 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcClientValidatesContentTypeTest.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcClientValidatesContentTypeTest.java @@ -45,7 +45,7 @@ import static io.servicetalk.concurrent.api.Publisher.from; import static io.servicetalk.concurrent.api.Single.succeeded; import static io.servicetalk.grpc.api.GrpcExecutionStrategies.noOffloadsStrategy; -import static io.servicetalk.grpc.api.GrpcMessageEncodings.none; +import static io.servicetalk.grpc.api.GrpcMessageEncodings.identity; import static io.servicetalk.grpc.api.GrpcStatusCode.OK; import static io.servicetalk.http.api.HttpApiConversions.toHttpService; import static io.servicetalk.http.api.HttpHeaderNames.CONTENT_TYPE; @@ -68,7 +68,7 @@ public final class GrpcClientValidatesContentTypeTest { new HttpSerializer() { final HttpSerializer delegate = SERIALIZATION_PROVIDER - .serializerFor(none(), TesterProto.TestResponse.class); + .serializerFor(identity(), TesterProto.TestResponse.class); @Override public Buffer serialize(final HttpHeaders headers, final TesterProto.TestResponse value, diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcMessageEncodingTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcMessageEncodingTest.java index a8719faa61..c68009470f 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcMessageEncodingTest.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcMessageEncodingTest.java @@ -20,12 +20,12 @@ import io.servicetalk.concurrent.api.Publisher; import io.servicetalk.concurrent.api.Single; import io.servicetalk.concurrent.internal.ServiceTalkTestTimeout; -import io.servicetalk.grpc.api.GrpcMessageCodec; import io.servicetalk.grpc.api.GrpcMessageEncoding; import io.servicetalk.grpc.api.GrpcServerBuilder; import io.servicetalk.grpc.api.GrpcServiceContext; import io.servicetalk.grpc.api.GrpcStatusCode; import io.servicetalk.grpc.api.GrpcStatusException; +import io.servicetalk.grpc.api.MessageCodec; import io.servicetalk.grpc.netty.TesterProto.TestRequest; import io.servicetalk.grpc.netty.TesterProto.TestResponse; import io.servicetalk.grpc.netty.TesterProto.Tester.TestRequestStreamMetadata; @@ -73,7 +73,7 @@ import static io.servicetalk.grpc.api.GrpcMessageEncodings.deflate; import static io.servicetalk.grpc.api.GrpcMessageEncodings.encodingFor; import static io.servicetalk.grpc.api.GrpcMessageEncodings.gzip; -import static io.servicetalk.grpc.api.GrpcMessageEncodings.none; +import static io.servicetalk.grpc.api.GrpcMessageEncodings.identity; import static io.servicetalk.grpc.netty.TesterProto.Tester.ClientFactory; import static io.servicetalk.grpc.netty.TesterProto.Tester.ServiceFactory; import static io.servicetalk.grpc.netty.TesterProto.Tester.TestBiDiStreamMetadata; @@ -111,8 +111,8 @@ public String name() { } @Override - public GrpcMessageCodec codec() { - return new GrpcMessageCodec() { + public MessageCodec codec() { + return new MessageCodec() { private static final int OUGHT_TO_BE_ENOUGH = 1 << 20; @Override @@ -185,7 +185,7 @@ public Single handle(final HttpServiceContext ctx, assertEquals(GZIP_MAGIC, actualHeader); } - if (reqEncoding != none()) { + if (reqEncoding != identity()) { assertTrue("Compressed content length should be less than the " + "original payload size", buffer.readableBytes() < PAYLOAD_SIZE); } else { @@ -193,7 +193,7 @@ public Single handle(final HttpServiceContext ctx, "original payload size", buffer.readableBytes() > PAYLOAD_SIZE); } - assertEquals(reqEncoding != none() ? 1 : 0, compressedFlag); + assertEquals(reqEncoding != identity() ? 1 : 0, compressedFlag); } catch (Throwable t) { t.printStackTrace(); throw t; @@ -208,7 +208,7 @@ public Single handle(final HttpServiceContext ctx, final List expectedReqAcceptedEncodings = (clientSupportedEncodings == null) ? emptyList() : clientSupportedEncodings.stream() - .filter((enc) -> enc != none()) + .filter((enc) -> enc != identity()) .map((GrpcMessageEncoding::name)) .collect(toList()); @@ -231,7 +231,7 @@ public Single handle(final HttpServiceContext ctx, final List expectedRespAcceptedEncodings = (serverSupportedEncodings == null) ? emptyList() : serverSupportedEncodings.stream() - .filter((enc) -> enc != none()) + .filter((enc) -> enc != identity()) .map((GrpcMessageEncoding::name)) .collect(toList()); @@ -243,12 +243,12 @@ public Single handle(final HttpServiceContext ctx, .get(MESSAGE_ENCODING, "identity").toString(); if (clientSupportedEncodings == null) { - assertEquals(none().name(), respEncName); + assertEquals(identity().name(), respEncName); } else if (serverSupportedEncodings == null) { - assertEquals(none().name(), respEncName); + assertEquals(identity().name(), respEncName); } else { if (disjoint(serverSupportedEncodings, clientSupportedEncodings)) { - assertEquals(none().name(), respEncName); + assertEquals(identity().name(), respEncName); } else { assertNotNull("Response encoding not in the client supported list " + "[" + clientSupportedEncodings + "]", @@ -265,20 +265,20 @@ public Single handle(final HttpServiceContext ctx, response.transformPayloadBody(bufferPublisher -> bufferPublisher.map((buffer -> { try { final GrpcMessageEncoding respEnc = - encodingFor(clientSupportedEncodings == null ? of(none()) : + encodingFor(clientSupportedEncodings == null ? of(identity()) : clientSupportedEncodings, valueOf(response.headers() .get(MESSAGE_ENCODING, "identity"))); if (buffer.readableBytes() > 0) { byte compressedFlag = buffer.getByte(0); - assertEquals(respEnc != none() ? 1 : 0, compressedFlag); + assertEquals(respEnc != identity() ? 1 : 0, compressedFlag); if (respEnc == gzip() || respEnc.name().equals(CUSTOM_ENCODING.name())) { int actualHeader = buffer.getShortLE(5) & 0xFFFF; assertEquals(GZIP_MAGIC, actualHeader); } - if (respEnc != none()) { + if (respEnc != identity()) { assertTrue("Compressed content length should be less than the original " + "payload size", buffer.readableBytes() < PAYLOAD_SIZE); } else { @@ -369,23 +369,23 @@ public GrpcMessageEncodingTest(final Set serverSupportedEnc "request-encoding={2} expected-success={3}") public static Object[][] params() { return new Object[][] { - {null, null, none(), true}, - {null, of(gzip(), none()), gzip(), false}, - {null, of(deflate(), none()), deflate(), false}, - {of(gzip(), deflate(), none()), null, none(), true}, - {of(none(), gzip(), deflate()), of(gzip(), none()), gzip(), true}, - {of(none(), gzip(), deflate()), of(deflate(), none()), deflate(), true}, - {of(none(), gzip()), of(deflate(), none()), deflate(), false}, - {of(none(), deflate()), of(gzip(), none()), gzip(), false}, - {of(none(), deflate()), of(deflate(), none()), deflate(), true}, - {of(none(), deflate()), null, none(), true}, - {of(gzip()), of(none()), none(), true}, - {of(gzip()), of(gzip(), none()), none(), true}, - {of(gzip()), of(gzip(), none()), none(), true}, - {of(gzip()), of(gzip(), none()), gzip(), true}, - {null, of(gzip(), none()), gzip(), false}, - {null, of(gzip(), deflate(), none()), deflate(), false}, - {null, of(gzip(), none()), none(), true}, + {null, null, identity(), true}, + {null, of(gzip(), identity()), gzip(), false}, + {null, of(deflate(), identity()), deflate(), false}, + {of(gzip(), deflate(), identity()), null, identity(), true}, + {of(identity(), gzip(), deflate()), of(gzip(), identity()), gzip(), true}, + {of(identity(), gzip(), deflate()), of(deflate(), identity()), deflate(), true}, + {of(identity(), gzip()), of(deflate(), identity()), deflate(), false}, + {of(identity(), deflate()), of(gzip(), identity()), gzip(), false}, + {of(identity(), deflate()), of(deflate(), identity()), deflate(), true}, + {of(identity(), deflate()), null, identity(), true}, + {of(gzip()), of(identity()), identity(), true}, + {of(gzip()), of(gzip(), identity()), identity(), true}, + {of(gzip()), of(gzip(), identity()), identity(), true}, + {of(gzip()), of(gzip(), identity()), gzip(), true}, + {null, of(gzip(), identity()), gzip(), false}, + {null, of(gzip(), deflate(), identity()), deflate(), false}, + {null, of(gzip(), identity()), identity(), true}, {of(CUSTOM_ENCODING), of(CUSTOM_ENCODING), CUSTOM_ENCODING, true}, }; } diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcServiceContextProtocolTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcServiceContextProtocolTest.java index f0609e2c06..a3412b5654 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcServiceContextProtocolTest.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcServiceContextProtocolTest.java @@ -41,6 +41,7 @@ import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; +import java.util.Arrays; import javax.annotation.Nullable; import static io.servicetalk.concurrent.api.Publisher.from; @@ -112,7 +113,7 @@ public void testAggregated() throws Exception { @Test public void testRequestStream() throws Exception { - assertResponse(client.testRequestStream(singleton(newRequest()))); + assertResponse(client.testRequestStream(Arrays.asList(newRequest(), newRequest()))); } @Test diff --git a/servicetalk-grpc-protobuf/src/main/java/io/servicetalk/grpc/protobuf/ProtoBufSerializationProvider.java b/servicetalk-grpc-protobuf/src/main/java/io/servicetalk/grpc/protobuf/ProtoBufSerializationProvider.java index 970b9119e1..f9d1a2060c 100644 --- a/servicetalk-grpc-protobuf/src/main/java/io/servicetalk/grpc/protobuf/ProtoBufSerializationProvider.java +++ b/servicetalk-grpc-protobuf/src/main/java/io/servicetalk/grpc/protobuf/ProtoBufSerializationProvider.java @@ -17,8 +17,8 @@ import io.servicetalk.buffer.api.Buffer; import io.servicetalk.buffer.api.CompositeBuffer; -import io.servicetalk.grpc.api.GrpcMessageCodec; import io.servicetalk.grpc.api.GrpcMessageEncoding; +import io.servicetalk.grpc.api.MessageCodec; import io.servicetalk.serialization.api.SerializationException; import io.servicetalk.serialization.api.SerializationProvider; import io.servicetalk.serialization.api.StreamingDeserializer; @@ -40,7 +40,7 @@ import static com.google.protobuf.CodedOutputStream.newInstance; import static com.google.protobuf.UnsafeByteOperations.unsafeWrap; import static io.servicetalk.buffer.netty.BufferAllocators.DEFAULT_ALLOCATOR; -import static io.servicetalk.grpc.api.GrpcMessageEncodings.none; +import static io.servicetalk.grpc.api.GrpcMessageEncodings.identity; import static java.lang.Math.max; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; @@ -107,7 +107,7 @@ private static boolean isCompressed(Buffer buffer) throws SerializationException private static final class ProtoDeserializer implements StreamingDeserializer { private final Parser parser; private final CompositeBuffer accumulate; - private final GrpcMessageCodec encoder; + private final MessageCodec encoder; /** *
    *
  • {@code < 0} - read Length-Prefixed-Message header
  • @@ -273,12 +273,12 @@ private Iterable addToAccumulateIfRequiredAndReturn(final Buffer toDeserializ private static final class ProtoSerializer implements StreamingSerializer { - private final GrpcMessageCodec encoder; + private final MessageCodec encoder; private final boolean encode; ProtoSerializer(final GrpcMessageEncoding encoding) { this.encoder = encoding.codec(); - this.encode = encoding != none(); + this.encode = encoding != identity(); } @Override diff --git a/servicetalk-grpc-protobuf/src/main/java/io/servicetalk/grpc/protobuf/ProtoBufSerializationProviderBuilder.java b/servicetalk-grpc-protobuf/src/main/java/io/servicetalk/grpc/protobuf/ProtoBufSerializationProviderBuilder.java index 41ac73b6fe..7edff53442 100644 --- a/servicetalk-grpc-protobuf/src/main/java/io/servicetalk/grpc/protobuf/ProtoBufSerializationProviderBuilder.java +++ b/servicetalk-grpc-protobuf/src/main/java/io/servicetalk/grpc/protobuf/ProtoBufSerializationProviderBuilder.java @@ -40,7 +40,7 @@ import java.util.Map; import java.util.Set; -import static io.servicetalk.grpc.api.GrpcMessageEncodings.none; +import static io.servicetalk.grpc.api.GrpcMessageEncodings.identity; import static io.servicetalk.http.api.CharSequences.newAsciiString; import static io.servicetalk.http.api.HttpHeaderNames.CONTENT_TYPE; import static java.util.Arrays.asList; @@ -60,12 +60,12 @@ public final class ProtoBufSerializationProviderBuilder { private final Map> serializers = new HashMap<>(); private final Map> deserializers = new HashMap<>(); - private final Set supportedEncodings = new HashSet<>(asList(none())); + private final Set supportedEncodings = new HashSet<>(asList(identity())); /** * Set the supported message encodings for the serializers and deserializers. * The encodings will be advertised on the endpoint's headers and also used to validate each encoded message - * {@link GrpcMessageEncodings#none()} is always supported regardless of the config passed + * {@link GrpcMessageEncodings#identity()} is always supported regardless of the config passed * * @param supportedEncodings the set of allowed encodings * @param Type of {@link MessageLite} to register. @@ -75,7 +75,7 @@ public final class ProtoBufSerializationProviderBuilder { supportedMessageEncodings(final Set supportedEncodings) { this.supportedEncodings.clear(); this.supportedEncodings.addAll(supportedEncodings); - this.supportedEncodings.add(none()); // Always supported + this.supportedEncodings.add(identity()); // Always supported return this; } @@ -183,7 +183,6 @@ private static final class ProtoHttpSerializer implements private final Serializer serializer; private final GrpcMessageEncoding grpcMessageEncoding; private final Class type; - ProtoHttpSerializer(final Serializer serializer, final GrpcMessageEncoding grpcMessageEncoding, final Class type) { this.serializer = serializer; diff --git a/servicetalk-grpc-protobuf/src/test/java/io/servicetalk/grpc/protobuf/ProtoDeserializerTest.java b/servicetalk-grpc-protobuf/src/test/java/io/servicetalk/grpc/protobuf/ProtoDeserializerTest.java index adeec80ed5..45b146574c 100644 --- a/servicetalk-grpc-protobuf/src/test/java/io/servicetalk/grpc/protobuf/ProtoDeserializerTest.java +++ b/servicetalk-grpc-protobuf/src/test/java/io/servicetalk/grpc/protobuf/ProtoDeserializerTest.java @@ -28,7 +28,7 @@ import java.util.function.Function; import static io.servicetalk.buffer.netty.BufferAllocators.DEFAULT_ALLOCATOR; -import static io.servicetalk.grpc.api.GrpcMessageEncodings.none; +import static io.servicetalk.grpc.api.GrpcMessageEncodings.identity; import static io.servicetalk.grpc.protobuf.test.TestProtos.DummyMessage; import static java.util.Arrays.asList; import static java.util.stream.Collectors.toList; @@ -40,7 +40,7 @@ public class ProtoDeserializerTest { private final Parser parser = DummyMessage.parser(); private final ProtoBufSerializationProvider serializationProvider = - new ProtoBufSerializationProvider<>(DummyMessage.class, none(), parser); + new ProtoBufSerializationProvider<>(DummyMessage.class, identity(), parser); @Test public void zeroLengthMessageAligned() throws IOException { diff --git a/servicetalk-http-api/build.gradle b/servicetalk-http-api/build.gradle index b68399db5d..eca3519071 100644 --- a/servicetalk-http-api/build.gradle +++ b/servicetalk-http-api/build.gradle @@ -24,6 +24,7 @@ dependencies { api project(":servicetalk-serialization-api") api project(":servicetalk-transport-api") api project(":servicetalk-oio-api") + api project(":servicetalk-encoding-api") implementation project(":servicetalk-annotations") implementation project(":servicetalk-client-api-internal") diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/AbstractDelegatingHttpRequest.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/AbstractDelegatingHttpRequest.java index 14939c6513..39cdaecb11 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/AbstractDelegatingHttpRequest.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/AbstractDelegatingHttpRequest.java @@ -15,6 +15,7 @@ */ package io.servicetalk.http.api; +import io.servicetalk.encoding.api.ContentCodec; import io.servicetalk.transport.api.HostAndPort; import java.nio.charset.Charset; @@ -88,6 +89,11 @@ public HttpHeaders headers() { return original.headers(); } + @Override + public ContentCodec encoding() { + return original.encoding(); + } + @Override public String toString() { return original.toString(); diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/AbstractDelegatingHttpResponse.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/AbstractDelegatingHttpResponse.java index cda7d730aa..d746487d74 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/AbstractDelegatingHttpResponse.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/AbstractDelegatingHttpResponse.java @@ -15,6 +15,8 @@ */ package io.servicetalk.http.api; +import io.servicetalk.encoding.api.ContentCodec; + abstract class AbstractDelegatingHttpResponse implements HttpResponseMetaData, PayloadInfo { final DefaultStreamingHttpResponse original; @@ -28,6 +30,11 @@ public HttpProtocolVersion version() { return original.version(); } + @Override + public ContentCodec encoding() { + return original.encoding(); + } + @Override public HttpHeaders headers() { return original.headers(); diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/AbstractHttpMetaData.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/AbstractHttpMetaData.java index ab99abc0e9..c163b1a0fe 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/AbstractHttpMetaData.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/AbstractHttpMetaData.java @@ -15,12 +15,18 @@ */ package io.servicetalk.http.api; +import io.servicetalk.encoding.api.ContentCodec; + +import javax.annotation.Nullable; + import static java.util.Objects.requireNonNull; /** * Abstract base class for {@link HttpMetaData}. */ abstract class AbstractHttpMetaData implements HttpMetaData { + @Nullable + private ContentCodec encoding; private HttpProtocolVersion version; private final HttpHeaders headers; @@ -29,8 +35,15 @@ abstract class AbstractHttpMetaData implements HttpMetaData { this.headers = requireNonNull(headers); } + AbstractHttpMetaData(final HttpProtocolVersion version, final HttpHeaders headers, + @Nullable final ContentCodec encoding) { + this.version = requireNonNull(version); + this.headers = requireNonNull(headers); + this.encoding = encoding; + } + AbstractHttpMetaData(final AbstractHttpMetaData metaData) { - this(metaData.version, metaData.headers); + this(metaData.version, metaData.headers, metaData.encoding); } @Override @@ -44,6 +57,17 @@ public HttpMetaData version(final HttpProtocolVersion version) { return this; } + @Override + public HttpMetaData encoding(final ContentCodec encoding) { + this.encoding = requireNonNull(encoding); + return this; + } + + @Override + public ContentCodec encoding() { + return encoding; + } + @Override public final HttpHeaders headers() { return headers; diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/BlockingStreamingHttpRequest.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/BlockingStreamingHttpRequest.java index 0df77e2db0..7f8940112b 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/BlockingStreamingHttpRequest.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/BlockingStreamingHttpRequest.java @@ -21,6 +21,7 @@ import io.servicetalk.concurrent.api.Publisher; import io.servicetalk.concurrent.api.Single; import io.servicetalk.concurrent.api.internal.CloseableIteratorBufferAsInputStream; +import io.servicetalk.encoding.api.ContentCodec; import java.io.InputStream; import java.nio.charset.Charset; @@ -247,6 +248,9 @@ default BlockingStreamingHttpRequest transformPayloadBody( @Override BlockingStreamingHttpRequest version(HttpProtocolVersion version); + @Override + BlockingStreamingHttpRequest encoding(ContentCodec encoding); + @Override BlockingStreamingHttpRequest method(HttpRequestMethod method); diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/CharSequences.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/CharSequences.java index 171e9062df..a22d96c319 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/CharSequences.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/CharSequences.java @@ -87,7 +87,6 @@ public static List split(final CharSequence input, final char deli if (input.length() == 0) { return emptyList(); } - int startIndex = 0; List result = new ArrayList<>(); for (int i = 0; i < input.length(); i++) { diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/ContentCodingHttpRequesterFilter.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/ContentCodingHttpRequesterFilter.java new file mode 100644 index 0000000000..5000c72e2b --- /dev/null +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/ContentCodingHttpRequesterFilter.java @@ -0,0 +1,140 @@ +/* + * Copyright © 2020 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.http.api; + +import io.servicetalk.buffer.api.BufferAllocator; +import io.servicetalk.concurrent.api.Single; +import io.servicetalk.encoding.api.ContentCodec; + +import java.util.ArrayList; +import java.util.List; +import javax.annotation.Nullable; + +import static io.servicetalk.encoding.api.ContentCodings.identity; +import static io.servicetalk.http.api.CharSequences.newAsciiString; +import static io.servicetalk.http.api.HeaderUtils.identifyContentEncodingOrNullIfIdentity; +import static io.servicetalk.http.api.HeaderUtils.setAcceptEncoding; +import static io.servicetalk.http.api.HeaderUtils.setContentEncoding; + +/** + * A {@link StreamingHttpClientFilter} that adds encoding / decoding functionality for requests and responses + * respectively, as these are specified by the spec + * Content-Encoding. + * + *

    + * Append this filter before others that are expected to to see compressed content for this request/response, and after + * other filters that expect to manipulate the original payload. + */ +public final class ContentCodingHttpRequesterFilter + implements StreamingHttpClientFilterFactory, StreamingHttpConnectionFilterFactory, + HttpExecutionStrategyInfluencer { + + private final List supportedCodings; + @Nullable + private final CharSequence acceptedEncodingsHeader; + + /** + * Enable support of the provided encodings for requests and responses. + * The order of the codecs provided, matters for the presentation of the header, and may affect selection priority + * on the receiving endpoint. + * + * @param supportedCodings the codecs this clients supports to encode/decode requests and responses accordingly + * and also used to advertise to the server. + */ + public ContentCodingHttpRequesterFilter(final List supportedCodings) { + this.supportedCodings = new ArrayList<>(supportedCodings); + this.acceptedEncodingsHeader = buildAcceptEncodingsHeader(supportedCodings); + } + + @Override + public StreamingHttpClientFilter create(final FilterableStreamingHttpClient client) { + return new StreamingHttpClientFilter(client) { + @Override + protected Single request(final StreamingHttpRequester delegate, + final HttpExecutionStrategy strategy, + final StreamingHttpRequest request) { + return Single.defer(() -> codecTransformBidirectionalIfNeeded(delegate(), strategy, request)); + } + }; + } + + @Override + public StreamingHttpConnectionFilter create(final FilterableStreamingHttpConnection connection) { + return new StreamingHttpConnectionFilter(connection) { + @Override + public Single request(final HttpExecutionStrategy strategy, + final StreamingHttpRequest request) { + return Single.defer(() -> codecTransformBidirectionalIfNeeded(delegate(), strategy, request)); + } + }; + } + + @Override + public HttpExecutionStrategy influenceStrategy(final HttpExecutionStrategy strategy) { + // No influence since we do not block. + return strategy; + } + + private Single codecTransformBidirectionalIfNeeded(final StreamingHttpRequester delegate, + final HttpExecutionStrategy strategy, + final StreamingHttpRequest request) { + final BufferAllocator alloc = delegate.executionContext().bufferAllocator(); + setAcceptEncoding(request.headers(), acceptedEncodingsHeader); + encodePayloadContentIfAvailable(request, alloc); + + return decodePayloadContentIfEncoded(delegate.request(strategy, request), alloc); + } + + private Single decodePayloadContentIfEncoded( + final Single responseSingle, final BufferAllocator allocator) { + + return responseSingle.map(response -> { + ContentCodec coding = identifyContentEncodingOrNullIfIdentity(response.headers(), supportedCodings); + if (coding != null) { + response.transformPayloadBody(bufferPublisher -> coding.decode(bufferPublisher, allocator)); + } + + return response; + }); + } + + @Nullable + private static CharSequence buildAcceptEncodingsHeader(final List codecs) { + StringBuilder builder = new StringBuilder(); + for (ContentCodec enc : codecs) { + if (enc == identity()) { + continue; + } + + if (builder.length() > 0) { + builder.append(", "); + } + + builder.append(enc.name()); + } + + return builder.length() > 0 ? newAsciiString(builder.toString()) : null; + } + + private static void encodePayloadContentIfAvailable(final StreamingHttpRequest request, + final BufferAllocator allocator) { + ContentCodec coding = request.encoding(); + if (coding != null && !coding.equals(identity())) { + setContentEncoding(request.headers(), coding.name()); + request.transformPayloadBody(pub -> coding.encode(pub, allocator)); + } + } +} diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/ContentCodingHttpServiceFilter.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/ContentCodingHttpServiceFilter.java new file mode 100644 index 0000000000..fcaccd0a0e --- /dev/null +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/ContentCodingHttpServiceFilter.java @@ -0,0 +1,155 @@ +/* + * Copyright © 2020 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.http.api; + +import io.servicetalk.buffer.api.BufferAllocator; +import io.servicetalk.concurrent.api.Single; +import io.servicetalk.encoding.api.ContentCodec; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import javax.annotation.Nullable; + +import static io.servicetalk.concurrent.api.Single.succeeded; +import static io.servicetalk.encoding.api.ContentCodings.identity; +import static io.servicetalk.http.api.HeaderUtils.hasContentEncoding; +import static io.servicetalk.http.api.HeaderUtils.identifyContentEncodingOrNullIfIdentity; +import static io.servicetalk.http.api.HeaderUtils.negotiateAcceptedEncoding; +import static io.servicetalk.http.api.HeaderUtils.setContentEncoding; +import static java.util.Collections.emptyList; + +/** + * A {@link StreamingHttpService} that adds encoding / decoding functionality for responses and requests respectively, + * as these are specified by the spec + * Content-Encoding. + * + *

    + * Append this filter before others that are expected to to see compressed content for this request/response, and after + * other filters that expect to see/manipulate the original payload. + */ +public final class ContentCodingHttpServiceFilter + implements StreamingHttpServiceFilterFactory, HttpExecutionStrategyInfluencer { + + private static final Logger LOGGER = LoggerFactory.getLogger(ContentCodingHttpServiceFilter.class); + + private final List requestCodings; + private final List responseCodings; + + /** + * Enable support of the provided encodings for this server's responses. + * The encodings will be used for server responses compression where enabled and matched with client ones. + *

    + * Client requests that have compressed payloads will be rejected. + * To enable support of compressed requests, see {@link #ContentCodingHttpServiceFilter(List, List)}. + * + * @param supportedCodings the codecs used to compress responses when allowed. + */ + public ContentCodingHttpServiceFilter(final List supportedCodings) { + this.requestCodings = emptyList(); + this.responseCodings = new ArrayList<>(supportedCodings); + } + + /** + * Enable support of the provided encodings for both client requests and server responses. + * The encodings can differ for requests and responses, allowing a server that supports different compressions for + * requests and different ones for responses. + *

    + * To disable support of compressed requests use an {@link Collections#emptyList()} for the + * supportedRequestCodings param or use {@link #ContentCodingHttpServiceFilter(List)} constructor + * instead. + *

    + * The order of the codecs provided, affect selection priority alongside the order of the incoming + * accept-encoding header from the client. + * + * @param supportedRequestCodings the codecs used to decompress client requests if compressed. + * @param supportedResponseCodings the codecs used to compress server responses if client accepts them. + */ + public ContentCodingHttpServiceFilter(final List supportedRequestCodings, + final List supportedResponseCodings) { + this.requestCodings = new ArrayList<>(supportedRequestCodings); + this.responseCodings = new ArrayList<>(supportedResponseCodings); + } + + @Override + public StreamingHttpServiceFilter create(final StreamingHttpService service) { + return new StreamingHttpServiceFilter(service) { + @Override + public Single handle(final HttpServiceContext ctx, + final StreamingHttpRequest request, + final StreamingHttpResponseFactory responseFactory) { + + return Single.defer(() -> { + BufferAllocator allocator = ctx.executionContext().bufferAllocator(); + try { + ContentCodec coding = + identifyContentEncodingOrNullIfIdentity(request.headers(), requestCodings); + if (coding != null) { + request.transformPayloadBody(bufferPublisher -> coding.decode(bufferPublisher, allocator)); + } + + return super.handle(ctx, request, responseFactory).map(response -> { + encodePayloadContentIfAvailable(request.headers(), responseCodings, response, allocator); + return response; + }); + } catch (UnsupportedContentEncodingException cause) { + LOGGER.error("Request failed for service={}, connection={}", service, this, cause); + // see https://tools.ietf.org/html/rfc7231#section-3.1.2.2 + return succeeded(responseFactory.unsupportedMediaType()); + } + }); + } + }; + } + + @Override + public HttpExecutionStrategy influenceStrategy(final HttpExecutionStrategy strategy) { + // No influence - no blocking + return strategy; + } + + private static void encodePayloadContentIfAvailable(final HttpHeaders requestHeaders, + final List supportedEncodings, + final StreamingHttpResponse response, + final BufferAllocator allocator) { + if (supportedEncodings.isEmpty() || hasContentEncoding(response.headers())) { + return; + } + + ContentCodec coding = codingForResponse(requestHeaders, response, supportedEncodings); + if (coding != null) { + setContentEncoding(response.headers(), coding.name()); + response.transformPayloadBody(bufferPublisher -> coding.encode(bufferPublisher, allocator)); + } + } + + @Nullable + private static ContentCodec codingForResponse(final HttpHeaders requestHeaders, + final StreamingHttpResponse response, + final List supportedEncodings) { + // Enforced selection + ContentCodec encoding = response.encoding(); + if (encoding == null) { + // Negotiated from client headers and server config + encoding = negotiateAcceptedEncoding(requestHeaders, supportedEncodings); + } + + return encoding == identity() ? null : encoding; + } +} diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultBlockingStreamingHttpRequest.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultBlockingStreamingHttpRequest.java index f6a159d8a6..458c406ad6 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultBlockingStreamingHttpRequest.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultBlockingStreamingHttpRequest.java @@ -19,6 +19,7 @@ import io.servicetalk.concurrent.BlockingIterable; import io.servicetalk.concurrent.CloseableIterable; import io.servicetalk.concurrent.api.Single; +import io.servicetalk.encoding.api.ContentCodec; import java.io.InputStream; import java.nio.charset.Charset; @@ -42,6 +43,12 @@ public BlockingStreamingHttpRequest version(final HttpProtocolVersion version) { return this; } + @Override + public BlockingStreamingHttpRequest encoding(final ContentCodec encoding) { + original.encoding(encoding); + return this; + } + @Override public BlockingStreamingHttpRequest method(final HttpRequestMethod method) { original.method(method); diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultBlockingStreamingHttpResponse.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultBlockingStreamingHttpResponse.java index 57655ad6a4..4c65072d89 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultBlockingStreamingHttpResponse.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultBlockingStreamingHttpResponse.java @@ -19,6 +19,7 @@ import io.servicetalk.concurrent.BlockingIterable; import io.servicetalk.concurrent.CloseableIterable; import io.servicetalk.concurrent.api.Single; +import io.servicetalk.encoding.api.ContentCodec; import java.io.InputStream; import java.util.function.Function; @@ -122,6 +123,12 @@ public BlockingStreamingHttpResponse version(final HttpProtocolVersion version) return this; } + @Override + public BlockingStreamingHttpResponse encoding(final ContentCodec encoding) { + original.encoding(encoding); + return this; + } + @Override public BlockingStreamingHttpResponse status(final HttpResponseStatus status) { original.status(status); diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultHttpRequest.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultHttpRequest.java index 048cdfc0d8..5f2de4286e 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultHttpRequest.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultHttpRequest.java @@ -17,6 +17,7 @@ import io.servicetalk.buffer.api.Buffer; import io.servicetalk.concurrent.api.Publisher; +import io.servicetalk.encoding.api.ContentCodec; import java.nio.charset.Charset; import javax.annotation.Nullable; @@ -43,6 +44,12 @@ public HttpRequest version(final HttpProtocolVersion version) { return this; } + @Override + public HttpRequest encoding(final ContentCodec encoding) { + original.encoding(encoding); + return this; + } + @Override public HttpRequest method(final HttpRequestMethod method) { original.method(method); @@ -227,7 +234,7 @@ public HttpHeaders catchPayloadFailure(final Object __, final Throwable cause, f @Override public StreamingHttpRequest toStreamingRequest() { Publisher payload = trailers != null ? from(payloadBody, trailers) : from(payloadBody); - return new DefaultStreamingHttpRequest(method(), requestTarget(), version(), headers(), + return new DefaultStreamingHttpRequest(method(), requestTarget(), version(), headers(), encoding(), original.payloadHolder().allocator(), payload, new DefaultPayloadInfo(this), original.payloadHolder().headersFactory()); } diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultHttpRequestMetaData.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultHttpRequestMetaData.java index 6f6d1b3a44..a75964cd14 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultHttpRequestMetaData.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultHttpRequestMetaData.java @@ -15,6 +15,7 @@ */ package io.servicetalk.http.api; +import io.servicetalk.encoding.api.ContentCodec; import io.servicetalk.transport.api.HostAndPort; import java.nio.charset.Charset; @@ -69,6 +70,12 @@ public HttpRequestMetaData version(final HttpProtocolVersion version) { return this; } + @Override + public HttpMetaData encoding(final ContentCodec encoding) { + super.encoding(encoding); + return this; + } + @Override public final HttpRequestMethod method() { return method; diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultHttpResponse.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultHttpResponse.java index 489f1e362c..e188c32564 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultHttpResponse.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultHttpResponse.java @@ -17,6 +17,7 @@ import io.servicetalk.buffer.api.Buffer; import io.servicetalk.concurrent.api.Publisher; +import io.servicetalk.encoding.api.ContentCodec; import javax.annotation.Nullable; @@ -42,6 +43,12 @@ public HttpResponse version(final HttpProtocolVersion version) { return this; } + @Override + public HttpResponse encoding(final ContentCodec encoding) { + original.encoding(encoding); + return this; + } + @Override public HttpResponse status(final HttpResponseStatus status) { original.status(status); diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultStreamingHttpRequest.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultStreamingHttpRequest.java index a525af87cf..161b3098fb 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultStreamingHttpRequest.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultStreamingHttpRequest.java @@ -19,6 +19,7 @@ import io.servicetalk.buffer.api.BufferAllocator; import io.servicetalk.concurrent.api.Publisher; import io.servicetalk.concurrent.api.Single; +import io.servicetalk.encoding.api.ContentCodec; import java.nio.charset.Charset; import java.util.function.Function; @@ -32,10 +33,13 @@ final class DefaultStreamingHttpRequest extends DefaultHttpRequestMetaData DefaultStreamingHttpRequest(final HttpRequestMethod method, final String requestTarget, final HttpProtocolVersion version, final HttpHeaders headers, - final BufferAllocator allocator, @Nullable final Publisher payloadBody, - final DefaultPayloadInfo payloadInfo, + @Nullable final ContentCodec encoding, final BufferAllocator allocator, + @Nullable final Publisher payloadBody, final DefaultPayloadInfo payloadInfo, final HttpHeadersFactory headersFactory) { super(method, requestTarget, version, headers); + if (encoding != null) { + encoding(encoding); + } payloadHolder = new StreamingHttpPayloadHolder(headers, allocator, payloadBody, payloadInfo, headersFactory, version); } @@ -46,6 +50,12 @@ public StreamingHttpRequest version(final HttpProtocolVersion version) { return this; } + @Override + public StreamingHttpRequest encoding(final ContentCodec encoding) { + super.encoding(encoding); + return this; + } + @Override public StreamingHttpRequest method(final HttpRequestMethod method) { super.method(method); diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultStreamingHttpResponse.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultStreamingHttpResponse.java index 5d7f501bac..beac6c0963 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultStreamingHttpResponse.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultStreamingHttpResponse.java @@ -19,6 +19,7 @@ import io.servicetalk.buffer.api.BufferAllocator; import io.servicetalk.concurrent.api.Publisher; import io.servicetalk.concurrent.api.Single; +import io.servicetalk.encoding.api.ContentCodec; import java.util.function.Function; import java.util.function.UnaryOperator; @@ -50,6 +51,12 @@ public StreamingHttpResponse status(final HttpResponseStatus status) { return this; } + @Override + public StreamingHttpResponse encoding(final ContentCodec encoding) { + super.encoding(encoding); + return this; + } + @Override public Publisher payloadBody() { return payloadHolder.payloadBody(); diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HeaderUtils.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HeaderUtils.java index 80621931fb..73329bb502 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HeaderUtils.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HeaderUtils.java @@ -16,9 +16,13 @@ package io.servicetalk.http.api; import io.servicetalk.buffer.api.ByteProcessor; +import io.servicetalk.encoding.api.ContentCodec; +import io.servicetalk.encoding.api.ContentCodings; import io.servicetalk.serialization.api.SerializationException; import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -28,14 +32,19 @@ import java.util.regex.Pattern; import javax.annotation.Nullable; +import static io.servicetalk.encoding.api.ContentCodings.identity; import static io.servicetalk.http.api.CharSequences.caseInsensitiveHashCode; import static io.servicetalk.http.api.CharSequences.contentEquals; import static io.servicetalk.http.api.CharSequences.contentEqualsIgnoreCase; import static io.servicetalk.http.api.CharSequences.indexOf; import static io.servicetalk.http.api.CharSequences.regionMatches; +import static io.servicetalk.http.api.CharSequences.split; +import static io.servicetalk.http.api.HttpHeaderNames.ACCEPT_ENCODING; +import static io.servicetalk.http.api.HttpHeaderNames.CONTENT_ENCODING; import static io.servicetalk.http.api.HttpHeaderNames.CONTENT_LENGTH; import static io.servicetalk.http.api.HttpHeaderNames.CONTENT_TYPE; import static io.servicetalk.http.api.HttpHeaderNames.TRANSFER_ENCODING; +import static io.servicetalk.http.api.HttpHeaderNames.VARY; import static io.servicetalk.http.api.HttpHeaderValues.CHUNKED; import static io.servicetalk.http.api.NetUtils.isValidIpV4Address; import static io.servicetalk.http.api.NetUtils.isValidIpV6Address; @@ -44,7 +53,9 @@ import static java.nio.charset.Charset.availableCharsets; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; import static java.util.Collections.unmodifiableMap; +import static java.util.Objects.requireNonNull; import static java.util.regex.Pattern.CASE_INSENSITIVE; import static java.util.regex.Pattern.compile; import static java.util.regex.Pattern.quote; @@ -78,6 +89,7 @@ public final class HeaderUtils { private static final Pattern HAS_CHARSET_PATTERN = compile(".+;\\s*charset=.+", CASE_INSENSITIVE); private static final Map CHARSET_PATTERNS; + public static final List NONE_CONTENT_ENCODING_SINGLETON = singletonList(identity()); static { CHARSET_PATTERNS = unmodifiableMap(availableCharsets().entrySet().stream() @@ -226,6 +238,23 @@ static void addChunkedEncoding(final HttpHeaders headers) { } } + static void setContentEncoding(final HttpHeaders headers, CharSequence encoding) { + // H2 does not support TE / Transfer-Encoding, so we rely in the presentation encoding only. + // https://tools.ietf.org/html/rfc7540#section-8.1.2.2 + headers.set(CONTENT_ENCODING, encoding); + headers.set(VARY, CONTENT_ENCODING); + } + + static boolean hasContentEncoding(final HttpHeaders headers) { + return headers.contains(CONTENT_ENCODING); + } + + static void setAcceptEncoding(final HttpHeaders headers, @Nullable final CharSequence encodings) { + if (encodings != null && !headers.contains(ACCEPT_ENCODING)) { + headers.set(ACCEPT_ENCODING, encodings); + } + } + static void validateCookieNameAndValue(final CharSequence cookieName, final CharSequence cookieValue) { if (cookieName == null || cookieName.length() == 0) { throw new IllegalArgumentException("Null or empty cookie names are not allowed."); @@ -636,6 +665,137 @@ private HttpCookiePair findNext(CharSequence cookieHeaderValue) { } } + /** + * Establish a commonly accepted encoding between server and client, according to the supported-encodings + * on the server side and the {@code 'Accepted-Encoding'} incoming header on the request. + *

    + * If no supported encodings are configured then the result is always {@code null} + * If no accepted encodings are present in the request then the result is always {@code null} + * In all other cases, the first matching encoding (that is NOT {@link ContentCodings#identity()}) is preferred, + * otherwise {@code null} is returned. + * + * @param headers The request headers + * @param serverSupportedEncodings The supported encodings as configured for the server + * @return The {@link ContentCodec} that satisfies both client and server needs, + * null if none found or matched to {@link ContentCodings#identity()} + */ + @Nullable + static ContentCodec negotiateAcceptedEncoding( + final HttpHeaders headers, final List serverSupportedEncodings) { + + // Fast path, server has no encodings configured or has only identity configured as encoding + if (serverSupportedEncodings.isEmpty() || + (serverSupportedEncodings.size() == 1 && serverSupportedEncodings.contains(identity()))) { + return null; + } + + List clientSupportedEncodings = readAcceptEncoding(headers, serverSupportedEncodings); + return negotiateAcceptedEncoding(clientSupportedEncodings, serverSupportedEncodings); + } + + @Nullable + static ContentCodec negotiateAcceptedEncoding(final List clientSupportedEncodings, + final List allowedEncodings) { + // Fast path, Client has no encodings configured, or has identity as the only encoding configured + if (clientSupportedEncodings == NONE_CONTENT_ENCODING_SINGLETON || + (clientSupportedEncodings.size() == 1 && clientSupportedEncodings.contains(identity()))) { + return null; + } + + for (ContentCodec encoding : allowedEncodings) { + if (encoding != identity() && clientSupportedEncodings.contains(encoding)) { + return encoding; + } + } + + return null; + } + + static List readAcceptEncoding(final HttpHeaders headers, + final List allowedEncodings) { + final CharSequence acceptEncodingsHeaderVal = headers.get(ACCEPT_ENCODING); + + if (acceptEncodingsHeaderVal == null || acceptEncodingsHeaderVal.length() == 0) { + return NONE_CONTENT_ENCODING_SINGLETON; + } + + List knownEncodings = new ArrayList<>(); + List acceptEncodingValues = split(acceptEncodingsHeaderVal, ','); + for (CharSequence val : acceptEncodingValues) { + ContentCodec enc = encodingFor(allowedEncodings, val.toString().trim()); + if (enc != null) { + knownEncodings.add(enc); + } + } + + return knownEncodings; + } + + /** + * Attempts to identify the {@link ContentCodec} from a name, as found in the {@code 'Content-Encoding'} + * header of a request or a response. + * If the name can not be matched to any of the supported encodings on this endpoint, then + * a {@link UnsupportedContentEncodingException} is thrown. + * If the matched encoding is {@link ContentCodings#identity()} then this returns {@code null}. + * + * @param headers The headers to read the encoding name from + * @param allowedEncodings The supported encodings for this endpoint + * @return The {@link ContentCodec} that matches the name or null if matches to identity + */ + @Nullable + static ContentCodec identifyContentEncodingOrNullIfIdentity( + final HttpHeaders headers, final List allowedEncodings) { + + final CharSequence encoding = headers.get(CONTENT_ENCODING); + if (encoding == null) { + return null; + } + + ContentCodec enc = encodingFor(allowedEncodings, encoding); + if (enc == null) { + throw new UnsupportedContentEncodingException(encoding.toString()); + } + + return enc == identity() ? null : enc; + } + + /** + * Returns the {@link ContentCodec} that matches the {@code name} within the {@code allowedList}. + * if {@code name} is {@code null} or empty it results in {@code null} . + * If {@code name} is {@code 'identity'} this will always result in + * {@link ContentCodings#identity()} regardless of its presence in the {@code allowedList}. + * + * @param allowedList the source list to find a matching codec from. + * @param name the codec name used for the equality predicate. + * @return a codec from the allowed-list that name matches the {@code name}. + */ + @Nullable + static ContentCodec encodingFor(final Collection allowedList, + @Nullable final CharSequence name) { + requireNonNull(allowedList); + if (name == null || name.length() == 0) { + return null; + } + + // Identity is always supported, regardless of its presence in the allowed-list + if (contentEqualsIgnoreCase(name, identity().name())) { + return identity(); + } + + for (ContentCodec enumEnc : allowedList) { + // Encoding values can potentially included compression configurations, we only match on the type + if (startsWith(name, enumEnc.name())) { + return enumEnc; + } + } + + return null; + } + + private static boolean startsWith(final CharSequence string, final CharSequence prefix) { + return regionMatches(string, true, 0, prefix, 0, prefix.length()); + } + /** * Checks if the provider headers contain a {@code Content-Type} header that matches the specified content type, * and optionally the provided charset. diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpMetaData.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpMetaData.java index de24157db2..1ebe777b4d 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpMetaData.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpMetaData.java @@ -15,7 +15,11 @@ */ package io.servicetalk.http.api; +import io.servicetalk.encoding.api.ContentCodec; +import io.servicetalk.encoding.api.ContentCodings; + import java.util.function.BiFunction; +import javax.annotation.Nullable; import static java.lang.System.lineSeparator; @@ -46,6 +50,30 @@ public interface HttpMetaData { */ HttpHeaders headers(); + /** + * The {@link ContentCodec} used to encode the payload of a request or a response. + * If the endpoint is setup with {@link ContentCodingHttpServiceFilter}, the server will + * auto-establish the accepted encoding for the response, unless the caller provides a specific encoding + * by calling this method. + * + * Any encoding passed here, takes precedence. In other words, a compressed response, can + * be disabled by passing {@link ContentCodings#identity()}. + * + * @param encoding The {@link ContentCodec} used for the encoding of the payload. + * @return {@code this}. + * @see Content-Encoding + */ + HttpMetaData encoding(ContentCodec encoding); + + /** + * Returns the {@link ContentCodec} used to encode the payload of a request or a response. + * + * @return The {@link ContentCodec} used for the encoding of the payload. + * @see Content-Encoding + */ + @Nullable + ContentCodec encoding(); + /** * Adds a new header with the specified {@code name} and {@code value}. * diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpRequest.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpRequest.java index 05a9268414..f9fd23fdea 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpRequest.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpRequest.java @@ -16,6 +16,7 @@ package io.servicetalk.http.api; import io.servicetalk.buffer.api.Buffer; +import io.servicetalk.encoding.api.ContentCodec; import java.nio.charset.Charset; import javax.annotation.Nullable; @@ -110,6 +111,9 @@ default T payloadBody(HttpDeserializer deserializer) { @Override HttpRequest version(HttpProtocolVersion version); + @Override + HttpRequest encoding(ContentCodec encoding); + @Override HttpRequest method(HttpRequestMethod method); diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpRequest.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpRequest.java index ea83b8ef6b..1e9742a301 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpRequest.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpRequest.java @@ -18,6 +18,7 @@ import io.servicetalk.buffer.api.Buffer; import io.servicetalk.concurrent.api.Publisher; import io.servicetalk.concurrent.api.Single; +import io.servicetalk.encoding.api.ContentCodec; import java.nio.charset.Charset; import java.util.function.Function; @@ -197,6 +198,9 @@ default StreamingHttpRequest transformPayloadBody(Function, @Override StreamingHttpRequest method(HttpRequestMethod method); + @Override + StreamingHttpRequest encoding(ContentCodec encoding); + @Override StreamingHttpRequest requestTarget(String requestTarget); diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpRequests.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpRequests.java index 3665b6359a..be3508d145 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpRequests.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpRequests.java @@ -43,8 +43,9 @@ private StreamingHttpRequests() { */ public static StreamingHttpRequest newRequest( final HttpRequestMethod method, final String requestTarget, final HttpProtocolVersion version, - final HttpHeaders headers, final BufferAllocator allocator, final HttpHeadersFactory headersFactory) { - return new DefaultStreamingHttpRequest(method, requestTarget, version, headers, allocator, null, + final HttpHeaders headers, final BufferAllocator allocator, + final HttpHeadersFactory headersFactory) { + return new DefaultStreamingHttpRequest(method, requestTarget, version, headers, null, allocator, null, forUserCreated(headers), headersFactory); } @@ -66,9 +67,9 @@ public static StreamingHttpRequest newRequest( */ public static StreamingHttpRequest newTransportRequest( final HttpRequestMethod method, final String requestTarget, final HttpProtocolVersion version, - final HttpHeaders headers, final BufferAllocator allocator, final Publisher payload, - final HttpHeadersFactory headersFactory) { - return new DefaultStreamingHttpRequest(method, requestTarget, version, headers, allocator, payload, + final HttpHeaders headers, final BufferAllocator allocator, + final Publisher payload, final HttpHeadersFactory headersFactory) { + return new DefaultStreamingHttpRequest(method, requestTarget, version, headers, null, allocator, payload, forTransportReceive(headers), headersFactory); } } diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpResponse.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpResponse.java index 66039c2626..9f9c439941 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpResponse.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpResponse.java @@ -18,6 +18,7 @@ import io.servicetalk.buffer.api.Buffer; import io.servicetalk.concurrent.api.Publisher; import io.servicetalk.concurrent.api.Single; +import io.servicetalk.encoding.api.ContentCodec; import java.util.function.Function; import java.util.function.UnaryOperator; @@ -160,6 +161,9 @@ default StreamingHttpResponse transformPayloadBody(Function, @Override StreamingHttpResponse version(HttpProtocolVersion version); + @Override + StreamingHttpResponse encoding(ContentCodec encoding); + @Override default StreamingHttpResponse addHeader(final CharSequence name, final CharSequence value) { HttpResponseMetaData.super.addHeader(name, value); diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/UnsupportedContentEncodingException.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/UnsupportedContentEncodingException.java new file mode 100644 index 0000000000..96acaad42b --- /dev/null +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/UnsupportedContentEncodingException.java @@ -0,0 +1,44 @@ +/* + * Copyright © 2020 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.http.api; + +/** + * Exception thrown when a payload was encoded with an unsupported encoder. + */ +final class UnsupportedContentEncodingException extends RuntimeException { + + private static final long serialVersionUID = 5645078707423180235L; + + private final String encoding; + + /** + * New instance. + * + * @param encoding the name of the encoding used + */ + UnsupportedContentEncodingException(String encoding) { + super("Compression " + encoding + " not supported"); + this.encoding = encoding; + } + + /** + * The name of the encoding used when the Exception was thrown. + * @return the name of the encoding used when the Exception was thrown + */ + public String encoding() { + return encoding; + } +} diff --git a/servicetalk-http-api/src/test/java/io/servicetalk/http/api/RequestConversionTests.java b/servicetalk-http-api/src/test/java/io/servicetalk/http/api/RequestConversionTests.java index 261dfdfeeb..b2fede5a3a 100644 --- a/servicetalk-http-api/src/test/java/io/servicetalk/http/api/RequestConversionTests.java +++ b/servicetalk-http-api/src/test/java/io/servicetalk/http/api/RequestConversionTests.java @@ -26,6 +26,7 @@ import java.util.function.Supplier; import static io.servicetalk.buffer.netty.BufferAllocators.DEFAULT_ALLOCATOR; +import static io.servicetalk.encoding.api.ContentCodings.identity; import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1; import static io.servicetalk.http.api.HttpRequestMethod.GET; import static org.hamcrest.MatcherAssert.assertThat; @@ -55,7 +56,7 @@ public static List data() { private static Object[] newParam(final DefaultPayloadInfo payloadInfo, final String paramName) { return new Object[]{(Supplier) () -> new DefaultStreamingHttpRequest(GET, "/", HTTP_1_1, - DefaultHttpHeadersFactory.INSTANCE.newHeaders(), DEFAULT_ALLOCATOR, + DefaultHttpHeadersFactory.INSTANCE.newHeaders(), identity(), DEFAULT_ALLOCATOR, new SingleSubscribePublisher(payloadInfo), payloadInfo, DefaultHttpHeadersFactory.INSTANCE), payloadInfo, paramName}; } diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ProtocolConfigBuilder.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ProtocolConfigBuilder.java index 8feff788e8..7fd03fa0e5 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ProtocolConfigBuilder.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ProtocolConfigBuilder.java @@ -163,5 +163,5 @@ public UserDataLoggerConfig frameLoggerConfig() { public KeepAlivePolicy keepAlivePolicy() { return keepAlivePolicy; } - } + } } diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java index 763fb3bb4e..433f0e147d 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java @@ -149,6 +149,7 @@ protected void initChannel(final Http2StreamChannel streamChannel) { parentChannelInitializer.multiplexedObserver.onNewStream(); // Netty To ServiceTalk type conversion + streamChannel.pipeline().addLast(new H2ToStH1ServerDuplexHandler( connection.executionContext().bufferAllocator(), h2ServerConfig.headersFactory(), diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HeaderUtils.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HeaderUtils.java index 3a995e87be..d6bd72c6c2 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HeaderUtils.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HeaderUtils.java @@ -37,6 +37,7 @@ import static io.servicetalk.http.api.HeaderUtils.isTransferEncodingChunked; import static io.servicetalk.http.api.HttpApiConversions.isSafeToAggregate; import static io.servicetalk.http.api.HttpApiConversions.mayHaveTrailers; +import static io.servicetalk.http.api.HttpHeaderNames.CONTENT_ENCODING; import static io.servicetalk.http.api.HttpHeaderNames.CONTENT_LENGTH; import static io.servicetalk.http.api.HttpHeaderNames.TRANSFER_ENCODING; import static io.servicetalk.http.api.HttpHeaderValues.CHUNKED; @@ -111,7 +112,9 @@ static boolean canAddResponseTransferEncodingProtocol(final int statusCode, } private static boolean canAddContentLength(final HttpMetaData metadata) { - return !hasContentHeaders(metadata.headers()) && + // TODO once this bug is addressed (https://github.com/apple/servicetalk/pull/1213) + // we should relax the check here, remove content-encoding clause + return !hasContentHeaders(metadata.headers()) && !hasContentEncoding(metadata.headers()) && isSafeToAggregate(metadata) && !mayHaveTrailers(metadata); } @@ -230,6 +233,10 @@ private static boolean hasContentHeaders(final HttpHeaders headers) { return headers.contains(CONTENT_LENGTH) || isTransferEncodingChunked(headers); } + private static boolean hasContentEncoding(final HttpHeaders headers) { + return headers.contains(CONTENT_ENCODING); + } + private static boolean isEmptyConnectResponse(final HttpRequestMethod requestMethod, final int statusCode) { // A server MUST NOT send any Transfer-Encoding or Content-Length header fields in a 2xx (Successful) response // to CONNECT. diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/api/ContentCodingTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/api/ContentCodingTest.java new file mode 100644 index 0000000000..4cd3136ae5 --- /dev/null +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/api/ContentCodingTest.java @@ -0,0 +1,330 @@ +/* + * Copyright © 2020 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.http.api; + +import io.servicetalk.concurrent.api.Single; +import io.servicetalk.concurrent.internal.ServiceTalkTestTimeout; +import io.servicetalk.encoding.api.ContentCodec; +import io.servicetalk.http.netty.HttpClients; +import io.servicetalk.http.netty.HttpServers; +import io.servicetalk.transport.api.ServerContext; + +import org.junit.After; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import java.util.function.Function; + +import static io.servicetalk.concurrent.api.Publisher.from; +import static io.servicetalk.concurrent.api.Single.succeeded; +import static io.servicetalk.encoding.api.ContentCodings.deflateDefault; +import static io.servicetalk.encoding.api.ContentCodings.gzipDefault; +import static io.servicetalk.encoding.api.ContentCodings.identity; +import static io.servicetalk.http.api.CharSequences.contentEquals; +import static io.servicetalk.http.api.HeaderUtils.encodingFor; +import static io.servicetalk.http.api.HttpHeaderNames.ACCEPT_ENCODING; +import static io.servicetalk.http.api.HttpHeaderNames.CONTENT_ENCODING; +import static io.servicetalk.http.api.HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE; +import static io.servicetalk.http.api.HttpSerializationProviders.textDeserializer; +import static io.servicetalk.http.api.HttpSerializationProviders.textSerializer; +import static io.servicetalk.http.netty.HttpProtocolConfigs.h1Default; +import static io.servicetalk.http.netty.HttpProtocolConfigs.h2Default; +import static io.servicetalk.transport.netty.internal.AddressUtils.localAddress; +import static io.servicetalk.transport.netty.internal.AddressUtils.serverHostAndPort; +import static java.lang.String.valueOf; +import static java.util.Arrays.asList; +import static java.util.Arrays.stream; +import static java.util.Collections.disjoint; +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static java.util.stream.Collectors.toList; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) +public class ContentCodingTest { + + private static final int PAYLOAD_SIZE = 1024; + + private static final Function REQ_RESP_VERIFIER = (options) + -> new StreamingHttpServiceFilterFactory() { + @Override + public StreamingHttpServiceFilter create(final StreamingHttpService service) { + return new StreamingHttpServiceFilter(service) { + @Override + + public Single handle(final HttpServiceContext ctx, + final StreamingHttpRequest request, + final StreamingHttpResponseFactory responseFactory) { + final ContentCodec reqEncoding = options.requestEncoding; + final List clientSupportedEncodings = options.clientSupported; + + try { + + String requestPayload = request.payloadBody(textDeserializer()) + .collect(StringBuilder::new, StringBuilder::append) + .toFuture().get().toString(); + + assertEquals(payload((byte) 'a'), requestPayload); + + final List actualReqAcceptedEncodings = stream(request.headers() + .get(ACCEPT_ENCODING, "NOT_PRESENT").toString().split(",")) + .map((String::trim)).collect(toList()); + + final List expectedReqAcceptedEncodings = clientSupportedEncodings.stream() + .filter((enc) -> enc != identity()) + .map((ContentCodec::name)) + .map(CharSequence::toString) + .collect(toList()); + + if (reqEncoding != identity()) { + assertTrue("Request encoding should be present in the request headers", + contentEquals(reqEncoding.name(), + request.headers().get(ACCEPT_ENCODING, "null"))); + } + + if (!expectedReqAcceptedEncodings.isEmpty() && !actualReqAcceptedEncodings.isEmpty()) { + assertThat(actualReqAcceptedEncodings, equalTo(expectedReqAcceptedEncodings)); + } + + return super.handle(ctx, request, responseFactory); + } catch (Throwable t) { + t.printStackTrace(); + return succeeded(responseFactory.badRequest()); + } + } + }; + } + }; + + @Rule + public final Timeout timeout = new ServiceTalkTestTimeout(); + + private final HttpServerBuilder httpServerBuilder; + private final ServerContext serverContext; + private final HttpClient client; + protected final TestEncodingScenario testEncodingScenario; + private final boolean expectedSuccess; + + public ContentCodingTest(final List serverSupportedEncodings, + final List clientSupportedEncodings, + final ContentCodec requestEncoding, final boolean expectedSuccess, + final HttpProtocolConfig protocol) throws Exception { + this.testEncodingScenario = new TestEncodingScenario(requestEncoding, clientSupportedEncodings, + serverSupportedEncodings, protocol); + this.expectedSuccess = expectedSuccess; + + httpServerBuilder = HttpServers.forAddress(localAddress(0)); + serverContext = listenAndAwait(); + client = newClient(); + } + + @Parameterized.Parameters(name = + "server-supported-encodings={0} " + + "client-supported-encodings={1} " + + "request-encoding={2} " + + "expected-success={3} " + + "protocol={4}") + public static Object[][] params() { + return new Object[][] { + {emptyList(), emptyList(), identity(), true, h1Default()}, + {emptyList(), emptyList(), identity(), true, h2Default()}, + {emptyList(), asList(gzipDefault(), identity()), gzipDefault(), false, h1Default()}, + {emptyList(), asList(gzipDefault(), identity()), gzipDefault(), false, h2Default()}, + {emptyList(), asList(deflateDefault(), identity()), deflateDefault(), false, h1Default()}, + {emptyList(), asList(deflateDefault(), identity()), deflateDefault(), false, h2Default()}, + {asList(gzipDefault(), deflateDefault(), identity()), emptyList(), identity(), true, h1Default()}, + {asList(gzipDefault(), deflateDefault(), identity()), emptyList(), identity(), true, h2Default()}, + {asList(identity(), gzipDefault(), deflateDefault()), + asList(gzipDefault(), identity()), gzipDefault(), true, h1Default()}, + {asList(identity(), gzipDefault(), deflateDefault()), + asList(gzipDefault(), identity()), gzipDefault(), true, h2Default()}, + {asList(identity(), gzipDefault(), deflateDefault()), + asList(deflateDefault(), identity()), deflateDefault(), true, h1Default()}, + {asList(identity(), gzipDefault(), deflateDefault()), + asList(deflateDefault(), identity()), deflateDefault(), true, h2Default()}, + {asList(identity(), gzipDefault()), asList(deflateDefault(), identity()), + deflateDefault(), false, h1Default()}, + {asList(identity(), gzipDefault()), asList(deflateDefault(), identity()), + deflateDefault(), false, h2Default()}, + {asList(identity(), deflateDefault()), asList(gzipDefault(), identity()), + gzipDefault(), false, h1Default()}, + {asList(identity(), deflateDefault()), asList(gzipDefault(), identity()), + gzipDefault(), false, h2Default()}, + {asList(identity(), deflateDefault()), + asList(deflateDefault(), identity()), deflateDefault(), true, h1Default()}, + {asList(identity(), deflateDefault()), + asList(deflateDefault(), identity()), deflateDefault(), true, h2Default()}, + {asList(identity(), deflateDefault()), emptyList(), identity(), true, h1Default()}, + {asList(identity(), deflateDefault()), emptyList(), identity(), true, h2Default()}, + {asList(gzipDefault()), asList(identity()), identity(), true, h1Default()}, + {asList(gzipDefault()), asList(identity()), identity(), true, h2Default()}, + {asList(gzipDefault()), asList(gzipDefault(), identity()), identity(), true, h1Default()}, + {asList(gzipDefault()), asList(gzipDefault(), identity()), identity(), true, h2Default()}, + {asList(gzipDefault()), asList(gzipDefault(), identity()), identity(), true, h1Default()}, + {asList(gzipDefault()), asList(gzipDefault(), identity()), identity(), true, h2Default()}, + {asList(gzipDefault()), asList(gzipDefault(), identity()), gzipDefault(), true, h1Default()}, + {asList(gzipDefault()), asList(gzipDefault(), identity()), gzipDefault(), true, h2Default()}, + {emptyList(), asList(gzipDefault(), identity()), gzipDefault(), false, h1Default()}, + {emptyList(), asList(gzipDefault(), identity()), gzipDefault(), false, h2Default()}, + {emptyList(), asList(gzipDefault(), deflateDefault(), identity()), + deflateDefault(), false, h1Default()}, + {emptyList(), asList(gzipDefault(), deflateDefault(), identity()), + deflateDefault(), false, h2Default()}, + {emptyList(), asList(gzipDefault(), identity()), identity(), true, h1Default()}, + {emptyList(), asList(gzipDefault(), identity()), identity(), true, h2Default()}, + }; + } + + @After + public void tearDown() throws Exception { + try { + client.close(); + } finally { + serverContext.close(); + } + } + + private ServerContext listenAndAwait() throws Exception { + StreamingHttpService service = (ctx, request, responseFactory) -> Single.succeeded(responseFactory.ok() + .payloadBody(from(payload((byte) 'b')), textSerializer())); + + StreamingHttpServiceFilterFactory filterFactory = REQ_RESP_VERIFIER.apply(testEncodingScenario); + + return httpServerBuilder + .protocols(testEncodingScenario.protocol) + .appendServiceFilter(new ContentCodingHttpServiceFilter(testEncodingScenario.serverSupported, + testEncodingScenario.serverSupported)) + .appendServiceFilter(filterFactory) + .listenStreamingAndAwait(service); + } + + private HttpClient newClient() { + return HttpClients + .forSingleAddress(serverHostAndPort(serverContext)) + .appendClientFilter(new ContentCodingHttpRequesterFilter(testEncodingScenario.clientSupported)) + .protocols(testEncodingScenario.protocol) + .build(); + } + + @Test + public void test() throws Exception { + if (expectedSuccess) { + assertSuccessful(testEncodingScenario.requestEncoding); + } else { + assertNotSupported(testEncodingScenario.requestEncoding); + } + } + + private static String payload(byte b) { + byte[] payload = new byte[PAYLOAD_SIZE]; + Arrays.fill(payload, b); + return new String(payload, StandardCharsets.US_ASCII); + } + + private void assertSuccessful(final ContentCodec encoding) throws Exception { + assertResponse(client.request(client + .get("/") + .encoding(encoding) + .payloadBody(payload((byte) 'a'), textSerializer())).toFuture().get().toStreamingResponse()); + + final BlockingStreamingHttpClient blockingStreamingHttpClient = client.asBlockingStreamingClient(); + assertResponse(blockingStreamingHttpClient.request(blockingStreamingHttpClient + .get("/") + .encoding(encoding) + .payloadBody(singletonList(payload((byte) 'a')), textSerializer())).toStreamingResponse()); + + final StreamingHttpClient streamingHttpClient = client.asStreamingClient(); + assertResponse(streamingHttpClient.request(streamingHttpClient + .get("/") + .encoding(encoding) + .payloadBody(from(payload((byte) 'a')), textSerializer())).toFuture().get()); + } + + private void assertResponse(final StreamingHttpResponse response) throws Exception { + assertResponseHeaders(response.headers()); + + String responsePayload = response.payloadBody(textDeserializer()).collect(StringBuilder::new, + StringBuilder::append).toFuture().get().toString(); + + assertEquals(payload((byte) 'b'), responsePayload); + } + + private void assertResponseHeaders(final HttpHeaders headers) { + final List clientSupportedEncodings = testEncodingScenario.clientSupported; + final List serverSupportedEncodings = testEncodingScenario.serverSupported; + + final String respEncName = headers.get(CONTENT_ENCODING, "identity").toString(); + + if (disjoint(serverSupportedEncodings, clientSupportedEncodings)) { + assertEquals(identity().name().toString(), respEncName); + } else { + assertNotNull("Response encoding not in the client supported list " + + "[" + clientSupportedEncodings + "]", encodingFor(clientSupportedEncodings, + valueOf(headers.get(CONTENT_ENCODING, "identity")))); + + assertNotNull("Response encoding not in the server supported list " + + "[" + serverSupportedEncodings + "]", encodingFor(serverSupportedEncodings, + valueOf(headers.get(CONTENT_ENCODING, "identity")))); + } + } + + private void assertNotSupported(final ContentCodec encoding) throws Exception { + final BlockingStreamingHttpClient blockingStreamingHttpClient = client.asBlockingStreamingClient(); + final StreamingHttpClient streamingHttpClient = client.asStreamingClient(); + + assertEquals(UNSUPPORTED_MEDIA_TYPE, client.request(client + .get("/") + .encoding(encoding) + .payloadBody(payload((byte) 'a'), textSerializer())).toFuture().get().status()); + + assertEquals(UNSUPPORTED_MEDIA_TYPE, blockingStreamingHttpClient.request(blockingStreamingHttpClient + .get("/") + .encoding(encoding) + .payloadBody(singletonList(payload((byte) 'a')), textSerializer())).status()); + + assertEquals(UNSUPPORTED_MEDIA_TYPE, streamingHttpClient.request(streamingHttpClient + .get("/") + .encoding(encoding) + .payloadBody(from(payload((byte) 'a')), textSerializer())).toFuture().get().status()); + } + + static class TestEncodingScenario { + final ContentCodec requestEncoding; + final List clientSupported; + final List serverSupported; + final HttpProtocolConfig protocol; + + TestEncodingScenario(final ContentCodec requestEncoding, + final List clientSupported, + final List serverSupported, + final HttpProtocolConfig protocol) { + this.requestEncoding = requestEncoding; + this.clientSupported = clientSupported; + this.serverSupported = serverSupported; + this.protocol = protocol; + } + } +} diff --git a/settings.gradle b/settings.gradle index a123c4104d..2bea6ea7f6 100644 --- a/settings.gradle +++ b/settings.gradle @@ -33,6 +33,7 @@ include "servicetalk-annotations", "servicetalk-data-jackson-jersey", "servicetalk-data-protobuf", "servicetalk-dns-discovery-netty", + "servicetalk-encoding-api", "servicetalk-examples:grpc:helloworld", "servicetalk-examples:grpc:routeguide", "servicetalk-examples:grpc:protoc-options",