diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java index e3d564ddd870dc..c9b0dfcf1ae1db 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java @@ -959,6 +959,12 @@ public void updateUserPropertyInternal(String user, List> p Env.getCurrentEnv().getEditLog().logUpdateUserProperty(propertyInfo); } LOG.info("finished to set properties for user: {}", user); + } catch (DdlException e) { + if (isReplay && e.getMessage().contains("Unknown user property")) { + LOG.warn("ReplayUpdateUserProperty failed, maybe FE rolled back version, " + e.getMessage()); + } else { + throw e; + } } finally { writeUnlock(); } @@ -1000,6 +1006,15 @@ public long getMaxQueryInstances(String qualifiedUser) { } } + public int getParallelFragmentExecInstanceNum(String qualifiedUser) { + readLock(); + try { + return propertyMgr.getParallelFragmentExecInstanceNum(qualifiedUser); + } finally { + readUnlock(); + } + } + public String[] getSqlBlockRules(String qualifiedUser) { readLock(); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/CommonUserProperties.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/CommonUserProperties.java index 15205c273c13f9..63365e1280c94c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/CommonUserProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/CommonUserProperties.java @@ -45,6 +45,8 @@ public class CommonUserProperties implements Writable { // The maximum total number of query instances that the user is allowed to send from this FE @SerializedName("maxQueryInstances") private long maxQueryInstances = -1; + @SerializedName("parallelFragmentExecInstanceNum") + private int parallelFragmentExecInstanceNum = -1; @SerializedName("sqlBlockRules") private String sqlBlockRules = ""; @SerializedName("cpuResourceLimit") @@ -75,6 +77,10 @@ long getMaxQueryInstances() { return maxQueryInstances; } + int getParallelFragmentExecInstanceNum() { + return parallelFragmentExecInstanceNum; + } + String getSqlBlockRules() { return sqlBlockRules; } @@ -91,6 +97,10 @@ void setMaxQueryInstances(long maxQueryInstances) { this.maxQueryInstances = maxQueryInstances; } + void setParallelFragmentExecInstanceNum(int parallelFragmentExecInstanceNum) { + this.parallelFragmentExecInstanceNum = parallelFragmentExecInstanceNum; + } + void setSqlBlockRules(String sqlBlockRules) { this.sqlBlockRules = sqlBlockRules; setSqlBlockRulesSplit(sqlBlockRules); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java index 2ee8bc1e82c587..2375db0d920cf6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java @@ -63,6 +63,7 @@ public class UserProperty implements Writable { // advanced properties private static final String PROP_MAX_USER_CONNECTIONS = "max_user_connections"; private static final String PROP_MAX_QUERY_INSTANCES = "max_query_instances"; + private static final String PROP_PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = "parallel_fragment_exec_instance_num"; private static final String PROP_RESOURCE_TAGS = "resource_tags"; private static final String PROP_RESOURCE = "resource"; private static final String PROP_SQL_BLOCK_RULES = "sql_block_rules"; @@ -113,6 +114,8 @@ public class UserProperty implements Writable { ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_LOAD_CLUSTER + "." + DppConfig.CLUSTER_NAME_REGEX + "." + DppConfig.PRIORITY + "$", Pattern.CASE_INSENSITIVE)); ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_MAX_QUERY_INSTANCES + "$", Pattern.CASE_INSENSITIVE)); + ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM + "$", + Pattern.CASE_INSENSITIVE)); ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_SQL_BLOCK_RULES + "$", Pattern.CASE_INSENSITIVE)); ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_CPU_RESOURCE_LIMIT + "$", Pattern.CASE_INSENSITIVE)); ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_RESOURCE_TAGS + "$", Pattern.CASE_INSENSITIVE)); @@ -154,6 +157,10 @@ public long getMaxQueryInstances() { return commonProperties.getMaxQueryInstances(); // maxQueryInstances; } + public int getParallelFragmentExecInstanceNum() { + return commonProperties.getParallelFragmentExecInstanceNum(); + } + public String[] getSqlBlockRules() { return commonProperties.getSqlBlockRulesSplit(); } @@ -187,6 +194,7 @@ public void update(List> properties, boolean isReplay) thro // copy long newMaxConn = this.commonProperties.getMaxConn(); long newMaxQueryInstances = this.commonProperties.getMaxQueryInstances(); + int newParallelFragmentExecInstanceNum = this.commonProperties.getParallelFragmentExecInstanceNum(); String sqlBlockRules = this.commonProperties.getSqlBlockRules(); int cpuResourceLimit = this.commonProperties.getCpuResourceLimit(); Set resourceTags = this.commonProperties.getResourceTags(); @@ -242,6 +250,17 @@ public void update(List> properties, boolean isReplay) thro } catch (NumberFormatException e) { throw new DdlException(PROP_MAX_QUERY_INSTANCES + " is not number"); } + } else if (keyArr[0].equalsIgnoreCase(PROP_PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM)) { + // set property "parallel_fragment_exec_instance_num" = "16" + if (keyArr.length != 1) { + throw new DdlException(PROP_PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM + " format error"); + } + + try { + newParallelFragmentExecInstanceNum = Integer.parseInt(value); + } catch (NumberFormatException e) { + throw new DdlException(PROP_PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM + " is not number"); + } } else if (keyArr[0].equalsIgnoreCase(PROP_SQL_BLOCK_RULES)) { // set property "sql_block_rules" = "test_rule1,test_rule2" if (keyArr.length != 1) { @@ -337,6 +356,7 @@ public void update(List> properties, boolean isReplay) thro // set this.commonProperties.setMaxConn(newMaxConn); this.commonProperties.setMaxQueryInstances(newMaxQueryInstances); + this.commonProperties.setParallelFragmentExecInstanceNum(newParallelFragmentExecInstanceNum); this.commonProperties.setSqlBlockRules(sqlBlockRules); this.commonProperties.setCpuResourceLimit(cpuResourceLimit); this.commonProperties.setResourceTags(resourceTags); @@ -456,6 +476,10 @@ public List> fetchProperty() { result.add(Lists.newArrayList(PROP_MAX_QUERY_INSTANCES, String.valueOf(commonProperties.getMaxQueryInstances()))); + // parallel fragment exec instance num + result.add(Lists.newArrayList(PROP_PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM, + String.valueOf(commonProperties.getParallelFragmentExecInstanceNum()))); + // sql block rules result.add(Lists.newArrayList(PROP_SQL_BLOCK_RULES, commonProperties.getSqlBlockRules())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java index d4d34af2538e74..8f634bccbe3d28 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java @@ -119,6 +119,15 @@ public long getMaxQueryInstances(String qualifiedUser) { return existProperty.getMaxQueryInstances(); } + public int getParallelFragmentExecInstanceNum(String qualifiedUser) { + UserProperty existProperty = propertyMap.get(qualifiedUser); + existProperty = getLdapPropertyIfNull(qualifiedUser, existProperty); + if (existProperty == null) { + return -1; + } + return existProperty.getParallelFragmentExecInstanceNum(); + } + public Set getResourceTags(String qualifiedUser) { UserProperty existProperty = propertyMap.get(qualifiedUser); existProperty = getLdapPropertyIfNull(qualifiedUser, existProperty); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 27b8a02a2c1096..7578da745e030f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -1969,6 +1969,14 @@ public void setEnableFoldConstantByBe(boolean foldConstantByBe) { } public int getParallelExecInstanceNum() { + ConnectContext connectContext = ConnectContext.get(); + if (connectContext != null && connectContext.getEnv() != null && connectContext.getEnv().getAuth() != null) { + int userParallelExecInstanceNum = connectContext.getEnv().getAuth() + .getParallelFragmentExecInstanceNum(connectContext.getQualifiedUser()); + if (userParallelExecInstanceNum > 0) { + return userParallelExecInstanceNum; + } + } if (getEnablePipelineEngine() && parallelPipelineTaskNum == 0) { int size = Env.getCurrentSystemInfo().getMinPipelineExecutorSize(); int autoInstance = (size + 1) / 2; diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/UserPropertyTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/UserPropertyTest.java index 2a3252caadc0ff..a6d63dcf23ba21 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/UserPropertyTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/UserPropertyTest.java @@ -104,6 +104,7 @@ public void testUpdate() throws UserException { properties.add(Pair.of("load_cluster.dpp-cluster.hadoop_palo_path", "/user/palo2")); properties.add(Pair.of("default_load_cluster", "dpp-cluster")); properties.add(Pair.of("max_qUERY_instances", "3000")); + properties.add(Pair.of("parallel_fragment_exec_instance_num", "2000")); properties.add(Pair.of("sql_block_rules", "rule1,rule2")); properties.add(Pair.of("cpu_resource_limit", "2")); properties.add(Pair.of("query_timeout", "500")); @@ -114,6 +115,7 @@ public void testUpdate() throws UserException { Assert.assertEquals("/user/palo2", userProperty.getLoadClusterInfo("dpp-cluster").second.getPaloPath()); Assert.assertEquals("dpp-cluster", userProperty.getDefaultLoadCluster()); Assert.assertEquals(3000, userProperty.getMaxQueryInstances()); + Assert.assertEquals(2000, userProperty.getParallelFragmentExecInstanceNum()); Assert.assertEquals(new String[]{"rule1", "rule2"}, userProperty.getSqlBlockRules()); Assert.assertEquals(2, userProperty.getCpuResourceLimit()); Assert.assertEquals(500, userProperty.getQueryTimeout()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java index 20e540136a62b2..baf5486195fae4 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java @@ -280,7 +280,7 @@ public void test() throws Exception { Assert.assertEquals(1000000, execMemLimit); List> userProps = Env.getCurrentEnv().getAuth().getUserProperties(Auth.ROOT_USER); - Assert.assertEquals(10, userProps.size()); + Assert.assertEquals(11, userProps.size()); // now : // be1 be2 be3 ==>tag1;