Skip to content

Commit

Permalink
Retaining the old constructors for classes marked as API changed as p…
Browse files Browse the repository at this point in the history
…art of #12333 (#13926)

* Retaining the old constructors for classes marked as API changed as part of #12333
---------
Signed-off-by: Harsh Garg <gkharsh@amazon.com>
Co-authored-by: Harsh Garg <gkharsh@amazon.com>
  • Loading branch information
gargharsh3134 authored and Harsh Garg committed Jun 10, 2024
1 parent af6770a commit 7b17c4d
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.opensearch.common.util.concurrent.PrioritizedOpenSearchThreadPoolExecutor;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -125,6 +126,10 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
private NodeConnectionsService nodeConnectionsService;
private final ClusterManagerMetrics clusterManagerMetrics;

public ClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
this(nodeName, settings, clusterSettings, threadPool, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE));
}

public ClusterApplierService(
String nodeName,
Settings settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;
import org.opensearch.threadpool.ThreadPool;

/**
Expand All @@ -21,6 +22,11 @@
*/
@PublicApi(since = "2.2.0")
public class ClusterManagerService extends MasterService {

public ClusterManagerService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
super(settings, clusterSettings, threadPool, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE));
}

public ClusterManagerService(
Settings settings,
ClusterSettings clusterSettings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexingPressureService;
import org.opensearch.node.Node;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;
import org.opensearch.threadpool.ThreadPool;

import java.util.Collections;
Expand Down Expand Up @@ -92,6 +93,10 @@ public class ClusterService extends AbstractLifecycleComponent {

private IndexingPressureService indexingPressureService;

public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
this(settings, clusterSettings, threadPool, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE));
}

public ClusterService(
Settings settings,
ClusterSettings clusterSettings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.node.Node;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.client.NoOpClient;
Expand Down Expand Up @@ -84,13 +83,7 @@ public void testScheduling() {
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random());
final ThreadPool threadPool = deterministicTaskQueue.getThreadPool();

final ClusterApplierService clusterApplierService = new ClusterApplierService(
"test",
settings,
clusterSettings,
threadPool,
new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE)
) {
final ClusterApplierService clusterApplierService = new ClusterApplierService("test", settings, clusterSettings, threadPool) {
@Override
protected PrioritizedOpenSearchThreadPoolExecutor createThreadPoolExecutor() {
return new MockSinglePrioritizingExecutor("mock-executor", deterministicTaskQueue, threadPool);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -92,15 +93,15 @@ public class ClusterApplierServiceTests extends OpenSearchTestCase {
private static ThreadPool threadPool;
private TimedClusterApplierService clusterApplierService;
private static MetricsRegistry metricsRegistry;
private static Histogram applierslatenctHistogram;
private static Histogram listenerslatenctHistogram;
private static Histogram applierslatencyHistogram;
private static Histogram listenerslatencyHistogram;

@BeforeClass
public static void createThreadPool() {
threadPool = new TestThreadPool(ClusterApplierServiceTests.class.getName());
metricsRegistry = mock(MetricsRegistry.class);
applierslatenctHistogram = mock(Histogram.class);
listenerslatenctHistogram = mock(Histogram.class);
applierslatencyHistogram = mock(Histogram.class);
listenerslatencyHistogram = mock(Histogram.class);
}

@AfterClass
Expand All @@ -117,11 +118,11 @@ public void setUp() throws Exception {
when(metricsRegistry.createHistogram(anyString(), anyString(), anyString())).thenAnswer(invocationOnMock -> {
String histogramName = (String) invocationOnMock.getArguments()[0];
if (histogramName.contains("appliers.latency")) {
return applierslatenctHistogram;
return applierslatencyHistogram;
}
return listenerslatenctHistogram;
return listenerslatencyHistogram;
});
clusterApplierService = createTimedClusterService(true);
clusterApplierService = createTimedClusterService(true, Optional.of(metricsRegistry));
}

@After
Expand All @@ -130,14 +131,26 @@ public void tearDown() throws Exception {
super.tearDown();
}

private TimedClusterApplierService createTimedClusterService(boolean makeClusterManager) {
private TimedClusterApplierService createTimedClusterService(
boolean makeClusterManager,
Optional<MetricsRegistry> metricsRegistryOptional
) {
DiscoveryNode localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
TimedClusterApplierService timedClusterApplierService = new TimedClusterApplierService(
Settings.builder().put("cluster.name", "ClusterApplierServiceTests").build(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool,
new ClusterManagerMetrics(metricsRegistry)
);
TimedClusterApplierService timedClusterApplierService;
if (metricsRegistryOptional != null && metricsRegistryOptional.isPresent()) {
timedClusterApplierService = new TimedClusterApplierService(
Settings.builder().put("cluster.name", "ClusterApplierServiceTests").build(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool,
new ClusterManagerMetrics(metricsRegistry)
);
} else {
timedClusterApplierService = new TimedClusterApplierService(
Settings.builder().put("cluster.name", "ClusterApplierServiceTests").build(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool
);
}
timedClusterApplierService.setNodeConnectionsService(createNoOpNodeConnectionsService());
timedClusterApplierService.setInitialState(
ClusterState.builder(new ClusterName("ClusterApplierServiceTests"))
Expand Down Expand Up @@ -220,8 +233,8 @@ public void onFailure(String source, Exception e) {
});
assertBusy(mockAppender::assertAllExpectationsMatched);
}
verifyNoInteractions(applierslatenctHistogram);
verifyNoInteractions(listenerslatenctHistogram);
verifyNoInteractions(applierslatencyHistogram);
verifyNoInteractions(listenerslatencyHistogram);
}

@TestLogging(value = "org.opensearch.cluster.service:WARN", reason = "to ensure that we log cluster state events on WARN level")
Expand Down Expand Up @@ -319,12 +332,12 @@ public void onFailure(String source, Exception e) {
latch.await();
mockAppender.assertAllExpectationsMatched();
}
verifyNoInteractions(applierslatenctHistogram);
verifyNoInteractions(listenerslatenctHistogram);
verifyNoInteractions(applierslatencyHistogram);
verifyNoInteractions(listenerslatencyHistogram);
}

public void testLocalNodeClusterManagerListenerCallbacks() {
TimedClusterApplierService timedClusterApplierService = createTimedClusterService(false);
TimedClusterApplierService timedClusterApplierService = createTimedClusterService(false, Optional.empty());

AtomicBoolean isClusterManager = new AtomicBoolean();
timedClusterApplierService.addLocalNodeClusterManagerListener(new LocalNodeClusterManagerListener() {
Expand Down Expand Up @@ -359,9 +372,7 @@ public void offClusterManager() {
setState(timedClusterApplierService, state);
assertThat(isClusterManager.get(), is(true));

verify(listenerslatenctHistogram, atLeastOnce()).record(anyDouble(), any());
clearInvocations(listenerslatenctHistogram);
verifyNoInteractions(applierslatenctHistogram);
verifyNoInteractions(applierslatencyHistogram, listenerslatencyHistogram);

timedClusterApplierService.close();
}
Expand All @@ -372,7 +383,7 @@ public void offClusterManager() {
* To support inclusive language, LocalNodeMasterListener is deprecated in 2.2.
*/
public void testDeprecatedLocalNodeMasterListenerCallbacks() {
TimedClusterApplierService timedClusterApplierService = createTimedClusterService(false);
TimedClusterApplierService timedClusterApplierService = createTimedClusterService(false, Optional.empty());

AtomicBoolean isClusterManager = new AtomicBoolean();
timedClusterApplierService.addLocalNodeMasterListener(new LocalNodeMasterListener() {
Expand Down Expand Up @@ -400,9 +411,7 @@ public void offMaster() {
setState(timedClusterApplierService, state);
assertThat(isClusterManager.get(), is(false));

verify(listenerslatenctHistogram, atLeastOnce()).record(anyDouble(), any());
clearInvocations(listenerslatenctHistogram);
verifyNoInteractions(applierslatenctHistogram);
verifyNoInteractions(applierslatencyHistogram, listenerslatencyHistogram);

timedClusterApplierService.close();
}
Expand Down Expand Up @@ -444,9 +453,9 @@ public void onFailure(String source, Exception e) {
assertNull(error.get());
assertTrue(applierCalled.get());

verify(applierslatenctHistogram, atLeastOnce()).record(anyDouble(), any());
clearInvocations(applierslatenctHistogram);
verifyNoInteractions(listenerslatenctHistogram);
verify(applierslatencyHistogram, atLeastOnce()).record(anyDouble(), any());
clearInvocations(applierslatencyHistogram);
verifyNoInteractions(listenerslatencyHistogram);
}

public void testClusterStateApplierBubblesUpExceptionsInApplier() throws InterruptedException {
Expand Down Expand Up @@ -478,8 +487,8 @@ public void onFailure(String source, Exception e) {
assertNotNull(error.get());
assertThat(error.get().getMessage(), containsString("dummy exception"));

verifyNoInteractions(applierslatenctHistogram);
verifyNoInteractions(listenerslatenctHistogram);
verifyNoInteractions(applierslatencyHistogram);
verifyNoInteractions(listenerslatencyHistogram);
}

public void testClusterStateApplierBubblesUpExceptionsInSettingsApplier() throws InterruptedException {
Expand Down Expand Up @@ -524,8 +533,8 @@ public void onFailure(String source, Exception e) {
assertNotNull(error.get());
assertThat(error.get().getMessage(), containsString("illegal value can't update"));

verifyNoInteractions(applierslatenctHistogram);
verifyNoInteractions(listenerslatenctHistogram);
verifyNoInteractions(applierslatencyHistogram);
verifyNoInteractions(listenerslatencyHistogram);
}

public void testClusterStateApplierSwallowsExceptionInListener() throws InterruptedException {
Expand Down Expand Up @@ -558,8 +567,8 @@ public void onFailure(String source, Exception e) {
assertNull(error.get());
assertTrue(applierCalled.get());

verifyNoInteractions(applierslatenctHistogram);
verifyNoInteractions(listenerslatenctHistogram);
verifyNoInteractions(applierslatencyHistogram);
verifyNoInteractions(listenerslatencyHistogram);
}

public void testClusterStateApplierCanCreateAnObserver() throws InterruptedException {
Expand Down Expand Up @@ -617,9 +626,9 @@ public void onFailure(String source, Exception e) {
assertNull(error.get());
assertTrue(applierCalled.get());

verify(applierslatenctHistogram, atLeastOnce()).record(anyDouble(), any());
clearInvocations(applierslatenctHistogram);
verifyNoInteractions(listenerslatenctHistogram);
verify(applierslatencyHistogram, atLeastOnce()).record(anyDouble(), any());
clearInvocations(applierslatencyHistogram);
verifyNoInteractions(listenerslatencyHistogram);
}

public void testThreadContext() throws InterruptedException {
Expand Down Expand Up @@ -665,8 +674,8 @@ public void onFailure(String source, Exception e) {

latch.await();

verifyNoInteractions(applierslatenctHistogram);
verifyNoInteractions(listenerslatenctHistogram);
verifyNoInteractions(applierslatencyHistogram);
verifyNoInteractions(listenerslatencyHistogram);
}

static class TimedClusterApplierService extends ClusterApplierService {
Expand All @@ -675,6 +684,11 @@ static class TimedClusterApplierService extends ClusterApplierService {
volatile Long currentTimeOverride = null;
boolean applicationMayFail;

TimedClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
super("test_node", settings, clusterSettings, threadPool);
this.clusterSettings = clusterSettings;
}

TimedClusterApplierService(
Settings settings,
ClusterSettings clusterSettings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.junit.After;
Expand All @@ -27,11 +26,11 @@ public void terminateThreadPool() {

public void testDeprecatedGetMasterServiceBWC() {
try (
ClusterService clusterService = ClusterServiceUtils.createClusterService(
ClusterService clusterService = new ClusterService(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool
);
)
) {
MasterService masterService = clusterService.getMasterService();
ClusterManagerService clusterManagerService = clusterService.getClusterManagerService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -136,20 +137,36 @@ public void randomizeCurrentTime() {
}

private ClusterManagerService createClusterManagerService(boolean makeClusterManager) {
return createClusterManagerService(makeClusterManager, NoopMetricsRegistry.INSTANCE);
return createClusterManagerService(makeClusterManager, Optional.empty());
}

private ClusterManagerService createClusterManagerService(boolean makeClusterManager, MetricsRegistry metricsRegistry) {
private ClusterManagerService createClusterManagerService(
boolean makeClusterManager,
Optional<MetricsRegistry> metricsRegistryOptional
) {
final DiscoveryNode localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
final ClusterManagerService clusterManagerService = new ClusterManagerService(
Settings.builder()
.put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName())
.put(Node.NODE_NAME_SETTING.getKey(), "test_node")
.build(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool,
new ClusterManagerMetrics(metricsRegistry)
);
final ClusterManagerService clusterManagerService;
if (metricsRegistryOptional != null && metricsRegistryOptional.isPresent()) {
clusterManagerService = new ClusterManagerService(
Settings.builder()
.put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName())
.put(Node.NODE_NAME_SETTING.getKey(), "test_node")
.build(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool,
new ClusterManagerMetrics(metricsRegistryOptional.get())
);
} else {
clusterManagerService = new ClusterManagerService(
Settings.builder()
.put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName())
.put(Node.NODE_NAME_SETTING.getKey(), "test_node")
.build(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool
);
}

final ClusterState initialClusterState = ClusterState.builder(new ClusterName(MasterServiceTests.class.getSimpleName()))
.nodes(
DiscoveryNodes.builder()
Expand Down Expand Up @@ -181,7 +198,7 @@ public void testClusterManagerAwareExecution() throws Exception {
return clusterStatePublishHistogram;
});

final ClusterManagerService nonClusterManager = createClusterManagerService(false, metricsRegistry);
final ClusterManagerService nonClusterManager = createClusterManagerService(false, Optional.of(metricsRegistry));

final boolean[] taskFailed = { false };
final CountDownLatch latch1 = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1921,13 +1921,7 @@ private final class TestClusterNode {
settings,
clusterSettings,
clusterManagerService,
new ClusterApplierService(
node.getName(),
settings,
clusterSettings,
threadPool,
new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE)
) {
new ClusterApplierService(node.getName(), settings, clusterSettings, threadPool) {
@Override
protected PrioritizedOpenSearchThreadPoolExecutor createThreadPoolExecutor() {
return new MockSinglePrioritizingExecutor(node.getName(), deterministicTaskQueue, threadPool);
Expand Down

0 comments on commit 7b17c4d

Please sign in to comment.