From 2cf4d84baa0c64c0fc2be1ccfa12810e3642e68f Mon Sep 17 00:00:00 2001 From: Amanda Vialva Date: Wed, 23 Oct 2024 13:55:26 -0400 Subject: [PATCH 1/7] chore: populate exp config priority with enforced priority before opting to k8s default CM-586: #in-review --- .../configpolicy/task_config_policy.go | 40 +++++++++++-------- master/internal/core_experiment.go | 20 ++++++++-- 2 files changed, 40 insertions(+), 20 deletions(-) diff --git a/master/internal/configpolicy/task_config_policy.go b/master/internal/configpolicy/task_config_policy.go index 3d7fd4d5009..aec6c0ea289 100644 --- a/master/internal/configpolicy/task_config_policy.go +++ b/master/internal/configpolicy/task_config_policy.go @@ -11,6 +11,7 @@ import ( "github.com/determined-ai/determined/master/internal/rm" "github.com/determined-ai/determined/master/pkg/model" + "github.com/determined-ai/determined/master/pkg/ptrs" "github.com/determined-ai/determined/master/pkg/schemas" "github.com/determined-ai/determined/master/pkg/schemas/expconf" ) @@ -231,10 +232,14 @@ func MergeWithInvariantExperimentConfigs(ctx context.Context, workspaceID int, return &config, nil } -func findAllowedPriority(scope *int, workloadType string) (limit int, exists bool, err error) { +// FindAllowedPriority finds the optionally set priority limit in scope's invariant config +// policies. Returns the invariant config priority if that's set, and otherwise returns the +// the priority_limit constraint. If neither of the two is set, returns nil limit. +func FindAllowedPriority(scope *int, workloadType string) (limit *int, constraintExists bool, + err error) { configPolicies, err := GetTaskConfigPolicies(context.TODO(), scope, workloadType) if err != nil { - return 0, false, fmt.Errorf("unable to fetch task config policies: %w", err) + return nil, false, fmt.Errorf("unable to fetch task config policies: %w", err) } // Cannot update priority if priority set in invariant config. @@ -244,26 +249,26 @@ func findAllowedPriority(scope *int, workloadType string) (limit int, exists boo var configs model.CommandConfig err = json.Unmarshal([]byte(*configPolicies.InvariantConfig), &configs) if err != nil { - return 0, false, fmt.Errorf("unable to unmarshal task config policies: %w", err) + return nil, false, fmt.Errorf("unable to unmarshal task config policies: %w", err) } if configs.Resources.Priority != nil { adminPriority := *configs.Resources.Priority - return adminPriority, false, + return ptrs.Ptr(adminPriority), false, fmt.Errorf("priority set by invariant config: %w", errPriorityImmutable) } case model.ExperimentType: var configs expconf.ExperimentConfigV0 err = json.Unmarshal([]byte(*configPolicies.InvariantConfig), &configs) if err != nil { - return 0, false, fmt.Errorf("unable to unmarshal task config policies: %w", err) + return nil, false, fmt.Errorf("unable to unmarshal task config policies: %w", err) } if configs.RawResources != nil && configs.RawResources.RawPriority != nil { adminPriority := *configs.RawResources.RawPriority - return adminPriority, false, + return ptrs.Ptr(adminPriority), false, fmt.Errorf("priority set by invariant config: %w", errPriorityImmutable) } default: - return 0, false, fmt.Errorf("workload type %s not supported", workloadType) + return nil, false, fmt.Errorf("workload type %s not supported", workloadType) } } @@ -271,23 +276,24 @@ func findAllowedPriority(scope *int, workloadType string) (limit int, exists boo var constraints model.Constraints if configPolicies.Constraints != nil { if err = json.Unmarshal([]byte(*configPolicies.Constraints), &constraints); err != nil { - return 0, false, fmt.Errorf("unable to unmarshal task config policies: %w", err) + return nil, false, fmt.Errorf("unable to unmarshal task config policies: %w", err) } if constraints.PriorityLimit != nil { - return *constraints.PriorityLimit, true, nil + return constraints.PriorityLimit, true, nil } } - return 0, false, nil + return nil, false, nil } // PriorityUpdateAllowed returns true if the desired priority is within the task config policy limit. func PriorityUpdateAllowed(wkspID int, workloadType string, priority int, smallerHigher bool) (bool, error) { // Check if a priority limit has been set with a constraint policy. // Global policies have highest precedence. - globalLimit, globalExists, err := findAllowedPriority(nil, workloadType) + globalEnforcedPriority, globalExists, err := FindAllowedPriority(nil, workloadType) - if errors.Is(err, errPriorityImmutable) && globalLimit == priority { + if errors.Is(err, errPriorityImmutable) && globalEnforcedPriority != nil && + *globalEnforcedPriority == priority { // If task config policies have updated since the workload was originally scheduled, allow users // to update the priority to the new priority set by invariant config. return true, nil @@ -298,9 +304,9 @@ func PriorityUpdateAllowed(wkspID int, workloadType string, priority int, smalle // TODO use COALESCE instead once postgres updates are complete. // Workspace policies have second precedence. - wkspLimit, wkspExists, err := findAllowedPriority(&wkspID, workloadType) - - if errors.Is(err, errPriorityImmutable) && wkspLimit == priority { + wkspEnforcedPriority, wkspExists, err := FindAllowedPriority(&wkspID, workloadType) + if errors.Is(err, errPriorityImmutable) && wkspEnforcedPriority != nil && + *wkspEnforcedPriority == priority { // If task config policies have updated since the workload was originally scheduled, allow users // to update the priority to the new priority set by invariant config. return true, nil @@ -311,10 +317,10 @@ func PriorityUpdateAllowed(wkspID int, workloadType string, priority int, smalle // No invariant configs. Check for constraints. if globalExists { - return priorityWithinLimit(priority, globalLimit, smallerHigher), nil + return priorityWithinLimit(priority, *wkspEnforcedPriority, smallerHigher), nil } if wkspExists { - return priorityWithinLimit(priority, wkspLimit, smallerHigher), nil + return priorityWithinLimit(priority, *wkspEnforcedPriority, smallerHigher), nil } // No priority limit has been set. diff --git a/master/internal/core_experiment.go b/master/internal/core_experiment.go index 88161fccc94..b628303ae7c 100644 --- a/master/internal/core_experiment.go +++ b/master/internal/core_experiment.go @@ -349,10 +349,24 @@ func (m *Master) parseCreateExperiment(ctx context.Context, req *apiv1.CreateExp config.RawCheckpointStorage, &m.config.CheckpointStorage, ) - // Apply the scheduler's default priority. + // Apply the scheduler's default priority if priority is not set in an invariant config or // constraint. + // constraint. if config.Resources().Priority() == nil { - prio := masterConfig.DefaultPriorityForPool(poolName.String()) - config.RawResources.RawPriority = &prio + // Returns an error if RM does not implement priority. + enforcedPriority, _, err := configpolicy.FindAllowedPriority( + &workspaceID, + model.ExperimentType, + ) + if err != nil { + return nil, nil, config, nil, nil, err + } + if enforcedPriority == nil { + prio := masterConfig.DefaultPriorityForPool(poolName.String()) + config.RawResources.RawPriority = &prio + } else { + config.RawResources.RawPriority = enforcedPriority + } + } // Lastly, apply any json-schema-defined defaults. From 34d483dd2a03ffd639507817dc720710191c13d3 Mon Sep 17 00:00:00 2001 From: Amanda Vialva <144278621+amandavialva01@users.noreply.github.com> Date: Wed, 23 Oct 2024 14:12:17 -0400 Subject: [PATCH 2/7] Update master/internal/configpolicy/task_config_policy.go Co-authored-by: Bradley Laney --- master/internal/configpolicy/task_config_policy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/master/internal/configpolicy/task_config_policy.go b/master/internal/configpolicy/task_config_policy.go index aec6c0ea289..f84e5b42cd5 100644 --- a/master/internal/configpolicy/task_config_policy.go +++ b/master/internal/configpolicy/task_config_policy.go @@ -264,7 +264,7 @@ func FindAllowedPriority(scope *int, workloadType string) (limit *int, constrain } if configs.RawResources != nil && configs.RawResources.RawPriority != nil { adminPriority := *configs.RawResources.RawPriority - return ptrs.Ptr(adminPriority), false, + return &adminPriority, false, fmt.Errorf("priority set by invariant config: %w", errPriorityImmutable) } default: From aee8ec3bf26a896193b46457071e30e23f9740a1 Mon Sep 17 00:00:00 2001 From: Amanda Vialva Date: Wed, 23 Oct 2024 14:18:53 -0400 Subject: [PATCH 3/7] remove comment and change ptrs.ptr to &pointer --- master/internal/configpolicy/task_config_policy.go | 3 +-- master/internal/core_experiment.go | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/master/internal/configpolicy/task_config_policy.go b/master/internal/configpolicy/task_config_policy.go index f84e5b42cd5..2b0c256c138 100644 --- a/master/internal/configpolicy/task_config_policy.go +++ b/master/internal/configpolicy/task_config_policy.go @@ -11,7 +11,6 @@ import ( "github.com/determined-ai/determined/master/internal/rm" "github.com/determined-ai/determined/master/pkg/model" - "github.com/determined-ai/determined/master/pkg/ptrs" "github.com/determined-ai/determined/master/pkg/schemas" "github.com/determined-ai/determined/master/pkg/schemas/expconf" ) @@ -253,7 +252,7 @@ func FindAllowedPriority(scope *int, workloadType string) (limit *int, constrain } if configs.Resources.Priority != nil { adminPriority := *configs.Resources.Priority - return ptrs.Ptr(adminPriority), false, + return &adminPriority, false, fmt.Errorf("priority set by invariant config: %w", errPriorityImmutable) } case model.ExperimentType: diff --git a/master/internal/core_experiment.go b/master/internal/core_experiment.go index b628303ae7c..9ec9c617c31 100644 --- a/master/internal/core_experiment.go +++ b/master/internal/core_experiment.go @@ -352,7 +352,6 @@ func (m *Master) parseCreateExperiment(ctx context.Context, req *apiv1.CreateExp // Apply the scheduler's default priority if priority is not set in an invariant config or // constraint. // constraint. if config.Resources().Priority() == nil { - // Returns an error if RM does not implement priority. enforcedPriority, _, err := configpolicy.FindAllowedPriority( &workspaceID, model.ExperimentType, From beafd0e6f56df90736a1a85d23854c8a6c772b89 Mon Sep 17 00:00:00 2001 From: Amanda Vialva Date: Wed, 23 Oct 2024 14:24:34 -0400 Subject: [PATCH 4/7] fix lint --- master/internal/configpolicy/task_config_policy.go | 5 +++-- .../internal/configpolicy/task_config_policy_intg_test.go | 8 ++++---- master/internal/core_experiment.go | 3 +-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/master/internal/configpolicy/task_config_policy.go b/master/internal/configpolicy/task_config_policy.go index 2b0c256c138..df6cc0cabcd 100644 --- a/master/internal/configpolicy/task_config_policy.go +++ b/master/internal/configpolicy/task_config_policy.go @@ -232,10 +232,11 @@ func MergeWithInvariantExperimentConfigs(ctx context.Context, workspaceID int, } // FindAllowedPriority finds the optionally set priority limit in scope's invariant config -// policies. Returns the invariant config priority if that's set, and otherwise returns the +// policies. Returns the invariant config priority if that's set, and otherwise returns // the priority_limit constraint. If neither of the two is set, returns nil limit. func FindAllowedPriority(scope *int, workloadType string) (limit *int, constraintExists bool, - err error) { + err error, +) { configPolicies, err := GetTaskConfigPolicies(context.TODO(), scope, workloadType) if err != nil { return nil, false, fmt.Errorf("unable to fetch task config policies: %w", err) diff --git a/master/internal/configpolicy/task_config_policy_intg_test.go b/master/internal/configpolicy/task_config_policy_intg_test.go index ae19901868e..5fc8db60dfe 100644 --- a/master/internal/configpolicy/task_config_policy_intg_test.go +++ b/master/internal/configpolicy/task_config_policy_intg_test.go @@ -25,7 +25,7 @@ func TestFindAllowedPriority(t *testing.T) { db.MustMigrateTestPostgres(t, pgDB, db.MigrationsFromDB) // No priority limit to find. - _, exists, err := findAllowedPriority(nil, model.ExperimentType) + _, exists, err := FindAllowedPriority(nil, model.ExperimentType) require.NoError(t, err) require.False(t, exists) @@ -33,7 +33,7 @@ func TestFindAllowedPriority(t *testing.T) { globalLimit := 10 user := db.RequireMockUser(t, pgDB) addConstraints(t, user, nil, fmt.Sprintf(`{"priority_limit": %d}`, globalLimit), model.ExperimentType) - limit, exists, err := findAllowedPriority(nil, model.ExperimentType) + limit, exists, err := FindAllowedPriority(nil, model.ExperimentType) require.Equal(t, globalLimit, limit) require.True(t, exists) require.NoError(t, err) @@ -42,7 +42,7 @@ func TestFindAllowedPriority(t *testing.T) { configPriority := 15 invariantConfig := fmt.Sprintf(`{"resources": {"priority": %d}}`, configPriority) addConfig(t, user, nil, invariantConfig, model.NTSCType) - limit, _, err = findAllowedPriority(nil, model.NTSCType) + limit, _, err = FindAllowedPriority(nil, model.NTSCType) require.ErrorIs(t, err, errPriorityImmutable) require.Equal(t, configPriority, limit) @@ -50,7 +50,7 @@ func TestFindAllowedPriority(t *testing.T) { configPriority = 7 invariantConfig = fmt.Sprintf(`{"resources": {"priority": %d}}`, configPriority) addConfig(t, user, nil, invariantConfig, model.ExperimentType) - limit, _, err = findAllowedPriority(nil, model.ExperimentType) + limit, _, err = FindAllowedPriority(nil, model.ExperimentType) require.ErrorIs(t, err, errPriorityImmutable) require.Equal(t, configPriority, limit) } diff --git a/master/internal/core_experiment.go b/master/internal/core_experiment.go index 9ec9c617c31..8862aab7657 100644 --- a/master/internal/core_experiment.go +++ b/master/internal/core_experiment.go @@ -349,7 +349,7 @@ func (m *Master) parseCreateExperiment(ctx context.Context, req *apiv1.CreateExp config.RawCheckpointStorage, &m.config.CheckpointStorage, ) - // Apply the scheduler's default priority if priority is not set in an invariant config or // constraint. + // Apply the scheduler's default priority if priority is not set in an invariant config or // constraint. if config.Resources().Priority() == nil { enforcedPriority, _, err := configpolicy.FindAllowedPriority( @@ -365,7 +365,6 @@ func (m *Master) parseCreateExperiment(ctx context.Context, req *apiv1.CreateExp } else { config.RawResources.RawPriority = enforcedPriority } - } // Lastly, apply any json-schema-defined defaults. From d48e214387f150e83039c554f2846eb97ac68cd5 Mon Sep 17 00:00:00 2001 From: Amanda Vialva Date: Thu, 24 Oct 2024 09:34:49 -0400 Subject: [PATCH 5/7] address comments --- master/internal/core_experiment.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/master/internal/core_experiment.go b/master/internal/core_experiment.go index 8862aab7657..c3ce93017a9 100644 --- a/master/internal/core_experiment.go +++ b/master/internal/core_experiment.go @@ -352,14 +352,14 @@ func (m *Master) parseCreateExperiment(ctx context.Context, req *apiv1.CreateExp // Apply the scheduler's default priority if priority is not set in an invariant config or // constraint. if config.Resources().Priority() == nil { - enforcedPriority, _, err := configpolicy.FindAllowedPriority( + enforcedPriority, constraint, err := configpolicy.FindAllowedPriority( &workspaceID, model.ExperimentType, ) if err != nil { return nil, nil, config, nil, nil, err } - if enforcedPriority == nil { + if enforcedPriority == nil || constraint { prio := masterConfig.DefaultPriorityForPool(poolName.String()) config.RawResources.RawPriority = &prio } else { From 4393a1bfdc4ce9ba17b3acbe8a2eb1a33983495e Mon Sep 17 00:00:00 2001 From: Amanda Vialva Date: Thu, 24 Oct 2024 11:43:07 -0400 Subject: [PATCH 6/7] fix test --- .../configpolicy/task_config_policy_intg_test.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/master/internal/configpolicy/task_config_policy_intg_test.go b/master/internal/configpolicy/task_config_policy_intg_test.go index 5fc8db60dfe..723936a2bb7 100644 --- a/master/internal/configpolicy/task_config_policy_intg_test.go +++ b/master/internal/configpolicy/task_config_policy_intg_test.go @@ -34,7 +34,8 @@ func TestFindAllowedPriority(t *testing.T) { user := db.RequireMockUser(t, pgDB) addConstraints(t, user, nil, fmt.Sprintf(`{"priority_limit": %d}`, globalLimit), model.ExperimentType) limit, exists, err := FindAllowedPriority(nil, model.ExperimentType) - require.Equal(t, globalLimit, limit) + require.NotNil(t, limit) + require.Equal(t, globalLimit, *limit) require.True(t, exists) require.NoError(t, err) @@ -44,7 +45,8 @@ func TestFindAllowedPriority(t *testing.T) { addConfig(t, user, nil, invariantConfig, model.NTSCType) limit, _, err = FindAllowedPriority(nil, model.NTSCType) require.ErrorIs(t, err, errPriorityImmutable) - require.Equal(t, configPriority, limit) + require.NotNil(t, limit) + require.Equal(t, configPriority, *limit) // Experiment priority set. configPriority = 7 @@ -52,7 +54,8 @@ func TestFindAllowedPriority(t *testing.T) { addConfig(t, user, nil, invariantConfig, model.ExperimentType) limit, _, err = FindAllowedPriority(nil, model.ExperimentType) require.ErrorIs(t, err, errPriorityImmutable) - require.Equal(t, configPriority, limit) + require.NotNil(t, limit) + require.Equal(t, configPriority, *limit) } func TestPriorityUpdateAllowed(t *testing.T) { From 0da7a0965e197acfc49ca17c8fbe132b75c106cf Mon Sep 17 00:00:00 2001 From: Amanda Vialva Date: Thu, 24 Oct 2024 12:15:12 -0400 Subject: [PATCH 7/7] fix tests --- master/internal/configpolicy/task_config_policy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/master/internal/configpolicy/task_config_policy.go b/master/internal/configpolicy/task_config_policy.go index df6cc0cabcd..6ab518a42d5 100644 --- a/master/internal/configpolicy/task_config_policy.go +++ b/master/internal/configpolicy/task_config_policy.go @@ -317,7 +317,7 @@ func PriorityUpdateAllowed(wkspID int, workloadType string, priority int, smalle // No invariant configs. Check for constraints. if globalExists { - return priorityWithinLimit(priority, *wkspEnforcedPriority, smallerHigher), nil + return priorityWithinLimit(priority, *globalEnforcedPriority, smallerHigher), nil } if wkspExists { return priorityWithinLimit(priority, *wkspEnforcedPriority, smallerHigher), nil