Skip to content

Commit 8488e9c

Browse files
authored
[Refactor] Refactor ConnectContext.getCluster (#50783)
1 parent af38235 commit 8488e9c

File tree

21 files changed

+213
-165
lines changed

21 files changed

+213
-165
lines changed

fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.doris.alter;
1919

20+
import org.apache.doris.analysis.UserIdentity;
2021
import org.apache.doris.catalog.Database;
2122
import org.apache.doris.catalog.Env;
2223
import org.apache.doris.catalog.OlapTable;
@@ -105,6 +106,9 @@ public enum JobType {
105106
@SerializedName(value = "failedTabletBackends")
106107
protected Map<Long, List<Long>> failedTabletBackends = Maps.newHashMap();
107108

109+
@SerializedName(value = "uid")
110+
protected UserIdentity userIdentity = null;
111+
108112
public AlterJobV2(String rawSql, long jobId, JobType jobType, long dbId, long tableId, String tableName,
109113
long timeoutMs) {
110114
this.rawSql = rawSql;
@@ -117,6 +121,10 @@ public AlterJobV2(String rawSql, long jobId, JobType jobType, long dbId, long ta
117121

118122
this.createTimeMs = System.currentTimeMillis();
119123
this.jobState = JobState.PENDING;
124+
125+
if (ConnectContext.get() != null) {
126+
userIdentity = ConnectContext.get().getCurrentUserIdentity();
127+
}
120128
}
121129

122130
protected AlterJobV2(JobType type) {
@@ -228,6 +236,9 @@ public synchronized void run() {
228236
ConnectContext ctx = new ConnectContext();
229237
ctx.setThreadLocalInfo();
230238
ctx.setCloudCluster(cloudClusterName);
239+
// currently used for CloudReplica.getCurrentClusterId
240+
// later maybe used for managing all workload in BE.
241+
ctx.setCurrentUserIdentity(this.userIdentity);
231242
}
232243

233244
// /api/debug_point/add/FE.STOP_ALTER_JOB_RUN

fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,8 +278,9 @@ public void checkCloudClusterPriv(String clusterName) throws DdlException {
278278
if (!Env.getCurrentEnv().getAccessManager().checkCloudPriv(ConnectContext.get().getCurrentUserIdentity(),
279279
clusterName, PrivPredicate.USAGE, ResourceTypeEnum.CLUSTER)) {
280280
throw new DdlException("USAGE denied to user "
281-
+ ConnectContext.get().getQualifiedUser() + "'@'" + ConnectContext.get().getRemoteIP()
282-
+ "' for cloud cluster '" + clusterName + "'", ErrorCode.ERR_CLUSTER_NO_PERMISSIONS);
281+
+ ConnectContext.get().getCurrentUserIdentity().getQualifiedUser() + "'@'" + ConnectContext.get()
282+
.getRemoteIP()
283+
+ "' for cloud cluster '" + clusterName + "'", ErrorCode.ERR_CLUSTER_NO_PERMISSIONS);
283284
}
284285

285286
if (!getCloudSystemInfoService().getCloudClusterNames().contains(clusterName)) {

fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.doris.cloud.catalog;
1919

20+
import org.apache.doris.analysis.UserIdentity;
2021
import org.apache.doris.catalog.ColocateTableIndex.GroupId;
2122
import org.apache.doris.catalog.Env;
2223
import org.apache.doris.catalog.Partition;
@@ -37,6 +38,7 @@
3738
import com.google.common.hash.HashCode;
3839
import com.google.common.hash.Hashing;
3940
import com.google.gson.annotations.SerializedName;
41+
import org.apache.commons.lang3.StringUtils;
4042
import org.apache.logging.log4j.LogManager;
4143
import org.apache.logging.log4j.Logger;
4244

@@ -212,41 +214,47 @@ private String getCurrentClusterId() throws ComputeGroupException {
212214
String cluster = null;
213215
ConnectContext context = ConnectContext.get();
214216
if (context != null) {
215-
if (!Strings.isNullOrEmpty(context.getSessionVariable().getCloudCluster())) {
216-
cluster = context.getSessionVariable().getCloudCluster();
217+
// TODO(wb) rethinking whether should update err status.
218+
cluster = context.getCloudCluster();
219+
220+
if (LOG.isDebugEnabled()) {
221+
LOG.debug("get compute group by context {}", cluster);
222+
}
223+
224+
UserIdentity currentUid = context.getCurrentUserIdentity();
225+
if (currentUid != null && !StringUtils.isEmpty(currentUid.getQualifiedUser())) {
217226
try {
218227
((CloudEnv) Env.getCurrentEnv()).checkCloudClusterPriv(cluster);
219228
} catch (Exception e) {
220-
LOG.warn("get compute group by session context exception");
221-
throw new ComputeGroupException(String.format("session context compute group %s check auth failed",
222-
cluster),
223-
ComputeGroupException.FailedTypeEnum.CURRENT_USER_NO_AUTH_TO_USE_DEFAULT_COMPUTE_GROUP);
224-
}
225-
if (LOG.isDebugEnabled()) {
226-
LOG.debug("get compute group by session context compute group: {}", cluster);
229+
LOG.warn("check compute group {} for {} auth failed.", cluster,
230+
context.getCurrentUserIdentity().toString());
231+
throw new ComputeGroupException(
232+
String.format("context compute group %s check auth failed, user is %s",
233+
cluster, context.getCurrentUserIdentity().toString()),
234+
ComputeGroupException.FailedTypeEnum.CURRENT_USER_NO_AUTH_TO_USE_DEFAULT_COMPUTE_GROUP);
227235
}
228236
} else {
229-
cluster = context.getCloudCluster(false);
230-
if (LOG.isDebugEnabled()) {
231-
LOG.debug("get compute group by context {}", cluster);
232-
}
233-
String clusterStatus = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
234-
.getCloudStatusByName(cluster);
235-
if (!Strings.isNullOrEmpty(clusterStatus)
236-
&& Cloud.ClusterStatus.valueOf(clusterStatus)
237-
== Cloud.ClusterStatus.MANUAL_SHUTDOWN) {
238-
LOG.warn("auto start compute group {} in manual shutdown status", cluster);
239-
throw new ComputeGroupException(
237+
LOG.info("connect context user is null.");
238+
throw new ComputeGroupException("connect context's user is null",
239+
ComputeGroupException.FailedTypeEnum.CURRENT_USER_NO_AUTH_TO_USE_DEFAULT_COMPUTE_GROUP);
240+
}
241+
242+
String clusterStatus = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
243+
.getCloudStatusByName(cluster);
244+
if (!Strings.isNullOrEmpty(clusterStatus)
245+
&& Cloud.ClusterStatus.valueOf(clusterStatus)
246+
== Cloud.ClusterStatus.MANUAL_SHUTDOWN) {
247+
LOG.warn("auto start compute group {} in manual shutdown status", cluster);
248+
throw new ComputeGroupException(
240249
String.format("The current compute group %s has been manually shutdown", cluster),
241250
ComputeGroupException.FailedTypeEnum.CURRENT_COMPUTE_GROUP_BEEN_MANUAL_SHUTDOWN);
242-
}
243251
}
244252
} else {
245253
if (LOG.isDebugEnabled()) {
246254
LOG.debug("connect context is null in getBackendId");
247255
}
248-
throw new ComputeGroupException("connect context not set",
249-
ComputeGroupException.FailedTypeEnum.CONNECT_CONTEXT_NOT_SET);
256+
throw new ComputeGroupException("connect context not set cluster ",
257+
ComputeGroupException.FailedTypeEnum.CONNECT_CONTEXT_NOT_SET);
250258
}
251259

252260
return getCloudClusterIdByName(cluster);

fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -105,14 +105,9 @@ private void setCloudClusterId() throws MetaNotFoundException {
105105

106106
this.cloudClusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
107107
.getCloudClusterIdByName(clusterName);
108-
if (!Strings.isNullOrEmpty(context.getSessionVariable().getCloudCluster())) {
109-
clusterName = context.getSessionVariable().getCloudCluster();
110-
this.cloudClusterId =
111-
((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudClusterIdByName(clusterName);
112-
}
113108
if (Strings.isNullOrEmpty(this.cloudClusterId)) {
114-
LOG.warn("cluster id is empty, cluster name {}", clusterName);
115-
throw new MetaNotFoundException("cluster id is empty, cluster name: " + clusterName);
109+
LOG.warn("can not find compute group: {}", clusterName);
110+
throw new MetaNotFoundException("can not find compute group: " + clusterName);
116111
}
117112
sessionVariables.put(CLOUD_CLUSTER_ID, this.cloudClusterId);
118113
}
@@ -127,15 +122,20 @@ private AutoCloseConnectContext buildConnectContext() throws UserException {
127122
throw new UserException("cluster name is empty, cluster id is: " + cloudClusterId);
128123
}
129124

125+
// NOTE: set user info in context in for auth check in CloudReplica
130126
if (ConnectContext.get() == null) {
131127
ConnectContext connectContext = new ConnectContext();
132128
connectContext.setCloudCluster(clusterName);
129+
connectContext.setCurrentUserIdentity(this.userInfo);
130+
connectContext.setQualifiedUser(this.userInfo.getQualifiedUser());
133131
if (connectContext.getEnv() == null) {
134132
connectContext.setEnv(Env.getCurrentEnv());
135133
}
136134
return new AutoCloseConnectContext(connectContext);
137135
} else {
138136
ConnectContext.get().setCloudCluster(clusterName);
137+
ConnectContext.get().setCurrentUserIdentity(this.userInfo);
138+
ConnectContext.get().setQualifiedUser(this.userInfo.getQualifiedUser());
139139
if (ConnectContext.get().getEnv() == null) {
140140
ConnectContext.get().setEnv(Env.getCurrentEnv());
141141
}
@@ -155,7 +155,7 @@ protected LoadLoadingTask createTask(Database db, OlapTable table, List<BrokerFi
155155
boolean isEnableMemtableOnSinkNode, int batchSize, FileGroupAggKey aggKey,
156156
BrokerPendingTaskAttachment attachment) throws UserException {
157157
cloudClusterId = sessionVariables.get(CLOUD_CLUSTER_ID);
158-
LoadLoadingTask task = new CloudLoadLoadingTask(db, table, brokerDesc,
158+
LoadLoadingTask task = new CloudLoadLoadingTask(this.userInfo, db, table, brokerDesc,
159159
brokerFileGroups, getDeadlineMs(), getExecMemLimit(),
160160
isStrictMode(), isPartialUpdate(), transactionId, this, getTimeZone(), getTimeout(),
161161
getLoadParallelism(), getSendBatchParallelism(),
@@ -167,6 +167,7 @@ brokerFileGroups, getDeadlineMs(), getExecMemLimit(),
167167
try (AutoCloseConnectContext r = buildConnectContext()) {
168168
task.init(loadId, attachment.getFileStatusByTable(aggKey),
169169
attachment.getFileNumByTable(aggKey), getUserInfo());
170+
task.settWorkloadGroups(tWorkloadGroups);
170171
} catch (UserException e) {
171172
throw e;
172173
}

fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadLoadingTask.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.doris.cloud.load;
1919

2020
import org.apache.doris.analysis.BrokerDesc;
21+
import org.apache.doris.analysis.UserIdentity;
2122
import org.apache.doris.catalog.Database;
2223
import org.apache.doris.catalog.Env;
2324
import org.apache.doris.catalog.OlapTable;
@@ -41,15 +42,15 @@ public class CloudLoadLoadingTask extends LoadLoadingTask {
4142

4243
private String cloudClusterId;
4344

44-
public CloudLoadLoadingTask(Database db, OlapTable table,
45+
public CloudLoadLoadingTask(UserIdentity userinfo, Database db, OlapTable table,
4546
BrokerDesc brokerDesc, List<BrokerFileGroup> fileGroups,
4647
long jobDeadlineMs, long execMemLimit, boolean strictMode, boolean isPartialUpdate,
4748
long txnId, LoadTaskCallback callback, String timezone,
4849
long timeoutS, int loadParallelism, int sendBatchParallelism,
4950
boolean loadZeroTolerance, Profile jobProfile, boolean singleTabletLoadPerSink,
5051
Priority priority, boolean enableMemTableOnSinkNode, int batchSize,
5152
String clusterId) {
52-
super(db, table, brokerDesc, fileGroups, jobDeadlineMs, execMemLimit, strictMode, isPartialUpdate,
53+
super(userinfo, db, table, brokerDesc, fileGroups, jobDeadlineMs, execMemLimit, strictMode, isPartialUpdate,
5354
txnId, callback, timezone, timeoutS, loadParallelism, sendBatchParallelism, loadZeroTolerance,
5455
jobProfile, singleTabletLoadPerSink, priority, enableMemTableOnSinkNode, batchSize);
5556
this.cloudClusterId = clusterId;
@@ -63,12 +64,17 @@ private AutoCloseConnectContext buildConnectContext() throws UserException {
6364
throw new UserException("cluster name is empty, cluster id is: " + this.cloudClusterId);
6465
}
6566

67+
// NOTE: set user info here for the following text auth check.
6668
if (ConnectContext.get() == null) {
6769
ConnectContext connectContext = new ConnectContext();
6870
connectContext.setCloudCluster(clusterName);
71+
connectContext.setCurrentUserIdentity(this.userInfo);
72+
connectContext.setQualifiedUser(this.userInfo.getQualifiedUser());
6973
return new AutoCloseConnectContext(connectContext);
7074
} else {
7175
ConnectContext.get().setCloudCluster(clusterName);
76+
ConnectContext.get().setCurrentUserIdentity(this.userInfo);
77+
ConnectContext.get().setQualifiedUser(this.userInfo.getQualifiedUser());
7278
return null;
7379
}
7480
}

fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -539,7 +539,7 @@ public ImmutableMap<Long, Backend> getBackendsByCurrentCluster() throws Analysis
539539

540540
Map<Long, Backend> idToBackend = Maps.newHashMap();
541541
try {
542-
String cluster = ctx.getCurrentCloudCluster();
542+
String cluster = ctx.getCloudCluster();
543543
if (Strings.isNullOrEmpty(cluster)) {
544544
throw new AnalysisException("cluster name is empty");
545545
}

fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,12 @@ public void init(BeSelectionPolicy policy) throws UserException {
179179

180180
backends.addAll(policy.getCandidateBackends(computeGroup.getBackendList()));
181181
if (backends.isEmpty()) {
182-
throw new UserException("No available backends, "
183-
+ "in cloud maybe this cluster has been dropped, please `use @otherClusterName` switch it");
182+
if (Config.isCloudMode()) {
183+
throw new UserException("No available backends, "
184+
+ "in cloud maybe this cluster has been dropped, please `use @otherClusterName` switch it");
185+
} else {
186+
throw new UserException("No available backends for compute group: " + computeGroup.toString());
187+
}
184188
}
185189
for (Backend backend : backends) {
186190
assignedWeightPerBackend.put(backend, 0L);

fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,9 @@ private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, HttpServ
435435
int number = groupCommit ? -1 : 1;
436436
backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, number, computeGroup.getBackendList());
437437
if (backendIds.isEmpty()) {
438-
throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
438+
throw new LoadException(
439+
SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy + ", compute group is "
440+
+ computeGroup.toString());
439441
}
440442
if (groupCommit) {
441443
backend = selectBackendForGroupCommit("", request, tableId);

fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/TableQueryPlanAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ public Object query_plan(
141141
ConnectContext.get().getSessionVariable().setEnableTwoPhaseReadOpt(false);
142142
}
143143
if (Config.isCloudMode()) { // Choose a cluster to for this query
144-
ConnectContext.get().getCurrentCloudCluster();
144+
ConnectContext.get().getCloudCluster();
145145
}
146146
// parse/analysis/plan the sql and acquire tablet distributions
147147
handleQuery(ConnectContext.get(), fullDbName, tblName, sql, resultMap);

fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ public void setComputeGroup() {
258258
protected LoadLoadingTask createTask(Database db, OlapTable table, List<BrokerFileGroup> brokerFileGroups,
259259
boolean isEnableMemtableOnSinkNode, int batchSize, FileGroupAggKey aggKey,
260260
BrokerPendingTaskAttachment attachment) throws UserException {
261-
LoadLoadingTask task = new LoadLoadingTask(db, table, brokerDesc,
261+
LoadLoadingTask task = new LoadLoadingTask(this.userInfo, db, table, brokerDesc,
262262
brokerFileGroups, getDeadlineMs(), getExecMemLimit(),
263263
isStrictMode(), isPartialUpdate(), transactionId, this, getTimeZone(), getTimeout(),
264264
getLoadParallelism(), getSendBatchParallelism(),

0 commit comments

Comments
 (0)