Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Engine] Support execute job without view permission #440

Merged
merged 2 commits into from
Aug 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ default String getCreateTableAsSelectStatement(String srcTable, String targetDat
return String.format("CREATE TABLE %s.%s AS SELECT * FROM %s", quoteIdentifier(targetDatabase), quoteIdentifier(targetTable), quoteIdentifier(srcTable));
}

default String getCreateTableAsSelectStatementFromSql(String srcTable, String targetDatabase, String targetTable) {
return String.format("CREATE TABLE %s.%s AS SELECT t.* FROM %s", quoteIdentifier(targetDatabase), quoteIdentifier(targetTable), srcTable);
}

default String getCreateTableStatement(String table, List<StructField> fields, TypeConverter typeConverter) {
if (CollectionUtils.isNotEmpty(fields)) {
String columns = fields.stream().map(field -> {
Expand All @@ -127,6 +131,10 @@ default String getInsertAsSelectStatement(String srcTable, String targetDatabase
return String.format("INSERT INTO %s.%s SELECT * FROM %s", quoteIdentifier(targetDatabase), quoteIdentifier(targetTable), quoteIdentifier(srcTable));
}

default String getInsertAsSelectStatementFromSql(String srcTable, String targetDatabase, String targetTable) {
return String.format("INSERT INTO %s.%s SELECT t.* FROM %s", quoteIdentifier(targetDatabase), quoteIdentifier(targetTable), srcTable);
}

String getErrorDataScript(Map<String, String> configMap);

String getValidateResultDataScript(Map<String, String> configMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,4 @@ protected InputParam getDatabaseInput(boolean isEn) {
isEn ? "please enter database" : "请填入数据库", 1, null,
null);
}

@Override
protected List<PluginParams> getOtherParams(boolean isEn) {

List<PluginParams> list = new ArrayList<>();

InputParam enableExternalCatalog = getInputParam("enable_use_view",
isEn ? "enable.use.view" : "允许使用视图(设置为false时无法导出错误数据)",
isEn ? "please enter true or false" : "请填入 true 或者 false", 2, null,
"true");

list.add(enableExternalCatalog);
return list;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,16 @@ protected InputParam getInputParam(String field, String title, String placeholde
}

protected List<PluginParams> getOtherParams(boolean isEn) {
return Collections.emptyList();

List<PluginParams> list = new ArrayList<>();

InputParam enableExternalCatalog = getInputParam("enable_use_view",
isEn ? "enable.use.view" : "允许使用视图",
isEn ? "please enter true or false" : "请填入 true 或者 false", 2, null,
"false");

list.add(enableExternalCatalog);
return list;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,4 @@ protected InputParam getDatabaseInput(boolean isEn) {
isEn ? "please enter database" : "请填入数据库", 1, null,
null);
}

@Override
protected List<PluginParams> getOtherParams(boolean isEn) {

List<PluginParams> list = new ArrayList<>();

InputParam enableExternalCatalog = getInputParam("enable_use_view",
isEn ? "enable.use.view" : "允许使用视图(设置为false时无法导出错误数据)",
isEn ? "please enter true or false" : "请填入 true 或者 false", 2, null,
"true");

list.add(enableExternalCatalog);
return list;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,12 @@ public void buildTransformConfigs() {
invalidateItemCanOutput &= sqlMetric.isInvalidateItemsCanOutput();
metricInputParameter.put(INVALIDATE_ITEM_CAN_OUTPUT, String.valueOf(invalidateItemCanOutput));

boolean isEnableExternalCatalog = true;
boolean isEnableUseView = false;
if (metricInputParameter.get(ENABLE_USE_VIEW) != null) {
isEnableExternalCatalog = Boolean.parseBoolean(metricInputParameter.get(ENABLE_USE_VIEW));
isEnableUseView = Boolean.parseBoolean(metricInputParameter.get(ENABLE_USE_VIEW));
}

if (isEnableExternalCatalog) {
if (isEnableUseView) {
// generate invalidate item execute sql
if (sqlMetric.getInvalidateItems(metricInputParameter) != null) {
ExecuteSql invalidateItemExecuteSql = sqlMetric.getInvalidateItems(metricInputParameter);
Expand All @@ -237,16 +237,30 @@ public void buildTransformConfigs() {
metricInputParameter.put(ACTUAL_TABLE, sqlMetric.getActualValue(metricInputParameter).getResultTable());
}
} else {
// generate actual value execute sql
ExecuteSql actualValueExecuteSql = sqlMetric.getDirectActualValue(metricInputParameter);
if (actualValueExecuteSql != null) {
actualValueExecuteSql.setResultTable(sqlMetric.getDirectActualValue(metricInputParameter).getResultTable());
MetricParserUtils.setTransformerConfig(
metricInputParameter,
transformConfigs,
actualValueExecuteSql,
TransformType.ACTUAL_VALUE.getDescription());
metricInputParameter.put(ACTUAL_TABLE, sqlMetric.getActualValue(metricInputParameter).getResultTable());
if (sqlMetric.getInvalidateItems(metricInputParameter) != null) {
// generate actual value execute sql
ExecuteSql actualValueExecuteSql = sqlMetric.getDirectActualValue(metricInputParameter);
if (actualValueExecuteSql != null) {
actualValueExecuteSql.setResultTable(sqlMetric.getDirectActualValue(metricInputParameter).getResultTable());
MetricParserUtils.setTransformerConfig(
metricInputParameter,
transformConfigs,
actualValueExecuteSql,
TransformType.ACTUAL_VALUE.getDescription());
metricInputParameter.put(ACTUAL_TABLE, sqlMetric.getActualValue(metricInputParameter).getResultTable());
}
} else {
// generate actual value execute sql
ExecuteSql actualValueExecuteSql = sqlMetric.getActualValue(metricInputParameter);
if (actualValueExecuteSql != null) {
actualValueExecuteSql.setResultTable(sqlMetric.getActualValue(metricInputParameter).getResultTable());
MetricParserUtils.setTransformerConfig(
metricInputParameter,
transformConfigs,
actualValueExecuteSql,
TransformType.ACTUAL_VALUE.getDescription());
metricInputParameter.put(ACTUAL_TABLE, sqlMetric.getActualValue(metricInputParameter).getResultTable());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@

import io.datavines.common.config.SinkConfig;
import io.datavines.common.config.enums.SinkType;
import io.datavines.common.entity.ConnectorParameter;
import io.datavines.common.entity.ExecuteSql;
import io.datavines.common.entity.MappingColumn;
import io.datavines.common.entity.job.BaseJobParameter;
import io.datavines.common.exception.DataVinesException;
import io.datavines.common.utils.JSONUtils;
import io.datavines.common.utils.ParameterUtils;
import io.datavines.common.utils.StringUtils;
import io.datavines.connector.api.ConnectorFactory;
import io.datavines.engine.config.MetricParserUtils;
import io.datavines.metric.api.ExpectedValue;
import io.datavines.metric.api.SqlMetric;
import io.datavines.spi.PluginLoader;
import org.apache.commons.collections4.CollectionUtils;

Expand All @@ -35,8 +37,6 @@
import java.util.List;
import java.util.Map;

import static io.datavines.common.CommonConstants.DATABASE2;
import static io.datavines.common.CommonConstants.TABLE2;
import static io.datavines.common.ConfigConstants.*;

/**
Expand Down Expand Up @@ -125,7 +125,24 @@ public void buildSinkConfigs() throws DataVinesException {
connectorParameterMap.put(ERROR_DATA_FILE_NAME, jobExecutionInfo.getErrorDataFileName());
connectorParameterMap.put(ERROR_DATA_DIR, metricInputParameter.get(ERROR_DATA_DIR));
connectorParameterMap.put(METRIC_NAME, metricInputParameter.get(METRIC_NAME));
connectorParameterMap.put(INVALIDATE_ITEMS_TABLE, metricInputParameter.get(INVALIDATE_ITEMS_TABLE));
boolean isEnableUseView = false;
if (metricInputParameter.get(ENABLE_USE_VIEW) != null) {
isEnableUseView = Boolean.parseBoolean(metricInputParameter.get(ENABLE_USE_VIEW));
}

if (isEnableUseView) {
connectorParameterMap.put(INVALIDATE_ITEMS_TABLE, metricInputParameter.get(INVALIDATE_ITEMS_TABLE));
} else {
String metricType = parameter.getMetricType();
SqlMetric sqlMetric = PluginLoader
.getPluginLoader(SqlMetric.class)
.getNewPlugin(metricType);
MetricParserUtils.operateInputParameter(metricInputParameter, sqlMetric, jobExecutionInfo);
if (sqlMetric.getInvalidateItems(metricInputParameter) != null) {
ExecuteSql invalidateItemExecuteSql = sqlMetric.getInvalidateItems(metricInputParameter);
connectorParameterMap.put(INVALIDATE_ITEMS_TABLE, "(" + ParameterUtils.convertParameterPlaceholders(invalidateItemExecuteSql.getSql(), metricInputParameter) + ") t");
}
}
connectorParameterMap.put(INVALIDATE_ITEM_CAN_OUTPUT, metricInputParameter.get(INVALIDATE_ITEM_CAN_OUTPUT));
// use to get source type converter in sink
connectorParameterMap.put(SRC_CONNECTOR_TYPE, metricInputParameter.get(SRC_CONNECTOR_TYPE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@

import io.datavines.common.config.SinkConfig;
import io.datavines.common.config.enums.SinkType;
import io.datavines.common.entity.ConnectorParameter;
import io.datavines.common.entity.ExecuteSql;
import io.datavines.common.entity.job.BaseJobParameter;
import io.datavines.common.exception.DataVinesException;
import io.datavines.common.utils.JSONUtils;
import io.datavines.common.utils.ParameterUtils;
import io.datavines.common.utils.StringUtils;
import io.datavines.connector.api.ConnectorFactory;
import io.datavines.engine.config.MetricParserUtils;
import io.datavines.metric.api.ExpectedValue;
import io.datavines.metric.api.SqlMetric;
import io.datavines.spi.PluginLoader;
import org.apache.commons.collections4.CollectionUtils;

Expand Down Expand Up @@ -94,7 +97,26 @@ public void buildSinkConfigs() throws DataVinesException {
connectorParameterMap.put(ERROR_DATA_FILE_NAME, jobExecutionInfo.getErrorDataFileName());
connectorParameterMap.put(ERROR_DATA_DIR, metricInputParameter.get(ERROR_DATA_DIR));
connectorParameterMap.put(METRIC_NAME, metricInputParameter.get(METRIC_NAME));
connectorParameterMap.put(INVALIDATE_ITEMS_TABLE, metricInputParameter.get(INVALIDATE_ITEMS_TABLE));
boolean isEnableUseView = false;
if (metricInputParameter.get(ENABLE_USE_VIEW) != null) {
isEnableUseView = Boolean.parseBoolean(metricInputParameter.get(ENABLE_USE_VIEW));
}

if (isEnableUseView) {
connectorParameterMap.put(INVALIDATE_ITEMS_TABLE, metricInputParameter.get(INVALIDATE_ITEMS_TABLE));
} else {
String metricType = parameter.getMetricType();
SqlMetric sqlMetric = PluginLoader
.getPluginLoader(SqlMetric.class)
.getNewPlugin(metricType);
MetricParserUtils.operateInputParameter(metricInputParameter, sqlMetric, jobExecutionInfo);
if (sqlMetric.getInvalidateItems(metricInputParameter) != null) {
ExecuteSql invalidateItemExecuteSql = sqlMetric.getInvalidateItems(metricInputParameter);
connectorParameterMap.put(INVALIDATE_ITEMS_TABLE, "(" + ParameterUtils.convertParameterPlaceholders(invalidateItemExecuteSql.getSql(), metricInputParameter) + ") t");
}
}

connectorParameterMap.put(ENABLE_USE_VIEW, isEnableUseView);
connectorParameterMap.put(INVALIDATE_ITEM_CAN_OUTPUT, metricInputParameter.get(INVALIDATE_ITEM_CAN_OUTPUT));
// use to get source type converter in sink
connectorParameterMap.put(SRC_CONNECTOR_TYPE, metricInputParameter.get(SRC_CONNECTOR_TYPE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,22 @@ private void sinkErrorDataToDataSource() {
}

String srcConnectorType = config.getString(SRC_CONNECTOR_TYPE);
boolean isEnableUseView = config.getBoolean(ENABLE_USE_VIEW);
ConnectorFactory connectorFactory = PluginLoader.getPluginLoader(ConnectorFactory.class).getOrCreatePlugin(srcConnectorType);
Dialect dialect = connectorFactory.getDialect();
if (!checkTableExist(getConnectionHolder().getConnection(),
dialect.quoteIdentifier(targetDatabase)+"."+dialect.quoteIdentifier(targetTable), dialect)) {
sourceConnectionStatement.execute(dialect.getCreateTableAsSelectStatement(sourceTable, targetDatabase, targetTable));
if (isEnableUseView) {
sourceConnectionStatement.execute(dialect.getCreateTableAsSelectStatement(sourceTable, targetDatabase, targetTable));
} else {
sourceConnectionStatement.execute(dialect.getCreateTableAsSelectStatementFromSql(sourceTable, targetDatabase, targetTable));
}
} else {
// drop data and insert new data
sourceConnectionStatement.execute(dialect.getInsertAsSelectStatement(sourceTable, targetDatabase, targetTable));
if (isEnableUseView) {
sourceConnectionStatement.execute(dialect.getInsertAsSelectStatement(sourceTable, targetDatabase, targetTable));
} else {
sourceConnectionStatement.execute(dialect.getInsertAsSelectStatementFromSql(sourceTable, targetDatabase, targetTable));
}
}

} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ public ResultList execute(Connection connection, Config config, LocalRuntimeEnvi
statement = connection.createStatement();
env.setCurrentStatement(statement);
resultSet = statement.executeQuery(sql);
ResultList resultList = SqlUtils.getListFromResultSet(resultSet, SqlUtils.getQueryFromsAndJoins(sql));
return resultList;
return SqlUtils.getListFromResultSet(resultSet, SqlUtils.getQueryFromsAndJoins(sql));
} finally {
SqlUtils.closeResultSet(resultSet);
SqlUtils.closeStatement(statement);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,11 @@ public ExecuteSql getActualValue(Map<String,String> inputParameter) {
inputParameter.put(ACTUAL_TABLE, inputParameter.get(TABLE));
String actualAggregateSql = inputParameter.get(ACTUAL_AGGREGATE_SQL);
if (StringUtils.isNotEmpty(actualAggregateSql)) {
actualAggregateSql = actualAggregateSql.replace("as actual_value", "as actual_value_" + inputParameter.get(METRIC_UNIQUE_KEY));
if (actualAggregateSql.contains("as actual_value")) {
actualAggregateSql = actualAggregateSql.replace("as actual_value", "as actual_value_" + inputParameter.get(METRIC_UNIQUE_KEY));
} else if (actualAggregateSql.contains("AS actual_value")) {
actualAggregateSql = actualAggregateSql.replace("AS actual_value", "as actual_value_" + inputParameter.get(METRIC_UNIQUE_KEY));
}
}
return new ExecuteSql(actualAggregateSql, inputParameter.get(TABLE));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,16 @@ public ExecuteSql getActualValue(Map<String,String> inputParameter) {
return executeSql;
}

@Override
public ExecuteSql getDirectActualValue(Map<String, String> inputParameter) {
String uniqueKey = inputParameter.get(METRIC_UNIQUE_KEY);
ExecuteSql executeSql = new ExecuteSql();
executeSql.setResultTable("invalidate_count_" + uniqueKey);
executeSql.setSql("select count(1) as actual_value_" + uniqueKey + " from ( " + invalidateItemsSql.toString() + " ) t");
executeSql.setErrorOutput(false);
return executeSql;
}

@Override
public List<DataVinesDataType> suitableType() {
return Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public static void main(String[] args) {
JobExecutionInfo jobExecutionInfo = new JobExecutionInfo(
id, submitJob.getName(),
submitJob.getEngineType(), JSONUtils.toJsonString(submitJob.getEngineParameter()),
submitJob.getErrorDataStorageType(), JSONUtils.toJsonString(submitJob.getErrorDataStorageParameter()), submitJob.getName()+"_"+ id,
submitJob.getErrorDataStorageType(), JSONUtils.toJsonString(submitJob.getErrorDataStorageParameter()), submitJob.getName() + "_" + id,
submitJob.getValidateResultDataStorageType(), JSONUtils.toJsonString(submitJob.getValidateResultDataStorageParameter()),
submitJob.getParameter());

Expand Down
8 changes: 8 additions & 0 deletions datavines-ui/Editor/hooks/useNotRequiredRule/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { useIntl } from 'react-intl';

export default () => {
const intl = useIntl();
return [{
required: false
}];
};
Loading