Skip to content

Commit

Permalink
[improvement] Implement a generalized ContentDecodingChannel for gzip…
Browse files Browse the repository at this point in the history
… support (#289)
  • Loading branch information
carterkozak authored Feb 17, 2020
1 parent 489eabb commit 7a93437
Show file tree
Hide file tree
Showing 6 changed files with 416 additions and 38 deletions.
5 changes: 5 additions & 0 deletions changelog/@unreleased/pr-289.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: improvement
improvement:
description: Implement a generalized ContentDecodingChannel for gzip support
links:
- https://github.com/palantir/dialogue/pull/289
2 changes: 2 additions & 0 deletions dialogue-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ dependencies {
testCompile 'com.palantir.safe-logging:preconditions-assertj'
testCompile 'junit:junit'
testCompile 'org.assertj:assertj-core'
testCompile 'org.assertj:assertj-guava'
testCompile 'org.mockito:mockito-core'

annotationProcessor 'org.immutables:value'
testAnnotationProcessor 'org.immutables:value'
compile 'org.immutables:value::annotations'
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public static Channel create(
// TracedChannel must wrap TracedRequestChannel to ensure requests have tracing headers.
.map(TracedRequestChannel::new)
.map(channel -> new TracedChannel(channel, "Concurrency-Limited Dialogue Request"))
.map(ContentDecodingChannel::new)
.map(ConcurrencyLimitedChannel::create)
.collect(ImmutableList.toImmutableList());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
/*
* (c) Copyright 2020 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.dialogue.core;

import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.palantir.dialogue.Channel;
import com.palantir.dialogue.Endpoint;
import com.palantir.dialogue.Request;
import com.palantir.dialogue.Response;
import com.palantir.logsafe.Preconditions;
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.zip.GZIPInputStream;

/**
* Adds support for transparently requesting and decoding <code>Content-Encoding: gzip</code> responses
* in a client agnostic way. Client implementations may choose to decompress GZIP data more efficiently
* if possible.
* This allows client implementations to avoid considering content-encoding in most cases, and
* sets a specific <code>Accept-Encoding</code> header to avoid potentially using an unexpected
* type based on client defaults (for example apache httpclient requests gzip and deflate by default).
* https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.3
* https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.11
*/
final class ContentDecodingChannel implements Channel {

private static final String ACCEPT_ENCODING = "accept-encoding";
private static final String CONTENT_ENCODING = "content-encoding";
private static final String CONTENT_LENGTH = "content-length";
private static final String GZIP = "gzip";

private final Channel delegate;

ContentDecodingChannel(Channel delegate) {
this.delegate = Preconditions.checkNotNull(delegate, "Channel is required");
}

@Override
public ListenableFuture<Response> execute(Endpoint endpoint, Request request) {
return Futures.transform(
delegate.execute(endpoint, acceptEncoding(request)),
ContentDecodingChannel::decompress,
MoreExecutors.directExecutor());
}

private static Request acceptEncoding(Request request) {
if (request.headerParams().containsKey(ACCEPT_ENCODING)) {
// Do not replace existing accept-encoding values
return request;
}
return Request.builder()
.from(request)
.putHeaderParams(ACCEPT_ENCODING, GZIP)
.build();
}

private static Response decompress(Response input) {
Optional<String> contentEncoding = input.getFirstHeader(CONTENT_ENCODING);
if (contentEncoding.isPresent() && GZIP.equals(contentEncoding.get())) {
return new ContentDecodingResponse(input);
}
return input;
}

private static final class ContentDecodingResponse implements Response {

private final Response delegate;
private final Map<String, List<String>> headers;
private final InputStream body;

ContentDecodingResponse(Response delegate) {
this.delegate = delegate;
this.headers = Maps.filterKeys(delegate.headers(), ContentDecodingResponse::allowHeader);
this.body = new DeferredGzipInputStream(delegate.body());
}

@Override
public InputStream body() {
if (body == null) {
delegate.body();
}
return body;
}

@Override
public int code() {
return delegate.code();
}

@Override
public Map<String, List<String>> headers() {
return headers;
}

// Remove the content-encoding header once content is decompressed, otherwise consumers may attempt
// to decode again.
private static boolean allowHeader(String headerName) {
return !CONTENT_ENCODING.equalsIgnoreCase(headerName)
// Content-length of compressed data is not representative of the decoded length.
&& !CONTENT_LENGTH.equalsIgnoreCase(headerName);
}

@Override
public String toString() {
return "ContentDecodingResponse{delegate=" + delegate + '}';
}
}

/** Wraps a {@link GZIPInputStream} deferring initialization until first byte is read. */
private static class DeferredGzipInputStream extends InputStream {
private static final int BUFFER_SIZE = 8 * 1024;
private final InputStream original;
private InputStream delegate;

DeferredGzipInputStream(InputStream original) {
this.original = original;
}

private InputStream getDelegate() throws IOException {
if (delegate == null) {
// Buffer the GZIPInputStream contents in order to reduce expensive native Deflater interactions.
delegate = new BufferedInputStream(
// Increase the buffer size from the default of 512 bytes
new GZIPInputStream(original, BUFFER_SIZE /* input buffer size */),
BUFFER_SIZE /* output buffer size */);
}
return delegate;
}

private InputStream getDelegateSafely() {
try {
return getDelegate();
} catch (IOException e) {
throw new SafeRuntimeException("Failed to create a GZIPInputStream", e);
}
}

@Override
public int read() throws IOException {
return getDelegate().read();
}

@Override
public int read(byte[] buffer) throws IOException {
return getDelegate().read(buffer);
}

@Override
public int read(byte[] buffer, int off, int len) throws IOException {
return getDelegate().read(buffer, off, len);
}

@Override
public long skip(long requested) throws IOException {
return getDelegate().skip(requested);
}

@Override
public int available() throws IOException {
return getDelegate().available();
}

@Override
public void close() throws IOException {
// No need to create the expensive delegate instance to immediately close it.
if (delegate == null) {
original.close();
} else {
delegate.close();
}
}

@Override
public void mark(int readlimit) {
getDelegateSafely().mark(readlimit);
}

@Override
public void reset() throws IOException {
getDelegate().reset();
}

@Override
public boolean markSupported() {
return getDelegateSafely().markSupported();
}

@Override
public String toString() {
return "DeferredGzipInputStream{original=" + original + '}';
}
}
}
Loading

0 comments on commit 7a93437

Please sign in to comment.