Skip to content

Commit 6bc38ab

Browse files
Fix TransportSnapshotsStatusAction ThreadPool Use (#45824)
In case of an in-progress snapshot this endpoint was broken because it tried to execute repository operations in the callback on a transport thread which is not allowed (only generic or snapshot pool are allowed here).
1 parent 7cb26ef commit 6bc38ab

File tree

2 files changed

+45
-9
lines changed

2 files changed

+45
-9
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.carrotsearch.hppc.cursors.ObjectCursor;
2323
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
2424
import org.elasticsearch.action.ActionListener;
25+
import org.elasticsearch.action.ActionRunnable;
2526
import org.elasticsearch.action.support.ActionFilters;
2627
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
2728
import org.elasticsearch.client.node.NodeClient;
@@ -116,15 +117,13 @@ protected void masterOperation(Task task, final SnapshotsStatusRequest request,
116117
for (int i = 0; i < currentSnapshots.size(); i++) {
117118
snapshots[i] = currentSnapshots.get(i).snapshot();
118119
}
119-
120-
TransportNodesSnapshotsStatus.Request nodesRequest =
121-
new TransportNodesSnapshotsStatus.Request(nodesIds.toArray(new String[nodesIds.size()]))
122-
.snapshots(snapshots).timeout(request.masterNodeTimeout());
123-
client.executeLocally(TransportNodesSnapshotsStatus.TYPE, nodesRequest,
124-
ActionListener.map(
125-
listener, nodeSnapshotStatuses ->
126-
buildResponse(request, snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots())),
127-
nodeSnapshotStatuses)));
120+
client.executeLocally(TransportNodesSnapshotsStatus.TYPE,
121+
new TransportNodesSnapshotsStatus.Request(nodesIds.toArray(Strings.EMPTY_ARRAY))
122+
.snapshots(snapshots).timeout(request.masterNodeTimeout()),
123+
ActionListener.wrap(
124+
nodeSnapshotStatuses -> threadPool.executor(ThreadPool.Names.GENERIC).execute(
125+
ActionRunnable.wrap(listener, l -> l.onResponse(buildResponse(request, snapshotsService.currentSnapshots(
126+
request.repository(), Arrays.asList(request.snapshots())), nodeSnapshotStatuses)))), listener::onFailure));
128127
} else {
129128
// We don't have any in-progress shards, just return current stats
130129
listener.onResponse(buildResponse(request, currentSnapshots, null));

server/src/test/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,14 @@
1919
package org.elasticsearch.snapshots;
2020

2121
import org.elasticsearch.Version;
22+
import org.elasticsearch.action.ActionFuture;
2223
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
2324
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
2425
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest;
2526
import org.elasticsearch.client.Client;
27+
import org.elasticsearch.cluster.SnapshotsInProgress;
2628
import org.elasticsearch.common.settings.Settings;
29+
import org.elasticsearch.common.unit.TimeValue;
2730

2831
import java.util.List;
2932

@@ -72,4 +75,38 @@ public void testStatusApiConsistency() {
7275
assertEquals(snStatus.getStats().getStartTime(), snapshotInfo.startTime());
7376
assertEquals(snStatus.getStats().getTime(), snapshotInfo.endTime() - snapshotInfo.startTime());
7477
}
78+
79+
public void testStatusAPICallInProgressSnapshot() throws InterruptedException {
80+
Client client = client();
81+
82+
logger.info("--> creating repository");
83+
assertAcked(client.admin().cluster().preparePutRepository("test-repo").setType("mock").setSettings(
84+
Settings.builder().put("location", randomRepoPath()).put("block_on_data", true)));
85+
86+
createIndex("test-idx-1");
87+
ensureGreen();
88+
89+
logger.info("--> indexing some data");
90+
for (int i = 0; i < 100; i++) {
91+
index("test-idx-1", "_doc", Integer.toString(i), "foo", "bar" + i);
92+
}
93+
refresh();
94+
95+
logger.info("--> snapshot");
96+
ActionFuture<CreateSnapshotResponse> createSnapshotResponseActionFuture =
97+
client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute();
98+
99+
logger.info("--> wait for data nodes to get blocked");
100+
waitForBlockOnAnyDataNode("test-repo", TimeValue.timeValueMinutes(1));
101+
102+
final List<SnapshotStatus> snapshotStatus = client.admin().cluster().snapshotsStatus(
103+
new SnapshotsStatusRequest("test-repo", new String[]{"test-snap"})).actionGet().getSnapshots();
104+
assertEquals(snapshotStatus.get(0).getState(), SnapshotsInProgress.State.STARTED);
105+
106+
logger.info("--> unblock all data nodes");
107+
unblockAllDataNodes("test-repo");
108+
109+
logger.info("--> wait for snapshot to finish");
110+
createSnapshotResponseActionFuture.actionGet();
111+
}
75112
}

0 commit comments

Comments
 (0)