diff --git a/paddlecloud/paddlecloud/settings.py b/paddlecloud/paddlecloud/settings.py index 20f2baef..a7728bd4 100644 --- a/paddlecloud/paddlecloud/settings.py +++ b/paddlecloud/paddlecloud/settings.py @@ -251,13 +251,6 @@ PADDLE_BOOK_IMAGE="yancey1989/book-cloud" PADDLE_BOOK_PORT=8888 -if os.getenv("KUBERNETES_SERVICE_HOST", None): - # init kubernete client with service account - config.load_incluster_config() -else: - # init kubernetes client with ~/.kube/config file - config.load_kube_config() - # ============== Datacenter Storage Config Samples ============== #if Paddle cloud use CephFS as backend storage, configure CEPHFS_CONFIGURATION #the following is an example: diff --git a/paddlecloud/paddlejob/views.py b/paddlecloud/paddlejob/views.py index 35ef2c4e..d5964bbf 100644 --- a/paddlecloud/paddlejob/views.py +++ b/paddlecloud/paddlejob/views.py @@ -23,7 +23,8 @@ def get(self, request, format=None): """ username = request.user.username namespace = notebook.utils.email_escape(username) - job_list = client.BatchV1Api().list_namespaced_job(namespace) + api_instance = client.BatchV1Api(api_client=notebook.utils.get_user_api_client(username)) + job_list = api_instance.list_namespaced_job(namespace) return Response(job_list.to_dict()) def post(self, request, format=None): @@ -36,6 +37,7 @@ def post(self, request, format=None): obj = json.loads(request.body) topology = obj.get("topology", "") entry = obj.get("entry", "") + api_client = notebook.utils.get_user_api_client(username) if not topology and not entry: return utils.simple_response(500, "no topology or entry specified") if not obj.get("datacenter"): @@ -102,7 +104,7 @@ def post(self, request, format=None): volumes = volumes ) try: - ret = client.ExtensionsV1beta1Api().create_namespaced_replica_set( + ret = client.ExtensionsV1beta1Api(api_client=api_client).create_namespaced_replica_set( namespace, paddle_job.new_pserver_job(), pretty=True) @@ -112,7 +114,7 @@ def post(self, request, format=None): #submit 
trainer job, it's Kubernetes Job try: - ret = client.BatchV1Api().create_namespaced_job( + ret = client.BatchV1Api(api_client=api_client).create_namespaced_job( namespace, paddle_job.new_trainer_job(), pretty=True) @@ -129,6 +131,7 @@ def delete(self, request, format=None): namespace = notebook.utils.email_escape(username) obj = json.loads(request.body) jobname = obj.get("jobname") + api_client = notebook.utils.get_user_api_client(username) if not jobname: return utils.simple_response(500, "must specify jobname") # FIXME: options needed: grace_period_seconds, orphan_dependents, preconditions @@ -137,16 +140,20 @@ def delete(self, request, format=None): # delete job trainer_name = jobname + "-trainer" try: - u_status = client.BatchV1Api().delete_namespaced_job(trainer_name, namespace, {}) + u_status = client.BatchV1Api(api_client=api_client)\ + .delete_namespaced_job(trainer_name, namespace, {}) except ApiException, e: logging.error("error deleting job: %s, %s", jobname, str(e)) delete_status.append(str(e)) # delete job pods try: - job_pod_list = client.CoreV1Api().list_namespaced_pod(namespace, label_selector="paddle-job=%s"%jobname) + job_pod_list = client.CoreV1Api(api_client=api_client)\ + .list_namespaced_pod(namespace, + label_selector="paddle-job=%s"%jobname) for i in job_pod_list.items: - u_status = client.CoreV1Api().delete_namespaced_pod(i.metadata.name, namespace, {}) + u_status = client.CoreV1Api(api_client=api_client)\ + .delete_namespaced_pod(i.metadata.name, namespace, {}) except ApiException, e: logging.error("error deleting job pod: %s", str(e)) delete_status.append(str(e)) @@ -154,7 +161,8 @@ def delete(self, request, format=None): # delete pserver rs pserver_name = jobname + "-pserver" try: - u_status = client.ExtensionsV1beta1Api().delete_namespaced_replica_set(pserver_name, namespace, {}) + u_status = client.ExtensionsV1beta1Api(api_client=api_client)\ + .delete_namespaced_replica_set(pserver_name, namespace, {}) except ApiException, e: 
logging.error("error deleting pserver: %s" % str(e)) delete_status.append(str(e)) @@ -162,9 +170,12 @@ def delete(self, request, format=None): # delete pserver pods try: # pserver replica set has label with jobname - job_pod_list = client.CoreV1Api().list_namespaced_pod(namespace, label_selector="paddle-job-pserver=%s"%jobname) + job_pod_list = client.CoreV1Api(api_client=api_client)\ + .list_namespaced_pod(namespace, + label_selector="paddle-job-pserver=%s"%jobname) for i in job_pod_list.items: - u_status = client.CoreV1Api().delete_namespaced_pod(i.metadata.name, namespace, {}) + u_status = client.CoreV1Api(api_client=api_client)\ + .delete_namespaced_pod(i.metadata.name, namespace, {}) except ApiException, e: logging.error("error deleting pserver pods: %s" % str(e)) delete_status.append(str(e)) @@ -184,25 +195,31 @@ def get(self, request, format=None): """ username = request.user.username namespace = notebook.utils.email_escape(username) - + api_client = notebook.utils.get_user_api_client(username) jobname = request.query_params.get("jobname") num_lines = request.query_params.get("n") worker = request.query_params.get("w") - job_pod_list = client.CoreV1Api().list_namespaced_pod(namespace, label_selector="paddle-job=%s"%jobname) + job_pod_list = client.CoreV1Api(api_client=api_client)\ + .list_namespaced_pod(namespace, label_selector="paddle-job=%s"%jobname) total_job_log = "" if not worker: for i in job_pod_list.items: total_job_log = "".join((total_job_log, "==========================%s==========================" % i.metadata.name)) if num_lines: - pod_log = client.CoreV1Api().read_namespaced_pod_log(i.metadata.name, namespace, tail_lines=int(num_lines)) + pod_log = client.CoreV1Api(api_client=api_client)\ + .read_namespaced_pod_log( + i.metadata.name, namespace, tail_lines=int(num_lines)) else: - pod_log = client.CoreV1Api().read_namespaced_pod_log(i.metadata.name, namespace) + pod_log = client.CoreV1Api(api_client=api_client)\ + 
.read_namespaced_pod_log(i.metadata.name, namespace) total_job_log = "\n".join((total_job_log, pod_log)) else: if num_lines: - pod_log = client.CoreV1Api().read_namespaced_pod_log(worker, namespace, tail_lines=int(num_lines)) + pod_log = client.CoreV1Api(api_client=api_client)\ + .read_namespaced_pod_log(worker, namespace, tail_lines=int(num_lines)) else: - pod_log = client.CoreV1Api().read_namespaced_pod_log(worker, namespace) + pod_log = client.CoreV1Api(api_client=api_client)\ + .read_namespaced_pod_log(worker, namespace) total_job_log = pod_log return utils.simple_response(200, total_job_log) @@ -217,11 +234,14 @@ def get(self, request, format=None): namespace = notebook.utils.email_escape(username) jobname = request.query_params.get("jobname") job_pod_list = None + api_client = notebook.utils.get_user_api_client(username) if not jobname: - job_pod_list = client.CoreV1Api().list_namespaced_pod(namespace) + job_pod_list = client.CoreV1Api(api_client=api_client)\ + .list_namespaced_pod(namespace) else: selector = "paddle-job=%s"%jobname - job_pod_list = client.CoreV1Api().list_namespaced_pod(namespace, label_selector=selector) + job_pod_list = client.CoreV1Api(api_client=api_client)\ + .list_namespaced_pod(namespace, label_selector=selector) return Response(job_pod_list.to_dict()) class QuotaView(APIView): @@ -233,6 +253,7 @@ def get(self, request, format=None): """ username = request.user.username namespace = notebook.utils.email_escape(username) - - quota_list = client.CoreV1Api().list_namespaced_resource_quota(namespace) + api_client = notebook.utils.get_user_api_client(username) + quota_list = client.CoreV1Api(api_client=api_client)\ + .list_namespaced_resource_quota(namespace) return Response(quota_list.to_dict())