Skip to content

Commit

Permalink
Pass cluster pool when creating executions
Browse files Browse the repository at this point in the history
  • Loading branch information
iaroslav-ciupin committed Sep 29, 2022
1 parent 41f7750 commit e191524
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 2 deletions.
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ docstring-parser==0.14.1
# flytekit
filelock==3.8.0
# via virtualenv
flyteidl==1.1.12
flyteidl==git+https://github.com/flyteorg/flyteidl.git@cluster-pools-assignment
# via
# -c requirements.txt
# flytekit
Expand Down
14 changes: 14 additions & 0 deletions flytekit/clis/sdk_in_container/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from flytekit.models.interface import Variable
from flytekit.models.literals import Blob, BlobMetadata, Primitive, Union
from flytekit.models.types import LiteralType, SimpleType
from flytekit.models.execution import ClusterAssignment
from flytekit.remote.executions import FlyteWorkflowExecution
from flytekit.tools import module_loader, script_mode
from flytekit.tools.script_mode import _find_project_root
Expand Down Expand Up @@ -427,6 +428,13 @@ def get_workflow_command_base_params() -> typing.List[click.Option]:
default=False,
help="Whether dump a code snippet instructing how to load the workflow execution using flyteremote",
),
click.Option(
param_decls=["--cluster-pool", "cluster_pool"],
required=False,
type=str,
default="",
help="Assign newly created execution to a given cluster pool",
),
]


Expand Down Expand Up @@ -556,6 +564,12 @@ def _run(*args, **kwargs):
# It is assumed that the users expectations is to override the service account only for the execution
options = Options.default_from(k8s_service_account=service_account)

cluster_pool = run_level_params.get("cluster_pool")
if cluster_pool:
if not options:
options = Options()
options.cluster_assignment = ClusterAssignment(cluster_pool=cluster_pool)

execution = remote.execute(
remote_entity,
inputs=inputs,
Expand Down
30 changes: 30 additions & 0 deletions flytekit/models/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import typing

import flyteidl
import flyteidl.admin as _admin_pb2
import flyteidl.admin.execution_pb2 as _execution_pb2
import flyteidl.admin.node_execution_pb2 as _node_execution_pb2
import flyteidl.admin.task_execution_pb2 as _task_execution_pb2
Expand Down Expand Up @@ -175,6 +176,7 @@ def __init__(
raw_output_data_config=None,
max_parallelism=None,
security_context: typing.Optional[security.SecurityContext] = None,
cluster_assignment: typing.Optional[ClusterAssignment] = None,
):
"""
:param flytekit.models.core.identifier.Identifier launch_plan: Launch plan unique identifier to execute
Expand All @@ -200,6 +202,7 @@ def __init__(
self._raw_output_data_config = raw_output_data_config
self._max_parallelism = max_parallelism
self._security_context = security_context
self._cluster_assignment = cluster_assignment

@property
def launch_plan(self):
Expand Down Expand Up @@ -266,6 +269,10 @@ def max_parallelism(self) -> int:
def security_context(self) -> typing.Optional[security.SecurityContext]:
return self._security_context

@property
def cluster_assignment(self) -> typing.Optional[ClusterAssignment]:
return self._cluster_assignment

def to_flyte_idl(self):
"""
:rtype: flyteidl.admin.execution_pb2.ExecutionSpec
Expand All @@ -283,6 +290,7 @@ def to_flyte_idl(self):
else None,
max_parallelism=self.max_parallelism,
security_context=self.security_context.to_flyte_idl() if self.security_context else None,
cluster_assignment=self._cluster_assignment.to_flyte_idl() if self._cluster_assignment else None,
)

@classmethod
Expand All @@ -309,6 +317,28 @@ def from_flyte_idl(cls, p):
)


class ClusterAssignment(_common_models.FlyteIdlEntity):
def __init__(self, cluster_pool=None):
"""
:param Text cluster_pool:
"""
self._cluster_pool = cluster_pool

@property
def cluster_pool(self):
"""
:rtype: Text
"""
return self._cluster_pool

def to_flyte_idl(self):
"""
:rtype: flyteidl.admin.ClusterAssignment
"""
return _admin_pb2.ClusterAssignment(
cluster_pool=self.cluster_pool,
)

class LiteralMapBlob(_common_models.FlyteIdlEntity):
def __init__(self, values=None, uri=None):
"""
Expand Down
1 change: 1 addition & 0 deletions flytekit/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,7 @@ def _execute(
auth_role=None,
max_parallelism=options.max_parallelism,
security_context=options.security_context,
cluster_assignment=options.cluster_assignment,
),
literal_inputs,
)
Expand Down
3 changes: 3 additions & 0 deletions flytekit/tools/translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from flytekit.models import common as common_models
from flytekit.models import interface as interface_models
from flytekit.models import launch_plan as _launch_plan_models
from flytekit.models import execution
from flytekit.models import security
from flytekit.models import task as task_models
from flytekit.models.admin import workflow as admin_workflow_models
Expand Down Expand Up @@ -68,6 +69,7 @@ class Options(object):
max_parallelism: Controls the maximum number of tasknodes that can be run in parallel for the entire workflow.
notifications: List of notifications for this execution
disable_notifications: This should be set to true if all notifications are intended to be disabled for this execution.
cluster_assignment: Assign newly create execution to a cluster that conforms with cluster_assignment object.
"""

labels: typing.Optional[common_models.Labels] = None
Expand All @@ -77,6 +79,7 @@ class Options(object):
max_parallelism: typing.Optional[int] = None
notifications: typing.Optional[typing.List[common_models.Notification]] = None
disable_notifications: typing.Optional[bool] = None
cluster_assignment: typing.Optional[execution.ClusterAssignment] = None

@classmethod
def default_from(
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ docker-image-py==0.1.12
# via flytekit
docstring-parser==0.14.1
# via flytekit
flyteidl==1.1.12
flyteidl==git+https://github.com/flyteorg/flyteidl.git@cluster-pools-assignment
# via flytekit
googleapis-common-protos==1.56.4
# via
Expand Down

0 comments on commit e191524

Please sign in to comment.