Skip to content

Commit

Permalink
[Backport 2.x] Add allowlist setting for ingest-common processors (#1…
Browse files Browse the repository at this point in the history
…4561)

* Add allowlist setting for ingest-common processors (#14479)

Add a new static setting that lets an operator choose specific ingest
processors to enable by name. The behavior is as follows:

- If the allowlist setting is not defined, all installed processors are
  enabled. This is the status quo.
- If the allowlist setting is defined as the empty set, then all processors
  are disabled.
- If the allowlist setting contains the names of valid processors, only those
  processors are enabled.
- If the allowlist setting contains a name of a processor that does not exist,
  then the server will fail to start with an IllegalStateException
  listing which processors were defined in the allowlist but are not
  installed.
- If the allowlist setting is changed between server restarts then any
  ingest pipeline using a now-disabled processor will fail. This is the
  same experience if a pipeline used a processor defined by a plugin but
  then that plugin were to be uninstalled across restarts.

Related to #14439

Signed-off-by: Andrew Ross <andrross@amazon.com>
(cherry picked from commit a99b494)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>

* Fix test issues due to class renaming on main

Signed-off-by: Andrew Ross <andrross@amazon.com>

---------

Signed-off-by: Andrew Ross <andrross@amazon.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Andrew Ross <andrross@amazon.com>
  • Loading branch information
3 people authored Jun 26, 2024
1 parent f2d8e1d commit 64383dd
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Apply the date histogram rewrite optimization to range aggregation ([#13865](https://github.com/opensearch-project/OpenSearch/pull/13865))
- [Writable Warm] Add composite directory implementation and integrate it with FileCache ([12782](https://github.com/opensearch-project/OpenSearch/pull/12782))
- Fix race condition while parsing derived fields from search definition ([14445](https://github.com/opensearch-project/OpenSearch/pull/14445))
- Add allowlist setting for ingest-common processors ([#14439](https://github.com/opensearch-project/OpenSearch/issues/14439))

### Dependencies
- Update to Apache Lucene 9.11.0 ([#14042](https://github.com/opensearch-project/OpenSearch/pull/14042))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,20 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class IngestCommonPlugin extends Plugin implements ActionPlugin, IngestPlugin {

static final Setting<List<String>> PROCESSORS_ALLOWLIST_SETTING = Setting.listSetting(
"ingest.common.processors.allowed",
List.of(),
Function.identity(),
Setting.Property.NodeScope
);

static final Setting<TimeValue> WATCHDOG_INTERVAL = Setting.timeSetting(
"ingest.grok.watchdog.interval",
TimeValue.timeValueSeconds(1),
Expand All @@ -77,7 +87,7 @@ public IngestCommonPlugin() {}

@Override
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
Map<String, Processor.Factory> processors = new HashMap<>();
final Map<String, Processor.Factory> processors = new HashMap<>();
processors.put(DateProcessor.TYPE, new DateProcessor.Factory(parameters.scriptService));
processors.put(SetProcessor.TYPE, new SetProcessor.Factory(parameters.scriptService));
processors.put(AppendProcessor.TYPE, new AppendProcessor.Factory(parameters.scriptService));
Expand Down Expand Up @@ -110,7 +120,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
processors.put(RemoveByPatternProcessor.TYPE, new RemoveByPatternProcessor.Factory());
processors.put(CommunityIdProcessor.TYPE, new CommunityIdProcessor.Factory());
processors.put(FingerprintProcessor.TYPE, new FingerprintProcessor.Factory());
return Collections.unmodifiableMap(processors);
return filterForAllowlistSetting(parameters.env.settings(), processors);
}

@Override
Expand All @@ -133,7 +143,7 @@ public List<RestHandler> getRestHandlers(

@Override
public List<Setting<?>> getSettings() {
return Arrays.asList(WATCHDOG_INTERVAL, WATCHDOG_MAX_EXECUTION_TIME);
return Arrays.asList(WATCHDOG_INTERVAL, WATCHDOG_MAX_EXECUTION_TIME, PROCESSORS_ALLOWLIST_SETTING);
}

private static MatcherWatchdog createGrokThreadWatchdog(Processor.Parameters parameters) {
Expand All @@ -147,4 +157,27 @@ private static MatcherWatchdog createGrokThreadWatchdog(Processor.Parameters par
);
}

private Map<String, Processor.Factory> filterForAllowlistSetting(Settings settings, Map<String, Processor.Factory> map) {
if (PROCESSORS_ALLOWLIST_SETTING.exists(settings) == false) {
return Map.copyOf(map);
}
final Set<String> allowlist = Set.copyOf(PROCESSORS_ALLOWLIST_SETTING.get(settings));
// Assert that no unknown processors are defined in the allowlist
final Set<String> unknownAllowlistProcessors = allowlist.stream()
.filter(p -> map.containsKey(p) == false)
.collect(Collectors.toSet());
if (unknownAllowlistProcessors.isEmpty() == false) {
throw new IllegalArgumentException(
"Processor(s) "
+ unknownAllowlistProcessors
+ " were defined in ["
+ PROCESSORS_ALLOWLIST_SETTING.getKey()
+ "] but do not exist"
);
}
return map.entrySet()
.stream()
.filter(e -> allowlist.contains(e.getKey()))
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ingest.common;

import org.opensearch.common.settings.Settings;
import org.opensearch.env.TestEnvironment;
import org.opensearch.ingest.Processor;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.util.List;
import java.util.Set;

public class IngestCommonPluginTests extends OpenSearchTestCase {

public void testAllowlist() throws IOException {
runAllowlistTest(List.of());
runAllowlistTest(List.of("date"));
runAllowlistTest(List.of("set"));
runAllowlistTest(List.of("copy", "date"));
runAllowlistTest(List.of("date", "set", "copy"));
}

private void runAllowlistTest(List<String> allowlist) throws IOException {
final Settings settings = Settings.builder().putList(IngestCommonPlugin.PROCESSORS_ALLOWLIST_SETTING.getKey(), allowlist).build();
try (IngestCommonPlugin plugin = new IngestCommonPlugin()) {
assertEquals(Set.copyOf(allowlist), plugin.getProcessors(createParameters(settings)).keySet());
}
}

public void testAllowlistNotSpecified() throws IOException {
final Settings.Builder builder = Settings.builder();
builder.remove(IngestCommonPlugin.PROCESSORS_ALLOWLIST_SETTING.getKey());
final Settings settings = builder.build();
try (IngestCommonPlugin plugin = new IngestCommonPlugin()) {
final Set<String> expected = Set.of(
"append",
"urldecode",
"sort",
"fail",
"trim",
"set",
"fingerprint",
"pipeline",
"json",
"join",
"kv",
"bytes",
"date",
"drop",
"community_id",
"lowercase",
"convert",
"copy",
"gsub",
"dot_expander",
"rename",
"remove_by_pattern",
"html_strip",
"remove",
"csv",
"grok",
"date_index_name",
"foreach",
"script",
"dissect",
"uppercase",
"split"
);
assertEquals(expected, plugin.getProcessors(createParameters(settings)).keySet());
}
}

public void testAllowlistHasNonexistentProcessors() throws IOException {
final Settings settings = Settings.builder()
.putList(IngestCommonPlugin.PROCESSORS_ALLOWLIST_SETTING.getKey(), List.of("threeve"))
.build();
try (IngestCommonPlugin plugin = new IngestCommonPlugin()) {
IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> plugin.getProcessors(createParameters(settings))
);
assertTrue(e.getMessage(), e.getMessage().contains("threeve"));
}
}

private static Processor.Parameters createParameters(Settings settings) {
return new Processor.Parameters(
TestEnvironment.newEnvironment(Settings.builder().put(settings).put("path.home", "").build()),
null,
null,
null,
() -> 0L,
(a, b) -> null,
null,
null,
$ -> {},
null
);
}
}

0 comments on commit 64383dd

Please sign in to comment.