Skip to content

Commit

Permalink
Fix bulk upsert ignores the default_pipeline and final_pipeline when …
Browse files Browse the repository at this point in the history
…auto-created index matches with the index template

Signed-off-by: Gao Binlong <gbinlong@amazon.com>
  • Loading branch information
gaobinlong committed Mar 25, 2024
1 parent 5b4b4aa commit e263910
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Removed

### Fixed
- Fix bulk upsert ignores the default_pipeline and final_pipeline when auto-created index matches with the index template

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ teardown:
ingest.delete_pipeline:
id: "pipeline2"
ignore: 404
- do:
indices.delete_index_template:
name: test_index_template_for_bulk
ignore: 404

---
"Test bulk request without default pipeline":
Expand Down Expand Up @@ -144,3 +148,36 @@ teardown:

- is_false: _source.field1
- match: {_source.field2: value2}

---
"Test bulk upsert honors default_pipeline and final_pipeline when the auto-created index matches with the index template":
- skip:
version: " - 2.99.99"
reason: "fixed in 3.0.0"
- do:
indices.put_index_template:
name: test_for_bulk_upsert_index_template
body:
index_patterns: test_bulk_upsert_*
template:
settings:
number_of_shards: 1
number_of_replicas: 0
default_pipeline: pipeline1
final_pipeline: pipeline2

- do:
bulk:
refresh: true
body:
- '{"update": {"_index": "test_bulk_upsert_index", "_id": "test_id3"}}'
- '{"upsert": {"f1": "v2", "f2": 47}, "doc": {"x": 1}}'

- match: { errors: false }
- match: { items.0.update.result: created }

- do:
get:
index: test_bulk_upsert_index
id: test_id3
- match: { _source: {"f1": "v2", "f2": 47, "field1": "value1", "field2": "value2"}}
17 changes: 10 additions & 7 deletions server/src/main/java/org/opensearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,11 +211,18 @@ public static boolean resolvePipelines(
finalPipeline = IndexSettings.FINAL_PIPELINE.get(indexSettings);
indexRequest.setFinalPipeline(finalPipeline);
}
} else if (indexRequest.index() != null) {
} else if (indexRequest.index() != null || originalRequest != null && originalRequest.index() != null) {
// the index does not exist yet (and this is a valid request), so match index
// templates to look for pipelines in either a matching V2 template (which takes
// precedence), or if a V2 template does not match, any V1 templates
String v2Template = MetadataIndexTemplateService.findV2Template(metadata, indexRequest.index(), false);
String indexName;
if (indexRequest.index() != null) {
indexName = indexRequest.index();
} else {
assert originalRequest != null;
indexName = originalRequest.index();
}
String v2Template = MetadataIndexTemplateService.findV2Template(metadata, indexName, false);
if (v2Template != null) {
Settings settings = MetadataIndexTemplateService.resolveSettings(metadata, v2Template);
if (IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
Expand All @@ -229,11 +236,7 @@ public static boolean resolvePipelines(
indexRequest.setPipeline(defaultPipeline != null ? defaultPipeline : NOOP_PIPELINE_NAME);
indexRequest.setFinalPipeline(finalPipeline != null ? finalPipeline : NOOP_PIPELINE_NAME);
} else {
List<IndexTemplateMetadata> templates = MetadataIndexTemplateService.findV1Templates(
metadata,
indexRequest.index(),
null
);
List<IndexTemplateMetadata> templates = MetadataIndexTemplateService.findV1Templates(metadata, indexName, null);
// order of templates are highest order first
for (final IndexTemplateMetadata template : templates) {
final Settings settings = template.settings();
Expand Down
16 changes: 16 additions & 0 deletions server/src/test/java/org/opensearch/ingest/IngestServiceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -1582,6 +1582,14 @@ public void testResolveRequiredOrDefaultPipelineDefaultPipeline() {
assertThat(result, is(true));
assertThat(indexRequest.isPipelineResolved(), is(true));
assertThat(indexRequest.getPipeline(), equalTo("default-pipeline"));

// index name matches with ITMD within bulk upsert:
IndexRequest upsertRequest = new IndexRequest().source(emptyMap());
UpdateRequest updateRequest = new UpdateRequest("idx", "id1").upsert(upsertRequest).script(mockScript("1"));
result = IngestService.resolvePipelines(updateRequest, TransportBulkAction.getIndexWriteRequest(updateRequest), metadata);
assertThat(result, is(true));
assertThat(upsertRequest.isPipelineResolved(), is(true));
assertThat(upsertRequest.getPipeline(), equalTo("default-pipeline"));
}

public void testResolveFinalPipeline() {
Expand Down Expand Up @@ -1619,6 +1627,14 @@ public void testResolveFinalPipeline() {
assertThat(indexRequest.isPipelineResolved(), is(true));
assertThat(indexRequest.getPipeline(), equalTo("_none"));
assertThat(indexRequest.getFinalPipeline(), equalTo("final-pipeline"));

// index name matches with ITMD within bulk upsert:
IndexRequest upsertRequest = new IndexRequest().source(emptyMap());
UpdateRequest updateRequest = new UpdateRequest("idx", "id1").upsert(upsertRequest).script(mockScript("1"));
result = IngestService.resolvePipelines(updateRequest, TransportBulkAction.getIndexWriteRequest(updateRequest), metadata);
assertThat(result, is(true));
assertThat(upsertRequest.isPipelineResolved(), is(true));
assertThat(upsertRequest.getFinalPipeline(), equalTo("final-pipeline"));
}

public void testResolveRequestOrDefaultPipelineAndFinalPipeline() {
Expand Down

0 comments on commit e263910

Please sign in to comment.