diff --git a/gcloud-java-pubsub/baseline/src/main/java/com/google/gcloud/pubsub/spi/v1/PublisherApi.java b/gcloud-java-pubsub/baseline/src/main/java/com/google/gcloud/pubsub/spi/v1/PublisherApi.java index 6ace2998698b..fa6da27a72bd 100644 --- a/gcloud-java-pubsub/baseline/src/main/java/com/google/gcloud/pubsub/spi/v1/PublisherApi.java +++ b/gcloud-java-pubsub/baseline/src/main/java/com/google/gcloud/pubsub/spi/v1/PublisherApi.java @@ -33,8 +33,9 @@ package com.google.gcloud.pubsub.spi.v1; -import com.google.api.gax.grpc.ApiCallSettings; import com.google.api.gax.grpc.ApiCallable; +import com.google.api.gax.grpc.ApiCallable.BundlableApiCallableInfo; +import com.google.api.gax.grpc.BundlerFactory; import com.google.api.gax.protobuf.PathTemplate; import com.google.protobuf.Empty; import com.google.pubsub.v1.DeleteTopicRequest; @@ -65,9 +66,25 @@ */ @javax.annotation.Generated("by GAPIC") public class PublisherApi implements AutoCloseable { + // ======== + // Members + // ======== + + private final ManagedChannel channel; + private final List closeables = new ArrayList<>(); + + private final ApiCallable createTopicCallable; + private final ApiCallable publishCallable; + private final ApiCallable getTopicCallable; + private final ApiCallable listTopicsCallable; + private final ApiCallable> listTopicsIterableCallable; + private final ApiCallable + listTopicSubscriptionsCallable; + private final ApiCallable> + listTopicSubscriptionsIterableCallable; + private final ApiCallable deleteTopicCallable; public static class ResourceNames { - private ResourceNames() {} // ======================= // ResourceNames Constants @@ -93,6 +110,8 @@ private ResourceNames() {} private static final PathTemplate TOPIC_PATH_TEMPLATE = PathTemplate.create("projects/{project}/topics/{topic}"); + private ResourceNames() {} + // ============================== // Resource Name Helper Functions // ============================== @@ -153,24 +172,6 @@ public static final String parseTopicFromTopicPath(String topicPath) { } } - // ======== - // Members - // ======== - - private final ManagedChannel channel; - private final List closeables = new ArrayList<>(); - - private final ApiCallable createTopicCallable; - private final ApiCallable publishCallable; - private final ApiCallable getTopicCallable; - private final ApiCallable listTopicsCallable; - private final ApiCallable> listTopicsIterableCallable; - private final ApiCallable - listTopicSubscriptionsCallable; - private final ApiCallable> - listTopicSubscriptionsIterableCallable; - private final ApiCallable deleteTopicCallable; - // =============== // Factory Methods // =============== @@ -186,8 +187,9 @@ public static PublisherApi create() throws IOException { } /** - * Constructs an instance of PublisherApi, using the given settings. The channels are created based - * on the settings passed in, or defaults for any settings that are not set. + * Constructs an instance of PublisherApi, using the given settings. + * The channels are created based on the settings passed in, or defaults for any + * settings that are not set. * * * @@ -197,8 +199,9 @@ public static PublisherApi create(PublisherSettings settings) throws IOException } /** - * Constructs an instance of PublisherApi, using the given settings. This is protected so that it - * easy to make a subclass, but otherwise, the static factory methods should be preferred. + * Constructs an instance of PublisherApi, using the given settings. + * This is protected so that it easy to make a subclass, but otherwise, the static + * factory methods should be preferred. * * * @@ -207,7 +210,14 @@ protected PublisherApi(PublisherSettings settings) throws IOException { this.channel = settings.getChannel(); this.createTopicCallable = settings.createTopicMethod().build(settings); - this.publishCallable = settings.publishMethod().build(settings); + BundlableApiCallableInfo bundlablePublish = + settings.publishMethod().buildBundlable(settings); + this.publishCallable = bundlablePublish.getApiCallable(); + BundlerFactory publishBundlerFactory = + bundlablePublish.getBundlerFactory(); + if (publishBundlerFactory != null) { + this.closeables.add(publishBundlerFactory); + } this.getTopicCallable = settings.getTopicMethod().build(settings); this.listTopicsCallable = settings.listTopicsMethod().build(settings); this.listTopicsIterableCallable = settings.listTopicsMethod().buildPageStreaming(settings); diff --git a/gcloud-java-pubsub/baseline/src/main/java/com/google/gcloud/pubsub/spi/v1/PublisherSettings.java b/gcloud-java-pubsub/baseline/src/main/java/com/google/gcloud/pubsub/spi/v1/PublisherSettings.java index 70b188735890..11566404546f 100644 --- a/gcloud-java-pubsub/baseline/src/main/java/com/google/gcloud/pubsub/spi/v1/PublisherSettings.java +++ b/gcloud-java-pubsub/baseline/src/main/java/com/google/gcloud/pubsub/spi/v1/PublisherSettings.java @@ -38,8 +38,11 @@ import com.google.api.gax.core.RetryParams; import com.google.api.gax.grpc.ApiCallSettings; import com.google.api.gax.grpc.ApiCallable.ApiCallableBuilder; +import com.google.api.gax.grpc.ApiCallable.BundlableApiCallableBuilder; import com.google.api.gax.grpc.ApiCallable.PageStreamingApiCallableBuilder; -import com.google.api.gax.grpc.PageDescriptor; +import com.google.api.gax.grpc.BundlingDescriptor; +import com.google.api.gax.grpc.PageStreamingDescriptor; +import com.google.api.gax.grpc.RequestIssuer; import com.google.api.gax.grpc.ServiceApiSettings; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -56,8 +59,12 @@ import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; import com.google.pubsub.v1.PublisherGrpc; +import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.Topic; import io.grpc.Status; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; // Manually-added imports: add custom (non-generated) imports after this point. @@ -132,9 +139,11 @@ public class PublisherSettings extends ServiceApiSettings { RETRY_PARAM_DEFINITIONS = definitions.build(); } + private final MethodBuilders methods; + private static class MethodBuilders { private final ApiCallableBuilder createTopicMethod; - private final ApiCallableBuilder publishMethod; + private final BundlableApiCallableBuilder publishMethod; private final ApiCallableBuilder getTopicMethod; private final PageStreamingApiCallableBuilder listTopicsMethod; @@ -149,7 +158,8 @@ public MethodBuilders() { createTopicMethod.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("idempotent")); createTopicMethod.setRetryParams(RETRY_PARAM_DEFINITIONS.get("default")); - publishMethod = new ApiCallableBuilder<>(PublisherGrpc.METHOD_PUBLISH); + publishMethod = + new BundlableApiCallableBuilder<>(PublisherGrpc.METHOD_PUBLISH, PUBLISH_BUNDLING_DESC); publishMethod.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("non_idempotent")); publishMethod.setRetryParams(RETRY_PARAM_DEFINITIONS.get("default")); @@ -187,8 +197,6 @@ public MethodBuilders() { } } - private final MethodBuilders methods; - // =============== // Factory Methods // =============== @@ -211,8 +219,9 @@ public static PublisherSettings create() { } /** - * Constructs an instance of PublisherSettings with default settings. This is protected so that it - * easy to make a subclass, but otherwise, the static factory methods should be preferred. + * Constructs an instance of PublisherSettings with default settings. This is protected + * so that it easy to make a subclass, but otherwise, the static factory methods should be + * preferred. * * * @@ -223,7 +232,7 @@ protected PublisherSettings(MethodBuilders methods) { } /** - * Returns the ApiCallableBuilder for the API method createTopic. + * Returns the builder for the API method createTopic. * * * @@ -233,17 +242,17 @@ public ApiCallableBuilder createTopicMethod() { } /** - * Returns the ApiCallableBuilder for the API method publish. + * Returns the builder for the API method publish. * * * */ - public ApiCallableBuilder publishMethod() { + public BundlableApiCallableBuilder publishMethod() { return methods.publishMethod; } /** - * Returns the ApiCallableBuilder for the API method getTopic. + * Returns the builder for the API method getTopic. * * * @@ -253,7 +262,7 @@ public ApiCallableBuilder getTopicMethod() { } /** - * Returns the PageStreamingApiCallableBuilder for the API method listTopics. + * Returns the builder for the API method listTopics. * * * @@ -264,7 +273,7 @@ public ApiCallableBuilder getTopicMethod() { } /** - * Returns the PageStreamingApiCallableBuilder for the API method listTopicSubscriptions. + * Returns the builder for the API method listTopicSubscriptions. * * * @@ -276,7 +285,7 @@ public ApiCallableBuilder getTopicMethod() { } /** - * Returns the ApiCallableBuilder for the API method deleteTopic. + * Returns the builder for the API method deleteTopic. * * * @@ -285,9 +294,9 @@ public ApiCallableBuilder deleteTopicMethod() { return methods.deleteTopicMethod; } - private static PageDescriptor + private static PageStreamingDescriptor LIST_TOPICS_PAGE_STR_DESC = - new PageDescriptor() { + new PageStreamingDescriptor() { @Override public Object emptyToken() { return ""; @@ -309,10 +318,10 @@ public Iterable extractResources(ListTopicsResponse payload) { } }; - private static PageDescriptor< + private static PageStreamingDescriptor< ListTopicSubscriptionsRequest, ListTopicSubscriptionsResponse, String> LIST_TOPIC_SUBSCRIPTIONS_PAGE_STR_DESC = - new PageDescriptor< + new PageStreamingDescriptor< ListTopicSubscriptionsRequest, ListTopicSubscriptionsResponse, String>() { @Override public Object emptyToken() { @@ -337,4 +346,66 @@ public Iterable extractResources(ListTopicSubscriptionsResponse payload) return payload.getSubscriptionsList(); } }; + + private static BundlingDescriptor PUBLISH_BUNDLING_DESC = + new BundlingDescriptor() { + @Override + public String getBundlePartitionKey(PublishRequest request) { + return request.getTopic(); + } + + @Override + public PublishRequest mergeRequests(Collection requests) { + PublishRequest firstRequest = requests.iterator().next(); + + List elements = new ArrayList<>(); + for (PublishRequest request : requests) { + elements.addAll(request.getMessagesList()); + } + + PublishRequest bundleRequest = + PublishRequest.newBuilder() + .setTopic(firstRequest.getTopic()) + .addAllMessages(elements) + .build(); + return bundleRequest; + } + + @Override + public void splitResponse( + PublishResponse bundleResponse, + Collection> bundle) { + int bundleMessageIndex = 0; + for (RequestIssuer responder : bundle) { + List subresponseElements = new ArrayList<>(); + int subresponseCount = responder.getRequest().getMessagesCount(); + for (int i = 0; i < subresponseCount; i++) { + subresponseElements.add(bundleResponse.getMessageIds(bundleMessageIndex)); + bundleMessageIndex += 1; + } + PublishResponse response = + PublishResponse.newBuilder().addAllMessageIds(subresponseElements).build(); + responder.setResponse(response); + } + } + + @Override + public void splitException( + Throwable throwable, + Collection> bundle) { + for (RequestIssuer responder : bundle) { + responder.setException(throwable); + } + } + + @Override + public long countElements(PublishRequest request) { + return request.getMessagesCount(); + } + + @Override + public long countBytes(PublishRequest request) { + return request.getSerializedSize(); + } + }; } diff --git a/gcloud-java-pubsub/baseline/src/main/java/com/google/gcloud/pubsub/spi/v1/SubscriberApi.java b/gcloud-java-pubsub/baseline/src/main/java/com/google/gcloud/pubsub/spi/v1/SubscriberApi.java index 16e1435b8582..d53dca7f8885 100644 --- a/gcloud-java-pubsub/baseline/src/main/java/com/google/gcloud/pubsub/spi/v1/SubscriberApi.java +++ b/gcloud-java-pubsub/baseline/src/main/java/com/google/gcloud/pubsub/spi/v1/SubscriberApi.java @@ -33,7 +33,6 @@ package com.google.gcloud.pubsub.spi.v1; -import com.google.api.gax.grpc.ApiCallSettings; import com.google.api.gax.grpc.ApiCallable; import com.google.api.gax.protobuf.PathTemplate; import com.google.protobuf.Empty; @@ -66,9 +65,26 @@ */ @javax.annotation.Generated("by GAPIC") public class SubscriberApi implements AutoCloseable { + // ======== + // Members + // ======== + + private final ManagedChannel channel; + private final List closeables = new ArrayList<>(); + + private final ApiCallable createSubscriptionCallable; + private final ApiCallable getSubscriptionCallable; + private final ApiCallable + listSubscriptionsCallable; + private final ApiCallable> + listSubscriptionsIterableCallable; + private final ApiCallable deleteSubscriptionCallable; + private final ApiCallable modifyAckDeadlineCallable; + private final ApiCallable acknowledgeCallable; + private final ApiCallable pullCallable; + private final ApiCallable modifyPushConfigCallable; public static class ResourceNames { - private ResourceNames() {} // ======================= // ResourceNames Constants @@ -94,6 +110,8 @@ private ResourceNames() {} private static final PathTemplate SUBSCRIPTION_PATH_TEMPLATE = PathTemplate.create("projects/{project}/subscriptions/{subscription}"); + private ResourceNames() {} + // ============================== // Resource Name Helper Functions // ============================== @@ -155,25 +173,6 @@ public static final String parseSubscriptionFromSubscriptionPath(String subscrip } } - // ======== - // Members - // ======== - - private final ManagedChannel channel; - private final List closeables = new ArrayList<>(); - - private final ApiCallable createSubscriptionCallable; - private final ApiCallable getSubscriptionCallable; - private final ApiCallable - listSubscriptionsCallable; - private final ApiCallable> - listSubscriptionsIterableCallable; - private final ApiCallable deleteSubscriptionCallable; - private final ApiCallable modifyAckDeadlineCallable; - private final ApiCallable acknowledgeCallable; - private final ApiCallable pullCallable; - private final ApiCallable modifyPushConfigCallable; - // =============== // Factory Methods // =============== @@ -189,8 +188,9 @@ public static SubscriberApi create() throws IOException { } /** - * Constructs an instance of SubscriberApi, using the given settings. The channels are created based - * on the settings passed in, or defaults for any settings that are not set. + * Constructs an instance of SubscriberApi, using the given settings. + * The channels are created based on the settings passed in, or defaults for any + * settings that are not set. * * * @@ -200,8 +200,9 @@ public static SubscriberApi create(SubscriberSettings settings) throws IOExcepti } /** - * Constructs an instance of SubscriberApi, using the given settings. This is protected so that it - * easy to make a subclass, but otherwise, the static factory methods should be preferred. + * Constructs an instance of SubscriberApi, using the given settings. + * This is protected so that it easy to make a subclass, but otherwise, the static + * factory methods should be preferred. * * * diff --git a/gcloud-java-pubsub/baseline/src/main/java/com/google/gcloud/pubsub/spi/v1/SubscriberSettings.java b/gcloud-java-pubsub/baseline/src/main/java/com/google/gcloud/pubsub/spi/v1/SubscriberSettings.java index 2680d8429938..d9da44aa81f7 100644 --- a/gcloud-java-pubsub/baseline/src/main/java/com/google/gcloud/pubsub/spi/v1/SubscriberSettings.java +++ b/gcloud-java-pubsub/baseline/src/main/java/com/google/gcloud/pubsub/spi/v1/SubscriberSettings.java @@ -39,7 +39,7 @@ import com.google.api.gax.grpc.ApiCallSettings; import com.google.api.gax.grpc.ApiCallable.ApiCallableBuilder; import com.google.api.gax.grpc.ApiCallable.PageStreamingApiCallableBuilder; -import com.google.api.gax.grpc.PageDescriptor; +import com.google.api.gax.grpc.PageStreamingDescriptor; import com.google.api.gax.grpc.ServiceApiSettings; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -133,6 +133,8 @@ public class SubscriberSettings extends ServiceApiSettings { RETRY_PARAM_DEFINITIONS = definitions.build(); } + private final MethodBuilders methods; + private static class MethodBuilders { private final ApiCallableBuilder createSubscriptionMethod; private final ApiCallableBuilder getSubscriptionMethod; @@ -198,8 +200,6 @@ public MethodBuilders() { } } - private final MethodBuilders methods; - // =============== // Factory Methods // =============== @@ -222,8 +222,9 @@ public static SubscriberSettings create() { } /** - * Constructs an instance of SubscriberSettings with default settings. This is protected so that it - * easy to make a subclass, but otherwise, the static factory methods should be preferred. + * Constructs an instance of SubscriberSettings with default settings. This is protected + * so that it easy to make a subclass, but otherwise, the static factory methods should be + * preferred. * * * @@ -234,7 +235,7 @@ protected SubscriberSettings(MethodBuilders methods) { } /** - * Returns the ApiCallableBuilder for the API method createSubscription. + * Returns the builder for the API method createSubscription. * * * @@ -244,7 +245,7 @@ public ApiCallableBuilder createSubscriptionMethod() } /** - * Returns the ApiCallableBuilder for the API method getSubscription. + * Returns the builder for the API method getSubscription. * * * @@ -254,7 +255,7 @@ public ApiCallableBuilder getSubscriptionM } /** - * Returns the PageStreamingApiCallableBuilder for the API method listSubscriptions. + * Returns the builder for the API method listSubscriptions. * * * @@ -266,7 +267,7 @@ public ApiCallableBuilder getSubscriptionM } /** - * Returns the ApiCallableBuilder for the API method deleteSubscription. + * Returns the builder for the API method deleteSubscription. * * * @@ -276,7 +277,7 @@ public ApiCallableBuilder deleteSubscriptionMe } /** - * Returns the ApiCallableBuilder for the API method modifyAckDeadline. + * Returns the builder for the API method modifyAckDeadline. * * * @@ -286,7 +287,7 @@ public ApiCallableBuilder modifyAckDeadlineMeth } /** - * Returns the ApiCallableBuilder for the API method acknowledge. + * Returns the builder for the API method acknowledge. * * * @@ -296,7 +297,7 @@ public ApiCallableBuilder acknowledgeMethod() { } /** - * Returns the ApiCallableBuilder for the API method pull. + * Returns the builder for the API method pull. * * * @@ -306,7 +307,7 @@ public ApiCallableBuilder pullMethod() { } /** - * Returns the ApiCallableBuilder for the API method modifyPushConfig. + * Returns the builder for the API method modifyPushConfig. * * * @@ -315,9 +316,11 @@ public ApiCallableBuilder modifyPushConfigMethod return methods.modifyPushConfigMethod; } - private static PageDescriptor + private static PageStreamingDescriptor< + ListSubscriptionsRequest, ListSubscriptionsResponse, Subscription> LIST_SUBSCRIPTIONS_PAGE_STR_DESC = - new PageDescriptor() { + new PageStreamingDescriptor< + ListSubscriptionsRequest, ListSubscriptionsResponse, Subscription>() { @Override public Object emptyToken() { return ""; diff --git a/gcloud-java-pubsub/pom.xml b/gcloud-java-pubsub/pom.xml index 2fccc5a6560c..2bd755f63a8c 100644 --- a/gcloud-java-pubsub/pom.xml +++ b/gcloud-java-pubsub/pom.xml @@ -19,7 +19,7 @@ com.google.api gax - 0.0.4 + 0.0.5 com.google.api.grpc diff --git a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/v1/PublisherApi.java b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/v1/PublisherApi.java index 6ace2998698b..fa6da27a72bd 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/v1/PublisherApi.java +++ b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/v1/PublisherApi.java @@ -33,8 +33,9 @@ package com.google.gcloud.pubsub.spi.v1; -import com.google.api.gax.grpc.ApiCallSettings; import com.google.api.gax.grpc.ApiCallable; +import com.google.api.gax.grpc.ApiCallable.BundlableApiCallableInfo; +import com.google.api.gax.grpc.BundlerFactory; import com.google.api.gax.protobuf.PathTemplate; import com.google.protobuf.Empty; import com.google.pubsub.v1.DeleteTopicRequest; @@ -65,9 +66,25 @@ */ @javax.annotation.Generated("by GAPIC") public class PublisherApi implements AutoCloseable { + // ======== + // Members + // ======== + + private final ManagedChannel channel; + private final List closeables = new ArrayList<>(); + + private final ApiCallable createTopicCallable; + private final ApiCallable publishCallable; + private final ApiCallable getTopicCallable; + private final ApiCallable listTopicsCallable; + private final ApiCallable> listTopicsIterableCallable; + private final ApiCallable + listTopicSubscriptionsCallable; + private final ApiCallable> + listTopicSubscriptionsIterableCallable; + private final ApiCallable deleteTopicCallable; public static class ResourceNames { - private ResourceNames() {} // ======================= // ResourceNames Constants @@ -93,6 +110,8 @@ private ResourceNames() {} private static final PathTemplate TOPIC_PATH_TEMPLATE = PathTemplate.create("projects/{project}/topics/{topic}"); + private ResourceNames() {} + // ============================== // Resource Name Helper Functions // ============================== @@ -153,24 +172,6 @@ public static final String parseTopicFromTopicPath(String topicPath) { } } - // ======== - // Members - // ======== - - private final ManagedChannel channel; - private final List closeables = new ArrayList<>(); - - private final ApiCallable createTopicCallable; - private final ApiCallable publishCallable; - private final ApiCallable getTopicCallable; - private final ApiCallable listTopicsCallable; - private final ApiCallable> listTopicsIterableCallable; - private final ApiCallable - listTopicSubscriptionsCallable; - private final ApiCallable> - listTopicSubscriptionsIterableCallable; - private final ApiCallable deleteTopicCallable; - // =============== // Factory Methods // =============== @@ -186,8 +187,9 @@ public static PublisherApi create() throws IOException { } /** - * Constructs an instance of PublisherApi, using the given settings. The channels are created based - * on the settings passed in, or defaults for any settings that are not set. + * Constructs an instance of PublisherApi, using the given settings. + * The channels are created based on the settings passed in, or defaults for any + * settings that are not set. * * * @@ -197,8 +199,9 @@ public static PublisherApi create(PublisherSettings settings) throws IOException } /** - * Constructs an instance of PublisherApi, using the given settings. This is protected so that it - * easy to make a subclass, but otherwise, the static factory methods should be preferred. + * Constructs an instance of PublisherApi, using the given settings. + * This is protected so that it easy to make a subclass, but otherwise, the static + * factory methods should be preferred. * * * @@ -207,7 +210,14 @@ protected PublisherApi(PublisherSettings settings) throws IOException { this.channel = settings.getChannel(); this.createTopicCallable = settings.createTopicMethod().build(settings); - this.publishCallable = settings.publishMethod().build(settings); + BundlableApiCallableInfo bundlablePublish = + settings.publishMethod().buildBundlable(settings); + this.publishCallable = bundlablePublish.getApiCallable(); + BundlerFactory publishBundlerFactory = + bundlablePublish.getBundlerFactory(); + if (publishBundlerFactory != null) { + this.closeables.add(publishBundlerFactory); + } this.getTopicCallable = settings.getTopicMethod().build(settings); this.listTopicsCallable = settings.listTopicsMethod().build(settings); this.listTopicsIterableCallable = settings.listTopicsMethod().buildPageStreaming(settings); diff --git a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/v1/PublisherSettings.java b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/v1/PublisherSettings.java index 70b188735890..11566404546f 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/v1/PublisherSettings.java +++ b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/v1/PublisherSettings.java @@ -38,8 +38,11 @@ import com.google.api.gax.core.RetryParams; import com.google.api.gax.grpc.ApiCallSettings; import com.google.api.gax.grpc.ApiCallable.ApiCallableBuilder; +import com.google.api.gax.grpc.ApiCallable.BundlableApiCallableBuilder; import com.google.api.gax.grpc.ApiCallable.PageStreamingApiCallableBuilder; -import com.google.api.gax.grpc.PageDescriptor; +import com.google.api.gax.grpc.BundlingDescriptor; +import com.google.api.gax.grpc.PageStreamingDescriptor; +import com.google.api.gax.grpc.RequestIssuer; import com.google.api.gax.grpc.ServiceApiSettings; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -56,8 +59,12 @@ import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; import com.google.pubsub.v1.PublisherGrpc; +import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.Topic; import io.grpc.Status; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; // Manually-added imports: add custom (non-generated) imports after this point. @@ -132,9 +139,11 @@ public class PublisherSettings extends ServiceApiSettings { RETRY_PARAM_DEFINITIONS = definitions.build(); } + private final MethodBuilders methods; + private static class MethodBuilders { private final ApiCallableBuilder createTopicMethod; - private final ApiCallableBuilder publishMethod; + private final BundlableApiCallableBuilder publishMethod; private final ApiCallableBuilder getTopicMethod; private final PageStreamingApiCallableBuilder listTopicsMethod; @@ -149,7 +158,8 @@ public MethodBuilders() { createTopicMethod.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("idempotent")); createTopicMethod.setRetryParams(RETRY_PARAM_DEFINITIONS.get("default")); - publishMethod = new ApiCallableBuilder<>(PublisherGrpc.METHOD_PUBLISH); + publishMethod = + new BundlableApiCallableBuilder<>(PublisherGrpc.METHOD_PUBLISH, PUBLISH_BUNDLING_DESC); publishMethod.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("non_idempotent")); publishMethod.setRetryParams(RETRY_PARAM_DEFINITIONS.get("default")); @@ -187,8 +197,6 @@ public MethodBuilders() { } } - private final MethodBuilders methods; - // =============== // Factory Methods // =============== @@ -211,8 +219,9 @@ public static PublisherSettings create() { } /** - * Constructs an instance of PublisherSettings with default settings. This is protected so that it - * easy to make a subclass, but otherwise, the static factory methods should be preferred. + * Constructs an instance of PublisherSettings with default settings. This is protected + * so that it easy to make a subclass, but otherwise, the static factory methods should be + * preferred. * * * @@ -223,7 +232,7 @@ protected PublisherSettings(MethodBuilders methods) { } /** - * Returns the ApiCallableBuilder for the API method createTopic. + * Returns the builder for the API method createTopic. * * * @@ -233,17 +242,17 @@ public ApiCallableBuilder createTopicMethod() { } /** - * Returns the ApiCallableBuilder for the API method publish. + * Returns the builder for the API method publish. * * * */ - public ApiCallableBuilder publishMethod() { + public BundlableApiCallableBuilder publishMethod() { return methods.publishMethod; } /** - * Returns the ApiCallableBuilder for the API method getTopic. + * Returns the builder for the API method getTopic. * * * @@ -253,7 +262,7 @@ public ApiCallableBuilder getTopicMethod() { } /** - * Returns the PageStreamingApiCallableBuilder for the API method listTopics. + * Returns the builder for the API method listTopics. * * * @@ -264,7 +273,7 @@ public ApiCallableBuilder getTopicMethod() { } /** - * Returns the PageStreamingApiCallableBuilder for the API method listTopicSubscriptions. + * Returns the builder for the API method listTopicSubscriptions. * * * @@ -276,7 +285,7 @@ public ApiCallableBuilder getTopicMethod() { } /** - * Returns the ApiCallableBuilder for the API method deleteTopic. + * Returns the builder for the API method deleteTopic. * * * @@ -285,9 +294,9 @@ public ApiCallableBuilder deleteTopicMethod() { return methods.deleteTopicMethod; } - private static PageDescriptor + private static PageStreamingDescriptor LIST_TOPICS_PAGE_STR_DESC = - new PageDescriptor() { + new PageStreamingDescriptor() { @Override public Object emptyToken() { return ""; @@ -309,10 +318,10 @@ public Iterable extractResources(ListTopicsResponse payload) { } }; - private static PageDescriptor< + private static PageStreamingDescriptor< ListTopicSubscriptionsRequest, ListTopicSubscriptionsResponse, String> LIST_TOPIC_SUBSCRIPTIONS_PAGE_STR_DESC = - new PageDescriptor< + new PageStreamingDescriptor< ListTopicSubscriptionsRequest, ListTopicSubscriptionsResponse, String>() { @Override public Object emptyToken() { @@ -337,4 +346,66 @@ public Iterable extractResources(ListTopicSubscriptionsResponse payload) return payload.getSubscriptionsList(); } }; + + private static BundlingDescriptor PUBLISH_BUNDLING_DESC = + new BundlingDescriptor() { + @Override + public String getBundlePartitionKey(PublishRequest request) { + return request.getTopic(); + } + + @Override + public PublishRequest mergeRequests(Collection requests) { + PublishRequest firstRequest = requests.iterator().next(); + + List elements = new ArrayList<>(); + for (PublishRequest request : requests) { + elements.addAll(request.getMessagesList()); + } + + PublishRequest bundleRequest = + PublishRequest.newBuilder() + .setTopic(firstRequest.getTopic()) + .addAllMessages(elements) + .build(); + return bundleRequest; + } + + @Override + public void splitResponse( + PublishResponse bundleResponse, + Collection> bundle) { + int bundleMessageIndex = 0; + for (RequestIssuer responder : bundle) { + List subresponseElements = new ArrayList<>(); + int subresponseCount = responder.getRequest().getMessagesCount(); + for (int i = 0; i < subresponseCount; i++) { + subresponseElements.add(bundleResponse.getMessageIds(bundleMessageIndex)); + bundleMessageIndex += 1; + } + PublishResponse response = + PublishResponse.newBuilder().addAllMessageIds(subresponseElements).build(); + responder.setResponse(response); + } + } + + @Override + public void splitException( + Throwable throwable, + Collection> bundle) { + for (RequestIssuer responder : bundle) { + responder.setException(throwable); + } + } + + @Override + public long countElements(PublishRequest request) { + return request.getMessagesCount(); + } + + @Override + public long countBytes(PublishRequest request) { + return request.getSerializedSize(); + } + }; } diff --git a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/v1/SubscriberApi.java b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/v1/SubscriberApi.java index 16e1435b8582..d53dca7f8885 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/v1/SubscriberApi.java +++ b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/v1/SubscriberApi.java @@ -33,7 +33,6 @@ package com.google.gcloud.pubsub.spi.v1; -import com.google.api.gax.grpc.ApiCallSettings; import com.google.api.gax.grpc.ApiCallable; import com.google.api.gax.protobuf.PathTemplate; import com.google.protobuf.Empty; @@ -66,9 +65,26 @@ */ @javax.annotation.Generated("by GAPIC") public class SubscriberApi implements AutoCloseable { + // ======== + // Members + // ======== + + private final ManagedChannel channel; + private final List closeables = new ArrayList<>(); + + private final ApiCallable createSubscriptionCallable; + private final ApiCallable getSubscriptionCallable; + private final ApiCallable + listSubscriptionsCallable; + private final ApiCallable> + listSubscriptionsIterableCallable; + private final ApiCallable deleteSubscriptionCallable; + private final ApiCallable modifyAckDeadlineCallable; + private final ApiCallable acknowledgeCallable; + private final ApiCallable pullCallable; + private final ApiCallable modifyPushConfigCallable; public static class ResourceNames { - private ResourceNames() {} // ======================= // ResourceNames Constants @@ -94,6 +110,8 @@ private ResourceNames() {} private static final PathTemplate SUBSCRIPTION_PATH_TEMPLATE = PathTemplate.create("projects/{project}/subscriptions/{subscription}"); + private ResourceNames() {} + // ============================== // Resource Name Helper Functions // ============================== @@ -155,25 +173,6 @@ public static final String parseSubscriptionFromSubscriptionPath(String subscrip } } - // ======== - // Members - // ======== - - private final ManagedChannel channel; - private final List closeables = new ArrayList<>(); - - private final ApiCallable createSubscriptionCallable; - private final ApiCallable getSubscriptionCallable; - private final ApiCallable - listSubscriptionsCallable; - private final ApiCallable> - listSubscriptionsIterableCallable; - private final ApiCallable deleteSubscriptionCallable; - private final ApiCallable modifyAckDeadlineCallable; - private final ApiCallable acknowledgeCallable; - private final ApiCallable pullCallable; - private final ApiCallable modifyPushConfigCallable; - // =============== // Factory Methods // =============== @@ -189,8 +188,9 @@ public static SubscriberApi create() throws IOException { } /** - * Constructs an instance of SubscriberApi, using the given settings. The channels are created based - * on the settings passed in, or defaults for any settings that are not set. + * Constructs an instance of SubscriberApi, using the given settings. + * The channels are created based on the settings passed in, or defaults for any + * settings that are not set. * * * @@ -200,8 +200,9 @@ public static SubscriberApi create(SubscriberSettings settings) throws IOExcepti } /** - * Constructs an instance of SubscriberApi, using the given settings. This is protected so that it - * easy to make a subclass, but otherwise, the static factory methods should be preferred. + * Constructs an instance of SubscriberApi, using the given settings. + * This is protected so that it easy to make a subclass, but otherwise, the static + * factory methods should be preferred. * * * diff --git a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/v1/SubscriberSettings.java b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/v1/SubscriberSettings.java index 2680d8429938..d9da44aa81f7 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/v1/SubscriberSettings.java +++ b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/v1/SubscriberSettings.java @@ -39,7 +39,7 @@ import com.google.api.gax.grpc.ApiCallSettings; import com.google.api.gax.grpc.ApiCallable.ApiCallableBuilder; import com.google.api.gax.grpc.ApiCallable.PageStreamingApiCallableBuilder; -import com.google.api.gax.grpc.PageDescriptor; +import com.google.api.gax.grpc.PageStreamingDescriptor; import com.google.api.gax.grpc.ServiceApiSettings; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -133,6 +133,8 @@ public class SubscriberSettings extends ServiceApiSettings { RETRY_PARAM_DEFINITIONS = definitions.build(); } + private final MethodBuilders methods; + private static class MethodBuilders { private final ApiCallableBuilder createSubscriptionMethod; private final ApiCallableBuilder getSubscriptionMethod; @@ -198,8 +200,6 @@ public MethodBuilders() { } } - private final MethodBuilders methods; - // =============== // Factory Methods // =============== @@ -222,8 +222,9 @@ public static SubscriberSettings create() { } /** - * Constructs an instance of SubscriberSettings with default settings. This is protected so that it - * easy to make a subclass, but otherwise, the static factory methods should be preferred. + * Constructs an instance of SubscriberSettings with default settings. This is protected + * so that it easy to make a subclass, but otherwise, the static factory methods should be + * preferred. * * * @@ -234,7 +235,7 @@ protected SubscriberSettings(MethodBuilders methods) { } /** - * Returns the ApiCallableBuilder for the API method createSubscription. + * Returns the builder for the API method createSubscription. * * * @@ -244,7 +245,7 @@ public ApiCallableBuilder createSubscriptionMethod() } /** - * Returns the ApiCallableBuilder for the API method getSubscription. + * Returns the builder for the API method getSubscription. * * * @@ -254,7 +255,7 @@ public ApiCallableBuilder getSubscriptionM } /** - * Returns the PageStreamingApiCallableBuilder for the API method listSubscriptions. + * Returns the builder for the API method listSubscriptions. * * * @@ -266,7 +267,7 @@ public ApiCallableBuilder getSubscriptionM } /** - * Returns the ApiCallableBuilder for the API method deleteSubscription. + * Returns the builder for the API method deleteSubscription. * * * @@ -276,7 +277,7 @@ public ApiCallableBuilder deleteSubscriptionMe } /** - * Returns the ApiCallableBuilder for the API method modifyAckDeadline. + * Returns the builder for the API method modifyAckDeadline. * * * @@ -286,7 +287,7 @@ public ApiCallableBuilder modifyAckDeadlineMeth } /** - * Returns the ApiCallableBuilder for the API method acknowledge. + * Returns the builder for the API method acknowledge. * * * @@ -296,7 +297,7 @@ public ApiCallableBuilder acknowledgeMethod() { } /** - * Returns the ApiCallableBuilder for the API method pull. + * Returns the builder for the API method pull. * * * @@ -306,7 +307,7 @@ public ApiCallableBuilder pullMethod() { } /** - * Returns the ApiCallableBuilder for the API method modifyPushConfig. + * Returns the builder for the API method modifyPushConfig. * * * @@ -315,9 +316,11 @@ public ApiCallableBuilder modifyPushConfigMethod return methods.modifyPushConfigMethod; } - private static PageDescriptor + private static PageStreamingDescriptor< + ListSubscriptionsRequest, ListSubscriptionsResponse, Subscription> LIST_SUBSCRIPTIONS_PAGE_STR_DESC = - new PageDescriptor() { + new PageStreamingDescriptor< + ListSubscriptionsRequest, ListSubscriptionsResponse, Subscription>() { @Override public Object emptyToken() { return ""; diff --git a/gcloud-java-pubsub/src/test/java/com/google/gcloud/pubsub/spi/v1/PublisherApiTest.java b/gcloud-java-pubsub/src/test/java/com/google/gcloud/pubsub/spi/v1/PublisherApiTest.java index aaf292ff917d..c25ca51ee713 100644 --- a/gcloud-java-pubsub/src/test/java/com/google/gcloud/pubsub/spi/v1/PublisherApiTest.java +++ b/gcloud-java-pubsub/src/test/java/com/google/gcloud/pubsub/spi/v1/PublisherApiTest.java @@ -16,6 +16,7 @@ import com.google.api.gax.core.BackoffParams; import com.google.api.gax.core.RetryParams; +import com.google.api.gax.grpc.BundlingSettings; import com.google.gcloud.pubsub.testing.LocalPubsubHelper; import com.google.protobuf.ByteString; import com.google.pubsub.v1.PubsubMessage; @@ -30,6 +31,7 @@ import java.util.Collections; import java.util.List; +import org.joda.time.Duration; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -40,6 +42,7 @@ public class PublisherApiTest { private static LocalPubsubHelper pubsubHelper; private PublisherApi publisherApi; + private PublisherApi bundledPublisherApi; private SubscriberApi subscriberApi; @BeforeClass @@ -79,6 +82,18 @@ public void setUp() throws Exception { publisherSettings.provideChannelWith(channel); publisherApi = PublisherApi.create(publisherSettings); + BundlingSettings bundlingSettings = + BundlingSettings.newBuilder() + .setElementCountThreshold(10) + .setDelayThreshold(Duration.standardSeconds(30)) + .build(); + + PublisherSettings bundledPublisherSettings = PublisherSettings.create(); + bundledPublisherSettings.setRetryParamsOnAllMethods(retryParams); + bundledPublisherSettings.provideChannelWith(channel); + bundledPublisherSettings.publishMethod().setBundlingSettings(bundlingSettings); + bundledPublisherApi = PublisherApi.create(bundledPublisherSettings); + SubscriberSettings subscriberSettings = SubscriberSettings.create(); subscriberSettings.setRetryParamsOnAllMethods(retryParams); subscriberSettings.provideChannelWith(channel); @@ -93,6 +108,9 @@ public void tearDown() throws Exception { if (subscriberApi != null) { subscriberApi.close(); } + if (bundledPublisherApi != null) { + bundledPublisherApi.close(); + } pubsubHelper.reset(); } @@ -108,7 +126,8 @@ public void testPublish() throws Exception { String topicName = PublisherApi.ResourceNames.formatTopicPath("my-project", "publish-topic"); publisherApi.createTopic(topicName); - String subscriberName = SubscriberApi.ResourceNames.formatSubscriptionPath("my-project", "my-subscribe"); + String subscriberName = + SubscriberApi.ResourceNames.formatSubscriptionPath("my-project", "my-subscribe"); PushConfig config = PushConfig.getDefaultInstance(); subscriberApi.createSubscription(subscriberName, topicName, config, 5); @@ -122,6 +141,27 @@ public void testPublish() throws Exception { "pubsub-message", response.getReceivedMessages(0).getMessage().getData().toStringUtf8()); } + @Test + public void testBundledPublish() throws Exception { + String topicName = PublisherApi.ResourceNames.formatTopicPath("my-project", "publish-topic"); + bundledPublisherApi.createTopic(topicName); + + String subscriberName = + SubscriberApi.ResourceNames.formatSubscriptionPath("my-project", "my-subscribe"); + PushConfig config = PushConfig.getDefaultInstance(); + subscriberApi.createSubscription(subscriberName, topicName, config, 5); + + PubsubMessage msg = + PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("pubsub-message")).build(); + // This is a synchronous publish and should trigger the default blockingCallCountThreshold of 1 + bundledPublisherApi.publish(topicName, Collections.singletonList(msg)); + + PullResponse response = subscriberApi.pull(subscriberName, true, 100); + Assert.assertEquals(1, response.getReceivedMessagesCount()); + Assert.assertEquals( + "pubsub-message", response.getReceivedMessages(0).getMessage().getData().toStringUtf8()); + } + @Test public void testGetTopic() throws Exception { String topicName = PublisherApi.ResourceNames.formatTopicPath("my-project", "fun-topic");