From 9bee724b108f84859dfc99bac26559a1ce67c8db Mon Sep 17 00:00:00 2001 From: MarkPotato777 Date: Tue, 24 Sep 2024 18:55:01 +0800 Subject: [PATCH 1/3] write JobContext to file --- .../service/task/caller/JobCallerBuilder.java | 24 ++++++++- .../task/caller/JobEnvironmentEncryptor.java | 9 ++++ .../task/constants/JobEnvKeyConstants.java | 4 ++ .../task/executor/TaskApplication.java | 13 +++-- .../context/JobContextProviderFactory.java | 15 ++++-- ...ovider.java => K8sJobContextProvider.java} | 2 +- .../context/ProcessJobContextProvider.java | 52 +++++++++++++++++++ .../odc/service/task/util/JobUtils.java | 8 +++ 8 files changed, 119 insertions(+), 8 deletions(-) rename server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/context/{DefaultJobContextProvider.java => K8sJobContextProvider.java} (94%) create mode 100644 server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/context/ProcessJobContextProvider.java diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/caller/JobCallerBuilder.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/caller/JobCallerBuilder.java index 91c9b828f7..44bc723870 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/caller/JobCallerBuilder.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/caller/JobCallerBuilder.java @@ -15,8 +15,13 @@ */ package com.oceanbase.odc.service.task.caller; +import java.io.File; +import java.nio.charset.Charset; import java.util.Map; +import org.apache.commons.io.FileUtils; + +import com.oceanbase.odc.common.util.StringUtils; import com.oceanbase.odc.service.task.config.JobConfigurationHolder; import com.oceanbase.odc.service.task.config.TaskFrameworkProperties; import com.oceanbase.odc.service.task.constants.JobEnvKeyConstants; @@ -36,7 +41,24 @@ public class JobCallerBuilder { public static JobCaller buildProcessCaller(JobContext context) { Map environments = new JobEnvironmentFactory().build(context, TaskRunMode.PROCESS); JobUtils.encryptEnvironments(environments); - + /** + * write JobContext to file in case of exceeding the environments size limit; set the file path in + * the environment instead + */ + String jobContextFilePath = JobUtils.getExecutorDataPath() + "/" + StringUtils.uuid() + ".enc"; + try { + FileUtils.writeStringToFile(new File(jobContextFilePath), + environments.get(JobEnvKeyConstants.ODC_JOB_CONTEXT), + Charset.defaultCharset()); + } catch (Exception ex) { + FileUtils.deleteQuietly(new File(jobContextFilePath)); + throw new RuntimeException("Failed to write job context to file: " + jobContextFilePath, ex); + } + environments.put(JobEnvKeyConstants.ODC_JOB_CONTEXT_FILE_PATH, + JobUtils.encrypt(environments.get(JobEnvKeyConstants.ENCRYPT_KEY), + environments.get(JobEnvKeyConstants.ENCRYPT_SALT), jobContextFilePath)); + // remove JobContext from environments + environments.remove(JobEnvKeyConstants.ODC_JOB_CONTEXT); ProcessConfig config = new ProcessConfig(); config.setEnvironments(environments); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/caller/JobEnvironmentEncryptor.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/caller/JobEnvironmentEncryptor.java index 47efeaf0b5..63328a7136 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/caller/JobEnvironmentEncryptor.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/caller/JobEnvironmentEncryptor.java @@ -42,6 +42,7 @@ public class JobEnvironmentEncryptor { JobEnvKeyConstants.ODC_EXECUTOR_DATABASE_PASSWORD, JobEnvKeyConstants.ODC_OBJECT_STORAGE_CONFIGURATION, JobEnvKeyConstants.ODC_PROPERTY_ENCRYPTION_SALT, + JobEnvKeyConstants.ODC_JOB_CONTEXT_FILE_PATH, JobEnvKeyConstants.ODC_JOB_CONTEXT); private final AtomicBoolean encrypted = new AtomicBoolean(false); @@ -85,5 +86,13 @@ public void decrypt(@NonNull Map environments) { }); } + public String encrypt(String key, String salt, String raw) { + TextEncryptor textEncryptor = Encryptors.aesBase64(key, salt); + return textEncryptor.encrypt(raw); + } + public String decrypt(String key, String salt, String encrypted) { + TextEncryptor textEncryptor = Encryptors.aesBase64(key, salt); + return textEncryptor.decrypt(encrypted); + } } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/constants/JobEnvKeyConstants.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/constants/JobEnvKeyConstants.java index ab53f505f1..afd8b2a300 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/constants/JobEnvKeyConstants.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/constants/JobEnvKeyConstants.java @@ -27,12 +27,16 @@ public class JobEnvKeyConstants { public static final String ODC_JOB_CONTEXT = "ODC_JOB_CONTEXT"; + public static final String ODC_JOB_CONTEXT_FILE_PATH = "ODC_JOB_CONTEXT_FILE_PATH"; + public static final String ODC_TASK_RUN_MODE = "ODC_TASK_RUN_MODE"; public static final String ODC_BOOT_MODE = "ODC_BOOT_MODE"; public static final String ODC_LOG_DIRECTORY = "odc.log.directory"; + public static final String ODC_DATA_DIRECTORY = "file.storage.dir"; + public static final String ODC_EXECUTOR_PORT = "ODC_EXECUTOR_PORT"; public static final String ODC_SERVICE_HOST = "ODC_SERVICE_HOST"; diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/TaskApplication.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/TaskApplication.java index 0e78ca0b09..1807fbb6aa 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/TaskApplication.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/TaskApplication.java @@ -24,6 +24,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.core.LoggerContext; +import org.eclipse.jgit.util.StringUtils; import com.oceanbase.odc.common.trace.TaskContextHolder; import com.oceanbase.odc.common.trace.TraceContextHolder; @@ -90,7 +91,8 @@ private void init(String[] args) { log.info("decrypt environment variables success."); // 3 step: get JobContext from environment - context = JobContextProviderFactory.create().provide(); + context = JobContextProviderFactory.create(SystemUtils.getEnvOrProperty(JobEnvKeyConstants.ODC_TASK_RUN_MODE)) + .provide(); log.info("initial job context success."); // 4 step: trace taskId in log4j2 context @@ -151,9 +153,14 @@ private void setLog4JConfigXml() { } private void validEnvValues() { - validNotBlank(JobEnvKeyConstants.ODC_JOB_CONTEXT); - validNotBlank(JobEnvKeyConstants.ODC_BOOT_MODE); validNotBlank(JobEnvKeyConstants.ODC_TASK_RUN_MODE); + if (StringUtils.equalsIgnoreCase("PROCESS", + SystemUtils.getEnvOrProperty(JobEnvKeyConstants.ODC_TASK_RUN_MODE))) { + validNotBlank(JobEnvKeyConstants.ODC_JOB_CONTEXT_FILE_PATH); + } else { + validNotBlank(JobEnvKeyConstants.ODC_JOB_CONTEXT); + } + validNotBlank(JobEnvKeyConstants.ODC_BOOT_MODE); validNotBlank(JobEnvKeyConstants.ENCRYPT_SALT); validNotBlank(JobEnvKeyConstants.ENCRYPT_KEY); validNotBlank(JobEnvKeyConstants.ODC_EXECUTOR_USER_ID); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/context/JobContextProviderFactory.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/context/JobContextProviderFactory.java index 01ccbd813d..c285caf3fa 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/context/JobContextProviderFactory.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/context/JobContextProviderFactory.java @@ -16,14 +16,23 @@ package com.oceanbase.odc.service.task.executor.context; +import com.oceanbase.odc.service.task.enums.TaskRunMode; + +import lombok.NonNull; + /** * @author gaoda.xy * @date 2023/11/23 13:55 */ public class JobContextProviderFactory { - public static JobContextProvider create() { - return new DefaultJobContextProvider(); + public static JobContextProvider create(@NonNull String taskRunMode) { + if (taskRunMode.equalsIgnoreCase(TaskRunMode.PROCESS.name())) { + return new ProcessJobContextProvider(); + } else if (taskRunMode.equalsIgnoreCase(TaskRunMode.K8S.name())) { + return new K8sJobContextProvider(); + } else { + throw new RuntimeException("Unsupported task run mode: " + taskRunMode); + } } - } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/context/DefaultJobContextProvider.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/context/K8sJobContextProvider.java similarity index 94% rename from server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/context/DefaultJobContextProvider.java rename to server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/context/K8sJobContextProvider.java index 02c1ea299a..03eaad455e 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/context/DefaultJobContextProvider.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/context/K8sJobContextProvider.java @@ -25,7 +25,7 @@ * @author gaoda.xy * @date 2023/11/22 20:21 */ -public class DefaultJobContextProvider implements JobContextProvider { +public class K8sJobContextProvider implements JobContextProvider { @Override public JobContext provide() { diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/context/ProcessJobContextProvider.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/context/ProcessJobContextProvider.java new file mode 100644 index 0000000000..c76f02739a --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/context/ProcessJobContextProvider.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.task.executor.context; + +import java.io.File; +import java.nio.charset.Charset; + +import org.apache.commons.io.FileUtils; + +import com.oceanbase.odc.common.json.JsonUtils; +import com.oceanbase.odc.common.util.SystemUtils; +import com.oceanbase.odc.service.task.caller.DefaultJobContext; +import com.oceanbase.odc.service.task.caller.JobContext; +import com.oceanbase.odc.service.task.constants.JobEnvKeyConstants; +import com.oceanbase.odc.service.task.util.JobUtils; + +/** + * @Author: Lebie + * @Date: 2024/9/24 17:27 + * @Description: [] + */ +public class ProcessJobContextProvider implements JobContextProvider { + @Override + public JobContext provide() { + String encryptedJobContextJson; + try { + encryptedJobContextJson = FileUtils + .readFileToString(new File(System.getProperty(JobEnvKeyConstants.ODC_JOB_CONTEXT_FILE_PATH)), + Charset.defaultCharset()); + } catch (Exception ex) { + throw new RuntimeException("read job context file failed, ex=", ex); + } finally { + FileUtils.deleteQuietly(new File(System.getProperty(JobEnvKeyConstants.ODC_JOB_CONTEXT_FILE_PATH))); + } + String rawJobContextJson = JobUtils.decrypt(SystemUtils.getEnvOrProperty(JobEnvKeyConstants.ENCRYPT_KEY), + SystemUtils.getEnvOrProperty(JobEnvKeyConstants.ENCRYPT_SALT), encryptedJobContextJson); + return JsonUtils.fromJson(rawJobContextJson, DefaultJobContext.class); + } +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/util/JobUtils.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/util/JobUtils.java index d8c4dc708d..9f02bff787 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/util/JobUtils.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/util/JobUtils.java @@ -180,4 +180,12 @@ public static void putEnvToSysProperties(String environmentKey) { public static void encryptEnvironments(Map environments) { new JobEnvironmentEncryptor().encrypt(environments); } + + public static String encrypt(String key, String salt, String raw) { + return new JobEnvironmentEncryptor().encrypt(key, salt, raw); + } + + public static String decrypt(String key, String salt, String encrypted) { + return new JobEnvironmentEncryptor().decrypt(key, salt, encrypted); + } } From 9d14029a4cf96ab3607036a3d50a02bf4a080198 Mon Sep 17 00:00:00 2001 From: MarkPotato777 Date: Tue, 24 Sep 2024 19:03:41 +0800 Subject: [PATCH 2/3] delete useless constant --- .../odc/service/task/constants/JobEnvKeyConstants.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/constants/JobEnvKeyConstants.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/constants/JobEnvKeyConstants.java index afd8b2a300..be733708e3 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/constants/JobEnvKeyConstants.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/constants/JobEnvKeyConstants.java @@ -35,8 +35,6 @@ public class JobEnvKeyConstants { public static final String ODC_LOG_DIRECTORY = "odc.log.directory"; - public static final String ODC_DATA_DIRECTORY = "file.storage.dir"; - public static final String ODC_EXECUTOR_PORT = "ODC_EXECUTOR_PORT"; public static final String ODC_SERVICE_HOST = "ODC_SERVICE_HOST"; From 53af87a287ee2ff07b1a7a5818487f3f7abc229a Mon Sep 17 00:00:00 2001 From: MarkPotato777 Date: Tue, 24 Sep 2024 20:06:32 +0800 Subject: [PATCH 3/3] response to comments --- .../oceanbase/odc/service/task/caller/JobCallerBuilder.java | 5 ++--- .../odc/service/task/caller/JobEnvironmentFactory.java | 5 +++-- .../oceanbase/odc/service/task/executor/TaskApplication.java | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/caller/JobCallerBuilder.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/caller/JobCallerBuilder.java index 44bc723870..bf21acf357 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/caller/JobCallerBuilder.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/caller/JobCallerBuilder.java @@ -48,7 +48,8 @@ public static JobCaller buildProcessCaller(JobContext context) { String jobContextFilePath = JobUtils.getExecutorDataPath() + "/" + StringUtils.uuid() + ".enc"; try { FileUtils.writeStringToFile(new File(jobContextFilePath), - environments.get(JobEnvKeyConstants.ODC_JOB_CONTEXT), + JobUtils.encrypt(environments.get(JobEnvKeyConstants.ENCRYPT_KEY), + environments.get(JobEnvKeyConstants.ENCRYPT_SALT), JobUtils.toJson(context)), Charset.defaultCharset()); } catch (Exception ex) { FileUtils.deleteQuietly(new File(jobContextFilePath)); @@ -57,8 +58,6 @@ public static JobCaller buildProcessCaller(JobContext context) { environments.put(JobEnvKeyConstants.ODC_JOB_CONTEXT_FILE_PATH, JobUtils.encrypt(environments.get(JobEnvKeyConstants.ENCRYPT_KEY), environments.get(JobEnvKeyConstants.ENCRYPT_SALT), jobContextFilePath)); - // remove JobContext from environments - environments.remove(JobEnvKeyConstants.ODC_JOB_CONTEXT); ProcessConfig config = new ProcessConfig(); config.setEnvironments(environments); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/caller/JobEnvironmentFactory.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/caller/JobEnvironmentFactory.java index dc64dfe467..290e0ffad1 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/caller/JobEnvironmentFactory.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/caller/JobEnvironmentFactory.java @@ -44,8 +44,9 @@ public class JobEnvironmentFactory { public Map build(JobContext context, TaskRunMode runMode) { putEnv(JobEnvKeyConstants.ODC_BOOT_MODE, () -> JobConstants.ODC_BOOT_MODE_EXECUTOR); putEnv(JobEnvKeyConstants.ODC_TASK_RUN_MODE, runMode::name); - putEnv(JobEnvKeyConstants.ODC_JOB_CONTEXT, () -> JobUtils.toJson(context)); - + if (runMode.isK8s()) { + putEnv(JobEnvKeyConstants.ODC_JOB_CONTEXT, () -> JobUtils.toJson(context)); + } JobCredentialProvider jobCredentialProvider = JobConfigurationHolder.getJobConfiguration() .getJobCredentialProvider(); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/TaskApplication.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/TaskApplication.java index 1807fbb6aa..84b5a3758f 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/TaskApplication.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/TaskApplication.java @@ -24,10 +24,10 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.core.LoggerContext; -import org.eclipse.jgit.util.StringUtils; import com.oceanbase.odc.common.trace.TaskContextHolder; import com.oceanbase.odc.common.trace.TraceContextHolder; +import com.oceanbase.odc.common.util.StringUtils; import com.oceanbase.odc.common.util.SystemUtils; import com.oceanbase.odc.core.shared.Verify; import com.oceanbase.odc.service.task.caller.JobContext;