Skip to content

Commit

Permalink
[BUG] Add missing fields to resolve Strict Dynamic Mapping issue in .…
Browse files Browse the repository at this point in the history
…tasks index (opensearch-project#16060)

- Fixed issue where `.tasks` index failed to update due to StrictDynamicMappingException when a task was cancelled.
- Added missing `cancellation_time_millis` and `resource_stats` fields to `task-index-mapping.json`.
- Ensured proper task result storage by updating the mappings.
- Changed the version in the meta field from 4 to 5 to reflect the updated mappings.

Signed-off-by: inpink <inpink@kakao.com>
  • Loading branch information
inpink committed Oct 18, 2024
1 parent 20536ee commit 9a01b21
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Streaming bulk request hangs ([#16158](https://github.com/opensearch-project/OpenSearch/pull/16158))
- Fix warnings from SLF4J on startup when repository-s3 is installed ([#16194](https://github.com/opensearch-project/OpenSearch/pull/16194))
- Fix protobuf-java leak through client library dependencies ([#16254](https://github.com/opensearch-project/OpenSearch/pull/16254))
- Fix missing fields in task index mapping to ensure proper task result storage ([#16201](https://github.com/opensearch-project/OpenSearch/pull/16201))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@

package org.opensearch.action.admin.cluster.node.tasks;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchTimeoutException;
Expand All @@ -58,9 +64,14 @@
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.regex.Regex;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.Streams;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.core.tasks.resourcetracker.TaskResourceStats;
import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage;
import org.opensearch.core.tasks.resourcetracker.TaskThreadUsage;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.index.mapper.StrictDynamicMappingException;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.tasks.Task;
Expand All @@ -86,6 +97,8 @@

import static java.util.Collections.emptyList;
import static java.util.Collections.singleton;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.opensearch.common.unit.TimeValue.timeValueMillis;
import static org.opensearch.common.unit.TimeValue.timeValueSeconds;
import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE;
Expand All @@ -112,6 +125,26 @@
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, minNumDataNodes = 2)
public class TasksIT extends AbstractTasksIT {

public static final TaskInfo taskInfo = new TaskInfo(
new TaskId("fake", 1),
"test_type",
"test_action",
"test_description",
null,
0L,
1L,
false,
false,
TaskId.EMPTY_TASK_ID,
Collections.emptyMap(),
new TaskResourceStats(new HashMap<>() {
{
put("dummy-type1", new TaskResourceUsage(10, 20));
}
}, new TaskThreadUsage(30, 40)),
2L
);

public void testTaskCounts() {
// Run only on data nodes
ListTasksResponse response = client().admin()
Expand Down Expand Up @@ -881,20 +914,7 @@ public void testNodeNotFoundButTaskFound() throws Exception {
TaskResultsService resultsService = internalCluster().getInstance(TaskResultsService.class);
resultsService.storeResult(
new TaskResult(
new TaskInfo(
new TaskId("fake", 1),
"test",
"test",
"",
null,
0,
0,
false,
false,
TaskId.EMPTY_TASK_ID,
Collections.emptyMap(),
null
),
taskInfo,
new RuntimeException("test")
),
new ActionListener<Void>() {
Expand All @@ -917,8 +937,66 @@ public void onFailure(Exception e) {

// Now we can find it!
GetTaskResponse response = expectFinishedTask(new TaskId("fake:1"));
assertEquals("test", response.getTask().getTask().getAction());
assertNotNull(response.getTask().getError());
assertNull(response.getTask().getResponse());
TaskResult taskResult = response.getTask();
TaskInfo task = taskResult.getTask();

assertEquals("fake", task.getTaskId().getNodeId());
assertEquals(1, task.getTaskId().getId());
assertEquals("test_type", task.getType());
assertEquals("test_action", task.getAction());
assertEquals("test_description", task.getDescription());
assertEquals(0L, task.getStartTime());
assertEquals(1L, task.getRunningTimeNanos());
assertFalse(task.isCancellable());
assertFalse(task.isCancelled());
assertEquals(TaskId.EMPTY_TASK_ID, task.getParentTaskId());
assertEquals(1, task.getResourceStats().getResourceUsageInfo().size());
assertEquals(30, task.getResourceStats().getThreadUsage().getThreadExecutions());
assertEquals(40, task.getResourceStats().getThreadUsage().getActiveThreads());
assertEquals(Long.valueOf(2L), task.getCancellationStartTime());


assertNotNull(taskResult.getError());
assertNull(taskResult.getResponse());
System.out.println("original:"+resultsService.taskResultIndexMapping());
}

public void testStoreTaskResultFailsDueToMissingIndexMappingFields() throws IOException {
// given
TaskResultsService resultsService = spy(internalCluster().getInstance(TaskResultsService.class));

InputStream mockInputStream = getClass().getResourceAsStream("/org/opensearch/tasks/missing-fields-task-index-mapping.json");
ByteArrayOutputStream out = new ByteArrayOutputStream();
Streams.copy(mockInputStream, out);
String mockJsonString = out.toString(StandardCharsets.UTF_8.name());

// when & then
doReturn(mockJsonString).when(resultsService).taskResultIndexMapping();

CompletionException thrown = assertThrows(CompletionException.class, () -> {
CompletableFuture<Void> future = new CompletableFuture<>();

resultsService.storeResult(
new TaskResult(
taskInfo,
new RuntimeException("test")
),
new ActionListener<Void>() {
@Override
public void onResponse(Void response) {
future.complete(null);
}

@Override
public void onFailure(Exception e) {
future.completeExceptionally(e);
}
}
);

future.join();
});

assertTrue(thrown.getCause() instanceof StrictDynamicMappingException);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public class TaskResultsService {

public static final String TASK_RESULT_MAPPING_VERSION_META_FIELD = "version";

public static final int TASK_RESULT_MAPPING_VERSION = 4; // must match version in task-index-mapping.json
public static final int TASK_RESULT_MAPPING_VERSION = 5; // must match version in task-index-mapping.json

/**
* The backoff policy to use when saving a task result fails. The total wait
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"_doc" : {
"_meta": {
"version": 4
"version": 5
},
"dynamic" : "strict",
"properties" : {
Expand Down Expand Up @@ -34,6 +34,9 @@
"start_time_in_millis": {
"type": "long"
},
"cancellation_time_millis": {
"type": "long"
},
"type": {
"type": "keyword"
},
Expand All @@ -47,6 +50,10 @@
"headers": {
"type" : "object",
"enabled" : false
},
"resource_stats": {
"type" : "object",
"enabled" : false
}
}
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
{
"_doc" : {
"_meta": {
"version": 5
},
"dynamic" : "strict",
"properties" : {
"completed": {
"type": "boolean"
},
"task" : {
"properties": {
"action": {
"type": "keyword"
},
"cancellable": {
"type": "boolean"
},
"cancelled": {
"type": "boolean"
},
"id": {
"type": "long"
},
"parent_task_id": {
"type": "keyword"
},
"node": {
"type": "keyword"
},
"running_time_in_nanos": {
"type": "long"
},
"start_time_in_millis": {
"type": "long"
},
"type": {
"type": "keyword"
},
"status": {
"type" : "object",
"enabled" : false
},
"description": {
"type": "text"
},
"headers": {
"type" : "object",
"enabled" : false
}
}
},
"response" : {
"type" : "object",
"enabled" : false
},
"error" : {
"type" : "object",
"enabled" : false
}
}
}
}

0 comments on commit 9a01b21

Please sign in to comment.