Skip to content

Commit

Permalink
Incorporate PR review feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Apr 27, 2023
1 parent 327cce8 commit 518ef11
Showing 1 changed file with 27 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,62 +70,39 @@ public void testIsSegmentsUploadBackpressureEnabled() {
}

public void testAfterIndexShardCreatedForRemoteBackedIndex() {
Settings settings = Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true").build();
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test_index", settings);
ShardId testShardId = new ShardId("index", "uuid", 0);
IndexShard indexShard = mock(IndexShard.class);
when(indexShard.indexSettings()).thenReturn(indexSettings);
when(indexShard.shardId()).thenReturn(testShardId);
IndexShard indexShard = createIndexShard(shardId, true);
pressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY);
pressureService.afterIndexShardCreated(indexShard);
assertNotNull(pressureService.getRemoteRefreshSegmentTracker(testShardId));
assertNotNull(pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()));
}

public void testAfterIndexShardCreatedForNonRemoteBackedIndex() {
Settings settings = Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "false").build();
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test_index", settings);
ShardId testShardId = new ShardId("index", "uuid", 0);
IndexShard indexShard = mock(IndexShard.class);
when(indexShard.indexSettings()).thenReturn(indexSettings);
when(indexShard.shardId()).thenReturn(testShardId);
IndexShard indexShard = createIndexShard(shardId, false);
pressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY);
pressureService.afterIndexShardCreated(indexShard);
assertNull(pressureService.getRemoteRefreshSegmentTracker(testShardId));
assertNull(pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()));
}

public void testAfterIndexShardClosed() {
Settings settings = Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true").build();
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test_index", settings);
ShardId testShardId = new ShardId("index", "uuid", 0);
IndexShard indexShard = mock(IndexShard.class);
when(indexShard.indexSettings()).thenReturn(indexSettings);
when(indexShard.shardId()).thenReturn(testShardId);
IndexShard indexShard = createIndexShard(shardId, true);
pressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY);
pressureService.afterIndexShardCreated(indexShard);
assertNotNull(pressureService.getRemoteRefreshSegmentTracker(testShardId));
assertNotNull(pressureService.getRemoteRefreshSegmentTracker(shardId));

pressureService.afterIndexShardClosed(testShardId, indexShard, settings);
assertNull(pressureService.getRemoteRefreshSegmentTracker(testShardId));
pressureService.afterIndexShardClosed(shardId, indexShard, indexShard.indexSettings().getSettings());
assertNull(pressureService.getRemoteRefreshSegmentTracker(shardId));
}

public void testValidateSegmentUploadLag() {
// Create the pressure tracker
Settings settings = Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true").build();
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test_index", settings);
ShardId testShardId = new ShardId("index", "uuid", 0);
IndexShard indexShard = mock(IndexShard.class);
when(indexShard.indexSettings()).thenReturn(indexSettings);
when(indexShard.shardId()).thenReturn(testShardId);
IndexShard indexShard = createIndexShard(shardId, true);
pressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY);
pressureService.afterIndexShardCreated(indexShard);

// 1. Seq no - add data points to the pressure tracker
RemoteRefreshSegmentTracker pressureTracker = pressureService.getRemoteRefreshSegmentTracker(testShardId);
RemoteRefreshSegmentTracker pressureTracker = pressureService.getRemoteRefreshSegmentTracker(shardId);
pressureTracker.updateLocalRefreshSeqNo(6);
Exception e = assertThrows(
OpenSearchRejectedExecutionException.class,
() -> pressureService.validateSegmentsUploadLag(testShardId)
);
Exception e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId));
assertTrue(e.getMessage().contains("due to remote segments lagging behind local segments"));
assertTrue(e.getMessage().contains("remote_refresh_seq_no:0 local_refresh_seq_no:6"));

Expand All @@ -140,12 +117,12 @@ public void testValidateSegmentUploadLag() {
long currentMs = System.nanoTime() / 1_000_000;
pressureTracker.updateLocalRefreshTimeMs((long) (currentMs + 4 * avg));
pressureTracker.updateRemoteRefreshTimeMs(currentMs);
e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(testShardId));
e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId));
assertTrue(e.getMessage().contains("due to remote segments lagging behind local segments"));
assertTrue(e.getMessage().contains("time_lag:38 ms dynamic_time_lag_threshold:19.0 ms"));

pressureTracker.updateRemoteRefreshTimeMs((long) (currentMs + 2 * avg));
pressureService.validateSegmentsUploadLag(testShardId);
pressureService.validateSegmentsUploadLag(shardId);

// 3. bytes lag more than dynamic threshold
sum.set(0);
Expand All @@ -157,23 +134,32 @@ public void testValidateSegmentUploadLag() {
Map<String, Long> nameSizeMap = new HashMap<>();
nameSizeMap.put("a", (long) (4 * avg));
pressureTracker.setLatestLocalFileNameLengthMap(nameSizeMap);
e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(testShardId));
e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId));
assertTrue(e.getMessage().contains("due to remote segments lagging behind local segments"));
assertTrue(e.getMessage().contains("bytes_lag:38 dynamic_bytes_lag_threshold:19.0"));

nameSizeMap.put("a", (long) (2 * avg));
pressureTracker.setLatestLocalFileNameLengthMap(nameSizeMap);
pressureService.validateSegmentsUploadLag(testShardId);
pressureService.validateSegmentsUploadLag(shardId);

// 4. Consecutive failures more than the limit
IntStream.range(0, 10).forEach(ignore -> pressureTracker.incrementTotalUploadsFailed());
pressureService.validateSegmentsUploadLag(testShardId);
pressureService.validateSegmentsUploadLag(shardId);
pressureTracker.incrementTotalUploadsFailed();
e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(testShardId));
e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId));
assertTrue(e.getMessage().contains("due to remote segments lagging behind local segments"));
assertTrue(e.getMessage().contains("failure_streak_count:11 min_consecutive_failure_threshold:10"));
pressureTracker.incrementTotalUploadSucceeded();
pressureService.validateSegmentsUploadLag(testShardId);
pressureService.validateSegmentsUploadLag(shardId);
}

private static IndexShard createIndexShard(ShardId shardId, boolean remoteStoreEnabled) {
Settings settings = Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, String.valueOf(remoteStoreEnabled)).build();
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test_index", settings);
IndexShard indexShard = mock(IndexShard.class);
when(indexShard.indexSettings()).thenReturn(indexSettings);
when(indexShard.shardId()).thenReturn(shardId);
return indexShard;
}

}

0 comments on commit 518ef11

Please sign in to comment.