Skip to content

Commit

Permalink
Using fake clock in AckDeadlineRenewerTest (#1413)
Browse files Browse the repository at this point in the history
This reduces the running time of the test from 3-4 minutes
down to 0.03 seconds.
  • Loading branch information
garrettjonesgoogle authored Nov 21, 2016
1 parent 3f3e7c0 commit 1a8a438
Show file tree
Hide file tree
Showing 3 changed files with 261 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,13 @@

import com.google.cloud.GrpcServiceOptions.ExecutorFactory;
import com.google.common.collect.ImmutableList;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.junit.After;
Expand All @@ -29,16 +35,10 @@
import org.junit.Test;
import org.junit.rules.Timeout;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class AckDeadlineRenewerTest {

private static final int MIN_DEADLINE_MILLIS = 10_000;
private static final int TIME_ADVANCE = 9_000;

private static final String SUBSCRIPTION1 = "subscription1";
private static final String SUBSCRIPTION2 = "subscription2";
Expand All @@ -47,16 +47,31 @@ public class AckDeadlineRenewerTest {
private static final String ACK_ID3 = "ack-id3";

private PubSub pubsub;
private FakeScheduledExecutorService executorService;
private AckDeadlineRenewer ackDeadlineRenewer;
private final FakeClock clock = new FakeClock();

@Rule
public Timeout globalTimeout = Timeout.seconds(60);

@Before
public void setUp() {
pubsub = EasyMock.createStrictMock(PubSub.class);
executorService = new FakeScheduledExecutorService(4, clock);
ExecutorFactory executorFactory = new ExecutorFactory() {
@Override
public ExecutorService get() {
return executorService;
}
@Override
public void release(ExecutorService executor) {
executorService.shutdown();
}
};
PubSubOptions options = PubSubOptions.newBuilder()
.setProjectId("projectId")
.setExecutorFactory(executorFactory)
.setClock(clock)
.build();
EasyMock.expect(pubsub.getOptions()).andReturn(options);
EasyMock.replay(pubsub);
Expand All @@ -69,13 +84,13 @@ public void tearDown() throws Exception {
ackDeadlineRenewer.close();
}

private static IAnswer<Future<Void>> createAnswer(final CountDownLatch latch,
private IAnswer<Future<Void>> createAnswer(final CountDownLatch latch,
final AtomicLong renewal) {
return new IAnswer<Future<Void>>() {
@Override
public Future<Void> answer() throws Throwable {
latch.countDown();
renewal.set(System.currentTimeMillis());
renewal.set(clock.millis());
return null;
}
};
Expand All @@ -95,10 +110,12 @@ public void testAddOneMessage() throws InterruptedException {
TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1)))
.andAnswer(createAnswer(secondLatch, secondRenewal));
EasyMock.replay(pubsub);
long addTime = System.currentTimeMillis();
long addTime = clock.millis();
ackDeadlineRenewer.add(SUBSCRIPTION1, ACK_ID1);
executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
firstLatch.await();
assertTrue(firstRenewal.get() < (addTime + MIN_DEADLINE_MILLIS));
executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
secondLatch.await();
assertTrue(secondRenewal.get() < (firstRenewal.get() + MIN_DEADLINE_MILLIS));
}
Expand All @@ -125,13 +142,15 @@ public void testAddMessages() throws InterruptedException {
TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1, ACK_ID3)))
.andAnswer(createAnswer(secondLatch, secondRenewalSub2));
EasyMock.replay(pubsub);
long addTime1 = System.currentTimeMillis();
long addTime1 = clock.millis();
ackDeadlineRenewer.add(SUBSCRIPTION1, ImmutableList.of(ACK_ID1, ACK_ID2));
ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1);
executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
firstLatch.await();
assertTrue(firstRenewalSub1.get() < (addTime1 + MIN_DEADLINE_MILLIS));
assertTrue(firstRenewalSub2.get() < (addTime1 + MIN_DEADLINE_MILLIS));
ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID3);
executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
secondLatch.await();
assertTrue(secondRenewalSub1.get() < (firstRenewalSub1.get() + MIN_DEADLINE_MILLIS));
assertTrue(secondRenewalSub2.get() < (firstRenewalSub2.get() + MIN_DEADLINE_MILLIS));
Expand Down Expand Up @@ -159,13 +178,15 @@ public void testAddExistingMessage() throws InterruptedException {
TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1)))
.andAnswer(createAnswer(secondLatch, secondRenewalSub2));
EasyMock.replay(pubsub);
long addTime1 = System.currentTimeMillis();
long addTime1 = clock.millis();
ackDeadlineRenewer.add(SUBSCRIPTION1, ImmutableList.of(ACK_ID1, ACK_ID2));
ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1);
executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
firstLatch.await();
assertTrue(firstRenewalSub1.get() < (addTime1 + MIN_DEADLINE_MILLIS));
assertTrue(firstRenewalSub2.get() < (addTime1 + MIN_DEADLINE_MILLIS));
ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1);
executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
secondLatch.await();
assertTrue(secondRenewalSub1.get() < (firstRenewalSub1.get() + MIN_DEADLINE_MILLIS));
assertTrue(secondRenewalSub2.get() < (firstRenewalSub2.get() + MIN_DEADLINE_MILLIS));
Expand Down Expand Up @@ -193,13 +214,15 @@ public void testRemoveNonExistingMessage() throws InterruptedException {
TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1)))
.andAnswer(createAnswer(secondLatch, secondRenewalSub2));
EasyMock.replay(pubsub);
long addTime1 = System.currentTimeMillis();
long addTime1 = clock.millis();
ackDeadlineRenewer.add(SUBSCRIPTION1, ImmutableList.of(ACK_ID1, ACK_ID2));
ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1);
executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
firstLatch.await();
assertTrue(firstRenewalSub1.get() < (addTime1 + MIN_DEADLINE_MILLIS));
assertTrue(firstRenewalSub2.get() < (addTime1 + MIN_DEADLINE_MILLIS));
ackDeadlineRenewer.remove(SUBSCRIPTION1, ACK_ID3);
executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
secondLatch.await();
assertTrue(secondRenewalSub1.get() < (firstRenewalSub1.get() + MIN_DEADLINE_MILLIS));
assertTrue(secondRenewalSub2.get() < (firstRenewalSub2.get() + MIN_DEADLINE_MILLIS));
Expand Down Expand Up @@ -227,13 +250,15 @@ public void testRemoveMessage() throws InterruptedException {
TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1)))
.andAnswer(createAnswer(secondLatch, secondRenewalSub2));
EasyMock.replay(pubsub);
long addTime1 = System.currentTimeMillis();
long addTime1 = clock.millis();
ackDeadlineRenewer.add(SUBSCRIPTION1, ImmutableList.of(ACK_ID1, ACK_ID2));
ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1);
executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
firstLatch.await();
assertTrue(firstRenewalSub1.get() < (addTime1 + MIN_DEADLINE_MILLIS));
assertTrue(firstRenewalSub2.get() < (addTime1 + MIN_DEADLINE_MILLIS));
ackDeadlineRenewer.remove(SUBSCRIPTION1, ACK_ID2);
executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
secondLatch.await();
assertTrue(secondRenewalSub1.get() < (firstRenewalSub1.get() + MIN_DEADLINE_MILLIS));
assertTrue(secondRenewalSub2.get() < (firstRenewalSub2.get() + MIN_DEADLINE_MILLIS));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2016 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsub;

import com.google.cloud.Clock;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
* A Clock to help with testing time-based logic.
*/
class FakeClock extends Clock {

private final AtomicLong millis = new AtomicLong();

// Advances the clock value by {@code time} in {@code timeUnit}.
void advance(long time, TimeUnit timeUnit) {
millis.addAndGet(timeUnit.toMillis(time));
}

@Override
public long millis() {
return millis.get();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* Copyright 2016 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsub;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* A ScheduledExecutorService to help with testing.
*/
class FakeScheduledExecutorService extends ScheduledThreadPoolExecutor {
private final FakeClock clock;
private final List<FakeScheduledFuture> futures = new ArrayList<>();

public FakeScheduledExecutorService(int corePoolSize, FakeClock clock) {
super(corePoolSize);
this.clock = clock;
}

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
synchronized (this) {
long runAtMillis = clock.millis() + unit.toMillis(delay);
FakeScheduledFuture scheduledFuture =
new FakeScheduledFuture(command, delay, unit, runAtMillis);
futures.add(scheduledFuture);
return scheduledFuture;
}
}

public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
throw new UnsupportedOperationException(
"FakeScheduledExecutorService.schedule(Callable, long, TimeUnit) not supported");
}

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period,
TimeUnit unit) {
throw new UnsupportedOperationException(
"FakeScheduledExecutorService.scheduleAtFixedRate not supported");
}

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,
TimeUnit unit) {
throw new UnsupportedOperationException(
"FakeScheduledExecutorService.scheduleAtFixedRate not supported");
}

public void tick(long time, TimeUnit unit) {
List<FakeScheduledFuture> runnablesToGo = new ArrayList<>();
synchronized (this) {
clock.advance(time, unit);
Iterator<FakeScheduledFuture> iter = futures.iterator();
while (iter.hasNext()) {
FakeScheduledFuture scheduledFuture = iter.next();
if (scheduledFuture.runAtMillis <= clock.millis()) {
runnablesToGo.add(scheduledFuture);
iter.remove();
}
}
}
for (FakeScheduledFuture scheduledFuture : runnablesToGo) {
scheduledFuture.run();
}
}

private boolean cancel(FakeScheduledFuture toCancel) {
synchronized (this) {
Iterator<FakeScheduledFuture> iter = futures.iterator();
while (iter.hasNext()) {
FakeScheduledFuture scheduledFuture = iter.next();
if (scheduledFuture == toCancel) {
iter.remove();
return true;
}
}
return false;
}
}

private class FakeScheduledFuture implements ScheduledFuture<Object> {
final Callable<Object> callable;
final long delay;
final TimeUnit unit;
final long runAtMillis;

volatile boolean isDone;
volatile boolean isCancelled;
volatile Exception exception;
volatile Object result;

FakeScheduledFuture(Runnable runnable, long delay, TimeUnit unit, long runAtMillis) {
this.callable = Executors.callable(runnable);
this.delay = delay;
this.unit = unit;
this.runAtMillis = runAtMillis;
}

@Override
public long getDelay(TimeUnit requestedUnit) {
return unit.convert(delay, requestedUnit);
}

@Override
public int compareTo(Delayed other) {
return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
}

@Override
public boolean cancel(boolean var1) {
if (isCancelled) {
return isCancelled;
}
isCancelled = FakeScheduledExecutorService.this.cancel(this);
return isCancelled;
}

@Override
public boolean isCancelled() {
return isCancelled;
}

@Override
public boolean isDone() {
return isDone;
}

@Override
public Object get() throws InterruptedException, ExecutionException {
if (!isDone()) {
throw new UnsupportedOperationException("FakeScheduledFuture: blocking get not supported");
}

if (exception != null) {
throw new ExecutionException(exception);
}

return result;
}

@Override
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
TimeoutException {
return get();
}

public void run() {
if (isDone()) {
throw new UnsupportedOperationException("FakeScheduledFuture already done.");
}

try {
result = callable.call();
} catch (Exception e) {
exception = e;
}

isDone = true;
}
}
}

0 comments on commit 1a8a438

Please sign in to comment.