From 1792700a979dfc6505dfa234ad8c39180afc1c06 Mon Sep 17 00:00:00 2001 From: tobe Date: Fri, 15 Sep 2023 11:24:37 +0800 Subject: [PATCH] fix: refactor taskmanager config and support deleting HDFS files when dropping tables (#3369) --- .../taskmanager/config/TaskManagerConfig.java | 551 +++++++++++++----- .../taskmanager/dao/JobIdGenerator.java | 31 +- .../taskmanager/server/JobResultSaver.java | 6 +- .../taskmanager/server/TaskManagerServer.java | 17 +- .../server/impl/TaskManagerImpl.java | 22 +- .../udf/ExternalFunctionManager.java | 2 +- .../taskmanager/zk/FailoverWatcher.java | 12 +- .../src/main/resources/taskmanager.properties | 1 - .../openmldb/taskmanager/JobInfoManager.scala | 15 +- .../openmldb/taskmanager/LogManager.scala | 4 +- .../taskmanager/k8s/K8sJobManager.scala | 34 +- .../taskmanager/spark/SparkJobManager.scala | 70 +-- .../tracker/YarnJobTrackerThread.scala | 2 +- .../openmldb/taskmanager/util/HdfsUtil.scala | 49 ++ .../taskmanager/util/VersionUtil.scala | 2 +- .../taskmanager/yarn/YarnClientUtil.scala | 16 +- .../server/impl/TestTaskManagerImpl.scala | 16 +- release/sbin/start-taskmanagers.sh | 6 +- 18 files changed, 581 insertions(+), 275 deletions(-) create mode 100644 java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/util/HdfsUtil.scala diff --git a/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/config/TaskManagerConfig.java b/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/config/TaskManagerConfig.java index 3be3bcf39ee..76642ff17d6 100644 --- a/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/config/TaskManagerConfig.java +++ b/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/config/TaskManagerConfig.java @@ -32,168 +32,396 @@ public class TaskManagerConfig { private static Logger logger = LoggerFactory.getLogger(TaskManagerConfig.class); - public static String HOST; - public static int PORT; - public 
static int WORKER_THREAD; - public static int IO_THREAD; - public static int CHANNEL_KEEP_ALIVE_TIME; - public static String ZK_CLUSTER; - public static String ZK_ROOT_PATH; - public static String ZK_TASKMANAGER_PATH; - public static String ZK_MAX_JOB_ID_PATH; - public static int ZK_SESSION_TIMEOUT; - public static int ZK_CONNECTION_TIMEOUT; - public static int ZK_BASE_SLEEP_TIME; - public static int ZK_MAX_CONNECT_WAIT_TIME; - public static int ZK_MAX_RETRIES; - public static String SPARK_MASTER; - public static String SPARK_YARN_JARS; - public static String SPARK_HOME; - public static int PREFETCH_JOBID_NUM; - public static String JOB_LOG_PATH; - public static String EXTERNAL_FUNCTION_DIR; - public static boolean TRACK_UNFINISHED_JOBS; - public static int JOB_TRACKER_INTERVAL; - public static String SPARK_DEFAULT_CONF; - public static String SPARK_EVENTLOG_DIR; - public static int SPARK_YARN_MAXAPPATTEMPTS; - public static String OFFLINE_DATA_PREFIX; - public static String NAMENODE_URI; - public static String BATCHJOB_JAR_PATH; - public static String HADOOP_CONF_DIR; - public static String HADOOP_USER_NAME; - public static boolean ENABLE_HIVE_SUPPORT; - public static long BATCH_JOB_RESULT_MAX_WAIT_TIME; - public static String K8S_HADOOP_CONFIGMAP_NAME; - public static String K8S_MOUNT_LOCAL_PATH; - - private static volatile boolean isParsed = false; + private volatile static TaskManagerConfig instance; + + private volatile static Properties props; + + public static Properties getProps() { + return props; + } + + private static TaskManagerConfig getInstance() throws ConfigException { + if (instance == null) { + instance = new TaskManagerConfig(); + instance.init(); + } + return instance; + } public static void parse() throws ConfigException { - if (!isParsed) { - doParse(); - isParsed = true; + getInstance(); + } + + protected static String getString(String key) { + return props.getProperty(key); + } + + protected static int getInt(String key) { + return 
Integer.parseInt(getString(key)); + } + + protected static long getLong(String key) { + return Long.parseLong(getString(key)); + } + + protected static boolean getBool(String key) { + return Boolean.parseBoolean(getString(key)); + } + + + public static String getServerHost() { + return getString("server.host"); + } + + public static int getServerPort() { + return getInt("server.port"); + } + + public static int getServerWorkerThreads() { + return getInt("server.worker_threads"); + } + + public static int getServerIoThreads() { + return getInt("server.io_threads"); + } + + public static int getChannelKeepAliveTime() { + return getInt("server.channel_keep_alive_time"); + } + + public static int getZkSessionTimeout() { + return getInt("zookeeper.session_timeout"); + } + + public static String getZkCluster() { + return getString("zookeeper.cluster"); + } + + public static String getZkRootPath() { + return getString("zookeeper.root_path"); + } + + public static int getZkConnectionTimeout() { + return getInt("zookeeper.connection_timeout"); + } + + public static int getZkBaseSleepTime() { + return getInt("zookeeper.base_sleep_time"); + } + + public static int getZkMaxRetries() { + return getInt("zookeeper.max_retries"); + } + + public static int getZkMaxConnectWaitTime() { + return getInt("zookeeper.max_connect_waitTime"); + } + + public static String getSparkMaster() { + return getString("spark.master"); + } + + public static String getSparkYarnJars() { + return getString("spark.yarn.jars"); + } + + public static String getSparkHome() { + return getString("spark.home"); + } + + public static int getPrefetchJobidNum() { + return getInt("prefetch.jobid.num"); + } + + public static String getJobLogPath() { + return getString("job.log.path"); + } + + public static String getExternalFunctionDir() { + return getString("external.function.dir"); + } + + public static boolean getTrackUnfinishedJobs() { + return getBool("track.unfinished.jobs"); + } + + public static int 
getJobTrackerInterval() { + return getInt("job.tracker.interval"); + } + + public static String getSparkDefaultConf() { + return getString("spark.default.conf"); + } + + public static String getSparkEventlogDir() { + return getString("spark.eventLog.dir"); + } + + public static int getSparkYarnMaxappattempts() { + return getInt("spark.yarn.maxAppAttempts"); + } + + public static String getOfflineDataPrefix() { + return getString("offline.data.prefix"); + } + + public static String getBatchjobJarPath() { + return getString("batchjob.jar.path"); + } + + + public static String getHadoopConfDir() { + return getString("hadoop.conf.dir"); + } + + public static boolean getEnableHiveSupport() { + return getBool("enable.hive.support"); + } + + public static long getBatchJobResultMaxWaitTime() { + return getLong("batch.job.result.max.wait.time"); + } + + + public static String getK8sHadoopConfigmapName() { + return getString("k8s.hadoop.configmap"); + } + + public static String getK8sMountLocalPath() { + return getString("k8s.mount.local.path"); + } + + public static String getHadoopUserName() { + return getString("hadoop.user.name"); + } + + public static String getZkTaskmanagerPath() { + return getZkRootPath() + "/taskmanager"; + } + + public static String getZkMaxJobIdPath() { + return getZkTaskmanagerPath() + "/max_job_id"; + } + + public static boolean isK8s() { + return getSparkMaster().equals("k8s") || getSparkMaster().equals("kubernetes"); + } + + public static boolean isYarnCluster() { + return getSparkMaster().equals("yarn") || getSparkMaster().equals("yarn-cluster"); + } + + public static boolean isYarn() { + return getSparkMaster().startsWith("yarn"); + } + + public static void print() throws ConfigException { + parse(); + + StringBuilder builder = new StringBuilder(); + + for (String key : props.stringPropertyNames()) { + String value = props.getProperty(key); + builder.append(key + " = " + value + "\n"); } + + logger.info("Final TaskManager config: \n" + 
builder.toString()); } - public static void doParse() throws ConfigException { - Properties prop = new Properties(); + private void init() throws ConfigException { + props = new Properties(); + + // Load local properties file try { - prop.load(TaskManagerConfig.class.getClassLoader().getResourceAsStream("taskmanager.properties")); + props.load(TaskManagerConfig.class.getClassLoader().getResourceAsStream("taskmanager.properties")); } catch (IOException e) { throw new ConfigException(String.format("Fail to load taskmanager.properties, message: ", e.getMessage())); } - HOST = prop.getProperty("server.host", "0.0.0.0"); - PORT = Integer.parseInt(prop.getProperty("server.port", "9902")); - if (PORT < 1 || PORT > 65535) { + // Get properties and check + if (props.getProperty("server.host") == null) { + props.setProperty("server.host", "0.0.0.0"); + } + + if (props.getProperty("server.port") == null) { + props.setProperty("server.port", "9902"); + } + + if (getServerPort() < 1 || getServerPort() > 65535) { throw new ConfigException("server.port", "invalid port, should be in range of 1 through 65535"); } - WORKER_THREAD = Integer.parseInt(prop.getProperty("server.worker_threads", "16")); - IO_THREAD = Integer.parseInt(prop.getProperty("server.io_threads", "4")); - // alive time seconds - CHANNEL_KEEP_ALIVE_TIME = Integer.parseInt(prop.getProperty("server.channel_keep_alive_time", "1800")); - ZK_SESSION_TIMEOUT = Integer.parseInt(prop.getProperty("zookeeper.session_timeout", "5000")); - ZK_CLUSTER = prop.getProperty("zookeeper.cluster", ""); - if (ZK_CLUSTER.isEmpty()) { + if (props.getProperty("server.worker_threads") == null) { + props.setProperty("server.worker_threads", "16"); + } + + if (getServerWorkerThreads() <= 0) { + throw new ConfigException("server.worker_threads", "should be larger than 0"); + } + + if (props.getProperty("server.io_threads") == null) { + props.setProperty("server.io_threads", "4"); + } + + if (getServerIoThreads() <= 0) { + throw new 
ConfigException("server.io_threads", "should be larger than 0"); + } + + if (props.getProperty("server.channel_keep_alive_time") == null) { + props.setProperty("server.channel_keep_alive_time", "1800"); + } + + if (getChannelKeepAliveTime() <= 0) { + throw new ConfigException("server.channel_keep_alive_time", "should be larger than 0"); + } + + if (props.getProperty("zookeeper.session_timeout") == null) { + props.setProperty("zookeeper.session_timeout", "5000"); + } + + if (getZkSessionTimeout() <= 0) { + throw new ConfigException("zookeeper.session_timeout", "should be larger than 0"); + } + + if (props.getProperty("zookeeper.cluster") == null) { + props.setProperty("", ""); + } + + if (getZkCluster().isEmpty()) { throw new ConfigException("zookeeper.cluster", "should not be empty"); } - ZK_ROOT_PATH = prop.getProperty("zookeeper.root_path", ""); - if (ZK_ROOT_PATH.isEmpty()) { - throw new ConfigException("zookeeper.root_path", "should not be empty"); + if (props.getProperty("zookeeper.connection_timeout") == null) { + props.setProperty("zookeeper.connection_timeout", "5000"); + } + + if (getZkConnectionTimeout() <= 0) { + throw new ConfigException("zookeeper.connection_timeout", "should be larger than 0"); + } + + if (props.getProperty("zookeeper.base_sleep_time") == null) { + props.setProperty("zookeeper.base_sleep_time", "1000"); + } + + if (getZkBaseSleepTime() <= 0) { + throw new ConfigException("zookeeper.base_sleep_time", "should be larger than 0"); + } + + if (props.getProperty("zookeeper.max_retries") == null) { + props.setProperty("zookeeper.max_retries", "10"); + } + + if (getZkMaxRetries() <= 0) { + throw new ConfigException("zookeeper.max_retries", "should be larger than 0"); } - ZK_TASKMANAGER_PATH = ZK_ROOT_PATH + "/taskmanager"; - ZK_MAX_JOB_ID_PATH = ZK_TASKMANAGER_PATH + "/max_job_id"; - ZK_CONNECTION_TIMEOUT = Integer.parseInt(prop.getProperty("zookeeper.connection_timeout", "5000")); - ZK_BASE_SLEEP_TIME = 
Integer.parseInt(prop.getProperty("zookeeper.base_sleep_time", "1000")); - ZK_MAX_RETRIES = Integer.parseInt(prop.getProperty("zookeeper.max_retries", "10")); - ZK_MAX_CONNECT_WAIT_TIME = Integer.parseInt(prop.getProperty("zookeeper.max_connect_waitTime", "30000")); + if (props.getProperty("zookeeper.max_connect_waitTime") == null) { + props.setProperty("zookeeper.max_connect_waitTime", "30000"); + } + + if (getZkMaxConnectWaitTime() <= 0) { + throw new ConfigException("zookeeper.max_connect_waitTime", "should be larger than 0"); + } - SPARK_MASTER = prop.getProperty("spark.master", "local[*]").toLowerCase(); - if (!SPARK_MASTER.startsWith("local")) { - if (!Arrays.asList("yarn", "yarn-cluster", "yarn-client", "k8s", "kubernetes").contains(SPARK_MASTER)) { + if (props.getProperty("spark.master") == null) { + props.setProperty("spark.master", "local[*]"); + } else { + props.setProperty("spark.master", props.getProperty("spark.master").toLowerCase()); + } + + if (!getSparkMaster().startsWith("local")) { + if (!Arrays.asList("yarn", "yarn-cluster", "yarn-client", "k8s", "kubernetes").contains(getSparkMaster())) { throw new ConfigException("spark.master", "should be local, yarn, yarn-cluster, yarn-client, k8s or kubernetes"); } } - boolean isLocal = SPARK_MASTER.startsWith("local"); - boolean isYarn = SPARK_MASTER.startsWith("yarn"); - boolean isYarnCluster = SPARK_MASTER.equals("yarn") || SPARK_MASTER.equals("yarn-cluster"); - SPARK_YARN_JARS = prop.getProperty("spark.yarn.jars", ""); - if (isLocal && !SPARK_YARN_JARS.isEmpty()) { - logger.warn("Ignore the config of spark.yarn.jars which is invalid for local mode"); + if (props.getProperty("spark.yarn.jars") == null) { + props.setProperty("spark.yarn.jars", ""); + } + + if (isYarn() && !getSparkYarnJars().isEmpty() && getSparkYarnJars().startsWith("file://")) { + throw new ConfigException("spark.yarn.jars", "should not use local filesystem for yarn mode"); } - if (isYarn) { - if (!SPARK_YARN_JARS.isEmpty() && 
SPARK_YARN_JARS.startsWith("file://")) { - throw new ConfigException("spark.yarn.jars", "should not use local filesystem for yarn mode"); + + + if (props.getProperty("spark.home", "").isEmpty()) { + if (System.getenv("SPARK_HOME") == null) { + throw new ConfigException("spark.home", "should set config 'spark.home' or environment variable 'SPARK_HOME'"); + } else { + logger.info("Use SPARK_HOME from environment variable: " + System.getenv("SPARK_HOME")); + props.setProperty("spark.home", System.getenv("SPARK_HOME")); } } - SPARK_HOME = firstNonEmpty(prop.getProperty("spark.home"), System.getenv("SPARK_HOME")); + String SPARK_HOME = firstNonEmpty(props.getProperty("spark.home"), System.getenv("SPARK_HOME")); // isEmpty checks null and empty - if(isEmpty(SPARK_HOME)) { + if (isEmpty(SPARK_HOME)) { throw new ConfigException("spark.home", "should set config 'spark.home' or environment variable 'SPARK_HOME'"); } + if (SPARK_HOME != null) { + props.setProperty("spark.home", SPARK_HOME); + } // TODO: Check if we can get spark-submit - PREFETCH_JOBID_NUM = Integer.parseInt(prop.getProperty("prefetch.jobid.num", "1")); - if (PREFETCH_JOBID_NUM < 1) { + if (props.getProperty("prefetch.jobid.num") == null) { + props.setProperty("prefetch.jobid.num", "1"); + } + + if (getPrefetchJobidNum() < 1) { throw new ConfigException("prefetch.jobid.num", "should be larger or equal to 1"); } - NAMENODE_URI = prop.getProperty("namenode.uri", ""); - if (!NAMENODE_URI.isEmpty()) { - logger.warn("Config of 'namenode.uri' will be deprecated later"); + if (props.getProperty("job.log.path") == null) { + props.setProperty("job.log.path", "../log/"); } - JOB_LOG_PATH = prop.getProperty("job.log.path", "../log/"); - if (JOB_LOG_PATH.isEmpty()) { - throw new ConfigException("job.log.path", "should not be null"); - } else { - if (JOB_LOG_PATH.startsWith("hdfs") || JOB_LOG_PATH.startsWith("s3")) { - throw new ConfigException("job.log.path", "only support local filesystem"); - } + if 
(getJobLogPath().startsWith("hdfs") || getJobLogPath().startsWith("s3")) { + throw new ConfigException("job.log.path", "only support local filesystem"); + } - File directory = new File(JOB_LOG_PATH); - if (!directory.exists()) { - logger.info("The log path does not exist, try to create directory: " + JOB_LOG_PATH); - boolean created = directory.mkdirs(); - if (created) { - throw new ConfigException("job.log.path", "fail to create log path"); - } + File jobLogDirectory = new File(getJobLogPath()); + if (!jobLogDirectory.exists()) { + logger.info("The log path does not exist, try to create directory: " + getJobLogPath()); + jobLogDirectory.mkdirs(); + if (!jobLogDirectory.exists()) { + throw new ConfigException("job.log.path", "fail to create log path: " + jobLogDirectory); } } - EXTERNAL_FUNCTION_DIR = prop.getProperty("external.function.dir", "./udf/"); - if (EXTERNAL_FUNCTION_DIR.isEmpty()) { - throw new ConfigException("external.function.dir", "should not be null"); - } else { - File directory = new File(EXTERNAL_FUNCTION_DIR); - if (!directory.exists()) { - logger.info("The external function dir does not exist, try to create directory: " - + EXTERNAL_FUNCTION_DIR); - boolean created = directory.mkdirs(); - if (created) { - logger.warn("Fail to create external function directory: " + EXTERNAL_FUNCTION_DIR); - } + if (props.getProperty("external.function.dir") == null) { + props.setProperty("external.function.dir", "./udf/"); + } + + File externalFunctionDir = new File(getExternalFunctionDir()); + if (!externalFunctionDir.exists()) { + logger.info("The external function dir does not exist, try to create directory: " + + getExternalFunctionDir()); + externalFunctionDir.mkdirs(); + if (!externalFunctionDir.exists()) { + throw new ConfigException("job.log.path", "fail to create external function path: " + externalFunctionDir); } } - TRACK_UNFINISHED_JOBS = Boolean.parseBoolean(prop.getProperty("track.unfinished.jobs", "true")); + if 
(props.getProperty("track.unfinished.jobs") == null) { + props.setProperty("track.unfinished.jobs", "true"); + } + + if (props.getProperty("job.tracker.interval") == null) { + props.setProperty("job.tracker.interval", "30"); + } + + if (getJobTrackerInterval() <= 0) { + throw new ConfigException("job.tracker.interval", "should be larger than 0"); + } - JOB_TRACKER_INTERVAL = Integer.parseInt(prop.getProperty("job.tracker.interval", "30")); - if (JOB_TRACKER_INTERVAL <= 0) { - throw new ConfigException("job.tracker.interval", "interval should be larger than 0"); + if (props.getProperty("spark.default.conf") == null) { + props.setProperty("spark.default.conf", ""); } - SPARK_DEFAULT_CONF = prop.getProperty("spark.default.conf", ""); - if (!SPARK_DEFAULT_CONF.isEmpty()) { - String[] defaultSparkConfs = TaskManagerConfig.SPARK_DEFAULT_CONF.split(";"); - for (String sparkConfMap: defaultSparkConfs) { + if (!getSparkDefaultConf().isEmpty()) { + String[] defaultSparkConfs = getSparkDefaultConf().split(";"); + for (String sparkConfMap : defaultSparkConfs) { if (!sparkConfMap.isEmpty()) { String[] kv = sparkConfMap.split("="); if (kv.length < 2) { @@ -205,64 +433,85 @@ public static void doParse() throws ConfigException { } } - SPARK_EVENTLOG_DIR = prop.getProperty("spark.eventLog.dir", ""); - if (!SPARK_EVENTLOG_DIR.isEmpty() && isYarn) { - // TODO: Check if we can use local filesystem with yarn-client mode - if (SPARK_EVENTLOG_DIR.startsWith("file://")) { + if (props.getProperty("spark.eventLog.dir") == null) { + props.setProperty("spark.eventLog.dir", ""); + } + + if (!getSparkEventlogDir().isEmpty() && isYarn()) { + if (getSparkEventlogDir().startsWith("file://")) { throw new ConfigException("spark.eventLog.dir", "should not use local filesystem for yarn mode"); } } - SPARK_YARN_MAXAPPATTEMPTS = Integer.parseInt(prop.getProperty("spark.yarn.maxAppAttempts", "1")); - if (SPARK_YARN_MAXAPPATTEMPTS < 1) { - throw new ConfigException("spark.yarn.maxAppAttempts", "should be 
larger or equal to 1"); + if (props.getProperty("spark.yarn.maxAppAttempts") == null) { + props.setProperty("spark.yarn.maxAppAttempts", "1"); } - OFFLINE_DATA_PREFIX = prop.getProperty("offline.data.prefix", "file:///tmp/openmldb_offline_storage/"); - if (OFFLINE_DATA_PREFIX.isEmpty()) { - throw new ConfigException("offline.data.prefix", "should not be null"); + if (getSparkYarnMaxappattempts() <= 0) { + throw new ConfigException("spark.yarn.maxAppAttempts", "should be larger than 0"); + } + + + if (props.getProperty("offline.data.prefix") == null) { + props.setProperty("offline.data.prefix", "file:///tmp/openmldb_offline_storage/"); + } + + if (getOfflineDataPrefix().isEmpty()) { + throw new ConfigException("offline.data.prefix", "should not be null"); } else { - if (isYarnCluster && OFFLINE_DATA_PREFIX.startsWith("file://") ) { - throw new ConfigException("offline.data.prefix", "should not use local filesystem for yarn mode"); + if (isYarn() || isK8s()) { + if (getOfflineDataPrefix().startsWith("file://")) { + throw new ConfigException("offline.data.prefix", "should not use local filesystem for yarn mode or k8s mode"); + } } } - BATCHJOB_JAR_PATH = prop.getProperty("batchjob.jar.path", ""); - if (BATCHJOB_JAR_PATH.isEmpty()) { - try { - BATCHJOB_JAR_PATH = BatchJobUtil.findLocalBatchJobJar(); - } catch (Exception e) { - throw new ConfigException("batchjob.jar.path", "config is null and fail to load default openmldb-batchjob jar"); + if (props.getProperty("batchjob.jar.path", "").isEmpty()) { + props.setProperty("batchjob.jar.path", BatchJobUtil.findLocalBatchJobJar()); + } + + if (isYarn() && getHadoopConfDir().isEmpty()) { + if (System.getenv("HADOOP_CONF_DIR") == null) { + throw new ConfigException("hadoop.conf.dir", "should set config 'hadoop.conf.dir' or environment variable 'HADOOP_CONF_DIR'"); + } else { + // TODO: Check if we can get core-site.xml + props.setProperty("hadoop.conf.dir", System.getenv("HADOOP_CONF_DIR")); } } // TODO(hw): need default root? 
- HADOOP_USER_NAME = firstNonEmpty(prop.getProperty("hadoop.user.name"), System.getenv("HADOOP_USER_NAME")); + String HADOOP_USER_NAME = firstNonEmpty(props.getProperty("hadoop.user.name"), System.getenv("HADOOP_USER_NAME")); + if (HADOOP_USER_NAME != null) { + props.setProperty("hadoop.user.name", HADOOP_USER_NAME); + } - HADOOP_CONF_DIR = firstNonEmpty(prop.getProperty("hadoop.conf.dir"), System.getenv("HADOOP_CONF_DIR")); - if (isYarn && isEmpty(HADOOP_CONF_DIR)) { + + String HADOOP_CONF_DIR = firstNonEmpty(props.getProperty("hadoop.conf.dir"), System.getenv("HADOOP_CONF_DIR")); + if (isYarn() && isEmpty(HADOOP_CONF_DIR)) { throw new ConfigException("hadoop.conf.dir", "should set config 'hadoop.conf.dir' or environment variable 'HADOOP_CONF_DIR'"); } - // TODO: Check if we can get core-site.xml + if (HADOOP_CONF_DIR != null) { + props.setProperty("hadoop.conf.dir", HADOOP_CONF_DIR); + } - ENABLE_HIVE_SUPPORT = Boolean.parseBoolean(prop.getProperty("enable.hive.support", "true")); - BATCH_JOB_RESULT_MAX_WAIT_TIME = Long.parseLong(prop.getProperty("batch.job.result.max.wait.time", "600000")); // 10min + if (props.getProperty("enable.hive.support") == null) { + props.setProperty("enable.hive.support", "true"); + } - K8S_HADOOP_CONFIGMAP_NAME = prop.getProperty("k8s.hadoop.configmap", "hadoop-config"); + if (props.getProperty("batch.job.result.max.wait.time") == null) { + props.setProperty("batch.job.result.max.wait.time", "600000"); + } - K8S_MOUNT_LOCAL_PATH = prop.getProperty("k8s.mount.local.path", "/tmp"); - } + if (props.getProperty("k8s.hadoop.configmap") == null) { + props.setProperty("k8s.hadoop.configmap", "hadoop-config"); + } - public static boolean isK8s() throws ConfigException { - parse(); - return SPARK_MASTER.equals("k8s") || SPARK_MASTER.equals("kubernetes"); + if (props.getProperty("k8s.mount.local.path") == null) { + props.setProperty("k8s.mount.local.path", "/tmp"); + } } - public static boolean isYarnCluster() throws ConfigException { - 
parse(); - return SPARK_MASTER.equals("yarn") || SPARK_MASTER.equals("yarn-cluster"); - } // ref org.apache.spark.launcher.CommandBuilderUtils public static String firstNonEmpty(String... strings) { @@ -273,7 +522,13 @@ public static String firstNonEmpty(String... strings) { } return null; } + + public static boolean isEmpty(String s) { return s == null || s.isEmpty(); } + + + } + diff --git a/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/dao/JobIdGenerator.java b/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/dao/JobIdGenerator.java index 2e7ba638703..e9cd0c5014f 100644 --- a/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/dao/JobIdGenerator.java +++ b/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/dao/JobIdGenerator.java @@ -28,28 +28,29 @@ public class JobIdGenerator { static { try { zkClient = new ZKClient(ZKConfig.builder() - .cluster(TaskManagerConfig.ZK_CLUSTER) - .namespace(TaskManagerConfig.ZK_ROOT_PATH) - .sessionTimeout(TaskManagerConfig.ZK_SESSION_TIMEOUT) - .baseSleepTime(TaskManagerConfig.ZK_BASE_SLEEP_TIME) - .connectionTimeout(TaskManagerConfig.ZK_CONNECTION_TIMEOUT) - .maxConnectWaitTime(TaskManagerConfig.ZK_MAX_CONNECT_WAIT_TIME) - .maxRetries(TaskManagerConfig.ZK_MAX_RETRIES) + .cluster(TaskManagerConfig.getZkCluster()) + .namespace(TaskManagerConfig.getZkRootPath()) + .sessionTimeout(TaskManagerConfig.getZkSessionTimeout()) + .baseSleepTime(TaskManagerConfig.getZkBaseSleepTime()) + .connectionTimeout(TaskManagerConfig.getZkConnectionTimeout()) + .maxConnectWaitTime(TaskManagerConfig.getZkMaxConnectWaitTime()) + .maxRetries(TaskManagerConfig.getZkMaxRetries()) .build()); zkClient.connect(); + // Initialize zk nodes - zkClient.createNode(TaskManagerConfig.ZK_ROOT_PATH, "".getBytes()); - zkClient.createNode(TaskManagerConfig.ZK_TASKMANAGER_PATH, "".getBytes()); + zkClient.createNode(TaskManagerConfig.getZkRootPath(), "".getBytes()); + 
zkClient.createNode(TaskManagerConfig.getZkTaskmanagerPath(), "".getBytes()); int lastMaxJobId = 0; - if (zkClient.checkExists(TaskManagerConfig.ZK_MAX_JOB_ID_PATH)) { + if (zkClient.checkExists(TaskManagerConfig.getZkMaxJobIdPath())) { // Get last max job id from zk - lastMaxJobId = Integer.parseInt(zkClient.getNodeValue(TaskManagerConfig.ZK_MAX_JOB_ID_PATH)); + lastMaxJobId = Integer.parseInt(zkClient.getNodeValue(TaskManagerConfig.getZkMaxJobIdPath())); } currentJobId = lastMaxJobId; - maxJobId = lastMaxJobId + TaskManagerConfig.PREFETCH_JOBID_NUM; + maxJobId = lastMaxJobId + TaskManagerConfig.getPrefetchJobidNum(); // set max job id in zk - zkClient.setNodeValue(TaskManagerConfig.ZK_MAX_JOB_ID_PATH, String.valueOf(maxJobId).getBytes()); + zkClient.setNodeValue(TaskManagerConfig.getZkMaxJobIdPath(), String.valueOf(maxJobId).getBytes()); } catch (Exception e) { zkClient = null; @@ -67,8 +68,8 @@ public static int getUniqueId() throws Exception { currentJobId += 1; if (currentJobId > maxJobId) { // Update zk before returning job id - maxJobId += TaskManagerConfig.PREFETCH_JOBID_NUM; - zkClient.setNodeValue(TaskManagerConfig.ZK_MAX_JOB_ID_PATH, String.valueOf(maxJobId).getBytes()); + maxJobId += TaskManagerConfig.getPrefetchJobidNum(); + zkClient.setNodeValue(TaskManagerConfig.getZkMaxJobIdPath(), String.valueOf(maxJobId).getBytes()); } return currentJobId; } diff --git a/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/server/JobResultSaver.java b/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/server/JobResultSaver.java index 6f6c77482e5..6bb1f310b2f 100644 --- a/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/server/JobResultSaver.java +++ b/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/server/JobResultSaver.java @@ -105,7 +105,7 @@ public boolean saveFile(int resultId, String jsonData) { return true; } // save to /tmp_result// - String savePath = 
String.format("%s/tmp_result/%d", TaskManagerConfig.JOB_LOG_PATH, resultId); + String savePath = String.format("%s/tmp_result/%d", TaskManagerConfig.getJobLogPath(), resultId); synchronized (this) { File saveP = new File(savePath); if (!saveP.exists()) { @@ -151,7 +151,7 @@ public String readResult(int resultId, long timeoutMs) throws InterruptedExcepti } String output = ""; // all finished, read csv from savePath - String savePath = String.format("%s/tmp_result/%d", TaskManagerConfig.JOB_LOG_PATH, resultId); + String savePath = String.format("%s/tmp_result/%d", TaskManagerConfig.getJobLogPath(), resultId); File saveP = new File(savePath); // If saveP not exists, means no real result saved. But it may use a uncleaned // path, whether read result succeed or not, we should delete it. @@ -225,7 +225,7 @@ public void reset() throws IOException { synchronized (idStatus) { Collections.fill(idStatus, 0); } - String tmpResultDir = String.format("%s/tmp_result", TaskManagerConfig.JOB_LOG_PATH); + String tmpResultDir = String.format("%s/tmp_result", TaskManagerConfig.getJobLogPath()); // delete anyway FileUtils.forceDelete(new File(tmpResultDir)); } diff --git a/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/server/TaskManagerServer.java b/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/server/TaskManagerServer.java index 376ea41eee6..0a75c2e37b2 100644 --- a/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/server/TaskManagerServer.java +++ b/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/server/TaskManagerServer.java @@ -18,7 +18,6 @@ import com._4paradigm.openmldb.taskmanager.config.ConfigException; import com._4paradigm.openmldb.taskmanager.tracker.JobTrackerService; -import com._4paradigm.openmldb.taskmanager.util.VersionUtil; import com._4paradigm.openmldb.taskmanager.zk.FailoverWatcher; import lombok.extern.slf4j.Slf4j; import 
com._4paradigm.openmldb.taskmanager.config.TaskManagerConfig; @@ -45,7 +44,7 @@ public class TaskManagerServer { * @throws ConfigException if config file does not exist or some configs are incorrect. */ public TaskManagerServer() throws ConfigException { - TaskManagerConfig.parse(); + TaskManagerConfig.print(); } /** @@ -69,7 +68,7 @@ public void start(Boolean blocking) throws ConfigException, IOException, Interru logger.info("The server runs and prepares for leader election"); if (failoverWatcher.blockUntilActive()) { logger.info("The server becomes active master and prepare to do business logic"); - if (TaskManagerConfig.TRACK_UNFINISHED_JOBS) { + if (TaskManagerConfig.getTrackUnfinishedJobs()) { // Start threads to track unfinished jobs JobTrackerService.startTrackerThreads(); } @@ -97,14 +96,14 @@ public void startRpcServer(Boolean blocking) throws ConfigException, Interrupted RpcServerOptions options = new RpcServerOptions(); options.setReceiveBufferSize(64 * 1024 * 1024); options.setSendBufferSize(64 * 1024 * 1024); - options.setIoThreadNum(TaskManagerConfig.IO_THREAD); - options.setWorkThreadNum(TaskManagerConfig.WORKER_THREAD); - options.setKeepAliveTime(TaskManagerConfig.CHANNEL_KEEP_ALIVE_TIME); - rpcServer = new RpcServer(TaskManagerConfig.PORT, options); + options.setIoThreadNum(TaskManagerConfig.getServerIoThreads()); + options.setWorkThreadNum(TaskManagerConfig.getServerWorkerThreads()); + options.setKeepAliveTime(TaskManagerConfig.getChannelKeepAliveTime()); + rpcServer = new RpcServer(TaskManagerConfig.getServerPort(), options); rpcServer.registerService(new TaskManagerImpl()); rpcServer.start(); - log.info("Start TaskManager on {} with worker thread number {}", TaskManagerConfig.PORT, - TaskManagerConfig.WORKER_THREAD); + log.info("Start TaskManager on {} with worker thread number {}", TaskManagerConfig.getServerPort(), + TaskManagerConfig.getServerWorkerThreads()); if (blocking) { // make server keep running diff --git 
a/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/server/impl/TaskManagerImpl.java b/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/server/impl/TaskManagerImpl.java index 3a06b96b2c2..6fd43d4200c 100644 --- a/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/server/impl/TaskManagerImpl.java +++ b/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/server/impl/TaskManagerImpl.java @@ -73,17 +73,17 @@ public TaskManagerImpl() throws InterruptedException, ConfigException { */ private void initExternalFunction() throws InterruptedException { ZKClient zkClient = new ZKClient(ZKConfig.builder() - .cluster(TaskManagerConfig.ZK_CLUSTER) - .namespace(TaskManagerConfig.ZK_ROOT_PATH) - .sessionTimeout(TaskManagerConfig.ZK_SESSION_TIMEOUT) - .baseSleepTime(TaskManagerConfig.ZK_BASE_SLEEP_TIME) - .connectionTimeout(TaskManagerConfig.ZK_CONNECTION_TIMEOUT) - .maxConnectWaitTime(TaskManagerConfig.ZK_MAX_CONNECT_WAIT_TIME) - .maxRetries(TaskManagerConfig.ZK_MAX_RETRIES) + .cluster(TaskManagerConfig.getZkCluster()) + .namespace(TaskManagerConfig.getZkRootPath()) + .sessionTimeout(TaskManagerConfig.getZkSessionTimeout()) + .baseSleepTime(TaskManagerConfig.getZkBaseSleepTime()) + .connectionTimeout(TaskManagerConfig.getZkConnectionTimeout()) + .maxConnectWaitTime(TaskManagerConfig.getZkMaxConnectWaitTime()) + .maxRetries(TaskManagerConfig.getZkMaxRetries()) .build()); zkClient.connect(); - String funPath = TaskManagerConfig.ZK_ROOT_PATH + "/data/function"; + String funPath = TaskManagerConfig.getZkRootPath() + "/data/function"; try { List funNames = zkClient.getChildren(funPath); for (String name : funNames) { @@ -220,7 +220,7 @@ public TaskManager.RunBatchSqlResponse RunBatchSql(TaskManager.RunBatchSqlReques // HOST can't be 0.0.0.0 if distributed or spark is not local confMap.put("spark.openmldb.savejobresult.http", 
String.format("http://%s:%d/openmldb.taskmanager.TaskManagerServer/SaveJobResult", - TaskManagerConfig.HOST, TaskManagerConfig.PORT)); + TaskManagerConfig.getServerHost(), TaskManagerConfig.getServerPort())); // we can't get spark job id here, so we use JobResultSaver id, != spark job id // if too much running jobs to save result, throw exception int resultId = jobResultSaver.genResultId(); @@ -234,7 +234,7 @@ public TaskManager.RunBatchSqlResponse RunBatchSql(TaskManager.RunBatchSqlReques if (finalJobInfo.isSuccess()) { // wait for all files of result saved and read them, large timeout // TODO: Test for K8S backend - String output = jobResultSaver.readResult(resultId, TaskManagerConfig.BATCH_JOB_RESULT_MAX_WAIT_TIME); + String output = jobResultSaver.readResult(resultId, TaskManagerConfig.getBatchJobResultMaxWaitTime()); return TaskManager.RunBatchSqlResponse.newBuilder().setCode(StatusCode.SUCCESS).setOutput(output) .build(); } else { @@ -253,7 +253,7 @@ public TaskManager.RunBatchSqlResponse RunBatchSql(TaskManager.RunBatchSqlReques // rpc max time is CHANNEL_KEEP_ALIVE_TIME, so we don't need to wait too long private JobInfo busyWaitJobInfo(int jobId, int waitSeconds) throws InterruptedException { long maxWaitEnd = System.currentTimeMillis() - + (waitSeconds == 0 ? TaskManagerConfig.CHANNEL_KEEP_ALIVE_TIME : waitSeconds) * 1000; + + (waitSeconds == 0 ? 
TaskManagerConfig.getChannelKeepAliveTime() : waitSeconds) * 1000; while (System.currentTimeMillis() < maxWaitEnd) { Option info = JobInfoManager.getJob(jobId); if (info.isEmpty()) { diff --git a/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/udf/ExternalFunctionManager.java b/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/udf/ExternalFunctionManager.java index 00bc94e9fb4..2ee03a8742a 100644 --- a/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/udf/ExternalFunctionManager.java +++ b/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/udf/ExternalFunctionManager.java @@ -32,7 +32,7 @@ public class ExternalFunctionManager { static private Map nameFileMap = new ConcurrentHashMap<>(); static public String getLibraryFilePath(String libraryFileName) { - return Paths.get(TaskManagerConfig.EXTERNAL_FUNCTION_DIR, libraryFileName).toString(); + return Paths.get(TaskManagerConfig.getExternalFunctionDir(), libraryFileName).toString(); } static public void addFunction(String fnName, String libraryFileName) throws Exception { diff --git a/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/zk/FailoverWatcher.java b/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/zk/FailoverWatcher.java index e2e9d8560d9..69c7689bd45 100644 --- a/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/zk/FailoverWatcher.java +++ b/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/zk/FailoverWatcher.java @@ -51,13 +51,13 @@ public class FailoverWatcher implements Watcher { */ public FailoverWatcher() throws IOException { - baseZnode = TaskManagerConfig.ZK_ROOT_PATH + "/taskmanager"; + baseZnode = TaskManagerConfig.getZkRootPath() + "/taskmanager"; masterZnode = baseZnode + "/leader"; - zkQuorum = TaskManagerConfig.ZK_CLUSTER; - sessionTimeout = TaskManagerConfig.ZK_SESSION_TIMEOUT; + zkQuorum 
= TaskManagerConfig.getZkCluster(); + sessionTimeout = TaskManagerConfig.getZkSessionTimeout(); connectRetryTimes = 3; - String serverHost = TaskManagerConfig.HOST; - int serverPort = TaskManagerConfig.PORT; + String serverHost = TaskManagerConfig.getServerHost(); + int serverPort = TaskManagerConfig.getServerPort(); hostPort = new HostPort(serverHost, serverPort); connectZooKeeper(); @@ -91,7 +91,7 @@ protected void connectZooKeeper() throws IOException { */ protected void initZnode() { try { - ZooKeeperUtil.createAndFailSilent(this, TaskManagerConfig.ZK_ROOT_PATH); + ZooKeeperUtil.createAndFailSilent(this, TaskManagerConfig.getZkRootPath()); ZooKeeperUtil.createAndFailSilent(this, baseZnode); } catch (Exception e) { LOG.fatal("Error to create znode " + baseZnode diff --git a/java/openmldb-taskmanager/src/main/resources/taskmanager.properties b/java/openmldb-taskmanager/src/main/resources/taskmanager.properties index ce13bd8be10..e3c41a4b5f8 100644 --- a/java/openmldb-taskmanager/src/main/resources/taskmanager.properties +++ b/java/openmldb-taskmanager/src/main/resources/taskmanager.properties @@ -27,6 +27,5 @@ spark.default.conf= spark.eventLog.dir= spark.yarn.maxAppAttempts=1 batchjob.jar.path= -namenode.uri= offline.data.prefix=file:///tmp/openmldb_offline_storage/ hadoop.conf.dir= diff --git a/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/JobInfoManager.scala b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/JobInfoManager.scala index e16bed38cc3..10f958d4472 100644 --- a/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/JobInfoManager.scala +++ b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/JobInfoManager.scala @@ -20,6 +20,7 @@ import com._4paradigm.openmldb.sdk.SdkOption import com._4paradigm.openmldb.sdk.impl.SqlClusterExecutor import com._4paradigm.openmldb.taskmanager.config.TaskManagerConfig import 
com._4paradigm.openmldb.taskmanager.dao.{JobIdGenerator, JobInfo} +import com._4paradigm.openmldb.taskmanager.util.HdfsUtil import com._4paradigm.openmldb.taskmanager.yarn.YarnClientUtil import org.slf4j.LoggerFactory import org.apache.hadoop.fs.{FileSystem, LocalFileSystem, Path} @@ -42,8 +43,8 @@ object JobInfoManager { private val JOB_INFO_TABLE_NAME = "JOB_INFO" private val option = new SdkOption - option.setZkCluster(TaskManagerConfig.ZK_CLUSTER) - option.setZkPath(TaskManagerConfig.ZK_ROOT_PATH) + option.setZkCluster(TaskManagerConfig.getZkCluster) + option.setZkPath(TaskManagerConfig.getZkRootPath) val sqlExecutor = new SqlClusterExecutor(option) sqlExecutor.executeSQL("", "set @@execute_mode='online';") @@ -52,7 +53,7 @@ object JobInfoManager { val startTime = new java.sql.Timestamp(Calendar.getInstance.getTime().getTime()) val initialState = "Submitted" val parameter = if (args != null && args.length>0) args.mkString(",") else "" - val cluster = sparkConf.getOrElse("spark.master", TaskManagerConfig.SPARK_MASTER) + val cluster = sparkConf.getOrElse("spark.master", TaskManagerConfig.getSparkMaster) // TODO: Parse if run in yarn or local val jobInfo = new JobInfo(jobId, jobType, initialState, startTime, null, parameter, cluster, "", "") @@ -210,12 +211,8 @@ object JobInfoManager { FileUtils.deleteDirectory(dir) } else if (filePath.startsWith("hdfs://")) { - val conf = new Configuration(); - // TODO: Get namenode uri from config file - val namenodeUri = TaskManagerConfig.NAMENODE_URI - val hdfs = FileSystem.get(URI.create(s"hdfs://$namenodeUri"), conf) - hdfs.delete(new Path(filePath), true) - + logger.info(s"Try to delete the HDFS path ${filePath}") + HdfsUtil.deleteHdfsDir(filePath) } else { throw new Exception(s"Get unsupported file path: $filePath") } diff --git a/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/LogManager.scala b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/LogManager.scala index 
a700f69145c..2e8fcc7a330 100644 --- a/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/LogManager.scala +++ b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/LogManager.scala @@ -26,11 +26,11 @@ object LogManager { private val logger = LoggerFactory.getLogger(this.getClass) def getJobLogFile(id: Int): File = { - Paths.get(TaskManagerConfig.JOB_LOG_PATH, s"job_${id}.log").toFile + Paths.get(TaskManagerConfig.getJobLogPath, s"job_${id}.log").toFile } def getJobErrorLogFile(id: Int): File = { - Paths.get(TaskManagerConfig.JOB_LOG_PATH, s"job_${id}_error.log").toFile + Paths.get(TaskManagerConfig.getJobLogPath, s"job_${id}_error.log").toFile } def getFileContent(inputFile: File): String = { diff --git a/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/k8s/K8sJobManager.scala b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/k8s/K8sJobManager.scala index 2393a3833a3..b9985a263b0 100644 --- a/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/k8s/K8sJobManager.scala +++ b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/k8s/K8sJobManager.scala @@ -56,7 +56,7 @@ object K8sJobManager { val finalSparkConf: mutable.Map[String, String] = mutable.Map(sparkConf.toSeq: _*) - val defaultSparkConfs = TaskManagerConfig.SPARK_DEFAULT_CONF.split(";") + val defaultSparkConfs = TaskManagerConfig.getSparkDefaultConf.split(";") defaultSparkConfs.map(sparkConf => { if (sparkConf.nonEmpty) { val kvList = sparkConf.split("=") @@ -66,36 +66,36 @@ object K8sJobManager { } }) - if (TaskManagerConfig.SPARK_EVENTLOG_DIR.nonEmpty) { + if (TaskManagerConfig.getSparkEventlogDir.nonEmpty) { finalSparkConf.put("spark.eventLog.enabled", "true") - finalSparkConf.put("spark.eventLog.dir", TaskManagerConfig.SPARK_EVENTLOG_DIR) + finalSparkConf.put("spark.eventLog.dir", TaskManagerConfig.getSparkEventlogDir) } // Set ZooKeeper config for 
openmldb-batch jobs - if (TaskManagerConfig.ZK_CLUSTER.nonEmpty && TaskManagerConfig.ZK_ROOT_PATH.nonEmpty) { - finalSparkConf.put("spark.openmldb.zk.cluster", TaskManagerConfig.ZK_CLUSTER) - finalSparkConf.put("spark.openmldb.zk.root.path", TaskManagerConfig.ZK_ROOT_PATH) + if (TaskManagerConfig.getZkCluster.nonEmpty && TaskManagerConfig.getZkRootPath.nonEmpty) { + finalSparkConf.put("spark.openmldb.zk.cluster", TaskManagerConfig.getZkCluster) + finalSparkConf.put("spark.openmldb.zk.root.path", TaskManagerConfig.getZkRootPath) } if (defaultDb.nonEmpty) { finalSparkConf.put("spark.openmldb.default.db", defaultDb) } - if (TaskManagerConfig.OFFLINE_DATA_PREFIX.nonEmpty) { - finalSparkConf.put("spark.openmldb.offline.data.prefix", TaskManagerConfig.OFFLINE_DATA_PREFIX) + if (TaskManagerConfig.getOfflineDataPrefix.nonEmpty) { + finalSparkConf.put("spark.openmldb.offline.data.prefix", TaskManagerConfig.getOfflineDataPrefix) } // Set external function dir for offline jobs - val absoluteExternalFunctionDir = if (TaskManagerConfig.EXTERNAL_FUNCTION_DIR.startsWith("/")) { - TaskManagerConfig.EXTERNAL_FUNCTION_DIR + val absoluteExternalFunctionDir = if (TaskManagerConfig.getExternalFunctionDir.startsWith("/")) { + TaskManagerConfig.getExternalFunctionDir } else { // TODO: The current path is incorrect if running in IDE, please set `external.function.dir` with absolute path // Concat to generate absolute path - Paths.get(Paths.get(".").toAbsolutePath.toString, TaskManagerConfig.EXTERNAL_FUNCTION_DIR).toString + Paths.get(Paths.get(".").toAbsolutePath.toString, TaskManagerConfig.getExternalFunctionDir).toString } finalSparkConf.put("spark.openmldb.taskmanager.external.function.dir", absoluteExternalFunctionDir) - if(TaskManagerConfig.ENABLE_HIVE_SUPPORT) { + if(TaskManagerConfig.getEnableHiveSupport) { finalSparkConf.put("spark.sql.catalogImplementation", "hive") } @@ -107,7 +107,7 @@ object K8sJobManager { mainJarFile = 
"local:///opt/spark/jars/openmldb-batchjob-0.7.2-SNAPSHOT.jar", arguments = args, sparkConf = finalSparkConf.toMap, - mountLocalPath = TaskManagerConfig.K8S_MOUNT_LOCAL_PATH + mountLocalPath = TaskManagerConfig.getK8sMountLocalPath ) manager.submitJob(jobConfig) @@ -170,7 +170,7 @@ class K8sJobManager(val namespace:String = "default", | type: Never | env: | - name: SPARK_USER - | value: ${TaskManagerConfig.HADOOP_USER_NAME} + | value: ${TaskManagerConfig.getHadoopUserName} | volumes: | - name: host-local | hostPath: @@ -178,7 +178,7 @@ class K8sJobManager(val namespace:String = "default", | type: Directory | - name: hadoop-config | configMap: - | name: ${TaskManagerConfig.K8S_HADOOP_CONFIGMAP_NAME} + | name: ${TaskManagerConfig.getK8sHadoopConfigmapName} | driver: | cores: ${jobConfig.driverCores} | memory: "${jobConfig.driverMemory}" @@ -194,7 +194,7 @@ class K8sJobManager(val namespace:String = "default", | - name: HADOOP_CONF_DIR | value: /etc/hadoop/conf | - name: HADOOP_USER_NAME - | value: ${TaskManagerConfig.HADOOP_USER_NAME} + | value: ${TaskManagerConfig.getHadoopUserName} | executor: | cores: ${jobConfig.executorCores} | instances: ${jobConfig.executorNum} @@ -210,7 +210,7 @@ class K8sJobManager(val namespace:String = "default", | - name: HADOOP_CONF_DIR | value: /etc/hadoop/conf | - name: HADOOP_USER_NAME - | value: ${TaskManagerConfig.HADOOP_USER_NAME} + | value: ${TaskManagerConfig.getHadoopUserName} """.stripMargin // Create a CustomResourceDefinitionContext for the SparkApplication diff --git a/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/spark/SparkJobManager.scala b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/spark/SparkJobManager.scala index 8d2410fd13a..bc8c5dfebbe 100644 --- a/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/spark/SparkJobManager.scala +++ 
b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/spark/SparkJobManager.scala @@ -36,38 +36,40 @@ object SparkJobManager { * @return the SparkLauncher object */ def createSparkLauncher(mainClass: String): SparkLauncher = { + + + val launcher = new SparkLauncher() + .setAppResource(TaskManagerConfig.getBatchjobJarPath) + .setMainClass(mainClass) + + if (TaskManagerConfig.getSparkHome != null && TaskManagerConfig.getSparkHome.nonEmpty) { + launcher.setSparkHome(TaskManagerConfig.getSparkHome) + } + val env: java.util.Map[String, String] = new java.util.HashMap[String, String] // config may empty, need check - if (!TaskManagerConfig.isEmpty(TaskManagerConfig.HADOOP_CONF_DIR)) { - env.put("HADOOP_CONF_DIR", TaskManagerConfig.HADOOP_CONF_DIR) - } - // env.put("YARN_CONF_DIR", TaskManagerConfig.HADOOP_CONF_DIR) // unused now - if (!TaskManagerConfig.isEmpty(TaskManagerConfig.HADOOP_USER_NAME)){ - env.put("HADOOP_USER_NAME", TaskManagerConfig.HADOOP_USER_NAME) + if (TaskManagerConfig.getHadoopConfDir != null && TaskManagerConfig.getHadoopConfDir.nonEmpty) { + env.put("HADOOP_CONF_DIR", TaskManagerConfig.getHadoopConfDir) } - - val launcher = new SparkLauncher(env) - .setAppResource(TaskManagerConfig.BATCHJOB_JAR_PATH) - .setMainClass(mainClass) - if (!TaskManagerConfig.isEmpty(TaskManagerConfig.SPARK_HOME)) { - launcher.setSparkHome(TaskManagerConfig.SPARK_HOME) + if (TaskManagerConfig.getHadoopUserName != null && TaskManagerConfig.getHadoopUserName.nonEmpty){ + env.put("HADOOP_USER_NAME", TaskManagerConfig.getHadoopUserName) } - if (TaskManagerConfig.SPARK_MASTER.toLowerCase.startsWith("local")) { - launcher.setMaster(TaskManagerConfig.SPARK_MASTER) + if (TaskManagerConfig.getSparkMaster.startsWith("local")) { + launcher.setMaster(TaskManagerConfig.getSparkMaster) } else { - TaskManagerConfig.SPARK_MASTER.toLowerCase match { + TaskManagerConfig.getSparkMaster.toLowerCase match { case "yarn" | "yarn-cluster" => 
launcher.setMaster("yarn").setDeployMode("cluster") case "yarn-client" => launcher.setMaster("yarn").setDeployMode("client") - case _ => throw new Exception(s"Unsupported Spark master ${TaskManagerConfig.SPARK_MASTER}") + case _ => throw new Exception(s"Unsupported Spark master ${TaskManagerConfig.getSparkMaster}") } } - - if (!TaskManagerConfig.isEmpty(TaskManagerConfig.SPARK_YARN_JARS)) { - launcher.setConf("spark.yarn.jars", TaskManagerConfig.SPARK_YARN_JARS) + + if (TaskManagerConfig.getSparkYarnJars != null && TaskManagerConfig.getSparkYarnJars.nonEmpty) { + launcher.setConf("spark.yarn.jars", TaskManagerConfig.getSparkYarnJars) } launcher @@ -95,17 +97,17 @@ object SparkJobManager { // TODO: Avoid using zh_CN to load openmldb jsdk so - if (TaskManagerConfig.SPARK_EVENTLOG_DIR.nonEmpty) { + if (TaskManagerConfig.getSparkEventlogDir.nonEmpty) { launcher.setConf("spark.eventLog.enabled", "true") - launcher.setConf("spark.eventLog.dir", TaskManagerConfig.SPARK_EVENTLOG_DIR) + launcher.setConf("spark.eventLog.dir", TaskManagerConfig.getSparkEventlogDir) } - if (TaskManagerConfig.SPARK_YARN_MAXAPPATTEMPTS >= 1 ) { - launcher.setConf("spark.yarn.maxAppAttempts", TaskManagerConfig.SPARK_YARN_MAXAPPATTEMPTS.toString) + if (TaskManagerConfig.getSparkYarnMaxappattempts >= 1 ) { + launcher.setConf("spark.yarn.maxAppAttempts", TaskManagerConfig.getSparkYarnMaxappattempts.toString) } // Set default Spark conf by TaskManager configuration file - val defaultSparkConfs = TaskManagerConfig.SPARK_DEFAULT_CONF.split(";") + val defaultSparkConfs = TaskManagerConfig.getSparkDefaultConf.split(";") defaultSparkConfs.map(sparkConf => { if (sparkConf.nonEmpty) { val kvList = sparkConf.split("=") @@ -116,9 +118,9 @@ object SparkJobManager { }) // Set ZooKeeper config for openmldb-batch jobs - if (TaskManagerConfig.ZK_CLUSTER.nonEmpty && TaskManagerConfig.ZK_ROOT_PATH.nonEmpty) { - launcher.setConf("spark.openmldb.zk.cluster", TaskManagerConfig.ZK_CLUSTER) - 
launcher.setConf("spark.openmldb.zk.root.path", TaskManagerConfig.ZK_ROOT_PATH) + if (TaskManagerConfig.getZkCluster.nonEmpty && TaskManagerConfig.getZkRootPath.nonEmpty) { + launcher.setConf("spark.openmldb.zk.cluster", TaskManagerConfig.getZkCluster) + launcher.setConf("spark.openmldb.zk.root.path", TaskManagerConfig.getZkRootPath) } // Set ad-hoc Spark configuration @@ -126,17 +128,17 @@ object SparkJobManager { launcher.setConf("spark.openmldb.default.db", defaultDb) } - if (TaskManagerConfig.OFFLINE_DATA_PREFIX.nonEmpty) { - launcher.setConf("spark.openmldb.offline.data.prefix", TaskManagerConfig.OFFLINE_DATA_PREFIX) + if (TaskManagerConfig.getOfflineDataPrefix.nonEmpty) { + launcher.setConf("spark.openmldb.offline.data.prefix", TaskManagerConfig.getOfflineDataPrefix) } // Set external function dir for offline jobs - val absoluteExternalFunctionDir = if (TaskManagerConfig.EXTERNAL_FUNCTION_DIR.startsWith("/")) { - TaskManagerConfig.EXTERNAL_FUNCTION_DIR + val absoluteExternalFunctionDir = if (TaskManagerConfig.getExternalFunctionDir.startsWith("/")) { + TaskManagerConfig.getExternalFunctionDir } else { // TODO: The current path is incorrect if running in IDE, please set `external.function.dir` with absolute path // Concat to generate absolute path - Paths.get(Paths.get(".").toAbsolutePath.toString, TaskManagerConfig.EXTERNAL_FUNCTION_DIR).toString + Paths.get(Paths.get(".").toAbsolutePath.toString, TaskManagerConfig.getExternalFunctionDir).toString } launcher.setConf("spark.openmldb.taskmanager.external.function.dir", absoluteExternalFunctionDir) @@ -145,13 +147,13 @@ object SparkJobManager { launcher.setConf(k, v) } - if (TaskManagerConfig.JOB_LOG_PATH.nonEmpty) { + if (TaskManagerConfig.getJobLogPath.nonEmpty) { // Create local file and redirect the log of job into files launcher.redirectOutput(LogManager.getJobLogFile(jobInfo.getId)) launcher.redirectError(LogManager.getJobErrorLogFile(jobInfo.getId)) } - if(TaskManagerConfig.ENABLE_HIVE_SUPPORT) { + 
if(TaskManagerConfig.getEnableHiveSupport) { launcher.setConf("spark.sql.catalogImplementation", "hive") } diff --git a/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/tracker/YarnJobTrackerThread.scala b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/tracker/YarnJobTrackerThread.scala index 7424f82cd84..8e45ee5c935 100644 --- a/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/tracker/YarnJobTrackerThread.scala +++ b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/tracker/YarnJobTrackerThread.scala @@ -31,7 +31,7 @@ class YarnJobTrackerThread(job: JobInfo) extends Thread { } // Sleep for interval time - Thread.sleep(TaskManagerConfig.JOB_TRACKER_INTERVAL * 1000) + Thread.sleep(TaskManagerConfig.getJobTrackerInterval * 1000) val currentYarnState = appReport.getYarnApplicationState.toString.toLowerCase() diff --git a/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/util/HdfsUtil.scala b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/util/HdfsUtil.scala new file mode 100644 index 00000000000..c4bbb2ef6f2 --- /dev/null +++ b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/util/HdfsUtil.scala @@ -0,0 +1,49 @@ +/* + * Copyright 2021 4Paradigm + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com._4paradigm.openmldb.taskmanager.util + +import com._4paradigm.openmldb.taskmanager.config.TaskManagerConfig +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.slf4j.LoggerFactory + +object HdfsUtil { + + private val logger = LoggerFactory.getLogger(this.getClass) + + def deleteHdfsDir(path: String): Unit = { + + val conf = new Configuration() + conf.addResource(new Path(TaskManagerConfig.getHadoopConfDir, "core-site.xml")) + conf.addResource(new Path(TaskManagerConfig.getHadoopConfDir, "hdfs-site.xml")) + + val fs = FileSystem.get(conf) + + val pathToDelete = new Path(path) + + if (fs.exists(pathToDelete)) { + fs.delete(pathToDelete, true); + logger.info("File deleted successfully: " + path) + } else { + logger.warn("File does not exist: " + path) + } + + fs.close() + + } + +} diff --git a/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/util/VersionUtil.scala b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/util/VersionUtil.scala index a416ce9cd0d..93f66b8eba2 100644 --- a/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/util/VersionUtil.scala +++ b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/util/VersionUtil.scala @@ -27,7 +27,7 @@ object VersionUtil { private val logger = LoggerFactory.getLogger(this.getClass) def getBatchVersion(): String = { - val sparkJarsPath = Paths.get(TaskManagerConfig.SPARK_HOME, "jars").toString + val sparkJarsPath = Paths.get(TaskManagerConfig.getSparkHome, "jars").toString val batchJarPath = BatchJobUtil.findOpenmldbBatchJar(sparkJarsPath) if (batchJarPath == null) { logger.error("Fail to find batch jar file and the version is unknown") diff --git a/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/yarn/YarnClientUtil.scala 
b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/yarn/YarnClientUtil.scala index ffb7d5eebaf..803bd9ce5bf 100644 --- a/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/yarn/YarnClientUtil.scala +++ b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/yarn/YarnClientUtil.scala @@ -31,10 +31,10 @@ object YarnClientUtil { def createYarnClient(): YarnClient = { val config = new Configuration() - config.addResource(new Path(TaskManagerConfig.HADOOP_CONF_DIR, "core-site.xml")) - config.addResource(new Path(TaskManagerConfig.HADOOP_CONF_DIR, "hdfs-site.xml")) - config.addResource(new Path(TaskManagerConfig.HADOOP_CONF_DIR, "yarn-site.xml")) - config.addResource(new Path(TaskManagerConfig.HADOOP_CONF_DIR, "mapred-site.xml")) + config.addResource(new Path(TaskManagerConfig.getHadoopConfDir, "core-site.xml")) + config.addResource(new Path(TaskManagerConfig.getHadoopConfDir, "hdfs-site.xml")) + config.addResource(new Path(TaskManagerConfig.getHadoopConfDir, "yarn-site.xml")) + config.addResource(new Path(TaskManagerConfig.getHadoopConfDir, "mapred-site.xml")) // Create yarn client val yarnClient = YarnClient.createYarnClient() @@ -90,10 +90,10 @@ object YarnClientUtil { val config = new YarnConfiguration() // TODO: Load config file in better way - config.addResource(new Path(TaskManagerConfig.HADOOP_CONF_DIR, "core-site.xml")) - config.addResource(new Path(TaskManagerConfig.HADOOP_CONF_DIR, "hdfs-site.xml")) - config.addResource(new Path(TaskManagerConfig.HADOOP_CONF_DIR, "yarn-site.xml")) - config.addResource(new Path(TaskManagerConfig.HADOOP_CONF_DIR, "mapred-site.xml")) + config.addResource(new Path(TaskManagerConfig.getHadoopConfDir, "core-site.xml")) + config.addResource(new Path(TaskManagerConfig.getHadoopConfDir, "hdfs-site.xml")) + config.addResource(new Path(TaskManagerConfig.getHadoopConfDir, "yarn-site.xml")) + config.addResource(new Path(TaskManagerConfig.getHadoopConfDir, 
"mapred-site.xml")) val logCliHelper = new LogCLIHelpers logCliHelper.setConf(config) diff --git a/java/openmldb-taskmanager/src/test/scala/com/_4paradigm/openmldb/taskmanager/server/impl/TestTaskManagerImpl.scala b/java/openmldb-taskmanager/src/test/scala/com/_4paradigm/openmldb/taskmanager/server/impl/TestTaskManagerImpl.scala index 311f8166eec..86d85ad2919 100644 --- a/java/openmldb-taskmanager/src/test/scala/com/_4paradigm/openmldb/taskmanager/server/impl/TestTaskManagerImpl.scala +++ b/java/openmldb-taskmanager/src/test/scala/com/_4paradigm/openmldb/taskmanager/server/impl/TestTaskManagerImpl.scala @@ -148,8 +148,8 @@ class TestTaskManagerImpl extends FunSuite { val testDb = "db1" val testTable = "t1" val option = new SdkOption - option.setZkCluster(TaskManagerConfig.ZK_CLUSTER) - option.setZkPath(TaskManagerConfig.ZK_ROOT_PATH) + option.setZkCluster(TaskManagerConfig.getZkCluster) + option.setZkPath(TaskManagerConfig.getZkRootPath) val executor = new SqlClusterExecutor(option) executor.createDB(testDb) executor.executeDDL(testDb, s"drop table $testTable") @@ -191,8 +191,8 @@ class TestTaskManagerImpl extends FunSuite { val testDb = "db1" val testTable = "t1" val option = new SdkOption - option.setZkCluster(TaskManagerConfig.ZK_CLUSTER) - option.setZkPath(TaskManagerConfig.ZK_ROOT_PATH) + option.setZkCluster(TaskManagerConfig.getZkCluster) + option.setZkPath(TaskManagerConfig.getZkRootPath) val executor = new SqlClusterExecutor(option) executor.createDB(testDb) @@ -239,8 +239,8 @@ class TestTaskManagerImpl extends FunSuite { val testDb = "db1" val testTable = "t1" val option = new SdkOption - option.setZkCluster(TaskManagerConfig.ZK_CLUSTER) - option.setZkPath(TaskManagerConfig.ZK_ROOT_PATH) + option.setZkCluster(TaskManagerConfig.getZkCluster) + option.setZkPath(TaskManagerConfig.getZkRootPath) val executor = new SqlClusterExecutor(option) executor.createDB(testDb) @@ -295,8 +295,8 @@ class TestTaskManagerImpl extends FunSuite { val testDb = "db1" val 
testTable = "t1" val option = new SdkOption - option.setZkCluster(TaskManagerConfig.ZK_CLUSTER) - option.setZkPath(TaskManagerConfig.ZK_ROOT_PATH) + option.setZkCluster(TaskManagerConfig.getZkCluster) + option.setZkPath(TaskManagerConfig.getZkRootPath) val executor = new SqlClusterExecutor(option) executor.createDB(testDb) diff --git a/release/sbin/start-taskmanagers.sh b/release/sbin/start-taskmanagers.sh index e0a55767877..b6873c33089 100755 --- a/release/sbin/start-taskmanagers.sh +++ b/release/sbin/start-taskmanagers.sh @@ -39,6 +39,10 @@ else echo "start taskmanager in $dir with endpoint $host:$port " cmd="cd $dir && SPARK_HOME=${SPARK_HOME} bin/start.sh start taskmanager $*" run_auto "$host" "$cmd" + + # Print the log of the taskmanager if it fails to start + #cmd="cd $dir && cat taskmanager/bin/logs/taskmanager.log" + #run_auto "$host" "$cmd" done IFS="$old_IFS" -fi \ No newline at end of file +fi