Skip to content

Commit

Permalink
Add Shard Level Indexing Pressure (#1336)
Browse files Browse the repository at this point in the history
Shard level indexing pressure improves the current Indexing Pressure framework which performs memory accounting at node level and rejects the requests. This takes a step further to have rejections based on the memory accounting at shard level along with other key performance factors like throughput and last successful requests. 

**Key features**
- Granular tracking of indexing tasks performance, at every shard level, for each node role i.e. coordinator, primary and replica.
- Smarter rejections by discarding the requests intended only for problematic index or shard, while still allowing others to continue (fairness in rejection).
- Rejections thresholds governed by combination of configurable parameters (such as memory limits on node) and dynamic parameters (such as latency increase, throughput degradation).
- Node level and shard level indexing pressure statistics exposed through stats api.
- Integration of Indexing pressure stats with Plugins for for metric visibility and auto-tuning in future.
- Control knobs to tune to the key performance thresholds which control rejections, to address any specific requirement or issues.
- Control knobs to run the feature in shadow-mode or enforced-mode. In shadow-mode only internal rejection breakdown metrics will be published while no actual rejections will be performed.

The changes were divided into small manageable chunks as part of the following PRs against a feature branch.

- Add Shard Indexing Pressure Settings. #716
- Add Shard Indexing Pressure Tracker. #717
- Refactor IndexingPressure to allow extension. #718
- Add Shard Indexing Pressure Store #838
- Add Shard Indexing Pressure Memory Manager #945
- Add ShardIndexingPressure framework level construct and Stats #1015
- Add Indexing Pressure Service which acts as orchestrator for IP #1084
- Add plumbing logic for IndexingPressureService in Transport Actions. #1113
- Add shard indexing pressure metric/stats via rest end point. #1171
- Add shard indexing pressure integration tests. #1198

Signed-off-by: Saurabh Singh <sisurab@amazon.com>
Co-authored-by: Saurabh Singh <sisurab@amazon.com>
Co-authored-by: Rabi Panda <adnapibar@gmail.com>
  • Loading branch information
3 people authored Oct 7, 2021
1 parent 808dbfd commit 3665daf
Show file tree
Hide file tree
Showing 51 changed files with 7,295 additions and 103 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -1283,7 +1283,15 @@ private void verifyShardInfo(XContentParser parser, boolean primary, boolean inc
assertTrue(parser.currentName().equals("id")
|| parser.currentName().equals("name")
|| parser.currentName().equals("transport_address")
|| parser.currentName().equals("weight_ranking"));
|| parser.currentName().equals("weight_ranking")
|| parser.currentName().equals("attributes"));
// Skip past attributes object
if (parser.currentName().equals("attributes")) {
while(!parser.nextToken().equals(Token.END_OBJECT)) {
parser.nextToken();
}
break;
}
} else {
assertTrue(token.isValue());
assertNotNull(parser.text());
Expand Down Expand Up @@ -1403,6 +1411,11 @@ private String verifyNodeDecisionPrologue(XContentParser parser) throws IOExcept
parser.nextToken();
assertNotNull(parser.text());
parser.nextToken();
assertEquals("node_attributes", parser.currentName());
// skip past node_attributes object
while (!parser.currentName().equals("node_decision")) {
parser.nextToken();
}
assertEquals("node_decision", parser.currentName());
parser.nextToken();
return nodeName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,15 @@ public class IndexingPressureIT extends OpenSearchIntegTestCase {

private static final Settings unboundedWriteQueue = Settings.builder().put("thread_pool.write.queue_size", -1).build();

public static final Settings settings = Settings.builder()
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), false).build();

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(unboundedWriteQueue)
.put(settings)
.build();
}

Expand Down Expand Up @@ -148,9 +152,12 @@ public void testWriteBytesAreIncremented() throws Exception {
final ActionFuture<BulkResponse> successFuture = client(coordinatingOnlyNode).bulk(bulkRequest);
replicationSendPointReached.await();

IndexingPressure primaryWriteLimits = internalCluster().getInstance(IndexingPressure.class, primaryName);
IndexingPressure replicaWriteLimits = internalCluster().getInstance(IndexingPressure.class, replicaName);
IndexingPressure coordinatingWriteLimits = internalCluster().getInstance(IndexingPressure.class, coordinatingOnlyNode);
IndexingPressure primaryWriteLimits = internalCluster()
.getInstance(IndexingPressureService.class, primaryName).getShardIndexingPressure();
IndexingPressure replicaWriteLimits = internalCluster()
.getInstance(IndexingPressureService.class, replicaName).getShardIndexingPressure();;
IndexingPressure coordinatingWriteLimits = internalCluster()
.getInstance(IndexingPressureService.class, coordinatingOnlyNode).getShardIndexingPressure();;

assertThat(primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize));
assertThat(primaryWriteLimits.getCurrentPrimaryBytes(), greaterThan(bulkShardRequestSize));
Expand Down Expand Up @@ -271,9 +278,12 @@ public void testWriteCanBeRejectedAtCoordinatingLevel() throws Exception {
try (Releasable replicaRelease = blockReplicas(replicaThreadPool)) {
final ActionFuture<BulkResponse> successFuture = client(coordinatingOnlyNode).bulk(bulkRequest);

IndexingPressure primaryWriteLimits = internalCluster().getInstance(IndexingPressure.class, primaryName);
IndexingPressure replicaWriteLimits = internalCluster().getInstance(IndexingPressure.class, replicaName);
IndexingPressure coordinatingWriteLimits = internalCluster().getInstance(IndexingPressure.class, coordinatingOnlyNode);
IndexingPressure primaryWriteLimits = internalCluster()
.getInstance(IndexingPressureService.class, primaryName).getShardIndexingPressure();
IndexingPressure replicaWriteLimits = internalCluster()
.getInstance(IndexingPressureService.class, replicaName).getShardIndexingPressure();
IndexingPressure coordinatingWriteLimits = internalCluster()
.getInstance(IndexingPressureService.class, coordinatingOnlyNode).getShardIndexingPressure();

assertBusy(() -> {
assertThat(primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize));
Expand Down Expand Up @@ -335,9 +345,12 @@ public void testWriteCanBeRejectedAtPrimaryLevel() throws Exception {
try (Releasable replicaRelease = blockReplicas(replicaThreadPool)) {
final ActionFuture<BulkResponse> successFuture = client(primaryName).bulk(bulkRequest);

IndexingPressure primaryWriteLimits = internalCluster().getInstance(IndexingPressure.class, primaryName);
IndexingPressure replicaWriteLimits = internalCluster().getInstance(IndexingPressure.class, replicaName);
IndexingPressure coordinatingWriteLimits = internalCluster().getInstance(IndexingPressure.class, coordinatingOnlyNode);
IndexingPressure primaryWriteLimits = internalCluster()
.getInstance(IndexingPressureService.class, primaryName).getShardIndexingPressure();
IndexingPressure replicaWriteLimits = internalCluster()
.getInstance(IndexingPressureService.class, replicaName).getShardIndexingPressure();
IndexingPressure coordinatingWriteLimits = internalCluster()
.getInstance(IndexingPressureService.class, coordinatingOnlyNode).getShardIndexingPressure();

assertBusy(() -> {
assertThat(primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize));
Expand Down
Loading

0 comments on commit 3665daf

Please sign in to comment.