Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -498,8 +498,18 @@ public long getMinTaskBeId(String clusterName) throws LoadException {
// check if the specified BE is available for running task
// return true if it is available. return false if otherwise.
// throw exception if unrecoverable errors happen.
public long getAvailableBeForTask(long jobId, long previousBeId) throws LoadException {
public long getAvailableBeForTask(long jobId, long previousBeId) throws UserException {
List<Long> availableBeIds = getAvailableBackendIds(jobId);
if (availableBeIds.isEmpty()) {
RoutineLoadJob job = getJob(jobId);
if (job != null) {
String msg = "no available BE found for job " + jobId
+ "please check the BE status and user's cluster or tags";
job.updateState(RoutineLoadJob.JobState.PAUSED,
new ErrorReason(InternalErrorCode.MANUAL_PAUSE_ERR, msg), false /* not replay */);
}
return -1L;
}

// check if be has idle slot
readLock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ private void scheduleOneTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws Exc
// this should be done before txn begin, or the txn may be begun successfully but failed to be allocated.
if (!allocateTaskToBe(routineLoadTaskInfo)) {
// allocate failed, push it back to the queue to wait next scheduling
needScheduleTasksQueue.addFirst(routineLoadTaskInfo);
needScheduleTasksQueue.addLast(routineLoadTaskInfo);
return;
}
} catch (UserException e) {
Expand Down Expand Up @@ -311,7 +311,7 @@ private void submitTask(long beId, TRoutineLoadTask tTask) throws LoadException
// 2. If not, try to find a better one with most idle slots.
// return true if allocate successfully. return false if failed.
// throw exception if unrecoverable errors happen.
private boolean allocateTaskToBe(RoutineLoadTaskInfo routineLoadTaskInfo) throws LoadException {
private boolean allocateTaskToBe(RoutineLoadTaskInfo routineLoadTaskInfo) throws UserException {
long beId = routineLoadManager.getAvailableBeForTask(routineLoadTaskInfo.getJobId(),
routineLoadTaskInfo.getPreviousBeId());
if (beId == -1L) {
Expand Down
Loading