Skip to content

Commit

Permalink
[App] Content for plugins (#17243)
Browse files Browse the repository at this point in the history
Co-authored-by: Yurij Mikhalevich <yurij@grid.ai>
Co-authored-by: Luca Antiga <luca.antiga@gmail.com>

(cherry picked from commit 2c3dfc0)
  • Loading branch information
ethanwharris authored and Borda committed Jul 7, 2023
1 parent 33c8f00 commit a8fd96b
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 81 deletions.
13 changes: 13 additions & 0 deletions src/lightning/app/plugin/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ def run_job(self, name: str, app_entrypoint: str, env_vars: Dict[str, str] = {})
"""
from lightning.app.runners.cloud import CloudRuntime

logger.info(f"Processing job run request. name: {name}, app_entrypoint: {app_entrypoint}, env_vars: {env_vars}")

# Dispatch the job
_set_flow_context()

Expand Down Expand Up @@ -123,6 +125,8 @@ def _run_plugin(run: _Run) -> Dict[str, Any]:

# Download the tarball
try:
logger.info(f"Downloading plugin source: {run.source_code_url}")

# Sometimes the URL gets encoded, so we parse it here
source_code_url = urlparse(run.source_code_url).geturl()

Expand All @@ -141,6 +145,8 @@ def _run_plugin(run: _Run) -> Dict[str, Any]:

# Extract
try:
logger.info("Extracting plugin source.")

with tarfile.open(download_path, "r:gz") as tf:
tf.extractall(source_path)
except Exception as ex:
Expand All @@ -151,6 +157,8 @@ def _run_plugin(run: _Run) -> Dict[str, Any]:

# Import the plugin
try:
logger.info(f"Importing plugin: {run.plugin_entrypoint}")

plugin = _load_plugin_from_file(os.path.join(source_path, run.plugin_entrypoint))
except Exception as ex:
raise HTTPException(
Expand All @@ -163,6 +171,11 @@ def _run_plugin(run: _Run) -> Dict[str, Any]:

# Setup and run the plugin
try:
logger.info(
"Running plugin. "
f"project_id: {run.project_id}, cloudspace_id: {run.cloudspace_id}, cluster_id: {run.cluster_id}."
)

plugin._setup(
project_id=run.project_id,
cloudspace_id=run.cloudspace_id,
Expand Down
71 changes: 53 additions & 18 deletions src/lightning/app/runners/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,16 +218,22 @@ def cloudspace_dispatch(
# Dispatch in four phases: resolution, validation, spec creation, API transactions
# Resolution
root = self._resolve_root()
repo = self._resolve_repo(root)
# If the root already exists in the cloud (it lives under /project), we skip uploading the source and keep the entrypoint as an absolute path
absolute_entrypoint = str(root).startswith("/project")
# If system customization files found, it will set their location path
sys_customizations_root = self._resolve_env_root()
repo = self._resolve_repo(
root,
default_ignore=False,
package_source=not absolute_entrypoint,
sys_customizations_root=sys_customizations_root,
)
project = self._resolve_project(project_id=project_id)
existing_instances = self._resolve_run_instances_by_name(project_id, name)
name = self._resolve_run_name(name, existing_instances)
cloudspace = self._resolve_cloudspace(project_id, cloudspace_id)
queue_server_type = self._resolve_queue_server_type()

# If system customization files found, it will set their location path
sys_customizations_sync_root = self._resolve_env_root()

self.app._update_index_file()

# Validation
Expand All @@ -241,17 +247,26 @@ def cloudspace_dispatch(
flow_servers = self._get_flow_servers()
network_configs = self._get_network_configs(flow_servers)
works = self._get_works(cloudspace=cloudspace)
run_body = self._get_run_body(cluster_id, flow_servers, network_configs, works, False, root, True)
run_body = self._get_run_body(
cluster_id,
flow_servers,
network_configs,
works,
False,
root,
True,
True,
absolute_entrypoint,
)
env_vars = self._get_env_vars(self.env_vars, self.secrets, self.run_app_comment_commands)

# If the system customization root is set, prepare files for environment synchronization
if sys_customizations_sync_root is not None:
repo.prepare_sys_customizations_sync(sys_customizations_sync_root)

# API transactions
logger.info(f"Creating cloudspace run. run_body: {run_body}")
run = self._api_create_run(project_id, cloudspace_id, run_body)

self._api_package_and_upload_repo(repo, run)

logger.info(f"Creating cloudspace run instance. name: {name}")
run_instance = self._api_create_run_instance(
cluster_id,
project_id,
Expand Down Expand Up @@ -454,6 +469,9 @@ def _resolve_repo(
self,
root: Path,
ignore_functions: Optional[List[_IGNORE_FUNCTION]] = None,
default_ignore: bool = True,
package_source: bool = True,
sys_customizations_root: Optional[Path] = None,
) -> LocalSourceCodeDir:
"""Gather and merge all lightningignores from the app children and create the ``LocalSourceCodeDir``
object."""
Expand All @@ -470,7 +488,13 @@ def _resolve_repo(
patterns = _parse_lightningignore(merged)
ignore_functions = [*ignore_functions, partial(_filter_ignored, root, patterns)]

return LocalSourceCodeDir(path=root, ignore_functions=ignore_functions)
return LocalSourceCodeDir(
path=root,
ignore_functions=ignore_functions,
default_ignore=default_ignore,
package_source=package_source,
sys_customizations_root=sys_customizations_root,
)

def _resolve_project(self, project_id: Optional[str] = None) -> V1Membership:
"""Determine the project to run on, choosing a default if multiple projects are found."""
Expand Down Expand Up @@ -785,7 +809,7 @@ def _get_works(self, cloudspace: Optional[V1CloudSpace] = None) -> List[V1Work]:
network_config=[V1NetworkConfig(name=random_name, port=work.port)],
data_connection_mounts=data_connection_mounts,
)
works.append(V1Work(name=work.name, spec=work_spec))
works.append(V1Work(name=work.name, display_name=work.display_name, spec=work_spec))

return works

Expand All @@ -798,12 +822,18 @@ def _get_run_body(
no_cache: bool,
root: Path,
start_server: bool,
should_mount_cloudspace_content: bool = False,
absolute_entrypoint: bool = False,
) -> CloudspaceIdRunsBody:
"""Get the specification of the run creation request."""
# The entry point file needs to be relative to the root of the uploaded source file directory,
# because the backend will invoke the lightning commands relative to said source directory
# TODO: we shouldn't set this if the entrypoint isn't a file but the backend gives an error if we don't
app_entrypoint_file = Path(self.entrypoint).absolute().relative_to(root)
if absolute_entrypoint:
# If the entrypoint will already exist in the cloud then we can choose to keep it as an absolute path.
app_entrypoint_file = Path(self.entrypoint).absolute()
else:
# The entry point file needs to be relative to the root of the uploaded source file directory,
# because the backend will invoke the lightning commands relative to said source directory
# TODO: we shouldn't set this if the entrypoint isn't a file but the backend gives an error if we don't
app_entrypoint_file = Path(self.entrypoint).absolute().relative_to(root)

run_body = CloudspaceIdRunsBody(
cluster_id=cluster_id,
Expand All @@ -813,6 +843,7 @@ def _get_run_body(
network_config=network_configs,
works=works,
local_source=True,
should_mount_cloudspace_content=should_mount_cloudspace_content,
)

if self.app is not None:
Expand All @@ -827,9 +858,10 @@ def _get_run_body(
# if requirements file at the root of the repository is present,
# we pass just the file name to the backend, so backend can find it in the relative path
requirements_file = root / "requirements.txt"
if requirements_file.is_file():
if requirements_file.is_file() and requirements_file.exists():
requirements_path = requirements_file if absolute_entrypoint else "requirements.txt"
run_body.image_spec = Gridv1ImageSpec(
dependency_file_info=V1DependencyFileInfo(package_manager=V1PackageManager.PIP, path="requirements.txt")
dependency_file_info=V1DependencyFileInfo(package_manager=V1PackageManager.PIP, path=requirements_path)
)
if not DISABLE_DEPENDENCY_CACHE and not no_cache:
# hash used for caching the dependencies
Expand Down Expand Up @@ -997,7 +1029,10 @@ def _api_create_run_instance(
)

@staticmethod
def _api_package_and_upload_repo(repo: LocalSourceCodeDir, run: V1LightningRun) -> None:
def _api_package_and_upload_repo(
repo: LocalSourceCodeDir,
run: V1LightningRun,
) -> None:
"""Package and upload the provided local source code directory to the provided run."""
if run.source_upload_url == "":
raise RuntimeError("The source upload url is empty.")
Expand Down
46 changes: 31 additions & 15 deletions src/lightning/app/source_code/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
# limitations under the License.

import os
import uuid
from contextlib import contextmanager
from pathlib import Path
from shutil import copytree, rmtree
from typing import List, Optional

from lightning.app.core.constants import DOT_IGNORE_FILENAME, SYS_CUSTOMIZATIONS_SYNC_PATH
from lightning.app.source_code.copytree import _copytree, _IGNORE_FUNCTION
from lightning.app.source_code.hashing import _get_hash
from lightning.app.source_code.tar import _tar_path
from lightning.app.source_code.uploader import FileUploader

Expand All @@ -30,20 +30,35 @@ class LocalSourceCodeDir:

cache_location: Path = Path.home() / ".lightning" / "cache" / "repositories"

def __init__(self, path: Path, ignore_functions: Optional[List[_IGNORE_FUNCTION]] = None) -> None:
def __init__(
self,
path: Path,
ignore_functions: Optional[List[_IGNORE_FUNCTION]] = None,
default_ignore: bool = True,
package_source: bool = True,
sys_customizations_root: Optional[Path] = None,
) -> None:
if "LIGHTNING_VSCODE_WORKSPACE" in os.environ:
# Don't use home to store the tar ball. This won't play nice with symlinks
self.cache_location: Path = Path("/tmp", ".lightning", "cache", "repositories") # noqa: S108
else:
self.cache_location: Path = Path.home() / ".lightning" / "cache" / "repositories"

self.path = path
self.ignore_functions = ignore_functions
self.package_source = package_source
self.sys_customizations_root = sys_customizations_root

# cache checksum version
# cache version
self._version: Optional[str] = None
self._non_ignored_files: Optional[List[str]] = None

# create global cache location if it doesn't exist
if not self.cache_location.exists():
self.cache_location.mkdir(parents=True, exist_ok=True)

# Create a default dotignore if it doesn't exist
if not (path / DOT_IGNORE_FILENAME).is_file():
# Create a default dotignore if requested and it doesn't exist
if default_ignore and not (path / DOT_IGNORE_FILENAME).is_file():
with open(path / DOT_IGNORE_FILENAME, "w") as f:
f.write("venv/\n")
if (path / "bin" / "activate").is_file() or (path / "pyvenv.cfg").is_file():
Expand All @@ -57,7 +72,10 @@ def __init__(self, path: Path, ignore_functions: Optional[List[_IGNORE_FUNCTION]
def files(self) -> List[str]:
"""Returns a set of files that are not ignored by .lightningignore."""
if self._non_ignored_files is None:
self._non_ignored_files = _copytree(self.path, "", ignore_functions=self.ignore_functions, dry_run=True)
if self.package_source:
self._non_ignored_files = _copytree(self.path, "", ignore_functions=self.ignore_functions, dry_run=True)
else:
self._non_ignored_files = []
return self._non_ignored_files

@property
Expand All @@ -67,8 +85,8 @@ def version(self):
if self._version is not None:
return self._version

# stores both version and a set with the files used to generate the checksum
self._version = _get_hash(files=self.files, algorithm="blake2")
# create a random version ID and store it
self._version = uuid.uuid4().hex
return self._version

@property
Expand All @@ -83,7 +101,11 @@ def packaging_session(self) -> Path:
session_path = self.cache_location / "packaging_sessions" / self.version
try:
rmtree(session_path, ignore_errors=True)
_copytree(self.path, session_path, ignore_functions=self.ignore_functions)
if self.package_source:
_copytree(self.path, session_path, ignore_functions=self.ignore_functions)
if self.sys_customizations_root is not None:
path_to_sync = Path(session_path, SYS_CUSTOMIZATIONS_SYNC_PATH)
copytree(self.sys_customizations_root, path_to_sync, dirs_exist_ok=True)
yield session_path
finally:
rmtree(session_path, ignore_errors=True)
Expand All @@ -104,12 +126,6 @@ def package(self) -> Path:
_tar_path(source_path=session_path, target_file=str(self.package_path), compression=True)
return self.package_path

def prepare_sys_customizations_sync(self, sys_customizations_root: Path) -> None:
"""Prepares files for system environment customization setup by copying conda and system environment files
to an app files directory."""
path_to_sync = Path(self.path, SYS_CUSTOMIZATIONS_SYNC_PATH)
copytree(sys_customizations_root, path_to_sync, dirs_exist_ok=True)

def upload(self, url: str) -> None:
"""Uploads package to URL, usually pre-signed URL.
Expand Down
Loading

0 comments on commit a8fd96b

Please sign in to comment.