Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] [BUGFIX] Fix missing fields to resolve Strict Dynamic Mapping issue when saving task result #16414

Merged
merged 3 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix wrong default value when setting `index.number_of_routing_shards` to null on index creation ([#16331](https://github.com/opensearch-project/OpenSearch/pull/16331))
- Fix inefficient Stream API call chains ending with count() ([#15386](https://github.com/opensearch-project/OpenSearch/pull/15386))
- [Workload Management] Make query groups persistent across process restarts [#16370](https://github.com/opensearch-project/OpenSearch/pull/16370)
- Fix missing fields in task index mapping to ensure proper task result storage ([#16201](https://github.com/opensearch-project/OpenSearch/pull/16201))
- Fix typo super->sb in method toString() of RemoteStoreNodeAttribute ([#15362](https://github.com/opensearch-project/OpenSearch/pull/15362))
- Fix array hashCode calculation in ResyncReplicationRequest ([#16378](https://github.com/opensearch-project/OpenSearch/pull/16378))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,14 @@
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.regex.Regex;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.Streams;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.core.tasks.resourcetracker.TaskResourceStats;
import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage;
import org.opensearch.core.tasks.resourcetracker.TaskThreadUsage;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.index.mapper.StrictDynamicMappingException;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.tasks.Task;
Expand All @@ -73,11 +78,17 @@
import org.opensearch.transport.ReceiveTimeoutTransportException;
import org.opensearch.transport.TransportService;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
Expand All @@ -103,6 +114,8 @@
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.startsWith;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

/**
* Integration tests for task management API
Expand All @@ -112,6 +125,26 @@
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, minNumDataNodes = 2)
public class TasksIT extends AbstractTasksIT {

protected final TaskInfo taskInfo = new TaskInfo(
new TaskId("fake", 1),
"test_type",
"test_action",
"test_description",
null,
0L,
1L,
false,
false,
TaskId.EMPTY_TASK_ID,
Collections.emptyMap(),
new TaskResourceStats(new HashMap<>() {
{
put("dummy-type1", new TaskResourceUsage(10, 20));
}
}, new TaskThreadUsage(30, 40)),
2L
);

public void testTaskCounts() {
// Run only on data nodes
ListTasksResponse response = client().admin()
Expand Down Expand Up @@ -879,46 +912,77 @@ public void testNodeNotFoundButTaskFound() throws Exception {
// Save a fake task that looks like it is from a node that isn't part of the cluster
CyclicBarrier b = new CyclicBarrier(2);
TaskResultsService resultsService = internalCluster().getInstance(TaskResultsService.class);
resultsService.storeResult(
new TaskResult(
new TaskInfo(
new TaskId("fake", 1),
"test",
"test",
"",
null,
0,
0,
false,
false,
TaskId.EMPTY_TASK_ID,
Collections.emptyMap(),
null
),
new RuntimeException("test")
),
new ActionListener<Void>() {
resultsService.storeResult(new TaskResult(taskInfo, new RuntimeException("test")), new ActionListener<Void>() {
@Override
public void onResponse(Void response) {
try {
b.await();
} catch (InterruptedException | BrokenBarrierException e) {
onFailure(e);
}
}

@Override
public void onFailure(Exception e) {
throw new RuntimeException(e);
}
});
b.await();

// Now we can find it!
GetTaskResponse response = expectFinishedTask(new TaskId("fake:1"));
TaskResult taskResult = response.getTask();
TaskInfo task = taskResult.getTask();

assertEquals("fake", task.getTaskId().getNodeId());
assertEquals(1, task.getTaskId().getId());
assertEquals("test_type", task.getType());
assertEquals("test_action", task.getAction());
assertEquals("test_description", task.getDescription());
assertEquals(0L, task.getStartTime());
assertEquals(1L, task.getRunningTimeNanos());
assertFalse(task.isCancellable());
assertFalse(task.isCancelled());
assertEquals(TaskId.EMPTY_TASK_ID, task.getParentTaskId());
assertEquals(1, task.getResourceStats().getResourceUsageInfo().size());
assertEquals(30, task.getResourceStats().getThreadUsage().getThreadExecutions());
assertEquals(40, task.getResourceStats().getThreadUsage().getActiveThreads());
assertEquals(Long.valueOf(2L), task.getCancellationStartTime());

assertNotNull(taskResult.getError());
assertNull(taskResult.getResponse());
}

public void testStoreTaskResultFailsDueToMissingIndexMappingFields() throws IOException {
// given
TaskResultsService resultsService = spy(internalCluster().getInstance(TaskResultsService.class));

InputStream mockInputStream = getClass().getResourceAsStream("/org/opensearch/tasks/missing-fields-task-index-mapping.json");
ByteArrayOutputStream out = new ByteArrayOutputStream();
Streams.copy(mockInputStream, out);
String mockJsonString = out.toString(StandardCharsets.UTF_8.name());

// when & then
doReturn(mockJsonString).when(resultsService).taskResultIndexMapping();

CompletionException thrown = assertThrows(CompletionException.class, () -> {
CompletableFuture<Void> future = new CompletableFuture<>();

resultsService.storeResult(new TaskResult(taskInfo, new RuntimeException("test")), new ActionListener<Void>() {
@Override
public void onResponse(Void response) {
try {
b.await();
} catch (InterruptedException | BrokenBarrierException e) {
onFailure(e);
}
future.complete(null);
}

@Override
public void onFailure(Exception e) {
throw new RuntimeException(e);
future.completeExceptionally(e);
}
}
);
b.await();
});

// Now we can find it!
GetTaskResponse response = expectFinishedTask(new TaskId("fake:1"));
assertEquals("test", response.getTask().getTask().getAction());
assertNotNull(response.getTask().getError());
assertNull(response.getTask().getResponse());
future.join();
});

assertTrue(thrown.getCause() instanceof StrictDynamicMappingException);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public class TaskResultsService {

public static final String TASK_RESULT_MAPPING_VERSION_META_FIELD = "version";

public static final int TASK_RESULT_MAPPING_VERSION = 4; // must match version in task-index-mapping.json
public static final int TASK_RESULT_MAPPING_VERSION = 5; // must match version in task-index-mapping.json

/**
* The backoff policy to use when saving a task result fails. The total wait
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"_doc" : {
"_meta": {
"version": 4
"version": 5
},
"dynamic" : "strict",
"properties" : {
Expand Down Expand Up @@ -34,6 +34,9 @@
"start_time_in_millis": {
"type": "long"
},
"cancellation_time_millis": {
"type": "long"
},
"type": {
"type": "keyword"
},
Expand All @@ -47,6 +50,10 @@
"headers": {
"type" : "object",
"enabled" : false
},
"resource_stats": {
"type" : "object",
"enabled" : false
}
}
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
{
"_doc" : {
"_meta": {
"version": 5
},
"dynamic" : "strict",
"properties" : {
"completed": {
"type": "boolean"
},
"task" : {
"properties": {
"action": {
"type": "keyword"
},
"cancellable": {
"type": "boolean"
},
"cancelled": {
"type": "boolean"
},
"id": {
"type": "long"
},
"parent_task_id": {
"type": "keyword"
},
"node": {
"type": "keyword"
},
"running_time_in_nanos": {
"type": "long"
},
"start_time_in_millis": {
"type": "long"
},
"type": {
"type": "keyword"
},
"status": {
"type" : "object",
"enabled" : false
},
"description": {
"type": "text"
},
"headers": {
"type" : "object",
"enabled" : false
}
}
},
"response" : {
"type" : "object",
"enabled" : false
},
"error" : {
"type" : "object",
"enabled" : false
}
}
}
}
Loading