Skip to content

Commit

Permalink
Add allowlist setting for ingest processors
Browse files Browse the repository at this point in the history
Add a new static setting that lets an operator choose specific ingest
processors to enable by name. The behavior is as follows:

- If the allowlist setting is not defined, all installed processors are
  enabled. This is the status quo.
- If the allowlist setting is defined as the empty set, then all processors
  are disabled.
- If the allowlist setting contains the names of valid processors, only those
  processors are enabled.
- If the allowlist setting contains a name of a processor that does not exist,
  then the server will fail to start with an IllegalStateException
  listing which processors were defined in the allowlist but are not
  installed.
- If the allowlist setting is changed between server restarts then any
  ingest pipeline using a now-disabled processor will fail. This is the
  same experience if a pipeline used a processor defined by a plugin but
  then that plugin were to be uninstalled across restarts.

Related to opensearch-project#14439

Signed-off-by: Andrew Ross <andrross@amazon.com>
  • Loading branch information
andrross committed Jun 20, 2024
1 parent d2c08b3 commit 48f2e3a
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.store.IndicesStore;
import org.opensearch.ingest.IngestService;
import org.opensearch.monitor.fs.FsHealthService;
import org.opensearch.monitor.fs.FsService;
import org.opensearch.monitor.jvm.JvmGcMonitorService;
Expand Down Expand Up @@ -754,7 +755,8 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA,
SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING
SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING,
IngestService.PROCESSORS_ALLOWLIST_SETTING
)
)
);
Expand Down
50 changes: 45 additions & 5 deletions server/src/main/java/org/opensearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,11 @@
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.metrics.OperationMetrics;
import org.opensearch.common.regex.Regex;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.AbstractRunnable;
Expand Down Expand Up @@ -93,6 +95,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.stream.Collectors;

Expand All @@ -109,6 +112,13 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge

private static final Logger logger = LogManager.getLogger(IngestService.class);

public static final Setting<List<String>> PROCESSORS_ALLOWLIST_SETTING = Setting.listSetting(
"ingest.processors.allowlist",
List.of(),
Function.identity(),
Setting.Property.NodeScope
);

private final ClusterService clusterService;
private final ScriptService scriptService;
private final Map<String, Processor.Factory> processorFactories;
Expand Down Expand Up @@ -136,6 +146,12 @@ public IngestService(
) {
this.clusterService = clusterService;
this.scriptService = scriptService;
final Set<String> allowlist;
if (PROCESSORS_ALLOWLIST_SETTING.exists(clusterService.getSettings())) {
allowlist = Set.copyOf(PROCESSORS_ALLOWLIST_SETTING.get(clusterService.getSettings()));
} else {
allowlist = null;
}
this.processorFactories = processorFactories(
ingestPlugins,
new Processor.Parameters(
Expand All @@ -149,7 +165,8 @@ public IngestService(
client,
threadPool.generic()::execute,
indicesService
)
),
allowlist
);
this.threadPool = threadPool;

Expand All @@ -158,17 +175,40 @@ public IngestService(
deletePipelineTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_PIPELINE_KEY, true);
}

private static Map<String, Processor.Factory> processorFactories(List<IngestPlugin> ingestPlugins, Processor.Parameters parameters) {
private static Map<String, Processor.Factory> processorFactories(
List<IngestPlugin> ingestPlugins,
Processor.Parameters parameters,
@Nullable Set<String> allowlist
) {
Set<String> unknownAllowlistProcessors = (allowlist == null) ? new HashSet<>() : new HashSet<>(allowlist);
Map<String, Processor.Factory> processorFactories = new HashMap<>();
for (IngestPlugin ingestPlugin : ingestPlugins) {
Map<String, Processor.Factory> newProcessors = ingestPlugin.getProcessors(parameters);
unknownAllowlistProcessors.removeAll(newProcessors.keySet());
for (Map.Entry<String, Processor.Factory> entry : newProcessors.entrySet()) {
if (processorFactories.put(entry.getKey(), entry.getValue()) != null) {
throw new IllegalArgumentException("Ingest processor [" + entry.getKey() + "] is already registered");
if (allowlist == null || allowlist.contains(entry.getKey())) {
if (processorFactories.put(entry.getKey(), entry.getValue()) != null) {
throw new IllegalArgumentException("Ingest processor [" + entry.getKey() + "] is already registered");
}
} else {
logger.info(
"Not registering ingest processor [{}] because it is not allowed per the [{}] setting",
entry.getKey(),
PROCESSORS_ALLOWLIST_SETTING.getKey()
);
}
}
}
return Collections.unmodifiableMap(processorFactories);
if (unknownAllowlistProcessors.isEmpty() == false) {
throw new IllegalArgumentException(
"Processor(s) ["
+ unknownAllowlistProcessors
+ "] were defined in ["
+ PROCESSORS_ALLOWLIST_SETTING.getKey()
+ "] but do not exist"
);
}
return Map.copyOf(processorFactories);
}

public static boolean resolvePipelines(
Expand Down
Loading

0 comments on commit 48f2e3a

Please sign in to comment.