diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java index 8afddd4579c..d10f4fbf6b3 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java @@ -36,6 +36,7 @@ public class CalcitePlanContext { public final QueryType queryType; public final Integer querySizeLimit; + /** This thread local variable is only used to skip script encoding in script pushdown. */ public static final ThreadLocal skipEncoding = ThreadLocal.withInitial(() -> false); @Getter @Setter private boolean isResolvingJoinCondition = false; diff --git a/core/src/main/java/org/opensearch/sql/executor/QueryService.java b/core/src/main/java/org/opensearch/sql/executor/QueryService.java index a9f84c9bc63..7afbfdf5ba2 100644 --- a/core/src/main/java/org/opensearch/sql/executor/QueryService.java +++ b/core/src/main/java/org/opensearch/sql/executor/QueryService.java @@ -111,11 +111,14 @@ public void executeWithCalcite( log.warn("Fallback to V2 query engine since got exception", t); executeWithLegacy(plan, queryType, listener, Optional.of(t)); } else { - if (t instanceof Error) { + if (t instanceof Exception) { + listener.onFailure((Exception) t); + } else if (t instanceof VirtualMachineError) { + // throw and fast fail the VM errors such as OOM (same with v2). + throw t; + } else { // Calcite may throw AssertError during query execution. listener.onFailure(new CalciteUnsupportedException(t.getMessage(), t)); - } else { - listener.onFailure((Exception) t); } } } diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/clickbench/PPLClickBenchIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/clickbench/PPLClickBenchIT.java index af7d748f5cd..52761dc9b2a 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/clickbench/PPLClickBenchIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/clickbench/PPLClickBenchIT.java @@ -53,7 +53,7 @@ public static void reset() throws IOException { * down, which will cause ResourceMonitor restriction. */ protected Set ignored() { - return Set.of(29, 30); + return Set.of(29); } @Test diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/PPLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/ppl/PPLIntegTestCase.java index 43aad2d10a3..db1ee17f344 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/PPLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/PPLIntegTestCase.java @@ -7,7 +7,6 @@ import static org.opensearch.sql.legacy.TestUtils.getResponseBody; import static org.opensearch.sql.plugin.rest.RestPPLQueryAction.EXPLAIN_API_ENDPOINT; -import static org.opensearch.sql.plugin.rest.RestPPLQueryAction.EXTENDED_EXPLAIN_API_ENDPOINT; import static org.opensearch.sql.plugin.rest.RestPPLQueryAction.QUERY_API_ENDPOINT; import com.google.common.io.Resources; @@ -34,6 +33,8 @@ /** OpenSearch Rest integration test base for PPL testing. */ public abstract class PPLIntegTestCase extends SQLIntegTestCase { + private static final String EXTENDED_EXPLAIN_API_ENDPOINT = + "/_plugins/_ppl/_explain?format=extended"; private static final Logger LOG = LogManager.getLogger(); @Rule public final RetryProcessor retryProcessor = new RetryProcessor(); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/monitor/GCedMemoryUsage.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/monitor/GCedMemoryUsage.java new file mode 100644 index 00000000000..a94209b0ef3 --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/monitor/GCedMemoryUsage.java @@ -0,0 +1,116 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.opensearch.monitor; + +import com.sun.management.GarbageCollectionNotificationInfo; +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import javax.management.Notification; +import javax.management.NotificationEmitter; +import javax.management.NotificationListener; +import javax.management.openmbean.CompositeData; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** Get memory usage from GC notification listener, which is used in Calcite engine. */ +public class GCedMemoryUsage implements MemoryUsage { + private static final Logger LOG = LogManager.getLogger(); + private static final List OLD_GEN_GC_ACTION_KEYWORDS = + List.of("major", "concurrent", "old", "full", "marksweep"); + + private GCedMemoryUsage() { + registerGCListener(); + } + + // Lazy initialize the instance to avoid register GCListener in v2. + private static class Holder { + static final MemoryUsage INSTANCE = new GCedMemoryUsage(); + } + + /** + * Get the singleton instance of GCedMemoryUsage. + * + * @return GCedMemoryUsage instance + */ + public static MemoryUsage getInstance() { + return Holder.INSTANCE; + } + + private final AtomicLong usage = new AtomicLong(-1); + + @Override + public long usage() { + return usage.get(); + } + + @Override + public void setUsage(long value) { + usage.set(value); + } + + private void registerGCListener() { + boolean registered = false; + List gcBeans = ManagementFactory.getGarbageCollectorMXBeans(); + for (GarbageCollectorMXBean gcBean : gcBeans) { + if (gcBean instanceof NotificationEmitter && isOldGenGc(gcBean.getName())) { + LOG.info("{} listener registered for memory usage monitor.", gcBean.getName()); + registered = true; + NotificationEmitter emitter = (NotificationEmitter) gcBean; + emitter.addNotificationListener( + new OldGenGCListener(), + notification -> { + if (!notification + .getType() + .equals(GarbageCollectionNotificationInfo.GARBAGE_COLLECTION_NOTIFICATION)) { + return false; + } + CompositeData cd = (CompositeData) notification.getUserData(); + GarbageCollectionNotificationInfo info = GarbageCollectionNotificationInfo.from(cd); + return isOldGenGc(info.getGcAction()); + }, + null); + } + } + if (!registered) { + // fallback to RuntimeMemoryUsage + LOG.info("No old gen GC listener registered, fallback to RuntimeMemoryUsage"); + throw new OpenSearchMemoryHealthy.MemoryUsageException(); + } + } + + private boolean isOldGenGc(String gcKeyword) { + String keyword = gcKeyword.toLowerCase(Locale.ROOT); + return OLD_GEN_GC_ACTION_KEYWORDS.stream().anyMatch(keyword::contains); + } + + private static class OldGenGCListener implements NotificationListener { + @Override + public void handleNotification(Notification notification, Object handback) { + CompositeData cd = (CompositeData) notification.getUserData(); + GarbageCollectionNotificationInfo info = GarbageCollectionNotificationInfo.from(cd); + Map memoryUsageAfterGc = + info.getGcInfo().getMemoryUsageAfterGc(); + // Skip Metaspace and CodeHeap spaces which the GC scope is out of stack GC. + long totalStackUsed = + memoryUsageAfterGc.entrySet().stream() + .filter( + entry -> + !entry.getKey().equals("Metaspace") + && !entry.getKey().equals("Compressed Class Space") + && !entry.getKey().startsWith("CodeHeap")) + .mapToLong(entry -> entry.getValue().getUsed()) + .sum(); + getInstance().setUsage(totalStackUsed); + if (LOG.isDebugEnabled()) { + LOG.debug("Old Gen GC detected, memory usage after GC is {} bytes.", totalStackUsed); + } + } + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/monitor/MemoryUsage.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/monitor/MemoryUsage.java new file mode 100644 index 00000000000..fa911c1e69d --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/monitor/MemoryUsage.java @@ -0,0 +1,13 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.opensearch.monitor; + +/** Memory usage interface. It is used to get the memory usage of the VM. */ +public interface MemoryUsage { + long usage(); + + void setUsage(long usage); +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/monitor/OpenSearchMemoryHealthy.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/monitor/OpenSearchMemoryHealthy.java index bc038cb42fb..2d462f07cc4 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/monitor/OpenSearchMemoryHealthy.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/monitor/OpenSearchMemoryHealthy.java @@ -9,6 +9,7 @@ import java.util.concurrent.ThreadLocalRandom; import lombok.NoArgsConstructor; import lombok.extern.log4j.Log4j2; +import org.opensearch.sql.common.setting.Settings; /** OpenSearch Memory Monitor. */ @Log4j2 @@ -16,9 +17,9 @@ public class OpenSearchMemoryHealthy { private final RandomFail randomFail; private final MemoryUsage memoryUsage; - public OpenSearchMemoryHealthy() { + public OpenSearchMemoryHealthy(Settings settings) { randomFail = new RandomFail(); - memoryUsage = new MemoryUsage(); + memoryUsage = buildMemoryUsage(settings); } @VisibleForTesting @@ -27,6 +28,24 @@ public OpenSearchMemoryHealthy(RandomFail randomFail, MemoryUsage memoryUsage) { this.memoryUsage = memoryUsage; } + private MemoryUsage buildMemoryUsage(Settings settings) { + try { + return isCalciteEnabled(settings) + ? GCedMemoryUsage.getInstance() + : RuntimeMemoryUsage.getInstance(); + } catch (Throwable e) { + return RuntimeMemoryUsage.getInstance(); + } + } + + private boolean isCalciteEnabled(Settings settings) { + if (settings != null) { + return settings.getSettingValue(Settings.Key.CALCITE_ENGINE_ENABLED); + } else { + return false; + } + } + /** Is Memory Healthy. Calculate based on the current heap memory usage. */ public boolean isMemoryHealthy(long limitBytes) { final long memoryUsage = this.memoryUsage.usage(); @@ -50,17 +69,12 @@ public boolean shouldFail() { } } - static class MemoryUsage { - public long usage() { - final long freeMemory = Runtime.getRuntime().freeMemory(); - final long totalMemory = Runtime.getRuntime().totalMemory(); - return totalMemory - freeMemory; - } - } + @NoArgsConstructor + public static class MemoryUsageExceedFastFailureException extends MemoryUsageException {} @NoArgsConstructor - public static class MemoryUsageExceedFastFailureException extends RuntimeException {} + public static class MemoryUsageExceedException extends MemoryUsageException {} @NoArgsConstructor - public static class MemoryUsageExceedException extends RuntimeException {} + public static class MemoryUsageException extends RuntimeException {} } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/monitor/RuntimeMemoryUsage.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/monitor/RuntimeMemoryUsage.java new file mode 100644 index 00000000000..c621700e8a5 --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/monitor/RuntimeMemoryUsage.java @@ -0,0 +1,31 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.opensearch.monitor; + +/** Get memory usage from runtime, which is used in v2. */ +public class RuntimeMemoryUsage implements MemoryUsage { + private RuntimeMemoryUsage() {} + + private static class Holder { + static final MemoryUsage INSTANCE = new RuntimeMemoryUsage(); + } + + public static MemoryUsage getInstance() { + return Holder.INSTANCE; + } + + @Override + public long usage() { + final long freeMemory = Runtime.getRuntime().freeMemory(); + final long totalMemory = Runtime.getRuntime().totalMemory(); + return totalMemory - freeMemory; + } + + @Override + public void setUsage(long usage) { + throw new UnsupportedOperationException("Cannot set usage in RuntimeMemoryUsage"); + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java index 3ebaf77e510..8b6f8d0efe9 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java @@ -55,6 +55,7 @@ import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.function.Supplier; import lombok.Getter; import lombok.Setter; import org.apache.calcite.DataContext.Variable; @@ -175,6 +176,15 @@ public static QueryBuilder analyze( return analyzeExpression(expression, schema, fieldTypes, rowType, cluster).builder(); } + /** + * Analyzes the expression and returns a {@link QueryExpression}. + * + * @param expression expression to analyze + * @param schema current schema of scan operator + * @param fieldTypes mapping of OpenSearch field name to ExprType, nested fields are flattened + * @return search query which can be used to query OS cluster + * @throws ExpressionNotAnalyzableException when expression can't processed by this analyzer + */ public static QueryExpression analyzeExpression( RexNode expression, List schema, @@ -183,10 +193,28 @@ public static QueryExpression analyzeExpression( RelOptCluster cluster) throws ExpressionNotAnalyzableException { requireNonNull(expression, "expression"); + return analyzeExpression( + expression, + schema, + fieldTypes, + rowType, + cluster, + new Visitor(schema, fieldTypes, rowType, cluster)); + } + + /** For test only, passing a customer Visitor */ + public static QueryExpression analyzeExpression( + RexNode expression, + List schema, + Map fieldTypes, + RelDataType rowType, + RelOptCluster cluster, + Visitor visitor) + throws ExpressionNotAnalyzableException { + requireNonNull(expression, "expression"); try { // visits expression tree - QueryExpression queryExpression = - (QueryExpression) expression.accept(new Visitor(schema, fieldTypes, rowType, cluster)); + QueryExpression queryExpression = (QueryExpression) expression.accept(visitor); return queryExpression; } catch (Throwable e) { if (e instanceof UnsupportedScriptException) { @@ -201,14 +229,14 @@ public static QueryExpression analyzeExpression( } /** Traverses {@link RexNode} tree and builds OpenSearch query. */ - private static class Visitor extends RexVisitorImpl { + static class Visitor extends RexVisitorImpl { List schema; Map fieldTypes; RelDataType rowType; RelOptCluster cluster; - private Visitor( + Visitor( List schema, Map fieldTypes, RelDataType rowType, @@ -699,7 +727,7 @@ private QueryExpression andOr(RexCall call) { } } - private Expression tryAnalyzeOperand(RexNode node) { + public Expression tryAnalyzeOperand(RexNode node) { try { Expression expr = node.accept(this); if (expr instanceof NamedFieldExpression) { @@ -1374,8 +1402,9 @@ private static String ipValueForPushDown(String value) { } public static class ScriptQueryExpression extends QueryExpression { - private final String code; private RexNode analyzedNode; + // use lambda to generate code lazily to avoid store generated code + private final Supplier codeGenerator; public ScriptQueryExpression( RexNode rexNode, @@ -1383,9 +1412,10 @@ public ScriptQueryExpression( Map fieldTypes, RelOptCluster cluster) { RelJsonSerializer serializer = new RelJsonSerializer(cluster); - this.code = - SerializationWrapper.wrapWithLangType( - ScriptEngineType.CALCITE, serializer.serialize(rexNode, rowType, fieldTypes)); + this.codeGenerator = + () -> + SerializationWrapper.wrapWithLangType( + ScriptEngineType.CALCITE, serializer.serialize(rexNode, rowType, fieldTypes)); } @Override @@ -1402,7 +1432,7 @@ public Script getScript() { return new Script( DEFAULT_SCRIPT_TYPE, COMPOUNDED_LANG_NAME, - code, + codeGenerator.get(), Collections.emptyMap(), Map.of(Variable.UTC_TIMESTAMP.camelName, currentTime)); } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java index aed1ab42839..c8b00c6daed 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java @@ -260,7 +260,7 @@ public OpenSearchRequestBuilder createRequestBuilder() { } public OpenSearchResourceMonitor createOpenSearchResourceMonitor() { - return new OpenSearchResourceMonitor(getSettings(), new OpenSearchMemoryHealthy()); + return new OpenSearchResourceMonitor(getSettings(), new OpenSearchMemoryHealthy(settings)); } public OpenSearchRequest buildRequest(OpenSearchRequestBuilder requestBuilder) { diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchStorageEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchStorageEngine.java index 7c022e21904..ce6740cd784 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchStorageEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchStorageEngine.java @@ -28,7 +28,7 @@ public class OpenSearchStorageEngine implements StorageEngine { @Override public Table getTable(DataSourceSchemaName dataSourceSchemaName, String name) { if (isSystemIndex(name)) { - return new OpenSearchSystemIndex(client, name); + return new OpenSearchSystemIndex(client, settings, name); } else { return new OpenSearchIndex(client, settings, name); } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java index ab4de0f89fe..384ec17b470 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java @@ -120,13 +120,19 @@ public boolean moveNext() { @Override public void reset() { - iterator = Collections.emptyIterator(); + OpenSearchResponse response = client.search(request); + if (!response.isEmpty()) { + iterator = response.iterator(); + } else { + iterator = Collections.emptyIterator(); + } queryCount = 0; } @Override public void close() { - reset(); + iterator = Collections.emptyIterator(); + queryCount = 0; client.cleanup(request); } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/system/CalciteEnumerableSystemIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/system/CalciteEnumerableSystemIndexScan.java index bad59047026..b0c92dce8f9 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/system/CalciteEnumerableSystemIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/system/CalciteEnumerableSystemIndexScan.java @@ -69,7 +69,9 @@ public Result implement(EnumerableRelImplementor implementor, Prefer pref) { @Override public Enumerator enumerator() { return new OpenSearchSystemIndexEnumerator( - getFieldPath(), sysIndex.getSystemIndexBundle().getRight()); + getFieldPath(), + sysIndex.getSystemIndexBundle().getRight(), + sysIndex.createOpenSearchResourceMonitor()); } }; } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/system/OpenSearchSystemIndex.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/system/OpenSearchSystemIndex.java index 828f9248e79..6bae4d21aba 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/system/OpenSearchSystemIndex.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/system/OpenSearchSystemIndex.java @@ -16,8 +16,11 @@ import org.apache.calcite.rel.RelNode; import org.apache.commons.lang3.tuple.Pair; import org.opensearch.sql.calcite.plan.AbstractOpenSearchTable; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.opensearch.client.OpenSearchClient; +import org.opensearch.sql.opensearch.monitor.OpenSearchMemoryHealthy; +import org.opensearch.sql.opensearch.monitor.OpenSearchResourceMonitor; import org.opensearch.sql.opensearch.request.system.OpenSearchCatIndicesRequest; import org.opensearch.sql.opensearch.request.system.OpenSearchDescribeIndexRequest; import org.opensearch.sql.opensearch.request.system.OpenSearchSystemRequest; @@ -33,8 +36,11 @@ public class OpenSearchSystemIndex extends AbstractOpenSearchTable { /** System Index Name. */ private final Pair systemIndexBundle; - public OpenSearchSystemIndex(OpenSearchClient client, String indexName) { + @Getter private final Settings settings; + + public OpenSearchSystemIndex(OpenSearchClient client, Settings settings, String indexName) { this.systemIndexBundle = buildIndexBundle(client, indexName); + this.settings = settings; } @Override @@ -64,6 +70,10 @@ public PhysicalPlan implement(LogicalPlan plan) { return plan.accept(new OpenSearchSystemIndexDefaultImplementor(), null); } + public OpenSearchResourceMonitor createOpenSearchResourceMonitor() { + return new OpenSearchResourceMonitor(getSettings(), new OpenSearchMemoryHealthy(settings)); + } + @VisibleForTesting @RequiredArgsConstructor public class OpenSearchSystemIndexDefaultImplementor extends DefaultImplementor { diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/system/OpenSearchSystemIndexEnumerator.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/system/OpenSearchSystemIndexEnumerator.java index 8a09823bc39..1bb8f9d5293 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/system/OpenSearchSystemIndexEnumerator.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/system/OpenSearchSystemIndexEnumerator.java @@ -12,10 +12,14 @@ import org.apache.calcite.linq4j.Enumerator; import org.opensearch.sql.data.model.ExprNullValue; import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.exception.NonFallbackCalciteException; +import org.opensearch.sql.monitor.ResourceMonitor; import org.opensearch.sql.opensearch.request.system.OpenSearchSystemRequest; /** Supports a simple iteration over a collection for OpenSearch system index */ public class OpenSearchSystemIndexEnumerator implements Enumerator { + /** How many moveNext() calls to perform resource check once. */ + private static final long NUMBER_OF_NEXT_CALL_TO_CHECK = 1000; private final List fields; @@ -25,10 +29,22 @@ public class OpenSearchSystemIndexEnumerator implements Enumerator { private ExprValue current; - public OpenSearchSystemIndexEnumerator(List fields, OpenSearchSystemRequest request) { + /** Number of rows returned. */ + private Integer queryCount; + + /** ResourceMonitor. */ + private final ResourceMonitor monitor; + + public OpenSearchSystemIndexEnumerator( + List fields, OpenSearchSystemRequest request, ResourceMonitor monitor) { this.fields = fields; this.request = request; + this.monitor = monitor; + this.queryCount = 0; this.current = null; + if (!this.monitor.isHealthy()) { + throw new NonFallbackCalciteException("insufficient resources to run the query, quit."); + } this.iterator = request.search().iterator(); } @@ -41,8 +57,13 @@ public Object current() { @Override public boolean moveNext() { + boolean shouldCheck = (queryCount % NUMBER_OF_NEXT_CALL_TO_CHECK == 0); + if (shouldCheck && !this.monitor.isHealthy()) { + throw new NonFallbackCalciteException("insufficient resources to load next row, quit."); + } if (iterator.hasNext()) { current = iterator.next(); + queryCount++; return true; } else { return false; @@ -52,6 +73,7 @@ public boolean moveNext() { @Override public void reset() { iterator = request.search().iterator(); + queryCount = 0; current = null; } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/monitor/OpenSearchMemoryHealthyTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/monitor/OpenSearchMemoryHealthyTest.java index a61f7343e6d..1de87d2cff3 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/monitor/OpenSearchMemoryHealthyTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/monitor/OpenSearchMemoryHealthyTest.java @@ -21,7 +21,7 @@ class OpenSearchMemoryHealthyTest { @Mock private OpenSearchMemoryHealthy.RandomFail randomFail; - @Mock private OpenSearchMemoryHealthy.MemoryUsage memoryUsage; + @Mock private MemoryUsage memoryUsage; private OpenSearchMemoryHealthy monitor; @@ -59,7 +59,7 @@ void memoryUsageExceedLimitWithoutFastFailure() { @Test void constructOpenSearchMemoryMonitorWithoutArguments() { - OpenSearchMemoryHealthy monitor = new OpenSearchMemoryHealthy(); + OpenSearchMemoryHealthy monitor = new OpenSearchMemoryHealthy(null); assertNotNull(monitor); } @@ -70,8 +70,14 @@ void randomFail() { } @Test - void setMemoryUsage() { - OpenSearchMemoryHealthy.MemoryUsage usage = new OpenSearchMemoryHealthy.MemoryUsage(); + void getMemoryUsage() { + MemoryUsage usage = RuntimeMemoryUsage.getInstance(); assertTrue(usage.usage() > 0); } + + @Test + void setMemoryUsage() { + MemoryUsage usage = RuntimeMemoryUsage.getInstance(); + assertThrows(UnsupportedOperationException.class, () -> usage.setUsage(10L)); + } } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/PredicateAnalyzerTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/PredicateAnalyzerTest.java index e3e810d58c9..7f032e437da 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/PredicateAnalyzerTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/PredicateAnalyzerTest.java @@ -7,8 +7,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.spy; import com.google.common.collect.ImmutableList; import java.math.BigDecimal; @@ -32,7 +31,6 @@ import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.util.Holder; import org.junit.jupiter.api.Test; -import org.mockito.MockedStatic; import org.mockito.Mockito; import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.ExistsQueryBuilder; @@ -44,6 +42,7 @@ import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryStringQueryBuilder; import org.opensearch.index.query.RangeQueryBuilder; +import org.opensearch.index.query.ScriptQueryBuilder; import org.opensearch.index.query.SimpleQueryStringBuilder; import org.opensearch.index.query.TermQueryBuilder; import org.opensearch.index.query.TermsQueryBuilder; @@ -56,8 +55,6 @@ import org.opensearch.sql.opensearch.data.type.OpenSearchDataType.MappingType; import org.opensearch.sql.opensearch.request.PredicateAnalyzer.ExpressionNotAnalyzableException; import org.opensearch.sql.opensearch.request.PredicateAnalyzer.QueryExpression; -import org.opensearch.sql.opensearch.storage.script.CalciteScriptEngine.UnsupportedScriptException; -import org.opensearch.sql.opensearch.storage.serde.SerializationWrapper; public class PredicateAnalyzerTest { final RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); @@ -776,23 +773,34 @@ void isNullOr_ScriptPushDown() throws ExpressionNotAnalyzableException { @Test void verify_partial_pushdown() throws ExpressionNotAnalyzableException { + final RelDataType rowType = + builder + .getTypeFactory() + .builder() + .kind(StructKind.FULLY_QUALIFIED) + .add("a", builder.getTypeFactory().createSqlType(SqlTypeName.BIGINT)) + .add("b", builder.getTypeFactory().createSqlType(SqlTypeName.VARCHAR)) + .build(); RexNode call1 = builder.makeCall(SqlStdOperatorTable.EQUALS, field1, numericLiteral); RexNode call2 = builder.makeCall(SqlStdOperatorTable.IS_EMPTY, field2); - try (MockedStatic mockedSerializationWrapper = - Mockito.mockStatic(SerializationWrapper.class)) { - mockedSerializationWrapper - .when(() -> SerializationWrapper.wrapWithLangType(any(), any())) - .thenThrow(new UnsupportedScriptException("")); - - // Partial push down part of and - RexNode andCall = builder.makeCall(SqlStdOperatorTable.AND, List.of(call1, call2)); - QueryExpression result = - PredicateAnalyzer.analyzeExpression(andCall, schema, fieldTypes, null, null); - - QueryBuilder resultBuilder = result.builder(); - assertInstanceOf(BoolQueryBuilder.class, resultBuilder); - assertEquals( - """ + // Partial push down part of and + RexNode andCall = builder.makeCall(SqlStdOperatorTable.AND, List.of(call1, call2)); + Hook.CURRENT_TIME.addThread((Consumer>) h -> h.set(0L)); + + PredicateAnalyzer.Visitor visitor = + new PredicateAnalyzer.Visitor(schema, fieldTypes, rowType, cluster); + PredicateAnalyzer.Visitor visitSpy = spy(visitor); + Mockito.doThrow(new PredicateAnalyzer.PredicateAnalyzerException("")) + .when(visitSpy) + .tryAnalyzeOperand(call2); + QueryExpression result = + PredicateAnalyzer.analyzeExpression( + andCall, schema, fieldTypes, rowType, cluster, visitSpy); + + QueryBuilder resultBuilder = result.builder(); + assertInstanceOf(BoolQueryBuilder.class, resultBuilder); + assertEquals( + """ { "bool" : { "must" : [ @@ -809,21 +817,47 @@ void verify_partial_pushdown() throws ExpressionNotAnalyzableException { "boost" : 1.0 } }""", - resultBuilder.toString()); - - List unAnalyzableNodes = result.getUnAnalyzableNodes(); - assertEquals(1, unAnalyzableNodes.size()); - assertEquals(call2, unAnalyzableNodes.getFirst()); - - // Don't push down the whole condition if part of `or` cannot be pushed down - RexNode orCall = builder.makeCall(SqlStdOperatorTable.OR, List.of(call1, call2)); - ExpressionNotAnalyzableException exception = - assertThrows( - ExpressionNotAnalyzableException.class, - () -> { - PredicateAnalyzer.analyzeExpression(orCall, schema, fieldTypes, null, null); - }); - assertEquals("Can't convert OR(=($0, 12), IS EMPTY($1))", exception.getMessage()); - } + resultBuilder.toString()); + + List unAnalyzableNodes = result.getUnAnalyzableNodes(); + assertEquals(1, unAnalyzableNodes.size()); + assertEquals(call2, unAnalyzableNodes.getFirst()); + + // If the call2 throw PredicateAnalyzerException, the `or` expression converts to script + // pushdown. + RexNode orCall = builder.makeCall(SqlStdOperatorTable.OR, List.of(call1, call2)); + result = + PredicateAnalyzer.analyzeExpression(orCall, schema, fieldTypes, rowType, cluster, visitSpy); + resultBuilder = result.builder(); + assertInstanceOf(ScriptQueryBuilder.class, resultBuilder); + + Mockito.doThrow(new PredicateAnalyzer.PredicateAnalyzerException("")) + .when(visitSpy) + .tryAnalyzeOperand(orCall); + RexNode thenAndCall = builder.makeCall(SqlStdOperatorTable.AND, List.of(orCall, call1)); + result = + PredicateAnalyzer.analyzeExpression( + thenAndCall, schema, fieldTypes, rowType, cluster, visitSpy); + resultBuilder = result.builder(); + assertInstanceOf(BoolQueryBuilder.class, resultBuilder); + assertEquals( + """ + { + "bool" : { + "must" : [ + { + "term" : { + "a" : { + "value" : 12, + "boost" : 1.0 + } + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + }""", + resultBuilder.toString()); } } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/system/OpenSearchSystemIndexTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/system/OpenSearchSystemIndexTest.java index 1afcfcdc867..0b0aa1ec521 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/system/OpenSearchSystemIndexTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/system/OpenSearchSystemIndexTest.java @@ -24,6 +24,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.expression.NamedExpression; import org.opensearch.sql.opensearch.client.OpenSearchClient; @@ -38,9 +39,11 @@ class OpenSearchSystemIndexTest { @Mock private Table table; + @Mock private Settings settings; + @Test void testGetFieldTypesOfMetaTable() { - OpenSearchSystemIndex systemIndex = new OpenSearchSystemIndex(client, TABLE_INFO); + OpenSearchSystemIndex systemIndex = new OpenSearchSystemIndex(client, settings, TABLE_INFO); final Map fieldTypes = systemIndex.getFieldTypes(); assertThat(fieldTypes, anyOf(hasEntry("TABLE_CAT", STRING))); } @@ -48,26 +51,26 @@ void testGetFieldTypesOfMetaTable() { @Test void testGetFieldTypesOfMappingTable() { OpenSearchSystemIndex systemIndex = - new OpenSearchSystemIndex(client, mappingTable("test_index")); + new OpenSearchSystemIndex(client, settings, mappingTable("test_index")); final Map fieldTypes = systemIndex.getFieldTypes(); assertThat(fieldTypes, anyOf(hasEntry("COLUMN_NAME", STRING))); } @Test void testIsExist() { - Table systemIndex = new OpenSearchSystemIndex(client, TABLE_INFO); + Table systemIndex = new OpenSearchSystemIndex(client, settings, TABLE_INFO); assertTrue(systemIndex.exists()); } @Test void testCreateTable() { - Table systemIndex = new OpenSearchSystemIndex(client, TABLE_INFO); + Table systemIndex = new OpenSearchSystemIndex(client, settings, TABLE_INFO); assertThrows(UnsupportedOperationException.class, () -> systemIndex.create(ImmutableMap.of())); } @Test void implement() { - OpenSearchSystemIndex systemIndex = new OpenSearchSystemIndex(client, TABLE_INFO); + OpenSearchSystemIndex systemIndex = new OpenSearchSystemIndex(client, settings, TABLE_INFO); NamedExpression projectExpr = named("TABLE_NAME", ref("TABLE_NAME", STRING)); final PhysicalPlan plan = diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginModule.java b/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginModule.java index 11c02a73d27..05076506ce9 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginModule.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginModule.java @@ -65,7 +65,7 @@ public ExecutionEngine executionEngine( @Provides public ResourceMonitor resourceMonitor(Settings settings) { - return new OpenSearchResourceMonitor(settings, new OpenSearchMemoryHealthy()); + return new OpenSearchResourceMonitor(settings, new OpenSearchMemoryHealthy(settings)); } @Provides diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java index dec51e1017c..94cc8c2fe0f 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java @@ -41,8 +41,6 @@ public class RestPPLQueryAction extends BaseRestHandler { public static final String QUERY_API_ENDPOINT = "/_plugins/_ppl"; public static final String EXPLAIN_API_ENDPOINT = "/_plugins/_ppl/_explain"; - public static final String EXTENDED_EXPLAIN_API_ENDPOINT = - "/_plugins/_ppl/_explain?format=extended"; private static final Logger LOG = LogManager.getLogger();