Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.junit.Test;
import org.junit.runners.MethodSorters;
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.sql.opensearch.monitor.GCedMemoryUsage;
import org.opensearch.sql.ppl.PPLIntegTestCase;

@FixMethodOrder(MethodSorters.JVM)
Expand Down Expand Up @@ -48,14 +49,15 @@ public static void reset() throws IOException {
System.out.println();
}

/**
* Ignore queries that are not supported by Calcite.
*
* <p>q30 is ignored because it will trigger ResourceMonitory health check. TODO: should be
* addressed by: https://github.com/opensearch-project/sql/issues/3981
*/
/** Ignore queries that are not supported by Calcite. */
protected Set<Integer> ignored() {
return Set.of(29);
if (GCedMemoryUsage.initialized()) {
return Set.of(29);
} else {
// Ignore q30 when use RuntimeMemoryUsage,
// because of too much script push down, which will cause ResourceMonitor restriction.
return Set.of(29, 30);
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,30 @@

package org.opensearch.sql.ppl;

import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_DOG;
import static org.opensearch.sql.util.MatcherUtils.columnName;
import static org.opensearch.sql.util.MatcherUtils.verifyColumn;
import static org.opensearch.sql.util.MatcherUtils.verifyNumOfRows;

import java.io.IOException;
import org.hamcrest.Matchers;
import org.json.JSONObject;
import org.junit.Test;
import org.opensearch.client.ResponseException;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.opensearch.monitor.GCedMemoryUsage;

public class ResourceMonitorIT extends PPLIntegTestCase {

@Override
public void init() throws Exception {
super.init();
loadIndex(Index.DOG);
loadIndex(Index.CLICK_BENCH);
}

@Test
public void queryExceedResourceLimitShouldFail() throws IOException {
// update plugins.ppl.query.memory_limit to 1%
updateClusterSettings(
new ClusterSetting("persistent", Settings.Key.QUERY_MEMORY_LIMIT.getKeyValue(), "1%"));
String query = String.format("search source=%s age=20", TEST_INDEX_DOG);

ResponseException exception = expectThrows(ResponseException.class, () -> executeQuery(query));
ResponseException exception = expectThrows(ResponseException.class, this::executeQuery);
assertEquals(500, exception.getResponse().getStatusLine().getStatusCode());
assertThat(
exception.getMessage(),
Expand All @@ -40,7 +37,23 @@ public void queryExceedResourceLimitShouldFail() throws IOException {
// update plugins.ppl.query.memory_limit to default value 85%
updateClusterSettings(
new ClusterSetting("persistent", Settings.Key.QUERY_MEMORY_LIMIT.getKeyValue(), "85%"));
JSONObject result = executeQuery(String.format("search source=%s", TEST_INDEX_DOG));
verifyColumn(result, columnName("dog_name"), columnName("holdersName"), columnName("age"));
executeQuery();
}

private void executeQuery() throws IOException {
if (GCedMemoryUsage.initialized()) {
// ClickBench Q30 is a high memory consumption query. Run 5 times to ensure GC triggered.
String query = sanitize(loadFromFile("clickbench/queries/q30.ppl"));
for (int i = 0; i < 5; i++) {
JSONObject result = executeQuery(query);
verifyNumOfRows(result, 1);
}
} else {
// Q2 is not a high memory consumption query.
// It cannot run in 1% resource but passed in 85%.
String query = sanitize(loadFromFile("clickbench/queries/q2.ppl"));
JSONObject result = executeQuery(query);
verifyNumOfRows(result, 1);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public class GCedMemoryUsage implements MemoryUsage {
private static final Logger LOG = LogManager.getLogger();
private static final List<String> OLD_GEN_GC_ACTION_KEYWORDS =
List.of("major", "concurrent", "old", "full", "marksweep");
private static boolean initialized = false;

private GCedMemoryUsage() {
registerGCListener();
Expand Down Expand Up @@ -55,13 +56,24 @@ public void setUsage(long value) {
usage.set(value);
}

public static boolean initialized() {
return initialized;
}

private void registerGCListener() {
boolean registered = false;
List<GarbageCollectorMXBean> gcBeans = ManagementFactory.getGarbageCollectorMXBeans();
if (gcBeans.stream()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we get runtime exception on 2.19-dev branch? Which JDK version this feature depend on?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haven't backported to 2.19-dev. I am still working on backporting #4105. Then the JDK version < 20, there is no specific G1 Concurrent GC Monitoring registered, ref https://github.com/openjdk/jdk/blob/jdk-19-ga/src/hotspot/share/gc/g1/g1MonitoringSupport.cpp#L91. From JDK 20, G1 Concurrent GC was registered in Monitoring, ref https://github.com/openjdk/jdk/blob/jdk-20-ga/src/hotspot/share/gc/g1/g1MonitoringSupport.cpp#L93.

This PR is to verify if there a Concurrent MXBean registered in JDK before initialing a GCedMemoryUsage.

.filter(b -> b instanceof NotificationEmitter)
.noneMatch(b -> containConcurrentGcBean(b.getName()))) {
// Concurrent Garbage Collector MXBean only existed since Java 21.
// fallback to RuntimeMemoryUsage
LOG.info("No Concurrent Garbage Collector MXBean, fallback to RuntimeMemoryUsage");
throw new OpenSearchMemoryHealthy.MemoryUsageException();
}
for (GarbageCollectorMXBean gcBean : gcBeans) {
if (gcBean instanceof NotificationEmitter && isOldGenGc(gcBean.getName())) {
LOG.info("{} listener registered for memory usage monitor.", gcBean.getName());
registered = true;
initialized = true;
NotificationEmitter emitter = (NotificationEmitter) gcBean;
emitter.addNotificationListener(
new OldGenGCListener(),
Expand All @@ -78,11 +90,10 @@ private void registerGCListener() {
null);
}
}
if (!registered) {
// fallback to RuntimeMemoryUsage
LOG.info("No old gen GC listener registered, fallback to RuntimeMemoryUsage");
throw new OpenSearchMemoryHealthy.MemoryUsageException();
}
}

private boolean containConcurrentGcBean(String beanName) {
return beanName.toLowerCase(Locale.ROOT).contains("concurrent");
}

private boolean isOldGenGc(String gcKeyword) {
Expand Down
Loading