diff --git a/CHANGELOG.md b/CHANGELOG.md index 5d1650c8341a7..be5e5598b09c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Fixed - Fix constraint bug which allows more primary shards than average primary shards per index ([#14908](https://github.com/opensearch-project/OpenSearch/pull/14908)) +- Fix NPE when bulk ingest with empty pipeline ([#15033](https://github.com/opensearch-project/OpenSearch/pull/15033)) - Fix missing value of FieldSort for unsigned_long ([#14963](https://github.com/opensearch-project/OpenSearch/pull/14963)) - Fix delete index template failed when the index template matches a data stream but is unused ([#15080](https://github.com/opensearch-project/OpenSearch/pull/15080)) diff --git a/server/src/main/java/org/opensearch/ingest/IngestService.java b/server/src/main/java/org/opensearch/ingest/IngestService.java index 17eb23422e68b..938ca7493926e 100644 --- a/server/src/main/java/org/opensearch/ingest/IngestService.java +++ b/server/src/main/java/org/opensearch/ingest/IngestService.java @@ -997,7 +997,7 @@ private void innerBatchExecute( Consumer> handler ) { if (pipeline.getProcessors().isEmpty()) { - handler.accept(null); + handler.accept(toIngestDocumentWrappers(slots, indexRequests)); return; } @@ -1271,6 +1271,14 @@ private static IngestDocumentWrapper toIngestDocumentWrapper(int slot, IndexRequ return new IngestDocumentWrapper(slot, toIngestDocument(indexRequest), null); } + private static List toIngestDocumentWrappers(List slots, List indexRequests) { + List ingestDocumentWrappers = new ArrayList<>(); + for (int i = 0; i < slots.size(); ++i) { + ingestDocumentWrappers.add(toIngestDocumentWrapper(slots.get(i), indexRequests.get(i))); + } + return ingestDocumentWrappers; + } + private static Map createSlotIndexRequestMap(List slots, List indexRequests) { Map slotIndexRequestMap = new HashMap<>(); for (int i = 0; i < slots.size(); ++i) { diff --git a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java index 166b94966196c..1f4b1d635d438 100644 --- a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java @@ -1995,6 +1995,42 @@ public void testExecuteBulkRequestInBatchWithDefaultBatchSize() { verify(mockCompoundProcessor, never()).execute(any(), any()); } + public void testExecuteEmptyPipelineInBatch() throws Exception { + IngestService ingestService = createWithProcessors(emptyMap()); + PutPipelineRequest putRequest = new PutPipelineRequest( + "_id", + new BytesArray("{\"processors\": [], \"description\": \"_description\"}"), + MediaTypeRegistry.JSON + ); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty + ClusterState previousClusterState = clusterState; + clusterState = IngestService.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + BulkRequest bulkRequest = new BulkRequest(); + IndexRequest indexRequest1 = new IndexRequest("_index").id("_id1").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); + bulkRequest.add(indexRequest1); + IndexRequest indexRequest2 = new IndexRequest("_index").id("_id2").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); + bulkRequest.add(indexRequest2); + IndexRequest indexRequest3 = new IndexRequest("_index").id("_id3").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); + bulkRequest.add(indexRequest3); + IndexRequest indexRequest4 = new IndexRequest("_index").id("_id4").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); + bulkRequest.add(indexRequest4); + bulkRequest.batchSize(4); + final Map failureHandler = new HashMap<>(); + final Map completionHandler = new HashMap<>(); + ingestService.executeBulkRequest( + 4, + bulkRequest.requests(), + failureHandler::put, + completionHandler::put, + indexReq -> {}, + Names.WRITE, + bulkRequest + ); + assertTrue(failureHandler.isEmpty()); + assertEquals(Set.of(Thread.currentThread()), completionHandler.keySet()); + } + public void testPrepareBatches_same_index_pipeline() { IngestService.IndexRequestWrapper wrapper1 = createIndexRequestWrapper("index1", Collections.singletonList("p1")); IngestService.IndexRequestWrapper wrapper2 = createIndexRequestWrapper("index1", Collections.singletonList("p1"));