import logging
import re
from collections import defaultdict
from typing import AsyncIterable, Callable, Dict, List, Optional, Tuple

from ray._private.pydantic_compat import BaseModel

# TODO(sang): Remove the usage of this class.
from ray.dashboard.datacenter import DataSource
from ray.dashboard.modules.job.common import JOB_LOGS_PATH_TEMPLATE
from ray.util.state.common import (
    DEFAULT_RPC_TIMEOUT,
    GetLogOptions,
    protobuf_to_task_state_dict,
)
from ray.util.state.exception import DataSourceUnavailable
from ray.util.state.state_manager import StateDataSourceClient

if BaseModel is None:
    raise ModuleNotFoundError("Please install pydantic via `pip install pydantic`.")

logger = logging.getLogger(__name__)

WORKER_LOG_PATTERN = re.compile(r".*worker-([0-9a-f]+)-([0-9a-f]+)-(\d+)\.(out|err)")
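# Illustrative only: for a filename such as "worker-<worker-hex-id>-01000000-12345.out",
# the capture groups are (worker_id, job_id, pid, suffix), e.g.
# ("<worker-hex-id>", "01000000", "12345", "out").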


class ResolvedStreamFileInfo(BaseModel):
    # The node id where the log file is located.
    node_id: str

    # The log file path. Could be a path relative to Ray's logging folder or an
    # absolute path.
    filename: str

    # Start offset in the log file to stream from. None indicates the beginning
    # of the file; it may also be derived from the number of tail lines requested.
    start_offset: Optional[int]

    # End offset in the log file to stream up to. None indicates the end of the file.
    end_offset: Optional[int]
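
# For illustration (hypothetical values): a resolved target for tailing a worker's
# stdout might look like
#   ResolvedStreamFileInfo(node_id="<node-hex-id>",
#                          filename="worker-<worker-id>-<job-id>-<pid>.out",
#                          start_offset=1024, end_offset=None)
# where a None end_offset means "stream to the end of the file".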


class LogsManager:
    def __init__(self, data_source_client: StateDataSourceClient):
        self.client = data_source_client

    @property
    def data_source_client(self) -> StateDataSourceClient:
        return self.client

    def ip_to_node_id(self, node_ip: Optional[str]):
        """Resolve the node id from a given node ip.

        Args:
            node_ip: The node ip.

        Returns:
            node_id if there's a node id that matches the given node ip and is alive.
            None otherwise.
        """
        return self.client.ip_to_node_id(node_ip)

    async def list_logs(
        self, node_id: str, timeout: int, glob_filter: str = "*"
    ) -> Dict[str, List[str]]:
        """Return a list of log files on a given node id filtered by the glob.

        Args:
            node_id: The node id where log files are present.
            timeout: The timeout of the API.
            glob_filter: The glob filter to filter out log files.

        Returns:
            Dictionary of {component_name -> list of log files}

        Raises:
            DataSourceUnavailable: If a source is unresponsive.
        """
        self._verify_node_registered(node_id)
        reply = await self.client.list_logs(node_id, glob_filter, timeout=timeout)
        return self._categorize_log_files(reply.log_files)
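
    # Usage sketch (illustrative, names assumed): given a LogsManager instance
    # `manager` and an alive node id,
    #   files = await manager.list_logs(node_id, timeout=30, glob_filter="raylet*")
    # might return {"raylet": ["raylet.out", "raylet.err"], ...}; the keys are the
    # component categories produced by _categorize_log_files below.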

    async def stream_logs(
        self,
        options: GetLogOptions,
    ) -> AsyncIterable[bytes]:
        """Generate a stream of logs in bytes.

        Args:
            options: The options for streaming logs.

        Returns:
            Async generator of streamed logs in bytes.
        """
        node_id = options.node_id or self.ip_to_node_id(options.node_ip)

        res = await self.resolve_filename(
            node_id=node_id,
            log_filename=options.filename,
            actor_id=options.actor_id,
            task_id=options.task_id,
            attempt_number=options.attempt_number,
            pid=options.pid,
            get_actor_fn=DataSource.actors.get,
            timeout=options.timeout,
            suffix=options.suffix,
            submission_id=options.submission_id,
        )

        keep_alive = options.media_type == "stream"
        stream = await self.client.stream_log(
            node_id=res.node_id,
            log_file_name=res.filename,
            keep_alive=keep_alive,
            lines=options.lines,
            interval=options.interval,
            # If we keep the log connection alive, we shouldn't set a timeout;
            # otherwise the stream would be terminated forcefully once the
            # deadline expires.
            timeout=options.timeout if not keep_alive else None,
            start_offset=res.start_offset,
            end_offset=res.end_offset,
        )

        async for streamed_log in stream:
            yield streamed_log.data
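
    # Consumption sketch (illustrative): callers iterate the async generator, e.g.
    #   async for chunk in manager.stream_logs(options):
    #       sys.stdout.buffer.write(chunk)
    # With options.media_type == "stream" the connection is kept alive and new data
    # is pushed roughly every `options.interval`; otherwise the stream ends once
    # the requested portion of the file has been sent.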

    def _verify_node_registered(self, node_id: str):
        if node_id not in self.client.get_all_registered_log_agent_ids():
            raise DataSourceUnavailable(
                f"Given node id {node_id} is not available. "
                "Either the node is dead or it is not registered. "
                "Use `ray list nodes` to see the node status. "
                "If the node is registered, this is most likely "
                "a transient issue. Try again."
            )
        assert node_id is not None

    async def _resolve_job_filename(self, sub_job_id: str) -> Tuple[str, str]:
        """Return the node id and log file name for a given job submission id.

        Args:
            sub_job_id: The job submission id.

        Returns:
            The node id and log file name.
        """
        job_infos = await self.client.get_job_info(timeout=DEFAULT_RPC_TIMEOUT)
        target_job = None
        for job_info in job_infos:
            if job_info.submission_id == sub_job_id:
                target_job = job_info
                break

        if target_job is None:
            logger.info(f"Submission job ID {sub_job_id} not found.")
            return None, None

        node_id = target_job.driver_node_id
        if node_id is None:
            raise ValueError(
                f"Job {sub_job_id} has no driver node id info. "
                "This is likely a bug. Please file an issue."
            )

        log_filename = JOB_LOGS_PATH_TEMPLATE.format(submission_id=sub_job_id)
        return node_id, log_filename
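
    # Note: JOB_LOGS_PATH_TEMPLATE comes from ray.dashboard.modules.job.common; the
    # resolved file is the submission job's driver log on the driver's node
    # (typically a "job-driver-<submission_id>.log"-style name).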

    async def _resolve_worker_file(
        self,
        node_id: str,
        worker_id: Optional[str],
        pid: Optional[int],
        suffix: str,
        timeout: int,
    ) -> Optional[str]:
        """Resolve the worker log file."""
        if worker_id is not None and pid is not None:
            raise ValueError(
                f"Only one of worker id({worker_id}) or pid({pid}) should be provided."
            )

        if worker_id is not None:
            log_files = await self.list_logs(
                node_id, timeout, glob_filter=f"*{worker_id}*{suffix}"
            )
        else:
            log_files = await self.list_logs(
                node_id, timeout, glob_filter=f"*{pid}*{suffix}"
            )

        # Find matching worker logs.
        for filename in [*log_files["worker_out"], *log_files["worker_err"]]:
            # Worker log filenames look like worker-[worker_id]-[job_id]-[pid].out
            match = WORKER_LOG_PATTERN.match(filename)
            if match is None:
                continue
            if worker_id is not None:
                if match.group(1) == worker_id:
                    return filename
            else:
                if int(match.group(3)) == pid:
                    return filename
        return None
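
    # Illustrative example: with pid=12345 and suffix="out", the glob filter is
    # "*12345*out", and a listed file named "worker-<worker-id>-<job-id>-12345.out"
    # is returned because its third capture group parses to 12345.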

    async def _resolve_actor_filename(
        self,
        actor_id: str,
        get_actor_fn: Callable[[str], Dict],
        suffix: str,
        timeout: int,
    ):
        """Resolve the actor log file.

        Args:
            actor_id: The actor id.
            get_actor_fn: The function to get actor information.
            suffix: The suffix of the log file.
            timeout: Timeout in seconds.

        Returns:
            The node id and log file name.

        Raises:
            ValueError: If the actor data is not found or get_actor_fn is not
                provided.
        """
        if get_actor_fn is None:
            raise ValueError("get_actor_fn needs to be specified for actor_id")

        actor_data = get_actor_fn(actor_id)
        if actor_data is None:
            raise ValueError(f"Actor ID {actor_id} not found.")

        # TODO(sang): Only the latest worker id can be obtained from
        # actor information now. That means, if actors are restarted,
        # there's no way for us to get the past worker ids.
        worker_id = actor_data["address"].get("workerId")
        if not worker_id:
            raise ValueError(
                f"Worker ID for Actor ID {actor_id} not found. "
                "Actor is not scheduled yet."
            )

        node_id = actor_data["address"].get("rayletId")
        if not node_id:
            raise ValueError(
                f"Node ID for Actor ID {actor_id} not found. "
                "Actor is not scheduled yet."
            )
        self._verify_node_registered(node_id)

        log_filename = await self._resolve_worker_file(
            node_id=node_id,
            worker_id=worker_id,
            pid=None,
            suffix=suffix,
            timeout=timeout,
        )
        return node_id, log_filename

    async def _resolve_task_filename(
        self, task_id: str, attempt_number: int, suffix: str, timeout: int
    ):
        """Resolve the log file for a task.

        Args:
            task_id: The task id.
            attempt_number: The attempt number.
            suffix: The suffix of the log file, e.g. "out" or "err".
            timeout: Timeout in seconds.

        Returns:
            The node id, log file name, and the start and end offsets of the
            corresponding task log in the file.

        Raises:
            FileNotFoundError: If the log file is not found.
            ValueError: If the suffix is not "out" or "err".
        """
        log_filename = None
        node_id = None
        start_offset = None
        end_offset = None

        if suffix not in ["out", "err"]:
            raise ValueError(f"Suffix {suffix} is not supported.")

        reply = await self.client.get_all_task_info(
            filters=[("task_id", "=", task_id)], timeout=timeout
        )

        # Check if the task is found.
        if len(reply.events_by_task) == 0:
            raise FileNotFoundError(
                f"Could not find log file for task: {task_id}"
                f" (attempt {attempt_number}) with suffix: {suffix}"
            )

        task_event = None
        for t in reply.events_by_task:
            if t.attempt_number == attempt_number:
                task_event = t
                break

        if task_event is None:
            raise FileNotFoundError(
                "Could not find log file for task attempt: "
                f"{task_id}({attempt_number})"
            )

        # Get the worker id and node id.
        task = protobuf_to_task_state_dict(task_event)

        worker_id = task.get("worker_id", None)
        node_id = task.get("node_id", None)
        log_info = task.get("task_log_info", None)
        actor_id = task.get("actor_id", None)

        if node_id is None:
            raise FileNotFoundError(
                "Could not find log file for task attempt "
                f"{task_id}({attempt_number}) due to missing node info."
            )

        if log_info is None and actor_id is not None:
            # This is a concurrent actor task, so its logs are interleaved with
            # other tasks of the same actor. Point the user to the actor's log
            # file instead.
            raise FileNotFoundError(
                "For an actor task, please query the actor log for "
                f"actor({actor_id}), e.g. `ray logs actor --id {actor_id}`. Or "
                "set RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING=1 in the actor's runtime env "
                "or when starting the cluster. Recording actor task logs could be "
                "expensive, so Ray turns it off by default."
            )
        elif log_info is None:
            raise FileNotFoundError(
                "Could not find log file for task attempt: "
                f"{task_id}({attempt_number}). "
                f"Worker id = {worker_id}, node id = {node_id}, "
                f"log_info = {log_info}"
            )

        filename_key = "stdout_file" if suffix == "out" else "stderr_file"
        log_filename = log_info.get(filename_key, None)
        if log_filename is None:
            raise FileNotFoundError(
                f"Missing log filename info in {log_info} for task {task_id}, "
                f"attempt {attempt_number}"
            )

        start_offset = log_info.get(f"std{suffix}_start", None)
        end_offset = log_info.get(f"std{suffix}_end", None)

        return node_id, log_filename, start_offset, end_offset
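
    # The start/end offsets, when present, delimit this task attempt's slice of a
    # shared worker log file, so callers can stream only that byte range rather
    # than the whole file.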

    async def resolve_filename(
        self,
        *,
        node_id: Optional[str] = None,
        log_filename: Optional[str] = None,
        actor_id: Optional[str] = None,
        task_id: Optional[str] = None,
        attempt_number: Optional[int] = None,
        pid: Optional[str] = None,
        get_actor_fn: Optional[Callable[[str], Dict]] = None,
        timeout: int = DEFAULT_RPC_TIMEOUT,
        suffix: str = "out",
        submission_id: Optional[str] = None,
    ) -> ResolvedStreamFileInfo:
        """Resolve and return the log file info given all options.

        Args:
            node_id: The id of the node from which logs are resolved.
            log_filename: Filename of the log file.
            actor_id: Id of the actor that generates the log file.
            task_id: Id of the task that generates the log file.
            attempt_number: The attempt number of the task.
            pid: Id of the worker process that generates the log file.
            get_actor_fn: Callback to get the actor's data by id.
            timeout: Timeout for the gRPC call that lists logs on the node
                specified by `node_id`.
            suffix: Log suffix used when resolving by other ids and no
                `log_filename` is provided. Defaults to "out".
            submission_id: The submission id for a submission job.
        """
        start_offset = None
        end_offset = None
        if suffix not in ["out", "err"]:
            raise ValueError(f"Suffix {suffix} is not supported.")

        # TODO(rickyx): We should make sure we do some sort of checking on the
        # log filename.
        if actor_id:
            node_id, log_filename = await self._resolve_actor_filename(
                actor_id, get_actor_fn, suffix, timeout
            )

        elif task_id:
            (
                node_id,
                log_filename,
                start_offset,
                end_offset,
            ) = await self._resolve_task_filename(
                task_id, attempt_number, suffix, timeout
            )

        elif submission_id:
            node_id, log_filename = await self._resolve_job_filename(submission_id)

        elif pid:
            if node_id is None:
                raise ValueError(
                    "Node id needs to be specified for resolving"
                    f" filenames of pid {pid}"
                )
            self._verify_node_registered(node_id)
            log_filename = await self._resolve_worker_file(
                node_id=node_id,
                worker_id=None,
                pid=pid,
                suffix=suffix,
                timeout=timeout,
            )

        if log_filename is None:
            raise FileNotFoundError(
                "Could not find a log file. Please make sure the given "
                "option exists in the cluster.\n"
                f"\tnode_id: {node_id}\n"
                f"\tfilename: {log_filename}\n"
                f"\tactor_id: {actor_id}\n"
                f"\ttask_id: {task_id}\n"
                f"\tpid: {pid}\n"
                f"\tsuffix: {suffix}\n"
                f"\tsubmission_id: {submission_id}\n"
                f"\tattempt_number: {attempt_number}\n"
            )

        res = ResolvedStreamFileInfo(
            node_id=node_id,
            filename=log_filename,
            start_offset=start_offset,
            end_offset=end_offset,
        )
        logger.info(f"Resolved log file: {res}")
        return res
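
    # Usage sketch (illustrative): resolving an explicit file on a known node,
    #   res = await manager.resolve_filename(node_id=node_id, log_filename="raylet.out")
    # leaves both offsets as None (stream the whole file), while resolving by
    # task_id instead fills the offsets from the task's recorded log info.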

    def _categorize_log_files(self, log_files: List[str]) -> Dict[str, List[str]]:
        """Categorize the given log files after filtering them with a given glob.

        Returns:
            Dictionary of {component_name -> list of log files}
        """
        result = defaultdict(list)
        for log_file in log_files:
            if "worker" in log_file and log_file.endswith(".out"):
                result["worker_out"].append(log_file)
            elif "worker" in log_file and log_file.endswith(".err"):
                result["worker_err"].append(log_file)
            elif "core-worker" in log_file and log_file.endswith(".log"):
                result["core_worker"].append(log_file)
            elif "core-driver" in log_file and log_file.endswith(".log"):
                result["driver"].append(log_file)
            elif "raylet." in log_file:
                result["raylet"].append(log_file)
            elif "gcs_server." in log_file:
                result["gcs_server"].append(log_file)
            elif "log_monitor" in log_file:
                result["internal"].append(log_file)
            elif "monitor" in log_file:
                result["autoscaler"].append(log_file)
            elif "agent." in log_file:
                result["agent"].append(log_file)
            elif "dashboard." in log_file:
                result["dashboard"].append(log_file)
            else:
                result["internal"].append(log_file)
        return result
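
    # Categorization example (illustrative): ["worker-abc-01000000-42.out",
    # "raylet.err", "gcs_server.out", "dashboard_agent.log"] would be grouped under
    # worker_out, raylet, gcs_server, and agent respectively.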