Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class CalcitePlanContext {
public final QueryType queryType;
public final Integer querySizeLimit;

/** This thread local variable is only used to skip script encoding in script pushdown. */
public static final ThreadLocal<Boolean> skipEncoding = ThreadLocal.withInitial(() -> false);

@Getter @Setter private boolean isResolvingJoinCondition = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,14 @@ public void executeWithCalcite(
log.warn("Fallback to V2 query engine since got exception", t);
executeWithLegacy(plan, queryType, listener, Optional.of(t));
} else {
if (t instanceof Error) {
if (t instanceof Exception) {
listener.onFailure((Exception) t);
} else if (t instanceof VirtualMachineError) {
// throw and fast fail the VM errors such as OOM (same with v2).
throw t;
} else {
Comment on lines +114 to +119
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To align with behaviour of v2, OOM error will fail the whole JVM. We caught OOM error in previous implementation.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It think it's OK to keep align with v2. But will it be better to trigger GC and then check memory usage again before making the service crash? Anyway, the primary purpose of this PR should not be blocked by this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But will it be better to trigger GC and then check memory usage again before making the service crash?

Maybe more discussion is required and track in a follow-up issue.

// Calcite may throw AssertError during query execution.
listener.onFailure(new CalciteUnsupportedException(t.getMessage(), t));
} else {
listener.onFailure((Exception) t);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public static void reset() throws IOException {
* down, which will cause ResourceMonitor restriction.
*/
protected Set<Integer> ignored() {
return Set.of(29, 30);
return Set.of(29);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import static org.opensearch.sql.legacy.TestUtils.getResponseBody;
import static org.opensearch.sql.plugin.rest.RestPPLQueryAction.EXPLAIN_API_ENDPOINT;
import static org.opensearch.sql.plugin.rest.RestPPLQueryAction.EXTENDED_EXPLAIN_API_ENDPOINT;
import static org.opensearch.sql.plugin.rest.RestPPLQueryAction.QUERY_API_ENDPOINT;

import com.google.common.io.Resources;
Expand All @@ -34,6 +33,8 @@

/** OpenSearch Rest integration test base for PPL testing. */
public abstract class PPLIntegTestCase extends SQLIntegTestCase {
private static final String EXTENDED_EXPLAIN_API_ENDPOINT =
"/_plugins/_ppl/_explain?format=extended";
private static final Logger LOG = LogManager.getLogger();
@Rule public final RetryProcessor retryProcessor = new RetryProcessor();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.opensearch.monitor;

import com.sun.management.GarbageCollectionNotificationInfo;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.Notification;
import javax.management.NotificationEmitter;
import javax.management.NotificationListener;
import javax.management.openmbean.CompositeData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/** Get memory usage from GC notification listener, which is used in Calcite engine. */
public class GCedMemoryUsage implements MemoryUsage {
private static final Logger LOG = LogManager.getLogger();
private static final List<String> OLD_GEN_GC_ACTION_KEYWORDS =
List.of("major", "concurrent", "old", "full", "marksweep");

private GCedMemoryUsage() {
registerGCListener();
}

// Lazy initialize the instance to avoid register GCListener in v2.
private static class Holder {
static final MemoryUsage INSTANCE = new GCedMemoryUsage();
}

/**
* Get the singleton instance of GCedMemoryUsage.
*
* @return GCedMemoryUsage instance
*/
public static MemoryUsage getInstance() {
return Holder.INSTANCE;
}

private final AtomicLong usage = new AtomicLong(-1);

@Override
public long usage() {
return usage.get();
}

@Override
public void setUsage(long value) {
usage.set(value);
}

private void registerGCListener() {
boolean registered = false;
List<GarbageCollectorMXBean> gcBeans = ManagementFactory.getGarbageCollectorMXBeans();
for (GarbageCollectorMXBean gcBean : gcBeans) {
if (gcBean instanceof NotificationEmitter && isOldGenGc(gcBean.getName())) {
LOG.info("{} listener registered for memory usage monitor.", gcBean.getName());
registered = true;
NotificationEmitter emitter = (NotificationEmitter) gcBean;
emitter.addNotificationListener(
new OldGenGCListener(),
notification -> {
if (!notification
.getType()
.equals(GarbageCollectionNotificationInfo.GARBAGE_COLLECTION_NOTIFICATION)) {
return false;
}
CompositeData cd = (CompositeData) notification.getUserData();
GarbageCollectionNotificationInfo info = GarbageCollectionNotificationInfo.from(cd);
return isOldGenGc(info.getGcAction());
},
null);
}
}
if (!registered) {
// fallback to RuntimeMemoryUsage
LOG.info("No old gen GC listener registered, fallback to RuntimeMemoryUsage");
throw new OpenSearchMemoryHealthy.MemoryUsageException();
}
}

private boolean isOldGenGc(String gcKeyword) {
String keyword = gcKeyword.toLowerCase(Locale.ROOT);
return OLD_GEN_GC_ACTION_KEYWORDS.stream().anyMatch(keyword::contains);
}

private static class OldGenGCListener implements NotificationListener {
@Override
public void handleNotification(Notification notification, Object handback) {
CompositeData cd = (CompositeData) notification.getUserData();
GarbageCollectionNotificationInfo info = GarbageCollectionNotificationInfo.from(cd);
Map<String, java.lang.management.MemoryUsage> memoryUsageAfterGc =
info.getGcInfo().getMemoryUsageAfterGc();
// Skip Metaspace and CodeHeap spaces which the GC scope is out of stack GC.
long totalStackUsed =
memoryUsageAfterGc.entrySet().stream()
.filter(
entry ->
!entry.getKey().equals("Metaspace")
&& !entry.getKey().equals("Compressed Class Space")
&& !entry.getKey().startsWith("CodeHeap"))
.mapToLong(entry -> entry.getValue().getUsed())
.sum();
getInstance().setUsage(totalStackUsed);
if (LOG.isDebugEnabled()) {
LOG.debug("Old Gen GC detected, memory usage after GC is {} bytes.", totalStackUsed);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.opensearch.monitor;

/** Memory usage interface. It is used to get the memory usage of the VM. */
public interface MemoryUsage {
long usage();

void setUsage(long usage);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@
import java.util.concurrent.ThreadLocalRandom;
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.opensearch.sql.common.setting.Settings;

/** OpenSearch Memory Monitor. */
@Log4j2
public class OpenSearchMemoryHealthy {
private final RandomFail randomFail;
private final MemoryUsage memoryUsage;

public OpenSearchMemoryHealthy() {
public OpenSearchMemoryHealthy(Settings settings) {
randomFail = new RandomFail();
memoryUsage = new MemoryUsage();
memoryUsage = buildMemoryUsage(settings);
}

@VisibleForTesting
Expand All @@ -27,6 +28,24 @@ public OpenSearchMemoryHealthy(RandomFail randomFail, MemoryUsage memoryUsage) {
this.memoryUsage = memoryUsage;
}

private MemoryUsage buildMemoryUsage(Settings settings) {
try {
return isCalciteEnabled(settings)
? GCedMemoryUsage.getInstance()
: RuntimeMemoryUsage.getInstance();
} catch (Throwable e) {
return RuntimeMemoryUsage.getInstance();
}
}

private boolean isCalciteEnabled(Settings settings) {
if (settings != null) {
return settings.getSettingValue(Settings.Key.CALCITE_ENGINE_ENABLED);
} else {
return false;
}
}

/** Is Memory Healthy. Calculate based on the current heap memory usage. */
public boolean isMemoryHealthy(long limitBytes) {
final long memoryUsage = this.memoryUsage.usage();
Expand All @@ -50,17 +69,12 @@ public boolean shouldFail() {
}
}

static class MemoryUsage {
public long usage() {
final long freeMemory = Runtime.getRuntime().freeMemory();
final long totalMemory = Runtime.getRuntime().totalMemory();
return totalMemory - freeMemory;
}
}
@NoArgsConstructor
public static class MemoryUsageExceedFastFailureException extends MemoryUsageException {}

@NoArgsConstructor
public static class MemoryUsageExceedFastFailureException extends RuntimeException {}
public static class MemoryUsageExceedException extends MemoryUsageException {}

@NoArgsConstructor
public static class MemoryUsageExceedException extends RuntimeException {}
public static class MemoryUsageException extends RuntimeException {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.opensearch.monitor;

/** Get memory usage from runtime, which is used in v2. */
public class RuntimeMemoryUsage implements MemoryUsage {
private RuntimeMemoryUsage() {}

private static class Holder {
static final MemoryUsage INSTANCE = new RuntimeMemoryUsage();
}

public static MemoryUsage getInstance() {
return Holder.INSTANCE;
}

@Override
public long usage() {
final long freeMemory = Runtime.getRuntime().freeMemory();
final long totalMemory = Runtime.getRuntime().totalMemory();
return totalMemory - freeMemory;
}

@Override
public void setUsage(long usage) {
throw new UnsupportedOperationException("Cannot set usage in RuntimeMemoryUsage");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import lombok.Getter;
import lombok.Setter;
import org.apache.calcite.DataContext.Variable;
Expand Down Expand Up @@ -175,6 +176,15 @@ public static QueryBuilder analyze(
return analyzeExpression(expression, schema, fieldTypes, rowType, cluster).builder();
}

/**
* Analyzes the expression and returns a {@link QueryExpression}.
*
* @param expression expression to analyze
* @param schema current schema of scan operator
* @param fieldTypes mapping of OpenSearch field name to ExprType, nested fields are flattened
* @return search query which can be used to query OS cluster
* @throws ExpressionNotAnalyzableException when expression can't processed by this analyzer
*/
public static QueryExpression analyzeExpression(
RexNode expression,
List<String> schema,
Expand All @@ -183,10 +193,28 @@ public static QueryExpression analyzeExpression(
RelOptCluster cluster)
throws ExpressionNotAnalyzableException {
requireNonNull(expression, "expression");
return analyzeExpression(
expression,
schema,
fieldTypes,
rowType,
cluster,
new Visitor(schema, fieldTypes, rowType, cluster));
}

/** For test only, passing a customer Visitor */
public static QueryExpression analyzeExpression(
RexNode expression,
List<String> schema,
Map<String, ExprType> fieldTypes,
RelDataType rowType,
RelOptCluster cluster,
Visitor visitor)
throws ExpressionNotAnalyzableException {
requireNonNull(expression, "expression");
try {
// visits expression tree
QueryExpression queryExpression =
(QueryExpression) expression.accept(new Visitor(schema, fieldTypes, rowType, cluster));
QueryExpression queryExpression = (QueryExpression) expression.accept(visitor);
return queryExpression;
} catch (Throwable e) {
if (e instanceof UnsupportedScriptException) {
Expand All @@ -201,14 +229,14 @@ public static QueryExpression analyzeExpression(
}

/** Traverses {@link RexNode} tree and builds OpenSearch query. */
private static class Visitor extends RexVisitorImpl<Expression> {
static class Visitor extends RexVisitorImpl<Expression> {

List<String> schema;
Map<String, ExprType> fieldTypes;
RelDataType rowType;
RelOptCluster cluster;

private Visitor(
Visitor(
List<String> schema,
Map<String, ExprType> fieldTypes,
RelDataType rowType,
Expand Down Expand Up @@ -699,7 +727,7 @@ private QueryExpression andOr(RexCall call) {
}
}

private Expression tryAnalyzeOperand(RexNode node) {
public Expression tryAnalyzeOperand(RexNode node) {
try {
Expression expr = node.accept(this);
if (expr instanceof NamedFieldExpression) {
Expand Down Expand Up @@ -1374,18 +1402,20 @@ private static String ipValueForPushDown(String value) {
}

public static class ScriptQueryExpression extends QueryExpression {
private final String code;
private RexNode analyzedNode;
// use lambda to generate code lazily to avoid store generated code
private final Supplier<String> codeGenerator;

public ScriptQueryExpression(
RexNode rexNode,
RelDataType rowType,
Map<String, ExprType> fieldTypes,
RelOptCluster cluster) {
RelJsonSerializer serializer = new RelJsonSerializer(cluster);
this.code =
SerializationWrapper.wrapWithLangType(
ScriptEngineType.CALCITE, serializer.serialize(rexNode, rowType, fieldTypes));
this.codeGenerator =
() ->
SerializationWrapper.wrapWithLangType(
ScriptEngineType.CALCITE, serializer.serialize(rexNode, rowType, fieldTypes));
}

@Override
Expand All @@ -1402,7 +1432,7 @@ public Script getScript() {
return new Script(
DEFAULT_SCRIPT_TYPE,
COMPOUNDED_LANG_NAME,
code,
codeGenerator.get(),
Collections.emptyMap(),
Map.of(Variable.UTC_TIMESTAMP.camelName, currentTime));
}
Expand Down
Loading
Loading