Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow setting a ExecutionClusterLabel when triggering a Launchplan/Workflow/Task #4998

Merged
merged 12 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions flyteadmin/pkg/executioncluster/execution_target.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,20 @@ import (
restclient "k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
flyteclient "github.com/flyteorg/flyte/flytepropeller/pkg/client/clientset/versioned"
"github.com/flyteorg/flyte/flytestdlib/random"
)

// Spec to determine the execution target
type ExecutionTargetSpec struct {
TargetID string
ExecutionID string
Project string
Domain string
Workflow string
LaunchPlan string
TargetID string
ExecutionID string
Project string
Domain string
Workflow string
LaunchPlan string
ExecutionClusterLabel *admin.ExecutionClusterLabel
}

// Client object of the target execution cluster
Expand Down
3 changes: 3 additions & 0 deletions flyteadmin/pkg/executioncluster/impl/in_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ func (i InCluster) GetTarget(ctx context.Context, spec *executioncluster.Executi
if spec != nil && !(spec.TargetID == "" || spec.TargetID == defaultInClusterTargetID) {
return nil, errors.New(fmt.Sprintf("remote target %s is not supported", spec.TargetID))
}
if spec != nil && spec.ExecutionClusterLabel != nil && spec.ExecutionClusterLabel.Value != "" {
return nil, errors.New(fmt.Sprintf("execution cluster label %s is not supported", spec.ExecutionClusterLabel.Value))
}
return &i.target, nil
}

Expand Down
11 changes: 11 additions & 0 deletions flyteadmin/pkg/executioncluster/impl/in_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/flyteorg/flyte/flyteadmin/pkg/executioncluster"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
)

func TestInClusterGetTarget(t *testing.T) {
Expand Down Expand Up @@ -52,6 +53,16 @@ func TestInClusterGetRemoteTarget(t *testing.T) {
assert.EqualError(t, err, "remote target t1 is not supported")
}

func TestInClusterGetTargetWithExecutionClusterLabel(t *testing.T) {
cluster := InCluster{
target: executioncluster.ExecutionTarget{},
}
_, err := cluster.GetTarget(context.Background(), &executioncluster.ExecutionTargetSpec{ExecutionClusterLabel: &admin.ExecutionClusterLabel{
Value: "l1",
}})
assert.EqualError(t, err, "execution cluster label l1 is not supported")
}

func TestInClusterGetAllValidTargets(t *testing.T) {
target := &executioncluster.ExecutionTarget{
Enabled: true,
Expand Down
34 changes: 22 additions & 12 deletions flyteadmin/pkg/executioncluster/impl/random_cluster_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,21 +93,31 @@
}
return nil, fmt.Errorf("invalid cluster target %s", spec.TargetID)
}
resource, err := s.resourceManager.GetResource(ctx, managerInterfaces.ResourceRequest{
Project: spec.Project,
Domain: spec.Domain,
Workflow: spec.Workflow,
LaunchPlan: spec.LaunchPlan,
ResourceType: admin.MatchableResource_EXECUTION_CLUSTER_LABEL,
})
if err != nil && !errors.IsDoesNotExistError(err) {
return nil, err
}

var weightedRandomList random.WeightedRandomList
if resource != nil && resource.Attributes.GetExecutionClusterLabel() != nil {
label := resource.Attributes.GetExecutionClusterLabel().Value

var label string

if spec.ExecutionClusterLabel != nil && spec.ExecutionClusterLabel.Value != "" {
label = spec.ExecutionClusterLabel.Value
logger.Debugf(ctx, "Using execution cluster label %s", label)
} else {
resource, err := s.resourceManager.GetResource(ctx, managerInterfaces.ResourceRequest{
Project: spec.Project,
Domain: spec.Domain,
Workflow: spec.Workflow,
LaunchPlan: spec.LaunchPlan,
ResourceType: admin.MatchableResource_EXECUTION_CLUSTER_LABEL,
})
if err != nil && !errors.IsDoesNotExistError(err) {
return nil, err
}

Check warning on line 114 in flyteadmin/pkg/executioncluster/impl/random_cluster_selector.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/executioncluster/impl/random_cluster_selector.go#L113-L114

Added lines #L113 - L114 were not covered by tests
if resource != nil && resource.Attributes.GetExecutionClusterLabel() != nil {
label = resource.Attributes.GetExecutionClusterLabel().Value
}
}

if label != "" {
if _, ok := s.labelWeightedRandomMap[label]; ok {
weightedRandomList = s.labelWeightedRandomMap[label]
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,3 +307,35 @@ func TestRandomClusterSelectorGetTargetWithFallbackToDefault3(t *testing.T) {
assert.Equal(t, testCluster1, target.ID)
assert.True(t, target.Enabled)
}

func TestRandomClusterSelectorGetTargetWithExecutionClusterLabelClusterAssignmentOne(t *testing.T) {
cluster := getRandomClusterSelectorWithDefaultLabelForTest(t, clusterConfig2WithDefaultLabel)
target, err := cluster.GetTarget(context.Background(), &executioncluster.ExecutionTargetSpec{
Project: testProject,
Domain: "different",
Workflow: testWorkflow,
ExecutionID: "e3",
ExecutionClusterLabel: &admin.ExecutionClusterLabel{
Value: "one",
},
})
assert.Nil(t, err)
assert.Equal(t, "testcluster1", target.ID)
assert.True(t, target.Enabled)
}

func TestRandomClusterSelectorGetTargetWithExecutionClusterLabelClusterAssignmentThree(t *testing.T) {
cluster := getRandomClusterSelectorWithDefaultLabelForTest(t, clusterConfig2WithDefaultLabel)
target, err := cluster.GetTarget(context.Background(), &executioncluster.ExecutionTargetSpec{
Project: testProject,
Domain: "different",
Workflow: testWorkflow,
ExecutionID: "e3",
ExecutionClusterLabel: &admin.ExecutionClusterLabel{
Value: "three",
},
})
assert.Nil(t, err)
assert.Equal(t, "testcluster3", target.ID)
assert.True(t, target.Enabled)
}
51 changes: 31 additions & 20 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,17 +547,22 @@
return nil, nil, err
}

var executionClusterLabel *admin.ExecutionClusterLabel
if requestSpec.ExecutionClusterLabel != nil {
executionClusterLabel = requestSpec.ExecutionClusterLabel
}

Check warning on line 553 in flyteadmin/pkg/manager/impl/execution_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/execution_manager.go#L552-L553

Added lines #L552 - L553 were not covered by tests
executionParameters := workflowengineInterfaces.ExecutionParameters{
Inputs: executionInputs,
AcceptedAt: requestedAt,
Labels: labels,
Annotations: annotations,
ExecutionConfig: executionConfig,
TaskResources: &platformTaskResources,
EventVersion: m.config.ApplicationConfiguration().GetTopLevelConfig().EventVersion,
RoleNameKey: m.config.ApplicationConfiguration().GetTopLevelConfig().RoleNameKey,
RawOutputDataConfig: rawOutputDataConfig,
ClusterAssignment: clusterAssignment,
Inputs: executionInputs,
AcceptedAt: requestedAt,
Labels: labels,
Annotations: annotations,
ExecutionConfig: executionConfig,
TaskResources: &platformTaskResources,
EventVersion: m.config.ApplicationConfiguration().GetTopLevelConfig().EventVersion,
RoleNameKey: m.config.ApplicationConfiguration().GetTopLevelConfig().RoleNameKey,
RawOutputDataConfig: rawOutputDataConfig,
ClusterAssignment: clusterAssignment,
ExecutionClusterLabel: executionClusterLabel,
}

overrides, err := m.addPluginOverrides(ctx, &workflowExecutionID, workflowExecutionID.Name, "")
Expand Down Expand Up @@ -947,17 +952,23 @@
return nil, nil, err
}

var executionClusterLabel *admin.ExecutionClusterLabel
if requestSpec.ExecutionClusterLabel != nil {
executionClusterLabel = requestSpec.ExecutionClusterLabel
}

executionParameters := workflowengineInterfaces.ExecutionParameters{
Inputs: executionInputs,
AcceptedAt: requestedAt,
Labels: labels,
Annotations: annotations,
ExecutionConfig: executionConfig,
TaskResources: &platformTaskResources,
EventVersion: m.config.ApplicationConfiguration().GetTopLevelConfig().EventVersion,
RoleNameKey: m.config.ApplicationConfiguration().GetTopLevelConfig().RoleNameKey,
RawOutputDataConfig: rawOutputDataConfig,
ClusterAssignment: clusterAssignment,
Inputs: executionInputs,
AcceptedAt: requestedAt,
Labels: labels,
Annotations: annotations,
ExecutionConfig: executionConfig,
TaskResources: &platformTaskResources,
EventVersion: m.config.ApplicationConfiguration().GetTopLevelConfig().EventVersion,
RoleNameKey: m.config.ApplicationConfiguration().GetTopLevelConfig().RoleNameKey,
RawOutputDataConfig: rawOutputDataConfig,
ClusterAssignment: clusterAssignment,
ExecutionClusterLabel: executionClusterLabel,
}

overrides, err := m.addPluginOverrides(ctx, &workflowExecutionID, launchPlan.GetSpec().WorkflowId.Name, launchPlan.Id.Name)
Expand Down
6 changes: 4 additions & 2 deletions flyteadmin/pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ import (
)

const (
principal = "principal"
rawOutput = "raw_output"
principal = "principal"
rawOutput = "raw_output"
executionClusterLabel = "execution_cluster_label"
)

var spec = testutils.GetExecutionRequest().Spec
Expand Down Expand Up @@ -383,6 +384,7 @@ func TestCreateExecution(t *testing.T) {
}
request.Spec.RawOutputDataConfig = &admin.RawOutputDataConfig{OutputLocationPrefix: rawOutput}
request.Spec.ClusterAssignment = &clusterAssignment
request.Spec.ExecutionClusterLabel = &admin.ExecutionClusterLabel{Value: executionClusterLabel}

identity, err := auth.NewIdentityContext("", principal, "", time.Now(), sets.NewString(), nil, nil)
assert.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/pkg/server/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"crypto/tls"
"fmt"
"google.golang.org/protobuf/encoding/protojson"
"net"
"net/http"
"strings"
Expand All @@ -24,6 +23,7 @@ import (
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/reflection"
"google.golang.org/protobuf/encoding/protojson"
"k8s.io/apimachinery/pkg/util/rand"

"github.com/flyteorg/flyte/flyteadmin/auth"
Expand Down
11 changes: 6 additions & 5 deletions flyteadmin/pkg/workflowengine/impl/k8s_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,12 @@ func (e K8sWorkflowExecutor) Execute(ctx context.Context, data interfaces.Execut
}

executionTargetSpec := executioncluster.ExecutionTargetSpec{
Project: data.ExecutionID.Project,
Domain: data.ExecutionID.Domain,
Workflow: data.ReferenceWorkflowName,
LaunchPlan: data.ReferenceWorkflowName,
ExecutionID: data.ExecutionID.Name,
Project: data.ExecutionID.Project,
Domain: data.ExecutionID.Domain,
Workflow: data.ReferenceWorkflowName,
LaunchPlan: data.ReferenceWorkflowName,
ExecutionID: data.ExecutionID.Name,
ExecutionClusterLabel: data.ExecutionParameters.ExecutionClusterLabel,
}
targetCluster, err := e.executionCluster.GetTarget(ctx, &executionTargetSpec)
if err != nil {
Expand Down
25 changes: 13 additions & 12 deletions flyteadmin/pkg/workflowengine/interfaces/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@ type TaskResources struct {
}

type ExecutionParameters struct {
Inputs *core.LiteralMap
AcceptedAt time.Time
Labels map[string]string
Annotations map[string]string
TaskPluginOverrides []*admin.PluginOverride
ExecutionConfig *admin.WorkflowExecutionConfig
RecoveryExecution *core.WorkflowExecutionIdentifier
TaskResources *TaskResources
EventVersion int
RoleNameKey string
RawOutputDataConfig *admin.RawOutputDataConfig
ClusterAssignment *admin.ClusterAssignment
Inputs *core.LiteralMap
AcceptedAt time.Time
Labels map[string]string
Annotations map[string]string
TaskPluginOverrides []*admin.PluginOverride
ExecutionConfig *admin.WorkflowExecutionConfig
RecoveryExecution *core.WorkflowExecutionIdentifier
TaskResources *TaskResources
EventVersion int
RoleNameKey string
RawOutputDataConfig *admin.RawOutputDataConfig
ClusterAssignment *admin.ClusterAssignment
ExecutionClusterLabel *admin.ExecutionClusterLabel
}

// ExecutionData includes all parameters required to create an execution CRD object.
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/tests/project_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ package tests

import (
"context"
"github.com/flyteorg/flyte/flytestdlib/utils"
"testing"

"github.com/stretchr/testify/assert"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/utils"
)

func TestCreateProject(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions flyteidl/clients/go/assets/admin.swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions flyteidl/gen/pb-es/flyteidl/admin/execution_pb.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading