[Enhance](resource group)db support replication_allocation (apache#25195)

- Databases support `replication_allocation`: when a table is created without `replication_num` or `replication_allocation`, the value set on the database is used
- Fix partition properties disappearing when the table properties are not null
zddr authored and morningman committed Oct 30, 2023
1 parent 3c973ce commit a2d9791
Showing 12 changed files with 317 additions and 32 deletions.
39 changes: 37 additions & 2 deletions docs/en/docs/admin-manual/multi-tenant.md
@@ -115,7 +115,15 @@ Node resource division refers to setting tags for BE nodes in a Doris cluster, a
│ │
└────────────────────────────────────────────────────┘
```


To make it easier to configure data distribution for many tables, a default data distribution strategy can be set at the database level. A strategy set on a table takes priority over the database-level one.

```sql
CREATE DATABASE db_name PROPERTIES (
"replication_allocation" = "tag.location.group_a:1, tag.location.group_b:1"
)
```

3. Use different resource groups for data query

After the first two steps are complete, we can set a user's resource usage permissions so that the user's queries run only on nodes in the specified resource group.
@@ -189,7 +197,7 @@ Through memory and CPU resource limits. We can divide user queries into more fin

## Best practices and forward compatibility

Tag division and CPU limitation are new features in version 0.15. In order to ensure a smooth upgrade from the old version, Doris has made the following forward compatibility:
### Tag division and CPU limitation are new features in version 0.15. In order to ensure a smooth upgrade from the old version, Doris has made the following forward compatibility:

1. Each BE node will have a default Tag: `"tag.location": "default"`.
2. The BE node added through the `alter system add backend` statement will also set Tag: `"tag.location": "default"` by default.
@@ -234,3 +242,30 @@ Here we give an example of the steps to start using the resource division functi


Through the above 4 steps, we can smoothly use the resource division function after the original cluster is upgraded.

### How to conveniently set replica distribution strategies when there are many tables

For example, db1 contains four tables. table1 requires the replica distribution strategy `group_c:1,group_b:2`, while table2, table3, and table4 require `group_a:1,group_b:2`.

Then you can use the following statement to create db1:

```sql
CREATE DATABASE db1 PROPERTIES (
"replication_allocation" = "tag.location.group_a:1, tag.location.group_b:2"
)
```

Create table1 using the following statement:

```sql
CREATE TABLE table1
(k1 int, k2 int)
distributed by hash(k1) buckets 1
properties(
"replication_allocation"="tag.location.group_c:1, tag.location.group_b:2"
)
```

The table creation statements for table2, table3, and table4 do not need to specify `replication_allocation` again.

Note: Changing the replica distribution policy of the database will not affect existing tables.
@@ -56,6 +56,12 @@ illustrate:
After renaming the database, use the REVOKE and GRANT commands to modify the appropriate user permissions, if necessary.
The default data quota for the database is 1024GB, and the default replica quota is 1073741824.

4) Modify the properties of an existing database

```sql
ALTER DATABASE db_name SET PROPERTIES ("key"="value", ...);
```

### Example

1. Set the specified database data volume quota
@@ -82,6 +88,18 @@ ALTER DATABASE example_db RENAME example_db2;
ALTER DATABASE example_db SET REPLICA QUOTA 102400;
```

4. Modify the default replica distribution policy for tables in the database (this only applies to newly created tables; existing tables in the database are not modified)

```sql
ALTER DATABASE example_db SET PROPERTIES("replication_allocation" = "tag.location.default:2");
```

5. Cancel the default replica distribution policy for tables in the database (this only applies to newly created tables; existing tables in the database are not modified)

```sql
ALTER DATABASE example_db SET PROPERTIES("replication_allocation" = "");
```
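
The two examples above imply that a table picks up the database default once, when it is created. The following Java sketch models only that behaviour. It is a toy model and not Doris code: the class `DbDefaultSnapshotSketch` and its methods are hypothetical names introduced for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model, not Doris code: a table copies the database default once, when it is created. */
class DbDefaultSnapshotSketch {
    static final String KEY = "replication_allocation";

    /** Database properties, as changed by ALTER DATABASE ... SET PROPERTIES. */
    final Map<String, String> dbProperties = new HashMap<>();
    /** Policy recorded for each table at creation time; null means nothing was inherited. */
    final Map<String, String> tablePolicies = new HashMap<>();

    void alterDatabase(String value) {
        dbProperties.put(KEY, value);
    }

    void createTable(String name) {
        String dbDefault = dbProperties.get(KEY);
        // An empty value means the default was cancelled, so nothing is inherited.
        boolean hasDefault = dbDefault != null && !dbDefault.isEmpty();
        tablePolicies.put(name, hasDefault ? dbDefault : null);
    }

    public static void main(String[] args) {
        DbDefaultSnapshotSketch db = new DbDefaultSnapshotSketch();
        db.alterDatabase("tag.location.default:2");
        db.createTable("t1");
        db.alterDatabase("");       // cancel the default
        db.createTable("t2");
        System.out.println("t1 -> " + db.tablePolicies.get("t1"));
        System.out.println("t2 -> " + db.tablePolicies.get("t2"));
    }
}
```

In this model `t1` keeps the policy it was created with even after the default is cancelled, and `t2` inherits nothing from the database.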

### Keywords

```text
@@ -59,6 +59,14 @@ CREATE DATABASE [IF NOT EXISTS] db_name
- `iceberg.hive.metastore.uris` : hive metastore service address;
- `iceberg.catalog.type`: The default is `HIVE_CATALOG`; currently only `HIVE_CATALOG` is supported, and more Iceberg catalog types will be supported in the future.
- To set a default replica distribution for tables in the database, specify `replication_allocation`. A `replication_allocation` set on a table takes priority over the database-level value.
```sql
PROPERTIES (
"replication_allocation" = "tag.location.default:3"
)
```

### Example

1. Create a new database db_test
37 changes: 36 additions & 1 deletion docs/zh-CN/docs/admin-manual/multi-tenant.md
@@ -115,6 +115,14 @@ FE does not participate in processing or computing user data, so it is a resource-consumption
│ │
└────────────────────────────────────────────────────┘
```

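To make it easier to configure data distribution for many tables, a default data distribution strategy can be set at the database level. A strategy set on a table takes priority over the database-level one.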

```sql
CREATE DATABASE db_name PROPERTIES (
"replication_allocation" = "tag.location.group_a:1, tag.location.group_b:1"
)
```

3. Use different resource groups for data queries

@@ -188,7 +196,7 @@ FE does not participate in processing or computing user data, so it is a resource-consumption

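## Best practices and forward compatibility

Tag division and CPU limitation are new features in version 0.15. To ensure a smooth upgrade from older versions, Doris provides the following forward compatibility:
### Tag division and CPU limitation are new features in version 0.15. To ensure a smooth upgrade from older versions, Doris provides the following forward compatibility:

1. Each BE node has a default Tag: `"tag.location": "default"`
2. BE nodes added through the `alter system add backend` statement are also given the Tag `"tag.location": "default"` by default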
@@ -232,3 +240,30 @@ Tag division and CPU limitation are new features in version 0.15. In order to ensure that, from
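After the data has been redistributed, we can start setting users' resource tag permissions. By default, a user's `resource_tags.location` property is empty, which means the user can access BEs with any Tag, so the preceding steps do not affect the normal queries of existing users. When the `resource_tags.location` property is not empty, the user is restricted to BEs with the specified Tags.

Through the above 4 steps, we can smoothly start using the resource division function after the original cluster is upgraded.

### How to conveniently set replica distribution strategies when there are many tables

For example, db1 contains four tables. table1 requires the replica distribution strategy `group_c:1,group_b:2`, while table2, table3, and table4 require `group_a:1,group_b:2`.

Then you can use the following statement to create db1: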

```sql
CREATE DATABASE db1 PROPERTIES (
"replication_allocation" = "tag.location.group_a:1, tag.location.group_b:2"
)
```

Create table1 using the following statement:

```sql
CREATE TABLE table1
(k1 int, k2 int)
distributed by hash(k1) buckets 1
properties(
"replication_allocation"="tag.location.group_c:1, tag.location.group_b:2"
)
```

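The table creation statements for table2, table3, and table4 do not need to specify `replication_allocation` again.

Note: Changing the replica distribution policy of the database will not affect existing tables.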
@@ -56,6 +56,12 @@ ALTER DATABASE db_name SET REPLICA QUOTA quota;
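After renaming the database, use the REVOKE and GRANT commands to modify the corresponding user permissions if necessary.
The default data quota for the database is 1024GB, and the default replica quota is 1073741824.

4) Modify the properties of an existing database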

```sql
ALTER DATABASE db_name SET PROPERTIES ("key"="value", ...);
```

### Example

1. Set the data volume quota of the specified database
@@ -82,6 +88,18 @@ ALTER DATABASE example_db RENAME example_db2;
ALTER DATABASE example_db SET REPLICA QUOTA 102400;
```

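4. Modify the default replica distribution policy for tables in the database (this only applies to newly created tables; existing tables in the database are not modified)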

```sql
ALTER DATABASE example_db SET PROPERTIES("replication_allocation" = "tag.location.default:2");
```

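5. Cancel the default replica distribution policy for tables in the database (this only applies to newly created tables; existing tables in the database are not modified)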

```sql
ALTER DATABASE example_db SET PROPERTIES("replication_allocation" = "");
```

### Keywords

```text
@@ -59,6 +59,14 @@ CREATE DATABASE [IF NOT EXISTS] db_name
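- `iceberg.hive.metastore.uris`: hive metastore service address;
- `iceberg.catalog.type`: The default is `HIVE_CATALOG`; currently only `HIVE_CATALOG` is supported, and more Iceberg catalog types will be supported in the future.

- To set a default replica distribution for tables in the database, specify `replication_allocation`. A `replication_allocation` set on a table takes priority over the database-level value.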

```sql
PROPERTIES (
"replication_allocation" = "tag.location.default:3"
)
```

### Example

1. Create a new database db_test
@@ -70,9 +70,6 @@ public void analyze(Analyzer analyzer) throws UserException {
// clone properties for analyse
Map<String, String> analysisProperties = new HashMap<String, String>(properties);
PropertyAnalyzer.analyzeBinlogConfig(analysisProperties);
if (!analysisProperties.isEmpty()) {
throw new UserException("Invalid property name or value: " + analysisProperties);
}
}

@Override
@@ -20,6 +20,7 @@
import org.apache.doris.analysis.IndexDef.IndexType;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Index;
@@ -40,6 +41,7 @@
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.external.elasticsearch.EsUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
@@ -49,6 +51,7 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -325,7 +328,7 @@ public void analyze(Analyzer analyzer) throws UserException, AnalysisException {
boolean enableDuplicateWithoutKeysByDefault = false;
if (properties != null) {
enableDuplicateWithoutKeysByDefault =
PropertyAnalyzer.analyzeEnableDuplicateWithoutKeysByDefault(properties);
PropertyAnalyzer.analyzeEnableDuplicateWithoutKeysByDefault(properties);
}
//pre-block creation with column type ALL
for (ColumnDef columnDef : columnDefs) {
@@ -627,9 +630,10 @@ public void analyze(Analyzer analyzer) throws UserException, AnalysisException {
}
}

private Map<String, String> rewriteReplicaAllocationProperties(Map<String, String> properties) {
private Map<String, String> rewriteReplicaAllocationProperties(Map<String, String> properties)
throws AnalysisException {
if (Config.force_olap_table_replication_num <= 0) {
return properties;
return rewriteReplicaAllocationPropertiesByDatabase(properties);
}
// if force_olap_table_replication_num is set, use this value to rewrite the replication_num or
// replication_allocation properties
@@ -655,6 +659,45 @@ private Map<String, String> rewriteReplicaAllocationProperties(Map<String, Strin
return newProperties;
}

private Map<String, String> rewriteReplicaAllocationPropertiesByDatabase(Map<String, String> properties)
throws AnalysisException {
// if the table sets `replication_allocation` or `replication_num`, no rewrite by db is needed
if (properties != null && (properties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION)
|| properties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM))) {
return properties;
}
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalogNullable(tableName.getCtl());
if (catalog == null) {
return properties;
}
DatabaseIf db = catalog.getDbNullable(tableName.getDb());
if (db == null) {
return properties;
}
// if the db has no properties, no rewrite is needed
if (db.getDbProperties() == null) {
return properties;
}
Map<String, String> dbProperties = db.getDbProperties().getProperties();
if (dbProperties == null) {
return properties;
}
if (properties == null) {
properties = Maps.newHashMap();
}
if (dbProperties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION) && StringUtils
.isNotEmpty(dbProperties.get(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION))) {
properties.put(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION,
dbProperties.get(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION));
}
if (dbProperties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM) && StringUtils
.isNotEmpty(dbProperties.get(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM))) {
properties.put(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM,
dbProperties.get(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM));
}
return properties;
}

private void analyzeEngineName() throws AnalysisException {
if (Strings.isNullOrEmpty(engineName)) {
engineName = "olap";
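
The database fallback added in `rewriteReplicaAllocationPropertiesByDatabase` can be sketched with plain maps. This is a simplified, self-contained approximation and not the Doris implementation: the class `ReplicaDefaultsSketch` and the method `applyDatabaseDefaults` are hypothetical names, the catalog and database lookups are replaced by a `dbProps` argument, and the sketch copies the table properties instead of modifying them in place. In the diff, this path is taken only when `Config.force_olap_table_replication_num` is not positive.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified sketch of the database fallback; plain maps stand in for Doris catalog objects. */
class ReplicaDefaultsSketch {
    static final String REPLICATION_ALLOCATION = "replication_allocation";
    static final String REPLICATION_NUM = "replication_num";

    static Map<String, String> applyDatabaseDefaults(Map<String, String> tableProps,
                                                     Map<String, String> dbProps) {
        // A table that sets either key itself is left untouched.
        if (tableProps != null && (tableProps.containsKey(REPLICATION_ALLOCATION)
                || tableProps.containsKey(REPLICATION_NUM))) {
            return tableProps;
        }
        // No database properties: nothing to inherit.
        if (dbProps == null) {
            return tableProps;
        }
        Map<String, String> result = tableProps == null ? new HashMap<>() : new HashMap<>(tableProps);
        for (String key : new String[] {REPLICATION_ALLOCATION, REPLICATION_NUM}) {
            String value = dbProps.get(key);
            // Empty values are skipped, so a default cancelled with "" is ignored.
            if (value != null && !value.isEmpty()) {
                result.put(key, value);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> db = new HashMap<>();
        db.put(REPLICATION_ALLOCATION, "tag.location.group_a:1, tag.location.group_b:2");

        Map<String, String> explicit = new HashMap<>();
        explicit.put(REPLICATION_ALLOCATION, "tag.location.group_c:1, tag.location.group_b:2");

        // The table's own setting is kept as-is.
        System.out.println(applyDatabaseDefaults(explicit, db));
        // A table without replica settings inherits the database value.
        System.out.println(applyDatabaseDefaults(null, db));
    }
}
```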
@@ -29,6 +29,7 @@
import com.google.common.base.Joiner;
import com.google.common.base.Joiner.MapJoiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;

import java.util.Map;

@@ -126,9 +127,16 @@ public void analyze(int partColNum, Map<String, String> otherProperties) throws

partitionKeyDesc.analyze(partColNum);

Map<String, String> mergedMap = Maps.newHashMap();
// Should putAll `otherProperties` before `this.properties`,
// because the priority of partition is higher than table
if (otherProperties != null) {
this.properties = otherProperties;
mergedMap.putAll(otherProperties);
}
if (this.properties != null) {
mergedMap.putAll(this.properties);
}
this.properties = mergedMap;

// analyze data property
partitionDataProperty = PropertyAnalyzer.analyzeDataProperty(properties,
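
The merge order in the hunk above is what keeps partition-level properties from being lost: the removed assignment `this.properties = otherProperties` replaced the partition's own map whenever table-level properties were passed in. A minimal sketch of the new ordering follows. The class `PartitionPropertyMergeSketch` and its `merge` method are hypothetical, and the property values are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of the merge order: table-level values first, partition-level values last. */
class PartitionPropertyMergeSketch {
    static Map<String, String> merge(Map<String, String> tableProps,
                                     Map<String, String> partitionProps) {
        Map<String, String> merged = new HashMap<>();
        if (tableProps != null) {
            merged.putAll(tableProps);
        }
        // Later putAll calls overwrite earlier keys, so the partition wins on conflicts.
        if (partitionProps != null) {
            merged.putAll(partitionProps);
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> table = new HashMap<>();
        table.put("replication_num", "3");
        table.put("storage_medium", "HDD");

        Map<String, String> partition = new HashMap<>();
        partition.put("replication_num", "1");

        Map<String, String> merged = merge(table, partition);
        System.out.println(merged.get("replication_num")); // prints 1 (partition value)
        System.out.println(merged.get("storage_medium"));  // prints HDD (table value)
    }
}
```

Because `putAll` overwrites existing keys, copying the table-level map first and the partition-level map second gives the partition the final say while still inheriting anything it does not set.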