diff --git a/server/src/main/java/org/opensearch/index/IndexingPressureService.java b/server/src/main/java/org/opensearch/index/IndexingPressureService.java new file mode 100644 index 0000000000000..02079d7bad65d --- /dev/null +++ b/server/src/main/java/org/opensearch/index/IndexingPressureService.java @@ -0,0 +1,71 @@ +/* + * Copyright OpenSearch Contributors. + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.index; + +import org.opensearch.action.admin.indices.stats.CommonStatsFlags; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.shard.ShardId; +import org.opensearch.index.stats.IndexingPressureStats; +import org.opensearch.index.stats.ShardIndexingPressureStats; + +/** + * Sets up classes for node/shard level indexing pressure. + * Provides abstraction and orchestration for indexing pressure interfaces when called from Transport Actions or for Stats. + */ +public class IndexingPressureService { + + private final ShardIndexingPressure shardIndexingPressure; + + public IndexingPressureService(Settings settings, ClusterService clusterService) { + shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + } + + public Releasable markCoordinatingOperationStarted(ShardId shardId, long bytes, boolean forceExecution) { + if (isShardIndexingPressureEnabled()) { + return shardIndexingPressure.markCoordinatingOperationStarted(shardId, bytes, forceExecution); + } else { + return shardIndexingPressure.markCoordinatingOperationStarted(bytes, forceExecution); + } + } + + public Releasable markPrimaryOperationStarted(ShardId shardId, long bytes, boolean forceExecution) { + if (isShardIndexingPressureEnabled()) { + return shardIndexingPressure.markPrimaryOperationStarted(shardId, bytes, forceExecution); + } else { + return shardIndexingPressure.markPrimaryOperationStarted(bytes, forceExecution); + } + } + + public Releasable markPrimaryOperationLocalToCoordinatingNodeStarted(ShardId shardId, long bytes) { + if (isShardIndexingPressureEnabled()) { + return shardIndexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(shardId, bytes); + } else { + return shardIndexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(bytes); + } + } + + public Releasable markReplicaOperationStarted(ShardId shardId, long bytes, boolean forceExecution) { + if (isShardIndexingPressureEnabled()) { + return shardIndexingPressure.markReplicaOperationStarted(shardId, bytes, forceExecution); + } else { + return shardIndexingPressure.markReplicaOperationStarted(bytes, forceExecution); + } + } + + public IndexingPressureStats nodeStats() { + return shardIndexingPressure.stats(); + } + + public ShardIndexingPressureStats shardStats(CommonStatsFlags statsFlags) { + return shardIndexingPressure.shardStats(statsFlags); + } + + private boolean isShardIndexingPressureEnabled() { + return shardIndexingPressure.isShardIndexingPressureEnabled(); + } +} diff --git a/server/src/test/java/org/opensearch/index/IndexingPressureServiceTests.java b/server/src/test/java/org/opensearch/index/IndexingPressureServiceTests.java new file mode 100644 index 0000000000000..a96b0fda3d25e --- /dev/null +++ b/server/src/test/java/org/opensearch/index/IndexingPressureServiceTests.java @@ -0,0 +1,163 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index; + +import org.junit.Before; +import org.opensearch.action.admin.indices.stats.CommonStatsFlags; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.shard.ShardId; +import org.opensearch.index.stats.IndexingPressurePerShardStats; +import org.opensearch.index.stats.IndexingPressureStats; +import org.opensearch.test.OpenSearchTestCase; + +public class IndexingPressureServiceTests extends OpenSearchTestCase { + + private final Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10KB") + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true) + .put(ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS.getKey(), 1) + .put(ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT.getKey(), "20ms") + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), true) + .put(ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW.getKey(), 100) + .build(); + + private ClusterSettings clusterSettings; + private ClusterService clusterService; + + @Before + public void beforeTest() { + clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + clusterService = new ClusterService(settings, clusterSettings, null); + } + + public void testCoordinatingOperationForShardIndexingPressure() { + IndexingPressureService service = new IndexingPressureService(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId = new ShardId(index, 0); + + Releasable releasable = service.markCoordinatingOperationStarted(shardId, 1024, false); + + IndexingPressurePerShardStats shardStats = service.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId); + assertEquals(1024, shardStats.getCurrentCoordinatingBytes()); + releasable.close(); + } + + public void testCoordinatingOperationForIndexingPressure() { + IndexingPressureService service = new IndexingPressureService(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId = new ShardId(index, 0); + Settings.Builder updated = Settings.builder(); + clusterSettings.updateDynamicSettings(Settings.builder() + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), false).build(), + Settings.builder().put(settings), updated, getTestClass().getName()); + clusterSettings.applySettings(updated.build()); + + Releasable releasable = service.markCoordinatingOperationStarted(shardId, 1024, false); + IndexingPressurePerShardStats shardStats = service.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId); + assertNull(shardStats); + IndexingPressureStats nodeStats = service.nodeStats(); + assertEquals(1024, nodeStats.getCurrentCoordinatingBytes()); + releasable.close(); + } + + public void testPrimaryOperationForShardIndexingPressure() { + IndexingPressureService service = new IndexingPressureService(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId = new ShardId(index, 0); + + Releasable releasable = service.markPrimaryOperationStarted(shardId, 1024, false); + + IndexingPressurePerShardStats shardStats = service.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId); + assertEquals(1024, shardStats.getCurrentPrimaryBytes()); + releasable.close(); + } + + public void testPrimaryOperationForIndexingPressure() { + IndexingPressureService service = new IndexingPressureService(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId = new ShardId(index, 0); + Settings.Builder updated = Settings.builder(); + clusterSettings.updateDynamicSettings(Settings.builder() + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), false).build(), + Settings.builder().put(settings), updated, getTestClass().getName()); + clusterSettings.applySettings(updated.build()); + + Releasable releasable = service.markPrimaryOperationStarted(shardId, 1024, false); + + IndexingPressurePerShardStats shardStats = service.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId); + assertNull(shardStats); + IndexingPressureStats nodeStats = service.nodeStats(); + assertEquals(1024, nodeStats.getCurrentPrimaryBytes()); + releasable.close(); + } + + public void testLocalPrimaryOperationForShardIndexingPressure() { + IndexingPressureService service = new IndexingPressureService(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId = new ShardId(index, 0); + + Releasable releasable = service.markPrimaryOperationLocalToCoordinatingNodeStarted(shardId, 1024); + + IndexingPressurePerShardStats shardStats = service.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId); + assertEquals(1024, shardStats.getCurrentPrimaryBytes()); + releasable.close(); + } + + public void testLocalPrimaryOperationForIndexingPressure() { + IndexingPressureService service = new IndexingPressureService(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId = new ShardId(index, 0); + Settings.Builder updated = Settings.builder(); + clusterSettings.updateDynamicSettings(Settings.builder() + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), false).build(), + Settings.builder().put(settings), updated, getTestClass().getName()); + clusterSettings.applySettings(updated.build()); + + Releasable releasable = service.markPrimaryOperationLocalToCoordinatingNodeStarted(shardId, 1024); + + IndexingPressurePerShardStats shardStats = service.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId); + assertNull(shardStats); + IndexingPressureStats nodeStats = service.nodeStats(); + assertEquals(1024, nodeStats.getCurrentPrimaryBytes()); + releasable.close(); + } + + public void testReplicaOperationForShardIndexingPressure() { + IndexingPressureService service = new IndexingPressureService(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId = new ShardId(index, 0); + + Releasable releasable = service.markReplicaOperationStarted(shardId, 1024, false); + + IndexingPressurePerShardStats shardStats = service.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId); + assertEquals(1024, shardStats.getCurrentReplicaBytes()); + releasable.close(); + } + + public void testReplicaOperationForIndexingPressure() { + IndexingPressureService service = new IndexingPressureService(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId = new ShardId(index, 0); + Settings.Builder updated = Settings.builder(); + clusterSettings.updateDynamicSettings(Settings.builder() + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), false).build(), + Settings.builder().put(settings), updated, getTestClass().getName()); + clusterSettings.applySettings(updated.build()); + + Releasable releasable = service.markReplicaOperationStarted(shardId, 1024, false); + + IndexingPressurePerShardStats shardStats = service.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId); + assertNull(shardStats); + IndexingPressureStats nodeStats = service.nodeStats(); + assertEquals(1024, nodeStats.getCurrentReplicaBytes()); + releasable.close(); + } +}