Skip to content

Commit

Permalink
Group status tests together
Browse files Browse the repository at this point in the history
  • Loading branch information
dongreenberg committed Jun 17, 2024
1 parent 6d5549a commit c099afb
Showing 1 changed file with 165 additions and 174 deletions.
339 changes: 165 additions & 174 deletions tests/test_resources/test_clusters/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,19 +200,6 @@ def test_cluster_endpoint(self, cluster):
verify=verify,
headers=rh.globals.rns_client.request_headers(),
)
assert r.status_code == 200
assert r.json().get("cluster_config")["resource_type"] == "cluster"

@pytest.mark.level("local")
def test_load_cluster_status(self, cluster):
endpoint = cluster.endpoint()
verify = cluster.client.verify
r = requests.get(
f"{endpoint}/status",
verify=verify,
headers=rh.globals.rns_client.request_headers(),
)

assert r.status_code == 200
status_data = r.json()
assert status_data["cluster_config"]["resource_type"] == "cluster"
Expand Down Expand Up @@ -276,6 +263,171 @@ def test_rh_here_objects(self, cluster):
assert "test_table" in cluster.keys()
assert isinstance(cluster.get("test_table"), rh.Table)

@pytest.mark.level("local")
def test_condensed_config_for_cluster(self, cluster):
remote_cluster_config = rh.function(cluster_config).to(cluster)
on_cluster_config = remote_cluster_config()
local_cluster_config = cluster.config()

keys_to_skip = [
"creds",
"client_port",
"server_host",
"api_server_url",
"ssl_keyfile",
"ssl_certfile",
]
on_cluster_config = remove_config_keys(on_cluster_config, keys_to_skip)
local_cluster_config = remove_config_keys(local_cluster_config, keys_to_skip)

if local_cluster_config.get("stable_internal_external_ips", False):
cluster_ips = local_cluster_config.pop(
"stable_internal_external_ips", None
)[0]
on_cluster_ips = on_cluster_config.pop(
"stable_internal_external_ips", None
)[0]
assert tuple(cluster_ips) == tuple(on_cluster_ips)

assert on_cluster_config == local_cluster_config

@pytest.mark.level("local")
def test_sharing(self, cluster, friend_account_logged_in_docker_cluster_pk_ssh):
# Skip this test for ondemand clusters, because making
# it compatible with ondemand_cluster requires changes
# that break CI.
# TODO: Remove this by doing some CI-specific logic.
if cluster.__class__.__name__ == "OnDemandCluster":
return

if cluster.rns_address.startswith("~"):
# For `local_named_resource` resolve the rns address so it can be shared and loaded
from runhouse.globals import rns_client

cluster.rns_address = rns_client.local_to_remote_address(
cluster.rns_address
)

cluster.share(
users=["info@run.house"],
access_level="read",
notify_users=False,
)

# First try loading in same process/filesystem because it's more debuggable, but not as thorough
resource_class_name = cluster.config().get("resource_type").capitalize()
config = cluster.config()

with friend_account():
curr_config = load_shared_resource_config(
resource_class_name, cluster.rns_address
)
new_creds = curr_config.get("creds", None)
assert f'{config["name"]}-ssh-secret' in new_creds
assert curr_config == config

# TODO: If we are testing with an ondemand_cluster we to
# sync sky key so loading ondemand_cluster from config works
# Also need aws secret to load availability zones
# secrets=["sky", "aws"],
load_shared_resource_config_cluster = rh.function(
load_shared_resource_config
).to(friend_account_logged_in_docker_cluster_pk_ssh)
new_config = load_shared_resource_config_cluster(
resource_class_name, cluster.rns_address
)
new_creds = curr_config.get("creds", None)
assert f'{config["name"]}-ssh-secret' in new_creds
assert new_config == config

@pytest.mark.level("local")
def test_access_to_shared_cluster(self, cluster):
# TODO: Remove this by doing some CI-specific logic.
if cluster.__class__.__name__ == "OnDemandCluster":
return

if cluster.rns_address.startswith("~"):
# For `local_named_resource` resolve the rns address so it can be shared and loaded
from runhouse.globals import rns_client

cluster.rns_address = rns_client.local_to_remote_address(
cluster.rns_address
)

cluster.share(
users=["support@run.house"],
access_level="write",
notify_users=False,
)

cluster_name = cluster.rns_address
cluster_creds = cluster.creds_values
cluster_creds.pop("private_key", None)
cluster_creds.pop("public_key", None)

with friend_account_in_org():
shared_cluster = rh.cluster(name=cluster_name)
assert shared_cluster.rns_address == cluster_name
assert shared_cluster.creds_values.keys() == cluster_creds.keys()
echo_msg = "hello from shared cluster"
run_res = shared_cluster.run([f"echo {echo_msg}"])
assert echo_msg in run_res[0][1]
# First element, return code
assert shared_cluster.run(["echo hello"])[0][0] == 0

@pytest.mark.level("local")
def test_changing_name_and_saving_in_between(self, cluster):
remote_summer = rh.function(summer).to(cluster)
assert remote_summer(3, 4) == 7
old_name = cluster.name

cluster.save(name="new_testing_name")

assert remote_summer(3, 4) == 7
remote_sub = rh.function(sub).to(cluster)
assert remote_sub(3, 4) == -1

cluster_keys_remote = rh.function(cluster_keys).to(cluster)

# If save did not update the name, this will attempt to create a connection
# when the cluster is used remotely. However, if you update the name, `on_this_cluster` will
# work correctly and then the remote function will just call the object store when it calls .keys()
assert cluster.keys() == cluster_keys_remote(cluster)

# Restore the state?
cluster.save(name=old_name)

@pytest.mark.level("local")
def test_caller_token_propagated(self, cluster):
remote_assume_caller_and_get_token = rh.function(
assume_caller_and_get_token
).to(cluster)

remote_assume_caller_and_get_token.share(
users=["info@run.house"], notify_users=False
)

with friend_account():
unassumed_token, assumed_token = remote_assume_caller_and_get_token()
# "Local token" is the token the cluster accesses in rh.configs.token; this is what will be used
# in subsequent rns_client calls
assert assumed_token == rh.globals.rns_client.cluster_token(
rh.configs.token, cluster.rns_address
)
assert unassumed_token != rh.configs.token

# Docker clusters are logged out, ondemand clusters are logged in
output = cluster.run("sed -n 's/.*token: *//p' ~/.rh/config.yaml")
# No config file
if output[0][0] == 2:
assert unassumed_token is None
elif output[0][0] == 0:
assert unassumed_token == output[0][1].strip()

####################################################################################################
# Status tests
####################################################################################################

@pytest.mark.level("local")
def test_rh_status_pythonic(self, cluster):
sleep_remote = rh.function(sleep_fn).to(
Expand Down Expand Up @@ -505,167 +657,6 @@ def test_rh_status_stopped(self, cluster):
finally:
cluster.run(["runhouse restart"])

@pytest.mark.level("local")
def test_condensed_config_for_cluster(self, cluster):
remote_cluster_config = rh.function(cluster_config).to(cluster)
on_cluster_config = remote_cluster_config()
local_cluster_config = cluster.config()

keys_to_skip = [
"creds",
"client_port",
"server_host",
"api_server_url",
"ssl_keyfile",
"ssl_certfile",
]
on_cluster_config = remove_config_keys(on_cluster_config, keys_to_skip)
local_cluster_config = remove_config_keys(local_cluster_config, keys_to_skip)

if local_cluster_config.get("stable_internal_external_ips", False):
cluster_ips = local_cluster_config.pop(
"stable_internal_external_ips", None
)[0]
on_cluster_ips = on_cluster_config.pop(
"stable_internal_external_ips", None
)[0]
assert tuple(cluster_ips) == tuple(on_cluster_ips)

assert on_cluster_config == local_cluster_config

@pytest.mark.level("local")
def test_sharing(self, cluster, friend_account_logged_in_docker_cluster_pk_ssh):
# Skip this test for ondemand clusters, because making
# it compatible with ondemand_cluster requires changes
# that break CI.
# TODO: Remove this by doing some CI-specific logic.
if cluster.__class__.__name__ == "OnDemandCluster":
return

if cluster.rns_address.startswith("~"):
# For `local_named_resource` resolve the rns address so it can be shared and loaded
from runhouse.globals import rns_client

cluster.rns_address = rns_client.local_to_remote_address(
cluster.rns_address
)

cluster.share(
users=["info@run.house"],
access_level="read",
notify_users=False,
)

# First try loading in same process/filesystem because it's more debuggable, but not as thorough
resource_class_name = cluster.config().get("resource_type").capitalize()
config = cluster.config()

with friend_account():
curr_config = load_shared_resource_config(
resource_class_name, cluster.rns_address
)
new_creds = curr_config.get("creds", None)
assert f'{config["name"]}-ssh-secret' in new_creds
assert curr_config == config

# TODO: If we are testing with an ondemand_cluster we to
# sync sky key so loading ondemand_cluster from config works
# Also need aws secret to load availability zones
# secrets=["sky", "aws"],
load_shared_resource_config_cluster = rh.function(
load_shared_resource_config
).to(friend_account_logged_in_docker_cluster_pk_ssh)
new_config = load_shared_resource_config_cluster(
resource_class_name, cluster.rns_address
)
new_creds = curr_config.get("creds", None)
assert f'{config["name"]}-ssh-secret' in new_creds
assert new_config == config

@pytest.mark.level("local")
def test_access_to_shared_cluster(self, cluster):
# TODO: Remove this by doing some CI-specific logic.
if cluster.__class__.__name__ == "OnDemandCluster":
return

if cluster.rns_address.startswith("~"):
# For `local_named_resource` resolve the rns address so it can be shared and loaded
from runhouse.globals import rns_client

cluster.rns_address = rns_client.local_to_remote_address(
cluster.rns_address
)

cluster.share(
users=["support@run.house"],
access_level="write",
notify_users=False,
)

cluster_name = cluster.rns_address
cluster_creds = cluster.creds_values
cluster_creds.pop("private_key", None)
cluster_creds.pop("public_key", None)

with friend_account_in_org():
shared_cluster = rh.cluster(name=cluster_name)
assert shared_cluster.rns_address == cluster_name
assert shared_cluster.creds_values.keys() == cluster_creds.keys()
echo_msg = "hello from shared cluster"
run_res = shared_cluster.run([f"echo {echo_msg}"])
assert echo_msg in run_res[0][1]
# First element, return code
assert shared_cluster.run(["echo hello"])[0][0] == 0

@pytest.mark.level("local")
def test_changing_name_and_saving_in_between(self, cluster):
remote_summer = rh.function(summer).to(cluster)
assert remote_summer(3, 4) == 7
old_name = cluster.name

cluster.save(name="new_testing_name")

assert remote_summer(3, 4) == 7
remote_sub = rh.function(sub).to(cluster)
assert remote_sub(3, 4) == -1

cluster_keys_remote = rh.function(cluster_keys).to(cluster)

# If save did not update the name, this will attempt to create a connection
# when the cluster is used remotely. However, if you update the name, `on_this_cluster` will
# work correctly and then the remote function will just call the object store when it calls .keys()
assert cluster.keys() == cluster_keys_remote(cluster)

# Restore the state?
cluster.save(name=old_name)

@pytest.mark.level("local")
def test_caller_token_propagated(self, cluster):
remote_assume_caller_and_get_token = rh.function(
assume_caller_and_get_token
).to(cluster)

remote_assume_caller_and_get_token.share(
users=["info@run.house"], notify_users=False
)

with friend_account():
unassumed_token, assumed_token = remote_assume_caller_and_get_token()
# "Local token" is the token the cluster accesses in rh.configs.token; this is what will be used
# in subsequent rns_client calls
assert assumed_token == rh.globals.rns_client.cluster_token(
rh.configs.token, cluster.rns_address
)
assert unassumed_token != rh.configs.token

# Docker clusters are logged out, ondemand clusters are logged in
output = cluster.run("sed -n 's/.*token: *//p' ~/.rh/config.yaml")
# No config file
if output[0][0] == 2:
assert unassumed_token is None
elif output[0][0] == 0:
assert unassumed_token == output[0][1].strip()

@pytest.mark.level("local")
def test_send_status_to_db(self, cluster):
import json
Expand Down

0 comments on commit c099afb

Please sign in to comment.