diff --git a/gcloud-java-pubsub/pom.xml b/gcloud-java-pubsub/pom.xml index 77902a588260..621afa656e37 100644 --- a/gcloud-java-pubsub/pom.xml +++ b/gcloud-java-pubsub/pom.xml @@ -48,6 +48,12 @@ 4.12 test + + org.easymock + easymock + 3.4 + test + diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java index 48de7002d54f..b98315e77a75 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java @@ -101,7 +101,7 @@ private PullOption(Option.OptionType option, Object value) { } /** - * Returns an option to specify the maximum number of messages that can be executed + * Returns an option to specify the maximum number of messages that can be processed * concurrently at any time. */ public static PullOption maxConcurrentCallbacks(int maxConcurrency) { @@ -110,74 +110,275 @@ public static PullOption maxConcurrentCallbacks(int maxConcurrency) { } /** - * A callback to process pulled messages. - * The message will be ack'ed upon successful return or nack'ed if exception is thrown. + * A callback to process pulled messages. The received message will be ack'ed upon successful + * return or nack'ed if exception is thrown. */ interface MessageProcessor { + /** + * Processes the received {@code message}. If this method returns correctly the message is + * ack'ed. If this method throws an exception the message is nack'ed. + */ void process(Message message) throws Exception; } /** - * An interface to control message consumer settings. + * An interface to control a message consumer. */ interface MessageConsumer extends AutoCloseable { + /** + * Stops pulling messages from the subscription associated with this {@code MessageConsumer} and + * frees all resources. Messages that have already been pulled are processed before closing. + */ + @Override + void close() throws Exception; } + /** + * Creates a new topic. + * + * @return the created topic + * @throws PubSubException upon failure + */ Topic create(TopicInfo topic); + /** + * Sends a request for creating a topic. This method returns a {@code Future} object to consume + * the result. {@link Future#get()} returns the created topic or {@code null} if not found. + */ Future createAsync(TopicInfo topic); - // null if not found + /** + * Returns the requested topic or {@code null} if not found. + * + * @throws PubSubException upon failure + */ Topic getTopic(String topic); + /** + * Sends a request for getting a topic. This method returns a {@code Future} object to consume the + * result. {@link Future#get()} returns the requested topic or {@code null} if not found. + * + * @throws PubSubException upon failure + */ Future getTopicAsync(String topic); - // false if not found + /** + * Deletes the requested topic. + * + * @return {@code true} if the topic was deleted, {@code false} if it was not found + */ boolean deleteTopic(String topic); + /** + * Sends a request for deleting a topic. This method returns a {@code Future} object to consume + * the result. {@link Future#get()} returns {@code true} if the topic was deleted, {@code false} + * if it was not found. + */ Future deleteTopicAsync(String topic); + /** + * Lists the topics. This method returns a {@link Page} object that can be used to consume + * paginated results. Use {@link ListOption} to specify the page size or the page token from which + * to start listing topics. + * + * @throws PubSubException upon failure + */ Page listTopics(ListOption... options); + /** + * Sends a request for listing topics. This method returns a {@code Future} object to consume + * the result. {@link Future#get()} returns an {@link AsyncPage} object that can be used to + * asynchronously handle paginated results. Use {@link ListOption} to specify the page size or the + * page token from which to start listing topics. + */ Future> listTopicsAsync(ListOption... options); + /** + * Publishes a message to the provided topic. This method returns a service-generated id for the + * published message. Service-generated ids are guaranteed to be unique within the topic. + * + * @param topic the topic where the message is published + * @param message the message to publish + * @return a unique service-generated id for the message + * @throws PubSubException upon failure, if the topic does not exist or if the message has empty + * payload and no attributes + */ String publish(String topic, Message message); + /** + * Sends a request for publishing a message to the provided topic. This method returns a + * {@code Future} object to consume the result. {@link Future#get()} returns a service-generated + * id for the published message. Service-generated ids are guaranteed to be unique within the + * topic. + * + * @param topic the topic where the message is published + * @param message the message to publish + * @return a {@code Future} for the unique service-generated id for the message + */ Future publishAsync(String topic, Message message); + /** + * Publishes a number of messages to the provided topic. This method returns a list of + * service-generated ids for the published messages. Service-generated ids are guaranteed to be + * unique within the topic. + * + * @param topic the topic where the message is published + * @param message the first message to publish + * @param messages other messages to publish + * @return a list of unique, service-generated, ids. Ids are in the same order as the messages. + * @throws PubSubException upon failure, if the topic does not exist or if one of the messages has + * empty payload and no attributes + */ List publish(String topic, Message message, Message... messages); + /** + * Sends a request to publish a number of messages to the provided topic. This method returns a + * {@code Future} object to consume the result. {@link Future#get()} returns a list of + * service-generated ids for the published messages. Service-generated ids are guaranteed to be + * unique within the topic. + * + * @param topic the topic where the message is published + * @param message the first message to publish + * @param messages other messages to publish + * @return a {@code Future} for the unique, service-generated ids. Ids are in the same order as + * the messages. + */ Future> publishAsync(String topic, Message message, Message... messages); + /** + * Publishes a number of messages to the provided topic. This method returns a list of + * service-generated ids for the published messages. Service-generated ids are guaranteed to be + * unique within the topic. + * + * @param topic the topic where the message is published + * @param messages the messages to publish + * @return a list of unique, service-generated, ids. Ids are in the same order as the messages. + * @throws PubSubException upon failure, if the topic does not exist or if one of the messages has + * empty payload and no attributes + */ List publish(String topic, Iterable messages); + /** + * Sends a request to publish a number of messages to the provided topic. This method returns a + * {@code Future} object to consume the result. {@link Future#get()} returns a list of + * service-generated ids for the published messages. Service-generated ids are guaranteed to be + * unique within the topic. + * + * @param topic the topic where the message is published + * @param messages the messages to publish + * @return a {@code Future} for the unique, service-generated ids. Ids are in the same order as + * the messages + */ Future> publishAsync(String topic, Iterable messages); + /** + * Creates a new subscription. + * + * @return the created subscription + * @throws PubSubException upon failure + */ Subscription create(SubscriptionInfo subscription); + /** + * Sends a request for creating a subscription. This method returns a {@code Future} object to + * consume the result. {@link Future#get()} returns the created subscription or {@code null} if + * not found. + */ Future createAsync(SubscriptionInfo subscription); - // null if not found + /** + * Returns the requested subscription or {@code null} if not found. + */ Subscription getSubscription(String subscription); + /** + * Sends a request for getting a subscription. This method returns a {@code Future} object to + * consume the result. {@link Future#get()} returns the requested subscription or {@code null} if + * not found. + */ Future getSubscriptionAsync(String subscription); + /** + * Sets the push configuration for a specified subscription. This may be used to change a push + * subscription to a pull one (passing a {@code null} {@code pushConfig} parameter) or vice versa. + * This methods can also be used to change the endpoint URL and other attributes of a push + * subscription. Messages will accumulate for delivery regardless of changes to the push + * configuration. + * + * @param subscription the subscription for which to replace push configuration + * @param pushConfig the new push configuration. Use {@code null} to unset it + * @throws PubSubException upon failure, or if the subscription does not exist + */ void replacePushConfig(String subscription, PushConfig pushConfig); + /** + * Sends a request for updating the push configuration for a specified subscription. This may be + * used to change a push subscription to a pull one (passing a {@code null} {@code pushConfig} + * parameter) or vice versa. This methods can also be used to change the endpoint URL and other + * attributes of a push subscription. Messages will accumulate for delivery regardless of changes + * to the push configuration. The method returns a {@code Future} object that can be used to wait + * for the replace operation to be completed. + * + * @param subscription the subscription for which to replace push configuration + * @param pushConfig the new push configuration. Use {@code null} to unset it + * @return a {@code Future} to wait for the replace operation to be completed. + */ Future replacePushConfigAsync(String subscription, PushConfig pushConfig); - // false if not found + /** + * Deletes the requested subscription. + * + * @return {@code true} if the subscription was deleted, {@code false} if it was not found + * @throws PubSubException upon failure + */ boolean deleteSubscription(String subscription); + /** + * Sends a request for deleting a subscription. This method returns a {@code Future} object to + * consume the result. {@link Future#get()} returns {@code true} if the subscription was deleted, + * {@code false} if it was not found. + */ Future deleteSubscriptionAsync(String subscription); + /** + * Lists the subscriptions. This method returns a {@link Page} object that can be used to consume + * paginated results. Use {@link ListOption} to specify the page size or the page token from which + * to start listing subscriptions. + * + * @throws PubSubException upon failure + */ Page listSubscriptions(ListOption... options); + /** + * Sends a request for listing subscriptions. This method returns a {@code Future} object to + * consume the result. {@link Future#get()} returns an {@link AsyncPage} object that can be used + * to asynchronously handle paginated results. Use {@link ListOption} to specify the page size or + * the page token from which to start listing subscriptions. + * + * @throws PubSubException upon failure + */ Future> listSubscriptionsAsync(ListOption... options); + /** + * Lists the identities of the subscriptions for the provided topic. This method returns a + * {@link Page} object that can be used to consume paginated results. Use {@link ListOption} to + * specify the page size or the page token from which to start listing subscriptions. + * + * @param topic the topic for which to list subscriptions + * @throws PubSubException upon failure + */ Page listSubscriptions(String topic, ListOption... options); + /** + * Sends a request for listing the identities of subscriptions for the provided topic. This method + * returns a {@code Future} object to consume the result. {@link Future#get()} returns an + * {@link AsyncPage} object that can be used to asynchronously handle paginated results. Use + * {@link ListOption} to specify the page size or the page token from which to start listing + * subscriptions. + * + * @param topic the topic for which to list subscriptions + */ Future> listSubscriptionsAsync(String topic, ListOption... options); Iterator pull(String subscription, int maxMessages); diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java index bd69103b9819..948f9904b31c 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java @@ -16,22 +16,45 @@ package com.google.cloud.pubsub; +import static com.google.api.client.util.Preconditions.checkArgument; +import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_SIZE; +import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_TOKEN; import static com.google.common.util.concurrent.Futures.lazyTransform; import com.google.cloud.AsyncPage; +import com.google.cloud.AsyncPageImpl; import com.google.cloud.BaseService; import com.google.cloud.Page; +import com.google.cloud.PageImpl; import com.google.cloud.pubsub.spi.PubSubRpc; import com.google.cloud.pubsub.spi.v1.PublisherApi; +import com.google.cloud.pubsub.spi.v1.SubscriberApi; import com.google.common.base.Function; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.Empty; +import com.google.pubsub.v1.DeleteSubscriptionRequest; import com.google.pubsub.v1.DeleteTopicRequest; +import com.google.pubsub.v1.GetSubscriptionRequest; import com.google.pubsub.v1.GetTopicRequest; - +import com.google.pubsub.v1.ListSubscriptionsRequest; +import com.google.pubsub.v1.ListSubscriptionsResponse; +import com.google.pubsub.v1.ListTopicSubscriptionsRequest; +import com.google.pubsub.v1.ListTopicSubscriptionsResponse; +import com.google.pubsub.v1.ListTopicsRequest; +import com.google.pubsub.v1.ListTopicsResponse; +import com.google.pubsub.v1.ModifyPushConfigRequest; +import com.google.pubsub.v1.PublishRequest; +import com.google.pubsub.v1.PublishResponse; + +import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -40,11 +63,96 @@ class PubSubImpl extends BaseService implements PubSub { private final PubSubRpc rpc; + private static final Function EMPTY_TO_VOID_FUNCTION = new Function() { + @Override + public Void apply(Empty empty) { + return null; + } + }; + private static final Function EMPTY_TO_BOOLEAN_FUNCTION = + new Function() { + @Override + public Boolean apply(Empty input) { + return input != null; + } + }; + PubSubImpl(PubSubOptions options) { super(options); rpc = options.rpc(); } + private abstract static class BasePageFetcher implements AsyncPageImpl.NextPageFetcher { + + private static final long serialVersionUID = -2122989557125999209L; + + private final PubSubOptions serviceOptions; + private final Map requestOptions; + + private BasePageFetcher(PubSubOptions serviceOptions, String cursor, + Map requestOptions) { + this.serviceOptions = serviceOptions; + this.requestOptions = + PageImpl.nextRequestOptions(PAGE_TOKEN, cursor, requestOptions); + } + + PubSubOptions serviceOptions() { + return serviceOptions; + } + + Map requestOptions() { + return requestOptions; + } + } + + private static class TopicPageFetcher extends BasePageFetcher { + + private static final long serialVersionUID = -7153536453427361814L; + + TopicPageFetcher(PubSubOptions serviceOptions, String cursor, + Map requestOptions) { + super(serviceOptions, cursor, requestOptions); + } + + @Override + public Future> nextPage() { + return listTopicsAsync(serviceOptions(), requestOptions()); + } + } + + private static class SubscriptionPageFetcher extends BasePageFetcher { + + private static final long serialVersionUID = -5634446170301177992L; + + SubscriptionPageFetcher(PubSubOptions serviceOptions, String cursor, + Map requestOptions) { + super(serviceOptions, cursor, requestOptions); + } + + @Override + public Future> nextPage() { + return listSubscriptionsAsync(serviceOptions(), requestOptions()); + } + } + + private static class SubscriptionNamePageFetcher extends BasePageFetcher { + + private static final long serialVersionUID = 7250525437694464444L; + + private final String topic; + + SubscriptionNamePageFetcher(String topic, PubSubOptions serviceOptions, String cursor, + Map requestOptions) { + super(serviceOptions, cursor, requestOptions); + this.topic = topic; + } + + @Override + public Future> nextPage() { + return listSubscriptionsAsync(topic, serviceOptions(), requestOptions()); + } + } + private static V get(Future future) { try { return Uninterruptibles.getUninterruptibly(future); @@ -86,113 +194,251 @@ public Future deleteTopicAsync(String topic) { DeleteTopicRequest request = DeleteTopicRequest.newBuilder() .setTopic(PublisherApi.formatTopicName(options().projectId(), topic)) .build(); - return lazyTransform(rpc.delete(request), new Function() { + return lazyTransform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION); + } + + private static ListTopicsRequest listTopicsRequest(PubSubOptions serviceOptions, + Map options) { + ListTopicsRequest.Builder builder = ListTopicsRequest.newBuilder(); + builder.setProject(SubscriberApi.formatProjectName(serviceOptions.projectId())); + Integer pageSize = PAGE_SIZE.get(options); + String pageToken = PAGE_TOKEN.get(options); + if (pageSize != null) { + builder.setPageSize(pageSize); + } + if (pageToken != null) { + builder.setPageToken(pageToken); + } + return builder.build(); + } + + private static Future> listTopicsAsync(final PubSubOptions serviceOptions, + final Map options) { + final ListTopicsRequest request = listTopicsRequest(serviceOptions, options); + Future list = serviceOptions.rpc().list(request); + return lazyTransform(list, new Function>() { @Override - public Boolean apply(Empty input) { - return true; + public AsyncPage apply(ListTopicsResponse listTopicsResponse) { + List topics = listTopicsResponse.getTopicsList() == null ? ImmutableList.of() + : Lists.transform(listTopicsResponse.getTopicsList(), + Topic.fromPbFunction(serviceOptions.service())); + String cursor = listTopicsResponse.getNextPageToken().equals("") ? null + : listTopicsResponse.getNextPageToken(); + return new AsyncPageImpl<>( + new TopicPageFetcher(serviceOptions, cursor, options), cursor, topics); } }); } @Override public Page listTopics(ListOption... options) { - return null; + return get(listTopicsAsync(options)); } @Override public Future> listTopicsAsync(ListOption... options) { - return null; + return listTopicsAsync(options(), optionMap(options)); } @Override public String publish(String topic, Message message) { - return null; + return get(publishAsync(topic, message)); + } + + private static PublishRequest publishRequest(PubSubOptions serviceOptions, String topic, + Iterable messages) { + PublishRequest.Builder builder = PublishRequest.newBuilder(); + builder.setTopic(PublisherApi.formatTopicName(serviceOptions.projectId(), topic)); + builder.addAllMessages(Iterables.transform(messages, Message.TO_PB_FUNCTION)); + return builder.build(); } @Override public Future publishAsync(String topic, Message message) { - return null; + return lazyTransform( + rpc.publish(publishRequest(options(), topic, Collections.singletonList(message))), + new Function() { + @Override + public String apply(PublishResponse publishResponse) { + return publishResponse.getMessageIdsList().get(0); + } + }); } @Override public List publish(String topic, Message message, Message... messages) { - return null; + return publish(topic, Lists.asList(message, messages)); } @Override public Future> publishAsync(String topic, Message message, Message... messages) { - return null; + return publishAsync(topic, Lists.asList(message, messages)); } @Override public List publish(String topic, Iterable messages) { - return null; + return get(publishAsync(topic, messages)); } @Override public Future> publishAsync(String topic, Iterable messages) { - return null; + return lazyTransform(rpc.publish(publishRequest(options(), topic, messages)), + new Function>() { + @Override + public List apply(PublishResponse publishResponse) { + return publishResponse.getMessageIdsList(); + } + }); } @Override public Subscription create(SubscriptionInfo subscription) { - return null; + return get(createAsync(subscription)); } @Override public Future createAsync(SubscriptionInfo subscription) { - return null; + return lazyTransform(rpc.create(subscription.toPb(options().projectId())), + Subscription.fromPbFunction(this)); } @Override public Subscription getSubscription(String subscription) { - return null; + return get(getSubscriptionAsync(subscription)); } @Override public Future getSubscriptionAsync(String subscription) { - return null; + GetSubscriptionRequest request = GetSubscriptionRequest.newBuilder() + .setSubscription(SubscriberApi.formatSubscriptionName(options().projectId(), subscription)) + .build(); + return lazyTransform(rpc.get(request), Subscription.fromPbFunction(this)); } @Override public void replacePushConfig(String subscription, PushConfig pushConfig) { - + get(replacePushConfigAsync(subscription, pushConfig)); } @Override public Future replacePushConfigAsync(String subscription, PushConfig pushConfig) { - return null; + ModifyPushConfigRequest request = ModifyPushConfigRequest.newBuilder() + .setSubscription(SubscriberApi.formatSubscriptionName(options().projectId(), subscription)) + .setPushConfig(pushConfig != null ? pushConfig.toPb() + : com.google.pubsub.v1.PushConfig.getDefaultInstance()) + .build(); + return lazyTransform(rpc.modify(request), EMPTY_TO_VOID_FUNCTION); } @Override public boolean deleteSubscription(String subscription) { - return false; + return get(deleteSubscriptionAsync(subscription)); } @Override public Future deleteSubscriptionAsync(String subscription) { - return null; + DeleteSubscriptionRequest request = DeleteSubscriptionRequest.newBuilder() + .setSubscription(SubscriberApi.formatSubscriptionName(options().projectId(), subscription)) + .build(); + return lazyTransform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION); + } + + private static ListSubscriptionsRequest listSubscriptionsRequest(PubSubOptions serviceOptions, + Map options) { + ListSubscriptionsRequest.Builder builder = ListSubscriptionsRequest.newBuilder(); + builder.setProject(SubscriberApi.formatProjectName(serviceOptions.projectId())); + Integer pageSize = PAGE_SIZE.getInteger(options); + String pageToken = PAGE_TOKEN.getString(options); + if (pageSize != null) { + builder.setPageSize(pageSize); + } + if (pageToken != null) { + builder.setPageToken(pageToken); + } + return builder.build(); + } + + private static Future> listSubscriptionsAsync( + final PubSubOptions serviceOptions, final Map options) { + final ListSubscriptionsRequest request = listSubscriptionsRequest(serviceOptions, options); + Future list = serviceOptions.rpc().list(request); + return lazyTransform(list, new Function>() { + @Override + public AsyncPage apply(ListSubscriptionsResponse listSubscriptionsResponse) { + List subscriptions = listSubscriptionsResponse.getSubscriptionsList() == null + ? ImmutableList.of() + : Lists.transform(listSubscriptionsResponse.getSubscriptionsList(), + Subscription.fromPbFunction(serviceOptions.service())); + String cursor = listSubscriptionsResponse.getNextPageToken().equals("") ? null + : listSubscriptionsResponse.getNextPageToken(); + return new AsyncPageImpl<>(new SubscriptionPageFetcher(serviceOptions, cursor, options), + cursor, subscriptions); + } + }); } @Override public Page listSubscriptions(ListOption... options) { - return null; + return get(listSubscriptionsAsync(options)); } - @Override public Future> listSubscriptionsAsync(ListOption... options) { - return null; + return listSubscriptionsAsync(options(), optionMap(options)); + } + + private static ListTopicSubscriptionsRequest listSubscriptionsRequest(String topic, + PubSubOptions serviceOptions, Map options) { + ListTopicSubscriptionsRequest.Builder builder = ListTopicSubscriptionsRequest.newBuilder(); + builder.setTopic(PublisherApi.formatTopicName(serviceOptions.projectId(), topic)); + Integer pageSize = PAGE_SIZE.getInteger(options); + String pageToken = PAGE_TOKEN.getString(options); + if (pageSize != null) { + builder.setPageSize(pageSize); + } + if (pageToken != null) { + builder.setPageToken(pageToken); + } + return builder.build(); + } + + private static Future> listSubscriptionsAsync(final String topic, + final PubSubOptions serviceOptions, final Map options) { + final ListTopicSubscriptionsRequest request = + listSubscriptionsRequest(topic, serviceOptions, options); + Future list = serviceOptions.rpc().list(request); + return lazyTransform(list, + new Function>() { + @Override + public AsyncPage apply( + ListTopicSubscriptionsResponse listSubscriptionsResponse) { + List subscriptions = + listSubscriptionsResponse.getSubscriptionsList() == null + ? ImmutableList.of() + : Lists.transform(listSubscriptionsResponse.getSubscriptionsList(), + new Function() { + @Override + public SubscriptionId apply(String compositeSubscription) { + return SubscriptionId.fromPb(compositeSubscription); + } + }); + String cursor = listSubscriptionsResponse.getNextPageToken().equals("") ? null + : listSubscriptionsResponse.getNextPageToken(); + return new AsyncPageImpl<>( + new SubscriptionNamePageFetcher(topic, serviceOptions, cursor, options), cursor, + subscriptions); + } + }); } @Override public Page listSubscriptions(String topic, ListOption... options) { - return null; + return get(listSubscriptionsAsync(topic, options)); } @Override public Future> listSubscriptionsAsync(String topic, ListOption... options) { - return null; + return listSubscriptionsAsync(topic, options(), optionMap(options)); } @Override @@ -279,6 +525,15 @@ public Future modifyAckDeadlineAsync(String subscription, int deadline, Ti return null; } + static Map optionMap(Option... options) { + Map optionMap = Maps.newHashMap(); + for (Option option : options) { + Object prev = optionMap.put(option.optionType(), option.value()); + checkArgument(prev == null, "Duplicate option %s", option); + } + return optionMap; + } + @Override public void close() throws Exception { rpc.close(); diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java index 7dd203a73348..9e1ed2f53579 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java @@ -178,7 +178,7 @@ static Function fromPbFunction( return new Function() { @Override public Subscription apply(com.google.pubsub.v1.Subscription subscriptionPb) { - return fromPb(pubsub, subscriptionPb); + return subscriptionPb != null ? fromPb(pubsub, subscriptionPb) : null; } }; } diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/SubscriptionInfo.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/SubscriptionInfo.java index 3c8722d6e6f9..8e619b5bf633 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/SubscriptionInfo.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/SubscriptionInfo.java @@ -102,8 +102,8 @@ public abstract static class Builder { * again during that time (on a best-effort basis). For pull subscriptions, this value is used * as the initial value for the ack deadline. To override the ack deadline value for a given * message, use {@link PubSub#modifyAckDeadline(String, int, TimeUnit, Iterable)}. For push - * delivery, this value is used to set the request timeout for the call to the push endpoint. If - * not specified, the default value of 10 seconds is used. + * delivery, this value is used to set the request timeout for the call to the push endpoint. + * This value must be between 10 and 600 seconds, if not specified, 10 seconds is used. */ public abstract Builder ackDeadLineSeconds(int ackDeadLineSeconds); @@ -216,8 +216,8 @@ public PushConfig pushConfig() { * again during that time (on a best-effort basis). For pull subscriptions, this value is used * as the initial value for the ack deadline. To override the ack deadline value for a given * message, use {@link PubSub#modifyAckDeadline(String, int, TimeUnit, Iterable)}. For push - * delivery, this value is used to set the request timeout for the call to the push endpoint. If - * not specified, the default value of 10 seconds is used. + * delivery, this value is used to set the request timeout for the call to the push endpoint. This + * value must be between 10 and 600 seconds, if not specified, 10 seconds is used. */ public long ackDeadlineSeconds() { return ackDeadlineSeconds; diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Topic.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Topic.java index 4c08e485bb9d..65ed737fc0cd 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Topic.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Topic.java @@ -155,7 +155,7 @@ static Function fromPbFunction(final PubSub p return new Function() { @Override public Topic apply(com.google.pubsub.v1.Topic topicPb) { - return fromPb(pubsub, topicPb); + return topicPb != null ? fromPb(pubsub, topicPb) : null; } }; } diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java index 7878a6d83835..1745b03cf7d7 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java @@ -153,7 +153,7 @@ public V apply(ApiException exception) { @Override public Future create(Topic topic) { - // TODO: it would be nice if we can get the idempotent inforamtion from the ApiCallSettings + // TODO: it would be nice if we can get the idempotent information from the ApiCallSettings // or from the exception return translate(publisherApi.createTopicCallable().futureCall(topic), true); } @@ -185,7 +185,6 @@ public Future list(ListTopicSubscriptionsRequest @Override public Future delete(DeleteTopicRequest request) { - // TODO: check if null is not going to work for Empty return translate(publisherApi.deleteTopicCallable().futureCall(request), true, Code.NOT_FOUND.value()); } diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java new file mode 100644 index 000000000000..3360632b45ce --- /dev/null +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java @@ -0,0 +1,431 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import com.google.cloud.AsyncPage; +import com.google.cloud.Page; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Sets; + +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +/** + * A base class for system tests. This class can be extended to run system tests in different + * environments (e.g. local emulator or remote Pub/Sub service). + */ +public abstract class BaseSystemTest { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + /** + * Returns the Pub/Sub service used to issue requests. This service can be such that it interacts + * with the remote Pub/Sub service (for integration tests) or with an emulator + * (for local testing). + */ + protected abstract PubSub pubsub(); + + /** + * Formats a resource name for testing purpose. For instance, for tests against the remote + * service, it is recommended to append to the name a random or time-based seed to prevent + * name clashes. + */ + protected abstract String formatForTest(String resourceName); + + @Test + public void testCreateGetAndDeleteTopic() { + String name = formatForTest("test-create-get-delete-topic"); + Topic topic = pubsub().create(TopicInfo.of(name)); + assertEquals(name, topic.name()); + Topic remoteTopic = pubsub().getTopic(name); + assertEquals(topic, remoteTopic); + assertTrue(topic.delete()); + } + + @Test + public void testGetTopic_NotExist() { + String name = formatForTest("test-get-non-existing-topic"); + assertNull(pubsub().getTopic(name)); + } + + @Test + public void testDeleteTopic_NotExist() { + assertFalse(pubsub().deleteTopic(formatForTest("test-delete-non-existing-topic"))); + } + + @Test + public void testCreateGetAndDeleteTopicAsync() throws ExecutionException, InterruptedException { + String name = formatForTest("test-create-get-delete-async-topic"); + Future topicFuture = pubsub().createAsync(TopicInfo.of(name)); + Topic createdTopic = topicFuture.get(); + assertEquals(name, createdTopic.name()); + topicFuture = pubsub().getTopicAsync(name); + assertEquals(createdTopic, topicFuture.get()); + assertTrue(createdTopic.deleteAsync().get()); + } + + @Test + public void testListTopics() { + Topic topic1 = pubsub().create(TopicInfo.of(formatForTest("test-list-topic1"))); + Topic topic2 = pubsub().create(TopicInfo.of(formatForTest("test-list-topic2"))); + Topic topic3 = pubsub().create(TopicInfo.of(formatForTest("test-list-topic3"))); + Set topicNames = Sets.newHashSet(); + // We use 1 as page size to force pagination + Page topics = pubsub().listTopics(PubSub.ListOption.pageSize(1)); + Iterator iterator = topics.iterateAll(); + while (iterator.hasNext()) { + topicNames.add(iterator.next().name()); + } + assertTrue(topicNames.contains(topic1.name())); + assertTrue(topicNames.contains(topic2.name())); + assertTrue(topicNames.contains(topic3.name())); + assertTrue(topic1.delete()); + assertTrue(topic2.delete()); + assertTrue(topic3.delete()); + } + + @Test + public void testListTopicsAsync() throws ExecutionException, InterruptedException { + Topic topic1 = pubsub().create(TopicInfo.of(formatForTest("test-list-async-topic1"))); + Topic topic2 = pubsub().create(TopicInfo.of(formatForTest("test-list-async-topic2"))); + Topic topic3 = pubsub().create(TopicInfo.of(formatForTest("test-list-async-topic3"))); + Set topicNames = Sets.newHashSet(); + Future> pageFuture = pubsub().listTopicsAsync(PubSub.ListOption.pageSize(1)); + Iterator iterator = pageFuture.get().iterateAll(); + while (iterator.hasNext()) { + topicNames.add(iterator.next().name()); + } + assertTrue(topicNames.contains(topic1.name())); + assertTrue(topicNames.contains(topic2.name())); + assertTrue(topicNames.contains(topic3.name())); + assertTrue(topic1.delete()); + assertTrue(topic2.delete()); + assertTrue(topic3.delete()); + } + + @Test + public void testPublishOneMessage() { + String topic = formatForTest("test-publish-one-message-topic"); + pubsub().create(TopicInfo.of(topic)); + Message message = Message.of("payload"); + assertNotNull(pubsub().publish(topic, message)); + assertTrue(pubsub().deleteTopic(topic)); + } + + @Test + public void testPublishNonExistingTopic() { + String topic = formatForTest("test-publish-non-existing-topic"); + Message message = Message.of("payload"); + thrown.expect(PubSubException.class); + pubsub().publish(topic, message); + } + + @Test + public void testPublishOneMessageAsync() throws ExecutionException, InterruptedException { + String topic = formatForTest("test-publish-one-message-async-topic"); + pubsub().create(TopicInfo.of(topic)); + Message message = Message.of("payload"); + Future publishFuture = pubsub().publishAsync(topic, message); + assertNotNull(publishFuture.get()); + assertTrue(pubsub().deleteTopic(topic)); + } + + @Test + public void testPublishMoreMessages() { + String topic = formatForTest("test-publish-more-messages-topic"); + pubsub().create(TopicInfo.of(topic)); + Message message1 = Message.of("payload1"); + Message message2 = Message.of("payload2"); + List messageIds = pubsub().publish(topic, message1, message2); + assertEquals(2, messageIds.size()); + assertTrue(pubsub().deleteTopic(topic)); + } + + @Test + public void testPublishMoreMessagesAsync() throws ExecutionException, InterruptedException { + String topic = formatForTest("test-publish-more-messages-topic-async-topic"); + pubsub().create(TopicInfo.of(topic)); + Message message1 = Message.of("payload1"); + Message message2 = Message.of("payload2"); + Future> publishFuture = pubsub().publishAsync(topic, message1, message2); + assertEquals(2, publishFuture.get().size()); + assertTrue(pubsub().deleteTopic(topic)); + } + + @Test + public void testPublishMessageList() { + String topic = formatForTest("test-publish-message-list-topic"); + pubsub().create(TopicInfo.of(topic)); + Message message1 = Message.of("payload1"); + Message message2 = Message.of("payload2"); + List messageIds = pubsub().publish(topic, ImmutableList.of(message1, message2)); + assertEquals(2, messageIds.size()); + assertTrue(pubsub().deleteTopic(topic)); + } + + @Test + public void testPublishMessagesListAsync() throws ExecutionException, InterruptedException { + String topic = formatForTest("test-publish-message-list-async-topic"); + pubsub().create(TopicInfo.of(topic)); + Message message1 = Message.of("payload1"); + Message message2 = Message.of("payload2"); + Future> publishFuture = + pubsub().publishAsync(topic, ImmutableList.of(message1, message2)); + assertEquals(2, publishFuture.get().size()); + assertTrue(pubsub().deleteTopic(topic)); + } + + @Test + public void testCreateGetAndDeleteSubscription() { + String topic = formatForTest("test-create-get-delete-subscription-topic"); + pubsub().create(TopicInfo.of(topic)); + String name = formatForTest("test-create-get-delete-subscription"); + Subscription subscription = pubsub().create(SubscriptionInfo.of(topic, name)); + assertEquals(TopicId.of(pubsub().options().projectId(), topic), subscription.topic()); + assertEquals(name, subscription.name()); + assertNull(subscription.pushConfig()); + // todo(mziccard) seems not to work on the emulator (returns 60) - see #989 + // assertEquals(10, subscription.ackDeadlineSeconds()); + Subscription remoteSubscription = pubsub().getSubscription(name); + assertEquals(subscription, remoteSubscription); + assertTrue(subscription.delete()); + assertTrue(pubsub().deleteTopic(topic)); + } + + @Test + public void testGetSubscription_NotExist() { + assertNull(pubsub().getSubscription(formatForTest("test-get-non-existing-subscription"))); + } + + @Test + public void testDeleteSubscription_NotExist() { + assertFalse( + pubsub().deleteSubscription(formatForTest("test-delete-non-existing-subscription"))); + } + + @Test + public void testCreateGetAndDeleteSubscriptionAsync() + throws ExecutionException, InterruptedException { + String topic = formatForTest("test-create-get-delete-async-subscription-topic"); + pubsub().create(TopicInfo.of(topic)); + String name = formatForTest("test-create-get-delete-async-subscription"); + String endpoint = "https://" + pubsub().options().projectId() + ".appspot.com/push"; + PushConfig pushConfig = PushConfig.of(endpoint); + Future subscriptionFuture = + pubsub().createAsync(SubscriptionInfo.builder(topic, name).pushConfig(pushConfig).build()); + Subscription subscription = subscriptionFuture.get(); + assertEquals(TopicId.of(pubsub().options().projectId(), topic), subscription.topic()); + assertEquals(name, subscription.name()); + assertEquals(pushConfig, subscription.pushConfig()); + // todo(mziccard) seems not to work on the emulator (returns 60) - see #989 + // assertEquals(10, subscription.ackDeadlineSeconds()); + subscriptionFuture = pubsub().getSubscriptionAsync(name); + Subscription remoteSubscription = subscriptionFuture.get(); + assertEquals(subscription, remoteSubscription); + assertTrue(subscription.deleteAsync().get()); + assertTrue(pubsub().deleteTopic(topic)); + } + + @Test + @Ignore("Emulator incosistency; see issue ##988") + public void testGetSubscriptionDeletedTopic() { + String topic = formatForTest("test-get-deleted-topic-subscription-topic"); + pubsub().create(TopicInfo.of(topic)); + String name = formatForTest("test-get-deleted-topic-subscription"); + Subscription subscription = pubsub().create(SubscriptionInfo.of(topic, name)); + assertEquals(TopicId.of(pubsub().options().projectId(), topic), subscription.topic()); + assertEquals(name, subscription.name()); + assertNull(subscription.pushConfig()); + // todo(mziccard) seems not to work on the emulator (returns 60) - see #989 + // assertEquals(10, subscription.ackDeadlineSeconds()); + assertTrue(pubsub().deleteTopic(topic)); + assertNull(pubsub().getTopic(topic)); + Subscription remoteSubscription = pubsub().getSubscription(name); + assertEquals(TopicId.of("_deleted-topic_"), remoteSubscription.topic()); + assertEquals(name, remoteSubscription.name()); + assertNull(remoteSubscription.pushConfig()); + assertTrue(subscription.delete()); + } + + @Test + public void testReplaceSubscriptionPushConfig() { + String topic = formatForTest("test-replace-push-config-topic"); + pubsub().create(TopicInfo.of(topic)); + String name = formatForTest("test-replace-push-config-subscription"); + String endpoint = "https://" + pubsub().options().projectId() + ".appspot.com/push"; + PushConfig pushConfig = PushConfig.of(endpoint); + Subscription subscription = + pubsub().create(SubscriptionInfo.builder(topic, name).pushConfig(pushConfig).build()); + assertEquals(TopicId.of(pubsub().options().projectId(), topic), subscription.topic()); + assertEquals(name, subscription.name()); + assertEquals(pushConfig, subscription.pushConfig()); + // todo(mziccard) seems not to work on the emulator (returns 60) - see #989 + // assertEquals(10, subscription.ackDeadlineSeconds()); + pubsub().replacePushConfig(name, null); + Subscription remoteSubscription = pubsub().getSubscription(name); + assertEquals(TopicId.of(pubsub().options().projectId(), topic), remoteSubscription.topic()); + assertEquals(name, remoteSubscription.name()); + assertNull(remoteSubscription.pushConfig()); + // todo(mziccard) seems not to work on the emulator (returns 60) - see #989 + // assertEquals(10, remoteSubscription.ackDeadlineSeconds()); + assertTrue(subscription.delete()); + assertTrue(pubsub().deleteTopic(topic)); + } + + @Test + public void testReplaceNonExistingSubscriptionPushConfig() { + String name = formatForTest("test-replace-push-config-non-existing-subscription"); + thrown.expect(PubSubException.class); + pubsub().replacePushConfig(name, null); + } + + @Test + public void testReplaceSubscriptionPushConfigAsync() + throws ExecutionException, InterruptedException { + String topic = formatForTest("test-replace-push-config-async-topic"); + pubsub().create(TopicInfo.of(topic)); + String name = formatForTest("test-replace-push-config-async-subscription"); + Future subscriptionFuture = + pubsub().createAsync(SubscriptionInfo.of(topic, name)); + Subscription subscription = subscriptionFuture.get(); + assertEquals(TopicId.of(pubsub().options().projectId(), topic), subscription.topic()); + assertEquals(name, subscription.name()); + assertNull(subscription.pushConfig()); + // todo(mziccard) seems not to work on the emulator (returns 60) - see #989 + // assertEquals(10, subscription.ackDeadlineSeconds()); + String endpoint = "https://" + pubsub().options().projectId() + ".appspot.com/push"; + PushConfig pushConfig = PushConfig.of(endpoint); + pubsub().replacePushConfigAsync(name, pushConfig).get(); + Subscription remoteSubscription = pubsub().getSubscriptionAsync(name).get(); + assertEquals(TopicId.of(pubsub().options().projectId(), topic), remoteSubscription.topic()); + assertEquals(name, remoteSubscription.name()); + assertEquals(pushConfig, remoteSubscription.pushConfig()); + // todo(mziccard) seems not to work on the emulator (returns 60) - see #989 + // assertEquals(10, remoteSubscription.ackDeadlineSeconds()); + assertTrue(subscription.deleteAsync().get()); + assertTrue(pubsub().deleteTopic(topic)); + } + + @Test + public void testListSubscriptions() { + String topicName1 = formatForTest("test-list-subscriptions-topic1"); + String topicName2 = formatForTest("test-list-subscriptions-topic2"); + Topic topic1 = pubsub().create(TopicInfo.of(topicName1)); + Topic topic2 = pubsub().create(TopicInfo.of(topicName2)); + String subscriptionName1 = formatForTest("test-list-subscriptions-subscription1"); + String subscriptionName2 = formatForTest("test-list-subscriptions-subscription2"); + String subscriptionName3 = formatForTest("test-list-subscriptions-subscription3"); + Subscription subscription1 = + pubsub().create(SubscriptionInfo.of(topicName1, subscriptionName1)); + Subscription subscription2 = + pubsub().create(SubscriptionInfo.of(topicName1, subscriptionName2)); + Subscription subscription3 = + pubsub().create(SubscriptionInfo.of(topicName2, subscriptionName3)); + Set subscriptionNames = Sets.newHashSet(); + // We use 1 as page size to force pagination + Page subscriptions = pubsub().listSubscriptions(PubSub.ListOption.pageSize(1)); + Iterator iterator = subscriptions.iterateAll(); + while (iterator.hasNext()) { + String name = iterator.next().name(); + subscriptionNames.add(name); + } + assertTrue(subscriptionNames.contains(subscriptionName1)); + assertTrue(subscriptionNames.contains(subscriptionName2)); + assertTrue(subscriptionNames.contains(subscriptionName3)); + Set topicSubscriptionNames = Sets.newHashSet(); + Page topic1Subscriptions = + topic1.listSubscriptions(PubSub.ListOption.pageSize(1)); + Iterator firstStringPageIterator = topic1Subscriptions.values().iterator(); + topicSubscriptionNames.add(firstStringPageIterator.next().subscription()); + assertFalse(firstStringPageIterator.hasNext()); + Iterator topicSubscriptionsIterator = + topic1Subscriptions.nextPage().iterateAll(); + while (topicSubscriptionsIterator.hasNext()) { + topicSubscriptionNames.add(topicSubscriptionsIterator.next().subscription()); + } + assertEquals(2, topicSubscriptionNames.size()); + assertTrue(topicSubscriptionNames.contains(subscriptionName1)); + assertTrue(topicSubscriptionNames.contains(subscriptionName2)); + assertTrue(topic1.delete()); + assertTrue(topic2.delete()); + assertTrue(subscription1.delete()); + assertTrue(subscription2.delete()); + assertTrue(subscription3.delete()); + } + + @Test + public void testListSubscriptionsAsync() throws ExecutionException, InterruptedException { + String topicName1 = formatForTest("test-list-subscriptions-async-topic1"); + String topicName2 = formatForTest("test-list-subscriptions-async-topic2"); + Topic topic1 = pubsub().create(TopicInfo.of(topicName1)); + Topic topic2 = pubsub().create(TopicInfo.of(topicName2)); + String subscriptionName1 = formatForTest("test-list-subscriptions-async-subscription1"); + String subscriptionName2 = formatForTest("test-list-subscriptions-async-subscription2"); + String subscriptionName3 = formatForTest("test-list-subscriptions-async-subscription3"); + Subscription subscription1 = + pubsub().create(SubscriptionInfo.of(topicName1, subscriptionName1)); + Subscription subscription2 = + pubsub().create(SubscriptionInfo.of(topicName1, subscriptionName2)); + Subscription subscription3 = + pubsub().create(SubscriptionInfo.of(topicName2, subscriptionName3)); + // We use 1 as page size to force pagination + Set subscriptionNames = Sets.newHashSet(); + Future> pageFuture = + pubsub().listSubscriptionsAsync(PubSub.ListOption.pageSize(1)); + Iterator iterator = pageFuture.get().iterateAll(); + while (iterator.hasNext()) { + subscriptionNames.add(iterator.next().name()); + } + assertTrue(subscriptionNames.contains(subscriptionName1)); + assertTrue(subscriptionNames.contains(subscriptionName2)); + assertTrue(subscriptionNames.contains(subscriptionName3)); + Set topicSubscriptionNames = Sets.newHashSet(); + AsyncPage topic1Subscriptions = + topic1.listSubscriptionsAsync(PubSub.ListOption.pageSize(1)).get(); + Iterator firstStringPageIterator = topic1Subscriptions.values().iterator(); + topicSubscriptionNames.add(firstStringPageIterator.next().subscription()); + assertFalse(firstStringPageIterator.hasNext()); + Iterator topicSubscriptionsIterator = + topic1Subscriptions.nextPageAsync().get().iterateAll(); + while (topicSubscriptionsIterator.hasNext()) { + topicSubscriptionNames.add(topicSubscriptionsIterator.next().subscription()); + } + assertEquals(2, topicSubscriptionNames.size()); + assertTrue(topicSubscriptionNames.contains(subscriptionName1)); + assertTrue(topicSubscriptionNames.contains(subscriptionName2)); + assertTrue(topic1.delete()); + assertTrue(topic2.delete()); + assertTrue(subscription1.delete()); + assertTrue(subscription2.delete()); + assertTrue(subscription3.delete()); + } +} diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/LocalSystemTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/LocalSystemTest.java new file mode 100644 index 000000000000..618a2b9bd926 --- /dev/null +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/LocalSystemTest.java @@ -0,0 +1,54 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +import com.google.cloud.pubsub.testing.LocalPubsubHelper; + +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.IOException; + +public class LocalSystemTest extends BaseSystemTest { + + private static LocalPubsubHelper pubsubHelper; + private static PubSub pubsub; + + @Override + protected PubSub pubsub() { + return pubsub; + } + + @Override + protected String formatForTest(String resourceName) { + return resourceName; + } + + @BeforeClass + public static void startServer() throws IOException, InterruptedException { + pubsubHelper = new LocalPubsubHelper(); + pubsubHelper.start(); + pubsub = pubsubHelper.options().service(); + } + + @AfterClass + public static void stopServer() throws Exception { + pubsub.options().rpc().close(); + pubsubHelper.reset(); + pubsubHelper.stop(); + } +} diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java new file mode 100644 index 000000000000..f9817d72b6b0 --- /dev/null +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java @@ -0,0 +1,1198 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +import static com.google.cloud.pubsub.spi.v1.SubscriberApi.formatSubscriptionName; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import com.google.cloud.AsyncPage; +import com.google.cloud.Page; +import com.google.cloud.RetryParams; +import com.google.cloud.pubsub.PubSub.ListOption; +import com.google.cloud.pubsub.spi.PubSubRpc; +import com.google.cloud.pubsub.spi.PubSubRpcFactory; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; +import com.google.protobuf.Empty; +import com.google.pubsub.v1.DeleteSubscriptionRequest; +import com.google.pubsub.v1.DeleteTopicRequest; +import com.google.pubsub.v1.GetSubscriptionRequest; +import com.google.pubsub.v1.GetTopicRequest; +import com.google.pubsub.v1.ListSubscriptionsRequest; +import com.google.pubsub.v1.ListSubscriptionsResponse; +import com.google.pubsub.v1.ListTopicSubscriptionsRequest; +import com.google.pubsub.v1.ListTopicSubscriptionsResponse; +import com.google.pubsub.v1.ListTopicsRequest; +import com.google.pubsub.v1.ListTopicsResponse; +import com.google.pubsub.v1.ModifyPushConfigRequest; +import com.google.pubsub.v1.PublishRequest; +import com.google.pubsub.v1.PublishResponse; + +import org.easymock.EasyMock; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +public class PubSubImplTest { + + private static final String PROJECT = "project"; + private static final String PROJECT_PB = "projects/project"; + private static final String TOPIC = "topic"; + private static final String TOPIC_NAME_PB = "projects/project/topics/topic"; + private static final TopicInfo TOPIC_INFO = TopicInfo.of(TOPIC); + private static final Function TOPIC_TO_PB_FUNCTION = + new Function() { + @Override + public com.google.pubsub.v1.Topic apply(TopicInfo topicInfo) { + return topicInfo.toPb(PROJECT); + } + }; + private static final Message MESSAGE = Message.of("payload"); + private static final String SUBSCRIPTION = "subscription"; + private static final String SUBSCRIPTION_NAME_PB = "projects/project/subscriptions/subscription"; + private static final PushConfig PUSH_CONFIG = PushConfig.of("endpoint"); + private static final SubscriptionInfo SUBSCRIPTION_INFO = + SubscriptionInfo.builder(TOPIC, SUBSCRIPTION) + .ackDeadLineSeconds(42) + .pushConfig(PUSH_CONFIG) + .build(); + private static final SubscriptionInfo COMPLETE_SUBSCRIPTION_INFO = + SubscriptionInfo.builder(TopicId.of(PROJECT, TOPIC), SUBSCRIPTION) + .ackDeadLineSeconds(42) + .pushConfig(PUSH_CONFIG) + .build(); + private static final Function + SUBSCRIPTION_TO_PB_FUNCTION = + new Function() { + @Override + public com.google.pubsub.v1.Subscription apply(SubscriptionInfo subscriptionInfo) { + return subscriptionInfo.toPb(PROJECT); + } + }; + private static final Function SUBSCRIPTION_ID_TO_PB_FUNCTION = + new Function() { + @Override + public String apply(SubscriptionId subscriptionId) { + return formatSubscriptionName(subscriptionId.project(), subscriptionId.subscription()); + } + }; + + private PubSubOptions options; + private PubSubRpcFactory rpcFactoryMock; + private PubSubRpc pubsubRpcMock; + private PubSub pubsub; + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Before + public void setUp() { + rpcFactoryMock = EasyMock.createStrictMock(PubSubRpcFactory.class); + pubsubRpcMock = EasyMock.createStrictMock(PubSubRpc.class); + EasyMock.expect(rpcFactoryMock.create(EasyMock.anyObject(PubSubOptions.class))) + .andReturn(pubsubRpcMock).times(1); + options = PubSubOptions.builder() + .projectId(PROJECT) + .serviceRpcFactory(rpcFactoryMock) + .retryParams(RetryParams.noRetries()) + .build(); + EasyMock.replay(rpcFactoryMock, pubsubRpcMock); + EasyMock.reset(pubsubRpcMock); + } + + @After + public void tearDown() { + EasyMock.verify(rpcFactoryMock, pubsubRpcMock); + } + + @Test + public void testGetOptions() { + EasyMock.replay(pubsubRpcMock); + pubsub = options.service(); + assertSame(options, pubsub.options()); + } + + @Test + public void testCreateTopic() { + com.google.pubsub.v1.Topic topicPb = TOPIC_INFO.toPb(PROJECT); + Future response = Futures.immediateFuture(topicPb); + EasyMock.expect(pubsubRpcMock.create(topicPb)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + pubsub = options.service(); + Topic topic = pubsub.create(TOPIC_INFO); + assertEquals(new Topic(pubsub, new TopicInfo.BuilderImpl(TOPIC_INFO)), topic); + } + + @Test + public void testCreateTopicAsync() throws ExecutionException, InterruptedException { + com.google.pubsub.v1.Topic topicPb = TOPIC_INFO.toPb(PROJECT); + Future response = Futures.immediateFuture(topicPb); + EasyMock.expect(pubsubRpcMock.create(topicPb)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + pubsub = options.service(); + Topic topic = pubsub.createAsync(TOPIC_INFO).get(); + assertEquals(new Topic(pubsub, new TopicInfo.BuilderImpl(TOPIC_INFO)), topic); + } + + @Test + public void testGetTopic() { + GetTopicRequest request = GetTopicRequest.newBuilder().setTopic(TOPIC_NAME_PB).build(); + Future response = + Futures.immediateFuture(TOPIC_INFO.toPb(PROJECT)); + EasyMock.expect(pubsubRpcMock.get(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + pubsub = options.service(); + Topic topic = pubsub.getTopic(TOPIC); + assertEquals(new Topic(pubsub, new TopicInfo.BuilderImpl(TOPIC_INFO)), topic); + } + + @Test + public void testGetTopic_Null() { + GetTopicRequest request = GetTopicRequest.newBuilder().setTopic(TOPIC_NAME_PB).build(); + Future responseFuture = Futures.immediateFuture(null); + EasyMock.expect(pubsubRpcMock.get(request)).andReturn(responseFuture); + EasyMock.replay(pubsubRpcMock); + pubsub = options.service(); + assertNull(pubsub.getTopic(TOPIC)); + } + + @Test + public void testGetTopicAsync() throws ExecutionException, InterruptedException { + GetTopicRequest request = GetTopicRequest.newBuilder().setTopic(TOPIC_NAME_PB).build(); + Future response = + Futures.immediateFuture(TOPIC_INFO.toPb(PROJECT)); + EasyMock.expect(pubsubRpcMock.get(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + pubsub = options.service(); + Future topicFuture = pubsub.getTopicAsync(TOPIC); + assertEquals(new Topic(pubsub, new TopicInfo.BuilderImpl(TOPIC_INFO)), topicFuture.get()); + } + + @Test + public void testGetTopicAsync_Null() throws ExecutionException, InterruptedException { + GetTopicRequest request = GetTopicRequest.newBuilder().setTopic(TOPIC_NAME_PB).build(); + Future responseFuture = Futures.immediateFuture(null); + EasyMock.expect(pubsubRpcMock.get(request)).andReturn(responseFuture); + EasyMock.replay(pubsubRpcMock); + pubsub = options.service(); + assertNull(pubsub.getTopicAsync(TOPIC).get()); + } + + @Test + public void testDeleteTopic() { + DeleteTopicRequest request = DeleteTopicRequest.newBuilder().setTopic(TOPIC_NAME_PB).build(); + Future response = Futures.immediateFuture(Empty.getDefaultInstance()); + EasyMock.expect(pubsubRpcMock.delete(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + pubsub = options.service(); + assertTrue(pubsub.deleteTopic(TOPIC)); + } + + @Test + public void testDeleteTopic_Null() { + DeleteTopicRequest request = DeleteTopicRequest.newBuilder().setTopic(TOPIC_NAME_PB).build(); + Future response = Futures.immediateFuture(null); + EasyMock.expect(pubsubRpcMock.delete(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + pubsub = options.service(); + assertFalse(pubsub.deleteTopic(TOPIC)); + } + + @Test + public void testDeleteTopicAsync() throws ExecutionException, InterruptedException { + DeleteTopicRequest request = DeleteTopicRequest.newBuilder().setTopic(TOPIC_NAME_PB).build(); + Future response = Futures.immediateFuture(Empty.getDefaultInstance()); + EasyMock.expect(pubsubRpcMock.delete(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + pubsub = options.service(); + assertTrue(pubsub.deleteTopicAsync(TOPIC).get()); + } + + @Test + public void testDeleteTopicAsync_Null() throws ExecutionException, InterruptedException { + DeleteTopicRequest request = DeleteTopicRequest.newBuilder().setTopic(TOPIC_NAME_PB).build(); + Future response = Futures.immediateFuture(null); + EasyMock.expect(pubsubRpcMock.delete(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + pubsub = options.service(); + assertFalse(pubsub.deleteTopicAsync(TOPIC).get()); + } + + @Test + public void testListTopics() { + String cursor = "cursor"; + pubsub = options.service(); + ListTopicsRequest request = ListTopicsRequest.newBuilder().setProject(PROJECT_PB).build(); + List topicList = ImmutableList.of( + new Topic(pubsub, new TopicInfo.BuilderImpl(TOPIC_INFO)), + new Topic(pubsub, new TopicInfo.BuilderImpl(TOPIC_INFO))); + ListTopicsResponse response = ListTopicsResponse.newBuilder() + .setNextPageToken("cursor") + .addAllTopics(Lists.transform(topicList, TOPIC_TO_PB_FUNCTION)) + .build(); + Future futureResponse = Futures.immediateFuture(response); + EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); + EasyMock.replay(pubsubRpcMock); + Page page = pubsub.listTopics(); + assertEquals(cursor, page.nextPageCursor()); + assertArrayEquals(topicList.toArray(), Iterables.toArray(page.values(), Topic.class)); + } + + @Test + public void testListTopicsNextPage() { + String cursor1 = "cursor"; + pubsub = options.service(); + ListTopicsRequest request1 = ListTopicsRequest.newBuilder().setProject(PROJECT_PB).build(); + ListTopicsRequest request2 = ListTopicsRequest.newBuilder() + .setProject(PROJECT_PB) + .setPageToken(cursor1) + .build(); + List topicList1 = ImmutableList.of( + new Topic(pubsub, new TopicInfo.BuilderImpl(TOPIC_INFO)), + new Topic(pubsub, new TopicInfo.BuilderImpl(TOPIC_INFO))); + List topicList2 = ImmutableList.of( + new Topic(pubsub, new TopicInfo.BuilderImpl(TOPIC_INFO))); + ListTopicsResponse response1 = ListTopicsResponse.newBuilder() + .setNextPageToken(cursor1) + .addAllTopics(Lists.transform(topicList1, TOPIC_TO_PB_FUNCTION)) + .build(); + String cursor2 = "nextCursor"; + ListTopicsResponse response2 = ListTopicsResponse.newBuilder() + .setNextPageToken(cursor2) + .addAllTopics(Lists.transform(topicList2, TOPIC_TO_PB_FUNCTION)) + .build(); + Future futureResponse1 = Futures.immediateFuture(response1); + Future futureResponse2 = Futures.immediateFuture(response2); + EasyMock.expect(pubsubRpcMock.list(request1)).andReturn(futureResponse1); + EasyMock.expect(pubsubRpcMock.list(request2)).andReturn(futureResponse2); + EasyMock.replay(pubsubRpcMock); + Page page = pubsub.listTopics(); + assertEquals(cursor1, page.nextPageCursor()); + assertArrayEquals(topicList1.toArray(), Iterables.toArray(page.values(), Topic.class)); + page = page.nextPage(); + assertEquals(cursor2, page.nextPageCursor()); + assertArrayEquals(topicList2.toArray(), Iterables.toArray(page.values(), Topic.class)); + } + + @Test + public void testListTopicsEmpty() { + pubsub = options.service(); + ListTopicsRequest request = ListTopicsRequest.newBuilder().setProject(PROJECT_PB).build(); + List topicList = ImmutableList.of(); + ListTopicsResponse response = ListTopicsResponse.newBuilder() + .setNextPageToken("") + .addAllTopics(Lists.transform(topicList, TOPIC_TO_PB_FUNCTION)) + .build(); + Future futureResponse = Futures.immediateFuture(response); + EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); + EasyMock.replay(pubsubRpcMock); + Page page = pubsub.listTopics(); + assertNull(page.nextPageCursor()); + assertNull(page.nextPage()); + assertArrayEquals(topicList.toArray(), Iterators.toArray(page.iterateAll(), Topic.class)); + } + + @Test + public void testListTopicsWithOptions() { + String cursor = "cursor"; + pubsub = options.service(); + ListTopicsRequest request = ListTopicsRequest.newBuilder() + .setProject(PROJECT_PB) + .setPageSize(42) + .setPageToken(cursor) + .build(); + List topicList = ImmutableList.of( + new Topic(pubsub, new TopicInfo.BuilderImpl(TOPIC_INFO)), + new Topic(pubsub, new TopicInfo.BuilderImpl(TOPIC_INFO))); + ListTopicsResponse response = ListTopicsResponse.newBuilder() + .setNextPageToken("") + .addAllTopics(Lists.transform(topicList, TOPIC_TO_PB_FUNCTION)) + .build(); + Future futureResponse = Futures.immediateFuture(response); + EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); + EasyMock.replay(pubsubRpcMock); + Page page = pubsub.listTopics(ListOption.pageSize(42), ListOption.pageToken(cursor)); + assertNull(page.nextPageCursor()); + assertNull(page.nextPage()); + assertArrayEquals(topicList.toArray(), Iterables.toArray(page.values(), Topic.class)); + } + + @Test + public void testListTopicsAsync() throws ExecutionException, InterruptedException { + String cursor = "cursor"; + pubsub = options.service(); + ListTopicsRequest request = ListTopicsRequest.newBuilder().setProject(PROJECT_PB).build(); + List topicList = ImmutableList.of( + new Topic(pubsub, new TopicInfo.BuilderImpl(TOPIC_INFO)), + new Topic(pubsub, new TopicInfo.BuilderImpl(TOPIC_INFO))); + ListTopicsResponse response = ListTopicsResponse.newBuilder() + .setNextPageToken("cursor") + .addAllTopics(Lists.transform(topicList, TOPIC_TO_PB_FUNCTION)) + .build(); + Future futureResponse = Futures.immediateFuture(response); + EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); + EasyMock.replay(pubsubRpcMock); + AsyncPage page = pubsub.listTopicsAsync().get(); + assertEquals(cursor, page.nextPageCursor()); + assertArrayEquals(topicList.toArray(), Iterables.toArray(page.values(), Topic.class)); + } + + @Test + public void testListTopicsAsyncNextPage() throws ExecutionException, InterruptedException { + String cursor1 = "cursor"; + pubsub = options.service(); + ListTopicsRequest request1 = ListTopicsRequest.newBuilder().setProject(PROJECT_PB).build(); + ListTopicsRequest request2 = ListTopicsRequest.newBuilder() + .setProject(PROJECT_PB) + .setPageToken(cursor1) + .build(); + List topicList1 = ImmutableList.of( + new Topic(pubsub, new TopicInfo.BuilderImpl(TOPIC_INFO)), + new Topic(pubsub, new TopicInfo.BuilderImpl(TOPIC_INFO))); + List topicList2 = ImmutableList.of( + new Topic(pubsub, new TopicInfo.BuilderImpl(TOPIC_INFO))); + ListTopicsResponse response1 = ListTopicsResponse.newBuilder() + .setNextPageToken(cursor1) + .addAllTopics(Lists.transform(topicList1, TOPIC_TO_PB_FUNCTION)) + .build(); + String cursor2 = "nextCursor"; + ListTopicsResponse response2 = ListTopicsResponse.newBuilder() + .setNextPageToken(cursor2) + .addAllTopics(Lists.transform(topicList2, TOPIC_TO_PB_FUNCTION)) + .build(); + Future futureResponse1 = Futures.immediateFuture(response1); + Future futureResponse2 = Futures.immediateFuture(response2); + EasyMock.expect(pubsubRpcMock.list(request1)).andReturn(futureResponse1); + EasyMock.expect(pubsubRpcMock.list(request2)).andReturn(futureResponse2); + EasyMock.replay(pubsubRpcMock); + AsyncPage page = pubsub.listTopicsAsync().get(); + assertEquals(cursor1, page.nextPageCursor()); + assertArrayEquals(topicList1.toArray(), Iterables.toArray(page.values(), Topic.class)); + page = page.nextPageAsync().get(); + assertEquals(cursor2, page.nextPageCursor()); + assertArrayEquals(topicList2.toArray(), Iterables.toArray(page.values(), Topic.class)); + } + + @Test + public void testListTopicsAsyncEmpty() throws ExecutionException, InterruptedException { + pubsub = options.service(); + ListTopicsRequest request = ListTopicsRequest.newBuilder().setProject(PROJECT_PB).build(); + List topicList = ImmutableList.of(); + ListTopicsResponse response = ListTopicsResponse.newBuilder() + .setNextPageToken("") + .addAllTopics(Lists.transform(topicList, TOPIC_TO_PB_FUNCTION)) + .build(); + Future futureResponse = Futures.immediateFuture(response); + EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); + EasyMock.replay(pubsubRpcMock); + AsyncPage page = pubsub.listTopicsAsync().get(); + assertNull(page.nextPageCursor()); + assertNull(page.nextPageAsync().get()); + assertNull(page.nextPage()); + assertArrayEquals(topicList.toArray(), Iterators.toArray(page.iterateAll(), Topic.class)); + } + + @Test + public void testListTopicsAsyncWithOptions() throws ExecutionException, InterruptedException { + String cursor = "cursor"; + pubsub = options.service(); + ListTopicsRequest request = ListTopicsRequest.newBuilder() + .setProject(PROJECT_PB) + .setPageSize(42) + .setPageToken(cursor) + .build(); + List topicList = ImmutableList.of( + new Topic(pubsub, new TopicInfo.BuilderImpl(TOPIC_INFO)), + new Topic(pubsub, new TopicInfo.BuilderImpl(TOPIC_INFO))); + ListTopicsResponse response = ListTopicsResponse.newBuilder() + .setNextPageToken("") + .addAllTopics(Lists.transform(topicList, TOPIC_TO_PB_FUNCTION)) + .build(); + Future futureResponse = Futures.immediateFuture(response); + EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); + EasyMock.replay(pubsubRpcMock); + AsyncPage page = + pubsub.listTopicsAsync(ListOption.pageSize(42), ListOption.pageToken(cursor)).get(); + assertNull(page.nextPageCursor()); + assertNull(page.nextPageAsync().get()); + assertArrayEquals(topicList.toArray(), Iterables.toArray(page.values(), Topic.class)); + } + + @Test + public void testPublishOneMessage() { + PublishRequest request = PublishRequest.newBuilder() + .setTopic(TOPIC_NAME_PB) + .addAllMessages(ImmutableList.of(MESSAGE.toPb())) + .build(); + String messageId = "messageId"; + PublishResponse response = PublishResponse.newBuilder().addMessageIds(messageId).build(); + Future responseFuture = Futures.immediateFuture(response); + EasyMock.expect(pubsubRpcMock.publish(request)).andReturn(responseFuture); + EasyMock.replay(pubsubRpcMock); + pubsub = options.service(); + assertEquals(messageId, pubsub.publish(TOPIC, MESSAGE)); + } + + @Test + public void testPublishOneMessageAsync() throws ExecutionException, InterruptedException { + PublishRequest request = PublishRequest.newBuilder() + .setTopic(TOPIC_NAME_PB) + .addMessages(MESSAGE.toPb()) + .build(); + String messageId = "messageId"; + PublishResponse response = PublishResponse.newBuilder().addMessageIds(messageId).build(); + Future responseFuture = Futures.immediateFuture(response); + EasyMock.expect(pubsubRpcMock.publish(request)).andReturn(responseFuture); + EasyMock.replay(pubsubRpcMock); + pubsub = options.service(); + assertEquals(messageId, pubsub.publishAsync(TOPIC, MESSAGE).get()); + } + + @Test + public void testPublishMoreMessages() { + PublishRequest request = PublishRequest.newBuilder() + .setTopic(TOPIC_NAME_PB) + .addAllMessages(ImmutableList.of(MESSAGE.toPb(), MESSAGE.toPb())) + .build(); + List messageIds = ImmutableList.of("messageId1", "messageId2"); + PublishResponse response = PublishResponse.newBuilder() + .addAllMessageIds(messageIds) + .build(); + Future responseFuture = Futures.immediateFuture(response); + EasyMock.expect(pubsubRpcMock.publish(request)).andReturn(responseFuture); + EasyMock.replay(pubsubRpcMock); + pubsub = options.service(); + assertEquals(messageIds, pubsub.publish(TOPIC, MESSAGE, MESSAGE)); + } + + @Test + public void testPublishMoreMessagesAsync() throws ExecutionException, InterruptedException { + PublishRequest request = PublishRequest.newBuilder() + .setTopic(TOPIC_NAME_PB) + .addAllMessages(ImmutableList.of(MESSAGE.toPb(), MESSAGE.toPb())) + .build(); + List messageIds = ImmutableList.of("messageId1", "messageId2"); + PublishResponse response = PublishResponse.newBuilder() + .addAllMessageIds(messageIds) + .build(); + Future responseFuture = Futures.immediateFuture(response); + EasyMock.expect(pubsubRpcMock.publish(request)).andReturn(responseFuture); + EasyMock.replay(pubsubRpcMock); + pubsub = options.service(); + assertEquals(messageIds, pubsub.publishAsync(TOPIC, MESSAGE, MESSAGE).get()); + } + + @Test + public void testPublishMessageList() { + PublishRequest request = PublishRequest.newBuilder() + .setTopic(TOPIC_NAME_PB) + .addAllMessages(ImmutableList.of(MESSAGE.toPb(), MESSAGE.toPb())) + .build(); + List messageIds = ImmutableList.of("messageId1", "messageId2"); + PublishResponse response = PublishResponse.newBuilder() + .addAllMessageIds(messageIds) + .build(); + Future responseFuture = Futures.immediateFuture(response); + EasyMock.expect(pubsubRpcMock.publish(request)).andReturn(responseFuture); + EasyMock.replay(pubsubRpcMock); + pubsub = options.service(); + assertEquals(messageIds, pubsub.publish(TOPIC, ImmutableList.of(MESSAGE, MESSAGE))); + } + + @Test + public void testPublishMessageListAsync() throws ExecutionException, InterruptedException { + PublishRequest request = PublishRequest.newBuilder() + .setTopic(TOPIC_NAME_PB) + .addAllMessages(ImmutableList.of(MESSAGE.toPb(), MESSAGE.toPb())) + .build(); + List messageIds = ImmutableList.of("messageId1", "messageId2"); + PublishResponse response = PublishResponse.newBuilder() + .addAllMessageIds(messageIds) + .build(); + Future responseFuture = Futures.immediateFuture(response); + EasyMock.expect(pubsubRpcMock.publish(request)).andReturn(responseFuture); + EasyMock.replay(pubsubRpcMock); + pubsub = options.service(); + assertEquals(messageIds, pubsub.publishAsync(TOPIC, ImmutableList.of(MESSAGE, MESSAGE)).get()); + } + + @Test + public void testCreateSubscription() { + com.google.pubsub.v1.Subscription subscriptionPb = SUBSCRIPTION_INFO.toPb(PROJECT); + Future response = + Futures.immediateFuture(subscriptionPb); + EasyMock.expect(pubsubRpcMock.create(subscriptionPb)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + pubsub = options.service(); + Subscription subscription = pubsub.create(SUBSCRIPTION_INFO); + assertEquals( + new Subscription(pubsub, new SubscriptionInfo.BuilderImpl(COMPLETE_SUBSCRIPTION_INFO)), + subscription); + } + + @Test + public void testCreateSubscriptionAsync() throws ExecutionException, InterruptedException { + com.google.pubsub.v1.Subscription subscriptionPb = SUBSCRIPTION_INFO.toPb(PROJECT); + Future response = + Futures.immediateFuture(subscriptionPb); + EasyMock.expect(pubsubRpcMock.create(subscriptionPb)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + pubsub = options.service(); + Subscription subscription = pubsub.createAsync(SUBSCRIPTION_INFO).get(); + assertEquals( + new Subscription(pubsub, new SubscriptionInfo.BuilderImpl(COMPLETE_SUBSCRIPTION_INFO)), + subscription); + } + + @Test + public void testGetSubscription() { + GetSubscriptionRequest request = + GetSubscriptionRequest.newBuilder().setSubscription(SUBSCRIPTION_NAME_PB).build(); + Future response = + Futures.immediateFuture(SUBSCRIPTION_INFO.toPb(PROJECT)); + EasyMock.expect(pubsubRpcMock.get(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + pubsub = options.service(); + Subscription subscription = pubsub.getSubscription(SUBSCRIPTION); + assertEquals( + new Subscription(pubsub, new SubscriptionInfo.BuilderImpl(COMPLETE_SUBSCRIPTION_INFO)), + subscription); + } + + @Test + public void testGetSubscription_Null() { + GetSubscriptionRequest request = + GetSubscriptionRequest.newBuilder().setSubscription(SUBSCRIPTION_NAME_PB).build(); + Future response = Futures.immediateFuture(null); + EasyMock.expect(pubsubRpcMock.get(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + pubsub = options.service(); + assertNull(pubsub.getSubscription(SUBSCRIPTION)); + } + + @Test + public void testGetSubscriptionAsync() throws ExecutionException, InterruptedException { + GetSubscriptionRequest request = + GetSubscriptionRequest.newBuilder().setSubscription(SUBSCRIPTION_NAME_PB).build(); + Future response = + Futures.immediateFuture(SUBSCRIPTION_INFO.toPb(PROJECT)); + EasyMock.expect(pubsubRpcMock.get(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + pubsub = options.service(); + Subscription subscription = pubsub.getSubscriptionAsync(SUBSCRIPTION).get(); + assertEquals( + new Subscription(pubsub, new SubscriptionInfo.BuilderImpl(COMPLETE_SUBSCRIPTION_INFO)), + subscription); + } + + @Test + public void testGetSubscriptionAsync_Null() throws ExecutionException, InterruptedException { + GetSubscriptionRequest request = + GetSubscriptionRequest.newBuilder().setSubscription(SUBSCRIPTION_NAME_PB).build(); + Future response = Futures.immediateFuture(null); + EasyMock.expect(pubsubRpcMock.get(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + pubsub = options.service(); + assertNull(pubsub.getSubscriptionAsync(SUBSCRIPTION).get()); + } + + @Test + public void testDeleteSubscription() { + DeleteSubscriptionRequest request = DeleteSubscriptionRequest.newBuilder() + .setSubscription(SUBSCRIPTION_NAME_PB) + .build(); + Future response = Futures.immediateFuture(Empty.getDefaultInstance()); + EasyMock.expect(pubsubRpcMock.delete(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + pubsub = options.service(); + assertTrue(pubsub.deleteSubscription(SUBSCRIPTION)); + } + + @Test + public void testDeleteSubscription_Null() { + DeleteSubscriptionRequest request = DeleteSubscriptionRequest.newBuilder() + .setSubscription(SUBSCRIPTION_NAME_PB) + .build(); + Future response = Futures.immediateFuture(null); + EasyMock.expect(pubsubRpcMock.delete(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + pubsub = options.service(); + assertFalse(pubsub.deleteSubscription(SUBSCRIPTION)); + } + + @Test + public void testDeleteSubscriptionAsync() throws ExecutionException, InterruptedException { + DeleteSubscriptionRequest request = DeleteSubscriptionRequest.newBuilder() + .setSubscription(SUBSCRIPTION_NAME_PB) + .build(); + Future response = Futures.immediateFuture(Empty.getDefaultInstance()); + EasyMock.expect(pubsubRpcMock.delete(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + pubsub = options.service(); + assertTrue(pubsub.deleteSubscriptionAsync(SUBSCRIPTION).get()); + } + + @Test + public void testDeleteSubscriptionAsync_Null() throws ExecutionException, InterruptedException { + DeleteSubscriptionRequest request = DeleteSubscriptionRequest.newBuilder() + .setSubscription(SUBSCRIPTION_NAME_PB) + .build(); + Future response = Futures.immediateFuture(null); + EasyMock.expect(pubsubRpcMock.delete(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + pubsub = options.service(); + assertFalse(pubsub.deleteSubscriptionAsync(SUBSCRIPTION).get()); + } + + @Test + public void testReplacePushConfig() { + ModifyPushConfigRequest request = ModifyPushConfigRequest.newBuilder() + .setSubscription(SUBSCRIPTION_NAME_PB) + .setPushConfig(PUSH_CONFIG.toPb()) + .build(); + Future response = Futures.immediateFuture(Empty.getDefaultInstance()); + EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + pubsub = options.service(); + pubsub.replacePushConfig(SUBSCRIPTION, PUSH_CONFIG); + } + + @Test + public void testReplacePushConfig_Null() { + ModifyPushConfigRequest request = ModifyPushConfigRequest.newBuilder() + .setSubscription(SUBSCRIPTION_NAME_PB) + .setPushConfig(com.google.pubsub.v1.PushConfig.getDefaultInstance()) + .build(); + Future response = Futures.immediateFuture(Empty.getDefaultInstance()); + EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + pubsub = options.service(); + pubsub.replacePushConfig(SUBSCRIPTION, null); + } + + @Test + public void testReplacePushConfigAsync() throws ExecutionException, InterruptedException { + ModifyPushConfigRequest request = ModifyPushConfigRequest.newBuilder() + .setSubscription(SUBSCRIPTION_NAME_PB) + .setPushConfig(PUSH_CONFIG.toPb()) + .build(); + Future response = Futures.immediateFuture(Empty.getDefaultInstance()); + EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + pubsub = options.service(); + pubsub.replacePushConfigAsync(SUBSCRIPTION, PUSH_CONFIG).get(); + } + + @Test + public void testReplacePushConfigAsync_Null() throws ExecutionException, InterruptedException { + ModifyPushConfigRequest request = ModifyPushConfigRequest.newBuilder() + .setSubscription(SUBSCRIPTION_NAME_PB) + .setPushConfig(com.google.pubsub.v1.PushConfig.getDefaultInstance()) + .build(); + Future response = Futures.immediateFuture(Empty.getDefaultInstance()); + EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + pubsub = options.service(); + pubsub.replacePushConfigAsync(SUBSCRIPTION, null).get(); + } + + @Test + public void testListSubscriptions() { + String cursor = "cursor"; + pubsub = options.service(); + ListSubscriptionsRequest request = ListSubscriptionsRequest.newBuilder() + .setProject(PROJECT_PB) + .build(); + List subscriptionList = ImmutableList.of( + new Subscription(pubsub, new SubscriptionInfo.BuilderImpl(COMPLETE_SUBSCRIPTION_INFO)), + new Subscription(pubsub, new SubscriptionInfo.BuilderImpl(COMPLETE_SUBSCRIPTION_INFO))); + ListSubscriptionsResponse response = ListSubscriptionsResponse.newBuilder() + .setNextPageToken("cursor") + .addAllSubscriptions(Lists.transform(subscriptionList, SUBSCRIPTION_TO_PB_FUNCTION)) + .build(); + Future futureResponse = Futures.immediateFuture(response); + EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); + EasyMock.replay(pubsubRpcMock); + Page page = pubsub.listSubscriptions(); + assertEquals(cursor, page.nextPageCursor()); + assertArrayEquals(subscriptionList.toArray(), + Iterables.toArray(page.values(), Subscription.class)); + } + + @Test + public void testListSubscriptionsNextPage() { + String cursor1 = "cursor"; + pubsub = options.service(); + ListSubscriptionsRequest request1 = ListSubscriptionsRequest.newBuilder() + .setProject(PROJECT_PB) + .build(); + ListSubscriptionsRequest request2 = ListSubscriptionsRequest.newBuilder() + .setProject(PROJECT_PB) + .setPageToken(cursor1) + .build(); + List subscriptionList1 = ImmutableList.of( + new Subscription(pubsub, new SubscriptionInfo.BuilderImpl(COMPLETE_SUBSCRIPTION_INFO)), + new Subscription(pubsub, new SubscriptionInfo.BuilderImpl(COMPLETE_SUBSCRIPTION_INFO))); + List subscriptionList2 = ImmutableList.of( + new Subscription(pubsub, new SubscriptionInfo.BuilderImpl(COMPLETE_SUBSCRIPTION_INFO))); + ListSubscriptionsResponse response1 = ListSubscriptionsResponse.newBuilder() + .setNextPageToken(cursor1) + .addAllSubscriptions(Lists.transform(subscriptionList1, SUBSCRIPTION_TO_PB_FUNCTION)) + .build(); + String cursor2 = "nextCursor"; + ListSubscriptionsResponse response2 = ListSubscriptionsResponse.newBuilder() + .setNextPageToken(cursor2) + .addAllSubscriptions(Lists.transform(subscriptionList2, SUBSCRIPTION_TO_PB_FUNCTION)) + .build(); + Future futureResponse1 = Futures.immediateFuture(response1); + Future futureResponse2 = Futures.immediateFuture(response2); + EasyMock.expect(pubsubRpcMock.list(request1)).andReturn(futureResponse1); + EasyMock.expect(pubsubRpcMock.list(request2)).andReturn(futureResponse2); + EasyMock.replay(pubsubRpcMock); + Page page = pubsub.listSubscriptions(); + assertEquals(cursor1, page.nextPageCursor()); + assertArrayEquals(subscriptionList1.toArray(), + Iterables.toArray(page.values(), Subscription.class)); + page = page.nextPage(); + assertEquals(cursor2, page.nextPageCursor()); + assertArrayEquals(subscriptionList2.toArray(), + Iterables.toArray(page.values(), Subscription.class)); + } + + @Test + public void testListSubscriptionsEmpty() { + pubsub = options.service(); + ListSubscriptionsRequest request = ListSubscriptionsRequest.newBuilder() + .setProject(PROJECT_PB) + .build(); + List subscriptionList = ImmutableList.of(); + ListSubscriptionsResponse response = ListSubscriptionsResponse.newBuilder() + .setNextPageToken("") + .addAllSubscriptions(Lists.transform(subscriptionList, SUBSCRIPTION_TO_PB_FUNCTION)) + .build(); + Future futureResponse = Futures.immediateFuture(response); + EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); + EasyMock.replay(pubsubRpcMock); + Page page = pubsub.listSubscriptions(); + assertNull(page.nextPageCursor()); + assertNull(page.nextPage()); + assertArrayEquals(subscriptionList.toArray(), + Iterables.toArray(page.values(), Subscription.class)); + } + + @Test + public void testListSubscriptionsWithOptions() { + String cursor = "cursor"; + pubsub = options.service(); + ListSubscriptionsRequest request = ListSubscriptionsRequest.newBuilder() + .setProject(PROJECT_PB) + .setPageSize(42) + .setPageToken(cursor) + .build(); + List subscriptionList = ImmutableList.of( + new Subscription(pubsub, new SubscriptionInfo.BuilderImpl(COMPLETE_SUBSCRIPTION_INFO)), + new Subscription(pubsub, new SubscriptionInfo.BuilderImpl(COMPLETE_SUBSCRIPTION_INFO))); + ListSubscriptionsResponse response = ListSubscriptionsResponse.newBuilder() + .setNextPageToken("") + .addAllSubscriptions(Lists.transform(subscriptionList, SUBSCRIPTION_TO_PB_FUNCTION)) + .build(); + Future futureResponse = Futures.immediateFuture(response); + EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); + EasyMock.replay(pubsubRpcMock); + Page page = + pubsub.listSubscriptions(ListOption.pageSize(42), ListOption.pageToken(cursor)); + assertNull(page.nextPageCursor()); + assertNull(page.nextPage()); + assertArrayEquals(subscriptionList.toArray(), + Iterables.toArray(page.values(), Subscription.class)); + } + + @Test + public void testListSubscriptionsAsync() throws ExecutionException, InterruptedException { + String cursor = "cursor"; + pubsub = options.service(); + ListSubscriptionsRequest request = ListSubscriptionsRequest.newBuilder() + .setProject(PROJECT_PB) + .build(); + List subscriptionList = ImmutableList.of( + new Subscription(pubsub, new SubscriptionInfo.BuilderImpl(COMPLETE_SUBSCRIPTION_INFO)), + new Subscription(pubsub, new SubscriptionInfo.BuilderImpl(COMPLETE_SUBSCRIPTION_INFO))); + ListSubscriptionsResponse response = ListSubscriptionsResponse.newBuilder() + .setNextPageToken("cursor") + .addAllSubscriptions(Lists.transform(subscriptionList, SUBSCRIPTION_TO_PB_FUNCTION)) + .build(); + Future futureResponse = Futures.immediateFuture(response); + EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); + EasyMock.replay(pubsubRpcMock); + AsyncPage page = pubsub.listSubscriptionsAsync().get(); + assertEquals(cursor, page.nextPageCursor()); + assertArrayEquals(subscriptionList.toArray(), + Iterables.toArray(page.values(), Subscription.class)); + } + + @Test + public void testListSubscriptionsAsyncNextPage() throws ExecutionException, InterruptedException { + String cursor1 = "cursor"; + pubsub = options.service(); + ListSubscriptionsRequest request1 = ListSubscriptionsRequest.newBuilder() + .setProject(PROJECT_PB) + .build(); + ListSubscriptionsRequest request2 = ListSubscriptionsRequest.newBuilder() + .setProject(PROJECT_PB) + .setPageToken(cursor1) + .build(); + List subscriptionList1 = ImmutableList.of( + new Subscription(pubsub, new SubscriptionInfo.BuilderImpl(COMPLETE_SUBSCRIPTION_INFO)), + new Subscription(pubsub, new SubscriptionInfo.BuilderImpl(COMPLETE_SUBSCRIPTION_INFO))); + List subscriptionList2 = ImmutableList.of( + new Subscription(pubsub, new SubscriptionInfo.BuilderImpl(COMPLETE_SUBSCRIPTION_INFO))); + ListSubscriptionsResponse response1 = ListSubscriptionsResponse.newBuilder() + .setNextPageToken(cursor1) + .addAllSubscriptions(Lists.transform(subscriptionList1, SUBSCRIPTION_TO_PB_FUNCTION)) + .build(); + String cursor2 = "nextCursor"; + ListSubscriptionsResponse response2 = ListSubscriptionsResponse.newBuilder() + .setNextPageToken(cursor2) + .addAllSubscriptions(Lists.transform(subscriptionList2, SUBSCRIPTION_TO_PB_FUNCTION)) + .build(); + Future futureResponse1 = Futures.immediateFuture(response1); + Future futureResponse2 = Futures.immediateFuture(response2); + EasyMock.expect(pubsubRpcMock.list(request1)).andReturn(futureResponse1); + EasyMock.expect(pubsubRpcMock.list(request2)).andReturn(futureResponse2); + EasyMock.replay(pubsubRpcMock); + AsyncPage page = pubsub.listSubscriptionsAsync().get(); + assertEquals(cursor1, page.nextPageCursor()); + assertArrayEquals(subscriptionList1.toArray(), + Iterables.toArray(page.values(), Subscription.class)); + page = page.nextPageAsync().get(); + assertEquals(cursor2, page.nextPageCursor()); + assertArrayEquals(subscriptionList2.toArray(), + Iterables.toArray(page.values(), Subscription.class)); + } + + @Test + public void testListSubscriptionsAsyncEmpty() throws ExecutionException, InterruptedException { + pubsub = options.service(); + ListSubscriptionsRequest request = ListSubscriptionsRequest.newBuilder() + .setProject(PROJECT_PB) + .build(); + List subscriptionList = ImmutableList.of(); + ListSubscriptionsResponse response = ListSubscriptionsResponse.newBuilder() + .setNextPageToken("") + .addAllSubscriptions(Lists.transform(subscriptionList, SUBSCRIPTION_TO_PB_FUNCTION)) + .build(); + Future futureResponse = Futures.immediateFuture(response); + EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); + EasyMock.replay(pubsubRpcMock); + AsyncPage page = pubsub.listSubscriptionsAsync().get(); + assertNull(page.nextPageCursor()); + assertNull(page.nextPageAsync().get()); + assertNull(page.nextPage()); + assertArrayEquals(subscriptionList.toArray(), + Iterables.toArray(page.values(), Subscription.class)); + } + + @Test + public void testListSubscriptionsAsyncWithOptions() + throws ExecutionException, InterruptedException { + String cursor = "cursor"; + pubsub = options.service(); + ListSubscriptionsRequest request = ListSubscriptionsRequest.newBuilder() + .setProject(PROJECT_PB) + .setPageSize(42) + .setPageToken(cursor) + .build(); + List subscriptionList = ImmutableList.of( + new Subscription(pubsub, new SubscriptionInfo.BuilderImpl(COMPLETE_SUBSCRIPTION_INFO)), + new Subscription(pubsub, new SubscriptionInfo.BuilderImpl(COMPLETE_SUBSCRIPTION_INFO))); + ListSubscriptionsResponse response = ListSubscriptionsResponse.newBuilder() + .setNextPageToken("") + .addAllSubscriptions(Lists.transform(subscriptionList, SUBSCRIPTION_TO_PB_FUNCTION)) + .build(); + Future futureResponse = Futures.immediateFuture(response); + EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); + EasyMock.replay(pubsubRpcMock); + AsyncPage page = + pubsub.listSubscriptionsAsync(ListOption.pageSize(42), ListOption.pageToken(cursor)).get(); + assertNull(page.nextPageCursor()); + assertNull(page.nextPage()); + assertNull(page.nextPageAsync().get()); + assertArrayEquals(subscriptionList.toArray(), + Iterables.toArray(page.values(), Subscription.class)); + } + + @Test + public void testListTopicSubscriptions() { + String cursor = "cursor"; + pubsub = options.service(); + ListTopicSubscriptionsRequest request = ListTopicSubscriptionsRequest.newBuilder() + .setTopic(TOPIC_NAME_PB) + .build(); + List subscriptionList = ImmutableList.of( + new SubscriptionId(PROJECT, "subscription1"), + new SubscriptionId(PROJECT, "subscription2")); + ListTopicSubscriptionsResponse response = ListTopicSubscriptionsResponse.newBuilder() + .setNextPageToken("cursor") + .addAllSubscriptions(Lists.transform(subscriptionList, SUBSCRIPTION_ID_TO_PB_FUNCTION)) + .build(); + Future futureResponse = + Futures.immediateFuture(response); + EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); + EasyMock.replay(pubsubRpcMock); + Page page = pubsub.listSubscriptions(TOPIC); + assertEquals(cursor, page.nextPageCursor()); + assertArrayEquals(subscriptionList.toArray(), + Iterables.toArray(page.values(), SubscriptionId.class)); + } + + @Test + public void testListTopicSubscriptionsNextPage() { + String cursor1 = "cursor"; + pubsub = options.service(); + ListTopicSubscriptionsRequest request1 = ListTopicSubscriptionsRequest.newBuilder() + .setTopic(TOPIC_NAME_PB) + .build(); + ListTopicSubscriptionsRequest request2 = ListTopicSubscriptionsRequest.newBuilder() + .setTopic(TOPIC_NAME_PB) + .setPageToken(cursor1) + .build(); + List subscriptionList1 = ImmutableList.of( + new SubscriptionId(PROJECT, "subscription1"), + new SubscriptionId(PROJECT, "subscription2")); + List subscriptionList2 = ImmutableList.of( + new SubscriptionId(PROJECT, "subscription3")); + ListTopicSubscriptionsResponse response1 = ListTopicSubscriptionsResponse.newBuilder() + .setNextPageToken(cursor1) + .addAllSubscriptions(Lists.transform(subscriptionList1, SUBSCRIPTION_ID_TO_PB_FUNCTION)) + .build(); + String cursor2 = "nextCursor"; + ListTopicSubscriptionsResponse response2 = ListTopicSubscriptionsResponse.newBuilder() + .setNextPageToken(cursor2) + .addAllSubscriptions(Lists.transform(subscriptionList2, SUBSCRIPTION_ID_TO_PB_FUNCTION)) + .build(); + Future futureResponse1 = + Futures.immediateFuture(response1); + Future futureResponse2 = + Futures.immediateFuture(response2); + EasyMock.expect(pubsubRpcMock.list(request1)).andReturn(futureResponse1); + EasyMock.expect(pubsubRpcMock.list(request2)).andReturn(futureResponse2); + EasyMock.replay(pubsubRpcMock); + Page page = pubsub.listSubscriptions(TOPIC); + assertEquals(cursor1, page.nextPageCursor()); + assertArrayEquals(subscriptionList1.toArray(), + Iterables.toArray(page.values(), SubscriptionId.class)); + page = page.nextPage(); + assertEquals(cursor2, page.nextPageCursor()); + assertArrayEquals(subscriptionList2.toArray(), + Iterables.toArray(page.values(), SubscriptionId.class)); + } + + @Test + public void testListTopicSubscriptionsEmpty() { + pubsub = options.service(); + ListTopicSubscriptionsRequest request = ListTopicSubscriptionsRequest.newBuilder() + .setTopic(TOPIC_NAME_PB) + .build(); + List subscriptionList = ImmutableList.of(); + ListTopicSubscriptionsResponse response = ListTopicSubscriptionsResponse.newBuilder() + .setNextPageToken("") + .addAllSubscriptions(Lists.transform(subscriptionList, SUBSCRIPTION_ID_TO_PB_FUNCTION)) + .build(); + Future futureResponse = + Futures.immediateFuture(response); + EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); + EasyMock.replay(pubsubRpcMock); + Page page = pubsub.listSubscriptions(TOPIC); + assertNull(page.nextPageCursor()); + assertNull(page.nextPage()); + assertArrayEquals(subscriptionList.toArray(), + Iterables.toArray(page.values(), SubscriptionId.class)); + } + + @Test + public void testListTopicSubscriptionsWithOptions() { + String cursor = "cursor"; + pubsub = options.service(); + ListTopicSubscriptionsRequest request = ListTopicSubscriptionsRequest.newBuilder() + .setTopic(TOPIC_NAME_PB) + .setPageSize(42) + .setPageToken(cursor) + .build(); + List subscriptionList = ImmutableList.of( + new SubscriptionId(PROJECT, "subscription1"), + new SubscriptionId(PROJECT, "subscription2")); + ListTopicSubscriptionsResponse response = ListTopicSubscriptionsResponse.newBuilder() + .setNextPageToken("") + .addAllSubscriptions(Lists.transform(subscriptionList, SUBSCRIPTION_ID_TO_PB_FUNCTION)) + .build(); + Future futureResponse = + Futures.immediateFuture(response); + EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); + EasyMock.replay(pubsubRpcMock); + Page page = + pubsub.listSubscriptions(TOPIC, ListOption.pageSize(42), ListOption.pageToken(cursor)); + assertNull(page.nextPageCursor()); + assertNull(page.nextPage()); + assertArrayEquals(subscriptionList.toArray(), + Iterables.toArray(page.values(), SubscriptionId.class)); + } + + @Test + public void testListTopicSubscriptionsAsync() throws ExecutionException, InterruptedException { + String cursor = "cursor"; + pubsub = options.service(); + ListTopicSubscriptionsRequest request = ListTopicSubscriptionsRequest.newBuilder() + .setTopic(TOPIC_NAME_PB) + .build(); + List subscriptionList = ImmutableList.of( + new SubscriptionId(PROJECT, "subscription1"), + new SubscriptionId(PROJECT, "subscription2")); + ListTopicSubscriptionsResponse response = ListTopicSubscriptionsResponse.newBuilder() + .setNextPageToken("cursor") + .addAllSubscriptions(Lists.transform(subscriptionList, SUBSCRIPTION_ID_TO_PB_FUNCTION)) + .build(); + Future futureResponse = + Futures.immediateFuture(response); + EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); + EasyMock.replay(pubsubRpcMock); + AsyncPage page = pubsub.listSubscriptionsAsync(TOPIC).get(); + assertEquals(cursor, page.nextPageCursor()); + assertArrayEquals(subscriptionList.toArray(), + Iterables.toArray(page.values(), SubscriptionId.class)); + } + + @Test + public void testListTopicSubscriptionsAsyncNextPage() + throws ExecutionException, InterruptedException { + String cursor1 = "cursor"; + pubsub = options.service(); + ListTopicSubscriptionsRequest request1 = ListTopicSubscriptionsRequest.newBuilder() + .setTopic(TOPIC_NAME_PB) + .build(); + ListTopicSubscriptionsRequest request2 = ListTopicSubscriptionsRequest.newBuilder() + .setTopic(TOPIC_NAME_PB) + .setPageToken(cursor1) + .build(); + List subscriptionList1 = ImmutableList.of( + new SubscriptionId(PROJECT, "subscription1"), + new SubscriptionId(PROJECT, "subscription2")); + List subscriptionList2 = ImmutableList.of( + new SubscriptionId(PROJECT, "subscription3")); + ListTopicSubscriptionsResponse response1 = ListTopicSubscriptionsResponse.newBuilder() + .setNextPageToken(cursor1) + .addAllSubscriptions(Lists.transform(subscriptionList1, SUBSCRIPTION_ID_TO_PB_FUNCTION)) + .build(); + String cursor2 = "nextCursor"; + ListTopicSubscriptionsResponse response2 = ListTopicSubscriptionsResponse.newBuilder() + .setNextPageToken(cursor2) + .addAllSubscriptions(Lists.transform(subscriptionList2, SUBSCRIPTION_ID_TO_PB_FUNCTION)) + .build(); + Future futureResponse1 = + Futures.immediateFuture(response1); + Future futureResponse2 = + Futures.immediateFuture(response2); + EasyMock.expect(pubsubRpcMock.list(request1)).andReturn(futureResponse1); + EasyMock.expect(pubsubRpcMock.list(request2)).andReturn(futureResponse2); + EasyMock.replay(pubsubRpcMock); + AsyncPage page = pubsub.listSubscriptionsAsync(TOPIC).get(); + assertEquals(cursor1, page.nextPageCursor()); + assertArrayEquals(subscriptionList1.toArray(), + Iterables.toArray(page.values(), SubscriptionId.class)); + page = page.nextPageAsync().get(); + assertEquals(cursor2, page.nextPageCursor()); + assertArrayEquals(subscriptionList2.toArray(), + Iterables.toArray(page.values(), SubscriptionId.class)); + } + + @Test + public void testListTopicSubscriptionsAsyncEmpty() + throws ExecutionException, InterruptedException { + pubsub = options.service(); + ListTopicSubscriptionsRequest request = ListTopicSubscriptionsRequest.newBuilder() + .setTopic(TOPIC_NAME_PB) + .build(); + List subscriptionList = ImmutableList.of(); + ListTopicSubscriptionsResponse response = ListTopicSubscriptionsResponse.newBuilder() + .setNextPageToken("") + .addAllSubscriptions(Lists.transform(subscriptionList, SUBSCRIPTION_ID_TO_PB_FUNCTION)) + .build(); + Future futureResponse = + Futures.immediateFuture(response); + EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); + EasyMock.replay(pubsubRpcMock); + AsyncPage page = pubsub.listSubscriptionsAsync(TOPIC).get(); + assertNull(page.nextPageCursor()); + assertNull(page.nextPage()); + assertNull(page.nextPageAsync().get()); + assertArrayEquals(subscriptionList.toArray(), + Iterables.toArray(page.values(), SubscriptionId.class)); + } + + @Test + public void testListTopicSubscriptionsAsyncWithOptions() + throws ExecutionException, InterruptedException { + String cursor = "cursor"; + pubsub = options.service(); + ListTopicSubscriptionsRequest request = ListTopicSubscriptionsRequest.newBuilder() + .setTopic(TOPIC_NAME_PB) + .setPageSize(42) + .setPageToken(cursor) + .build(); + List subscriptionList = ImmutableList.of( + new SubscriptionId(PROJECT, "subscription1"), + new SubscriptionId(PROJECT, "subscription2")); + ListTopicSubscriptionsResponse response = ListTopicSubscriptionsResponse.newBuilder() + .setNextPageToken("") + .addAllSubscriptions(Lists.transform(subscriptionList, SUBSCRIPTION_ID_TO_PB_FUNCTION)) + .build(); + Future futureResponse = + Futures.immediateFuture(response); + EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); + EasyMock.replay(pubsubRpcMock); + AsyncPage page = pubsub.listSubscriptionsAsync( + TOPIC, ListOption.pageSize(42), ListOption.pageToken(cursor)).get(); + assertNull(page.nextPageCursor()); + assertNull(page.nextPage()); + assertNull(page.nextPageAsync().get()); + assertArrayEquals(subscriptionList.toArray(), + Iterables.toArray(page.values(), SubscriptionId.class)); + } + + @Test + public void testClose() throws Exception { + pubsub = options.service(); + pubsubRpcMock.close(); + EasyMock.expectLastCall(); + EasyMock.replay(pubsubRpcMock); + pubsub.close(); + } +}