Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class CalcitePlanContext {
public final QueryType queryType;
public final Integer querySizeLimit;

/** This thread local variable is only used to skip script encoding in script pushdown. */
public static final ThreadLocal<Boolean> skipEncoding = ThreadLocal.withInitial(() -> false);

@Getter @Setter private boolean isResolvingJoinCondition = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,14 @@ public void executeWithCalcite(
log.warn("Fallback to V2 query engine since got exception", t);
executeWithLegacy(plan, queryType, listener, Optional.of(t));
} else {
if (t instanceof Error) {
if (t instanceof Exception) {
listener.onFailure((Exception) t);
} else if (t instanceof VirtualMachineError) {
// throw and fast fail the VM errors such as OOM (same with v2).
throw t;
} else {
// Calcite may throw AssertError during query execution.
listener.onFailure(new CalciteUnsupportedException(t.getMessage(), t));
} else {
listener.onFailure((Exception) t);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.junit.Test;
import org.junit.runners.MethodSorters;
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.sql.opensearch.monitor.GCedMemoryUsage;
import org.opensearch.sql.ppl.PPLIntegTestCase;

@FixMethodOrder(MethodSorters.JVM)
Expand Down Expand Up @@ -50,12 +51,15 @@ public static void reset() throws IOException {

/**
* Ignore queries that are not supported by Calcite.
*
* <p>q30 is ignored because it will trigger ResourceMonitory health check. TODO: should be
* addressed by: https://github.com/opensearch-project/sql/issues/3981
*/
protected Set<Integer> ignored() {
return Set.of(29, 30);
if (GCedMemoryUsage.initialized()) {
return Set.of(29);
} else {
// Ignore q30 when use RuntimeMemoryUsage,
// because of too much script push down, which will cause ResourceMonitor restriction.
return Set.of(29, 30);
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import static org.opensearch.sql.legacy.TestUtils.getResponseBody;
import static org.opensearch.sql.plugin.rest.RestPPLQueryAction.EXPLAIN_API_ENDPOINT;
import static org.opensearch.sql.plugin.rest.RestPPLQueryAction.EXTENDED_EXPLAIN_API_ENDPOINT;
import static org.opensearch.sql.plugin.rest.RestPPLQueryAction.QUERY_API_ENDPOINT;

import com.google.common.io.Resources;
Expand All @@ -34,6 +33,8 @@

/** OpenSearch Rest integration test base for PPL testing. */
public abstract class PPLIntegTestCase extends SQLIntegTestCase {
private static final String EXTENDED_EXPLAIN_API_ENDPOINT =
"/_plugins/_ppl/_explain?format=extended";
private static final Logger LOG = LogManager.getLogger();
@Rule public final RetryProcessor retryProcessor = new RetryProcessor();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,30 @@

package org.opensearch.sql.ppl;

import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_DOG;
import static org.opensearch.sql.util.MatcherUtils.columnName;
import static org.opensearch.sql.util.MatcherUtils.verifyColumn;
import static org.opensearch.sql.util.MatcherUtils.verifyNumOfRows;

import java.io.IOException;
import org.hamcrest.Matchers;
import org.json.JSONObject;
import org.junit.Test;
import org.opensearch.client.ResponseException;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.opensearch.monitor.GCedMemoryUsage;

public class ResourceMonitorIT extends PPLIntegTestCase {

@Override
public void init() throws Exception {
super.init();
loadIndex(Index.DOG);
loadIndex(Index.CLICK_BENCH);
}

@Test
public void queryExceedResourceLimitShouldFail() throws IOException {
// update plugins.ppl.query.memory_limit to 1%
updateClusterSettings(
new ClusterSetting("persistent", Settings.Key.QUERY_MEMORY_LIMIT.getKeyValue(), "1%"));
String query = String.format("search source=%s age=20", TEST_INDEX_DOG);

ResponseException exception = expectThrows(ResponseException.class, () -> executeQuery(query));
ResponseException exception = expectThrows(ResponseException.class, this::executeQuery);
assertEquals(500, exception.getResponse().getStatusLine().getStatusCode());
assertThat(
exception.getMessage(),
Expand All @@ -40,7 +37,23 @@ public void queryExceedResourceLimitShouldFail() throws IOException {
// update plugins.ppl.query.memory_limit to default value 85%
updateClusterSettings(
new ClusterSetting("persistent", Settings.Key.QUERY_MEMORY_LIMIT.getKeyValue(), "85%"));
JSONObject result = executeQuery(String.format("search source=%s", TEST_INDEX_DOG));
verifyColumn(result, columnName("dog_name"), columnName("holdersName"), columnName("age"));
executeQuery();
}

private void executeQuery() throws IOException {
if (GCedMemoryUsage.initialized()) {
// ClickBench Q30 is a high memory consumption query. Run 5 times to ensure GC triggered.
String query = sanitize(loadFromFile("clickbench/queries/q30.ppl"));
for (int i = 0; i < 5; i++) {
JSONObject result = executeQuery(query);
verifyNumOfRows(result, 1);
}
} else {
// Q2 is not a high memory consumption query.
// It cannot run in 1% resource but passed in 85%.
String query = sanitize(loadFromFile("clickbench/queries/q2.ppl"));
JSONObject result = executeQuery(query);
verifyNumOfRows(result, 1);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.opensearch.monitor;

import com.sun.management.GarbageCollectionNotificationInfo;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.Notification;
import javax.management.NotificationEmitter;
import javax.management.NotificationListener;
import javax.management.openmbean.CompositeData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/** Get memory usage from GC notification listener, which is used in Calcite engine. */
public class GCedMemoryUsage implements MemoryUsage {
private static final Logger LOG = LogManager.getLogger();
private static final List<String> OLD_GEN_GC_ACTION_KEYWORDS =
List.of("major", "concurrent", "old", "full", "marksweep");
private static boolean initialized = false;

private GCedMemoryUsage() {
registerGCListener();
}

// Lazy initialize the instance to avoid register GCListener in v2.
private static class Holder {
static final MemoryUsage INSTANCE = new GCedMemoryUsage();
}

/**
* Get the singleton instance of GCedMemoryUsage.
*
* @return GCedMemoryUsage instance
*/
public static MemoryUsage getInstance() {
return Holder.INSTANCE;
}

private final AtomicLong usage = new AtomicLong(-1);

@Override
public long usage() {
return usage.get();
}

@Override
public void setUsage(long value) {
usage.set(value);
}

public static boolean initialized() {
return initialized;
}

private void registerGCListener() {
List<GarbageCollectorMXBean> gcBeans = ManagementFactory.getGarbageCollectorMXBeans();
if (gcBeans.stream()
.filter(b -> b instanceof NotificationEmitter)
.noneMatch(b -> containConcurrentGcBean(b.getName()))) {
// Concurrent Garbage Collector MXBean only existed since Java 21.
// fallback to RuntimeMemoryUsage
LOG.info("No Concurrent Garbage Collector MXBean, fallback to RuntimeMemoryUsage");
throw new OpenSearchMemoryHealthy.MemoryUsageException();
}
for (GarbageCollectorMXBean gcBean : gcBeans) {
if (gcBean instanceof NotificationEmitter && isOldGenGc(gcBean.getName())) {
LOG.info("{} listener registered for memory usage monitor.", gcBean.getName());
initialized = true;
NotificationEmitter emitter = (NotificationEmitter) gcBean;
emitter.addNotificationListener(
new OldGenGCListener(),
notification -> {
if (!notification
.getType()
.equals(GarbageCollectionNotificationInfo.GARBAGE_COLLECTION_NOTIFICATION)) {
return false;
}
CompositeData cd = (CompositeData) notification.getUserData();
GarbageCollectionNotificationInfo info = GarbageCollectionNotificationInfo.from(cd);
return isOldGenGc(info.getGcAction());
},
null);
}
}
}

private boolean containConcurrentGcBean(String beanName) {
return beanName.toLowerCase(Locale.ROOT).contains("concurrent");
}

private boolean isOldGenGc(String gcKeyword) {
String keyword = gcKeyword.toLowerCase(Locale.ROOT);
return OLD_GEN_GC_ACTION_KEYWORDS.stream().anyMatch(keyword::contains);
}

private static class OldGenGCListener implements NotificationListener {
@Override
public void handleNotification(Notification notification, Object handback) {
CompositeData cd = (CompositeData) notification.getUserData();
GarbageCollectionNotificationInfo info = GarbageCollectionNotificationInfo.from(cd);
Map<String, java.lang.management.MemoryUsage> memoryUsageAfterGc =
info.getGcInfo().getMemoryUsageAfterGc();
// Skip Metaspace and CodeHeap spaces which the GC scope is out of stack GC.
long totalStackUsed =
memoryUsageAfterGc.entrySet().stream()
.filter(
entry ->
!entry.getKey().equals("Metaspace")
&& !entry.getKey().equals("Compressed Class Space")
&& !entry.getKey().startsWith("CodeHeap"))
.mapToLong(entry -> entry.getValue().getUsed())
.sum();
getInstance().setUsage(totalStackUsed);
if (LOG.isDebugEnabled()) {
LOG.debug("Old Gen GC detected, memory usage after GC is {} bytes.", totalStackUsed);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.opensearch.monitor;

/** Memory usage interface. It is used to get the memory usage of the VM. */
public interface MemoryUsage {
long usage();

void setUsage(long usage);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@
import java.util.concurrent.ThreadLocalRandom;
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.opensearch.sql.common.setting.Settings;

/** OpenSearch Memory Monitor. */
@Log4j2
public class OpenSearchMemoryHealthy {
private final RandomFail randomFail;
private final MemoryUsage memoryUsage;

public OpenSearchMemoryHealthy() {
public OpenSearchMemoryHealthy(Settings settings) {
randomFail = new RandomFail();
memoryUsage = new MemoryUsage();
memoryUsage = buildMemoryUsage(settings);
}

@VisibleForTesting
Expand All @@ -27,6 +28,24 @@ public OpenSearchMemoryHealthy(RandomFail randomFail, MemoryUsage memoryUsage) {
this.memoryUsage = memoryUsage;
}

private MemoryUsage buildMemoryUsage(Settings settings) {
try {
return isCalciteEnabled(settings)
? GCedMemoryUsage.getInstance()
: RuntimeMemoryUsage.getInstance();
} catch (Throwable e) {
return RuntimeMemoryUsage.getInstance();
}
}

private boolean isCalciteEnabled(Settings settings) {
if (settings != null) {
return settings.getSettingValue(Settings.Key.CALCITE_ENGINE_ENABLED);
} else {
return false;
}
}

/** Is Memory Healthy. Calculate based on the current heap memory usage. */
public boolean isMemoryHealthy(long limitBytes) {
final long memoryUsage = this.memoryUsage.usage();
Expand All @@ -50,17 +69,12 @@ public boolean shouldFail() {
}
}

static class MemoryUsage {
public long usage() {
final long freeMemory = Runtime.getRuntime().freeMemory();
final long totalMemory = Runtime.getRuntime().totalMemory();
return totalMemory - freeMemory;
}
}
@NoArgsConstructor
public static class MemoryUsageExceedFastFailureException extends MemoryUsageException {}

@NoArgsConstructor
public static class MemoryUsageExceedFastFailureException extends RuntimeException {}
public static class MemoryUsageExceedException extends MemoryUsageException {}

@NoArgsConstructor
public static class MemoryUsageExceedException extends RuntimeException {}
public static class MemoryUsageException extends RuntimeException {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.opensearch.monitor;

/** Get memory usage from runtime, which is used in v2. */
public class RuntimeMemoryUsage implements MemoryUsage {
private RuntimeMemoryUsage() {}

private static class Holder {
static final MemoryUsage INSTANCE = new RuntimeMemoryUsage();
}

public static MemoryUsage getInstance() {
return Holder.INSTANCE;
}

@Override
public long usage() {
final long freeMemory = Runtime.getRuntime().freeMemory();
final long totalMemory = Runtime.getRuntime().totalMemory();
return totalMemory - freeMemory;
}

@Override
public void setUsage(long usage) {
throw new UnsupportedOperationException("Cannot set usage in RuntimeMemoryUsage");
}
}
Loading
Loading