Skip to content

Commit

Permalink
Turn RecoveryResponse into a chunked REST response (#89999)
Browse files Browse the repository at this point in the history
These can be huge, make them chunked to be nice to the coordinating node.

relates #89838
  • Loading branch information
original-brownbear authored Sep 12, 2022
1 parent 11294ad commit 3f820a4
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,22 @@
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.ToXContent;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
* Information regarding the recovery state of indices and their associated shards.
*/
public class RecoveryResponse extends BroadcastResponse {
public class RecoveryResponse extends BroadcastResponse implements ChunkedToXContent {

private final Map<String, List<RecoveryState>> shardRecoveryStates;

Expand Down Expand Up @@ -62,27 +65,27 @@ public Map<String, List<RecoveryState>> shardRecoveryStates() {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (hasRecoveries()) {
for (String index : shardRecoveryStates.keySet()) {
List<RecoveryState> recoveryStates = shardRecoveryStates.get(index);
if (recoveryStates == null || recoveryStates.size() == 0) {
continue;
}
builder.startObject(index);
builder.startArray("shards");
for (RecoveryState recoveryState : recoveryStates) {
builder.startObject();
recoveryState.toXContent(builder, params);
builder.endObject();
}
builder.endArray();
builder.endObject();
}
}
builder.endObject();
return builder;
public Iterator<ToXContent> toXContentChunked() {
return Iterators.concat(
Iterators.single((b, p) -> b.startObject()),
shardRecoveryStates.entrySet()
.stream()
.filter(entry -> entry != null && entry.getValue().isEmpty() == false)
.map(entry -> (ToXContent) (b, p) -> {
b.startObject(entry.getKey());
b.startArray("shards");
for (RecoveryState recoveryState : entry.getValue()) {
b.startObject();
recoveryState.toXContent(b, p);
b.endObject();
}
b.endArray();
b.endObject();
return b;
})
.iterator(),
Iterators.single((b, p) -> b.endObject())
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@
package org.elasticsearch.rest.action.admin.indices;

import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.ChunkedRestResponseBody;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestActionListener;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
import org.elasticsearch.rest.action.RestToXContentListener;

import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -51,6 +55,14 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
recoveryRequest.indicesOptions(IndicesOptions.fromRequest(request, recoveryRequest.indicesOptions()));
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).admin()
.indices()
.recoveries(recoveryRequest, new RestToXContentListener<>(channel));
.recoveries(recoveryRequest, new RestActionListener<>(channel) {
@Override
protected void processResponse(RecoveryResponse recoveryResponse) throws IOException {
ensureOpen();
channel.sendResponse(
new RestResponse(RestStatus.OK, ChunkedRestResponseBody.fromXContent(recoveryResponse, request, channel))
);
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.admin.indices.recovery;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.test.ESTestCase;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;

public class RecoveryResponseTests extends ESTestCase {

public void testChunkedToXContent() {
final int failedShards = randomIntBetween(0, 50);
final int successfulShards = randomIntBetween(0, 50);
DiscoveryNode sourceNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
DiscoveryNode targetNode = new DiscoveryNode("bar", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
final int shards = randomInt(50);
final RecoveryResponse recoveryResponse = new RecoveryResponse(
successfulShards + failedShards,
successfulShards,
failedShards,
IntStream.range(0, shards)
.boxed()
.collect(
Collectors.toUnmodifiableMap(
i -> "index-" + i,
i -> List.of(
new RecoveryState(
ShardRouting.newUnassigned(
new ShardId("index-" + i, "index-uuid-" + i, 0),
randomBoolean(),
RecoverySource.PeerRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null)
).initialize(sourceNode.getId(), null, randomNonNegativeLong()),
sourceNode,
targetNode
)
)
)
),
List.of()
);
final var iterator = recoveryResponse.toXContentChunked();
int chunks = 0;
while (iterator.hasNext()) {
iterator.next();
chunks++;
}
assertEquals(shards + 2, chunks);
}
}

0 comments on commit 3f820a4

Please sign in to comment.