[Fix] add early stopped trials in converter #2004

Merged
17 changes: 17 additions & 0 deletions pkg/apis/controller/trials/v1beta1/util.go
@@ -18,6 +18,7 @@ package v1beta1

import (
"errors"
"github.com/kubeflow/katib/pkg/controller.v1beta1/consts"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -89,6 +90,22 @@ func (trial *Trial) IsMetricsUnavailable() bool {
return hasCondition(trial, TrialMetricsUnavailable)
}

// IsObservationAvailable returns true if the Trial has a valid observation for the objective metric
func (trial *Trial) IsObservationAvailable() bool {
if trial.Spec.Objective == nil {
return false
}
objectiveMetricName := trial.Spec.Objective.ObjectiveMetricName
if trial.Status.Observation != nil && trial.Status.Observation.Metrics != nil {
for _, metric := range trial.Status.Observation.Metrics {
if metric.Name == objectiveMetricName && metric.Latest != consts.UnavailableMetricValue {
return true
}
}
}
return false
}

func (trial *Trial) IsCompleted() bool {
return trial.IsSucceeded() || trial.IsFailed() || trial.IsKilled() || trial.IsEarlyStopped() || trial.IsMetricsUnavailable()
}
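For context, here is a minimal self-contained sketch (not part of the diff) of the logic behind the new IsObservationAvailable helper, using simplified stand-in types; the metric names, values, and the "unavailable" sentinel are assumptions for illustration only:

package main

import "fmt"

// Simplified stand-ins for the Katib trial and metric types used above.
const unavailableMetricValue = "unavailable"

type metric struct {
	name   string
	latest string
}

type trial struct {
	objectiveMetricName string
	metrics             []metric
}

// isObservationAvailable mirrors Trial.IsObservationAvailable: the objective
// metric must be present and carry a real (non-sentinel) latest value.
func isObservationAvailable(t trial) bool {
	for _, m := range t.metrics {
		if m.name == t.objectiveMetricName && m.latest != unavailableMetricValue {
			return true
		}
	}
	return false
}

func main() {
	reported := trial{objectiveMetricName: "accuracy", metrics: []metric{{name: "accuracy", latest: "0.95"}}}
	pending := trial{objectiveMetricName: "accuracy", metrics: []metric{{name: "accuracy", latest: unavailableMetricValue}}}
	fmt.Println(isObservationAvailable(reported)) // true: the objective metric has a real latest value
	fmt.Println(isObservationAvailable(pending))  // false: early stopped, but metrics not collected yet
}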
9 changes: 8 additions & 1 deletion pkg/controller.v1beta1/experiment/experiment_controller.go
@@ -460,12 +460,19 @@ func (r *ReconcileExperiment) ReconcileSuggestions(instance *experimentsv1beta1.
logger := log.WithValues("Experiment", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()})
var assignments []suggestionsv1beta1.TrialAssignment
currentCount := int32(len(trialList))
incompleteEarlyStoppingCount := int32(0)
trialNames := map[string]bool{}
for _, trial := range trialList {
trialNames[trial.Name] = true

if !trial.IsObservationAvailable() && trial.IsEarlyStopped() {
incompleteEarlyStoppingCount += 1
}
}

suggestionRequestsCount := currentCount + addCount
// Exclude the number of incomplete early stopped trials from the total suggestion requests.
// This means that no new trials will be requested from the suggestion service until observations are ready for the early stopped trials.
suggestionRequestsCount := currentCount + addCount - incompleteEarlyStoppingCount

logger.Info("GetOrCreateSuggestion", "name", instance.Name, "Suggestion Requests", suggestionRequestsCount)
original, err := r.GetOrCreateSuggestion(instance, suggestionRequestsCount)
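The accounting above can be illustrated with a tiny sketch outside the diff (names and types are simplified stand-ins, not the controller's real ones): every early stopped trial whose observation has not landed yet keeps its slot occupied, so no replacement trial is requested from the Suggestion service for it.

package main

import "fmt"

// Simplified view of a trial as seen by the experiment controller.
type trialView struct {
	earlyStopped         bool
	observationAvailable bool
}

// suggestionRequests mirrors the arithmetic in ReconcileSuggestions: the total
// request count is reduced by the number of early stopped trials that have not
// reported their objective metric yet.
func suggestionRequests(trials []trialView, addCount int32) int32 {
	incomplete := int32(0)
	for _, t := range trials {
		if t.earlyStopped && !t.observationAvailable {
			incomplete++
		}
	}
	return int32(len(trials)) + addCount - incomplete
}

func main() {
	trials := []trialView{
		{earlyStopped: false, observationAvailable: true}, // succeeded, metrics reported
		{earlyStopped: true, observationAvailable: false}, // early stopped, metrics still pending
	}
	fmt.Println(suggestionRequests(trials, 2)) // 3: the pending early stopped trial holds one slot
}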
@@ -343,6 +343,9 @@ func (g *General) ConvertTrials(ts []trialsv1beta1.Trial) []*suggestionapi.Trial
if t.IsMetricsUnavailable() {
continue
}
if !t.IsObservationAvailable() && t.IsEarlyStopped() {
continue
}
Member

Also, what about Failed Trials (I guess the observation is empty for them, right)? Do we want to send them to the Suggestion service?

Contributor Author

I think it makes more sense to filter the failed trials out, as those won't provide any updates to the suggestion service.

Member

Some Suggestion services record failed Trials in their own database (e.g. the Goptuna Suggestion).
@shaowei-su Do you have any concerns with sending Failed Trials to the Suggestion service?
We add failed Trials to the completed Trials, so it won't ask the Suggestion service to generate new Trials.

Contributor Author

Good call, I didn't realize that failed trials are tracked by other suggestion services. Updated in the latest commit to filter only on early stopped trials without observations, PTAL.

Comment on lines +346 to +348
Member
@shaowei-su Can we add a test for this condition to the TestSyncAssignments func or to a new test function?

Contributor Author

Thanks @tenzen-y! I added a few more trials in the mock so this branching logic is also validated.

Btw, could you help restart the failed unit tests? Looks like a flaky test failed.
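As a rough illustration of what the extra fixtures below exercise (simplified stand-ins, not the real converter test), a table-driven check could assert that an early stopped trial without an observation is skipped while one with an observation is converted; the trial names match the fakes added in this PR:

package sketch_test

import "testing"

// fakeTrial is a simplified stand-in for trialsv1beta1.Trial in this sketch.
type fakeTrial struct {
	name                 string
	earlyStopped         bool
	observationAvailable bool
}

// shouldConvert mirrors the filter added to ConvertTrials above: early stopped
// trials are only skipped while their observation is still missing.
func shouldConvert(t fakeTrial) bool {
	return !(t.earlyStopped && !t.observationAvailable)
}

func TestEarlyStoppedTrialFiltering(t *testing.T) {
	cases := []struct {
		trial fakeTrial
		want  bool
	}{
		{fakeTrial{name: "trial4-name", earlyStopped: true, observationAvailable: false}, false},
		{fakeTrial{name: "trial5-name", earlyStopped: true, observationAvailable: true}, true},
	}
	for _, c := range cases {
		if got := shouldConvert(c.trial); got != c.want {
			t.Errorf("shouldConvert(%s) = %v, want %v", c.trial.name, got, c.want)
		}
	}
}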

trial := &suggestionapi.Trial{
Name: t.Name,
Spec: &suggestionapi.TrialSpec{
@@ -637,6 +637,14 @@ func newFakeTrialObservation() *commonv1beta1.Observation {
}
}

func newFakeSuggestionTrialObservation() *commonv1beta1.Observation {
return &commonv1beta1.Observation{
Metrics: []commonv1beta1.Metric{
{Name: "metric1-name", Min: "0.95", Max: "0.95", Latest: "0.95"},
},
}
}

func newFakeRequestObservation() *suggestionapi.Observation {
return &suggestionapi.Observation{
Metrics: []*suggestionapi.Metric{
@@ -664,6 +672,9 @@ func newFakeObjective() *commonapiv1beta1.ObjectiveSpec {
ObjectiveMetricName: "metric1-name",
AdditionalMetricNames: []string{"metric2-name"},
Goal: &goal,
MetricStrategies: []commonapiv1beta1.MetricStrategy{
{Name: "metric1-name", Value: commonapiv1beta1.ExtractByLatest},
},
}
}

@@ -793,6 +804,14 @@ func newFakeTrials() []trialsv1beta1.Trial {
Type: trialsv1beta1.TrialSucceeded,
},
}

fakeEarlyStoppedConditions := []trialsv1beta1.TrialCondition{
{
Type: trialsv1beta1.TrialEarlyStopped,
Status: corev1.ConditionTrue,
},
}

return []trialsv1beta1.Trial{
{
ObjectMeta: metav1.ObjectMeta{
@@ -817,6 +836,7 @@
StartTime: newFakeTime(),
CompletionTime: newFakeTime(),
Conditions: fakeConditions,
Observation: newFakeSuggestionTrialObservation(),
},
},
{
@@ -839,7 +859,8 @@
Labels: map[string]string{},
},
Status: trialsv1beta1.TrialStatus{
Conditions: fakeConditions,
Conditions: fakeConditions,
Observation: newFakeSuggestionTrialObservation(),
},
},
{
@@ -857,6 +878,54 @@
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "trial4-name",
Namespace: "namespace",
},
Spec: trialsv1beta1.TrialSpec{
Objective: newFakeObjective(),
ParameterAssignments: []commonapiv1beta1.ParameterAssignment{
{
Name: "param1-name",
Value: "4",
},
{
Name: "param2-name",
Value: "0.4",
},
},
Labels: map[string]string{},
},
Status: trialsv1beta1.TrialStatus{
Conditions: fakeEarlyStoppedConditions,
Observation: nil,
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "trial5-name",
Namespace: "namespace",
},
Spec: trialsv1beta1.TrialSpec{
Objective: newFakeObjective(),
ParameterAssignments: []commonapiv1beta1.ParameterAssignment{
{
Name: "param1-name",
Value: "5",
},
{
Name: "param2-name",
Value: "0.5",
},
},
Labels: map[string]string{},
},
Status: trialsv1beta1.TrialStatus{
Conditions: fakeEarlyStoppedConditions,
Observation: newFakeSuggestionTrialObservation(),
},
},
}
}

@@ -965,7 +1034,14 @@ func newFakeRequest() *suggestionapi.GetSuggestionsRequest {
StartTime: newFakeTime().Format(timeFormat),
CompletionTime: newFakeTime().Format(timeFormat),
Condition: suggestionapi.TrialStatus_SUCCEEDED,
Observation: &suggestionapi.Observation{},
Observation: &suggestionapi.Observation{
Metrics: []*suggestionapi.Metric{
{
Name: "metric1-name",
Value: "0.95",
},
},
},
},
},
{
@@ -990,7 +1066,46 @@
StartTime: "",
CompletionTime: "",
Condition: suggestionapi.TrialStatus_SUCCEEDED,
Observation: &suggestionapi.Observation{},
Observation: &suggestionapi.Observation{
Metrics: []*suggestionapi.Metric{
{
Name: "metric1-name",
Value: "0.95",
},
},
},
},
},
{
Name: "trial5-name",
Spec: &suggestionapi.TrialSpec{
Objective: fakeObjective,
ParameterAssignments: &suggestionapi.TrialSpec_ParameterAssignments{
Assignments: []*suggestionapi.ParameterAssignment{
{
Name: "param1-name",
Value: "5",
},
{
Name: "param2-name",
Value: "0.5",
},
},
},
Labels: fakeLabels,
},
Status: &suggestionapi.TrialStatus{
StartTime: "",
CompletionTime: "",
Condition: suggestionapi.TrialStatus_EARLYSTOPPED,
Observation: &suggestionapi.Observation{
Metrics: []*suggestionapi.Metric{
{
Name: "metric1-name",
Value: "0.95",
},
},
},
},
},
},
14 changes: 1 addition & 13 deletions pkg/controller.v1beta1/trial/trial_controller_util.go
@@ -45,7 +45,7 @@ func (r *ReconcileTrial) UpdateTrialStatusCondition(instance *trialsv1beta1.Tria
timeNow := metav1.Now()

if jobStatus.Condition == trialutil.JobSucceeded {
if isTrialObservationAvailable(instance) && !instance.IsSucceeded() {
if instance.IsObservationAvailable() && !instance.IsSucceeded() {
if !instance.IsEarlyStopped() {
msg := "Trial has succeeded"
reason := TrialSucceededReason
@@ -162,18 +162,6 @@ func (r *ReconcileTrial) updateFinalizers(instance *trialsv1beta1.Trial, finaliz
}
}

func isTrialObservationAvailable(instance *trialsv1beta1.Trial) bool {
objectiveMetricName := instance.Spec.Objective.ObjectiveMetricName
if instance.Status.Observation != nil && instance.Status.Observation.Metrics != nil {
for _, metric := range instance.Status.Observation.Metrics {
if metric.Name == objectiveMetricName && metric.Latest != consts.UnavailableMetricValue {
return true
}
}
}
return false
}

func getMetrics(metricLogs []*api_pb.MetricLog, strategies []commonv1beta1.MetricStrategy) (*commonv1beta1.Observation, error) {
metrics := make(map[string]*commonv1beta1.Metric)
timestamps := make(map[string]*time.Time)
3 changes: 2 additions & 1 deletion pkg/suggestion/v1beta1/internal/trial.py
@@ -41,7 +41,8 @@ def __init__(
def convert(trials):
res = []
for trial in trials:
if trial.status.condition == api.TrialStatus.TrialConditionType.SUCCEEDED:
if trial.status.condition == api.TrialStatus.TrialConditionType.SUCCEEDED or \
trial.status.condition == api.TrialStatus.TrialConditionType.EARLYSTOPPED:
new_trial = Trial.convertTrial(trial)
if new_trial is not None:
res.append(Trial.convertTrial(trial))
4 changes: 2 additions & 2 deletions pkg/suggestion/v1beta1/skopt/base_service.py
@@ -73,7 +73,7 @@ def getSuggestions(self, trials, current_request_number):
Get the new suggested trials with skopt algorithm.
"""
logger.info("-" * 100 + "\n")
logger.info("New GetSuggestions call\n")
logger.info("New GetSuggestions call with current request number: {}\n".format(current_request_number))
skopt_suggested = []
loss_for_skopt = []
if len(trials) > self.succeeded_trials or self.succeeded_trials == 0:
@@ -112,7 +112,7 @@ def getSuggestions(self, trials, current_request_number):
logger.info("List of recorded Trials names: {}\n".format(self.recorded_trials_names))

else:
logger.info("Succeeded Trials didn't change: {}\n".format(self.succeeded_trials))
logger.error("Succeeded Trials didn't change: {}\n".format(self.succeeded_trials))

logger.info("Running Optimizer ask to query new parameters for Trials\n")
