From 01c93d71a3972e3217139054b933afb8250a5dab Mon Sep 17 00:00:00 2001 From: Heng Qian Date: Tue, 21 Jan 2025 17:27:23 +0800 Subject: [PATCH 1/4] [POC] Make Calcite execute successfully Signed-off-by: Heng Qian --- .../sql/calcite/CalcitePlanContext.java | 32 +++++++++++++++- .../sql/calcite/plan/OpenSearchTable.java | 17 +++++++-- .../executor/OpenSearchExecutionEngine.java | 37 +++++++++++++++++-- .../opensearch/storage/OpenSearchIndex.java | 19 ++++++++-- .../scan/OpenSearchIndexEnumerator.java | 6 +-- 5 files changed, 98 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java index c9b3839127d..0cacf26f935 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java @@ -5,16 +5,37 @@ package org.opensearch.sql.calcite; +import java.sql.DriverManager; import java.util.function.BiFunction; import lombok.Getter; +import org.apache.calcite.jdbc.CalciteConnection; +import org.apache.calcite.plan.Context; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptSchema; +import org.apache.calcite.prepare.CalcitePrepareImpl; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.server.CalciteServerStatement; import org.apache.calcite.tools.FrameworkConfig; import org.apache.calcite.tools.RelBuilder; +import org.checkerframework.checker.nullness.qual.Nullable; import org.opensearch.sql.ast.expression.UnresolvedExpression; public class CalcitePlanContext { + public static class OSRelBuilder extends RelBuilder { + + protected OSRelBuilder( + @Nullable Context context, + RelOptCluster cluster, + @Nullable RelOptSchema relOptSchema) { + super(context, cluster, relOptSchema); + } + } + public FrameworkConfig config; + public CalciteConnection connection; + public CalciteServerStatement statement; + public final CalcitePrepareImpl prepare; public final RelBuilder relBuilder; public final ExtendedRexBuilder rexBuilder; @@ -22,7 +43,16 @@ public class CalcitePlanContext { public CalcitePlanContext(FrameworkConfig config) { this.config = config; - this.relBuilder = RelBuilder.create(config); + try { + this.connection = DriverManager.getConnection("jdbc:calcite:").unwrap(CalciteConnection.class); + connection.getRootSchema().add( + OpenSearchSchema.OPEN_SEARCH_SCHEMA_NAME, config.getDefaultSchema().unwrap(OpenSearchSchema.class)); + this.statement = connection.createStatement().unwrap(CalciteServerStatement.class); + } catch (Exception ignored) {} + this.prepare = new CalcitePrepareImpl(); + this.relBuilder = prepare.perform(statement, config, + (cluster, relOptSchema, rootSchema, statement) -> new OSRelBuilder(config.getContext(), + cluster, relOptSchema)); this.rexBuilder = new ExtendedRexBuilder(relBuilder.getRexBuilder()); } diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchTable.java b/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchTable.java index 8461b62820d..93e656fd50d 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchTable.java +++ b/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchTable.java @@ -5,17 +5,23 @@ package org.opensearch.sql.calcite.plan; +import com.google.common.collect.ImmutableList; import java.lang.reflect.Type; +import org.apache.calcite.adapter.java.AbstractQueryableTable; +import org.apache.calcite.linq4j.AbstractQueryable; import org.apache.calcite.linq4j.Enumerable; import org.apache.calcite.linq4j.QueryProvider; import org.apache.calcite.linq4j.Queryable; import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.plan.Convention; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.logical.LogicalTableScan; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.schema.QueryableTable; +import org.apache.calcite.schema.ScannableTable; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.schema.Schemas; import org.apache.calcite.schema.TranslatableTable; @@ -23,8 +29,12 @@ import org.opensearch.sql.calcite.utils.OpenSearchRelDataTypes; import org.opensearch.sql.data.model.ExprValue; -public abstract class OpenSearchTable extends AbstractTable - implements TranslatableTable, QueryableTable, org.opensearch.sql.storage.Table { +public abstract class OpenSearchTable extends AbstractQueryableTable + implements TranslatableTable, org.opensearch.sql.storage.Table { + + protected OpenSearchTable(Type elementType) { + super(elementType); + } @Override public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) { @@ -34,6 +44,7 @@ public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) { @Override public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) { final RelOptCluster cluster = context.getCluster(); + // return new LogicalTableScan(cluster, cluster.traitSetOf(Convention.NONE), ImmutableList.of(), relOptTable); return new OpenSearchTableScan(cluster, relOptTable, this); } @@ -53,5 +64,5 @@ public Expression getExpression(SchemaPlus schema, String tableName, Class clazz return Schemas.tableExpression(schema, getElementType(), tableName, clazz); } - public abstract Enumerable search(); + public abstract Enumerable search(); } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java index 08c5a5e7a2e..cdd35510da6 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java @@ -5,23 +5,34 @@ package org.opensearch.sql.opensearch.executor; +import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import lombok.RequiredArgsConstructor; +import org.apache.calcite.jdbc.CalciteConnection; +import org.apache.calcite.rel.RelHomogeneousShuttle; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelShuttle; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.tools.RelRunner; import org.apache.calcite.tools.RelRunners; import org.opensearch.sql.calcite.CalcitePlanContext; import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.data.model.ExprStringValue; +import org.opensearch.sql.data.model.ExprTupleValue; import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.type.ExprCoreType; +import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.executor.ExecutionContext; import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.executor.ExecutionEngine.Schema.Column; import org.opensearch.sql.executor.Explain; import org.opensearch.sql.executor.pagination.PlanSerializer; import org.opensearch.sql.opensearch.client.OpenSearchClient; @@ -102,30 +113,50 @@ public ExplainResponseNode visitTableScan( @Override public void execute( RelNode rel, CalcitePlanContext context, ResponseListener listener) { - try (PreparedStatement statement = RelRunners.run(rel)) { + Connection connection = context.connection; + final RelShuttle shuttle = new RelHomogeneousShuttle(); + rel = rel.accept(shuttle); + try { + RelRunner runner = connection.unwrap(RelRunner.class); + PreparedStatement statement = runner.prepareStatement(rel); ResultSet result = statement.executeQuery(); - printResultSet(result); + printResultSet(result, listener); } catch (SQLException e) { throw new RuntimeException(e); } } // for testing only - private void printResultSet(ResultSet resultSet) throws SQLException { + private void printResultSet(ResultSet resultSet, ResponseListener listener) throws SQLException { // Get the ResultSet metadata to know about columns ResultSetMetaData metaData = resultSet.getMetaData(); int columnCount = metaData.getColumnCount(); + List values = new ArrayList<>(); // Iterate through the ResultSet while (resultSet.next()) { + Map row = new LinkedHashMap(); // Loop through each column for (int i = 1; i <= columnCount; i++) { String columnName = metaData.getColumnName(i); String value = resultSet.getString(i); System.out.println(columnName + ": " + value); + + row.put(columnName, new ExprStringValue(value)); } + values.add(ExprTupleValue.fromExprValueMap(row)); System.out.println("-------------------"); // Separator between rows } + + List columns = new ArrayList<>(metaData.getColumnCount()); + for (int i = 1; i <= columnCount; ++i) { + // TODO: mapping RelDataType to ExprType or deprecate ExprType + columns.add(new Column(metaData.getColumnName(i), null, ExprCoreType.STRING)); + } + Schema schema = new Schema(columns); + QueryResponse response = + new QueryResponse(schema, values, null); + listener.onResponse(response); } private RelDataType makeStruct(RelDataTypeFactory typeFactory, RelDataType type) { diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java index c6fda2be886..1eace7d8f34 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java @@ -11,9 +11,11 @@ import java.util.Map; import java.util.function.Function; import lombok.RequiredArgsConstructor; +import org.apache.calcite.DataContext; import org.apache.calcite.linq4j.AbstractEnumerable; import org.apache.calcite.linq4j.Enumerable; import org.apache.calcite.linq4j.Enumerator; +import org.checkerframework.checker.nullness.qual.Nullable; import org.opensearch.common.unit.TimeValue; import org.opensearch.sql.calcite.plan.OpenSearchTable; import org.opensearch.sql.common.setting.Settings; @@ -79,6 +81,7 @@ public class OpenSearchIndex extends OpenSearchTable { /** Constructor. */ public OpenSearchIndex(OpenSearchClient client, Settings settings, String indexName) { + super(null); this.client = client; this.settings = settings; this.indexName = new OpenSearchRequest.IndexName(indexName); @@ -190,6 +193,16 @@ public boolean isFieldTypeTolerance() { return settings.getSettingValue(Settings.Key.FIELD_TYPE_TOLERANCE); } + //@Override + public Enumerable scan(DataContext root) { + return new AbstractEnumerable<@Nullable Object[]>() { + @Override public Enumerator<@Nullable Object[]> enumerator() { + return null; + // return search().toMap(v -> new Object[] {v}); + } + }; + } + @VisibleForTesting @RequiredArgsConstructor public static class OpenSearchDefaultImplementor extends DefaultImplementor { @@ -217,10 +230,10 @@ public PhysicalPlan visitML(LogicalML node, OpenSearchIndexScan context) { } @Override - public Enumerable search() { - return new AbstractEnumerable() { + public Enumerable search() { + return new AbstractEnumerable() { @Override - public Enumerator enumerator() { + public Enumerator enumerator() { final int querySizeLimit = settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT); final TimeValue cursorKeepAlive = diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java index 7a555e84abd..573bb7965cf 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java @@ -15,7 +15,7 @@ import org.opensearch.sql.opensearch.request.OpenSearchRequest; import org.opensearch.sql.opensearch.response.OpenSearchResponse; -public class OpenSearchIndexEnumerator implements Enumerator { +public class OpenSearchIndexEnumerator implements Enumerator { /** OpenSearch client. */ private final OpenSearchClient client; @@ -50,9 +50,9 @@ private void fetchNextBatch() { } @Override - public ExprValue current() { + public Object current() { queryCount++; - return iterator.next(); + return iterator.next().tupleValue().values().stream().map(ExprValue::value).toArray(); } @Override From 27714e0b1c396b7e6c9c7abe06c07242fdcffb94 Mon Sep 17 00:00:00 2001 From: Heng Qian Date: Fri, 24 Jan 2025 15:51:26 +0800 Subject: [PATCH 2/4] [POC] Change caching schema to simple schema and avoid registering table when visitRelation. Signed-off-by: Heng Qian --- .../sql/calcite/CalcitePlanContext.java | 13 +++++----- .../sql/calcite/CalciteRelNodeVisitor.java | 4 ---- .../sql/calcite/OpenSearchSchema.java | 18 ++++++++++---- .../opensearch/sql/executor/QueryService.java | 24 ++++++++++++------- 4 files changed, 35 insertions(+), 24 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java index 0cacf26f935..b907945a6c4 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java @@ -5,7 +5,6 @@ package org.opensearch.sql.calcite; -import java.sql.DriverManager; import java.util.function.BiFunction; import lombok.Getter; import org.apache.calcite.jdbc.CalciteConnection; @@ -41,14 +40,14 @@ protected OSRelBuilder( @Getter private boolean isResolvingJoinCondition = false; - public CalcitePlanContext(FrameworkConfig config) { + public CalcitePlanContext(FrameworkConfig config, CalciteConnection connection) { this.config = config; + this.connection = connection; try { - this.connection = DriverManager.getConnection("jdbc:calcite:").unwrap(CalciteConnection.class); - connection.getRootSchema().add( - OpenSearchSchema.OPEN_SEARCH_SCHEMA_NAME, config.getDefaultSchema().unwrap(OpenSearchSchema.class)); this.statement = connection.createStatement().unwrap(CalciteServerStatement.class); - } catch (Exception ignored) {} + } catch (Exception e) { + throw new RuntimeException("create statement failed", e); + } this.prepare = new CalcitePrepareImpl(); this.relBuilder = prepare.perform(statement, config, (cluster, relOptSchema, rootSchema, statement) -> new OSRelBuilder(config.getContext(), @@ -66,6 +65,6 @@ public RexNode resolveJoinCondition( } public static CalcitePlanContext create(FrameworkConfig config) { - return new CalcitePlanContext(config); + return new CalcitePlanContext(config, null); } } diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index bc92c89c638..a15288414db 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -63,10 +63,6 @@ public RelNode analyze(UnresolvedPlan unresolved, CalcitePlanContext context) { @Override public RelNode visitRelation(Relation node, CalcitePlanContext context) { for (QualifiedName qualifiedName : node.getQualifiedNames()) { - SchemaPlus schema = context.config.getDefaultSchema(); - if (schema != null && schema.getName().equals(OpenSearchSchema.OPEN_SEARCH_SCHEMA_NAME)) { - schema.unwrap(OpenSearchSchema.class).registerTable(qualifiedName); - } context.relBuilder.scan(qualifiedName.getParts()); } if (node.getQualifiedNames().size() > 1) { diff --git a/core/src/main/java/org/opensearch/sql/calcite/OpenSearchSchema.java b/core/src/main/java/org/opensearch/sql/calcite/OpenSearchSchema.java index 90aa6af7f51..1de9ef4ba5c 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/OpenSearchSchema.java +++ b/core/src/main/java/org/opensearch/sql/calcite/OpenSearchSchema.java @@ -6,6 +6,7 @@ package org.opensearch.sql.calcite; import java.util.HashMap; +import java.util.List; import java.util.Map; import lombok.AllArgsConstructor; import lombok.Getter; @@ -13,7 +14,6 @@ import org.apache.calcite.schema.impl.AbstractSchema; import org.opensearch.sql.DataSourceSchemaName; import org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver; -import org.opensearch.sql.ast.expression.QualifiedName; import org.opensearch.sql.datasource.DataSourceService; @Getter @@ -23,11 +23,19 @@ public class OpenSearchSchema extends AbstractSchema { private final DataSourceService dataSourceService; - private final Map tableMap = new HashMap<>(); + private final Map tableMap = new HashMap<>() { + @Override + public Table get(Object key) { + if (!super.containsKey(key)) { + registerTable((String) key); + } + return super.get(key); + } + }; - public void registerTable(QualifiedName qualifiedName) { + public void registerTable(String name) { DataSourceSchemaIdentifierNameResolver nameResolver = - new DataSourceSchemaIdentifierNameResolver(dataSourceService, qualifiedName.getParts()); + new DataSourceSchemaIdentifierNameResolver(dataSourceService, List.of(name.split("\\."))); org.opensearch.sql.storage.Table table = dataSourceService .getDataSource(nameResolver.getDataSourceName()) @@ -36,6 +44,6 @@ public void registerTable(QualifiedName qualifiedName) { new DataSourceSchemaName( nameResolver.getDataSourceName(), nameResolver.getSchemaName()), nameResolver.getIdentifierName()); - tableMap.put(qualifiedName.toString(), (org.apache.calcite.schema.Table) table); + tableMap.put(name, (org.apache.calcite.schema.Table) table); } } diff --git a/core/src/main/java/org/opensearch/sql/executor/QueryService.java b/core/src/main/java/org/opensearch/sql/executor/QueryService.java index abae853247c..0f412757345 100644 --- a/core/src/main/java/org/opensearch/sql/executor/QueryService.java +++ b/core/src/main/java/org/opensearch/sql/executor/QueryService.java @@ -11,6 +11,10 @@ import java.util.List; import lombok.AllArgsConstructor; import lombok.RequiredArgsConstructor; +import org.apache.calcite.jdbc.CalciteConnection; +import org.apache.calcite.jdbc.CalciteJdbc41Factory; +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.jdbc.Driver; import org.apache.calcite.plan.RelTraitDef; import org.apache.calcite.rel.RelNode; import org.apache.calcite.schema.SchemaPlus; @@ -60,8 +64,16 @@ public void execute( UnresolvedPlan plan, ResponseListener listener) { try { try { - final FrameworkConfig config = buildFrameworkConfig(); - final CalcitePlanContext context = new CalcitePlanContext(config); + // Use simple calcite schema since we don't compute tables in advance of the query. + CalciteSchema rootSchema= CalciteSchema.createRootSchema(true, false); + CalciteJdbc41Factory factory = new CalciteJdbc41Factory(); + CalciteConnection connection = factory.newConnection(new Driver(), factory, "", new java.util.Properties(), rootSchema, null); + final SchemaPlus defaultSchema = connection.getRootSchema().add( + OpenSearchSchema.OPEN_SEARCH_SCHEMA_NAME, new OpenSearchSchema(dataSourceService)); + // Set opensearch schema as the default schema in config, otherwise we need to explicitly + // add schema path 'OpenSearch' before the opensearch table name + final FrameworkConfig config = buildFrameworkConfig(defaultSchema); + final CalcitePlanContext context = new CalcitePlanContext(config, connection); executePlanByCalcite(analyze(plan, context), context, listener); } catch (Exception e) { LOG.warn("Fallback to V2 query engine since got exception", e); @@ -134,14 +146,10 @@ public RelNode analyze(UnresolvedPlan plan, CalcitePlanContext context) { return relNodeVisitor.analyze(plan, context); } - private FrameworkConfig buildFrameworkConfig() { - final SchemaPlus rootSchema = Frameworks.createRootSchema(true); - final SchemaPlus opensearchSchema = - rootSchema.add( - OpenSearchSchema.OPEN_SEARCH_SCHEMA_NAME, new OpenSearchSchema(dataSourceService)); + private FrameworkConfig buildFrameworkConfig(SchemaPlus defaultSchema) { return Frameworks.newConfigBuilder() .parserConfig(SqlParser.Config.DEFAULT) // TODO check - .defaultSchema(opensearchSchema) + .defaultSchema(defaultSchema) .traitDefs((List) null) .programs(Programs.heuristicJoinOrder(Programs.RULE_SET, true, 2)) .build(); From 8ad5251c0f5d53c89d170d43869c115f4b067b30 Mon Sep 17 00:00:00 2001 From: Heng Qian Date: Wed, 5 Feb 2025 16:22:48 +0800 Subject: [PATCH 3/4] spotlessApply Signed-off-by: Heng Qian --- .../sql/calcite/CalcitePlanContext.java | 13 +++++++------ .../sql/calcite/CalciteRelNodeVisitor.java | 1 - .../sql/calcite/OpenSearchSchema.java | 19 ++++++++++--------- .../sql/calcite/plan/OpenSearchTable.java | 11 ++--------- .../opensearch/sql/executor/QueryService.java | 14 ++++++++++---- .../executor/OpenSearchExecutionEngine.java | 9 +++------ .../opensearch/storage/OpenSearchIndex.java | 6 +++--- 7 files changed, 35 insertions(+), 38 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java index b907945a6c4..ff48a9b14c1 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java @@ -24,9 +24,7 @@ public class CalcitePlanContext { public static class OSRelBuilder extends RelBuilder { protected OSRelBuilder( - @Nullable Context context, - RelOptCluster cluster, - @Nullable RelOptSchema relOptSchema) { + @Nullable Context context, RelOptCluster cluster, @Nullable RelOptSchema relOptSchema) { super(context, cluster, relOptSchema); } } @@ -49,9 +47,12 @@ public CalcitePlanContext(FrameworkConfig config, CalciteConnection connection) throw new RuntimeException("create statement failed", e); } this.prepare = new CalcitePrepareImpl(); - this.relBuilder = prepare.perform(statement, config, - (cluster, relOptSchema, rootSchema, statement) -> new OSRelBuilder(config.getContext(), - cluster, relOptSchema)); + this.relBuilder = + prepare.perform( + statement, + config, + (cluster, relOptSchema, rootSchema, statement) -> + new OSRelBuilder(config.getContext(), cluster, relOptSchema)); this.rexBuilder = new ExtendedRexBuilder(relBuilder.getRexBuilder()); } diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index a15288414db..208228628c8 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -24,7 +24,6 @@ import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; -import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.tools.RelBuilder; import org.apache.calcite.tools.RelBuilder.AggCall; import org.opensearch.sql.ast.AbstractNodeVisitor; diff --git a/core/src/main/java/org/opensearch/sql/calcite/OpenSearchSchema.java b/core/src/main/java/org/opensearch/sql/calcite/OpenSearchSchema.java index 1de9ef4ba5c..a3693dea242 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/OpenSearchSchema.java +++ b/core/src/main/java/org/opensearch/sql/calcite/OpenSearchSchema.java @@ -23,15 +23,16 @@ public class OpenSearchSchema extends AbstractSchema { private final DataSourceService dataSourceService; - private final Map tableMap = new HashMap<>() { - @Override - public Table get(Object key) { - if (!super.containsKey(key)) { - registerTable((String) key); - } - return super.get(key); - } - }; + private final Map tableMap = + new HashMap<>() { + @Override + public Table get(Object key) { + if (!super.containsKey(key)) { + registerTable((String) key); + } + return super.get(key); + } + }; public void registerTable(String name) { DataSourceSchemaIdentifierNameResolver nameResolver = diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchTable.java b/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchTable.java index 93e656fd50d..592fb105f2f 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchTable.java +++ b/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchTable.java @@ -5,29 +5,21 @@ package org.opensearch.sql.calcite.plan; -import com.google.common.collect.ImmutableList; import java.lang.reflect.Type; import org.apache.calcite.adapter.java.AbstractQueryableTable; -import org.apache.calcite.linq4j.AbstractQueryable; import org.apache.calcite.linq4j.Enumerable; import org.apache.calcite.linq4j.QueryProvider; import org.apache.calcite.linq4j.Queryable; import org.apache.calcite.linq4j.tree.Expression; -import org.apache.calcite.plan.Convention; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.logical.LogicalTableScan; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.schema.QueryableTable; -import org.apache.calcite.schema.ScannableTable; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.schema.Schemas; import org.apache.calcite.schema.TranslatableTable; -import org.apache.calcite.schema.impl.AbstractTable; import org.opensearch.sql.calcite.utils.OpenSearchRelDataTypes; -import org.opensearch.sql.data.model.ExprValue; public abstract class OpenSearchTable extends AbstractQueryableTable implements TranslatableTable, org.opensearch.sql.storage.Table { @@ -44,7 +36,8 @@ public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) { @Override public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) { final RelOptCluster cluster = context.getCluster(); - // return new LogicalTableScan(cluster, cluster.traitSetOf(Convention.NONE), ImmutableList.of(), relOptTable); + // return new LogicalTableScan(cluster, cluster.traitSetOf(Convention.NONE), ImmutableList.of(), + // relOptTable); return new OpenSearchTableScan(cluster, relOptTable, this); } diff --git a/core/src/main/java/org/opensearch/sql/executor/QueryService.java b/core/src/main/java/org/opensearch/sql/executor/QueryService.java index 0f412757345..baffc2e4db3 100644 --- a/core/src/main/java/org/opensearch/sql/executor/QueryService.java +++ b/core/src/main/java/org/opensearch/sql/executor/QueryService.java @@ -65,11 +65,17 @@ public void execute( try { try { // Use simple calcite schema since we don't compute tables in advance of the query. - CalciteSchema rootSchema= CalciteSchema.createRootSchema(true, false); + CalciteSchema rootSchema = CalciteSchema.createRootSchema(true, false); CalciteJdbc41Factory factory = new CalciteJdbc41Factory(); - CalciteConnection connection = factory.newConnection(new Driver(), factory, "", new java.util.Properties(), rootSchema, null); - final SchemaPlus defaultSchema = connection.getRootSchema().add( - OpenSearchSchema.OPEN_SEARCH_SCHEMA_NAME, new OpenSearchSchema(dataSourceService)); + CalciteConnection connection = + factory.newConnection( + new Driver(), factory, "", new java.util.Properties(), rootSchema, null); + final SchemaPlus defaultSchema = + connection + .getRootSchema() + .add( + OpenSearchSchema.OPEN_SEARCH_SCHEMA_NAME, + new OpenSearchSchema(dataSourceService)); // Set opensearch schema as the default schema in config, otherwise we need to explicitly // add schema path 'OpenSearch' before the opensearch table name final FrameworkConfig config = buildFrameworkConfig(defaultSchema); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java index cdd35510da6..b9de0b242d8 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java @@ -15,21 +15,18 @@ import java.util.List; import java.util.Map; import lombok.RequiredArgsConstructor; -import org.apache.calcite.jdbc.CalciteConnection; import org.apache.calcite.rel.RelHomogeneousShuttle; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelShuttle; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.tools.RelRunner; -import org.apache.calcite.tools.RelRunners; import org.opensearch.sql.calcite.CalcitePlanContext; import org.opensearch.sql.common.response.ResponseListener; import org.opensearch.sql.data.model.ExprStringValue; import org.opensearch.sql.data.model.ExprTupleValue; import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.data.type.ExprCoreType; -import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.executor.ExecutionContext; import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.executor.ExecutionEngine.Schema.Column; @@ -127,7 +124,8 @@ public void execute( } // for testing only - private void printResultSet(ResultSet resultSet, ResponseListener listener) throws SQLException { + private void printResultSet(ResultSet resultSet, ResponseListener listener) + throws SQLException { // Get the ResultSet metadata to know about columns ResultSetMetaData metaData = resultSet.getMetaData(); int columnCount = metaData.getColumnCount(); @@ -154,8 +152,7 @@ private void printResultSet(ResultSet resultSet, ResponseListener columns.add(new Column(metaData.getColumnName(i), null, ExprCoreType.STRING)); } Schema schema = new Schema(columns); - QueryResponse response = - new QueryResponse(schema, values, null); + QueryResponse response = new QueryResponse(schema, values, null); listener.onResponse(response); } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java index 1eace7d8f34..2ef95173a1f 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java @@ -19,7 +19,6 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.sql.calcite.plan.OpenSearchTable; import org.opensearch.sql.common.setting.Settings; -import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.data.type.ExprCoreType; import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.opensearch.client.OpenSearchClient; @@ -193,10 +192,11 @@ public boolean isFieldTypeTolerance() { return settings.getSettingValue(Settings.Key.FIELD_TYPE_TOLERANCE); } - //@Override + // @Override public Enumerable scan(DataContext root) { return new AbstractEnumerable<@Nullable Object[]>() { - @Override public Enumerator<@Nullable Object[]> enumerator() { + @Override + public Enumerator<@Nullable Object[]> enumerator() { return null; // return search().toMap(v -> new Object[] {v}); } From 876a03fde9a3765ff3d14ac54161a58ef999b455 Mon Sep 17 00:00:00 2001 From: Heng Qian Date: Fri, 7 Feb 2025 11:40:20 +0800 Subject: [PATCH 4/4] address comments Signed-off-by: Heng Qian --- .../sql/calcite/CalcitePlanContext.java | 18 ++---------------- .../sql/calcite/OpenSearchSchema.java | 10 +++++----- .../executor/OpenSearchExecutionEngine.java | 4 ---- 3 files changed, 7 insertions(+), 25 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java index ff48a9b14c1..7355d2f4bf5 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java @@ -11,9 +11,7 @@ import org.apache.calcite.plan.Context; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptSchema; -import org.apache.calcite.prepare.CalcitePrepareImpl; import org.apache.calcite.rex.RexNode; -import org.apache.calcite.server.CalciteServerStatement; import org.apache.calcite.tools.FrameworkConfig; import org.apache.calcite.tools.RelBuilder; import org.checkerframework.checker.nullness.qual.Nullable; @@ -31,8 +29,6 @@ protected OSRelBuilder( public FrameworkConfig config; public CalciteConnection connection; - public CalciteServerStatement statement; - public final CalcitePrepareImpl prepare; public final RelBuilder relBuilder; public final ExtendedRexBuilder rexBuilder; @@ -41,18 +37,7 @@ protected OSRelBuilder( public CalcitePlanContext(FrameworkConfig config, CalciteConnection connection) { this.config = config; this.connection = connection; - try { - this.statement = connection.createStatement().unwrap(CalciteServerStatement.class); - } catch (Exception e) { - throw new RuntimeException("create statement failed", e); - } - this.prepare = new CalcitePrepareImpl(); - this.relBuilder = - prepare.perform( - statement, - config, - (cluster, relOptSchema, rootSchema, statement) -> - new OSRelBuilder(config.getContext(), cluster, relOptSchema)); + this.relBuilder = RelBuilder.create(config); this.rexBuilder = new ExtendedRexBuilder(relBuilder.getRexBuilder()); } @@ -65,6 +50,7 @@ public RexNode resolveJoinCondition( return result; } + // for testing only public static CalcitePlanContext create(FrameworkConfig config) { return new CalcitePlanContext(config, null); } diff --git a/core/src/main/java/org/opensearch/sql/calcite/OpenSearchSchema.java b/core/src/main/java/org/opensearch/sql/calcite/OpenSearchSchema.java index a3693dea242..642e84929e9 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/OpenSearchSchema.java +++ b/core/src/main/java/org/opensearch/sql/calcite/OpenSearchSchema.java @@ -6,7 +6,6 @@ package org.opensearch.sql.calcite; import java.util.HashMap; -import java.util.List; import java.util.Map; import lombok.AllArgsConstructor; import lombok.Getter; @@ -14,6 +13,7 @@ import org.apache.calcite.schema.impl.AbstractSchema; import org.opensearch.sql.DataSourceSchemaName; import org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver; +import org.opensearch.sql.ast.expression.QualifiedName; import org.opensearch.sql.datasource.DataSourceService; @Getter @@ -28,15 +28,15 @@ public class OpenSearchSchema extends AbstractSchema { @Override public Table get(Object key) { if (!super.containsKey(key)) { - registerTable((String) key); + registerTable(new QualifiedName((String) key)); } return super.get(key); } }; - public void registerTable(String name) { + public void registerTable(QualifiedName qualifiedName) { DataSourceSchemaIdentifierNameResolver nameResolver = - new DataSourceSchemaIdentifierNameResolver(dataSourceService, List.of(name.split("\\."))); + new DataSourceSchemaIdentifierNameResolver(dataSourceService, qualifiedName.getParts()); org.opensearch.sql.storage.Table table = dataSourceService .getDataSource(nameResolver.getDataSourceName()) @@ -45,6 +45,6 @@ public void registerTable(String name) { new DataSourceSchemaName( nameResolver.getDataSourceName(), nameResolver.getSchemaName()), nameResolver.getIdentifierName()); - tableMap.put(name, (org.apache.calcite.schema.Table) table); + tableMap.put(qualifiedName.toString(), (org.apache.calcite.schema.Table) table); } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java index b9de0b242d8..0fd65dd9325 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java @@ -15,9 +15,7 @@ import java.util.List; import java.util.Map; import lombok.RequiredArgsConstructor; -import org.apache.calcite.rel.RelHomogeneousShuttle; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.RelShuttle; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.tools.RelRunner; @@ -111,8 +109,6 @@ public ExplainResponseNode visitTableScan( public void execute( RelNode rel, CalcitePlanContext context, ResponseListener listener) { Connection connection = context.connection; - final RelShuttle shuttle = new RelHomogeneousShuttle(); - rel = rel.accept(shuttle); try { RelRunner runner = connection.unwrap(RelRunner.class); PreparedStatement statement = runner.prepareStatement(rel);