Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ std::vector<SchemaScanner::ColumnDesc> SchemaWorkloadGroupsScanner::_s_tbls_colu
{"MIN_REMOTE_SCAN_THREAD_NUM", TYPE_BIGINT, sizeof(int64_t), true},
{"SPILL_THRESHOLD_LOW_WATERMARK", TYPE_VARCHAR, sizeof(StringRef), true},
{"SPILL_THRESHOLD_HIGH_WATERMARK", TYPE_VARCHAR, sizeof(StringRef), true},
{"TAG", TYPE_VARCHAR, sizeof(StringRef), true},
};

SchemaWorkloadGroupsScanner::SchemaWorkloadGroupsScanner()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.resource.workloadgroup.WorkloadGroup;

import java.util.Map;

Expand Down Expand Up @@ -69,6 +70,11 @@ public void analyze(Analyzer analyzer) throws UserException {
if (properties == null || properties.isEmpty()) {
throw new AnalysisException("Resource group properties can't be null");
}

String wgTag = properties.get(WorkloadGroup.TAG);
if (wgTag != null) {
FeNameFormat.checkCommonName("workload group tag", wgTag);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ public class SchemaTable extends Table {
.column("MIN_REMOTE_SCAN_THREAD_NUM", ScalarType.createType(PrimitiveType.BIGINT))
.column("SPILL_THRESHOLD_LOW_WATERMARK", ScalarType.createVarchar(256))
.column("SPILL_THRESHOLD_HIGH_WATERMARK", ScalarType.createVarchar(256))
.column("TAG", ScalarType.createVarchar(256))
.build()))
.put("processlist", new SchemaTable(SystemIdGenerator.getNextId(), "processlist", TableType.SCHEMA,
builder().column("ID", ScalarType.createType(PrimitiveType.LARGEINT))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,17 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {

public static final String SPILL_THRESHOLD_HIGH_WATERMARK = "spill_threshold_high_watermark";

public static final String TAG = "tag";

// NOTE(wb): all property is not required, some properties default value is set in be
// default value is as followed
// cpu_share=1024, memory_limit=0%(0 means not limit), enable_memory_overcommit=true
private static final ImmutableSet<String> ALL_PROPERTIES_NAME = new ImmutableSet.Builder<String>()
.add(CPU_SHARE).add(MEMORY_LIMIT).add(ENABLE_MEMORY_OVERCOMMIT).add(MAX_CONCURRENCY)
.add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).add(SCAN_THREAD_NUM)
.add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM)
.add(SPILL_THRESHOLD_LOW_WATERMARK).add(SPILL_THRESHOLD_HIGH_WATERMARK).build();
.add(SPILL_THRESHOLD_LOW_WATERMARK).add(SPILL_THRESHOLD_HIGH_WATERMARK)
.add(TAG).build();

public static final int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
public static final int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;
Expand Down Expand Up @@ -142,6 +145,9 @@ private WorkloadGroup(long id, String name, Map<String, String> properties, long
}
this.properties.put(SPILL_THRESHOLD_HIGH_WATERMARK, highWatermarkStr);
}
if (properties.containsKey(TAG)) {
this.properties.put(TAG, properties.get(TAG).toLowerCase());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trim the tag value

}
resetQueueProperty(properties);
}

Expand Down Expand Up @@ -206,11 +212,25 @@ private static void checkProperties(Map<String, String> properties) throws DdlEx

if (properties.containsKey(CPU_HARD_LIMIT)) {
String cpuHardLimit = properties.get(CPU_HARD_LIMIT);
if (cpuHardLimit.endsWith("%")) {
cpuHardLimit = cpuHardLimit.substring(0, cpuHardLimit.length() - 1);
}
if (!StringUtils.isNumeric(cpuHardLimit) || Long.parseLong(cpuHardLimit) <= 0) {
throw new DdlException(CPU_HARD_LIMIT + " " + cpuHardLimit + " requires a positive integer.");
String originValue = cpuHardLimit;
try {
boolean endWithSign = false;
if (cpuHardLimit.endsWith("%")) {
cpuHardLimit = cpuHardLimit.substring(0, cpuHardLimit.length() - 1);
endWithSign = true;
}

int intVal = Integer.parseInt(cpuHardLimit);
if (endWithSign && intVal == -1) {
throw new NumberFormatException();
}
if (!(intVal >= 1 && intVal <= 100) && -1 != intVal) {
throw new NumberFormatException();
}
} catch (NumberFormatException e) {
throw new DdlException(
"workload group's " + WorkloadGroup.CPU_HARD_LIMIT
+ " must be a positive integer[1,100] or -1, but input value is " + originValue);
}
}

Expand Down Expand Up @@ -395,7 +415,9 @@ public void getProcNodeData(BaseProcResult result, QueryQueue qq) {
if (CPU_HARD_LIMIT.equals(key)) {
String val = properties.get(key);
if (StringUtils.isEmpty(val)) { // cpu_hard_limit is not required
row.add("0%");
row.add("-1");
} else if ("-1".equals(val)) {
row.add(val);
} else {
row.add(val + "%");
}
Expand Down Expand Up @@ -431,6 +453,13 @@ public void getProcNodeData(BaseProcResult result, QueryQueue qq) {
row.add(qq == null ? "0" : String.valueOf(qq.getCurrentRunningQueryNum()));
} else if (QueryQueue.WAITING_QUERY_NUM.equals(key)) {
row.add(qq == null ? "0" : String.valueOf(qq.getCurrentWaitingQueryNum()));
} else if (TAG.equals(key)) {
String val = properties.get(key);
if (StringUtils.isEmpty(val)) {
row.add("");
} else {
row.add(val);
}
} else {
row.add(properties.get(key));
}
Expand All @@ -442,6 +471,10 @@ public int getCpuHardLimit() {
return cpuHardLimit;
}

public String getTag() {
return properties.get(TAG);
}

@Override
public String toString() {
return GsonUtils.GSON.toJson(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -59,7 +60,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
Expand All @@ -73,6 +73,7 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
.add(WorkloadGroup.SCAN_THREAD_NUM).add(WorkloadGroup.MAX_REMOTE_SCAN_THREAD_NUM)
.add(WorkloadGroup.MIN_REMOTE_SCAN_THREAD_NUM)
.add(WorkloadGroup.SPILL_THRESHOLD_LOW_WATERMARK).add(WorkloadGroup.SPILL_THRESHOLD_HIGH_WATERMARK)
.add(WorkloadGroup.TAG)
.add(QueryQueue.RUNNING_QUERY_NUM).add(QueryQueue.WAITING_QUERY_NUM)
.build();

Expand Down Expand Up @@ -301,38 +302,44 @@ public void createWorkloadGroup(CreateWorkloadGroupStmt stmt) throws DdlExceptio
LOG.info("Create workload group success: {}", workloadGroup);
}

private void checkGlobalUnlock(WorkloadGroup workloadGroup, WorkloadGroup old) throws DdlException {
double totalMemoryLimit = idToWorkloadGroup.values().stream().mapToDouble(WorkloadGroup::getMemoryLimitPercent)
.sum() + workloadGroup.getMemoryLimitPercent();
if (!Objects.isNull(old)) {
totalMemoryLimit -= old.getMemoryLimitPercent();
}
if (totalMemoryLimit > 100.0 + 1e-6) {
throw new DdlException(
"The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT + " cannot be greater than 100.0%.");
}

// 1, check new group
int newGroupCpuHardLimit = workloadGroup.getCpuHardLimit();
if (newGroupCpuHardLimit > 100 || newGroupCpuHardLimit < 0) {
throw new DdlException(
"new group's " + WorkloadGroup.CPU_HARD_LIMIT
+ " value can not be greater than 100% or less than or equal 0%");
}

// 2, check sum of all cpu hard limit
// NOTE: used for checking sum value of 100% for cpu_hard_limit and memory_limit
// when create/alter workload group with same tag.
// when oldWg is null it means caller is an alter stmt.
private void checkGlobalUnlock(WorkloadGroup newWg, WorkloadGroup oldWg) throws DdlException {
String wgTag = newWg.getTag();
double sumOfAllMemLimit = 0;
int sumOfAllCpuHardLimit = 0;
for (Map.Entry<Long, WorkloadGroup> entry : idToWorkloadGroup.entrySet()) {
if (old != null && entry.getKey() == old.getId()) {
WorkloadGroup wg = entry.getValue();
if (!StringUtils.equals(wgTag, wg.getTag())) {
continue;
}

if (oldWg != null && entry.getKey() == oldWg.getId()) {
continue;
}
sumOfAllCpuHardLimit += entry.getValue().getCpuHardLimit();

if (wg.getCpuHardLimit() > 0) {
sumOfAllCpuHardLimit += wg.getCpuHardLimit();
}
if (wg.getMemoryLimitPercent() > 0) {
sumOfAllMemLimit += wg.getMemoryLimitPercent();
}
}

sumOfAllMemLimit += newWg.getMemoryLimitPercent();
sumOfAllCpuHardLimit += newWg.getCpuHardLimit();

if (sumOfAllMemLimit > 100.0 + 1e-6) {
throw new DdlException(
"The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT + " within tag " + wgTag
+ " cannot be greater than 100.0%.");
}
sumOfAllCpuHardLimit += newGroupCpuHardLimit;

if (sumOfAllCpuHardLimit > 100) {
throw new DdlException("sum of all workload group " + WorkloadGroup.CPU_HARD_LIMIT
+ " can not be greater than 100% ");
throw new DdlException(
"sum of all workload group " + WorkloadGroup.CPU_HARD_LIMIT + " within tag "
+ wgTag + " can not be greater than 100% ");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,10 +431,11 @@ private static TFetchSchemaTableDataResult workloadGroupsMetadataResult(TSchemaT
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(10))));
// min remote scan thread num
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(11))));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We miss some logic?
When publish topic to BE, we should use tag to match the Backends?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can consider it when support multi process.

trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(12)));
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(13)));
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(14)));
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(15)));
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(12))); // spill low watermark
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(13))); // spill high watermark
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(14))); // tag
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(15))); // running query num
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(16))); // waiting query num
dataBatch.add(trow);
}

Expand Down
24 changes: 16 additions & 8 deletions regression-test/data/workload_manager_p0/test_curd_wlg.out
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@
2

-- !show_1 --
normal 20 50% true 2147483647 0 0 1% 16
test_group 10 10% true 2147483647 0 0 0% -1
normal 20 50% true 2147483647 0 0 1% 16
test_group 10 10% true 2147483647 0 0 -1 -1

-- !mem_limit_1 --
2

-- !mem_limit_2 --
normal 20 50% true 2147483647 0 0 1% 16
test_group 10 11% true 2147483647 0 0 0% -1
test_group 10 11% true 2147483647 0 0 -1 -1

-- !mem_overcommit_1 --
2
Expand All @@ -24,7 +24,7 @@ test_group 10 11% true 2147483647 0 0 0% -1

-- !mem_overcommit_3 --
normal 20 50% true 2147483647 0 0 1% 16
test_group 10 11% false 2147483647 0 0 0% -1
test_group 10 11% false 2147483647 0 0 -1 -1

-- !cpu_hard_limit_1 --
2
Expand All @@ -45,14 +45,22 @@ normal 20 50% true 2147483647 0 0 1% 16
test_group 10 11% false 100 0 0 20% -1

-- !show_spill_1 --
spill_group_test 1024 0% true 2147483647 0 0 0% -1 10% 10%
spill_group_test 1024 0% true 2147483647 0 0 -1 -1 10% 10%

-- !show_spill_1 --
spill_group_test 1024 0% true 2147483647 0 0 0% -1 -1 10%
spill_group_test 1024 0% true 2147483647 0 0 -1 -1 -1 10%

-- !show_spill_2 --
spill_group_test 1024 0% true 2147483647 0 0 0% -1 5% 10%
spill_group_test 1024 0% true 2147483647 0 0 -1 -1 5% 10%

-- !show_spill_3 --
spill_group_test 1024 0% true 2147483647 0 0 0% -1 5% 40%
spill_group_test 1024 0% true 2147483647 0 0 -1 -1 5% 40%

-- !show_wg_tag --
tag1_mem_wg1 50% -1 mem_tag1
tag1_mem_wg2 49% -1 mem_tag1
tag1_mem_wg3 1% -1 mem_tag1
tag1_wg1 0% 10% tag1
tag1_wg2 0% 10% tag1
tag1_wg3 0% 80% tag1

Loading