Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common.util.concurrent;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* A class which wraps an existing {@link Runnable} and allows to execute it exactly once
* whatever the number of times the wrapped {@link Runnable#run()} method is called.
*/
public class RunOnce implements Runnable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have at least one implementation of this in the codebase: WorkerBulkByScrollTakeState#RunOnce. Can we relocate that code instead? Also, it is lacking tests. I think it could go in separately in a PR that adds tests along side it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created #35489


private final AtomicBoolean executed;
private final Runnable delegate;

public RunOnce(final Runnable delegate) {
this.delegate = Objects.requireNonNull(delegate);
this.executed = new AtomicBoolean(false);
}

@Override
public void run() {
if (executed.compareAndSet(false, true)) {
delegate.run();
}
}

/**
* {@code true} if the {@link RunOnce} has been executed once.
*/
public boolean hasRun() {
return executed.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -124,12 +125,12 @@ public void asyncBlockOperations(final ActionListener<Releasable> onAcquired, fi
delayOperations();
threadPool.executor(ThreadPool.Names.GENERIC).execute(new AbstractRunnable() {

final AtomicBoolean released = new AtomicBoolean(false);
final RunOnce released = new RunOnce(() -> releaseDelayedOperations());

@Override
public void onFailure(final Exception e) {
try {
releaseDelayedOperationsIfNeeded(); // resume delayed operations as soon as possible
released.run(); // resume delayed operations as soon as possible
} finally {
onAcquired.onFailure(e);
}
Expand All @@ -142,16 +143,10 @@ protected void doRun() throws Exception {
try {
releasable.close();
} finally {
releaseDelayedOperationsIfNeeded();
released.run();
}
});
}

private void releaseDelayedOperationsIfNeeded() {
if (released.compareAndSet(false, true)) {
releaseDelayedOperations();
}
}
});
}

Expand All @@ -173,13 +168,11 @@ private Releasable acquireAll(final long timeout, final TimeUnit timeUnit) throw
}
}
if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) {
final AtomicBoolean closed = new AtomicBoolean();
return () -> {
if (closed.compareAndSet(false, true)) {
assert semaphore.availablePermits() == 0;
semaphore.release(TOTAL_PERMITS);
}
};
final RunOnce release = new RunOnce(() -> {
assert semaphore.availablePermits() == 0;
semaphore.release(TOTAL_PERMITS);
});
return release::run;
} else {
throw new TimeoutException("timeout while blocking operations");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
package org.elasticsearch.test.transport;

import com.carrotsearch.randomizedtesting.SysGlobals;
import java.util.concurrent.TimeUnit;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
Expand Down Expand Up @@ -67,7 +67,7 @@
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;

Expand Down Expand Up @@ -349,8 +349,7 @@ public void sendRequest(Transport.Connection connection, long requestId, String
request.writeTo(bStream);
final TransportRequest clonedRequest = reg.newRequest(bStream.bytes().streamInput());

Runnable runnable = new AbstractRunnable() {
AtomicBoolean requestSent = new AtomicBoolean();
final RunOnce runnable = new RunOnce(new AbstractRunnable() {

@Override
public void onFailure(Exception e) {
Expand All @@ -359,11 +358,9 @@ public void onFailure(Exception e) {

@Override
protected void doRun() throws IOException {
if (requestSent.compareAndSet(false, true)) {
connection.sendRequest(requestId, action, clonedRequest, options);
}
connection.sendRequest(requestId, action, clonedRequest, options);
}
};
});

// store the request to send it once the rule is cleared.
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.job.process.autodetect.output;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;

import java.time.Duration;
Expand All @@ -14,16 +15,22 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class FlushListener {

final ConcurrentMap<String, FlushAcknowledgementHolder> awaitingFlushed = new ConcurrentHashMap<>();
final AtomicBoolean cleared = new AtomicBoolean(false);

final RunOnce onClear = new RunOnce(() -> {
Iterator<ConcurrentMap.Entry<String, FlushAcknowledgementHolder>> latches = awaitingFlushed.entrySet().iterator();
while (latches.hasNext()) {
latches.next().getValue().latch.countDown();
latches.remove();
}
});

@Nullable
FlushAcknowledgement waitForFlush(String flushId, Duration timeout) throws InterruptedException {
if (cleared.get()) {
if (onClear.hasRun()) {
return null;
}

Expand All @@ -49,13 +56,7 @@ void clear(String flushId) {
}

void clear() {
if (cleared.compareAndSet(false, true)) {
Iterator<ConcurrentMap.Entry<String, FlushAcknowledgementHolder>> latches = awaitingFlushed.entrySet().iterator();
while (latches.hasNext()) {
latches.next().getValue().latch.countDown();
latches.remove();
}
}
onClear.run();
}

private static class FlushAcknowledgementHolder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,14 @@ public void testClear() throws Exception {
}
assertBusy(() -> assertEquals(numWaits, listener.awaitingFlushed.size()));
assertThat(flushAcknowledgementHolders.stream().map(f -> f.get()).filter(f -> f != null).findAny().isPresent(), is(false));
assertFalse(listener.cleared.get());
assertFalse(listener.onClear.hasRun());

listener.clear();

for (AtomicReference<FlushAcknowledgement> f : flushAcknowledgementHolders) {
assertBusy(() -> assertNotNull(f.get()));
}
assertTrue(listener.awaitingFlushed.isEmpty());
assertTrue(listener.cleared.get());
assertTrue(listener.onClear.hasRun());
}
}