diff --git a/docs/docs/en/guide/task/kubernetes.md b/docs/docs/en/guide/task/kubernetes.md index 871a020294c9..914a4f6e1a2b 100644 --- a/docs/docs/en/guide/task/kubernetes.md +++ b/docs/docs/en/guide/task/kubernetes.md @@ -18,6 +18,8 @@ K8S task type used to execute a batch task. In this task, the worker submits the | **Parameter** | **Description** | |-------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| customConfig | The switch enables user-customized Kubernetes YAML mode when the default low-code Kubernetes Job provided does not meet the requirements. | +| yamlContent | The YAML configuration file content for user-customized Kubernetes task. | | Namespace | The namespace for running k8s task. | | Min CPU | Minimum CPU requirement for running k8s task. | | Min Memory | Minimum memory requirement for running k8s task. | @@ -41,6 +43,10 @@ Configure the required content according to the parameter descriptions above. ![K8S](../../../../img/tasks/demo/kubernetes-task-en.png) +User-customized Kubernetes YAML mode can be turned on by switching to "Custom Template". + +![K8S-YAML](../../../../img/tasks/demo/kubernetes-yaml-task-en.png) + ## Note Task name contains only lowercase alphanumeric characters or '-' diff --git a/docs/docs/zh/guide/task/kubernetes.md b/docs/docs/zh/guide/task/kubernetes.md index dbfbcd20d2a6..5e64fa9595c1 100644 --- a/docs/docs/zh/guide/task/kubernetes.md +++ b/docs/docs/zh/guide/task/kubernetes.md @@ -18,6 +18,8 @@ kubernetes任务类型,用于在kubernetes上执行一个短时和批处理的 | **任务参数** | **描述** | |----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 自定义模板 | 当默认的低代码 Kubernetes 任务不符合要求时,该开关可以启用用户自定义的 Kubernetes YAML 模式。 | +| YAML | 用户自定义 Kubernetes 任务的 YAML 配置文件内容 | | 命名空间 | 选择kubernetes集群上存在的命名空间 | | 最小CPU | 任务在kubernetes上运行所需的最小CPU | | 最小内存 | 任务在kubernetes上运行所需的最小内存 | @@ -41,6 +43,10 @@ kubernetes任务类型,用于在kubernetes上执行一个短时和批处理的 ![kubernetes](../../../../img/tasks/demo/kubernetes-task-en.png) +切换到用户自定义模板,即可使用 YAML 配置文件启动自定义 kubernetes 任务。 + +![K8S-YAML](../../../../img/tasks/demo/kubernetes-yaml-task-en.png) + ## 注意事项 任务名字限制在小写字母、数字和-这三种字符之中 diff --git a/docs/img/tasks/demo/kubernetes-yaml-task-en.png b/docs/img/tasks/demo/kubernetes-yaml-task-en.png new file mode 100644 index 000000000000..c757b9d3d323 Binary files /dev/null and b/docs/img/tasks/demo/kubernetes-yaml-task-en.png differ diff --git a/dolphinscheduler-bom/pom.xml b/dolphinscheduler-bom/pom.xml index b7efb4815cf1..6c14712a36da 100644 --- a/dolphinscheduler-bom/pom.xml +++ b/dolphinscheduler-bom/pom.xml @@ -365,6 +365,12 @@ jackson-datatype-jsr310 ${jackson.version} + + com.fasterxml.jackson.dataformat + jackson-dataformat-yaml + ${jackson.version} + + io.protostuff diff --git a/dolphinscheduler-common/pom.xml b/dolphinscheduler-common/pom.xml index 6c83c580ae25..5ac0d476d60d 100644 --- a/dolphinscheduler-common/pom.xml +++ b/dolphinscheduler-common/pom.xml @@ -74,6 +74,12 @@ com.fasterxml.jackson.core jackson-databind + + + com.fasterxml.jackson.dataformat + jackson-dataformat-yaml + + org.apache.commons commons-collections4 diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/YamlUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/YamlUtils.java new file mode 100644 index 000000000000..2763ad98ed90 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/YamlUtils.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.utils; + +import java.io.File; + +import lombok.extern.slf4j.Slf4j; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; +import com.fasterxml.jackson.dataformat.yaml.YAMLMapper; + +/** + * YAML Utilities + */ +@Slf4j +public class YamlUtils { + + // YAML parser + private static final ObjectMapper objectMapper = YAMLMapper.builder() + .enable(SerializationFeature.INDENT_OUTPUT) + .disable(YAMLGenerator.Feature.WRITE_DOC_START_MARKER) + .build(); + + // ensure Singleton Pattern of `YamlUtils` + private YamlUtils() { + throw new UnsupportedOperationException("Construct YamlUtils"); + } + + /** + * parse the YAML String + * + * @param yamlString YAML string to load + * @param typeReference the type reference specifying the type of the object to parse into + * @param the type of the object + * @return an object of type T parsed from the YAML file, or null if parsing fails + */ + public static T load(String yamlString, TypeReference typeReference) { + try { + return objectMapper.readValue(yamlString, typeReference); + } catch (Exception exception) { + log.error("failed to parse YAML String ({}):" + "\n" + + "```yaml" + "\n" + + "{}" + "\n" + + "```" + "\n" + + "\n", + exception.getMessage(), yamlString); + return null; + } + } + + /** + * Loads and parses a YAML file into an object of the specified class. + * + * @param file YAML file to load + * @param typeReference the type reference specifying the type of the object to parse into + * @param the type of the object + * @return an object of type T parsed from the YAML file, or null if parsing fails + */ + public static T load(File file, TypeReference typeReference) { + try { + return objectMapper.readValue(file, typeReference); + } catch (Exception exception) { + log.error("failed to parse YAML file `{}`: {}", file.getName(), exception.getMessage()); + return null; + } + } +} diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/YamlUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/YamlUtilsTest.java new file mode 100644 index 000000000000..84cdb67b7830 --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/YamlUtilsTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.utils; + +import java.io.File; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import com.fasterxml.jackson.core.type.TypeReference; + +public class YamlUtilsTest { + + private static final String yamlStringExpected = + "name: Yaml Parser" + '\n' + + "age: 30" + '\n' + + "address:" + '\n' + + " city: New York" + '\n' + + " state: NY" + '\n' + + " zipcode: 10001" + '\n'; + + private static final Map yamlDataMapExpected = new HashMap() { + + { + put("name", "Yaml Parser"); + put("age", 30); + put("address", new HashMap() { + + { + put("city", "New York"); + put("state", "NY"); + put("zipcode", 10001); + } + }); + } + }; + + @Test + public void testParseYamlString() { + + Assertions.assertNull(YamlUtils.load("", new TypeReference>() { + })); + + Assertions.assertNull(YamlUtils.load(yamlStringExpected, new TypeReference() { + })); + + Map yamlDataMapActual = + YamlUtils.load(yamlStringExpected, new TypeReference>() { + }); + Assertions.assertEquals( + yamlDataMapExpected, yamlDataMapActual, + "[!] Test FAILED: expected YAML data: " + yamlDataMapExpected + + ", but actual data parsed: " + yamlDataMapActual); + } + + @Test + public void testParseYamlFile() { + + File emptyFile = new File(""); + Assertions.assertNull(YamlUtils.load(emptyFile, new TypeReference() { + })); + + isFileTestcase01AddressYamlPassed(); + isFileTestcase02SimpleK8sPodYamlPassed(); + } + + /* + * The following methods are helpers and should be kept private + */ + + private void isFileTestcase01AddressYamlPassed() { + String filePathRelative = "yaml/testcase-01-address.yaml"; + String filePathAbsolute = + Objects.requireNonNull(getClass().getClassLoader().getResource(filePathRelative)).getFile(); + File file = new File(filePathAbsolute); + Map yamlDataMapActual = YamlUtils.load(file, new TypeReference>() { + }); + Assertions.assertEquals( + yamlDataMapExpected, yamlDataMapActual, + "[!] Test FAILED on YAML file: yaml/testcase-01-address.yaml"); + } + + private void isFileTestcase02SimpleK8sPodYamlPassed() { + String filePathRelative = "yaml/testcase-02-simple-k8s-pod.yaml"; + String filePathAbsolute = + Objects.requireNonNull(getClass().getClassLoader().getResource(filePathRelative)).getFile(); + + Map yamlDataK8sPodExpected = new HashMap() { + + { + put("apiVersion", "v1"); + put("kind", "Pod"); + put("metadata", new HashMap() { + + { + put("name", "testcase-02-simple-k8s-pod-nginx"); + put("labels", new HashMap() { + + { + put("app", "nginx"); + } + }); + } + }); + put("spec", new HashMap() { + + { + put("containers", new ArrayList() { + + { + add(new HashMap() { + + { + put("name", "nginx-container"); + put("image", "nginx:1.10"); + put("ports", new ArrayList() { + + { + add(new HashMap() { + + { + put("containerPort", 80); + } + }); + } + }); + } + }); + } + }); + } + }); + } + }; + File file = new File(filePathAbsolute); + Map yamlDataK8sPodActual = YamlUtils.load(file, new TypeReference>() { + }); + Assertions.assertEquals( + yamlDataK8sPodExpected, yamlDataK8sPodActual, + "[!] Test FAILED on YAML file: yaml/testcase-02-simple-k8s-pod.yaml"); + } +} diff --git a/dolphinscheduler-common/src/test/resources/yaml/testcase-01-address.yaml b/dolphinscheduler-common/src/test/resources/yaml/testcase-01-address.yaml new file mode 100644 index 000000000000..2877b3c5ace0 --- /dev/null +++ b/dolphinscheduler-common/src/test/resources/yaml/testcase-01-address.yaml @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# testcase-01-address.yaml +name: Yaml Parser +age: 30 +address: + city: New York + state: NY + zipcode: 10001 diff --git a/dolphinscheduler-common/src/test/resources/yaml/testcase-02-simple-k8s-pod.yaml b/dolphinscheduler-common/src/test/resources/yaml/testcase-02-simple-k8s-pod.yaml new file mode 100644 index 000000000000..893af5195aa0 --- /dev/null +++ b/dolphinscheduler-common/src/test/resources/yaml/testcase-02-simple-k8s-pod.yaml @@ -0,0 +1,30 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# testcase-02-simple-k8s-pod.yaml +apiVersion: v1 +kind: Pod +metadata: + name: testcase-02-simple-k8s-pod-nginx + labels: + app: nginx +spec: + containers: + - name: nginx-container + image: nginx:1.10 + ports: + - containerPort: 80 diff --git a/dolphinscheduler-dist/release-docs/LICENSE b/dolphinscheduler-dist/release-docs/LICENSE index a4cdfeadabbc..910626c234ac 100644 --- a/dolphinscheduler-dist/release-docs/LICENSE +++ b/dolphinscheduler-dist/release-docs/LICENSE @@ -434,7 +434,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt. kubernetes-model-scheduling 5.10.2: https://mvnrepository.com/artifact/io.fabric8/kubernetes-model-scheduling/5.10.2, Apache 2.0 kubernetes-model-storageclass 5.10.2: https://mvnrepository.com/artifact/io.fabric8/kubernetes-model-storageclass/5.10.2, Apache 2.0 zjsonpatch 0.3.0 https://mvnrepository.com/artifact/io.fabric8/zjsonpatch/0.3.0, Apache 2.0 - jackson-dataformat-yaml 2.13.0 https://mvnrepository.com/artifact/com.fasterxml.jackson.dataformat/jackson-dataformat-yaml/2.13.0, Apache 2.0 + jackson-dataformat-yaml 2.13.4 https://mvnrepository.com/artifact/com.fasterxml.jackson.dataformat/jackson-dataformat-yaml/2.13.4, Apache 2.0 logging-interceptor 4.9.3 https://mvnrepository.com/artifact/com.squareup.okhttp3/logging-interceptor/4.9.3, Apache 2.0 okio 3.6.0 https://mvnrepository.com/artifact/com.squareup.okio/okio/3.6.0, Apache 2.0 okio-jvm 3.6.0 https://repo1.maven.org/maven2/com/squareup/okio/okio-jvm/3.6.0, Apache 2.0 diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/K8sPodPhaseConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/K8sPodPhaseConstants.java new file mode 100644 index 000000000000..66f8413e54fa --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/K8sPodPhaseConstants.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.enums; + +/** + * K8sPodPhaseConstants describes the lifecycle of a Pod. + */ +public class K8sPodPhaseConstants { + + public static final String SUCCEEDED = "Succeeded"; + public static final String RUNNING = "Running"; + public static final String PENDING = "Pending"; + public static final String FAILED = "Failed"; + public static final String UNKNOWN = "Unknown"; +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/K8sYamlType.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/K8sYamlType.java new file mode 100644 index 000000000000..ce67673a6681 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/K8sYamlType.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.enums; + +/** + * K8sYamlType defines Kubernetes YAML types. + */ +public enum K8sYamlType { + Pod, + + ; +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sOperation.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sOperation.java new file mode 100644 index 000000000000..38afb1507f8d --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sOperation.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.k8s; + +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; + +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.StatusDetails; +import io.fabric8.kubernetes.client.Watch; +import io.fabric8.kubernetes.client.dsl.LogWatch; + +/** + * AbstractK8sOperation defines Operation for User-customized YAML tasks + */ +public interface AbstractK8sOperation { + + int MAX_RETRY_TIMES = 3; + + /** + * Builds metadata for Kubernetes resource from user-customized YAML content string. + * + * @param yamlContentStr user-customized YAML content string. + * @return The {@link HasMetadata} object representing the metadata of the Kubernetes resource. + */ + HasMetadata buildMetadata(String yamlContentStr); + + /** + * Creates or replaces a resource in the Kubernetes cluster. + * + * @param metadata The {@link HasMetadata} object representing the metadata of the Kubernetes resource + * @param taskInstanceId task instance id + * @throws Exception if error occurred in creating or replacing a resource + */ + void createOrReplaceMetadata(HasMetadata metadata, int taskInstanceId) throws Exception; + + /** + * stop a resource in the kubernetes cluster + * + * @param metadata {@link HasMetadata} object representing the metadata of the Kubernetes resource + * @return a list of StatusDetails + * @throws Exception if error occurred in stopping a resource + */ + List stopMetadata(HasMetadata metadata) throws Exception; + + /** + * Gets the state of a Kubernetes resource. + * + * @param hasMetadata {@link HasMetadata} object representing the metadata. + * @return An integer representing the state of the Pod. + */ + int getState(HasMetadata hasMetadata); + + /** + * Creates a watch to monitor the state of the Kubernetes resource. + * + * @param countDownLatch A CountDownLatch that will be counted down when the Pod's state changes or an error occurs. + * @param taskResponse the response of the task. + * @param hasMetadata {@link HasMetadata} object representing the Kubernetes resource metadata. + * @param taskRequest Context information for the task, including task instance ID and process instance ID. + * @return A {@link Watch} object that monitors the specified Pod and triggers events based on the Pod's status. + */ + Watch createBatchWatcher(CountDownLatch countDownLatch, + TaskResponse taskResponse, HasMetadata hasMetadata, + TaskExecutionContext taskRequest); + + /** + * Creates a log watcher for a Pod. + * + * @param labelValue The unique label value to filter and identify the Pod. + * @param namespace The namespace where Pod locates. If the namespace is not specified a default namespace is used. + * @return A {@link LogWatch} object that allows watching the logs of the identified Pod. + * Returns null if no Pod is found or if the Pod is not in a state where logs can be watched. + */ + LogWatch getLogWatcher(String labelValue, String namespace); + + /** + * Sets the status of a task. + * + * @param jobStatus The status of the job defined in {@link TaskConstants}. + * @param taskResponse the response of the task. + */ + default void setTaskStatus(int jobStatus, TaskResponse taskResponse) { + if (jobStatus == TaskConstants.EXIT_CODE_SUCCESS || jobStatus == TaskConstants.EXIT_CODE_FAILURE) { + if (jobStatus == TaskConstants.EXIT_CODE_SUCCESS) { + taskResponse.setExitStatusCode(TaskConstants.EXIT_CODE_SUCCESS); + } else { + taskResponse.setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE); + } + } + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java index f77170f4824c..1f4a4ebe2354 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java @@ -17,14 +17,18 @@ package org.apache.dolphinscheduler.plugin.task.api.k8s; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.k8s.impl.K8sTaskExecutor; +import org.apache.dolphinscheduler.plugin.task.api.k8s.impl.K8sYamlTaskExecutor; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; +import org.apache.dolphinscheduler.plugin.task.api.parameters.K8sTaskParameters; import java.util.Map; +import java.util.Objects; import lombok.extern.slf4j.Slf4j; @@ -42,7 +46,23 @@ public abstract class AbstractK8sTask extends AbstractRemoteTask { */ protected AbstractK8sTask(TaskExecutionContext taskRequest) { super(taskRequest); - this.abstractK8sTaskExecutor = new K8sTaskExecutor(taskRequest); + String taskParams = taskRequest.getTaskParams(); + + K8sTaskParameters k8sTaskParameters; + try { + k8sTaskParameters = Objects.requireNonNull( + JSONUtils.parseObject(taskParams, K8sTaskParameters.class)); + // load k8s task executor according to k8s task type + if (k8sTaskParameters.getCustomConfig() == 0) { + // for low-code k8s Job, use `K8sTaskExecutor` + this.abstractK8sTaskExecutor = new K8sTaskExecutor(taskRequest); + } else { + // for user-customized k8s YAML task, use `K8sYamlTaskExecutor` + this.abstractK8sTaskExecutor = new K8sYamlTaskExecutor(taskRequest); + } + } catch (Exception e) { + throw new TaskException("Invalid k8s Task parameters"); + } } // todo split handle to submit and track @@ -75,7 +95,7 @@ public void trackApplicationStatus() throws TaskException { /** * cancel application * - * @throws Exception exception + * @throws TaskException exception may occur during canceling an app */ @Override public void cancelApplication() throws TaskException { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java index 8d3d2513af89..ca875dd3d094 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java @@ -48,6 +48,16 @@ public Map getTaskOutputParams() { return taskOutputParams; } + /** + * Executes a task based on the provided Kubernetes parameters. + * + *

This method processes the input parameter which can either be a custom configuration + * of type {@link K8sTaskMainParameters} or YAML content describing the Kubernetes job.

+ * + * @param k8sParameterStr a string of either user-customized YAML or K8sTaskMainParameters + * @return a {@link TaskResponse} object containing the result of the task execution. + * @throws Exception if an error occurs during task execution or while handling pod logs. + */ public abstract TaskResponse run(String k8sParameterStr) throws Exception; public abstract void cancelApplication(String k8sParameterStr); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sPodOperation.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sPodOperation.java new file mode 100644 index 000000000000..ead534140b38 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sPodOperation.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.k8s.impl; + +import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.common.utils.YamlUtils; +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.enums.K8sPodPhaseConstants; +import org.apache.dolphinscheduler.plugin.task.api.k8s.AbstractK8sOperation; +import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; +import org.apache.dolphinscheduler.plugin.task.api.utils.K8sUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; + +import org.apache.commons.collections4.CollectionUtils; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +import lombok.extern.slf4j.Slf4j; + +import com.fasterxml.jackson.core.type.TypeReference; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodList; +import io.fabric8.kubernetes.api.model.StatusDetails; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.Watch; +import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.WatcherException; +import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable; +import io.fabric8.kubernetes.client.dsl.LogWatch; +import io.fabric8.kubernetes.client.dsl.PodResource; + +/** + * K8sPodOperation defines user-customized k8s Pod tasks + */ +@Slf4j +public class K8sPodOperation implements AbstractK8sOperation { + + private final KubernetesClient client; + + public K8sPodOperation(KubernetesClient client) { + this.client = client; + } + + /** + * Builds metadata for a Pod resource from user-customized YAML content string. + * + * @param yamlContentStr user-customized YAML content string + * @return The {@link HasMetadata} object representing the metadata of the Pod resource + */ + @Override + public HasMetadata buildMetadata(String yamlContentStr) { + Pod pod = (Pod) K8sUtils.getOrDefaultNamespacedResource( + YamlUtils.load(yamlContentStr, new TypeReference() { + })); + return client.pods().resource(pod).get(); + } + + /** + * create or replace a pod in the kubernetes cluster + * + * @param resource {@link HasMetadata} object representing the metadata of {@link Pod} + * @param taskInstanceId task instance id + * @throws Exception if error occurred in creating or replacing a resource + */ + @Override + public void createOrReplaceMetadata(HasMetadata resource, int taskInstanceId) throws Exception { + synchronized (K8sUtils.class) { + log.info("[k8s-label-{}-{}] Enter createOrReplacePod for namespace `{}`", + resource.getMetadata().getName(), taskInstanceId, resource.getMetadata().getNamespace()); + Pod pod = (Pod) K8sUtils.getOrDefaultNamespacedResource(resource); + ObjectMeta podMetadata = pod.getMetadata(); + if (client + .pods() + .inNamespace(podMetadata.getNamespace()) + .withName(podMetadata.getName()) + .get() != null) { + stopMetadata(pod); + } + + Map podLabelsMap = podMetadata.getLabels(); + if (podLabelsMap == null) { + podLabelsMap = new HashMap(); + } + podLabelsMap.put(K8sYamlTaskExecutor.DS_LOG_WATCH_LABEL_NAME, + String.format("%s-%d", podMetadata.getName(), taskInstanceId)); + podMetadata.setLabels(podLabelsMap); + client.pods().resource(pod).createOrReplace(); + log.info("[k8s-label-{}-{}] Leave createOrReplacePod for namespace `{}`", + resource.getMetadata().getName(), taskInstanceId, resource.getMetadata().getNamespace()); + } + } + + /** + * Gets the state of a Pod based on its phase. + * + * @param hasMetadata {@link HasMetadata} object representing the metadata of {@link Pod} + * @return An integer representing the state of the Pod. + */ + @Override + public int getState(HasMetadata hasMetadata) { + Pod pod = (Pod) K8sUtils.getOrDefaultNamespacedResource(hasMetadata); + String currentPodPhase = pod.getStatus().getPhase(); + + if (K8sPodPhaseConstants.SUCCEEDED.equals(currentPodPhase)) { + return TaskConstants.EXIT_CODE_SUCCESS; + } else if (K8sPodPhaseConstants.FAILED.equals(currentPodPhase)) { + return TaskConstants.EXIT_CODE_FAILURE; + } else { + return TaskConstants.RUNNING_CODE; + } + } + + /** + * Creates a watch to monitor the state of the pod. + * + * @param countDownLatch A CountDownLatch that will be counted down when the Pod's state changes or an error occurs. + * @param taskResponse the status of the task. + * @param hasMetadata {@link HasMetadata} object representing the metadata of {@link Pod} + * @param taskRequest Context information for the task, including task instance ID and process instance ID. + * @return A {@link Watch} object that monitors the specified Pod and triggers events based on the Pod's status. + */ + @Override + public Watch createBatchWatcher(CountDownLatch countDownLatch, + TaskResponse taskResponse, HasMetadata hasMetadata, + TaskExecutionContext taskRequest) { + final int taskInstanceId = taskRequest.getTaskInstanceId(); + final int processInstanceId = taskRequest.getProcessInstanceId(); + + Watcher watcher = new Watcher() { + + @Override + public void eventReceived(Action action, Pod pod) { + try { + ObjectMeta podMetadata = pod.getMetadata(); + LogUtils.setWorkflowAndTaskInstanceIDMDC(processInstanceId, taskInstanceId); + LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath()); + log.info("[k8s-label-{}-{}] event received: action: {}", podMetadata.getName(), taskInstanceId, + action); + if (action == Action.DELETED) { + log.info("[k8s-label-{}-{}] to be deleted in k8s", podMetadata.getName(), taskInstanceId); + taskResponse.setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE); + countDownLatch.countDown(); + } else if (action != Action.ADDED) { + int jobStatus = getState(pod); + log.info("[k8s-label-{}-{}] status {}", podMetadata.getName(), taskInstanceId, jobStatus); + if (jobStatus == TaskConstants.RUNNING_CODE) { + return; + } + setTaskStatus(jobStatus, taskResponse); + countDownLatch.countDown(); + } + } finally { + LogUtils.removeTaskInstanceLogFullPathMDC(); + LogUtils.removeWorkflowAndTaskInstanceIdMDC(); + } + } + + @Override + public void onClose(WatcherException e) { + LogUtils.setWorkflowAndTaskInstanceIDMDC(processInstanceId, taskInstanceId); + log.error("[k8s-label-{}-{}] fail in k8s: {}", hasMetadata.getMetadata().getName(), taskInstanceId, + e.getMessage()); + taskResponse.setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE); + countDownLatch.countDown(); + LogUtils.removeWorkflowAndTaskInstanceIdMDC(); + } + }; + return client.pods().inNamespace(hasMetadata.getMetadata().getNamespace()) + .withName(hasMetadata.getMetadata().getName()) + .watch(watcher); + } + + /** + * Creates a log watcher for a Pod. + * + * @param labelValue The unique label value to filter and identify the Pod + * @param namespace The namespace where Pod locates. If the namespace is not specified a default namespace is used. + * @return A {@link LogWatch} object that allows watching the logs of the identified Pod. + * Returns null if no Pod is found or if the Pod is not in a state where logs can be watched. + */ + @Override + public LogWatch getLogWatcher(String labelValue, String namespace) { + namespace = K8sUtils.getOrDefaultNamespace(namespace); + boolean metadataIsReady = false; + Pod pod = null; + while (!metadataIsReady) { + FilterWatchListDeletable watchList = + getListenPod(labelValue, namespace); + List podList = watchList == null ? null : watchList.list().getItems(); + if (CollectionUtils.isEmpty(podList)) { + log.warn("[k8s-label-{}] no pod found in namespace `{}`", labelValue, namespace); + return null; + } + pod = podList.get(0); + String phase = pod.getStatus().getPhase(); + if (phase.equals(K8sPodPhaseConstants.PENDING) || phase.equals(K8sPodPhaseConstants.UNKNOWN)) { + ThreadUtils.sleep(TaskConstants.SLEEP_TIME_MILLIS); + } else { + log.info("[k8s-label-{}] Pod `{}` in namespace `{}` is Ready (Phase = {})", + labelValue, pod.getMetadata().getName(), namespace, phase); + metadataIsReady = true; + } + } + return client.pods().inNamespace(pod.getMetadata().getNamespace()) + .withName(pod.getMetadata().getName()) + .watchLog(); + } + + /** + * Stops a pod in the Kubernetes cluster. + * + * @param metadata {@link HasMetadata} object representing the metadata of {@link Pod} + * @return a list of StatusDetails + * @throws Exception if error occurred in stopping a resource + */ + @Override + public List stopMetadata(HasMetadata metadata) throws Exception { + Pod pod = (Pod) K8sUtils.getOrDefaultNamespacedResource(metadata); + String taskName = pod.getMetadata().getName(); + String namespace = pod.getMetadata().getNamespace(); + return client.pods().inNamespace(namespace).withName(taskName).delete(); + } + + /* + * get driver pod + */ + private FilterWatchListDeletable getListenPod(String labelValue, String namespace) { + namespace = K8sUtils.getOrDefaultNamespace(namespace); + List podList = null; + FilterWatchListDeletable watchList = null; + int retryTimes = 0; + while (CollectionUtils.isEmpty(podList) && retryTimes < AbstractK8sOperation.MAX_RETRY_TIMES) { + watchList = client.pods() + .inNamespace(namespace) + .withLabel(K8sYamlTaskExecutor.DS_LOG_WATCH_LABEL_NAME, labelValue); + podList = watchList.list().getItems(); + if (!CollectionUtils.isEmpty(podList)) { + break; + } + log.info("[k8s-label-{}] Failed to get driver pod, retry in {}ms", + labelValue, TaskConstants.SLEEP_TIME_MILLIS); + ThreadUtils.sleep(TaskConstants.SLEEP_TIME_MILLIS); + retryTimes += 1; + } + + return watchList; + } + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java index 986c9dc8a7a4..6ae1b208efce 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java @@ -291,6 +291,15 @@ private void parsePodLogOutput() { collectPodLogExecutorService.shutdown(); } + /** + * Executes a task based on the provided Kubernetes parameters. + * + *

This method processes the input parameter of type {@link K8sTaskMainParameters}.

+ * + * @param k8sParameterStr a string of K8sTaskMainParameters::toString + * @return a {@link TaskResponse} object containing the result of the task execution. + * @throws Exception if an error occurs during task execution or while handling pod logs. + */ @Override public TaskResponse run(String k8sParameterStr) throws Exception { TaskResponse result = new TaskResponse(); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sYamlTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sYamlTaskExecutor.java new file mode 100644 index 000000000000..b8cfb94a2ae1 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sYamlTaskExecutor.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.k8s.impl; + +import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.common.utils.YamlUtils; +import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.enums.K8sYamlType; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; +import org.apache.dolphinscheduler.plugin.task.api.k8s.AbstractK8sOperation; +import org.apache.dolphinscheduler.plugin.task.api.k8s.AbstractK8sTaskExecutor; +import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; +import org.apache.dolphinscheduler.plugin.task.api.parser.TaskOutputParameterParser; +import org.apache.dolphinscheduler.plugin.task.api.utils.K8sUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; + +import org.apache.commons.lang3.StringUtils; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import lombok.extern.slf4j.Slf4j; + +import com.fasterxml.jackson.core.type.TypeReference; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.client.Watch; +import io.fabric8.kubernetes.client.dsl.LogWatch; + +/** + * K8sYamlTaskExecutor submits customized YAML k8s task to Kubernetes + */ +@Slf4j +public class K8sYamlTaskExecutor extends AbstractK8sTaskExecutor { + + // resource metadata parsed from user-customized YAML + private HasMetadata metadata; + + // type of metadata, used to generate operation + private K8sYamlType k8sYamlType; + + // k8s operation, generated based on `k8sYamlType` + private AbstractK8sOperation abstractK8sOperation; + + protected boolean podLogOutputIsFinished = false; + protected Future podLogOutputFuture; + + // k8s pod label name to collect pod log + public static final String DS_LOG_WATCH_LABEL_NAME = "ds-log-watch-label"; + + public K8sYamlTaskExecutor(TaskExecutionContext taskRequest) { + super(taskRequest); + } + + /** + * Executes a task based on the provided Kubernetes parameters. + * + *

This method processes the YAML content describing the Kubernetes job.

+ * + * @param yamlContentString a string of user-customized YAML + * @return a {@link TaskResponse} object containing the result of the task execution. + * @throws Exception if an error occurs during task execution or while handling pod logs. + */ + @Override + public TaskResponse run(String yamlContentString) throws Exception { + TaskResponse result = new TaskResponse(); + int taskInstanceId = taskRequest.getTaskInstanceId(); + try { + if (StringUtils.isEmpty(yamlContentString)) { + return result; + } + + K8sTaskExecutionContext k8sTaskExecutionContext = taskRequest.getK8sTaskExecutionContext(); + k8sUtils.buildClient(k8sTaskExecutionContext.getConfigYaml()); + + // parse user-customized YAML string + metadata = K8sUtils.getOrDefaultNamespacedResource( + YamlUtils.load(yamlContentString, new TypeReference() { + })); + + k8sYamlType = K8sYamlType.valueOf(this.metadata.getKind()); + generateOperation(); + + submitJob2k8s(yamlContentString); + parseLogOutput(metadata); + registerBatchK8sYamlTaskWatcher(String.valueOf(taskInstanceId), result); + + if (podLogOutputFuture != null) { + try { + // Wait kubernetes pod log collection finished + podLogOutputFuture.get(); + log.info("[K8sYamlTaskExecutor-label-{}-{}] pod log collected successfully", + metadata.getMetadata().getName(), taskInstanceId); + } catch (ExecutionException e) { + log.error("[K8sYamlTaskExecutor-label-{}-{}] Handle pod log error", + metadata.getMetadata().getName(), taskInstanceId, e); + } + } + } catch (Exception e) { + cancelApplication(yamlContentString); + Thread.currentThread().interrupt(); + result.setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE); + throw e; + } + return result; + } + + @Override + public void cancelApplication(String yamlContentStr) { + if (metadata != null) { + stopJobOnK8s(yamlContentStr); + final String taskName = metadata.getMetadata().getName(); + final int taskInstanceId = taskRequest.getTaskInstanceId(); + log.info("[K8sYamlTaskExecutor-label-{}-{}] K8s task canceled", taskName, taskInstanceId); + } + } + + @Override + public void submitJob2k8s(String yamlContentString) { + final String taskName = metadata.getMetadata().getName(); + final int taskInstanceId = taskRequest.getTaskInstanceId(); + try { + abstractK8sOperation.createOrReplaceMetadata(metadata, taskInstanceId); + log.info("[K8sYamlTaskExecutor-label-{}-{}] K8s task submitted successfully", taskName, taskInstanceId); + } catch (Exception e) { + log.error("[K8sYamlTaskExecutor-label-{}-{}] failed to submit job", taskName, taskInstanceId); + e.printStackTrace(); + throw new TaskException("K8sYamlTaskExecutor failed to submit job", e); + } + } + + @Override + public void stopJobOnK8s(String k8sParameterStr) { + try { + abstractK8sOperation.stopMetadata(this.metadata); + } catch (Exception e) { + String taskName = this.metadata.getMetadata().getName(); + String taskNamespace = this.metadata.getMetadata().getNamespace(); + log.error("[K8sYamlTaskExecutor-label-{}] fail to stop job in namespace {}", taskName, taskNamespace); + throw new TaskException("K8sYamlTaskExecutor fail to stop job", e); + } + } + + /** + * Generates the Kubernetes operation based on the Kubernetes YAML type. + */ + private void generateOperation() { + switch (k8sYamlType) { + case Pod: + abstractK8sOperation = new K8sPodOperation(k8sUtils.getClient()); + break; + default: + throw new TaskException( + String.format("K8sYamlTaskExecutor do not support type %s", k8sYamlType.name())); + } + } + + public void registerBatchK8sYamlTaskWatcher(String taskInstanceId, TaskResponse taskResponse) { + final String taskName = metadata.getMetadata().getName(); + final String taskNamespace = metadata.getMetadata().getNamespace(); + + CountDownLatch countDownLatch = new CountDownLatch(1); + + try ( + Watch watch = + abstractK8sOperation.createBatchWatcher(countDownLatch, taskResponse, metadata, taskRequest)) { + boolean timeoutFlag = taskRequest.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED + || taskRequest.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED; + if (timeoutFlag) { + Boolean timeout = !(countDownLatch.await(taskRequest.getTaskTimeout(), TimeUnit.SECONDS)); + waitTimeout(timeout); + } else { + countDownLatch.await(); + } + } catch (InterruptedException e) { + log.error("[K8sYamlTaskExecutor-label-{}-{}] failed in namespace `{}`: {}", + taskName, taskInstanceId, taskNamespace, e.getMessage(), e); + Thread.currentThread().interrupt(); + taskResponse.setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE); + } catch (Exception e) { + log.error("[K8sYamlTaskExecutor-label-{}-{}] failed in namespace `{}`: {}", + taskName, taskInstanceId, taskNamespace, e.getMessage(), e); + e.printStackTrace(); + taskResponse.setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE); + } + } + + private void parseLogOutput(HasMetadata resource) { + ObjectMeta resourceMetadata = resource.getMetadata(); + final int taskInstanceId = taskRequest.getTaskInstanceId(); + final int workflowInstanceId = taskRequest.getProcessInstanceId(); + final String taskName = resourceMetadata.getName().toLowerCase(Locale.ROOT); + final String namespace = resourceMetadata.getNamespace(); + final String labelPodLogWatch = String.format("%s-%d", taskName, taskInstanceId); + + ExecutorService collectPodLogExecutorService = ThreadUtils + .newSingleDaemonScheduledExecutorService("CollectPodLogOutput-thread-" + taskName); + + podLogOutputFuture = collectPodLogExecutorService.submit(() -> { + TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser(); + LogUtils.setWorkflowAndTaskInstanceIDMDC(workflowInstanceId, taskInstanceId); + LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath()); + + try (LogWatch watcher = abstractK8sOperation.getLogWatcher(labelPodLogWatch, namespace)) { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(watcher.getOutput()))) { + String line; + while ((line = reader.readLine()) != null) { + log.info("[k8s-label-{}-pod-log] {}", labelPodLogWatch, line); + taskOutputParameterParser.appendParseLog(line); + } + } catch (Exception e) { + log.error("[k8s-label-{}-pod-log] failed to open BufferedReader on LogWatch", labelPodLogWatch); + e.printStackTrace(); + throw new RuntimeException("failed to open LogWatch", e); + } + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } finally { + LogUtils.removeTaskInstanceLogFullPathMDC(); + podLogOutputIsFinished = true; + } + taskOutputParams = taskOutputParameterParser.getTaskOutputParams(); + log.info("[k8s-label-{}-result] ----------BEGIN K8S POD RESULT----------", labelPodLogWatch); + for (Map.Entry entry : taskOutputParams.entrySet()) { + log.info("[k8s-label-{}-result] (key, value) = ('{}', '{}')", + labelPodLogWatch, entry.getKey(), entry.getValue()); + } + log.info("[k8s-label-{}-result] ----------END K8S POD RESULT----------", labelPodLogWatch); + }); + + collectPodLogExecutorService.shutdown(); + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/K8sTaskParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/K8sTaskParameters.java index 4f045abe1907..c31ca9ae816f 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/K8sTaskParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/K8sTaskParameters.java @@ -51,9 +51,22 @@ public class K8sTaskParameters extends AbstractParameters { private String kubeConfig; private int datasource; private String type; + // whether the YAML task is custom-configured(1) for Kubernetes or not(0) + private int customConfig; + // the YAML file content string, if `customConfig` == 1 + private String yamlContent; + @Override public boolean checkParameters() { - return StringUtils.isNotEmpty(image); + if (customConfig == 0) { + // for low-code k8s Job + return StringUtils.isNotEmpty(image); + } else if (customConfig == 1) { + // for user-customized k8s YAML task + return StringUtils.isNotBlank(yamlContent); + } + // for `customConfig` invalid or unsupported + return false; } @Override diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java index a96f3ebb010e..ea4bcdbe0e68 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java @@ -21,9 +21,14 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskException; +import org.apache.commons.lang3.StringUtils; + import java.util.List; +import lombok.Data; import lombok.extern.slf4j.Slf4j; +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.ObjectMeta; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.client.Config; @@ -33,10 +38,39 @@ import io.fabric8.kubernetes.client.Watcher; @Slf4j +@Data public class K8sUtils { private KubernetesClient client; + private static final String K8S_NAMESPACE_DEFAULT = "default"; + + /** + * get the original namespace, or default namespace defined in `K8sUtils.K8S_NAMESPACE_DEFAULT` + * @param namespace the namespace to be inspected + * @return the original namespace if it is valid; otherwise, return default namespace + */ + public static String getOrDefaultNamespace(String namespace) { + return StringUtils.isBlank(namespace) ? K8S_NAMESPACE_DEFAULT : namespace; + } + + /** + * get the original resource, or the one with default namespace + * @param resource the resource to be inspected + * @return the original resource, or the default namespaced one + * @throws TaskException if resource is null + */ + public static HasMetadata getOrDefaultNamespacedResource(HasMetadata resource) throws TaskException { + if (resource == null) + throw new TaskException("failed to process k8s resource with null parameter"); + ObjectMeta metadata = resource.getMetadata(); + if (StringUtils.isBlank(metadata.getNamespace())) { + metadata.setNamespace(K8S_NAMESPACE_DEFAULT); + resource.setMetadata(metadata); + } + return resource; + } + public void createJob(String namespace, Job job) { try { client.batch() @@ -104,7 +138,13 @@ public String getPodLog(String jobName, String namespace) { return null; } - public void buildClient(String configYaml) { + /** + * Builds a Kubernetes API client using a kubeConfig YAML string. + * + * @param configYaml a YAML string containing the Kubernetes configuration + * @throws TaskException if there is an error building the Kubernetes client + */ + public void buildClient(String configYaml) throws TaskException { try { Config config = Config.fromKubeconfig(configYaml); client = new KubernetesClientBuilder().withConfig(config).build(); @@ -113,4 +153,15 @@ public void buildClient(String configYaml) { } } + /** + * Retrieves the Kubernetes client instance. + * + * @return The current KubernetesClient instance. + */ + public KubernetesClient getClient() { + if (client == null) + throw new TaskException("failed to get k8s ApiClient, since it has not yet been initialized"); + return client; + } + } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sPodOperationTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sPodOperationTest.java new file mode 100644 index 000000000000..54e051058c1f --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sPodOperationTest.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.k8s.impl; + +import org.apache.dolphinscheduler.common.utils.YamlUtils; +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.enums.K8sPodPhaseConstants; +import org.apache.dolphinscheduler.plugin.task.api.enums.K8sYamlType; +import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; +import org.apache.dolphinscheduler.plugin.task.api.utils.K8sUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CountDownLatch; + +import lombok.SneakyThrows; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import com.fasterxml.jackson.core.type.TypeReference; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodList; +import io.fabric8.kubernetes.api.model.PodStatus; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.Watch; +import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable; +import io.fabric8.kubernetes.client.dsl.LogWatch; +import io.fabric8.kubernetes.client.dsl.MixedOperation; +import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation; +import io.fabric8.kubernetes.client.dsl.PodResource; + +public class K8sPodOperationTest { + + private static final KubernetesClient mockClient = Mockito.mock(KubernetesClient.class); + private static final K8sPodOperation k8sPodOperation = new K8sPodOperation(mockClient); + + private static final String simplePodYaml = "apiVersion: v1\n" + + "kind: Pod\n" + + "metadata:\n" + + " name: hello-mock-world\n" + + " namespace: default\n" + + "spec:\n" + + " containers:\n" + + " - name: hello-world-container\n" + + " image: hello-world\n"; + + private static final Pod pod = Objects.requireNonNull( + YamlUtils.load(simplePodYaml, new TypeReference() { + })); + + private static final String TEST_POD_NAME = pod.getMetadata().getName(); + private static final String TEST_NAMESPACE = K8sUtils.getOrDefaultNamespace(pod.getMetadata().getNamespace()); + private static final int taskInstanceId = 1000; + + @BeforeAll + public static void init() { + + PodStatus mockStatus = new PodStatus(); + pod.setStatus(mockStatus); + + // BEGIN Mockito Stub for chain: client.pods().resource(anyPodResource).* + MixedOperation mockPodsOperations = Mockito.mock(MixedOperation.class); + Mockito.when(mockClient.pods()).thenReturn(mockPodsOperations); + PodResource mockPodResource = Mockito.mock(PodResource.class); + Mockito.when(mockPodsOperations.resource(Mockito.any())).thenReturn(mockPodResource); + + // Mockito Stub for chain: client.pods().resource(anyPodResource).get() => Pod + Mockito.when(mockPodResource.get()).thenReturn(pod); + + // Mockito Stub for chain: client.pods().resource(anyPodResource).createOrReplace() => Pod + Mockito.when(mockPodResource.createOrReplace()).thenReturn(pod); + + // END Mockito Stub for chain: client.pods().resource(anyPodResource).* + + // Mockito Stub for chain: client.pods().inNamespace(anyNamespace).withLabel(anyKey, anyValue) => List + NonNamespaceOperation mockNonNamespaceOperation = + Mockito.mock(NonNamespaceOperation.class); + FilterWatchListDeletable mockWatchList = + Mockito.mock(FilterWatchListDeletable.class); + PodList mockPodListEntity = Mockito.mock(PodList.class); + Mockito.when(mockWatchList.list()).thenReturn(mockPodListEntity); + List mockPodList = new ArrayList(); + mockPodList.add(pod); + Mockito.when(mockPodListEntity.getItems()).thenReturn(mockPodList); + Mockito.when(mockNonNamespaceOperation.withLabel(Mockito.anyString(), Mockito.anyString())) + .thenReturn(mockWatchList); + + // BEGIN Mockito Stub for chain: client.pods().inNamespace(anyNamespace).withName(anyName).* + Mockito.when(mockPodsOperations.inNamespace(Mockito.anyString())).thenReturn(mockNonNamespaceOperation); + Mockito.when(mockNonNamespaceOperation.withName(Mockito.anyString())).thenReturn(mockPodResource); + + // Mockito Stub for chain: client.pods().inNamespace(anyNamespace).withName(anyName).watchLog() + LogWatch mockLogWatch = Mockito.mock(LogWatch.class); + Mockito.when(mockPodResource.watchLog()).thenReturn(mockLogWatch); + + // Mockito Stub for chain: client.pods().inNamespace(anyNamespace).withName(anyName).watch(anyWatcher) + Watch mockWatch = Mockito.mock(Watch.class); + Mockito.when(mockPodResource.watch(Mockito.any())).thenReturn(mockWatch); + + // Mockito Stub for chain: client.pods().inNamespace(anyNamespace).withName(anyName).delete() + List mockDeletionStatusDetails = Mockito.mock(List.class); + Mockito.when(mockPodResource.delete()).thenReturn(mockDeletionStatusDetails); + + // END Mockito Stub for chain: client.pods().inNamespace(anyNamespace).withName(anyName).* + } + + @Test + public void testBuildMetadata() { + Assertions.assertThrows(TaskException.class, () -> k8sPodOperation.buildMetadata(null)); + Assertions.assertThrows(TaskException.class, () -> k8sPodOperation.buildMetadata("")); + Assertions.assertNotNull(k8sPodOperation.buildMetadata(simplePodYaml)); + Assertions.assertEquals(K8sYamlType.Pod, K8sYamlType.valueOf(pod.getKind())); + } + + @Test + public void testCreateOrReplacePod() { + // Mockito Stub for chain: client.pods().inNamespace(anyNamespace).withName(anyName).get() => null (just create) + Mockito.when(mockClient.pods().inNamespace(TEST_NAMESPACE).withName(TEST_POD_NAME).get()).thenReturn(null); + Assertions.assertDoesNotThrow(() -> k8sPodOperation.createOrReplaceMetadata(pod, taskInstanceId)); + + // Mockito Stub for chain: client.pods().inNamespace(anyNamespace).withName(anyName).get() => pod (deletion) + Mockito.when(mockClient.pods().inNamespace(TEST_NAMESPACE).withName(TEST_POD_NAME).get()).thenReturn(pod); + Assertions.assertDoesNotThrow(() -> k8sPodOperation.createOrReplaceMetadata(pod, taskInstanceId)); + } + + @Test + public void testGetState() { + PodStatus mockStatus = pod.getStatus(); + mockStatus.setPhase(K8sPodPhaseConstants.SUCCEEDED); + Assertions.assertEquals(TaskConstants.EXIT_CODE_SUCCESS, k8sPodOperation.getState(pod)); + mockStatus.setPhase(K8sPodPhaseConstants.FAILED); + Assertions.assertEquals(TaskConstants.EXIT_CODE_FAILURE, k8sPodOperation.getState(pod)); + mockStatus.setPhase(K8sPodPhaseConstants.PENDING); + Assertions.assertEquals(TaskConstants.RUNNING_CODE, k8sPodOperation.getState(pod)); + } + + @Test + public void testCreateBatchWatcher() { + CountDownLatch countDownLatch = Mockito.mock(CountDownLatch.class); + TaskResponse taskResponse = Mockito.mock(TaskResponse.class); + TaskExecutionContext taskRequest = Mockito.mock(TaskExecutionContext.class); + Mockito.when(taskRequest.getTaskInstanceId()).thenReturn(1000); + Mockito.when(taskRequest.getProcessInstanceId()).thenReturn(2000); + Assertions.assertNotNull(k8sPodOperation.createBatchWatcher(countDownLatch, taskResponse, pod, taskRequest)); + } + + @Test + public void testGetLogWatcher() { + PodStatus mockPodStatus = pod.getStatus(); + mockPodStatus.setPhase(K8sPodPhaseConstants.SUCCEEDED); + Assertions.assertNotNull(k8sPodOperation.getLogWatcher("", "")); + } + + @Test + @SneakyThrows + public void testStopMetadata() { + Assertions.assertNotNull(k8sPodOperation.stopMetadata(pod)); + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtilsTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtilsTest.java new file mode 100644 index 000000000000..cda8dc194b3a --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtilsTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.utils; + +import org.apache.dolphinscheduler.common.utils.YamlUtils; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; + +import org.apache.commons.lang3.StringUtils; + +import java.io.File; +import java.util.Objects; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.type.TypeReference; + +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.client.KubernetesClient; + +public class K8sUtilsTest { + + private static final Logger logger = LoggerFactory.getLogger(K8sUtilsTest.class); + + @Test + public void testGetOrDefaultNamespace() { + String namespace; + namespace = K8sUtils.getOrDefaultNamespace(null); + Assertions.assertTrue(StringUtils.isNotBlank(namespace)); + namespace = K8sUtils.getOrDefaultNamespace(""); + Assertions.assertTrue(StringUtils.isNotBlank(namespace)); + namespace = K8sUtils.getOrDefaultNamespace(" "); + Assertions.assertTrue(StringUtils.isNotBlank(namespace)); + namespace = "my-namespace-name"; + Assertions.assertEquals(namespace, K8sUtils.getOrDefaultNamespace(namespace)); + } + + @Test + public void testGetOrDefaultNamespacedResource() { + Assertions.assertThrows(TaskException.class, () -> K8sUtils.getOrDefaultNamespacedResource(null)); + + // Load Pod without namespace from YAML file + String filePathRelative = "k8s-yaml/hello-world-without-namespace.yaml"; + String filePathAbsolute = Objects.requireNonNull( + getClass().getClassLoader().getResource(filePathRelative)).getFile(); + Pod podNoNamespace = YamlUtils.load(new File(filePathAbsolute), new TypeReference() { + }); + + // for pod without namespace, assign a default namespace, e.g., "default", see `K8sUtils.K8S_NAMESPACE_DEFAULT` + Pod podWithNamespace = (Pod) K8sUtils.getOrDefaultNamespacedResource(podNoNamespace); + Assertions.assertTrue(StringUtils.isNotBlank(getNamespace(podWithNamespace))); + + // for pod with blank namespace, also assign a default namespace + setNamespace(podNoNamespace, " "); + Pod podWithBlankNamespaceFix = (Pod) K8sUtils.getOrDefaultNamespacedResource(podNoNamespace); + Assertions.assertTrue(StringUtils.isNotBlank(getNamespace(podWithBlankNamespaceFix))); + + // for a valid namespace, just keep it as it was + setNamespace(podWithNamespace, "my-namespace"); + Pod podWithValidNamespaceChecked = (Pod) K8sUtils.getOrDefaultNamespacedResource(podWithNamespace); + Assertions.assertEquals(getNamespace(podWithNamespace), getNamespace(podWithValidNamespaceChecked)); + } + + @Test + public void testGetClient() { + // for uninitialized K8sUtils, getClient throws TaskException + K8sUtils k8sUtilsUninitialized = new K8sUtils(); + Assertions.assertThrows(TaskException.class, k8sUtilsUninitialized::getClient); + + // Mockito Stub + K8sUtils k8sUtilsMockitoMocked = Mockito.mock(K8sUtils.class); + KubernetesClient kubernetesClientMocked = Mockito.mock(KubernetesClient.class); + Mockito.doNothing().when(k8sUtilsMockitoMocked).buildClient(Mockito.anyString()); + Mockito.when(k8sUtilsMockitoMocked.getClient()).thenReturn(kubernetesClientMocked); + + // for initialized with K8sUtils::buildClient, return non-null client + String yamlK8sClientMocked = ""; + k8sUtilsMockitoMocked.buildClient(yamlK8sClientMocked); + Assertions.assertNotNull(k8sUtilsMockitoMocked.getClient()); + + Mockito.verify(k8sUtilsMockitoMocked).buildClient(Mockito.anyString()); + Mockito.verify(k8sUtilsMockitoMocked).getClient(); + } + + private void setNamespace(Pod pod, String namespace) { + ObjectMeta podMetadata = Objects.requireNonNull(pod).getMetadata(); + podMetadata.setNamespace(namespace); + pod.setMetadata(podMetadata); + } + + private String getNamespace(Pod pod) { + return Objects.requireNonNull(pod).getMetadata().getNamespace(); + } + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/k8s-yaml/hello-world-without-namespace.yaml b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/k8s-yaml/hello-world-without-namespace.yaml new file mode 100644 index 000000000000..e16f9a6eed25 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/k8s-yaml/hello-world-without-namespace.yaml @@ -0,0 +1,30 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +apiVersion: v1 +kind: Pod +metadata: + name: hello-world +spec: + containers: + - name: hello-world-container + image: busybox + command: + - "sh" + - "-c" + - "echo 'Hello, World!' && echo '#{setValue(name=hello world)}' && echo '#{setValue(type=custom yaml pod)}' && echo '#{setValue(testVersion=1)}'" + restartPolicy: Never diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java index fdb39d7c288f..5091868f62f7 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java @@ -21,13 +21,18 @@ import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils; import org.apache.dolphinscheduler.plugin.datasource.k8s.param.K8sConnectionParam; import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.k8s.AbstractK8sTask; +import org.apache.dolphinscheduler.plugin.task.api.k8s.AbstractK8sTaskExecutor; import org.apache.dolphinscheduler.plugin.task.api.k8s.K8sTaskMainParameters; +import org.apache.dolphinscheduler.plugin.task.api.k8s.impl.K8sTaskExecutor; +import org.apache.dolphinscheduler.plugin.task.api.k8s.impl.K8sYamlTaskExecutor; import org.apache.dolphinscheduler.plugin.task.api.model.Label; import org.apache.dolphinscheduler.plugin.task.api.model.NodeSelectorExpression; import org.apache.dolphinscheduler.plugin.task.api.model.Property; +import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.K8sTaskParameters; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; @@ -56,6 +61,9 @@ public class K8sTask extends AbstractK8sTask { private K8sTaskExecutionContext k8sTaskExecutionContext; private K8sConnectionParam k8sConnectionParam; + + private AbstractK8sTaskExecutor abstractK8sTaskExecutor; + public K8sTask(TaskExecutionContext taskRequest) { super(taskRequest); this.taskExecutionContext = taskRequest; @@ -81,6 +89,14 @@ public void init() { k8sTaskExecutionContext.setConfigYaml(kubeConfig); taskRequest.setK8sTaskExecutionContext(k8sTaskExecutionContext); log.info("Initialize k8s task params:{}", JSONUtils.toPrettyJsonString(k8sTaskParameters)); + + if (k8sTaskParameters.getCustomConfig() == 0) { + // low-code k8s Job + this.abstractK8sTaskExecutor = new K8sTaskExecutor(taskRequest); + } else { + // user-customized k8s YAML task + this.abstractK8sTaskExecutor = new K8sYamlTaskExecutor(taskRequest); + } } @Override @@ -93,6 +109,27 @@ public AbstractParameters getParameters() { return k8sTaskParameters; } + @Override + public void handle(TaskCallBack taskCallBack) throws TaskException { + try { + TaskResponse response; + if (k8sTaskParameters.getCustomConfig() == 0) { + // low-code k8s Job + response = abstractK8sTaskExecutor.run(buildCommand()); + } else { + // k8s customized YAML task + response = abstractK8sTaskExecutor.run(k8sTaskParameters.getYamlContent()); + } + setExitStatusCode(response.getExitStatusCode()); + setAppIds(response.getAppIds()); + dealOutParam(abstractK8sTaskExecutor.getTaskOutputParams()); + } catch (Exception e) { + log.error("k8s task submit failed with error"); + exitStatusCode = -1; + throw new TaskException("Execute k8s task error", e); + } + } + @Override protected String buildCommand() { K8sTaskMainParameters k8sTaskMainParameters = new K8sTaskMainParameters(); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sParametersTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sParametersTest.java index 543ec3e4bc04..3063f6138187 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sParametersTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sParametersTest.java @@ -80,4 +80,28 @@ public void testK8sParameters() { Assertions.assertEquals(nodeSelectorExpressions, k8sTaskParameters.getNodeSelectors()); } + @Test + public void testCustomConfigCheckParameters() { + // by default, low-code mode for `k8sTaskParameters`: `customConfig` == 0 && `image` is not empty + Assertions.assertTrue(k8sTaskParameters.checkParameters()); + + // check for user-customized YAML mode + k8sTaskParameters.setCustomConfig(1); + k8sTaskParameters.setYamlContent(null); + Assertions.assertFalse(k8sTaskParameters.checkParameters()); + k8sTaskParameters.setYamlContent(""); + Assertions.assertFalse(k8sTaskParameters.checkParameters()); + k8sTaskParameters.setYamlContent(" "); + Assertions.assertFalse(k8sTaskParameters.checkParameters()); + k8sTaskParameters.setYamlContent(""); + Assertions.assertTrue(k8sTaskParameters.checkParameters()); + + // check for invalid customConfig + k8sTaskParameters.setCustomConfig(3); + Assertions.assertFalse(k8sTaskParameters.checkParameters()); + + // restore the default low-code mode + k8sTaskParameters.setCustomConfig(0); + } + } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java index 3895190cf21e..6e7162d6f9e0 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java @@ -145,7 +145,7 @@ public void testBuildCommandNormal() { @Test public void testGetParametersNormal() { String expectedStr = - "K8sTaskParameters(image=ds-dev, namespace=namespace, command=[\"/bin/bash\", \"-c\"], args=[\"echo hello world\"], pullSecret=ds-secret, imagePullPolicy=IfNotPresent, minCpuCores=2.0, minMemorySpace=10.0, customizedLabels=[Label(label=test, value=1234)], nodeSelectors=[NodeSelectorExpression(key=node-label, operator=In, values=1234,12345)], kubeConfig={}, datasource=0, type=K8S)"; + "K8sTaskParameters(image=ds-dev, namespace=namespace, command=[\"/bin/bash\", \"-c\"], args=[\"echo hello world\"], pullSecret=ds-secret, imagePullPolicy=IfNotPresent, minCpuCores=2.0, minMemorySpace=10.0, customizedLabels=[Label(label=test, value=1234)], nodeSelectors=[NodeSelectorExpression(key=node-label, operator=In, values=1234,12345)], kubeConfig={}, datasource=0, type=K8S, customConfig=0, yamlContent=null)"; String result = k8sTask.getParameters().toString(); Assertions.assertEquals(expectedStr, result); } diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts index a04c9084f2ab..83048df77ac2 100644 --- a/dolphinscheduler-ui/src/locales/en_US/project.ts +++ b/dolphinscheduler-ui/src/locales/en_US/project.ts @@ -619,6 +619,9 @@ export default { switch_branch_flow_tips: 'Please select branch flow', and: 'and', or: 'or', + k8s_custom_template: 'Custom Template', + k8s_yaml_template: 'YAML', + k8s_yaml_empty_tips: 'The YAML can not be empty.', datax_custom_template: 'Custom Template', datax_json_template: 'JSON', datax_target_datasource_type: 'Target Datasource Types', diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts index eee20454463b..365e40deb233 100644 --- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts +++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts @@ -605,6 +605,9 @@ export default { switch_branch_flow_tips: '请选择分支流转', and: '且', or: '或', + k8s_custom_template: '自定义模板', + k8s_yaml_template: 'YAML', + k8s_yaml_empty_tips: 'YAML不能为空', datax_custom_template: '自定义模板', datax_json_template: 'JSON', datax_target_datasource_type: '目标源类型', diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-k8s.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-k8s.ts index 8d2a70246464..1d9e81152cb3 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-k8s.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-k8s.ts @@ -17,15 +17,84 @@ import { useCustomParams, useCustomLabels, useNodeSelectors } from '.' import type { IJsonItem } from '../types' import { useI18n } from 'vue-i18n' +import { onMounted, ref, watch } from 'vue' export function useK8s(model: { [field: string]: any }): IJsonItem[] { const { t } = useI18n() + // default width for + const yamlEditorSpan = ref(0) + const nodeSelectorSpan = ref(24) + const customLabelsSpan = ref(24) + const inputCommandSpan = ref(24) + const inputArgsSpan = ref(24) + const inputPullSecretSpan = ref(24) + const inputImageSpan = ref(17) + const selectImagePullPolicySpan = ref(7) + const inputNumberMinCpuCoresSpan = ref(12) + const inputNumberMinMemorySpace = ref(12) + const localParamsSpan = ref(24) + + const initConstants = () => { + if (model.customConfig) { + // when user selects 'Custom Template' option, display yamlEditor and hide low-code fields + yamlEditorSpan.value = 24 + nodeSelectorSpan.value = 0 + customLabelsSpan.value = 0 + inputCommandSpan.value = 0 + inputArgsSpan.value = 0 + inputPullSecretSpan.value = 0 + inputImageSpan.value = 0 + selectImagePullPolicySpan.value = 0 + inputNumberMinCpuCoresSpan.value = 0 + inputNumberMinMemorySpace.value = 0 + localParamsSpan.value = 0 + } else { + yamlEditorSpan.value = 0 + nodeSelectorSpan.value = 24 + customLabelsSpan.value = 24 + inputCommandSpan.value = 24 + inputArgsSpan.value = 24 + inputPullSecretSpan.value = 24 + inputImageSpan.value = 17 + selectImagePullPolicySpan.value = 7 + inputNumberMinCpuCoresSpan.value = 12 + inputNumberMinMemorySpace.value = 12 + localParamsSpan.value = 24 + } + } + + onMounted(() => { + initConstants() + }) + watch( + () => model.customConfig, + () => { + initConstants() + } + ) + return [ + { + type: 'switch', + field: 'customConfig', + name: t('project.node.k8s_custom_template') + }, + { + type: 'editor', + field: 'yamlContent', + name: t('project.node.k8s_yaml_template'), + span: yamlEditorSpan, + validate: { + trigger: ['input', 'trigger'], + required: true, + message: t('project.node.k8s_yaml_empty_tips') + } + }, { type: 'input-number', field: 'minCpuCores', - span: 12, + span: inputNumberMinCpuCoresSpan, props: { min: 0 }, @@ -37,7 +106,7 @@ export function useK8s(model: { [field: string]: any }): IJsonItem[] { { type: 'input-number', field: 'minMemorySpace', - span: 12, + span: inputNumberMinMemorySpace, props: { min: 0 }, @@ -50,7 +119,7 @@ export function useK8s(model: { [field: string]: any }): IJsonItem[] { type: 'input', field: 'image', name: t('project.node.image'), - span: 18, + span: inputImageSpan, props: { placeholder: t('project.node.image_tips') }, @@ -64,7 +133,7 @@ export function useK8s(model: { [field: string]: any }): IJsonItem[] { type: 'select', field: 'imagePullPolicy', name: t('project.node.image_pull_policy'), - span: 6, + span: selectImagePullPolicySpan, options: IMAGE_PULL_POLICY_LIST, validate: { trigger: ['input', 'blur'], @@ -77,6 +146,7 @@ export function useK8s(model: { [field: string]: any }): IJsonItem[] { type: 'input', field: 'pullSecret', name: t('project.node.pull_secret'), + span: inputPullSecretSpan, props: { placeholder: t('project.node.pull_secret_tips') } @@ -85,6 +155,7 @@ export function useK8s(model: { [field: string]: any }): IJsonItem[] { type: 'input', field: 'command', name: t('project.node.command'), + span: inputCommandSpan, props: { placeholder: t('project.node.command_tips') } @@ -93,6 +164,7 @@ export function useK8s(model: { [field: string]: any }): IJsonItem[] { type: 'input', field: 'args', name: t('project.node.args'), + span: inputArgsSpan, props: { placeholder: t('project.node.args_tips') } @@ -100,14 +172,21 @@ export function useK8s(model: { [field: string]: any }): IJsonItem[] { ...useCustomLabels({ model, field: 'customizedLabels', - name: 'custom_labels' + name: 'custom_labels', + span: customLabelsSpan }), ...useNodeSelectors({ model, field: 'nodeSelectors', - name: 'node_selectors' + name: 'node_selectors', + span: nodeSelectorSpan }), - ...useCustomParams({ model, field: 'localParams', isSimple: false }) + ...useCustomParams({ + model, + field: 'localParams', + isSimple: false, + span: localParamsSpan + }) ] } diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts index 56dc9b1dfcca..a0c551860a73 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts @@ -351,19 +351,24 @@ export function formatParams(data: INodeData): { } if (data.taskType === 'K8S') { + taskParams.customConfig = data.customConfig ? 1 : 0 taskParams.namespace = data.namespace - taskParams.minCpuCores = data.minCpuCores - taskParams.minMemorySpace = data.minMemorySpace - taskParams.image = data.image - taskParams.imagePullPolicy = data.imagePullPolicy - taskParams.command = data.command - taskParams.args = data.args - taskParams.customizedLabels = data.customizedLabels - taskParams.nodeSelectors = data.nodeSelectors - taskParams.datasource = data.datasource taskParams.type = data.type taskParams.kubeConfig = data.kubeConfig - taskParams.pullSecret = data.pullSecret + taskParams.datasource = data.datasource + if (taskParams.customConfig === 0) { + taskParams.minCpuCores = data.minCpuCores + taskParams.minMemorySpace = data.minMemorySpace + taskParams.image = data.image + taskParams.imagePullPolicy = data.imagePullPolicy + taskParams.command = data.command + taskParams.args = data.args + taskParams.customizedLabels = data.customizedLabels + taskParams.nodeSelectors = data.nodeSelectors + taskParams.pullSecret = data.pullSecret + } else { + taskParams.yamlContent = data.yamlContent + } } if (data.taskType === 'JUPYTER') { diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-k8s.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-k8s.ts index e20a506df6de..55183c548b64 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-k8s.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-k8s.ts @@ -50,7 +50,9 @@ export function useK8s({ displayRows: 10, timeoutNotifyStrategy: ['WARN'], kubeConfig: '', - namespace: '' + namespace: '', + customConfig: false, + yamlContent: '' } as INodeData) return { diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt index c6780804d4f1..57c1eb093f91 100644 --- a/tools/dependencies/known-dependencies.txt +++ b/tools/dependencies/known-dependencies.txt @@ -113,7 +113,7 @@ jackson-core-2.13.4.jar jackson-core-asl-1.9.13.jar jackson-databind-2.13.4.jar jackson-dataformat-cbor-2.13.3.jar -jackson-dataformat-yaml-2.13.3.jar +jackson-dataformat-yaml-2.13.4.jar jackson-datatype-jdk8-2.13.3.jar jackson-datatype-jsr310-2.13.4.jar jackson-jaxrs-base-2.13.3.jar