Monitor Job Vertices State on Deploy #284

Merged · 24 commits · Apr 13, 2023

Changes from all commits
1 change: 1 addition & 0 deletions pkg/controller/config/config.go
@@ -24,6 +24,7 @@ type Config struct {
BaseBackoffDuration config.Duration `json:"baseBackoffDuration" pflag:"\"100ms\",Determines the base backoff for exponential retries."`
MaxBackoffDuration config.Duration `json:"maxBackoffDuration" pflag:"\"30s\",Determines the max backoff for exponential retries."`
MaxErrDuration config.Duration `json:"maxErrDuration" pflag:"\"5m\",Determines the max time to wait on errors."`
FlinkJobVertexTimeout config.Duration `json:"flinkJobVertexTimeout" pflag:"\"3m\",Determines the max time to wait for all job vertices to reach the RUNNING state."`
}

func GetConfig() *Config {
1 change: 1 addition & 0 deletions pkg/controller/config/config_flags.go
(Generated file; diff not rendered.)
55 changes: 49 additions & 6 deletions pkg/controller/flinkapplication/flink_state_machine.go
@@ -756,14 +756,58 @@ func (s *FlinkStateMachine) handleSubmittingJob(ctx context.Context, app *v1beta
return statusUnchanged, errors.Errorf("Could not find job %s", s.flinkController.GetLatestJobID(ctx, app))
}

jobStartTime := getJobStartTimeInUTC(job.StartTime)
now := time.Now().UTC()
cfg := config.GetConfig()
logger.Info(ctx, "Job vertex timeout config is ", cfg.FlinkJobVertexTimeout)
flinkJobVertexTimeout := cfg.FlinkJobVertexTimeout
if now.Before(jobStartTime.Add(flinkJobVertexTimeout.Duration)) {
allVerticesRunning, hasFailure, failedVertexIndex := monitorAllVerticesState(job)

// fail fast
if hasFailure {
s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "JobRunningFailed",
fmt.Sprintf(
"Vertex %d with name [%s] state is Failed", failedVertexIndex, job.Vertices[failedVertexIndex].Name))
return s.deployFailed(app)
}
return updateJobAndReturn(ctx, job, s, allVerticesRunning, app, hash)
}

s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "JobRunningFailed",
fmt.Sprintf(
"Not all vertices of the Flink job reached the RUNNING state before the timeout of %f minutes", cfg.FlinkJobVertexTimeout.Minutes()))
return s.deployFailed(app)

}

func monitorAllVerticesState(job *client.FlinkJobOverview) (bool, bool, int) {
allVerticesRunning := true
// wait until all vertices have been scheduled and running
hasFailure := false
failedVertexIndex := -1
for index, v := range job.Vertices {
if v.Status == client.Failed {
failedVertexIndex = index
hasFailure = true
break
}
allVerticesRunning = allVerticesRunning && (v.StartTime > 0) && v.Status == client.Running
Contributor:
If the first vertex is failing then won't this return allVerticesRunning as true when it should be false?

Contributor (author):
If the vertex is failing, hasFailure is set to true, so the `if hasFailure` block in the caller returns early and fails the deployment.

Contributor:
I see. So it works because of how the code in the current caller is written, but the method itself is still buggy: if it is called from somewhere else where that case is not handled, it will return an incorrect result.

Contributor (author):
Agree. Initially this logic was in the main method, so I didn't set a value for allVerticesRunning in the failure scenario. I will provide the fix later.

}
return allVerticesRunning, hasFailure, failedVertexIndex
}
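The review thread above can be demonstrated in isolation. The sketch below uses simplified local stand-ins for the client types and incorporates the fix the author agreed to provide (returning `false` for `allRunning` when a vertex has failed); it is illustrative, not the operator's actual code:

```go
package main

import "fmt"

// vertex is a simplified stand-in for the operator's client vertex type.
type vertex struct {
	Name      string
	StartTime int64
	Status    string
}

// monitorAllVerticesState mirrors the PR's logic, with the reviewed fix:
// it short-circuits on the first FAILED vertex and reports allRunning as
// false in that case, so the result is safe for any caller.
func monitorAllVerticesState(vertices []vertex) (allRunning bool, hasFailure bool, failedIndex int) {
	allRunning = true
	failedIndex = -1
	for i, v := range vertices {
		if v.Status == "FAILED" {
			return false, true, i
		}
		allRunning = allRunning && v.StartTime > 0 && v.Status == "RUNNING"
	}
	return allRunning, false, failedIndex
}

func main() {
	ok, failed, idx := monitorAllVerticesState([]vertex{
		{Name: "source", StartTime: 1, Status: "RUNNING"},
		{Name: "sink", StartTime: 0, Status: "SCHEDULED"},
	})
	fmt.Println(ok, failed, idx) // false false -1
}
```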

func getJobStartTimeInUTC(startTime int64) time.Time {
jobStartTimeSec := startTime / 1000
// startTime is epoch milliseconds; the remainder must be scaled to nanoseconds for time.Unix
jobStartTimeNSec := (startTime % 1000) * int64(time.Millisecond)
return time.Unix(jobStartTimeSec, jobStartTimeNSec).UTC()
}
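Flink's REST API reports job start times as epoch milliseconds, so the conversion must split them into whole seconds plus a nanosecond-scaled remainder. A self-contained round-trip check (`msToUTC` is a hypothetical local name for the same conversion):

```go
package main

import (
	"fmt"
	"time"
)

// msToUTC converts an epoch-millisecond timestamp to a UTC time.Time.
// The millisecond remainder is scaled to nanoseconds for time.Unix.
func msToUTC(ms int64) time.Time {
	sec := ms / 1000
	nsec := (ms % 1000) * int64(time.Millisecond)
	return time.Unix(sec, nsec).UTC()
}

func main() {
	t := msToUTC(1681344000123)
	fmt.Println(t.UnixMilli()) // 1681344000123
}
```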

func updateJobAndReturn(ctx context.Context, job *client.FlinkJobOverview, s *FlinkStateMachine, allVerticesRunning bool,
app *v1beta1.FlinkApplication, hash string) (bool, error) {
if job.State == client.Running && allVerticesRunning {
// Update job status
logger.Info(ctx, "Job and all vertices states are RUNNING.")
jobStatus := s.flinkController.GetLatestJobStatus(ctx, app)
jobStatus.JarName = app.Spec.JarName
jobStatus.Parallelism = app.Spec.Parallelism
@@ -782,7 +826,6 @@ func (s *FlinkStateMachine) handleSubmittingJob(ctx context.Context, app *v1beta
s.updateApplicationPhase(app, v1beta1.FlinkApplicationRunning)
return statusChanged, nil
}

return statusUnchanged, nil
}
