Skip to content

Commit

Permalink
Add ActionListener.releaseBefore
Browse files Browse the repository at this point in the history
This commit introduces a new helper function, ActionListener.releaseBefore, which ensures that a resource is released before the listener is notified.
  • Loading branch information
fcofdez committed Feb 27, 2025
1 parent 731a412 commit 69ce93e
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 5 deletions.
19 changes: 19 additions & 0 deletions server/src/main/java/org/elasticsearch/action/ActionListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,25 @@ static <Response> ActionListener<Response> runBefore(ActionListener<Response> de
return assertOnce(new ActionListenerImplementations.RunBeforeActionListener<>(delegate, runBefore));
}

/**
* Wraps a given listener and returns a new listener which releases the provided {@code releaseBefore}
* resource before the listener is notified via either {@code #onResponse} or {@code #onFailure}.
*/
static <Response> ActionListener<Response> releaseBefore(ActionListener<Response> delegate, Releasable releaseBefore) {
Runnable releasableRunnable = runnableFromReleasable(releaseBefore);
return assertOnce(new ActionListenerImplementations.RunBeforeActionListener<>(delegate, new CheckedRunnable<>() {
@Override
public void run() {
releasableRunnable.run();
}

@Override
public String toString() {
return releasableRunnable.toString();
}
}));
}

/**
* Wraps a given listener and returns a new listener which makes sure {@link #onResponse(Object)}
* and {@link #onFailure(Exception)} of the provided listener will be called at most once.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;

import static org.hamcrest.Matchers.containsString;
Expand Down Expand Up @@ -610,16 +611,26 @@ public String toString() {
);
}

public void testReleaseBefore() {
runReleaseListenerTest(true, false, ActionListener::releaseBefore);
runReleaseListenerTest(true, true, ActionListener::releaseBefore);
runReleaseListenerTest(false, false, ActionListener::releaseBefore);
}

public void testReleaseAfter() {
runReleaseAfterTest(true, false);
runReleaseAfterTest(true, true);
runReleaseAfterTest(false, false);
runReleaseListenerTest(true, false, ActionListener::releaseAfter);
runReleaseListenerTest(true, true, ActionListener::releaseAfter);
runReleaseListenerTest(false, false, ActionListener::releaseAfter);
}

private static void runReleaseAfterTest(boolean successResponse, final boolean throwFromOnResponse) {
private static void runReleaseListenerTest(
boolean successResponse,
final boolean throwFromOnResponse,
BiFunction<ActionListener<Void>, Releasable, ActionListener<Void>> releaseListenerProvider
) {
final AtomicBoolean released = new AtomicBoolean();
final String description = randomAlphaOfLength(10);
final ActionListener<Void> l = ActionListener.releaseAfter(new ActionListener<>() {
final ActionListener<Void> l = releaseListenerProvider.apply(new ActionListener<>() {
@Override
public void onResponse(Void unused) {
if (throwFromOnResponse) {
Expand Down

0 comments on commit 69ce93e

Please sign in to comment.