Skip to content

Commit 757e15a

Browse files
bzhangamtandonks
authored andcommitted
Pass index settings to system ingest processor factories. (opensearch-project#18708)
Signed-off-by: Bo Zhang <bzhangam@amazon.com>
1 parent 806dda9 commit 757e15a

File tree

8 files changed

+177
-5
lines changed

8 files changed

+177
-5
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2323
- Add support for non-timing info in profiler ([#18460](https://github.com/opensearch-project/OpenSearch/issues/18460))
2424
- Extend Approximation Framework to other numeric types ([#18530](https://github.com/opensearch-project/OpenSearch/issues/18530))
2525
- Add Semantic Version field type mapper and extensive unit tests([#18454](https://github.com/opensearch-project/OpenSearch/pull/18454))
26+
- Pass index settings to system ingest processor factories. ([#18708](https://github.com/opensearch-project/OpenSearch/pull/18708))
2627

2728
### Changed
2829
- Update Subject interface to use CheckedRunnable ([#18570](https://github.com/opensearch-project/OpenSearch/issues/18570))

plugins/examples/system-ingest-processor/src/main/java/org/opensearch/example/systemingestprocessor/ExampleSystemIngestProcessorFactory.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,20 @@
88

99
package org.opensearch.example.systemingestprocessor;
1010

11+
import org.opensearch.common.settings.Settings;
1112
import org.opensearch.ingest.AbstractBatchingSystemProcessor;
1213

1314
import java.util.ArrayList;
15+
import java.util.Collection;
1416
import java.util.HashMap;
1517
import java.util.List;
1618
import java.util.Map;
1719

20+
import static org.opensearch.example.systemingestprocessor.ExampleSystemIngestProcessorPlugin.TRIGGER_SETTING;
1821
import static org.opensearch.plugins.IngestPlugin.SystemIngestPipelineConfigKeys.INDEX_MAPPINGS;
22+
import static org.opensearch.plugins.IngestPlugin.SystemIngestPipelineConfigKeys.INDEX_SETTINGS;
1923
import static org.opensearch.plugins.IngestPlugin.SystemIngestPipelineConfigKeys.INDEX_TEMPLATE_MAPPINGS;
24+
import static org.opensearch.plugins.IngestPlugin.SystemIngestPipelineConfigKeys.INDEX_TEMPLATE_SETTINGS;
2025

2126
/**
2227
* A factory to create the example system ingest processor
@@ -55,17 +60,26 @@ protected ExampleSystemIngestProcessorFactory() {
5560
@Override
5661
protected AbstractBatchingSystemProcessor newProcessor(String tag, String description, Map<String, Object> config) {
5762
final List<Map<String, Object>> mappings = new ArrayList<>();
63+
final List<Settings> settings = new ArrayList<>();
5864
final Object mappingFromIndex = config.get(INDEX_MAPPINGS);
5965
final Object mappingFromTemplates = config.get(INDEX_TEMPLATE_MAPPINGS);
66+
final Object settingsFromIndex = config.get(INDEX_SETTINGS);
67+
final Object settingsFromTemplates = config.get(INDEX_TEMPLATE_SETTINGS);
6068
if (mappingFromTemplates instanceof List) {
6169
mappings.addAll((List<Map<String, Object>>) mappingFromTemplates);
6270
}
6371
if (mappingFromIndex instanceof Map) {
6472
mappings.add((Map<String, Object>) mappingFromIndex);
6573
}
74+
if (settingsFromTemplates instanceof List) {
75+
settings.addAll((Collection<? extends Settings>) settingsFromTemplates);
76+
}
77+
if (settingsFromIndex instanceof Settings) {
78+
settings.add((Settings) settingsFromIndex);
79+
}
6680

6781
// If no config we are not able to create a processor so simply return a null to show no processor created
68-
if (mappings.isEmpty()) {
82+
if (mappings.isEmpty() && settings.isEmpty()) {
6983
return null;
7084
}
7185

@@ -87,6 +101,15 @@ protected AbstractBatchingSystemProcessor newProcessor(String tag, String descri
87101
}
88102
}
89103

104+
// If the trigger setting is configured then use it directly.
105+
// When we rely on the v1 template to create the index there can be multiple settings and the later one can
106+
// override the previous one so we need to loop through all the settings.
107+
for (final Settings setting : settings) {
108+
if (setting.hasValue(TRIGGER_SETTING.getKey())) {
109+
isTriggerFieldFound = TRIGGER_SETTING.get(setting);
110+
}
111+
}
112+
90113
return isTriggerFieldFound ? new ExampleSystemIngestProcessor(tag, description, DEFAULT_BATCH_SIZE) : null;
91114
}
92115

plugins/examples/system-ingest-processor/src/main/java/org/opensearch/example/systemingestprocessor/ExampleSystemIngestProcessorPlugin.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@
88

99
package org.opensearch.example.systemingestprocessor;
1010

11+
import org.opensearch.common.settings.Setting;
1112
import org.opensearch.ingest.Processor;
1213
import org.opensearch.plugins.IngestPlugin;
1314
import org.opensearch.plugins.Plugin;
1415

16+
import java.util.List;
1517
import java.util.Map;
1618

1719
/**
@@ -23,8 +25,23 @@ public class ExampleSystemIngestProcessorPlugin extends Plugin implements Ingest
2325
*/
2426
public ExampleSystemIngestProcessorPlugin() {}
2527

28+
/**
29+
* A custom index setting which is used to control if we should create the example system ingest processor.
30+
*/
31+
public static final Setting<Boolean> TRIGGER_SETTING = Setting.boolSetting(
32+
"index.example_system_ingest_processor_plugin.trigger_setting",
33+
false,
34+
Setting.Property.IndexScope,
35+
Setting.Property.Dynamic
36+
);
37+
2638
@Override
2739
public Map<String, Processor.Factory> getSystemIngestProcessors(Processor.Parameters parameters) {
2840
return Map.of(ExampleSystemIngestProcessorFactory.TYPE, new ExampleSystemIngestProcessorFactory());
2941
}
42+
43+
@Override
44+
public List<Setting<?>> getSettings() {
45+
return List.of(TRIGGER_SETTING);
46+
}
3047
}

plugins/examples/system-ingest-processor/src/test/java/org/opensearch/example/systemingestprocessor/ExampleSystemIngestProcessorFactoryTests.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.example.systemingestprocessor;
1010

11+
import org.opensearch.common.settings.Settings;
1112
import org.opensearch.ingest.AbstractBatchingSystemProcessor;
1213
import org.opensearch.test.OpenSearchTestCase;
1314

@@ -18,8 +19,11 @@
1819
import static org.opensearch.example.systemingestprocessor.ExampleSystemIngestProcessorFactory.DOC;
1920
import static org.opensearch.example.systemingestprocessor.ExampleSystemIngestProcessorFactory.PROPERTIES;
2021
import static org.opensearch.example.systemingestprocessor.ExampleSystemIngestProcessorFactory.TRIGGER_FIELD_NAME;
22+
import static org.opensearch.example.systemingestprocessor.ExampleSystemIngestProcessorPlugin.TRIGGER_SETTING;
2123
import static org.opensearch.plugins.IngestPlugin.SystemIngestPipelineConfigKeys.INDEX_MAPPINGS;
24+
import static org.opensearch.plugins.IngestPlugin.SystemIngestPipelineConfigKeys.INDEX_SETTINGS;
2225
import static org.opensearch.plugins.IngestPlugin.SystemIngestPipelineConfigKeys.INDEX_TEMPLATE_MAPPINGS;
26+
import static org.opensearch.plugins.IngestPlugin.SystemIngestPipelineConfigKeys.INDEX_TEMPLATE_SETTINGS;
2327

2428
public class ExampleSystemIngestProcessorFactoryTests extends OpenSearchTestCase {
2529
public void testNewProcessor_whenWithTriggerField_thenReturnProcessor() {
@@ -55,4 +59,37 @@ public void testNewProcessor_whenNoMapping_thenReturnNull() {
5559

5660
assertNull("Should not create an example system ingest processor when the mapping is not found.", processor);
5761
}
62+
63+
public void testNewProcessor_whenWithTriggerSettingFromIndex_thenReturnProcessor() {
64+
final ExampleSystemIngestProcessorFactory factory = new ExampleSystemIngestProcessorFactory();
65+
Settings triggerEnabled = Settings.builder().put(TRIGGER_SETTING.getKey(), true).build();
66+
67+
AbstractBatchingSystemProcessor processor = factory.newProcessor("tag", "description", Map.of(INDEX_SETTINGS, triggerEnabled));
68+
69+
assertNotNull("Should create an example system ingest processor when the trigger_setting is true.", processor);
70+
assertTrue(processor instanceof ExampleSystemIngestProcessor);
71+
}
72+
73+
public void testNewProcessor_whenWithTriggerSettingFromTemplate_thenReturnProcessor() {
74+
final ExampleSystemIngestProcessorFactory factory = new ExampleSystemIngestProcessorFactory();
75+
Settings triggerEnabled = Settings.builder().put(TRIGGER_SETTING.getKey(), true).build();
76+
77+
AbstractBatchingSystemProcessor processor = factory.newProcessor(
78+
"tag",
79+
"description",
80+
Map.of(INDEX_TEMPLATE_SETTINGS, List.of(triggerEnabled))
81+
);
82+
83+
assertNotNull("Should create an example system ingest processor when the trigger_setting is true.", processor);
84+
assertTrue(processor instanceof ExampleSystemIngestProcessor);
85+
}
86+
87+
public void testNewProcessor_whenWithTriggerSettingDisabled_thenReturnProcessor() {
88+
final ExampleSystemIngestProcessorFactory factory = new ExampleSystemIngestProcessorFactory();
89+
Settings triggerDisabled = Settings.builder().put(TRIGGER_SETTING.getKey(), false).build();
90+
91+
AbstractBatchingSystemProcessor processor = factory.newProcessor("tag", "description", Map.of(INDEX_SETTINGS, triggerDisabled));
92+
93+
assertNull("Should not create an example system ingest processor when the trigger_setting is false.", processor);
94+
}
5895
}

plugins/examples/system-ingest-processor/src/test/java/org/opensearch/example/systemingestprocessor/ExampleSystemIngestProcessorPluginTests.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@
1111
import org.opensearch.ingest.Processor;
1212
import org.opensearch.test.OpenSearchTestCase;
1313

14+
import java.util.List;
1415
import java.util.Map;
1516

17+
import static org.opensearch.example.systemingestprocessor.ExampleSystemIngestProcessorPlugin.TRIGGER_SETTING;
1618
import static org.mockito.Mockito.mock;
1719

1820
public class ExampleSystemIngestProcessorPluginTests extends OpenSearchTestCase {
@@ -27,4 +29,8 @@ public void testGetSystemIngestProcessors() {
2729
factories.get(ExampleSystemIngestProcessorFactory.TYPE) instanceof ExampleSystemIngestProcessorFactory
2830
);
2931
}
32+
33+
public void testGetSettings() {
34+
assertEquals(List.of(TRIGGER_SETTING), plugin.getSettings());
35+
}
3036
}

plugins/examples/system-ingest-processor/src/yamlRestTest/resources/rest-api-spec/test/example-system-ingest-processor/20_system_ingest_processor.yml

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,28 @@ teardown:
2929
- match:
3030
_source.field_auto_added_by_system_ingest_processor: "This field is auto added by the example system ingest processor."
3131

32+
---
33+
"Processor injects a field on indexing a doc to existing index when trigger_setting is true":
34+
- do:
35+
indices.create:
36+
index: test-index
37+
body:
38+
settings:
39+
index.example_system_ingest_processor_plugin.trigger_setting: true
40+
- do:
41+
index:
42+
index: test-index
43+
id: 1
44+
body:
45+
system_ingest_processor_trigger_field: "dummy value"
46+
refresh: true
47+
- do:
48+
get:
49+
index: test-index
50+
id: 1
51+
- match:
52+
_source.field_auto_added_by_system_ingest_processor: "This field is auto added by the example system ingest processor."
53+
3254
---
3355
"Processor should not inject a field on indexing a doc to existing index when trigger field is not defined in the index mapping":
3456
- do:
@@ -56,6 +78,7 @@ teardown:
5678
- skip:
5779
features: allowed_warnings
5880
- do:
81+
# test v1 template
5982
indices.put_template:
6083
name: example-template
6184
body:
@@ -79,7 +102,41 @@ teardown:
79102
id: 1
80103
- match:
81104
_source.field_auto_added_by_system_ingest_processor: "This field is auto added by the example system ingest processor."
105+
- do:
106+
indices.delete_template:
107+
name: example-template
108+
---
82109

110+
"Processor injects field when index is created from matching template where trigger_setting is true":
111+
- skip:
112+
features: allowed_warnings
113+
- do:
114+
# test v2 template
115+
indices.put_index_template:
116+
name: example-template
117+
body:
118+
index_patterns: ["template-*"]
119+
template:
120+
settings:
121+
index.example_system_ingest_processor_plugin.trigger_setting: true
122+
- do:
123+
allowed_warnings:
124+
- "index [template-index-1] matches multiple legacy templates [example-template, global], composable templates will only match a single template"
125+
index:
126+
index: template-index-1
127+
id: 1
128+
body:
129+
system_ingest_processor_trigger_field: "dummy value"
130+
refresh: true
131+
- do:
132+
get:
133+
index: template-index-1
134+
id: 1
135+
- match:
136+
_source.field_auto_added_by_system_ingest_processor: "This field is auto added by the example system ingest processor."
137+
- do:
138+
indices.delete_index_template:
139+
name: example-template
83140
---
84141
"Processor injects field on bulk indexing to existing index":
85142
- do:
@@ -156,7 +213,9 @@ teardown:
156213
id: 2
157214
- match:
158215
_source.field_auto_added_by_system_ingest_processor: "This field is auto added by the example system ingest processor."
159-
216+
- do:
217+
indices.delete_template:
218+
name: bulk-template
160219
---
161220
"Processor injects field on bulk update, upsert on existing/new documents":
162221
# Temporarily disable system ingest pipelines to insert without triggering the system ingest field

server/src/main/java/org/opensearch/ingest/IngestService.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,9 @@
107107
import static org.opensearch.cluster.service.ClusterManagerTask.DELETE_PIPELINE;
108108
import static org.opensearch.cluster.service.ClusterManagerTask.PUT_PIPELINE;
109109
import static org.opensearch.plugins.IngestPlugin.SystemIngestPipelineConfigKeys.INDEX_MAPPINGS;
110+
import static org.opensearch.plugins.IngestPlugin.SystemIngestPipelineConfigKeys.INDEX_SETTINGS;
110111
import static org.opensearch.plugins.IngestPlugin.SystemIngestPipelineConfigKeys.INDEX_TEMPLATE_MAPPINGS;
112+
import static org.opensearch.plugins.IngestPlugin.SystemIngestPipelineConfigKeys.INDEX_TEMPLATE_SETTINGS;
111113

112114
/**
113115
* Holder class for several ingest related services.
@@ -272,7 +274,8 @@ public boolean resolveSystemIngestPipeline(
272274
// precedence), or if a V2 template does not match, any V1 templates
273275
String v2Template = MetadataIndexTemplateService.findV2Template(metadata, indexRequest.index(), false);
274276
if (v2Template != null) {
275-
systemIngestPipelineId = getSystemIngestPipelineForTemplateV2(v2Template, indexRequest);
277+
Settings settings = MetadataIndexTemplateService.resolveSettings(metadata, v2Template);
278+
systemIngestPipelineId = getSystemIngestPipelineForTemplateV2(v2Template, indexRequest, settings);
276279
} else {
277280
List<IndexTemplateMetadata> templates = MetadataIndexTemplateService.findV1Templates(
278281
metadata,
@@ -344,7 +347,7 @@ public boolean resolvePipelines(final DocWriteRequest<?> originalRequest, final
344347
}
345348

346349
if (this.isSystemIngestPipelineEnabled) {
347-
systemIngestPipelineId = getSystemIngestPipelineForTemplateV2(v2Template, indexRequest);
350+
systemIngestPipelineId = getSystemIngestPipelineForTemplateV2(v2Template, indexRequest, settings);
348351
}
349352
} else {
350353
List<IndexTemplateMetadata> templates = MetadataIndexTemplateService.findV1Templates(
@@ -433,9 +436,13 @@ private String getSystemIngestPipelineForTemplateV1(
433436
final String indexId = createIndexIdWithTemplateSuffix(indexRequest.index());
434437
Pipeline ingestPipeline = systemIngestPipelineCache.getSystemIngestPipeline(indexId);
435438
if (ingestPipeline == null) {
439+
final List<Settings> settingsList = new ArrayList<>();
436440
final List<Map<String, Object>> mappingsMap = new ArrayList<>();
437441
final Map<String, Object> pipelineConfig = new HashMap<>();
438442
for (final IndexTemplateMetadata template : templates) {
443+
if (template.settings() != null) {
444+
settingsList.add(template.settings());
445+
}
439446
if (template.mappings() != null) {
440447
try {
441448
mappingsMap.add(MapperService.parseMapping(xContentRegistry, template.mappings().string()));
@@ -453,6 +460,7 @@ private String getSystemIngestPipelineForTemplateV1(
453460
}
454461

455462
pipelineConfig.put(INDEX_TEMPLATE_MAPPINGS, mappingsMap);
463+
pipelineConfig.put(INDEX_TEMPLATE_SETTINGS, settingsList);
456464
ingestPipeline = createSystemIngestPipeline(indexId, pipelineConfig);
457465
}
458466

@@ -461,7 +469,11 @@ private String getSystemIngestPipelineForTemplateV1(
461469
return ingestPipeline.getProcessors().isEmpty() ? null : indexId;
462470
}
463471

464-
private String getSystemIngestPipelineForTemplateV2(@NonNull final String templateName, @NonNull final IndexRequest indexRequest) {
472+
private String getSystemIngestPipelineForTemplateV2(
473+
@NonNull final String templateName,
474+
@NonNull final IndexRequest indexRequest,
475+
final Settings settings
476+
) {
465477
// Here we cache it with index name + template as the suffix since currently we don't have the uuid.
466478
// We need to cache it so that later during execution we can find it by indexId to reuse it.
467479
final String indexId = createIndexIdWithTemplateSuffix(indexRequest.index());
@@ -491,6 +503,7 @@ private String getSystemIngestPipelineForTemplateV2(@NonNull final String templa
491503
}
492504

493505
pipelineConfig.put(INDEX_TEMPLATE_MAPPINGS, mappingsMap);
506+
pipelineConfig.put(INDEX_TEMPLATE_SETTINGS, settings == null ? Collections.emptyList() : List.of(settings));
494507
ingestPipeline = createSystemIngestPipeline(indexId, pipelineConfig);
495508
}
496509

@@ -515,10 +528,14 @@ private String getSystemIngestPipelineForExistingIndex(@NonNull final IndexMetad
515528
if (ingestPipeline == null) {
516529
// no cache we will try to resolve the ingest pipeline based on the index configuration
517530
final MappingMetadata mappingMetadata = indexMetadata.mapping();
531+
final Settings settings = indexMetadata.getSettings();
518532
final Map<String, Object> pipelineConfig = new HashMap<>();
519533
if (mappingMetadata != null) {
520534
pipelineConfig.put(INDEX_MAPPINGS, mappingMetadata.getSourceAsMap());
521535
}
536+
if (settings != null) {
537+
pipelineConfig.put(INDEX_SETTINGS, settings);
538+
}
522539
ingestPipeline = createSystemIngestPipeline(indexId, pipelineConfig);
523540
}
524541
// we can get an empty pipeline from the cache

server/src/main/java/org/opensearch/plugins/IngestPlugin.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,5 +109,17 @@ class SystemIngestPipelineConfigKeys {
109109
* ]
110110
*/
111111
public static final String INDEX_TEMPLATE_MAPPINGS = "index_template_mappings";
112+
113+
/**
114+
* Use this key to access the settings{@link org.opensearch.common.settings.Settings} of the index from the config.
115+
*/
116+
public static final String INDEX_SETTINGS = "index_settings";
117+
118+
/**
119+
* Use this key to access the settings{@link org.opensearch.common.settings.Settings} of the matched templates
120+
* of the index from the config. If there are multiple matched templates the later one can override the setting of the previous one if merge
121+
* rules are allowed. So this will be a list of settings.
122+
*/
123+
public static final String INDEX_TEMPLATE_SETTINGS = "index_template_settings";
112124
}
113125
}

0 commit comments

Comments
 (0)