diff --git a/qa/smoke-test-http/src/test/java/org/elasticsearch/http/IndexingPressureRestIT.java b/qa/smoke-test-http/src/test/java/org/elasticsearch/http/IndexingPressureRestIT.java new file mode 100644 index 0000000000000..b096107e0a327 --- /dev/null +++ b/qa/smoke-test-http/src/test/java/org/elasticsearch/http/IndexingPressureRestIT.java @@ -0,0 +1,129 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.http; + +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.index.IndexingPressure; +import org.elasticsearch.test.ESIntegTestCase.ClusterScope; +import org.elasticsearch.test.ESIntegTestCase.Scope; +import org.elasticsearch.test.XContentTestUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Map; + +import static org.elasticsearch.rest.RestStatus.CREATED; +import static org.elasticsearch.rest.RestStatus.OK; +import static org.elasticsearch.rest.RestStatus.TOO_MANY_REQUESTS; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThan; + +/** + * Test Indexing Pressure Metrics and Statistics + */ +@ClusterScope(scope = Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0) +public class IndexingPressureRestIT extends HttpSmokeTestCase { + + private static final Settings unboundedWriteQueue = Settings.builder().put("thread_pool.write.queue_size", -1).build(); + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "1KB") + .put(unboundedWriteQueue) + .build(); + } + + @SuppressWarnings("unchecked") + public void testIndexingPressureStats() throws IOException { + Request createRequest = new Request("PUT", "/index_name"); + createRequest.setJsonEntity("{\"settings\": {\"index\": {\"number_of_shards\": 1, \"number_of_replicas\": 1, " + + "\"write.wait_for_active_shards\": 2}}}"); + final Response indexCreatedResponse = getRestClient().performRequest(createRequest); + assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); + + Request successfulIndexingRequest = new Request("POST", "/index_name/_doc/"); + successfulIndexingRequest.setJsonEntity("{\"x\": \"small text\"}"); + final Response indexSuccessFul = getRestClient().performRequest(successfulIndexingRequest); + assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(CREATED.getStatus())); + + Request getNodeStats = new Request("GET", "/_nodes/stats/indexing_pressure"); + final Response nodeStats = getRestClient().performRequest(getNodeStats); + Map nodeStatsMap = XContentHelper.convertToMap(JsonXContent.jsonXContent, nodeStats.getEntity().getContent(), true); + ArrayList values = new ArrayList<>(((Map) nodeStatsMap.get("nodes")).values()); + assertThat(values.size(), equalTo(2)); + XContentTestUtils.JsonMapView node1 = new XContentTestUtils.JsonMapView((Map) values.get(0)); + Integer node1IndexingBytes = node1.get("indexing_pressure.total.coordinating_and_primary_bytes"); + Integer node1ReplicaBytes = node1.get("indexing_pressure.total.replica_bytes"); + Integer node1Rejections = node1.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections"); + XContentTestUtils.JsonMapView node2 = new XContentTestUtils.JsonMapView((Map) values.get(1)); + Integer node2IndexingBytes = node2.get("indexing_pressure.total.coordinating_and_primary_bytes"); + Integer node2ReplicaBytes = node2.get("indexing_pressure.total.replica_bytes"); + Integer node2Rejections = node2.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections"); + + if (node1IndexingBytes == 0) { + assertThat(node2IndexingBytes, greaterThan(0)); + assertThat(node2IndexingBytes, lessThan(1024)); + } else { + assertThat(node1IndexingBytes, greaterThan(0)); + assertThat(node1IndexingBytes, lessThan(1024)); + } + + if (node1ReplicaBytes == 0) { + assertThat(node2ReplicaBytes, greaterThan(0)); + assertThat(node2ReplicaBytes, lessThan(1024)); + } else { + assertThat(node2ReplicaBytes, equalTo(0)); + assertThat(node1ReplicaBytes, lessThan(1024)); + } + + assertThat(node1Rejections, equalTo(0)); + assertThat(node2Rejections, equalTo(0)); + + Request failedIndexingRequest = new Request("POST", "/index_name/_doc/"); + String largeString = randomAlphaOfLength(10000); + failedIndexingRequest.setJsonEntity("{\"x\": " + largeString + "}"); + ResponseException exception = expectThrows(ResponseException.class, () -> getRestClient().performRequest(failedIndexingRequest)); + assertThat(exception.getResponse().getStatusLine().getStatusCode(), equalTo(TOO_MANY_REQUESTS.getStatus())); + + Request getNodeStats2 = new Request("GET", "/_nodes/stats/indexing_pressure"); + final Response nodeStats2 = getRestClient().performRequest(getNodeStats2); + Map nodeStatsMap2 = XContentHelper.convertToMap(JsonXContent.jsonXContent, nodeStats2.getEntity().getContent(), + true); + ArrayList values2 = new ArrayList<>(((Map) nodeStatsMap2.get("nodes")).values()); + assertThat(values2.size(), equalTo(2)); + XContentTestUtils.JsonMapView node1AfterRejection = new XContentTestUtils.JsonMapView((Map) values2.get(0)); + node1Rejections = node1AfterRejection.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections"); + XContentTestUtils.JsonMapView node2AfterRejection = new XContentTestUtils.JsonMapView((Map) values2.get(1)); + node2Rejections = node2AfterRejection.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections"); + + if (node1Rejections == 0) { + assertThat(node2Rejections, equalTo(1)); + } else { + assertThat(node1Rejections, equalTo(1)); + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/nodes.stats.json b/rest-api-spec/src/main/resources/rest-api-spec/api/nodes.stats.json index 0dc5e159a4528..fcc0f1d14da2b 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/nodes.stats.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/nodes.stats.json @@ -44,7 +44,8 @@ "process", "thread_pool", "transport", - "discovery" + "discovery", + "indexing_pressure" ], "description":"Limit the information returned to the specified metrics" } @@ -69,7 +70,8 @@ "process", "thread_pool", "transport", - "discovery" + "discovery", + "indexing_pressure" ], "description":"Limit the information returned to the specified metrics" }, @@ -98,7 +100,8 @@ "process", "thread_pool", "transport", - "discovery" + "discovery", + "indexing_pressure" ], "description":"Limit the information returned to the specified metrics" }, @@ -146,7 +149,8 @@ "process", "thread_pool", "transport", - "discovery" + "discovery", + "indexing_pressure" ], "description":"Limit the information returned to the specified metrics" }, diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/50_indexing_pressure.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/50_indexing_pressure.yml new file mode 100644 index 0000000000000..475d1b1813485 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/50_indexing_pressure.yml @@ -0,0 +1,28 @@ +--- +"Indexing pressure stats": + - skip: + version: " - 7.99.99" + reason: "indexing_pressure not in prior versions" + features: [arbitrary_key] + + - do: + nodes.info: {} + - set: + nodes._arbitrary_key_: node_id + + - do: + nodes.stats: + metric: [ indexing_pressure ] + + - gte: { nodes.$node_id.indexing_pressure.total.coordinating_and_primary_bytes: 0 } + - gte: { nodes.$node_id.indexing_pressure.total.replica_bytes: 0 } + - gte: { nodes.$node_id.indexing_pressure.total.all_bytes: 0 } + - gte: { nodes.$node_id.indexing_pressure.total.coordinating_and_primary_memory_limit_rejections: 0 } + - gte: { nodes.$node_id.indexing_pressure.total.replica_memory_limit_rejections: 0 } + - gte: { nodes.$node_id.indexing_pressure.current.coordinating_and_primary_bytes: 0 } + - gte: { nodes.$node_id.indexing_pressure.current.replica_bytes: 0 } + - gte: { nodes.$node_id.indexing_pressure.current.all_bytes: 0 } + +# TODO: +# +# Change skipped version after backport diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/IndexingPressureIT.java similarity index 74% rename from server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java rename to server/src/internalClusterTest/java/org/elasticsearch/index/IndexingPressureIT.java index 101becbd2c730..43ccf33db1e00 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/IndexingPressureIT.java @@ -16,11 +16,14 @@ * specific language governing permissions and limitations * under the License. */ -package org.elasticsearch.action.bulk; +package org.elasticsearch.index; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.bulk.TransportShardBulkAction; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -51,7 +54,7 @@ import static org.hamcrest.Matchers.instanceOf; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 1) -public class WriteMemoryLimitsIT extends ESIntegTestCase { +public class IndexingPressureIT extends ESIntegTestCase { // TODO: Add additional REST tests when metrics are exposed @@ -63,7 +66,6 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase { protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() .put(super.nodeSettings(nodeOrdinal)) - // Need at least two threads because we are going to block one .put(unboundedWriteQueue) .build(); } @@ -134,16 +136,16 @@ public void testWriteBytesAreIncremented() throws Exception { final ActionFuture successFuture = client(coordinatingOnlyNode).bulk(bulkRequest); replicationSendPointReached.await(); - WriteMemoryLimits primaryWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, primaryName); - WriteMemoryLimits replicaWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, replicaName); - WriteMemoryLimits coordinatingWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, coordinatingOnlyNode); + IndexingPressure primaryWriteLimits = internalCluster().getInstance(IndexingPressure.class, primaryName); + IndexingPressure replicaWriteLimits = internalCluster().getInstance(IndexingPressure.class, replicaName); + IndexingPressure coordinatingWriteLimits = internalCluster().getInstance(IndexingPressure.class, coordinatingOnlyNode); - assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize)); - assertEquals(0, primaryWriteLimits.getReplicaWriteBytes()); - assertEquals(0, replicaWriteLimits.getWriteBytes()); - assertEquals(0, replicaWriteLimits.getReplicaWriteBytes()); - assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes()); - assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes()); + assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes()); + assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes()); + assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes()); latchBlockingReplicationSend.countDown(); @@ -165,14 +167,15 @@ public void testWriteBytesAreIncremented() throws Exception { final long secondBulkShardRequestSize = request.ramBytesUsed(); if (usePrimaryAsCoordinatingNode) { - assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize + secondBulkRequestSize)); - assertEquals(0, replicaWriteLimits.getWriteBytes()); + assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), + greaterThan(bulkShardRequestSize + secondBulkRequestSize)); + assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); } else { - assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize)); - assertEquals(secondBulkRequestSize, replicaWriteLimits.getWriteBytes()); + assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(secondBulkRequestSize, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); } - assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes()); - assertBusy(() -> assertThat(replicaWriteLimits.getReplicaWriteBytes(), + assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertBusy(() -> assertThat(replicaWriteLimits.getCurrentReplicaBytes(), greaterThan(bulkShardRequestSize + secondBulkShardRequestSize))); replicaRelease.close(); @@ -180,12 +183,12 @@ public void testWriteBytesAreIncremented() throws Exception { successFuture.actionGet(); secondFuture.actionGet(); - assertEquals(0, primaryWriteLimits.getWriteBytes()); - assertEquals(0, primaryWriteLimits.getReplicaWriteBytes()); - assertEquals(0, replicaWriteLimits.getWriteBytes()); - assertEquals(0, replicaWriteLimits.getReplicaWriteBytes()); - assertEquals(0, coordinatingWriteLimits.getWriteBytes()); - assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes()); + assertEquals(0, primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes()); + assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes()); + assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes()); } finally { if (replicationSendPointReached.getCount() > 0) { replicationSendPointReached.countDown(); @@ -212,8 +215,8 @@ public void testWriteCanBeRejectedAtCoordinatingLevel() throws Exception { final long bulkRequestSize = bulkRequest.ramBytesUsed(); final long bulkShardRequestSize = totalRequestSize; - restartNodesWithSettings(Settings.builder().put(WriteMemoryLimits.MAX_INDEXING_BYTES.getKey(), - (long)(bulkShardRequestSize * 1.5) + "B").build()); + restartNodesWithSettings(Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), + (long) (bulkShardRequestSize * 1.5) + "B").build()); assertAcked(prepareCreate(INDEX_NAME, Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) @@ -229,17 +232,17 @@ public void testWriteCanBeRejectedAtCoordinatingLevel() throws Exception { try (Releasable replicaRelease = blockReplicas(replicaThreadPool)) { final ActionFuture successFuture = client(coordinatingOnlyNode).bulk(bulkRequest); - WriteMemoryLimits primaryWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, primaryName); - WriteMemoryLimits replicaWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, replicaName); - WriteMemoryLimits coordinatingWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, coordinatingOnlyNode); + IndexingPressure primaryWriteLimits = internalCluster().getInstance(IndexingPressure.class, primaryName); + IndexingPressure replicaWriteLimits = internalCluster().getInstance(IndexingPressure.class, replicaName); + IndexingPressure coordinatingWriteLimits = internalCluster().getInstance(IndexingPressure.class, coordinatingOnlyNode); assertBusy(() -> { - assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize)); - assertEquals(0, primaryWriteLimits.getReplicaWriteBytes()); - assertEquals(0, replicaWriteLimits.getWriteBytes()); - assertThat(replicaWriteLimits.getReplicaWriteBytes(), greaterThan(bulkShardRequestSize)); - assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes()); - assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes()); + assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes()); + assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertThat(replicaWriteLimits.getCurrentReplicaBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes()); }); expectThrows(EsRejectedExecutionException.class, () -> { @@ -256,12 +259,12 @@ public void testWriteCanBeRejectedAtCoordinatingLevel() throws Exception { successFuture.actionGet(); - assertEquals(0, primaryWriteLimits.getWriteBytes()); - assertEquals(0, primaryWriteLimits.getReplicaWriteBytes()); - assertEquals(0, replicaWriteLimits.getWriteBytes()); - assertEquals(0, replicaWriteLimits.getReplicaWriteBytes()); - assertEquals(0, coordinatingWriteLimits.getWriteBytes()); - assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes()); + assertEquals(0, primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes()); + assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes()); + assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes()); } } @@ -276,7 +279,7 @@ public void testWriteCanBeRejectedAtPrimaryLevel() throws Exception { bulkRequest.add(request); } final long bulkShardRequestSize = totalRequestSize; - restartNodesWithSettings(Settings.builder().put(WriteMemoryLimits.MAX_INDEXING_BYTES.getKey(), + restartNodesWithSettings(Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), (long)(bulkShardRequestSize * 1.5) + "B").build()); assertAcked(prepareCreate(INDEX_NAME, Settings.builder() @@ -293,17 +296,17 @@ public void testWriteCanBeRejectedAtPrimaryLevel() throws Exception { try (Releasable replicaRelease = blockReplicas(replicaThreadPool)) { final ActionFuture successFuture = client(primaryName).bulk(bulkRequest); - WriteMemoryLimits primaryWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, primaryName); - WriteMemoryLimits replicaWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, replicaName); - WriteMemoryLimits coordinatingWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, coordinatingOnlyNode); + IndexingPressure primaryWriteLimits = internalCluster().getInstance(IndexingPressure.class, primaryName); + IndexingPressure replicaWriteLimits = internalCluster().getInstance(IndexingPressure.class, replicaName); + IndexingPressure coordinatingWriteLimits = internalCluster().getInstance(IndexingPressure.class, coordinatingOnlyNode); assertBusy(() -> { - assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize)); - assertEquals(0, primaryWriteLimits.getReplicaWriteBytes()); - assertEquals(0, replicaWriteLimits.getWriteBytes()); - assertThat(replicaWriteLimits.getReplicaWriteBytes(), greaterThan(bulkShardRequestSize)); - assertEquals(0, coordinatingWriteLimits.getWriteBytes()); - assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes()); + assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes()); + assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertThat(replicaWriteLimits.getCurrentReplicaBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes()); }); BulkResponse responses = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet(); @@ -314,17 +317,17 @@ public void testWriteCanBeRejectedAtPrimaryLevel() throws Exception { successFuture.actionGet(); - assertEquals(0, primaryWriteLimits.getWriteBytes()); - assertEquals(0, primaryWriteLimits.getReplicaWriteBytes()); - assertEquals(0, replicaWriteLimits.getWriteBytes()); - assertEquals(0, replicaWriteLimits.getReplicaWriteBytes()); - assertEquals(0, coordinatingWriteLimits.getWriteBytes()); - assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes()); + assertEquals(0, primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes()); + assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes()); + assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes()); } } public void testWritesWillSucceedIfBelowThreshold() throws Exception { - restartNodesWithSettings(Settings.builder().put(WriteMemoryLimits.MAX_INDEXING_BYTES.getKey(), "1MB").build()); + restartNodesWithSettings(Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "1MB").build()); assertAcked(prepareCreate(INDEX_NAME, Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java index 60f1fc4da1063..5e52fd63ae57a 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.cluster.node.stats; +import org.elasticsearch.Version; import org.elasticsearch.action.support.nodes.BaseNodeResponse; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; @@ -29,6 +30,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.discovery.DiscoveryStats; import org.elasticsearch.http.HttpStats; +import org.elasticsearch.index.stats.IndexingPressureStats; import org.elasticsearch.indices.NodeIndicesStats; import org.elasticsearch.indices.breaker.AllCircuitBreakerStats; import org.elasticsearch.ingest.IngestStats; @@ -90,6 +92,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { @Nullable private AdaptiveSelectionStats adaptiveSelectionStats; + @Nullable + private IndexingPressureStats indexingPressureStats; + public NodeStats(StreamInput in) throws IOException { super(in); timestamp = in.readVLong(); @@ -108,6 +113,12 @@ public NodeStats(StreamInput in) throws IOException { discoveryStats = in.readOptionalWriteable(DiscoveryStats::new); ingestStats = in.readOptionalWriteable(IngestStats::new); adaptiveSelectionStats = in.readOptionalWriteable(AdaptiveSelectionStats::new); + // TODO: Change after backport + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + indexingPressureStats = in.readOptionalWriteable(IndexingPressureStats::new); + } else { + indexingPressureStats = null; + } } public NodeStats(DiscoveryNode node, long timestamp, @Nullable NodeIndicesStats indices, @@ -117,7 +128,8 @@ public NodeStats(DiscoveryNode node, long timestamp, @Nullable NodeIndicesStats @Nullable ScriptStats scriptStats, @Nullable DiscoveryStats discoveryStats, @Nullable IngestStats ingestStats, - @Nullable AdaptiveSelectionStats adaptiveSelectionStats) { + @Nullable AdaptiveSelectionStats adaptiveSelectionStats, + @Nullable IndexingPressureStats indexingPressureStats) { super(node); this.timestamp = timestamp; this.indices = indices; @@ -133,6 +145,7 @@ public NodeStats(DiscoveryNode node, long timestamp, @Nullable NodeIndicesStats this.discoveryStats = discoveryStats; this.ingestStats = ingestStats; this.adaptiveSelectionStats = adaptiveSelectionStats; + this.indexingPressureStats = indexingPressureStats; } public long getTimestamp() { @@ -227,6 +240,11 @@ public AdaptiveSelectionStats getAdaptiveSelectionStats() { return adaptiveSelectionStats; } + @Nullable + public IndexingPressureStats getIndexingPressureStats() { + return indexingPressureStats; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -249,6 +267,10 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(discoveryStats); out.writeOptionalWriteable(ingestStats); out.writeOptionalWriteable(adaptiveSelectionStats); + // TODO: Change after backport + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeOptionalWriteable(indexingPressureStats); + } } @Override @@ -312,6 +334,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (getAdaptiveSelectionStats() != null) { getAdaptiveSelectionStats().toXContent(builder, params); } + if (getIndexingPressureStats() != null) { + getIndexingPressureStats().toXContent(builder, params); + } return builder; } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java index 35e307a3cb393..d0b76291c0e1c 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java @@ -220,7 +220,8 @@ public enum Metric { DISCOVERY("discovery"), INGEST("ingest"), ADAPTIVE_SELECTION("adaptive_selection"), - SCRIPT_CACHE("script_cache"); + SCRIPT_CACHE("script_cache"), + INDEXING_PRESSURE("indexing_pressure"),; private String metricName; diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequestBuilder.java index 2ce3cb2b03e1a..5446f752a5303 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequestBuilder.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequestBuilder.java @@ -157,6 +157,11 @@ public NodesStatsRequestBuilder setScriptCache(boolean scriptCache) { return this; } + public NodesStatsRequestBuilder setIndexingPressure(boolean indexingPressure) { + addOrRemoveMetric(indexingPressure, NodesStatsRequest.Metric.INDEXING_PRESSURE); + return this; + } + /** * Helper method for adding metrics to a request */ diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java index db5011042f4e1..3b9d6c3ad9f6d 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java @@ -84,7 +84,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest, Task task) NodesStatsRequest.Metric.DISCOVERY.containedIn(metrics), NodesStatsRequest.Metric.INGEST.containedIn(metrics), NodesStatsRequest.Metric.ADAPTIVE_SELECTION.containedIn(metrics), - NodesStatsRequest.Metric.SCRIPT_CACHE.containedIn(metrics)); + NodesStatsRequest.Metric.SCRIPT_CACHE.containedIn(metrics), + NodesStatsRequest.Metric.INDEXING_PRESSURE.containedIn(metrics)); } public static class NodeStatsRequest extends TransportRequest { diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java index 7a8d7a40dcb75..a8440de35d262 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -98,7 +98,7 @@ protected ClusterStatsNodeResponse newNodeResponse(StreamInput in) throws IOExce protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeRequest, Task task) { NodeInfo nodeInfo = nodeService.info(true, true, false, true, false, true, false, true, false, false); NodeStats nodeStats = nodeService.stats(CommonStatsFlags.NONE, - true, true, true, false, true, false, false, false, false, false, true, false, false); + true, true, true, false, true, false, false, false, false, false, true, false, false, false); List shardsStats = new ArrayList<>(); for (IndexService indexService : indicesService) { for (IndexShard indexShard : indexService) { diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 9821fc264c306..3500bad94476a 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -68,6 +68,7 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndexClosedException; @@ -112,21 +113,21 @@ public class TransportBulkAction extends HandledTransportAction docWriteReque @Override protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener listener) { long indexingBytes = bulkRequest.ramBytesUsed(); - final Releasable releasable = writeMemoryLimits.markWriteOperationStarted(indexingBytes); + final Releasable releasable = indexingPressure.markIndexingOperationStarted(indexingBytes); final ActionListener releasingListener = ActionListener.runBefore(listener, releasable::close); try { doInternalExecute(task, bulkRequest, releasingListener); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index d1c77df56b790..dd53d54f75434 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -56,6 +56,7 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.get.GetResult; @@ -93,9 +94,9 @@ public class TransportShardBulkAction extends TransportWriteAction MAX_INDEXING_BYTES = - Setting.memorySizeSetting("indexing_limits.memory.limit", "10%", Setting.Property.NodeScope); - - private final AtomicLong writeBytes = new AtomicLong(0); - private final AtomicLong replicaWriteBytes = new AtomicLong(0); - private final long writeLimits; - - public WriteMemoryLimits(Settings settings) { - this.writeLimits = MAX_INDEXING_BYTES.get(settings).getBytes(); - } - - public WriteMemoryLimits(Settings settings, ClusterSettings clusterSettings) { - this.writeLimits = MAX_INDEXING_BYTES.get(settings).getBytes(); - } - - public Releasable markWriteOperationStarted(long bytes) { - return markWriteOperationStarted(bytes, false); - } - - public Releasable markWriteOperationStarted(long bytes, boolean forceExecution) { - long currentWriteLimits = this.writeLimits; - long writeBytes = this.writeBytes.addAndGet(bytes); - long replicaWriteBytes = this.replicaWriteBytes.get(); - long totalBytes = writeBytes + replicaWriteBytes; - if (forceExecution == false && totalBytes > currentWriteLimits) { - long bytesWithoutOperation = writeBytes - bytes; - long totalBytesWithoutOperation = totalBytes - bytes; - this.writeBytes.getAndAdd(-bytes); - throw new EsRejectedExecutionException("rejected execution of write operation [" + - "write_bytes=" + bytesWithoutOperation + ", " + - "replica_write_bytes=" + replicaWriteBytes + ", " + - "total_write_bytes=" + totalBytesWithoutOperation + ", " + - "current_operation_bytes=" + bytes + ", " + - "max_write_bytes=" + currentWriteLimits + "]", false); - } - return () -> this.writeBytes.getAndAdd(-bytes); - } - - public long getWriteBytes() { - return writeBytes.get(); - } - - public Releasable markReplicaWriteStarted(long bytes, boolean forceExecution) { - long currentReplicaWriteLimits = (long) (this.writeLimits * 1.5); - long replicaWriteBytes = this.replicaWriteBytes.getAndAdd(bytes); - if (forceExecution == false && replicaWriteBytes > currentReplicaWriteLimits) { - long replicaBytesWithoutOperation = replicaWriteBytes - bytes; - this.replicaWriteBytes.getAndAdd(-bytes); - throw new EsRejectedExecutionException("rejected execution of replica write operation [" + - "replica_write_bytes=" + replicaBytesWithoutOperation + ", " + - "current_replica_operation_bytes=" + bytes + ", " + - "max_replica_write_bytes=" + currentReplicaWriteLimits + "]", false); - } - return () -> this.replicaWriteBytes.getAndAdd(-bytes); - } - - public long getReplicaWriteBytes() { - return replicaWriteBytes.get(); - } -} diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 74ddcf54b3212..47f287ba0133f 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -20,7 +20,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.bulk.WriteMemoryLimits; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.ReplicationResponse; @@ -57,11 +57,11 @@ public class TransportResyncReplicationAction extends TransportWriteAction extends TransportReplicationAction { private final boolean forceExecution; - private final WriteMemoryLimits writeMemoryLimits; + private final IndexingPressure indexingPressure; private final String executor; protected TransportWriteAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, ActionFilters actionFilters, Writeable.Reader request, Writeable.Reader replicaRequest, String executor, boolean forceExecutionOnPrimary, - WriteMemoryLimits writeMemoryLimits) { + IndexingPressure indexingPressure) { // We pass ThreadPool.Names.SAME to the super class as we control the dispatching to the // ThreadPool.Names.WRITE thread pool in this class. super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, request, replicaRequest, ThreadPool.Names.SAME, true, forceExecutionOnPrimary); this.executor = executor; this.forceExecution = forceExecutionOnPrimary; - this.writeMemoryLimits = writeMemoryLimits; + this.indexingPressure = indexingPressure; } @Override protected Releasable checkOperationLimits(Request request) { - return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request), forceExecution); + return indexingPressure.markIndexingOperationStarted(primaryOperationSize(request), forceExecution); } @Override @@ -90,7 +90,7 @@ protected Releasable checkPrimaryLimits(Request request, boolean rerouteWasLocal if (rerouteWasLocal) { return () -> {}; } else { - return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request), forceExecution); + return indexingPressure.markIndexingOperationStarted(primaryOperationSize(request), forceExecution); } } @@ -100,7 +100,7 @@ protected long primaryOperationSize(Request request) { @Override protected Releasable checkReplicaLimits(ReplicaRequest request) { - return writeMemoryLimits.markReplicaWriteStarted(replicaOperationSize(request), forceExecution); + return indexingPressure.markReplicaOperationStarted(replicaOperationSize(request), forceExecution); } protected long replicaOperationSize(ReplicaRequest request) { diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 1f16a6cec36d8..4ede1bfdc05d1 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -21,7 +21,7 @@ import org.apache.logging.log4j.LogManager; import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction; import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction; -import org.elasticsearch.action.bulk.WriteMemoryLimits; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.action.support.DestructiveOperations; @@ -489,7 +489,7 @@ public void apply(Settings value, Settings current, Settings previous) { FsHealthService.ENABLED_SETTING, FsHealthService.REFRESH_INTERVAL_SETTING, FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING, - WriteMemoryLimits.MAX_INDEXING_BYTES); + IndexingPressure.MAX_INDEXING_BYTES); static List> BUILT_IN_SETTING_UPGRADERS = Collections.emptyList(); diff --git a/server/src/main/java/org/elasticsearch/index/IndexingPressure.java b/server/src/main/java/org/elasticsearch/index/IndexingPressure.java new file mode 100644 index 0000000000000..9c8fb83fe4ffc --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/IndexingPressure.java @@ -0,0 +1,111 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index; + +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.index.stats.IndexingPressureStats; + +import java.util.concurrent.atomic.AtomicLong; + +public class IndexingPressure { + + public static final Setting MAX_INDEXING_BYTES = + Setting.memorySizeSetting("indexing_pressure.memory.limit", "10%", Setting.Property.NodeScope); + + private final AtomicLong currentCoordinatingAndPrimaryBytes = new AtomicLong(0); + private final AtomicLong currentReplicaBytes = new AtomicLong(0); + private final AtomicLong totalCoordinatingAndPrimaryBytes = new AtomicLong(0); + private final AtomicLong totalReplicaBytes = new AtomicLong(0); + private final AtomicLong coordinatingAndPrimaryRejections = new AtomicLong(0); + private final AtomicLong replicaRejections = new AtomicLong(0); + + private final long primaryAndCoordinatingLimits; + private final long replicaLimits; + + public IndexingPressure(Settings settings) { + this.primaryAndCoordinatingLimits = MAX_INDEXING_BYTES.get(settings).getBytes(); + this.replicaLimits = (long) (this.primaryAndCoordinatingLimits * 1.5); + } + + public Releasable markIndexingOperationStarted(long bytes) { + return markIndexingOperationStarted(bytes, false); + } + + public Releasable markIndexingOperationStarted(long bytes, boolean forceExecution) { + long writeBytes = this.currentCoordinatingAndPrimaryBytes.addAndGet(bytes); + long replicaWriteBytes = this.currentReplicaBytes.get(); + long totalBytes = writeBytes + replicaWriteBytes; + if (forceExecution == false && totalBytes > primaryAndCoordinatingLimits) { + long bytesWithoutOperation = writeBytes - bytes; + long totalBytesWithoutOperation = totalBytes - bytes; + this.currentCoordinatingAndPrimaryBytes.getAndAdd(-bytes); + this.coordinatingAndPrimaryRejections.getAndIncrement(); + throw new EsRejectedExecutionException("rejected execution of operation [" + + "coordinating_and_primary_bytes=" + bytesWithoutOperation + ", " + + "replica_bytes=" + replicaWriteBytes + ", " + + "all_bytes=" + totalBytesWithoutOperation + ", " + + "operation_bytes=" + bytes + ", " + + "max_coordinating_and_primary_bytes=" + primaryAndCoordinatingLimits + "]", false); + } + totalCoordinatingAndPrimaryBytes.getAndAdd(bytes); + return () -> this.currentCoordinatingAndPrimaryBytes.getAndAdd(-bytes); + } + + public Releasable markReplicaOperationStarted(long bytes, boolean forceExecution) { + long replicaWriteBytes = this.currentReplicaBytes.getAndAdd(bytes); + if (forceExecution == false && replicaWriteBytes > replicaLimits) { + long replicaBytesWithoutOperation = replicaWriteBytes - bytes; + this.currentReplicaBytes.getAndAdd(-bytes); + this.replicaRejections.getAndIncrement(); + throw new EsRejectedExecutionException("rejected execution of replica operation [" + + "replica_bytes=" + replicaBytesWithoutOperation + ", " + + "replica_operation_bytes=" + bytes + ", " + + "max_replica_bytes=" + replicaLimits + "]", false); + } + totalReplicaBytes.getAndAdd(bytes); + return () -> this.currentReplicaBytes.getAndAdd(-bytes); + } + + public long getCurrentCoordinatingAndPrimaryBytes() { + return currentCoordinatingAndPrimaryBytes.get(); + } + + public long getCurrentReplicaBytes() { + return currentReplicaBytes.get(); + } + + public long getTotalCoordinatingAndPrimaryBytes() { + return totalCoordinatingAndPrimaryBytes.get(); + } + + public long getTotalReplicaBytes() { + return totalReplicaBytes.get(); + } + + public IndexingPressureStats stats() { + return new IndexingPressureStats(totalCoordinatingAndPrimaryBytes.get(), totalReplicaBytes.get(), + currentCoordinatingAndPrimaryBytes.get(), currentReplicaBytes.get(), coordinatingAndPrimaryRejections.get(), + replicaRejections.get()); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java index 54a418fe673c7..dd08f8ff763ad 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java @@ -25,7 +25,7 @@ import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.bulk.WriteMemoryLimits; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteResponse; @@ -80,7 +80,7 @@ public RetentionLeaseSyncAction( final ThreadPool threadPool, final ShardStateAction shardStateAction, final ActionFilters actionFilters, - final WriteMemoryLimits writeMemoryLimits) { + final IndexingPressure indexingPressure) { super( settings, ACTION_NAME, @@ -92,7 +92,7 @@ public RetentionLeaseSyncAction( actionFilters, RetentionLeaseSyncAction.Request::new, RetentionLeaseSyncAction.Request::new, - ThreadPool.Names.MANAGEMENT, false, writeMemoryLimits); + ThreadPool.Names.MANAGEMENT, false, indexingPressure); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/stats/IndexingPressureStats.java b/server/src/main/java/org/elasticsearch/index/stats/IndexingPressureStats.java new file mode 100644 index 0000000000000..309cf863b6324 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/stats/IndexingPressureStats.java @@ -0,0 +1,85 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.stats; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContentFragment; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; + +public class IndexingPressureStats implements Writeable, ToXContentFragment { + + private final long totalCoordinatingAndPrimaryBytes; + private final long totalReplicaBytes; + private final long currentCoordinatingAndPrimaryBytes; + private final long currentReplicaBytes; + private final long coordinatingAndPrimaryRejections; + private final long replicaRejections; + + public IndexingPressureStats(StreamInput in) throws IOException { + totalCoordinatingAndPrimaryBytes = in.readVLong(); + totalReplicaBytes = in.readVLong(); + currentCoordinatingAndPrimaryBytes = in.readVLong(); + currentReplicaBytes = in.readVLong(); + coordinatingAndPrimaryRejections = in.readVLong(); + replicaRejections = in.readVLong(); + } + + public IndexingPressureStats(long totalCoordinatingAndPrimaryBytes, long totalReplicaBytes, long currentCoordinatingAndPrimaryBytes, + long currentReplicaBytes, long coordinatingAndPrimaryRejections, long replicaRejections) { + this.totalCoordinatingAndPrimaryBytes = totalCoordinatingAndPrimaryBytes; + this.totalReplicaBytes = totalReplicaBytes; + this.currentCoordinatingAndPrimaryBytes = currentCoordinatingAndPrimaryBytes; + this.currentReplicaBytes = currentReplicaBytes; + this.coordinatingAndPrimaryRejections = coordinatingAndPrimaryRejections; + this.replicaRejections = replicaRejections; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(totalCoordinatingAndPrimaryBytes); + out.writeVLong(totalReplicaBytes); + out.writeVLong(currentCoordinatingAndPrimaryBytes); + out.writeVLong(currentReplicaBytes); + out.writeVLong(coordinatingAndPrimaryRejections); + out.writeVLong(replicaRejections); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject("indexing_pressure"); + builder.startObject("total"); + builder.field("coordinating_and_primary_bytes", totalCoordinatingAndPrimaryBytes); + builder.field("replica_bytes", totalReplicaBytes); + builder.field("all_bytes", totalReplicaBytes + totalCoordinatingAndPrimaryBytes); + builder.field("coordinating_and_primary_memory_limit_rejections", coordinatingAndPrimaryRejections); + builder.field("replica_memory_limit_rejections", replicaRejections); + builder.endObject(); + builder.startObject("current"); + builder.field("coordinating_and_primary_bytes", currentCoordinatingAndPrimaryBytes); + builder.field("replica_bytes", currentReplicaBytes); + builder.field("all_bytes", currentCoordinatingAndPrimaryBytes + currentReplicaBytes); + builder.endObject(); + return builder.endObject(); + } +} diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 7f2a61359110f..68258a55047a6 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -30,7 +30,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionModule; import org.elasticsearch.action.ActionType; -import org.elasticsearch.action.bulk.WriteMemoryLimits; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.action.search.SearchExecutionStatsCollector; import org.elasticsearch.action.search.SearchPhaseController; import org.elasticsearch.action.search.SearchTransportService; @@ -538,6 +538,7 @@ protected Node(final Environment initialEnvironment, final SearchTransportService searchTransportService = new SearchTransportService(transportService, SearchExecutionStatsCollector.makeWrapper(responseCollectorService)); final HttpServerTransport httpServerTransport = newHttpTransport(networkModule); + final IndexingPressure indexingLimits = new IndexingPressure(settings); final RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings()); RepositoriesModule repositoriesModule = new RepositoriesModule(this.environment, @@ -566,7 +567,7 @@ protected Node(final Environment initialEnvironment, this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(), transportService, indicesService, pluginsService, circuitBreakerService, scriptService, httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService, - searchTransportService); + searchTransportService, indexingLimits); final SearchService searchService = newSearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, searchModule.getFetchPhase(), @@ -584,7 +585,6 @@ protected Node(final Environment initialEnvironment, new PersistentTasksClusterService(settings, registry, clusterService, threadPool); resourcesToClose.add(persistentTasksClusterService); final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client); - final WriteMemoryLimits bulkIndexingLimits = new WriteMemoryLimits(settings); modules.add(b -> { b.bind(Node.class).toInstance(this); @@ -603,7 +603,7 @@ protected Node(final Environment initialEnvironment, b.bind(ScriptService.class).toInstance(scriptService); b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry()); b.bind(IngestService.class).toInstance(ingestService); - b.bind(WriteMemoryLimits.class).toInstance(bulkIndexingLimits); + b.bind(IndexingPressure.class).toInstance(indexingLimits); b.bind(UsageService.class).toInstance(usageService); b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService()); b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry); diff --git a/server/src/main/java/org/elasticsearch/node/NodeService.java b/server/src/main/java/org/elasticsearch/node/NodeService.java index 2d42dfda9ea09..3394a796f9836 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeService.java +++ b/server/src/main/java/org/elasticsearch/node/NodeService.java @@ -19,6 +19,7 @@ package org.elasticsearch.node; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.Build; import org.elasticsearch.Version; @@ -59,6 +60,7 @@ public class NodeService implements Closeable { private final HttpServerTransport httpServerTransport; private final ResponseCollectorService responseCollectorService; private final SearchTransportService searchTransportService; + private final IndexingPressure indexingPressure; private final Discovery discovery; @@ -67,7 +69,7 @@ public class NodeService implements Closeable { CircuitBreakerService circuitBreakerService, ScriptService scriptService, @Nullable HttpServerTransport httpServerTransport, IngestService ingestService, ClusterService clusterService, SettingsFilter settingsFilter, ResponseCollectorService responseCollectorService, - SearchTransportService searchTransportService) { + SearchTransportService searchTransportService, IndexingPressure indexingPressure) { this.settings = settings; this.threadPool = threadPool; this.monitorService = monitorService; @@ -82,6 +84,7 @@ public class NodeService implements Closeable { this.scriptService = scriptService; this.responseCollectorService = responseCollectorService; this.searchTransportService = searchTransportService; + this.indexingPressure = indexingPressure; clusterService.addStateApplier(ingestService); } @@ -103,7 +106,8 @@ public NodeInfo info(boolean settings, boolean os, boolean process, boolean jvm, public NodeStats stats(CommonStatsFlags indices, boolean os, boolean process, boolean jvm, boolean threadPool, boolean fs, boolean transport, boolean http, boolean circuitBreaker, - boolean script, boolean discoveryStats, boolean ingest, boolean adaptiveSelection, boolean scriptCache) { + boolean script, boolean discoveryStats, boolean ingest, boolean adaptiveSelection, boolean scriptCache, + boolean indexingPressure) { // for indices stats we want to include previous allocated shards stats as well (it will // only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats) return new NodeStats(transportService.getLocalNode(), System.currentTimeMillis(), @@ -119,8 +123,8 @@ public NodeStats stats(CommonStatsFlags indices, boolean os, boolean process, bo script ? scriptService.stats() : null, discoveryStats ? discovery.stats() : null, ingest ? ingestService.stats() : null, - adaptiveSelection ? responseCollectorService.getAdaptiveStats(searchTransportService.getPendingSearchRequests()) : null - ); + adaptiveSelection ? responseCollectorService.getAdaptiveStats(searchTransportService.getPendingSearchRequests()) : null, + indexingPressure ? this.indexingPressure.stats() : null); } public IngestService getIngestService() { diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java index 8ca7cf3b3b702..936c13b8a8fb9 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -528,7 +528,7 @@ public static NodeStats createNodeStats() { //TODO NodeIndicesStats are not tested here, way too complicated to create, also they need to be migrated to Writeable yet return new NodeStats(node, randomNonNegativeLong(), null, osStats, processStats, jvmStats, threadPoolStats, fsInfo, transportStats, httpStats, allCircuitBreakerStats, scriptStats, discoveryStats, - ingestStats, adaptiveSelectionStats); + ingestStats, adaptiveSelectionStats, null); } private IngestStats.Stats getPipelineStats(List pipelineStats, String id) { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java index 5ff65721d4480..e182833c4902f 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; @@ -122,7 +123,7 @@ private void indicesThatCannotBeCreatedTestCase(Set expected, when(threadPool.executor(anyString())).thenReturn(direct); TransportBulkAction action = new TransportBulkAction(threadPool, mock(TransportService.class), clusterService, null, null, mock(ActionFilters.class), null, null, - new WriteMemoryLimits(Settings.EMPTY)) { + new IndexingPressure(Settings.EMPTY)) { @Override void executeBulk(Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener listener, AtomicArray responses, Map indicesThatCannotBeCreated) { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index f8e27d8954b77..8e7af9de16adb 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -52,6 +52,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; @@ -143,7 +144,7 @@ null, new ActionFilters(Collections.emptySet()), null, new AutoCreateIndex( SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), new IndexNameExpressionResolver() - ), new WriteMemoryLimits(SETTINGS) + ), new IndexingPressure(SETTINGS) ); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java index 716294e2762ab..a22db5e9d30b1 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java @@ -45,6 +45,7 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; @@ -82,7 +83,7 @@ class TestTransportBulkAction extends TransportBulkAction { super(TransportBulkActionTests.this.threadPool, transportService, clusterService, null, null, new ActionFilters(Collections.emptySet()), new Resolver(), new AutoCreateIndex(Settings.EMPTY, clusterService.getClusterSettings(), new Resolver()), - new WriteMemoryLimits(Settings.EMPTY)); + new IndexingPressure(Settings.EMPTY)); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java index 44f78c25afd39..44007488433db 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java @@ -41,6 +41,7 @@ import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; @@ -233,7 +234,7 @@ static class TestTransportBulkAction extends TransportBulkAction { actionFilters, indexNameExpressionResolver, autoCreateIndex, - new WriteMemoryLimits(Settings.EMPTY), + new IndexingPressure(Settings.EMPTY), relativeTimeProvider); } diff --git a/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java index b67539f986b0d..04845547c29ec 100644 --- a/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java @@ -20,7 +20,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.bulk.WriteMemoryLimits; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterState; @@ -145,7 +145,7 @@ public void testResyncDoesNotBlockOnPrimaryAction() throws Exception { final TransportResyncReplicationAction action = new TransportResyncReplicationAction(Settings.EMPTY, transportService, clusterService, indexServices, threadPool, shardStateAction, new ActionFilters(new HashSet<>()), - new WriteMemoryLimits(Settings.EMPTY)); + new IndexingPressure(Settings.EMPTY)); assertThat(action.globalBlockLevel(), nullValue()); assertThat(action.indexBlockLevel(), nullValue()); diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 6535ab6d68f62..68ac15cba23ba 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -21,7 +21,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.bulk.WriteMemoryLimits; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.PlainActionFuture; @@ -367,7 +367,7 @@ protected TestAction(boolean withDocumentFailureOnPrimary, boolean withDocumentF new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, Collections.emptySet()), TransportWriteActionTests.this.clusterService, null, null, null, new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false, - new WriteMemoryLimits(Settings.EMPTY)); + new IndexingPressure(Settings.EMPTY)); this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary; this.withDocumentFailureOnReplica = withDocumentFailureOnReplica; } @@ -377,7 +377,7 @@ protected TestAction(Settings settings, String actionName, TransportService tran super(settings, actionName, transportService, clusterService, mockIndicesService(clusterService), threadPool, shardStateAction, new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false, - new WriteMemoryLimits(settings)); + new IndexingPressure(settings)); this.withDocumentFailureOnPrimary = false; this.withDocumentFailureOnReplica = false; } diff --git a/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java b/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java index d89cc710e6f9e..5ecf765f82028 100644 --- a/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java @@ -152,11 +152,11 @@ public void testFillDiskUsage() { }; List nodeStats = Arrays.asList( new NodeStats(new DiscoveryNode("node_1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0, - null,null,null,null,null,new FsInfo(0, null, node1FSInfo), null,null,null,null,null, null, null), + null,null,null,null,null,new FsInfo(0, null, node1FSInfo), null,null,null,null,null, null, null, null), new NodeStats(new DiscoveryNode("node_2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0, - null,null,null,null,null, new FsInfo(0, null, node2FSInfo), null,null,null,null,null, null, null), + null,null,null,null,null, new FsInfo(0, null, node2FSInfo), null,null,null,null,null, null, null, null), new NodeStats(new DiscoveryNode("node_3", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0, - null,null,null,null,null, new FsInfo(0, null, node3FSInfo), null,null,null,null,null, null, null) + null,null,null,null,null, new FsInfo(0, null, node3FSInfo), null,null,null,null,null, null, null, null) ); InternalClusterInfoService.fillDiskUsagePerNode(logger, nodeStats, newLeastAvaiableUsages, newMostAvaiableUsages); DiskUsage leastNode_1 = newLeastAvaiableUsages.get("node_1"); @@ -193,11 +193,11 @@ public void testFillDiskUsageSomeInvalidValues() { }; List nodeStats = Arrays.asList( new NodeStats(new DiscoveryNode("node_1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0, - null,null,null,null,null,new FsInfo(0, null, node1FSInfo), null,null,null,null,null, null, null), + null,null,null,null,null,new FsInfo(0, null, node1FSInfo), null,null,null,null,null, null, null, null), new NodeStats(new DiscoveryNode("node_2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0, - null,null,null,null,null, new FsInfo(0, null, node2FSInfo), null,null,null,null,null, null, null), + null,null,null,null,null, new FsInfo(0, null, node2FSInfo), null,null,null,null,null, null, null, null), new NodeStats(new DiscoveryNode("node_3", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0, - null,null,null,null,null, new FsInfo(0, null, node3FSInfo), null,null,null,null,null, null, null) + null,null,null,null,null, new FsInfo(0, null, node3FSInfo), null,null,null,null,null, null, null, null) ); InternalClusterInfoService.fillDiskUsagePerNode(logger, nodeStats, newLeastAvailableUsages, newMostAvailableUsages); DiskUsage leastNode_1 = newLeastAvailableUsages.get("node_1"); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java index 31d32e6b705fc..a1f08407c1ffe 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java @@ -20,7 +20,7 @@ package org.elasticsearch.index.seqno; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.bulk.WriteMemoryLimits; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.PlainActionFuture; @@ -106,7 +106,7 @@ public void testRetentionLeaseSyncActionOnPrimary() { threadPool, shardStateAction, new ActionFilters(Collections.emptySet()), - new WriteMemoryLimits(Settings.EMPTY)); + new IndexingPressure(Settings.EMPTY)); final RetentionLeases retentionLeases = mock(RetentionLeases.class); final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases); action.dispatchedShardOperationOnPrimary(request, indexShard, @@ -143,7 +143,7 @@ public void testRetentionLeaseSyncActionOnReplica() throws WriteStateException { threadPool, shardStateAction, new ActionFilters(Collections.emptySet()), - new WriteMemoryLimits(Settings.EMPTY)); + new IndexingPressure(Settings.EMPTY)); final RetentionLeases retentionLeases = mock(RetentionLeases.class); final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases); @@ -182,7 +182,7 @@ public void testBlocks() { threadPool, shardStateAction, new ActionFilters(Collections.emptySet()), - new WriteMemoryLimits(Settings.EMPTY)); + new IndexingPressure(Settings.EMPTY)); assertNull(action.indexBlockLevel()); } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 21bf0f98ed1c2..a61a4a5dbbd30 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -65,7 +65,7 @@ import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresAction; import org.elasticsearch.action.admin.indices.shards.TransportIndicesShardStoresAction; import org.elasticsearch.action.bulk.BulkAction; -import org.elasticsearch.action.bulk.WriteMemoryLimits; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.TransportBulkAction; @@ -1552,7 +1552,7 @@ public void onFailure(final Exception e) { threadPool, shardStateAction, actionFilters, - new WriteMemoryLimits(settings))), + new IndexingPressure(settings))), RetentionLeaseSyncer.EMPTY, client); final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService); @@ -1567,7 +1567,7 @@ allocationService, new AliasValidator(), shardLimitValidator, environment, index actionFilters, indexNameExpressionResolver )); final MappingUpdatedAction mappingUpdatedAction = new MappingUpdatedAction(settings, clusterSettings, clusterService); - final WriteMemoryLimits indexingMemoryLimits = new WriteMemoryLimits(settings); + final IndexingPressure indexingMemoryLimits = new IndexingPressure(settings); mappingUpdatedAction.setClient(client); actions.put(BulkAction.INSTANCE, new TransportBulkAction(threadPool, transportService, clusterService, @@ -1577,7 +1577,7 @@ allocationService, new AliasValidator(), shardLimitValidator, environment, index Collections.emptyList(), client), client, actionFilters, indexNameExpressionResolver, new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver), - new WriteMemoryLimits(settings) + new IndexingPressure(settings) )); final TransportShardBulkAction transportShardBulkAction = new TransportShardBulkAction(settings, transportService, clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction, new UpdateHelper(scriptService), diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java index 9a4ae5022319f..fd5f4f74bafef 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java @@ -84,7 +84,7 @@ List adjustNodesStats(List nodesStats) { .map(fsInfoPath -> diskUsageFunction.apply(discoveryNode, fsInfoPath)) .toArray(FsInfo.Path[]::new)), nodeStats.getTransport(), nodeStats.getHttp(), nodeStats.getBreaker(), nodeStats.getScriptStats(), nodeStats.getDiscoveryStats(), - nodeStats.getIngestStats(), nodeStats.getAdaptiveSelectionStats()); + nodeStats.getIngestStats(), nodeStats.getAdaptiveSelectionStats(), nodeStats.getIndexingPressureStats()); }).collect(Collectors.toList()); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index ad98c6edde7bb..4f14ac5815507 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -36,7 +36,7 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag; -import org.elasticsearch.action.bulk.WriteMemoryLimits; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; @@ -1165,13 +1165,13 @@ public void beforeIndexDeletion() throws Exception { private void assertAllPendingWriteLimitsReleased() throws Exception { assertBusy(() -> { for (NodeAndClient nodeAndClient : nodes.values()) { - WriteMemoryLimits writeMemoryLimits = getInstance(WriteMemoryLimits.class, nodeAndClient.name); - final long writeBytes = writeMemoryLimits.getWriteBytes(); + IndexingPressure indexingPressure = getInstance(IndexingPressure.class, nodeAndClient.name); + final long writeBytes = indexingPressure.getCurrentCoordinatingAndPrimaryBytes(); if (writeBytes > 0) { throw new AssertionError("pending write bytes [" + writeBytes + "] bytes on node [" + nodeAndClient.name + "]."); } - final long replicaWriteBytes = writeMemoryLimits.getReplicaWriteBytes(); + final long replicaWriteBytes = indexingPressure.getCurrentReplicaBytes(); if (replicaWriteBytes > 0) { throw new AssertionError("pending replica write bytes [" + writeBytes + "] bytes on node [" + nodeAndClient.name + "]."); @@ -2259,7 +2259,7 @@ public void ensureEstimatedStats() { NodeService nodeService = getInstanceFromNode(NodeService.class, nodeAndClient.node); CommonStatsFlags flags = new CommonStatsFlags(Flag.FieldData, Flag.QueryCache, Flag.Segments); NodeStats stats = nodeService.stats(flags, - false, false, false, false, false, false, false, false, false, false, false, false, false); + false, false, false, false, false, false, false, false, false, false, false, false, false, false); assertThat("Fielddata size must be 0 on node: " + stats.getNode(), stats.getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L)); assertThat("Query cache size must be 0 on node: " + stats.getNode(), diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java index 8b55572d19b8b..08e309cf443c9 100644 --- a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java @@ -6,7 +6,7 @@ package org.elasticsearch.xpack.ccr; -import org.elasticsearch.action.bulk.WriteMemoryLimits; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -117,12 +117,12 @@ public void testWriteLimitsIncremented() throws Exception { final PutFollowAction.Request followRequest = getPutFollowRequest("leader", "follower"); client().execute(PutFollowAction.INSTANCE, followRequest).get(); - WriteMemoryLimits memoryLimits = getInstanceFromNode(WriteMemoryLimits.class); + IndexingPressure memoryLimits = getInstanceFromNode(IndexingPressure.class); final long finalSourceSize = sourceSize; assertBusy(() -> { // The actual write bytes will be greater due to other request fields. However, this test is // just spot checking that the bytes are incremented at all. - assertTrue(memoryLimits.getWriteBytes() > finalSourceSize); + assertTrue(memoryLimits.getCurrentCoordinatingAndPrimaryBytes() > finalSourceSize); }); blocker.countDown(); assertBusy(() -> { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index 4f75318b50b2d..9da48580702fa 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -9,7 +9,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.bulk.WriteMemoryLimits; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; @@ -38,7 +38,7 @@ public class TransportBulkShardOperationsAction extends TransportWriteAction { - private final WriteMemoryLimits writeMemoryLimits; + private final IndexingPressure indexingPressure; @Inject public TransportBulkShardOperationsAction( @@ -49,7 +49,7 @@ public TransportBulkShardOperationsAction( final ThreadPool threadPool, final ShardStateAction shardStateAction, final ActionFilters actionFilters, - final WriteMemoryLimits writeMemoryLimits) { + final IndexingPressure indexingPressure) { super( settings, BulkShardOperationsAction.NAME, @@ -61,14 +61,14 @@ public TransportBulkShardOperationsAction( actionFilters, BulkShardOperationsRequest::new, BulkShardOperationsRequest::new, - ThreadPool.Names.WRITE, false, writeMemoryLimits); - this.writeMemoryLimits = writeMemoryLimits; + ThreadPool.Names.WRITE, false, indexingPressure); + this.indexingPressure = indexingPressure; } @Override protected void doExecute(Task task, BulkShardOperationsRequest request, ActionListener listener) { // This is executed on the follower coordinator node and we need to mark the bytes. - Releasable releasable = writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request)); + Releasable releasable = indexingPressure.markIndexingOperationStarted(primaryOperationSize(request)); ActionListener releasingListener = ActionListener.runBefore(listener, releasable::close); try { super.doExecute(task, request, releasingListener); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningInfoTransportActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningInfoTransportActionTests.java index e890520580e2f..b7641e4906933 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningInfoTransportActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningInfoTransportActionTests.java @@ -610,7 +610,7 @@ private static NodeStats buildNodeStats(List pipelineNames, List