From 546d670303e288aff2fc9602ceee729c1af7a3d0 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Fri, 27 Jan 2017 18:47:52 +1100 Subject: [PATCH 1/2] use RpcFuture and remove old BundlingSettings --- google-cloud-core/pom.xml | 2 +- .../com/google/cloud/logging/LoggingImpl.java | 4 - .../cloud/logging/spi/DefaultLoggingRpc.java | 57 ++++++----- .../spi/v2/LoggingServiceV2Settings.java | 5 +- .../com/google/cloud/pubsub/PubSubImpl.java | 4 - .../cloud/pubsub/spi/DefaultPubSubRpc.java | 95 ++++++++++++------- .../pubsub/spi/v1/PublisherSettings.java | 5 +- 7 files changed, 92 insertions(+), 80 deletions(-) diff --git a/google-cloud-core/pom.xml b/google-cloud-core/pom.xml index 34974a496520..6a5ca5b30f8e 100644 --- a/google-cloud-core/pom.xml +++ b/google-cloud-core/pom.xml @@ -111,7 +111,7 @@ com.google.api gax - 0.0.25 + 0.0.28-SNAPSHOT io.grpc diff --git a/google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingImpl.java b/google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingImpl.java index 47dfc175d403..b7bf7161a04f 100644 --- a/google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingImpl.java +++ b/google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingImpl.java @@ -40,7 +40,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; import com.google.logging.v2.CreateLogMetricRequest; import com.google.logging.v2.CreateSinkRequest; @@ -107,9 +106,6 @@ private static V get(Future future) { private static Future transform(Future future, Function function) { - if (future instanceof ListenableFuture) { - return Futures.transform((ListenableFuture) future, function); - } return Futures.lazyTransform(future, function); } diff --git a/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/DefaultLoggingRpc.java b/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/DefaultLoggingRpc.java index dd1ed10f40f2..b718a93733c6 100644 --- a/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/DefaultLoggingRpc.java +++ b/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/DefaultLoggingRpc.java @@ -22,6 +22,7 @@ import com.google.api.gax.grpc.FixedChannelProvider; import com.google.api.gax.grpc.FixedExecutorProvider; import com.google.api.gax.grpc.ProviderManager; +import com.google.api.gax.grpc.RpcFuture; import com.google.api.gax.grpc.UnaryCallSettings; import com.google.cloud.GrpcServiceOptions.ExecutorFactory; import com.google.cloud.NoCredentials; @@ -33,10 +34,6 @@ import com.google.cloud.logging.spi.v2.LoggingServiceV2Settings; import com.google.cloud.logging.spi.v2.MetricsServiceV2Client; import com.google.cloud.logging.spi.v2.MetricsServiceV2Settings; -import com.google.common.base.Function; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import com.google.logging.v2.CreateLogMetricRequest; import com.google.logging.v2.CreateSinkRequest; import com.google.logging.v2.DeleteLogMetricRequest; @@ -59,13 +56,13 @@ import com.google.logging.v2.WriteLogEntriesRequest; import com.google.logging.v2.WriteLogEntriesResponse; import com.google.protobuf.Empty; - import io.grpc.ManagedChannel; import io.grpc.Status.Code; import io.grpc.netty.NegotiationType; import io.grpc.netty.NettyChannelBuilder; - import java.io.IOException; +import java.util.Collections; +import java.util.EnumSet; import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; @@ -148,21 +145,25 @@ public DefaultLoggingRpc(LoggingOptions options) throws IOException { } } - private static Future translate(ListenableFuture from, final boolean idempotent, - int... returnNullOn) { - final Set returnNullOnSet = Sets.newHashSetWithExpectedSize(returnNullOn.length); - for (int value : returnNullOn) { - returnNullOnSet.add(value); + private static Future translate( + RpcFuture from, final boolean idempotent, Code... returnNullOn) { + final Set returnNullOnSet; + if (returnNullOn.length > 0) { + returnNullOnSet = EnumSet.of(returnNullOn[0], returnNullOn); + } else { + returnNullOnSet = Collections.emptySet(); } - return Futures.catching(from, ApiException.class, new Function() { - @Override - public V apply(ApiException exception) { - if (returnNullOnSet.contains(exception.getStatusCode().value())) { - return null; - } - throw new LoggingException(exception, idempotent); - } - }); + return from.catching( + ApiException.class, + new RpcFuture.Function() { + @Override + public V apply(ApiException exception) { + if (returnNullOnSet.contains(exception.getStatusCode().value())) { + return null; + } + throw new LoggingException(exception, idempotent); + } + }); } @Override @@ -177,7 +178,7 @@ public Future update(UpdateSinkRequest request) { @Override public Future get(GetSinkRequest request) { - return translate(configClient.getSinkCallable().futureCall(request), true, Code.NOT_FOUND.value()); + return translate(configClient.getSinkCallable().futureCall(request), true, Code.NOT_FOUND); } @Override @@ -187,14 +188,12 @@ public Future list(ListSinksRequest request) { @Override public Future delete(DeleteSinkRequest request) { - return translate(configClient.deleteSinkCallable().futureCall(request), true, - Code.NOT_FOUND.value()); + return translate(configClient.deleteSinkCallable().futureCall(request), true, Code.NOT_FOUND); } @Override public Future delete(DeleteLogRequest request) { - return translate(loggingClient.deleteLogCallable().futureCall(request), true, - Code.NOT_FOUND.value()); + return translate(loggingClient.deleteLogCallable().futureCall(request), true, Code.NOT_FOUND); } @Override @@ -226,8 +225,8 @@ public Future update(UpdateLogMetricRequest request) { @Override public Future get(GetLogMetricRequest request) { - return translate(metricsClient.getLogMetricCallable().futureCall(request), true, - Code.NOT_FOUND.value()); + return translate( + metricsClient.getLogMetricCallable().futureCall(request), true, Code.NOT_FOUND); } @Override @@ -237,8 +236,8 @@ public Future list(ListLogMetricsRequest request) { @Override public Future delete(DeleteLogMetricRequest request) { - return translate(metricsClient.deleteLogMetricCallable().futureCall(request), true, - Code.NOT_FOUND.value()); + return translate( + metricsClient.deleteLogMetricCallable().futureCall(request), true, Code.NOT_FOUND); } @Override diff --git a/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/v2/LoggingServiceV2Settings.java b/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/v2/LoggingServiceV2Settings.java index 9ea30ed25120..d4803273cf43 100644 --- a/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/v2/LoggingServiceV2Settings.java +++ b/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/v2/LoggingServiceV2Settings.java @@ -546,11 +546,8 @@ private static Builder createDefault() { .writeLogEntriesSettings() .getBundlingSettingsBuilder() .setElementCountThreshold(1) - .setElementCountLimit(1000) .setRequestByteThreshold(1024) - .setRequestByteLimit(10485760) - .setDelayThreshold(Duration.millis(10)) - .setBlockingCallCountThreshold(1); + .setDelayThreshold(Duration.millis(10)); builder .writeLogEntriesSettings() .setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("non_idempotent")) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java index 35dff5ab2524..e67aee158979 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java @@ -41,7 +41,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; import com.google.iam.v1.SetIamPolicyRequest; import com.google.iam.v1.TestIamPermissionsRequest; @@ -222,9 +221,6 @@ private static V get(Future future) { private static Future transform(Future future, Function function) { - if (future instanceof ListenableFuture) { - return Futures.transform((ListenableFuture) future, function); - } return Futures.lazyTransform(future, function); } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java index 390b16326ef8..aac2504e4944 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java @@ -22,6 +22,8 @@ import com.google.api.gax.grpc.FixedChannelProvider; import com.google.api.gax.grpc.FixedExecutorProvider; import com.google.api.gax.grpc.ProviderManager; +import com.google.api.gax.grpc.RpcFuture; +import com.google.api.gax.grpc.RpcFutureCallback; import com.google.api.gax.grpc.UnaryCallSettings; import com.google.cloud.GrpcServiceOptions.ExecutorFactory; import com.google.cloud.NoCredentials; @@ -31,12 +33,7 @@ import com.google.cloud.pubsub.spi.v1.PublisherSettings; import com.google.cloud.pubsub.spi.v1.SubscriberClient; import com.google.cloud.pubsub.spi.v1.SubscriberSettings; -import com.google.common.base.Function; import com.google.common.collect.Sets; -import com.google.common.util.concurrent.ForwardingListenableFuture; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import com.google.iam.v1.GetIamPolicyRequest; import com.google.iam.v1.Policy; import com.google.iam.v1.SetIamPolicyRequest; @@ -62,18 +59,18 @@ import com.google.pubsub.v1.PullResponse; import com.google.pubsub.v1.Subscription; import com.google.pubsub.v1.Topic; - import io.grpc.ManagedChannel; import io.grpc.Status.Code; import io.grpc.netty.NegotiationType; import io.grpc.netty.NettyChannelBuilder; - -import org.joda.time.Duration; - import java.io.IOException; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.joda.time.Duration; public class DefaultPubSubRpc implements PubSubRpc { @@ -111,26 +108,54 @@ protected ChannelProvider getChannelProvider() { } private static final class PullFutureImpl - extends ForwardingListenableFuture.SimpleForwardingListenableFuture implements PullFuture { - PullFutureImpl(ListenableFuture delegate) { - super(delegate); + private final RpcFuture delegate; + + PullFutureImpl(RpcFuture delegate) { + this.delegate = delegate; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return delegate.cancel(mayInterruptIfRunning); + } + + @Override + public PullResponse get() throws InterruptedException, ExecutionException { + return delegate.get(); + } + + @Override + public PullResponse get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return delegate.get(timeout, unit); + } + + @Override + public boolean isCancelled() { + return delegate.isCancelled(); + } + + @Override + public boolean isDone() { + return delegate.isDone(); } @Override public void addCallback(final PullCallback callback) { - Futures.addCallback(delegate(), new FutureCallback() { - @Override - public void onSuccess(PullResponse result) { - callback.success(result); - } - - @Override - public void onFailure(Throwable error) { - callback.failure(error); - } - }); + delegate.addCallback( + new RpcFutureCallback() { + @Override + public void onSuccess(PullResponse result) { + callback.success(result); + } + + @Override + public void onFailure(Throwable error) { + callback.failure(error); + } + }); } } @@ -178,21 +203,23 @@ public DefaultPubSubRpc(PubSubOptions options) throws IOException { } } - private static ListenableFuture translate(ListenableFuture from, - final boolean idempotent, int... returnNullOn) { + private static RpcFuture translate( + RpcFuture from, final boolean idempotent, int... returnNullOn) { final Set returnNullOnSet = Sets.newHashSetWithExpectedSize(returnNullOn.length); for (int value : returnNullOn) { returnNullOnSet.add(value); } - return Futures.catching(from, ApiException.class, new Function() { - @Override - public V apply(ApiException exception) { - if (returnNullOnSet.contains(exception.getStatusCode().value())) { - return null; - } - throw new PubSubException(exception, idempotent); - } - }); + return from.catching( + ApiException.class, + new RpcFuture.Function() { + @Override + public V apply(ApiException exception) { + if (returnNullOnSet.contains(exception.getStatusCode().value())) { + return null; + } + throw new PubSubException(exception, idempotent); + } + }); } @Override diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PublisherSettings.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PublisherSettings.java index 72e0d0128d62..db1a202392ab 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PublisherSettings.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PublisherSettings.java @@ -527,11 +527,8 @@ private static Builder createDefault() { .publishSettings() .getBundlingSettingsBuilder() .setElementCountThreshold(10) - .setElementCountLimit(1000) .setRequestByteThreshold(1024) - .setRequestByteLimit(10485760) - .setDelayThreshold(Duration.millis(10)) - .setBlockingCallCountThreshold(1); + .setDelayThreshold(Duration.millis(10)); builder .publishSettings() .setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("one_plus_delivery")) From 6ee024f7542eda5e010c12852b7a1d387e9e5b39 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Mon, 30 Jan 2017 11:11:46 +1100 Subject: [PATCH 2/2] pr comment --- google-cloud-core/pom.xml | 2 +- .../cloud/logging/spi/DefaultLoggingRpc.java | 5 +- .../cloud/pubsub/spi/DefaultPubSubRpc.java | 50 ++++--------------- .../google/cloud/pubsub/spi/PubSubRpc.java | 1 - 4 files changed, 14 insertions(+), 44 deletions(-) diff --git a/google-cloud-core/pom.xml b/google-cloud-core/pom.xml index 6a5ca5b30f8e..47f215f24233 100644 --- a/google-cloud-core/pom.xml +++ b/google-cloud-core/pom.xml @@ -111,7 +111,7 @@ com.google.api gax - 0.0.28-SNAPSHOT + 0.0.28 io.grpc diff --git a/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/DefaultLoggingRpc.java b/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/DefaultLoggingRpc.java index b718a93733c6..1673151a315b 100644 --- a/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/DefaultLoggingRpc.java +++ b/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/DefaultLoggingRpc.java @@ -16,13 +16,14 @@ package com.google.cloud.logging.spi; +import com.google.api.gax.core.Function; +import com.google.api.gax.core.RpcFuture; import com.google.api.gax.grpc.ApiException; import com.google.api.gax.grpc.ChannelProvider; import com.google.api.gax.grpc.ExecutorProvider; import com.google.api.gax.grpc.FixedChannelProvider; import com.google.api.gax.grpc.FixedExecutorProvider; import com.google.api.gax.grpc.ProviderManager; -import com.google.api.gax.grpc.RpcFuture; import com.google.api.gax.grpc.UnaryCallSettings; import com.google.cloud.GrpcServiceOptions.ExecutorFactory; import com.google.cloud.NoCredentials; @@ -155,7 +156,7 @@ private static Future translate( } return from.catching( ApiException.class, - new RpcFuture.Function() { + new Function() { @Override public V apply(ApiException exception) { if (returnNullOnSet.contains(exception.getStatusCode().value())) { diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java index aac2504e4944..4af17e458aeb 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java @@ -16,14 +16,16 @@ package com.google.cloud.pubsub.spi; +import com.google.api.gax.core.ForwardingRpcFuture; +import com.google.api.gax.core.Function; +import com.google.api.gax.core.RpcFuture; +import com.google.api.gax.core.RpcFutureCallback; import com.google.api.gax.grpc.ApiException; import com.google.api.gax.grpc.ChannelProvider; import com.google.api.gax.grpc.ExecutorProvider; import com.google.api.gax.grpc.FixedChannelProvider; import com.google.api.gax.grpc.FixedExecutorProvider; import com.google.api.gax.grpc.ProviderManager; -import com.google.api.gax.grpc.RpcFuture; -import com.google.api.gax.grpc.RpcFutureCallback; import com.google.api.gax.grpc.UnaryCallSettings; import com.google.cloud.GrpcServiceOptions.ExecutorFactory; import com.google.cloud.NoCredentials; @@ -65,11 +67,8 @@ import io.grpc.netty.NettyChannelBuilder; import java.io.IOException; import java.util.Set; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.joda.time.Duration; public class DefaultPubSubRpc implements PubSubRpc { @@ -107,48 +106,19 @@ protected ChannelProvider getChannelProvider() { } } - private static final class PullFutureImpl + private static final class PullFutureImpl extends ForwardingRpcFuture implements PullFuture { - - private final RpcFuture delegate; - PullFutureImpl(RpcFuture delegate) { - this.delegate = delegate; - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return delegate.cancel(mayInterruptIfRunning); - } - - @Override - public PullResponse get() throws InterruptedException, ExecutionException { - return delegate.get(); - } - - @Override - public PullResponse get(long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - return delegate.get(timeout, unit); - } - - @Override - public boolean isCancelled() { - return delegate.isCancelled(); - } - - @Override - public boolean isDone() { - return delegate.isDone(); + super(delegate); } @Override public void addCallback(final PullCallback callback) { - delegate.addCallback( + addCallback( new RpcFutureCallback() { @Override - public void onSuccess(PullResponse result) { - callback.success(result); + public void onSuccess(PullResponse response) { + callback.success(response); } @Override @@ -211,7 +181,7 @@ private static RpcFuture translate( } return from.catching( ApiException.class, - new RpcFuture.Function() { + new Function() { @Override public V apply(ApiException exception) { if (returnNullOnSet.contains(exception.getStatusCode().value())) { diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpc.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpc.java index 172f2e8dcc5f..30a5688b01c5 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpc.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpc.java @@ -40,7 +40,6 @@ import com.google.pubsub.v1.PullResponse; import com.google.pubsub.v1.Subscription; import com.google.pubsub.v1.Topic; - import java.util.concurrent.Future; public interface PubSubRpc extends AutoCloseable {