diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java index 5db69ac8f0d0a95..676817654f6be67 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java @@ -18,8 +18,13 @@ package org.apache.doris.catalog.external; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.JdbcTable; import org.apache.doris.datasource.jdbc.JdbcExternalCatalog; +import org.apache.doris.statistics.AnalysisInfo; +import org.apache.doris.statistics.BaseAnalysisTask; +import org.apache.doris.statistics.JdbcAnalysisTask; +import org.apache.doris.statistics.TableStats; import org.apache.doris.thrift.TTableDescriptor; import org.apache.logging.log4j.LogManager; @@ -93,4 +98,27 @@ private JdbcTable toJdbcTable() { jdbcTable.setCheckSum(jdbcCatalog.getCheckSum()); return jdbcTable; } + + @Override + public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) { + makeSureInitialized(); + return new JdbcAnalysisTask(info); + } + + @Override + public long getRowCount() { + makeSureInitialized(); + TableStats tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(id); + if (tableStats != null) { + long rowCount = tableStats.rowCount; + LOG.debug("Estimated row count for db {} table {} is {}.", dbName, name, rowCount); + return rowCount; + } + return 1; + } + + @Override + public long estimatedRowCount() { + return getRowCount(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index fd2d844ce13f9d2..39656d0c153c6eb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -688,11 +688,10 @@ public void dropStats(DropStatsStmt dropStatsStmt) throws DdlException { Set cols = dropStatsStmt.getColumnNames(); long tblId = dropStatsStmt.getTblId(); TableStats tableStats = findTableStatsStatus(dropStatsStmt.getTblId()); - if (tableStats == null) { - return; + if (tableStats != null) { + tableStats.updatedTime = 0; + replayUpdateTableStatsStatus(tableStats); } - tableStats.updatedTime = 0; - replayUpdateTableStatsStatus(tableStats); StatisticsRepository.dropStatistics(tblId, cols); for (String col : cols) { Env.getCurrentEnv().getStatisticsCache().invalidate(tblId, -1L, col); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/JdbcAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/JdbcAnalysisTask.java new file mode 100644 index 000000000000000..5a1b1b88c12e74b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/JdbcAnalysisTask.java @@ -0,0 +1,174 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.external.JdbcExternalTable; +import org.apache.doris.common.FeConstants; +import org.apache.doris.qe.AutoCloseConnectContext; +import org.apache.doris.qe.QueryState; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.statistics.util.StatisticsUtil; + +import org.apache.commons.text.StringSubstitutor; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class JdbcAnalysisTask extends BaseAnalysisTask { + private static final Logger LOG = LogManager.getLogger(JdbcAnalysisTask.class); + + private static final String ANALYZE_SQL_TABLE_TEMPLATE = "INSERT INTO " + + "${internalDB}.${columnStatTbl}" + + " SELECT " + + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, " + + "${catalogId} AS catalog_id, " + + "${dbId} AS db_id, " + + "${tblId} AS tbl_id, " + + "${idxId} AS idx_id, " + + "'${colId}' AS col_id, " + + "NULL AS part_id, " + + "COUNT(1) AS row_count, " + + "NDV(`${colName}`) AS ndv, " + + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, " + + "MIN(`${colName}`) AS min, " + + "MAX(`${colName}`) AS max, " + + "${dataSizeFunction} AS data_size, " + + "NOW() " + + "FROM `${catalogName}`.`${dbName}`.`${tblName}`"; + + private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT COUNT(1) as rowCount " + + "FROM `${catalogName}`.`${dbName}`.`${tblName}`"; + + private final boolean isTableLevelTask; + private JdbcExternalTable table; + + public JdbcAnalysisTask(AnalysisInfo info) { + super(info); + isTableLevelTask = info.externalTableLevelTask; + table = (JdbcExternalTable) tbl; + } + + public void doExecute() throws Exception { + if (isTableLevelTask) { + getTableStats(); + } else { + getTableColumnStats(); + } + } + + /** + * Get table row count and store the result to metadata. + */ + private void getTableStats() throws Exception { + Map params = buildTableStatsParams(null); + List columnResult = + StatisticsUtil.execStatisticQuery(new StringSubstitutor(params).replace(ANALYZE_TABLE_COUNT_TEMPLATE)); + String rowCount = columnResult.get(0).get(0); + Env.getCurrentEnv().getAnalysisManager() + .updateTableStatsStatus(new TableStats(table.getId(), Long.parseLong(rowCount), info)); + } + + /** + * Get column statistics and insert the result to __internal_schema.column_statistics + */ + private void getTableColumnStats() throws Exception { + // An example sql for a column stats: + // INSERT INTO __internal_schema.column_statistics + // SELECT CONCAT(13055, '-', -1, '-', 'r_regionkey') AS id, + // 13002 AS catalog_id, + // 13038 AS db_id, + // 13055 AS tbl_id, + // -1 AS idx_id, + // 'r_regionkey' AS col_id, + // 'NULL' AS part_id, + // COUNT(1) AS row_count, + // NDV(`r_regionkey`) AS ndv, + // SUM(CASE WHEN `r_regionkey` IS NULL THEN 1 ELSE 0 END) AS null_count, + // MIN(`r_regionkey`) AS min, + // MAX(`r_regionkey`) AS max, + // 0 AS data_size, + // NOW() FROM `hive`.`tpch100`.`region` + StringBuilder sb = new StringBuilder(); + sb.append(ANALYZE_SQL_TABLE_TEMPLATE); + Map params = buildTableStatsParams("NULL"); + params.put("internalDB", FeConstants.INTERNAL_DB_NAME); + params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME); + params.put("colName", col.getName()); + params.put("colId", info.colName); + params.put("dataSizeFunction", getDataSizeFunction(col)); + StringSubstitutor stringSubstitutor = new StringSubstitutor(params); + String sql = stringSubstitutor.replace(sb.toString()); + executeInsertSql(sql); + } + + private void executeInsertSql(String sql) throws Exception { + long startTime = System.currentTimeMillis(); + try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { + r.connectContext.getSessionVariable().disableNereidsPlannerOnce(); + this.stmtExecutor = new StmtExecutor(r.connectContext, sql); + r.connectContext.setExecutor(stmtExecutor); + this.stmtExecutor.execute(); + QueryState queryState = r.connectContext.getState(); + if (queryState.getStateType().equals(QueryState.MysqlStateType.ERR)) { + LOG.warn(String.format("Failed to analyze %s.%s.%s, sql: [%s], error: [%s]", + info.catalogName, info.dbName, info.colName, sql, queryState.getErrorMessage())); + throw new RuntimeException(queryState.getErrorMessage()); + } + LOG.debug(String.format("Analyze %s.%s.%s done. SQL: [%s]. Cost %d ms.", + info.catalogName, info.dbName, info.colName, sql, (System.currentTimeMillis() - startTime))); + } + } + + private Map buildTableStatsParams(String partId) { + Map commonParams = new HashMap<>(); + String id = StatisticsUtil.constructId(tbl.getId(), -1); + if (partId == null) { + commonParams.put("partId", "NULL"); + } else { + id = StatisticsUtil.constructId(id, partId); + commonParams.put("partId", "\'" + partId + "\'"); + } + commonParams.put("id", id); + commonParams.put("catalogId", String.valueOf(catalog.getId())); + commonParams.put("dbId", String.valueOf(db.getId())); + commonParams.put("tblId", String.valueOf(tbl.getId())); + commonParams.put("indexId", "-1"); + commonParams.put("idxId", "-1"); + commonParams.put("catalogName", catalog.getName()); + commonParams.put("dbName", db.getFullName()); + commonParams.put("tblName", tbl.getName()); + if (col != null) { + commonParams.put("type", col.getType().toString()); + } + commonParams.put("lastAnalyzeTimeInMs", String.valueOf(System.currentTimeMillis())); + return commonParams; + } + + @Override + protected void afterExecution() { + // Table level task doesn't need to sync any value to sync stats, it stores the value in metadata. + if (isTableLevelTask) { + return; + } + Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tbl.getId(), -1, col.getName()); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index 87d8a0ba15c9c92..c6f906d289dc78d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -42,9 +42,9 @@ import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.StructType; import org.apache.doris.catalog.TableIf; -import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.catalog.Type; import org.apache.doris.catalog.VariantType; +import org.apache.doris.catalog.external.ExternalTable; import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; @@ -704,6 +704,6 @@ public static boolean isExternalTable(String catalogName, String dbName, String LOG.warn(e.getMessage()); return false; } - return table.getType().equals(TableType.HMS_EXTERNAL_TABLE); + return table instanceof ExternalTable; } } diff --git a/regression-test/suites/external_table_p0/jdbc/test_mysql_jdbc_statistics.groovy b/regression-test/suites/external_table_p0/jdbc/test_mysql_jdbc_statistics.groovy new file mode 100644 index 000000000000000..d6f0ca351ddc8fb --- /dev/null +++ b/regression-test/suites/external_table_p0/jdbc/test_mysql_jdbc_statistics.groovy @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_mysql_jdbc_statistics", "p0,external,mysql,external_docker,external_docker_mysql") { + String enabled = context.config.otherConfigs.get("enableJdbcTest") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String mysql_port = context.config.otherConfigs.get("mysql_57_port"); + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-java-8.0.25.jar" + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String catalog_name = "test_mysql_jdbc_statistics"; + + sql """create catalog if not exists ${catalog_name} properties( + "type"="jdbc", + "user"="root", + "password"="123456", + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}/doris_test?useSSL=false&zeroDateTimeBehavior=convertToNull", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver" + );""" + + sql """use ${catalog_name}.doris_test""" + sql """analyze table ex_tb0 with sync""" + def result = sql """show column stats ex_tb0 (name)""" + assertTrue(result.size() == 1) + assertTrue(result[0][0] == "name") + assertTrue(result[0][1] == "5.0") + assertTrue(result[0][2] == "5.0") + assertTrue(result[0][3] == "0.0") + assertTrue(result[0][4] == "18.0") + assertTrue(result[0][5] == "3.0") + assertTrue(result[0][6] == "'abc'") + assertTrue(result[0][7] == "'abg'") + + result = sql """show column stats ex_tb0 (id)""" + assertTrue(result.size() == 1) + assertTrue(result[0][0] == "id") + assertTrue(result[0][1] == "5.0") + assertTrue(result[0][2] == "5.0") + assertTrue(result[0][3] == "0.0") + assertTrue(result[0][4] == "24.0") + assertTrue(result[0][5] == "4.0") + assertTrue(result[0][6] == "111") + assertTrue(result[0][7] == "115") + + sql """drop catalog ${catalog_name}""" + } +} +