Skip to content

Commit

Permalink
Migrate .tasks to be managed automatically (elastic#67351)
Browse files Browse the repository at this point in the history
Re-apply changes from 0c9b9c1, which migrates the `.tasks` system index
to be managed automatically by the system indices infrastructure.

Changes went into elastic#67114 that, I hope, will avoid the problems we saw before
in the BWC tests in CI.
  • Loading branch information
pugnascotia committed Feb 8, 2021
1 parent 138cf89 commit 8bd7bed
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import static java.util.Collections.singletonMap;
import static java.util.Collections.unmodifiableList;
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.tasks.TaskResultsService.TASK_INDEX;
import static org.elasticsearch.tasks.TaskResultsService.TASKS_DESCRIPTOR;

/**
* This class holds the {@link SystemIndexDescriptor} objects that represent system indices the
Expand All @@ -39,7 +39,7 @@
*/
public class SystemIndices {
private static final Map<String, Collection<SystemIndexDescriptor>> SERVER_SYSTEM_INDEX_DESCRIPTORS = singletonMap(
TaskResultsService.class.getName(), singletonList(new SystemIndexDescriptor(TASK_INDEX + "*", "Task Result Index"))
TaskResultsService.class.getName(), singletonList(TASKS_DESCRIPTOR)
);

private final CharacterRunAutomaton runAutomaton;
Expand Down
200 changes: 109 additions & 91 deletions server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,41 +11,32 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.Streams;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.Map;

import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

/**
* Service that can store task results.
Expand All @@ -55,14 +46,19 @@ public class TaskResultsService {
private static final Logger logger = LogManager.getLogger(TaskResultsService.class);

public static final String TASK_INDEX = ".tasks";

public static final String TASK_TYPE = "task";

public static final String TASK_RESULT_INDEX_MAPPING_FILE = "task-index-mapping.json";

public static final String TASK_RESULT_MAPPING_VERSION_META_FIELD = "version";

public static final int TASK_RESULT_MAPPING_VERSION = 3;
public static final SystemIndexDescriptor TASKS_DESCRIPTOR = SystemIndexDescriptor.builder()
.setIndexPattern(TASK_INDEX + "*")
.setPrimaryIndex(TASK_INDEX)
.setDescription("Task Result Index")
.setSettings(getTaskResultIndexSettings())
.setMappings(getTaskResultIndexMappings())
.setVersionMetaKey(TASK_RESULT_MAPPING_VERSION_META_FIELD)
.setOrigin(TASKS_ORIGIN)
.setIndexType(TASK_TYPE)
.build();

/**
* The backoff policy to use when saving a task result fails. The total wait
Expand All @@ -73,76 +69,16 @@ public class TaskResultsService {

private final Client client;

private final ClusterService clusterService;

private final ThreadPool threadPool;

@Inject
public TaskResultsService(Client client, ClusterService clusterService, ThreadPool threadPool) {
public TaskResultsService(Client client, ThreadPool threadPool) {
this.client = new OriginSettingClient(client, TASKS_ORIGIN);
this.clusterService = clusterService;
this.threadPool = threadPool;
}

public void storeResult(TaskResult taskResult, ActionListener<Void> listener) {

ClusterState state = clusterService.state();

if (state.routingTable().hasIndex(TASK_INDEX) == false) {
CreateIndexRequest createIndexRequest = new CreateIndexRequest();
createIndexRequest.settings(taskResultIndexSettings());
createIndexRequest.index(TASK_INDEX);
createIndexRequest.mapping(TASK_TYPE, taskResultIndexMapping(), XContentType.JSON);
createIndexRequest.cause("auto(task api)");

client.admin().indices().create(createIndexRequest, new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse result) {
doStoreResult(taskResult, listener);
}

@Override
public void onFailure(Exception e) {
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException) {
// we have the index, do it
try {
doStoreResult(taskResult, listener);
} catch (Exception inner) {
inner.addSuppressed(e);
listener.onFailure(inner);
}
} else {
listener.onFailure(e);
}
}
});
} else {
IndexMetadata metadata = state.getMetadata().index(TASK_INDEX);
if (getTaskResultMappingVersion(metadata) < TASK_RESULT_MAPPING_VERSION) {
// The index already exists but doesn't have our mapping
client.admin().indices().preparePutMapping(TASK_INDEX).setType(TASK_TYPE)
.setSource(taskResultIndexMapping(), XContentType.JSON)
.execute(ActionListener.delegateFailure(listener, (l, r) -> doStoreResult(taskResult, listener)));
} else {
doStoreResult(taskResult, listener);
}
}
}

private int getTaskResultMappingVersion(IndexMetadata metadata) {
MappingMetadata mappingMetadata = metadata.getMappings().get(TASK_TYPE);
if (mappingMetadata == null) {
return 0;
}
@SuppressWarnings("unchecked") Map<String, Object> meta = (Map<String, Object>) mappingMetadata.sourceAsMap().get("_meta");
if (meta == null || meta.containsKey(TASK_RESULT_MAPPING_VERSION_META_FIELD) == false) {
return 1; // The mapping was created before meta field was introduced
}
return (int) meta.get(TASK_RESULT_MAPPING_VERSION_META_FIELD);
}

private void doStoreResult(TaskResult taskResult, ActionListener<Void> listener) {
IndexRequestBuilder index = client.prepareIndex(TASK_INDEX, TASK_TYPE, taskResult.getTask().getTaskId().toString());
IndexRequestBuilder index = client.prepareIndex(TASK_INDEX, TASK_TYPE).setId(taskResult.getTask().getTaskId().toString());
try (XContentBuilder builder = XContentFactory.contentBuilder(Requests.INDEX_CONTENT_TYPE)) {
taskResult.toXContent(builder, ToXContent.EMPTY_PARAMS);
index.setSource(builder);
Expand Down Expand Up @@ -173,24 +109,106 @@ public void onFailure(Exception e) {
});
}

private Settings taskResultIndexSettings() {
private static Settings getTaskResultIndexSettings() {
return Settings.builder()
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.getKey(), "0-1")
.put(IndexMetadata.SETTING_PRIORITY, Integer.MAX_VALUE)
.build();
}

public String taskResultIndexMapping() {
try (InputStream is = getClass().getResourceAsStream(TASK_RESULT_INDEX_MAPPING_FILE)) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
Streams.copy(is, out);
return out.toString(StandardCharsets.UTF_8.name());
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage(
"failed to create tasks results index template [{}]", TASK_RESULT_INDEX_MAPPING_FILE), e);
throw new IllegalStateException("failed to create tasks results index template [" + TASK_RESULT_INDEX_MAPPING_FILE + "]", e);
}
private static XContentBuilder getTaskResultIndexMappings() {
try {
final XContentBuilder builder = jsonBuilder();

builder.startObject();
{
builder.startObject(TASK_TYPE);
{
builder.startObject("_meta");
builder.field(TASK_RESULT_MAPPING_VERSION_META_FIELD, Version.CURRENT.toString());
builder.endObject();

builder.field("dynamic", "strict");
builder.startObject("properties");
{
builder.startObject("completed");
builder.field("type", "boolean");
builder.endObject();

builder.startObject("task");
{
builder.startObject("properties");
{
builder.startObject("action");
builder.field("type", "keyword");
builder.endObject();

builder.startObject("cancellable");
builder.field("type", "boolean");
builder.endObject();

builder.startObject("id");
builder.field("type", "long");
builder.endObject();

builder.startObject("parent_task_id");
builder.field("type", "keyword");
builder.endObject();

builder.startObject("node");
builder.field("type", "keyword");
builder.endObject();

builder.startObject("running_time_in_nanos");
builder.field("type", "long");
builder.endObject();

builder.startObject("start_time_in_millis");
builder.field("type", "long");
builder.endObject();

builder.startObject("type");
builder.field("type", "keyword");
builder.endObject();

builder.startObject("status");
builder.field("type", "object");
builder.field("enabled", false);
builder.endObject();

builder.startObject("description");
builder.field("type", "text");
builder.endObject();

builder.startObject("headers");
builder.field("type", "object");
builder.field("enabled", false);
builder.endObject();
}
builder.endObject();
}
builder.endObject();

builder.startObject("response");
builder.field("type", "object");
builder.field("enabled", false);
builder.endObject();

builder.startObject("error");
builder.field("type", "object");
builder.field("enabled", false);
builder.endObject();
}
builder.endObject();
}
builder.endObject();
}

builder.endObject();
return builder;
} catch (IOException e) {
throw new UncheckedIOException("Failed to build " + TASK_INDEX + " index mappings", e);
}
}
}

This file was deleted.

0 comments on commit 8bd7bed

Please sign in to comment.