lifecycle management of a trainingJob #634
Force-pushed from d8cf99e to fc110b2.
@qizheng09 You can simply add people under "Reviewers" on the right side of this page so that they receive an email notification.
go/updater/trainingJobUpdater.go
Outdated
job: job,
kubeClient: kubeClient,
trainingJobClient: trainingJobClient,
status: job.Status,
eventCh: make(chan *trainingJobEvent, 1000),
}
return jobber, nil
return updater, nil
}

// NewUpdater return a trainingJobUpdater for controller.
Suggested rewording of the doc comment: "NewUpdater return a trainingJobUpdater for controller." => "NewUpdater creates a new TrainingJobUpdater and starts a goroutine to control the current job."
go/updater/trainingJobUpdater.go
Outdated
}

// NewUpdater return a trainingJobUpdater for controller.
func NewUpdater(job *padv1.TrainingJob, kubeClient kubernetes.Interface, trainingJobClient trainingJobClient.Interface) (*TrainingJobUpdater,
error) {
log.Infof("NewJobber namespace=%v name=%v", job.Namespace, job.Name)
jobber, err := initUpdater(job, kubeClient, trainingJobClient)
updater, err := initUpdater(job, kubeClient, trainingJobClient)
A separate function call may not be needed here; initUpdater is quite short and could be inlined.
go/updater/trainingJobUpdater.go
Outdated
return fmt.Errorf("unknow resource")
}
var replica int32
replica = 0
var replica int32 followed by replica = 0 is redundant, because the zero value of int32 is already 0. Note that replica := 0 would infer int rather than int32, so use var replica int32 alone or replica := int32(0).
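A minimal sketch of the typing point behind this comment, assuming the variable must stay an int32 because a later hunk assigns &replica to resource.Spec.Replicas. The helper name zeroReplicas is hypothetical and not part of the PR.

```go
package main

import "fmt"

// zeroReplicas is a hypothetical helper that returns a pointer to an int32
// zero, the shape a Replicas *int32 field would expect.
func zeroReplicas() *int32 {
	// The zero value of int32 is already 0, so no separate assignment is needed.
	var replica int32
	return &replica
}

func main() {
	a := 0        // inferred as int, not int32
	b := int32(0) // explicitly int32
	// prints: int int32 int32
	fmt.Printf("%T %T %T\n", a, b, *zeroReplicas())
}
```

With the short form replica := 0, taking &replica would yield a *int, which does not fit a *int32 field without a conversion.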
go/updater/trainingJobUpdater.go
Outdated
err = updater.kubeClient.CoreV1().Pods(updater.job.Namespace).DeleteCollection(&v1.DeleteOptions{}, options)
if err != nil {
return fmt.Errorf("release resource failed , namespace=%v name=%v resourceName=%v", updater.job.Namespace, updater.job.Name, resource.Name)
You can return this err directly.
go/updater/trainingJobUpdater.go
Outdated
for j := 0; j <= retryTimes; j++ {
time.Sleep(confirmResourceTicker)
pl, err := updater.kubeClient.CoreV1().Pods(updater.job.Namespace).List(options)
if err == nil && len(pl.Items) == 0 {
Something needs to be done if the error still exists after the retries.
The retry is there to make sure that the resource's replicas have been scaled down to 0. If the error still exists, or len(pl.Items) has not dropped to 0 after the retries, I force-delete the pods in the code that follows the loop.
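The retry-then-force-delete flow described in this reply can be sketched as follows. This is only an illustration under stated assumptions: confirmEmpty, release, and the callback parameters are hypothetical names, and the callbacks stand in for the Kubernetes List and DeleteCollection calls in the PR.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// confirmEmpty polls count up to retries+1 times, sleeping interval before
// each attempt, and reports whether the count reached zero.
// It also returns the error of the last attempt, if that attempt failed.
func confirmEmpty(count func() (int, error), retries int, interval time.Duration) (bool, error) {
	var lastErr error
	for i := 0; i <= retries; i++ {
		time.Sleep(interval)
		n, err := count()
		if err != nil {
			lastErr = err
			continue
		}
		lastErr = nil
		if n == 0 {
			return true, nil
		}
	}
	return false, lastErr
}

// release waits for the resources to drain and falls back to forceDelete
// when they have not drained after all retries.
func release(count func() (int, error), forceDelete func() error) error {
	ok, err := confirmEmpty(count, 2, time.Millisecond)
	if ok {
		return nil
	}
	if err != nil {
		fmt.Println("listing still failing after retries:", err)
	}
	return forceDelete()
}

func main() {
	alwaysFails := func() (int, error) { return 0, errors.New("server timeout") }
	forced := false
	err := release(alwaysFails, func() error { forced = true; return nil })
	fmt.Println(forced, err)
}
```

Returning the error from forceDelete keeps the final failure visible to the caller instead of swallowing it inside the loop.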
go/updater/trainingJobUpdater.go
Outdated
return nil
}

func (updater *TrainingJobUpdater) releaseMaPs() error {
The function name releaseMaPs is unclear. Please choose a more descriptive name.
go/updater/trainingJobUpdater.go
Outdated
}

// SetUp validates the fields and parses the TrainingJob
func (updater *TrainingJobUpdater) setUp() {
Just call this parseTrainingJob
Force-pushed from fc110b2 to 8473a43.
go/updater/trainingJobUpdater.go
Outdated
return err
}

func (updater *TrainingJobUpdater) releaseMasterPserver() error {
How about splitting releaseMasterPserver into two functions, releaseMaster and releasePserver?
Okay~
go/updater/trainingJobUpdater.go
Outdated
}

err := updater.kubeClient.CoreV1().Pods(updater.job.Namespace).DeleteCollection(&v1.DeleteOptions{}, options)
if err != nil {
There seems to be no need to check err != nil here; just write:
return updater.kubeClient.CoreV1().Pods(updater.job.Namespace).DeleteCollection(&v1.DeleteOptions{}, options)
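A small sketch of why the check is redundant when the function does nothing else with the error. The functions deleteAll, releaseVerbose, and releaseDirect are hypothetical stand-ins, not code from the PR.

```go
package main

import (
	"errors"
	"fmt"
)

// deleteAll stands in for a client call that returns only an error.
func deleteAll(fail bool) error {
	if fail {
		return errors.New("delete failed")
	}
	return nil
}

// releaseVerbose checks the error and then returns it unchanged.
func releaseVerbose(fail bool) error {
	err := deleteAll(fail)
	if err != nil {
		return err
	}
	return nil
}

// releaseDirect behaves the same and returns the call's result directly.
func releaseDirect(fail bool) error {
	return deleteAll(fail)
}

func main() {
	fmt.Println(releaseVerbose(true), releaseDirect(true))
	fmt.Println(releaseVerbose(false), releaseDirect(false))
}
```

The explicit check only earns its place when the function adds context to the error or runs further steps after the call.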
go/updater/trainingJobUpdater.go
Outdated
return err
}

err := updater.createTrainer()
return updater.createTrainer()
go/updater/trainingJobUpdater.go
Outdated
}

func (updater *TrainingJobUpdater) createTrainingJob() error {

Delete redundant blank line.
@Yancey1989 Done and thanks for your comments~
Force-pushed from 8473a43 to 57eeb5e.
Force-pushed from 57eeb5e to d6447f6.
go/updater/trainingJobUpdater.go
Outdated
// it will notify updater to process the event. It send event to updater's eventCh.
func (updater *TrainingJobUpdater) notify(te *trainingJobEvent) {
select {
case updater.eventCh <- te:
Is this code the same as the following?
// no more select
updater.eventCh <- te
// code in the case block.
If so, the version without select may be less verbose.
Done
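For reference, a select with a single send case and no default blocks exactly like a plain send, so the two forms are interchangeable in that situation. They differ once a default case is present, and the hunk shown here does not reveal whether the original had one. The sketch below illustrates the non-blocking variant; trySend is a hypothetical helper, not code from the PR.

```go
package main

import "fmt"

// trySend performs a non-blocking send: it reports false instead of
// blocking when the channel buffer is full.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1)) // buffer has room: true
	fmt.Println(trySend(ch, 2)) // buffer full: false, where a plain send would block
}
```

Dropping the select is therefore safe only if blocking the caller on a full eventCh is acceptable.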
go/updater/trainingJobUpdater.go
Outdated
select {
case updater.eventCh <- te:
lene, cape := len(updater.eventCh), cap(updater.eventCh)
if lene > int(float64(cape)*0.8) {
Maybe define 0.8 as a named constant in this file.
Done
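One way the suggested constant could look, as a sketch only. The names eventChWarnRatio and nearlyFull are assumptions made for illustration; the PR may have chosen different ones.

```go
package main

import "fmt"

// eventChWarnRatio is a hypothetical name for the fill ratio above which
// the event channel is considered nearly full.
const eventChWarnRatio = 0.8

// nearlyFull reports whether length exceeds the warning threshold
// for the given capacity.
func nearlyFull(length, capacity int) bool {
	return length > int(float64(capacity)*eventChWarnRatio)
}

func main() {
	ch := make(chan int, 10)
	for i := 0; i < 9; i++ {
		ch <- i
	}
	fmt.Println(nearlyFull(len(ch), cap(ch))) // 9 of 10 slots used: true
}
```

Naming the ratio documents its purpose and gives a single place to tune it.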
for v := range ticker.C {
rs, err := updater.kubeClient.ExtensionsV1beta1().ReplicaSets(updater.job.Namespace).Get(resource.Name, v1.GetOptions{})
log.Infof("Current time %v runing pod is %v, resourceName=%v", v.String(), rs.Status.ReadyReplicas, resource.Name)
if err != nil && !errors.IsServerTimeout(err) && !errors.IsTooManyRequests(err) {
What happens if err is a timeout or a too-many-requests error? The code below uses the returned rs even when an error occurred.
Done.
func (updater *TrainingJobUpdater) parseTrainingJob() {
if updater.job == nil {
updater.status.Phase = padv1.TrainingJobPhaseFailed
updater.status.Reason = "Internal error; Setup error; job is missing TainingJob"
Maybe this needs to return here?
Done
go/updater/trainingJobUpdater.go
Outdated
switch ev.pet {
case trainingJobEventDelete:
log.Infof("Delete updater, namespace=%v name=%v: ", updater.job.Namespace, updater.job.Name)
ticker.Stop()
There is already a defer ticker.Stop() above, so maybe this call is not necessary?
Done
go/updater/trainingJobUpdater.go
Outdated
}

if updater.status.Phase == padv1.TrainingJobPhaseSucceeded || updater.status.Phase == padv1.TrainingJobPhaseFailed {
if ticker != nil {
Maybe move this ticker-stopping logic to line 466; the Convert function does not have much to do with ticker.
Done. Thank you very much for your comments, and I have fixed all of the comments you mentioned above.
go/updater/trainingJobUpdater.go
Outdated
resource.Spec.Replicas = &replica
_, err := updater.kubeClient.ExtensionsV1beta1().ReplicaSets(updater.job.Namespace).Update(resource)
if errors.IsNotFound(err) {
return nil
Maybe return err here?
Done
go/updater/trainingJobUpdater.go
Outdated
return err
}
}
if newResource != nil {
I think we need to handle errors other than IsNotFound as well. This example code may be helpful:
https://github.com/kubernetes/kubernetes/blob/a7d6340ad28970a99a67feba3913d0bd653855b1/staging/src/k8s.io/client-go/examples/in-cluster-client-configuration/main.go#L51
Our goal is to try our best to ensure the normal operation of the job. In case of unexpected errors, should we deal with other errors by retrying?
Done
go/updater/trainingJobUpdater.go
Outdated
func (updater *TrainingJobUpdater) createTrainer() error {
newTrainer, err := updater.kubeClient.BatchV1().Jobs(updater.job.Namespace).Get(updater.job.Spec.Trainer.ReplicaSpec.Name, v1.GetOptions{})
if errors.IsNotFound(err) {
Done. Thank you very much for your comments, and I have fixed all of the comments you mentioned above.
Force-pushed from d46a482 to ac86ef9.
LGTM. Some naming polish is still needed later on.
log.Infof("End to delete TrainingJob namespace=%v name=%v", updater.job.Namespace, updater.job.Name)

if fault {
return fmt.Errorf("delete resource error namespace=%v name=%v", updater.job.Namespace, updater.job.Name)
This may lose the original Kubernetes API error message. Return the error object from the releaseXXX call instead.
@typhoonzero Hi~ I would like you to review this PR.
This PR contains the main lifecycle process of a TrainingJob, including: