Skip to content

Commit

Permalink
[Improve][Server] Improve job scheduler (#476)
Browse files Browse the repository at this point in the history
  • Loading branch information
zixi0825 authored Nov 1, 2024
1 parent ede9640 commit 6f89c19
Show file tree
Hide file tree
Showing 12 changed files with 105 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public enum ExecutionStatus {
NEED_FAULT_TOLERANCE(8, "need fault tolerance","需要容错"),
KILL(9, "kill", "强制终止"),
WAITING_THREAD(10, "waiting thread", "等待线程"),
WAITING_DEPEND(11, "waiting depend node complete","");
WAITING_SUMMIT(11, "waiting_summit","待提交");

ExecutionStatus(int code, String description,String zhDescription){
this.code = code;
Expand Down Expand Up @@ -132,7 +132,7 @@ public boolean typeIsStop() {
* @return status
*/
public boolean typeIsRunning() {
return this == RUNNING_EXECUTION || this == WAITING_DEPEND;
return this == RUNNING_EXECUTION;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,10 @@ private void doKillCommand(Long jobExecutionId) {
JobRunner jobRunner = jobExecutionContext.getJobRunner();
jobRunner.kill();
} else {
if (unFinishedJobExecutionMap.get(jobExecutionId) == null) {
return;
}

unFinishedJobExecutionMap.remove(jobExecutionId);
JobExecution jobExecution = jobExternalService.getJobExecutionById(jobExecutionId);
if (jobExecution != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package io.datavines.server.dqc.coordinator.runner;

import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import io.datavines.common.enums.ExecutionStatus;
import io.datavines.common.utils.*;
import io.datavines.server.dqc.coordinator.cache.JobExecuteManager;
import io.datavines.server.registry.Register;
Expand All @@ -28,6 +30,7 @@
import io.datavines.server.utils.SpringApplicationContext;
import io.datavines.server.repository.entity.Command;

import java.util.List;
import java.util.Map;

import static io.datavines.common.CommonConstants.*;
Expand Down Expand Up @@ -59,16 +62,31 @@ public void run() {
while (Stopper.isRunning()) {
Command command = null;
try {

String executeHost = NetUtils.getAddr(CommonPropertyUtils.getInt(CommonPropertyUtils.SERVER_PORT, CommonPropertyUtils.SERVER_PORT_DEFAULT));

// 获取执行地址在本机的Kill命令
List<Command> commandList = jobExternalService.listKillCommandByExecuteHost(executeHost);
if (CollectionUtils.isNotEmpty(commandList)) {
commandList.forEach(killCommand -> {
if (killCommand != null) {
jobExecuteManager.addKillCommand(killCommand.getJobExecutionId());
logger.info(String.format("kill job execution %s in %s", killCommand.getJobExecutionId(), executeHost));
jobExternalService.deleteCommandById(killCommand.getId());
}
});
}

boolean runCheckFlag = OSUtils.checkResource(
CommonPropertyUtils.getDouble(MAX_CPU_LOAD_AVG, MAX_CPU_LOAD_AVG_DEFAULT),
CommonPropertyUtils.getDouble(RESERVED_MEMORY, RESERVED_MEMORY_DEFAULT));

if (!runCheckFlag) {
ThreadUtils.sleep(SLEEP_TIME_MILLIS*10);
ThreadUtils.sleep(SLEEP_TIME_MILLIS * 10);
continue;
}

command = jobExternalService.getCommand(register.getTotalSlot(), register.getSlot());
command = jobExternalService.getStartCommand(register.getTotalSlot(), register.getSlot());
if (command != null) {
String parameter = command.getParameter();
String engineType = LOCAL;
Expand All @@ -79,24 +97,20 @@ public void run() {
}
}

if (CommandType.START == command.getType()) {
JobExecution jobExecution = jobExternalService.executeCommand(command);
if (jobExecution == null) {
logger.warn(String.format("job execution not found , command : %s", JSONUtils.toJsonString(command)));
jobExternalService.deleteCommandById(command.getId());
continue;
}
JobExecution jobExecution = jobExternalService.executeCommand(command);
if (jobExecution == null) {
logger.warn(String.format("job execution not found , command : %s", JSONUtils.toJsonString(command)));
jobExternalService.deleteCommandById(command.getId());
continue;
}

if (!executionOutOfThreshold(engineType)) {
logger.info("start submit job execution : {} ", JSONUtils.toJsonString(jobExecution));
jobExecuteManager.addExecuteCommand(jobExecution);
logger.info(String.format("submit success, job execution : %s", jobExecution.getName()) );
jobExternalService.deleteCommandById(command.getId());
}
} else if (CommandType.STOP == command.getType()) {
jobExecuteManager.addKillCommand(command.getJobExecutionId());
logger.info(String.format("kill job execution : %s", command.getJobExecutionId()) );
if (!executionOutOfThreshold(engineType)) {
logger.info("start submit job execution : {} ", JSONUtils.toJsonString(jobExecution));
jobExecuteManager.addExecuteCommand(jobExecution);
logger.info(String.format("submit success, job execution : %s", jobExecution.getName()) );
jobExternalService.deleteCommandById(command.getId());
jobExecution.setStatus(ExecutionStatus.SUBMITTED_SUCCESS);
jobExternalService.updateJobExecution(jobExecution);
}

ThreadUtils.sleep(SLEEP_TIME_MILLIS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ public class Command implements Serializable {
@TableField(value = "job_execution_id")
private Long jobExecutionId;

@TableField(value = "execute_host")
private String executeHost;

@TableField(value = "priority")
private Priority priority;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@
@Mapper
public interface CommandMapper extends BaseMapper<Command> {

/**
* SELECT BY ID
* @return
*/
@Select("SELECT * from dv_command where type in (0,1) and id % #{totalSlot} = #{currentSlot} order by update_time limit 1 ")
Command getOne(@Param("totalSlot") int totalSlot, @Param("currentSlot") int currentSlot);
@Select("SELECT * from dv_command where type in (0,1) and id % #{totalSlot} = #{currentSlot} and type = 0 order by update_time limit 1 ")
Command getStartCommand(@Param("totalSlot") int totalSlot, @Param("currentSlot") int currentSlot);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.baomidou.mybatisplus.extension.service.IService;
import io.datavines.server.repository.entity.Command;

import java.util.List;

public interface CommandService extends IService<Command> {

long insert(Command command);
Expand All @@ -27,7 +29,11 @@ public interface CommandService extends IService<Command> {

Command getById(long id);

Command getOne(int totalSlot, int currentSlot);
Command getStartCommand(int totalSlot, int currentSlot);

int deleteById(long id);

boolean deleteByJobExecutionId(Long jobExecutionId);

List<Command> listKillCommandByExecuteHost(String executeHost);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,19 @@
*/
package io.datavines.server.repository.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import io.datavines.server.enums.CommandType;
import io.datavines.server.repository.entity.Command;
import io.datavines.server.repository.mapper.CommandMapper;
import io.datavines.server.repository.service.CommandService;
import org.springframework.stereotype.Service;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;

import java.util.ArrayList;
import java.util.List;

@Service("commandService")
public class CommandServiceImpl extends ServiceImpl<CommandMapper, Command> implements CommandService {

Expand All @@ -43,12 +49,30 @@ public Command getById(long id) {
}

@Override
public Command getOne(int totalSlot, int currentSlot) {
return baseMapper.getOne(totalSlot, currentSlot);
public Command getStartCommand(int totalSlot, int currentSlot) {
return baseMapper.getStartCommand(totalSlot, currentSlot);
}

@Override
public int deleteById(long id) {
return baseMapper.deleteById(id);
}

@Override
public boolean deleteByJobExecutionId(Long jobExecutionId) {
return remove(new LambdaQueryWrapper<Command>().eq(Command::getJobExecutionId, jobExecutionId));
}

@Override
public List<Command> listKillCommandByExecuteHost(String executeHost) {
List<Command> commands = list(new LambdaQueryWrapper<Command>()
.eq(Command::getType, CommandType.STOP)
.eq(Command::getExecuteHost, executeHost).last("limit 20"));

if (CollectionUtils.isEmpty(commands)) {
return new ArrayList<>();
}

return commands;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,15 +204,23 @@ public Long killJob(Long jobExecutionId) {
return jobExecutionId;
}

Command command = new Command();
Map<String, String> parameter = new HashMap<>();
parameter.put("engine", jobExecution.getEngineType());
boolean deleteCommandResult = false;
if (ExecutionStatus.WAITING_SUMMIT == jobExecution.getStatus()) {
deleteCommandResult = commandService.deleteByJobExecutionId(jobExecutionId);
}

command.setType(CommandType.STOP);
command.setPriority(Priority.MEDIUM);
command.setParameter(JSONUtils.toJsonString(parameter));
command.setJobExecutionId(jobExecutionId);
commandService.insert(command);
if (!deleteCommandResult) {
Command command = new Command();
Map<String, String> parameter = new HashMap<>();
parameter.put("engine", jobExecution.getEngineType());

command.setType(CommandType.STOP);
command.setPriority(Priority.MEDIUM);
command.setParameter(JSONUtils.toJsonString(parameter));
command.setJobExecutionId(jobExecutionId);
command.setExecuteHost(jobExecution.getExecuteHost());
commandService.insert(command);
}

return jobExecutionId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,12 @@ public JobExecution getJobExecutionById(Long id){
return jobExecutionService.getById(id);
}

public Command getCommand(int totalSlot, int currentSlot){
return commandService.getOne(totalSlot, currentSlot);
public Command getStartCommand(int totalSlot, int currentSlot) {
return commandService.getStartCommand(totalSlot, currentSlot);
}

public List<Command> listKillCommandByExecuteHost(String executeHost) {
return commandService.listKillCommandByExecuteHost(executeHost);
}

public CommonTaskCommand getCatalogCommand(int totalSlot, int currentSlot){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,11 +529,15 @@ private Long executeJob(Job job, LocalDateTime scheduleTime) {

jobExecutionService.save(jobExecution);

Map<String, String> parameter = new HashMap<>();
parameter.put("engine", jobExecution.getEngineType());

// add a command
Command command = new Command();
command.setType(CommandType.START);
command.setPriority(Priority.MEDIUM);
command.setJobExecutionId(jobExecution.getId());
command.setParameter(JSONUtils.toJsonString(parameter));
commandService.insert(command);

return jobExecution.getId();
Expand Down Expand Up @@ -585,7 +589,7 @@ private JobExecution getJobExecution(Job job, LocalDateTime scheduleTime) {
jobExecution.setErrorDataStorageType(errorDataStorageType);
jobExecution.setErrorDataStorageParameter(errorDataStorageParameter);
jobExecution.setErrorDataFileName(getErrorDataFileName(job.getParameter()));
jobExecution.setStatus(ExecutionStatus.SUBMITTED_SUCCESS);
jobExecution.setStatus(ExecutionStatus.WAITING_SUMMIT);
jobExecution.setTenantCode(tenantStr);
jobExecution.setEnv(envStr);
jobExecution.setSubmitTime(LocalDateTime.now());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ const JobsInstance = () => {
width: 300,
render: (text: string, record: TJobsInstanceTableItem) => (
<>
<IF visible={record.status === 'submitted' || record.status === 'running' || record.status === '已提交' || record.status === '执行中'}>
<IF visible={record.status === 'submitted' || record.status === 'running' || record.status === 'waiting_summit' || record.status === '已提交' || record.status === '执行中' || record.status === '待提交'}>
<a style={{ marginRight: 5 }} onClick={() => { onStop(record); }}>{intl.formatMessage({ id: 'jobs_task_stop_btn' })}</a>
</IF>
<a style={{ marginRight: 5 }} onClick={() => { onLog(record); }}>{intl.formatMessage({ id: 'jobs_task_log_btn' })}</a>
Expand Down
1 change: 1 addition & 0 deletions scripts/sql/datavines-mysql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ CREATE TABLE `dv_command` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`type` tinyint(4) NOT NULL DEFAULT '0' COMMENT 'Command type: 0 start task, 1 stop task',
`parameter` text COMMENT 'json command parameters',
`execute_host` varchar(255) COMMENT 'job execute host',
`job_execution_id` bigint(20) NOT NULL COMMENT 'task id',
`priority` int(11) DEFAULT NULL COMMENT 'process instance priority: 0 Highest,1 High,2 Medium,3 Low,4 Lowest',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
Expand Down

0 comments on commit 6f89c19

Please sign in to comment.