Cherry pick sagemaker fix and move of kfp SM test #619

Merged
167 changes: 148 additions & 19 deletions tests/e2e/tests/test_sanity.py
@@ -9,6 +9,8 @@
import subprocess
import time
import pytest
import json


from e2e.utils.constants import DEFAULT_USER_NAMESPACE
from e2e.utils.utils import load_yaml_file, wait_for, rand_name, write_yaml_file
@@ -39,9 +41,16 @@
from kubernetes.client.exceptions import ApiException as K8sApiException


from e2e.utils.aws.iam import IAMRole
from e2e.utils.s3_for_training.data_bucket import S3BucketWithTrainingData


INSTALLATION_PATH_FILE = "./resources/installation_config/vanilla.yaml"
CUSTOM_RESOURCE_TEMPLATES_FOLDER = "./resources/custom-resource-templates"

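# Shared random suffix: the IAM role ("role-"), S3 bucket ("s3-"), and KFP run
# ("kfp-run-") names are all derived from it, so the fixtures and the test body
# can reconstruct the same resource names independently.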
RANDOM_PREFIX = rand_name("kfp-")
PIPELINE_NAME_KFP = "[Tutorial] SageMaker Training"


@pytest.fixture(scope="class")
def installation_path():
@@ -62,12 +71,22 @@ def callback():

    wait_for(callback, timeout=600)


@pytest.fixture(scope="class")
def setup_load_balancer(
    metadata,
    region,
    request,
    cluster,
    installation,
    root_domain_name,
    root_domain_hosted_zone_id,
):

    lb_deps = {}
    env_value = os.environ.copy()
    env_value["PYTHONPATH"] = f"{os.getcwd()}/..:" + os.environ.get("PYTHONPATH", "")

    def on_create():
        if not root_domain_name or not root_domain_hosted_zone_id:
            pytest.fail(
@@ -76,15 +95,8 @@ def on_create():

        subdomain_name = rand_name("platform") + "." + root_domain_name
        lb_config = {
            "cluster": {"region": region, "name": cluster},
            "kubeflow": {"alb": {"scheme": "internet-facing"}},
            "route53": {
                "rootDomain": {
                    "name": root_domain_name,
@@ -93,23 +105,19 @@
                "subDomain": {
                    "name": subdomain_name,
                },
            },
        }
        write_yaml_file(lb_config, LB_CONFIG_FILE)

        cmd = "python utils/load_balancer/setup_load_balancer.py".split()
        retcode = subprocess.call(cmd, stderr=subprocess.STDOUT, env=env_value)
        assert retcode == 0
        lb_deps["config"] = load_yaml_file(LB_CONFIG_FILE)

    def on_delete():
        if metadata.get("lb_deps"):
            cmd = "python utils/load_balancer/lb_resources_cleanup.py".split()
            retcode = subprocess.call(cmd, stderr=subprocess.STDOUT, env=env_value)
            assert retcode == 0

    return configure_resource_fixture(
@@ -122,14 +130,87 @@ def host(setup_load_balancer):
    print(setup_load_balancer["config"]["route53"]["subDomain"]["name"])
    print("wait for 60s for website to be available...")
    time.sleep(60)
    host = (
        "https://kubeflow."
        + setup_load_balancer["config"]["route53"]["subDomain"]["name"]
    )
    print(f"accessing {host}...")
    return host


@pytest.fixture(scope="class")
def sagemaker_execution_role(region, metadata, request):
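    """Create a throwaway IAM execution role that SageMaker can assume.

    The role carries the AmazonS3FullAccess and AmazonSageMakerFullAccess
    managed policies plus a trust policy for sagemaker.amazonaws.com; its name
    and ARN are stored in test metadata, and the role is deleted on teardown.
    """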
    sagemaker_execution_role_name = "role-" + RANDOM_PREFIX
    managed_policies = [
        "arn:aws:iam::aws:policy/AmazonS3FullAccess",
        "arn:aws:iam::aws:policy/AmazonSageMakerFullAccess",
    ]
    role = IAMRole(
        name=sagemaker_execution_role_name, region=region, policy_arns=managed_policies
    )
    metadata_key = "sagemaker_execution_role"

    resource_details = {}

    def on_create():
        trust_policy = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {"Service": "sagemaker.amazonaws.com"},
                    "Action": "sts:AssumeRole",
                }
            ],
        }

        sagemaker_execution_role_arn = role.create(
            policy_document=json.dumps(trust_policy)
        )

        resource_details["name"] = sagemaker_execution_role_name
        resource_details["arn"] = sagemaker_execution_role_arn

    def on_delete():
        role.delete()

    return configure_resource_fixture(
        metadata=metadata,
        request=request,
        resource_details=resource_details,
        metadata_key=metadata_key,
        on_create=on_create,
        on_delete=on_delete,
    )


@pytest.fixture(scope="class")
def port_forward(installation):
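    # Intentionally a no-op: this variant reaches Kubeflow through the load
    # balancer rather than kubectl port-forward. Depending on `installation`
    # still forces the installation fixture to run first (a reading of intent,
    # not stated in the PR).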
    pass


@pytest.fixture(scope="class")
def s3_bucket_with_data(region):
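    """Create an S3 bucket named "s3-" + RANDOM_PREFIX and sync training data.

    The sync is delegated to utils/s3_for_training/sync.py; time_to_sleep
    presumably gives the upload time to settle before the pipeline run
    consumes the data.
    """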
    bucket_name = "s3-" + RANDOM_PREFIX
    bucket = S3BucketWithTrainingData(
        name=bucket_name,
        cmd=f"python utils/s3_for_training/sync.py {bucket_name} {region}",
        time_to_sleep=120,
    )
    bucket.create()

    yield
    bucket.delete()


@pytest.fixture(scope="class")
def clean_up_training_jobs_in_user_ns():
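    # Teardown-only fixture: after the test class finishes, delete any
    # SageMaker TrainingJob custom resources left in the user namespace.
    # Note that subprocess.Popen is fire-and-forget; deletion is not awaited.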
    yield

    cmd = f"kubectl delete trainingjobs --all -n {DEFAULT_USER_NAMESPACE}".split()
    subprocess.Popen(cmd)


class TestSanity:
    @pytest.fixture(scope="class")
    def setup(self, metadata, host):
@@ -224,3 +305,51 @@ def test_katib_experiment(self, setup, cluster, region):
            raise AssertionError("Expected K8sApiException Not Found")
        except K8sApiException as e:
            assert e.reason == "Not Found"


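    # End-to-end KFP + SageMaker check: create an experiment, submit the
    # "[Tutorial] SageMaker Training" pipeline with the test role and bucket,
    # wait for the run to succeed, then clean up the experiment and any
    # leftover TrainingJob resources.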
    def test_run_kfp_sagemaker_pipeline(
        self,
        region,
        metadata,
        s3_bucket_with_data,
        sagemaker_execution_role,
        kfp_client,
        clean_up_training_jobs_in_user_ns,
    ):
        experiment_name = "experiment-" + RANDOM_PREFIX
        experiment_description = "description-" + RANDOM_PREFIX
        sagemaker_execution_role_name = "role-" + RANDOM_PREFIX
        bucket_name = "s3-" + RANDOM_PREFIX
        job_name = "kfp-run-" + RANDOM_PREFIX

        sagemaker_execution_role_details = metadata.get("sagemaker_execution_role")
        sagemaker_execution_role_arn = sagemaker_execution_role_details["arn"]

        experiment = kfp_client.create_experiment(
            experiment_name,
            description=experiment_description,
            namespace=DEFAULT_USER_NAMESPACE,
        )

        pipeline_id = kfp_client.get_pipeline_id(PIPELINE_NAME_KFP)

        params = {
            "sagemaker_role_arn": sagemaker_execution_role_arn,
            "s3_bucket_name": bucket_name,
        }

        run = kfp_client.run_pipeline(
            experiment.id, job_name=job_name, pipeline_id=pipeline_id, params=params
        )

        assert run.name == job_name
        assert run.pipeline_spec.pipeline_id == pipeline_id
        # A freshly submitted run has not been assigned a status yet.
        assert run.status is None

        wait_for_run_succeeded(kfp_client, run, job_name, pipeline_id)

        kfp_client.delete_experiment(experiment.id)

        cmd = "kubectl delete trainingjobs --all -n kubeflow-user-example-com".split()
        subprocess.Popen(cmd)
99 changes: 0 additions & 99 deletions tests/e2e/tests/test_sanity_portforward.py
@@ -42,8 +42,6 @@
from e2e.utils.load_balancer.common import CONFIG_FILE as LB_CONFIG_FILE
from kfp_server_api.exceptions import ApiException as KFPApiException
from kubernetes.client.exceptions import ApiException as K8sApiException
from e2e.utils.aws.iam import IAMRole
from e2e.utils.s3_for_training.data_bucket import S3BucketWithTrainingData
from e2e.fixtures.notebook_dependencies import notebook_server


@@ -92,62 +90,6 @@ def callback():
    return wait_for(callback, timeout=900)


@pytest.fixture(scope="class")
def sagemaker_execution_role(region, metadata, request):
    sagemaker_execution_role_name = "role-" + RANDOM_PREFIX
    managed_policies = ["arn:aws:iam::aws:policy/AmazonS3FullAccess", "arn:aws:iam::aws:policy/AmazonSageMakerFullAccess"]
    role = IAMRole(name=sagemaker_execution_role_name, region=region, policy_arns=managed_policies)
    metadata_key = "sagemaker_execution_role"

    resource_details = {}

    def on_create():
        trust_policy = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {"Service": "sagemaker.amazonaws.com"},
                    "Action": "sts:AssumeRole",
                }
            ],
        }

        sagemaker_execution_role_arn = role.create(
            policy_document=json.dumps(trust_policy)
        )

        resource_details["name"] = sagemaker_execution_role_name
        resource_details["arn"] = sagemaker_execution_role_arn

    def on_delete():
        role.delete()

    return configure_resource_fixture(
        metadata=metadata,
        request=request,
        resource_details=resource_details,
        metadata_key=metadata_key,
        on_create=on_create,
        on_delete=on_delete,
    )

@pytest.fixture(scope="class")
def s3_bucket_with_data():
    bucket_name = "s3-" + RANDOM_PREFIX
    bucket = S3BucketWithTrainingData(name=bucket_name)
    bucket.create()

    yield
    bucket.delete()

@pytest.fixture(scope="class")
def clean_up_training_jobs_in_user_ns():
    yield

    cmd = f"kubectl delete trainingjobs --all -n {DEFAULT_USER_NAMESPACE}".split()
    subprocess.Popen(cmd)

class TestSanity:
    @pytest.fixture(scope="class")
    def setup(self):
@@ -278,44 +220,3 @@ def test_ack_crds(
        # The second condition is now required in case the kfp test runs before this one.
        assert expected_output in output or "training-job-" in output

    def test_run_kfp_sagemaker_pipeline(
        self, region, metadata, s3_bucket_with_data, sagemaker_execution_role, kfp_client, clean_up_training_jobs_in_user_ns
    ):
        experiment_name = "experiment-" + RANDOM_PREFIX
        experiment_description = "description-" + RANDOM_PREFIX
        sagemaker_execution_role_name = "role-" + RANDOM_PREFIX
        bucket_name = "s3-" + RANDOM_PREFIX
        job_name = "kfp-run-" + RANDOM_PREFIX

        sagemaker_execution_role_details = metadata.get("sagemaker_execution_role")
        sagemaker_execution_role_arn = sagemaker_execution_role_details["arn"]

        experiment = kfp_client.create_experiment(
            experiment_name,
            description=experiment_description,
            namespace=DEFAULT_USER_NAMESPACE,
        )

        pipeline_id = kfp_client.get_pipeline_id(PIPELINE_NAME_KFP)

        params = {
            "sagemaker_role_arn": sagemaker_execution_role_arn,
            "s3_bucket_name": bucket_name,
        }

        run = kfp_client.run_pipeline(
            experiment.id, job_name=job_name, pipeline_id=pipeline_id, params=params
        )

        assert run.name == job_name
        assert run.pipeline_spec.pipeline_id == pipeline_id
        assert run.status is None

        wait_for_run_succeeded(kfp_client, run, job_name, pipeline_id)

        kfp_client.delete_experiment(experiment.id)

        cmd = "kubectl delete trainingjobs --all -n kubeflow-user-example-com".split()
        subprocess.Popen(cmd)
@@ -28,14 +28,18 @@ def get_account_id():

def delete_iam_role(role_name, policy_name, region):
    iam_client = get_iam_client(region=region)
    try:
        iam_client.detach_role_policy(
            RoleName=role_name,
            PolicyArn="arn:aws:iam::aws:policy/AmazonSageMakerFullAccess",
        )
        acc_id = get_account_id()
        custom_policy_arn = f"arn:aws:iam::{acc_id}:policy/{policy_name}"
        iam_client.detach_role_policy(RoleName=role_name, PolicyArn=custom_policy_arn)
    except Exception:
        # IAM requires managed policies to be detached before a role can be
        # deleted; if they are already detached, log and try deletion anyway
        # (assumes `logger` is a standard logging.Logger, hence warning()).
        logger.warning("Failed to detach role policy, it may not exist anymore.")

    iam_client.delete_role(RoleName=role_name)
    print(f"Deleted IAM Role : {role_name}")
