diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Message.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Message.java
index b83e5a6296f6..cc65536d8a84 100644
--- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Message.java
+++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Message.java
@@ -37,7 +37,7 @@
* a key {@code iana.org/language_tag} and value {@code en} could be added to messages to mark them
* as readable by an English-speaking subscriber.
*
- *
To be published a message must have a non-empty payload, or at least one attribute.
+ *
To be published, a message must have a non-empty payload, or at least one attribute.
*
* @see Pub/Sub Data Model
*/
@@ -64,11 +64,11 @@ private static final class InternalByteArray extends ByteArray {
private static final long serialVersionUID = -3330181485911805428L;
- protected InternalByteArray(ByteString byteString) {
+ InternalByteArray(ByteString byteString) {
super(byteString);
}
- protected InternalByteArray(ByteArray byteArray) {
+ InternalByteArray(ByteArray byteArray) {
super(byteArray);
}
@@ -244,17 +244,24 @@ public ByteArray payload() {
return payload;
}
+ final boolean baseEquals(Message message) {
+ return Objects.equals(id, message.id)
+ && Objects.equals(payload, message.payload)
+ && Objects.equals(attributes, message.attributes)
+ && Objects.equals(publishTime, message.publishTime);
+ }
+
@Override
public boolean equals(Object obj) {
return obj == this
|| obj != null
&& obj.getClass().equals(Message.class)
- && Objects.equals(toPb(), ((Message) obj).toPb());
+ && baseEquals((Message) obj);
}
@Override
public int hashCode() {
- return Objects.hash(serialVersionUID, id, payload, attributes, publishTime);
+ return Objects.hash(id, payload, attributes, publishTime);
}
@Override
diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/ReceivedMessage.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/ReceivedMessage.java
index c3cb705cb6d7..1af9a5ae46d2 100644
--- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/ReceivedMessage.java
+++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/ReceivedMessage.java
@@ -27,7 +27,16 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-public class ReceivedMessage extends Message {
+/**
+ * A Google Cloud Pub/Sub received message. A received message has all the information in
+ * {@link Message} as well as the acknowledge id. The ack id can be used to acknowledge the received
+ * message.
+ *
+ *
{@code ReceivedMessage} also adds a layer of service-related functionality over
+ * {@link Message} that help manage received messages (see {@link #ack()}, {@link #nack()} and
+ * {@link #modifyAckDeadline(int, TimeUnit)}).
+ */
+public final class ReceivedMessage extends Message {
private static final long serialVersionUID = -4178477763916251733L;
@@ -124,48 +133,109 @@ public int hashCode() {
@Override
public boolean equals(Object obj) {
- if (this == obj) {
+ if (obj == this) {
return true;
}
- if (obj == null || getClass() != obj.getClass()) {
+ if (obj == null || !obj.getClass().equals(ReceivedMessage.class)) {
return false;
}
ReceivedMessage other = (ReceivedMessage) obj;
- return Objects.equals(toPb(), other.toPb()) && Objects.equals(options, other.options);
+ return baseEquals(other) && Objects.equals(options, other.options);
}
- public PubSub pubSub() {
+ /**
+ * Returns the received message's {@code PubSub} object used to issue requests.
+ */
+ public PubSub pubsub() {
return pubsub;
}
+ /**
+ * Returns the name of the subscription this message was received from.
+ */
public String subscription() {
return subscription;
}
+ /**
+ * Returns the acknowledge id of the message. The ack id can be used to acknowledge the received
+ * message.
+ */
public String ackId() {
return ackId;
}
+ /**
+ * Acknowledges the current message.
+ *
+ * @throws PubSubException upon failure, or if the subscription was not found
+ */
public void ack() {
pubsub.ack(subscription, ackId);
}
+ /**
+ * Sends a request to acknowledge the current message. The method returns a {@code Future} object
+ * that can be used to wait for the acknowledge operation to be completed.
+ *
+ * @throws PubSubException upon failure, or if the subscription was not found
+ */
public Future ackAsync() {
return pubsub.ackAsync(subscription, ackId);
}
+ /**
+ * "Nacks" the current message. This method corresponds to calling
+ * {@link #modifyAckDeadline(int, TimeUnit)} with a deadline of 0.
+ *
+ * @throws PubSubException upon failure, or if the subscription was not found
+ */
public void nack() {
pubsub.nack(subscription, ackId);
}
+ /**
+ * Sends a request to "nack" the current message. This method corresponds to calling
+ * {@link #modifyAckDeadlineAsync(int, TimeUnit)} with a deadline of 0. The method returns a
+ * {@code Future} object that can be used to wait for the "nack" operation to be completed.
+ *
+ * @throws PubSubException upon failure, or if the subscription was not found
+ */
public Future nackAsync() {
return pubsub.nackAsync(subscription, ackId);
}
+ /**
+ * Modifies the acknowledge deadline of the current message. {@code deadline} must be >= 0 and
+ * is the new deadline with respect to the time the modify request was received by the Pub/Sub
+ * service. For example, if {@code deadline} is 10 and {@code unit} is {@link TimeUnit#SECONDS},
+ * the new ack deadline will expire 10 seconds after the modify request was received by the
+ * service. Specifying 0 may be used to make the message available for another pull request
+ * (corresponds to calling {@link #nack()}.
+ *
+ * @param deadline the new deadline, relative to the time the modify request is received by the
+ * Pub/Sub service
+ * @param unit {@code deadline} time unit
+ * @throws PubSubException upon failure, or if the subscription was not found
+ */
public void modifyAckDeadline(int deadline, TimeUnit unit) {
pubsub.modifyAckDeadline(subscription, deadline, unit, ackId);
}
+ /**
+ * Sends a request to modify the acknowledge deadline of the given messages. {@code deadline}
+ * must be >= 0 and is the new deadline with respect to the time the modify request was
+ * received by the Pub/Sub service. For example, if {@code deadline} is 10 and {@code unit} is
+ * {@link TimeUnit#SECONDS}, the new ack deadline will expire 10 seconds after the modify request
+ * was received by the service. Specifying 0 may be used to make the message available for another
+ * pull request (corresponds to calling {@link #nackAsync()}. The method returns a {@code Future}
+ * object that can be used to wait for the modify operation to be completed.
+ *
+ * @param deadline the new deadline, relative to the time the modify request is received by the
+ * Pub/Sub service
+ * @param unit {@code deadline} time unit
+ * @throws PubSubException upon failure, or if the subscription was not found
+ */
public Future modifyAckDeadlineAsync(int deadline, TimeUnit unit) {
return pubsub.modifyAckDeadlineAsync(subscription, deadline, unit, ackId);
}
@@ -175,10 +245,10 @@ private void readObject(ObjectInputStream input) throws IOException, ClassNotFou
this.pubsub = options.service();
}
- static ReceivedMessage fromPb(PubSub storage, String subscription,
+ static ReceivedMessage fromPb(PubSub pubsub, String subscription,
com.google.pubsub.v1.ReceivedMessage msgPb) {
Message message = fromPb(msgPb.getMessage());
String ackId = msgPb.getAckId();
- return new Builder(subscription, ackId, storage, new BuilderImpl(message)).build();
+ return new Builder(subscription, ackId, pubsub, new BuilderImpl(message)).build();
}
}
diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/ReceivedMessageTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/ReceivedMessageTest.java
new file mode 100644
index 000000000000..f21fbd106674
--- /dev/null
+++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/ReceivedMessageTest.java
@@ -0,0 +1,187 @@
+/*
+ * Copyright 2016 Google Inc. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsub;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.createStrictMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+
+import com.google.api.client.util.Charsets;
+import com.google.cloud.ByteArray;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Futures;
+
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+public class ReceivedMessageTest {
+
+ private static final String SUBSCRIPTION = "subscription";
+ private static final String ACK_ID = "ackId";
+ private static final String MESSAGE_ID = "messageId";
+ private static final String PAYLOAD_STRING = "payload";
+ private static final ByteArray PAYLOAD =
+ ByteArray.copyFrom("payload".getBytes(StandardCharsets.UTF_8));
+ private static final Map ATTRIBUTES =
+ ImmutableMap.of("key1", "value1", "key2", "value2");
+ private static final Long PUBLISH_TIME = 42L;
+ private static final Message MESSAGE = Message.builder(PAYLOAD)
+ .id(MESSAGE_ID)
+ .attributes(ATTRIBUTES)
+ .publishTime(PUBLISH_TIME)
+ .build();
+ private static final com.google.pubsub.v1.ReceivedMessage RECEIVED_MESSAGE_PB =
+ com.google.pubsub.v1.ReceivedMessage.newBuilder()
+ .setMessage(MESSAGE.toPb())
+ .setAckId(ACK_ID)
+ .build();
+
+ private final PubSub serviceMockReturnsOptions = createStrictMock(PubSub.class);
+ private final PubSubOptions mockOptions = createMock(PubSubOptions.class);
+ private PubSub pubsub;
+ private ReceivedMessage expectedMessage;
+ private ReceivedMessage message;
+
+ private void initializeExpectedMessage(int optionsCalls) {
+ expect(serviceMockReturnsOptions.options()).andReturn(mockOptions).times(optionsCalls);
+ replay(serviceMockReturnsOptions);
+ pubsub = createStrictMock(PubSub.class);
+ expectedMessage =
+ ReceivedMessage.fromPb(serviceMockReturnsOptions, SUBSCRIPTION, RECEIVED_MESSAGE_PB);
+ }
+
+ private void initializeMessage() {
+ message = ReceivedMessage.fromPb(pubsub, SUBSCRIPTION, RECEIVED_MESSAGE_PB);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ verify(pubsub, serviceMockReturnsOptions);
+ }
+
+ @Test
+ public void testBuilder() {
+ initializeExpectedMessage(3);
+ replay(pubsub);
+ Map attributes = ImmutableMap.of("newKey1", "newVal1");
+ ReceivedMessage builtMessage = expectedMessage.toBuilder()
+ .payload("newPayload")
+ .id("newMessageId")
+ .attributes(attributes)
+ .publishTime(PUBLISH_TIME + 1)
+ .build();
+ assertSame(serviceMockReturnsOptions, builtMessage.pubsub());
+ assertEquals(SUBSCRIPTION, builtMessage.subscription());
+ assertEquals(ACK_ID, builtMessage.ackId());
+ assertEquals("newMessageId", builtMessage.id());
+ assertArrayEquals("newPayload".getBytes(Charsets.UTF_8), builtMessage.payload().toByteArray());
+ assertEquals("newPayload", builtMessage.payloadAsString());
+ assertEquals(attributes, builtMessage.attributes());
+ assertEquals(PUBLISH_TIME + 1, (long) builtMessage.publishTime());
+ builtMessage = builtMessage.toBuilder()
+ .payload(PAYLOAD)
+ .id(MESSAGE_ID)
+ .clearAttributes()
+ .addAttribute("key1", "value1")
+ .addAttribute("key2", "value2")
+ .publishTime(PUBLISH_TIME)
+ .build();
+ assertSame(serviceMockReturnsOptions, builtMessage.pubsub());
+ assertEquals(MESSAGE_ID, builtMessage.id());
+ assertEquals(PAYLOAD, builtMessage.payload());
+ assertEquals(PAYLOAD_STRING, builtMessage.payloadAsString());
+ assertEquals(ATTRIBUTES, builtMessage.attributes());
+ assertEquals(PUBLISH_TIME, builtMessage.publishTime());
+ compareReceivedMessage(expectedMessage, builtMessage);
+ }
+
+ @Test
+ public void testToBuilder() {
+ initializeExpectedMessage(2);
+ replay(pubsub);
+ compareReceivedMessage(expectedMessage, expectedMessage.toBuilder().build());
+ }
+
+ @Test
+ public void testAck() {
+ initializeExpectedMessage(1);
+ expect(pubsub.options()).andReturn(mockOptions);
+ pubsub.ack(SUBSCRIPTION, ACK_ID);
+ EasyMock.expectLastCall();
+ replay(pubsub);
+ initializeMessage();
+ message.ack();
+ }
+
+ @Test
+ public void testAckAsync() throws ExecutionException, InterruptedException {
+ initializeExpectedMessage(1);
+ expect(pubsub.options()).andReturn(mockOptions);
+ expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID)).andReturn(Futures.immediateFuture(null));
+ EasyMock.expectLastCall();
+ replay(pubsub);
+ initializeMessage();
+ assertNull(message.ackAsync().get());
+ }
+
+ @Test
+ public void testModifyAckDeadline() {
+ initializeExpectedMessage(1);
+ expect(pubsub.options()).andReturn(mockOptions);
+ pubsub.modifyAckDeadline(SUBSCRIPTION, 10, TimeUnit.SECONDS, ACK_ID);
+ EasyMock.expectLastCall();
+ replay(pubsub);
+ initializeMessage();
+ message.modifyAckDeadline(10, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void testModifyAckDeadlineAsync() throws ExecutionException, InterruptedException {
+ initializeExpectedMessage(1);
+ expect(pubsub.options()).andReturn(mockOptions);
+ expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION, 10, TimeUnit.SECONDS, ACK_ID))
+ .andReturn(Futures.immediateFuture(null));
+ EasyMock.expectLastCall();
+ replay(pubsub);
+ initializeMessage();
+ assertNull(message.modifyAckDeadlineAsync(10, TimeUnit.SECONDS).get());
+ }
+
+ private void compareReceivedMessage(ReceivedMessage expected, ReceivedMessage value) {
+ assertEquals(expected, value);
+ assertEquals(expected.id(), value.id());
+ assertEquals(expected.payload(), value.payload());
+ assertEquals(expected.payloadAsString(), value.payloadAsString());
+ assertEquals(expected.attributes(), value.attributes());
+ assertEquals(expected.publishTime(), value.publishTime());
+ assertEquals(expected.ackId(), value.ackId());
+ assertEquals(expected.subscription(), value.subscription());
+ assertEquals(expected.hashCode(), value.hashCode());
+ }
+}