Skip to content

Commit

Permalink
[App] Refactor cloud dispatch and update to new API (#16456)
Browse files Browse the repository at this point in the history
Co-authored-by: Carlos Mocholí <carlossmocholi@gmail.com>
Co-authored-by: Adrian Wälchli <aedu.waelchli@gmail.com>
(cherry picked from commit 04886ed)
  • Loading branch information
ethanwharris authored and Borda committed Feb 9, 2023
1 parent c87ef74 commit e1e30b8
Show file tree
Hide file tree
Showing 10 changed files with 833 additions and 668 deletions.
2 changes: 1 addition & 1 deletion requirements/app/base.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
lightning-cloud>=0.5.12, <=0.5.16
lightning-cloud>=0.5.19
packaging
typing-extensions>=4.0.0, <=4.4.0
deepdiff>=5.7.0, <6.2.4
Expand Down
1 change: 1 addition & 0 deletions src/lightning_app/core/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def get_lightning_cloud_url() -> str:
# Project under which the resources need to run in cloud. If this env is not set,
# cloud runner will try to get the default project from the cloud
LIGHTNING_CLOUD_PROJECT_ID = os.getenv("LIGHTNING_CLOUD_PROJECT_ID")
LIGHTNING_CLOUD_PRINT_SPECS = os.getenv("LIGHTNING_CLOUD_PRINT_SPECS")
LIGHTNING_DIR = os.getenv("LIGHTNING_DIR", str(Path.home() / ".lightning"))
LIGHTNING_CREDENTIAL_PATH = os.getenv("LIGHTNING_CREDENTIAL_PATH", str(Path(LIGHTNING_DIR) / "credentials.json"))
DOT_IGNORE_FILENAME = ".lightningignore"
Expand Down
1,054 changes: 584 additions & 470 deletions src/lightning_app/runners/cloud.py

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/lightning_app/source_code/copytree.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from functools import partial
from pathlib import Path
from shutil import copy2, copystat, Error
from typing import Callable, List, Optional, Set, Union
from typing import Callable, List, Optional, Set, Tuple, Union

from lightning_app.core.constants import DOT_IGNORE_FILENAME
from lightning_app.utilities.app_helpers import Logger
Expand Down Expand Up @@ -122,7 +122,7 @@ def _filter_ignored(src: Path, patterns: Set[str], current_dir: Path, entries: L
return [entry for entry in entries if str(relative_dir / entry.name) not in ignored_names]


def _parse_lightningignore(lines: List[str]) -> Set[str]:
def _parse_lightningignore(lines: Tuple[str]) -> Set[str]:
"""Creates a set that removes empty lines and comments."""
lines = [ln.strip() for ln in lines]
# removes first `/` character for posix and `\\` for windows
Expand Down
9 changes: 9 additions & 0 deletions src/lightning_app/source_code/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from shutil import rmtree
from typing import List, Optional

from lightning_app.core.constants import DOT_IGNORE_FILENAME
from lightning_app.source_code.copytree import _copytree, _IGNORE_FUNCTION
from lightning_app.source_code.hashing import _get_hash
from lightning_app.source_code.tar import _tar_path
Expand All @@ -27,6 +28,14 @@ def __init__(self, path: Path, ignore_functions: Optional[List[_IGNORE_FUNCTION]
if not self.cache_location.exists():
self.cache_location.mkdir(parents=True, exist_ok=True)

# Create a default dotignore if it doesn't exist
if not (path / DOT_IGNORE_FILENAME).is_file():
with open(path / DOT_IGNORE_FILENAME, "w") as f:
f.write("venv/\n")
if (path / "bin" / "activate").is_file() or (path / "pyvenv.cfg").is_file():
# the user is developing inside venv
f.write("bin/\ninclude/\nlib/\npyvenv.cfg\n")

# clean old cache entries
self._prune_cache()

Expand Down
51 changes: 51 additions & 0 deletions src/lightning_app/utilities/clusters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import random

from lightning_cloud.openapi import V1ClusterType, V1ProjectClusterBinding
from lightning_cloud.openapi.rest import ApiException

from lightning_app.utilities.network import LightningClient


def _ensure_cluster_project_binding(client: LightningClient, project_id: str, cluster_id: str) -> None:
cluster_bindings = client.projects_service_list_project_cluster_bindings(project_id=project_id)

for cluster_binding in cluster_bindings.clusters:
if cluster_binding.cluster_id != cluster_id:
continue
if cluster_binding.project_id == project_id:
return

client.projects_service_create_project_cluster_binding(
project_id=project_id,
body=V1ProjectClusterBinding(cluster_id=cluster_id, project_id=project_id),
)


def _get_default_cluster(client: LightningClient, project_id: str) -> str:
"""This utility implements a minimal version of the cluster selection logic used in the cloud.
TODO: This should be requested directly from the platform.
"""
cluster_bindings = client.projects_service_list_project_cluster_bindings(project_id=project_id).clusters

if not cluster_bindings:
raise ValueError(f"No clusters are bound to the project {project_id}.")

if len(cluster_bindings) == 1:
return cluster_bindings[0].cluster_id

clusters = []
for cluster_binding in cluster_bindings:
try:
clusters.append(client.cluster_service_get_cluster(cluster_binding.cluster_id))
except ApiException:
# If we failed to get the cluster, ignore it
continue

# Filter global clusters
clusters = [cluster for cluster in clusters if cluster.spec.cluster_type == V1ClusterType.GLOBAL]

if len(clusters) == 0:
raise RuntimeError(f"No clusters found on `{client.api_client.configuration.host}`.")

return random.choice(clusters).id
23 changes: 10 additions & 13 deletions tests/integrations_app/public/test_commands_and_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,11 @@ def test_commands_and_api_example_cloud() -> None:
admin_page,
_,
fetch_logs,
_,
app_name,
):
# 1: Collect the app_id
app_id = admin_page.url.split("/")[-1]

# 2: Connect to the App and send the first & second command with the client
# Connect to the App and send the first & second command with the client
# Requires to be run within the same process.
cmd_1 = f"python -m lightning connect {app_id}"
cmd_1 = f"python -m lightning connect {app_name}"
cmd_2 = "python -m lightning command with client --name=this"
cmd_3 = "python -m lightning command without client --name=is"
cmd_4 = "lightning disconnect"
Expand All @@ -35,32 +32,32 @@ def test_commands_and_api_example_cloud() -> None:
# This prevents some flakyness in the CI. Couldn't reproduce it locally.
sleep(5)

# 5: Send a request to the Rest API directly.
# Send a request to the Rest API directly.
client = LightningClient()
project = _get_project(client)

lit_apps = [
app
for app in client.lightningapp_instance_service_list_lightningapp_instances(
project_id=project.project_id
lit_app
for lit_app in client.lightningapp_instance_service_list_lightningapp_instances(
project_id=project.project_id,
).lightningapps
if app.id == app_id
if lit_app.name == app_name
]
app = lit_apps[0]

base_url = app.status.url
resp = requests.post(base_url + "/user/command_without_client?name=awesome")
assert resp.status_code == 200, resp.json()

# 6: Validate the logs.
# Validate the logs.
has_logs = False
while not has_logs:
for log in fetch_logs():
if "['this', 'is', 'awesome']" in log:
has_logs = True
sleep(1)

# 7: Send a request to the Rest API directly.
# Send a request to the Rest API directly.
resp = requests.get(base_url + "/pure_function")
assert resp.status_code == 200
assert resp.json() == "Hello World !"
24 changes: 12 additions & 12 deletions tests/tests_app/cli/test_cloud_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
import pytest
from click.testing import CliRunner
from lightning_cloud.openapi import (
V1LightningappV2,
V1CloudSpace,
V1ListCloudSpacesResponse,
V1ListLightningappInstancesResponse,
V1ListLightningappsV2Response,
V1ListMembershipsResponse,
V1Membership,
)
Expand All @@ -37,8 +37,8 @@ class FakeResponse:


class FakeLightningClient:
def lightningapp_v2_service_list_lightningapps_v2(self, *args, **kwargs):
return V1ListLightningappsV2Response(lightningapps=[])
def cloud_space_service_list_cloud_spaces(self, *args, **kwargs):
return V1ListCloudSpacesResponse(cloudspaces=[])

def lightningapp_instance_service_list_lightningapp_instances(self, *args, **kwargs):
return V1ListLightningappInstancesResponse(lightningapps=[])
Expand Down Expand Up @@ -105,14 +105,14 @@ def __init__(self, *args, create_response, **kwargs):
super().__init__()
self.create_response = create_response

def lightningapp_v2_service_create_lightningapp_v2(self, *args, **kwargs):
return V1LightningappV2(id="my_app", name="app")
def cloud_space_service_create_cloud_space(self, *args, **kwargs):
return V1CloudSpace(id="my_app", name="app")

def lightningapp_v2_service_create_lightningapp_release(self, project_id, app_id, body):
def cloud_space_service_create_lightning_run(self, project_id, cloudspace_id, body):
assert project_id == "test-project-id"
return self.create_response

def lightningapp_v2_service_create_lightningapp_release_instance(self, project_id, app_id, id, body):
def cloud_space_service_create_lightning_run_instance(self, project_id, cloudspace_id, id, body):
assert project_id == "test-project-id"
return self.create_response

Expand All @@ -123,7 +123,7 @@ def lightningapp_v2_service_create_lightningapp_release_instance(self, project_i
def test_start_app(create_response, monkeypatch):

monkeypatch.setattr(cloud, "V1LightningappInstanceState", MagicMock())
monkeypatch.setattr(cloud, "Body8", MagicMock())
monkeypatch.setattr(cloud, "CloudspaceIdRunsBody", MagicMock())
monkeypatch.setattr(cloud, "V1Flowserver", MagicMock())
monkeypatch.setattr(cloud, "V1LightningappInstanceSpec", MagicMock())
monkeypatch.setattr(
Expand Down Expand Up @@ -167,7 +167,7 @@ def run():
flow_servers=ANY,
)

cloud.Body8.assert_called_once()
cloud.CloudspaceIdRunsBody.assert_called_once()


class HttpHeaderDict(dict):
Expand All @@ -186,7 +186,7 @@ def __init__(self, *args, message, **kwargs):
super().__init__()
self.message = message

def lightningapp_v2_service_list_lightningapps_v2(self, *args, **kwargs):
def cloud_space_service_list_cloud_spaces(self, *args, **kwargs):
raise ApiException(
http_resp=HttpHeaderDict(
data=self.message,
Expand All @@ -207,7 +207,7 @@ def lightningapp_v2_service_list_lightningapps_v2(self, *args, **kwargs):
def test_start_app_exception(message, monkeypatch, caplog):

monkeypatch.setattr(cloud, "V1LightningappInstanceState", MagicMock())
monkeypatch.setattr(cloud, "Body8", MagicMock())
monkeypatch.setattr(cloud, "CloudspaceIdRunsBody", MagicMock())
monkeypatch.setattr(cloud, "V1Flowserver", MagicMock())
monkeypatch.setattr(cloud, "V1LightningappInstanceSpec", MagicMock())
monkeypatch.setattr(cloud, "LocalSourceCodeDir", MagicMock())
Expand Down
Loading

0 comments on commit e1e30b8

Please sign in to comment.