From 9d241148b83817db9af65b02e6da32cfe9604f00 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 11 Oct 2022 10:50:34 +0200 Subject: [PATCH 1/2] Chunked encoding for snapshot status API Simple implementation of chunked encoding for the snapshot status API. Tested with 100 snapshots of 25k shards (all in-progress) where it can produce the 1G+ response in less than 10s. --- .../snapshots/status/SnapshotStatus.java | 71 ++++++++----------- .../status/SnapshotsStatusResponse.java | 34 +++++---- .../cluster/RestSnapshotsStatusAction.java | 4 +- .../snapshots/status/SnapshotStatusTests.java | 10 ++- .../status/SnapshotsStatusResponseTests.java | 27 ++++++- 5 files changed, 84 insertions(+), 62 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStatus.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStatus.java index 2a1e34392b9a5..c8259637b6d10 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStatus.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStatus.java @@ -11,28 +11,28 @@ import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress.State; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.util.Maps; +import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.core.Nullable; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.xcontent.ConstructingObjectParser; import org.elasticsearch.xcontent.ObjectParser; import org.elasticsearch.xcontent.ParseField; -import org.elasticsearch.xcontent.ToXContentObject; -import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentParser; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; @@ -43,7 +43,7 @@ /** * Status of a snapshot */ -public class SnapshotStatus implements ToXContentObject, Writeable { +public class SnapshotStatus implements ChunkedToXContent, Writeable { private final Snapshot snapshot; @@ -142,29 +142,22 @@ public SnapshotShardsStats getShardsStats() { * Returns list of snapshot indices */ public Map getIndices() { - if (this.indicesStatus != null) { - return this.indicesStatus; + var res = this.indicesStatus; + if (res != null) { + return res; } - Map indicesStatus = new HashMap<>(); - - Set indices = new HashSet<>(); + Map> indices = new HashMap<>(); for (SnapshotIndexShardStatus shard : shards) { - indices.add(shard.getIndex()); + indices.computeIfAbsent(shard.getIndex(), k -> new ArrayList<>()).add(shard); } - - for (String index : indices) { - List shards = new ArrayList<>(); - for (SnapshotIndexShardStatus shard : this.shards) { - if (shard.getIndex().equals(index)) { - shards.add(shard); - } - } - indicesStatus.put(index, new SnapshotIndexStatus(index, shards)); + Map indicesStatus = Maps.newMapWithExpectedSize(indices.size()); + for (Map.Entry> entry : indices.entrySet()) { + indicesStatus.put(entry.getKey(), new SnapshotIndexStatus(entry.getKey(), entry.getValue())); } - this.indicesStatus = unmodifiableMap(indicesStatus); - return this.indicesStatus; - + res = unmodifiableMap(indicesStatus); + this.indicesStatus = res; + return res; } @Override @@ -197,24 +190,20 @@ public SnapshotStats getStats() { private static final String INCLUDE_GLOBAL_STATE = "include_global_state"; @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.field(SNAPSHOT, snapshot.getSnapshotId().getName()); - builder.field(REPOSITORY, snapshot.getRepository()); - builder.field(UUID, snapshot.getSnapshotId().getUUID()); - builder.field(STATE, state.name()); - if (includeGlobalState != null) { - builder.field(INCLUDE_GLOBAL_STATE, includeGlobalState); - } - builder.field(SnapshotShardsStats.Fields.SHARDS_STATS, shardsStats, params); - builder.field(SnapshotStats.Fields.STATS, stats, params); - builder.startObject(INDICES); - for (SnapshotIndexStatus indexStatus : getIndices().values()) { - indexStatus.toXContent(builder, params); - } - builder.endObject(); - builder.endObject(); - return builder; + public Iterator toXContentChunked() { + return Iterators.concat(Iterators.single((ToXContent) (b, p) -> { + var builder = b.startObject() + .field(SNAPSHOT, snapshot.getSnapshotId().getName()) + .field(REPOSITORY, snapshot.getRepository()) + .field(UUID, snapshot.getSnapshotId().getUUID()) + .field(STATE, state.name()); + if (includeGlobalState != null) { + builder.field(INCLUDE_GLOBAL_STATE, includeGlobalState); + } + return builder.field(SnapshotShardsStats.Fields.SHARDS_STATS, shardsStats, p) + .field(SnapshotStats.Fields.STATS, stats, p) + .startObject(INDICES); + }), getIndices().values().iterator(), Iterators.single((b, p) -> b.endObject().endObject())); } static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusResponse.java index cc4431ad27d81..13069119d3b9f 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusResponse.java @@ -9,24 +9,29 @@ package org.elasticsearch.action.admin.cluster.snapshots.status; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.xcontent.ConstructingObjectParser; import org.elasticsearch.xcontent.ParseField; -import org.elasticsearch.xcontent.ToXContentObject; -import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentParser; import java.io.IOException; +import java.util.Iterator; import java.util.List; import java.util.Objects; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.stream.StreamSupport; import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg; /** * Snapshot status response */ -public class SnapshotsStatusResponse extends ActionResponse implements ToXContentObject { +public class SnapshotsStatusResponse extends ActionResponse implements ChunkedToXContent { private final List snapshots; @@ -53,18 +58,6 @@ public void writeTo(StreamOutput out) throws IOException { out.writeList(snapshots); } - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.startArray("snapshots"); - for (SnapshotStatus snapshot : snapshots) { - snapshot.toXContent(builder, params); - } - builder.endArray(); - builder.endObject(); - return builder; - } - private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "snapshots_status_response", true, @@ -94,4 +87,15 @@ public boolean equals(Object o) { public int hashCode() { return snapshots != null ? snapshots.hashCode() : 0; } + + @Override + public Iterator toXContentChunked() { + return Iterators.concat( + Iterators.single((ToXContent) (b, p) -> b.startObject().startArray("snapshots")), + snapshots.stream() + .flatMap(s -> StreamSupport.stream(Spliterators.spliteratorUnknownSize(s.toXContentChunked(), Spliterator.ORDERED), false)) + .iterator(), + Iterators.single((b, p) -> b.endArray().endObject()) + ); + } } diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestSnapshotsStatusAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestSnapshotsStatusAction.java index 02ae94cb52945..8e8744a03d626 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestSnapshotsStatusAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestSnapshotsStatusAction.java @@ -14,7 +14,7 @@ import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.RestCancellableNodeClient; -import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.rest.action.RestChunkedToXContentListener; import java.io.IOException; import java.util.List; @@ -54,6 +54,6 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC snapshotsStatusRequest.masterNodeTimeout(request.paramAsTime("master_timeout", snapshotsStatusRequest.masterNodeTimeout())); return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).admin() .cluster() - .snapshotsStatus(snapshotsStatusRequest, new RestToXContentListener<>(channel)); + .snapshotsStatus(snapshotsStatusRequest, new RestChunkedToXContentListener<>(channel)); } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStatusTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStatusTests.java index 33fbb47f22667..fe5d4e92bd225 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStatusTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStatusTests.java @@ -10,10 +10,11 @@ import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotId; -import org.elasticsearch.test.AbstractXContentTestCase; +import org.elasticsearch.test.AbstractChunkedSerializingTestCase; import org.elasticsearch.xcontent.XContentParser; import java.io.IOException; @@ -21,7 +22,7 @@ import java.util.List; import java.util.function.Predicate; -public class SnapshotStatusTests extends AbstractXContentTestCase { +public class SnapshotStatusTests extends AbstractChunkedSerializingTestCase { public void testToString() throws Exception { SnapshotsInProgress.State state = randomFrom(SnapshotsInProgress.State.values()); @@ -180,4 +181,9 @@ protected SnapshotStatus doParseInstance(XContentParser parser) throws IOExcepti protected boolean supportsUnknownFields() { return true; } + + @Override + protected Writeable.Reader instanceReader() { + return SnapshotStatus::new; + } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusResponseTests.java index 82807d37501ad..df4fe643ab25c 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusResponseTests.java @@ -8,7 +8,8 @@ package org.elasticsearch.action.admin.cluster.snapshots.status; -import org.elasticsearch.test.AbstractXContentTestCase; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractChunkedSerializingTestCase; import org.elasticsearch.xcontent.XContentParser; import java.io.IOException; @@ -16,7 +17,7 @@ import java.util.List; import java.util.function.Predicate; -public class SnapshotsStatusResponseTests extends AbstractXContentTestCase { +public class SnapshotsStatusResponseTests extends AbstractChunkedSerializingTestCase { @Override protected SnapshotsStatusResponse doParseInstance(XContentParser parser) throws IOException { @@ -43,4 +44,26 @@ protected SnapshotsStatusResponse createTestInstance() { } return new SnapshotsStatusResponse(snapshotStatuses); } + + @Override + protected Writeable.Reader instanceReader() { + return SnapshotsStatusResponse::new; + } + + public void testChunkCount() { + final var instance = createTestInstance(); + // open and close chunk + int chunksExpected = 2; + for (SnapshotStatus snapshot : instance.getSnapshots()) { + // open and close chunk + one chunk per index + chunksExpected += 2 + snapshot.getIndices().size(); + } + final var iterator = instance.toXContentChunked(); + int chunksSeen = 0; + while (iterator.hasNext()) { + iterator.next(); + chunksSeen++; + } + assertEquals(chunksExpected, chunksSeen); + } } From 9e1cd89c65c9998355448264295944f6fd684dc0 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 2 Nov 2022 16:09:57 +0100 Subject: [PATCH 2/2] cr nit --- .../admin/cluster/snapshots/status/SnapshotStatus.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStatus.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStatus.java index c8259637b6d10..d9b47f7ecc237 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStatus.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStatus.java @@ -192,15 +192,15 @@ public SnapshotStats getStats() { @Override public Iterator toXContentChunked() { return Iterators.concat(Iterators.single((ToXContent) (b, p) -> { - var builder = b.startObject() + b.startObject() .field(SNAPSHOT, snapshot.getSnapshotId().getName()) .field(REPOSITORY, snapshot.getRepository()) .field(UUID, snapshot.getSnapshotId().getUUID()) .field(STATE, state.name()); if (includeGlobalState != null) { - builder.field(INCLUDE_GLOBAL_STATE, includeGlobalState); + b.field(INCLUDE_GLOBAL_STATE, includeGlobalState); } - return builder.field(SnapshotShardsStats.Fields.SHARDS_STATS, shardsStats, p) + return b.field(SnapshotShardsStats.Fields.SHARDS_STATS, shardsStats, p) .field(SnapshotStats.Fields.STATS, stats, p) .startObject(INDICES); }), getIndices().values().iterator(), Iterators.single((b, p) -> b.endObject().endObject()));