Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improvement-15603][API] When removing or modifying a workflow the system can check if there any tasks depend on it. #15681

Merged
merged 19 commits into from
Mar 10, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class ProjectWorkerGroupController extends BaseController {
@ @RequestParam(value = "workerGroups", required = false) String workerGroups
* @return create result code
*/
@Operation(summary = "assignWorkerGroups", description = "CREATE_PROCESS_DEFINITION_NOTES")
@Operation(summary = "assignWorkerGroups", description = "ASSIGN_WORKER_GROUPS_NOTES")
@Parameters({
@Parameter(name = "projectCode", description = "PROJECT_CODE", schema = @Schema(implementation = long.class, example = "123456")),
@Parameter(name = "workerGroups", description = "WORKER_GROUP_LIST", schema = @Schema(implementation = List.class))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,20 @@
putMsg(result, Status.SUCCESS);
return result;
}

@Operation(summary = "queryDownstreamDependentTaskList", description = "QUERY_DOWNSTREAM_DEPENDENT_TASK_NOTES")
@Parameters({
@Parameter(name = "workFlowCode", description = "PROCESS_DEFINITION_CODE", required = true, schema = @Schema(implementation = Long.class)),
@Parameter(name = "taskCode", description = "TASK_DEFINITION_CODE", required = false, schema = @Schema(implementation = Long.class, example = "123456789")),
})
@GetMapping(value = "/query-dependent-tasks")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_WORKFLOW_LINEAGE_ERROR)
public Result<Map<String, Object>> queryDownstreamDependentTaskList(@Parameter(hidden = true) @RequestAttribute(value = SESSION_USER) User loginUser,

Check notice

Code scanning / CodeQL

Useless parameter Note

The parameter 'loginUser' is never used.
@RequestParam(value = "workFlowCode") Long workFlowCode,
@RequestParam(value = "taskCode", required = false, defaultValue = "0") Long taskCode) {
Map<String, Object> result =
workFlowLineageService.queryDownstreamDependentTasks(workFlowCode, taskCode);
return returnDataList(result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ public interface WorkFlowLineageService {
*/
Set<TaskMainInfo> queryTaskDepOnProcess(long projectCode, long processDefinitionCode);

/**
* Query downstream tasks depend on a process definition or a task
*
* @param processDefinitionCode Process definition code want to query tasks dependence
* @param taskCode Task code want to query tasks dependence
* @return downstream dependent tasks
*/
Map<String, Object> queryDownstreamDependentTasks(Long processDefinitionCode, Long taskCode);

/**
* Query and return tasks dependence with string format, is a wrapper of queryTaskDepOnTask and task query method.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2537,6 +2537,7 @@ public void offlineWorkflowDefinition(User loginUser, Long projectCode, Long wor
// do nothing if the workflow is already offline
return;
}

workflowDefinition.setReleaseState(ReleaseState.OFFLINE);
processDefinitionDao.updateById(workflowDefinition);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -278,11 +279,29 @@ public Optional<String> taskDepOnTaskMsg(long projectCode, long processDefinitio
public Set<TaskMainInfo> queryTaskDepOnProcess(long projectCode, long processDefinitionCode) {
Set<TaskMainInfo> taskMainInfos = new HashSet<>();
List<TaskMainInfo> taskDependents =
workFlowLineageMapper.queryTaskDependentDepOnProcess(projectCode, processDefinitionCode);
workFlowLineageMapper.queryTaskDependentOnProcess(processDefinitionCode, 0);
List<TaskMainInfo> taskSubProcess =
workFlowLineageMapper.queryTaskSubProcessDepOnProcess(projectCode, processDefinitionCode);
taskMainInfos.addAll(taskDependents);
taskMainInfos.addAll(taskSubProcess);
return taskMainInfos;
}

/**
* Query downstream tasks depend on a process definition or a task
*
* @param processDefinitionCode Process definition code want to query tasks dependence
* @param taskCode Task code want to query tasks dependence
* @return downstream dependent tasks
*/
@Override
public Map<String, Object> queryDownstreamDependentTasks(Long processDefinitionCode, Long taskCode) {
Map<String, Object> result = new HashMap<>();
List<TaskMainInfo> taskDependents =
workFlowLineageMapper.queryTaskDependentOnProcess(processDefinitionCode,
Objects.isNull(taskCode) ? 0 : taskCode.longValue());
result.put(Constants.DATA_LIST, taskDependents);
putMsg(result, Status.SUCCESS);
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,17 @@ public void testQueryWorkFlowLineageByCode() {
Mockito.when(workFlowLineageService.queryWorkFlowLineageByCode(projectCode, code)).thenReturn(new HashMap<>());
assertDoesNotThrow(() -> workFlowLineageController.queryWorkFlowLineageByCode(user, projectCode, code));
}

@Test
public void testQueryDownstreamDependentTaskList() {
long code = 1L;
long taskCode = 1L;
Map<String, Object> result = new HashMap<>();
result.put(Constants.STATUS, Status.SUCCESS);
Mockito.when(workFlowLineageService.queryDownstreamDependentTasks(code, taskCode))
.thenReturn(result);

assertDoesNotThrow(
() -> workFlowLineageController.queryDownstreamDependentTaskList(user, code, taskCode));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ public class TaskMainInfo {
*/
private Date taskUpdateTime;

/**
* projectCode
*/
private long projectCode;

/**
* processDefinitionCode
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,12 @@ List<TaskMainInfo> queryTaskSubProcessDepOnProcess(@Param("projectCode") long pr
* current method `queryTaskDepOnProcess`. Which mean with the same parameter processDefinitionCode, all tasks in
* `queryTaskDepOnTask` are in the result of method `queryTaskDepOnProcess`.
*
* @param projectCode Project code want to query tasks dependence
* @param processDefinitionCode Process definition code want to query tasks dependence
* @param taskCode Task code want to query tasks dependence
* @return List of TaskMainInfo
*/
List<TaskMainInfo> queryTaskDependentDepOnProcess(@Param("projectCode") long projectCode,
@Param("processDefinitionCode") long processDefinitionCode);
List<TaskMainInfo> queryTaskDependentOnProcess(@Param("processDefinitionCode") long processDefinitionCode,
@Param("taskCode") long taskCode);

/**
* Query all tasks depend on task, only downstream task support currently(from dependent task type).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,13 @@
</where>
</select>

<select id="queryTaskDependentDepOnProcess" resultType="org.apache.dolphinscheduler.dao.entity.TaskMainInfo">
<select id="queryTaskDependentOnProcess" resultType="org.apache.dolphinscheduler.dao.entity.TaskMainInfo">
select td.id
, td.name as taskName
, td.code as taskCode
, td.version as taskVersion
, td.task_type as taskType
, pd.project_code as projectCode
, ptr.process_definition_code as processDefinitionCode
, pd.name as processDefinitionName
, pd.version as processDefinitionVersion
Expand All @@ -205,16 +206,16 @@
join t_ds_process_task_relation ptr on ptr.post_task_code = td.code and td.version = ptr.post_task_version
join t_ds_process_definition pd on pd.code = ptr.process_definition_code and pd.version = ptr.process_definition_version
<where>
<if test="projectCode != 0">
and ptr.project_code = #{projectCode}
</if>
<!-- ptr.process_definition_code != #{processDefinitionCode} query task not in current workflow -->
<!-- For dependnet task type, using `like concat('%"definitionCode":', #{processDefinitionCode}, '%')` -->
<if test="processDefinitionCode != 0">
and td.task_type = 'DEPENDENT'
and ptr.process_definition_code != #{processDefinitionCode}
and td.task_params like concat('%"definitionCode":', #{processDefinitionCode}, '%')
</if>
<if test="taskCode != 0">
and (td.task_params like concat('%"depTaskCode":', #{taskCode}, '%') or td.task_params like concat('%"depTaskCode":-1%'))
</if>
</where>
</select>

Expand Down
10 changes: 9 additions & 1 deletion dolphinscheduler-ui/src/locales/en_US/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,13 @@ export default {
confirm_to_offline: 'Confirm to make the workflow offline?',
time_to_online: 'Confirm to make the Scheduler online?',
time_to_offline: 'Confirm to make the Scheduler offline?',
warning_dependent_tasks_title: 'Warning',
warning_dependent_tasks_desc: 'The downstream dependent tasks exists. Are you sure to make the workflow offline?',
warning_dependencies: 'Dependencies:',
delete_validate_dependent_tasks_desc: 'The downstream dependent tasks exists. You can not delete the workflow.',
warning_offline_scheduler_dependent_tasks_desc: 'The downstream dependent tasks exists. Are you sure to make the scheduler offline?',
delete_task_validate_dependent_tasks_desc: 'The downstream dependent tasks exists. You can not delete the task.',
warning_delete_scheduler_dependent_tasks_desc: 'The downstream dependent tasks exists. Are you sure to delete the scheduler?',
},
task: {
on_line: 'Online',
Expand Down Expand Up @@ -306,7 +313,8 @@ export default {
startup_parameter: 'Startup Parameter',
whether_dry_run: 'Whether Dry-Run',
please_choose: 'Please Choose',
remove_task_cache: 'Clear cache'
remove_task_cache: 'Clear cache',
delete_validate_dependent_tasks_desc: 'The downstream dependent tasks exists. You can not delete the task.',
},
dag: {
create: 'Create Workflow',
Expand Down
10 changes: 9 additions & 1 deletion dolphinscheduler-ui/src/locales/zh_CN/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,13 @@ export default {
confirm_to_offline: '是否确定下线该工作流?',
time_to_online: '是否确定上线该定时?',
time_to_offline: '是否确定下线该定时?',
warning_dependent_tasks_title: '警告',
warning_dependent_tasks_desc: '下游存在依赖, 下线操作可能会对下游任务产生影响. 你确定要下线该工作流嘛?',
warning_dependencies: '依赖如下:',
delete_validate_dependent_tasks_desc: '下游存在依赖,你不能删除该工作流',
warning_offline_scheduler_dependent_tasks_desc: '下游存在依赖, 下线操作可能会对下游任务产生影响. 你确定要下线该定时嘛?',
delete_task_validate_dependent_tasks_desc: '下游存在依赖,你不能删除该任务.',
warning_delete_scheduler_dependent_tasks_desc: '下游存在依赖, 删除定时可能会对下游任务产生影响. 你确定要删除该定时嘛?',
},
task: {
on_line: '线上',
Expand Down Expand Up @@ -304,7 +311,8 @@ export default {
startup_parameter: '启动参数',
whether_dry_run: '是否空跑',
please_choose: '请选择',
remove_task_cache: '清除缓存'
remove_task_cache: '清除缓存',
delete_validate_dependent_tasks_desc: '下游存在依赖,你不能删除该任务定义',
},
dag: {
create: '创建工作流',
Expand Down
10 changes: 9 additions & 1 deletion dolphinscheduler-ui/src/service/modules/lineages/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/

import { axios } from '@/service/service'
import { ProjectCodeReq, WorkflowCodeReq } from './types'
import {DependentTaskReq, ProjectCodeReq, WorkflowCodeReq} from './types'

export function queryWorkFlowList(projectCode: ProjectCodeReq): any {
return axios({
Expand All @@ -41,3 +41,11 @@ export function queryLineageByWorkFlowCode(
method: 'get'
})
}

export function queryDependentTasks(projectCode: number, params: DependentTaskReq): any {
return axios({
url: `/projects/${projectCode}/lineages/query-dependent-tasks`,
method: 'get',
params
})
}
5 changes: 5 additions & 0 deletions dolphinscheduler-ui/src/service/modules/lineages/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,15 @@ interface WorkflowRes {
workFlowRelationList: WorkFlowRelationList[]
}

interface DependentTaskReq extends WorkflowCodeReq {
taskCode?: number
}

export {
ProjectCodeReq,
WorkflowCodeReq,
WorkFlowNameReq,
DependentTaskReq,
WorkflowRes,
WorkFlowListRes
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import {
defineComponent,
PropType,
h,
ref, watch
} from 'vue'
import { useI18n } from 'vue-i18n'
import {NEllipsis, NModal, NSpace} from 'naive-ui'
import {IDefinitionData} from "@/views/projects/workflow/definition/types";
import ButtonLink from "@/components/button-link";

const props = {
row: {
type: Object as PropType<IDefinitionData>,
default: {},
required: false
},
show: {
type: Boolean as PropType<boolean>,
default: false
},
required: {
type: Boolean as PropType<boolean>,
default: true
},
taskLinks: {
type: Array,
default: []
},
content: {
type: String,
default: ''
}
}

export default defineComponent({
name: 'dependenciesConfirm',
props,
emits: ['update:show', 'update:row', 'confirm'],
setup(props, ctx) {
const { t } = useI18n()

const showRef = ref(props.show)

const confirmToHandle = () => {
ctx.emit('confirm')
}

const cancelToHandle = () => {
ctx.emit('update:show', showRef)
}

const renderDownstreamDependencies = () => {
return h(
<NSpace vertical>
<div>{props.content}</div>
<div>{t('project.workflow.warning_dependencies')}</div>
{props.taskLinks.map((item: any) => {
return (
<ButtonLink
onClick={item.action}
disabled={false}
>
{{
default: () =>
h(NEllipsis,
{
style: 'max-width: 350px;line-height: 1.5'
},
() => item.text
)
}}
</ButtonLink>
)
})}
</NSpace>
)
}

watch(()=> props.show,
() => {
showRef.value = props.show
})

return {renderDownstreamDependencies, confirmToHandle, cancelToHandle, showRef}
},

render() {
const { t } = useI18n()

return (
<NModal
v-model:show={this.showRef}
preset={'dialog'}
type={this.$props.required? 'error':'warning'}
title={t('project.workflow.warning_dependent_tasks_title')}
positiveText={this.$props.required? '':t('project.workflow.confirm')}
negativeText={t('project.workflow.cancel')}
maskClosable={false}
onNegativeClick={this.cancelToHandle}
onPositiveClick={this.confirmToHandle}
onClose={this.cancelToHandle}
>
{{
default: () => (
this.renderDownstreamDependencies()
)
}}
</NModal>
)
}
})
Loading
Loading