Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improvement-6540][server] Optimize the code related to the processInstanceExecMaps object #6546

Closed
wants to merge 10 commits into from
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,10 @@
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;

import java.util.concurrent.ConcurrentHashMap;

import javax.annotation.PostConstruct;

import org.quartz.SchedulerException;
Expand Down Expand Up @@ -100,8 +97,6 @@ public class MasterServer implements IStoppable {
@Autowired
private EventExecuteService eventExecuteService;

private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps = new ConcurrentHashMap<>();

/**
* master server startup, not use web service
*
Expand All @@ -121,28 +116,18 @@ public void run() {
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(masterConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
TaskAckProcessor ackProcessor = new TaskAckProcessor();
ackProcessor.init(processInstanceExecMaps);
TaskResponseProcessor taskResponseProcessor = new TaskResponseProcessor();
taskResponseProcessor.init(processInstanceExecMaps);
StateEventProcessor stateEventProcessor = new StateEventProcessor();
stateEventProcessor.init(processInstanceExecMaps);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, ackProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, new StateEventProcessor());
this.nettyRemotingServer.start();

// self tolerant
this.masterRegistryClient.init(this.processInstanceExecMaps);
this.masterRegistryClient.start();
this.masterRegistryClient.setRegistryStoppable(this);

this.eventExecuteService.init(this.processInstanceExecMaps);
this.eventExecuteService.start();
// scheduler start
this.masterSchedulerService.init(this.processInstanceExecMaps);

this.masterSchedulerService.start();

// start QuartzExecutors
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.cache;

import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;

import java.util.Collection;

/**
* cache of process instance id and WorkflowExecuteThread
*/
public interface ProcessInstanceExecCacheManager {

/**
* get WorkflowExecuteThread by process instance id
*
* @param processInstanceId processInstanceId
* @return WorkflowExecuteThread
*/
WorkflowExecuteThread getByProcessInstanceId(int processInstanceId);

/**
* judge the process instance does it exist
*
* @param processInstanceId processInstanceId
* @return true - if process instance id exists in cache
*/
boolean contains(int processInstanceId);

/**
* remove cache by process instance id
*
* @param processInstanceId processInstanceId
*/
void removeByProcessInstanceId(int processInstanceId);

/**
* cache
*
* @param processInstanceId processInstanceId
* @param workflowExecuteThread if it is null, will not be cached
*/
void cache(int processInstanceId, WorkflowExecuteThread workflowExecuteThread);
eye-gu marked this conversation as resolved.
Show resolved Hide resolved

/**
* get all WorkflowExecuteThread from cache
*
* @return all WorkflowExecuteThread in cache
*/
Collection<WorkflowExecuteThread> getAll();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.cache.impl;

import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;

import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.stereotype.Component;

import com.google.common.collect.ImmutableList;

/**
* cache of process instance id and WorkflowExecuteThread
*/
@Component
public class ProcessInstanceExecCacheManagerImpl implements ProcessInstanceExecCacheManager {

private final ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps = new ConcurrentHashMap<>();

@Override
public WorkflowExecuteThread getByProcessInstanceId(int processInstanceId) {
return processInstanceExecMaps.get(processInstanceId);
}

@Override
public boolean contains(int processInstanceId) {
return processInstanceExecMaps.containsKey(processInstanceId);
}

@Override
public void removeByProcessInstanceId(int processInstanceId) {
processInstanceExecMaps.remove(processInstanceId);
}

@Override
public void cache(int processInstanceId, WorkflowExecuteThread workflowExecuteThread) {
if (workflowExecuteThread == null) {
return;
}
processInstanceExecMaps.put(processInstanceId, workflowExecuteThread);
}

@Override
public Collection<WorkflowExecuteThread> getAll() {
return ImmutableList.copyOf(processInstanceExecMaps.values());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,8 @@
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.master.processor.queue.StateEventResponseService;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;

import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -51,10 +48,6 @@ public StateEventProcessor() {
stateEventResponseService = SpringApplicationContext.getBean(StateEventResponseService.class);
}

public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps) {
this.stateEventResponseService.init(processInstanceExecMaps);
}

@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.STATE_EVENT_REQUEST == command.getType(), String.format("invalid command type: %s", command.getType()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,8 @@
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;

import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -62,9 +59,6 @@ public TaskAckProcessor() {
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
}

public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps) {
this.taskResponseService.init(processInstanceExecMaps);
}

/**
* task ack process
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,8 @@
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;

import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -61,9 +58,6 @@ public TaskResponseProcessor() {
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
}

public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps) {
this.taskResponseService.init(processInstanceExecMaps);
}

/**
* task final result response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,20 @@
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.remote.command.StateEventResponseCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import io.netty.channel.Channel;
Expand All @@ -59,13 +60,11 @@ public class StateEventResponseService {
*/
private Thread responseWorker;

private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper;

public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper) {
if (this.processInstanceMapper == null) {
this.processInstanceMapper = processInstanceMapper;
}
}
/**
* cache of process instance id and WorkflowExecuteThread
*/
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;

@PostConstruct
public void start() {
Expand Down Expand Up @@ -131,12 +130,12 @@ private void writeResponse(StateEvent stateEvent, ExecutionStatus status) {

private void persist(StateEvent stateEvent) {
try {
if (!this.processInstanceMapper.containsKey(stateEvent.getProcessInstanceId())) {
if (!this.processInstanceExecCacheManager.contains(stateEvent.getProcessInstanceId())) {
writeResponse(stateEvent, ExecutionStatus.FAILURE);
return;
}

WorkflowExecuteThread workflowExecuteThread = this.processInstanceMapper.get(stateEvent.getProcessInstanceId());
WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
workflowExecuteThread.addStateEvent(stateEvent);
writeResponse(stateEvent, ExecutionStatus.SUCCESS);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.service.process.ProcessService;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

import javax.annotation.PostConstruct;
Expand Down Expand Up @@ -66,19 +66,17 @@ public class TaskResponseService {
@Autowired
private ProcessService processService;

/**
* cache of process instance id and WorkflowExecuteThread
*/
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;

/**
* task response worker
*/
private Thread taskResponseWorker;

private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper;

public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper) {
if (this.processInstanceMapper == null) {
this.processInstanceMapper = processInstanceMapper;
}
}

@PostConstruct
public void start() {
this.taskResponseWorker = new TaskResponseWorker();
Expand Down Expand Up @@ -156,11 +154,11 @@ private void persist(TaskResponseEvent taskResponseEvent) {
if (taskInstance != null) {
ExecutionStatus status = taskInstance.getState().typeIsFinished() ? taskInstance.getState() : taskResponseEvent.getState();
processService.changeTaskState(taskInstance, status,
taskResponseEvent.getStartTime(),
taskResponseEvent.getWorkerAddress(),
taskResponseEvent.getExecutePath(),
taskResponseEvent.getLogPath(),
taskResponseEvent.getTaskInstanceId());
taskResponseEvent.getStartTime(),
taskResponseEvent.getWorkerAddress(),
taskResponseEvent.getExecutePath(),
taskResponseEvent.getLogPath(),
taskResponseEvent.getTaskInstanceId());
}
// if taskInstance is null (maybe deleted) . retry will be meaningless . so ack success
DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
Expand All @@ -175,11 +173,11 @@ private void persist(TaskResponseEvent taskResponseEvent) {
try {
if (taskInstance != null) {
processService.changeTaskState(taskInstance, taskResponseEvent.getState(),
taskResponseEvent.getEndTime(),
taskResponseEvent.getProcessId(),
taskResponseEvent.getAppIds(),
taskResponseEvent.getTaskInstanceId(),
taskResponseEvent.getVarPool()
taskResponseEvent.getEndTime(),
taskResponseEvent.getProcessId(),
taskResponseEvent.getAppIds(),
taskResponseEvent.getTaskInstanceId(),
taskResponseEvent.getVarPool()
);
}
// if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
Expand All @@ -194,7 +192,7 @@ private void persist(TaskResponseEvent taskResponseEvent) {
default:
throw new IllegalArgumentException("invalid event type : " + event);
}
WorkflowExecuteThread workflowExecuteThread = this.processInstanceMapper.get(taskResponseEvent.getProcessInstanceId());
WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(taskResponseEvent.getProcessInstanceId());
if (workflowExecuteThread != null) {
StateEvent stateEvent = new StateEvent();
stateEvent.setProcessInstanceId(taskResponseEvent.getProcessInstanceId());
Expand Down
Loading