Skip to content

Commit

Permalink
Revamp the core API to support HTTP content streaming
Browse files Browse the repository at this point in the history
Related: line#85

This commit contains a lot of changes. I'll group them into several pairs of
motivations and modifications.

Use the Reactive Streams API to support HTTP content streaming
--------------------------------------------------------------
Motivation:

Reactive Streams API is the de-facto standard API for implementing object
streaming these days; RxJava, gRPC, Akka and Project Reactor are the notable
adoptors.

Modifications:

- Add RichPublisher and its subtypes to provide the foundation for streaming
  HTTP content
  - See the com.linecorp.armeria.common.reactivestreams package, most notably:
    - RichPublisher and Writer
    - QueueBasedPublisher

Decouple the core API from Netty API
------------------------------------
Motivation:

Armeria is meant to be used by an application developer. Exposing too much
detail to him or her is not the best idea.

Modifications:

- Hide all Netty HTTP types under Armeria's own HTTP/2-centric message types
  - HttpObject
  - HttpHeaders
  - HttpData
  - HttpMethod
  - HttpStatus
  - HttpStatusClass
  - HttpHeaderNames
- Introduce HttpRequest and HttpResponse whose content is a RichPublisher, our
  reactive streams API
- Add AggregatedHttpMessage which is the FullHttpMessage counterpart
- Add HttpRequest/ResponseWriter for easier composition of an HTTP message
- Do not expose ByteBuf in the user-facing API
- Use CompletableFuture instead of Netty Future/Promise
- Note that we still use Netty's AsciiString as header names because it's
  generic enough

Redefine Client, Service and their context API
----------------------------------------------
Motivation:

Previously, we shared one context type for both client and server side:
ServiceInvocationContext. This is potentially confusing and both client and
server sides had to shoehorn their models into the common model provided by
ServiceInvocationContext.

Also, ServiceInvocationContext assumed that a request is fully available when
context is created. However, this is not true anymore with content streaming.

Modifications:

- Replace ServiceInvocationContext with RequestContext
  - Add ClientRequestContext and ServiceRequestContext
  - All timeout settings, maximum allowed content length and custom HTTP header
    options are now overridable via the setters of the context.
- Only expose the information that could be available when a request has just
  started rather than when a full request is ready.
  - Add RequestLog and ResponseLog so that a client or a service fills the
    properties as they are available. A user will be notified via
    awaitRequestLog() and awaitResponseLog() when all necessary information is
    ready.
  - For example, RequestContext.method() property always returns the method at
    the session layer. That is, in a Thrift-over-HTTP call, ctx.method() will
    return "POST" rather than "someThriftMethod". It is because such information
    is available only when the full request has been received. You can get the
    Thrift method name from RequestLog once it's ready.
  - See LoggingClient/Server and MetricCollectingClient/Server for code example
- Add type parameters to Client and Service
- Remove ClientCodec, RemoteInvoker, ServiceCodec, ServiceInvocationHandler,
  because they are all merged into Client or Service

Overall reorganization of session layer implementation
------------------------------------------------------
Motivation:

Our code layout is too HTTP-centric and this will eventually make it hard to
add other session protocols.

Modifications:

- Move HTTP-specific code to com.linecorp.armeria.{server,client}.http
- Move the internal classes that could be shared between client and server to
  com.linecorp.armeria.internal.*

Implement HTTP content streaming at the session layer
-----------------------------------------------------
Modifications:

- Use Armeria's own HTTP/2 centric streaming-aware API instead of aggregating
  an HTTP/1 or 2 request into a full HTTP/1 request
  - See Http1/2RequestDecoder, HttpResponseSubscriber and HttpServerHandler to
    learn how this works on the server side
    - Start from Http1/2RequestDecoder to HttpServerHandler.handleRequest()
  - See Http1/2ResponseDecoder, HttpRequestSubscriber and HttpSessionHandler to
    learn how this works on the client side
    - Start from HttpSessionHandler.invoke()

Revamp HttpService, ThriftService and other services with the new core API
--------------------------------------------------------------------------
Motivation:

HttpService and ThriftService assumes a request is fully received when it is
invoked, which is not true anymore. Also, they are split into two components,
ServiceCodec and ServiceInvocationHandler, and they are gone now.

Modifications:

- HttpService is now an interface.
  - Add AbstractHttpService which replaces the old HttpService class
- ThriftService is now THttpService.
  - ThriftService is split into two parts: THttpService and ThriftCallService.
    - THttpService translates an HTTP request into a ThriftCall and
      a ThriftReply into an HTTP response. (similar to ServiceCodec)
    - ThriftCallService delegates a ThriftCall to a stub implementation.
      (similar to ServiceInvocationHandler)
  - Deprecate ThriftService
- Other service implementations underwent similar changes to work with the new
  API.

Revamp client-side service composition and decoration
-----------------------------------------------------
Motivation:

Previous client composition and decoration was based on the assumption that
the full request content is available upon its invocation, which isn't true
anymore.

Modifications:

- Replace the option 'DECORATOR' with 'DECORATION' whose value type is
  'ClientDecorations'
  - A user is now expected to specify the type of the request and response he
    or she desires to intercept, and the ClientFactory will apply the decorator
    at the right place in the invocation chain.
    - builder.add(ThriftCall.class, ThriftReply.class, thriftCallDecorator);
    - builder.add(HttpRequest.class, HttpResponse.class, httpDecorator);

Write new HTTP client API
-------------------------
Motivation:

SimpleHttpClient exposes Netty API and it's not powerful enough.

Modifications:

- Add HttpClient which replaces SimpleHttpClient
- Deprecate SimpleHttpClient

Merge ThriftFunction and ThriftMethod
-------------------------------------
Motivation:

They basically do the same job slightly differently.

Modifications:

- Merge them into one implementation and move to the internal package.
  - See com.linecorp.armeria.internal.thrift.{ThriftFunction,ThriftServiceMetadata}

Provide a way to add a decorator to all services
------------------------------------------------
Motivation:

Some decorators are often meant to be added to all services in a server or in a
VirtualHost.

Modifications:

- Add ServerBuilder.decorator() that adds a decorator to all services in a
  server
- Add VirtualHostBuilder.decorator() that adds a decorator too all services in
  a VirtualHost

Rename RemoteInvokerFactory to ClientFactory
--------------------------------------------
Motivation:

RemoteInvoker is now gone. ClientFactory sounds better in my opinion.

Modification:

- Rename/replace RemoteInvoker to/with ClientFactory
- Add HttpClientFactory and ThriftClientFactory
- Add AllInOneClientFactory that supports both HTTP and Thrift-over-HTTP via
  the two ClientFactories above
- Rename RemoveInvokerOption and its related classes to SessionOption

Result
------
- HTTP content streaming works.
- Frequently used service implementations such as ThriftService, TomcatService,
  JettyService and HttpFileService works without modifying user code.
- Frequently used client implementations such as SimpleHttpClient and usual
  Thrift client stub generation works as before.
  • Loading branch information
trustin committed Jul 1, 2016
1 parent 302868b commit 7d500c7
Show file tree
Hide file tree
Showing 320 changed files with 19,427 additions and 12,937 deletions.
3 changes: 3 additions & 0 deletions .reviewboardrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
REVIEWBOARD_URL = 'https://rbcommons.com/s/armeria/'
REPOSITORY = 'armeria'

5 changes: 5 additions & 0 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ This product depends on Netty, distributed by Netty.io:
* License: licenses/LICENSE.netty.al20.txt (Apache License v2.0)
* Homepage: http://netty.io/

This product depends on Reactive Streams API, distributed by Reactive-Streams.org:

* License: licenses/LICENSE.reactivestreams.cc0.txt
* Homepage: http://www.reactive-streams.org/

This product depends on Reflections, distributed by ronmamo:

* License: licenses/LICENSE.reflections.wtfpl.txt (WTFPL)
Expand Down
9 changes: 9 additions & 0 deletions licenses/LICENSE.reactivestreams.cc0.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Licensed under Public Domain (CC0)

To the extent possible under law, the person who associated CC0 with
this code has waived all copyright and related or neighboring
rights to this code.

You should have received a copy of the CC0 legalcode along with this
work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.

10 changes: 9 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
<logback.version>1.1.7</logback.version>
<metrics.version>3.1.2</metrics.version>
<netty.version>4.1.0.CR7</netty.version>
<reactive-streams.version>1.0.0</reactive-streams.version>
<slf4j.version>1.7.21</slf4j.version>
<tomcat.version>8.0.33</tomcat.version>
<jetty.alpnAgent.version>2.0.1</jetty.alpnAgent.version>
Expand Down Expand Up @@ -139,6 +140,13 @@
<version>1.1.33.Fork17</version>
</dependency>

<!-- Reactive Streams API -->
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>${reactive-streams.version}</version>
</dependency>

<!-- ALPN -->
<dependency>
<groupId>org.eclipse.jetty.alpn</groupId>
Expand Down Expand Up @@ -564,7 +572,7 @@
<exclude>**/TestUtil*</exclude>
</excludes>
<runOrder>random</runOrder>
<argLine>${argLine.alpnAgent} ${argLine.leak}</argLine>
<argLine>${argLine.alpnAgent} ${argLine.leak} -Xmx128m</argLine>
</configuration>
</plugin>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2016 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.client;

import static java.util.Objects.requireNonNull;

import java.net.URI;
import java.util.Optional;
import java.util.Set;

import com.linecorp.armeria.common.Scheme;

public abstract class AbstractClientFactory implements ClientFactory {

protected AbstractClientFactory() {}

@Override
public final <T> T newClient(String uri, Class<T> clientType, ClientOptionValue<?>... options) {
requireNonNull(uri, "uri");
requireNonNull(options, "options");
return newClient(URI.create(uri), clientType, ClientOptions.of(options));
}

@Override
public final <T> T newClient(String uri, Class<T> clientType, ClientOptions options) {
requireNonNull(uri, "uri");
return newClient(URI.create(uri), clientType, options);
}

@Override
public final <T> T newClient(URI uri, Class<T> clientType, ClientOptionValue<?>... options) {
requireNonNull(options, "options");
return newClient(uri, clientType, ClientOptions.of(options));
}

protected final Scheme validate(URI uri, Class<?> clientType, ClientOptions options) {
requireNonNull(uri, "uri");
requireNonNull(clientType, "clientType");
requireNonNull(options, "options");

final String scheme = uri.getScheme();
if (scheme == null) {
throw new IllegalArgumentException("URI with missing scheme: " + uri);
}

if (uri.getAuthority() == null) {
throw new IllegalArgumentException("URI with missing authority: " + uri);
}

final Optional<Scheme> parsedSchemeOpt = Scheme.tryParse(scheme);
if (!parsedSchemeOpt.isPresent()) {
throw new IllegalArgumentException("URI with unknown scheme: " + uri);
}

final Scheme parsedScheme = parsedSchemeOpt.get();
final Set<Scheme> supportedSchemes = supportedSchemes();
if (!supportedSchemes.contains(parsedScheme)) {
throw new IllegalArgumentException(
"URI with unsupported scheme: " + uri + " (expected: " + supportedSchemes + ')');
}

return parsedScheme;
}

protected static Endpoint newEndpoint(URI uri) {
return Endpoint.parse(uri.getAuthority());
}
}
110 changes: 110 additions & 0 deletions src/main/java/com/linecorp/armeria/client/AllInOneClientFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright 2016 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.client;

import java.net.URI;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableMap;

import com.linecorp.armeria.client.http.HttpClientFactory;
import com.linecorp.armeria.client.thrift.ThriftClientFactory;
import com.linecorp.armeria.common.Scheme;

import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;

public class AllInOneClientFactory extends AbstractClientFactory {

private static final Logger logger = LoggerFactory.getLogger(AllInOneClientFactory.class);

static {
if (AllInOneClientFactory.class.getClassLoader() == ClassLoader.getSystemClassLoader()) {
Runtime.getRuntime().addShutdownHook(new Thread(ClientFactory::closeDefault));
}
}

private final ClientFactory mainFactory;
private final Map<Scheme, ClientFactory> clientFactories;

public AllInOneClientFactory() {
this(SessionOptions.DEFAULT);
}

public AllInOneClientFactory(SessionOptions options) {
// TODO(trustin): Allow specifying different options for different session protocols.
// We have only one session protocol at the moment, so this is OK so far.
final HttpClientFactory httpClientFactory = new HttpClientFactory(options);
final ThriftClientFactory thriftClientFactory = new ThriftClientFactory(httpClientFactory);

final ImmutableMap.Builder<Scheme, ClientFactory> builder = ImmutableMap.builder();
for (ClientFactory f : Arrays.asList(httpClientFactory, thriftClientFactory)) {
f.supportedSchemes().forEach(s -> builder.put(s, f));
}

clientFactories = builder.build();
mainFactory = httpClientFactory;
}

@Override
public Set<Scheme> supportedSchemes() {
return clientFactories.keySet();
}

@Override
public SessionOptions options() {
return mainFactory.options();
}

@Override
public EventLoopGroup eventLoopGroup() {
return mainFactory.eventLoopGroup();
}

@Override
public Supplier<EventLoop> eventLoopSupplier() {
return mainFactory.eventLoopSupplier();
}

@Override
public <T> T newClient(URI uri, Class<T> clientType, ClientOptions options) {
final Scheme scheme = validate(uri, clientType, options);
return clientFactories.get(scheme).newClient(uri, clientType, options);
}

@Override
public void close() {
// The global default should never be closed.
if (this == ClientFactory.DEFAULT) {
logger.debug("Refusing to close the default {}; must be closed via closeDefault()",
ClientFactory.class.getSimpleName());
return;
}

doClose();
}

void doClose() {
clientFactories.values().forEach(ClientFactory::close);
}
}
75 changes: 3 additions & 72 deletions src/main/java/com/linecorp/armeria/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,76 +16,7 @@

package com.linecorp.armeria.client;

import java.util.function.Function;

/**
* A set of components required for invoking a remote service.
*/
public interface Client {

/**
* Creates a new function that decorates a {@link Client} with the specified {@code codecDecorator}
* and {@code invokerDecorator}.
* <p>
* This factory method is useful when you want to write a reusable factory method that returns
* a decorator function and the returned decorator function is expected to be consumed by
* {@link #decorate(Function)}. For example, this may be a factory which combines multiple decorators,
* any of which could be decorating a codec and/or a handler.
* </p><p>
* Consider using {@link #decorateCodec(Function)} or {@link #decorateInvoker(Function)}
* instead for simplicity unless you are writing a factory method of a decorator function.
* </p><p>
* If you need a function that decorates only a codec or an invoker, use {@link Function#identity()} for
* the non-decorated property. e.g:
* </p><pre>{@code
* newDecorator(Function.identity(), (handler) -> { ... });
* }</pre>
*/
@SuppressWarnings("unchecked")
static <T extends ClientCodec, U extends ClientCodec,
V extends RemoteInvoker, W extends RemoteInvoker>
Function<Client, Client> newDecorator(Function<T, U> codecDecorator, Function<V, W> invokerDecorator) {
return client -> new DecoratingClient(client, codecDecorator, invokerDecorator);
}

/**
* Returns the {@link ClientCodec}.
*/
ClientCodec codec();

/**
* Returns the {@link RemoteInvoker}.
*/
RemoteInvoker invoker();

/**
* Creates a new {@link Client} that decorates this {@link Client} with the specified {@code decorator}.
*/
default Client decorate(Function<Client, Client> decorator) {
@SuppressWarnings("unchecked")
final Client newClient = decorator.apply(this);
if (newClient == null) {
throw new NullPointerException("decorator.apply() returned null: " + decorator);
}

return newClient;
}

/**
* Creates a new {@link Client} that decorates the {@link ClientCodec} of this {@link Client} with the
* specified {@code codecDecorator}.
*/
default <T extends ClientCodec, U extends ClientCodec>
Client decorateCodec(Function<T, U> codecDecorator) {
return new DecoratingClient(this, codecDecorator, Function.identity());
}

/**
* Creates a new {@link Client} that decorates the {@link RemoteInvoker} of this
* {@link Client} with the specified {@code invokerDecorator}.
*/
default <T extends RemoteInvoker, U extends RemoteInvoker>
Client decorateInvoker(Function<T, U> invokerDecorator) {
return new DecoratingClient(this, Function.identity(), invokerDecorator);
}
@FunctionalInterface
public interface Client<I, O> {
O execute(ClientRequestContext ctx, I req) throws Exception;
}
Loading

0 comments on commit 7d500c7

Please sign in to comment.