Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Spot] Better spot logs #1412

Merged
merged 19 commits into from
Nov 30, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 116 additions & 78 deletions sky/spot/spot_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import colorama
import filelock
import rich

from sky import backends
from sky import exceptions
Expand Down Expand Up @@ -37,6 +38,11 @@

_LOG_STREAM_CHECK_CONTROLLER_GAP_SECONDS = 5

_JOB_WAITING_STATUS_MESSAGE = ('[bold cyan]Waiting for the job to start'
'{status_str}.[/] It may take a few minutes.')
_JOB_CANCELLED_MESSAGE = ('[bold cyan]Waiting for the job status to be updated.'
'[/] It may take a minute.')


class UserSignal(enum.Enum):
"""The signal to be sent to the user."""
Expand Down Expand Up @@ -197,89 +203,121 @@ def cancel_job_by_name(job_name: str) -> str:
def stream_logs_by_id(job_id: int, follow: bool = True) -> str:
"""Stream logs by job id."""
controller_status = job_lib.get_status(job_id)
while (controller_status != job_lib.JobStatus.RUNNING and
(controller_status is None or not controller_status.is_terminal())):
status_str = 'None'
if controller_status is not None:
status_str = controller_status.value
logger.info(
'Waiting for the spot controller process to be RUNNING (status: '
f'{status_str}).')
time.sleep(_LOG_STREAM_CHECK_CONTROLLER_GAP_SECONDS)
controller_status = job_lib.get_status(job_id)

job_status = spot_state.get_status(job_id)
while job_status is None:
logger.info('Waiting for the spot job to be started.')
time.sleep(1)
status_msg = ('[bold cyan]Waiting for controller process to be RUNNING '
'{status_str}[/]. It may take a few minutes.')
status_display = rich.status.Status(status_msg.format(status_str=''))
with status_display:
prev_msg = None
while (controller_status != job_lib.JobStatus.RUNNING and
(controller_status is None or
not controller_status.is_terminal())):
status_str = 'None'
if controller_status is not None:
status_str = controller_status.value
msg = status_msg.format(status_str=f' (status: {status_str})')
if msg != prev_msg:
status_display.update(msg)
prev_msg = msg
time.sleep(_LOG_STREAM_CHECK_CONTROLLER_GAP_SECONDS)
controller_status = job_lib.get_status(job_id)

msg = _JOB_WAITING_STATUS_MESSAGE.format(status_str='')
status_display.update(msg)
prev_msg = msg
job_status = spot_state.get_status(job_id)
while job_status is None:
time.sleep(1)
job_status = spot_state.get_status(job_id)

if job_status.is_terminal():
job_msg = ''
if job_status.is_failed():
job_msg = ('\nFor detailed error message, please check: '
f'{colorama.Style.BRIGHT}sky logs '
f'{SPOT_CONTROLLER_NAME} {job_id}'
f'{colorama.Style.RESET_ALL}')
return (
f'Job {job_id} is already in terminal state {job_status.value}. '
f'Logs will not be shown.{job_msg}')
task_name = spot_state.get_task_name_by_job_id(job_id)
cluster_name = generate_spot_cluster_name(task_name, job_id)
backend = backends.CloudVmRayBackend()
spot_status = spot_state.get_status(job_id)
# spot_status can be None if the controller process just started and has
# not updated the spot status yet.
while spot_status is None or not spot_status.is_terminal():
handle = global_user_state.get_handle_from_cluster_name(cluster_name)
# Check the handle: The cluster can be preempted and removed from the
# table before the spot state is updated by the controller. In this
# case, we should skip the logging, and wait for the next round of
# status check.
if handle is None or spot_status != spot_state.SpotStatus.RUNNING:
status_help_str = ''
if (spot_status is not None and
spot_status != spot_state.SpotStatus.RUNNING):
status_help_str = f', as the spot job is {spot_status.value}'
logger.info(f'INFO: The log is not ready yet{status_help_str}. '
f'Waiting for {JOB_STATUS_CHECK_GAP_SECONDS} seconds.')
time.sleep(JOB_STATUS_CHECK_GAP_SECONDS)
if job_status.is_terminal():
job_msg = ''
if job_status.is_failed():
job_msg = ('\nFor detailed error message, please check: '
f'{colorama.Style.BRIGHT}sky logs '
f'{SPOT_CONTROLLER_NAME} {job_id}'
f'{colorama.Style.RESET_ALL}')
return (f'Job {job_id} is already in terminal state '
f'{job_status.value}. Logs will not be shown.{job_msg}')
task_name = spot_state.get_task_name_by_job_id(job_id)
cluster_name = generate_spot_cluster_name(task_name, job_id)
backend = backends.CloudVmRayBackend()
spot_status = spot_state.get_status(job_id)

# spot_status can be None if the controller process just started and has
# not updated the spot status yet.
while spot_status is None or not spot_status.is_terminal():
handle = global_user_state.get_handle_from_cluster_name(
cluster_name)
# Check the handle: The cluster can be preempted and removed from
# the table before the spot state is updated by the controller. In
# this case, we should skip the logging, and wait for the next
# round of status check.
if handle is None or spot_status != spot_state.SpotStatus.RUNNING:
status_str = ''
if (spot_status is not None and
spot_status != spot_state.SpotStatus.RUNNING):
status_str = f' (status: {spot_status.value})'
logger.debug(
f'INFO: The log is not ready yet{status_str}. '
f'Waiting for {JOB_STATUS_CHECK_GAP_SECONDS} seconds.')
msg = _JOB_WAITING_STATUS_MESSAGE.format(status_str=status_str)
if msg != prev_msg:
status_display.update(msg)
prev_msg = msg
time.sleep(JOB_STATUS_CHECK_GAP_SECONDS)
spot_status = spot_state.get_status(job_id)
continue
status_display.stop()
returncode = backend.tail_logs(handle,
job_id=None,
spot_job_id=job_id,
follow=follow)
if returncode == 0:
# If the log tailing exit successfully (the real job can be
# SUCCEEDED or FAILED), we can safely break the loop. We use the
# status in job queue to show the information, as the spot_state
# is not updated yet.
job_statuses = backend.get_job_status(handle, stream_logs=False)
job_status = list(job_statuses.values())[0]
assert job_status is not None, 'No job found.'
if job_status != job_lib.JobStatus.CANCELLED:
break
# The job can be cancelled by the user or the controller (when
# the cluster is partially preempted).
logger.debug(
'INFO: Job is cancelled. Waiting for the status update in '
f'{JOB_STATUS_CHECK_GAP_SECONDS} seconds.')
else:
logger.debug(
f'INFO: (Log streaming) Got return code {returncode}. '
f'Retrying in {JOB_STATUS_CHECK_GAP_SECONDS} seconds.')
# Finish early if the spot status is already in terminal state.
spot_status = spot_state.get_status(job_id)
continue
returncode = backend.tail_logs(handle,
job_id=None,
spot_job_id=job_id,
follow=follow)
if returncode == 0:
# If the log tailing exit successfully (the real job can be
# SUCCEEDED or FAILED), we can safely break the loop. We use the
# status in job queue to show the information, as the spot_state is
# not updated yet.
job_statuses = backend.get_job_status(handle, stream_logs=False)
job_status = list(job_statuses.values())[0]
assert job_status is not None, 'No job found.'
if job_status != job_lib.JobStatus.CANCELLED:
logger.info(f'Logs finished for job {job_id} '
f'(status: {job_status.value}).')
if spot_status.is_terminal():
break
# The job can be cancelled by the user or the controller (when the
# the cluster is partially preempted).
logger.info('INFO: (Log streaming) Job is cancelled. Waiting '
'for the status update in '
f'{JOB_STATUS_CHECK_GAP_SECONDS} seconds.')
else:
logger.info(
f'INFO: (Log streaming) Got return code {returncode}. Retrying '
f'in {JOB_STATUS_CHECK_GAP_SECONDS} seconds.')
# If the tailing fails, it is likely that the cluster fails, so we wait
# a while to make sure the spot state is updated by the controller, and
# check the spot queue again.
time.sleep(JOB_STATUS_CHECK_GAP_SECONDS)
logger.info(f'{colorama.Fore.YELLOW}The job is preempted.'
f'{colorama.Style.RESET_ALL}')
msg = _JOB_CANCELLED_MESSAGE
status_display.update(msg)
prev_msg = msg
status_display.start()
# If the tailing fails, it is likely that the cluster fails, so we
# wait a while to make sure the spot state is updated by the
# controller, and check the spot queue again.
# Wait a bit longer than the controller, so as to make sure the
# spot state is updated.
time.sleep(3 * JOB_STATUS_CHECK_GAP_SECONDS)
spot_status = spot_state.get_status(job_id)

# The spot_status may not be in terminal status yet, since the controllerhas
# not updated the spot state yet. We wait for a while, until the spot state
# is updated.
spot_status = spot_state.get_status(job_id)
while not spot_status.is_terminal():
time.sleep(1)
spot_status = spot_state.get_status(job_id)
else:
# The spot_status is in terminal state.
logger.info(f'Logs finished for job {job_id} '
f'(status: {spot_state.get_status(job_id).value}).')
logger.info(f'Logs finished for job {job_id} '
f'(status: {spot_status.value}).')
return ''


Expand Down