Skip to content

Commit

Permalink
Incorporate PR review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
ashking94 committed Jun 27, 2023
1 parent 9a2b2e1 commit 2fcc975
Showing 1 changed file with 11 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,49 +34,21 @@

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreBackpressureIT extends AbstractRemoteStoreMockRepositoryIntegTestCase {

public void testWritesRejected() {
Path location = randomRepoPath().toAbsolutePath();
setup(location, 1d, "metadata", Long.MAX_VALUE);

Settings request = Settings.builder().put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true).build();
ClusterUpdateSettingsResponse clusterUpdateResponse = client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(request)
.get();
assertEquals(clusterUpdateResponse.getPersistentSettings().get(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey()), "true");

logger.info("--> Indexing data");
OpenSearchRejectedExecutionException ex = assertThrows(
OpenSearchRejectedExecutionException.class,
() -> indexData(randomIntBetween(10, 20), randomBoolean())
);
assertTrue(ex.getMessage().contains("rejected execution on primary shard"));
String shardId = "0";
RemoteStoreStatsResponse response = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, shardId).get();
final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, shardId);
List<RemoteStoreStats> matches = Arrays.stream(response.getShards())
.filter(stat -> indexShardId.equals(stat.getStats().shardId.toString()))
.collect(Collectors.toList());
assertEquals(1, matches.size());
RemoteRefreshSegmentTracker.Stats stats = matches.get(0).getStats();
assertTrue(stats.bytesLag > 0);
assertTrue(stats.refreshTimeLagMs > 0);
assertTrue(stats.localRefreshNumber - stats.remoteRefreshNumber > 0);
assertTrue(stats.rejectionCount > 0);
deleteRepo();
}

public void testWritesRejectedDueToConsecutiveFailureBreach() throws Exception {
// Here the doc size of the request remains same throughout the test. After initial indexing, all remote store interactions
// fail leading to consecutive failure limit getting exceeded and leading to rejections.
validateBackpressure(ByteSizeUnit.KB.toIntBytes(1), 10, ByteSizeUnit.KB.toIntBytes(1), 15, "failure_streak_count");
}

public void testWritesRejectedDueToBytesLagBreach() throws Exception {
// Initially indexing happens with doc size of 2 bytes, then all remote store interactions start failing. Now, the
// indexing happens with doc size of 1KB leading to bytes lag limit getting exceeded and leading to rejections.
validateBackpressure(ByteSizeUnit.BYTES.toIntBytes(2), 30, ByteSizeUnit.KB.toIntBytes(1), 15, "bytes_lag");
}

public void testWritesRejectedDueToTimeLagBreach() throws Exception {
// Initially indexing happens with doc size of 1KB, then all remote store interactions start failing. Now, the
// indexing happens with doc size of 1 byte leading to time lag limit getting exceeded and leading to rejections.
validateBackpressure(ByteSizeUnit.KB.toIntBytes(1), 20, ByteSizeUnit.BYTES.toIntBytes(1), 15, "time_lag");
}

Expand Down Expand Up @@ -135,6 +107,11 @@ private void validateBackpressure(
assertEquals(0, finalStats.refreshTimeLagMs);
assertEquals(0, finalStats.localRefreshNumber - finalStats.remoteRefreshNumber);
}, 30, TimeUnit.SECONDS);

long rejectionCount = stats.rejectionCount;
stats = stats();
indexDocAndRefresh(initialSource, initialDocsToIndex);
assertEquals(rejectionCount, stats.rejectionCount);
deleteRepo();
}

Expand Down

0 comments on commit 2fcc975

Please sign in to comment.