Fix serverless job run failure related issues #2899
Conversation
```java
public boolean isJobEnded(
        @NotNull com.microsoft.azure.hdinsight.sdk.rest.azure.serverless.spark.models.SparkBatchJob sparkBatchJob) {
    return sparkBatchJob.schedulerState().toString().equalsIgnoreCase(SchedulerState.ENDED.toString());
```
Do you need to add some abnormal end statuses, such as failed or killed? #Resolved
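The reviewer's suggestion could be sketched like this. This is a minimal, self-contained illustration: the real `SchedulerState` enum lives in the Azure HDInsight SDK, and the extra `FAILED`/`KILLED` values here are assumptions based on the comment, not confirmed SDK members.

```java
// Hypothetical sketch: treat abnormal terminal states as "ended" too.
// SchedulerState is a stand-in for the SDK enum; FAILED and KILLED are assumed.
enum SchedulerState { SCHEDULED, RUNNING, ENDED, FAILED, KILLED }

class JobStateCheck {
    // Any terminal state, normal or abnormal, counts as ended.
    static boolean isJobEnded(SchedulerState state) {
        return state == SchedulerState.ENDED
                || state == SchedulerState.FAILED
                || state == SchedulerState.KILLED;
    }
}
```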
```java
.onErrorResumeNext(err -> {
    if (err instanceof SparkJobFinishedException || err.getCause() instanceof SparkJobFinishedException) {
        return Observable.error(err);
```

```java
.takeUntil(sparkBatchJob ->
```
The variable `sparkBatchJob` might be easily confused. You could try `batchResponse` or `batchResp`. #Resolved
```java
final int GET_JOB_DONE_REPEAT_DELAY_MILLISECONDS = 1000;
return getSparkBatchJobRequest()
        .repeatWhen(ob -> ob.delay(GET_JOB_DONE_REPEAT_DELAY_MILLISECONDS, TimeUnit.MILLISECONDS))
        .takeUntil(sparkBatchJob -> {
```
The same here: `batchResp` or similar. #Resolved
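The `repeatWhen(delay).takeUntil(done)` pattern in the excerpt above polls the batch job until a terminal condition holds. As a plain-Java sketch (the real code uses RxJava Observables; this imperative loop and its names are assumptions to make the control flow explicit):

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical poll loop standing in for
// getSparkBatchJobRequest().repeatWhen(delay).takeUntil(isDone).
class Poller {
    static <T> T pollUntil(Supplier<T> request, Predicate<T> isDone,
                           long delayMillis, int maxAttempts) {
        T resp = request.get();
        for (int i = 1; i < maxAttempts && !isDone.test(resp); i++) {
            try {
                Thread.sleep(delayMillis); // GET_JOB_DONE_REPEAT_DELAY_MILLISECONDS
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            resp = request.get();          // re-issue the batch job request
        }
        return resp;
    }
}
```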
```java
.takeUntil(sparkBatchJob -> {
    String jobState = getJobState(sparkBatchJob);
    return isJobEnded(sparkBatchJob)
            || (jobState != null && isDone(jobState));
```
What's the difference between `isDone` and `isJobEnded`? Can they be merged? #Resolved
```java
ctrlInfo("The Spark job is starting...");
return getSparkBatchJobRequest()
        .doOnNext(sparkBatchJob -> {
            if (sparkBatchJob.properties() != null && sparkBatchJob.properties().sparkMasterUI() != null) {
```
The check `sparkBatchJob.properties() != null && sparkBatchJob.properties().sparkMasterUI() != null` can be wrapped in a `getMasterUI(batchResp)` method. #Resolved
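One way the suggested wrapper could look, sketched with `Optional` to collapse the null checks. `BatchResp` and `BatchProperties` here are hypothetical stand-ins for the SDK's `SparkBatchJob` response types, not the actual classes:

```java
import java.util.Optional;

// Stand-in response types; the real ones come from the HDInsight SDK.
class BatchProperties {
    private final String sparkMasterUI;
    BatchProperties(String sparkMasterUI) { this.sparkMasterUI = sparkMasterUI; }
    String sparkMasterUI() { return sparkMasterUI; }
}

class BatchResp {
    private final BatchProperties properties;
    BatchResp(BatchProperties properties) { this.properties = properties; }
    BatchProperties properties() { return properties; }
}

class UiHelper {
    // Returns the master UI URL only if both properties and the URL are present.
    static Optional<String> getMasterUI(BatchResp resp) {
        return Optional.ofNullable(resp)
                .map(BatchResp::properties)
                .map(BatchProperties::sparkMasterUI);
    }
}
```

The same shape would cover the `livyServerAPI()` check discussed later in the thread.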
```java
.takeUntil(sparkBatchJob -> {
    String jobState = getJobState(sparkBatchJob);
    return isJobEnded(sparkBatchJob)
            || (jobState != null && (isDone(jobState) || isRunning(jobState)));
```
If `isJobEnded` and `isDone` were merged, the code could look like: `isJobEnded() || isJobStarted()`. #Resolved
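A sketch of what the merged predicates might look like. The state strings and helper names below are assumptions inferred from the discussion (Livy-style batch states), not the actual implementation:

```java
// Hypothetical merged predicates: isJobEnded() || isJobStarted().
class JobPhase {
    // Ended: scheduler says ENDED, or the batch state is terminal.
    static boolean isJobEnded(String schedulerState, String jobState) {
        return "ENDED".equalsIgnoreCase(schedulerState)
                || (jobState != null && isDone(jobState));
    }

    // Started: the batch state reports the job as running.
    static boolean isJobStarted(String jobState) {
        return jobState != null && isRunning(jobState);
    }

    private static boolean isDone(String s) {
        return s.equalsIgnoreCase("SUCCESS")
                || s.equalsIgnoreCase("DEAD")
                || s.equalsIgnoreCase("KILLED");
    }

    private static boolean isRunning(String s) {
        return s.equalsIgnoreCase("RUNNING");
    }
}
```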
Yes, these two methods are confusing; I will merge them.
In reply to: 268020311
```java
})
.flatMap(job ->
        getSparkBatchJobRequest()
                .repeatWhen(ob -> ob.delay(getDelaySeconds(), TimeUnit.SECONDS))
```
You can keep the "The Spark job is starting..." output in the repeat-waiting function. #Resolved
```java
.flatMap(sparkBatchJob -> {
    String jobState = getJobState(sparkBatchJob);
    if (jobState != null && (isDone(jobState) || isRunning(jobState))) {
        if (isDone(jobState) && !isSuccess(jobState)) {
```
Inverting the `if..else` might be clearer and easier to understand. #Resolved
```java
.takeUntil(sparkBatchJob ->
        isJobEnded(sparkBatchJob)
        || (sparkBatchJob.properties() != null
                && StringUtils.isNotEmpty(sparkBatchJob.properties().livyServerAPI())))
```
The same as above: wrap this as `getLivyAPI()`. #Resolved
```java
String message = "Job is in " + logAndStateTriple.getMiddle() + " state.";
return Observable.just(new AbstractMap.SimpleImmutableEntry<>(Log, message));
} else if (logAndStateTriple.getRight().equalsIgnoreCase(SchedulerState.ENDED.toString())) {
    String message = "Job is in " + logAndStateTriple.getRight() + " state.";
```
It's almost the same message as the job-state one above; that needs fixing. And a question: do we need to expose those states to users? If yes, should both be presented, such as: -? #Resolved
Force-pushed from 25132ec to 009c92b (Compare)
```java
if (err instanceof SparkJobFinishedException || err.getCause() instanceof SparkJobFinishedException) {
    return Observable.error(err);
```

```java
.repeatWhen(ob -> ob.delay(GET_LIVY_URL_REPEAT_DELAY_MILLISECONDS, TimeUnit.MILLISECONDS))
.takeUntil(batchResp -> isJobEnded(batchResp) || StringUtils.isNotEmpty(getLivyAPI(batchResp)))
```
What happens if `getJobState()` always returns `null`? #Resolved
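The concern above is that if `getJobState()` never returns a non-null state, a `takeUntil` predicate that depends on it never fires and the poll repeats forever. One hedged fix is a deadline. This is a plain-Java sketch under that assumption; the names are illustrative, not from the PR:

```java
import java.util.function.Supplier;

// Hypothetical bounded poll: stop waiting for a job state after a deadline
// instead of repeating indefinitely when the state stays null.
class BoundedPoll {
    static String awaitJobState(Supplier<String> getJobState,
                                long timeoutMillis, long delayMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            String state = getJobState.get();
            if (state != null) {
                return state;
            }
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return null; // caller should surface a timeout error rather than spin
    }
}
```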
Force-pushed from 009c92b to 7345219 (Compare)
```java
@@ -204,6 +214,11 @@ public String getJobUuid() {
})
.flatMap(job ->
        getSparkBatchJobRequest()
```

```java
.flatMap(batchResp ->
```
You may try to use a single `getSparkBatchJobRequest()` here.
Force-pushed from 7345219 to db83716 (Compare)
LGTM! Ship it! Thanks Rui!
With this PR:
- Serverless jobs can successfully run and finish when the backend is stable.
- The Spark master UI web page pops up when a serverless job is starting.
- The user is warned that the kill action is invalid when the job is in `Finalizing` or `Ended` state.