Skip to content

Commit

Permalink
Allow exchange source continue on failure (elastic#117410)
Browse files Browse the repository at this point in the history
Currently, when an exchange request fails, we stop fetching pages and 
abort the ExchangeSource. However, to support partial_results, we need
to continue fetching pages from other remote sinks despite failures.
This change introduces a failFast flag in ExchangeSource, which enables
the process to continue in case of failures. By default, this flag is
set to true but switches to false when allow_partial_results is enabled.
  • Loading branch information
dnhatn authored Nov 26, 2024
1 parent 0f5eb0c commit fadc752
Show file tree
Hide file tree
Showing 7 changed files with 253 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
/**
* {@link ExchangeService} is responsible for exchanging pages between exchange sinks and sources on the same or different nodes.
* It holds a map of {@link ExchangeSinkHandler} instances for each node in the cluster to serve {@link ExchangeRequest}s
* To connect exchange sources to exchange sinks, use the {@link ExchangeSourceHandler#addRemoteSink(RemoteSink, int)} method.
* To connect exchange sources to exchange sinks, use {@link ExchangeSourceHandler#addRemoteSink(RemoteSink, boolean, int, ActionListener)}.
*/
public final class ExchangeService extends AbstractLifecycleComponent {
// TODO: Make this a child action of the data node transport to ensure that exchanges
Expand Down Expand Up @@ -311,7 +311,7 @@ static final class TransportRemoteSink implements RemoteSink {

@Override
public void fetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeResponse> listener) {
final long reservedBytes = estimatedPageSizeInBytes.get();
final long reservedBytes = allSourcesFinished ? 0 : estimatedPageSizeInBytes.get();
if (reservedBytes > 0) {
// This doesn't fully protect ESQL from OOM, but reduces the likelihood.
blockFactory.breaker().addEstimateBytesAndMaybeBreak(reservedBytes, "fetch page");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public IsBlockedResult waitForWriting() {
* @param sourceFinished if true, then this handler can finish as sources have enough pages.
* @param listener the listener that will be notified when pages are ready or this handler is finished
* @see RemoteSink
* @see ExchangeSourceHandler#addRemoteSink(RemoteSink, int)
* @see ExchangeSourceHandler#addRemoteSink(RemoteSink, boolean, int, ActionListener)
*/
public void fetchPageAsync(boolean sourceFinished, ActionListener<ExchangeResponse> listener) {
if (sourceFinished) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,54 @@
/**
* An {@link ExchangeSourceHandler} asynchronously fetches pages and status from multiple {@link RemoteSink}s
* and feeds them to its {@link ExchangeSource}, which are created using the {@link #createExchangeSource()}) method.
* {@link RemoteSink}s are added using the {@link #addRemoteSink(RemoteSink, int)}) method.
* {@link RemoteSink}s are added using the {@link #addRemoteSink(RemoteSink, boolean, int, ActionListener)}) method.
*
* @see #createExchangeSource()
* @see #addRemoteSink(RemoteSink, int)
* @see #addRemoteSink(RemoteSink, boolean, int, ActionListener)
*/
public final class ExchangeSourceHandler {
private final ExchangeBuffer buffer;
private final Executor fetchExecutor;

private final PendingInstances outstandingSinks;
private final PendingInstances outstandingSources;
// Collect failures that occur while fetching pages from the remote sink with `failFast=true`.
// The exchange source will stop fetching and abort as soon as any failure is added to this failure collector.
// The final failure collected will be notified to callers via the {@code completionListener}.
private final FailureCollector failure = new FailureCollector();

public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor) {
/**
* Creates a new ExchangeSourceHandler.
*
* @param maxBufferSize the maximum size of the exchange buffer. A larger buffer reduces ``pauses`` but uses more memory,
* which could otherwise be allocated for other purposes.
* @param fetchExecutor the executor used to fetch pages.
* @param completionListener a listener that will be notified when the exchange source handler fails or completes
*/
public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor, ActionListener<Void> completionListener) {
this.buffer = new ExchangeBuffer(maxBufferSize);
this.fetchExecutor = fetchExecutor;
this.outstandingSinks = new PendingInstances(() -> buffer.finish(false));
this.outstandingSources = new PendingInstances(() -> buffer.finish(true));
buffer.addCompletionListener(ActionListener.running(() -> {
final ActionListener<Void> listener = ActionListener.assertAtLeastOnce(completionListener).delegateFailure((l, unused) -> {
final Exception e = failure.getFailure();
if (e != null) {
l.onFailure(e);
} else {
l.onResponse(null);
}
});
try (RefCountingListener refs = new RefCountingListener(listener)) {
for (PendingInstances pending : List.of(outstandingSinks, outstandingSources)) {
// Create an outstanding instance and then finish to complete the completionListener
// if we haven't registered any instances of exchange sinks or exchange sources before.
pending.trackNewInstance();
pending.completion.addListener(refs.acquire());
pending.finishInstance();
}
}
}));
}

private class ExchangeSourceImpl implements ExchangeSource {
Expand Down Expand Up @@ -89,20 +119,6 @@ public int bufferSize() {
}
}

public void addCompletionListener(ActionListener<Void> listener) {
buffer.addCompletionListener(ActionListener.running(() -> {
try (RefCountingListener refs = new RefCountingListener(listener)) {
for (PendingInstances pending : List.of(outstandingSinks, outstandingSources)) {
// Create an outstanding instance and then finish to complete the completionListener
// if we haven't registered any instances of exchange sinks or exchange sources before.
pending.trackNewInstance();
pending.completion.addListener(refs.acquire());
pending.finishInstance();
}
}
}));
}

/**
* Create a new {@link ExchangeSource} for exchanging data
*
Expand Down Expand Up @@ -159,10 +175,14 @@ void exited() {
private final class RemoteSinkFetcher {
private volatile boolean finished = false;
private final RemoteSink remoteSink;
private final boolean failFast;
private final ActionListener<Void> completionListener;

RemoteSinkFetcher(RemoteSink remoteSink) {
RemoteSinkFetcher(RemoteSink remoteSink, boolean failFast, ActionListener<Void> completionListener) {
outstandingSinks.trackNewInstance();
this.remoteSink = remoteSink;
this.failFast = failFast;
this.completionListener = completionListener;
}

void fetchPage() {
Expand Down Expand Up @@ -198,15 +218,22 @@ void fetchPage() {
}

void onSinkFailed(Exception e) {
failure.unwrapAndCollect(e);
if (failFast) {
failure.unwrapAndCollect(e);
}
buffer.waitForReading().listener().onResponse(null); // resume the Driver if it is being blocked on reading
onSinkComplete();
if (finished == false) {
finished = true;
outstandingSinks.finishInstance();
completionListener.onFailure(e);
}
}

void onSinkComplete() {
if (finished == false) {
finished = true;
outstandingSinks.finishInstance();
completionListener.onResponse(null);
}
}
}
Expand All @@ -215,23 +242,36 @@ void onSinkComplete() {
* Add a remote sink as a new data source of this handler. The handler will start fetching data from this remote sink intermediately.
*
* @param remoteSink the remote sink
* @param instances the number of concurrent ``clients`` that this handler should use to fetch pages. More clients reduce latency,
* but add overhead.
* @param failFast determines how failures in this remote sink are handled:
* - If {@code false}, failures from this remote sink will not cause the exchange source to abort.
* Callers must handle these failures notified via {@code listener}.
* - If {@code true}, failures from this remote sink will cause the exchange source to abort.
* Callers can safely ignore failures notified via this listener, as they are collected and
* reported by the exchange source.
* @param instances the number of concurrent ``clients`` that this handler should use to fetch pages.
* More clients reduce latency, but add overhead.
* @param listener a listener that will be notified when the sink fails or completes
* @see ExchangeSinkHandler#fetchPageAsync(boolean, ActionListener)
*/
public void addRemoteSink(RemoteSink remoteSink, int instances) {
public void addRemoteSink(RemoteSink remoteSink, boolean failFast, int instances, ActionListener<Void> listener) {
final ActionListener<Void> sinkListener = ActionListener.assertAtLeastOnce(ActionListener.notifyOnce(listener));
fetchExecutor.execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
failure.unwrapAndCollect(e);
if (failFast) {
failure.unwrapAndCollect(e);
}
buffer.waitForReading().listener().onResponse(null); // resume the Driver if it is being blocked on reading
sinkListener.onFailure(e);
}

@Override
protected void doRun() {
for (int i = 0; i < instances; i++) {
var fetcher = new RemoteSinkFetcher(remoteSink);
fetcher.fetchPage();
try (RefCountingListener refs = new RefCountingListener(sinkListener)) {
for (int i = 0; i < instances; i++) {
var fetcher = new RemoteSinkFetcher(remoteSink, failFast, refs.acquire());
fetcher.fetchPage();
}
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,19 @@ List<Driver> createDriversForInput(List<Page> input, List<Page> results, boolean
randomIntBetween(2, 10),
threadPool.relativeTimeInMillisSupplier()
);
ExchangeSourceHandler sourceExchanger = new ExchangeSourceHandler(randomIntBetween(1, 4), threadPool.executor(ESQL_TEST_EXECUTOR));
sourceExchanger.addRemoteSink(sinkExchanger::fetchPageAsync, 1);
ExchangeSourceHandler sourceExchanger = new ExchangeSourceHandler(
randomIntBetween(1, 4),
threadPool.executor(ESQL_TEST_EXECUTOR),
ActionListener.noop()
);
sourceExchanger.addRemoteSink(
sinkExchanger::fetchPageAsync,
randomBoolean(),
1,
ActionListener.<Void>noop().delegateResponse((l, e) -> {
throw new AssertionError("unexpected failure", e);
})
);

Iterator<? extends Operator> intermediateOperatorItr;
int itrSize = (splitInput.size() * 3) + 3; // 3 inter ops per initial source drivers, and 3 per final
Expand Down
Loading

0 comments on commit fadc752

Please sign in to comment.