From dd703b2b17fc85a105a03b654c633ad1a52a61b5 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 18 Jul 2018 12:47:50 +0100 Subject: [PATCH 1/7] Introduce deterministic task queue The cluster coordination layer relies on timeouts to ensure that a cluster can successfully form, and must also deal with concurrent activity in the cluster. This commit introduces some infrastructure that will help us to deterministically test components that use concurrency and/or timeouts. --- .../cluster/coordination/FutureExecutor.java | 33 ++ .../elasticsearch/threadpool/ThreadPool.java | 9 +- .../coordination/DeterministicTaskQueue.java | 361 ++++++++++++++++++ .../DeterministicTaskQueueTests.java | 280 ++++++++++++++ 4 files changed, 680 insertions(+), 3 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/cluster/coordination/FutureExecutor.java create mode 100644 server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java create mode 100644 server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/FutureExecutor.java b/server/src/main/java/org/elasticsearch/cluster/coordination/FutureExecutor.java new file mode 100644 index 0000000000000..67af3f927fed3 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/FutureExecutor.java @@ -0,0 +1,33 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.common.unit.TimeValue; + +/** + * Device which supports running a task after some delay has elapsed. + */ +public interface FutureExecutor { + /** + * Schedule the given task for execution after the given delay has elapsed. + */ + void schedule(TimeValue delay, Runnable task); +} + diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 51a4adec8d16d..18f57b3a99380 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -362,9 +362,13 @@ public Runnable preserveContext(Runnable command) { return getThreadContext().preserveContext(command); } - public void shutdown() { + protected final void stopCachedTimeThread() { cachedTimeThread.running = false; cachedTimeThread.interrupt(); + } + + public void shutdown() { + stopCachedTimeThread(); scheduler.shutdown(); for (ExecutorHolder executor : executors.values()) { if (executor.executor() instanceof ThreadPoolExecutor) { @@ -374,8 +378,7 @@ public void shutdown() { } public void shutdownNow() { - cachedTimeThread.running = false; - cachedTimeThread.interrupt(); + stopCachedTimeThread(); scheduler.shutdownNow(); for (ExecutorHolder executor : executors.values()) { if (executor.executor() instanceof ThreadPoolExecutor) { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java b/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java new file mode 100644 index 0000000000000..d33745739a190 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java @@ -0,0 +1,361 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.coordination; + +import com.carrotsearch.randomizedtesting.generators.RandomNumbers; +import org.apache.lucene.util.Counter; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPoolInfo; +import org.elasticsearch.threadpool.ThreadPoolStats; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +public class DeterministicTaskQueue extends AbstractComponent { + + private final List runnableTasks = new ArrayList<>(); + private List deferredTasks = new ArrayList<>(); + private long currentTimeMillis; + private long nextDeferredTaskExecutionTimeMillis = Long.MAX_VALUE; + + public DeterministicTaskQueue(Settings settings) { + super(settings); + } + + /** + * @return whether there are any runnable tasks. + */ + boolean hasRunnableTasks() { + return runnableTasks.isEmpty() == false; + } + + /** + * @return whether there are any deferred tasks, i.e. tasks that are scheduled for the future. + */ + boolean hasDeferredTasks() { + return deferredTasks.isEmpty() == false; + } + + /** + * @return the current (simulated) time, in milliseconds. + */ + long getCurrentTimeMillis() { + return currentTimeMillis; + } + + /** + * Runs the first runnable task. + */ + void runNextTask() { + assert hasRunnableTasks(); + runTask(0); + } + + /** + * Runs an arbitrary runnable task. + */ + void runRandomTask(final Random random) { + assert hasRunnableTasks(); + runTask(RandomNumbers.randomIntBetween(random, 0, runnableTasks.size() - 1)); + } + + private void runTask(final int index) { + final Runnable task = runnableTasks.remove(index); + logger.trace("running task {} of {}: {}", index, runnableTasks.size() + 1, task); + task.run(); + } + + /** + * Schedule a task for immediate execution. + */ + public void scheduleNow(final Runnable task) { + logger.trace("scheduleNow: adding runnable {}", task); + runnableTasks.add(task); + } + + /** + * Schedule a task for future execution. + */ + public void scheduleAt(final long executionTimeMillis, final Runnable task) { + if (executionTimeMillis <= currentTimeMillis) { + logger.trace("scheduleAt: [{}ms] is not in the future, adding runnable {}", executionTimeMillis, task); + runnableTasks.add(task); + } else { + final DeferredTask deferredTask = new DeferredTask(executionTimeMillis, task); + logger.trace("scheduleAt: adding {}", deferredTask); + nextDeferredTaskExecutionTimeMillis = Math.min(nextDeferredTaskExecutionTimeMillis, executionTimeMillis); + deferredTasks.add(deferredTask); + } + } + + /** + * Advance the current time to the time of the next deferred task, and update the sets of deferred and runnable tasks accordingly. + */ + public void advanceTime() { + assert hasDeferredTasks(); + assert currentTimeMillis < nextDeferredTaskExecutionTimeMillis; + + logger.trace("advanceTime: from [{}ms] to [{}ms]", currentTimeMillis, nextDeferredTaskExecutionTimeMillis); + currentTimeMillis = nextDeferredTaskExecutionTimeMillis; + + nextDeferredTaskExecutionTimeMillis = Long.MAX_VALUE; + List remainingTasks = new ArrayList<>(); + for (final DeferredTask deferredTask : deferredTasks) { + assert currentTimeMillis <= deferredTask.getExecutionTimeMillis(); + if (deferredTask.getExecutionTimeMillis() == currentTimeMillis) { + logger.trace("advanceTime: no longer deferred: {}", deferredTask); + runnableTasks.add(deferredTask.getTask()); + } else { + remainingTasks.add(deferredTask); + nextDeferredTaskExecutionTimeMillis = Math.min(nextDeferredTaskExecutionTimeMillis, deferredTask.getExecutionTimeMillis()); + } + } + deferredTasks = remainingTasks; + + assert deferredTasks.isEmpty() == (nextDeferredTaskExecutionTimeMillis == Long.MAX_VALUE); + } + + /** + * @return A FutureExecutor that uses this task queue. + */ + public FutureExecutor getFutureExecutor() { + return (delay, task) -> scheduleAt(currentTimeMillis + delay.millis(), task); + } + + /** + * @return A ExecutorService that uses this task queue. + */ + public ExecutorService getExecutorService() { + return new ExecutorService() { + + @Override + public void shutdown() { + throw new UnsupportedOperationException(); + } + + @Override + public List shutdownNow() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isShutdown() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isTerminated() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + @Override + public Future submit(Callable task) { + throw new UnsupportedOperationException(); + } + + @Override + public Future submit(Runnable task, T result) { + throw new UnsupportedOperationException(); + } + + @Override + public Future submit(Runnable task) { + throw new UnsupportedOperationException(); + } + + @Override + public List> invokeAll(Collection> tasks) { + throw new UnsupportedOperationException(); + } + + @Override + public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + @Override + public T invokeAny(Collection> tasks) { + throw new UnsupportedOperationException(); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + @Override + public void execute(Runnable command) { + scheduleNow(command); + } + }; + } + + /** + * @return A ThreadPool that uses this task queue. + */ + public ThreadPool getThreadPool() { + return new ThreadPool(settings) { + + { + stopCachedTimeThread(); + } + + @Override + public long relativeTimeInMillis() { + return currentTimeMillis; + } + + @Override + public long absoluteTimeInMillis() { + return currentTimeMillis; + } + + @Override + public Counter estimatedTimeInMillisCounter() { + return new Counter() { + @Override + public long addAndGet(long delta) { + throw new UnsupportedOperationException(); + } + + @Override + public long get() { + return currentTimeMillis; + } + }; + } + + @Override + public ThreadPoolInfo info() { + throw new UnsupportedOperationException(); + } + + @Override + public Info info(String name) { + throw new UnsupportedOperationException(); + } + + @Override + public ThreadPoolStats stats() { + throw new UnsupportedOperationException(); + } + + @Override + public ExecutorService generic() { + return getExecutorService(); + } + + @Override + public ExecutorService executor(String name) { + return getExecutorService(); + } + + @Override + public ScheduledFuture schedule(TimeValue delay, String executor, Runnable command) { + throw new UnsupportedOperationException(); + } + + @Override + public Cancellable scheduleWithFixedDelay(Runnable command, TimeValue interval, String executor) { + throw new UnsupportedOperationException(); + } + + @Override + public Runnable preserveContext(Runnable command) { + throw new UnsupportedOperationException(); + } + + @Override + public void shutdown() { + throw new UnsupportedOperationException(); + } + + @Override + public void shutdownNow() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + @Override + public ScheduledExecutorService scheduler() { + throw new UnsupportedOperationException(); + } + + @Override + public void close() { + throw new UnsupportedOperationException(); + } + + @Override + public ThreadContext getThreadContext() { + throw new UnsupportedOperationException(); + } + }; + } + + private class DeferredTask { + private final long executionTimeMillis; + private final Runnable task; + + DeferredTask(long executionTimeMillis, Runnable task) { + this.executionTimeMillis = executionTimeMillis; + this.task = task; + assert executionTimeMillis < Long.MAX_VALUE : "Long.MAX_VALUE is special, cannot be an execution time"; + assert currentTimeMillis < executionTimeMillis : executionTimeMillis + " <= " + currentTimeMillis; + } + + long getExecutionTimeMillis() { + return executionTimeMillis; + } + + Runnable getTask() { + return task; + } + + @Override + public String toString() { + return "DeferredTask{" + + "executionTimeMillis=" + executionTimeMillis + + ", task=" + task + + '}'; + } + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java new file mode 100644 index 0000000000000..972c43c374e93 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java @@ -0,0 +1,280 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; + +import static org.elasticsearch.node.Node.NODE_NAME_SETTING; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.core.Is.is; + +public class DeterministicTaskQueueTests extends ESTestCase { + + public void testRunNextTask() { + final DeterministicTaskQueue taskQueue = newTaskQueue(); + final List strings = new ArrayList<>(2); + + taskQueue.scheduleNow(() -> strings.add("foo")); + taskQueue.scheduleNow(() -> strings.add("bar")); + + assertThat(strings, empty()); + + assertTrue(taskQueue.hasRunnableTasks()); + taskQueue.runNextTask(); + assertThat(strings, contains("foo")); + + assertTrue(taskQueue.hasRunnableTasks()); + taskQueue.runNextTask(); + assertThat(strings, contains("foo", "bar")); + + assertFalse(taskQueue.hasRunnableTasks()); + } + + public void testRunRandomTask() { + final DeterministicTaskQueue taskQueue = newTaskQueue(); + final List strings = new ArrayList<>(4); + + taskQueue.scheduleNow(() -> strings.add("foo")); + taskQueue.scheduleNow(() -> strings.add("bar")); + taskQueue.scheduleNow(() -> strings.add("baz")); + taskQueue.scheduleNow(() -> strings.add("quux")); + + assertThat(strings, empty()); + + while (taskQueue.hasRunnableTasks()) { + taskQueue.runRandomTask(random()); + } + + assertThat(strings, containsInAnyOrder("foo", "bar", "baz", "quux")); + } + + public void testStartsAtTimeZero() { + final DeterministicTaskQueue taskQueue = newTaskQueue(); + assertThat(taskQueue.getCurrentTimeMillis(), is(0L)); + } + + public void testDoesNotDeferTasksForImmediateExecution() { + final DeterministicTaskQueue taskQueue = newTaskQueue(); + final List strings = new ArrayList<>(1); + + taskQueue.scheduleAt(0, () -> strings.add("foo")); + assertTrue(taskQueue.hasRunnableTasks()); + assertFalse(taskQueue.hasDeferredTasks()); + taskQueue.runNextTask(); + assertThat(strings, contains("foo")); + + assertFalse(taskQueue.hasRunnableTasks()); + } + + public void testDoesNotDeferTasksScheduledInThePast() { + final DeterministicTaskQueue taskQueue = newTaskQueue(); + final List strings = new ArrayList<>(1); + + taskQueue.scheduleAt(-1, () -> strings.add("foo")); + assertTrue(taskQueue.hasRunnableTasks()); + assertFalse(taskQueue.hasDeferredTasks()); + taskQueue.runNextTask(); + assertThat(strings, contains("foo")); + + assertFalse(taskQueue.hasRunnableTasks()); + } + + public void testDefersTasksWithPositiveDelays() { + final DeterministicTaskQueue taskQueue = newTaskQueue(); + final List strings = new ArrayList<>(1); + + taskQueue.scheduleAt(100, () -> strings.add("foo")); + assertThat(taskQueue.getCurrentTimeMillis(), is(0L)); + assertFalse(taskQueue.hasRunnableTasks()); + assertTrue(taskQueue.hasDeferredTasks()); + + taskQueue.advanceTime(); + assertThat(taskQueue.getCurrentTimeMillis(), is(100L)); + assertTrue(taskQueue.hasRunnableTasks()); + assertFalse(taskQueue.hasDeferredTasks()); + + taskQueue.runNextTask(); + assertThat(strings, contains("foo")); + + assertFalse(taskQueue.hasRunnableTasks()); + assertFalse(taskQueue.hasDeferredTasks()); + } + + public void testKeepsFutureTasksDeferred() { + final DeterministicTaskQueue taskQueue = newTaskQueue(); + final List strings = new ArrayList<>(2); + + taskQueue.scheduleAt(100, () -> strings.add("foo")); + taskQueue.scheduleAt(200, () -> strings.add("bar")); + + assertThat(taskQueue.getCurrentTimeMillis(), is(0L)); + assertFalse(taskQueue.hasRunnableTasks()); + assertTrue(taskQueue.hasDeferredTasks()); + + taskQueue.advanceTime(); + assertThat(taskQueue.getCurrentTimeMillis(), is(100L)); + assertTrue(taskQueue.hasRunnableTasks()); + assertTrue(taskQueue.hasDeferredTasks()); + + taskQueue.runNextTask(); + assertThat(strings, contains("foo")); + assertFalse(taskQueue.hasRunnableTasks()); + assertTrue(taskQueue.hasDeferredTasks()); + + taskQueue.advanceTime(); + assertThat(taskQueue.getCurrentTimeMillis(), is(200L)); + assertTrue(taskQueue.hasRunnableTasks()); + assertFalse(taskQueue.hasDeferredTasks()); + + taskQueue.runNextTask(); + assertThat(strings, contains("foo", "bar")); + } + + public void testExecutesTasksInTimeOrder() { + final DeterministicTaskQueue taskQueue = newTaskQueue(); + final List strings = new ArrayList<>(3); + + taskQueue.scheduleAt(100, () -> strings.add("foo")); + taskQueue.scheduleAt(200, () -> strings.add("bar")); + + assertThat(taskQueue.getCurrentTimeMillis(), is(0L)); + assertFalse(taskQueue.hasRunnableTasks()); + assertTrue(taskQueue.hasDeferredTasks()); + + taskQueue.advanceTime(); + assertThat(taskQueue.getCurrentTimeMillis(), is(100L)); + assertTrue(taskQueue.hasRunnableTasks()); + assertTrue(taskQueue.hasDeferredTasks()); + + taskQueue.runNextTask(); + assertThat(strings, contains("foo")); + assertFalse(taskQueue.hasRunnableTasks()); + assertTrue(taskQueue.hasDeferredTasks()); + + taskQueue.scheduleAt(150, () -> strings.add("baz")); + + taskQueue.advanceTime(); + assertThat(taskQueue.getCurrentTimeMillis(), is(150L)); + assertTrue(taskQueue.hasRunnableTasks()); + assertTrue(taskQueue.hasDeferredTasks()); + + taskQueue.runNextTask(); + taskQueue.advanceTime(); + taskQueue.runNextTask(); + assertThat(strings, contains("foo", "baz", "bar")); + assertThat(taskQueue.getCurrentTimeMillis(), is(200L)); + assertFalse(taskQueue.hasRunnableTasks()); + assertFalse(taskQueue.hasDeferredTasks()); + } + + public void testExecutorServiceEnqueuesTasks() { + final DeterministicTaskQueue taskQueue = newTaskQueue(); + final List strings = new ArrayList<>(2); + + final ExecutorService executorService = taskQueue.getExecutorService(); + assertFalse(taskQueue.hasRunnableTasks()); + executorService.execute(() -> strings.add("foo")); + assertTrue(taskQueue.hasRunnableTasks()); + executorService.execute(() -> strings.add("bar")); + + assertThat(strings, empty()); + + while (taskQueue.hasRunnableTasks()) { + taskQueue.runRandomTask(random()); + } + + assertThat(strings, containsInAnyOrder("foo", "bar")); + } + + public void testThreadPoolEnqueuesTasks() { + final DeterministicTaskQueue taskQueue = newTaskQueue(); + final List strings = new ArrayList<>(2); + + final ThreadPool threadPool = taskQueue.getThreadPool(); + assertFalse(taskQueue.hasRunnableTasks()); + threadPool.generic().execute(() -> strings.add("foo")); + assertTrue(taskQueue.hasRunnableTasks()); + threadPool.executor("anything").execute(() -> strings.add("bar")); + + assertThat(strings, empty()); + + while (taskQueue.hasRunnableTasks()) { + taskQueue.runRandomTask(random()); + } + + assertThat(strings, containsInAnyOrder("foo", "bar")); + } + + public void testFutureExecutorSchedulesTasks() { + final DeterministicTaskQueue taskQueue = newTaskQueue(); + final List strings = new ArrayList<>(5); + + final FutureExecutor futureExecutor = taskQueue.getFutureExecutor(); + futureExecutor.schedule(TimeValue.timeValueMillis(100), () -> strings.add("deferred")); + assertFalse(taskQueue.hasRunnableTasks()); + assertTrue(taskQueue.hasDeferredTasks()); + + futureExecutor.schedule(TimeValue.ZERO, () -> strings.add("runnable")); + assertTrue(taskQueue.hasRunnableTasks()); + + futureExecutor.schedule(TimeValue.MINUS_ONE, () -> strings.add("also runnable")); + + runAllTasks(taskQueue); + + assertThat(taskQueue.getCurrentTimeMillis(), is(100L)); + assertThat(strings, contains("runnable", "also runnable", "deferred")); + + futureExecutor.schedule(TimeValue.timeValueMillis(100), () -> strings.add("further deferred")); + futureExecutor.schedule(TimeValue.timeValueMillis(50), () -> strings.add("not quite so deferred")); + + assertFalse(taskQueue.hasRunnableTasks()); + assertTrue(taskQueue.hasDeferredTasks()); + + runAllTasks(taskQueue); + assertThat(taskQueue.getCurrentTimeMillis(), is(200L)); + assertThat(strings, contains("runnable", "also runnable", "deferred", "not quite so deferred", "further deferred")); + } + + private static void runAllTasks(DeterministicTaskQueue taskQueue) { + while (true) { + while (taskQueue.hasRunnableTasks()) { + taskQueue.runNextTask(); + } + if (taskQueue.hasDeferredTasks()) { + taskQueue.advanceTime(); + } else { + break; + } + } + } + + private static DeterministicTaskQueue newTaskQueue() { + return new DeterministicTaskQueue(Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build()); + } +} From 5454d0d81ab4260b1e8589f6e98a31161c4863e4 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 20 Jul 2018 14:55:14 +0100 Subject: [PATCH 2/7] Rename variable --- .../cluster/coordination/DeterministicTaskQueue.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java b/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java index d33745739a190..56ad36fa66c34 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java @@ -128,18 +128,18 @@ public void advanceTime() { currentTimeMillis = nextDeferredTaskExecutionTimeMillis; nextDeferredTaskExecutionTimeMillis = Long.MAX_VALUE; - List remainingTasks = new ArrayList<>(); + List remainingDeferredTasks = new ArrayList<>(); for (final DeferredTask deferredTask : deferredTasks) { assert currentTimeMillis <= deferredTask.getExecutionTimeMillis(); if (deferredTask.getExecutionTimeMillis() == currentTimeMillis) { logger.trace("advanceTime: no longer deferred: {}", deferredTask); runnableTasks.add(deferredTask.getTask()); } else { - remainingTasks.add(deferredTask); + remainingDeferredTasks.add(deferredTask); nextDeferredTaskExecutionTimeMillis = Math.min(nextDeferredTaskExecutionTimeMillis, deferredTask.getExecutionTimeMillis()); } } - deferredTasks = remainingTasks; + deferredTasks = remainingDeferredTasks; assert deferredTasks.isEmpty() == (nextDeferredTaskExecutionTimeMillis == Long.MAX_VALUE); } From d4130120803b6cd2da47f15377d7e50bb4cf50a0 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 20 Jul 2018 14:56:58 +0100 Subject: [PATCH 3/7] Static class --- .../cluster/coordination/DeterministicTaskQueue.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java b/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java index 56ad36fa66c34..1e851279ad7c8 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java @@ -331,7 +331,7 @@ public ThreadContext getThreadContext() { }; } - private class DeferredTask { + private static class DeferredTask { private final long executionTimeMillis; private final Runnable task; @@ -339,7 +339,6 @@ private class DeferredTask { this.executionTimeMillis = executionTimeMillis; this.task = task; assert executionTimeMillis < Long.MAX_VALUE : "Long.MAX_VALUE is special, cannot be an execution time"; - assert currentTimeMillis < executionTimeMillis : executionTimeMillis + " <= " + currentTimeMillis; } long getExecutionTimeMillis() { From 481164e5394ca234482e83f2d12aed1608c3dfcd Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 20 Jul 2018 14:57:13 +0100 Subject: [PATCH 4/7] Behave more randomly --- .../DeterministicTaskQueueTests.java | 67 +++++++++++++------ 1 file changed, 48 insertions(+), 19 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java index 972c43c374e93..739c108d805c7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java @@ -79,11 +79,22 @@ public void testStartsAtTimeZero() { assertThat(taskQueue.getCurrentTimeMillis(), is(0L)); } + private void advanceToRandomTime(DeterministicTaskQueue taskQueue) { + taskQueue.scheduleAt(randomLongBetween(1, 100), () -> { + }); + taskQueue.advanceTime(); + taskQueue.runNextTask(); + assertFalse(taskQueue.hasRunnableTasks()); + assertFalse(taskQueue.hasDeferredTasks()); + } + public void testDoesNotDeferTasksForImmediateExecution() { final DeterministicTaskQueue taskQueue = newTaskQueue(); + advanceToRandomTime(taskQueue); + final List strings = new ArrayList<>(1); - taskQueue.scheduleAt(0, () -> strings.add("foo")); + taskQueue.scheduleAt(taskQueue.getCurrentTimeMillis(), () -> strings.add("foo")); assertTrue(taskQueue.hasRunnableTasks()); assertFalse(taskQueue.hasDeferredTasks()); taskQueue.runNextTask(); @@ -94,9 +105,11 @@ public void testDoesNotDeferTasksForImmediateExecution() { public void testDoesNotDeferTasksScheduledInThePast() { final DeterministicTaskQueue taskQueue = newTaskQueue(); + advanceToRandomTime(taskQueue); + final List strings = new ArrayList<>(1); - taskQueue.scheduleAt(-1, () -> strings.add("foo")); + taskQueue.scheduleAt(taskQueue.getCurrentTimeMillis() - randomInt(200), () -> strings.add("foo")); assertTrue(taskQueue.hasRunnableTasks()); assertFalse(taskQueue.hasDeferredTasks()); taskQueue.runNextTask(); @@ -109,13 +122,14 @@ public void testDefersTasksWithPositiveDelays() { final DeterministicTaskQueue taskQueue = newTaskQueue(); final List strings = new ArrayList<>(1); - taskQueue.scheduleAt(100, () -> strings.add("foo")); + final long executionTimeMillis = randomLongBetween(1, 100); + taskQueue.scheduleAt(executionTimeMillis, () -> strings.add("foo")); assertThat(taskQueue.getCurrentTimeMillis(), is(0L)); assertFalse(taskQueue.hasRunnableTasks()); assertTrue(taskQueue.hasDeferredTasks()); taskQueue.advanceTime(); - assertThat(taskQueue.getCurrentTimeMillis(), is(100L)); + assertThat(taskQueue.getCurrentTimeMillis(), is(executionTimeMillis)); assertTrue(taskQueue.hasRunnableTasks()); assertFalse(taskQueue.hasDeferredTasks()); @@ -130,15 +144,18 @@ public void testKeepsFutureTasksDeferred() { final DeterministicTaskQueue taskQueue = newTaskQueue(); final List strings = new ArrayList<>(2); - taskQueue.scheduleAt(100, () -> strings.add("foo")); - taskQueue.scheduleAt(200, () -> strings.add("bar")); + final long executionTimeMillis1 = randomLongBetween(1, 100); + final long executionTimeMillis2 = randomLongBetween(executionTimeMillis1 + 1, 200); + + taskQueue.scheduleAt(executionTimeMillis1, () -> strings.add("foo")); + taskQueue.scheduleAt(executionTimeMillis2, () -> strings.add("bar")); assertThat(taskQueue.getCurrentTimeMillis(), is(0L)); assertFalse(taskQueue.hasRunnableTasks()); assertTrue(taskQueue.hasDeferredTasks()); taskQueue.advanceTime(); - assertThat(taskQueue.getCurrentTimeMillis(), is(100L)); + assertThat(taskQueue.getCurrentTimeMillis(), is(executionTimeMillis1)); assertTrue(taskQueue.hasRunnableTasks()); assertTrue(taskQueue.hasDeferredTasks()); @@ -148,7 +165,7 @@ public void testKeepsFutureTasksDeferred() { assertTrue(taskQueue.hasDeferredTasks()); taskQueue.advanceTime(); - assertThat(taskQueue.getCurrentTimeMillis(), is(200L)); + assertThat(taskQueue.getCurrentTimeMillis(), is(executionTimeMillis2)); assertTrue(taskQueue.hasRunnableTasks()); assertFalse(taskQueue.hasDeferredTasks()); @@ -160,15 +177,18 @@ public void testExecutesTasksInTimeOrder() { final DeterministicTaskQueue taskQueue = newTaskQueue(); final List strings = new ArrayList<>(3); - taskQueue.scheduleAt(100, () -> strings.add("foo")); - taskQueue.scheduleAt(200, () -> strings.add("bar")); + final long executionTimeMillis1 = randomLongBetween(1, 100); + final long executionTimeMillis2 = randomLongBetween(executionTimeMillis1 + 100, 300); + + taskQueue.scheduleAt(executionTimeMillis1, () -> strings.add("foo")); + taskQueue.scheduleAt(executionTimeMillis2, () -> strings.add("bar")); assertThat(taskQueue.getCurrentTimeMillis(), is(0L)); assertFalse(taskQueue.hasRunnableTasks()); assertTrue(taskQueue.hasDeferredTasks()); taskQueue.advanceTime(); - assertThat(taskQueue.getCurrentTimeMillis(), is(100L)); + assertThat(taskQueue.getCurrentTimeMillis(), is(executionTimeMillis1)); assertTrue(taskQueue.hasRunnableTasks()); assertTrue(taskQueue.hasDeferredTasks()); @@ -177,10 +197,11 @@ public void testExecutesTasksInTimeOrder() { assertFalse(taskQueue.hasRunnableTasks()); assertTrue(taskQueue.hasDeferredTasks()); - taskQueue.scheduleAt(150, () -> strings.add("baz")); + final long executionTimeMillis3 = randomLongBetween(executionTimeMillis1 + 1, executionTimeMillis2 - 1); + taskQueue.scheduleAt(executionTimeMillis3, () -> strings.add("baz")); taskQueue.advanceTime(); - assertThat(taskQueue.getCurrentTimeMillis(), is(150L)); + assertThat(taskQueue.getCurrentTimeMillis(), is(executionTimeMillis3)); assertTrue(taskQueue.hasRunnableTasks()); assertTrue(taskQueue.hasDeferredTasks()); @@ -188,7 +209,7 @@ public void testExecutesTasksInTimeOrder() { taskQueue.advanceTime(); taskQueue.runNextTask(); assertThat(strings, contains("foo", "baz", "bar")); - assertThat(taskQueue.getCurrentTimeMillis(), is(200L)); + assertThat(taskQueue.getCurrentTimeMillis(), is(executionTimeMillis2)); assertFalse(taskQueue.hasRunnableTasks()); assertFalse(taskQueue.hasDeferredTasks()); } @@ -233,10 +254,15 @@ public void testThreadPoolEnqueuesTasks() { public void testFutureExecutorSchedulesTasks() { final DeterministicTaskQueue taskQueue = newTaskQueue(); + advanceToRandomTime(taskQueue); + final long startTime = taskQueue.getCurrentTimeMillis(); + final List strings = new ArrayList<>(5); final FutureExecutor futureExecutor = taskQueue.getFutureExecutor(); - futureExecutor.schedule(TimeValue.timeValueMillis(100), () -> strings.add("deferred")); + final long delayMillis = randomLongBetween(1, 100); + + futureExecutor.schedule(TimeValue.timeValueMillis(delayMillis), () -> strings.add("deferred")); assertFalse(taskQueue.hasRunnableTasks()); assertTrue(taskQueue.hasDeferredTasks()); @@ -247,17 +273,20 @@ public void testFutureExecutorSchedulesTasks() { runAllTasks(taskQueue); - assertThat(taskQueue.getCurrentTimeMillis(), is(100L)); + assertThat(taskQueue.getCurrentTimeMillis(), is(startTime + delayMillis)); assertThat(strings, contains("runnable", "also runnable", "deferred")); - futureExecutor.schedule(TimeValue.timeValueMillis(100), () -> strings.add("further deferred")); - futureExecutor.schedule(TimeValue.timeValueMillis(50), () -> strings.add("not quite so deferred")); + final long delayMillis1 = randomLongBetween(2, 100); + final long delayMillis2 = randomLongBetween(1, delayMillis1 - 1); + + futureExecutor.schedule(TimeValue.timeValueMillis(delayMillis1), () -> strings.add("further deferred")); + futureExecutor.schedule(TimeValue.timeValueMillis(delayMillis2), () -> strings.add("not quite so deferred")); assertFalse(taskQueue.hasRunnableTasks()); assertTrue(taskQueue.hasDeferredTasks()); runAllTasks(taskQueue); - assertThat(taskQueue.getCurrentTimeMillis(), is(200L)); + assertThat(taskQueue.getCurrentTimeMillis(), is(startTime + delayMillis + delayMillis1)); assertThat(strings, contains("runnable", "also runnable", "deferred", "not quite so deferred", "further deferred")); } From d3465ab0db6221418c7739057a2c77535200bdb5 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 20 Jul 2018 14:57:51 +0100 Subject: [PATCH 5/7] Reorder arguments to FutureExecutor#schedule --- .../cluster/coordination/FutureExecutor.java | 2 +- .../cluster/coordination/DeterministicTaskQueue.java | 2 +- .../coordination/DeterministicTaskQueueTests.java | 10 +++++----- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/FutureExecutor.java b/server/src/main/java/org/elasticsearch/cluster/coordination/FutureExecutor.java index 67af3f927fed3..b1e9a14d7f198 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/FutureExecutor.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/FutureExecutor.java @@ -28,6 +28,6 @@ public interface FutureExecutor { /** * Schedule the given task for execution after the given delay has elapsed. */ - void schedule(TimeValue delay, Runnable task); + void schedule(Runnable task, TimeValue delay); } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java b/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java index 1e851279ad7c8..f9f9b0f1ce6f4 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java @@ -148,7 +148,7 @@ public void advanceTime() { * @return A FutureExecutor that uses this task queue. */ public FutureExecutor getFutureExecutor() { - return (delay, task) -> scheduleAt(currentTimeMillis + delay.millis(), task); + return (task, delay) -> scheduleAt(currentTimeMillis + delay.millis(), task); } /** diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java index 739c108d805c7..a532eb99d45f7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java @@ -262,14 +262,14 @@ public void testFutureExecutorSchedulesTasks() { final FutureExecutor futureExecutor = taskQueue.getFutureExecutor(); final long delayMillis = randomLongBetween(1, 100); - futureExecutor.schedule(TimeValue.timeValueMillis(delayMillis), () -> strings.add("deferred")); + futureExecutor.schedule(() -> strings.add("deferred"), TimeValue.timeValueMillis(delayMillis)); assertFalse(taskQueue.hasRunnableTasks()); assertTrue(taskQueue.hasDeferredTasks()); - futureExecutor.schedule(TimeValue.ZERO, () -> strings.add("runnable")); + futureExecutor.schedule(() -> strings.add("runnable"), TimeValue.ZERO); assertTrue(taskQueue.hasRunnableTasks()); - futureExecutor.schedule(TimeValue.MINUS_ONE, () -> strings.add("also runnable")); + futureExecutor.schedule(() -> strings.add("also runnable"), TimeValue.MINUS_ONE); runAllTasks(taskQueue); @@ -279,8 +279,8 @@ public void testFutureExecutorSchedulesTasks() { final long delayMillis1 = randomLongBetween(2, 100); final long delayMillis2 = randomLongBetween(1, delayMillis1 - 1); - futureExecutor.schedule(TimeValue.timeValueMillis(delayMillis1), () -> strings.add("further deferred")); - futureExecutor.schedule(TimeValue.timeValueMillis(delayMillis2), () -> strings.add("not quite so deferred")); + futureExecutor.schedule(() -> strings.add("further deferred"), TimeValue.timeValueMillis(delayMillis1)); + futureExecutor.schedule(() -> strings.add("not quite so deferred"), TimeValue.timeValueMillis(delayMillis2)); assertFalse(taskQueue.hasRunnableTasks()); assertTrue(taskQueue.hasDeferredTasks()); From 7c48ceffe43a70e06c5dbee871e39c000689830c Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 20 Jul 2018 15:15:59 +0100 Subject: [PATCH 6/7] Check that runRandomTask() runs the tasks in different orders sometimes --- .../coordination/DeterministicTaskQueueTests.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java index a532eb99d45f7..adba2bcc8a0e8 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java @@ -26,12 +26,15 @@ import java.util.ArrayList; import java.util.List; +import java.util.Random; import java.util.concurrent.ExecutorService; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; import static org.hamcrest.core.Is.is; public class DeterministicTaskQueueTests extends ESTestCase { @@ -57,6 +60,12 @@ public void testRunNextTask() { } public void testRunRandomTask() { + final List strings1 = getResultsOfRunningRandomly(new Random(4520795446362137264L)); + final List strings2 = getResultsOfRunningRandomly(new Random(266504691902226821L)); + assertThat(strings1, not(equalTo(strings2))); + } + + private List getResultsOfRunningRandomly(Random random) { final DeterministicTaskQueue taskQueue = newTaskQueue(); final List strings = new ArrayList<>(4); @@ -68,10 +77,11 @@ public void testRunRandomTask() { assertThat(strings, empty()); while (taskQueue.hasRunnableTasks()) { - taskQueue.runRandomTask(random()); + taskQueue.runRandomTask(random); } assertThat(strings, containsInAnyOrder("foo", "bar", "baz", "quux")); + return strings; } public void testStartsAtTimeZero() { From e53c2d6e05bb330f50b1e103850a53abcfa77e59 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 20 Jul 2018 16:00:26 +0100 Subject: [PATCH 7/7] Make methods public --- .../cluster/coordination/DeterministicTaskQueue.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java b/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java index f9f9b0f1ce6f4..6afdf81a8b45f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java @@ -54,28 +54,28 @@ public DeterministicTaskQueue(Settings settings) { /** * @return whether there are any runnable tasks. */ - boolean hasRunnableTasks() { + public boolean hasRunnableTasks() { return runnableTasks.isEmpty() == false; } /** * @return whether there are any deferred tasks, i.e. tasks that are scheduled for the future. */ - boolean hasDeferredTasks() { + public boolean hasDeferredTasks() { return deferredTasks.isEmpty() == false; } /** * @return the current (simulated) time, in milliseconds. */ - long getCurrentTimeMillis() { + public long getCurrentTimeMillis() { return currentTimeMillis; } /** * Runs the first runnable task. */ - void runNextTask() { + public void runNextTask() { assert hasRunnableTasks(); runTask(0); } @@ -83,7 +83,7 @@ void runNextTask() { /** * Runs an arbitrary runnable task. */ - void runRandomTask(final Random random) { + public void runRandomTask(final Random random) { assert hasRunnableTasks(); runTask(RandomNumbers.randomIntBetween(random, 0, runnableTasks.size() - 1)); }