diff --git a/dinky-admin/src/main/java/org/dinky/controller/DataSourceController.java b/dinky-admin/src/main/java/org/dinky/controller/DataSourceController.java index e221437987..e1b56b1c4d 100644 --- a/dinky-admin/src/main/java/org/dinky/controller/DataSourceController.java +++ b/dinky-admin/src/main/java/org/dinky/controller/DataSourceController.java @@ -237,7 +237,6 @@ public Result checkHeartBeatByDataSourceId(@RequestParam Integer id) { * @param id {@link Integer} * @return {@link Result}< {@link List}< {@link Schema}>> */ - @Cacheable(cacheNames = "metadata_schema", key = "#id") @GetMapping("/getSchemasAndTables") @ApiOperation("Get All Schemas And Tables") @ApiImplicitParam( diff --git a/dinky-admin/src/main/java/org/dinky/service/task/CommonSqlTask.java b/dinky-admin/src/main/java/org/dinky/service/task/CommonSqlTask.java index 760e46dae7..194e4c504d 100644 --- a/dinky-admin/src/main/java/org/dinky/service/task/CommonSqlTask.java +++ b/dinky-admin/src/main/java/org/dinky/service/task/CommonSqlTask.java @@ -44,7 +44,8 @@ Dialect.DORIS, Dialect.PHOENIX, Dialect.STAR_ROCKS, - Dialect.PRESTO + Dialect.PRESTO, + Dialect.KYUUBI }) public class CommonSqlTask extends BaseTask { diff --git a/dinky-admin/src/main/java/org/dinky/utils/PaimonUtil.java b/dinky-admin/src/main/java/org/dinky/utils/PaimonUtil.java index c90391361f..f58f16c230 100644 --- a/dinky-admin/src/main/java/org/dinky/utils/PaimonUtil.java +++ b/dinky-admin/src/main/java/org/dinky/utils/PaimonUtil.java @@ -135,6 +135,9 @@ public static void write(String table, List dataList, Class clazz) { DataType type = dataField.type(); String fieldName = StrUtil.toCamelCase(dataField.name()); Object fieldValue = ReflectUtil.getFieldValue(t, fieldName); + if (fieldValue == null){ + continue; + } try { // TODO BinaryWriter.write已被废弃,后续可以考虑改成这种方式 // BinaryWriter.createValueSetter(type).setValue(writer, i, fieldValue); diff --git a/dinky-assembly/src/main/assembly/package.xml 
b/dinky-assembly/src/main/assembly/package.xml index bc7fc05962..f0e349c532 100644 --- a/dinky-assembly/src/main/assembly/package.xml +++ b/dinky-assembly/src/main/assembly/package.xml @@ -210,6 +210,14 @@ dinky-metadata-presto-${project.version}.jar + + ${project.parent.basedir}/dinky-metadata/dinky-metadata-kyuubi/target + + lib + + dinky-metadata-kyuubi-${project.version}.jar + + ${project.parent.basedir}/dinky-alert/dinky-alert-dingtalk/target diff --git a/dinky-common/src/main/java/org/dinky/config/Dialect.java b/dinky-common/src/main/java/org/dinky/config/Dialect.java index 92d6950fb3..728a11bb4a 100644 --- a/dinky-common/src/main/java/org/dinky/config/Dialect.java +++ b/dinky-common/src/main/java/org/dinky/config/Dialect.java @@ -44,6 +44,7 @@ public enum Dialect { HIVE("Hive"), STAR_ROCKS("StarRocks"), PRESTO("Presto"), + KYUUBI("Kyuubi"), KUBERNETES_APPLICATION("KubernetesApplication"); private String value; diff --git a/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/AbstractJdbcDriver.java b/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/AbstractJdbcDriver.java index ffa0aaa14b..8dc6fd3ed3 100644 --- a/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/AbstractJdbcDriver.java +++ b/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/AbstractJdbcDriver.java @@ -102,6 +102,10 @@ public String test() { return CommonConstant.HEALTHY; } + public String getCustomDbType(){ + return "hive"; + } + public DruidDataSource createDataSource() throws SQLException { if (null == dataSource) { synchronized (this.getClass()) { @@ -621,8 +625,15 @@ public JdbcSelectResult query(String sql, Integer limit) { public JdbcSelectResult executeSql(String sql, Integer limit) { // TODO 改为ProcessStep注释 log.info("Start parse sql..."); + String dbType = config.getType().toLowerCase(); + + //todo 这里暂时不知道怎么处理好,sqlutils 中的数据库类型不支持 kyuubi 不支持spark, 而config.getType() 
被两个地方用到,一处是这里,另一处是初始化driver的时候,需要传递driver的名字, + //其他的数据源 driver的名字就是数据源类型也被 druid sqlutils 支持的,但是 kyuubi 特殊,driver 名和数据库类型不同 + if (config.getType().equalsIgnoreCase("kyuubi")){ + dbType = this.getCustomDbType(); + } List stmtList = - SQLUtils.parseStatements(sql, config.getType().toLowerCase()); + SQLUtils.parseStatements(sql, dbType); log.info(CharSequenceUtil.format("A total of {} statement have been Parsed.", stmtList.size())); List resList = new ArrayList<>(); JdbcSelectResult result = JdbcSelectResult.buildResult(); diff --git a/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/enums/DriverType.java b/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/enums/DriverType.java index a0232e9159..97c744c914 100644 --- a/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/enums/DriverType.java +++ b/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/enums/DriverType.java @@ -40,8 +40,11 @@ public enum DriverType { PHOENIX("Phoenix"), GREENPLUM("Greenplum"), HIVE("Hive"), + KYUUBI("Kyuubi"), PRESTO("Presto"); + + public final String value; DriverType(String value) { diff --git a/dinky-metadata/dinky-metadata-hive/src/main/java/org/dinky/metadata/driver/HiveDriver.java b/dinky-metadata/dinky-metadata-hive/src/main/java/org/dinky/metadata/driver/HiveDriver.java index 9946f9a3de..f08ecd1b9b 100644 --- a/dinky-metadata/dinky-metadata-hive/src/main/java/org/dinky/metadata/driver/HiveDriver.java +++ b/dinky-metadata/dinky-metadata-hive/src/main/java/org/dinky/metadata/driver/HiveDriver.java @@ -46,8 +46,11 @@ import java.util.List; import java.util.Map; -public class HiveDriver extends AbstractJdbcDriver implements Driver { +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +public class HiveDriver extends AbstractJdbcDriver implements Driver { + protected static final Logger logger = LoggerFactory.getLogger(HiveDriver.class); @Override public Table getTable(String schemaName, String tableName) { 
List tables = listTables(schemaName); @@ -75,6 +78,14 @@ public List
listTables(String schemaName) { execute(String.format(HiveConstant.USE_DB, schemaName)); preparedStatement = conn.get().prepareStatement(sql); results = preparedStatement.executeQuery(); + + ResultSetMetaData rsmd = results.getMetaData(); + int columnCount = rsmd.getColumnCount(); + for (int i = 1; i <= columnCount; i++ ) { + String name = rsmd.getColumnName(i); + logger.info("listTables {}",name); + } + ResultSetMetaData metaData = results.getMetaData(); List columnList = new ArrayList<>(); for (int i = 1; i <= metaData.getColumnCount(); i++) { @@ -124,7 +135,15 @@ public List listSchemas() { try { preparedStatement = conn.get().prepareStatement(schemasSql); results = preparedStatement.executeQuery(); + ResultSetMetaData rsmd = results.getMetaData(); + int columnCount = rsmd.getColumnCount(); + for (int i = 1; i <= columnCount; i++ ) { + String name = rsmd.getColumnName(i); + logger.info("xxxlistSchemas {}",name); + } + while (results.next()) { + String schemaName = results.getString(getDBQuery().schemaName()); if (Asserts.isNotNullString(schemaName)) { Schema schema = new Schema(schemaName); @@ -153,6 +172,15 @@ public List listColumns(String schemaName, String tableName) { preparedStatement = conn.get().prepareStatement(tableFieldsSql); results = preparedStatement.executeQuery(); ResultSetMetaData metaData = results.getMetaData(); + + ResultSetMetaData rsmd = results.getMetaData(); + int columnCount = rsmd.getColumnCount(); + for (int i = 1; i <= columnCount; i++ ) { + String name = rsmd.getColumnName(i); + logger.info("listColumns {}",name); + } + + List columnList = new ArrayList<>(); for (int i = 1; i <= metaData.getColumnCount(); i++) { columnList.add(metaData.getColumnLabel(i)); diff --git a/dinky-metadata/dinky-metadata-kyuubi/pom.xml b/dinky-metadata/dinky-metadata-kyuubi/pom.xml new file mode 100644 index 0000000000..1e2f3ec05c --- /dev/null +++ b/dinky-metadata/dinky-metadata-kyuubi/pom.xml @@ -0,0 +1,102 @@ + + + + 4.0.0 + + org.dinky + dinky-metadata 
+ ${revision} + ../pom.xml + + dinky-metadata-kyuubi + + jar + + Dinky : Metadata : Kyuubi + + + 2.3.9 + + + + + org.dinky + dinky-metadata-base + + + com.alibaba + druid-spring-boot-starter + provided + + + + org.slf4j + slf4j-nop + 1.6.1 + + + + junit + junit + provided + + + org.apache.commons + commons-lang3 + 3.4 + + + org.apache.hive + hive-jdbc + ${hive.version} + + + ${scope.runtime} + + + jdk.tools + jdk.tools + + + + + org.apache.hive + hive-service + ${hive.version} + + + ${scope.runtime} + + + org.apache.hive + hive-exec + + + + + + + + + + + + + + diff --git a/dinky-metadata/dinky-metadata-kyuubi/src/main/java/org/dinky/metadata/constant/KyuubiConstant.java b/dinky-metadata/dinky-metadata-kyuubi/src/main/java/org/dinky/metadata/constant/KyuubiConstant.java new file mode 100644 index 0000000000..eb54d37286 --- /dev/null +++ b/dinky-metadata/dinky-metadata-kyuubi/src/main/java/org/dinky/metadata/constant/KyuubiConstant.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.dinky.metadata.constant; + +public interface KyuubiConstant { + + /** 查询所有database */ + String QUERY_ALL_DATABASE = " show databases"; + /** 查询所有schema下的所有表 */ + String QUERY_ALL_TABLES_BY_SCHEMA = "show tables"; + /** 扩展信息Key */ + String DETAILED_TABLE_INFO = "Detailed Table Information"; + /** 查询指定schema.table的扩展信息 */ + String QUERY_TABLE_SCHEMA_EXTENED_INFOS = " describe extended `%s`.`%s`"; + /** 查询指定schema.table的信息 列 列类型 列注释 */ + String QUERY_TABLE_SCHEMA = " describe `%s`.`%s`"; + /** 使用 DB */ + String USE_DB = "use `%s`"; + /** 只查询指定schema.table的列名 */ + String QUERY_TABLE_COLUMNS_ONLY = "show columns in `%s`.`%s`"; +} diff --git a/dinky-metadata/dinky-metadata-kyuubi/src/main/java/org/dinky/metadata/constant/TrinoEngineConstant.java b/dinky-metadata/dinky-metadata-kyuubi/src/main/java/org/dinky/metadata/constant/TrinoEngineConstant.java new file mode 100644 index 0000000000..e978afdbaf --- /dev/null +++ b/dinky-metadata/dinky-metadata-kyuubi/src/main/java/org/dinky/metadata/constant/TrinoEngineConstant.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.dinky.metadata.constant; + +public interface TrinoEngineConstant { + + /** 查询所有database */ + String QUERY_ALL_DATABASE = "show catalogs"; + /** 查询某个schema下的所有表 */ + String QUERY_ALL_TABLES_BY_SCHEMA = "show tables from %s"; + /** 查询指定schema.table的信息 列 列类型 列注释 */ + String QUERY_TABLE_SCHEMA = " describe %s.%s"; + /** 只查询指定schema.table的列名 */ + String QUERY_TABLE_COLUMNS_ONLY = "show schemas from %s"; + /** 查询schema列名 */ + String SCHEMA = "SCHEMA"; + /** 需要排除的catalog */ + String EXTRA_SCHEMA = "system"; + /** 需要排除的schema */ + String EXTRA_DB = "information_schema"; +} diff --git a/dinky-metadata/dinky-metadata-kyuubi/src/main/java/org/dinky/metadata/convert/KyuubiTypeConvert.java b/dinky-metadata/dinky-metadata-kyuubi/src/main/java/org/dinky/metadata/convert/KyuubiTypeConvert.java new file mode 100644 index 0000000000..6d454c3b48 --- /dev/null +++ b/dinky-metadata/dinky-metadata-kyuubi/src/main/java/org/dinky/metadata/convert/KyuubiTypeConvert.java @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.dinky.metadata.convert; + +import org.dinky.data.enums.ColumnType; + +public class KyuubiTypeConvert extends AbstractJdbcTypeConvert { + + public KyuubiTypeConvert() { + this.convertMap.clear(); + register("char", ColumnType.STRING); + register("boolean", ColumnType.BOOLEAN, ColumnType.JAVA_LANG_BOOLEAN); + register("tinyint", ColumnType.BYTE, ColumnType.JAVA_LANG_BYTE); + register("smallint", ColumnType.SHORT, ColumnType.JAVA_LANG_SHORT); + register("bigint", ColumnType.LONG, ColumnType.JAVA_LANG_LONG); + register("largeint", ColumnType.STRING); + register("int", ColumnType.INT, ColumnType.INTEGER); + register("float", ColumnType.FLOAT, ColumnType.JAVA_LANG_FLOAT); + register("double", ColumnType.DOUBLE, ColumnType.JAVA_LANG_DOUBLE); + register("timestamp", ColumnType.TIMESTAMP); + register("date", ColumnType.STRING); + register("datetime", ColumnType.STRING); + register("decimal", ColumnType.DECIMAL); + register("time", ColumnType.DOUBLE, ColumnType.JAVA_LANG_DOUBLE); + } + + @Override + public String convertToDB(ColumnType columnType) { + switch (columnType) { + case STRING: + return "varchar"; + case BOOLEAN: + case JAVA_LANG_BOOLEAN: + return "boolean"; + case BYTE: + case JAVA_LANG_BYTE: + return "tinyint"; + case SHORT: + case JAVA_LANG_SHORT: + return "smallint"; + case LONG: + case JAVA_LANG_LONG: + return "bigint"; + case FLOAT: + case JAVA_LANG_FLOAT: + return "float"; + case DOUBLE: + case JAVA_LANG_DOUBLE: + return "double"; + case DECIMAL: + return "decimal"; + case INT: + case INTEGER: + return "int"; + case TIMESTAMP: + return "timestamp"; + default: + return "varchar"; + } + } +} diff --git a/dinky-metadata/dinky-metadata-kyuubi/src/main/java/org/dinky/metadata/driver/KyuubiDriver.java b/dinky-metadata/dinky-metadata-kyuubi/src/main/java/org/dinky/metadata/driver/KyuubiDriver.java new file mode 100644 index 0000000000..2d9da12b57 --- /dev/null +++ 
b/dinky-metadata/dinky-metadata-kyuubi/src/main/java/org/dinky/metadata/driver/KyuubiDriver.java @@ -0,0 +1,335 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.metadata.driver; + +import org.dinky.assertion.Asserts; +import org.dinky.data.model.Column; +import org.dinky.data.model.Schema; +import org.dinky.data.model.Table; +import org.dinky.metadata.config.AbstractJdbcConfig; +import org.dinky.metadata.constant.KyuubiConstant; +import org.dinky.metadata.convert.KyuubiTypeConvert; +import org.dinky.metadata.convert.ITypeConvert; +import org.dinky.metadata.enums.DriverType; +import org.dinky.metadata.query.IDBQuery; +import org.dinky.metadata.query.KyuubiQueryFactory; +import org.dinky.metadata.result.JdbcSelectResult; +import org.dinky.utils.LogUtil; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +public class KyuubiDriver extends 
AbstractJdbcDriver implements Driver { + protected static final Logger logger = LoggerFactory.getLogger(KyuubiDriver.class); + + @Override + public Table getTable(String schemaName, String tableName) { + List
tables = listTables(schemaName); + Table table = null; + for (Table item : tables) { + if (Asserts.isEquals(item.getName(), tableName)) { + table = item; + break; + } + } + if (Asserts.isNotNull(table)) { + table.setColumns(listColumns(schemaName, table.getName())); + } + return table; + } + + @Override + public List
listTables(String schemaName) { + List
tableList = new ArrayList<>(); + PreparedStatement preparedStatement = null; + ResultSet results = null; + IDBQuery dbQuery = getDBQuery(); + String sql = dbQuery.tablesSql(schemaName); + try { + execute(String.format(KyuubiConstant.USE_DB, schemaName)); + preparedStatement = conn.get().prepareStatement(sql); + results = preparedStatement.executeQuery(); + ResultSetMetaData metaData = results.getMetaData(); + List columnList = new ArrayList<>(); + for (int i = 1; i <= metaData.getColumnCount(); i++) { + columnList.add(metaData.getColumnLabel(i)); + } + while (results.next()) { + String tableName = results.getString(dbQuery.tableName()); + if (Asserts.isNotNullString(tableName)) { + Table tableInfo = new Table(); + tableInfo.setName(tableName); + if (columnList.contains(dbQuery.tableComment())) { + tableInfo.setComment(results.getString(dbQuery.tableComment())); + } + tableInfo.setSchema(schemaName); + if (columnList.contains(dbQuery.tableType())) { + tableInfo.setType(results.getString(dbQuery.tableType())); + } + if (columnList.contains(dbQuery.catalogName())) { + tableInfo.setCatalog(results.getString(dbQuery.catalogName())); + } + if (columnList.contains(dbQuery.engine())) { + tableInfo.setEngine(results.getString(dbQuery.engine())); + } + tableList.add(tableInfo); + } + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + close(preparedStatement, results); + } + return tableList; + } + + @Override + public List getSchemasAndTables() { + return listSchemas(); + } + + @Override + public List listSchemas() { + + List schemas = new ArrayList<>(); + PreparedStatement preparedStatement = null; + ResultSet results = null; + String schemasSql = getDBQuery().schemaAllSql(); + try { + preparedStatement = conn.get().prepareStatement(schemasSql); + results = preparedStatement.executeQuery(); + while (results.next()) { + String schemaName = results.getString(getDBQuery().schemaName()); + if (Asserts.isNotNullString(schemaName)) { + Schema schema = new 
Schema(schemaName); + if (execute(String.format(KyuubiConstant.USE_DB, schemaName))) { + schema.setTables(listTables(schema.getName())); + } + schemas.add(schema); + } + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + close(preparedStatement, results); + } + return schemas; + } + + @Override + public List listColumns(String schemaName, String tableName) { + List columns = new ArrayList<>(); + PreparedStatement preparedStatement = null; + ResultSet results = null; + IDBQuery dbQuery = getDBQuery(); + String tableFieldsSql = dbQuery.columnsSql(schemaName, tableName); + try { + preparedStatement = conn.get().prepareStatement(tableFieldsSql); + results = preparedStatement.executeQuery(); + ResultSetMetaData metaData = results.getMetaData(); + List columnList = new ArrayList<>(); + for (int i = 1; i <= metaData.getColumnCount(); i++) { + columnList.add(metaData.getColumnLabel(i)); + } + Integer positionId = 1; + while (results.next()) { + Column field = new Column(); + if (StringUtils.isEmpty(results.getString(dbQuery.columnName()))) { + break; + } else { + if (columnList.contains(dbQuery.columnName())) { + String columnName = results.getString(dbQuery.columnName()); + field.setName(columnName); + } + if (columnList.contains(dbQuery.columnType())) { + field.setType(results.getString(dbQuery.columnType())); + } + if (columnList.contains(dbQuery.columnComment()) + && Asserts.isNotNull(results.getString(dbQuery.columnComment()))) { + String columnComment = + results.getString(dbQuery.columnComment()).replaceAll("\"|'", ""); + field.setComment(columnComment); + } + field.setPosition(positionId++); + field.setJavaType(getTypeConvert().convert(field)); + } + columns.add(field); + } + } catch (SQLException e) { + e.printStackTrace(); + } finally { + close(preparedStatement, results); + } + return columns; + } + + @Override + public String getCreateTableSql(Table table) { + StringBuilder createTable = new StringBuilder(); + PreparedStatement preparedStatement 
= null; + ResultSet results = null; + String createTableSql = getDBQuery().createTableSql(table.getSchema(), table.getName()); + try { + preparedStatement = conn.get().prepareStatement(createTableSql); + results = preparedStatement.executeQuery(); + while (results.next()) { + createTable + .append(results.getString(getDBQuery().createTableName())) + .append("\n"); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + close(preparedStatement, results); + } + return createTable.toString(); + } + + @Override + public int executeUpdate(String sql) throws Exception { + Asserts.checkNullString(sql, "Sql 语句为空"); + String querySQL = sql.trim().replaceAll(";$", ""); + int res = 0; + try (Statement statement = conn.get().createStatement()) { + res = statement.executeUpdate(querySQL); + } + return res; + } + + @Override + public JdbcSelectResult query(String sql, Integer limit) { + if (Asserts.isNull(limit)) { + limit = 100; + } + JdbcSelectResult result = new JdbcSelectResult(); + List> datas = new ArrayList<>(); + List columns = new ArrayList<>(); + List columnNameList = new ArrayList<>(); + PreparedStatement preparedStatement = null; + ResultSet results = null; + int count = 0; + try { + String querySQL = sql.trim().replaceAll(";$", ""); + preparedStatement = conn.get().prepareStatement(querySQL); + results = preparedStatement.executeQuery(); + if (Asserts.isNull(results)) { + result.setSuccess(true); + close(preparedStatement, results); + return result; + } + ResultSetMetaData metaData = results.getMetaData(); + for (int i = 1; i <= metaData.getColumnCount(); i++) { + columnNameList.add(metaData.getColumnLabel(i)); + Column column = new Column(); + column.setName(metaData.getColumnLabel(i)); + column.setType(metaData.getColumnTypeName(i)); + column.setAutoIncrement(metaData.isAutoIncrement(i)); + column.setNullable(metaData.isNullable(i) == 0 ? 
false : true); + column.setJavaType(getTypeConvert().convert(column)); + columns.add(column); + } + result.setColumns(columnNameList); + while (results.next()) { + LinkedHashMap data = new LinkedHashMap<>(); + for (int i = 0; i < columns.size(); i++) { + data.put( + columns.get(i).getName(), + getTypeConvert() + .convertValue( + results, + columns.get(i).getName(), + columns.get(i).getType())); + } + datas.add(data); + count++; + if (count >= limit) { + break; + } + } + result.setSuccess(true); + } catch (Exception e) { + result.setError(LogUtil.getError(e)); + result.setSuccess(false); + } finally { + close(preparedStatement, results); + result.setRowData(datas); + return result; + } + } + + @Override + public IDBQuery getDBQuery() { + logger.info("xxxxxx connector config is {}",config.getConnectConfig().toString()); + return KyuubiQueryFactory.getKyuubiQuery("Spark"); + } + + @Override + public ITypeConvert getTypeConvert() { + return new KyuubiTypeConvert(); + } + + @Override + String getDriverClass() { + return "org.apache.hive.jdbc.HiveDriver"; + } + + //todo 选择不同引擎传入不同的 type antspark hive , trino,flink, + //todo 不同引擎通过页面来选择,验证是否可以动态传参 + //考虑切换引擎时的 connection 维护 + @Override + public String getType() { + return DriverType.KYUUBI.getValue(); + } + + public String getCustomDbType(){ + //druid 暂时不支持 其它类型的database 比如spark,就先返回hive + return "hive"; + } + + @Override + public String getName() { + return "Kyuubi"; + } + + @Override + public Map getFlinkColumnTypeConversion() { + HashMap map = new HashMap<>(); + map.put("BOOLEAN", "BOOLEAN"); + map.put("TINYINT", "TINYINT"); + map.put("SMALLINT", "SMALLINT"); + map.put("INT", "INT"); + map.put("VARCHAR", "STRING"); + map.put("TEXT", "STRING"); + map.put("DATETIME", "TIMESTAMP"); + return map; + } +} diff --git a/dinky-metadata/dinky-metadata-kyuubi/src/main/java/org/dinky/metadata/query/FlinkEngineQuery.java b/dinky-metadata/dinky-metadata-kyuubi/src/main/java/org/dinky/metadata/query/FlinkEngineQuery.java new file 
mode 100644 index 0000000000..16f25f55e6 --- /dev/null +++ b/dinky-metadata/dinky-metadata-kyuubi/src/main/java/org/dinky/metadata/query/FlinkEngineQuery.java @@ -0,0 +1,20 @@ +package org.dinky.metadata.query; + +import org.dinky.metadata.constant.KyuubiConstant; + +public class FlinkEngineQuery extends KyuubiEngineQuery{ + @Override + public String schemaAllSql() { + return KyuubiConstant.QUERY_ALL_DATABASE; + } + + @Override + public String tablesSql(String schemaName) { + return KyuubiConstant.QUERY_ALL_TABLES_BY_SCHEMA; + } + + @Override + public String columnsSql(String schemaName, String tableName) { + return String.format(KyuubiConstant.QUERY_TABLE_SCHEMA, schemaName, tableName); + } +} diff --git a/dinky-metadata/dinky-metadata-kyuubi/src/main/java/org/dinky/metadata/query/HiveEngineQuery.java b/dinky-metadata/dinky-metadata-kyuubi/src/main/java/org/dinky/metadata/query/HiveEngineQuery.java new file mode 100644 index 0000000000..1ca3261188 --- /dev/null +++ b/dinky-metadata/dinky-metadata-kyuubi/src/main/java/org/dinky/metadata/query/HiveEngineQuery.java @@ -0,0 +1,56 @@ +package org.dinky.metadata.query; + +import org.dinky.metadata.constant.KyuubiConstant; + +public class HiveEngineQuery extends KyuubiEngineQuery{ + + @Override + public String schemaAllSql() { + return KyuubiConstant.QUERY_ALL_DATABASE; + } + + @Override + public String tablesSql(String schemaName) { + return KyuubiConstant.QUERY_ALL_TABLES_BY_SCHEMA; + } + + @Override + public String columnsSql(String schemaName, String tableName) { + return String.format(KyuubiConstant.QUERY_TABLE_SCHEMA, schemaName, tableName); + } + + @Override + public String schemaName() { + return "database_name"; + } + + @Override + public String createTableName() { + return "createtab_stmt"; + } + + @Override + public String tableName() { + return "tab_name"; + } + + @Override + public String tableComment() { + return "comment"; + } + + @Override + public String columnName() { + return "col_name"; + } + + 
@Override + public String columnType() { + return "data_type"; + } + + @Override + public String columnComment() { + return "comment"; + } +} diff --git a/dinky-metadata/dinky-metadata-kyuubi/src/main/java/org/dinky/metadata/query/KyuubiEngineQuery.java b/dinky-metadata/dinky-metadata-kyuubi/src/main/java/org/dinky/metadata/query/KyuubiEngineQuery.java new file mode 100644 index 0000000000..746178f3c7 --- /dev/null +++ b/dinky-metadata/dinky-metadata-kyuubi/src/main/java/org/dinky/metadata/query/KyuubiEngineQuery.java @@ -0,0 +1,5 @@ +package org.dinky.metadata.query; + +public abstract class KyuubiEngineQuery extends AbstractDBQuery{ + +} diff --git a/dinky-metadata/dinky-metadata-kyuubi/src/main/java/org/dinky/metadata/query/KyuubiQueryFactory.java b/dinky-metadata/dinky-metadata-kyuubi/src/main/java/org/dinky/metadata/query/KyuubiQueryFactory.java new file mode 100644 index 0000000000..fefc586184 --- /dev/null +++ b/dinky-metadata/dinky-metadata-kyuubi/src/main/java/org/dinky/metadata/query/KyuubiQueryFactory.java @@ -0,0 +1,18 @@ +package org.dinky.metadata.query; + +public class KyuubiQueryFactory { + public static IDBQuery getKyuubiQuery(String databaseType) { + switch (databaseType) { + case "Spark": + return new SparkEngineQuery(); + case "Hive": + return new HiveEngineQuery(); + case "Trino": + return new TrinoEngineQuery(); + case "Flink": + return new FlinkEngineQuery(); + default: + throw new IllegalArgumentException("Unsupported database type: " + databaseType); + } + } +} diff --git a/dinky-metadata/dinky-metadata-kyuubi/src/main/java/org/dinky/metadata/query/SparkEngineQuery.java b/dinky-metadata/dinky-metadata-kyuubi/src/main/java/org/dinky/metadata/query/SparkEngineQuery.java new file mode 100644 index 0000000000..221edb41c3 --- /dev/null +++ b/dinky-metadata/dinky-metadata-kyuubi/src/main/java/org/dinky/metadata/query/SparkEngineQuery.java @@ -0,0 +1,56 @@ +package org.dinky.metadata.query; + +import org.dinky.metadata.constant.KyuubiConstant; + 
+public class SparkEngineQuery extends KyuubiEngineQuery{ + + @Override + public String schemaAllSql() { + return KyuubiConstant.QUERY_ALL_DATABASE; + } + + @Override + public String tablesSql(String schemaName) { + return KyuubiConstant.QUERY_ALL_TABLES_BY_SCHEMA; + } + + @Override + public String columnsSql(String schemaName, String tableName) { + return String.format(KyuubiConstant.QUERY_TABLE_SCHEMA, schemaName, tableName); + } + + @Override + public String schemaName() { + return "namespace"; + } + + @Override + public String createTableName() { + return "createtab_stmt"; + } + + @Override + public String tableName() { + return "tab_name"; + } + + @Override + public String tableComment() { + return "comment"; + } + + @Override + public String columnName() { + return "col_name"; + } + + @Override + public String columnType() { + return "data_type"; + } + + @Override + public String columnComment() { + return "comment"; + } +} diff --git a/dinky-metadata/dinky-metadata-kyuubi/src/main/java/org/dinky/metadata/query/TrinoEngineQuery.java b/dinky-metadata/dinky-metadata-kyuubi/src/main/java/org/dinky/metadata/query/TrinoEngineQuery.java new file mode 100644 index 0000000000..17b3bc4f90 --- /dev/null +++ b/dinky-metadata/dinky-metadata-kyuubi/src/main/java/org/dinky/metadata/query/TrinoEngineQuery.java @@ -0,0 +1,55 @@ +package org.dinky.metadata.query; +import org.dinky.metadata.constant.TrinoEngineConstant; + +public class TrinoEngineQuery extends KyuubiEngineQuery { + + @Override + public String schemaAllSql() { + return TrinoEngineConstant.QUERY_ALL_DATABASE; + } + + @Override + public String tablesSql(String schemaName) { + return TrinoEngineConstant.QUERY_ALL_TABLES_BY_SCHEMA; + } + + @Override + public String columnsSql(String schemaName, String tableName) { + return String.format(TrinoEngineConstant.QUERY_TABLE_SCHEMA, schemaName, tableName); + } + + @Override + public String schemaName() { + return "Catalog"; + } + + @Override + public String 
createTableName() { + return "Create Table"; + } + + @Override + public String tableName() { + return "Table"; + } + + @Override + public String tableComment() { + return "Comment"; + } + + @Override + public String columnName() { + return "Column"; + } + + @Override + public String columnType() { + return "Type"; + } + + @Override + public String columnComment() { + return "Comment"; + } +} diff --git a/dinky-metadata/dinky-metadata-kyuubi/src/main/resources/META-INF/services/org.dinky.metadata.driver.Driver b/dinky-metadata/dinky-metadata-kyuubi/src/main/resources/META-INF/services/org.dinky.metadata.driver.Driver new file mode 100644 index 0000000000..edf839803a --- /dev/null +++ b/dinky-metadata/dinky-metadata-kyuubi/src/main/resources/META-INF/services/org.dinky.metadata.driver.Driver @@ -0,0 +1 @@ +org.dinky.metadata.driver.KyuubiDriver \ No newline at end of file diff --git a/dinky-metadata/dinky-metadata-kyuubi/src/test/java/org/dinky/metadata/HiveTest.java b/dinky-metadata/dinky-metadata-kyuubi/src/test/java/org/dinky/metadata/HiveTest.java new file mode 100644 index 0000000000..0ab97ea97e --- /dev/null +++ b/dinky-metadata/dinky-metadata-kyuubi/src/test/java/org/dinky/metadata/HiveTest.java @@ -0,0 +1,171 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.metadata; + +import org.dinky.data.model.Column; +import org.dinky.data.model.Schema; +import org.dinky.data.model.Table; +import org.dinky.metadata.config.AbstractJdbcConfig; +import org.dinky.metadata.config.DriverConfig; +import org.dinky.metadata.driver.Driver; +import org.dinky.metadata.result.JdbcSelectResult; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * HiveTest + * + * @since 2021/7/20 15:32 + */ +@Ignore +public class HiveTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(HiveTest.class); + + private static final String IP = "cdh1"; + private static final Integer PORT = 10000; + private static final String hiveDB = "test"; + private static final String username = "zhumingye"; + private static final String passwd = "123456"; + private static final String hive = "Hive"; + + private static String url = "jdbc:hive2://" + IP + ":" + PORT + "/" + hiveDB; + + public Driver getDriver() { + DriverConfig config = new DriverConfig<>(); + config.setType(hive); + config.setName(hive); + config.setConnectConfig(AbstractJdbcConfig.builder() + .ip(IP) + .port(PORT) + .username(username) + .password(passwd) + .url(url) + .build()); + return Driver.build(config); + } + + @Ignore + @Test + public void connectTest() { + DriverConfig config = new DriverConfig<>(); + config.setType(hive); + config.setName(hive); + config.setConnectConfig(AbstractJdbcConfig.builder() + .ip(IP) + .port(PORT) + .username(username) + .password(passwd) + .url(url) + .build()); + String test = Driver.build(config).test(); + LOGGER.info(test); + LOGGER.info("end..."); + } + + @Ignore + @Test + public void getDBSTest() { + Driver driver = getDriver(); + List 
schemasAndTables = driver.listSchemas(); + schemasAndTables.forEach(schema -> { + LOGGER.info(schema.getName() + "\t\t" + schema.getTables().toString()); + }); + LOGGER.info("end..."); + } + + @Ignore + @Test + public void getTablesByDBTest() throws Exception { + Driver driver = getDriver(); + driver.execute("use odsp "); + List
tableList = driver.listTables(hiveDB); + tableList.forEach(schema -> { + LOGGER.info(schema.getName()); + }); + LOGGER.info("end..."); + } + + @Ignore + @Test + public void getColumnsByTableTest() { + Driver driver = getDriver(); + List columns = driver.listColumns(hiveDB, "biz_college_planner_mysql_language_score_item"); + for (Column column : columns) { + LOGGER.info(column.getName() + " \t " + column.getType() + " \t " + column.getComment()); + } + LOGGER.info("end..."); + } + + @Ignore + @Test + public void getCreateTableTest() throws Exception { + Driver driver = getDriver(); + Table driverTable = driver.getTable(hiveDB, "biz_college_planner_mysql_language_score_item"); + String createTableSql = driver.getCreateTableSql(driverTable); + LOGGER.info(createTableSql); + LOGGER.info("end..."); + } + + @Ignore + @Test + public void getTableExtenedInfoTest() throws Exception { + Driver driver = getDriver(); + Table driverTable = driver.getTable(hiveDB, "employees"); + for (Column column : driverTable.getColumns()) { + LOGGER.info(column.getName() + "\t\t" + column.getType() + "\t\t" + column.getComment()); + } + } + + /** + * @Author: zhumingye + * @return: + */ + @Ignore + @Test + public void multipleSQLTest() throws Exception { + Driver driver = getDriver(); + String sql = "select\n" + + " date_format(create_time,'yyyy-MM') as pay_success_time,\n" + + " sum(pay_amount)/100 as amount\n" + + "from\n" + + " odsp.pub_pay_mysql_pay_order\n" + + " group by date_format(create_time,'yyyy-MM') ;\n" + + "select\n" + + " *\n" + + "from\n" + + " odsp.pub_pay_mysql_pay_order ;"; + JdbcSelectResult selectResult = driver.executeSql(sql, 100); + for (LinkedHashMap rowDatum : selectResult.getRowData()) { + Set> entrySet = rowDatum.entrySet(); + for (Map.Entry stringObjectEntry : entrySet) { + LOGGER.info(stringObjectEntry.getKey() + "\t\t" + stringObjectEntry.getValue()); + } + } + } +} diff --git a/dinky-metadata/pom.xml b/dinky-metadata/pom.xml index 63b948c0fe..e35d336a3e 100644 
--- a/dinky-metadata/pom.xml +++ b/dinky-metadata/pom.xml @@ -42,6 +42,7 @@ dinky-metadata-hive dinky-metadata-starrocks dinky-metadata-presto + dinky-metadata-kyuubi @@ -55,4 +56,4 @@ - + \ No newline at end of file diff --git a/dinky-web/src/pages/DataStudio/HeaderContainer/function.tsx b/dinky-web/src/pages/DataStudio/HeaderContainer/function.tsx index 636d0121a7..42f6df174f 100644 --- a/dinky-web/src/pages/DataStudio/HeaderContainer/function.tsx +++ b/dinky-web/src/pages/DataStudio/HeaderContainer/function.tsx @@ -69,6 +69,7 @@ export const isSql = (dialect: string) => { case DIALECT.HIVE: case DIALECT.STARROCKS: case DIALECT.PRESTO: + case DIALECT.KYUUBI: return true; default: return false; diff --git a/dinky-web/src/pages/DataStudio/LeftContainer/Project/constants.tsx b/dinky-web/src/pages/DataStudio/LeftContainer/Project/constants.tsx index 9728e52a3e..86c9efb188 100644 --- a/dinky-web/src/pages/DataStudio/LeftContainer/Project/constants.tsx +++ b/dinky-web/src/pages/DataStudio/LeftContainer/Project/constants.tsx @@ -174,6 +174,10 @@ export const JOB_TYPE: DefaultOptionType[] = [ { value: 'Presto', label: 'Presto' + }, + { + value: 'Kyuubi', + label: 'Kyuubi' } ] }, diff --git a/dinky-web/src/pages/RegCenter/DataSource/components/constants.ts b/dinky-web/src/pages/RegCenter/DataSource/components/constants.ts index 41cad752f7..6f01d1a0ef 100644 --- a/dinky-web/src/pages/RegCenter/DataSource/components/constants.ts +++ b/dinky-web/src/pages/RegCenter/DataSource/components/constants.ts @@ -67,6 +67,10 @@ export const DATA_SOURCE_TYPE_OPTIONS = [ { label: 'Presto', value: 'Presto' + }, + { + label: 'Kyuubi', + value: 'Kyuubi' } ] }, @@ -165,6 +169,11 @@ export const AUTO_COMPLETE_TYPE = [ key: 'presto', value: 'jdbc:presto://localhost:8080/dinky', label: 'jdbc:presto://localhost:8080/dinky' + }, + { + key: 'kyuubi', + value: 'jdbc:hive2://localhost:8080/default', + label: 'jdbc:hive2://localhost:8080/default' } ]; diff --git 
a/dinky-web/src/pages/RegCenter/DataSource/components/function.tsx b/dinky-web/src/pages/RegCenter/DataSource/components/function.tsx index 952c7b7396..17d5139c97 100644 --- a/dinky-web/src/pages/RegCenter/DataSource/components/function.tsx +++ b/dinky-web/src/pages/RegCenter/DataSource/components/function.tsx @@ -59,6 +59,8 @@ export const renderDBIcon = (type: string, size?: number) => { return ; case 'presto': return ; + case 'kyuubi': + return ; default: return ; } diff --git a/dinky-web/src/services/constants.tsx b/dinky-web/src/services/constants.tsx index f7109280da..c19cd23c2b 100644 --- a/dinky-web/src/services/constants.tsx +++ b/dinky-web/src/services/constants.tsx @@ -223,7 +223,8 @@ export const DIALECT = { HIVE: 'hive', PHOENIX: 'phoenix', STARROCKS: 'starrocks', - PRESTO: 'presto' + PRESTO: 'presto', + KYUUBI: 'kyuubi' }; export const RUN_MODE = { diff --git a/pom.xml b/pom.xml index 26c2535f39..0ed0ba0c5a 100644 --- a/pom.xml +++ b/pom.xml @@ -931,10 +931,43 @@ - - apache-snapshots + aliyun-central + https://maven.aliyun.com/repository/central + + + aliyun-public + https://maven.aliyun.com/repository/public + + + aliyun-snapshots https://maven.aliyun.com/repository/apache-snapshots + + apache-spring-plugin + https://maven.aliyun.com/repository/spring-plugin + + + aliyun-spring + https://maven.aliyun.com/repository/spring + + + aliyun-google + https://maven.aliyun.com/repository/google + + + aliyun-gradle-plugin + https://maven.aliyun.com/repository/gradle-plugin + + + + aliyun-jcenter + https://maven.aliyun.com/repository/jcenter + + + + aliyun-releases + https://maven.aliyun.com/repository/releases +