diff --git a/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/SparkYarnClient.java b/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/SparkYarnClient.java
index 3d20192ec1..e360f3ff62 100644
--- a/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/SparkYarnClient.java
+++ b/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/SparkYarnClient.java
@@ -45,6 +45,7 @@
 import com.dtstack.taier.sparkyarn.sparkext.ClientExt;
 import com.dtstack.taier.sparkyarn.sparkext.ClientExtFactory;
 import com.dtstack.taier.sparkyarn.sparkyarn.constant.AppEnvConstant;
+import com.dtstack.taier.sparkyarn.sparkyarn.file.SparkResourceUploader;
 import com.dtstack.taier.sparkyarn.sparkyarn.parser.AddJarOperator;
 import com.dtstack.taier.sparkyarn.sparkyarn.util.HadoopConf;
 import com.google.common.base.Charsets;
@@ -165,6 +166,11 @@ public void init(Properties prop) throws Exception {
             AcceptedApplicationMonitor.start(yarnConf, sparkYarnConfig.getQueue(), sparkYarnConfig);
         }
 
+        SparkResourceUploader sparkResourceUploader =
+                new SparkResourceUploader(
+                        yarnConf, sparkYarnConfig, sparkExtProp, filesystemManager);
+        sparkResourceUploader.uploadSparkResource();
+
         this.threadPoolExecutor = new ThreadPoolExecutor(sparkYarnConfig.getAsyncCheckYarnClientThreadNum(), sparkYarnConfig.getAsyncCheckYarnClientThreadNum(),
                 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), new CustomThreadFactory("spark_yarnclient"));
 
diff --git a/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/SparkYarnResourceInfo.java b/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/SparkYarnResourceInfo.java
index b75a865728..14895bffd8 100644
--- a/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/SparkYarnResourceInfo.java
+++ b/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/SparkYarnResourceInfo.java
@@ -60,6 +60,12 @@ public class SparkYarnResourceInfo extends AbstractYarnResourceInfo {
 
     public final static int DEFAULT_MEM_OVERHEAD = 384;
 
+    public static final String SPARK_CLEAR_RESOURCED_RATE = "spark.clear.resource.rate";
+
+    public static final String SPARK_RESOURCES_DIR = "spark.resources.dir";
+
+    public static final String DEFAULT_SPARK_RESOURCES_DIR = "hdfs:///dtInsight/spark";
+
     private YarnClient yarnClient;
     private String queueName;
     private Integer yarnAccepterTaskNumber;
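The two keys added to `SparkYarnResourceInfo` are read from the plugin's extra properties when `SparkYarnClient.init` runs: `spark.resources.dir` names the HDFS root for uploaded resources (falling back to `hdfs:///dtInsight/spark` when unset or blank) and `spark.clear.resource.rate` is the number of days an idle resource directory is kept (defaulting to "30"). A minimal sketch of that resolution rule, with a hypothetical `props` standing in for `sparkExtProp` (assumes commons-lang3 on the classpath, as in the plugin):

```java
import org.apache.commons.lang3.StringUtils;

import java.util.Properties;

public class ResourceDirResolution {
    public static void main(String[] args) {
        // hypothetical stand-in for the plugin's sparkExtProp
        Properties props = new Properties();
        props.setProperty("spark.clear.resource.rate", "7"); // keep idle resources for 7 days

        // same fallback rule as SparkResourceUploader.uploadSparkResource()
        Object dirProp = props.get("spark.resources.dir");
        String resourcesDir =
                (dirProp == null || StringUtils.isBlank(dirProp.toString()))
                        ? "hdfs:///dtInsight/spark" // DEFAULT_SPARK_RESOURCES_DIR
                        : dirProp.toString();
        String clearRate = props.getOrDefault("spark.clear.resource.rate", "30").toString();

        System.out.println(resourcesDir + ", cleared after " + clearRate + " idle days");
        // -> hdfs:///dtInsight/spark, cleared after 7 idle days
    }
}
```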
diff --git a/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/ResourceCleaner.java b/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/ResourceCleaner.java
new file mode 100644
index 0000000000..35bd37a966
--- /dev/null
+++ b/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/ResourceCleaner.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.taier.sparkyarn.sparkyarn.file;
+
+import com.dtstack.taier.pluginapi.CustomThreadFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+public class ResourceCleaner implements Runnable {
+
+    private static final Logger logger = LoggerFactory.getLogger(ResourceCleaner.class);
+
+    // one day in milliseconds: the scheduling period and the unit of spark.clear.resource.rate
+    public static final Long INTERVAL = 24 * 60 * 60 * 1000L;
+
+    private FileSystem fileSystem;
+
+    private String sparkResourcesDirHostName;
+
+    private String sparkResourcesDirMd5sum;
+
+    private Long clearInterval;
+
+    public static void start(
+            FileSystem fileSystem,
+            String sparkResourcesDirHostName,
+            String sparkResourcesDirMd5sum,
+            String sparkClearResourceRate) {
+        ResourceCleaner cleaner = new ResourceCleaner();
+        String namePrefix = cleaner.getClass().getSimpleName();
+        cleaner.fileSystem = fileSystem;
+        cleaner.sparkResourcesDirHostName = sparkResourcesDirHostName;
+        cleaner.sparkResourcesDirMd5sum = sparkResourcesDirMd5sum;
+        try {
+            // spark.clear.resource.rate is an integer number of days
+            cleaner.clearInterval = Long.parseLong(sparkClearResourceRate) * INTERVAL;
+        } catch (NumberFormatException e) {
+            logger.error("invalid spark.clear.resource.rate: '{}', use dataType:'int', unit:'day'", sparkClearResourceRate, e);
+            throw e;
+        }
+
+        logger.info("ResourceCleaner Interval: {}", cleaner.clearInterval);
+
+        ScheduledExecutorService scheduledService =
+                new ScheduledThreadPoolExecutor(1, new CustomThreadFactory(namePrefix));
+        scheduledService.scheduleWithFixedDelay(cleaner, 0, INTERVAL, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void run() {
+        try {
+            // touch a marker file so the resource dir currently in use never looks idle
+            Path compareFile =
+                    new Path(sparkResourcesDirMd5sum + SparkResourceUploader.SP + "compareFile");
+            fileSystem.create(compareFile).close();
+            FileStatus[] fileStatuses = fileSystem.listStatus(new Path(sparkResourcesDirHostName));
+            for (FileStatus fileStatus : fileStatuses) {
+                long fileTimeStamps = fileStatus.getModificationTime();
+                long nowTime = System.currentTimeMillis();
+                if (nowTime - fileTimeStamps > clearInterval) {
+                    // remove resource dirs that have been idle longer than the clear interval
+                    fileSystem.delete(fileStatus.getPath(), true);
+                }
+            }
+        } catch (IOException e) {
+            logger.error("ResourcesDir Exception: ", e);
+        }
+    }
+}
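`ResourceCleaner` wakes once per `INTERVAL` (24 h) and deletes every entry under the per-host resource directory whose modification time is older than `rate × 24 h`; recreating `compareFile` on each pass refreshes the md5sum directory currently in use so it is never collected. Below is a self-contained check of that age rule against the local Hadoop filesystem; a sketch only: the paths are illustrative, and it assumes `hadoop-common` on the classpath and a Hadoop version whose local filesystem implements `setTimes`.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CleanerRuleCheck {
    public static void main(String[] args) throws Exception {
        long dayMs = 24 * 60 * 60 * 1000L; // ResourceCleaner.INTERVAL
        long clearInterval = 30 * dayMs;   // default spark.clear.resource.rate of 30 days

        FileSystem fs = FileSystem.getLocal(new Configuration());
        Path hostDir = new Path(System.getProperty("java.io.tmpdir"), "spark-resources/host-a");
        Path stale = new Path(hostDir, "oldMd5sum");
        Path fresh = new Path(hostDir, "currentMd5sum");
        fs.mkdirs(stale);
        fs.mkdirs(fresh);
        // age the stale dir by 31 days (atime -1 leaves access time untouched)
        fs.setTimes(stale, System.currentTimeMillis() - 31 * dayMs, -1);

        long now = System.currentTimeMillis();
        for (FileStatus status : fs.listStatus(hostDir)) {
            if (now - status.getModificationTime() > clearInterval) {
                fs.delete(status.getPath(), true); // same recursive delete as the cleaner
            }
        }
        System.out.println(fs.exists(stale) + " " + fs.exists(fresh)); // -> false true
    }
}
```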
diff --git a/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/SparkResourceUploader.java b/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/SparkResourceUploader.java
new file mode 100644
index 0000000000..deb9654f3f
--- /dev/null
+++ b/taier-worker/taier-worker-plugin/spark/yarn2-hdfs2-spark210-core/spark-yarn-client-core/src/main/java/com/dtstack/taier/sparkyarn/sparkyarn/file/SparkResourceUploader.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.taier.sparkyarn.sparkyarn.file;
+
+import com.dtstack.taier.base.filesystem.FilesystemManager;
+import com.dtstack.taier.base.util.KerberosUtils;
+import com.dtstack.taier.pluginapi.exception.PluginDefineException;
+import com.dtstack.taier.sparkyarn.sparkyarn.SparkYarnConfig;
+import com.dtstack.taier.sparkyarn.sparkyarn.SparkYarnResourceInfo;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Properties;
+
+public class SparkResourceUploader {
+
+    private static final Logger logger = LoggerFactory.getLogger(SparkResourceUploader.class);
+
+    public static final String SP = File.separator;
+
+    // default hdfs resource cleaner rate
+    public static final String SPARK_DEFAULT_CLEAR_RESOURCED_RATE = "30";
+
+    private final YarnConfiguration yarnConf;
+
+    private final Properties sparkExtProp;
+
+    private final SparkYarnConfig sparkYarnConfig;
+
+    private final FilesystemManager filesystemManager;
+
+    public SparkResourceUploader(
+            YarnConfiguration yarnConf,
+            SparkYarnConfig sparkYarnConfig,
+            Properties sparkExtProp,
+            FilesystemManager filesystemManager) {
+        this.yarnConf = yarnConf;
+        this.sparkExtProp = sparkExtProp;
+        this.sparkYarnConfig = sparkYarnConfig;
+        this.filesystemManager = filesystemManager;
+    }
+
+    public void uploadSparkResource() {
+        Object sparkResourcesDirProp = sparkExtProp.get(SparkYarnResourceInfo.SPARK_RESOURCES_DIR);
+        if (sparkResourcesDirProp == null || StringUtils.isBlank(sparkResourcesDirProp.toString())) {
+            sparkResourcesDirProp = SparkYarnResourceInfo.DEFAULT_SPARK_RESOURCES_DIR;
+        }
+        final String sparkResourcesDir = sparkResourcesDirProp.toString();
+        String md5sum = sparkYarnConfig.getMd5sum();
+        String sparkClearResourceRate =
+                sparkExtProp
+                        .getOrDefault(
+                                SparkYarnResourceInfo.SPARK_CLEAR_RESOURCED_RATE,
+                                SPARK_DEFAULT_CLEAR_RESOURCED_RATE)
+                        .toString();
+        try {
+            KerberosUtils.login(
+                    sparkYarnConfig,
+                    () -> {
+                        try {
+                            FileSystem fileSystem = FileSystem.get(yarnConf);
+                            String hostName = InetAddress.getLocalHost().getHostName();
+                            String sparkResourcesDirHostName =
+                                    sparkResourcesDir + SparkResourceUploader.SP + hostName;
+                            String sparkResourcesDirMd5sum =
+                                    sparkResourcesDir
+                                            + SparkResourceUploader.SP
+                                            + hostName
+                                            + SparkResourceUploader.SP
+                                            + md5sum;
+                            ResourceCleaner.start(
+                                    fileSystem,
+                                    sparkResourcesDirHostName,
+                                    sparkResourcesDirMd5sum,
+                                    sparkClearResourceRate);
+                            uploadSparkSqlProxy(fileSystem, sparkResourcesDirMd5sum);
+
+                        } catch (IOException e) {
+                            throw new PluginDefineException("upload spark resources failed", e);
+                        }
+                        return null;
+                    },
+                    yarnConf);
+        } catch (Exception e) {
+            throw new PluginDefineException("upload spark resources failed", e);
+        }
+    }
+
+    private void uploadSparkSqlProxy(FileSystem fileSystem, String sparkResourcesDirMd5sum) {
+        try {
+            Path localPath = new Path(getSqlProxyJarPath());
+            logger.info("local path {}", localPath);
+            String sparkSqlProxyPath = sparkResourcesDirMd5sum + "/spark-sql-proxy.jar";
+            Path remotePath = new Path(sparkSqlProxyPath);
+            fileSystem.copyFromLocalFile(localPath, remotePath);
+            sparkYarnConfig.setSparkSqlProxyPath(sparkSqlProxyPath);
+        } catch (IOException e) {
+            throw new PluginDefineException("upload spark sql proxy failed", e);
+        }
+    }
+
+    private String getSqlProxyJarPath() {
+        String path = this.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
+
+        File pluginDir = new File(path).getParentFile();
+        File[] sqlProxyJars =
+                pluginDir.listFiles(
+                        (dir, name) ->
+                                dir.isDirectory()
+                                        && name.toLowerCase().startsWith("spark-sql-proxy"));
+        if (sqlProxyJars != null && sqlProxyJars.length == 1) {
+            String sqlProxyJar = sqlProxyJars[0].getName();
+            if (sqlProxyJar.toLowerCase().startsWith("spark-sql-proxy")
+                    && sqlProxyJar.toLowerCase().endsWith(".jar")) {
+                return sqlProxyJars[0].getAbsolutePath();
+            }
+        }
+        throw new PluginDefineException(
+                "Can not find spark sql proxy jar in path: " + pluginDir);
+    }
+}
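Taken together, the uploader keys each upload by host name and configuration md5sum, so the remote layout is `<spark.resources.dir>/<hostName>/<md5sum>/spark-sql-proxy.jar`, and `ResourceCleaner` scans one level up, at the host directory. A sketch of the path construction with hypothetical values:

```java
import java.io.File;

public class RemoteLayout {
    public static void main(String[] args) {
        String sp = File.separator;                      // SparkResourceUploader.SP
        String resourcesDir = "hdfs:///dtInsight/spark"; // default spark.resources.dir
        String hostName = "worker-01";                   // hypothetical local host name
        String md5sum = "9f86d081884c7d65";              // hypothetical config md5sum

        String hostDir = resourcesDir + sp + hostName;   // scanned by ResourceCleaner
        String md5Dir = hostDir + sp + md5sum;           // refreshed via compareFile
        System.out.println(md5Dir + "/spark-sql-proxy.jar");
        // -> hdfs:///dtInsight/spark/worker-01/9f86d081884c7d65/spark-sql-proxy.jar
    }
}
```

Note that `SP` is `File.separator`, which only matches HDFS path syntax on platforms whose separator is `/`; on a Windows worker the concatenated URI would contain backslashes.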
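`getSqlProxyJarPath` resolves the proxy jar by name prefix next to the directory containing the plugin's own code source. In the `FilenameFilter` passed to `listFiles`, `dir` is the parent directory being listed, so the `dir.isDirectory()` clause is always true; the name prefix and the later `.jar` suffix check do the real filtering. A quick standalone check of that lookup against a temp directory (file names hypothetical):

```java
import java.io.File;
import java.nio.file.Files;

public class JarLookupCheck {
    public static void main(String[] args) throws Exception {
        File pluginDir = Files.createTempDirectory("plugin").toFile();
        new File(pluginDir, "spark-sql-proxy-1.3.0.jar").createNewFile();
        new File(pluginDir, "other-lib.jar").createNewFile();

        // same name filter as getSqlProxyJarPath()
        File[] matches =
                pluginDir.listFiles(
                        (dir, name) -> name.toLowerCase().startsWith("spark-sql-proxy"));

        if (matches != null
                && matches.length == 1
                && matches[0].getName().toLowerCase().endsWith(".jar")) {
            System.out.println(matches[0].getAbsolutePath()); // the jar the uploader would copy
        }
    }
}
```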