Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chunked encoding for pending tasks API #91929

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void testPendingTasksWithIndexBlocks() {
try {
enableIndexBlock("test", blockSetting);
PendingClusterTasksResponse response = client().admin().cluster().preparePendingClusterTasks().get();
assertNotNull(response.getPendingTasks());
assertNotNull(response.pendingTasks());
} finally {
disableIndexBlock("test", blockSetting);
}
Expand All @@ -54,7 +54,7 @@ public void testPendingTasksWithClusterReadOnlyBlock() {
try {
setClusterReadOnly(true);
PendingClusterTasksResponse response = client().admin().cluster().preparePendingClusterTasks().get();
assertNotNull(response.getPendingTasks());
assertNotNull(response.pendingTasks());
} finally {
setClusterReadOnly(false);
}
Expand All @@ -80,7 +80,7 @@ public boolean validateClusterForming() {
}
});

assertNotNull(client().admin().cluster().preparePendingClusterTasks().get().getPendingTasks());
assertNotNull(client().admin().cluster().preparePendingClusterTasks().get().pendingTasks());

// starting one more node allows the cluster to recover
internalCluster().startNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
assertThat(response.pendingTasks().size(), greaterThanOrEqualTo(10));
assertThat(response.pendingTasks().get(0).getSource().string(), equalTo("1"));
assertThat(response.pendingTasks().get(0).isExecuting(), equalTo(true));
for (PendingClusterTask task : response) {
for (PendingClusterTask task : response.pendingTasks()) {
controlSources.remove(task.getSource().string());
}
assertTrue(controlSources.isEmpty());
Expand Down Expand Up @@ -431,7 +431,7 @@ public void onFailure(Exception e) {
response = internalCluster().coordOnlyNodeClient().admin().cluster().preparePendingClusterTasks().get();
assertThat(response.pendingTasks().size(), greaterThanOrEqualTo(5));
controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5"));
for (PendingClusterTask task : response) {
for (PendingClusterTask task : response.pendingTasks()) {
if (controlSources.remove(task.getSource().string())) {
assertThat(task.getTimeInQueueInMillis(), greaterThan(0L));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@

import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.service.PendingClusterTask;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.xcontent.ToXContent;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;

public class PendingClusterTasksResponse extends ActionResponse implements Iterable<PendingClusterTask>, ToXContentObject {
public class PendingClusterTasksResponse extends ActionResponse implements ChunkedToXContent {

private final List<PendingClusterTask> pendingTasks;

Expand All @@ -36,23 +37,11 @@ public List<PendingClusterTask> pendingTasks() {
return pendingTasks;
}

/**
* The pending cluster tasks
*/
public List<PendingClusterTask> getPendingTasks() {
return pendingTasks();
}

@Override
public Iterator<PendingClusterTask> iterator() {
return pendingTasks.iterator();
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("tasks: (").append(pendingTasks.size()).append("):\n");
for (PendingClusterTask pendingClusterTask : this) {
for (PendingClusterTask pendingClusterTask : pendingTasks) {
sb.append(pendingClusterTask.getInsertOrder())
.append("/")
.append(pendingClusterTask.getPriority())
Expand All @@ -66,10 +55,12 @@ public String toString() {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.startArray(Fields.TASKS);
for (PendingClusterTask pendingClusterTask : this) {
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
return Iterators.concat(Iterators.single((builder, p) -> {
builder.startObject();
builder.startArray(Fields.TASKS);
return builder;
}), pendingTasks.stream().<ToXContent>map(pendingClusterTask -> (builder, p) -> {
builder.startObject();
builder.field(Fields.INSERT_ORDER, pendingClusterTask.getInsertOrder());
builder.field(Fields.PRIORITY, pendingClusterTask.getPriority());
Expand All @@ -78,10 +69,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(Fields.TIME_IN_QUEUE_MILLIS, pendingClusterTask.getTimeInQueueInMillis());
builder.field(Fields.TIME_IN_QUEUE, pendingClusterTask.getTimeInQueue());
builder.endObject();
}
builder.endArray();
builder.endObject();
return builder;
return builder;
}).iterator(), Iterators.single((builder, p) -> {
builder.endArray();
builder.endObject();
return builder;
}));
}

static final class Fields {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.rest.action.RestChunkedToXContentListener;

import java.io.IOException;
import java.util.List;
Expand All @@ -36,6 +36,8 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
PendingClusterTasksRequest pendingClusterTasksRequest = new PendingClusterTasksRequest();
pendingClusterTasksRequest.masterNodeTimeout(request.paramAsTime("master_timeout", pendingClusterTasksRequest.masterNodeTimeout()));
pendingClusterTasksRequest.local(request.paramAsBoolean("local", pendingClusterTasksRequest.local()));
return channel -> client.admin().cluster().pendingClusterTasks(pendingClusterTasksRequest, new RestToXContentListener<>(channel));
return channel -> client.admin()
.cluster()
.pendingClusterTasks(pendingClusterTasksRequest, new RestChunkedToXContentListener<>(channel));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ protected Table getTableWithHeader(final RestRequest request) {
return t;
}

private Table buildTable(RestRequest request, PendingClusterTasksResponse tasks) {
private Table buildTable(RestRequest request, PendingClusterTasksResponse response) {
Table t = getTableWithHeader(request);

for (PendingClusterTask task : tasks) {
for (PendingClusterTask task : response.pendingTasks()) {
t.startRow();
t.addCell(task.getInsertOrder());
t.addCell(task.getTimeInQueue());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.admin.cluster.tasks;

import org.elasticsearch.cluster.service.PendingClusterTask;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayList;

import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;

public class PendingClusterTasksResponseTests extends ESTestCase {
public void testPendingClusterTasksResponseChunking() throws IOException {
final var tasks = new ArrayList<PendingClusterTask>();
for (int i = between(0, 10); i > 0; i--) {
tasks.add(
new PendingClusterTask(
randomNonNegativeLong(),
randomFrom(Priority.values()),
new Text(randomAlphaOfLengthBetween(1, 10)),
randomNonNegativeLong(),
randomBoolean()
)
);
}

int chunkCount = 0;
try (XContentBuilder builder = jsonBuilder()) {
final var iterator = new PendingClusterTasksResponse(tasks).toXContentChunked(ToXContent.EMPTY_PARAMS);
while (iterator.hasNext()) {
iterator.next().toXContent(builder, ToXContent.EMPTY_PARAMS);
chunkCount += 1;
}
} // closing the builder verifies that the XContent is well-formed

assertEquals(tasks.size() + 2, chunkCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,11 @@ public void waitNoPendingTasksOnAll() throws Exception {
ClusterHealthResponse clusterHealth = client.admin().cluster().prepareHealth().setLocal(true).get();
assertThat("client " + client + " still has in flight fetch", clusterHealth.getNumberOfInFlightFetch(), equalTo(0));
PendingClusterTasksResponse pendingTasks = client.admin().cluster().preparePendingClusterTasks().setLocal(true).get();
assertThat("client " + client + " still has pending tasks " + pendingTasks, pendingTasks, Matchers.emptyIterable());
assertThat(
"client " + client + " still has pending tasks " + pendingTasks,
pendingTasks.pendingTasks(),
Matchers.emptyIterable()
);
clusterHealth = client.admin().cluster().prepareHealth().setLocal(true).get();
assertThat("client " + client + " still has in flight fetch", clusterHealth.getNumberOfInFlightFetch(), equalTo(0));
}
Expand Down