From 5d09cecd8891e5b77d8cfd4bb2c852ee580ce1ba Mon Sep 17 00:00:00 2001 From: Bilwa ST Date: Wed, 6 Nov 2024 20:13:19 +0530 Subject: [PATCH] [Addendum][Bug] [Seatunnel-web] Error when conditional column is not used in SE --- .../service/impl/JobInstanceServiceImpl.java | 18 ------------------ .../impl/BaseJdbcDataSourceConfigSwitcher.java | 9 +++++++++ 2 files changed, 9 insertions(+), 18 deletions(-) diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java index 684f529dc..8d0b79579 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java @@ -103,10 +103,6 @@ public class JobInstanceServiceImpl extends SeatunnelBaseServiceImpl private static final String DAG_PARSING_MODE = "dag-parsing.mode"; - private static final String WHERE_CONDITION = "where_condition"; - - private static final String QUERY = "query"; - @Resource private ConnectorDataSourceMapperConfig dataSourceMapperConfig; @Resource private IDatasourceService datasourceService; @@ -237,7 +233,6 @@ public String generateJobConfig( businessMode, config, optionRule); - mergeConfig = appendWhereClauseToQuery(mergeConfig); sourceMap .get(task.getConnectorType()) .add(filterEmptyValue(mergeConfig)); @@ -335,19 +330,6 @@ public String generateJobConfig( return JobUtils.replaceJobConfigPlaceholders(jobConfig, executeParam); } - private Config appendWhereClauseToQuery(Config mergeConfig) { - String where_condition = mergeConfig.getString(WHERE_CONDITION); - if (where_condition != null && !where_condition.isEmpty()) { - String query = mergeConfig.getString(QUERY); - String queryWithWhereClause = query + " " + where_condition; - mergeConfig = - mergeConfig.withValue( - QUERY, ConfigValueFactory.fromAnyRef(queryWithWhereClause)); - mergeConfig = mergeConfig.withoutPath(WHERE_CONDITION); - } - return mergeConfig; - } - @Override public JobExecutorRes getExecuteResource(@NonNull Long jobEngineId) { funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_EXECUTOR_INSTANCE, 0); diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/BaseJdbcDataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/BaseJdbcDataSourceConfigSwitcher.java index d3cdb8e8b..9c2a21053 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/BaseJdbcDataSourceConfigSwitcher.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/BaseJdbcDataSourceConfigSwitcher.java @@ -61,6 +61,8 @@ public abstract class BaseJdbcDataSourceConfigSwitcher extends AbstractDataSourc private static final String BASE_URL = "base-url"; private static final String CATALOG_SCHEMA = "schema"; + private static final String WHERE_CONDITION = "where_condition"; + private static final Option DATABASE_SCHEMA = Options.key("database_schema") .stringType() @@ -138,6 +140,13 @@ public Config mergeDatasourceConfig( String sql = tableFieldsToSql(tableFields, databaseName, tableName); + String where_condition = connectorConfig.getString(WHERE_CONDITION); + + if (where_condition != null && !where_condition.isEmpty()) { + sql = sql + " " + where_condition; + connectorConfig = connectorConfig.withoutPath(WHERE_CONDITION); + } + connectorConfig = connectorConfig.withValue(QUERY_KEY, ConfigValueFactory.fromAnyRef(sql)); } else if (pluginType.equals(PluginType.SINK)) {