[fix](resource-tag) Consider resource tags when assigning tasks for broker & routine load #9492

Merged · 4 commits · May 26, 2022
12 changes: 12 additions & 0 deletions docs/en/admin-manual/multi-tenant.md
@@ -133,6 +133,18 @@ Node resource division refers to setting tags for BE nodes in a Doris cluster, a
In this way, we achieve physical resource isolation for different users' queries by dividing nodes and restricting user resource usage. Furthermore, we can create a user for each business department and restrict each user to a specific resource group, so that resource interference between departments is avoided. For example, suppose a business table in the cluster needs to be shared by all 9 business departments, but we want to minimize resource preemption between them. We can create 3 replicas of this table and store them in 3 resource groups, then create 9 users for the 9 departments and restrict every 3 users to one resource group. In this way, the number of departments competing for the same resources is reduced from 9 to 3.

On the other hand, resource groups can also be used to isolate online and offline tasks. For example, we can divide nodes into two resource groups, Online and Offline. The table data is still stored with 3 replicas, of which 2 replicas are placed in the Online resource group and 1 replica in the Offline resource group. The Online resource group is mainly used for high-concurrency, low-latency online data services, while large queries or offline ETL operations can be executed on nodes in the Offline resource group. This makes it possible to provide online and offline services simultaneously within a single cluster.

4. Resource group assignment for load jobs

The resource usage of load jobs (including insert, broker load, routine load, stream load, etc.) can be divided into two parts:
1. Computing resources: responsible for reading the data source, data transformation, and distribution.
2. Write resources: responsible for data encoding, compression, and writing to disk.

The write resources must be the nodes where the replicas are located, while the computing resources can, in theory, be provided by any node. Therefore, resource group assignment for a load job is done in two steps:
1. Use the user-level resource tag to restrict the resource groups that the computing resources can use.
2. Use the replica's resource tag to restrict the resource group that the write resources can use.

Therefore, if you want all resources used by a load job to be limited to the resource group where the data is located, simply set the user-level resource tag to the same value as the replica's resource tag.
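
To make the two-step assignment concrete, below is a minimal, self-contained Java sketch of the idea; the `Backend` record, the `pickComputeNodes`/`pickWriteNodes` helpers, and the tag names are illustrative assumptions, not Doris APIs. Compute nodes are chosen by the user-level tag, while write nodes are fixed to wherever the replicas live.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Illustrative model only, not Doris source code.
public class LoadPlacementSketch {

    record Backend(long id, String tag) { }

    // Step 1: computing resources -- any backend whose tag matches the user-level resource tag.
    static List<Backend> pickComputeNodes(List<Backend> cluster, String userTag) {
        return cluster.stream()
                .filter(be -> be.tag().equals(userTag))
                .collect(Collectors.toList());
    }

    // Step 2: write resources -- fixed to the backends that actually host the replicas.
    static List<Backend> pickWriteNodes(List<Backend> cluster, Set<Long> replicaBackendIds) {
        return cluster.stream()
                .filter(be -> replicaBackendIds.contains(be.id()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Backend> cluster = List.of(
                new Backend(1, "group_a"), new Backend(2, "group_a"), new Backend(3, "group_b"));
        Set<Long> replicaBackends = Set.of(1L, 2L); // replicas of the target table live on BEs 1 and 2

        // When the user's resource tag equals the replica tag ("group_a"),
        // both steps resolve to the same resource group, so the whole load stays in "group_a".
        System.out.println("compute nodes: " + pickComputeNodes(cluster, "group_a"));
        System.out.println("write nodes:   " + pickWriteNodes(cluster, replicaBackends));
    }
}
```

If the user-level tag differed from the replica tag, the compute phase could run in a different resource group than the write phase, which is what the user-level `resource_tags.location` property lets you control.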

## Single query resource limit

14 changes: 13 additions & 1 deletion docs/zh-CN/admin-manual/multi-tenant.md
@@ -134,6 +134,18 @@ The FE does not participate in work such as processing and computing user data, so it is a node with relatively low resource consumption

On the other hand, resource groups can also be used to isolate online and offline tasks. For example, we can divide nodes into two resource groups, Online and Offline. Table data is still stored with 3 replicas, of which 2 replicas are placed in the Online resource group and 1 replica in the Offline resource group. The Online resource group is mainly used for high-concurrency, low-latency online data services, while large queries or offline ETL operations can be executed on nodes in the Offline resource group. This makes it possible to provide online and offline services simultaneously within a single cluster.

4. Resource group assignment for load jobs

The resource usage of load jobs (including insert, broker load, routine load, stream load, etc.) can be divided into two parts:
1. Computing resources: responsible for reading the data source, data transformation, and distribution.
2. Write resources: responsible for data encoding, compression, and writing to disk.

The write resources must be the nodes where the data replicas are located, while the computing resources can, in theory, be provided by any node. Therefore, resource group assignment for a load job is done in two steps:
1. Use the user-level resource tag to restrict the resource group that the computing resources can use.
2. Use the replica's resource tag to restrict the resource group that the write resources can use.

Therefore, if you want all resources used by a load job to be limited to the resource group where the data is located, simply set the user-level resource tag to the same value as the replica's resource tag.

## 单查询资源限制

The resource group approach described above provides node-level resource isolation and limits. Within a resource group, resource contention can still occur. For example, when the 3 business departments mentioned earlier are placed in the same resource group, the degree of resource competition is reduced, but queries from these 3 departments may still affect each other.
@@ -217,4 +229,4 @@ Tag division and CPU limits are new features in version 0.15.

After the data redistribution is complete, we can start setting resource tag permissions for users. By default, a user's `resource_tags.location` property is empty, meaning the user can access BEs with any tag, so the previous steps do not affect existing users' queries. Once the `resource_tags.location` property is non-empty, the user is restricted to accessing BEs with the specified tags.

Through the above 4 steps, we can smoothly start using the resource division feature after the existing cluster has been upgraded.
@@ -38,8 +38,10 @@ public final class FeMetaVersion {
public static final int VERSION_108 = 108;
// add row policy
public static final int VERSION_109 = 109;
// note: when increment meta version, should assign the latest version to VERSION_CURRENT
public static final int VERSION_CURRENT = VERSION_109;
// For routine load user info
public static final int VERSION_110 = 110;
// NOTE: when incrementing the meta version, assign the latest version to VERSION_CURRENT
public static final int VERSION_CURRENT = VERSION_110;

// all logs meta version should >= the minimum version, so that we could remove many if clause, for example
// if (FE_METAVERSION < VERSION_94) ...
@@ -122,7 +122,7 @@ public void plan(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusesLis
// 1. Broker scan node
BrokerScanNode scanNode = new BrokerScanNode(new PlanNodeId(nextNodeId++), destTupleDesc, "BrokerScanNode",
fileStatusesList, filesAdded);
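// Pass the submitting user's identity to the scan node so that backend selection can take the user's resource tags into account.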
scanNode.setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups, strictMode, loadParallelism);
scanNode.setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups, strictMode, loadParallelism, userInfo);
scanNode.init(analyzer);
scanNode.finalize(analyzer);
scanNodes.add(scanNode);
@@ -20,6 +20,7 @@
import org.apache.doris.analysis.AlterRoutineLoadStmt;
import org.apache.doris.analysis.CreateRoutineLoadStmt;
import org.apache.doris.analysis.RoutineLoadDataSourceProperties;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
@@ -102,8 +103,9 @@ public KafkaRoutineLoadJob() {
}

public KafkaRoutineLoadJob(Long id, String name, String clusterName,
long dbId, long tableId, String brokerList, String topic) {
super(id, name, clusterName, dbId, tableId, LoadDataSourceType.KAFKA);
long dbId, long tableId, String brokerList, String topic,
UserIdentity userIdentity) {
super(id, name, clusterName, dbId, tableId, LoadDataSourceType.KAFKA, userIdentity);
this.brokerList = brokerList;
this.topic = topic;
this.progress = new KafkaProgress();
@@ -393,7 +395,7 @@ public static KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt stmt) thr
long id = Catalog.getCurrentCatalog().getNextId();
KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(id, stmt.getName(),
db.getClusterName(), db.getId(), tableId,
stmt.getKafkaBrokerList(), stmt.getKafkaTopic());
stmt.getKafkaBrokerList(), stmt.getKafkaTopic(), stmt.getUserInfo());
kafkaRoutineLoadJob.setOptional(stmt);
kafkaRoutineLoadJob.checkCustomProperties();
kafkaRoutineLoadJob.checkCustomPartition();
@@ -27,6 +27,7 @@
import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
@@ -228,6 +229,8 @@ public boolean isFinalState() {
// this is the origin stmt of CreateRoutineLoadStmt, we use it to persist the RoutineLoadJob,
// because we can not serialize the Expressions contained in job.
protected OriginStatement origStmt;
// User who submitted this job. May be null for jobs created by older versions (before v1.1).
protected UserIdentity userIdentity;

protected ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
protected LoadTask.MergeType mergeType = LoadTask.MergeType.APPEND; // default is all data is load no delete
@@ -249,13 +252,15 @@ public RoutineLoadJob(long id, LoadDataSourceType type) {
}

public RoutineLoadJob(Long id, String name, String clusterName,
long dbId, long tableId, LoadDataSourceType dataSourceType) {
long dbId, long tableId, LoadDataSourceType dataSourceType,
UserIdentity userIdentity) {
this(id, dataSourceType);
this.name = name;
this.clusterName = clusterName;
this.dbId = dbId;
this.tableId = tableId;
this.authCode = 0;
this.userIdentity = userIdentity;

if (ConnectContext.get() != null) {
SessionVariable var = ConnectContext.get().getSessionVariable();
@@ -435,6 +440,10 @@ public PartitionNames getPartitions() {
return partitions;
}

public UserIdentity getUserIdentity() {
return userIdentity;
}

@Override
public LoadTask.MergeType getMergeType() {
return mergeType;
@@ -1541,6 +1550,13 @@ public void write(DataOutput out) throws IOException {
Text.writeString(out, entry.getKey());
Text.writeString(out, entry.getValue());
}

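// Write a presence flag first: userIdentity may be null for jobs created before user info was recorded.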
if (userIdentity == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
userIdentity.write(out);
}
}

public void readFields(DataInput in) throws IOException {
@@ -1616,6 +1632,15 @@ public void readFields(DataInput in) throws IOException {
} catch (Exception e) {
throw new IOException("error happens when parsing create routine load stmt: " + origStmt.originStmt, e);
}

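// User info was added in VERSION_110; older journals do not contain this block.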
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_110) {
if (in.readBoolean()) {
userIdentity = UserIdentity.read(in);
userIdentity.setIsAnalyzed();
} else {
userIdentity = null;
}
}
}

abstract public void modifyProperties(AlterRoutineLoadStmt stmt) throws UserException;
@@ -24,6 +24,11 @@
import org.apache.doris.analysis.StopRoutineLoadStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
@@ -38,15 +43,19 @@
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.mysql.privilege.UserProperty;
import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
import org.apache.doris.persist.RoutineLoadOperation;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
import org.apache.doris.system.BeSelectionPolicy;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -410,19 +419,16 @@ public long getMinTaskBeId(String clusterName) throws LoadException {
// check if the specified BE is available for running task
// return true if it is available. return false if otherwise.
// throw exception if unrecoverable errors happen.
public long getAvailableBeForTask(long previousBeId, String clusterName) throws LoadException {
List<Long> beIdsInCluster = Catalog.getCurrentSystemInfo().getClusterBackendIds(clusterName, true);
if (beIdsInCluster == null) {
throw new LoadException("The " + clusterName + " has been deleted");
}
public long getAvailableBeForTask(long jobId, long previousBeId, String clusterName) throws LoadException {
List<Long> availableBeIds = getAvailableBackendIds(jobId, clusterName);

// check if be has idle slot
readLock();
try {
Map<Long, Integer> beIdToConcurrentTasks = getBeCurrentTasksNumMap();

// 1. Find if the given BE id has available slots
if (previousBeId != -1L && beIdsInCluster.contains(previousBeId)) {
if (previousBeId != -1L && availableBeIds.contains(previousBeId)) {
// get the previousBackend info
Backend previousBackend = Catalog.getCurrentSystemInfo().getBackend(previousBeId);
// check previousBackend is not null && load available
@@ -446,7 +452,7 @@ public long getAvailableBeForTask(long previousBeId, String clusterName) throws
int idleTaskNum = 0;
long resultBeId = -1L;
int maxIdleSlotNum = 0;
for (Long beId : beIdsInCluster) {
for (Long beId : availableBeIds) {
if (beIdToMaxConcurrentTasks.containsKey(beId)) {
if (beIdToConcurrentTasks.containsKey(beId)) {
idleTaskNum = beIdToMaxConcurrentTasks.get(beId) - beIdToConcurrentTasks.get(beId);
@@ -467,6 +473,60 @@ public long getAvailableBeForTask(long previousBeId, String clusterName) throws
}
}

/**
* A routine load task can only be scheduled on backends that have the proper resource tags.
* The tags should be obtained from the user property.
* However, in older versions the routine load job does not carry user info, so for compatibility,
* if there is no user info, we fall back to the tags from the replica allocation of the first partition of the table.
*
* @param jobId
* @param cluster
* @return
* @throws LoadException
*/
private List<Long> getAvailableBackendIds(long jobId, String cluster) throws LoadException {
RoutineLoadJob job = getJob(jobId);
if (job == null) {
throw new LoadException("job " + jobId + " does not exist");
}
Set<Tag> tags;
if (job.getUserIdentity() == null) {
// For old job, there may be no user info. So we have to use tags from replica allocation
tags = getTagsFromReplicaAllocation(job.getDbId(), job.getTableId());
} else {
tags = Catalog.getCurrentCatalog().getAuth().getResourceTags(job.getUserIdentity().getQualifiedUser());
if (tags == UserProperty.INVALID_RESOURCE_TAGS) {
// user may be dropped. Here we fall back to use replica tag
tags = getTagsFromReplicaAllocation(job.getDbId(), job.getTableId());
}
}
BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().setCluster(cluster)
.addTags(tags).build();
return Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 20000);
}

private Set<Tag> getTagsFromReplicaAllocation(long dbId, long tblId) throws LoadException {
try {
Database db = Catalog.getCurrentCatalog().getDbOrMetaException(dbId);
OlapTable tbl = db.getTableOrMetaException(tblId, Table.TableType.OLAP);
tbl.readLock();
try {
PartitionInfo partitionInfo = tbl.getPartitionInfo();
for (Partition partition : tbl.getPartitions()) {
ReplicaAllocation replicaAlloc = partitionInfo.getReplicaAllocation(partition.getId());
// just use the first one
return replicaAlloc.getAllocMap().keySet();
}
// Should not run into here. Just make compiler happy.
return Sets.newHashSet();
} finally {
tbl.readUnlock();
}
} catch (MetaNotFoundException e) {
throw new LoadException(e.getMessage());
}
}

public RoutineLoadJob getJob(long jobId) {
return idToRoutineLoadJob.get(jobId);
}
@@ -286,7 +286,8 @@ private void submitTask(long beId, TRoutineLoadTask tTask) throws LoadException
// return true if allocate successfully. return false if failed.
// throw exception if unrecoverable errors happen.
private boolean allocateTaskToBe(RoutineLoadTaskInfo routineLoadTaskInfo) throws LoadException {
long beId = routineLoadManager.getAvailableBeForTask(routineLoadTaskInfo.getPreviousBeId(), routineLoadTaskInfo.getClusterName());
long beId = routineLoadManager.getAvailableBeForTask(routineLoadTaskInfo.getJobId(),
routineLoadTaskInfo.getPreviousBeId(), routineLoadTaskInfo.getClusterName());
if (beId == -1L) {
return false;
}

This file was deleted.
