add tag info for broker scan node and routine load
morningman committed May 12, 2022
1 parent 8a0097c commit 8aa6e7c
Showing 8 changed files with 136 additions and 57 deletions.
@@ -38,8 +38,10 @@ public final class FeMetaVersion {
     public static final int VERSION_108 = 108;
     // add row policy
     public static final int VERSION_109 = 109;
+    // For routine load user info
+    public static final int VERSION_110 = 110;
     // note: when incrementing the meta version, assign the latest version to VERSION_CURRENT
-    public static final int VERSION_CURRENT = VERSION_109;
+    public static final int VERSION_CURRENT = VERSION_110;
 
     // all logs' meta versions should be >= the minimum version, so that we can remove many if clauses, for example
     // if (FE_METAVERSION < VERSION_94) ...
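For readers unfamiliar with FE metadata versioning: the new constant feeds the version-gated deserialization idiom used by RoutineLoadJob.readFields later in this diff. Below is a minimal, self-contained sketch of that idiom in plain Java (illustrative only, not Doris code): new optional fields are always written, but only read back when the persisted version is at least the version that introduced them.

```java
import java.io.*;

// Illustrative sketch (not Doris code) of the FeMetaVersion gating idiom.
class MetaVersionDemo {
    static final int VERSION_110 = 110;

    static void write(DataOutput out, String userInfo) throws IOException {
        out.writeInt(VERSION_110);       // current meta version is always written
        if (userInfo == null) {
            out.writeBoolean(false);     // presence flag: field absent
        } else {
            out.writeBoolean(true);      // presence flag: field present
            out.writeUTF(userInfo);
        }
    }

    static String read(DataInput in) throws IOException {
        int version = in.readInt();
        // Gate on the version that introduced the field, so images written
        // before VERSION_110 still deserialize cleanly.
        if (version >= VERSION_110) {
            return in.readBoolean() ? in.readUTF() : null;
        }
        return null;                     // old image: field does not exist
    }
}
```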
@@ -122,7 +122,7 @@ public void plan(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusesLis
         // 1. Broker scan node
         BrokerScanNode scanNode = new BrokerScanNode(new PlanNodeId(nextNodeId++), destTupleDesc, "BrokerScanNode",
                 fileStatusesList, filesAdded);
-        scanNode.setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups, strictMode, loadParallelism);
+        scanNode.setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups, strictMode, loadParallelism, userInfo);
         scanNode.init(analyzer);
         scanNode.finalize(analyzer);
         scanNodes.add(scanNode);
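The receiving side of this call is not part of the excerpt above. A hypothetical sketch of how BrokerScanNode.setLoadInfo might look with the extra parameter (field and type names are assumptions inferred from the call site, not taken from this diff):

```java
// Hypothetical sketch -- the real BrokerScanNode.setLoadInfo is not shown in this
// excerpt. The user identity now rides along with the rest of the load info so
// that backend selection can honor the submitting user's resource tags.
public void setLoadInfo(long loadJobId, long txnId, Table table, BrokerDesc brokerDesc,
        List<BrokerFileGroup> fileGroups, boolean strictMode, int loadParallelism,
        UserIdentity userInfo) {
    this.loadJobId = loadJobId;
    this.txnId = txnId;
    this.table = table;
    this.brokerDesc = brokerDesc;
    this.fileGroups = fileGroups;
    this.strictMode = strictMode;
    this.loadParallelism = loadParallelism;
    this.userInfo = userInfo;    // new: who submitted this load
}
```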
@@ -20,6 +20,7 @@
 import org.apache.doris.analysis.AlterRoutineLoadStmt;
 import org.apache.doris.analysis.CreateRoutineLoadStmt;
 import org.apache.doris.analysis.RoutineLoadDataSourceProperties;
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
@@ -102,8 +103,9 @@ public KafkaRoutineLoadJob() {
     }
 
     public KafkaRoutineLoadJob(Long id, String name, String clusterName,
-            long dbId, long tableId, String brokerList, String topic) {
-        super(id, name, clusterName, dbId, tableId, LoadDataSourceType.KAFKA);
+            long dbId, long tableId, String brokerList, String topic,
+            UserIdentity userIdentity) {
+        super(id, name, clusterName, dbId, tableId, LoadDataSourceType.KAFKA, userIdentity);
         this.brokerList = brokerList;
         this.topic = topic;
         this.progress = new KafkaProgress();
@@ -393,7 +395,7 @@ public static KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt stmt) thr
         long id = Catalog.getCurrentCatalog().getNextId();
         KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(id, stmt.getName(),
                 db.getClusterName(), db.getId(), tableId,
-                stmt.getKafkaBrokerList(), stmt.getKafkaTopic());
+                stmt.getKafkaBrokerList(), stmt.getKafkaTopic(), stmt.getUserInfo());
         kafkaRoutineLoadJob.setOptional(stmt);
         kafkaRoutineLoadJob.checkCustomProperties();
         kafkaRoutineLoadJob.checkCustomPartition();
@@ -27,6 +27,7 @@
 import org.apache.doris.analysis.Separator;
 import org.apache.doris.analysis.SqlParser;
 import org.apache.doris.analysis.SqlScanner;
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
@@ -228,6 +229,8 @@ public boolean isFinalState() {
     // this is the origin stmt of CreateRoutineLoadStmt, we use it to persist the RoutineLoadJob,
     // because we can not serialize the Expressions contained in job.
     protected OriginStatement origStmt;
+    // User who submitted this job. May be null for jobs created by an old version (before v1.1).
+    protected UserIdentity userIdentity;
 
     protected ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
     protected LoadTask.MergeType mergeType = LoadTask.MergeType.APPEND; // default is APPEND: load all data, no delete
@@ -249,13 +252,15 @@ public RoutineLoadJob(long id, LoadDataSourceType type) {
     }
 
     public RoutineLoadJob(Long id, String name, String clusterName,
-            long dbId, long tableId, LoadDataSourceType dataSourceType) {
+            long dbId, long tableId, LoadDataSourceType dataSourceType,
+            UserIdentity userIdentity) {
         this(id, dataSourceType);
         this.name = name;
         this.clusterName = clusterName;
         this.dbId = dbId;
         this.tableId = tableId;
         this.authCode = 0;
+        this.userIdentity = userIdentity;
 
         if (ConnectContext.get() != null) {
             SessionVariable var = ConnectContext.get().getSessionVariable();
@@ -435,6 +440,10 @@ public PartitionNames getPartitions() {
         return partitions;
     }
 
+    public UserIdentity getUserIdentity() {
+        return userIdentity;
+    }
+
     @Override
     public LoadTask.MergeType getMergeType() {
         return mergeType;
@@ -1541,6 +1550,13 @@ public void write(DataOutput out) throws IOException {
             Text.writeString(out, entry.getKey());
             Text.writeString(out, entry.getValue());
         }
+
+        if (userIdentity == null) {
+            out.writeBoolean(false);
+        } else {
+            out.writeBoolean(true);
+            userIdentity.write(out);
+        }
     }
 
     public void readFields(DataInput in) throws IOException {
@@ -1616,6 +1632,15 @@ public void readFields(DataInput in) throws IOException {
         } catch (Exception e) {
             throw new IOException("error happens when parsing create routine load stmt: " + origStmt.originStmt, e);
         }
+
+        if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_110) {
+            if (in.readBoolean()) {
+                userIdentity = UserIdentity.read(in);
+                userIdentity.setIsAnalyzed();
+            } else {
+                userIdentity = null;
+            }
+        }
     }
 
     abstract public void modifyProperties(AlterRoutineLoadStmt stmt) throws UserException;
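Note the asymmetry in the two hunks above: write() always emits the presence flag for userIdentity, while readFields() only consumes it when the journal was written at VERSION_110 or later, which is what keeps replay of pre-1.1 metadata working. A quick hypothetical roundtrip harness for the MetaVersionDemo sketch shown earlier:

```java
import java.io.*;

// Hypothetical harness exercising the MetaVersionDemo sketch from above.
public class MetaVersionDemoTest {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        MetaVersionDemo.write(new DataOutputStream(bos), "jack@'%'");

        DataInput in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
        // Prints jack@'%'; a reader gated on an older version would return null instead.
        System.out.println(MetaVersionDemo.read(in));
    }
}
```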
@@ -24,6 +24,11 @@
 import org.apache.doris.analysis.StopRoutineLoadStmt;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.PartitionInfo;
+import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.catalog.Table;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
@@ -38,10 +43,13 @@
 import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
 import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.mysql.privilege.UserProperty;
 import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
 import org.apache.doris.persist.RoutineLoadOperation;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.resource.Tag;
 import org.apache.doris.system.Backend;
+import org.apache.doris.system.BeSelectionPolicy;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -410,19 +418,16 @@ public long getMinTaskBeId(String clusterName) throws LoadException {
     // check if the specified BE is available for running task
     // return true if it is available, return false otherwise.
     // throw exception if unrecoverable errors happen.
-    public long getAvailableBeForTask(long previousBeId, String clusterName) throws LoadException {
-        List<Long> beIdsInCluster = Catalog.getCurrentSystemInfo().getClusterBackendIds(clusterName, true);
-        if (beIdsInCluster == null) {
-            throw new LoadException("The " + clusterName + " has been deleted");
-        }
+    public long getAvailableBeForTask(long jobId, long previousBeId, String clusterName) throws LoadException {
+        List<Long> availableBeIds = getAvailableBackendIds(jobId, clusterName);
 
         // check if be has idle slot
         readLock();
         try {
             Map<Long, Integer> beIdToConcurrentTasks = getBeCurrentTasksNumMap();
 
             // 1. Find if the given BE id has available slots
-            if (previousBeId != -1L && beIdsInCluster.contains(previousBeId)) {
+            if (previousBeId != -1L && availableBeIds.contains(previousBeId)) {
                 // get the previousBackend info
                 Backend previousBackend = Catalog.getCurrentSystemInfo().getBackend(previousBeId);
                 // check previousBackend is not null && load available
@@ -446,7 +451,7 @@ public long getAvailableBeForTask(long previousBeId, String clusterName) throws
             int idleTaskNum = 0;
             long resultBeId = -1L;
             int maxIdleSlotNum = 0;
-            for (Long beId : beIdsInCluster) {
+            for (Long beId : availableBeIds) {
                 if (beIdToMaxConcurrentTasks.containsKey(beId)) {
                     if (beIdToConcurrentTasks.containsKey(beId)) {
                         idleTaskNum = beIdToMaxConcurrentTasks.get(beId) - beIdToConcurrentTasks.get(beId);
@@ -467,6 +472,60 @@ public long getAvailableBeForTask(long previousBeId, String clusterName) throws
         }
     }
 
+    /**
+     * The routine load task can only be scheduled on backends which have the proper resource tags.
+     * The tags should be obtained from the user property.
+     * But in the old version, the routine load job does not carry user info, so for compatibility,
+     * if there is no user info, we get the tags from the replica allocation of the first partition of the table.
+     *
+     * @param jobId
+     * @param cluster
+     * @return ids of the backends that match the resolved tags
+     * @throws LoadException
+     */
+    private List<Long> getAvailableBackendIds(long jobId, String cluster) throws LoadException {
+        RoutineLoadJob job = getJob(jobId);
+        if (job == null) {
+            throw new LoadException("job " + jobId + " does not exist");
+        }
+        Set<Tag> tags;
+        if (job.getUserIdentity() == null) {
+            // For old jobs there may be no user info, so we have to use the tags from the replica allocation
+            tags = getTagsFromReplicaAllocation(job.getDbId(), job.getTableId());
+        } else {
+            tags = Catalog.getCurrentCatalog().getAuth().getResourceTags(job.getUserIdentity().getQualifiedUser());
+            if (tags == UserProperty.INVALID_RESOURCE_TAGS) {
+                // the user may have been dropped; fall back to the replica tags
+                tags = getTagsFromReplicaAllocation(job.getDbId(), job.getTableId());
+            }
+        }
+        BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().setCluster(cluster)
+                .addTags(tags).build();
+        return Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 20000);
+    }
+
+    private Set<Tag> getTagsFromReplicaAllocation(long dbId, long tblId) throws LoadException {
+        try {
+            Database db = Catalog.getCurrentCatalog().getDbOrMetaException(dbId);
+            OlapTable tbl = db.getTableOrMetaException(tblId, Table.TableType.OLAP);
+            tbl.readLock();
+            try {
+                PartitionInfo partitionInfo = tbl.getPartitionInfo();
+                for (Partition partition : tbl.getPartitions()) {
+                    ReplicaAllocation replicaAlloc = partitionInfo.getReplicaAllocation(partition.getId());
+                    // just use the first one
+                    return replicaAlloc.getAllocMap().keySet();
+                }
+                // Should not get here. Just make the compiler happy.
+                return Sets.newHashSet();
+            } finally {
+                tbl.readUnlock();
+            }
+        } catch (MetaNotFoundException e) {
+            throw new LoadException(e.getMessage());
+        }
+    }
+
     public RoutineLoadJob getJob(long jobId) {
         return idToRoutineLoadJob.get(jobId);
     }
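To summarize the selection flow added above: the manager first resolves a set of resource tags for the job, then asks the system info service for load-available backends in the cluster that carry those tags. Condensed into a single hypothetical helper (every call below appears in the diff; only the wrapper method itself is invented):

```java
// Hypothetical condensation of getAvailableBackendIds() above.
private List<Long> selectBesForJob(RoutineLoadJob job, String cluster) throws LoadException {
    Set<Tag> tags;
    if (job.getUserIdentity() != null) {
        // Normal path: tags come from the submitting user's property.
        tags = Catalog.getCurrentCatalog().getAuth()
                .getResourceTags(job.getUserIdentity().getQualifiedUser());
        if (tags == UserProperty.INVALID_RESOURCE_TAGS) {
            // The user may have been dropped; fall back to the replica allocation.
            tags = getTagsFromReplicaAllocation(job.getDbId(), job.getTableId());
        }
    } else {
        // Pre-1.1 jobs carry no user info at all.
        tags = getTagsFromReplicaAllocation(job.getDbId(), job.getTableId());
    }
    BeSelectionPolicy policy = new BeSelectionPolicy.Builder()
            .needLoadAvailable()    // only BEs currently available for load
            .setCluster(cluster)
            .addTags(tags)
            .build();
    return Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 20000);
}
```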
@@ -286,7 +286,8 @@ private void submitTask(long beId, TRoutineLoadTask tTask) throws LoadException
     // return true if allocated successfully, return false if failed.
     // throw exception if unrecoverable errors happen.
     private boolean allocateTaskToBe(RoutineLoadTaskInfo routineLoadTaskInfo) throws LoadException {
-        long beId = routineLoadManager.getAvailableBeForTask(routineLoadTaskInfo.getPreviousBeId(), routineLoadTaskInfo.getClusterName());
+        long beId = routineLoadManager.getAvailableBeForTask(routineLoadTaskInfo.getJobId(),
+                routineLoadTaskInfo.getPreviousBeId(), routineLoadTaskInfo.getClusterName());
         if (beId == -1L) {
             return false;
         }

This file was deleted.
