[chore](user) Add user property parallel_fragment_exec_instance_num
xinyiZzz authored and stephen committed Dec 28, 2023
1 parent d213f46 commit 94c7cc1
Showing 7 changed files with 69 additions and 1 deletion.
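
This commit lets the parallel fragment execution instance count be configured per user instead of only per session. A minimal usage sketch, assuming Doris's usual SET PROPERTY / SHOW PROPERTY syntax (the user name 'jack' is illustrative; the value 16 comes from the comment in the patch below):

    SET PROPERTY FOR 'jack' 'parallel_fragment_exec_instance_num' = '16';
    SHOW PROPERTY FOR 'jack' LIKE 'parallel_fragment_exec_instance_num';

The property defaults to -1 (unset); any value greater than 0 overrides the session-level parallel instance number, as shown in the SessionVariable change near the end of the diff.
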
@@ -959,6 +959,12 @@ public void updateUserPropertyInternal(String user, List<Pair<String, String>> p
Env.getCurrentEnv().getEditLog().logUpdateUserProperty(propertyInfo);
}
LOG.info("finished to set properties for user: {}", user);
} catch (DdlException e) {
if (isReplay && e.getMessage().contains("Unknown user property")) {
LOG.warn("ReplayUpdateUserProperty failed, maybe FE rolled back version, " + e.getMessage());
} else {
throw e;
}
} finally {
writeUnlock();
}
@@ -1000,6 +1006,15 @@ public long getMaxQueryInstances(String qualifiedUser) {
}
}

public int getParallelFragmentExecInstanceNum(String qualifiedUser) {
readLock();
try {
return propertyMgr.getParallelFragmentExecInstanceNum(qualifiedUser);
} finally {
readUnlock();
}
}

public String[] getSqlBlockRules(String qualifiedUser) {
readLock();
try {
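
Note: the try/catch added in the first hunk makes edit-log replay tolerant of a version rollback. If a replayed UpdateUserProperty entry carries a property name this FE no longer recognizes, the DdlException is logged as a warning instead of aborting replay; outside of replay the exception is still rethrown.
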
@@ -45,6 +45,8 @@ public class CommonUserProperties implements Writable {
// The maximum total number of query instances that the user is allowed to send from this FE
@SerializedName("maxQueryInstances")
private long maxQueryInstances = -1;
@SerializedName("parallelFragmentExecInstanceNum")
private int parallelFragmentExecInstanceNum = -1;
@SerializedName("sqlBlockRules")
private String sqlBlockRules = "";
@SerializedName("cpuResourceLimit")
@@ -75,6 +77,10 @@ long getMaxQueryInstances() {
return maxQueryInstances;
}

int getParallelFragmentExecInstanceNum() {
return parallelFragmentExecInstanceNum;
}

String getSqlBlockRules() {
return sqlBlockRules;
}
@@ -91,6 +97,10 @@ void setMaxQueryInstances(long maxQueryInstances) {
this.maxQueryInstances = maxQueryInstances;
}

void setParallelFragmentExecInstanceNum(int parallelFragmentExecInstanceNum) {
this.parallelFragmentExecInstanceNum = parallelFragmentExecInstanceNum;
}

void setSqlBlockRules(String sqlBlockRules) {
this.sqlBlockRules = sqlBlockRules;
setSqlBlockRulesSplit(sqlBlockRules);

@@ -63,6 +63,7 @@ public class UserProperty implements Writable {
// advanced properties
private static final String PROP_MAX_USER_CONNECTIONS = "max_user_connections";
private static final String PROP_MAX_QUERY_INSTANCES = "max_query_instances";
private static final String PROP_PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = "parallel_fragment_exec_instance_num";
private static final String PROP_RESOURCE_TAGS = "resource_tags";
private static final String PROP_RESOURCE = "resource";
private static final String PROP_SQL_BLOCK_RULES = "sql_block_rules";
@@ -113,6 +114,8 @@ public class UserProperty implements Writable {
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_LOAD_CLUSTER + "." + DppConfig.CLUSTER_NAME_REGEX + "."
+ DppConfig.PRIORITY + "$", Pattern.CASE_INSENSITIVE));
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_MAX_QUERY_INSTANCES + "$", Pattern.CASE_INSENSITIVE));
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM + "$",
Pattern.CASE_INSENSITIVE));
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_SQL_BLOCK_RULES + "$", Pattern.CASE_INSENSITIVE));
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_CPU_RESOURCE_LIMIT + "$", Pattern.CASE_INSENSITIVE));
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_RESOURCE_TAGS + "$", Pattern.CASE_INSENSITIVE));
@@ -154,6 +157,10 @@ public long getMaxQueryInstances() {
return commonProperties.getMaxQueryInstances(); // maxQueryInstances;
}

public int getParallelFragmentExecInstanceNum() {
return commonProperties.getParallelFragmentExecInstanceNum();
}

public String[] getSqlBlockRules() {
return commonProperties.getSqlBlockRulesSplit();
}
@@ -187,6 +194,7 @@ public void update(List<Pair<String, String>> properties, boolean isReplay) thro
// copy
long newMaxConn = this.commonProperties.getMaxConn();
long newMaxQueryInstances = this.commonProperties.getMaxQueryInstances();
int newParallelFragmentExecInstanceNum = this.commonProperties.getParallelFragmentExecInstanceNum();
String sqlBlockRules = this.commonProperties.getSqlBlockRules();
int cpuResourceLimit = this.commonProperties.getCpuResourceLimit();
Set<Tag> resourceTags = this.commonProperties.getResourceTags();
@@ -242,6 +250,17 @@ public void update(List<Pair<String, String>> properties, boolean isReplay) thro
} catch (NumberFormatException e) {
throw new DdlException(PROP_MAX_QUERY_INSTANCES + " is not number");
}
} else if (keyArr[0].equalsIgnoreCase(PROP_PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM)) {
// set property "parallel_fragment_exec_instance_num" = "16"
if (keyArr.length != 1) {
throw new DdlException(PROP_PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM + " format error");
}

try {
newParallelFragmentExecInstanceNum = Integer.parseInt(value);
} catch (NumberFormatException e) {
throw new DdlException(PROP_PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM + " is not number");
}
} else if (keyArr[0].equalsIgnoreCase(PROP_SQL_BLOCK_RULES)) {
// set property "sql_block_rules" = "test_rule1,test_rule2"
if (keyArr.length != 1) {
@@ -337,6 +356,7 @@ public void update(List<Pair<String, String>> properties, boolean isReplay) thro
// set
this.commonProperties.setMaxConn(newMaxConn);
this.commonProperties.setMaxQueryInstances(newMaxQueryInstances);
this.commonProperties.setParallelFragmentExecInstanceNum(newParallelFragmentExecInstanceNum);
this.commonProperties.setSqlBlockRules(sqlBlockRules);
this.commonProperties.setCpuResourceLimit(cpuResourceLimit);
this.commonProperties.setResourceTags(resourceTags);
@@ -456,6 +476,10 @@ public List<List<String>> fetchProperty() {
result.add(Lists.newArrayList(PROP_MAX_QUERY_INSTANCES,
String.valueOf(commonProperties.getMaxQueryInstances())));

// parallel fragment exec instance num
result.add(Lists.newArrayList(PROP_PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM,
String.valueOf(commonProperties.getParallelFragmentExecInstanceNum())));

// sql block rules
result.add(Lists.newArrayList(PROP_SQL_BLOCK_RULES, commonProperties.getSqlBlockRules()));


@@ -119,6 +119,15 @@ public long getMaxQueryInstances(String qualifiedUser) {
return existProperty.getMaxQueryInstances();
}

public int getParallelFragmentExecInstanceNum(String qualifiedUser) {
UserProperty existProperty = propertyMap.get(qualifiedUser);
existProperty = getLdapPropertyIfNull(qualifiedUser, existProperty);
if (existProperty == null) {
return -1;
}
return existProperty.getParallelFragmentExecInstanceNum();
}

public Set<Tag> getResourceTags(String qualifiedUser) {
UserProperty existProperty = propertyMap.get(qualifiedUser);
existProperty = getLdapPropertyIfNull(qualifiedUser, existProperty);

@@ -1969,6 +1969,14 @@ public void setEnableFoldConstantByBe(boolean foldConstantByBe) {
}

public int getParallelExecInstanceNum() {
ConnectContext connectContext = ConnectContext.get();
if (connectContext != null && connectContext.getEnv() != null && connectContext.getEnv().getAuth() != null) {
int userParallelExecInstanceNum = connectContext.getEnv().getAuth()
.getParallelFragmentExecInstanceNum(connectContext.getQualifiedUser());
if (userParallelExecInstanceNum > 0) {
return userParallelExecInstanceNum;
}
}
if (getEnablePipelineEngine() && parallelPipelineTaskNum == 0) {
int size = Env.getCurrentSystemInfo().getMinPipelineExecutorSize();
int autoInstance = (size + 1) / 2;
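
The SessionVariable change above is the enforcement point: getParallelExecInstanceNum() first consults the user-level property through Auth, and only when that value is not positive does it fall back to the existing pipeline/session logic.
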
@@ -104,6 +104,7 @@ public void testUpdate() throws UserException {
properties.add(Pair.of("load_cluster.dpp-cluster.hadoop_palo_path", "/user/palo2"));
properties.add(Pair.of("default_load_cluster", "dpp-cluster"));
properties.add(Pair.of("max_qUERY_instances", "3000"));
properties.add(Pair.of("parallel_fragment_exec_instance_num", "2000"));
properties.add(Pair.of("sql_block_rules", "rule1,rule2"));
properties.add(Pair.of("cpu_resource_limit", "2"));
properties.add(Pair.of("query_timeout", "500"));
@@ -114,6 +115,7 @@ public void testUpdate() throws UserException {
Assert.assertEquals("/user/palo2", userProperty.getLoadClusterInfo("dpp-cluster").second.getPaloPath());
Assert.assertEquals("dpp-cluster", userProperty.getDefaultLoadCluster());
Assert.assertEquals(3000, userProperty.getMaxQueryInstances());
Assert.assertEquals(2000, userProperty.getParallelFragmentExecInstanceNum());
Assert.assertEquals(new String[]{"rule1", "rule2"}, userProperty.getSqlBlockRules());
Assert.assertEquals(2, userProperty.getCpuResourceLimit());
Assert.assertEquals(500, userProperty.getQueryTimeout());

@@ -280,7 +280,7 @@ public void test() throws Exception {
Assert.assertEquals(1000000, execMemLimit);

List<List<String>> userProps = Env.getCurrentEnv().getAuth().getUserProperties(Auth.ROOT_USER);
Assert.assertEquals(10, userProps.size());
Assert.assertEquals(11, userProps.size());

// now :
// be1 be2 be3 ==>tag1;
