From 40a03a6c2ba12d276ef020b29d6b31f84a3a7ddf Mon Sep 17 00:00:00 2001 From: toutian <953372946@qq.com> Date: Tue, 8 Nov 2022 20:08:26 +0800 Subject: [PATCH 1/4] [feat_863][taier-worker-plugin] spark sql proxy auto upload --- .../sparkyarn/sparkyarn/SparkYarnClient.java | 6 + .../sparkyarn/SparkYarnResourceInfo.java | 6 + .../sparkyarn/file/ResourceCleaner.java | 67 +++++++++ .../sparkyarn/file/SparkResourceUploader.java | 130 ++++++++++++++++++ 4 files changed, 209 insertions(+) create mode 100644 taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/ResourceCleaner.java create mode 100644 taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/SparkResourceUploader.java diff --git a/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/SparkYarnClient.java b/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/SparkYarnClient.java index 3d20192ec1..e360f3ff62 100644 --- a/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/SparkYarnClient.java +++ b/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/SparkYarnClient.java @@ -45,6 +45,7 @@ import com.dtstack.taier.sparkyarn.sparkext.ClientExt; import com.dtstack.taier.sparkyarn.sparkext.ClientExtFactory; import com.dtstack.taier.sparkyarn.sparkyarn.constant.AppEnvConstant; +import com.dtstack.taier.sparkyarn.sparkyarn.file.SparkResourceUploader; import com.dtstack.taier.sparkyarn.sparkyarn.parser.AddJarOperator; import com.dtstack.taier.sparkyarn.sparkyarn.util.HadoopConf; import com.google.common.base.Charsets; @@ -165,6 +166,11 @@ public void init(Properties prop) throws Exception { AcceptedApplicationMonitor.start(yarnConf, sparkYarnConfig.getQueue(), sparkYarnConfig); } + SparkResourceUploader sparkResourceUploader = + new SparkResourceUploader( + yarnConf, sparkYarnConfig, sparkExtProp, filesystemManager); + sparkResourceUploader.uploadSparkResource(); + this.threadPoolExecutor = new ThreadPoolExecutor(sparkYarnConfig.getAsyncCheckYarnClientThreadNum(), sparkYarnConfig.getAsyncCheckYarnClientThreadNum(), 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), new CustomThreadFactory("spark_yarnclient")); diff --git a/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/SparkYarnResourceInfo.java b/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/SparkYarnResourceInfo.java index b75a865728..14895bffd8 100644 --- a/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/SparkYarnResourceInfo.java +++ b/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/SparkYarnResourceInfo.java @@ -60,6 +60,12 @@ public class SparkYarnResourceInfo extends AbstractYarnResourceInfo { public final static int DEFAULT_MEM_OVERHEAD = 384; + public static final String SPARK_CLEAR_RESOURCED_RATE = "spark.clear.resource.rate"; + + public static final String SPARK_RESOURCES_DIR = "spark.resources.dir"; + + public static final String DEFAULT_SPARK_RESOURCES_DIR = "hdfs:///dtInsight/spark"; + private YarnClient yarnClient; private String queueName; private Integer yarnAccepterTaskNumber; diff --git a/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/ResourceCleaner.java b/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/ResourceCleaner.java new file mode 100644 index 0000000000..7de627ff2d --- /dev/null +++ b/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/ResourceCleaner.java @@ -0,0 +1,67 @@ +package com.dtstack.taier.sparkyarn.sparkyarn.file; + +import com.dtstack.taier.pluginapi.CustomThreadFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class ResourceCleaner implements Runnable { + private static final Logger logger = LoggerFactory.getLogger(ResourceCleaner.class); + + public static final Long INTERVAL = 24 * 60 * 60 * 1000L; + + private FileSystem fileSystem; + + private String sparkResourcesDirHostName; + + private String sparkResourcesDirMd5sum; + + private Long clearInterval; + + public static void start( + FileSystem fileSystem, + String sparkResourcesDirHostName, + String sparkResourcesDirMd5sum, + String sparkClearResourceRate) { + ResourceCleaner cleaner = new ResourceCleaner(); + String namePrefix = cleaner.getClass().getSimpleName(); + cleaner.fileSystem = fileSystem; + cleaner.sparkResourcesDirHostName = sparkResourcesDirHostName; + cleaner.sparkResourcesDirMd5sum = sparkResourcesDirMd5sum; + cleaner.clearInterval = Long.parseLong(sparkClearResourceRate) * INTERVAL; + + logger.info("ResourceCleaner Interval: {}", cleaner.clearInterval); + + ScheduledExecutorService scheduledService = + new ScheduledThreadPoolExecutor(1, new CustomThreadFactory(namePrefix)); + scheduledService.scheduleWithFixedDelay(cleaner, 0, INTERVAL, TimeUnit.MILLISECONDS); + } + + @Override + public void run() { + try { + Path compareFile = + new Path(sparkResourcesDirMd5sum + SparkResourceUploader.SP + "compareFile"); + fileSystem.create(compareFile); + FileStatus[] fileStatuses = fileSystem.listStatus(new Path(sparkResourcesDirHostName)); + for (FileStatus fileStatus : fileStatuses) { + long fileTimeStamps = fileStatus.getModificationTime(); + long nowTime = System.currentTimeMillis(); + if (nowTime - fileTimeStamps > clearInterval) { + fileSystem.delete(fileStatus.getPath()); + } + } + } catch (IOException e) { + logger.error("ResourcesDir Exception: ", e); + } catch (ClassCastException e) { + logger.error("spark.clear.resource.rate 请用int类型,单位为天", e); + } + } +} diff --git a/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/SparkResourceUploader.java b/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/SparkResourceUploader.java new file mode 100644 index 0000000000..fb3d006640 --- /dev/null +++ b/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/SparkResourceUploader.java @@ -0,0 +1,130 @@ +package com.dtstack.taier.sparkyarn.sparkyarn.file; + +import com.dtstack.taier.base.filesystem.FilesystemManager; +import com.dtstack.taier.base.util.KerberosUtils; +import com.dtstack.taier.pluginapi.exception.PluginDefineException; +import com.dtstack.taier.sparkyarn.sparkyarn.SparkYarnConfig; +import com.dtstack.taier.sparkyarn.sparkyarn.SparkYarnResourceInfo; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.util.Properties; + +public class SparkResourceUploader { + + private static final Logger logger = LoggerFactory.getLogger(SparkResourceUploader.class); + + public static final String SP = File.separator; + + // 默认hdfs resource文件清除频率 + public static final String SPARK_DEFAULT_CLEAR_RESOURCED_RATE = "30"; + + private final YarnConfiguration yarnConf; + + private final Properties sparkExtProp; + + private final SparkYarnConfig sparkYarnConfig; + + private final FilesystemManager filesystemManager; + + public SparkResourceUploader( + YarnConfiguration yarnConf, + SparkYarnConfig sparkYarnConfig, + Properties sparkExtProp, + FilesystemManager filesystemManager) { + this.yarnConf = yarnConf; + this.sparkExtProp = sparkExtProp; + this.sparkYarnConfig = sparkYarnConfig; + this.filesystemManager = filesystemManager; + } + + public void uploadSparkResource() { + String sparkResourcesDirProp = sparkExtProp.get(SparkYarnResourceInfo.SPARK_RESOURCES_DIR).toString(); + if (StringUtils.isBlank(sparkResourcesDirProp)) { + sparkResourcesDirProp = SparkYarnResourceInfo.DEFAULT_SPARK_RESOURCES_DIR; + } + final String sparkResourcesDir = sparkResourcesDirProp; + String md5sum = sparkYarnConfig.getMd5sum(); + String sparkClearResourceRate = + sparkExtProp + .getOrDefault( + SparkYarnResourceInfo.SPARK_CLEAR_RESOURCED_RATE, + SPARK_DEFAULT_CLEAR_RESOURCED_RATE) + .toString(); + try { + KerberosUtils.login( + sparkYarnConfig, + () -> { + try { + FileSystem fileSystem = FileSystem.get(yarnConf); + String hostName = InetAddress.getLocalHost().getHostName(); + String sparkResourcesDirHostName = + sparkResourcesDir + SparkResourceUploader.SP + hostName; + String sparkResourcesDirMd5sum = + sparkResourcesDir + + SparkResourceUploader.SP + + hostName + + SparkResourceUploader.SP + + md5sum; + ResourceCleaner.start( + fileSystem, + sparkResourcesDirHostName, + sparkResourcesDirMd5sum, + sparkClearResourceRate); + uploadSparkSqlProxy(fileSystem, sparkResourcesDirMd5sum); + + } catch (IOException e) { + throw new PluginDefineException("upload hadoop conf", e); + } + return null; + }, + yarnConf); + } catch (Exception e) { + throw new PluginDefineException("upload hadoop conf", e); + } + } + + private void uploadSparkSqlProxy(FileSystem fileSystem, String sparkResourcesDirMd5sum) { + try { + Path localPath = new Path(getSqlProxyJarPath()); + logger.info("local path {}", localPath); + String sparkSqlProxyPath = sparkResourcesDirMd5sum + "/spark-sql-proxy.jar"; + Path remotePath = new Path(sparkSqlProxyPath); + fileSystem.copyFromLocalFile(localPath, remotePath); + sparkYarnConfig.setSparkSqlProxyPath(sparkSqlProxyPath); + } catch (IOException e) { + throw new PluginDefineException("upload spark sql proxy failed", e); + } + } + + private String getSqlProxyJarPath() { + String path = this.getClass().getProtectionDomain().getCodeSource().getLocation().getPath(); + + File pluginDir = new File(path).getParentFile().getParentFile(); + File[] sqlProxyDirs = + pluginDir.listFiles( + (dir, name) -> + dir.isDirectory() + && name.toLowerCase().startsWith("spark-sql-proxy")); + if (sqlProxyDirs != null && sqlProxyDirs.length == 1) { + File[] sqlProxyJars = + sqlProxyDirs[0].listFiles( + (dir, name) -> + name.toLowerCase().startsWith("spark-sql-proxy") + && name.toLowerCase().endsWith(".jar")); + + if (sqlProxyJars != null && sqlProxyJars.length == 1) { + return sqlProxyJars[0].getAbsolutePath(); + } + } + throw new PluginDefineException( + "Can not find spark sql proxy jar in path: " + pluginDir); + } +} From 9106ee84b54ab75ac32447f5cc6f6bccba712f1d Mon Sep 17 00:00:00 2001 From: toutian <953372946@qq.com> Date: Wed, 9 Nov 2022 11:54:43 +0800 Subject: [PATCH 2/4] [feat_863][taier-worker-plugin] spark sql proxy auto upload --- .../sparkyarn/file/ResourceCleaner.java | 18 ++++++++++++++++++ .../sparkyarn/file/SparkResourceUploader.java | 18 ++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/ResourceCleaner.java b/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/ResourceCleaner.java index 7de627ff2d..277d5095db 100644 --- a/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/ResourceCleaner.java +++ b/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/ResourceCleaner.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.dtstack.taier.sparkyarn.sparkyarn.file; import com.dtstack.taier.pluginapi.CustomThreadFactory; diff --git a/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/SparkResourceUploader.java b/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/SparkResourceUploader.java index fb3d006640..9d3837c01c 100644 --- a/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/SparkResourceUploader.java +++ b/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/SparkResourceUploader.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.dtstack.taier.sparkyarn.sparkyarn.file; import com.dtstack.taier.base.filesystem.FilesystemManager; From 45347e9ed74265f02277bfb7a764ecf90ecadc62 Mon Sep 17 00:00:00 2001 From: toutian <953372946@qq.com> Date: Wed, 9 Nov 2022 11:59:19 +0800 Subject: [PATCH 3/4] [feat_863][taier-worker-plugin] spark sql proxy auto upload --- .../dtstack/taier/sparkyarn/sparkyarn/file/ResourceCleaner.java | 2 +- .../taier/sparkyarn/sparkyarn/file/SparkResourceUploader.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/ResourceCleaner.java b/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/ResourceCleaner.java index 277d5095db..35bd37a966 100644 --- a/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/ResourceCleaner.java +++ b/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/ResourceCleaner.java @@ -79,7 +79,7 @@ public void run() { } catch (IOException e) { logger.error("ResourcesDir Exception: ", e); } catch (ClassCastException e) { - logger.error("spark.clear.resource.rate 请用int类型,单位为天", e); + logger.error("spark.clear.resource.rate, use dataType:'int', unit:'day' ", e); } } } diff --git a/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/SparkResourceUploader.java b/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/SparkResourceUploader.java index 9d3837c01c..ce05d8adf3 100644 --- a/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/SparkResourceUploader.java +++ b/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/SparkResourceUploader.java @@ -41,7 +41,7 @@ public class SparkResourceUploader { public static final String SP = File.separator; - // 默认hdfs resource文件清除频率 + // default hdfs resource cleaner rate public static final String SPARK_DEFAULT_CLEAR_RESOURCED_RATE = "30"; private final YarnConfiguration yarnConf; From cfa04f91b47ec09e90f3aaad7b967016980758fd Mon Sep 17 00:00:00 2001 From: toutian <953372946@qq.com> Date: Wed, 9 Nov 2022 21:03:23 +0800 Subject: [PATCH 4/4] [feat_863][taier-worker-plugin] spark sql proxy auto upload --- .../sparkyarn/file/SparkResourceUploader.java | 21 +++++++------------ 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/SparkResourceUploader.java b/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/SparkResourceUploader.java index ce05d8adf3..deb9654f3f 100644 --- a/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/SparkResourceUploader.java +++ b/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/SparkResourceUploader.java @@ -64,11 +64,11 @@ public SparkResourceUploader( } public void uploadSparkResource() { - String sparkResourcesDirProp = sparkExtProp.get(SparkYarnResourceInfo.SPARK_RESOURCES_DIR).toString(); - if (StringUtils.isBlank(sparkResourcesDirProp)) { + Object sparkResourcesDirProp = sparkExtProp.get(SparkYarnResourceInfo.SPARK_RESOURCES_DIR); + if (sparkResourcesDirProp == null || StringUtils.isBlank(sparkResourcesDirProp.toString())) { sparkResourcesDirProp = SparkYarnResourceInfo.DEFAULT_SPARK_RESOURCES_DIR; } - final String sparkResourcesDir = sparkResourcesDirProp; + final String sparkResourcesDir = sparkResourcesDirProp.toString(); String md5sum = sparkYarnConfig.getMd5sum(); String sparkClearResourceRate = sparkExtProp @@ -125,20 +125,15 @@ private void uploadSparkSqlProxy(FileSystem fileSystem, String sparkResourcesDir private String getSqlProxyJarPath() { String path = this.getClass().getProtectionDomain().getCodeSource().getLocation().getPath(); - File pluginDir = new File(path).getParentFile().getParentFile(); - File[] sqlProxyDirs = + File pluginDir = new File(path).getParentFile(); + File[] sqlProxyJars = pluginDir.listFiles( (dir, name) -> dir.isDirectory() && name.toLowerCase().startsWith("spark-sql-proxy")); - if (sqlProxyDirs != null && sqlProxyDirs.length == 1) { - File[] sqlProxyJars = - sqlProxyDirs[0].listFiles( - (dir, name) -> - name.toLowerCase().startsWith("spark-sql-proxy") - && name.toLowerCase().endsWith(".jar")); - - if (sqlProxyJars != null && sqlProxyJars.length == 1) { + if (sqlProxyJars != null && sqlProxyJars.length == 1) { + String sqlProxyJar = sqlProxyJars[0].getName(); + if (sqlProxyJar.toLowerCase().startsWith("spark-sql-proxy") && sqlProxyJar.toLowerCase().endsWith(".jar")) { return sqlProxyJars[0].getAbsolutePath(); } }