From 04b25054d5eb2e1c28cd09487d99268852344c2e Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Wed, 31 May 2017 11:23:04 +0800 Subject: [PATCH 1/3] use client submit job --- paddlecloud/paddlecloud/settings.py | 7 ------ paddlecloud/paddlejob/views.py | 37 ++++++++++++++++------------- 2 files changed, 20 insertions(+), 24 deletions(-) diff --git a/paddlecloud/paddlecloud/settings.py b/paddlecloud/paddlecloud/settings.py index 20f2baef..a7728bd4 100644 --- a/paddlecloud/paddlecloud/settings.py +++ b/paddlecloud/paddlecloud/settings.py @@ -251,13 +251,6 @@ PADDLE_BOOK_IMAGE="yancey1989/book-cloud" PADDLE_BOOK_PORT=8888 -if os.getenv("KUBERNETES_SERVICE_HOST", None): - # init kubernete client with service account - config.load_incluster_config() -else: - # init kubernetes client with ~/.kube/config file - config.load_kube_config() - # ============== Datacenter Storage Config Samples ============== #if Paddle cloud use CephFS as backend storage, configure CEPHFS_CONFIGURATION #the following is an example: diff --git a/paddlecloud/paddlejob/views.py b/paddlecloud/paddlejob/views.py index 35ef2c4e..94bebaa8 100644 --- a/paddlecloud/paddlejob/views.py +++ b/paddlecloud/paddlejob/views.py @@ -23,7 +23,8 @@ def get(self, request, format=None): """ username = request.user.username namespace = notebook.utils.email_escape(username) - job_list = client.BatchV1Api().list_namespaced_job(namespace) + user_client = notebook.utils.get_user_api_client(username) + job_list = user_client.BatchV1Api().list_namespaced_job(namespace) return Response(job_list.to_dict()) def post(self, request, format=None): @@ -101,8 +102,9 @@ def post(self, request, format=None): registry_secret = registry_secret, volumes = volumes ) + user_client = notebook.utils.get_user_api_client(username) try: - ret = client.ExtensionsV1beta1Api().create_namespaced_replica_set( + ret = user_client.ExtensionsV1beta1Api().create_namespaced_replica_set( namespace, paddle_job.new_pserver_job(), pretty=True) @@ -112,7 +114,7 @@ def post(self, request, format=None): #submit trainer job, it's Kubernetes Job try: - ret = client.BatchV1Api().create_namespaced_job( + ret = user_client.BatchV1Api().create_namespaced_job( namespace, paddle_job.new_trainer_job(), pretty=True) @@ -137,14 +139,14 @@ def delete(self, request, format=None): # delete job trainer_name = jobname + "-trainer" try: - u_status = client.BatchV1Api().delete_namespaced_job(trainer_name, namespace, {}) + u_status = user_client.BatchV1Api().delete_namespaced_job(trainer_name, namespace, {}) except ApiException, e: logging.error("error deleting job: %s, %s", jobname, str(e)) delete_status.append(str(e)) # delete job pods try: - job_pod_list = client.CoreV1Api().list_namespaced_pod(namespace, label_selector="paddle-job=%s"%jobname) + job_pod_list = user_client.CoreV1Api().list_namespaced_pod(namespace, label_selector="paddle-job=%s"%jobname) for i in job_pod_list.items: u_status = client.CoreV1Api().delete_namespaced_pod(i.metadata.name, namespace, {}) except ApiException, e: @@ -154,7 +156,7 @@ def delete(self, request, format=None): # delete pserver rs pserver_name = jobname + "-pserver" try: - u_status = client.ExtensionsV1beta1Api().delete_namespaced_replica_set(pserver_name, namespace, {}) + u_status = user_client.ExtensionsV1beta1Api().delete_namespaced_replica_set(pserver_name, namespace, {}) except ApiException, e: logging.error("error deleting pserver: %s" % str(e)) delete_status.append(str(e)) @@ -162,7 +164,7 @@ def delete(self, request, format=None): # delete pserver pods try: # pserver replica set has label with jobname - job_pod_list = client.CoreV1Api().list_namespaced_pod(namespace, label_selector="paddle-job-pserver=%s"%jobname) + job_pod_list = user_client.CoreV1Api().list_namespaced_pod(namespace, label_selector="paddle-job-pserver=%s"%jobname) for i in job_pod_list.items: u_status = client.CoreV1Api().delete_namespaced_pod(i.metadata.name, namespace, {}) except ApiException, e: @@ -184,25 +186,25 @@ def get(self, request, format=None): """ username = request.user.username namespace = notebook.utils.email_escape(username) - + user_client = notebook.utils.get_user_api_client(username) jobname = request.query_params.get("jobname") num_lines = request.query_params.get("n") worker = request.query_params.get("w") - job_pod_list = client.CoreV1Api().list_namespaced_pod(namespace, label_selector="paddle-job=%s"%jobname) + job_pod_list = user_client.CoreV1Api().list_namespaced_pod(namespace, label_selector="paddle-job=%s"%jobname) total_job_log = "" if not worker: for i in job_pod_list.items: total_job_log = "".join((total_job_log, "==========================%s==========================" % i.metadata.name)) if num_lines: - pod_log = client.CoreV1Api().read_namespaced_pod_log(i.metadata.name, namespace, tail_lines=int(num_lines)) + pod_log = user_client.CoreV1Api().read_namespaced_pod_log(i.metadata.name, namespace, tail_lines=int(num_lines)) else: - pod_log = client.CoreV1Api().read_namespaced_pod_log(i.metadata.name, namespace) + pod_log = user_client.CoreV1Api().read_namespaced_pod_log(i.metadata.name, namespace) total_job_log = "\n".join((total_job_log, pod_log)) else: if num_lines: - pod_log = client.CoreV1Api().read_namespaced_pod_log(worker, namespace, tail_lines=int(num_lines)) + pod_log = user_client.CoreV1Api().read_namespaced_pod_log(worker, namespace, tail_lines=int(num_lines)) else: - pod_log = client.CoreV1Api().read_namespaced_pod_log(worker, namespace) + pod_log = user_client.CoreV1Api().read_namespaced_pod_log(worker, namespace) total_job_log = pod_log return utils.simple_response(200, total_job_log) @@ -217,11 +219,12 @@ def get(self, request, format=None): namespace = notebook.utils.email_escape(username) jobname = request.query_params.get("jobname") job_pod_list = None + user_client = notebook.utils.get_user_api_client(username) if not jobname: - job_pod_list = client.CoreV1Api().list_namespaced_pod(namespace) + job_pod_list = user_client.CoreV1Api().list_namespaced_pod(namespace) else: selector = "paddle-job=%s"%jobname - job_pod_list = client.CoreV1Api().list_namespaced_pod(namespace, label_selector=selector) + job_pod_list = user_client.CoreV1Api().list_namespaced_pod(namespace, label_selector=selector) return Response(job_pod_list.to_dict()) class QuotaView(APIView): @@ -233,6 +236,6 @@ def get(self, request, format=None): """ username = request.user.username namespace = notebook.utils.email_escape(username) - - quota_list = client.CoreV1Api().list_namespaced_resource_quota(namespace) + user_client = notebook.utils.get_user_api_client(username) + quota_list = user_client.CoreV1Api().list_namespaced_resource_quota(namespace) return Response(quota_list.to_dict()) From eba49f7aeeef7c0ed351119dd9d8cf495c795901 Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Wed, 31 May 2017 14:25:29 +0800 Subject: [PATCH 2/3] update --- paddlecloud/paddlejob/views.py | 62 ++++++++++++++++++++++------------ 1 file changed, 40 insertions(+), 22 deletions(-) diff --git a/paddlecloud/paddlejob/views.py b/paddlecloud/paddlejob/views.py index 94bebaa8..e4909e5d 100644 --- a/paddlecloud/paddlejob/views.py +++ b/paddlecloud/paddlejob/views.py @@ -23,8 +23,8 @@ def get(self, request, format=None): """ username = request.user.username namespace = notebook.utils.email_escape(username) - user_client = notebook.utils.get_user_api_client(username) - job_list = user_client.BatchV1Api().list_namespaced_job(namespace) + api_instance = client.BatchV1Api(api_client=notebook.utils.get_user_api_client(username)) + job_list = api_instance.list_namespaced_job(namespace) return Response(job_list.to_dict()) def post(self, request, format=None): @@ -37,6 +37,7 @@ def post(self, request, format=None): obj = json.loads(request.body) topology = obj.get("topology", "") entry = obj.get("entry", "") + api_client = notebook.utils.get_user_api_client(username) if not topology and not entry: return utils.simple_response(500, "no topology or entry specified") if not obj.get("datacenter"): @@ -102,9 +103,8 @@ def post(self, request, format=None): registry_secret = registry_secret, volumes = volumes ) - user_client = notebook.utils.get_user_api_client(username) try: - ret = user_client.ExtensionsV1beta1Api().create_namespaced_replica_set( + ret = client.ExtensionsV1beta1Api(api_client=api_client).create_namespaced_replica_set( namespace, paddle_job.new_pserver_job(), pretty=True) @@ -114,7 +114,7 @@ def post(self, request, format=None): #submit trainer job, it's Kubernetes Job try: - ret = user_client.BatchV1Api().create_namespaced_job( + ret = client.BatchV1Api(api_client=api_client).create_namespaced_job( namespace, paddle_job.new_trainer_job(), pretty=True) @@ -131,6 +131,7 @@ def delete(self, request, format=None): namespace = notebook.utils.email_escape(username) obj = json.loads(request.body) jobname = obj.get("jobname") + api_client = notebook.utils.get_user_api_client(username) if not jobname: return utils.simple_response(500, "must specify jobname") # FIXME: options needed: grace_period_seconds, orphan_dependents, preconditions @@ -139,16 +140,20 @@ def delete(self, request, format=None): # delete job trainer_name = jobname + "-trainer" try: - u_status = user_client.BatchV1Api().delete_namespaced_job(trainer_name, namespace, {}) + u_status = client.BatchV1Api(api_client=api_client) + .delete_namespaced_job(trainer_name, namespace, {}) except ApiException, e: logging.error("error deleting job: %s, %s", jobname, str(e)) delete_status.append(str(e)) # delete job pods try: - job_pod_list = user_client.CoreV1Api().list_namespaced_pod(namespace, label_selector="paddle-job=%s"%jobname) + job_pod_list = client.CoreV1Api(api_client=api_client) + .list_namespaced_pod(namespace, + label_selector="paddle-job=%s"%jobname) for i in job_pod_list.items: - u_status = client.CoreV1Api().delete_namespaced_pod(i.metadata.name, namespace, {}) + u_status = client.CoreV1Api(api_client=api_client).delete_namespaced_pod( + i.metadata.name, namespace, {}) except ApiException, e: logging.error("error deleting job pod: %s", str(e)) delete_status.append(str(e)) @@ -156,7 +161,8 @@ def delete(self, request, format=None): # delete pserver rs pserver_name = jobname + "-pserver" try: - u_status = user_client.ExtensionsV1beta1Api().delete_namespaced_replica_set(pserver_name, namespace, {}) + u_status = client.ExtensionsV1beta1Api(api_client=api_client) + .delete_namespaced_replica_set(pserver_name, namespace, {}) except ApiException, e: logging.error("error deleting pserver: %s" % str(e)) delete_status.append(str(e)) @@ -164,9 +170,12 @@ def delete(self, request, format=None): # delete pserver pods try: # pserver replica set has label with jobname - job_pod_list = user_client.CoreV1Api().list_namespaced_pod(namespace, label_selector="paddle-job-pserver=%s"%jobname) + job_pod_list = client.CoreV1Api(api_client=api_client) + .list_namespaced_pod(namespace, + label_selector="paddle-job-pserver=%s"%jobname) for i in job_pod_list.items: - u_status = client.CoreV1Api().delete_namespaced_pod(i.metadata.name, namespace, {}) + u_status = client.CoreV1Api(api_client=api_client) + .delete_namespaced_pod(i.metadata.name, namespace, {}) except ApiException, e: logging.error("error deleting pserver pods: %s" % str(e)) delete_status.append(str(e)) @@ -186,25 +195,31 @@ def get(self, request, format=None): """ username = request.user.username namespace = notebook.utils.email_escape(username) - user_client = notebook.utils.get_user_api_client(username) + api_client = notebook.utils.get_user_api_client(username) jobname = request.query_params.get("jobname") num_lines = request.query_params.get("n") worker = request.query_params.get("w") - job_pod_list = user_client.CoreV1Api().list_namespaced_pod(namespace, label_selector="paddle-job=%s"%jobname) + job_pod_list = client.CoreV1Api(api_client=api_client) + .list_namespaced_pod(namespace, label_selector="paddle-job=%s"%jobname) total_job_log = "" if not worker: for i in job_pod_list.items: total_job_log = "".join((total_job_log, "==========================%s==========================" % i.metadata.name)) if num_lines: - pod_log = user_client.CoreV1Api().read_namespaced_pod_log(i.metadata.name, namespace, tail_lines=int(num_lines)) + pod_log = client.CoreV1Api(api_client=api_client) + .read_namespaced_pod_log( + i.metadata.name, namespace, tail_lines=int(num_lines)) else: - pod_log = user_client.CoreV1Api().read_namespaced_pod_log(i.metadata.name, namespace) + pod_log = client.CoreV1Api(api_client=api_client) + .read_namespaced_pod_log(i.metadata.name, namespace) total_job_log = "\n".join((total_job_log, pod_log)) else: if num_lines: - pod_log = user_client.CoreV1Api().read_namespaced_pod_log(worker, namespace, tail_lines=int(num_lines)) + pod_log = client.CoreV1Api(api_client=api_client) + .read_namespaced_pod_log(worker, namespace, tail_lines=int(num_lines)) else: - pod_log = user_client.CoreV1Api().read_namespaced_pod_log(worker, namespace) + pod_log = client.CoreV1Api(api_client=api_client) + .read_namespaced_pod_log(worker, namespace) total_job_log = pod_log return utils.simple_response(200, total_job_log) @@ -219,12 +234,14 @@ def get(self, request, format=None): namespace = notebook.utils.email_escape(username) jobname = request.query_params.get("jobname") job_pod_list = None - user_client = notebook.utils.get_user_api_client(username) + api_client = notebook.utils.get_user_api_client(username) if not jobname: - job_pod_list = user_client.CoreV1Api().list_namespaced_pod(namespace) + job_pod_list = client.CoreV1Api(api_client=api_client) + .list_namespaced_pod(namespace) else: selector = "paddle-job=%s"%jobname - job_pod_list = user_client.CoreV1Api().list_namespaced_pod(namespace, label_selector=selector) + job_pod_list = client.CoreV1Api(api_client=api_client) + .list_namespaced_pod(namespace, label_selector=selector) return Response(job_pod_list.to_dict()) class QuotaView(APIView): @@ -236,6 +253,7 @@ def get(self, request, format=None): """ username = request.user.username namespace = notebook.utils.email_escape(username) - user_client = notebook.utils.get_user_api_client(username) - quota_list = user_client.CoreV1Api().list_namespaced_resource_quota(namespace) + api_client = notebook.utils.get_user_api_client(username) + quota_list = api_client.CoreV1Api(api_cilent=api_client) + .list_namespaced_resource_quota(namespace) return Response(quota_list.to_dict()) From 89dc9d10235484d6cf57aeca750e9ca5a52983ec Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Wed, 31 May 2017 14:51:00 +0800 Subject: [PATCH 3/3] update --- paddlecloud/paddlejob/views.py | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/paddlecloud/paddlejob/views.py b/paddlecloud/paddlejob/views.py index e4909e5d..d5964bbf 100644 --- a/paddlecloud/paddlejob/views.py +++ b/paddlecloud/paddlejob/views.py @@ -140,7 +140,7 @@ def delete(self, request, format=None): # delete job trainer_name = jobname + "-trainer" try: - u_status = client.BatchV1Api(api_client=api_client) + u_status = client.BatchV1Api(api_client=api_client)\ .delete_namespaced_job(trainer_name, namespace, {}) except ApiException, e: logging.error("error deleting job: %s, %s", jobname, str(e)) @@ -148,12 +148,12 @@ def delete(self, request, format=None): # delete job pods try: - job_pod_list = client.CoreV1Api(api_client=api_client) + job_pod_list = client.CoreV1Api(api_client=api_client)\ .list_namespaced_pod(namespace, label_selector="paddle-job=%s"%jobname) for i in job_pod_list.items: - u_status = client.CoreV1Api(api_client=api_client).delete_namespaced_pod( - i.metadata.name, namespace, {}) + u_status = client.CoreV1Api(api_client=api_client)\ + .delete_namespaced_pod(i.metadata.name, namespace, {}) except ApiException, e: logging.error("error deleting job pod: %s", str(e)) delete_status.append(str(e)) @@ -161,7 +161,7 @@ def delete(self, request, format=None): # delete pserver rs pserver_name = jobname + "-pserver" try: - u_status = client.ExtensionsV1beta1Api(api_client=api_client) + u_status = client.ExtensionsV1beta1Api(api_client=api_client)\ .delete_namespaced_replica_set(pserver_name, namespace, {}) except ApiException, e: logging.error("error deleting pserver: %s" % str(e)) @@ -170,11 +170,11 @@ def delete(self, request, format=None): # delete pserver pods try: # pserver replica set has label with jobname - job_pod_list = client.CoreV1Api(api_client=api_client) + job_pod_list = client.CoreV1Api(api_client=api_client)\ .list_namespaced_pod(namespace, label_selector="paddle-job-pserver=%s"%jobname) for i in job_pod_list.items: - u_status = client.CoreV1Api(api_client=api_client) + u_status = client.CoreV1Api(api_client=api_client)\ .delete_namespaced_pod(i.metadata.name, namespace, {}) except ApiException, e: logging.error("error deleting pserver pods: %s" % str(e)) @@ -199,26 +199,26 @@ def get(self, request, format=None): jobname = request.query_params.get("jobname") num_lines = request.query_params.get("n") worker = request.query_params.get("w") - job_pod_list = client.CoreV1Api(api_client=api_client) + job_pod_list = client.CoreV1Api(api_client=api_client)\ .list_namespaced_pod(namespace, label_selector="paddle-job=%s"%jobname) total_job_log = "" if not worker: for i in job_pod_list.items: total_job_log = "".join((total_job_log, "==========================%s==========================" % i.metadata.name)) if num_lines: - pod_log = client.CoreV1Api(api_client=api_client) + pod_log = client.CoreV1Api(api_client=api_client)\ .read_namespaced_pod_log( i.metadata.name, namespace, tail_lines=int(num_lines)) else: - pod_log = client.CoreV1Api(api_client=api_client) + pod_log = client.CoreV1Api(api_client=api_client)\ .read_namespaced_pod_log(i.metadata.name, namespace) total_job_log = "\n".join((total_job_log, pod_log)) else: if num_lines: - pod_log = client.CoreV1Api(api_client=api_client) + pod_log = client.CoreV1Api(api_client=api_client)\ .read_namespaced_pod_log(worker, namespace, tail_lines=int(num_lines)) else: - pod_log = client.CoreV1Api(api_client=api_client) + pod_log = client.CoreV1Api(api_client=api_client)\ .read_namespaced_pod_log(worker, namespace) total_job_log = pod_log return utils.simple_response(200, total_job_log) @@ -236,11 +236,11 @@ def get(self, request, format=None): job_pod_list = None api_client = notebook.utils.get_user_api_client(username) if not jobname: - job_pod_list = client.CoreV1Api(api_client=api_client) + job_pod_list = client.CoreV1Api(api_client=api_client)\ .list_namespaced_pod(namespace) else: selector = "paddle-job=%s"%jobname - job_pod_list = client.CoreV1Api(api_client=api_client) + job_pod_list = client.CoreV1Api(api_client=api_client)\ .list_namespaced_pod(namespace, label_selector=selector) return Response(job_pod_list.to_dict()) @@ -254,6 +254,6 @@ def get(self, request, format=None): username = request.user.username namespace = notebook.utils.email_escape(username) api_client = notebook.utils.get_user_api_client(username) - quota_list = api_client.CoreV1Api(api_cilent=api_client) + quota_list = api_client.CoreV1Api(api_cilent=api_client)\ .list_namespaced_resource_quota(namespace) return Response(quota_list.to_dict())