Skip to content

Commit

Permalink
Merge pull request #814 from richardsliu/estimator_test
Browse files Browse the repository at this point in the history
Estimator e2etest
  • Loading branch information
richardsliu authored Sep 5, 2018
2 parents ddc9abf + 6f6f2fe commit 6af5924
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 29 deletions.
118 changes: 92 additions & 26 deletions py/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
import json
import os
import re
import requests
import retrying
import subprocess
import time
import uuid
import yaml

from kubernetes import client as k8s_client
from kubernetes.client import rest
Expand Down Expand Up @@ -290,40 +290,86 @@ def parse_events(events):


@retrying.retry(wait_fixed=10, stop_max_delay=60)
def terminateReplica(masterHost, namespace, target, exitCode=0):
def terminate_replica(master_host, namespace, target, exit_code=0):
"""Issue a request to terminate the requested TF replica running test_app.
Args:
masterHost: The IP address of the master e.g. https://35.188.37.10
master_host: The IP address of the master e.g. https://35.188.37.10
namespace: The namespace
target: The K8s service corresponding to the pod to terminate.
exitCode: What exit code to terminate the pod with.
exit_code: What exit code to terminate the pod with.
"""
params = {
"exitCode": exitCode,
"exitCode": exit_code,
}
tf_operator_util.send_request(master_host, namespace, target, "exit", params)

token = subprocess.check_output(["gcloud", "auth", "print-access-token"])
headers = {
"Authorization": "Bearer " + token.strip(),

def get_runconfig(master_host, namespace, target):
"""Issue a request to get the runconfig of the specified replica running test_server.
Args:
master_host: The IP address of the master e.g. https://35.188.37.10
namespace: The namespace
target: The K8s service corresponding to the pod to call.
"""
response = tf_operator_util.send_request(master_host, namespace, target, "runconfig", {})
return yaml.load(response)


def verify_runconfig(master_host, namespace, job_name, replica, num_ps, num_workers):
"""Verifies that the TF RunConfig on the specified replica is the same as expected.
Args:
master_host: The IP address of the master e.g. https://35.188.37.10
namespace: The namespace
job_name: The name of the TF job
replica: The replica type (chief, ps, or worker)
num_ps: The number of PS replicas
num_workers: The number of worker replicas
"""
is_chief = True
num_replicas = 1
if replica == "ps":
is_chief = False
num_replicas = num_ps
elif replica == "worker":
is_chief = False
num_replicas = num_workers

# Construct the expected cluster spec
chief_list = ["{name}-chief-0:2222".format(name=job_name)]
ps_list = []
for i in range(num_ps):
ps_list.append("{name}-ps-{index}:2222".format(name=job_name, index=i))
worker_list = []
for i in range(num_workers):
worker_list.append("{name}-worker-{index}:2222".format(name=job_name, index=i))
cluster_spec = {
"chief": chief_list,
"ps": ps_list,
"worker": worker_list,
}
url = ("{master}/api/v1/namespaces/{namespace}/services/{service}:2222"
"/proxy/exit").format(
master=masterHost, namespace=namespace, service=target)
r = requests.get(url,
headers=headers, params=params,
verify=False)

if r.status_code == requests.codes.NOT_FOUND:
logging.info("Request to %s returned 404", url)
return
if r.status_code != requests.codes.OK:
msg = "Request to {0} exited with status code: {1}".format(url,
r.status_code)
logging.error(msg)
raise RuntimeError(msg)

logging.info("URL %s returned; %s", url, r.content)

for i in range(num_replicas):
full_target = "{name}-{replica}-{index}".format(name=job_name, replica=replica.lower(), index=i)
actual_config = get_runconfig(master_host, namespace, full_target)
expected_config = {
"task_type": replica,
"task_id": i,
"cluster_spec": cluster_spec,
"is_chief": is_chief,
"master": "grpc://{target}:2222".format(target=full_target),
"num_worker_replicas": num_workers + 1, # Chief is also a worker
"num_ps_replicas": num_ps,
}
# Compare expected and actual configs
if actual_config != expected_config:
msg = "Actual runconfig differs from expected. Expected: {0} Actual: {1}".format(
str(expected_config), str(actual_config))
logging.error(msg)
raise RuntimeError(msg)


def setup_ks_app(args):
"""Setup the ksonnet app"""
Expand Down Expand Up @@ -482,7 +528,21 @@ def run_test(args): # pylint: disable=too-many-branches,too-many-statements
logging.info("Issuing the terminate request")
for num in range(num_targets):
full_target = target + "-{0}".format(num)
terminateReplica(masterHost, namespace, full_target)
terminate_replica(masterHost, namespace, full_target)

# TODO(richardsliu):
# There are lots of verifications in this file, consider refactoring them.
if args.verify_runconfig:
num_ps = results.get("spec", {}).get("tfReplicaSpecs", {}).get(
"PS", {}).get("replicas", 0)
num_workers = results.get("spec", {}).get("tfReplicaSpecs", {}).get(
"Worker", {}).get("replicas", 0)
verify_runconfig(masterHost, namespace, name, "chief", num_ps, num_workers)
verify_runconfig(masterHost, namespace, name, "worker", num_ps, num_workers)
verify_runconfig(masterHost, namespace, name, "ps", num_ps, num_workers)

# Terminate the chief worker to complete the job.
terminate_replica(masterHost, namespace, "{name}-chief-0".format(name=name))

logging.info("Waiting for job to finish.")
results = tf_job_client.wait_for_job(
Expand Down Expand Up @@ -679,6 +739,12 @@ def add_common_args(parser):
type=str,
help="(Optional) the clean pod policy (None, Running, or All).")

parser.add_argument(
"--verify_runconfig",
dest="verify_runconfig",
action="store_true",
help="(Optional) verify runconfig in each replica.")


def build_parser():
# create the top-level parser
Expand Down
34 changes: 34 additions & 0 deletions py/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
import os
import re
import requests
import subprocess
import tempfile
import time
Expand Down Expand Up @@ -100,6 +101,39 @@ def run_and_output(command, cwd=None, env=None):
return output


def send_request(master_host, namespace, target, rpc, params):
"""Issue a request to the Kubernetes master.
Args:
master_host: The IP address of the master e.g. https://35.188.37.10
namespace: The namespace
target: The K8s service corresponding to the pod.
rpc: Which rpc to call.
params: What parameters to send in the request.
"""
token = subprocess.check_output(["gcloud", "auth", "print-access-token"])
headers = {
"Authorization": "Bearer " + token.strip(),
}
url = ("{master}/api/v1/namespaces/{namespace}/services/{service}:2222"
"/proxy/{rpc}").format(
master=master_host, namespace=namespace, service=target, rpc=rpc)
r = requests.get(url,
headers=headers, params=params,
verify=False)

if r.status_code == requests.codes.NOT_FOUND:
logging.info("Request to %s returned 404", url)
return ""
if r.status_code != requests.codes.OK:
msg = "Request to {0} exited with status code: {1}".format(url,
r.status_code)
logging.error(msg)
raise RuntimeError(msg)

logging.info("URL %s returned; %s", url, r.content)
return r.content


def clone_repo(dest,
repo_owner=MASTER_REPO_OWNER,
repo_name=MASTER_REPO_NAME,
Expand Down
4 changes: 2 additions & 2 deletions test/test-server/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
FROM python:3.6.5-slim
MAINTAINER kubeflow-team

RUN pip install flask requests
RUN pip install flask requests tensorflow
RUN mkdir /opt/kubeflow
COPY test_app.py /opt/kubeflow
RUN chmod a+x /opt/kubeflow

ENTRYPOINT ["python", "/opt/kubeflow/test_app.py"]
ENTRYPOINT ["python", "/opt/kubeflow/test_app.py"]
19 changes: 18 additions & 1 deletion test/test-server/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
of various processes so that we can test the controller semantics.
"""
import argparse
import json
import logging
import os
import sys

from flask import Flask, request
from tensorflow.python.estimator import run_config as run_config_lib

APP = Flask(__name__)

Expand All @@ -24,9 +26,24 @@ def index():

@APP.route("/tfconfig", methods=['GET'])
def tf_config():
# Exit with the provided exit code
return os.environ.get("TF_CONFIG", "")

@APP.route("/runconfig", methods=['GET'])
def run_config():
config = run_config_lib.RunConfig()
if config:
config_dict = {
'master': config.master,
'task_id': config.task_id,
'num_ps_replicas': config.num_ps_replicas,
'num_worker_replicas': config.num_worker_replicas,
'cluster_spec': config.cluster_spec.as_dict(),
'task_type': config.task_type,
'is_chief': config.is_chief,
}
return json.dumps(config_dict)
return ""

@APP.route("/exit", methods=['GET'])
def exitHandler():
# Exit with the provided exit code
Expand Down
66 changes: 66 additions & 0 deletions test/workflows/components/estimator_runconfig.jsonnet
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Tests that each replica has a correctly configured TF RunConfig object.
// Each replica runs a tf-operator-test-server, so a manual exit on the chief
// worker is required for the job to end successfully.
local params = std.extVar("__ksonnet/params").components.estimator_runconfig;

local k = import "k.libsonnet";

local parts(namespace, name, image) = {
job:: {
apiVersion: "kubeflow.org/v1alpha2",
kind: "TFJob",
metadata: {
name: name,
namespace: namespace,
},
spec: {
cleanPodPolicy: "All",
tfReplicaSpecs: {
Chief: {
replicas: 1,
restartPolicy: "Never",
template: {
spec: {
containers: [
{
name: "tensorflow",
image: "gcr.io/kubeflow-images-staging/tf-operator-test-server:v20180904-7d89548b",
},
],
},
},
},
PS: {
replicas: 2,
restartPolicy: "Never",
template: {
spec: {
containers: [
{
name: "tensorflow",
image: "gcr.io/kubeflow-images-staging/tf-operator-test-server:v20180904-7d89548b",
},
],
},
},
},
Worker: {
replicas: 2,
restartPolicy: "Never",
template: {
spec: {
containers: [
{
name: "tensorflow",
image: "gcr.io/kubeflow-images-staging/tf-operator-test-server:v20180904-7d89548b",
},
],
},
},
},
},
},
},
};

std.prune(k.core.v1.list.new([parts(params.namespace, params.name, params.image).job]))
5 changes: 5 additions & 0 deletions test/workflows/components/params.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@
namespace: "kubeflow-test-infra",
image: "",
},
estimator_runconfig: {
name: "estimator_runconfig",
namespace: "kubeflow-test-infra",
image: "",
},
"invalid-tfjob": {
name: "invalid-tfjob",
},
Expand Down
24 changes: 24 additions & 0 deletions test/workflows/components/workflows.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@
template: "run-gpu-tests",
dependencies: ["setup-kubeflow"],
},
// TODO(richardsliu): Clean up the v1alph1 e2etests.
if params.tfJobVersion == "v1alpha2" then
{
name: "run-clean-pod-all",
Expand All @@ -285,6 +286,14 @@
}
else
{},
if params.tfJobVersion == "v1alpha2" then
{
name: "estimator-runconfig",
template: "estimator-runconfig",
dependencies: ["setup-kubeflow"],
}
else
{},
if params.tfJobVersion == "v1alpha2" then
{
name: "invalid-tfjob",
Expand Down Expand Up @@ -494,6 +503,21 @@
"--verify_clean_pod_policy=None",
"--junit_path=" + artifactsDir + "/junit_clean-pod-none-tests.xml",
]), // run clean_pod_none
$.parts(namespace, name).e2e(prow_env, bucket).buildTemplate("estimator-runconfig", [
"python",
"-m",
"py.test_runner",
"test",
"--cluster=" + cluster,
"--zone=" + zone,
"--project=" + project,
"--app_dir=" + srcDir + "/test/workflows",
"--component=estimator_runconfig",
"--params=name=estimator-runconfig,namespace=default",
"--tfjob_version=" + params.tfJobVersion,
"--verify_runconfig",
"--junit_path=" + artifactsDir + "/junit_estimator-runconfig-tests.xml",
]), // run estimator_runconfig
$.parts(namespace, name).e2e(prow_env, bucket).buildTemplate("invalid-tfjob", [
"python",
"-m",
Expand Down

0 comments on commit 6af5924

Please sign in to comment.