Skip to content

Commit

Permalink
Introduce Delegating ActionListener Wrappers (elastic#40129)
Browse files Browse the repository at this point in the history
* Introduce Delegating ActionListener Wrappers
* Dry up use cases of ActionListener that simply pass through the response or exception to another listener
  • Loading branch information
original-brownbear authored and Gurkan Kaymak committed May 27, 2019
1 parent f77b890 commit 1b3bb6a
Show file tree
Hide file tree
Showing 24 changed files with 206 additions and 432 deletions.
47 changes: 47 additions & 0 deletions server/src/main/java/org/elasticsearch/action/ActionListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,53 @@ public void onFailure(Exception e) {
};
}

/**
* Creates a listener that delegates all responses it receives to another listener.
*
* @param delegate ActionListener to wrap and delegate any exception to
* @param bc BiConsumer invoked with delegate listener and exception
* @param <T> Type of the listener
* @return Delegating listener
*/
static <T> ActionListener<T> delegateResponse(ActionListener<T> delegate, BiConsumer<ActionListener<T>, Exception> bc) {
return new ActionListener<T>() {

@Override
public void onResponse(T r) {
delegate.onResponse(r);
}

@Override
public void onFailure(Exception e) {
bc.accept(delegate, e);
}
};
}

/**
* Creates a listener that delegates all exceptions it receives to another listener.
*
* @param delegate ActionListener to wrap and delegate any exception to
* @param bc BiConsumer invoked with delegate listener and response
* @param <T> Type of the delegating listener's response
* @param <R> Type of the wrapped listeners
* @return Delegating listener
*/
static <T, R> ActionListener<T> delegateFailure(ActionListener<R> delegate, BiConsumer<ActionListener<R>, T> bc) {
return new ActionListener<T>() {

@Override
public void onResponse(T r) {
bc.accept(delegate, r);
}

@Override
public void onFailure(Exception e) {
delegate.onFailure(e);
}
};
}

/**
* Creates a listener that listens for a response (or failure) and executes the
* corresponding runnable when the response (or failure) is received.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ void getRunningTaskFromNode(Task thisTask, GetTaskRequest request, ActionListene
// Shift to the generic thread pool and let it wait for the task to complete so we don't block any important threads.
threadPool.generic().execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
protected void doRun() {
taskManager.waitForTaskCompletion(runningTask, waitForCompletionTimeout(request.getTimeout()));
waitedForCompletion(thisTask, request, runningTask.taskInfo(clusterService.localNode().getId(), true), listener);
}
Expand All @@ -180,26 +180,17 @@ public void onFailure(Exception e) {
*/
void waitedForCompletion(Task thisTask, GetTaskRequest request, TaskInfo snapshotOfRunningTask,
ActionListener<GetTaskResponse> listener) {
getFinishedTaskFromIndex(thisTask, request, new ActionListener<GetTaskResponse>() {
@Override
public void onResponse(GetTaskResponse response) {
// We were able to load the task from the task index. Let's send that back.
listener.onResponse(response);
}

@Override
public void onFailure(Exception e) {
getFinishedTaskFromIndex(thisTask, request, ActionListener.delegateResponse(listener, (delegatedListener, e) -> {
/*
* We couldn't load the task from the task index. Instead of 404 we should use the snapshot we took after it finished. If
* the error isn't a 404 then we'll just throw it back to the user.
*/
if (ExceptionsHelper.unwrap(e, ResourceNotFoundException.class) != null) {
listener.onResponse(new GetTaskResponse(new TaskResult(true, snapshotOfRunningTask)));
delegatedListener.onResponse(new GetTaskResponse(new TaskResult(true, snapshotOfRunningTask)));
} else {
listener.onFailure(e);
delegatedListener.onFailure(e);
}
}
});
}));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
Expand Down Expand Up @@ -69,17 +68,8 @@ protected ClusterBlockException checkBlock(DeleteRepositoryRequest request, Clus
protected void masterOperation(final DeleteRepositoryRequest request, ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
repositoriesService.unregisterRepository(
request,
new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse unregisterRepositoryResponse) {
listener.onResponse(new AcknowledgedResponse(unregisterRepositoryResponse.isAcknowledged()));
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
request, ActionListener.delegateFailure(listener,
(delegatedListener, unregisterRepositoryResponse) ->
delegatedListener.onResponse(new AcknowledgedResponse(unregisterRepositoryResponse.isAcknowledged()))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
Expand Down Expand Up @@ -68,17 +67,7 @@ protected ClusterBlockException checkBlock(PutRepositoryRequest request, Cluster
@Override
protected void masterOperation(final PutRepositoryRequest request, ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
repositoriesService.registerRepository(request, new ActionListener<ClusterStateUpdateResponse>() {

@Override
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new AcknowledgedResponse(response.isAcknowledged()));
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
repositoriesService.registerRepository(request, ActionListener.delegateFailure(listener,
(delegatedListener, response) -> delegatedListener.onResponse(new AcknowledgedResponse(response.isAcknowledged()))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.List;

/**
* Transport action for verifying repository operation
*/
Expand Down Expand Up @@ -70,16 +68,8 @@ protected ClusterBlockException checkBlock(VerifyRepositoryRequest request, Clus
@Override
protected void masterOperation(final VerifyRepositoryRequest request, ClusterState state,
final ActionListener<VerifyRepositoryResponse> listener) {
repositoriesService.verifyRepository(request.name(), new ActionListener<List<DiscoveryNode>>() {
@Override
public void onResponse(List<DiscoveryNode> verifyResponse) {
listener.onResponse(new VerifyRepositoryResponse(verifyResponse.toArray(new DiscoveryNode[0])));
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
repositoriesService.verifyRepository(request.name(), ActionListener.delegateFailure(listener,
(delegatedListener, verifyResponse) ->
delegatedListener.onResponse(new VerifyRepositoryResponse(verifyResponse.toArray(new DiscoveryNode[0])))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.RestoreService.RestoreCompletionResponse;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand Down Expand Up @@ -75,20 +74,13 @@ protected ClusterBlockException checkBlock(RestoreSnapshotRequest request, Clust
@Override
protected void masterOperation(final RestoreSnapshotRequest request, final ClusterState state,
final ActionListener<RestoreSnapshotResponse> listener) {
restoreService.restoreSnapshot(request, new ActionListener<RestoreCompletionResponse>() {
@Override
public void onResponse(RestoreCompletionResponse restoreCompletionResponse) {
restoreService.restoreSnapshot(request, ActionListener.delegateFailure(listener,
(delegatedListener, restoreCompletionResponse) -> {
if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) {
RestoreClusterStateListener.createAndRegisterListener(clusterService, restoreCompletionResponse, listener);
RestoreClusterStateListener.createAndRegisterListener(clusterService, restoreCompletionResponse, delegatedListener);
} else {
listener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo()));
delegatedListener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo()));
}
}

@Override
public void onFailure(Exception t) {
listener.onFailure(t);
}
});
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,19 +118,9 @@ protected void masterOperation(final Task task,
.masterNodeTimeout(request.masterNodeTimeout())
.waitForActiveShards(request.waitForActiveShards())
.indices(concreteIndices);

indexStateService.closeIndices(closeRequest, new ActionListener<CloseIndexResponse>() {

@Override
public void onResponse(final CloseIndexResponse response) {
listener.onResponse(response);
}

@Override
public void onFailure(final Exception t) {
logger.debug(() -> new ParameterizedMessage("failed to close indices [{}]", (Object) concreteIndices), t);
listener.onFailure(t);
}
});
indexStateService.closeIndices(closeRequest, ActionListener.delegateResponse(listener, (delegatedListener, t) -> {
logger.debug(() -> new ParameterizedMessage("failed to close indices [{}]", (Object) concreteIndices), t);
delegatedListener.onFailure(t);
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
Expand Down Expand Up @@ -97,25 +96,18 @@ protected void masterOperation(final ResizeRequest resizeRequest, final ClusterS
// there is no need to fetch docs stats for split but we keep it simple and do it anyway for simplicity of the code
final String sourceIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getSourceIndex());
final String targetIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getTargetIndexRequest().index());
client.admin().indices().prepareStats(sourceIndex).clear().setDocs(true).execute(new ActionListener<IndicesStatsResponse>() {
@Override
public void onResponse(IndicesStatsResponse indicesStatsResponse) {
client.admin().indices().prepareStats(sourceIndex).clear().setDocs(true).execute(
ActionListener.delegateFailure(listener, (delegatedListener, indicesStatsResponse) -> {
CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(resizeRequest, state,
(i) -> {
i -> {
IndexShardStats shard = indicesStatsResponse.getIndex(sourceIndex).getIndexShards().get(i);
return shard == null ? null : shard.getPrimary().getDocs();
}, sourceIndex, targetIndex);
createIndexService.createIndex(
updateRequest, ActionListener.map(listener,
updateRequest, ActionListener.map(delegatedListener,
response -> new ResizeResponse(response.isAcknowledged(), response.isShardsAcknowledged(), updateRequest.index()))
);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
Expand Down Expand Up @@ -104,7 +103,7 @@ protected UpgradeResponse newResponse(UpgradeRequest request, int totalShards, i
versions.put(index, new Tuple<>(version, luceneVersion));
}
}
Map<String, Tuple<org.elasticsearch.Version, String>> updatedVersions = new HashMap<>();
Map<String, Tuple<Version, String>> updatedVersions = new HashMap<>();
MetaData metaData = clusterState.metaData();
for (Map.Entry<String, Tuple<Version, org.apache.lucene.util.Version>> versionEntry : versions.entrySet()) {
String index = versionEntry.getKey();
Expand Down Expand Up @@ -209,16 +208,7 @@ public void onFailure(Exception e) {

private void updateSettings(final UpgradeResponse upgradeResponse, final ActionListener<UpgradeResponse> listener) {
UpgradeSettingsRequest upgradeSettingsRequest = new UpgradeSettingsRequest(upgradeResponse.versions());
client.executeLocally(UpgradeSettingsAction.INSTANCE, upgradeSettingsRequest, new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse updateSettingsResponse) {
listener.onResponse(upgradeResponse);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
client.executeLocally(UpgradeSettingsAction.INSTANCE, upgradeSettingsRequest, ActionListener.delegateFailure(
listener, (delegatedListener, updateSettingsResponse) -> delegatedListener.onResponse(upgradeResponse)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,15 @@ ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis,
return ActionListener.map(actionListener,
response -> new BulkResponse(response.getItems(), response.getTook().getMillis(), ingestTookInMillis));
} else {
return new IngestBulkResponseListener(ingestTookInMillis, originalSlots, itemResponses, actionListener);
return ActionListener.delegateFailure(actionListener, (delegatedListener, response) -> {
BulkItemResponse[] items = response.getItems();
for (int i = 0; i < items.length; i++) {
itemResponses.add(originalSlots[i], response.getItems()[i]);
}
delegatedListener.onResponse(
new BulkResponse(
itemResponses.toArray(new BulkItemResponse[0]), response.getTook().getMillis(), ingestTookInMillis));
});
}
}

Expand Down Expand Up @@ -688,36 +696,4 @@ void markCurrentItemAsFailed(Exception e) {
}

}

static final class IngestBulkResponseListener implements ActionListener<BulkResponse> {

private final long ingestTookInMillis;
private final int[] originalSlots;
private final List<BulkItemResponse> itemResponses;
private final ActionListener<BulkResponse> actionListener;

IngestBulkResponseListener(long ingestTookInMillis, int[] originalSlots, List<BulkItemResponse> itemResponses,
ActionListener<BulkResponse> actionListener) {
this.ingestTookInMillis = ingestTookInMillis;
this.itemResponses = itemResponses;
this.actionListener = actionListener;
this.originalSlots = originalSlots;
}

@Override
public void onResponse(BulkResponse response) {
BulkItemResponse[] items = response.getItems();
for (int i = 0; i < items.length; i++) {
itemResponses.add(originalSlots[i], response.getItems()[i]);
}
actionListener.onResponse(new BulkResponse(
itemResponses.toArray(new BulkItemResponse[itemResponses.size()]),
response.getTook().getMillis(), ingestTookInMillis));
}

@Override
public void onFailure(Exception e) {
actionListener.onFailure(e);
}
}
}
Loading

0 comments on commit 1b3bb6a

Please sign in to comment.