diff --git a/gcloud-java-gax/pom.xml b/gcloud-java-gax/pom.xml
index 78d852313b77..bbfd8c6007e5 100644
--- a/gcloud-java-gax/pom.xml
+++ b/gcloud-java-gax/pom.xml
@@ -27,6 +27,11 @@
auto-value1.1
+
+ com.google.code.findbugs
+ jsr305
+ 3.0.1
+ junitjunit
@@ -58,7 +63,7 @@
add-source
-
+
diff --git a/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/ApiCallable.java b/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/ApiCallable.java
new file mode 100644
index 000000000000..edaa0885d46f
--- /dev/null
+++ b/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/ApiCallable.java
@@ -0,0 +1,409 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.gapi.gax.grpc;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ExperimentalApi;
+import io.grpc.MethodDescriptor;
+import io.grpc.MethodDescriptor.MethodType;
+import io.grpc.StatusException;
+import io.grpc.stub.ClientCalls;
+import io.grpc.stub.StreamObserver;
+
+import java.util.Iterator;
+import java.util.concurrent.Executor;
+
+import javax.annotation.Nullable;
+
+/**
+ * A callable is an object which represents one or more rpc calls. Various operators on callables
+ * produce new callables, representing common API programming patterns. Callables can be used to
+ * directly operate against an api, or to efficiently implement wrappers for apis which add
+ * additional functionality and processing.
+ *
+ *
Technically, callables are a factory for grpc {@link ClientCall} objects, and can be executed
+ * by methods of the {@link ClientCalls} class. They also provide shortcuts for direct execution of
+ * the callable instance.
+ */
+@ExperimentalApi
+public abstract class ApiCallable {
+
+ // TODO(wrwg): Support interceptors and method/call option configurations.
+ // TODO(wrwg): gather more feedback whether the overload with java.util.Concurrent hurts that
+ // much that we want to rename this into ClientCallable or such.
+
+ // Subclass Contract
+ // =================
+
+ /**
+ * Creates a new GRPC call from this callable. A channel may or may not be provided.
+ * If a channel is not provided, the callable must be bound to a channel.
+ */
+ public abstract ClientCall newCall(@Nullable Channel channel);
+
+ /**
+ * Return a descriptor for this callable, or null if none available.
+ */
+ @Nullable public CallableDescriptor getDescriptor() {
+ return null;
+ }
+
+ /**
+ * Gets the channel bound to this callable, or null, if none is bound to it.
+ */
+ @Nullable public Channel getBoundChannel() {
+ return null;
+ }
+
+ // Binding Callables
+ // =================
+
+ /**
+ * Returns a callable which is bound to the given channel. Operations on the result can
+ * omit the channel. If a channel is provided anyway, it overrides the bound channel.
+ */
+ public ApiCallable bind(final Channel boundChannel) {
+ return new ApiCallable() {
+ @Override
+ public ClientCall newCall(@Nullable Channel channel) {
+ if (channel == null) {
+ // If the caller does not provide a channel, we use the bound one.
+ channel = boundChannel;
+ }
+ return ApiCallable.this.newCall(channel);
+ }
+
+ @Override
+ @Nullable
+ public CallableDescriptor getDescriptor() {
+ return ApiCallable.this.getDescriptor();
+ }
+
+ @Override
+ @Nullable
+ public Channel getBoundChannel() {
+ return boundChannel;
+ }
+ };
+ }
+
+ // Running Callables
+ // =================
+
+ private void requireMethodType(MethodType type) {
+ MethodType actualType = getDescriptor() != null
+ ? getDescriptor().getMethodDescriptor().getType() : null;
+ if (actualType == null || actualType == MethodType.UNKNOWN || actualType.equals(type)) {
+ return;
+ }
+ throw new IllegalArgumentException(String.format(
+ "Requested method type '%s' differs from actual type '%s'", type, actualType));
+ }
+
+ /**
+ * Convenience method to run a unary callable synchronously. If no channel is provided,
+ * the callable must be bound to one.
+ */
+ public ResponseT call(@Nullable Channel channel, RequestT request) {
+ requireMethodType(MethodType.UNARY);
+ return ClientCalls.blockingUnaryCall(newCall(channel), request);
+ }
+
+ /**
+ * Convenience method to run a unary callable synchronously, without channel. Requires a callable
+ * which is bound to a channel.
+ */
+ public ResponseT call(RequestT request) {
+ return call(null, request);
+ }
+
+ /**
+ * Convenience method to run a unary callable asynchronously. If no channel is provided,
+ * the callable must be bound to one.
+ */
+ public void asyncCall(@Nullable Channel channel, RequestT request,
+ StreamObserver responseObserver) {
+ requireMethodType(MethodType.UNARY);
+ ClientCalls.asyncUnaryCall(newCall(channel), request, responseObserver);
+ }
+
+ /**
+ * Convenience method to run a unary callable asynchronously, without channel. Requires a callable
+ * which is bound to a channel.
+ */
+ public void asyncCall(RequestT request, StreamObserver responseObserver) {
+ asyncCall(null, request, responseObserver);
+ }
+
+ /**
+ * Convenience method to run a unary callable returning a future. If no channel is provided,
+ * the callable must be bound to one.
+ */
+ public ListenableFuture futureCall(@Nullable Channel channel, RequestT request) {
+ requireMethodType(MethodType.UNARY);
+ return ClientCalls.futureUnaryCall(newCall(channel), request);
+ }
+
+ /**
+ * Convenience method to run a unary callable returning a future, without a channel. Requires a
+ * callable which is bound to a channel.
+ */
+ public ListenableFuture futureCall(RequestT request) {
+ return futureCall(null, request);
+ }
+
+ /**
+ * Convenience method for a blocking server streaming call. If no channel is provided,
+ * the callable must be bound to one.
+ *
+ *
Returns an iterable for the responses. Note the returned iterable can be used only once.
+ * Returning an Iterator would be more precise, but iterators cannot be used in Java for loops.
+ */
+ public Iterable iterableResponseStreamCall(@Nullable Channel channel,
+ RequestT request) {
+ requireMethodType(MethodType.SERVER_STREAMING);
+ final Iterator result =
+ ClientCalls.blockingServerStreamingCall(newCall(channel), request);
+ return new Iterable() {
+ @Override
+ public Iterator iterator() {
+ return result;
+ }
+ };
+ }
+
+ /**
+ * Convenience method for a blocking server streaming call, without a channel. Requires a
+ * callable which is bound to a channel.
+ *
+ *
Returns an iterable for the responses. Note the returned iterable can be used only once.
+ * Returning an Iterator would be more precise, but iterators cannot be used in Java for loops.
+ */
+ public Iterable iterableResponseStreamCall(RequestT request) {
+ return iterableResponseStreamCall(null, request);
+ }
+
+ // Creation
+ // ========
+
+ /**
+ * Returns a callable which executes the described method.
+ *
+ *
+ */
+ public static ApiCallable
+ create(MethodDescriptor descriptor) {
+ return create(CallableDescriptor.create(descriptor));
+ }
+
+ /**
+ * Returns a callable which executes the method described by a {@link CallableDescriptor}.
+ */
+ public static ApiCallable
+ create(final CallableDescriptor descriptor) {
+ return new ApiCallable() {
+ @Override public ClientCall newCall(Channel channel) {
+ if (channel == null) {
+ throw new IllegalStateException(String.format(
+ "unbound callable for method '%s' requires a channel for execution",
+ descriptor.getMethodDescriptor().getFullMethodName()));
+ }
+ return channel.newCall(descriptor.getMethodDescriptor(), CallOptions.DEFAULT);
+ }
+
+ @Override public CallableDescriptor getDescriptor() {
+ return descriptor;
+ }
+
+ @Override public String toString() {
+ return descriptor.getMethodDescriptor().getFullMethodName();
+ }
+ };
+ }
+
+ /**
+ * Returns a callable which executes the given function asynchronously on each provided
+ * input. The supplied executor is used for creating tasks for each input. Example:
+ *
+ *
+ */
+ public static ApiCallable
+ create(Transformer super RequestT, ? extends ResponseT> transformer, Executor executor) {
+ return new TransformingCallable(transformer, executor);
+ }
+
+
+ /**
+ * Returns a callable which executes the given function immediately on each provided input.
+ * Similar as {@link #create(Transformer, Executor)} but does not operate asynchronously and does
+ * not require an executor.
+ *
+ *
Note that the callable returned by this method does not respect flow control. Some
+ * operations applied to it may deadlock because of this. However, it is safe to use this
+ * callable in the context of a {@link #followedBy(ApiCallable)} operation, which is the major
+ * use cases for transformers. But if you use a transformer to simulate a real rpc
+ * you should use {@link #create(Transformer, Executor)} instead.
+ */
+ public static ApiCallable
+ create(Transformer super RequestT, ? extends ResponseT> transformer) {
+ return new TransformingCallable(transformer, null);
+ }
+
+ /**
+ * Interface for a transformer. It can throw a {@link StatusException} to indicate an error.
+ */
+ public interface Transformer {
+ ResponseT apply(RequestT request) throws StatusException;
+ }
+
+ /**
+ * Returns a callable which echos its input.
+ */
+ public static ApiCallable
+ identity() {
+ return new TransformingCallable(new Transformer() {
+ @Override public RequestT apply(RequestT request) throws StatusException {
+ return request;
+ }
+ }, null);
+ }
+
+ /**
+ * Returns a callable which always returns the given constant.
+ */
+ public static ApiCallable
+ constant(final ResponseT result) {
+ return new TransformingCallable(new Transformer() {
+ @Override public ResponseT apply(RequestT request) throws StatusException {
+ return result;
+ }
+ }, null);
+ }
+
+ // Followed-By
+ // ===========
+
+ /**
+ * Returns a callable which forwards the responses from this callable as requests into the other
+ * callable. Works both for unary and streaming operands. Example:
+ *
+ *
For streaming calls, each output of the first callable will be forwarded to the second
+ * one as it arrives, allowing for streaming pipelines.
+ */
+ public ApiCallable
+ followedBy(ApiCallable callable) {
+ return new FollowedByCallable(this, callable);
+ }
+
+ // Retrying
+ // ========
+
+ /**
+ * Returns a callable which retries using exponential back-off on transient errors. Example:
+ *
+ *
The call will be retried if and only if the returned status code is {@code UNAVAILABLE}.
+ *
+ *
No output will be produced until the underlying callable has succeeded. Applied to compound
+ * callables, this can be used to implement simple transactions supposed the underlying callables
+ * are either side-effect free or idempotent.
+ *
+ *
Note that the retry callable requires to buffer all inputs and outputs of the underlying
+ * callable, and should be used with care when applied to streaming calls.
+ */
+ public ApiCallable retrying() {
+ return new RetryingCallable(this);
+ }
+
+ // Page Streaming
+ // ==============
+
+ /**
+ * Returns a callable which streams the resources obtained from a series of calls to a method
+ * implementing the pagination pattern. Example:
+ *
+ *
The returned stream does not buffer results; if it is traversed again, the API will be
+ * called again.
+ */
+ public ApiCallable
+ pageStreaming(PageDescriptor pageDescriptor) {
+ return new PageStreamingCallable(this, pageDescriptor);
+ }
+
+ /**
+ * Returns a callable which behaves the same as {@link #pageStreaming(PageDescriptor)}, with
+ * the page descriptor attempted to derive from the callable descriptor.
+ *
+ * @throws IllegalArgumentException if a page descriptor is not derivable.
+ */
+ public ApiCallable
+ pageStreaming(Class resourceType) {
+ PageDescriptor pageDescriptor =
+ getDescriptor() != null ? getDescriptor().getPageDescriptor(resourceType) : null;
+ if (pageDescriptor == null) {
+ throw new IllegalArgumentException(String.format(
+ "cannot derive page descriptor for '%s'", this));
+ }
+ return pageStreaming(pageDescriptor);
+ }
+}
diff --git a/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/CallableDescriptor.java b/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/CallableDescriptor.java
new file mode 100644
index 000000000000..b2c3a6952e32
--- /dev/null
+++ b/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/CallableDescriptor.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.gapi.gax.grpc;
+
+import com.google.common.base.Preconditions;
+
+import io.grpc.ExperimentalApi;
+import io.grpc.MethodDescriptor;
+
+import javax.annotation.Nullable;
+
+/**
+ * Describes meta data for a {@link ApiCallable}.
+ */
+@ExperimentalApi
+class CallableDescriptor {
+
+ /**
+ * Constructs a descriptor from grpc descriptor.
+ */
+ public static CallableDescriptor
+ create(MethodDescriptor grpcDescriptor) {
+ return new CallableDescriptor(grpcDescriptor);
+ }
+
+ private final MethodDescriptor descriptor;
+
+ private CallableDescriptor(MethodDescriptor descriptor) {
+ this.descriptor = Preconditions.checkNotNull(descriptor);
+ }
+
+ /**
+ * Returns the grpc method descriptor.
+ */
+ public MethodDescriptor getMethodDescriptor() {
+ return descriptor;
+ }
+
+ /**
+ * Returns a page descriptor if one is derivable from the callable descriptor, null if not.
+ * By default, this returns null, but sub-classes may override this.
+ */
+ @Nullable public PageDescriptor
+ getPageDescriptor(@SuppressWarnings("unused") Class resourceType) {
+ return null;
+ }
+}
diff --git a/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/CompoundClientCall.java b/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/CompoundClientCall.java
new file mode 100644
index 000000000000..99b8aaa35fbd
--- /dev/null
+++ b/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/CompoundClientCall.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.gapi.gax.grpc;
+
+import io.grpc.ClientCall;
+import io.grpc.Metadata;
+import io.grpc.Status;
+
+/**
+ * A helper to implement compound calls which is used for the implementation of other callables in
+ * this package. This allows to have the listener and call types being implemented from one class,
+ * which is not possible out-of-the-box because {@link ClientCall} is a class, not an interface.
+ * (Note that in Java8 ClientCall could be an interface, because it does not has instance data.)
+ */
+abstract class CompoundClientCall
+ extends ClientCall {
+
+ private final InnerListener listener = new InnerListener();
+
+ void onHeaders(@SuppressWarnings("unused") Metadata headers) {
+ // We typically ignore response headers in compound calls.
+ }
+
+ abstract void onMessage(InnerResT message);
+
+ abstract void onClose(Status status, Metadata trailers);
+
+ final ClientCall.Listener listener() {
+ return listener;
+ }
+
+ class InnerListener extends ClientCall.Listener {
+
+ @Override
+ public void onHeaders(Metadata headers) {
+ CompoundClientCall.this.onHeaders(headers);
+ }
+
+ @Override
+ public void onMessage(InnerResT message) {
+ CompoundClientCall.this.onMessage(message);
+ }
+
+ @Override
+ public void onClose(Status status, Metadata trailers) {
+ CompoundClientCall.this.onClose(status, trailers);
+ }
+ }
+}
diff --git a/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/FollowedByCallable.java b/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/FollowedByCallable.java
new file mode 100644
index 000000000000..3fe019153c15
--- /dev/null
+++ b/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/FollowedByCallable.java
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.gapi.gax.grpc;
+
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.Metadata;
+import io.grpc.Status;
+
+import javax.annotation.Nullable;
+
+/**
+ * Helper type for the implementation of {@link ApiCallable} methods. Please see there first for the
+ * specification of what this is doing. This class is concerned with the how.
+ *
+ *
Implements the followedBy callable, which executes two callables in parallel, piping output
+ * from the first to the second.
+ */
+class FollowedByCallable extends ApiCallable {
+
+ private final ApiCallable first;
+ private final ApiCallable second;
+
+ FollowedByCallable(ApiCallable first, ApiCallable second) {
+ this.first = first;
+ this.second = second;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("followedBy(%s, %s)", first, second);
+ }
+
+ @Override
+ @Nullable
+ public Channel getBoundChannel() {
+ // Inherit a bound channel from operands.
+ Channel channel = first.getBoundChannel();
+ if (channel != null) {
+ return channel;
+ }
+ return second.getBoundChannel();
+ }
+
+ @Override
+ public ClientCall newCall(Channel channel) {
+ return new FollowedByCall(channel);
+ }
+
+ /**
+ * Both calls are started in parallel, and the output from the first is immediately piped as input
+ * into the second. As we don't know the required input for the second call, we request all output
+ * from the first as soon as some output for the composite is requested.
+ */
+ private class FollowedByCall extends CompoundClientCall {
+
+ private Channel channel;
+ private ClientCall firstCall;
+ private ClientCall secondCall;
+ private ClientCall.Listener listener;
+
+ private FollowedByCall(Channel channel) {
+ this.channel = channel;
+ }
+
+ @Override
+ public void start(ClientCall.Listener listener, Metadata headers) {
+ this.listener = listener;
+ this.firstCall = first.newCall(channel);
+ this.secondCall = second.newCall(channel);
+
+ // This instance's listener will receive output from the first call.
+ this.firstCall.start(this.listener(), headers);
+
+ // The ForwardingCallable listener will receive output from the second call.
+ this.secondCall.start(listener, headers);
+ }
+
+ @Override
+ public void request(int numMessages) {
+ // We don't know how much inputs the second call needs, so we request all what is available.
+ firstCall.request(Integer.MAX_VALUE);
+ secondCall.request(numMessages);
+ }
+
+ @Override
+ public void cancel() {
+ firstCall.cancel();
+ secondCall.cancel();
+ }
+
+ @Override
+ public void sendMessage(RequestT message) {
+ firstCall.sendMessage(message);
+ }
+
+ @Override
+ public void halfClose() {
+ firstCall.halfClose();
+ }
+
+ @Override
+ public void onMessage(InterT message) {
+ secondCall.sendMessage(message);
+ }
+
+ @Override
+ public void onClose(Status status, Metadata trailers) {
+ if (status.isOk()) {
+ secondCall.halfClose();
+ return;
+ }
+ secondCall.cancel();
+ listener.onClose(status, trailers);
+ }
+ }
+}
diff --git a/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/PageDescriptor.java b/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/PageDescriptor.java
new file mode 100644
index 000000000000..b8f47957e9db
--- /dev/null
+++ b/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/PageDescriptor.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.gapi.gax.grpc;
+
+import io.grpc.ExperimentalApi;
+
+/**
+ * An interface which describes the paging pattern.
+ */
+@ExperimentalApi
+public interface PageDescriptor {
+
+ /**
+ * Delivers the empty page token.
+ */
+ Object emptyToken();
+
+ /**
+ * Injects a page token into the request.
+ */
+ RequestT injectToken(RequestT payload, Object token);
+
+ /**
+ * Extracts the next token from the response. Returns the empty token if there are
+ * no more pages.
+ */
+ Object extractNextToken(ResponseT payload);
+
+ /**
+ * Extracts an iterable of resources from the response.
+ */
+ Iterable extractResources(ResponseT payload);
+}
\ No newline at end of file
diff --git a/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/PageStreamingCallable.java b/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/PageStreamingCallable.java
new file mode 100644
index 000000000000..d864ec4fc562
--- /dev/null
+++ b/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/PageStreamingCallable.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.gapi.gax.grpc;
+
+import com.google.common.base.Preconditions;
+
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.Metadata;
+import io.grpc.Status;
+
+import java.util.concurrent.Semaphore;
+
+import javax.annotation.Nullable;
+
+/**
+ * Helper type for the implementation of {@link ApiCallable} methods. Please see there first for the
+ * specification of what this is doing. This class is concerned with the how.
+ *
+ *
Implementation of the pageStreaming callable.
+ */
+class PageStreamingCallable extends ApiCallable {
+
+ private final ApiCallable callable;
+ private final PageDescriptor pageDescriptor;
+
+
+ PageStreamingCallable(ApiCallable callable,
+ PageDescriptor pageDescriptor) {
+ this.callable = Preconditions.checkNotNull(callable);
+ this.pageDescriptor = Preconditions.checkNotNull(pageDescriptor);
+ }
+
+ @Override public String toString() {
+ return String.format("pageStreaming(%s)", callable.toString());
+ }
+
+ @Override
+ @Nullable
+ public Channel getBoundChannel() {
+ return callable.getBoundChannel();
+ }
+
+ @Override
+ public ClientCall newCall(Channel channel) {
+ return new PageStreamingCall(channel);
+ }
+
+ /**
+ * Class which handles both the call logic for the callable and listens to page call responses.
+ *
+ *
The implementation uses a semaphore to handle flow control, since the callable level flow
+ * control via request() doesn't map 1:1 to the page call flow control. The semaphore holds at any
+ * time the number of requested messages on callable level. Blocking on the semaphore happens
+ * exclusively in onMessage() calls for pages. Apart of the first page call which is scheduled at
+ * the time the caller half-closes, all future page calls will be triggered from onMessage() as
+ * well. This avoids thread safety issues, assuming the ClientCall concurrency contract.
+ */
+ private class PageStreamingCall extends CompoundClientCall {
+
+ private final Channel channel;
+ private ClientCall.Listener outerListener;
+ private Metadata headers;
+ private RequestT request;
+ private Semaphore requestedSemaphore = new Semaphore(0);
+ private ClientCall currentCall;
+ private Object nextPageToken = pageDescriptor.emptyToken();
+ private boolean sentClose;
+
+ private PageStreamingCall(Channel channel) {
+ this.channel = channel;
+ }
+
+ @Override
+ public void start(ClientCall.Listener responseListener, Metadata headers) {
+ this.outerListener = responseListener;
+ this.headers = headers;
+ currentCall = callable.newCall(channel);
+ currentCall.start(listener(), headers);
+ }
+
+ @Override
+ public void request(int numMessages) {
+ requestedSemaphore.release(numMessages);
+ }
+
+ @Override
+ public void sendMessage(RequestT request) {
+ Preconditions.checkState(this.request == null);
+ this.request = request;
+ }
+
+ @Override
+ public void halfClose() {
+ // Trigger the call for the first page.
+ requestNextPage();
+ }
+
+ @Override
+ public void cancel() {
+ currentCall.cancel();
+ if (!sentClose) {
+ outerListener.onClose(Status.CANCELLED, new Metadata());
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void requestNextPage() {
+ currentCall.request(1);
+ currentCall.sendMessage(pageDescriptor.injectToken(request, nextPageToken));
+ currentCall.halfClose();
+ }
+
+ @Override
+ public void onMessage(ResponseT response) {
+ // Extract the token for the next page. If empty, there are no more pages,
+ // and we set the token to null.
+ Object token = pageDescriptor.extractNextToken(response);
+ nextPageToken = token.equals(pageDescriptor.emptyToken()) ? null : token;
+
+ // Deliver as much resources as have been requested. This may block via
+ // our semaphore, and while we are delivering, more requests may come in.
+ for (ResourceT resource : pageDescriptor.extractResources(response)) {
+ try {
+ requestedSemaphore.acquire();
+ } catch (InterruptedException e) {
+ outerListener.onClose(Status.fromThrowable(e), new Metadata());
+ sentClose = true;
+ currentCall.cancel();
+ return;
+ }
+ outerListener.onMessage(resource);
+ }
+
+ // If there is a next page, create a new call and request it.
+ if (nextPageToken != null) {
+ currentCall = callable.newCall(channel);
+ currentCall.start(listener(), headers);
+ requestNextPage();
+ } else {
+ outerListener.onClose(Status.OK, new Metadata());
+ sentClose = true;
+ }
+ }
+
+ @Override
+ public void onClose(Status status, Metadata trailers) {
+ if (!status.isOk()) {
+ // If there is an error, propagate it. Otherwise let onMessage determine how to continue.
+ outerListener.onClose(status, trailers);
+ sentClose = true;
+ }
+ }
+ }
+}
+
diff --git a/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/RetryingCallable.java b/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/RetryingCallable.java
new file mode 100644
index 000000000000..d476890c85e9
--- /dev/null
+++ b/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/RetryingCallable.java
@@ -0,0 +1,197 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.gapi.gax.grpc;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.Metadata;
+import io.grpc.Status;
+
+import java.util.List;
+import java.util.Random;
+
+import javax.annotation.Nullable;
+
+/**
+ * Helper type for the implementation of {@link ApiCallable} methods. Please see there first for the
+ * specification of what this is doing. This class is concerned with the how.
+ *
+ *
Implementation of the retrying callable.
+ */
+class RetryingCallable extends ApiCallable {
+
+ // TODO(wgg): make the parameters below configurable. They are currently taken from
+ // http://en.wikipedia.org/wiki/Exponential_backoff.
+
+ private static final int SLOT_TIME_MILLIS = 2;
+ private static final int TRUNCATE_AFTER = 10;
+ private static final int MAX_ATTEMPTS = 16;
+ private static final Random randomGenerator = new Random(0);
+
+ private final ApiCallable callable;
+
+ RetryingCallable(ApiCallable callable) {
+ this.callable = callable;
+ }
+
+ @Override public String toString() {
+ return String.format("retrying(%s)", callable.toString());
+ }
+
+ @Override
+ @Nullable
+ public CallableDescriptor getDescriptor() {
+ return callable.getDescriptor();
+ }
+
+ @Override
+ @Nullable
+ public Channel getBoundChannel() {
+ return callable.getBoundChannel();
+ }
+
+ @Override public ClientCall newCall(Channel channel) {
+ return new RetryingCall(channel);
+ }
+
+ private static boolean canRetry(Status status) {
+ return status.getCode() == Status.Code.UNAVAILABLE;
+ }
+
+ /**
+ * Class implementing the call for retry.
+ *
+ *
This remembers all actions from the caller in order to replay the call if needed. No output
+ * will be produced until the call has successfully ended. Thus this call requires full buffering
+ * of inputs and outputs,
+ */
+ private class RetryingCall extends CompoundClientCall {
+
+ private final Channel channel;
+ private ClientCall.Listener listener;
+ private int requested;
+ private Metadata requestHeaders;
+ private final List requestPayloads = Lists.newArrayList();
+ private final List responsePayloads = Lists.newArrayList();
+ private Metadata responseHeaders;
+ private int attempt;
+ private ClientCall currentCall;
+
+ private RetryingCall(Channel channel) {
+ this.channel = channel;
+ }
+
+ @Override
+ public void start(ClientCall.Listener listener, Metadata headers) {
+ this.listener = listener;
+ requestHeaders = headers;
+ currentCall = callable.newCall(channel);
+ currentCall.start(listener(), headers);
+ }
+
+ @Override
+ public void request(int numMessages) {
+ requested += numMessages;
+ currentCall.request(numMessages);
+ }
+
+ @Override
+ public void cancel() {
+ currentCall.cancel();
+ }
+
+ @Override
+ public void halfClose() {
+ currentCall.halfClose();
+ }
+
+ @Override
+ public void sendMessage(RequestT message) {
+ requestPayloads.add(message);
+ currentCall.sendMessage(message);
+ }
+
+ @Override
+ public void onHeaders(Metadata headers) {
+ responseHeaders = headers;
+ }
+
+ @Override
+ void onMessage(ResponseT message) {
+ responsePayloads.add(message);
+ }
+
+ @Override
+ public void onClose(Status status, Metadata trailers) {
+ if (status.isOk() || !canRetry(status) || attempt >= MAX_ATTEMPTS) {
+ // Call succeeded or failed non-transiently or failed too often; feed underlying listener
+ // with the result.
+ if (status.isOk()) {
+ if (responseHeaders != null) {
+ listener.onHeaders(responseHeaders);
+ }
+ for (ResponseT response : responsePayloads) {
+ listener.onMessage(response);
+ }
+ }
+ listener.onClose(status, trailers);
+ return;
+ }
+
+ // Sleep using duration calculated by exponential backoff algorithm.
+ attempt++;
+ int slots = 1 << (attempt - 1 > TRUNCATE_AFTER ? TRUNCATE_AFTER : attempt - 1);
+ int slot = randomGenerator.nextInt(slots);
+ if (slot > 0) {
+ try {
+ Thread.sleep(SLOT_TIME_MILLIS * slot);
+ } catch (InterruptedException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ // Start call again.
+ responseHeaders = null;
+ responsePayloads.clear();
+ currentCall = callable.newCall(channel);
+ currentCall.start(listener(), requestHeaders);
+ currentCall.request(requested);
+ for (RequestT request : requestPayloads) {
+ currentCall.sendMessage(request);
+ }
+ currentCall.halfClose();
+ }
+ }
+}
diff --git a/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/ServiceApiSettings.java b/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/ServiceApiSettings.java
new file mode 100644
index 000000000000..485d4794d917
--- /dev/null
+++ b/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/ServiceApiSettings.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.gapi.gax.grpc;
+
+import com.google.auth.Credentials;
+
+import io.grpc.ManagedChannel;
+
+public class ServiceApiSettings {
+ private boolean isIdempotentRetrying;
+
+ private Credentials credentials;
+
+ private String servicePath;
+ private int port;
+
+ private ManagedChannel channel;
+
+ public ServiceApiSettings() {
+ isIdempotentRetrying = true;
+ credentials = null;
+ servicePath = null;
+ port = 0;
+ }
+
+ /**
+ * Set to true in order to have the service retry all idempotent methods,
+ * set to false otherwise. The default is true. This setting generally translates to
+ * doing retries for calls which perform gets, deletes, and updates, but not calls which
+ * perform creates.
+ */
+ public ServiceApiSettings setIsIdempotentRetrying(boolean isIdempotentRetrying) {
+ this.isIdempotentRetrying = isIdempotentRetrying;
+ return this;
+ }
+
+ public boolean getIsIdempotentRetrying() {
+ return isIdempotentRetrying;
+ }
+
+ /**
+ * Sets the credentials to use in order to call the service. The default is to acquire
+ * the credentials using GoogleCredentials.getApplicationDefault().
+ */
+ public ServiceApiSettings setCredentials(Credentials credentials) {
+ this.credentials = credentials;
+ return this;
+ }
+
+ public Credentials getCredentials() {
+ return credentials;
+ }
+
+ /**
+ * The path used to reach the service.
+ */
+ public ServiceApiSettings setServicePath(String servicePath) {
+ this.servicePath = servicePath;
+ return this;
+ }
+
+ public String getServicePath() {
+ return servicePath;
+ }
+
+ /**
+ * The port used to reach the service.
+ */
+ public ServiceApiSettings setPort(int port) {
+ this.port = port;
+ return this;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ /**
+ * An instance of ManagedChannel; shutdown will be called on this channel when
+ * the instance of LoggingServiceApi is shut down.
+ */
+ public ServiceApiSettings setChannel(ManagedChannel channel) {
+ this.channel = channel;
+ return this;
+ }
+
+ public ManagedChannel getChannel() {
+ return channel;
+ }
+}
diff --git a/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/TransformingCallable.java b/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/TransformingCallable.java
new file mode 100644
index 000000000000..507194a50688
--- /dev/null
+++ b/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/TransformingCallable.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.gapi.gax.grpc;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.Metadata;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.internal.SerializingExecutor;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+
+import javax.annotation.Nullable;
+
+/**
+ * Helper type for the implementation of {@link ApiCallable} methods. Please see there first for the
+ * specification of what this is doing. This class is concerned with the how.
+ *
+ *
Implements the transform callable, which executes a function to produce a stream of responses
+ * from a stream of requests.
+ */
+class TransformingCallable extends ApiCallable {
+
+ private final Transformer super RequestT, ? extends ResponseT> transformer;
+ @Nullable private final Executor executor;
+
+ TransformingCallable(Transformer super RequestT, ? extends ResponseT> transformer,
+ Executor executor) {
+ this.transformer = Preconditions.checkNotNull(transformer);
+ this.executor = executor;
+ }
+
+ @Override
+ public String toString() {
+ return "transforming(...)";
+ }
+
+ @Override
+ public ClientCall newCall(Channel channel) {
+ return new TransformCall();
+ }
+
+ /**
+ * Implements the transforming call. If an executor is provided, delivery of results will
+ * happen asynchronously and flow control is respected. If not, results will be delivered
+ * to the listener immediately on sendMessage(). This violates the ClientCall contract as
+ * methods on Call are supposed to be non-blocking, whereas methods on Listener can block.
+ * In most practical cases, this should not matter (see also high-level documentation in
+ * Callable).
+ *
+ *
Note that this class does not need to be thread-safe since (a) the contract for
+ * ClientCall does not require thread-safeness (b) we use a SerializingExecutor for
+ * asynchronous callbacks which is guaranteed to run not more than one thread.
+ */
+ private class TransformCall extends ClientCall {
+
+ private final SerializingExecutor callExecutor =
+ executor == null ? null : new SerializingExecutor(executor);
+ private final Semaphore semaphore = new Semaphore(0);
+ private ClientCall.Listener listener;
+ private boolean sentClose;
+
+ @Override
+ public void start(ClientCall.Listener listener, Metadata headers) {
+ this.listener = listener;
+ }
+
+ @Override
+ public void request(int numMessages) {
+ if (callExecutor != null) {
+ semaphore.release(numMessages);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ if (!sentClose) {
+ listener.onClose(Status.CANCELLED, new Metadata());
+ sentClose = true;
+ }
+ }
+
+ @Override
+ public void sendMessage(final RequestT message) {
+ if (callExecutor == null) {
+ doSend(message);
+ return;
+ }
+ callExecutor.execute(new Runnable() {
+ @Override public void run() {
+ try {
+ semaphore.acquire();
+ doSend(message);
+ } catch (Throwable t) {
+ cancel();
+ throw Throwables.propagate(t);
+ }
+ }
+ });
+ }
+
+ @SuppressWarnings("deprecation") // e.getStatus()
+ private void doSend(RequestT message) {
+ try {
+ listener.onMessage(transformer.apply(message));
+ } catch (StatusException e) {
+ sentClose = true;
+ listener.onClose(e.getStatus(), new Metadata());
+ } catch (Throwable t) {
+ // TODO(wgg): should we throw anything else here, or catch like below? Catching might
+ // be an issue for debugging.
+ sentClose = true;
+ listener.onClose(Status.fromThrowable(t), new Metadata());
+ }
+ }
+
+ @Override
+ public void halfClose() {
+ if (callExecutor == null) {
+ doClose();
+ return;
+ }
+ callExecutor.execute(new Runnable() {
+ @Override public void run() {
+ doClose();
+ }
+ });
+ }
+
+ private void doClose() {
+ if (!sentClose) {
+ sentClose = true;
+ listener.onClose(Status.OK, new Metadata());
+ }
+ }
+ }
+}
diff --git a/gcloud-java-gax/src/main/java/io/gapi/gax/internal/ApiUtils.java b/gcloud-java-gax/src/main/java/io/gapi/gax/internal/ApiUtils.java
new file mode 100644
index 000000000000..6673726f761e
--- /dev/null
+++ b/gcloud-java-gax/src/main/java/io/gapi/gax/internal/ApiUtils.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.gapi.gax.internal;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Executors;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.common.collect.Lists;
+
+import io.gapi.gax.grpc.ApiCallable;
+import io.gapi.gax.grpc.ServiceApiSettings;
+import io.grpc.ClientInterceptor;
+import io.grpc.ManagedChannel;
+import io.grpc.auth.ClientAuthInterceptor;
+import io.grpc.netty.NegotiationType;
+import io.grpc.netty.NettyChannelBuilder;
+
+public class ApiUtils {
+
+ // TODO(wgg): make this configurable
+ private static final int AUTH_THREADS = 4;
+
+ public static ApiCallable prepareIdempotentCallable(
+ ApiCallable callable, ServiceApiSettings settings) {
+ ApiCallable theCallable = callable;
+ if (settings.getIsIdempotentRetrying()) {
+ theCallable = theCallable.retrying();
+ }
+ return theCallable;
+ }
+
+ /**
+ * Creates a channel for the given path, address and port.
+ */
+ public static ManagedChannel createChannel(String address, int port, Collection scopes)
+ throws IOException {
+ List interceptors = Lists.newArrayList();
+ //TODO: MIGRATION interceptors.add(ChannelFactory.authorityInterceptor(address));
+
+ GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
+ if (credentials.createScopedRequired()) {
+ credentials = credentials.createScoped(scopes);
+ }
+ interceptors.add(new ClientAuthInterceptor(credentials,
+ Executors.newFixedThreadPool(AUTH_THREADS)));
+
+ return NettyChannelBuilder
+ .forAddress(address, port)
+ .negotiationType(NegotiationType.TLS)
+ .intercept(interceptors)
+ .build();
+ }
+
+ public static ServiceApiSettings settingsWithChannels(ServiceApiSettings settings,
+ String defaultServicePath, int defaultServicePort, String scopes[]) throws IOException {
+ ManagedChannel channel = settings.getChannel();
+
+ if (channel == null) {
+ String servicePath = defaultServicePath;
+ if (settings.getServicePath() != null) {
+ servicePath = settings.getServicePath();
+ }
+
+ int port = defaultServicePort;
+ if (settings.getPort() != 0) {
+ port = settings.getPort();
+ }
+
+ List scopeList = Arrays.asList(scopes);
+ channel = ApiUtils.createChannel(servicePath, port, scopeList);
+ }
+
+ return new ServiceApiSettings()
+ .setChannel(channel)
+ .setIsIdempotentRetrying(settings.getIsIdempotentRetrying());
+ }
+}
diff --git a/gcloud-java-gax/src/main/java/io/gapi/gax/protobuf/PathTemplate.java b/gcloud-java-gax/src/main/java/io/gapi/gax/protobuf/PathTemplate.java
new file mode 100644
index 000000000000..a20c2b6a1f10
--- /dev/null
+++ b/gcloud-java-gax/src/main/java/io/gapi/gax/protobuf/PathTemplate.java
@@ -0,0 +1,881 @@
+package io.gapi.gax.protobuf;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.annotations.Beta;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.annotation.Nullable;
+
+/**
+ * Represents a path template.
+ *
+ *
Templates use the syntax of the API platform; see the protobuf of {@link HttpRule} for
+ * details. A template consists of a sequence of literals, wildcards, and variable bindings,
+ * where each binding can have a sub-path. A string representation can be parsed into an
+ * instance of {@link PathTemplate}, which can then be used to perform matching and instantiation.
+ *
+ *
Matching and instantiation deals with unescaping and escaping using URL encoding rules. For
+ * example, if a template variable for a single segment is instantiated with a string like
+ * {@code "a/b"}, the slash will be escaped to {@code "%2f"}. (Note that slash will not be escaped
+ * for a multiple-segment variable, but other characters will). The literals in the template
+ * itself are not escaped automatically, and must be already URL encoded.
+ *
+ *
Here is an example for a template using simple variables:
+ *
+ *
+ * Path templates can also be used with only wildcards. Each wildcard is associated
+ * with an implicit variable {@code $n}, where n is the zero-based position of the
+ * wildcard. Example:
+ *
+ *
+ * Paths input to matching can use URL relative syntax to indicate a host name by prefixing the
+ * host name, as in {@code //somewhere.io/some/path}. The host name is matched into the special
+ * variable {@link #HOSTNAME_VAR}. Patterns are agnostic about host names, and the same pattern
+ * can be used for URL relative syntax and simple path syntax:
+ *
+ *
+ * For the representation of a resource name see {@link ResourceName}, which is based
+ * on path templates.
+ */
+@Beta
+public class PathTemplate {
+
+ /**
+ * A constant identifying the special variable used for endpoint bindings in
+ * the result of {@link #matchFromFullName(String)}.
+ */
+ public static final String HOSTNAME_VAR = "$hostname";
+
+ // A regexp to match a custom verb at the end of a path.
+ private static final Pattern CUSTOM_VERB_PATTERN = Pattern.compile(":([^/*}{=]+)$");
+
+ // A splitter on slash.
+ private static final Splitter SLASH_SPLITTER = Splitter.on('/').trimResults();
+
+ // Helper Types
+ // ============
+
+ /**
+ * Specifies a path segment kind.
+ */
+ enum SegmentKind {
+ /** A literal path segment. */
+ LITERAL,
+
+ /** A custom verb. Can only appear at the end of path. */
+ CUSTOM_VERB,
+
+ /** A simple wildcard ('*'). */
+ WILDCARD,
+
+ /** A path wildcard ('**'). */
+ PATH_WILDCARD,
+
+ /** A field binding start. */
+ BINDING,
+
+ /** A field binding end. */
+ END_BINDING,
+ }
+
+ /**
+ * Specifies a path segment.
+ */
+ @AutoValue
+ abstract static class Segment {
+
+ /**
+ * A constant for the WILDCARD segment.
+ */
+ private static final Segment WILDCARD = create(SegmentKind.WILDCARD, "*");
+
+ /**
+ * A constant for the PATH_WILDCARD segment.
+ */
+ private static final Segment PATH_WILDCARD = create(SegmentKind.PATH_WILDCARD, "**");
+
+ /**
+ * A constant for the END_BINDING segment.
+ */
+ private static final Segment END_BINDING = create(SegmentKind.END_BINDING, "");
+
+ /**
+ * Creates a segment of given kind and value.
+ */
+ private static Segment create(SegmentKind kind, String value) {
+ return new AutoValue_PathTemplate_Segment(kind, value);
+ }
+
+ /**
+ * The path segment kind.
+ */
+ abstract SegmentKind kind();
+
+ /**
+ * The value for the segment. For literals, custom verbs, and wildcards, this reflects the value
+ * as it appears in the template. For bindings, this represents the variable of the binding.
+ */
+ abstract String value();
+
+ /**
+ * Returns true of this segment is one of the wildcards,
+ */
+ boolean isAnyWildcard() {
+ return kind() == SegmentKind.WILDCARD || kind() == SegmentKind.PATH_WILDCARD;
+ }
+
+ String separator() {
+ switch (kind()) {
+ case CUSTOM_VERB:
+ return ":";
+ case END_BINDING:
+ return "";
+ default:
+ return "/";
+ }
+ }
+ }
+
+ /**
+ * Creates a path template from a string. The string must satisfy the syntax
+ * of path templates of the API platform; see {@link HttpRule}'s proto source.
+ *
+ * @throws ValidationException if there are errors while parsing the template.
+ */
+ public static PathTemplate create(String template) {
+ return new PathTemplate(parseTemplate(template));
+ }
+
+ // Instance State and Methods
+ // ==========================
+
+ // List of segments of this template.
+ private final ImmutableList segments;
+
+ // Map from variable names to bindings in the template.
+ private final ImmutableMap bindings;
+
+ private PathTemplate(Iterable segments) {
+ this.segments = ImmutableList.copyOf(segments);
+ if (this.segments.isEmpty()) {
+ throw new ValidationException("template cannot be empty.");
+ }
+ Map bindings = Maps.newLinkedHashMap();
+ for (Segment seg : this.segments) {
+ if (seg.kind() == SegmentKind.BINDING) {
+ if (bindings.containsKey(seg.value())) {
+ throw new ValidationException("Duplicate binding '%s'", seg.value());
+ }
+ bindings.put(seg.value(), seg);
+ }
+ }
+ this.bindings = ImmutableMap.copyOf(bindings);
+ }
+
+ /**
+ * Returns the set of variable names used in the template.
+ */
+ public Set vars() {
+ return bindings.keySet();
+ }
+
+ /**
+ * Returns a template for the parent of this template.
+ *
+ * @throws ValidationException if the template has no parent.
+ */
+ public PathTemplate parentTemplate() {
+ int i = segments.size();
+ Segment seg = segments.get(--i);
+ if (seg.kind() == SegmentKind.END_BINDING) {
+ while (i > 0 && segments.get(--i).kind() != SegmentKind.BINDING) {}
+ }
+ if (i == 0) {
+ throw new ValidationException("template does not have a parent");
+ }
+ return new PathTemplate(segments.subList(0, i));
+ }
+
+ /**
+ * Returns a template where all variable bindings have been replaced by wildcards, but
+ * which is equivalent regards matching to this one.
+ */
+ public PathTemplate withoutVars() {
+ StringBuilder result = new StringBuilder();
+ ListIterator iterator = segments.listIterator();
+ boolean start = true;
+ while (iterator.hasNext()) {
+ Segment seg = iterator.next();
+ switch (seg.kind()) {
+ case BINDING:
+ case END_BINDING:
+ break;
+ default:
+ if (!start) {
+ result.append(seg.separator());
+ } else {
+ start = false;
+ }
+ result.append(seg.value());
+ }
+ }
+ return create(result.toString());
+ }
+
+ /**
+ * Returns a path template for the sub-path of the given variable. Example:
+ *
+ *
+ *
+ * The returned template will never have named variables, but only wildcards, which are
+ * dealt with in matching and instantiation using '$n'-variables. See the documentation of
+ * {@link #match(String)} and {@link #instantiate(Map)}, respectively.
+ *
+ *
For a variable which has no sub-path, this returns a path template with a single wildcard
+ * ('*').
+ *
+ * @throws ValidationException if the variable does not exist in the template.
+ */
+ public PathTemplate subTemplate(String varName) {
+ List sub = Lists.newArrayList();
+ boolean inBinding = false;
+ for (Segment seg : segments) {
+ if (seg.kind() == SegmentKind.BINDING && seg.value().equals(varName)) {
+ inBinding = true;
+ } else if (inBinding) {
+ if (seg.kind() == SegmentKind.END_BINDING) {
+ return PathTemplate.create(toSyntax(sub, true));
+ } else {
+ sub.add(seg);
+ }
+ }
+ }
+ throw new ValidationException("Variable '%s' is undefined in template '%s'",
+ varName, this.toRawString());
+ }
+
+ /**
+ * Returns true of this template ends with a literal.
+ */
+ public boolean endsWithLiteral() {
+ return segments.get(segments.size() - 1).kind() == SegmentKind.LITERAL;
+ }
+
+ /**
+ * Returns true of this template ends with a custom verb.
+ */
+ public boolean endsWithCustomVerb() {
+ return segments.get(segments.size() - 1).kind() == SegmentKind.CUSTOM_VERB;
+ }
+
+ /**
+ * Creates a resource name from this template and a path.
+ *
+ * @throws ValidationException if the path does not match the template.
+ */
+ public ResourceName parse(String path) {
+ return ResourceName.create(this, path);
+ }
+
+ /**
+ * Returns the name of a singleton variable used by this template. If the template does not
+ * contain a single variable, returns null.
+ */
+ @Nullable
+ public String singleVar() {
+ if (bindings.size() == 1) {
+ return bindings.entrySet().iterator().next().getKey();
+ }
+ return null;
+ }
+
+ // Template Matching
+ // =================
+
+ /**
+ * Returns true if the template matches the path.
+ */
+ public boolean matches(String path) {
+ return match(path) != null;
+ }
+
+ /**
+ * Matches the path, returning a map from variable names to matched values. All matched values
+ * will be properly unescaped using URL encoding rules. If the path does not match the template,
+ * null is returned.
+ *
+ * If the path starts with '//', the first segment will be interpreted as a host name and stored
+ * in the variable {@link #HOSTNAME_VAR}.
+ *
+ *
See the {@link PathTemplate} class documentation for examples.
+ *
+ *
For free wildcards in the template, the matching process creates variables named '$n',
+ * where 'n' is the wildcard's position in the template (starting at n=0). For example:
+ *
+ *
+ *
+ * All matched values will be properly unescaped using URL encoding rules.
+ */
+ @Nullable
+ public ImmutableMap match(String path) {
+ return match(path, false);
+ }
+
+ /**
+ * Matches the path, where the first segment is interpreted as the host name regardless of
+ * whether it starts with '//' or not. Example:
+ *
+ *
+ */
+ @Nullable
+ public ImmutableMap matchFromFullName(String path) {
+ return match(path, true);
+ }
+
+ // Matches a path.
+ private ImmutableMap match(String path, boolean forceHostName) {
+ // Quick check for trailing custom verb.
+ Segment last = segments.get(segments.size() - 1);
+ if (last.kind() == SegmentKind.CUSTOM_VERB) {
+ Matcher matcher = CUSTOM_VERB_PATTERN.matcher(path);
+ if (!matcher.find() || !decodeUrl(matcher.group(1)).equals(last.value())) {
+ return null;
+ }
+ path = path.substring(0, matcher.start(0));
+ }
+
+ // Do full match.
+ boolean withHostName = path.startsWith("//");
+ if (withHostName) {
+ path = path.substring(2);
+ }
+ List input = SLASH_SPLITTER.splitToList(path);
+ int inPos = 0;
+ Map values = Maps.newLinkedHashMap();
+ if (withHostName || forceHostName) {
+ if (input.isEmpty()) {
+ return null;
+ }
+ String hostName = input.get(inPos++);
+ if (withHostName) {
+ // Put the // back, so we can distinguish this case from forceHostName.
+ hostName = "//" + hostName;
+ }
+ values.put(HOSTNAME_VAR, hostName);
+ }
+ if (!match(input, inPos, segments, 0, values)) {
+ return null;
+ }
+ return ImmutableMap.copyOf(values);
+ }
+
+ // Tries to match the input based on the segments at given positions. Returns a boolean
+ // indicating whether the match was successful.
+ private static boolean match(List input, int inPos, List segments, int segPos,
+ Map values) {
+ String currentVar = null;
+ while (segPos < segments.size()) {
+ Segment seg = segments.get(segPos++);
+ switch (seg.kind()) {
+ case END_BINDING:
+ // End current variable binding scope.
+ currentVar = null;
+ continue;
+ case BINDING:
+ // Start variable binding scope.
+ currentVar = seg.value();
+ continue;
+ default:
+ if (inPos >= input.size()) {
+ // End of input
+ return false;
+ }
+ // Check literal match.
+ String next = decodeUrl(input.get(inPos++));
+ if (seg.kind() == SegmentKind.LITERAL) {
+ if (!seg.value().equals(next)) {
+ // Literal does not match.
+ return false;
+ }
+ }
+ if (currentVar != null) {
+ // Create or extend current match
+ String current = values.get(currentVar);
+ if (current == null) {
+ values.put(currentVar, next);
+ } else {
+ values.put(currentVar, current + "/" + next);
+ }
+ }
+ if (seg.kind() == SegmentKind.PATH_WILDCARD) {
+ // Compute the number of additional input the ** can consume. This
+ // is possible because we restrict patterns to have only one **.
+ int segsToMatch = 0;
+ for (int i = segPos; i < segments.size(); i++) {
+ switch (segments.get(i).kind()) {
+ case BINDING:
+ case END_BINDING:
+ // skip
+ continue;
+ default:
+ segsToMatch++;
+ }
+ }
+ int available = (input.size() - inPos) - segsToMatch;
+ while (available-- > 0) {
+ values.put(currentVar, values.get(currentVar) + "/" + decodeUrl(input.get(inPos++)));
+ }
+ }
+ }
+ }
+ return inPos == input.size();
+ }
+
+ // Template Instantiation
+ // ======================
+
+ /**
+ * Instantiate the template based on the given variable assignment. Performs proper
+ * URL escaping of variable assignments.
+ *
+ *
Note that free wildcards in the template must have bindings of '$n' variables, where
+ * 'n' is the position of the wildcard (starting at 0). See the documentation of
+ * {@link #match(String)} for details.
+ *
+ * @throws ValidationException if a variable occurs in the template without a binding.
+ */
+ public String instantiate(Map values) {
+ return instantiate(values, false);
+ }
+
+ /**
+ * Shortcut for {@link #instantiate(Map)} with a vararg parameter for keys and values.
+ */
+ public String instantiate(String... keysAndValues) {
+ ImmutableMap.Builder builder = ImmutableMap.builder();
+ for (int i = 0; i < keysAndValues.length; i += 2) {
+ builder.put(keysAndValues[i], keysAndValues[i + 1]);
+ }
+ return instantiate(builder.build());
+ }
+
+ /**
+ * Same like {@link #instantiate(Map)} but allows for unbound variables, which are
+ * substituted using their original syntax. Example:
+ *
+ *
+ *
+ * The result of this call can be used to create a new template.
+ */
+ public String instantiatePartial(Map values) {
+ return instantiate(values, true);
+ }
+
+ private String instantiate(Map values, boolean allowPartial) {
+ StringBuilder result = new StringBuilder();
+ if (values.containsKey(HOSTNAME_VAR)) {
+ result.append(values.get(HOSTNAME_VAR));
+ result.append('/');
+ }
+ boolean continueLast = true; // Whether to not append separator
+ boolean skip = false; // Whether we are substituting a binding and segments shall be skipped.
+ ListIterator iterator = segments.listIterator();
+ while (iterator.hasNext()) {
+ Segment seg = iterator.next();
+ if (!skip && !continueLast) {
+ result.append(seg.separator());
+ }
+ continueLast = false;
+ switch (seg.kind()) {
+ case BINDING:
+ String var = seg.value();
+ String value = values.get(seg.value());
+ if (value == null) {
+ if (!allowPartial) {
+ throw new ValidationException("Unbound variable '%s'. Bindings: %s",
+ var, values);
+ }
+ // Append pattern to output
+ if (var.startsWith("$")) {
+ // Eliminate positional variable.
+ result.append(iterator.next().value());
+ iterator.next();
+ continue;
+ }
+ result.append('{');
+ result.append(seg.value());
+ result.append('=');
+ continueLast = true;
+ continue;
+ }
+ Segment next = iterator.next();
+ Segment nextNext = iterator.next();
+ boolean pathEscape = next.kind() == SegmentKind.PATH_WILDCARD
+ || nextNext.kind() != SegmentKind.END_BINDING;
+ restore(iterator, iterator.nextIndex() - 2);
+ if (!pathEscape) {
+ result.append(encodeUrl(value));
+ } else {
+ // For a path wildcard or path of length greater 1, split the value and escape
+ // every sub-segment.
+ boolean first = true;
+ for (String subSeg : SLASH_SPLITTER.split(value)) {
+ if (!first) {
+ result.append('/');
+ }
+ first = false;
+ result.append(encodeUrl(subSeg));
+ }
+ }
+ skip = true;
+ continue;
+ case END_BINDING:
+ if (!skip) {
+ result.append('}');
+ }
+ skip = false;
+ continue;
+ default:
+ if (!skip) {
+ result.append(seg.value());
+ }
+ }
+ }
+ return result.toString();
+ }
+
+ // Positional Matching and Instantiation
+ // =====================================
+
+ /**
+ * Instantiates the template from the given positional parameters. The template must not be build
+ * from named bindings, but only contain wildcards. Each parameter position corresponds to a
+ * wildcard of the according position in the template.
+ */
+ public String encode(String... values) {
+ ImmutableMap.Builder builder = ImmutableMap.builder();
+ int i = 0;
+ for (String value : values) {
+ builder.put("$" + i++, value);
+ }
+ // We will get an error if there are named bindings which are not reached by values.
+ return instantiate(builder.build());
+ }
+
+ /**
+ * Matches the template into a list of positional values. The template must not be build from
+ * named bindings, but only contain wildcards. For each wildcard in the template, a value
+ * is returned at corresponding position in the list.
+ */
+ public List decode(String path) {
+ Map match = match(path);
+ if (match == null) {
+ throw new IllegalArgumentException(String.format("template '%s' does not match '%s'",
+ this, path));
+ }
+ List result = Lists.newArrayList();
+ for (Map.Entry entry : match.entrySet()) {
+ String key = entry.getKey();
+ if (!key.startsWith("$")) {
+ throw new IllegalArgumentException("template must not contain named bindings");
+ }
+ int i = Integer.parseInt(key.substring(1));
+ while (result.size() <= i) {
+ result.add("");
+ }
+ result.set(i, entry.getValue());
+ }
+ return ImmutableList.copyOf(result);
+ }
+
+ // Template Parsing
+ // ================
+
+ private static ImmutableList parseTemplate(String template) {
+ // Skip useless leading slash.
+ if (template.startsWith("/")) {
+ template = template.substring(1);
+ }
+
+ // Extract trailing custom verb.
+ Matcher matcher = CUSTOM_VERB_PATTERN.matcher(template);
+ String customVerb = null;
+ if (matcher.find()) {
+ customVerb = matcher.group(1);
+ template = template.substring(0, matcher.start(0));
+ }
+
+ ImmutableList.Builder builder = ImmutableList.builder();
+ String varName = null;
+ int freeWildcardCounter = 0;
+ int pathWildCardBound = 0;
+
+ for (String seg : Splitter.on('/').trimResults().split(template)) {
+ // If segment starts with '{', a binding group starts.
+ boolean bindingStarts = seg.startsWith("{");
+ boolean implicitWildcard = false;
+ if (bindingStarts) {
+ if (varName != null) {
+ throw new ValidationException("parse error: nested binding in '%s'", template);
+ }
+ seg = seg.substring(1);
+
+ int i = seg.indexOf('=');
+ if (i <= 0) {
+ // Possibly looking at something like "{name}" with implicit wildcard.
+ if (seg.endsWith("}")) {
+ // Remember to add an implicit wildcard later.
+ implicitWildcard = true;
+ varName = seg.substring(0, seg.length() - 1).trim();
+ seg = seg.substring(seg.length() - 1).trim();
+ } else {
+ throw new ValidationException("parse error: invalid binding syntax in '%s'", template);
+ }
+ } else {
+ // Looking at something like "{name=wildcard}".
+ varName = seg.substring(0, i).trim();
+ seg = seg.substring(i + 1).trim();
+ }
+ builder.add(Segment.create(SegmentKind.BINDING, varName));
+ }
+
+ // If segment ends with '}', a binding group ends. Remove the brace and remember.
+ boolean bindingEnds = seg.endsWith("}");
+ if (bindingEnds) {
+ seg = seg.substring(0, seg.length() - 1).trim();
+ }
+
+ // Process the segment, after stripping off "{name=.." and "..}".
+ switch (seg) {
+ case "**":
+ case "*":
+ if ("**".equals(seg)) {
+ pathWildCardBound++;
+ }
+ Segment wildcard = seg.length() == 2 ? Segment.PATH_WILDCARD : Segment.WILDCARD;
+ if (varName == null) {
+ // Not in a binding, turn wildcard into implicit binding.
+ // "*" => "{$n=*}"
+ builder.add(Segment.create(SegmentKind.BINDING, "$" + freeWildcardCounter));
+ freeWildcardCounter++;
+ builder.add(wildcard);
+ builder.add(Segment.END_BINDING);
+ } else {
+ builder.add(wildcard);
+ }
+ break;
+ case "":
+ if (!bindingEnds) {
+ throw new ValidationException("parse error: empty segment not allowed in '%s'",
+ template);
+ }
+ // If the wildcard is implicit, seg will be empty. Just continue.
+ break;
+ default:
+ builder.add(Segment.create(SegmentKind.LITERAL, seg));
+ }
+
+ // End a binding.
+ if (bindingEnds) {
+ // Reset varName to null for next binding.
+ varName = null;
+
+ if (implicitWildcard) {
+ // Looking at something like "{var}". Insert an implicit wildcard, as it is the same
+ // as "{var=*}".
+ builder.add(Segment.WILDCARD);
+ }
+ builder.add(Segment.END_BINDING);
+ }
+
+ if (pathWildCardBound > 1) {
+ // Report restriction on number of '**' in the pattern. There can be only one, which
+ // enables non-backtracking based matching.
+ throw new ValidationException(
+ "parse error: pattern must not contain more than one path wildcard ('**') in '%s'",
+ template);
+ }
+
+ }
+
+ if (customVerb != null) {
+ builder.add(Segment.create(SegmentKind.CUSTOM_VERB, customVerb));
+ }
+ return builder.build();
+ }
+
+ // Helpers
+ // =======
+
+ private static String encodeUrl(String text) {
+ try {
+ return URLEncoder.encode(text, StandardCharsets.UTF_8.name());
+ } catch (UnsupportedEncodingException e) {
+ throw new ValidationException("UTF-8 encoding is not supported on this platform");
+ }
+ }
+
+ private static String decodeUrl(String url) {
+ try {
+ return URLDecoder.decode(url, StandardCharsets.UTF_8.name());
+ } catch (UnsupportedEncodingException e) {
+ throw new ValidationException("UTF-8 encoding is not supported on this platform");
+ }
+ }
+
+ // Checks for the given segments kind. On success, consumes them. Otherwise leaves
+ // the list iterator in its state.
+ private static boolean peek(ListIterator segments, SegmentKind... kinds) {
+ int start = segments.nextIndex();
+ boolean success = false;
+ for (SegmentKind kind : kinds) {
+ if (!segments.hasNext() || segments.next().kind() != kind) {
+ success = false;
+ break;
+ }
+ }
+ if (success) {
+ return true;
+ }
+ restore(segments, start);
+ return false;
+ }
+
+ // Restores a list iterator back to a given index.
+ private static void restore(ListIterator> segments, int index) {
+ while (segments.nextIndex() > index) {
+ segments.previous();
+ }
+ }
+
+ // Equality and String Conversion
+ // ==============================
+
+ /**
+ * Returns a pretty version of the template as a string.
+ */
+ @Override
+ public String toString() {
+ return toSyntax(segments, true);
+ }
+
+ /**
+ * Returns a raw version of the template as a string. This renders the template in its
+ * internal, normalized form.
+ */
+ public String toRawString() {
+ return toSyntax(segments, false);
+ }
+
+ private static String toSyntax(List segments, boolean pretty) {
+ StringBuilder result = new StringBuilder();
+ boolean continueLast = true; // if true, no slash is appended.
+ ListIterator iterator = segments.listIterator();
+ while (iterator.hasNext()) {
+ Segment seg = iterator.next();
+ if (!continueLast) {
+ result.append(seg.separator());
+ }
+ continueLast = false;
+ switch (seg.kind()) {
+ case BINDING:
+ if (pretty && seg.value().startsWith("$")) {
+ // Remove the internal binding.
+ seg = iterator.next(); // Consume wildcard
+ result.append(seg.value());
+ iterator.next(); // Consume END_BINDING
+ continue;
+ }
+ result.append('{');
+ result.append(seg.value());
+ if (pretty && peek(iterator, SegmentKind.WILDCARD, SegmentKind.END_BINDING)) {
+ // Reduce {name=*} to {name}.
+ result.append('}');
+ continue;
+ }
+ result.append('=');
+ continueLast = true;
+ continue;
+ case END_BINDING:
+ result.append('}');
+ continue;
+ default:
+ result.append(seg.value());
+ continue;
+ }
+ }
+ return result.toString();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof PathTemplate)) {
+ return false;
+ }
+ PathTemplate other = (PathTemplate) obj;
+ return Objects.equals(segments, other.segments);
+ }
+
+ @Override
+ public int hashCode() {
+ return segments.hashCode();
+ }
+}
diff --git a/gcloud-java-gax/src/main/java/io/gapi/gax/protobuf/ResourceName.java b/gcloud-java-gax/src/main/java/io/gapi/gax/protobuf/ResourceName.java
new file mode 100644
index 000000000000..5eea8509ff44
--- /dev/null
+++ b/gcloud-java-gax/src/main/java/io/gapi/gax/protobuf/ResourceName.java
@@ -0,0 +1,275 @@
+package io.gapi.gax.protobuf;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+/**
+ * Class for representing and working with resource names.
+ *
+ *
A resource name is represented by {@link PathTemplate}, an assignment to variables in
+ * the template, and an optional endpoint. The {@code ResourceName} class implements
+ * the map interface (unmodifiable) to work with the variable assignments, and has methods
+ * to reproduce the string representation of the name, to construct new names, and to dereference
+ * names into resources.
+ *
+ *
As a resource name essentially represents a match of a path template against a string, it
+ * can be also used for other purposes than naming resources. However, not all provided methods
+ * may make sense in all applications.
+ *
+ *
+ */
+@Beta
+public class ResourceName implements Map {
+
+ // ResourceName Resolver
+ // =====================
+
+ /**
+ * Represents a resource name resolver which can be registered with this class.
+ */
+ public interface Resolver {
+ /**
+ * Resolves the resource name into a resource by calling the underlying API.
+ */
+ T resolve(Class resourceType, ResourceName name, @Nullable String version);
+ }
+
+ // The registered resource name resolver.
+ // TODO(wgg): its a bit spooky to have this static global. Think of ways to
+ // configure this from the outside instead if programmatically (e.g. java properties).
+ private static volatile Resolver resourceNameResolver = new Resolver() {
+ @Override
+ public T resolve(Class resourceType, ResourceName name, String version) {
+ throw new IllegalStateException(
+ "No resource name resolver is registered in ResourceName class.");
+ }
+ };
+
+ /**
+ * Sets the resource name resolver which is used by the {@link #resolve(Class, String)} method.
+ * By default, no resolver is registered.
+ */
+ public static void registerResourceNameResolver(Resolver resolver) {
+ resourceNameResolver = resolver;
+ }
+
+ // ResourceName
+ // ============
+
+ /**
+ * Creates a new resource name based on given template and path. The path must match
+ * the template, otherwise null is returned.
+ *
+ * @throws ValidationException if the path does not match the template.
+ */
+ public static ResourceName create(PathTemplate template, String path) {
+ ImmutableMap values = template.match(path);
+ if (values == null) {
+ throw new ValidationException("path '%s' does not match template '%s'", path, template);
+ }
+ return new ResourceName(template, values, null);
+ }
+
+ /**
+ * Creates a new resource name from a template and a value assignment for variables.
+ *
+ * @throws ValidationException if not all variables in the template are bound.
+ */
+ public static ResourceName create(PathTemplate template, Map values) {
+ if (!values.keySet().containsAll(template.vars())) {
+ Set unbound = Sets.newLinkedHashSet(template.vars());
+ unbound.removeAll(values.keySet());
+ throw new ValidationException("unbound variables: %s", unbound);
+ }
+ return new ResourceName(template, values, null);
+ }
+
+ /**
+ * Creates a new resource name based on given template and path, where the path contains an
+ * endpoint. If the path does not match, null is returned.
+ */
+ @Nullable
+ public static ResourceName createFromFullName(PathTemplate template, String path) {
+ ImmutableMap values = template.matchFromFullName(path);
+ if (values == null) {
+ return null;
+ }
+ return new ResourceName(template, values, null);
+ }
+
+ private final PathTemplate template;
+ private final ImmutableMap values;
+ private final String endpoint;
+
+ private volatile String stringRepr;
+
+ private ResourceName(PathTemplate template, Map values, String endpoint) {
+ this.template = template;
+ this.values = ImmutableMap.copyOf(values);
+ this.endpoint = endpoint;
+ }
+
+ @Override
+ public String toString() {
+ if (stringRepr == null) {
+ stringRepr = template.instantiate(values);
+ }
+ return stringRepr;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof ResourceName)) {
+ return false;
+ }
+ ResourceName other = (ResourceName) obj;
+ return Objects.equals(template, other.template)
+ && Objects.equals(endpoint, other.endpoint)
+ && Objects.equals(values, other.values);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(template, endpoint, values);
+ }
+
+ /**
+ * Gets the template associated with this resource name.
+ */
+ public PathTemplate template() {
+ return template;
+ }
+
+ /**
+ * Checks whether the resource name has an endpoint.
+ */
+ public boolean hasEndpoint() {
+ return endpoint != null;
+ }
+
+ /**
+ * Returns the endpoint of this resource name, or null if none is defined.
+ */
+ @Nullable
+ public String endpoint() {
+ return endpoint;
+ }
+
+ /**
+ * Returns a resource name with specified endpoint.
+ */
+ public ResourceName withEndpoint(String endpoint) {
+ return new ResourceName(template, values, Preconditions.checkNotNull(endpoint));
+ }
+
+ /**
+ * Returns the parent resource name. For example, if the name is {@code shelves/s1/books/b1}, the
+ * parent is {@code shelves/s1/books}.
+ */
+ public ResourceName parentName() {
+ PathTemplate parentTemplate = template.parentTemplate();
+ return new ResourceName(parentTemplate, values, endpoint);
+ }
+
+ /**
+ * Returns true of the resource name starts with the parent resource name, i.e. is a child
+ * of the parent.
+ */
+ public boolean startsWith(ResourceName parentName) {
+ // TODO(wgg): more efficient implementation.
+ return toString().startsWith(parentName.toString());
+ }
+
+ /**
+ * Attempts to resolve a resource name into a resource, by calling the associated API.
+ * The resource name must have an endpoint. An optional version can be specified to
+ * determine in which version of the API to call.
+ */
+ public T resolve(Class resourceType, @Nullable String version) {
+ Preconditions.checkArgument(hasEndpoint(), "Resource name must have an endpoint.");
+ return resourceNameResolver.resolve(resourceType, this, version);
+ }
+
+ // Map Interface
+ // =============
+
+ @Override
+ public int size() {
+ return values.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return values.isEmpty();
+ }
+
+ @Override
+ public boolean containsKey(Object key) {
+ return values.containsKey(key);
+ }
+
+ @Override
+ public boolean containsValue(Object value) {
+ return values.containsValue(value);
+ }
+
+ @Override
+ public String get(Object key) {
+ return values.get(key);
+ }
+
+ @Override
+ @Deprecated
+ public String put(String key, String value) {
+ return values.put(key, value);
+ }
+
+ @Override
+ @Deprecated
+ public String remove(Object key) {
+ return values.remove(key);
+ }
+
+ @Override
+ @Deprecated
+ public void putAll(Map extends String, ? extends String> m) {
+ values.putAll(m);
+ }
+
+ @Override
+ @Deprecated
+ public void clear() {
+ values.clear();
+ }
+
+ @Override
+ public Set keySet() {
+ return values.keySet();
+ }
+
+ @Override
+ public Collection values() {
+ return values.values();
+ }
+
+ @Override
+ public Set> entrySet() {
+ return values.entrySet();
+ }
+}
diff --git a/gcloud-java-gax/src/main/java/io/gapi/gax/protobuf/ValidationException.java b/gcloud-java-gax/src/main/java/io/gapi/gax/protobuf/ValidationException.java
new file mode 100644
index 000000000000..3e2d90e6d118
--- /dev/null
+++ b/gcloud-java-gax/src/main/java/io/gapi/gax/protobuf/ValidationException.java
@@ -0,0 +1,63 @@
+package io.gapi.gax.protobuf;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+
+import java.util.Stack;
+
+/**
+ * Exception thrown if there is a validation problem with a path template, http config, or related
+ * framework methods. Comes as an illegal argument exception subclass. Allows to globally
+ * set a thread-local validation context description which each exception inherits.
+ */
+@Beta
+public class ValidationException extends IllegalArgumentException {
+
+ private static ThreadLocal>> contextLocal =
+ new ThreadLocal>>();
+
+ /**
+ * Sets the validation context description. Each thread has its own description, so
+ * this is thread safe.
+ */
+ public static void pushCurrentThreadValidationContext(Supplier supplier) {
+ Stack> stack = contextLocal.get();
+ if (stack == null) {
+ stack = new Stack<>();
+ contextLocal.set(stack);
+ }
+ stack.push(supplier);
+ }
+
+ public static void pushCurrentThreadValidationContext(String context) {
+ pushCurrentThreadValidationContext(Suppliers.ofInstance(context));
+ }
+ /**
+ * Clears the validation context.
+ */
+ public static void popCurrentThreadValidationContext() {
+ Stack> stack = contextLocal.get();
+ if (stack != null) {
+ stack.pop();
+ }
+ }
+
+ /**
+ * Construct validation exception with implicit context.
+ */
+ public ValidationException(String format, Object... args) {
+ super(message(contextLocal.get(), format, args));
+ }
+
+ private static String message(Stack> context, String format, Object... args) {
+ if (context == null || context.isEmpty()) {
+ return String.format(format, args);
+ }
+ StringBuilder result = new StringBuilder();
+ for (Supplier supplier : context) {
+ result.append(supplier.get() + ": ");
+ }
+ return result.toString() + String.format(format, args);
+ }
+}
diff --git a/gcloud-java-gax/src/test/java/io/gapi/gax/grpc/ApiCallableTest.java b/gcloud-java-gax/src/test/java/io/gapi/gax/grpc/ApiCallableTest.java
new file mode 100644
index 000000000000..359f9da3693d
--- /dev/null
+++ b/gcloud-java-gax/src/test/java/io/gapi/gax/grpc/ApiCallableTest.java
@@ -0,0 +1,250 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.gapi.gax.grpc;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.truth.Truth;
+
+import io.gapi.gax.grpc.ApiCallable;
+import io.gapi.gax.grpc.PageDescriptor;
+import io.gapi.gax.grpc.ApiCallable.Transformer;
+import io.grpc.Channel;
+import io.grpc.MethodDescriptor;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.stub.ClientCalls;
+import io.grpc.stub.StreamObserver;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Executors;
+
+/**
+ * Tests for {@link ApiCallable}.
+ */
+@RunWith(JUnit4.class)
+public class ApiCallableTest {
+
+ private static final Transformer PLUS_ONE =
+ new Transformer() {
+ @Override public Integer apply(Integer request) throws StatusException {
+ return request + 1;
+ }
+ };
+
+ private static final Transformer