Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
trustin committed Jun 4, 2016
1 parent d6ed257 commit a189c79
Show file tree
Hide file tree
Showing 276 changed files with 8,247 additions and 22,695 deletions.
8 changes: 8 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
<logback.version>1.1.7</logback.version>
<metrics.version>3.1.2</metrics.version>
<netty.version>4.1.0.CR7</netty.version>
<reactive-streams.version>1.0.0</reactive-streams.version>
<slf4j.version>1.7.21</slf4j.version>
<tomcat.version>8.0.33</tomcat.version>
<jetty.alpnAgent.version>2.0.1</jetty.alpnAgent.version>
Expand Down Expand Up @@ -140,6 +141,13 @@
<classifier>linux-x86_64</classifier>
</dependency>

<!-- Reactive Streams API -->
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>${reactive-streams.version}</version>
</dependency>

<!-- ALPN -->
<dependency>
<groupId>org.eclipse.jetty.alpn</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015 LINE Corporation
* Copyright 2016 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
Expand All @@ -20,11 +20,11 @@

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.EnumMap;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.function.BiConsumer;
Expand All @@ -33,7 +33,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.Scheme;
import com.linecorp.armeria.common.util.NativeLibraries;

import io.netty.bootstrap.Bootstrap;
Expand All @@ -60,62 +60,14 @@
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;

/**
* Creates and manages {@link RemoteInvoker}s.
*
* <h3>Life cycle of the default {@link RemoteInvokerFactory}</h3>
* <p>
* {@link Clients} or {@link ClientBuilder} uses {@link #DEFAULT}, the default {@link RemoteInvokerFactory},
* unless you specified a {@link RemoteInvokerFactory} explicitly. Calling {@link #close()} on the default
* {@link RemoteInvokerFactory} won't terminate its I/O threads and release other related resources unlike
* other {@link RemoteInvokerFactory} to protect itself from accidental premature termination.
* </p><p>
* Instead, when the current {@link ClassLoader} is {@linkplain ClassLoader#getSystemClassLoader() the system
* class loader}, a {@link Runtime#addShutdownHook(Thread) shutdown hook} is registered so that they are
* released when the JVM exits.
* </p><p>
* If you are in an environment managed by a container or you desire the early termination of the default
* {@link RemoteInvokerFactory}, use {@link #closeDefault()}.
* </p>
*/
public final class RemoteInvokerFactory implements AutoCloseable {
public abstract class AbstractClientFactory implements ClientFactory {

private static final Logger logger = LoggerFactory.getLogger(RemoteInvokerFactory.class);
private static final Logger logger = LoggerFactory.getLogger(AbstractClientFactory.class);

private enum TransportType {
NIO, EPOLL
}

/**
* The default {@link RemoteInvokerFactory} implementation.
*/
public static final RemoteInvokerFactory DEFAULT = new RemoteInvokerFactory(
RemoteInvokerOptions.DEFAULT,
type -> {
switch (type) {
case NIO:
return new DefaultThreadFactory("default-armeria-client-nio", true);
case EPOLL:
return new DefaultThreadFactory("default-armeria-client-epoll", true);
default:
throw new Error();
}
});

/**
* Closes the default {@link RemoteInvokerFactory}.
*/
public static void closeDefault() {
logger.debug("Closing the default {}", RemoteInvokerFactory.class.getSimpleName());
DEFAULT.close0();
}

static {
if (RemoteInvokerFactory.class.getClassLoader() == ClassLoader.getSystemClassLoader()) {
Runtime.getRuntime().addShutdownHook(new Thread(RemoteInvokerFactory::closeDefault));
}
}

private static final ThreadFactory DEFAULT_THREAD_FACTORY_NIO =
new DefaultThreadFactory("armeria-client-nio", false);

Expand All @@ -124,12 +76,14 @@ public static void closeDefault() {

private final EventLoopGroup eventLoopGroup;
private final boolean closeEventLoopGroup;
private final Map<SessionProtocol, RemoteInvoker> remoteInvokers;
private final RemoteInvokerOptions options;
private final Bootstrap baseBootstrap;

/**
* Creates a new instance with the specified {@link RemoteInvokerOptions}.
*/
public RemoteInvokerFactory(RemoteInvokerOptions options) {
protected AbstractClientFactory() {
this(RemoteInvokerOptions.DEFAULT);
}

protected AbstractClientFactory(RemoteInvokerOptions options) {
this(options, type -> {
switch (type) {
case NIO:
Expand All @@ -142,8 +96,8 @@ public RemoteInvokerFactory(RemoteInvokerOptions options) {
});
}

private RemoteInvokerFactory(RemoteInvokerOptions options,
Function<TransportType, ThreadFactory> threadFactoryFactory) {
private AbstractClientFactory(RemoteInvokerOptions options,
Function<TransportType, ThreadFactory> threadFactoryFactory) {

requireNonNull(options, "options");
requireNonNull(threadFactoryFactory, "threadFactoryFactory");
Expand All @@ -169,13 +123,8 @@ private RemoteInvokerFactory(RemoteInvokerOptions options,
closeEventLoopGroup = true;
}

final EnumMap<SessionProtocol, RemoteInvoker> remoteInvokers = new EnumMap<>(SessionProtocol.class);
final HttpRemoteInvoker remoteInvoker = new HttpRemoteInvoker(baseBootstrap, options);

SessionProtocol.ofHttp().stream().forEach(
protocol -> remoteInvokers.put(protocol, remoteInvoker));

this.remoteInvokers = Collections.unmodifiableMap(remoteInvokers);
this.baseBootstrap = baseBootstrap;
this.options = options;
}

private static Class<? extends SocketChannel> channelType() {
Expand All @@ -192,43 +141,71 @@ private static EventLoopGroup createGroup(Function<TransportType, ThreadFactory>
new NioEventLoopGroup(0, threadFactoryFactory.apply(TransportType.NIO));
}

/**
* Returns the {@link EventLoopGroup} being used by this remote invoker factory. Can be used to, e.g.,
* schedule a periodic task without creating a separate event loop.
*/
public EventLoopGroup eventLoopGroup() {
@Override
public final EventLoopGroup eventLoopGroup() {
return eventLoopGroup;
}

/**
* Returns a {@link RemoteInvoker} that can handle the specified {@link SessionProtocol}.
*/
public RemoteInvoker getInvoker(SessionProtocol sessionProtocol) {
final RemoteInvoker remoteInvoker = remoteInvokers.get(sessionProtocol);
if (remoteInvoker == null) {
throw new IllegalArgumentException("unsupported session protocol: " + sessionProtocol);
@Override
public final RemoteInvokerOptions options() {
return options;
}

protected final Bootstrap baseBootstrap() {
return baseBootstrap;
}

protected final Scheme validate(URI uri, Class<?> interfaceClass, ClientOptions options) {
requireNonNull(uri, "uri");
requireNonNull(interfaceClass, "interfaceClass");
requireNonNull(options, "options");

final String scheme = uri.getScheme();
if (scheme == null) {
throw new IllegalArgumentException("URI with missing scheme: " + uri);
}

if (uri.getHost() == null) {
throw new IllegalArgumentException("URI with missing host: " + uri);
}
return remoteInvoker;

final Optional<Scheme> parsedSchemeOpt = Scheme.tryParse(scheme);
if (!parsedSchemeOpt.isPresent()) {
throw new IllegalArgumentException("URI with unknown scheme: " + uri);
}

final Scheme parsedScheme = parsedSchemeOpt.get();
final Map<Scheme, Set<Class<?>>> supportedSchemes = supportedSchemes();
final Set<Class<?>> supportedTypes = supportedSchemes.get(parsedScheme);
if (supportedTypes == null) {
throw new IllegalArgumentException(
"URI with unsupported scheme: " + uri + " (expected: " + supportedSchemes.keySet() + ')');
}

for (Class<?> supportedType : supportedTypes) {
if (supportedType == interfaceClass || supportedType.isAssignableFrom(interfaceClass)) {
return parsedScheme;
}
}

throw new IllegalArgumentException(
"unknown interface class for scheme '" + parsedScheme.uriText() + "': " +
interfaceClass.getName() + " (expected: " + supportedTypes + ')');
}

/**
* Closes all {@link RemoteInvoker}s managed by this factory and shuts down the {@link EventLoopGroup}
* created implicitly by this factory.
*/
@Override
public void close() {
// The global default should never be closed.
if (this == DEFAULT) {
if (this == ClientFactory.DEFAULT) {
logger.debug("Refusing to close the default {}; must be closed via closeDefault()",
RemoteInvokerFactory.class.getSimpleName());
ClientFactory.class.getSimpleName());
return;
}

close0();
}

private void close0() {
remoteInvokers.forEach((k, v) -> v.close());
void close0() {
if (closeEventLoopGroup) {
eventLoopGroup.shutdownGracefully().syncUninterruptibly();
}
Expand Down
71 changes: 17 additions & 54 deletions src/main/java/com/linecorp/armeria/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,74 +18,37 @@

import java.util.function.Function;

import com.linecorp.armeria.client.routing.Endpoint;
import com.linecorp.armeria.common.Request;
import com.linecorp.armeria.common.Response;
import com.linecorp.armeria.common.SessionProtocol;

/**
* A set of components required for invoking a remote service.
*/
public interface Client {

/**
* Creates a new function that decorates a {@link Client} with the specified {@code codecDecorator}
* and {@code invokerDecorator}.
* <p>
* This factory method is useful when you want to write a reusable factory method that returns
* a decorator function and the returned decorator function is expected to be consumed by
* {@link #decorate(Function)}. For example, this may be a factory which combines multiple decorators,
* any of which could be decorating a codec and/or a handler.
* </p><p>
* Consider using {@link #decorateCodec(Function)} or {@link #decorateInvoker(Function)}
* instead for simplicity unless you are writing a factory method of a decorator function.
* </p><p>
* If you need a function that decorates only a codec or an invoker, use {@link Function#identity()} for
* the non-decorated property. e.g:
* </p><pre>{@code
* newDecorator(Function.identity(), (handler) -> { ... });
* }</pre>
*/
@SuppressWarnings("unchecked")
static <T extends ClientCodec, U extends ClientCodec,
V extends RemoteInvoker, W extends RemoteInvoker>
Function<Client, Client> newDecorator(Function<T, U> codecDecorator, Function<V, W> invokerDecorator) {
return client -> new DecoratingClient(client, codecDecorator, invokerDecorator);
}
@FunctionalInterface
public interface Client<I extends Request<?>, O extends Response<?>> extends AutoCloseable {

/**
* Returns the {@link ClientCodec}.
*/
ClientCodec codec();

/**
* Returns the {@link RemoteInvoker}.
*/
RemoteInvoker invoker();
O apply(SessionProtocol sessionProtocol, ClientOptions options, Endpoint endpoint,
I request) throws Exception;

/**
* Creates a new {@link Client} that decorates this {@link Client} with the specified {@code decorator}.
*/
default Client decorate(Function<Client, Client> decorator) {
default <NEW_I extends Request<?>, NEW_O extends Response<?>> Client<NEW_I, NEW_O> decorate(
Function<? extends Client<I, O>, ? extends Client<NEW_I, NEW_O>> decorator) {
@SuppressWarnings("unchecked")
final Client newClient = decorator.apply(this);
final Function<Client<I, O>, Client<NEW_I, NEW_O>> castDecorator =
(Function<Client<I, O>, Client<NEW_I, NEW_O>>) decorator;

final Client<NEW_I, NEW_O> newClient = castDecorator.apply(this);
if (newClient == null) {
throw new NullPointerException("decorator.apply() returned null: " + decorator);
}

return newClient;
}

/**
* Creates a new {@link Client} that decorates the {@link ClientCodec} of this {@link Client} with the
* specified {@code codecDecorator}.
*/
default <T extends ClientCodec, U extends ClientCodec>
Client decorateCodec(Function<T, U> codecDecorator) {
return new DecoratingClient(this, codecDecorator, Function.identity());
}

/**
* Creates a new {@link Client} that decorates the {@link RemoteInvoker} of this
* {@link Client} with the specified {@code invokerDecorator}.
*/
default <T extends RemoteInvoker, U extends RemoteInvoker>
Client decorateInvoker(Function<T, U> invokerDecorator) {
return new DecoratingClient(this, Function.identity(), invokerDecorator);
}
@Override
default void close() {}
}
Loading

0 comments on commit a189c79

Please sign in to comment.