[DSIP-54][Master] Use ClusterManager to manage the cluster in master (#…
ruanwenjun authored Jul 5, 2024
1 parent b539942 commit 8f6df8d
Showing 89 changed files with 3,131 additions and 2,436 deletions.
48 changes: 24 additions & 24 deletions docs/docs/en/architecture/configuration.md

Large diffs are not rendered by default.

48 changes: 24 additions & 24 deletions docs/docs/zh/architecture/configuration.md
@@ -284,30 +284,30 @@ The common.properties configuration file currently mainly configures hadoop/s3/yarn/applicationId

Location: `master-server/conf/application.yaml`

-| Parameter | Default | Description |
-|---|---|---|
-| master.listen-port | 5678 | Master listen port |
-| master.pre-exec-threads | 10 | Number of master pre-execution threads; limits how many commands are handled in parallel |
-| master.exec-threads | 100 | Number of master worker threads; limits how many process instances run in parallel |
-| master.dispatch-task-number | 3 | Number of tasks the master dispatches per batch |
-| master.host-selector | lower_weight | Master host selector, used to choose a suitable worker to run a task. Options: random, round_robin, lower_weight |
-| master.max-heartbeat-interval | 10s | Maximum master heartbeat interval |
-| master.task-commit-retry-times | 5 | Number of task commit retries |
-| master.task-commit-interval | 1000 | Task commit interval, in milliseconds |
-| master.state-wheel-interval | 5 | Interval for polling and checking state |
-| master.server-load-protection.enabled | true | Whether to enable the system protection policy |
-| master.server-load-protection.max-system-cpu-usage-percentage-thresholds | 0.7 | Maximum system CPU usage for the master. The master schedules tasks only while the current system CPU usage is below this threshold. Default 0.7, i.e. 70% of the system CPU |
-| master.server-load-protection.max-jvm-cpu-usage-percentage-thresholds | 0.7 | Maximum JVM CPU usage for the master. The master schedules tasks only while the current JVM CPU usage is below this threshold. Default 0.7, i.e. 70% of the JVM CPU |
-| master.server-load-protection.max-system-memory-usage-percentage-thresholds | 0.7 | Maximum system memory usage for the master. The master schedules tasks only while the current system memory usage is below this threshold. Default 0.7, i.e. 70% of the system memory |
-| master.server-load-protection.max-disk-usage-percentage-thresholds | 0.7 | Maximum system disk usage for the master. The master schedules tasks only while the current system disk usage is below this threshold. Default 0.7, i.e. 70% of the system disk space |
-| master.failover-interval | 10 | Failover interval, in minutes |
-| master.kill-application-when-task-failover | true | Whether to kill the yarn or k8s application when a task instance fails over |
-| master.registry-disconnect-strategy.strategy | stop | Strategy applied after the master loses its connection to the registry. Default: stop. Options: stop, waiting |
-| master.registry-disconnect-strategy.max-waiting-time | 100s | Reconnection window after the master loses its connection to the registry; takes effect only when strategy is waiting. The master tries to reconnect within this time and stops itself if reconnection fails. While reconnecting, the master discards the workflows it is currently executing. A value of 0 means wait indefinitely |
-| master.master.worker-group-refresh-interval | 10s | Interval at which workerGroup is synchronized from the database into memory |
-| master.command-fetch-strategy.type | ID_SLOT_BASED | Command fetch strategy; currently only `ID_SLOT_BASED` is supported |
-| master.command-fetch-strategy.config.id-step | 1 | Auto-increment step of the id column of t_ds_command in the database |
-| master.command-fetch-strategy.config.fetch-size | 10 | Number of commands the master fetches |
+| Parameter | Default | Description |
+|---|---|---|
+| master.listen-port | 5678 | Master listen port |
+| master.pre-exec-threads | 10 | Number of master pre-execution threads; limits how many commands are handled in parallel |
+| master.exec-threads | 100 | Number of master worker threads; limits how many process instances run in parallel |
+| master.dispatch-task-number | 3 | Number of tasks the master dispatches per batch |
+| master.worker-load-balancer-configuration-properties.type | DYNAMIC_WEIGHTED_ROUND_ROBIN | The master uses each worker's dynamic CPU, memory, and thread pool usage to compute the worker's load; workers with lower load have a higher chance of being dispatched tasks |
+| master.max-heartbeat-interval | 10s | Maximum master heartbeat interval |
+| master.task-commit-retry-times | 5 | Number of task commit retries |
+| master.task-commit-interval | 1000 | Task commit interval, in milliseconds |
+| master.state-wheel-interval | 5 | Interval for polling and checking state |
+| master.server-load-protection.enabled | true | Whether to enable the system protection policy |
+| master.server-load-protection.max-system-cpu-usage-percentage-thresholds | 0.7 | Maximum system CPU usage for the master. The master schedules tasks only while the current system CPU usage is below this threshold. Default 0.7, i.e. 70% of the system CPU |
+| master.server-load-protection.max-jvm-cpu-usage-percentage-thresholds | 0.7 | Maximum JVM CPU usage for the master. The master schedules tasks only while the current JVM CPU usage is below this threshold. Default 0.7, i.e. 70% of the JVM CPU |
+| master.server-load-protection.max-system-memory-usage-percentage-thresholds | 0.7 | Maximum system memory usage for the master. The master schedules tasks only while the current system memory usage is below this threshold. Default 0.7, i.e. 70% of the system memory |
+| master.server-load-protection.max-disk-usage-percentage-thresholds | 0.7 | Maximum system disk usage for the master. The master schedules tasks only while the current system disk usage is below this threshold. Default 0.7, i.e. 70% of the system disk space |
+| master.failover-interval | 10 | Failover interval, in minutes |
+| master.kill-application-when-task-failover | true | Whether to kill the yarn or k8s application when a task instance fails over |
+| master.registry-disconnect-strategy.strategy | stop | Strategy applied after the master loses its connection to the registry. Default: stop. Options: stop, waiting |
+| master.registry-disconnect-strategy.max-waiting-time | 100s | Reconnection window after the master loses its connection to the registry; takes effect only when strategy is waiting. The master tries to reconnect within this time and stops itself if reconnection fails. While reconnecting, the master discards the workflows it is currently executing. A value of 0 means wait indefinitely |
+| master.master.worker-group-refresh-interval | 10s | Interval at which workerGroup is synchronized from the database into memory |
+| master.command-fetch-strategy.type | ID_SLOT_BASED | Command fetch strategy; currently only `ID_SLOT_BASED` is supported |
+| master.command-fetch-strategy.config.id-step | 1 | Auto-increment step of the id column of t_ds_command in the database |
+| master.command-fetch-strategy.config.fetch-size | 10 | Number of commands the master fetches |
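
The `DYNAMIC_WEIGHTED_ROUND_ROBIN` row describes load-aware dispatching but not the arithmetic behind it. The sketch below is one way such a balancer could work, not the project's implementation. The `Worker` class, the "average of the three usages" load formula, and the smooth weighted round robin selection are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class DynamicWeightedRoundRobinSketch {

    /** Hypothetical heartbeat snapshot; every usage value is a ratio in [0, 1]. */
    static final class Worker {
        final String address;
        final double cpuUsage;
        final double memoryUsage;
        final double threadPoolUsage;
        double currentWeight = 0.0; // running counter used by the selection loop

        Worker(String address, double cpuUsage, double memoryUsage, double threadPoolUsage) {
            this.address = address;
            this.cpuUsage = cpuUsage;
            this.memoryUsage = memoryUsage;
            this.threadPoolUsage = threadPoolUsage;
        }

        /** Assumed formula: weight is the idle fraction, averaged over the three metrics. */
        double weight() {
            double load = (cpuUsage + memoryUsage + threadPoolUsage) / 3.0;
            return Math.max(0.0, 1.0 - load);
        }
    }

    /**
     * Smooth weighted round robin: add each worker's weight to its counter,
     * pick the largest counter, then subtract the total weight from the winner.
     */
    static Worker select(List<Worker> workers) {
        if (workers.isEmpty()) {
            throw new IllegalArgumentException("no workers available");
        }
        double totalWeight = 0.0;
        Worker best = null;
        for (Worker worker : workers) {
            double weight = worker.weight();
            worker.currentWeight += weight;
            totalWeight += weight;
            if (best == null || worker.currentWeight > best.currentWeight) {
                best = worker;
            }
        }
        best.currentWeight -= totalWeight;
        return best;
    }

    public static void main(String[] args) {
        List<Worker> workers = new ArrayList<>();
        workers.add(new Worker("worker-a", 0.5, 0.5, 0.5));       // weight 0.5
        workers.add(new Worker("worker-b", 0.875, 0.875, 0.875)); // weight 0.125
        for (int i = 0; i < 5; i++) {
            System.out.println(select(workers).address);
        }
    }
}
```

With these two workers the weights are in a 4:1 ratio, so five consecutive selections pick `worker-a` four times and `worker-b` once. The heavily loaded worker still receives some tasks rather than being starved.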

## Worker Server Related Configuration

@@ -29,6 +29,6 @@
 public class WorkerHeartBeat extends BaseHeartBeat implements HeartBeat {

     private int workerHostWeight; // worker host weight
-    private int threadPoolUsage; // worker waiting task count
+    private double threadPoolUsage; // worker waiting task count

 }
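
Changing `threadPoolUsage` from `int` to `double` suggests the field now carries a usage ratio rather than a count, although this hunk does not show how the value is computed. As a minimal sketch under that assumption, a ratio could be derived from a `ThreadPoolExecutor` as active threads divided by maximum pool size. The class and method names here are hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolUsageSketch {

    /** Assumed definition: active threads over maximum pool size, clamped to [0, 1]. */
    static double threadPoolUsage(int activeThreads, int maximumPoolSize) {
        if (maximumPoolSize <= 0) {
            return 0.0; // avoid dividing by zero for a misconfigured pool
        }
        // Cast before dividing; integer division would truncate the ratio to 0 or 1.
        return Math.min(1.0, (double) activeThreads / maximumPoolSize);
    }

    /** Reads the two inputs from a live executor. */
    static double threadPoolUsage(ThreadPoolExecutor executor) {
        return threadPoolUsage(executor.getActiveCount(), executor.getMaximumPoolSize());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                4, 4, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        System.out.println(threadPoolUsage(executor)); // usage of an idle pool
        executor.shutdown();
        System.out.println(threadPoolUsage(3, 8));     // 3 of 8 threads busy
    }
}
```

An `int` field cannot hold a fractional value such as 3/8, which is one plausible reason for the type change.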
@@ -0,0 +1,118 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.common.utils;

import org.apache.commons.collections4.MapUtils;
import org.apache.commons.collections4.SetUtils;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import lombok.Getter;
import lombok.ToString;

@Getter
@ToString
public class MapComparator<K, V> {

    private final Map<K, V> oldMap;
    private final Map<K, V> newMap;

    public MapComparator(Map<K, V> oldMap, Map<K, V> newMap) {
        this.oldMap = oldMap;
        this.newMap = newMap;
    }

    /**
     * Get the keys that are in the new map but not in the old map
     */
    public Set<K> getKeysToAdd() {
        if (MapUtils.isEmpty(newMap)) {
            return SetUtils.emptySet();
        }
        if (MapUtils.isEmpty(oldMap)) {
            return new HashSet<>(newMap.keySet());
        }
        Set<K> keysToAdd = new HashSet<>(newMap.keySet());
        keysToAdd.removeAll(oldMap.keySet());
        return keysToAdd;
    }

    /**
     * Get the values whose keys are in the new map but not in the old map
     */
    public List<V> getValuesToAdd() {
        if (MapUtils.isEmpty(newMap)) {
            return Collections.emptyList();
        }
        return getKeysToAdd().stream().map(newMap::get).collect(Collectors.toList());
    }

    /**
     * Get the keys that are in the old map but not in the new map
     */
    public Set<K> getKeysToRemove() {
        if (MapUtils.isEmpty(oldMap)) {
            return SetUtils.emptySet();
        }
        if (MapUtils.isEmpty(newMap)) {
            return new HashSet<>(oldMap.keySet());
        }
        Set<K> keysToRemove = new HashSet<>(oldMap.keySet());
        keysToRemove.removeAll(newMap.keySet());
        return keysToRemove;
    }

    /**
     * Get the values whose keys are in the old map but not in the new map
     */
    public List<V> getValuesToRemove() {
        if (MapUtils.isEmpty(oldMap)) {
            return Collections.emptyList();
        }
        return getKeysToRemove().stream().map(oldMap::get).collect(Collectors.toList());
    }

    /**
     * Get the keys that are in both the old map and the new map but whose values differ
     */
    public Set<K> getKeysToUpdate() {
        if (MapUtils.isEmpty(oldMap) || MapUtils.isEmpty(newMap)) {
            return SetUtils.emptySet();
        }
        Set<K> keysToUpdate = new HashSet<>(newMap.keySet());
        keysToUpdate.retainAll(oldMap.keySet());
        keysToUpdate.removeIf(key -> newMap.get(key).equals(oldMap.get(key)));

        return keysToUpdate;
    }

    /**
     * Get the new values whose keys are in both the old map and the new map but whose values differ
     */
    public List<V> getNewValuesToUpdate() {
        if (MapUtils.isEmpty(oldMap) || MapUtils.isEmpty(newMap)) {
            return Collections.emptyList();
        }
        return getKeysToUpdate().stream().map(newMap::get).collect(Collectors.toList());
    }
}
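
To illustrate the add/remove/update split that `MapComparator` computes, here is a JDK-only stand-in. It is not the committed class: `MapDiffSketch` and its static methods are hypothetical, and the worker names and group values are made up. It also compares values with `Objects.equals`, whereas the committed `getKeysToUpdate` calls `newMap.get(key).equals(...)` and would throw a `NullPointerException` if the new map held a null value.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class MapDiffSketch {

    /** Keys present in newMap but absent from oldMap. */
    static <K, V> Set<K> keysToAdd(Map<K, V> oldMap, Map<K, V> newMap) {
        Set<K> keys = new HashSet<>(newMap.keySet());
        keys.removeAll(oldMap.keySet());
        return keys;
    }

    /** Keys present in oldMap but absent from newMap. */
    static <K, V> Set<K> keysToRemove(Map<K, V> oldMap, Map<K, V> newMap) {
        Set<K> keys = new HashSet<>(oldMap.keySet());
        keys.removeAll(newMap.keySet());
        return keys;
    }

    /** Keys present in both maps whose values differ (null-safe comparison). */
    static <K, V> Set<K> keysToUpdate(Map<K, V> oldMap, Map<K, V> newMap) {
        Set<K> keys = new HashSet<>(newMap.keySet());
        keys.retainAll(oldMap.keySet());
        keys.removeIf(key -> Objects.equals(newMap.get(key), oldMap.get(key)));
        return keys;
    }

    public static void main(String[] args) {
        // Two snapshots of a hypothetical worker-to-group mapping.
        Map<String, String> oldSnapshot = new HashMap<>();
        oldSnapshot.put("worker-1", "default");
        oldSnapshot.put("worker-2", "default");

        Map<String, String> newSnapshot = new HashMap<>();
        newSnapshot.put("worker-2", "gpu");
        newSnapshot.put("worker-3", "default");

        System.out.println("add: " + keysToAdd(oldSnapshot, newSnapshot));
        System.out.println("remove: " + keysToRemove(oldSnapshot, newSnapshot));
        System.out.println("update: " + keysToUpdate(oldSnapshot, newSnapshot));
    }
}
```

In this example `worker-3` is new, `worker-1` has disappeared, and `worker-2` exists in both snapshots with a changed value, so each key lands in exactly one of the three sets.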