Skip to content

Commit

Permalink
Add pressure tracker tests
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Apr 18, 2023
1 parent b7ba049 commit 8a76d1b
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
* Service used to validate if the incoming indexing request should be rejected based on the {@link RemoteRefreshSegmentPressureTracker}.
*/
public class RemoteRefreshSegmentPressureService implements IndexEventListener {

private static final Logger logger = LogManager.getLogger(RemoteRefreshSegmentPressureService.class);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.opensearch.common.util.MovingAverage;
import org.opensearch.common.util.Streak;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.index.shard.ShardId;

import java.util.Map;
Expand Down Expand Up @@ -118,7 +119,7 @@ public class RemoteRefreshSegmentPressureTracker {
/**
* Set of names of segment files that were uploaded as part of the most recent remote refresh.
*/
private volatile Set<String> latestUploadFiles;
private final Set<String> latestUploadFiles = ConcurrentCollections.newConcurrentSet();

/**
* Keeps the bytes lag computed so that we do not compute it for every request.
Expand Down Expand Up @@ -152,38 +153,58 @@ ShardId getShardId() {
return shardId;
}

AtomicLong getLocalRefreshSeqNo() {
return localRefreshSeqNo;
long getLocalRefreshSeqNo() {
return localRefreshSeqNo.get();
}

void updateLocalRefreshSeqNo(long localRefreshSeqNo) {
assert localRefreshSeqNo > this.localRefreshSeqNo.get() : "newLocalRefreshSeqNo="
+ localRefreshSeqNo
+ ">="
+ "currentLocalRefreshSeqNo="
+ this.localRefreshSeqNo.get();
this.localRefreshSeqNo.set(localRefreshSeqNo);
computeSeqNoLag();
}

AtomicLong getLocalRefreshTimeMs() {
return localRefreshTimeMs;
long getLocalRefreshTimeMs() {
return localRefreshTimeMs.get();
}

void updateLocalRefreshTimeMs(long localRefreshTimeMs) {
assert localRefreshTimeMs > this.localRefreshTimeMs.get() : "newLocalRefreshTimeMs="
+ localRefreshTimeMs
+ ">="
+ "currentLocalRefreshTimeMs="
+ this.localRefreshTimeMs.get();
this.localRefreshTimeMs.set(localRefreshTimeMs);
computeTimeMsLag();
}

AtomicLong getRemoteRefreshSeqNo() {
return remoteRefreshSeqNo;
long getRemoteRefreshSeqNo() {
return remoteRefreshSeqNo.get();
}

void updateRemoteRefreshSeqNo(long remoteRefreshSeqNo) {
assert remoteRefreshSeqNo > this.remoteRefreshSeqNo.get() : "newRemoteRefreshSeqNo="
+ remoteRefreshSeqNo
+ ">="
+ "currentRemoteRefreshSeqNo="
+ this.remoteRefreshSeqNo.get();
this.remoteRefreshSeqNo.set(remoteRefreshSeqNo);
computeSeqNoLag();
}

AtomicLong getRemoteRefreshTimeMs() {
return remoteRefreshTimeMs;
long getRemoteRefreshTimeMs() {
return remoteRefreshTimeMs.get();
}

void updateRemoteRefreshTimeMs(long remoteRefreshTimeMs) {
assert remoteRefreshTimeMs > this.remoteRefreshTimeMs.get() : "newRemoteRefreshTimeMs="
+ remoteRefreshTimeMs
+ ">="
+ "currentRemoteRefreshTimeMs="
+ this.remoteRefreshTimeMs.get();
this.remoteRefreshTimeMs.set(remoteRefreshTimeMs);
computeTimeMsLag();
}
Expand Down Expand Up @@ -294,7 +315,7 @@ private void computeBytesLag() {
}
Set<String> filesNotYetUploaded = latestLocalFileNameLengthMap.keySet()
.stream()
.filter(f -> latestUploadFiles == null || latestUploadFiles.contains(f) == false)
.filter(f -> !latestUploadFiles.contains(f))
.collect(Collectors.toSet());
long bytesLag = filesNotYetUploaded.stream().map(latestLocalFileNameLengthMap::get).mapToLong(Long::longValue).sum();
this.bytesLag.set(bytesLag);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

public class RemoteRefreshSegmentPressureSettingsTest extends OpenSearchTestCase {
public class RemoteRefreshSegmentPressureSettingsTests extends OpenSearchTestCase {

private ClusterService clusterService;

Expand Down Expand Up @@ -224,7 +224,6 @@ public void testUpdateTriggeredInRemotePressureServiceOnUpdateSettings() {
AtomicInteger updatedUploadBytesPerSecWindowSize = new AtomicInteger();
AtomicInteger updatedUploadTimeWindowSize = new AtomicInteger();


RemoteRefreshSegmentPressureService pressureService = mock(RemoteRefreshSegmentPressureService.class);

// Upload bytes
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.shard.ShardId;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;

import static org.mockito.Mockito.mock;

public class RemoteRefreshSegmentPressureTrackerTests extends OpenSearchTestCase {

private RemoteRefreshSegmentPressureSettings pressureSettings;

private ClusterService clusterService;

private ThreadPool threadPool;

private ShardId shardId;

private RemoteRefreshSegmentPressureTracker pressureTracker;

@Override
public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool("remote_refresh_segment_pressure_settings_test");
clusterService = new ClusterService(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool
);
pressureSettings = new RemoteRefreshSegmentPressureSettings(
clusterService,
Settings.EMPTY,
mock(RemoteRefreshSegmentPressureService.class)
);
shardId = new ShardId("index", "uuid", 0);
}

@Override
public void tearDown() throws Exception {
super.tearDown();
threadPool.shutdownNow();
}

public void testGetShardId() {
pressureTracker = new RemoteRefreshSegmentPressureTracker(shardId, pressureSettings);
assertEquals(shardId, pressureTracker.getShardId());
}

public void testUpdateLocalRefreshSeqNo() {
pressureTracker = new RemoteRefreshSegmentPressureTracker(shardId, pressureSettings);
long refreshSeqNo = 2;
pressureTracker.updateLocalRefreshSeqNo(refreshSeqNo);
assertEquals(refreshSeqNo, pressureTracker.getLocalRefreshSeqNo());
}

public void testUpdateRemoteRefreshSeqNo() {
pressureTracker = new RemoteRefreshSegmentPressureTracker(shardId, pressureSettings);
long refreshSeqNo = 4;
pressureTracker.updateRemoteRefreshSeqNo(refreshSeqNo);
assertEquals(refreshSeqNo, pressureTracker.getRemoteRefreshSeqNo());
}

public void testUpdateLocalRefreshTimeMs() {
pressureTracker = new RemoteRefreshSegmentPressureTracker(shardId, pressureSettings);
long refreshTimeMs = System.nanoTime() / 1_000_000L + randomIntBetween(10, 100);
pressureTracker.updateLocalRefreshTimeMs(refreshTimeMs);
assertEquals(refreshTimeMs, pressureTracker.getLocalRefreshTimeMs());
}

public void testUpdateRemoteRefreshTimeMs() {
pressureTracker = new RemoteRefreshSegmentPressureTracker(shardId, pressureSettings);
long refreshTimeMs = System.nanoTime() / 1_000_000 + randomIntBetween(10, 100);
pressureTracker.updateRemoteRefreshTimeMs(refreshTimeMs);
assertEquals(refreshTimeMs, pressureTracker.getRemoteRefreshTimeMs());
}

public void testComputeSeqNoLagOnUpdate() {
pressureTracker = new RemoteRefreshSegmentPressureTracker(shardId, pressureSettings);
int localRefreshSeqNo = randomIntBetween(50, 100);
int remoteRefreshSeqNo = randomIntBetween(20, 50);
pressureTracker.updateLocalRefreshSeqNo(localRefreshSeqNo);
assertEquals(localRefreshSeqNo, pressureTracker.getSeqNoLag());
pressureTracker.updateRemoteRefreshSeqNo(remoteRefreshSeqNo);
assertEquals(localRefreshSeqNo - remoteRefreshSeqNo, pressureTracker.getSeqNoLag());
}
}

0 comments on commit 8a76d1b

Please sign in to comment.