From 2d5fb99526312fd4cc343e5fc42acf9932416ea7 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 13 Nov 2018 11:33:36 +0100 Subject: [PATCH] Add RunOnce utility class that executes a Runnable exactly once --- .../common/util/concurrent/RunOnce.java | 52 +++++++++++++++++++ .../shard/IndexShardOperationPermits.java | 25 ++++----- .../test/transport/MockTransportService.java | 13 ++--- .../autodetect/output/FlushListener.java | 21 ++++---- .../autodetect/output/FlushListenerTests.java | 4 +- 5 files changed, 79 insertions(+), 36 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/common/util/concurrent/RunOnce.java diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/RunOnce.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/RunOnce.java new file mode 100644 index 0000000000000..a80ac5e78b701 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/RunOnce.java @@ -0,0 +1,52 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util.concurrent; + +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A class which wraps an existing {@link Runnable} and allows to execute it exactly once + * whatever the number of times the wrapped {@link Runnable#run()} method is called. + */ +public class RunOnce implements Runnable { + + private final AtomicBoolean executed; + private final Runnable delegate; + + public RunOnce(final Runnable delegate) { + this.delegate = Objects.requireNonNull(delegate); + this.executed = new AtomicBoolean(false); + } + + @Override + public void run() { + if (executed.compareAndSet(false, true)) { + delegate.run(); + } + } + + /** + * {@code true} if the {@link RunOnce} has been executed once. + */ + public boolean hasRun() { + return executed.get(); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index 67c48c38791f0..d5d0d7f3e9753 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.RunOnce; import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.threadpool.ThreadPool; @@ -124,12 +125,12 @@ public void asyncBlockOperations(final ActionListener onAcquired, fi delayOperations(); threadPool.executor(ThreadPool.Names.GENERIC).execute(new AbstractRunnable() { - final AtomicBoolean released = new AtomicBoolean(false); + final RunOnce released = new RunOnce(() -> releaseDelayedOperations()); @Override public void onFailure(final Exception e) { try { - releaseDelayedOperationsIfNeeded(); // resume delayed operations as soon as possible + released.run(); // resume delayed operations as soon as possible } finally { onAcquired.onFailure(e); } @@ -142,16 +143,10 @@ protected void doRun() throws Exception { try { releasable.close(); } finally { - releaseDelayedOperationsIfNeeded(); + released.run(); } }); } - - private void releaseDelayedOperationsIfNeeded() { - if (released.compareAndSet(false, true)) { - releaseDelayedOperations(); - } - } }); } @@ -173,13 +168,11 @@ private Releasable acquireAll(final long timeout, final TimeUnit timeUnit) throw } } if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) { - final AtomicBoolean closed = new AtomicBoolean(); - return () -> { - if (closed.compareAndSet(false, true)) { - assert semaphore.availablePermits() == 0; - semaphore.release(TOTAL_PERMITS); - } - }; + final RunOnce release = new RunOnce(() -> { + assert semaphore.availablePermits() == 0; + semaphore.release(TOTAL_PERMITS); + }); + return release::run; } else { throw new TimeoutException("timeout while blocking operations"); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 3fc4d030da046..e507d84432bbd 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -20,13 +20,13 @@ package org.elasticsearch.test.transport; import com.carrotsearch.randomizedtesting.SysGlobals; -import java.util.concurrent.TimeUnit; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.util.concurrent.RunOnce; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; @@ -67,7 +67,7 @@ import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Supplier; @@ -349,8 +349,7 @@ public void sendRequest(Transport.Connection connection, long requestId, String request.writeTo(bStream); final TransportRequest clonedRequest = reg.newRequest(bStream.bytes().streamInput()); - Runnable runnable = new AbstractRunnable() { - AtomicBoolean requestSent = new AtomicBoolean(); + final RunOnce runnable = new RunOnce(new AbstractRunnable() { @Override public void onFailure(Exception e) { @@ -359,11 +358,9 @@ public void onFailure(Exception e) { @Override protected void doRun() throws IOException { - if (requestSent.compareAndSet(false, true)) { - connection.sendRequest(requestId, action, clonedRequest, options); - } + connection.sendRequest(requestId, action, clonedRequest, options); } - }; + }); // store the request to send it once the rule is cleared. synchronized (this) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java index 1340556fbcdb1..196de0b4125b4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.output; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.util.concurrent.RunOnce; import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement; import java.time.Duration; @@ -14,16 +15,22 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; class FlushListener { final ConcurrentMap awaitingFlushed = new ConcurrentHashMap<>(); - final AtomicBoolean cleared = new AtomicBoolean(false); + + final RunOnce onClear = new RunOnce(() -> { + Iterator> latches = awaitingFlushed.entrySet().iterator(); + while (latches.hasNext()) { + latches.next().getValue().latch.countDown(); + latches.remove(); + } + }); @Nullable FlushAcknowledgement waitForFlush(String flushId, Duration timeout) throws InterruptedException { - if (cleared.get()) { + if (onClear.hasRun()) { return null; } @@ -49,13 +56,7 @@ void clear(String flushId) { } void clear() { - if (cleared.compareAndSet(false, true)) { - Iterator> latches = awaitingFlushed.entrySet().iterator(); - while (latches.hasNext()) { - latches.next().getValue().latch.countDown(); - latches.remove(); - } - } + onClear.run(); } private static class FlushAcknowledgementHolder { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListenerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListenerTests.java index 3bcedb523923e..3343882d581b8 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListenerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListenerTests.java @@ -60,7 +60,7 @@ public void testClear() throws Exception { } assertBusy(() -> assertEquals(numWaits, listener.awaitingFlushed.size())); assertThat(flushAcknowledgementHolders.stream().map(f -> f.get()).filter(f -> f != null).findAny().isPresent(), is(false)); - assertFalse(listener.cleared.get()); + assertFalse(listener.onClear.hasRun()); listener.clear(); @@ -68,6 +68,6 @@ public void testClear() throws Exception { assertBusy(() -> assertNotNull(f.get())); } assertTrue(listener.awaitingFlushed.isEmpty()); - assertTrue(listener.cleared.get()); + assertTrue(listener.onClear.hasRun()); } }