Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rebase #3038

Merged
merged 11 commits into from
Jan 6, 2025
20 changes: 12 additions & 8 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ adlfs==2024.4.1
# via flytekit
aiobotocore==2.13.0
# via s3fs
aiohttp==3.9.5
aiohappyeyeballs==2.4.4
# via aiohttp
aiohttp==3.10.11
# via
# adlfs
# aiobotocore
Expand Down Expand Up @@ -113,10 +115,8 @@ filelock==3.14.0
# via
# snowflake-connector-python
# virtualenv
flyteidl @ git+https://github.com/flyteorg/flyte.git@master#subdirectory=flyteidl
# via
# -r dev-requirements.in
# flytekit
flyteidl==1.14.1
# via flytekit
frozenlist==1.4.1
# via
# aiohttp
Expand Down Expand Up @@ -244,7 +244,9 @@ keyring==25.2.1
keyrings-alt==5.0.1
# via -r dev-requirements.in
kubernetes==29.0.0
# via -r dev-requirements.in
# via
# -r dev-requirements.in
# flytekit
markdown-it-py==3.0.0
# via
# flytekit
Expand All @@ -260,7 +262,7 @@ marshmallow-enum==1.5.1
# flytekit
marshmallow-jsonschema==0.13.0
# via flytekit
mashumaro==3.13
mashumaro==3.15
# via flytekit
matplotlib-inline==0.1.7
# via
Expand Down Expand Up @@ -345,6 +347,8 @@ prometheus-client==0.20.0
# via -r dev-requirements.in
prompt-toolkit==3.0.45
# via ipython
propcache==0.2.1
# via yarl
proto-plus==1.23.0
# via
# google-api-core
Expand Down Expand Up @@ -557,7 +561,7 @@ websocket-client==1.8.0
# kubernetes
wrapt==1.16.0
# via aiobotocore
yarl==1.9.4
yarl==1.18.3
# via aiohttp
zipp==3.19.1
# via importlib-metadata
Expand Down
13 changes: 10 additions & 3 deletions flytekit/clis/sdk_in_container/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,8 +475,15 @@ def to_click_option(
If no custom logic exists, fall back to json.dumps.
"""
with FlyteContextManager.with_context(flyte_ctx.new_builder()):
encoder = JSONEncoder(python_type)
default_val = encoder.encode(default_val)
if hasattr(default_val, "model_dump_json"):
# pydantic v2
default_val = default_val.model_dump_json()
elif hasattr(default_val, "json"):
# pydantic v1
default_val = default_val.json()
else:
encoder = JSONEncoder(python_type)
default_val = encoder.encode(default_val)
if literal_var.type.metadata:
description_extra = f": {json.dumps(literal_var.type.metadata)}"

Expand Down Expand Up @@ -1057,7 +1064,7 @@ def _create_command(
h = h + click.style(f" (LP Name: {loaded_entity.name})", fg="yellow")
else:
if loaded_entity.__doc__:
h = h + click.style(f"{loaded_entity.__doc__}", dim=True)
h = h + click.style(f" {loaded_entity.__doc__}", dim=True)
cmd = YamlFileReadingCommand(
name=entity_name,
params=params,
Expand Down
48 changes: 38 additions & 10 deletions flytekit/core/local_cache.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from typing import Optional, Tuple

from diskcache import Cache
from flyteidl.core.literals_pb2 import LiteralMap

from flytekit import lazy_module
from flytekit.models.literals import Literal, LiteralCollection, LiteralMap
from flytekit.models.literals import Literal, LiteralCollection
from flytekit.models.literals import LiteralMap as ModelLiteralMap

joblib = lazy_module("joblib")

Expand All @@ -23,13 +25,16 @@ def _recursive_hash_placement(literal: Literal) -> Literal:
literal_map = {}
for key, literal_value in literal.map.literals.items():
literal_map[key] = _recursive_hash_placement(literal_value)
return Literal(map=LiteralMap(literal_map))
return Literal(map=ModelLiteralMap(literal_map))
else:
return literal


def _calculate_cache_key(
task_name: str, cache_version: str, input_literal_map: LiteralMap, cache_ignore_input_vars: Tuple[str, ...] = ()
task_name: str,
cache_version: str,
input_literal_map: ModelLiteralMap,
cache_ignore_input_vars: Tuple[str, ...] = (),
) -> str:
# Traverse the literals and replace the literal with a new literal that only contains the hash
literal_map_overridden = {}
Expand All @@ -40,7 +45,7 @@ def _calculate_cache_key(

# Generate a stable representation of the underlying protobuf by passing `deterministic=True` to the
# protobuf library.
hashed_inputs = LiteralMap(literal_map_overridden).to_flyte_idl().SerializeToString(deterministic=True)
hashed_inputs = ModelLiteralMap(literal_map_overridden).to_flyte_idl().SerializeToString(deterministic=True)
# Use joblib to hash the string representation of the literal into a fixed length string
return f"{task_name}-{cache_version}-{joblib.hash(hashed_inputs)}"

Expand All @@ -66,24 +71,47 @@ def clear():

@staticmethod
def get(
    task_name: str, cache_version: str, input_literal_map: ModelLiteralMap, cache_ignore_input_vars: Tuple[str, ...]
) -> Optional[ModelLiteralMap]:
    """
    Look up a previously cached task result in the local (diskcache-backed) task cache.

    :param task_name: fully-qualified task name, part of the cache key
    :param cache_version: task cache version, part of the cache key
    :param input_literal_map: task inputs; hashed (minus ignored vars) to form the cache key
    :param cache_ignore_input_vars: input variable names excluded from the cache-key hash
    :return: the cached output literal map, or None on a cache miss
    :raises ValueError: if the cached entry is of an unexpected type
    """
    if not LocalTaskCache._initialized:
        LocalTaskCache.initialize()
    serialized_obj = LocalTaskCache._cache.get(
        _calculate_cache_key(task_name, cache_version, input_literal_map, cache_ignore_input_vars)
    )

    if serialized_obj is None:
        return None

    # If the serialized object is a model object, first convert it back to a proto object (which will force it
    # to use the installed flyteidl proto messages) and then convert it back to a model object. This guarantees
    # that the object is in the correct format regardless of the flytekit version that wrote the entry.
    if isinstance(serialized_obj, ModelLiteralMap):
        return ModelLiteralMap.from_flyte_idl(ModelLiteralMap.to_flyte_idl(serialized_obj))
    elif isinstance(serialized_obj, bytes):
        # If it is a bytes object, then it is a serialized proto object.
        # We need to deserialize the proto first, then wrap it in a model object.
        pb_literal_map = LiteralMap()
        pb_literal_map.ParseFromString(serialized_obj)
        return ModelLiteralMap.from_flyte_idl(pb_literal_map)
    else:
        raise ValueError(f"Unexpected object type {type(serialized_obj)}")

@staticmethod
def set(
    task_name: str,
    cache_version: str,
    input_literal_map: ModelLiteralMap,
    cache_ignore_input_vars: Tuple[str, ...],
    value: ModelLiteralMap,
) -> None:
    """
    Store a task result in the local (diskcache-backed) task cache.

    :param task_name: fully-qualified task name, part of the cache key
    :param cache_version: task cache version, part of the cache key
    :param input_literal_map: task inputs; hashed (minus ignored vars) to form the cache key
    :param cache_ignore_input_vars: input variable names excluded from the cache-key hash
    :param value: task output literal map to cache
    """
    if not LocalTaskCache._initialized:
        LocalTaskCache.initialize()
    # Serialize the value to proto bytes before storing, so cache entries do not depend on the
    # pickled layout of the model classes; `get` accepts both formats for backwards compatibility.
    LocalTaskCache._cache.set(
        _calculate_cache_key(
            task_name,
            cache_version,
            input_literal_map,
            cache_ignore_input_vars,
        ),
        value.to_flyte_idl().SerializeToString(),
    )
4 changes: 2 additions & 2 deletions flytekit/extend/backend/agent_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,14 @@ async def CreateTask(self, request: CreateTaskRequest, context: grpc.ServicerCon
task_execution_metadata = TaskExecutionMetadata.from_flyte_idl(request.task_execution_metadata)

logger.info(f"{agent.name} start creating the job")
resource_mata = await mirror_async_methods(
resource_meta = await mirror_async_methods(
agent.create,
task_template=template,
inputs=inputs,
output_prefix=request.output_prefix,
task_execution_metadata=task_execution_metadata,
)
return CreateTaskResponse(resource_meta=resource_mata.encode())
return CreateTaskResponse(resource_meta=resource_meta.encode())

@record_agent_metrics
async def GetTask(self, request: GetTaskRequest, context: grpc.ServicerContext) -> GetTaskResponse:
Expand Down
4 changes: 2 additions & 2 deletions flytekit/extend/backend/base_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,10 +335,10 @@ def execute(self: PythonTask, **kwargs) -> LiteralMap:
task_template = get_serializable(OrderedDict(), ss, self).template
self._agent = AgentRegistry.get_agent(task_template.type, task_template.task_type_version)

resource_mata = asyncio.run(
resource_meta = asyncio.run(
self._create(task_template=task_template, output_prefix=output_prefix, inputs=kwargs)
)
resource = asyncio.run(self._get(resource_meta=resource_mata))
resource = asyncio.run(self._get(resource_meta=resource_meta))

if resource.phase != TaskExecution.SUCCEEDED:
raise FlyteUserException(f"Failed to run the task {self.name} with error: {resource.message}")
Expand Down
61 changes: 51 additions & 10 deletions flytekit/image_spec/default_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,31 @@
"""
)

POETRY_LOCK_TEMPLATE = Template(
"""\
RUN --mount=type=cache,sharing=locked,mode=0777,target=/root/.cache/uv,id=uv \
--mount=from=uv,source=/uv,target=/usr/bin/uv \
uv pip install poetry

ENV POETRY_CACHE_DIR=/tmp/poetry_cache \
POETRY_VIRTUALENVS_IN_PROJECT=true

# poetry install does not work running in /, so we move to /root to create the venv
WORKDIR /root

RUN --mount=type=cache,sharing=locked,mode=0777,target=/tmp/poetry_cache,id=poetry \
--mount=type=bind,target=poetry.lock,src=poetry.lock \
--mount=type=bind,target=pyproject.toml,src=pyproject.toml \
poetry install $PIP_INSTALL_ARGS

WORKDIR /

# Update PATH and UV_PYTHON to point to venv
ENV PATH="/root/.venv/bin:$$PATH" \
UV_PYTHON=/root/.venv/bin/python
"""
)

UV_PYTHON_INSTALL_COMMAND_TEMPLATE = Template(
"""\
RUN --mount=type=cache,sharing=locked,mode=0777,target=/root/.cache/uv,id=uv \
Expand All @@ -44,6 +69,7 @@
"""
)


APT_INSTALL_COMMAND_TEMPLATE = Template("""\
RUN --mount=type=cache,sharing=locked,mode=0777,target=/var/cache/apt,id=apt \
apt-get update && apt-get install -y --no-install-recommends \
Expand Down Expand Up @@ -128,29 +154,33 @@ def _is_flytekit(package: str) -> bool:
return name == "flytekit"


def prepare_uv_lock_command(image_spec: ImageSpec, pip_install_args: List[str], tmp_dir: Path) -> str:
# uv sync is experimental, so our uv.lock support is also experimental
# the parameters we pass into install args could be different
warnings.warn("uv.lock support is experimental", UserWarning)

def _copy_lock_files_into_context(image_spec: ImageSpec, lock_file: str, tmp_dir: Path):
if image_spec.packages is not None:
msg = "Support for uv.lock files and packages is mutually exclusive"
msg = f"Support for {lock_file} files and packages is mutually exclusive"
raise ValueError(msg)

uv_lock_path = tmp_dir / "uv.lock"
shutil.copy2(image_spec.requirements, uv_lock_path)
lock_path = tmp_dir / lock_file
shutil.copy2(image_spec.requirements, lock_path)

# uv.lock requires pyproject.toml to be included
# lock requires pyproject.toml to be included
pyproject_toml_path = tmp_dir / "pyproject.toml"
dir_name = os.path.dirname(image_spec.requirements)

pyproject_toml_src = os.path.join(dir_name, "pyproject.toml")
if not os.path.exists(pyproject_toml_src):
msg = "To use uv.lock, a pyproject.toml must be in the same directory as the lock file"
msg = f"To use {lock_file}, a pyproject.toml file must be in the same directory as the lock file"
raise ValueError(msg)

shutil.copy2(pyproject_toml_src, pyproject_toml_path)


def prepare_uv_lock_command(image_spec: ImageSpec, pip_install_args: List[str], tmp_dir: Path) -> str:
# uv sync is experimental, so our uv.lock support is also experimental
# the parameters we pass into install args could be different
warnings.warn("uv.lock support is experimental", UserWarning)

_copy_lock_files_into_context(image_spec, "uv.lock", tmp_dir)

# --locked: Assert that the `uv.lock` will remain unchanged
# --no-dev: Omit the development dependency group
# --no-install-project: Do not install the current project
Expand All @@ -160,6 +190,15 @@ def prepare_uv_lock_command(image_spec: ImageSpec, pip_install_args: List[str],
return UV_LOCK_INSTALL_TEMPLATE.substitute(PIP_INSTALL_ARGS=pip_install_args)


def prepare_poetry_lock_command(image_spec: ImageSpec, pip_install_args: List[str], tmp_dir: Path) -> str:
    """
    Stage poetry.lock and pyproject.toml into the Docker build context and return the
    Dockerfile snippet that installs the project's dependencies with `poetry install`.

    :param image_spec: spec whose `requirements` points at the poetry.lock file
    :param pip_install_args: install flags accumulated so far (not mutated here)
    :param tmp_dir: Docker build context directory the lock files are copied into
    :return: rendered POETRY_LOCK_TEMPLATE Dockerfile fragment
    :raises ValueError: propagated from the copy helper when packages are also set or
        pyproject.toml is missing next to the lock file
    """
    _copy_lock_files_into_context(image_spec, "poetry.lock", tmp_dir)

    # --no-root: Do not install the current project, only its dependencies.
    # Build a new list rather than extending the caller's `pip_install_args` in place,
    # so the argument list is not mutated as a side effect.
    install_args = " ".join([*pip_install_args, "--no-root"])
    return POETRY_LOCK_TEMPLATE.substitute(PIP_INSTALL_ARGS=install_args)


def prepare_python_install(image_spec: ImageSpec, tmp_dir: Path) -> str:
pip_install_args = []
if image_spec.pip_index:
Expand All @@ -174,6 +213,8 @@ def prepare_python_install(image_spec: ImageSpec, tmp_dir: Path) -> str:
requirement_basename = os.path.basename(image_spec.requirements)
if requirement_basename == "uv.lock":
return prepare_uv_lock_command(image_spec, pip_install_args, tmp_dir)
elif requirement_basename == "poetry.lock":
return prepare_poetry_lock_command(image_spec, pip_install_args, tmp_dir)

# Assume this is a requirements.txt file
with open(image_spec.requirements) as f:
Expand Down
10 changes: 9 additions & 1 deletion flytekit/models/literals.py
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,15 @@ def offloaded_metadata(self) -> Optional[LiteralOffloadedMetadata]:
"""
This value holds metadata about the offloaded literal.
"""
return self._offloaded_metadata
# The following check might seem non-sensical, since `_offloaded_metadata` is set in the constructor.
# This is here to support backwards compatibility caused by the local cache implementation. Let me explain.
# The local cache pickles values and unpickles them. When unpickling, the constructor is not called, so there
# are cases where the `_offloaded_metadata` is not set (for example if you cache a value using flytekit<=1.13.6
# and you load that value later using flytekit>1.13.6).
# In other words, this is a workaround to support backwards compatibility with the local cache.
if hasattr(self, "_offloaded_metadata"):
return self._offloaded_metadata
return None

def to_flyte_idl(self):
"""
Expand Down
7 changes: 4 additions & 3 deletions flytekit/types/directory/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,10 +367,11 @@ def listdir(cls, directory: FlyteDirectory) -> typing.List[typing.Union[FlyteDir
file_access = FlyteContextManager.current_context().file_access
if not file_access.is_remote(final_path):
for p in os.listdir(final_path):
if os.path.isfile(os.path.join(final_path, p)):
paths.append(FlyteFile(p))
joined_path = os.path.join(final_path, p)
if os.path.isfile(joined_path):
paths.append(FlyteFile(joined_path))
else:
paths.append(FlyteDirectory(p))
paths.append(FlyteDirectory(joined_path))
return paths

def create_downloader(_remote_path: str, _local_path: str, is_multipart: bool):
Expand Down
Loading
Loading