diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java index 39a49100ec755..c5a6c0323a6f9 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java @@ -70,62 +70,39 @@ public void testIsSegmentsUploadBackpressureEnabled() { } public void testAfterIndexShardCreatedForRemoteBackedIndex() { - Settings settings = Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true").build(); - IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test_index", settings); - ShardId testShardId = new ShardId("index", "uuid", 0); - IndexShard indexShard = mock(IndexShard.class); - when(indexShard.indexSettings()).thenReturn(indexSettings); - when(indexShard.shardId()).thenReturn(testShardId); + IndexShard indexShard = createIndexShard(shardId, true); pressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); pressureService.afterIndexShardCreated(indexShard); - assertNotNull(pressureService.getRemoteRefreshSegmentTracker(testShardId)); + assertNotNull(pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId())); } public void testAfterIndexShardCreatedForNonRemoteBackedIndex() { - Settings settings = Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "false").build(); - IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test_index", settings); - ShardId testShardId = new ShardId("index", "uuid", 0); - IndexShard indexShard = mock(IndexShard.class); - when(indexShard.indexSettings()).thenReturn(indexSettings); - when(indexShard.shardId()).thenReturn(testShardId); + IndexShard indexShard = createIndexShard(shardId, false); pressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); pressureService.afterIndexShardCreated(indexShard); - assertNull(pressureService.getRemoteRefreshSegmentTracker(testShardId)); + assertNull(pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId())); } public void testAfterIndexShardClosed() { - Settings settings = Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true").build(); - IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test_index", settings); - ShardId testShardId = new ShardId("index", "uuid", 0); - IndexShard indexShard = mock(IndexShard.class); - when(indexShard.indexSettings()).thenReturn(indexSettings); - when(indexShard.shardId()).thenReturn(testShardId); + IndexShard indexShard = createIndexShard(shardId, true); pressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); pressureService.afterIndexShardCreated(indexShard); - assertNotNull(pressureService.getRemoteRefreshSegmentTracker(testShardId)); + assertNotNull(pressureService.getRemoteRefreshSegmentTracker(shardId)); - pressureService.afterIndexShardClosed(testShardId, indexShard, settings); - assertNull(pressureService.getRemoteRefreshSegmentTracker(testShardId)); + pressureService.afterIndexShardClosed(shardId, indexShard, indexShard.indexSettings().getSettings()); + assertNull(pressureService.getRemoteRefreshSegmentTracker(shardId)); } public void testValidateSegmentUploadLag() { // Create the pressure tracker - Settings settings = Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true").build(); - IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test_index", settings); - ShardId testShardId = new ShardId("index", "uuid", 0); - IndexShard indexShard = mock(IndexShard.class); - when(indexShard.indexSettings()).thenReturn(indexSettings); - when(indexShard.shardId()).thenReturn(testShardId); + IndexShard indexShard = createIndexShard(shardId, true); pressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); pressureService.afterIndexShardCreated(indexShard); // 1. Seq no - add data points to the pressure tracker - RemoteRefreshSegmentTracker pressureTracker = pressureService.getRemoteRefreshSegmentTracker(testShardId); + RemoteRefreshSegmentTracker pressureTracker = pressureService.getRemoteRefreshSegmentTracker(shardId); pressureTracker.updateLocalRefreshSeqNo(6); - Exception e = assertThrows( - OpenSearchRejectedExecutionException.class, - () -> pressureService.validateSegmentsUploadLag(testShardId) - ); + Exception e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId)); assertTrue(e.getMessage().contains("due to remote segments lagging behind local segments")); assertTrue(e.getMessage().contains("remote_refresh_seq_no:0 local_refresh_seq_no:6")); @@ -140,12 +117,12 @@ public void testValidateSegmentUploadLag() { long currentMs = System.nanoTime() / 1_000_000; pressureTracker.updateLocalRefreshTimeMs((long) (currentMs + 4 * avg)); pressureTracker.updateRemoteRefreshTimeMs(currentMs); - e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(testShardId)); + e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId)); assertTrue(e.getMessage().contains("due to remote segments lagging behind local segments")); assertTrue(e.getMessage().contains("time_lag:38 ms dynamic_time_lag_threshold:19.0 ms")); pressureTracker.updateRemoteRefreshTimeMs((long) (currentMs + 2 * avg)); - pressureService.validateSegmentsUploadLag(testShardId); + pressureService.validateSegmentsUploadLag(shardId); // 3. bytes lag more than dynamic threshold sum.set(0); @@ -157,23 +134,32 @@ public void testValidateSegmentUploadLag() { Map nameSizeMap = new HashMap<>(); nameSizeMap.put("a", (long) (4 * avg)); pressureTracker.setLatestLocalFileNameLengthMap(nameSizeMap); - e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(testShardId)); + e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId)); assertTrue(e.getMessage().contains("due to remote segments lagging behind local segments")); assertTrue(e.getMessage().contains("bytes_lag:38 dynamic_bytes_lag_threshold:19.0")); nameSizeMap.put("a", (long) (2 * avg)); pressureTracker.setLatestLocalFileNameLengthMap(nameSizeMap); - pressureService.validateSegmentsUploadLag(testShardId); + pressureService.validateSegmentsUploadLag(shardId); // 4. Consecutive failures more than the limit IntStream.range(0, 10).forEach(ignore -> pressureTracker.incrementTotalUploadsFailed()); - pressureService.validateSegmentsUploadLag(testShardId); + pressureService.validateSegmentsUploadLag(shardId); pressureTracker.incrementTotalUploadsFailed(); - e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(testShardId)); + e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId)); assertTrue(e.getMessage().contains("due to remote segments lagging behind local segments")); assertTrue(e.getMessage().contains("failure_streak_count:11 min_consecutive_failure_threshold:10")); pressureTracker.incrementTotalUploadSucceeded(); - pressureService.validateSegmentsUploadLag(testShardId); + pressureService.validateSegmentsUploadLag(shardId); + } + + private static IndexShard createIndexShard(ShardId shardId, boolean remoteStoreEnabled) { + Settings settings = Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, String.valueOf(remoteStoreEnabled)).build(); + IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test_index", settings); + IndexShard indexShard = mock(IndexShard.class); + when(indexShard.indexSettings()).thenReturn(indexSettings); + when(indexShard.shardId()).thenReturn(shardId); + return indexShard; } }