From e3a6d2ed173a57949b9b9c77ac37ff2b534edd01 Mon Sep 17 00:00:00 2001 From: Marco Ziccardi Date: Fri, 3 Jun 2016 12:42:22 +0200 Subject: [PATCH] Add AckDeadlineRenewer class for automatic ack deadline renewal (#1031) * Add AckDeadlineRenewer class for automatic ack deadline renewal * Refactor renewer tests and wake up renewer only when needed * Skip removed/re-added messages when scheduling new renewal * Better tuning of ack deadline in renewer --- .../cloud/pubsub/AckDeadlineRenewer.java | 317 ++++++++++++++++++ .../google/cloud/pubsub/PubSubOptions.java | 5 + .../cloud/pubsub/AckDeadlineRenewerTest.java | 281 ++++++++++++++++ 3 files changed, 603 insertions(+) create mode 100644 gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewer.java create mode 100644 gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/AckDeadlineRenewerTest.java diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewer.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewer.java new file mode 100644 index 000000000000..3bea6440e224 --- /dev/null +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewer.java @@ -0,0 +1,317 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +import com.google.cloud.GrpcServiceOptions.ExecutorFactory; +import com.google.cloud.ServiceOptions.Clock; +import com.google.common.base.MoreObjects; +import com.google.common.collect.LinkedListMultimap; +import com.google.common.collect.ListMultimap; +import com.google.common.collect.Multimaps; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Class for an automatic ack deadline renewer. An ack deadline renewer automatically renews the + * acknowledge deadline of messages added to it (via {@link #add(String, String)} or + * {@link #add(String, Iterable)}. The acknowledge deadlines of added messages are renewed until the + * messages are explicitly removed using {@link #remove(String, String)}. + */ +class AckDeadlineRenewer implements AutoCloseable { + + private static final int MIN_DEADLINE_MILLIS = 10_000; + private static final int DEADLINE_SLACK_MILLIS = 1_000; + private static final int RENEW_THRESHOLD_MILLIS = 3_000; + private static final int NEXT_RENEWAL_THRESHOLD_MILLIS = 1_000; + + private final PubSub pubsub; + private final ScheduledExecutorService executor; + private final ExecutorFactory executorFactory; + private final Clock clock; + private final Queue messageQueue; + private final Map messageDeadlines; + private final Object lock = new Object(); + private final Object futureLock = new Object(); + private Future renewerFuture; + private boolean closed; + + /** + * This class holds the identity of a message to renew: subscription and acknowledge id. + */ + private static class MessageId { + + private final String subscription; + private final String ackId; + + MessageId(String subscription, String ackId) { + this.subscription = subscription; + this.ackId = ackId; + } + + String subscription() { + return subscription; + } + + String ackId() { + return ackId; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof MessageId)) { + return false; + } + MessageId other = (MessageId) obj; + return Objects.equals(other.subscription, this.subscription) + && Objects.equals(other.ackId, this.ackId); + } + + @Override + public int hashCode() { + return Objects.hash(subscription, ackId); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("subscription", subscription) + .add("ackId", ackId) + .toString(); + } + } + + /** + * This class holds the identity of a message to renew and its expected ack deadline. + */ + private static final class Message { + + private final MessageId messageId; + private final Long deadline; + + Message(MessageId messageId, Long deadline) { + this.messageId = messageId; + this.deadline = deadline; + } + + MessageId messageId() { + return messageId; + } + + Long expectedDeadline() { + return deadline; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof Message)) { + return false; + } + Message other = (Message) obj; + return Objects.equals(other.messageId, this.messageId) + && Objects.equals(other.deadline, this.deadline); + } + + @Override + public int hashCode() { + return Objects.hash(messageId, deadline); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("messageId", messageId) + .add("expectedDeadline", deadline) + .toString(); + } + } + + AckDeadlineRenewer(PubSub pubsub) { + PubSubOptions options = pubsub.options(); + this.pubsub = pubsub; + this.executorFactory = options.executorFactory(); + this.executor = executorFactory.get(); + this.clock = options.clock(); + this.messageQueue = new LinkedList<>(); + this.messageDeadlines = new HashMap<>(); + } + + private void unsetAndScheduleNextRenewal() { + synchronized (futureLock) { + renewerFuture = null; + scheduleNextRenewal(); + } + } + + private void scheduleNextRenewal() { + // Schedules next renewal if there are still messages to process and no renewals scheduled that + // could handle them, otherwise does nothing + Message nextMessage; + synchronized (lock) { + Message peek = messageQueue.peek(); + // We remove from the queue messages that were removed from the ack deadline renewer (and + // possibly re-added) + while (peek != null && (!messageDeadlines.containsKey(peek.messageId()) + || messageDeadlines.get(peek.messageId()) > peek.expectedDeadline())) { + messageQueue.poll(); + peek = messageQueue.peek(); + } + nextMessage = peek; + } + synchronized (futureLock) { + if (renewerFuture == null && nextMessage != null) { + long delay = + (nextMessage.expectedDeadline() - clock.millis()) - NEXT_RENEWAL_THRESHOLD_MILLIS; + renewerFuture = executor.schedule(new Runnable() { + @Override + public void run() { + renewAckDeadlines(); + } + }, delay, TimeUnit.MILLISECONDS); + } + } + } + + private void renewAckDeadlines() { + ListMultimap messagesToRenewNext = LinkedListMultimap.create(); + // At every activation we renew all ack deadlines that will expire in the following + // RENEW_THRESHOLD_MILLIS + long threshold = clock.millis() + RENEW_THRESHOLD_MILLIS; + Message message; + while ((message = nextMessageToRenew(threshold)) != null) { + // If the expected deadline is null the message was removed and we must ignore it, otherwise + // we renew its ack deadline + if (message.expectedDeadline() != null) { + messagesToRenewNext.put(message.messageId().subscription(), message.messageId().ackId()); + } + } + for (Map.Entry> entry : Multimaps.asMap(messagesToRenewNext).entrySet()) { + // We send all ack deadline renewals for a subscription + pubsub.modifyAckDeadlineAsync(entry.getKey(), MIN_DEADLINE_MILLIS, TimeUnit.MILLISECONDS, + entry.getValue()); + } + unsetAndScheduleNextRenewal(); + } + + private Message nextMessageToRenew(long threshold) { + synchronized (lock) { + Message message = messageQueue.peek(); + // if the queue is empty or the next expected deadline is after threshold we stop + if (message == null || message.expectedDeadline() > threshold) { + return null; + } + MessageId messageId = messageQueue.poll().messageId(); + // Check if the next expected deadline changed. This can happen if the message was removed + // from the ack deadline renewer or if it was nacked and then pulled again + Long deadline = messageDeadlines.get(messageId); + if (deadline == null || deadline > threshold) { + // the message was removed (deadline == null) or removed and then added back + // (deadline > threshold), we should not renew its deadline (yet) + return new Message(messageId, null); + } else { + // Message deadline must be renewed, we must submit it again to the renewer + add(messageId.subscription(), messageId.ackId()); + return new Message(messageId, deadline); + } + } + } + + /** + * Adds a new message for which the acknowledge deadline should be automatically renewed. The + * message is identified by the subscription from which it was pulled and its acknowledge id. + * Auto-renewal will take place until the message is removed (see + * {@link #remove(String, String)}). + * + * @param subscription the subscription from which the message has been pulled + * @param ackId the message's acknowledge id + */ + void add(String subscription, String ackId) { + synchronized (lock) { + long deadline = clock.millis() + MIN_DEADLINE_MILLIS - DEADLINE_SLACK_MILLIS; + Message message = new Message(new MessageId(subscription, ackId), deadline); + messageQueue.add(message); + messageDeadlines.put(message.messageId(), deadline); + } + scheduleNextRenewal(); + } + + /** + * Adds new messages for which the acknowledge deadlined should be automatically renewed. The + * messages are identified by the subscription from which they were pulled and their + * acknowledge id. Auto-renewal will take place until the messages are removed (see + * {@link #remove(String, String)}). + * + * @param subscription the subscription from which the messages have been pulled + * @param ackIds the acknowledge ids of the messages + */ + void add(String subscription, Iterable ackIds) { + synchronized (lock) { + long deadline = clock.millis() + MIN_DEADLINE_MILLIS - DEADLINE_SLACK_MILLIS; + for (String ackId : ackIds) { + Message message = new Message(new MessageId(subscription, ackId), deadline); + messageQueue.add(message); + messageDeadlines.put(message.messageId(), deadline); + } + } + scheduleNextRenewal(); + } + + /** + * Removes a message from this {@code AckDeadlineRenewer}. The message is identified by the + * subscription from which it was pulled and its acknowledge id. Once the message is removed from + * this {@code AckDeadlineRenewer}, automated ack deadline renewals will stop. + * + * @param subscription the subscription from which the message has been pulled + * @param ackId the message's acknowledge id + */ + void remove(String subscription, String ackId) { + synchronized (lock) { + messageDeadlines.remove(new MessageId(subscription, ackId)); + } + } + + @Override + public void close() throws Exception { + if (closed) { + return; + } + closed = true; + synchronized (lock) { + messageDeadlines.clear(); + messageQueue.clear(); + } + synchronized (futureLock) { + if (renewerFuture != null) { + renewerFuture.cancel(true); + } + } + executorFactory.release(executor); + } +} diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java index 420b0d50148b..87a25bf0dc36 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java @@ -85,6 +85,11 @@ protected PubSubOptions(Builder builder) { super(PubSubFactory.class, PubSubRpcFactory.class, builder); } + @Override + protected ExecutorFactory executorFactory() { + return super.executorFactory(); + } + @Override protected PubSubFactory defaultServiceFactory() { return DefaultPubSubFactory.INSTANCE; diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/AckDeadlineRenewerTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/AckDeadlineRenewerTest.java new file mode 100644 index 000000000000..07cc70c6e30d --- /dev/null +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/AckDeadlineRenewerTest.java @@ -0,0 +1,281 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +import static org.junit.Assert.assertTrue; + +import com.google.cloud.GrpcServiceOptions.ExecutorFactory; +import com.google.common.collect.ImmutableList; + +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +public class AckDeadlineRenewerTest { + + private static final int MIN_DEADLINE_MILLIS = 10_000; + + private static final String SUBSCRIPTION1 = "subscription1"; + private static final String SUBSCRIPTION2 = "subscription2"; + private static final String ACK_ID1 = "ack-id1"; + private static final String ACK_ID2 = "ack-id2"; + private static final String ACK_ID3 = "ack-id3"; + + private PubSub pubsub; + private AckDeadlineRenewer ackDeadlineRenewer; + + @Before + public void setUp() { + pubsub = EasyMock.createStrictMock(PubSub.class); + PubSubOptions options = PubSubOptions.builder() + .projectId("projectId") + .build(); + EasyMock.expect(pubsub.options()).andReturn(options); + EasyMock.replay(pubsub); + ackDeadlineRenewer = new AckDeadlineRenewer(pubsub); + } + + @After + public void tearDown() throws Exception { + EasyMock.verify(pubsub); + ackDeadlineRenewer.close(); + } + + private static IAnswer> createAnswer(final CountDownLatch latch, + final AtomicLong renewal) { + return new IAnswer>() { + @Override + public Future answer() throws Throwable { + latch.countDown(); + renewal.set(System.currentTimeMillis()); + return null; + } + }; + } + + @Test + public void testAddOneMessage() throws InterruptedException { + EasyMock.reset(pubsub); + final CountDownLatch firstLatch = new CountDownLatch(1); + final CountDownLatch secondLatch = new CountDownLatch(1); + final AtomicLong firstRenewal = new AtomicLong(); + final AtomicLong secondRenewal = new AtomicLong(); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLIS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1))) + .andAnswer(createAnswer(firstLatch, firstRenewal)); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLIS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1))) + .andAnswer(createAnswer(secondLatch, secondRenewal)); + EasyMock.replay(pubsub); + long addTime = System.currentTimeMillis(); + ackDeadlineRenewer.add(SUBSCRIPTION1, ACK_ID1); + firstLatch.await(); + assertTrue(firstRenewal.get() < (addTime + MIN_DEADLINE_MILLIS)); + secondLatch.await(); + assertTrue(secondRenewal.get() < (firstRenewal.get() + MIN_DEADLINE_MILLIS)); + } + + @Test + public void testAddMessages() throws InterruptedException { + EasyMock.reset(pubsub); + final CountDownLatch firstLatch = new CountDownLatch(2); + final CountDownLatch secondLatch = new CountDownLatch(2); + final AtomicLong firstRenewalSub1 = new AtomicLong(); + final AtomicLong firstRenewalSub2 = new AtomicLong(); + final AtomicLong secondRenewalSub1 = new AtomicLong(); + final AtomicLong secondRenewalSub2 = new AtomicLong(); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLIS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1, ACK_ID2))) + .andAnswer(createAnswer(firstLatch, firstRenewalSub1)); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION2, MIN_DEADLINE_MILLIS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1))) + .andAnswer(createAnswer(firstLatch, firstRenewalSub2)); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLIS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1, ACK_ID2))) + .andAnswer(createAnswer(secondLatch, secondRenewalSub1)); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION2, MIN_DEADLINE_MILLIS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1, ACK_ID3))) + .andAnswer(createAnswer(secondLatch, secondRenewalSub2)); + EasyMock.replay(pubsub); + long addTime1 = System.currentTimeMillis(); + ackDeadlineRenewer.add(SUBSCRIPTION1, ImmutableList.of(ACK_ID1, ACK_ID2)); + ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1); + firstLatch.await(); + assertTrue(firstRenewalSub1.get() < (addTime1 + MIN_DEADLINE_MILLIS)); + assertTrue(firstRenewalSub2.get() < (addTime1 + MIN_DEADLINE_MILLIS)); + ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID3); + secondLatch.await(); + assertTrue(secondRenewalSub1.get() < (firstRenewalSub1.get() + MIN_DEADLINE_MILLIS)); + assertTrue(secondRenewalSub2.get() < (firstRenewalSub2.get() + MIN_DEADLINE_MILLIS)); + } + + @Test + public void testAddExistingMessage() throws InterruptedException { + EasyMock.reset(pubsub); + final CountDownLatch firstLatch = new CountDownLatch(2); + final CountDownLatch secondLatch = new CountDownLatch(2); + final AtomicLong firstRenewalSub1 = new AtomicLong(); + final AtomicLong firstRenewalSub2 = new AtomicLong(); + final AtomicLong secondRenewalSub1 = new AtomicLong(); + final AtomicLong secondRenewalSub2 = new AtomicLong(); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLIS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1, ACK_ID2))) + .andAnswer(createAnswer(firstLatch, firstRenewalSub1)); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION2, MIN_DEADLINE_MILLIS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1))) + .andAnswer(createAnswer(firstLatch, firstRenewalSub2)); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLIS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1, ACK_ID2))) + .andAnswer(createAnswer(secondLatch, secondRenewalSub1)); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION2, MIN_DEADLINE_MILLIS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1))) + .andAnswer(createAnswer(secondLatch, secondRenewalSub2)); + EasyMock.replay(pubsub); + long addTime1 = System.currentTimeMillis(); + ackDeadlineRenewer.add(SUBSCRIPTION1, ImmutableList.of(ACK_ID1, ACK_ID2)); + ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1); + firstLatch.await(); + assertTrue(firstRenewalSub1.get() < (addTime1 + MIN_DEADLINE_MILLIS)); + assertTrue(firstRenewalSub2.get() < (addTime1 + MIN_DEADLINE_MILLIS)); + ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1); + secondLatch.await(); + assertTrue(secondRenewalSub1.get() < (firstRenewalSub1.get() + MIN_DEADLINE_MILLIS)); + assertTrue(secondRenewalSub2.get() < (firstRenewalSub2.get() + MIN_DEADLINE_MILLIS)); + } + + @Test + public void testRemoveNonExistingMessage() throws InterruptedException { + EasyMock.reset(pubsub); + final CountDownLatch firstLatch = new CountDownLatch(2); + final CountDownLatch secondLatch = new CountDownLatch(2); + final AtomicLong firstRenewalSub1 = new AtomicLong(); + final AtomicLong firstRenewalSub2 = new AtomicLong(); + final AtomicLong secondRenewalSub1 = new AtomicLong(); + final AtomicLong secondRenewalSub2 = new AtomicLong(); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLIS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1, ACK_ID2))) + .andAnswer(createAnswer(firstLatch, firstRenewalSub1)); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION2, MIN_DEADLINE_MILLIS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1))) + .andAnswer(createAnswer(firstLatch, firstRenewalSub2)); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLIS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1, ACK_ID2))) + .andAnswer(createAnswer(secondLatch, secondRenewalSub1)); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION2, MIN_DEADLINE_MILLIS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1))) + .andAnswer(createAnswer(secondLatch, secondRenewalSub2)); + EasyMock.replay(pubsub); + long addTime1 = System.currentTimeMillis(); + ackDeadlineRenewer.add(SUBSCRIPTION1, ImmutableList.of(ACK_ID1, ACK_ID2)); + ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1); + firstLatch.await(); + assertTrue(firstRenewalSub1.get() < (addTime1 + MIN_DEADLINE_MILLIS)); + assertTrue(firstRenewalSub2.get() < (addTime1 + MIN_DEADLINE_MILLIS)); + ackDeadlineRenewer.remove(SUBSCRIPTION1, ACK_ID3); + secondLatch.await(); + assertTrue(secondRenewalSub1.get() < (firstRenewalSub1.get() + MIN_DEADLINE_MILLIS)); + assertTrue(secondRenewalSub2.get() < (firstRenewalSub2.get() + MIN_DEADLINE_MILLIS)); + } + + @Test + public void testRemoveMessage() throws InterruptedException { + EasyMock.reset(pubsub); + final CountDownLatch firstLatch = new CountDownLatch(2); + final CountDownLatch secondLatch = new CountDownLatch(2); + final AtomicLong firstRenewalSub1 = new AtomicLong(); + final AtomicLong firstRenewalSub2 = new AtomicLong(); + final AtomicLong secondRenewalSub1 = new AtomicLong(); + final AtomicLong secondRenewalSub2 = new AtomicLong(); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLIS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1, ACK_ID2))) + .andAnswer(createAnswer(firstLatch, firstRenewalSub1)); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION2, MIN_DEADLINE_MILLIS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1))) + .andAnswer(createAnswer(firstLatch, firstRenewalSub2)); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLIS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1))) + .andAnswer(createAnswer(secondLatch, secondRenewalSub1)); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION2, MIN_DEADLINE_MILLIS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1))) + .andAnswer(createAnswer(secondLatch, secondRenewalSub2)); + EasyMock.replay(pubsub); + long addTime1 = System.currentTimeMillis(); + ackDeadlineRenewer.add(SUBSCRIPTION1, ImmutableList.of(ACK_ID1, ACK_ID2)); + ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1); + firstLatch.await(); + assertTrue(firstRenewalSub1.get() < (addTime1 + MIN_DEADLINE_MILLIS)); + assertTrue(firstRenewalSub2.get() < (addTime1 + MIN_DEADLINE_MILLIS)); + ackDeadlineRenewer.remove(SUBSCRIPTION1, ACK_ID2); + secondLatch.await(); + assertTrue(secondRenewalSub1.get() < (firstRenewalSub1.get() + MIN_DEADLINE_MILLIS)); + assertTrue(secondRenewalSub2.get() < (firstRenewalSub2.get() + MIN_DEADLINE_MILLIS)); + } + + @Test + @SuppressWarnings("unchecked") + public void testClose() throws Exception { + PubSub pubsub = EasyMock.createStrictMock(PubSub.class); + ScheduledExecutorService executor = EasyMock.createStrictMock(ScheduledExecutorService.class); + ExecutorFactory executorFactory = EasyMock.createStrictMock(ExecutorFactory.class); + EasyMock.expect(executorFactory.get()).andReturn(executor); + PubSubOptions options = PubSubOptions.builder() + .projectId("projectId") + .executorFactory(executorFactory) + .build(); + EasyMock.expect(pubsub.options()).andReturn(options); + executorFactory.release(executor); + EasyMock.expectLastCall(); + EasyMock.replay(executor, executorFactory, pubsub); + AckDeadlineRenewer ackDeadlineRenewer = new AckDeadlineRenewer(pubsub); + ackDeadlineRenewer.close(); + EasyMock.verify(pubsub, executor, executorFactory); + } + + @Test + @SuppressWarnings("unchecked") + public void testCloseWithMessage() throws Exception { + PubSub pubsub = EasyMock.createStrictMock(PubSub.class); + ScheduledExecutorService executor = EasyMock.createStrictMock(ScheduledExecutorService.class); + ExecutorFactory executorFactory = EasyMock.createStrictMock(ExecutorFactory.class); + EasyMock.expect(executorFactory.get()).andReturn(executor); + ScheduledFuture future = EasyMock.createStrictMock(ScheduledFuture.class); + EasyMock.expect(executor.schedule(EasyMock.anyObject(), EasyMock.anyLong(), + EasyMock.eq(TimeUnit.MILLISECONDS))).andReturn(future); + PubSubOptions options = PubSubOptions.builder() + .projectId("projectId") + .executorFactory(executorFactory) + .build(); + EasyMock.expect(pubsub.options()).andReturn(options); + EasyMock.expect(future.cancel(true)).andReturn(true); + executorFactory.release(executor); + EasyMock.expectLastCall(); + EasyMock.replay(executor, executorFactory, future, pubsub); + AckDeadlineRenewer ackDeadlineRenewer = new AckDeadlineRenewer(pubsub); + ackDeadlineRenewer.add(SUBSCRIPTION1, ACK_ID1); + ackDeadlineRenewer.close(); + EasyMock.verify(pubsub, executor, executorFactory, future); + } +}