Skip to content

Commit

Permalink
modify cluster status checks tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexandra Belousov authored and Alexandra Belousov committed Aug 27, 2024
1 parent da456c2 commit 717583b
Show file tree
Hide file tree
Showing 8 changed files with 262 additions and 181 deletions.
5 changes: 5 additions & 0 deletions runhouse/resources/hardware/on_demand_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,9 @@ def teardown(self):
Example:
>>> rh.ondemand_cluster("rh-cpu").teardown()
"""

status_code_den_request = None

try:
cluster_status_data = self.status()
status_data = {
Expand All @@ -584,6 +587,8 @@ def teardown(self):
sky.down(self.name)
self.address = None

return status_code_den_request

def teardown_and_delete(self):
"""Teardown cluster and delete it from configs.
Expand Down
161 changes: 102 additions & 59 deletions runhouse/servers/cluster_servlet.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import copy
import datetime
import json
import os.path
import threading
from typing import Any, Dict, List, Optional, Set, Tuple, Union

Expand Down Expand Up @@ -70,10 +71,10 @@ async def __init__(
)

logger.info("Creating periodic_cluster_checks thread.")
cluster_checks_thread = threading.Thread(
self.cluster_checks_thread = threading.Thread(
target=self.periodic_cluster_checks, daemon=True
)
cluster_checks_thread.start()
self.cluster_checks_thread.start()

##############################################
# Cluster config state storage methods
Expand All @@ -87,6 +88,16 @@ async def aset_cluster_config(self, cluster_config: Dict[str, Any]):

self.cluster_config = cluster_config

new_cluster_name = self.cluster_config.get("name", None)

if self._cluster_name != new_cluster_name:
self._cluster_name = new_cluster_name
self._cluster_uri = (
rns_client.format_rns_address(self._cluster_name)
if self._cluster_name
else None
)

# Propagate the changes to all other process's obj_stores
await asyncio.gather(
*[
Expand Down Expand Up @@ -235,7 +246,9 @@ async def asave_status_metrics_to_den(self, status: dict):

status_data = {
"status": ResourceServerStatus.running,
"resource_type": status_copy.get("cluster_config").pop("resource_type"),
"resource_type": status_copy.get("cluster_config").pop(
"resource_type", "cluster"
),
"resource_info": status_copy,
"env_servlet_processes": env_servlet_processes,
}
Expand All @@ -251,6 +264,66 @@ async def asave_status_metrics_to_den(self, status: dict):
def save_status_metrics_to_den(self, status: dict):
return sync_function(self.asave_status_metrics_to_den)(status)

async def acheck_cluster_status_and_logs(
self, interval_size: int, send_to_den: bool = True
):

logger.debug("Performing cluster checks")
status, den_resp_status = await self.astatus(send_to_den=send_to_den)
logs_resp_status_code, new_start_log_line, new_end_log_line = None, None, None

if not send_to_den:
return (
den_resp_status,
logs_resp_status_code,
new_start_log_line,
new_end_log_line,
status,
)

status_code_cluster_status = den_resp_status.status_code

if status_code_cluster_status == 404:
logger.info(
"Cluster has not yet been saved to Den, cannot update status or logs."
)
elif status_code_cluster_status != 200:
logger.error(
f"Failed to send cluster status to Den: {den_resp_status.json()}"
)
else:
logger.debug("Successfully sent cluster status to Den.")

cluster_config = await self.aget_cluster_config()
prev_end_log_line = cluster_config.get("end_log_line", 0)
(
logs_resp_status_code,
new_start_log_line,
new_end_log_line,
) = await self.send_cluster_logs_to_den(
prev_end_log_line=prev_end_log_line,
)
if not logs_resp_status_code:
logger.debug(
f"No logs were generated in the past {interval_size} minute(s), logs were not sent to Den."
)

elif logs_resp_status_code == 200:
logger.debug("Successfully sent cluster logs to Den.")
await self.aset_cluster_config_value(
key="start_log_line", value=new_start_log_line
)
await self.aset_cluster_config_value(
key="end_log_line", value=new_end_log_line
)
return (
status_code_cluster_status,
logs_resp_status_code,
new_start_log_line,
new_end_log_line,
status,
)

async def aperiodic_cluster_checks(self):
"""Periodically check the status of the cluster, gather metrics about the cluster's utilization & memory,
and save it to Den."""
Expand All @@ -260,71 +333,34 @@ async def aperiodic_cluster_checks(self):
"status_check_interval", DEFAULT_STATUS_CHECK_INTERVAL
)
while True:
try:
# Only if one of these is true, do we actually need to get the status from each EnvServlet
should_send_status_and_logs_to_den: bool = (
configs.token is not None
and interval_size != -1
and self._cluster_uri is not None
)
should_update_autostop: bool = self.autostop_helper is not None
should_send_status_and_logs_to_den: bool = (
configs.token is not None
and interval_size != -1
and self._cluster_uri is not None
)
should_update_autostop: bool = self.autostop_helper is not None

if (
not should_send_status_and_logs_to_den
and not should_update_autostop
):
break
if not should_send_status_and_logs_to_den and not should_update_autostop:
break

logger.debug("Performing cluster checks")
status, den_resp = await self.astatus(
send_to_den=should_send_status_and_logs_to_den
try:
(
status_code_cluster_status,
logs_resp_status_code,
new_start_log_line,
new_end_log_line,
status,
) = await self.acheck_cluster_status_and_logs(
interval_size=interval_size,
send_to_den=should_send_status_and_logs_to_den,
)

if should_update_autostop:
logger.debug("Updating autostop")
await self._update_autostop(status)

if not should_send_status_and_logs_to_den:
elif not should_send_status_and_logs_to_den:
break

status_code = den_resp.status_code

if status_code == 404:
logger.info(
"Cluster has not yet been saved to Den, cannot update status or logs."
)
elif status_code != 200:
logger.error(
f"Failed to send cluster status to Den: {den_resp.json()}"
)
else:
logger.debug("Successfully sent cluster status to Den.")

prev_end_log_line = cluster_config.get("end_log_line", 0)
(
logs_resp_status_code,
new_start_log_line,
new_end_log_line,
) = await self.send_cluster_logs_to_den(
prev_end_log_line=prev_end_log_line,
)
if not logs_resp_status_code:
logger.debug(
f"No logs were generated in the past {interval_size} minute(s), logs were not sent to Den."
)

elif logs_resp_status_code == 200:
logger.debug("Successfully sent cluster logs to Den.")
await self.aset_cluster_config_value(
key="start_log_line", value=new_start_log_line
)
await self.aset_cluster_config_value(
key="end_log_line", value=new_end_log_line
)
# since we are setting a new values to the cluster_config, we need to reload it so the next
# cluster check iteration will reference to the updated cluster config.
cluster_config = await self.aget_cluster_config()

except Exception:
self.logger.error(
"Cluster checks have failed.\n"
Expand Down Expand Up @@ -525,6 +561,10 @@ def status(self, send_to_den: bool = False):
# Save cluster logs to Den
##############################################
def _get_logs(self):

if not os.path.exists(SERVER_LOGFILE):
return ""

with open(SERVER_LOGFILE) as log_file:
log_lines = log_file.readlines()
cleaned_log_lines = [ColoredFormatter.format_log(line) for line in log_lines]
Expand Down Expand Up @@ -573,3 +613,6 @@ async def send_cluster_logs_to_den(
)

return resp_status_code, prev_end_log_line, new_end_log_line

def _cluster_periodic_thread_is_alive(self):
return self.cluster_checks_thread.is_alive()
4 changes: 2 additions & 2 deletions runhouse/servers/env_servlet.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def _get_env_cpu_usage(self, cluster_config: dict = None):

if not cluster_config.get("resource_subtype") == "Cluster":
stable_internal_external_ips = cluster_config.get(
"stable_internal_external_ips"
"stable_internal_external_ips", []
)
for ips_set in stable_internal_external_ips:
internal_ip, external_ip = ips_set[0], ips_set[1]
Expand All @@ -214,7 +214,7 @@ def _get_env_cpu_usage(self, cluster_config: dict = None):
node_name = f"worker_{stable_internal_external_ips.index(ips_set)} ({external_ip})"
else:
# a case it is a BYO cluster, assume that first ip in the ips list is the head.
ips = cluster_config.get("ips")
ips = cluster_config.get("ips", [])
if len(ips) == 1 or node_ip == ips[0]:
node_name = f"head ({node_ip})"
else:
Expand Down
105 changes: 8 additions & 97 deletions tests/test_resources/test_clusters/test_on_demand_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,8 @@
import time

import pytest
import requests

import runhouse as rh
from runhouse.constants import SERVER_LOGFILE_PATH
from runhouse.globals import rns_client
from runhouse.logger import ColoredFormatter
from runhouse.resources.hardware.utils import ResourceServerStatus

import tests.test_resources.test_clusters.test_cluster
from tests.utils import friend_account
Expand Down Expand Up @@ -201,97 +196,13 @@ def test_fn_to_docker_container(self, ondemand_aws_cluster):
# Status tests
####################################################################################################

@pytest.mark.level("local")
@pytest.mark.clustertest
def test_status_scheduler_basic_flow(self, cluster):

cluster_uri = rh.globals.rns_client.format_rns_address(cluster.rns_address)
api_server_url = rh.globals.rns_client.api_server_url
headers = rh.globals.rns_client.request_headers()

get_status_data_resp = requests.get(
f"{api_server_url}/resource/{cluster_uri}/cluster/status?limit=1",
headers=headers,
)

assert get_status_data_resp.status_code == 200
resp_data = get_status_data_resp.json().get("data")[0]
assert resp_data.get("status") == ResourceServerStatus.running

# TODO [sb]: call cluster's /logs endpoint once introduced.
cluster_logs = cluster.run([f"cat {SERVER_LOGFILE_PATH}"], stream_logs=False)[
0
][1].split(
"\n"
) # create list of lines
cluster_logs = [
ColoredFormatter.format_log(log) for log in cluster_logs
] # clean log formatting
cluster_logs = "\n".join(cluster_logs) # make logs list into one string

assert "Cluster checks have failed" not in cluster_logs
assert "Failed to send cluster status to Den: " not in cluster_logs

@pytest.mark.level("minimal")
@pytest.mark.skip("Test requires terminating the cluster")
def test_set_status_after_teardown(self, cluster):

def test_set_status_after_teardown(self, cluster, mocker):
mock_function = mocker.patch("sky.down")
response = cluster.teardown()
assert isinstance(response, int)
assert (
response == 200
) # that means that the call to post status endpoint in den was successful
mock_function.assert_called_once()
assert cluster.is_up()
cluster_config = cluster.config()
cluster_uri = rns_client.format_rns_address(cluster.rns_address)
api_server_url = cluster_config.get("api_server_url", rns_client.api_server_url)
cluster.teardown()
get_status_data_resp = requests.get(
f"{api_server_url}/resource/{cluster_uri}/cluster/status",
headers=rns_client.request_headers(),
)

assert get_status_data_resp.status_code == 200
# For UI displaying purposes, the cluster/status endpoint returns cluster status history.
# The latest status info is the first element in the list returned by the endpoint.
get_status_data = get_status_data_resp.json()["data"][0]
assert get_status_data["resource_type"] == cluster_config.get("resource_type")
assert get_status_data["status"] == ResourceServerStatus.terminated

####################################################################################################
# Logs surfacing tests
####################################################################################################
@pytest.mark.level("minimal")
def test_logs_surfacing_scheduler_basic_flow(self, cluster):
cluster_uri = rh.globals.rns_client.format_rns_address(cluster.rns_address)
headers = rh.globals.rns_client.request_headers()
api_server_url = rh.globals.rns_client.api_server_url

get_logs_data_resp = requests.get(
f"{api_server_url}/resource/{cluster_uri}/logs",
headers=headers,
)

# TODO [sb]: call cluster's /logs endpoint once introduced.
cluster_logs = cluster.run([f"cat {SERVER_LOGFILE_PATH}"], stream_logs=False)[
0
][1].split(
"\n"
) # create list of lines
cluster_logs = [
ColoredFormatter.format_log(log) for log in cluster_logs
] # clean log formatting
cluster_logs = "\n".join(cluster_logs) # make logs list into one string

assert "Cluster checks have failed" not in cluster_logs
assert "Failed to send cluster logs to Den: " not in cluster_logs

cluster_status = cluster.status()
logs_start_line = cluster_status.get("cluster_config").get(
"start_log_line", None
)
logs_end_line = cluster_status.get("cluster_config").get("end_log_line", None)

assert logs_start_line
assert logs_end_line
assert logs_end_line > logs_start_line

resp_data = get_logs_data_resp.json().get("data")
assert get_logs_data_resp.status_code == 200
cluster_logs_from_s3 = resp_data["logs_text"][0][1:].replace("\n ", "\n")
assert cluster_logs_from_s3 in cluster_logs
Loading

0 comments on commit 717583b

Please sign in to comment.