Skip to content

Commit

Permalink
Bundling descriptor for Publish
Browse files Browse the repository at this point in the history
  • Loading branch information
garrettjonesgoogle committed Mar 18, 2016
1 parent e32afa6 commit 95a1f90
Show file tree
Hide file tree
Showing 10 changed files with 253 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@

package com.google.gcloud.pubsub.spi.v1;

import com.google.api.gax.grpc.ApiCallSettings;
import com.google.api.gax.grpc.ApiCallable;
import com.google.api.gax.grpc.ApiCallable.BundlableApiCallableInfo;
import com.google.api.gax.grpc.BundlerFactory;
import com.google.api.gax.protobuf.PathTemplate;
import com.google.protobuf.Empty;
import com.google.pubsub.v1.DeleteTopicRequest;
Expand Down Expand Up @@ -207,7 +208,14 @@ protected PublisherApi(PublisherSettings settings) throws IOException {
this.channel = settings.getChannel();

this.createTopicCallable = settings.createTopicMethod().build(settings);
this.publishCallable = settings.publishMethod().build(settings);
BundlableApiCallableInfo<PublishRequest, PublishResponse> bundlablePublish =
settings.publishMethod().buildBundlable(settings);
this.publishCallable = bundlablePublish.getApiCallable();
BundlerFactory<PublishRequest, PublishResponse> publishBundlerFactory =
bundlablePublish.getBundlerFactory();
if (publishBundlerFactory != null) {
this.closeables.add(publishBundlerFactory);
}
this.getTopicCallable = settings.getTopicMethod().build(settings);
this.listTopicsCallable = settings.listTopicsMethod().build(settings);
this.listTopicsIterableCallable = settings.listTopicsMethod().buildPageStreaming(settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@
import com.google.api.gax.core.RetryParams;
import com.google.api.gax.grpc.ApiCallSettings;
import com.google.api.gax.grpc.ApiCallable.ApiCallableBuilder;
import com.google.api.gax.grpc.ApiCallable.BundlableApiCallableBuilder;
import com.google.api.gax.grpc.ApiCallable.PageStreamingApiCallableBuilder;
import com.google.api.gax.grpc.PageDescriptor;
import com.google.api.gax.grpc.BundlingDescriptor;
import com.google.api.gax.grpc.PageStreamingDescriptor;
import com.google.api.gax.grpc.RequestIssuer;
import com.google.api.gax.grpc.ServiceApiSettings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand All @@ -56,8 +59,12 @@
import com.google.pubsub.v1.PublishRequest;
import com.google.pubsub.v1.PublishResponse;
import com.google.pubsub.v1.PublisherGrpc;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.Topic;
import io.grpc.Status;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Manually-added imports: add custom (non-generated) imports after this point.

Expand Down Expand Up @@ -134,7 +141,7 @@ public class PublisherSettings extends ServiceApiSettings {

private static class MethodBuilders {
private final ApiCallableBuilder<Topic, Topic> createTopicMethod;
private final ApiCallableBuilder<PublishRequest, PublishResponse> publishMethod;
private final BundlableApiCallableBuilder<PublishRequest, PublishResponse> publishMethod;
private final ApiCallableBuilder<GetTopicRequest, Topic> getTopicMethod;
private final PageStreamingApiCallableBuilder<ListTopicsRequest, ListTopicsResponse, Topic>
listTopicsMethod;
Expand All @@ -149,7 +156,8 @@ public MethodBuilders() {
createTopicMethod.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("idempotent"));
createTopicMethod.setRetryParams(RETRY_PARAM_DEFINITIONS.get("default"));

publishMethod = new ApiCallableBuilder<>(PublisherGrpc.METHOD_PUBLISH);
publishMethod =
new BundlableApiCallableBuilder<>(PublisherGrpc.METHOD_PUBLISH, PUBLISH_BUNDLING_DESC);
publishMethod.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("non_idempotent"));
publishMethod.setRetryParams(RETRY_PARAM_DEFINITIONS.get("default"));

Expand Down Expand Up @@ -223,7 +231,7 @@ protected PublisherSettings(MethodBuilders methods) {
}

/**
* Returns the ApiCallableBuilder for the API method createTopic.
* Returns the builder for the API method createTopic.
*
* <!-- manual edit -->
* <!-- end manual edit -->
Expand All @@ -233,17 +241,17 @@ public ApiCallableBuilder<Topic, Topic> createTopicMethod() {
}

/**
* Returns the ApiCallableBuilder for the API method publish.
* Returns the builder for the API method publish.
*
* <!-- manual edit -->
* <!-- end manual edit -->
*/
public ApiCallableBuilder<PublishRequest, PublishResponse> publishMethod() {
public BundlableApiCallableBuilder<PublishRequest, PublishResponse> publishMethod() {
return methods.publishMethod;
}

/**
* Returns the ApiCallableBuilder for the API method getTopic.
* Returns the builder for the API method getTopic.
*
* <!-- manual edit -->
* <!-- end manual edit -->
Expand All @@ -253,7 +261,7 @@ public ApiCallableBuilder<GetTopicRequest, Topic> getTopicMethod() {
}

/**
* Returns the PageStreamingApiCallableBuilder for the API method listTopics.
* Returns the builder for the API method listTopics.
*
* <!-- manual edit -->
* <!-- end manual edit -->
Expand All @@ -264,7 +272,7 @@ public ApiCallableBuilder<GetTopicRequest, Topic> getTopicMethod() {
}

/**
* Returns the PageStreamingApiCallableBuilder for the API method listTopicSubscriptions.
* Returns the builder for the API method listTopicSubscriptions.
*
* <!-- manual edit -->
* <!-- end manual edit -->
Expand All @@ -276,7 +284,7 @@ public ApiCallableBuilder<GetTopicRequest, Topic> getTopicMethod() {
}

/**
* Returns the ApiCallableBuilder for the API method deleteTopic.
* Returns the builder for the API method deleteTopic.
*
* <!-- manual edit -->
* <!-- end manual edit -->
Expand All @@ -285,9 +293,9 @@ public ApiCallableBuilder<DeleteTopicRequest, Empty> deleteTopicMethod() {
return methods.deleteTopicMethod;
}

private static PageDescriptor<ListTopicsRequest, ListTopicsResponse, Topic>
private static PageStreamingDescriptor<ListTopicsRequest, ListTopicsResponse, Topic>
LIST_TOPICS_PAGE_STR_DESC =
new PageDescriptor<ListTopicsRequest, ListTopicsResponse, Topic>() {
new PageStreamingDescriptor<ListTopicsRequest, ListTopicsResponse, Topic>() {
@Override
public Object emptyToken() {
return "";
Expand All @@ -309,10 +317,10 @@ public Iterable<Topic> extractResources(ListTopicsResponse payload) {
}
};

private static PageDescriptor<
private static PageStreamingDescriptor<
ListTopicSubscriptionsRequest, ListTopicSubscriptionsResponse, String>
LIST_TOPIC_SUBSCRIPTIONS_PAGE_STR_DESC =
new PageDescriptor<
new PageStreamingDescriptor<
ListTopicSubscriptionsRequest, ListTopicSubscriptionsResponse, String>() {
@Override
public Object emptyToken() {
Expand All @@ -337,4 +345,66 @@ public Iterable<String> extractResources(ListTopicSubscriptionsResponse payload)
return payload.getSubscriptionsList();
}
};

private static BundlingDescriptor<PublishRequest, PublishResponse> PUBLISH_BUNDLING_DESC =
new BundlingDescriptor<PublishRequest, PublishResponse>() {
@Override
public String getBundlePartitionKey(PublishRequest request) {
return request.getTopic();
}

@Override
public PublishRequest mergeRequests(Collection<PublishRequest> requests) {
PublishRequest firstRequest = requests.iterator().next();

List<PubsubMessage> elements = new ArrayList<>();
for (PublishRequest request : requests) {
elements.addAll(request.getMessagesList());
}

PublishRequest bundleRequest =
PublishRequest.newBuilder()
.setTopic(firstRequest.getTopic())
.addAllMessages(elements)
.build();
return bundleRequest;
}

@Override
public void splitResponse(
PublishResponse bundleResponse,
Collection<? extends RequestIssuer<PublishRequest, PublishResponse>> bundle) {
int bundleMessageIndex = 0;
for (RequestIssuer<PublishRequest, PublishResponse> responder : bundle) {
List<String> subresponseElements = new ArrayList<>();
int subresponseCount = responder.getRequest().getMessagesCount();
for (int i = 0; i < subresponseCount; i++) {
subresponseElements.add(bundleResponse.getMessageIds(bundleMessageIndex));
bundleMessageIndex += 1;
}
PublishResponse response =
PublishResponse.newBuilder().addAllMessageIds(subresponseElements).build();
responder.setResponse(response);
}
}

@Override
public void splitException(
Throwable throwable,
Collection<? extends RequestIssuer<PublishRequest, PublishResponse>> bundle) {
for (RequestIssuer<PublishRequest, PublishResponse> responder : bundle) {
responder.setException(throwable);
}
}

@Override
public long countElements(PublishRequest request) {
return request.getMessagesCount();
}

@Override
public long countBytes(PublishRequest request) {
return request.getSerializedSize();
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@

package com.google.gcloud.pubsub.spi.v1;

import com.google.api.gax.grpc.ApiCallSettings;
import com.google.api.gax.grpc.ApiCallable;
import com.google.api.gax.protobuf.PathTemplate;
import com.google.protobuf.Empty;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import com.google.api.gax.grpc.ApiCallSettings;
import com.google.api.gax.grpc.ApiCallable.ApiCallableBuilder;
import com.google.api.gax.grpc.ApiCallable.PageStreamingApiCallableBuilder;
import com.google.api.gax.grpc.PageDescriptor;
import com.google.api.gax.grpc.PageStreamingDescriptor;
import com.google.api.gax.grpc.ServiceApiSettings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -234,7 +234,7 @@ protected SubscriberSettings(MethodBuilders methods) {
}

/**
* Returns the ApiCallableBuilder for the API method createSubscription.
* Returns the builder for the API method createSubscription.
*
* <!-- manual edit -->
* <!-- end manual edit -->
Expand All @@ -244,7 +244,7 @@ public ApiCallableBuilder<Subscription, Subscription> createSubscriptionMethod()
}

/**
* Returns the ApiCallableBuilder for the API method getSubscription.
* Returns the builder for the API method getSubscription.
*
* <!-- manual edit -->
* <!-- end manual edit -->
Expand All @@ -254,7 +254,7 @@ public ApiCallableBuilder<GetSubscriptionRequest, Subscription> getSubscriptionM
}

/**
* Returns the PageStreamingApiCallableBuilder for the API method listSubscriptions.
* Returns the builder for the API method listSubscriptions.
*
* <!-- manual edit -->
* <!-- end manual edit -->
Expand All @@ -266,7 +266,7 @@ public ApiCallableBuilder<GetSubscriptionRequest, Subscription> getSubscriptionM
}

/**
* Returns the ApiCallableBuilder for the API method deleteSubscription.
* Returns the builder for the API method deleteSubscription.
*
* <!-- manual edit -->
* <!-- end manual edit -->
Expand All @@ -276,7 +276,7 @@ public ApiCallableBuilder<DeleteSubscriptionRequest, Empty> deleteSubscriptionMe
}

/**
* Returns the ApiCallableBuilder for the API method modifyAckDeadline.
* Returns the builder for the API method modifyAckDeadline.
*
* <!-- manual edit -->
* <!-- end manual edit -->
Expand All @@ -286,7 +286,7 @@ public ApiCallableBuilder<ModifyAckDeadlineRequest, Empty> modifyAckDeadlineMeth
}

/**
* Returns the ApiCallableBuilder for the API method acknowledge.
* Returns the builder for the API method acknowledge.
*
* <!-- manual edit -->
* <!-- end manual edit -->
Expand All @@ -296,7 +296,7 @@ public ApiCallableBuilder<AcknowledgeRequest, Empty> acknowledgeMethod() {
}

/**
* Returns the ApiCallableBuilder for the API method pull.
* Returns the builder for the API method pull.
*
* <!-- manual edit -->
* <!-- end manual edit -->
Expand All @@ -306,7 +306,7 @@ public ApiCallableBuilder<PullRequest, PullResponse> pullMethod() {
}

/**
* Returns the ApiCallableBuilder for the API method modifyPushConfig.
* Returns the builder for the API method modifyPushConfig.
*
* <!-- manual edit -->
* <!-- end manual edit -->
Expand All @@ -315,9 +315,11 @@ public ApiCallableBuilder<ModifyPushConfigRequest, Empty> modifyPushConfigMethod
return methods.modifyPushConfigMethod;
}

private static PageDescriptor<ListSubscriptionsRequest, ListSubscriptionsResponse, Subscription>
private static PageStreamingDescriptor<
ListSubscriptionsRequest, ListSubscriptionsResponse, Subscription>
LIST_SUBSCRIPTIONS_PAGE_STR_DESC =
new PageDescriptor<ListSubscriptionsRequest, ListSubscriptionsResponse, Subscription>() {
new PageStreamingDescriptor<
ListSubscriptionsRequest, ListSubscriptionsResponse, Subscription>() {
@Override
public Object emptyToken() {
return "";
Expand Down
2 changes: 1 addition & 1 deletion gcloud-java-pubsub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<dependency>
<groupId>com.google.api</groupId>
<artifactId>gax</artifactId>
<version>0.0.4</version>
<version>0.0.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.google.api.grpc</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@

package com.google.gcloud.pubsub.spi.v1;

import com.google.api.gax.grpc.ApiCallSettings;
import com.google.api.gax.grpc.ApiCallable;
import com.google.api.gax.grpc.ApiCallable.BundlableApiCallableInfo;
import com.google.api.gax.grpc.BundlerFactory;
import com.google.api.gax.protobuf.PathTemplate;
import com.google.protobuf.Empty;
import com.google.pubsub.v1.DeleteTopicRequest;
Expand Down Expand Up @@ -207,7 +208,14 @@ protected PublisherApi(PublisherSettings settings) throws IOException {
this.channel = settings.getChannel();

this.createTopicCallable = settings.createTopicMethod().build(settings);
this.publishCallable = settings.publishMethod().build(settings);
BundlableApiCallableInfo<PublishRequest, PublishResponse> bundlablePublish =
settings.publishMethod().buildBundlable(settings);
this.publishCallable = bundlablePublish.getApiCallable();
BundlerFactory<PublishRequest, PublishResponse> publishBundlerFactory =
bundlablePublish.getBundlerFactory();
if (publishBundlerFactory != null) {
this.closeables.add(publishBundlerFactory);
}
this.getTopicCallable = settings.getTopicMethod().build(settings);
this.listTopicsCallable = settings.listTopicsMethod().build(settings);
this.listTopicsIterableCallable = settings.listTopicsMethod().buildPageStreaming(settings);
Expand Down
Loading

0 comments on commit 95a1f90

Please sign in to comment.