Skip to content

Commit

Permalink
Pyflyte build imageSpec (flyteorg#1555)
Browse files Browse the repository at this point in the history
* wip

Signed-off-by: Kevin Su <pingsutw@apache.org>

* wip

Signed-off-by: Kevin Su <pingsutw@apache.org>

* wip

Signed-off-by: Kevin Su <pingsutw@apache.org>

* wip

Signed-off-by: Kevin Su <pingsutw@apache.org>

* wip

Signed-off-by: Kevin Su <pingsutw@apache.org>

* wip

Signed-off-by: Kevin Su <pingsutw@apache.org>

* wip

Signed-off-by: Kevin Su <pingsutw@apache.org>

* wip

Signed-off-by: Kevin Su <pingsutw@apache.org>

* wip

Signed-off-by: Kevin Su <pingsutw@apache.org>

* wip

Signed-off-by: Kevin Su <pingsutw@apache.org>

* wip

Signed-off-by: Kevin Su <pingsutw@apache.org>

* wip

Signed-off-by: Kevin Su <pingsutw@apache.org>

* pyflyte build

Signed-off-by: Kevin Su <pingsutw@apache.org>

* nit

Signed-off-by: Kevin Su <pingsutw@apache.org>

* nit

Signed-off-by: Kevin Su <pingsutw@apache.org>

* wip

Signed-off-by: Kevin Su <pingsutw@apache.org>

* wip

Signed-off-by: Kevin Su <pingsutw@apache.org>

* wip

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Support serialize and package

Signed-off-by: Kevin Su <pingsutw@apache.org>

* more tests

Signed-off-by: Kevin Su <pingsutw@apache.org>

* wip

Signed-off-by: Kevin Su <pingsutw@apache.org>

* wip

Signed-off-by: Kevin Su <pingsutw@apache.org>

* wip

Signed-off-by: Kevin Su <pingsutw@apache.org>

* move to plugin

Signed-off-by: Kevin Su <pingsutw@apache.org>

* test

Signed-off-by: Kevin Su <pingsutw@apache.org>

* nit

Signed-off-by: Kevin Su <pingsutw@apache.org>

* test

Signed-off-by: Kevin Su <pingsutw@apache.org>

* test

Signed-off-by: Kevin Su <pingsutw@apache.org>

* test

Signed-off-by: Kevin Su <pingsutw@apache.org>

* test

Signed-off-by: Kevin Su <pingsutw@apache.org>

* wip

Signed-off-by: Kevin Su <pingsutw@apache.org>

* wip

Signed-off-by: Kevin Su <pingsutw@apache.org>

* wip

Signed-off-by: Kevin Su <pingsutw@apache.org>

* wip

Signed-off-by: Kevin Su <pingsutw@apache.org>

* nit

Signed-off-by: Kevin Su <pingsutw@apache.org>

* fixed tested

Signed-off-by: Kevin Su <pingsutw@apache.org>

* fixed tested

Signed-off-by: Kevin Su <pingsutw@apache.org>

* more tests

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Add support passing yaml in pyflyte run

Signed-off-by: Kevin Su <pingsutw@apache.org>

* lint

Signed-off-by: Kevin Su <pingsutw@apache.org>

* lint

Signed-off-by: Kevin Su <pingsutw@apache.org>

* nit

Signed-off-by: Kevin Su <pingsutw@apache.org>

* nit

Signed-off-by: Kevin Su <pingsutw@apache.org>

---------

Signed-off-by: Kevin Su <pingsutw@apache.org>
  • Loading branch information
pingsutw authored and Yicheng-Lu-llll committed Apr 24, 2023
1 parent dfb8697 commit 4064953
Show file tree
Hide file tree
Showing 29 changed files with 840 additions and 11 deletions.
1 change: 1 addition & 0 deletions .github/workflows/pythonbuild.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ jobs:
- flytekit-deck-standard
- flytekit-dolt
- flytekit-duckdb
- flytekit-envd
- flytekit-greatexpectations
- flytekit-hive
- flytekit-k8s-pod
Expand Down
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ RUN apt-get update && apt-get install build-essential -y
RUN pip install -U flytekit==$VERSION \
flytekitplugins-pod==$VERSION \
flytekitplugins-deck-standard==$VERSION \
flytekitplugins-envd==$VERSION \
scikit-learn

RUN useradd -u 1000 flytekit
Expand Down
127 changes: 127 additions & 0 deletions flytekit/clis/sdk_in_container/build.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import os
import pathlib
import typing

import click
from typing_extensions import OrderedDict

from flytekit.clis.sdk_in_container.constants import CTX_MODULE, CTX_PROJECT_ROOT
from flytekit.clis.sdk_in_container.run import RUN_LEVEL_PARAMS_KEY, get_entities_in_file, load_naive_entity
from flytekit.configuration import ImageConfig, SerializationSettings
from flytekit.core.base_task import PythonTask
from flytekit.core.workflow import PythonFunctionWorkflow
from flytekit.tools.script_mode import _find_project_root
from flytekit.tools.translator import get_serializable


def get_workflow_command_base_params() -> typing.List[click.Option]:
    """
    Build the options shared by every ``pyflyte build`` workflow subcommand.

    :return: A one-element list containing the ``--fast`` boolean flag option.
    """
    # ``--fast`` is an optional flag that is off unless given on the command line.
    fast_flag = click.Option(
        param_decls=["--fast"],
        required=False,
        is_flag=True,
        default=False,
        help="Use fast serialization. The image won't contain the source code. The value is false by default.",
    )
    return [fast_flag]


def build_command(ctx: click.Context, entity: typing.Union[PythonFunctionWorkflow, PythonTask]):
    """
    Create the click callback that serializes ``entity`` for ``pyflyte build``.

    :param ctx: The click Context; ``ctx.obj[RUN_LEVEL_PARAMS_KEY]`` must already hold the run-level parameters
        by the time the returned callback is invoked.
    :param entity: The workflow or task object to serialize.
    :return: A callable that accepts (and ignores) any positional/keyword arguments.
    """

    def _build(*args, **kwargs):
        # Looked up at call time, not at creation time, so values stored after build_command() ran are visible.
        params = ctx.obj[RUN_LEVEL_PARAMS_KEY]

        settings = SerializationSettings(
            project=params.get("project"),
            domain=params.get("domain"),
            image_config=ImageConfig.auto_default_image(),
        )
        # Without --fast the project root is passed along as the source root.
        if not params.get("fast"):
            settings.source_root = params.get(CTX_PROJECT_ROOT)

        # The serialized result is discarded; the call is made only for its side effects.
        # NOTE(review): presumably serialization is what triggers the image build -- confirm in get_serializable.
        get_serializable(OrderedDict(), settings=settings, entity=entity, options=None)

    return _build


class WorkflowCommand(click.MultiCommand):
    """
    Click multi-command bound to a single Python file.

    Its subcommands are whatever ``get_entities_in_file(...).all()`` reports for that file.
    """

    def __init__(self, filename: str, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Keep the resolved path so later relative-path computations do not depend on how the file was spelled.
        self._filename = pathlib.Path(filename).resolve()

    def list_commands(self, ctx):
        return get_entities_in_file(str(self._filename)).all()

    def get_command(self, ctx, exe_entity):
        """
        Load the entity named ``exe_entity`` from this command's file and wrap it in a click Command.

        As a side effect, the project root and dotted module name are stored in ``ctx.obj[RUN_LEVEL_PARAMS_KEY]``
        under ``CTX_PROJECT_ROOT`` and ``CTX_MODULE``.

        :param ctx: The click Context object.
        :param exe_entity: Name of the flyte entity given on the command line after the Python filename.
        :return: A click Command whose callback is produced by ``build_command`` for the loaded entity.
        :raises ValueError: If the file's path relative to the current working directory starts with "..".
        """
        if os.path.relpath(self._filename).startswith(".."):
            raise ValueError(
                f"You must call pyflyte from the same or parent dir, {self._filename} not under {os.getcwd()}"
            )

        project_root = _find_project_root(self._filename)
        # Turn "<root>/pkg/mod.py" into the dotted module name "pkg.mod".
        module_path, _extension = os.path.splitext(self._filename.relative_to(project_root))
        module = module_path.replace(os.path.sep, ".")

        run_level_params = ctx.obj[RUN_LEVEL_PARAMS_KEY]
        run_level_params[CTX_PROJECT_ROOT] = project_root
        run_level_params[CTX_MODULE] = module

        entity = load_naive_entity(module, exe_entity, project_root)
        return click.Command(
            name=exe_entity,
            callback=build_command(ctx, entity),
            help=f"Build an image for {module}.{exe_entity}.",
        )


class BuildCommand(click.MultiCommand):
    """
    Click command group for building an image for the flyte workflows and tasks in a file.

    Each subcommand is a Python file, handled by a ``WorkflowCommand``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, params=get_workflow_command_base_params(), **kwargs)

    def list_commands(self, ctx):
        # Offer every *.py file directly in the current directory, except the package marker.
        candidates = (str(path) for path in pathlib.Path(".").glob("*.py"))
        return [name for name in candidates if name != "__init__.py"]

    def get_command(self, ctx, filename):
        # Expose the group-level options to the nested commands; skipped when ctx.obj is unset or empty.
        if ctx.obj:
            ctx.obj[RUN_LEVEL_PARAMS_KEY] = ctx.params
        return WorkflowCommand(filename, name=filename, help="Build an image for [workflow|task]")


# Help text shown for the top-level ``build`` command.
_build_help = """
This command can build an image for a workflow or a task from the command line, for fully self-contained scripts.
"""

# Module-level command instance, created at import time so the CLI entry point can import and register it.
build = BuildCommand(name="build", help=_build_help)
2 changes: 2 additions & 0 deletions flytekit/clis/sdk_in_container/pyflyte.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from flytekit import configuration
from flytekit.clis.sdk_in_container.backfill import backfill
from flytekit.clis.sdk_in_container.build import build
from flytekit.clis.sdk_in_container.constants import CTX_CONFIG_FILE, CTX_PACKAGES, CTX_VERBOSE
from flytekit.clis.sdk_in_container.init import init
from flytekit.clis.sdk_in_container.local_cache import local_cache
Expand Down Expand Up @@ -132,6 +133,7 @@ def main(ctx, pkgs: typing.List[str], config: str, verbose: bool):
main.add_command(run)
main.add_command(register)
main.add_command(backfill)
main.add_command(build)
main.epilog

if __name__ == "__main__":
Expand Down
12 changes: 12 additions & 0 deletions flytekit/configuration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,15 @@
from io import BytesIO
from typing import Dict, List, Optional

import yaml
from dataclasses_json import dataclass_json
from docker_image import reference

from flytekit.configuration import internal as _internal
from flytekit.configuration.default_images import DefaultImages
from flytekit.configuration.file import ConfigEntry, ConfigFile, get_config_file, read_file_if_exists, set_if_exists
from flytekit.image_spec import ImageSpec
from flytekit.image_spec.image_spec import ImageBuildEngine
from flytekit.loggers import logger

PROJECT_PLACEHOLDER = "{{ registration.project }}"
Expand Down Expand Up @@ -205,6 +208,13 @@ def look_up_image_info(name: str, tag: str, optional_tag: bool = False) -> Image
:param Text tag: e.g. somedocker.com/myimage:someversion123
:rtype: Text
"""
if pathlib.Path(tag).is_file():
with open(tag, "r") as f:
image_spec_dict = yaml.safe_load(f)
image_spec = ImageSpec(**image_spec_dict)
ImageBuildEngine.build(image_spec)
tag = image_spec.image_name()

ref = reference.Reference.parse(tag)
if not optional_tag and ref["tag"] is None:
raise AssertionError(f"Incorrectly formatted image {tag}, missing tag value")
Expand Down Expand Up @@ -699,6 +709,7 @@ class SerializationSettings(object):
fast_serialization_settings (Optional[FastSerializationSettings]): If the code is being serialized so that it
can be fast registered (and thus omit building a Docker image) this object contains additional parameters
for serialization.
source_root (Optional[str]): The root directory of the source code.
"""

image_config: ImageConfig
Expand All @@ -710,6 +721,7 @@ class SerializationSettings(object):
python_interpreter: str = DEFAULT_RUNTIME_PYTHON_INTERPRETER
flytekit_virtualenv_root: Optional[str] = None
fast_serialization_settings: Optional[FastSerializationSettings] = None
source_root: Optional[str] = None

def __post_init__(self):
if self.flytekit_virtualenv_root is None:
Expand Down
19 changes: 13 additions & 6 deletions flytekit/core/python_auto_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import importlib
import re
from abc import ABC
from typing import Callable, Dict, List, Optional, TypeVar
from typing import Callable, Dict, List, Optional, TypeVar, Union

from flytekit.configuration import ImageConfig, SerializationSettings
from flytekit.core.base_task import PythonTask, TaskMetadata, TaskResolverMixin
Expand All @@ -13,6 +13,7 @@
from flytekit.core.tracked_abc import FlyteTrackedABC
from flytekit.core.tracker import TrackedInstance, extract_task_module
from flytekit.core.utils import _get_container_definition, _serialize_pod_spec, timeit
from flytekit.image_spec.image_spec import ImageBuildEngine, ImageSpec
from flytekit.loggers import logger
from flytekit.models import task as _task_model
from flytekit.models.security import Secret, SecurityContext
Expand All @@ -35,7 +36,7 @@ def __init__(
name: str,
task_config: T,
task_type="python-task",
container_image: Optional[str] = None,
container_image: Optional[Union[str, ImageSpec]] = None,
requests: Optional[Resources] = None,
limits: Optional[Resources] = None,
environment: Optional[Dict[str, str]] = None,
Expand Down Expand Up @@ -77,7 +78,7 @@ def __init__(
raise AssertionError(f"Secret {s} should be of type flytekit.Secret, received {type(s)}")
sec_ctx = SecurityContext(secrets=secret_requests)

# pod_template_name overwrites the metedata.pod_template_name
# pod_template_name overwrites the metadata.pod_template_name
kwargs["metadata"] = kwargs["metadata"] if "metadata" in kwargs else TaskMetadata()
kwargs["metadata"].pod_template_name = pod_template_name

Expand Down Expand Up @@ -115,7 +116,7 @@ def task_resolver(self) -> TaskResolverMixin:
return self._task_resolver

@property
def container_image(self) -> Optional[str]:
def container_image(self) -> Optional[Union[str, ImageSpec]]:
return self._container_image

@property
Expand Down Expand Up @@ -180,6 +181,8 @@ def _get_container(self, settings: SerializationSettings) -> _task_model.Contain
for elem in (settings.env, self.environment):
if elem:
env.update(elem)
if isinstance(self.container_image, ImageSpec):
self.container_image.source_root = settings.source_root
return _get_container_definition(
image=get_registerable_container_image(self.container_image, settings.image_config),
command=[],
Expand Down Expand Up @@ -249,12 +252,16 @@ def get_all_tasks(self) -> List[PythonAutoContainerTask]: # type: ignore
default_task_resolver = DefaultTaskResolver()


def get_registerable_container_image(img: Optional[str], cfg: ImageConfig) -> str:
def get_registerable_container_image(img: Optional[Union[str, ImageSpec]], cfg: ImageConfig) -> str:
"""
:param img: Configured image
:param img: Configured image or image spec
:param cfg: Registration configuration
:return:
"""
if isinstance(img, ImageSpec):
ImageBuildEngine.build(img)
return img.image_name()

if img is not None and img != "":
matches = _IMAGE_REPLACE_REGEX.findall(img)
if matches is None or len(matches) == 0:
Expand Down
3 changes: 2 additions & 1 deletion flytekit/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from flytekit.core.python_function_task import PythonFunctionTask
from flytekit.core.reference_entity import ReferenceEntity, TaskReference
from flytekit.core.resources import Resources
from flytekit.image_spec.image_spec import ImageSpec
from flytekit.models.documentation import Documentation
from flytekit.models.security import Secret

Expand Down Expand Up @@ -84,7 +85,7 @@ def task(
interruptible: Optional[bool] = None,
deprecated: str = "",
timeout: Union[_datetime.timedelta, int] = 0,
container_image: Optional[str] = None,
container_image: Optional[Union[str, ImageSpec]] = None,
environment: Optional[Dict[str, str]] = None,
requests: Optional[Resources] = None,
limits: Optional[Resources] = None,
Expand Down
1 change: 1 addition & 0 deletions flytekit/image_spec/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .image_spec import ImageSpec
Loading

0 comments on commit 4064953

Please sign in to comment.