diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 7b2a1cdcbf7c..e7e26be08b80 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -30,13 +30,10 @@ import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient; import org.apache.dolphinscheduler.server.master.runner.EventExecuteService; -import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.quartz.QuartzExecutors; -import java.util.concurrent.ConcurrentHashMap; - import javax.annotation.PostConstruct; import org.quartz.SchedulerException; @@ -100,8 +97,6 @@ public class MasterServer implements IStoppable { @Autowired private EventExecuteService eventExecuteService; - private ConcurrentHashMap processInstanceExecMaps = new ConcurrentHashMap<>(); - /** * master server startup, not use web service * @@ -121,28 +116,18 @@ public void run() { NettyServerConfig serverConfig = new NettyServerConfig(); serverConfig.setListenPort(masterConfig.getListenPort()); this.nettyRemotingServer = new NettyRemotingServer(serverConfig); - TaskAckProcessor ackProcessor = new TaskAckProcessor(); - ackProcessor.init(processInstanceExecMaps); - TaskResponseProcessor taskResponseProcessor = new TaskResponseProcessor(); - taskResponseProcessor.init(processInstanceExecMaps); - StateEventProcessor stateEventProcessor = new StateEventProcessor(); - stateEventProcessor.init(processInstanceExecMaps); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, ackProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor()); - this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, new StateEventProcessor()); this.nettyRemotingServer.start(); // self tolerant - this.masterRegistryClient.init(this.processInstanceExecMaps); this.masterRegistryClient.start(); this.masterRegistryClient.setRegistryStoppable(this); - this.eventExecuteService.init(this.processInstanceExecMaps); this.eventExecuteService.start(); // scheduler start - this.masterSchedulerService.init(this.processInstanceExecMaps); - this.masterSchedulerService.start(); // start QuartzExecutors diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java new file mode 100644 index 000000000000..5fdf64493cf6 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.cache; + +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; + +import java.util.Collection; + +/** + * cache of process instance id and WorkflowExecuteThread + */ +public interface ProcessInstanceExecCacheManager { + + /** + * get WorkflowExecuteThread by process instance id + * + * @param processInstanceId processInstanceId + * @return WorkflowExecuteThread + */ + WorkflowExecuteThread getByProcessInstanceId(int processInstanceId); + + /** + * judge the process instance does it exist + * + * @param processInstanceId processInstanceId + * @return true - if process instance id exists in cache + */ + boolean contains(int processInstanceId); + + /** + * remove cache by process instance id + * + * @param processInstanceId processInstanceId + */ + void removeByProcessInstanceId(int processInstanceId); + + /** + * cache + * + * @param processInstanceId processInstanceId + * @param workflowExecuteThread if it is null, will not be cached + */ + void cache(int processInstanceId, WorkflowExecuteThread workflowExecuteThread); + + /** + * get all WorkflowExecuteThread from cache + * + * @return all WorkflowExecuteThread in cache + */ + Collection getAll(); +} \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java new file mode 100644 index 000000000000..1d0ab4841aaf --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.cache.impl; + +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; + +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; + +import org.springframework.stereotype.Component; + +import com.google.common.collect.ImmutableList; + +/** + * cache of process instance id and WorkflowExecuteThread + */ +@Component +public class ProcessInstanceExecCacheManagerImpl implements ProcessInstanceExecCacheManager { + + private final ConcurrentHashMap processInstanceExecMaps = new ConcurrentHashMap<>(); + + @Override + public WorkflowExecuteThread getByProcessInstanceId(int processInstanceId) { + return processInstanceExecMaps.get(processInstanceId); + } + + @Override + public boolean contains(int processInstanceId) { + return processInstanceExecMaps.containsKey(processInstanceId); + } + + @Override + public void removeByProcessInstanceId(int processInstanceId) { + processInstanceExecMaps.remove(processInstanceId); + } + + @Override + public void cache(int processInstanceId, WorkflowExecuteThread workflowExecuteThread) { + if (workflowExecuteThread == null) { + return; + } + processInstanceExecMaps.put(processInstanceId, workflowExecuteThread); + } + + @Override + public Collection getAll() { + return ImmutableList.copyOf(processInstanceExecMaps.values()); + } +} \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java index 2f9a6342501e..b03940395d8c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java @@ -26,11 +26,8 @@ import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.server.master.processor.queue.StateEventResponseService; -import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import java.util.concurrent.ConcurrentHashMap; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,10 +48,6 @@ public StateEventProcessor() { stateEventResponseService = SpringApplicationContext.getBean(StateEventResponseService.class); } - public void init(ConcurrentHashMap processInstanceExecMaps) { - this.stateEventResponseService.init(processInstanceExecMaps); - } - @Override public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.STATE_EVENT_REQUEST == command.getType(), String.format("invalid command type: %s", command.getType())); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java index 15f97c17a574..5d96194d3cdf 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java @@ -28,11 +28,8 @@ import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; -import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import java.util.concurrent.ConcurrentHashMap; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,9 +59,6 @@ public TaskAckProcessor() { this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); } - public void init(ConcurrentHashMap processInstanceExecMaps) { - this.taskResponseService.init(processInstanceExecMaps); - } /** * task ack process diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java index 5c6ade7fccb3..4288f180b65e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java @@ -27,11 +27,8 @@ import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; -import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import java.util.concurrent.ConcurrentHashMap; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,9 +58,6 @@ public TaskResponseProcessor() { this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); } - public void init(ConcurrentHashMap processInstanceExecMaps) { - this.taskResponseService.init(processInstanceExecMaps); - } /** * task final result response diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java index bc9c77cfaf92..c27cf8504e64 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java @@ -21,12 +21,12 @@ import org.apache.dolphinscheduler.common.enums.StateEvent; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.remote.command.StateEventResponseCommand; +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import javax.annotation.PostConstruct; @@ -34,6 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import io.netty.channel.Channel; @@ -59,13 +60,11 @@ public class StateEventResponseService { */ private Thread responseWorker; - private ConcurrentHashMap processInstanceMapper; - - public void init(ConcurrentHashMap processInstanceMapper) { - if (this.processInstanceMapper == null) { - this.processInstanceMapper = processInstanceMapper; - } - } + /** + * cache of process instance id and WorkflowExecuteThread + */ + @Autowired + private ProcessInstanceExecCacheManager processInstanceExecCacheManager; @PostConstruct public void start() { @@ -131,12 +130,12 @@ private void writeResponse(StateEvent stateEvent, ExecutionStatus status) { private void persist(StateEvent stateEvent) { try { - if (!this.processInstanceMapper.containsKey(stateEvent.getProcessInstanceId())) { + if (!this.processInstanceExecCacheManager.contains(stateEvent.getProcessInstanceId())) { writeResponse(stateEvent, ExecutionStatus.FAILURE); return; } - WorkflowExecuteThread workflowExecuteThread = this.processInstanceMapper.get(stateEvent.getProcessInstanceId()); + WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId()); workflowExecuteThread.addStateEvent(stateEvent); writeResponse(stateEvent, ExecutionStatus.SUCCESS); } catch (Exception e) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java index 27b96e14d8c1..586be7121b03 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java @@ -25,13 +25,13 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand; import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand; +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; import org.apache.dolphinscheduler.service.process.ProcessService; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import javax.annotation.PostConstruct; @@ -66,19 +66,17 @@ public class TaskResponseService { @Autowired private ProcessService processService; + /** + * cache of process instance id and WorkflowExecuteThread + */ + @Autowired + private ProcessInstanceExecCacheManager processInstanceExecCacheManager; + /** * task response worker */ private Thread taskResponseWorker; - private ConcurrentHashMap processInstanceMapper; - - public void init(ConcurrentHashMap processInstanceMapper) { - if (this.processInstanceMapper == null) { - this.processInstanceMapper = processInstanceMapper; - } - } - @PostConstruct public void start() { this.taskResponseWorker = new TaskResponseWorker(); @@ -156,11 +154,11 @@ private void persist(TaskResponseEvent taskResponseEvent) { if (taskInstance != null) { ExecutionStatus status = taskInstance.getState().typeIsFinished() ? taskInstance.getState() : taskResponseEvent.getState(); processService.changeTaskState(taskInstance, status, - taskResponseEvent.getStartTime(), - taskResponseEvent.getWorkerAddress(), - taskResponseEvent.getExecutePath(), - taskResponseEvent.getLogPath(), - taskResponseEvent.getTaskInstanceId()); + taskResponseEvent.getStartTime(), + taskResponseEvent.getWorkerAddress(), + taskResponseEvent.getExecutePath(), + taskResponseEvent.getLogPath(), + taskResponseEvent.getTaskInstanceId()); } // if taskInstance is null (maybe deleted) . retry will be meaningless . so ack success DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId()); @@ -175,11 +173,11 @@ private void persist(TaskResponseEvent taskResponseEvent) { try { if (taskInstance != null) { processService.changeTaskState(taskInstance, taskResponseEvent.getState(), - taskResponseEvent.getEndTime(), - taskResponseEvent.getProcessId(), - taskResponseEvent.getAppIds(), - taskResponseEvent.getTaskInstanceId(), - taskResponseEvent.getVarPool() + taskResponseEvent.getEndTime(), + taskResponseEvent.getProcessId(), + taskResponseEvent.getAppIds(), + taskResponseEvent.getTaskInstanceId(), + taskResponseEvent.getVarPool() ); } // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success @@ -194,7 +192,7 @@ private void persist(TaskResponseEvent taskResponseEvent) { default: throw new IllegalArgumentException("invalid event type : " + event); } - WorkflowExecuteThread workflowExecuteThread = this.processInstanceMapper.get(taskResponseEvent.getProcessInstanceId()); + WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(taskResponseEvent.getProcessInstanceId()); if (workflowExecuteThread != null) { StateEvent stateEvent = new StateEvent(); stateEvent.setProcessInstanceId(taskResponseEvent.getProcessInstanceId()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java index 7bae6de16201..b77a28aca8d2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java @@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; import org.apache.dolphinscheduler.server.registry.HeartBeatTask; @@ -48,7 +49,6 @@ import java.util.Date; import java.util.List; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -87,13 +87,17 @@ public class MasterRegistryClient { @Autowired private MasterConfig masterConfig; + /** + * cache of process instance id and WorkflowExecuteThread + */ + @Autowired + private ProcessInstanceExecCacheManager processInstanceExecCacheManager; + /** * heartbeat executor */ private ScheduledExecutorService heartBeatExecutor; - private ConcurrentHashMap processInstanceExecMaps; - /** * master startup time, ms */ @@ -101,11 +105,10 @@ public class MasterRegistryClient { private String localNodePath; - public void init(ConcurrentHashMap processInstanceExecMaps) { + public MasterRegistryClient() { this.startupTime = System.currentTimeMillis(); this.registryClient = RegistryClient.getInstance(); this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor")); - this.processInstanceExecMaps = processInstanceExecMaps; } public void start() { @@ -149,7 +152,7 @@ public void closeRegistry() { /** * remove zookeeper node path * - * @param path zookeeper node path + * @param path zookeeper node path * @param nodeType zookeeper node type * @param failover is failover */ @@ -185,7 +188,7 @@ public void removeNodePath(String path, NodeType nodeType, boolean failover) { * failover server when server down * * @param serverHost server host - * @param nodeType zookeeper node type + * @param nodeType zookeeper node type */ private void failoverServerWhenDown(String serverHost, NodeType nodeType) { switch (nodeType) { @@ -273,7 +276,7 @@ private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) { * 2. change task state from running to need failover. * 3. failover all tasks when workerHost is null * - * @param workerHost worker host + * @param workerHost worker host * @param needCheckWorkerAlive need check worker alive */ private void failoverWorker(String workerHost, boolean needCheckWorkerAlive, boolean checkOwner) { @@ -288,30 +291,30 @@ private void failoverWorker(String workerHost, boolean needCheckWorkerAlive, boo ProcessInstance processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); if (workerHost == null - || !checkOwner - || processInstance.getHost().equalsIgnoreCase(workerHost)) { + || !checkOwner + || processInstance.getHost().equalsIgnoreCase(workerHost)) { // only failover the task owned myself if worker down. // failover master need handle worker at the same time if (processInstance == null) { logger.error("failover error, the process {} of task {} do not exists.", - taskInstance.getProcessInstanceId(), taskInstance.getId()); + taskInstance.getProcessInstanceId(), taskInstance.getId()); continue; } taskInstance.setProcessInstance(processInstance); TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get() - .buildTaskInstanceRelatedInfo(taskInstance) - .buildProcessInstanceRelatedInfo(processInstance) - .create(); + .buildTaskInstanceRelatedInfo(taskInstance) + .buildProcessInstanceRelatedInfo(processInstance) + .create(); // only kill yarn job if exists , the local thread has exited ProcessUtils.killYarnJob(taskExecutionContext); taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE); processService.saveTaskInstance(taskInstance); - if (!processInstanceExecMaps.containsKey(processInstance.getId())) { + if (!processInstanceExecCacheManager.contains(processInstance.getId())) { return; } - WorkflowExecuteThread workflowExecuteThreadNotify = processInstanceExecMaps.get(processInstance.getId()); + WorkflowExecuteThread workflowExecuteThreadNotify = processInstanceExecCacheManager.getByProcessInstanceId(processInstance.getId()); StateEvent stateEvent = new StateEvent(); stateEvent.setTaskInstanceId(taskInstance.getId()); stateEvent.setType(StateEventType.TASK_STATE_CHANGE); @@ -364,11 +367,11 @@ public void registry() { localNodePath = getMasterPath(); int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval(); HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime, - masterConfig.getMasterMaxCpuloadAvg(), - masterConfig.getMasterReservedMemory(), - Sets.newHashSet(getMasterPath()), - Constants.MASTER_TYPE, - registryClient); + masterConfig.getMasterMaxCpuloadAvg(), + masterConfig.getMasterReservedMemory(), + Sets.newHashSet(getMasterPath()), + Constants.MASTER_TYPE, + registryClient); registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo()); registryClient.addConnectionStateListener(new MasterRegistryConnectStateListener()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java index 7c4b32130d67..7b61de8a1968 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java @@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -65,6 +66,12 @@ public class EventExecuteService extends Thread { @Autowired private MasterConfig masterConfig; + /** + * cache of process instance id and WorkflowExecuteThread + */ + @Autowired + private ProcessInstanceExecCacheManager processInstanceExecCacheManager; + private ExecutorService eventExecService; /** @@ -73,19 +80,14 @@ public class EventExecuteService extends Thread { private StateEventCallbackService stateEventCallbackService; - private ConcurrentHashMap processInstanceExecMaps; - private ConcurrentHashMap eventHandlerMap = new ConcurrentHashMap(); + private ConcurrentHashMap eventHandlerMap = new ConcurrentHashMap<>(); ListeningExecutorService listeningExecutorService; - public void init(ConcurrentHashMap processInstanceExecMaps) { - + public EventExecuteService() { eventExecService = ThreadUtils.newDaemonFixedThreadExecutor("MasterEventExecution", masterConfig.getMasterExecThreads()); - this.processInstanceExecMaps = processInstanceExecMaps; - listeningExecutorService = MoreExecutors.listeningDecorator(eventExecService); this.stateEventCallbackService = SpringApplicationContext.getBean(StateEventCallbackService.class); - } @Override @@ -115,10 +117,10 @@ public void run() { } private void eventHandler() { - for (WorkflowExecuteThread workflowExecuteThread : this.processInstanceExecMaps.values()) { + for (WorkflowExecuteThread workflowExecuteThread : this.processInstanceExecCacheManager.getAll()) { if (workflowExecuteThread.eventSize() == 0 - || StringUtils.isEmpty(workflowExecuteThread.getKey()) - || eventHandlerMap.containsKey(workflowExecuteThread.getKey())) { + || StringUtils.isEmpty(workflowExecuteThread.getKey()) + || eventHandlerMap.containsKey(workflowExecuteThread.getKey())) { continue; } int processInstanceId = workflowExecuteThread.getProcessInstance().getId(); @@ -132,13 +134,13 @@ private void eventHandler() { @Override public void onSuccess(Object o) { if (workflowExecuteThread.workFlowFinish()) { - processInstanceExecMaps.remove(processInstanceId); + processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId); notifyProcessChanged(); logger.info("process instance {} finished.", processInstanceId); } if (workflowExecuteThread.getProcessInstance().getId() != processInstanceId) { - processInstanceExecMaps.remove(processInstanceId); - processInstanceExecMaps.put(workflowExecuteThread.getProcessInstance().getId(), workflowExecuteThread); + processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId); + processInstanceExecCacheManager.cache(workflowExecuteThread.getProcessInstance().getId(), workflowExecuteThread); } eventHandlerMap.remove(workflowExecuteThread.getKey()); @@ -146,7 +148,7 @@ public void onSuccess(Object o) { private void notifyProcessChanged() { Map fatherMaps - = processService.notifyProcessList(processInstanceId, 0); + = processService.notifyProcessList(processInstanceId, 0); for (ProcessInstance processInstance : fatherMaps.keySet()) { String address = NetUtils.getAddr(masterConfig.getListenPort()); @@ -160,10 +162,10 @@ private void notifyProcessChanged() { private void notifyMyself(ProcessInstance processInstance, TaskInstance taskInstance) { logger.info("notify process {} task {} state change", processInstance.getId(), taskInstance.getId()); - if (!processInstanceExecMaps.containsKey(processInstance.getId())) { + if (!processInstanceExecCacheManager.contains(processInstance.getId())) { return; } - WorkflowExecuteThread workflowExecuteThreadNotify = processInstanceExecMaps.get(processInstance.getId()); + WorkflowExecuteThread workflowExecuteThreadNotify = processInstanceExecCacheManager.getByProcessInstanceId(processInstance.getId()); StateEvent stateEvent = new StateEvent(); stateEvent.setTaskInstanceId(taskInstance.getId()); stateEvent.setType(StateEventType.TASK_STATE_CHANGE); @@ -176,15 +178,15 @@ private void notifyProcess(ProcessInstance processInstance, TaskInstance taskIns String host = processInstance.getHost(); if (StringUtils.isEmpty(host)) { logger.info("process {} host is empty, cannot notify task {} now.", - processInstance.getId(), taskInstance.getId()); + processInstance.getId(), taskInstance.getId()); return; } String address = host.split(":")[0]; int port = Integer.parseInt(host.split(":")[1]); logger.info("notify process {} task {} state change, host:{}", - processInstance.getId(), taskInstance.getId(), host); + processInstance.getId(), taskInstance.getId(), host); StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand( - processInstanceId, 0, workflowExecuteThread.getProcessInstance().getState(), processInstance.getId(), taskInstance.getId() + processInstanceId, 0, workflowExecuteThread.getProcessInstance().getState(), processInstance.getId(), taskInstance.getId() ); stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index 803ba09bccaa..cb8a079597bf 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient; @@ -90,14 +91,16 @@ public class MasterSchedulerService extends Thread { NettyExecutorManager nettyExecutorManager; /** - * master exec service + * cache of process instance id and WorkflowExecuteThread */ - private ThreadPoolExecutor masterExecService; + @Autowired + private ProcessInstanceExecCacheManager processInstanceExecCacheManager; /** - * process instance execution list + * master exec service */ - private ConcurrentHashMap processInstanceExecMaps; + private ThreadPoolExecutor masterExecService; + /** * process timeout check list */ @@ -119,16 +122,15 @@ public class MasterSchedulerService extends Thread { /** * constructor of MasterSchedulerService */ - public void init(ConcurrentHashMap processInstanceExecMaps) { - this.processInstanceExecMaps = processInstanceExecMaps; + public MasterSchedulerService() { this.masterExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads()); NettyClientConfig clientConfig = new NettyClientConfig(); this.nettyRemotingClient = new NettyRemotingClient(clientConfig); stateWheelExecuteThread = new StateWheelExecuteThread(processTimeoutCheckList, - taskTimeoutCheckList, - this.processInstanceExecMaps, - masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS); + taskTimeoutCheckList, + this.processInstanceExecCacheManager, + masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS); } @Override @@ -187,28 +189,28 @@ private void scheduleProcess() throws Exception { logger.info("find one command: id: {}, type: {}", command.getId(), command.getCommandType()); try { ProcessInstance processInstance = processService.handleCommand(logger, - getLocalAddress(), - command, - processDefinitionCacheMaps); + getLocalAddress(), + command, + processDefinitionCacheMaps); if (!masterConfig.getMasterCacheProcessDefinition() - && processDefinitionCacheMaps.size() > 0) { + && processDefinitionCacheMaps.size() > 0) { processDefinitionCacheMaps.clear(); } if (processInstance != null) { WorkflowExecuteThread workflowExecuteThread = new WorkflowExecuteThread( - processInstance - , processService - , nettyExecutorManager - , processAlertManager - , masterConfig - , taskTimeoutCheckList); + processInstance + , processService + , nettyExecutorManager + , processAlertManager + , masterConfig + , taskTimeoutCheckList); - this.processInstanceExecMaps.put(processInstance.getId(), workflowExecuteThread); + this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteThread); if (processInstance.getTimeout() > 0) { this.processTimeoutCheckList.put(processInstance.getId(), processInstance); } logger.info("handle command end, command {} process {} start...", - command.getId(), processInstance.getId()); + command.getId(), processInstance.getId()); masterExecService.execute(workflowExecuteThread); } } catch (Exception e) { @@ -235,15 +237,15 @@ private Command findOneCommand() { for (Command command : commandList) { int slot = ServerNodeManager.getSlot(); if (ServerNodeManager.MASTER_SIZE != 0 - && command.getId() % ServerNodeManager.MASTER_SIZE == slot) { + && command.getId() % ServerNodeManager.MASTER_SIZE == slot) { result = command; break; } } if (result != null) { logger.info("find command {}, slot:{} :", - result.getId(), - ServerNodeManager.getSlot()); + result.getId(), + ServerNodeManager.getSlot()); break; } pageNumber += 1; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java index f2b10f789838..69bd3cb4ad30 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java @@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.hadoop.util.ThreadUtil; @@ -44,17 +45,17 @@ public class StateWheelExecuteThread extends Thread { ConcurrentHashMap processInstanceCheckList; ConcurrentHashMap taskInstanceCheckList; - private ConcurrentHashMap processInstanceExecMaps; + private ProcessInstanceExecCacheManager processInstanceExecCacheManager; private int stateCheckIntervalSecs; public StateWheelExecuteThread(ConcurrentHashMap processInstances, ConcurrentHashMap taskInstances, - ConcurrentHashMap processInstanceExecMaps, + ProcessInstanceExecCacheManager processInstanceExecCacheManager, int stateCheckIntervalSecs) { this.processInstanceCheckList = processInstances; this.taskInstanceCheckList = taskInstances; - this.processInstanceExecMaps = processInstanceExecMaps; + this.processInstanceExecCacheManager = processInstanceExecCacheManager; this.stateCheckIntervalSecs = stateCheckIntervalSecs; } @@ -121,10 +122,10 @@ private void checkProcess() { private void putEvent(StateEvent stateEvent) { - if (!processInstanceExecMaps.containsKey(stateEvent.getProcessInstanceId())) { + if (!processInstanceExecCacheManager.contains(stateEvent.getProcessInstanceId())) { return; } - WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecMaps.get(stateEvent.getProcessInstanceId()); + WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId()); workflowExecuteThread.addStateEvent(stateEvent); } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java new file mode 100644 index 000000000000..b0d1385bd86d --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.cache.impl; + +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; + +import java.util.Collection; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class ProcessInstanceExecCacheManagerImplTest { + + ProcessInstanceExecCacheManager processInstanceExecCacheManager = new ProcessInstanceExecCacheManagerImpl(); + + @Mock + private WorkflowExecuteThread workflowExecuteThread; + + @Before + public void before() { + + Mockito.when(workflowExecuteThread.getKey()).thenReturn("workflowExecuteThread1"); + + processInstanceExecCacheManager.cache(1, workflowExecuteThread); + } + + @Test + public void testGetByProcessInstanceId() { + WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(1); + Assert.assertNotNull(workflowExecuteThread); + Assert.assertEquals("workflowExecuteThread1", workflowExecuteThread.getKey()); + } + + @Test + public void testContains() { + Assert.assertTrue(processInstanceExecCacheManager.contains(1)); + } + + @Test + public void testRemoveByProcessInstanceId() { + processInstanceExecCacheManager.removeByProcessInstanceId(1); + Assert.assertNull(processInstanceExecCacheManager.getByProcessInstanceId(1)); + } + + @Test + public void testGetAll() { + Collection workflowExecuteThreads = processInstanceExecCacheManager.getAll(); + Assert.assertEquals(1, workflowExecuteThreads.size()); + Assert.assertEquals("workflowExecuteThread1", workflowExecuteThreads.stream().findFirst().get().getKey()); + } +} \ No newline at end of file