-
Notifications
You must be signed in to change notification settings - Fork 276
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement sidecar tasks and task resource specification #263
Merged
Merged
Changes from 6 commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
f4a19cd
wip
katrogan 6f17766
wip
katrogan 950bb4b
Merge branch 'annotations' into sidecar
katrogan d8387fc
flake8
katrogan fa276ba
more flake
katrogan 6314168
review comments
katrogan e098aa1
add **kwargs as input to get_resources()
katrogan File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
from dataclasses import dataclass | ||
|
||
|
||
@dataclass | ||
class Resource(object): | ||
cpu: str = None | ||
mem: str = None | ||
gpu: str = None | ||
storage: str = None | ||
|
||
|
||
@dataclass | ||
class ResourceSpec(object): | ||
requests: Resource = None | ||
limits: Resource = None | ||
|
||
|
||
def get_resources( | ||
memory_request=None, | ||
memory_limit=None, | ||
cpu_request=None, | ||
cpu_limit=None, | ||
storage_request=None, | ||
storage_limit=None, | ||
gpu_request=None, | ||
gpu_limit=None, | ||
) -> ResourceSpec: | ||
resources = ResourceSpec() | ||
resources.requests = Resource() | ||
resources.limits = Resource() | ||
if memory_request: | ||
resources.requests.mem = memory_request | ||
if memory_limit: | ||
resources.limits.mem = memory_limit | ||
if cpu_request: | ||
resources.requests.cpu = cpu_request | ||
if cpu_limit: | ||
resources.limits.cpu = cpu_limit | ||
if storage_request: | ||
resources.requests.storage = storage_request | ||
if storage_limit: | ||
resources.limits.storage = storage_limit | ||
if gpu_request: | ||
resources.requests.gpu = gpu_request | ||
if gpu_limit: | ||
resources.limits.gpu = gpu_limit | ||
return resources |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
from .task import Sidecar | ||
|
||
__all__ = [Sidecar] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
from typing import Any, Callable, Dict, Tuple, Union | ||
|
||
from flyteidl.core import tasks_pb2 as _core_task | ||
from google.protobuf.json_format import MessageToDict | ||
|
||
from flytekit.annotated.context_manager import FlyteContext, RegistrationSettings | ||
from flytekit.annotated.promise import Promise | ||
from flytekit.annotated.python_function_task import PythonFunctionTask | ||
from flytekit.annotated.task import TaskPlugins | ||
from flytekit.common.exceptions import user as _user_exceptions | ||
from flytekit.models import task as _task_model | ||
from flytekit.models import task as _task_models | ||
from flytekit.plugins import k8s as _lazy_k8s | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I do not think you need to do this. But, I guess lets keep it. I need to move all the _lazy_k8s logic to the plugins |
||
|
||
|
||
class Sidecar(object): | ||
def __init__(self, pod_spec: _lazy_k8s.io.api.core.v1.generated_pb2.PodSpec, primary_container_name: str): | ||
if not pod_spec: | ||
raise _user_exceptions.FlyteValidationException("A pod spec cannot be undefined") | ||
if not primary_container_name: | ||
raise _user_exceptions.FlyteValidationException("A primary container name cannot be undefined") | ||
|
||
self._pod_spec = pod_spec | ||
self._primary_container_name = primary_container_name | ||
|
||
@property | ||
def pod_spec(self) -> _lazy_k8s.io.api.core.v1.generated_pb2.PodSpec: | ||
return self._pod_spec | ||
|
||
@property | ||
def primary_container_name(self) -> str: | ||
return self._primary_container_name | ||
|
||
|
||
class SidecarFunctionTask(PythonFunctionTask[Sidecar]): | ||
def __init__( | ||
self, task_config: Sidecar, task_function: Callable, metadata: _task_model.TaskMetadata, *args, **kwargs | ||
): | ||
super(SidecarFunctionTask, self).__init__( | ||
task_config=task_config, | ||
task_type="sidecar", | ||
task_function=task_function, | ||
metadata=metadata, | ||
*args, | ||
**kwargs, | ||
) | ||
|
||
def get_custom(self, settings: RegistrationSettings) -> Dict[str, Any]: | ||
containers = self.task_config.pod_spec.containers | ||
primary_exists = False | ||
for container in containers: | ||
if container.name == self.task_config.primary_container_name: | ||
primary_exists = True | ||
break | ||
if not primary_exists: | ||
# insert a placeholder primary container if it is not defined in the pod spec. | ||
containers.extend( | ||
[_lazy_k8s.io.api.core.v1.generated_pb2.Container(name=self.task_config.primary_container_name)] | ||
) | ||
|
||
final_containers = [] | ||
for container in containers: | ||
# In the case of the primary container, we overwrite specific container attributes with the default values | ||
# used in an SDK runnable task. | ||
if container.name == self.task_config.primary_container_name: | ||
sdk_default_container = self.get_container(settings) | ||
|
||
container.image = sdk_default_container.image | ||
# clear existing commands | ||
del container.command[:] | ||
container.command.extend(sdk_default_container.command) | ||
# also clear existing args | ||
del container.args[:] | ||
container.args.extend(sdk_default_container.args) | ||
|
||
resource_requirements = _lazy_k8s.io.api.core.v1.generated_pb2.ResourceRequirements() | ||
for resource in sdk_default_container.resources.limits: | ||
resource_requirements.limits[ | ||
_core_task.Resources.ResourceName.Name(resource.name).lower() | ||
].CopyFrom(_lazy_k8s.io.apimachinery.pkg.api.resource.generated_pb2.Quantity(string=resource.value)) | ||
for resource in sdk_default_container.resources.requests: | ||
resource_requirements.requests[ | ||
_core_task.Resources.ResourceName.Name(resource.name).lower() | ||
].CopyFrom(_lazy_k8s.io.apimachinery.pkg.api.resource.generated_pb2.Quantity(string=resource.value)) | ||
if resource_requirements.ByteSize(): | ||
# Important! Only copy over resource requirements if they are non-empty. | ||
container.resources.CopyFrom(resource_requirements) | ||
|
||
del container.env[:] | ||
container.env.extend( | ||
[ | ||
_lazy_k8s.io.api.core.v1.generated_pb2.EnvVar(name=key, value=val) | ||
for key, val in sdk_default_container.env.items() | ||
] | ||
) | ||
|
||
final_containers.append(container) | ||
|
||
del self.task_config._pod_spec.containers[:] | ||
self.task_config._pod_spec.containers.extend(final_containers) | ||
|
||
sidecar_job_plugin = _task_models.SidecarJob( | ||
pod_spec=self.task_config.pod_spec, primary_container_name=self.task_config.primary_container_name, | ||
).to_flyte_idl() | ||
return MessageToDict(sidecar_job_plugin) | ||
|
||
def _local_execute(self, ctx: FlyteContext, **kwargs) -> Union[Tuple[Promise], Promise, None]: | ||
raise _user_exceptions.FlyteUserException("Local execute is not currently supported for sidecar tasks") | ||
|
||
|
||
TaskPlugins.register_pythontask_plugin(Sidecar, SidecarFunctionTask) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
from flytekit.annotated.resources import get_resources | ||
|
||
|
||
def test_get_resources(): | ||
resources = get_resources( | ||
memory_request="100M", | ||
memory_limit="200M", | ||
cpu_request="1", | ||
cpu_limit="1.5", | ||
storage_request="1Gb", | ||
storage_limit="2Gb", | ||
gpu_request="1", | ||
gpu_limit="2", | ||
) | ||
assert resources.requests.mem == "100M" | ||
assert resources.limits.mem == "200M" | ||
assert resources.requests.cpu == "1" | ||
assert resources.limits.cpu == "1.5" | ||
assert resources.requests.storage == "1Gb" | ||
assert resources.limits.storage == "2Gb" | ||
assert resources.requests.gpu == "1" | ||
assert resources.limits.gpu == "2" | ||
|
||
|
||
def test_get_resources_none_specified(): | ||
resources = get_resources() | ||
assert not resources.requests.mem | ||
assert not resources.limits.mem | ||
assert not resources.requests.cpu | ||
assert not resources.limits.cpu | ||
assert not resources.requests.storage | ||
assert not resources.limits.storage | ||
assert not resources.requests.gpu | ||
assert not resources.limits.gpu |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
from k8s.io.api.core.v1 import generated_pb2 | ||
|
||
from flytekit.annotated.context_manager import Image, ImageConfig, RegistrationSettings | ||
from flytekit.annotated.task import task | ||
from flytekit.taskplugins.sidecar.task import Sidecar, SidecarFunctionTask | ||
|
||
|
||
def get_pod_spec(): | ||
a_container = generated_pb2.Container(name="a container",) | ||
a_container.command.extend(["fee", "fi", "fo", "fum"]) | ||
a_container.volumeMounts.extend([generated_pb2.VolumeMount(name="volume mount", mountPath="some/where",)]) | ||
|
||
pod_spec = generated_pb2.PodSpec(restartPolicy="OnFailure",) | ||
pod_spec.containers.extend([a_container, generated_pb2.Container(name="another container")]) | ||
return pod_spec | ||
|
||
|
||
def test_sidecar_task(): | ||
sidecar = Sidecar(pod_spec=get_pod_spec(), primary_container_name="a container") | ||
|
||
@task(task_config=sidecar, cpu_request="10", gpu_limit="2") | ||
def simple_sidecar_task(i: int): | ||
pass | ||
|
||
assert isinstance(simple_sidecar_task, SidecarFunctionTask) | ||
assert simple_sidecar_task.task_config == sidecar | ||
|
||
default_img = Image(name="default", fqn="test", tag="tag") | ||
|
||
custom = simple_sidecar_task.get_custom( | ||
RegistrationSettings( | ||
project="project", | ||
domain="domain", | ||
version="version", | ||
env={"foo": "bar"}, | ||
image_config=ImageConfig(default_image=default_img, images=[default_img]), | ||
) | ||
) | ||
assert custom["podSpec"]["restartPolicy"] == "OnFailure" | ||
assert len(custom["podSpec"]["containers"]) == 2 | ||
primary_container = custom["podSpec"]["containers"][0] | ||
assert primary_container["name"] == "a container" | ||
assert primary_container["args"] == [ | ||
"pyflyte-execute", | ||
"--task-module", | ||
"test_sidecar", | ||
"--task-name", | ||
"simple_sidecar_task", | ||
"--inputs", | ||
"{{.input}}", | ||
"--output-prefix", | ||
"{{.outputPrefix}}", | ||
"--raw-output-data-prefix", | ||
"{{.rawOutputDataPrefix}}", | ||
] | ||
assert primary_container["volumeMounts"] == [{"mountPath": "some/where", "name": "volume mount"}] | ||
assert {"name": "foo", "value": "bar"} in primary_container["env"] | ||
assert primary_container["resources"] == { | ||
"requests": {"cpu": {"string": "10"}}, | ||
"limits": {"gpu": {"string": "2"}}, | ||
} | ||
assert custom["podSpec"]["containers"][1]["name"] == "another container" | ||
assert custom["primaryContainerName"] == "a container" |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should add
**kwargs
as the last item.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
talked offline, added to get_resources