Skip to content

Commit

Permalink
Support basic jdbc external table stats collection.
Browse files Browse the repository at this point in the history
  • Loading branch information
Jibing-Li committed Sep 6, 2023
1 parent aee7735 commit 16c3270
Show file tree
Hide file tree
Showing 5 changed files with 271 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@
package org.apache.doris.catalog.external;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.JdbcTable;
import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.JdbcAnalysisTask;
import org.apache.doris.statistics.TableStats;
import org.apache.doris.thrift.TTableDescriptor;

import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -93,4 +98,27 @@ private JdbcTable toJdbcTable() {
jdbcTable.setCheckSum(jdbcCatalog.getCheckSum());
return jdbcTable;
}

/**
 * Creates the analysis task used to collect statistics for this JDBC external table.
 *
 * @param info description of the analysis job, handed unchanged to the task constructor
 * @return a new {@link JdbcAnalysisTask} built from {@code info}
 */
@Override
public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
    // makeSureInitialized() is invoked before the task is constructed.
    makeSureInitialized();
    BaseAnalysisTask analysisTask = new JdbcAnalysisTask(info);
    return analysisTask;
}

/**
 * Returns the row count recorded for this table by the analysis manager.
 *
 * @return the {@code rowCount} of the {@link TableStats} found for this table id,
 *         or 1 when no such entry exists
 */
@Override
public long getRowCount() {
    makeSureInitialized();
    TableStats stats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(id);
    if (stats == null) {
        // Nothing recorded for this table id: fall back to the default of 1.
        return 1;
    }
    long count = stats.rowCount;
    LOG.debug("Estimated row count for db {} table {} is {}.", dbName, name, count);
    return count;
}

/**
 * Returns the estimated row count, which is simply the value of {@link #getRowCount()}.
 *
 * @return the result of {@code getRowCount()}
 */
@Override
public long estimatedRowCount() {
    long rowCount = getRowCount();
    return rowCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -688,11 +688,10 @@ public void dropStats(DropStatsStmt dropStatsStmt) throws DdlException {
Set<String> cols = dropStatsStmt.getColumnNames();
long tblId = dropStatsStmt.getTblId();
TableStats tableStats = findTableStatsStatus(dropStatsStmt.getTblId());
if (tableStats == null) {
return;
if (tableStats != null) {
tableStats.updatedTime = 0;
replayUpdateTableStatsStatus(tableStats);
}
tableStats.updatedTime = 0;
replayUpdateTableStatsStatus(tableStats);
StatisticsRepository.dropStatistics(tblId, cols);
for (String col : cols) {
Env.getCurrentEnv().getStatisticsCache().invalidate(tblId, -1L, col);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.statistics;

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.external.JdbcExternalTable;
import org.apache.doris.common.FeConstants;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.util.StatisticsUtil;

import org.apache.commons.text.StringSubstitutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Analysis task that collects statistics for a JDBC external table.
 *
 * <p>A table-level task runs a {@code COUNT(1)} query and hands the row count to the analysis manager.
 * A column-level task runs an {@code INSERT INTO ... SELECT} statement that writes one row of column
 * statistics (row count, NDV, null count, min, max, data size) into the internal statistics table.
 */
public class JdbcAnalysisTask extends BaseAnalysisTask {
    private static final Logger LOG = LogManager.getLogger(JdbcAnalysisTask.class);

    /** Column-level statistics statement; placeholders are filled in by {@link #getTableColumnStats()}. */
    private static final String ANALYZE_SQL_TABLE_TEMPLATE = "INSERT INTO "
            + "${internalDB}.${columnStatTbl}"
            + " SELECT "
            + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
            + "${catalogId} AS catalog_id, "
            + "${dbId} AS db_id, "
            + "${tblId} AS tbl_id, "
            + "${idxId} AS idx_id, "
            + "'${colId}' AS col_id, "
            + "NULL AS part_id, "
            + "COUNT(1) AS row_count, "
            + "NDV(`${colName}`) AS ndv, "
            + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, "
            + "MIN(`${colName}`) AS min, "
            + "MAX(`${colName}`) AS max, "
            + "${dataSizeFunction} AS data_size, "
            + "NOW() "
            + "FROM `${catalogName}`.`${dbName}`.`${tblName}`";

    /** Table-level row count query; placeholders are filled in by {@link #getTableStats()}. */
    private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT COUNT(1) as rowCount "
            + "FROM `${catalogName}`.`${dbName}`.`${tblName}`";

    // True when this task only collects the table row count instead of column statistics.
    private final boolean isTableLevelTask;
    // Same object as the inherited tbl field, narrowed to the JDBC table type.
    private final JdbcExternalTable table;

    /**
     * Creates a task for the given analysis job.
     *
     * @param info analysis job description; {@code info.externalTableLevelTask} selects between
     *             table-level and column-level collection
     */
    public JdbcAnalysisTask(AnalysisInfo info) {
        super(info);
        isTableLevelTask = info.externalTableLevelTask;
        table = (JdbcExternalTable) tbl;
    }

    /**
     * Runs the task: collects the table row count for a table-level task, column statistics otherwise.
     *
     * @throws Exception if the underlying statistics query or insert fails
     */
    public void doExecute() throws Exception {
        if (isTableLevelTask) {
            getTableStats();
        } else {
            getTableColumnStats();
        }
    }

    /**
     * Get table row count and store the result to metadata.
     *
     * @throws IllegalStateException if the count query returns no row
     */
    private void getTableStats() throws Exception {
        Map<String, String> params = buildTableStatsParams(null);
        String sql = new StringSubstitutor(params).replace(ANALYZE_TABLE_COUNT_TEMPLATE);
        List<ResultRow> columnResult = StatisticsUtil.execStatisticQuery(sql);
        // Fail with a descriptive message instead of an IndexOutOfBoundsException when nothing came back.
        if (columnResult == null || columnResult.isEmpty()) {
            throw new IllegalStateException(String.format(
                    "Row count query returned no result for %s.%s.%s, sql: [%s]",
                    catalog.getName(), db.getFullName(), tbl.getName(), sql));
        }
        String rowCount = columnResult.get(0).get(0);
        Env.getCurrentEnv().getAnalysisManager()
                .updateTableStatsStatus(new TableStats(table.getId(), Long.parseLong(rowCount), info));
    }

    /**
     * Get column statistics and insert the result to __internal_schema.column_statistics
     */
    private void getTableColumnStats() throws Exception {
        // An example sql for a column stats:
        // INSERT INTO __internal_schema.column_statistics
        //   SELECT CONCAT(13055, '-', -1, '-', 'r_regionkey') AS id,
        //   13002 AS catalog_id,
        //   13038 AS db_id,
        //   13055 AS tbl_id,
        //   -1 AS idx_id,
        //   'r_regionkey' AS col_id,
        //   NULL AS part_id,
        //   COUNT(1) AS row_count,
        //   NDV(`r_regionkey`) AS ndv,
        //   SUM(CASE WHEN `r_regionkey` IS NULL THEN 1 ELSE 0 END) AS null_count,
        //   MIN(`r_regionkey`) AS min,
        //   MAX(`r_regionkey`) AS max,
        //   <data size expression> AS data_size,
        //   NOW() FROM `jdbc_catalog`.`tpch100`.`region`
        Map<String, String> params = buildTableStatsParams("NULL");
        params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
        params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
        params.put("colName", col.getName());
        params.put("colId", info.colName);
        params.put("dataSizeFunction", getDataSizeFunction(col));
        String sql = new StringSubstitutor(params).replace(ANALYZE_SQL_TABLE_TEMPLATE);
        executeInsertSql(sql);
    }

    /**
     * Executes the given statistics insert statement through a {@link StmtExecutor}.
     *
     * @param sql fully substituted INSERT statement
     * @throws RuntimeException if the connection state reports an error after execution
     */
    private void executeInsertSql(String sql) throws Exception {
        long startTime = System.currentTimeMillis();
        try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
            r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
            this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
            r.connectContext.setExecutor(stmtExecutor);
            this.stmtExecutor.execute();
            QueryState queryState = r.connectContext.getState();
            if (queryState.getStateType().equals(QueryState.MysqlStateType.ERR)) {
                // Log catalog.db.table.column so the failing column can be located unambiguously.
                LOG.warn("Failed to analyze {}.{}.{}.{}, sql: [{}], error: [{}]",
                        info.catalogName, info.dbName, tbl.getName(), info.colName, sql,
                        queryState.getErrorMessage());
                throw new RuntimeException(queryState.getErrorMessage());
            }
            LOG.debug("Analyze {}.{}.{}.{} done. SQL: [{}]. Cost {} ms.",
                    info.catalogName, info.dbName, tbl.getName(), info.colName, sql,
                    System.currentTimeMillis() - startTime);
        }
    }

    /**
     * Builds the substitution parameters shared by the table-level and column-level templates.
     *
     * @param partId partition id to embed, or {@code null} for none; a non-null value is appended to the
     *               constructed id and stored single-quoted under {@code partId}
     * @return a mutable map of placeholder names to their string values
     */
    private Map<String, String> buildTableStatsParams(String partId) {
        Map<String, String> commonParams = new HashMap<>();
        String id = StatisticsUtil.constructId(tbl.getId(), -1);
        if (partId == null) {
            commonParams.put("partId", "NULL");
        } else {
            id = StatisticsUtil.constructId(id, partId);
            commonParams.put("partId", "'" + partId + "'");
        }
        commonParams.put("id", id);
        commonParams.put("catalogId", String.valueOf(catalog.getId()));
        commonParams.put("dbId", String.valueOf(db.getId()));
        commonParams.put("tblId", String.valueOf(tbl.getId()));
        commonParams.put("indexId", "-1");
        commonParams.put("idxId", "-1");
        commonParams.put("catalogName", catalog.getName());
        commonParams.put("dbName", db.getFullName());
        commonParams.put("tblName", tbl.getName());
        if (col != null) {
            commonParams.put("type", col.getType().toString());
        }
        commonParams.put("lastAnalyzeTimeInMs", String.valueOf(System.currentTimeMillis()));
        return commonParams;
    }

    @Override
    protected void afterExecution() {
        // Table level task doesn't need to sync any value to sync stats, it stores the value in metadata.
        if (isTableLevelTask) {
            return;
        }
        Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tbl.getId(), -1, col.getName());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.Type;
import org.apache.doris.catalog.VariantType;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
Expand Down Expand Up @@ -704,6 +704,6 @@ public static boolean isExternalTable(String catalogName, String dbName, String
LOG.warn(e.getMessage());
return false;
}
return table.getType().equals(TableType.HMS_EXTERNAL_TABLE);
return table instanceof ExternalTable;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression suite: analyzes a MySQL table through a JDBC catalog and checks the collected column stats.
suite("test_mysql_jdbc_statistics", "p0,external,mysql,external_docker,external_docker_mysql") {
    // Connection settings are read from the regression framework configuration.
    String enabled = context.config.otherConfigs.get("enableJdbcTest")
    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
    String mysql_port = context.config.otherConfigs.get("mysql_57_port")
    String s3_endpoint = getS3Endpoint()
    String bucket = getS3BucketName()
    String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-java-8.0.25.jar"

    // Nothing to do unless the JDBC tests are explicitly switched on.
    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
        return
    }

    String catalog_name = "test_mysql_jdbc_statistics"

    sql """create catalog if not exists ${catalog_name} properties(
"type"="jdbc",
"user"="root",
"password"="123456",
"jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}/doris_test?useSSL=false&zeroDateTimeBehavior=convertToNull",
"driver_url" = "${driver_url}",
"driver_class" = "com.mysql.cj.jdbc.Driver"
);"""

    // Runs SHOW COLUMN STATS for one column of ex_tb0 and compares the leading fields of the
    // single result row, in order, against the expected values.
    def verifyColumnStats = { String column, List<String> expected ->
        def rows = sql """show column stats ex_tb0 (${column})"""
        assertTrue(rows.size() == 1)
        expected.eachWithIndex { String value, int idx ->
            assertTrue(rows[0][idx] == value)
        }
    }

    sql """use ${catalog_name}.doris_test"""
    sql """analyze table ex_tb0 with sync"""

    verifyColumnStats("name", ["name", "5.0", "5.0", "0.0", "18.0", "3.0", "'abc'", "'abg'"])
    verifyColumnStats("id", ["id", "5.0", "5.0", "0.0", "24.0", "4.0", "111", "115"])

    sql """drop catalog ${catalog_name}"""
}

0 comments on commit 16c3270

Please sign in to comment.