Skip to content

Commit

Permalink
[Refactor](dialect) Add sql dialect converter plugins.
Browse files Browse the repository at this point in the history
  • Loading branch information
wangxiangyu committed Dec 22, 2023
1 parent 8759bce commit 72f272e
Show file tree
Hide file tree
Showing 32 changed files with 360 additions and 1,103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2356,6 +2356,8 @@ public class Config extends ConfigBase {
"Whether to enable the function of getting log files through http interface"})
public static boolean enable_get_log_file_api = false;


@Deprecated
@ConfField(description = {"用于SQL方言转换的服务地址。",
"The service address for SQL dialect conversion."})
public static String sql_convertor_service = "";
Expand Down
6 changes: 0 additions & 6 deletions fe/fe-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -680,12 +680,6 @@ under the License.
<artifactId>kryo-shaded</artifactId>
</dependency>

<!-- trino-parser -->
<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-parser</artifactId>
</dependency>

<!-- for arrow flight sql -->
<dependency>
<groupId>org.apache.arrow</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.nereids.parser.ParseDialect;
import org.apache.doris.nereids.parser.spark.SparkSql3LogicalPlanBuilder;
import org.apache.doris.nereids.parser.Dialect;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.thrift.TNullSide;
Expand All @@ -50,6 +49,8 @@
public class InlineViewRef extends TableRef {
private static final Logger LOG = LogManager.getLogger(InlineViewRef.class);

private static final String DEFAULT_TABLE_ALIAS_FOR_SPARK_SQL = "__auto_generated_subquery_name";

// Catalog or local view that is referenced.
// Null for inline views parsed directly from a query string.
private final View view;
Expand Down Expand Up @@ -198,12 +199,12 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException {

if (view == null && !hasExplicitAlias()) {
String dialect = ConnectContext.get().getSessionVariable().getSqlDialect();
ParseDialect.Dialect sqlDialect = ParseDialect.Dialect.getByName(dialect);
if (ParseDialect.Dialect.SPARK_SQL != sqlDialect) {
Dialect sqlDialect = Dialect.getByName(dialect);
if (Dialect.SPARK_SQL != sqlDialect) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_DERIVED_MUST_HAVE_ALIAS);
}
hasExplicitAlias = true;
aliases = new String[] { SparkSql3LogicalPlanBuilder.DEFAULT_TABLE_ALIAS };
aliases = new String[] { DEFAULT_TABLE_ALIAS_FOR_SPARK_SQL };
}

// Analyze the inline view query statement with its own analyzer
Expand Down
9 changes: 9 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@
import org.apache.doris.persist.meta.MetaReader;
import org.apache.doris.persist.meta.MetaWriter;
import org.apache.doris.planner.TabletLoadIndexRecorderMgr;
import org.apache.doris.plugin.DialectConverterPluginMgr;
import org.apache.doris.plugin.PluginInfo;
import org.apache.doris.plugin.PluginMgr;
import org.apache.doris.policy.PolicyMgr;
Expand Down Expand Up @@ -472,6 +473,8 @@ public class Env {

private PluginMgr pluginMgr;

private DialectConverterPluginMgr dialectConverterPluginMgr;

private AuditEventProcessor auditEventProcessor;

private RefreshManager refreshManager;
Expand Down Expand Up @@ -721,6 +724,7 @@ private Env(boolean isCheckpointCatalog) {

this.pluginMgr = new PluginMgr();
this.auditEventProcessor = new AuditEventProcessor(this.pluginMgr);
this.dialectConverterPluginMgr = new DialectConverterPluginMgr(this.pluginMgr);
this.refreshManager = new RefreshManager();
this.policyMgr = new PolicyMgr();
this.extMetaCacheMgr = new ExternalMetaCacheMgr();
Expand Down Expand Up @@ -789,6 +793,10 @@ public PluginMgr getPluginMgr() {
return pluginMgr;
}

public DialectConverterPluginMgr getSqlDialectPluginMgr() {
return dialectConverterPluginMgr;
}

public Auth getAuth() {
return auth;
}
Expand Down Expand Up @@ -954,6 +962,7 @@ public void initialize(String[] args) throws Exception {

// init plugin manager
pluginMgr.init();
dialectConverterPluginMgr.init();
auditEventProcessor.start();

// 2. get cluster id and role (Observer or Follower)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
package org.apache.doris.nereids.exceptions;

/**
* DialectTransformException when have not supported transforming for the
* {@link io.trino.sql.tree.Node}.
* DialectTransformException when have not supported transforming for dialect converters.
*/
public class DialectTransformException extends UnsupportedOperationException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,16 @@

package org.apache.doris.nereids.exceptions;

import org.apache.doris.nereids.parser.ParseDialect;
import org.apache.doris.nereids.parser.Dialect;

/**
* UnsupportedDialectException when not match any in
* {@link org.apache.doris.nereids.parser.ParseDialect}.
* {@link Dialect}.
*/
public class UnsupportedDialectException extends UnsupportedOperationException {

public UnsupportedDialectException(ParseDialect dialect) {
super(String.format("Unsupported dialect name is %s, version is %s",
dialect.getDialect().getDialectName(), dialect.getVersion().getVersionName()));
public UnsupportedDialectException(Dialect dialect) {
super(String.format("Unsupported dialect name is %s", dialect.getDialectName()));
}

public UnsupportedDialectException(String type, String msg) {
Expand Down
116 changes: 116 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/nereids/parser/Dialect.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.parser;

import javax.annotation.Nullable;

/**
* ParseDialect enum, maybe support other dialect.
*/
public enum Dialect {
/**
* Doris parser dialect
*/
DORIS("doris"),
/**
* Trino parser dialect
*/
TRINO("trino"),
/**
* Presto parser dialect
*/
PRESTO("presto"),
/**
* Spark sql parser dialect
*/
SPARK_SQL("spark_sql"),
/**
* Hive parser dialect
*/
HIVE("hive"),
/**
* Iceberg parser dialect
*/
ICEBERG("iceberg"),
/**
* Hudi parser dialect
*/
HUDI("hudi"),
/**
* Paimon parser dialect
*/
PAIMON("paimon"),
/**
* Alibaba dlf parser dialect
*/
DLF("dlf"),
/**
* Alibaba max compute parser dialect
*/
MAX_COMPUTE("max_compute"),
/**
* Mysql parser dialect
*/
MYSQL("mysql"),
/**
* Postgresql parser dialect
*/
POSTGRESQL("postgresql"),
/**
* Sqlserver parser dialect
*/
SQLSERVER("sqlserver"),
/**
* Clickhouse parser dialect
*/
CLICKHOUSE("clickhouse"),
/**
* Sap hana parser dialect
*/
SAP_HANA("sap_hana"),
/**
* OceanBase parser dialect
*/
OCEANBASE("oceanbase");


private final String dialectName;

Dialect(String dialectName) {
this.dialectName = dialectName;
}

public String getDialectName() {
return dialectName;
}

/**
* Get dialect by name
*/
public static @Nullable Dialect getByName(String dialectName) {
if (dialectName == null) {
return null;
}
for (Dialect dialect : Dialect.values()) {
if (dialect.getDialectName().equals(dialectName.toLowerCase())) {
return dialect;
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,19 @@
package org.apache.doris.nereids.parser;

import org.apache.doris.analysis.StatementBase;
import org.apache.doris.common.Config;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.DorisLexer;
import org.apache.doris.nereids.DorisParser;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.parser.spark.SparkSql3LogicalPlanBuilder;
import org.apache.doris.nereids.parser.trino.TrinoParser;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.plugin.DialectConverterPlugin;
import org.apache.doris.plugin.DialectConverterPluginMgr;
import org.apache.doris.qe.SessionVariable;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import org.antlr.v4.runtime.CharStreams;
import org.antlr.v4.runtime.CommonTokenStream;
Expand Down Expand Up @@ -67,11 +66,13 @@ public List<StatementBase> parseSQL(String originStr) {
* ParseSQL with dialect.
*/
public List<StatementBase> parseSQL(String sql, SessionVariable sessionVariable) {
@Nullable ParseDialect.Dialect sqlDialect = ParseDialect.Dialect.getByName(sessionVariable.getSqlDialect());
return parseSQLWithDialect(sql, sqlDialect, sessionVariable);
return parseSQLWithDialect(sql, sessionVariable);
}

private List<StatementBase> parseSQL(String originStr, @Nullable LogicalPlanBuilder logicalPlanBuilder) {
/**
* ParseSQL with logicalPlanBuilder.
*/
public List<StatementBase> parseSQL(String originStr, @Nullable LogicalPlanBuilder logicalPlanBuilder) {
List<Pair<LogicalPlan, StatementContext>> logicalPlans = parseMultiple(originStr, logicalPlanBuilder);
List<StatementBase> statementBases = Lists.newArrayList();
for (Pair<LogicalPlan, StatementContext> parsedPlanToContext : logicalPlans) {
Expand All @@ -81,26 +82,27 @@ private List<StatementBase> parseSQL(String originStr, @Nullable LogicalPlanBuil
}

private List<StatementBase> parseSQLWithDialect(String sql,
@Nullable ParseDialect.Dialect sqlDialect,
SessionVariable sessionVariable) {
if (!Strings.isNullOrEmpty(Config.sql_convertor_service)) {
// if sql convertor service is enabled, no need to parse sql again by specific dialect.
@Nullable Dialect sqlDialect = Dialect.getByName(sessionVariable.getSqlDialect());
if (sqlDialect == null) {
return parseSQL(sql);
}
switch (sqlDialect) {
case TRINO:
final List<StatementBase> logicalPlans = TrinoParser.parse(sql, sessionVariable);
if (CollectionUtils.isEmpty(logicalPlans)) {
return parseSQL(sql);
}
return logicalPlans;

case SPARK_SQL:
return parseSQL(sql, new SparkSql3LogicalPlanBuilder());

default:
return parseSQL(sql);
DialectConverterPluginMgr pluginMgr = Env.getCurrentEnv().getSqlDialectPluginMgr();
List<DialectConverterPlugin> plugins = pluginMgr.getDialectConverterPlugins(sqlDialect);
for (DialectConverterPlugin plugin : plugins) {
try {
List<StatementBase> statementBases = plugin.parseSqlWithDialect(sql, sessionVariable);
if (CollectionUtils.isNotEmpty(statementBases)) {
return statementBases;
}
} catch (Throwable throwable) {
LOG.warn("Parse sql with dialect {} failed, sql: {}.", sqlDialect, sql, throwable);
}
}

// fallback if any exception occurs before
return parseSQL(sql);
}

/**
Expand Down
Loading

0 comments on commit 72f272e

Please sign in to comment.