Skip to content

Commit

Permalink
Add limit on number of processors in Ingest pipelines (opensearch-pro…
Browse files Browse the repository at this point in the history
…ject#15465)

* Add limit on number of processors in Ingest pipelines

Signed-off-by: Rai <nndri@amazon.com>
  • Loading branch information
anandkrrai authored Sep 3, 2024
1 parent cfcfe21 commit 41ba00a
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for centralize snapshot creation with pinned timestamp ([#15124](https://github.com/opensearch-project/OpenSearch/pull/15124))
- Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326))
- [Workload Management] Add query group stats constructs ([#15343](https://github.com/opensearch-project/OpenSearch/pull/15343)))
- Add limit on number of processors for Ingest pipeline([#15460](https://github.com/opensearch-project/OpenSearch/pull/15465)).
- Add runAs to Subject interface and introduce IdentityAwarePlugin extension point ([#14630](https://github.com/opensearch-project/OpenSearch/pull/14630))
- Optimize NodeIndicesStats output behind flag ([#14454](https://github.com/opensearch-project/OpenSearch/pull/14454))
- [Workload Management] Add rejection logic for co-ordinator and shard level requests ([#15428](https://github.com/opensearch-project/OpenSearch/pull/15428)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.ingest.CompoundProcessor;
import org.opensearch.ingest.IngestDocument;
import org.opensearch.ingest.IngestService;
import org.opensearch.ingest.Pipeline;
import org.opensearch.threadpool.ThreadPool;

Expand All @@ -56,9 +57,11 @@ class SimulateExecutionService {
private static final String THREAD_POOL_NAME = ThreadPool.Names.MANAGEMENT;

private final ThreadPool threadPool;
private final IngestService ingestService;

SimulateExecutionService(ThreadPool threadPool) {
SimulateExecutionService(ThreadPool threadPool, IngestService ingestService) {
this.threadPool = threadPool;
this.ingestService = ingestService;
}

void executeDocument(
Expand Down Expand Up @@ -91,6 +94,9 @@ void executeDocument(
}

public void execute(SimulatePipelineRequest.Parsed request, ActionListener<SimulatePipelineResponse> listener) {

ingestService.validateProcessorCountForIngestPipeline(request.getPipeline());

threadPool.executor(THREAD_POOL_NAME).execute(ActionRunnable.wrap(listener, l -> {
final AtomicInteger counter = new AtomicInteger();
final List<SimulateDocumentResult> responses = new CopyOnWriteArrayList<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public SimulatePipelineTransportAction(
(Writeable.Reader<SimulatePipelineRequest>) SimulatePipelineRequest::new
);
this.ingestService = ingestService;
this.executionService = new SimulateExecutionService(threadPool);
this.executionService = new SimulateExecutionService(threadPool, ingestService);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.store.IndicesStore;
import org.opensearch.ingest.IngestService;
import org.opensearch.monitor.fs.FsHealthService;
import org.opensearch.monitor.fs.FsService;
import org.opensearch.monitor.jvm.JvmGcMonitorService;
Expand Down Expand Up @@ -406,6 +407,7 @@ public void apply(Settings value, Settings current, Settings previous) {
ClusterService.USER_DEFINED_METADATA,
ClusterManagerService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, // deprecated
ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
IngestService.MAX_NUMBER_OF_INGEST_PROCESSORS,
SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING,
SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS,
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
Expand Down
37 changes: 37 additions & 0 deletions server/src/main/java/org/opensearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.metrics.OperationMetrics;
import org.opensearch.common.regex.Regex;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.AbstractRunnable;
Expand Down Expand Up @@ -107,6 +108,18 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge

public static final String INGEST_ORIGIN = "ingest";

/**
* Defines the limit for the number of processors which can run on a given document during ingestion.
*/
public static final Setting<Integer> MAX_NUMBER_OF_INGEST_PROCESSORS = Setting.intSetting(
"cluster.ingest.max_number_processors",
Integer.MAX_VALUE,
1,
Integer.MAX_VALUE,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private static final Logger logger = LogManager.getLogger(IngestService.class);

private final ClusterService clusterService;
Expand All @@ -123,6 +136,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
private final ClusterManagerTaskThrottler.ThrottlingKey putPipelineTaskKey;
private final ClusterManagerTaskThrottler.ThrottlingKey deletePipelineTaskKey;
private volatile ClusterState state;
private volatile int maxIngestProcessorCount;

public IngestService(
ClusterService clusterService,
Expand Down Expand Up @@ -156,6 +170,12 @@ public IngestService(
// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
putPipelineTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.PUT_PIPELINE_KEY, true);
deletePipelineTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_PIPELINE_KEY, true);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_NUMBER_OF_INGEST_PROCESSORS, this::setMaxIngestProcessorCount);
setMaxIngestProcessorCount(clusterService.getClusterSettings().get(MAX_NUMBER_OF_INGEST_PROCESSORS));
}

private void setMaxIngestProcessorCount(Integer maxIngestProcessorCount) {
this.maxIngestProcessorCount = maxIngestProcessorCount;
}

private static Map<String, Processor.Factory> processorFactories(List<IngestPlugin> ingestPlugins, Processor.Parameters parameters) {
Expand Down Expand Up @@ -494,6 +514,9 @@ void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineReq

Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getMediaType()).v2();
Pipeline pipeline = Pipeline.create(request.getId(), pipelineConfig, processorFactories, scriptService);

validateProcessorCountForIngestPipeline(pipeline);

List<Exception> exceptions = new ArrayList<>();
for (Processor processor : pipeline.flattenAllProcessors()) {
for (Map.Entry<DiscoveryNode, IngestInfo> entry : ingestInfos.entrySet()) {
Expand All @@ -507,6 +530,20 @@ void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineReq
ExceptionsHelper.rethrowAndSuppress(exceptions);
}

public void validateProcessorCountForIngestPipeline(Pipeline pipeline) {
List<Processor> processors = pipeline.flattenAllProcessors();

if (processors.size() > maxIngestProcessorCount) {
throw new IllegalStateException(
"Cannot use more than the maximum processors allowed. Number of processors being configured is ["
+ processors.size()
+ "] which exceeds the maximum allowed configuration of ["
+ maxIngestProcessorCount
+ "] processors."
);
}
}

public void executeBulkRequest(
int numberOfActionRequests,
Iterable<DocWriteRequest<?>> actionRequests,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.ingest.DropProcessor;
import org.opensearch.ingest.IngestDocument;
import org.opensearch.ingest.IngestProcessorException;
import org.opensearch.ingest.IngestService;
import org.opensearch.ingest.Pipeline;
import org.opensearch.ingest.Processor;
import org.opensearch.ingest.RandomDocumentPicks;
Expand Down Expand Up @@ -67,6 +68,8 @@
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;

public class SimulateExecutionServiceTests extends OpenSearchTestCase {

Expand All @@ -75,11 +78,13 @@ public class SimulateExecutionServiceTests extends OpenSearchTestCase {
private TestThreadPool threadPool;
private SimulateExecutionService executionService;
private IngestDocument ingestDocument;
private IngestService ingestService;

@Before
public void setup() {
ingestService = mock(IngestService.class);
threadPool = new TestThreadPool(SimulateExecutionServiceTests.class.getSimpleName());
executionService = new SimulateExecutionService(threadPool);
executionService = new SimulateExecutionService(threadPool, ingestService);
ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
}

Expand Down Expand Up @@ -400,6 +405,22 @@ public String getType() {
}
}

public void testValidateProcessorCountForIngestPipelineThrowsException() {

int numDocs = randomIntBetween(1, 64);
List<IngestDocument> documents = new ArrayList<>(numDocs);
for (int id = 0; id < numDocs; id++) {
documents.add(new IngestDocument("_index", Integer.toString(id), null, 0L, VersionType.INTERNAL, new HashMap<>()));
}

Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor());
SimulatePipelineRequest.Parsed request = new SimulatePipelineRequest.Parsed(pipeline, documents, false);

doThrow(new IllegalStateException()).when(ingestService).validateProcessorCountForIngestPipeline(pipeline);

expectThrows(IllegalStateException.class, () -> executionService.execute(request, ActionListener.wrap(response -> {}, e -> {})));
}

private static void assertVerboseResult(
SimulateProcessorResult result,
String expectedPipelineId,
Expand Down
Loading

0 comments on commit 41ba00a

Please sign in to comment.