CHANGELOG.md (2 changes: 1 addition & 1 deletion)
@@ -105,7 +105,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Close IndexFieldDataService asynchronously ([#18888](https://github.com/opensearch-project/OpenSearch/pull/18888))
 - Fix query string regex queries incorrectly swallowing TooComplexToDeterminizeException ([#18883](https://github.com/opensearch-project/OpenSearch/pull/18883))
 - Fix socks5 user password settings for Azure repo ([#18904](https://github.com/opensearch-project/OpenSearch/pull/18904))
-
+- Reset isPipelineResolved to false to resolve the system ingest pipeline again. ([#18911](https://github.com/opensearch-project/OpenSearch/pull/18911))
 
 ### Security
 
@@ -1266,9 +1266,12 @@ private Pipeline getPipeline(
         final PipelineHolder holder = pipelines.get(pipelineId);
         if (IngestPipelineType.SYSTEM_FINAL.equals(pipelineType)) {
             Pipeline indexPipeline = systemIngestPipelineCache.getSystemIngestPipeline(pipelineId);
-            // In very edge case it is possible the cache is invalidated after we resolve the
-            // pipeline. So try to resolve the system ingest pipeline again here.
+            // In some edge cases it is possible the cache is invalidated after we resolve the
+            // pipeline, or the request is forwarded from a non-ingest node. In that case we should try to resolve the
+            // system ingest pipeline on this node one more time.
             if (indexPipeline == null) {
+                // reset isPipelineResolved to false so that we can resolve it again
+                indexRequest.isPipelineResolved(false);
                 resolveSystemIngestPipeline(actionRequest, indexRequest, state.metadata());
                 final String newPipelineId = indexRequest.getSystemIngestPipeline();
                 // set it as NOOP to avoid duplicated execution after we switch back to the write thread
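The subtlety worth spelling out: system ingest pipeline resolution is gated on the request's isPipelineResolved flag, so a cache miss alone would not trigger a re-resolve; the flag has to be cleared first. A minimal sketch of the patched flow, assuming that gating behavior (condensed from the hunk above, not the complete getPipeline method):

Pipeline indexPipeline = systemIngestPipelineCache.getSystemIngestPipeline(pipelineId);
if (indexPipeline == null) {
    // Without this reset, resolveSystemIngestPipeline(...) would treat the request
    // as already resolved and skip regenerating the system ingest pipeline.
    indexRequest.isPipelineResolved(false);
    resolveSystemIngestPipeline(actionRequest, indexRequest, state.metadata());
    // Pick up the freshly resolved id; the request is later marked NOOP so the
    // pipeline is not executed a second time after hopping back to the write thread.
    final String newPipelineId = indexRequest.getSystemIngestPipeline();
}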
@@ -124,6 +124,7 @@
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.sameInstance;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.argThat;
@@ -2937,19 +2938,29 @@ public void testExecuteBulkRequestInBatchWithSystemPipeline() throws Exception {
         clusterState = createPipeline("pipeline", new BytesArray("{\"processors\": [{\"default\" : {}}]}"), ingestService, clusterState);
         createPipeline("final_pipeline", new BytesArray("{\"processors\": [{\"dummy\" : {}}]}"), ingestService, clusterState);
 
+        // prepare systemIngestPipelineCache for the case using index template
+        SystemIngestPipelineCache systemIngestPipelineCache = ingestService.getSystemIngestPipelineCache();
+        systemIngestPipelineCache.cachePipeline(
+            "[_index/template]",
+            new Pipeline("id", "description", null, new CompoundProcessor(false, List.of(dummySystemProcessor), Collections.emptyList())),
+            100
+        );
+
         // prepare request
         BulkRequest bulkRequest = new BulkRequest();
         IndexRequest indexRequest1 = new IndexRequest("_index").id("_id1")
             .source(emptyMap())
             .setPipeline("pipeline")
             .setFinalPipeline("final_pipeline")
-            .setSystemIngestPipeline("index_pipeline");
+            .setSystemIngestPipeline("[_index/template]")
+            .isPipelineResolved(true);
         bulkRequest.add(indexRequest1);
         IndexRequest indexRequest2 = new IndexRequest("_index").id("_id2")
             .source(emptyMap())
             .setPipeline("pipeline")
             .setFinalPipeline("final_pipeline")
-            .setSystemIngestPipeline("index_pipeline");
+            .setSystemIngestPipeline("[_index/template]")
+            .isPipelineResolved(true);
         bulkRequest.add(indexRequest2);
 
         // prepare handler
@@ -2967,6 +2978,10 @@ public void testExecuteBulkRequestInBatchWithSystemPipeline() throws Exception {
             Names.WRITE
         );
 
+        // verify we use the system pipeline from the cache
+        verify(systemIngestPipelineCache, times(1)).getSystemIngestPipeline(any());
+        // cachePipeline is only called once, during test setup
+        verify(systemIngestPipelineCache, times(1)).cachePipeline(any(), any(), anyInt());
         // verify
         verify(defaultProcessor, times(1)).batchExecute(any(), any());
         verify(dummyProcessor, times(1)).batchExecute(any(), any());
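These two cache assertions pin down the contract for the hit path: with the requests already marked resolved and the pipeline pre-cached under "[_index/template]", execution reads the pipeline from the cache, and the only cachePipeline call is the one made during setup. A self-contained sketch of this spy-verification pattern, using a hypothetical SimpleCache in place of SystemIngestPipelineCache (illustrative only, not OpenSearch code):

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.HashMap;
import java.util.Map;

public class CacheSpyExample {
    // Hypothetical stand-in for SystemIngestPipelineCache.
    static class SimpleCache {
        private final Map<String, String> store = new HashMap<>();

        public String get(String key) {
            return store.get(key);
        }

        public void put(String key, String value, int maxProcessorCount) {
            store.put(key, value);
        }
    }

    public static void main(String[] args) {
        SimpleCache cache = spy(new SimpleCache());
        // Setup write; note it counts toward the put() verification below,
        // just as the test's setup call counts toward times(1) for cachePipeline.
        cache.put("[_index/template]", "cached-pipeline", 100);

        // Hit path: the component under test only reads.
        String pipeline = cache.get("[_index/template]");
        System.out.println("cache hit: " + (pipeline != null));

        verify(cache, times(1)).get(any());
        verify(cache, times(1)).put(any(), any(), anyInt());
    }
}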
@@ -3030,7 +3045,73 @@ public void testExecuteBulkRequestSingleRequestWithSystemPipeline() throws Exception {
             .source(emptyMap())
             .setPipeline("_none")
             .setFinalPipeline("_none")
-            .setSystemIngestPipeline("[_index/uuid]");
+            .setSystemIngestPipeline("[_index/uuid]")
+            .isPipelineResolved(true);
         bulkRequest.add(indexRequest1);
+
+        // prepare systemIngestPipelineCache
+        SystemIngestPipelineCache systemIngestPipelineCache = ingestService.getSystemIngestPipelineCache();
+        systemIngestPipelineCache.cachePipeline(
+            "[_index/uuid]",
+            new Pipeline("id", "description", null, new CompoundProcessor(false, List.of(dummySystemProcessor), Collections.emptyList())),
+            100
+        );
+
+        // prepare handler
+        final Map<Integer, Exception> failureHandler = new HashMap<>();
+        final Map<Thread, Exception> completionHandler = new HashMap<>();
+        final List<Integer> dropHandler = new ArrayList<>();
+
+        // call
+        ingestService.executeBulkRequest(
+            1,
+            bulkRequest.requests(),
+            failureHandler::put,
+            completionHandler::put,
+            dropHandler::add,
+            Names.WRITE
+        );
+
+        // verify we use the system pipeline from the cache
+        verify(systemIngestPipelineCache, times(1)).getSystemIngestPipeline(any());
+        // cachePipeline is only called once, during test setup
+        verify(systemIngestPipelineCache, times(1)).cachePipeline(any(), any(), anyInt());
+        // verify
+        verify(dummySystemProcessor, times(1)).execute(any(), any());
+        assertTrue(failureHandler.isEmpty());
+        assertTrue(dropHandler.isEmpty());
+        assertEquals(1, completionHandler.size());
+    }
+
+    public void testExecuteBulkRequestWithSystemPipelineButCacheNotAvailable() throws Exception {
+        // mock a system processor that does nothing
+        final Processor dummySystemProcessor = mock(Processor.class);
+        final Map<String, Processor.Factory> systemProcessors = createDummyMockSystemProcessors(dummySystemProcessor, false);
+
+        // add index metadata for the new index
+        IngestService ingestService = createIngestServiceWithProcessors(Map.of(), systemProcessors);
+        IndexMetadata indexMetadata = spy(
+            IndexMetadata.builder("_index")
+                .settings(settings(Version.CURRENT))
+                .putMapping("{}")
+                .numberOfShards(1)
+                .numberOfReplicas(0)
+                .putAlias(AliasMetadata.builder("alias").writeIndex(true).build())
+                .build()
+        );
+        when(indexMetadata.getIndex()).thenReturn(new Index("_index", "uuid"));
+        Metadata metadata = Metadata.builder().indices(Map.of("_index", indexMetadata)).build();
+        ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).metadata(metadata).build();
+        ingestService.applyClusterState(new ClusterChangedEvent("_name", clusterState, clusterState));
+
+        // prepare request
+        BulkRequest bulkRequest = new BulkRequest();
+        IndexRequest indexRequest1 = new IndexRequest("_index").id("_id1")
+            .source(emptyMap())
+            .setPipeline("_none")
+            .setFinalPipeline("_none")
+            .setSystemIngestPipeline("[_index/uuid]")
+            .isPipelineResolved(true);
+        bulkRequest.add(indexRequest1);
 
         // prepare handler
@@ -3048,6 +3129,11 @@ public void testExecuteBulkRequestSingleRequestWithSystemPipeline() throws Exception {
             Names.WRITE
         );
 
+        // verify we resolve the system pipeline again and then cache it
+        SystemIngestPipelineCache systemIngestPipelineCache = ingestService.getSystemIngestPipelineCache();
+        verify(systemIngestPipelineCache, times(3)).getSystemIngestPipeline(any());
+        // cachePipeline is called only once, for the newly generated pipeline
+        verify(systemIngestPipelineCache, times(1)).cachePipeline(any(), any(), anyInt());
        // verify
        verify(dummySystemProcessor, times(1)).execute(any(), any());
        assertTrue(failureHandler.isEmpty());
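The counts here encode the miss path: the first lookup returns null, the re-resolution generates the pipeline and caches it exactly once, and the pipeline is then read again; the remaining read presumably happens during the renewed resolution (an inferred reading of the flow, not confirmed against the implementation). Reusing the hypothetical SimpleCache spy from the earlier sketch, the same three-reads-one-write pattern looks like this:

SimpleCache cache = spy(new SimpleCache());
String key = "[_index/uuid]";

if (cache.get(key) == null) {          // read 1: the initial lookup misses
    cache.put(key, "generated", 100);  // the single write: cache the re-resolved pipeline
    cache.get(key);                    // read 2: assumed lookup during re-resolution
}
cache.get(key);                        // read 3: fetch the pipeline for execution

verify(cache, times(3)).get(any());
verify(cache, times(1)).put(any(), any(), anyInt());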