Skip to content

Commit

Permalink
Add javadoc and tests for Subscription (#1074)
Browse files Browse the repository at this point in the history
  • Loading branch information
mziccard authored Jun 24, 2016
1 parent 7de0d45 commit 0da6ac4
Show file tree
Hide file tree
Showing 4 changed files with 494 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ interface MessageConsumer extends AutoCloseable {

/**
* Sends a request for creating a topic. This method returns a {@code Future} object to consume
* the result. {@link Future#get()} returns the created topic or {@code null} if not found.
* the result. {@link Future#get()} returns the created topic.
*/
Future<Topic> createAsync(TopicInfo topic);

Expand Down Expand Up @@ -311,8 +311,7 @@ interface MessageConsumer extends AutoCloseable {

/**
* Sends a request for creating a subscription. This method returns a {@code Future} object to
* consume the result. {@link Future#get()} returns the created subscription or {@code null} if
* not found.
* consume the result. {@link Future#get()} returns the created subscription.
*/
Future<Subscription> createAsync(SubscriptionInfo subscription);

Expand Down Expand Up @@ -463,7 +462,7 @@ interface MessageConsumer extends AutoCloseable {
Future<Iterator<ReceivedMessage>> pullAsync(String subscription, int maxMessages);

/**
* Creates a message consumer that pulls messages for the provided subscription. You can stop
* Creates a message consumer that pulls messages from the provided subscription. You can stop
* pulling messages by calling {@link MessageConsumer#close()}. The returned message consumer
* executes {@link MessageProcessor#process(Message)} on each pulled message. If
* {@link MessageProcessor#process(Message)} executes correctly, the message is acknowledged. If
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.cloud.GrpcServiceOptions;
import com.google.cloud.pubsub.PubSub.MessageConsumer;
import com.google.cloud.pubsub.PubSub.MessageProcessor;
import com.google.cloud.pubsub.PubSub.PullOption;
Expand All @@ -30,7 +31,31 @@
import java.util.concurrent.Future;

/**
* PubSub subscription.
* A Google Cloud Pub/Sub subscription. A subscription represents the stream of messages from a
* single, specific topic, to be delivered to the subscribing application. Pub/Sub subscriptions
* support both push and pull message delivery.
*
* <p>In a push subscription, the Pub/Sub server sends a request to the subscriber application, at a
* preconfigured endpoint (see {@link PushConfig}). The subscriber's HTTP response serves as an
* implicit acknowledgement: a success response indicates that the message has been succesfully
* processed and the Pub/Sub system can delete it from the subscription; a non-success response
* indicates that the Pub/Sub server should resend it (implicit "nack").
*
* <p>In a pull subscription, the subscribing application must explicitly pull messages using one of
* {@link PubSub#pull(String, int)}, {@link PubSub#pullAsync(String, int)} or
* {@link PubSub#pullAsync(String, PubSub.MessageProcessor callback, PubSub.PullOption...)}.
* When messages are pulled with {@link PubSub#pull(String, int)} or
* {@link PubSub#pullAsync(String, int)} the subscribing application must also explicitly
* acknowledge them using one of {@link PubSub#ack(String, Iterable)},
* {@link PubSub#ack(String, String, String...)}, {@link PubSub#ackAsync(String, Iterable)} or
* {@link PubSub#ackAsync(String, String, String...)}.
*
* <p>{@code Subscription} adds a layer of service-related functionality over
* {@link SubscriptionInfo}. Objects of this class are immutable. To get a {@code Subscription}
* object with the most recent information use {@link #reload} or {@link #reloadAsync}.
*
* @see <a href="https://cloud.google.com/pubsub/overview#data_model">Pub/Sub Data Model</a>
* @see <a href="https://cloud.google.com/pubsub/subscriber">Subscriber Guide</a>
*/
public class Subscription extends SubscriptionInfo {

Expand All @@ -39,6 +64,9 @@ public class Subscription extends SubscriptionInfo {
private final PubSubOptions options;
private transient PubSub pubsub;

/**
* A builder for {@code Subscription} objects.
*/
public static final class Builder extends SubscriptionInfo.Builder {

private final PubSub pubsub;
Expand Down Expand Up @@ -103,62 +131,172 @@ public Builder toBuilder() {
}

@Override
public int hashCode() {
public final int hashCode() {
return Objects.hash(options, super.hashCode());
}

@Override
public boolean equals(Object obj) {
public final boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
if (obj == null || !obj.getClass().equals(Subscription.class)) {
return false;
}
Subscription other = (Subscription) obj;
return Objects.equals(topic(), other.topic())
&& Objects.equals(name(), other.name())
&& Objects.equals(pushConfig(), other.pushConfig())
&& ackDeadlineSeconds() == other.ackDeadlineSeconds()
&& Objects.equals(options, other.options);
return baseEquals(other) && Objects.equals(options, other.options);
}

/**
* Returns the subscription's {@code PubSub} object used to issue requests.
*/
public PubSub pubSub() {
return pubsub;
}

/**
* Deletes this subscription.
*
* @return {@code true} if the subscription was deleted, {@code false} if it was not found
* @throws PubSubException upon failure
*/
public boolean delete() {
return pubsub.deleteSubscription(name());
}

/**
* Sends a request for deleting this subscription. This method returns a {@code Future} object to
* consume the result. {@link Future#get()} returns {@code true} if the subscription was deleted,
* {@code false} if it was not found.
*/
public Future<Boolean> deleteAsync() {
return pubsub.deleteSubscriptionAsync(name());
}

/**
* Fetches current subscription's latest information. Returns {@code null} if the subscription
* does not exist.
*
* @return a {@code Subscription} object with latest information or {@code null} if not found
* @throws PubSubException upon failure
*/
public Subscription reload() {
return pubsub.getSubscription(name());
}

/**
* Sends a request for fetching current subscription's latest information. This method returns a
* {@code Future} object to consume the result. {@link Future#get()} returns the requested
* subscription or {@code null} if not found.
*
* @return a {@code Subscription} object with latest information or {@code null} if not found
* @throws PubSubException upon failure
*/
public Future<Subscription> reloadAsync() {
return pubsub.getSubscriptionAsync(name());
}

/**
* Sets the push configuration for this subscription. This may be used to change a push
* subscription to a pull one (passing a {@code null} {@code pushConfig} parameter) or vice versa.
* This methods can also be used to change the endpoint URL and other attributes of a push
* subscription. Messages will accumulate for delivery regardless of changes to the push
* configuration.
*
* @param pushConfig the new push configuration. Use {@code null} to unset it
* @throws PubSubException upon failure, or if the subscription does not exist
*/
public void replacePushConfig(PushConfig pushConfig) {
pubsub.replacePushConfig(name(), pushConfig);
}

/**
* Sends a request for updating the push configuration for a specified subscription. This may be
* used to change a push subscription to a pull one (passing a {@code null} {@code pushConfig}
* parameter) or vice versa. This methods can also be used to change the endpoint URL and other
* attributes of a push subscription. Messages will accumulate for delivery regardless of changes
* to the push configuration. The method returns a {@code Future} object that can be used to wait
* for the replace operation to be completed.
*
* @param pushConfig the new push configuration. Use {@code null} to unset it
* @return a {@code Future} to wait for the replace operation to be completed.
*/
public Future<Void> replacePushConfigAsync(PushConfig pushConfig) {
return pubsub.replacePushConfigAsync(name(), pushConfig);
}

/**
* Pulls messages from this subscription. This method possibly returns no messages if no message
* was available at the time the request was processed by the Pub/Sub service (i.e. the system is
* not allowed to wait until at least one message is available). Pulled messages have their
* acknowledge deadline automatically renewed until they are explicitly consumed using
* {@link Iterator#next()}.
*
* <p>Example usage of synchronous message pulling:
* <pre> {@code
* Iterator<ReceivedMessage> messageIterator = pubsub.pull("subscription", 100);
* while (messageIterator.hasNext()) {
* ReceivedMessage message = messageIterator.next();
* // message's acknowledge deadline is no longer automatically renewed. If processing takes
* // long pubsub.modifyAckDeadline(String, String, long, TimeUnit) can be used to extend it.
* doSomething(message);
* message.ack(); // or message.nack()
* }}</pre>
*
* @param maxMessages the maximum number of messages pulled by this method. This method can
* possibly return fewer messages.
* @throws PubSubException upon failure
*/
public Iterator<ReceivedMessage> pull(int maxMessages) {
return pubsub.pull(name(), maxMessages);
}

/**
* Sends a request for pulling messages from this subscription. This method returns a
* {@code Future} object to consume the result. {@link Future#get()} returns a message iterator.
* This method possibly returns no messages if no message was available at the time the request
* was processed by the Pub/Sub service (i.e. the system is not allowed to wait until at least one
* message is available).
*
* <p>Example usage of asynchronous message pulling:
* <pre> {@code
* Future<Iterator<ReceivedMessage>> future = pubsub.pull("subscription", 100);
* // do something while the request gets processed
* Iterator<ReceivedMessage> messageIterator = future.get();
* while (messageIterator.hasNext()) {
* ReceivedMessage message = messageIterator.next();
* // message's acknowledge deadline is no longer automatically renewed. If processing takes
* // long pubsub.modifyAckDeadline(String, String, long, TimeUnit) can be used to extend it.
* doSomething(message);
* message.ack(); // or message.nack()
* }}</pre>
*
* @param maxMessages the maximum number of messages pulled by this method. This method can
* possibly return fewer messages.
* @throws PubSubException upon failure
*/
public Future<Iterator<ReceivedMessage>> pullAsync(int maxMessages) {
return pubsub.pullAsync(name(), maxMessages);
}

/**
* Creates a message consumer that pulls messages from this subscription. You can stop pulling
* messages by calling {@link MessageConsumer#close()}. The returned message consumer executes
* {@link MessageProcessor#process(Message)} on each pulled message. If
* {@link MessageProcessor#process(Message)} executes correctly, the message is acknowledged. If
* {@link MessageProcessor#process(Message)} throws an exception, the message is "nacked". For
* all pulled messages, the ack deadline is automatically renewed until the message is either
* acknowledged or "nacked".
*
* <p>The {@link PullOption#maxQueuedCallbacks(int)} option can be used to control the maximum
* number of queued messages (messages either being processed or waiting to be processed). The
* {@link PullOption#executorFactory(GrpcServiceOptions.ExecutorFactory)} can be used to provide
* an executor to run message processor callbacks.
*
* @param callback the callback to be executed on each message
* @param options pulling options
* @return a message consumer for the provided subscription and options
*/
public MessageConsumer pullAsync(MessageProcessor callback, PullOption... options) {
return pubsub.pullAsync(name(), callback, options);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@
* indicates that the Pub/Sub server should resend it (implicit "nack").
*
* <p>In a pull subscription, the subscribing application must explicitly pull messages using one of
* {@link PubSub#pull(String, PubSub.PullOption...)},
* {@link PubSub#pullAsync(String, PubSub.MessageProcessor)} or
* {@link PubSub#pullAsync(String, PubSub.PullOption...)}. The subscribing application must then
* explicitly acknowledge the messages using one of {@link PubSub#ack(String, Iterable)},
* {@link PubSub#pull(String, int)}, {@link PubSub#pullAsync(String, int)} or
* {@link PubSub#pullAsync(String, PubSub.MessageProcessor callback, PubSub.PullOption...)}.
* When messages are pulled with {@link PubSub#pull(String, int)} or
* {@link PubSub#pullAsync(String, int)} the subscribing application must also explicitly
* acknowledge them using one of {@link PubSub#ack(String, Iterable)},
* {@link PubSub#ack(String, String, String...)}, {@link PubSub#ackAsync(String, Iterable)} or
* {@link PubSub#ackAsync(String, String, String...)}.
*
Expand Down Expand Up @@ -190,7 +191,7 @@ public TopicId topic() {
}

/**
* Sets the name of the subscription. The name must start with a letter, and contain only
* Returns the name of the subscription. The name must start with a letter, and contain only
* letters ({@code [A-Za-z]}), numbers ({@code [0-9]}), dashes ({@code -}), underscores
* ({@code _}), periods ({@code .}), tildes ({@code ~}), plus ({@code +}) or percent signs
* ({@code %}). It must be between 3 and 255 characters in length and cannot begin with the
Expand Down Expand Up @@ -223,19 +224,19 @@ public long ackDeadlineSeconds() {
return ackDeadlineSeconds;
}

final boolean baseEquals(SubscriptionInfo subscriptionInfo) {
return Objects.equals(topic, subscriptionInfo.topic)
&& Objects.equals(name, subscriptionInfo.name)
&& Objects.equals(pushConfig, subscriptionInfo.pushConfig)
&& ackDeadlineSeconds == subscriptionInfo.ackDeadlineSeconds;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || !obj.getClass().equals(this.getClass())) {
return false;
}
SubscriptionInfo other = (SubscriptionInfo) obj;
return Objects.equals(topic, other.topic)
&& Objects.equals(name, other.name)
&& Objects.equals(pushConfig, other.pushConfig)
&& ackDeadlineSeconds == other.ackDeadlineSeconds;
return obj == this
|| obj != null
&& obj.getClass().equals(SubscriptionInfo.class)
&& baseEquals((SubscriptionInfo) obj);
}

@Override
Expand Down
Loading

0 comments on commit 0da6ac4

Please sign in to comment.