Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTP/2 content streaming #172

Merged
merged 1 commit into from
Aug 1, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
3 changes: 3 additions & 0 deletions .reviewboardrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
REVIEWBOARD_URL = 'https://rbcommons.com/s/armeria/'
REPOSITORY = 'armeria'

5 changes: 5 additions & 0 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ This product depends on Netty, distributed by Netty.io:
* License: licenses/LICENSE.netty.al20.txt (Apache License v2.0)
* Homepage: http://netty.io/

This product depends on Reactive Streams API, distributed by Reactive-Streams.org:

* License: licenses/LICENSE.reactivestreams.cc0.txt
* Homepage: http://www.reactive-streams.org/

This product depends on Reflections, distributed by ronmamo:

* License: licenses/LICENSE.reflections.wtfpl.txt (WTFPL)
Expand Down
9 changes: 9 additions & 0 deletions licenses/LICENSE.reactivestreams.cc0.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Licensed under Public Domain (CC0)

To the extent possible under law, the person who associated CC0 with
this code has waived all copyright and related or neighboring
rights to this code.

You should have received a copy of the CC0 legalcode along with this
work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.

14 changes: 14 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
<logback.version>1.1.7</logback.version>
<metrics.version>3.1.2</metrics.version>
<netty.version>4.1.3.Final</netty.version>
<reactive-streams.version>1.0.0</reactive-streams.version>
<slf4j.version>1.7.21</slf4j.version>
<tomcat.version>8.5.4</tomcat.version>
<jetty.alpnAgent.version>2.0.3</jetty.alpnAgent.version>
Expand Down Expand Up @@ -141,6 +142,13 @@
<version>1.1.33.Fork19</version>
</dependency>

<!-- Reactive Streams API -->
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>${reactive-streams.version}</version>
</dependency>

<!-- ALPN -->
<dependency>
<groupId>org.eclipse.jetty.alpn</groupId>
Expand Down Expand Up @@ -266,6 +274,12 @@
<version>1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.5.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
Expand Down
9 changes: 5 additions & 4 deletions src/build/tomcat80-compat.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import org.objectweb.asm.Opcodes
import java.nio.file.Files
import java.nio.file.Paths

// This script removes the methods that did not exist until Tomcat 8.5 from the ProtocolHandler implementation.
// We need the methods to exist during compilation time since we compile the ProtocolHandler implementation
// against Tomcat 8.5, but want to make sure the methods do not exist because otherwise JVM will fail to load
// the ProtocolHandler due to the references to the non-existent classes in the method signatures.
// This script removes the methods that did not exist until Tomcat 8.5 from the ProtocolHandler implementation
// using bytecode manipulation. We need the methods to exist during compilation time since we compile the
// ProtocolHandler implementation against Tomcat 8.5, but want to make sure the methods do not exist because
// otherwise JVM will fail to load the ProtocolHandler due to the references to the non-existent classes in the
// method signatures.

path = Paths.get("target", "classes", "com", "linecorp", "armeria", "server", "http", "tomcat",
"Tomcat80ProtocolHandler.class")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2016 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.client;

import static java.util.Objects.requireNonNull;

import java.net.URI;
import java.util.Optional;
import java.util.Set;

import com.linecorp.armeria.common.Scheme;

public abstract class AbstractClientFactory implements ClientFactory {

protected AbstractClientFactory() {}

@Override
public final <T> T newClient(String uri, Class<T> clientType, ClientOptionValue<?>... options) {
requireNonNull(uri, "uri");
requireNonNull(options, "options");
return newClient(URI.create(uri), clientType, ClientOptions.of(options));
}

@Override
public final <T> T newClient(String uri, Class<T> clientType, ClientOptions options) {
requireNonNull(uri, "uri");
return newClient(URI.create(uri), clientType, options);
}

@Override
public final <T> T newClient(URI uri, Class<T> clientType, ClientOptionValue<?>... options) {
requireNonNull(options, "options");
return newClient(uri, clientType, ClientOptions.of(options));
}

protected final Scheme validate(URI uri, Class<?> clientType, ClientOptions options) {
requireNonNull(uri, "uri");
requireNonNull(clientType, "clientType");
requireNonNull(options, "options");

final String scheme = uri.getScheme();
if (scheme == null) {
throw new IllegalArgumentException("URI with missing scheme: " + uri);
}

if (uri.getAuthority() == null) {
throw new IllegalArgumentException("URI with missing authority: " + uri);
}

final Optional<Scheme> parsedSchemeOpt = Scheme.tryParse(scheme);
if (!parsedSchemeOpt.isPresent()) {
throw new IllegalArgumentException("URI with unknown scheme: " + uri);
}

final Scheme parsedScheme = parsedSchemeOpt.get();
final Set<Scheme> supportedSchemes = supportedSchemes();
if (!supportedSchemes.contains(parsedScheme)) {
throw new IllegalArgumentException(
"URI with unsupported scheme: " + uri + " (expected: " + supportedSchemes + ')');
}

return parsedScheme;
}

protected static Endpoint newEndpoint(URI uri) {
return Endpoint.parse(uri.getAuthority());
}
}
110 changes: 110 additions & 0 deletions src/main/java/com/linecorp/armeria/client/AllInOneClientFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright 2016 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.client;

import java.net.URI;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableMap;

import com.linecorp.armeria.client.http.HttpClientFactory;
import com.linecorp.armeria.client.thrift.ThriftClientFactory;
import com.linecorp.armeria.common.Scheme;

import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;

public class AllInOneClientFactory extends AbstractClientFactory {

private static final Logger logger = LoggerFactory.getLogger(AllInOneClientFactory.class);

static {
if (AllInOneClientFactory.class.getClassLoader() == ClassLoader.getSystemClassLoader()) {
Runtime.getRuntime().addShutdownHook(new Thread(ClientFactory::closeDefault));
}
}

private final ClientFactory mainFactory;
private final Map<Scheme, ClientFactory> clientFactories;

public AllInOneClientFactory() {
this(SessionOptions.DEFAULT);
}

public AllInOneClientFactory(SessionOptions options) {
// TODO(trustin): Allow specifying different options for different session protocols.
// We have only one session protocol at the moment, so this is OK so far.
final HttpClientFactory httpClientFactory = new HttpClientFactory(options);
final ThriftClientFactory thriftClientFactory = new ThriftClientFactory(httpClientFactory);

final ImmutableMap.Builder<Scheme, ClientFactory> builder = ImmutableMap.builder();
for (ClientFactory f : Arrays.asList(httpClientFactory, thriftClientFactory)) {
f.supportedSchemes().forEach(s -> builder.put(s, f));
}

clientFactories = builder.build();
mainFactory = httpClientFactory;
}

@Override
public Set<Scheme> supportedSchemes() {
return clientFactories.keySet();
}

@Override
public SessionOptions options() {
return mainFactory.options();
}

@Override
public EventLoopGroup eventLoopGroup() {
return mainFactory.eventLoopGroup();
}

@Override
public Supplier<EventLoop> eventLoopSupplier() {
return mainFactory.eventLoopSupplier();
}

@Override
public <T> T newClient(URI uri, Class<T> clientType, ClientOptions options) {
final Scheme scheme = validate(uri, clientType, options);
return clientFactories.get(scheme).newClient(uri, clientType, options);
}

@Override
public void close() {
// The global default should never be closed.
if (this == ClientFactory.DEFAULT) {
logger.debug("Refusing to close the default {}; must be closed via closeDefault()",
ClientFactory.class.getSimpleName());
return;
}

doClose();
}

void doClose() {
clientFactories.values().forEach(ClientFactory::close);
}
}
76 changes: 5 additions & 71 deletions src/main/java/com/linecorp/armeria/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,76 +16,10 @@

package com.linecorp.armeria.client;

import java.util.function.Function;
import com.linecorp.armeria.common.Request;
import com.linecorp.armeria.common.Response;

/**
* A set of components required for invoking a remote service.
*/
public interface Client {

/**
* Creates a new function that decorates a {@link Client} with the specified {@code codecDecorator}
* and {@code invokerDecorator}.
* <p>
* This factory method is useful when you want to write a reusable factory method that returns
* a decorator function and the returned decorator function is expected to be consumed by
* {@link #decorate(Function)}. For example, this may be a factory which combines multiple decorators,
* any of which could be decorating a codec and/or a handler.
* </p><p>
* Consider using {@link #decorateCodec(Function)} or {@link #decorateInvoker(Function)}
* instead for simplicity unless you are writing a factory method of a decorator function.
* </p><p>
* If you need a function that decorates only a codec or an invoker, use {@link Function#identity()} for
* the non-decorated property. e.g:
* </p><pre>{@code
* newDecorator(Function.identity(), (handler) -> { ... });
* }</pre>
*/
@SuppressWarnings("unchecked")
static <T extends ClientCodec, U extends ClientCodec,
V extends RemoteInvoker, W extends RemoteInvoker>
Function<Client, Client> newDecorator(Function<T, U> codecDecorator, Function<V, W> invokerDecorator) {
return client -> new DecoratingClient(client, codecDecorator, invokerDecorator);
}

/**
* Returns the {@link ClientCodec}.
*/
ClientCodec codec();

/**
* Returns the {@link RemoteInvoker}.
*/
RemoteInvoker invoker();

/**
* Creates a new {@link Client} that decorates this {@link Client} with the specified {@code decorator}.
*/
default Client decorate(Function<Client, Client> decorator) {
@SuppressWarnings("unchecked")
final Client newClient = decorator.apply(this);
if (newClient == null) {
throw new NullPointerException("decorator.apply() returned null: " + decorator);
}

return newClient;
}

/**
* Creates a new {@link Client} that decorates the {@link ClientCodec} of this {@link Client} with the
* specified {@code codecDecorator}.
*/
default <T extends ClientCodec, U extends ClientCodec>
Client decorateCodec(Function<T, U> codecDecorator) {
return new DecoratingClient(this, codecDecorator, Function.identity());
}

/**
* Creates a new {@link Client} that decorates the {@link RemoteInvoker} of this
* {@link Client} with the specified {@code invokerDecorator}.
*/
default <T extends RemoteInvoker, U extends RemoteInvoker>
Client decorateInvoker(Function<T, U> invokerDecorator) {
return new DecoratingClient(this, Function.identity(), invokerDecorator);
}
@FunctionalInterface
public interface Client<I extends Request, O extends Response> {
O execute(ClientRequestContext ctx, I req) throws Exception;
}
Loading