Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: Iaroslav Ciupin <iaroslav@union.ai>
  • Loading branch information
iaroslav-ciupin committed Sep 20, 2023
1 parent 62804a9 commit 80720d0
Show file tree
Hide file tree
Showing 6 changed files with 7 additions and 9 deletions.
1 change: 1 addition & 0 deletions flytekit/clis/sdk_in_container/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ def run_remote(
overwrite_cache=run_level_params.overwrite_cache,
envs=run_level_params.envvars,
tags=run_level_params.tags,
cluster_pool=run_level_params.cluster_pool,
)

console_url = remote.generate_console_url(execution)
Expand Down
4 changes: 2 additions & 2 deletions flytekit/clis/sdk_in_container/serve.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
from flyteidl.service.agent_pb2_grpc import add_AsyncAgentServiceServicer_to_server
from grpc import aio

from flytekit.extend.backend.agent_service import AsyncAgentService

_serve_help = """Start a grpc server for the agent service."""


Expand Down Expand Up @@ -52,6 +50,8 @@ async def _start_grpc_server(port: int, worker: int, timeout: int):
click.secho(f"Failed to start the prometheus server with error {e}", fg="red")
click.secho("Starting the agent service...", fg="blue")
server = aio.server(futures.ThreadPoolExecutor(max_workers=worker))
from flytekit.extend.backend.agent_service import AsyncAgentService

add_AsyncAgentServiceServicer_to_server(AsyncAgentService(), server)

server.add_insecure_port(f"[::]:{port}")
Expand Down
2 changes: 1 addition & 1 deletion flytekit/models/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ def to_flyte_idl(self):
:rtype: flyteidl.admin.ClusterAssignment
"""
return _cluster_assignment_pb2.ClusterAssignment(
cluster_pool=self.cluster_pool,
cluster_pool_name=self.cluster_pool,
)


Expand Down
4 changes: 1 addition & 3 deletions flytekit/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -989,8 +989,6 @@ def _execute(
execution_name = execution_name or (execution_name_prefix or "f") + uuid.uuid4().hex[:19]
if not options:
options = Options()
if cluster_pool:
options.cluster_assignment = ClusterAssignment(cluster_pool=cluster_pool)
if options.disable_notifications is not None:
if options.disable_notifications:
notifications = None
Expand Down Expand Up @@ -1052,7 +1050,7 @@ def _execute(
security_context=options.security_context,
envs=common_models.Envs(envs) if envs else None,
tags=tags,
cluster_assignment=options.cluster_assignment,
cluster_assignment=ClusterAssignment(cluster_pool=cluster_pool) if cluster_pool else None,
),
literal_inputs,
)
Expand Down
3 changes: 0 additions & 3 deletions flytekit/tools/translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from flytekit.core.workflow import ReferenceWorkflow, WorkflowBase
from flytekit.models import common as _common_models
from flytekit.models import common as common_models
from flytekit.models import execution
from flytekit.models import interface as interface_models
from flytekit.models import launch_plan as _launch_plan_models
from flytekit.models import security
Expand Down Expand Up @@ -77,7 +76,6 @@ class Options(object):
max_parallelism: Controls the maximum number of tasknodes that can be run in parallel for the entire workflow.
notifications: List of notifications for this execution.
disable_notifications: This should be set to true if all notifications are intended to be disabled for this execution.
cluster_assignment: Assign newly create execution to a cluster that conforms with cluster_assignment object.
"""

labels: typing.Optional[common_models.Labels] = None
Expand All @@ -87,7 +85,6 @@ class Options(object):
max_parallelism: typing.Optional[int] = None
notifications: typing.Optional[typing.List[common_models.Notification]] = None
disable_notifications: typing.Optional[bool] = None
cluster_assignment: typing.Optional[execution.ClusterAssignment] = None

@classmethod
def default_from(
Expand Down
2 changes: 2 additions & 0 deletions tests/flytekit/integration/remote/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,13 @@ def test_execute_python_task(flyteclient, flyte_workflows_register, flyte_remote
overwrite_cache=True,
envs={"foo": "bar"},
tags=["flyte"],
cluster_pool="gpu",
)
assert execution.outputs["t1_int_output"] == 12
assert execution.outputs["c"] == "world"
assert execution.spec.envs == {"foo": "bar"}
assert execution.spec.tags == ["flyte"]
assert execution.spec.cluster_assignment.cluster_pool == "gpu"


def test_execute_python_workflow_and_launch_plan(flyteclient, flyte_workflows_register, flyte_remote_env):
Expand Down

0 comments on commit 80720d0

Please sign in to comment.