Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add javadoc and tests for functional ReceivedMessage class #1038

Merged
merged 1 commit into from
Jun 1, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
* a key {@code iana.org/language_tag} and value {@code en} could be added to messages to mark them
* as readable by an English-speaking subscriber.
*
* <p>To be published a message must have a non-empty payload, or at least one attribute.
* <p>To be published, a message must have a non-empty payload, or at least one attribute.
*
* @see <a href="https://cloud.google.com/pubsub/overview#data_model">Pub/Sub Data Model</a>
*/
Expand All @@ -64,11 +64,11 @@ private static final class InternalByteArray extends ByteArray {

private static final long serialVersionUID = -3330181485911805428L;

protected InternalByteArray(ByteString byteString) {
InternalByteArray(ByteString byteString) {
super(byteString);
}

protected InternalByteArray(ByteArray byteArray) {
InternalByteArray(ByteArray byteArray) {
super(byteArray);
}

Expand Down Expand Up @@ -244,17 +244,24 @@ public ByteArray payload() {
return payload;
}

final boolean baseEquals(Message message) {
return Objects.equals(id, message.id)
&& Objects.equals(payload, message.payload)
&& Objects.equals(attributes, message.attributes)
&& Objects.equals(publishTime, message.publishTime);
}

@Override
public boolean equals(Object obj) {
return obj == this
|| obj != null
&& obj.getClass().equals(Message.class)
&& Objects.equals(toPb(), ((Message) obj).toPb());
&& baseEquals((Message) obj);
}

@Override
public int hashCode() {
return Objects.hash(serialVersionUID, id, payload, attributes, publishTime);
return Objects.hash(id, payload, attributes, publishTime);

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,16 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ReceivedMessage extends Message {
/**
* A Google Cloud Pub/Sub received message. A received message has all the information in
* {@link Message} as well as the acknowledge id. The ack id can be used to acknowledge the received
* message.
*
* <p>{@code ReceivedMessage} also adds a layer of service-related functionality over
* {@link Message} that help manage received messages (see {@link #ack()}, {@link #nack()} and
* {@link #modifyAckDeadline(int, TimeUnit)}).
*/
public final class ReceivedMessage extends Message {

private static final long serialVersionUID = -4178477763916251733L;

Expand Down Expand Up @@ -124,48 +133,109 @@ public int hashCode() {

@Override
public boolean equals(Object obj) {
if (this == obj) {
if (obj == this) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
if (obj == null || !obj.getClass().equals(ReceivedMessage.class)) {
return false;
}
ReceivedMessage other = (ReceivedMessage) obj;
return Objects.equals(toPb(), other.toPb()) && Objects.equals(options, other.options);
return baseEquals(other) && Objects.equals(options, other.options);
}

public PubSub pubSub() {
/**
* Returns the received message's {@code PubSub} object used to issue requests.
*/
public PubSub pubsub() {
return pubsub;
}

/**
* Returns the name of the subscription this message was received from.
*/
public String subscription() {
return subscription;
}

/**
* Returns the acknowledge id of the message. The ack id can be used to acknowledge the received
* message.
*/
public String ackId() {
return ackId;
}

/**
* Acknowledges the current message.
*
* @throws PubSubException upon failure, or if the subscription was not found
*/
public void ack() {
pubsub.ack(subscription, ackId);
}

/**
* Sends a request to acknowledge the current message. The method returns a {@code Future} object
* that can be used to wait for the acknowledge operation to be completed.
*
* @throws PubSubException upon failure, or if the subscription was not found
*/
public Future<Void> ackAsync() {
return pubsub.ackAsync(subscription, ackId);
}

/**
* "Nacks" the current message. This method corresponds to calling
* {@link #modifyAckDeadline(int, TimeUnit)} with a deadline of 0.
*
* @throws PubSubException upon failure, or if the subscription was not found
*/
public void nack() {
pubsub.nack(subscription, ackId);
}

/**
* Sends a request to "nack" the current message. This method corresponds to calling
* {@link #modifyAckDeadlineAsync(int, TimeUnit)} with a deadline of 0. The method returns a
* {@code Future} object that can be used to wait for the "nack" operation to be completed.
*
* @throws PubSubException upon failure, or if the subscription was not found
*/
public Future<Void> nackAsync() {
return pubsub.nackAsync(subscription, ackId);
}

/**
* Modifies the acknowledge deadline of the current message. {@code deadline} must be &gt;= 0 and
* is the new deadline with respect to the time the modify request was received by the Pub/Sub
* service. For example, if {@code deadline} is 10 and {@code unit} is {@link TimeUnit#SECONDS},
* the new ack deadline will expire 10 seconds after the modify request was received by the
* service. Specifying 0 may be used to make the message available for another pull request
* (corresponds to calling {@link #nack()}.
*
* @param deadline the new deadline, relative to the time the modify request is received by the
* Pub/Sub service
* @param unit {@code deadline} time unit
* @throws PubSubException upon failure, or if the subscription was not found
*/
public void modifyAckDeadline(int deadline, TimeUnit unit) {
pubsub.modifyAckDeadline(subscription, deadline, unit, ackId);
}

/**
* Sends a request to modify the acknowledge deadline of the given messages. {@code deadline}
* must be &gt;= 0 and is the new deadline with respect to the time the modify request was
* received by the Pub/Sub service. For example, if {@code deadline} is 10 and {@code unit} is
* {@link TimeUnit#SECONDS}, the new ack deadline will expire 10 seconds after the modify request
* was received by the service. Specifying 0 may be used to make the message available for another
* pull request (corresponds to calling {@link #nackAsync()}. The method returns a {@code Future}
* object that can be used to wait for the modify operation to be completed.
*
* @param deadline the new deadline, relative to the time the modify request is received by the
* Pub/Sub service
* @param unit {@code deadline} time unit
* @throws PubSubException upon failure, or if the subscription was not found
*/
public Future<Void> modifyAckDeadlineAsync(int deadline, TimeUnit unit) {
return pubsub.modifyAckDeadlineAsync(subscription, deadline, unit, ackId);
}
Expand All @@ -175,10 +245,10 @@ private void readObject(ObjectInputStream input) throws IOException, ClassNotFou
this.pubsub = options.service();
}

static ReceivedMessage fromPb(PubSub storage, String subscription,
static ReceivedMessage fromPb(PubSub pubsub, String subscription,
com.google.pubsub.v1.ReceivedMessage msgPb) {
Message message = fromPb(msgPb.getMessage());
String ackId = msgPb.getAckId();
return new Builder(subscription, ackId, storage, new BuilderImpl(message)).build();
return new Builder(subscription, ackId, pubsub, new BuilderImpl(message)).build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
* Copyright 2016 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsub;

import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.createStrictMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;

import com.google.api.client.util.Charsets;
import com.google.cloud.ByteArray;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;

import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Test;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class ReceivedMessageTest {

private static final String SUBSCRIPTION = "subscription";
private static final String ACK_ID = "ackId";
private static final String MESSAGE_ID = "messageId";
private static final String PAYLOAD_STRING = "payload";
private static final ByteArray PAYLOAD =
ByteArray.copyFrom("payload".getBytes(StandardCharsets.UTF_8));
private static final Map<String, String> ATTRIBUTES =
ImmutableMap.of("key1", "value1", "key2", "value2");
private static final Long PUBLISH_TIME = 42L;
private static final Message MESSAGE = Message.builder(PAYLOAD)
.id(MESSAGE_ID)
.attributes(ATTRIBUTES)
.publishTime(PUBLISH_TIME)
.build();
private static final com.google.pubsub.v1.ReceivedMessage RECEIVED_MESSAGE_PB =
com.google.pubsub.v1.ReceivedMessage.newBuilder()
.setMessage(MESSAGE.toPb())
.setAckId(ACK_ID)
.build();

private final PubSub serviceMockReturnsOptions = createStrictMock(PubSub.class);
private final PubSubOptions mockOptions = createMock(PubSubOptions.class);
private PubSub pubsub;
private ReceivedMessage expectedMessage;
private ReceivedMessage message;

private void initializeExpectedMessage(int optionsCalls) {
expect(serviceMockReturnsOptions.options()).andReturn(mockOptions).times(optionsCalls);
replay(serviceMockReturnsOptions);
pubsub = createStrictMock(PubSub.class);
expectedMessage =
ReceivedMessage.fromPb(serviceMockReturnsOptions, SUBSCRIPTION, RECEIVED_MESSAGE_PB);
}

private void initializeMessage() {
message = ReceivedMessage.fromPb(pubsub, SUBSCRIPTION, RECEIVED_MESSAGE_PB);
}

@After
public void tearDown() throws Exception {
verify(pubsub, serviceMockReturnsOptions);
}

@Test
public void testBuilder() {
initializeExpectedMessage(3);
replay(pubsub);
Map<String, String> attributes = ImmutableMap.of("newKey1", "newVal1");
ReceivedMessage builtMessage = expectedMessage.toBuilder()
.payload("newPayload")
.id("newMessageId")
.attributes(attributes)
.publishTime(PUBLISH_TIME + 1)
.build();
assertSame(serviceMockReturnsOptions, builtMessage.pubsub());
assertEquals(SUBSCRIPTION, builtMessage.subscription());
assertEquals(ACK_ID, builtMessage.ackId());
assertEquals("newMessageId", builtMessage.id());
assertArrayEquals("newPayload".getBytes(Charsets.UTF_8), builtMessage.payload().toByteArray());
assertEquals("newPayload", builtMessage.payloadAsString());
assertEquals(attributes, builtMessage.attributes());
assertEquals(PUBLISH_TIME + 1, (long) builtMessage.publishTime());
builtMessage = builtMessage.toBuilder()
.payload(PAYLOAD)
.id(MESSAGE_ID)
.clearAttributes()
.addAttribute("key1", "value1")
.addAttribute("key2", "value2")
.publishTime(PUBLISH_TIME)
.build();
assertSame(serviceMockReturnsOptions, builtMessage.pubsub());
assertEquals(MESSAGE_ID, builtMessage.id());
assertEquals(PAYLOAD, builtMessage.payload());
assertEquals(PAYLOAD_STRING, builtMessage.payloadAsString());
assertEquals(ATTRIBUTES, builtMessage.attributes());
assertEquals(PUBLISH_TIME, builtMessage.publishTime());
compareReceivedMessage(expectedMessage, builtMessage);
}

@Test
public void testToBuilder() {
initializeExpectedMessage(2);
replay(pubsub);
compareReceivedMessage(expectedMessage, expectedMessage.toBuilder().build());
}

@Test
public void testAck() {
initializeExpectedMessage(1);
expect(pubsub.options()).andReturn(mockOptions);
pubsub.ack(SUBSCRIPTION, ACK_ID);
EasyMock.expectLastCall();
replay(pubsub);
initializeMessage();
message.ack();
}

@Test
public void testAckAsync() throws ExecutionException, InterruptedException {
initializeExpectedMessage(1);
expect(pubsub.options()).andReturn(mockOptions);
expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID)).andReturn(Futures.<Void>immediateFuture(null));
EasyMock.expectLastCall();
replay(pubsub);
initializeMessage();
assertNull(message.ackAsync().get());
}

@Test
public void testModifyAckDeadline() {
initializeExpectedMessage(1);
expect(pubsub.options()).andReturn(mockOptions);
pubsub.modifyAckDeadline(SUBSCRIPTION, 10, TimeUnit.SECONDS, ACK_ID);
EasyMock.expectLastCall();
replay(pubsub);
initializeMessage();
message.modifyAckDeadline(10, TimeUnit.SECONDS);
}

@Test
public void testModifyAckDeadlineAsync() throws ExecutionException, InterruptedException {
initializeExpectedMessage(1);
expect(pubsub.options()).andReturn(mockOptions);
expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION, 10, TimeUnit.SECONDS, ACK_ID))
.andReturn(Futures.<Void>immediateFuture(null));
EasyMock.expectLastCall();
replay(pubsub);
initializeMessage();
assertNull(message.modifyAckDeadlineAsync(10, TimeUnit.SECONDS).get());
}

private void compareReceivedMessage(ReceivedMessage expected, ReceivedMessage value) {
assertEquals(expected, value);
assertEquals(expected.id(), value.id());
assertEquals(expected.payload(), value.payload());
assertEquals(expected.payloadAsString(), value.payloadAsString());
assertEquals(expected.attributes(), value.attributes());
assertEquals(expected.publishTime(), value.publishTime());
assertEquals(expected.ackId(), value.ackId());
assertEquals(expected.subscription(), value.subscription());
assertEquals(expected.hashCode(), value.hashCode());
}
}