From fb2548cd25ed6114a412a6ef2553a6dd55acfa3f Mon Sep 17 00:00:00 2001
From: vainhope <845869847@qq.com>
Date: Mon, 13 Mar 2023 10:56:16 +0800
Subject: [PATCH] [issue_997][taier-all] support hive properties set fix #997
---
conf/logback.xml | 14 +++++++++-----
.../plugin/common/utils/PropertiesUtil.java | 9 +++++++--
.../datasource/plugin/hive2/HiveConnFactory.java | 2 +-
.../datasource/plugin/hive1/HiveConnFactory.java | 15 +++++++++------
.../datasource/plugin/hive3/HiveConnFactory.java | 5 ++++-
.../taier/scheduler/executor/RdbJobExecutor.java | 9 +++++++++
6 files changed, 39 insertions(+), 15 deletions(-)
diff --git a/conf/logback.xml b/conf/logback.xml
index 8905a577f1..0ddb983481 100644
--- a/conf/logback.xml
+++ b/conf/logback.xml
@@ -90,19 +90,23 @@
-
+
-
+
-
+
+
+
+
+
-
-
+
+
diff --git a/taier-datasource/taier-datasource-plugin/taier-datasource-plugin-common/src/main/java/com/dtstack/taier/datasource/plugin/common/utils/PropertiesUtil.java b/taier-datasource/taier-datasource-plugin/taier-datasource-plugin-common/src/main/java/com/dtstack/taier/datasource/plugin/common/utils/PropertiesUtil.java
index d0c3eb30d7..95a14e2a10 100644
--- a/taier-datasource/taier-datasource-plugin/taier-datasource-plugin-common/src/main/java/com/dtstack/taier/datasource/plugin/common/utils/PropertiesUtil.java
+++ b/taier-datasource/taier-datasource-plugin/taier-datasource-plugin-common/src/main/java/com/dtstack/taier/datasource/plugin/common/utils/PropertiesUtil.java
@@ -35,10 +35,14 @@
public class PropertiesUtil {
public static Properties convertToProp(RdbmsSourceDTO rdbmsSourceDTO) {
- return convertToProp(rdbmsSourceDTO, null);
+ return convertToProp(rdbmsSourceDTO, null, null);
}
public static Properties convertToProp(RdbmsSourceDTO rdbmsSourceDTO, Properties properties) {
+ return convertToProp(rdbmsSourceDTO, properties, null);
+ }
+
+ public static Properties convertToProp(RdbmsSourceDTO rdbmsSourceDTO, Properties properties, String prefix) {
if (Objects.isNull(properties)) {
properties = new Properties();
}
@@ -56,7 +60,8 @@ public static Properties convertToProp(RdbmsSourceDTO rdbmsSourceDTO, Properties
for (String key : propertiesJson.keySet()) {
String value = propertiesJson.getString(key);
if (StringUtils.isNotBlank(value)) {
- properties.setProperty(key, value);
+ String newKey = key.startsWith(prefix) ? key : prefix + key;
+ properties.setProperty(newKey, value);
}
}
}
diff --git a/taier-datasource/taier-datasource-plugin/taier-datasource-plugin-hive/src/main/java/com/dtstack/taier/datasource/plugin/hive2/HiveConnFactory.java b/taier-datasource/taier-datasource-plugin/taier-datasource-plugin-hive/src/main/java/com/dtstack/taier/datasource/plugin/hive2/HiveConnFactory.java
index 8c5e5bd657..34f84da86f 100644
--- a/taier-datasource/taier-datasource-plugin/taier-datasource-plugin-hive/src/main/java/com/dtstack/taier/datasource/plugin/hive2/HiveConnFactory.java
+++ b/taier-datasource/taier-datasource-plugin/taier-datasource-plugin-hive/src/main/java/com/dtstack/taier/datasource/plugin/hive2/HiveConnFactory.java
@@ -79,7 +79,7 @@ public Connection getConn(ISourceDTO sourceDTO) throws Exception {
Properties properties = new Properties();
SSLUtil.SSLConfiguration sslConfiguration = SSLUtil.getSSLConfiguration(hiveSourceDTO);
dealSsl(properties, sslConfiguration);
- PropertiesUtil.convertToProp(hiveSourceDTO, properties);
+ PropertiesUtil.convertToProp(hiveSourceDTO, properties, HIVE_CONF_PREFIX);
properties.put(DtClassConsistent.PublicConsistent.USER, hiveSourceDTO.getUsername() == null ? "" : hiveSourceDTO.getUsername());
properties.put(DtClassConsistent.PublicConsistent.PASSWORD, hiveSourceDTO.getPassword() == null ? "" : hiveSourceDTO.getPassword());
setQueue(properties, hiveSourceDTO);
diff --git a/taier-datasource/taier-datasource-plugin/taier-datasource-plugin-hive1/src/main/java/com/dtstack/taier/datasource/plugin/hive1/HiveConnFactory.java b/taier-datasource/taier-datasource-plugin/taier-datasource-plugin-hive1/src/main/java/com/dtstack/taier/datasource/plugin/hive1/HiveConnFactory.java
index 8f4bb734d9..83a395b9d4 100644
--- a/taier-datasource/taier-datasource-plugin/taier-datasource-plugin-hive1/src/main/java/com/dtstack/taier/datasource/plugin/hive1/HiveConnFactory.java
+++ b/taier-datasource/taier-datasource-plugin/taier-datasource-plugin-hive1/src/main/java/com/dtstack/taier/datasource/plugin/hive1/HiveConnFactory.java
@@ -18,6 +18,11 @@
package com.dtstack.taier.datasource.plugin.hive1;
+import com.dtstack.taier.datasource.api.dto.SqlQueryDTO;
+import com.dtstack.taier.datasource.api.dto.source.Hive1SourceDTO;
+import com.dtstack.taier.datasource.api.dto.source.ISourceDTO;
+import com.dtstack.taier.datasource.api.exception.SourceException;
+import com.dtstack.taier.datasource.api.source.DataBaseType;
import com.dtstack.taier.datasource.plugin.common.DtClassConsistent;
import com.dtstack.taier.datasource.plugin.common.constant.CommonConstant;
import com.dtstack.taier.datasource.plugin.common.exception.ErrorCode;
@@ -27,11 +32,6 @@
import com.dtstack.taier.datasource.plugin.common.utils.SqlFormatUtil;
import com.dtstack.taier.datasource.plugin.kerberos.core.util.KerberosLoginUtil;
import com.dtstack.taier.datasource.plugin.rdbms.ConnFactory;
-import com.dtstack.taier.datasource.api.dto.SqlQueryDTO;
-import com.dtstack.taier.datasource.api.dto.source.Hive1SourceDTO;
-import com.dtstack.taier.datasource.api.dto.source.ISourceDTO;
-import com.dtstack.taier.datasource.api.exception.SourceException;
-import com.dtstack.taier.datasource.api.source.DataBaseType;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
@@ -53,6 +53,9 @@
@Slf4j
public class HiveConnFactory extends ConnFactory {
+ // Hive 属性前缀
+ private static final String HIVE_CONF_PREFIX = "hiveconf:";
+
private static final String SSL_FLAG = "ssl";
private static final String SSL_TRUST_STORE = "sslTrustStore";
@@ -82,7 +85,7 @@ public Connection getConn(ISourceDTO sourceDTO) throws Exception {
properties.put(DtClassConsistent.PublicConsistent.USER, hive1SourceDTO.getUsername() == null ? "" : hive1SourceDTO.getUsername());
properties.put(DtClassConsistent.PublicConsistent.PASSWORD, hive1SourceDTO.getPassword() == null ? "" : hive1SourceDTO.getPassword());
- PropertiesUtil.convertToProp(hive1SourceDTO, properties);
+ PropertiesUtil.convertToProp(hive1SourceDTO, properties, HIVE_CONF_PREFIX);
setQueue(properties, hive1SourceDTO);
String urlWithoutSchema = HiveDriverUtil.removeSchema(hive1SourceDTO.getUrl());
return DriverManager.getConnection(urlWithoutSchema, properties);
diff --git a/taier-datasource/taier-datasource-plugin/taier-datasource-plugin-hive3/src/main/java/com/dtstack/taier/datasource/plugin/hive3/HiveConnFactory.java b/taier-datasource/taier-datasource-plugin/taier-datasource-plugin-hive3/src/main/java/com/dtstack/taier/datasource/plugin/hive3/HiveConnFactory.java
index 91514bf1f7..75577735f9 100644
--- a/taier-datasource/taier-datasource-plugin/taier-datasource-plugin-hive3/src/main/java/com/dtstack/taier/datasource/plugin/hive3/HiveConnFactory.java
+++ b/taier-datasource/taier-datasource-plugin/taier-datasource-plugin-hive3/src/main/java/com/dtstack/taier/datasource/plugin/hive3/HiveConnFactory.java
@@ -52,6 +52,9 @@
@Slf4j
public class HiveConnFactory extends ConnFactory {
+ // Hive 属性前缀
+ private static final String HIVE_CONF_PREFIX = "hiveconf:";
+
private static final String SSL_FLAG = "ssl";
private static final String SSL_TRUST_STORE = "sslTrustStore";
@@ -79,7 +82,7 @@ public Connection getConn(ISourceDTO sourceDTO) throws Exception {
dealSsl(properties, sslConfiguration);
properties.put(DtClassConsistent.PublicConsistent.USER, hiveSourceDTO.getUsername() == null ? "" : hiveSourceDTO.getUsername());
properties.put(DtClassConsistent.PublicConsistent.PASSWORD, hiveSourceDTO.getPassword() == null ? "" : hiveSourceDTO.getPassword());
- PropertiesUtil.convertToProp(hiveSourceDTO, properties);
+ PropertiesUtil.convertToProp(hiveSourceDTO, properties, HIVE_CONF_PREFIX);
setQueue(properties, hiveSourceDTO);
String urlWithoutSchema = HiveDriverUtil.removeSchema(hiveSourceDTO.getUrl());
return DriverManager.getConnection(urlWithoutSchema, properties);
diff --git a/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/executor/RdbJobExecutor.java b/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/executor/RdbJobExecutor.java
index 140f0b048f..666a8cdbd8 100644
--- a/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/executor/RdbJobExecutor.java
+++ b/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/executor/RdbJobExecutor.java
@@ -25,6 +25,7 @@
import com.dtstack.taier.datasource.api.client.IClient;
import com.dtstack.taier.datasource.api.dto.SqlQueryDTO;
import com.dtstack.taier.datasource.api.dto.source.ISourceDTO;
+import com.dtstack.taier.datasource.api.dto.source.RdbmsSourceDTO;
import com.dtstack.taier.pluginapi.JobClient;
import com.dtstack.taier.pluginapi.enums.TaskStatus;
import com.dtstack.taier.pluginapi.pojo.JudgeResult;
@@ -82,6 +83,14 @@ public void executeJob(JobClient jobClient) {
LOGGER.info("jobId:{} taskType:{} submit to job start run", jobClient.getJobId(), jobClient.getTaskType());
// executeBatchQuery 执行不成功 会执行抛异常,不会返回false
ISourceDTO sourceDTO = sourceDTOLoader.buildSourceDTO(jobClient.getDatasourceId());
+ if (sourceDTO instanceof RdbmsSourceDTO) {
+ RdbmsSourceDTO rdbmsSourceDTO = (RdbmsSourceDTO) sourceDTO;
+ try {
+ rdbmsSourceDTO.setProperties(JSONObject.toJSONString(jobClient.getConfProperties()));
+ } catch (Exception e) {
+ }
+ sourceDTO = rdbmsSourceDTO;
+ }
IClient client = ClientCache.getClient(sourceDTO.getSourceType());
client.executeBatchQuery(sourceDTO, SqlQueryDTO.builder().sql(jobClient.getSql()).build());
try {