Skip to content

Commit 2182d75

Browse files
dmitsfhhsecondpre-commit-ci[bot]Borda
authored
[CLI] Adding opportunity to see basic cluster logs (#14334)
* pinning starsessions * pinning starsessions * adding strict back to requirements.txt * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Duplicated * Basic implementation * Basic implementation * Basic implementation * Basic implementation * Common things moved to log helpers file * Decomposing logs reader classes for reusing * Setting colors for log levels * Manifest trimming * Changes added to CHANGELOG * Prettifications * Prettifications * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Logs function name change * Logs function name change * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * attempt to fix the pydanitc import * Tests + command name fixes * Extending tests * Adding limit argument * Unmerging CI fix * Unmerging CI fix * Adding fields for errors * Adding log level fixed field width * Adding absent typing + exeptions raising * Adding socket error logging * Addressing comments on cluster list function return value * Addressing comments on adding e2e tests * Adding version range for arrow package in reqs * New unit tests * arrow time parsing callback modified + unit tests * helpers updated * helpers updated * helpers updated * One more test * CMD test fix * CMD test fix * CMD test fix * CMD test fix * CMD test fix * LightningClient mocking * Flaky test removed Co-authored-by: hhsecond <sherin@grid.ai> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Jirka Borovec <Borda@users.noreply.github.com>
1 parent 5661458 commit 2182d75

File tree

14 files changed

+546
-30
lines changed

14 files changed

+546
-30
lines changed

requirements/app/base.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@ fsspec>=2022.01.0, <=2022.7.1
66
s3fs>=2022.1.0, <=2022.7.1
77
croniter # for now, until we find something more robust.
88
traitlets<5.2.0 # Traitlets 5.2.X fails: https://github.com/ipython/traitlets/issues/741
9+
arrow>=1.2.0, <=1.2.2

src/lightning_app/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
1414
- Add support for Lightning AI BYOC cluster management ([#13835](https://github.com/Lightning-AI/lightning/pull/13835))
1515

1616

17+
- Add support to see Lightning AI BYOC cluster logs ([#14334](https://github.com/Lightning-AI/lightning/pull/14334))
18+
19+
1720
- Add support to run Lightning apps on Lightning AI BYOC clusters ([#13894](https://github.com/Lightning-AI/lightning/pull/13894))
1821

1922

src/lightning_app/cli/cmd_clusters.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,14 @@ def create(
9393

9494
click.echo(f"${resp.id} cluster is ${resp.status.phase}")
9595

96-
def list(self):
96+
def get_clusters(self):
9797
resp = self.api_client.cluster_service_list_clusters(phase_not_in=[V1ClusterState.DELETED])
98+
return ClusterList(resp.clusters)
99+
100+
def list(self):
101+
clusters = self.get_clusters()
98102
console = Console()
99-
console.print(ClusterList(resp.clusters).as_table())
103+
console.print(clusters.as_table())
100104

101105
def delete(self, cluster_id: str = None, force: bool = False, wait: bool = False):
102106
if force:

src/lightning_app/cli/lightning_cli.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from pathlib import Path
66
from typing import List, Tuple, Union
77

8+
import arrow
89
import click
910
import requests
1011
import rich
@@ -13,6 +14,7 @@
1314

1415
from lightning_app import __version__ as ver
1516
from lightning_app.cli import cmd_init, cmd_install, cmd_pl_init, cmd_react_ui_init
17+
from lightning_app.cli.cmd_clusters import AWSClusterManager
1618
from lightning_app.cli.lightning_cli_create import create
1719
from lightning_app.cli.lightning_cli_delete import delete
1820
from lightning_app.cli.lightning_cli_list import get_list
@@ -21,10 +23,12 @@
2123
from lightning_app.runners.runtime_type import RuntimeType
2224
from lightning_app.utilities.app_logs import _app_logs_reader
2325
from lightning_app.utilities.cli_helpers import (
26+
_arrow_time_callback,
2427
_format_input_env_variables,
2528
_retrieve_application_url_and_available_commands,
2629
)
2730
from lightning_app.utilities.cloud import _get_project
31+
from lightning_app.utilities.cluster_logs import _cluster_logs_reader
2832
from lightning_app.utilities.enum import OpenAPITags
2933
from lightning_app.utilities.install_components import register_all_external_components
3034
from lightning_app.utilities.login import Auth
@@ -141,6 +145,94 @@ def logs(app_name: str, components: List[str], follow: bool) -> None:
141145
rich.print(f"[{color}]{log_event.component_name}[/{color}] {date} {log_event.message}")
142146

143147

148+
@show.group()
def cluster():
    """Groups cluster commands inside show."""
    # Intentionally empty: this function only exists as the click group that
    # sub-commands (e.g. ``logs``) are attached to. The docstring above is the
    # help text click shows for the group, so it is kept verbatim.
152+
153+
154+
@cluster.command(name="logs")
@click.argument("cluster_name", required=True)
@click.option(
    "--from",
    "from_time",
    default="24 hours ago",
    help="The starting timestamp to query cluster logs from. Human-readable (e.g. '48 hours ago') or ISO 8601 "
    "(e.g. '2022-08-23 12:34') formats.",
    callback=_arrow_time_callback,
)
@click.option(
    "--to",
    "to_time",
    default="0 seconds ago",
    callback=_arrow_time_callback,
    help="The end timestamp / relative time increment to query logs for. This is ignored when following logs (with "
    "-f/--follow). The same format as --from option has.",
)
@click.option("--limit", default=1000, help="The max number of log lines returned.")
@click.option("-f", "--follow", required=False, is_flag=True, help="Wait for new logs, to exit use CTRL+C.")
def cluster_logs(cluster_name: str, to_time: arrow.Arrow, from_time: arrow.Arrow, limit: int, follow: bool) -> None:
    """Show cluster logs.

    Example uses:

        Print cluster logs:

            $ lightning show cluster logs my-cluster


        Print cluster logs and wait for new logs:

            $ lightning show cluster logs my-cluster --follow


        Print cluster logs, from 48 hours ago to now:

            $ lightning show cluster logs my-cluster --from "48 hours ago"


        Print cluster logs, 10 most recent lines:

            $ lightning show cluster logs my-cluster --limit 10
    """
    # NOTE: the docstring above is rendered by click as the command's help text,
    # so implementation notes live in comments instead.
    #
    # Imported locally so the fix does not depend on the module's import block.
    from rich.markup import escape

    client = LightningClient()
    cluster_manager = AWSClusterManager()
    existing_cluster_list = cluster_manager.get_clusters()

    # Map the user-facing cluster name to the id expected by the logs reader.
    clusters = {item.name: item.id for item in existing_cluster_list.clusters}

    if not clusters:
        raise click.ClickException("You don't have any clusters.")

    # Reachable when an empty string is passed explicitly as the argument.
    if not cluster_name:
        raise click.ClickException(
            f"You have not specified any clusters. Please select one of available: [{', '.join(clusters)}]"
        )

    if cluster_name not in clusters:
        raise click.ClickException(
            f"The cluster '{cluster_name}' does not exist."
            f" Please select one of the following: [{', '.join(clusters)}]"
        )

    # `_arrow_time_callback` has already turned --from/--to into Arrow objects;
    # the reader is given integer epoch seconds.
    log_reader = _cluster_logs_reader(
        client=client,
        cluster_id=clusters[cluster_name],
        start=from_time.int_timestamp,
        end=to_time.int_timestamp,
        limit=limit,
        follow=follow,
    )

    colors = {"error": "red", "warn": "yellow", "info": "green"}

    for log_event in log_reader:
        date = log_event.timestamp.strftime("%m/%d/%Y %H:%M:%S")
        color = colors.get(log_event.labels.level, "green")
        # The message is arbitrary remote text: escape it so that square
        # brackets inside it are printed literally instead of being parsed as
        # Rich markup tags (which can drop text or raise MarkupError).
        message = escape(log_event.message.rstrip())
        rich.print(f"[{color}]{log_event.labels.level:5}[/{color}] {date} {message}")
234+
235+
144236
@_main.command()
145237
def login():
146238
"""Log in to your lightning.ai account."""

src/lightning_app/utilities/app_logs.py

Lines changed: 4 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
import json
22
import queue
3-
import sys
43
from dataclasses import dataclass
5-
from datetime import datetime, timedelta
6-
from json import JSONDecodeError
4+
from datetime import timedelta
75
from threading import Thread
86
from typing import Callable, Iterator, List, Optional
97

108
import dateutil.parser
119
from websocket import WebSocketApp
1210

11+
from lightning_app.utilities.log_helpers import _error_callback, _OrderedLogEntry
1312
from lightning_app.utilities.logs_socket_api import _LightningLogsSocketAPI
1413
from lightning_app.utilities.network import LightningClient
1514

@@ -27,18 +26,10 @@ class _LogEventLabels:
2726

2827

2928
@dataclass
30-
class _LogEvent:
31-
message: str
32-
timestamp: datetime
29+
class _LogEvent(_OrderedLogEntry):
3330
component_name: str
3431
labels: _LogEventLabels
3532

36-
def __ge__(self, other: "_LogEvent") -> bool:
37-
return self.timestamp >= other.timestamp
38-
39-
def __gt__(self, other: "_LogEvent") -> bool:
40-
return self.timestamp > other.timestamp
41-
4233

4334
def _push_log_events_to_read_queue_callback(component_name: str, read_queue: queue.PriorityQueue):
4435
"""Pushes _LogEvents from websocket to read_queue.
@@ -65,17 +56,6 @@ def callback(ws_app: WebSocketApp, msg: str):
6556
return callback
6657

6758

68-
def _error_callback(ws_app: WebSocketApp, error: Exception):
69-
errors = {
70-
KeyError: "Malformed log message, missing key",
71-
JSONDecodeError: "Malformed log message",
72-
TypeError: "Malformed log format",
73-
ValueError: "Malformed date format",
74-
}
75-
print(f"Error while reading logs ({errors.get(type(error), 'Unknown')})", file=sys.stderr)
76-
ws_app.close()
77-
78-
7959
def _app_logs_reader(
8060
client: LightningClient,
8161
project_id: str,
@@ -127,7 +107,7 @@ def _app_logs_reader(
127107
pass
128108

129109
except KeyboardInterrupt:
130-
# User pressed CTRL+C to exit, we sould respect that
110+
# User pressed CTRL+C to exit, we should respect that
131111
pass
132112

133113
finally:

src/lightning_app/utilities/cli_helpers.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import re
22
from typing import Dict, Optional
33

4+
import arrow
5+
import click
46
import requests
57

68
from lightning_app.core.constants import APP_SERVER_PORT
@@ -119,3 +121,15 @@ def _retrieve_application_url_and_available_commands(app_id_or_name_or_url: Opti
119121
raise Exception(f"The server didn't process the request properly. Found {resp.json()}")
120122
return lightningapp.status.url, _extract_command_from_openapi(resp.json())
121123
return None, None
124+
125+
126+
def _arrow_time_callback(
    _ctx: "click.core.Context",
    _param: "click.core.Option",
    value: str,
    arw_now: Optional[arrow.Arrow] = None,
) -> arrow.Arrow:
    """Click option callback converting a time string into an ``arrow.Arrow``.

    The value is first interpreted as a human-readable relative time (via
    ``Arrow.dehumanize``); if that fails with ``ValueError`` it is parsed with
    ``arrow.get``.

    Args:
        _ctx: Click context (unused).
        _param: Click option being processed (unused).
        value: The raw option value supplied on the command line.
        arw_now: Reference instant that relative times are resolved against.
            When omitted, the current UTC time is taken at call time.

    Returns:
        The parsed point in time.

    Raises:
        click.ClickException: If the value cannot be parsed by either method.
    """
    # Resolve "now" per call. A default of `arrow.utcnow()` in the signature
    # would be evaluated once at import time and then silently go stale.
    if arw_now is None:
        arw_now = arrow.utcnow()

    try:
        return arw_now.dehumanize(value)
    except ValueError:
        # Not a relative expression - fall back to absolute timestamp parsing.
        try:
            return arrow.get(value)
        except (ValueError, TypeError):
            raise click.ClickException(f"cannot parse time {value}")
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
import json
2+
import queue
3+
from dataclasses import dataclass
4+
from threading import Thread
5+
from typing import Callable, Iterator, Optional
6+
7+
import arrow
8+
import dateutil.parser
9+
from websocket import WebSocketApp
10+
11+
from lightning_app.utilities.log_helpers import _error_callback, _OrderedLogEntry
12+
from lightning_app.utilities.logs_socket_api import _ClusterLogsSocketAPI
13+
from lightning_app.utilities.network import LightningClient
14+
15+
16+
@dataclass
class _ClusterLogEventLabels:
    """Labels attached to a single cluster log event.

    Instances are built by unpacking the ``"labels"`` mapping of an incoming
    log message as keyword arguments, so attribute names must match the keys
    of that mapping exactly.
    """

    # Labels without a default: construction fails if any of them is missing.
    cluster_id: str
    grid_url: str
    hostname: str
    level: str  # used by the CLI to pick the output color
    logger: str

    # Labels that may be absent from a message.
    path: Optional[str] = None
    workspace: Optional[str] = None
    identifier: Optional[str] = None
    issuer: Optional[str] = None
    error: Optional[str] = None
    errorVerbose: Optional[str] = None
29+
30+
31+
@dataclass
class _ClusterLogEvent(_OrderedLogEntry):
    """A cluster log entry: the inherited message and timestamp plus its labels."""

    # Parsed labels of the event (cluster id, level, logger, ...).
    labels: _ClusterLogEventLabels
34+
35+
36+
def _push_log_events_to_read_queue_callback(read_queue: queue.PriorityQueue):
    """Build a websocket message handler that feeds ``read_queue``.

    The returned function is meant to be used as the ``on_message_callback``
    of a ``websocket.WebSocketApp``. Each incoming message is decoded from
    JSON and, when it carries a ``"message"`` key, pushed onto ``read_queue``
    as a ``_ClusterLogEvent``.
    """

    def callback(ws_app: WebSocketApp, msg: str):
        # The payload is trusted to follow the API contract: no defensive
        # handling of malformed messages is done here.
        payload = json.loads(msg)
        parsed_labels = _ClusterLogEventLabels(**payload["labels"])

        # Messages without a text body produce no event.
        if "message" not in payload:
            return

        text = payload["message"]
        when = dateutil.parser.isoparse(payload["timestamp"])
        read_queue.put(_ClusterLogEvent(message=text, timestamp=when, labels=parsed_labels))

    return callback
58+
59+
60+
def _cluster_logs_reader(
    client: LightningClient,
    cluster_id: str,
    start: int,
    end: int,
    limit: int,
    follow: bool,
    on_error_callback: Optional[Callable] = None,
) -> Iterator[_ClusterLogEvent]:
    """Yield log events of a cluster as they arrive over a websocket.

    A socket is created through ``_ClusterLogsSocketAPI`` and run in a
    separate thread; received events are put on a priority queue from which
    this generator yields them.

    Args:
        client: Client whose ``api_client`` is used to create the logs socket.
        cluster_id: Identifier of the cluster to read logs from.
        start: Start of the queried time range, forwarded unchanged to the
            socket API. The CLI caller passes an integer timestamp
            (``Arrow.int_timestamp``).
        end: End of the queried time range, same format as ``start``.
        limit: Maximum number of log lines, forwarded to the socket API.
        follow: If true, block indefinitely waiting for new events; otherwise
            stop once no event has arrived for one second.
        on_error_callback: Optional websocket error handler; defaults to the
            shared ``_error_callback``.

    Yields:
        ``_ClusterLogEvent`` instances taken from the read queue.
    """
    logs_api_client = _ClusterLogsSocketAPI(client.api_client)
    read_queue = queue.PriorityQueue()

    # We will use a socket inside a thread to read logs,
    # to follow our typical reading pattern
    log_socket = logs_api_client.create_cluster_logs_socket(
        cluster_id=cluster_id,
        start=start,
        end=end,
        limit=limit,
        on_message_callback=_push_log_events_to_read_queue_callback(read_queue),
        on_error_callback=on_error_callback or _error_callback,
    )

    log_thread = Thread(target=log_socket.run_forever)

    # Establish connection and begin pushing logs to the print queue
    log_thread.start()

    # Print logs from queue when log event is available
    try:
        while True:
            log_event = read_queue.get(timeout=None if follow else 1.0)
            yield log_event

    except queue.Empty:
        # Empty is raised by queue.get if timeout is reached. Follow = False case.
        pass

    except KeyboardInterrupt:
        # User pressed CTRL+C to exit, we should respect that
        pass

    finally:
        # Close connection - it will cause run_forever() to finish -> the thread finishes as well
        log_socket.close()

        # The socket was closed, we can just wait for thread to finish.
        log_thread.join()
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import logging
2+
from dataclasses import dataclass
3+
from datetime import datetime
4+
from json import JSONDecodeError
5+
6+
from websocket import WebSocketApp
7+
8+
logger = logging.getLogger(__name__)
9+
10+
11+
# This is a superclass to inherit log entry classes from it:
# it implements magic methods to sort logs by timestamps.
@dataclass
class _OrderedLogEntry:
    """Base log entry that is ordered by its timestamp only.

    All four ordering operators are defined explicitly so that containers
    relying on ``<`` (e.g. ``heapq`` / ``queue.PriorityQueue``) do not depend
    on Python's reflected-operator fallback. Comparing with an object that is
    not an ``_OrderedLogEntry`` returns ``NotImplemented``, which Python turns
    into a ``TypeError`` unless the other operand handles the comparison.
    """

    message: str
    timestamp: datetime

    def __lt__(self, other: "_OrderedLogEntry") -> bool:
        if not isinstance(other, _OrderedLogEntry):
            return NotImplemented
        return self.timestamp < other.timestamp

    def __le__(self, other: "_OrderedLogEntry") -> bool:
        if not isinstance(other, _OrderedLogEntry):
            return NotImplemented
        return self.timestamp <= other.timestamp

    def __ge__(self, other: "_OrderedLogEntry") -> bool:
        if not isinstance(other, _OrderedLogEntry):
            return NotImplemented
        return self.timestamp >= other.timestamp

    def __gt__(self, other: "_OrderedLogEntry") -> bool:
        if not isinstance(other, _OrderedLogEntry):
            return NotImplemented
        return self.timestamp > other.timestamp
23+
24+
25+
# A general error callback for log reading, prints most common types of possible errors.
def _error_callback(ws_app: WebSocketApp, error: Exception):
    """Log a short description of ``error`` and close the websocket app.

    The description is looked up by the exact type of ``error``; any other
    type is reported as 'Unknown'.
    """
    known_reasons = {
        KeyError: "Malformed log message, missing key",
        JSONDecodeError: "Malformed log message",
        TypeError: "Malformed log format",
        ValueError: "Malformed date format",
    }
    reason = known_reasons.get(type(error), "Unknown")
    logger.error(f"⚡ Error while reading logs ({reason}), {error}")
    # Stop reading: closing the app ends the socket loop.
    ws_app.close()

0 commit comments

Comments
 (0)