Skip to content

Commit

Permalink
[DSIP-69] Fix master dispatch task timeout might cause task duplicate…
Browse files Browse the repository at this point in the history
… running in worker
  • Loading branch information
pegasas committed Sep 4, 2024
1 parent 0f4bce1 commit e2dca9a
Show file tree
Hide file tree
Showing 8 changed files with 253 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public class WorkerConfig implements Validator {

private int listenPort = 1234;
private int execThreads = 10;
private long registerInitialDelay = 60L;
private long registerDelay = 60L;
private Duration maxHeartbeatInterval = Duration.ofSeconds(10);
private int hostWeight = 100;
private WorkerServerLoadProtection serverLoadProtection = new WorkerServerLoadProtection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import lombok.extern.slf4j.Slf4j;

import org.apache.dolphinscheduler.server.worker.runner.TaskCoordinator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

Expand All @@ -40,12 +41,16 @@ public class TaskInstanceExecutionEventAckListenerImpl implements ITaskInstanceE
@Autowired
private MessageRetryRunner messageRetryRunner;

@Autowired
private TaskCoordinator taskCoordinator;

@Override
public void handleTaskInstanceDispatchedEventAck(TaskInstanceExecutionDispatchedEventAck taskInstanceExecutionDispatchedEventAck) {
try {
final int taskInstanceId = taskInstanceExecutionDispatchedEventAck.getTaskInstanceId();
LogUtils.setTaskInstanceIdMDC(taskInstanceId);
log.info("Receive TaskInstanceDispatchedEventAck: {}", taskInstanceExecutionDispatchedEventAck);
taskCoordinator.onDispatchedEventAck(taskInstanceId);
if (taskInstanceExecutionDispatchedEventAck.isSuccess()) {
messageRetryRunner.removeRetryMessage(taskInstanceId, TaskInstanceExecutionEventType.DISPATCH);
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.worker.runner;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.sun.corba.se.spi.orbutil.threadpool.Work;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.extract.master.transportor.ITaskExecutionEvent;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceDispatchResponse;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
import org.apache.dolphinscheduler.server.worker.config.TaskExecuteThreadsFullPolicy;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.message.TaskInstanceExecutionEventSender;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

import org.apache.dolphinscheduler.server.worker.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.kyuubi.shade.org.apache.arrow.flatbuf.Int;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class TaskCoordinator {

private final Map<Integer, Integer> dispatchedAckMap = new ConcurrentHashMap<>();

private final ScheduledExecutorService scheduledExecutorService;

private final WorkerConfig workerConfig;

@Autowired
private WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool;

@Autowired
private WorkerTaskExecutorFactoryBuilder workerTaskExecutorFactoryBuilder;

@Autowired
private WorkerMessageSender workerMessageSender;

public TaskCoordinator(
WorkerConfig workerConfig,
WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool,
WorkerTaskExecutorFactoryBuilder workerTaskExecutorFactoryBuilder,
WorkerMessageSender workerMessageSender) {
this.workerConfig = workerConfig;
this.scheduledExecutorService = Executors.newScheduledThreadPool(
this.workerConfig.getExecThreads(),
new ThreadFactoryBuilder().setNameFormat("TaskCoordinator").setDaemon(true).build());
this.workerTaskExecutorThreadPool = workerTaskExecutorThreadPool;
this.workerTaskExecutorFactoryBuilder = workerTaskExecutorFactoryBuilder;
this.workerMessageSender = workerMessageSender;
}

public boolean publishDispatchedEvent(WorkerTaskExecutor workerTaskExecutor) {
if (TaskExecuteThreadsFullPolicy.CONTINUE.equals(workerConfig.getTaskExecuteThreadsFullPolicy())) {
WorkerTaskExecutorHolder.put(workerTaskExecutor);
sendDispatchedEvent(workerTaskExecutor);
return true;
}
if (workerTaskExecutorThreadPool.isOverload()) {
log.warn("WorkerTaskExecutorThreadPool is overload, cannot submit new WorkerTaskExecutor");
WorkerServerMetrics.incWorkerSubmitQueueIsFullCount();
return false;
}
WorkerTaskExecutorHolder.put(workerTaskExecutor);
sendDispatchedEvent(workerTaskExecutor);
return true;
}

protected void sendDispatchedEvent(WorkerTaskExecutor workerTaskExecutor) {
workerMessageSender.sendMessageWithRetry(
workerTaskExecutor.getTaskExecutionContext(),
ITaskExecutionEvent.TaskInstanceExecutionEventType.DISPATCH);
}

public void onDispatchedEventAck(int taskInstanceId) {
dispatchedAckMap.put(taskInstanceId, taskInstanceId);
}

public boolean register(TaskExecutionContext taskExecutionContext) {
synchronized (TaskCoordinator.class) {
WorkerTaskExecutor workerTaskExecutor = workerTaskExecutorFactoryBuilder
.createWorkerTaskExecutorFactory(taskExecutionContext)
.createWorkerTaskExecutor();

if (!publishDispatchedEvent(workerTaskExecutor)) {
log.info("Abort task: {} publishDispatchedEvent failed", taskExecutionContext.getTaskName());
return false;
}

this.scheduledExecutorService.scheduleWithFixedDelay(() -> {
if (!this.dispatchedAckMap.containsKey(taskExecutionContext.getTaskInstanceId())) {
log.info("Abort task: {}", taskExecutionContext.getTaskName());
}

TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());

if (!workerTaskExecutorThreadPool.submitWorkerTaskExecutor(workerTaskExecutor)) {
log.info("Submit task: {} to wait queue failed", taskExecutionContext.getTaskName());
} else {

log.info("Submit task: {} to wait queue success", taskExecutionContext.getTaskName());
}
},
this.workerConfig.getRegisterInitialDelay(),
this.workerConfig.getRegisterDelay(),
TimeUnit.SECONDS);
return true;
}


}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,47 +34,23 @@
@Slf4j
public class WorkerTaskExecutorThreadPool {

private final WorkerMessageSender workerMessageSender;

private final ThreadPoolExecutor threadPoolExecutor;

private final WorkerConfig workerConfig;

public WorkerTaskExecutorThreadPool(WorkerConfig workerConfig, WorkerMessageSender workerMessageSender) {
public WorkerTaskExecutorThreadPool(WorkerConfig workerConfig) {
this.threadPoolExecutor =
ThreadUtils.newDaemonFixedThreadExecutor("WorkerTaskExecutorThreadPool", workerConfig.getExecThreads());
threadPoolExecutor.prestartAllCoreThreads();
this.workerConfig = workerConfig;
this.workerMessageSender = workerMessageSender;

WorkerServerMetrics.registerWorkerExecuteQueueSizeGauge(this::getWaitingTaskExecutorSize);
WorkerServerMetrics.registerWorkerActiveExecuteThreadGauge(this::getRunningTaskExecutorSize);
}

public boolean submitWorkerTaskExecutor(WorkerTaskExecutor workerTaskExecutor) {
synchronized (WorkerTaskExecutorThreadPool.class) {
if (TaskExecuteThreadsFullPolicy.CONTINUE.equals(workerConfig.getTaskExecuteThreadsFullPolicy())) {
WorkerTaskExecutorHolder.put(workerTaskExecutor);
sendDispatchedEvent(workerTaskExecutor);
threadPoolExecutor.execute(workerTaskExecutor);
return true;
}
if (isOverload()) {
log.warn("WorkerTaskExecutorThreadPool is overload, cannot submit new WorkerTaskExecutor");
WorkerServerMetrics.incWorkerSubmitQueueIsFullCount();
return false;
}
WorkerTaskExecutorHolder.put(workerTaskExecutor);
sendDispatchedEvent(workerTaskExecutor);
threadPoolExecutor.execute(workerTaskExecutor);
return true;
}
}

private void sendDispatchedEvent(WorkerTaskExecutor workerTaskExecutor) {
workerMessageSender.sendMessageWithRetry(
workerTaskExecutor.getTaskExecutionContext(),
ITaskExecutionEvent.TaskInstanceExecutionEventType.DISPATCH);
threadPoolExecutor.execute(workerTaskExecutor);
return true;
}

public boolean isOverload() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,7 @@
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorFactoryBuilder;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
import org.apache.dolphinscheduler.server.worker.runner.TaskCoordinator;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -44,21 +40,13 @@ public class TaskInstanceDispatchOperationFunction
private WorkerConfig workerConfig;

@Autowired
private WorkerTaskExecutorFactoryBuilder workerTaskExecutorFactoryBuilder;

@Autowired
private WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool;

@Autowired
private WorkerMessageSender workerMessageSender;
private TaskCoordinator taskCoordinator;

public TaskInstanceDispatchOperationFunction(
WorkerConfig workerConfig,
WorkerTaskExecutorFactoryBuilder workerTaskExecutorFactoryBuilder,
WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool) {
TaskCoordinator taskCoordinator) {
this.workerConfig = workerConfig;
this.workerTaskExecutorFactoryBuilder = workerTaskExecutorFactoryBuilder;
this.workerTaskExecutorThreadPool = workerTaskExecutorThreadPool;
this.taskCoordinator = taskCoordinator;
}

@Override
Expand All @@ -79,19 +67,16 @@ public TaskInstanceDispatchResponse operate(TaskInstanceDispatchRequest taskInst
"server is not running");
}

TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());

WorkerTaskExecutor workerTaskExecutor = workerTaskExecutorFactoryBuilder
.createWorkerTaskExecutorFactory(taskExecutionContext)
.createWorkerTaskExecutor();
if (!workerTaskExecutorThreadPool.submitWorkerTaskExecutor(workerTaskExecutor)) {
log.info("Submit task: {} to wait queue failed", taskExecutionContext.getTaskName());
if (!taskCoordinator.register(taskExecutionContext)) {
log.info("Register task: {} to taskCoordinator failed", taskExecutionContext.getTaskName());
return TaskInstanceDispatchResponse.failed(taskExecutionContext.getTaskInstanceId(),
"WorkerManagerThread is full");
} else {
log.info("Submit task: {} to wait queue success", taskExecutionContext.getTaskName());
log.info("Register task: {} to taskCoordinator success", taskExecutionContext.getTaskName());
return TaskInstanceDispatchResponse.success(taskExecutionContext.getTaskInstanceId());
}


} finally {
LogUtils.removeWorkflowAndTaskInstanceIdMDC();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.config.WorkerServerLoadProtection;
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
import org.apache.dolphinscheduler.server.worker.runner.TaskCoordinator;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;

import lombok.NonNull;
Expand Down
Loading

0 comments on commit e2dca9a

Please sign in to comment.