diff --git a/.gitignore b/.gitignore
index 15fde1ab..338d5f73 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,3 +3,4 @@
 .cache
 vendor
 *~
+*.pyc
diff --git a/doc/autoscale/experiment/autoscale_experiment.md b/doc/autoscale/experiment/autoscale_experiment.md
index 9b85b4cb..1651a989 100644
--- a/doc/autoscale/experiment/autoscale_experiment.md
+++ b/doc/autoscale/experiment/autoscale_experiment.md
@@ -40,8 +40,8 @@
 
 metrics | auto-scaling training job| general training job
 -- | -- | --
-training time | 6h | 8h
-average waiting time | 0 | 2h
+average running time | 6h | 8h
+average pending time | 0 | 2h
 CPU utils | 100% | 60%
 
 ### Hybrid Deployment with Online Serving and Offline Training Job
@@ -60,4 +60,37 @@ metrics | QPS(1w) | QPS(10w) | QPS(50w)
 -- | -- | -- | --
 Trainer Pods | 100 | 80 | 50
 Nginx Pods | 80 | 100 | 150
-CPU utils| 100% | 100% | 100%
\ No newline at end of file
+CPU utils| 100% | 100% | 100%
+
+## Reproduce the experiment
+
+- Configure kubectl on your host.
+- Submit the TrainingJob controller with the YAML files:
+  ```bash
+  > git clone https://github.com/PaddlePaddle/cloud.git && cd cloud
+  > kubectl create -f k8s/controller/trainingjob_resource.yaml
+  > kubectl create -f k8s/controller/controller.yaml
+  ```
+- Test Case 1
+  1. Run the data collecting Python program:
+     ```bash
+     > cd cloud/doc/autoscale/experiment/python
+     > python main.py case1 mnist1,mnist2
+     ```
+  1. Submit two general jobs named mnist1 and mnist2 as follows;
+     you may need to adjust the resource configuration for your cluster.
+     ```bash
+     > cd cloud/demo
+     > paddlecloud submit -jobname mnist1 \
+         -cpu 8 \
+         -gpu 0 \
+         -memory 8Gi \
+         -parallelism 40 \
+         -pscpu 4 \
+         -pservers 8 \
+         -psmemory 1Gi \
+         -entry "python ./train.py train" \
+         ./recognize_digits
+     ```
+     Repeat the command with `-jobname mnist2` to submit the second job.
+  1. You will see the time series data in the terminal.
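The CPU utilization the data collector prints is simply the CPU requested by Running pods divided by the cluster's allocatable CPU. For orientation, here is a minimal standalone sketch of that calculation using the same `kubernetes` Python client calls that `collector.py` below relies on. It assumes kubectl is already configured as described above; the helper names `cpu_cores` and `cluster_cpu_utilization` are illustrative only.

```python
from kubernetes import client, config

def cpu_cores(quantity):
    # Kubernetes CPU quantities are whole cores ("8") or millicores ("250m").
    if not quantity:
        return 0.0
    if quantity.endswith('m'):
        return 0.001 * int(quantity[:-1])
    return float(quantity)

def cluster_cpu_utilization():
    # Same calculation as collector.Collector.cpu_utils(): requested CPU of
    # Running pods over the total allocatable CPU of all nodes.
    config.load_kube_config()
    v1 = client.CoreV1Api()
    allocatable = sum(cpu_cores(node.status.allocatable.get('cpu'))
                      for node in v1.list_node().items)
    requested = 0.0
    for pod in v1.list_pod_for_all_namespaces().items:
        if pod.status.phase != 'Running':
            continue
        for container in pod.spec.containers:
            requests = container.resources.requests or {}
            requested += cpu_cores(requests.get('cpu'))
    return 100.0 * requested / allocatable if allocatable else 0.0

if __name__ == '__main__':
    print('cluster CPU utilization: %.2f%%' % cluster_cpu_utilization())
```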
diff --git a/doc/autoscale/experiment/python/collector.py b/doc/autoscale/experiment/python/collector.py
new file mode 100644
index 00000000..332612bd
--- /dev/null
+++ b/doc/autoscale/experiment/python/collector.py
@@ -0,0 +1,143 @@
+from kubernetes import client, config
+from kubernetes.client.rest import ApiException
+from pprint import pprint
+import time
+
+JOB_STATUS_NOT_EXISTS = 0
+JOB_STATUS_PENDING = 1
+JOB_STATUS_RUNNING = 2
+JOB_STATUS_FINISHED = 3
+
+class JobInfo(object):
+    def __init__(self, name):
+        self.name = name
+        self.started = False
+        self.status = JOB_STATUS_NOT_EXISTS
+        self.submit_time = 0
+        self.start_time = 0
+        self.end_time = 0
+        self.parallelism = 0
+
+    def status_str(self):
+        if self.status == JOB_STATUS_FINISHED:
+            return 'FINISH'
+        elif self.status == JOB_STATUS_PENDING:
+            return 'PENDING'
+        elif self.status == JOB_STATUS_NOT_EXISTS:
+            return 'WAITING'
+        elif self.status == JOB_STATUS_RUNNING:
+            return 'RUNNING'
+
+
+class Collector(object):
+    '''
+    Collector monitors cluster and job data from the Kubernetes API.
+    '''
+    def __init__(self):
+        # TODO(Yancey1989):
+        # init kubernetes configuration
+        # from settings.py
+        config.load_kube_config()
+        self.cpu_allocatable = 0
+        self.gpu_allocatable = 0
+        self.cpu_requests = 0
+        self.gpu_requests = 0
+
+        # Collect cluster wide resources
+        self._init_allocatable()
+
+        self._pods = []
+
+    def _init_allocatable(self):
+        api_instance = client.CoreV1Api()
+        try:
+            api_response = api_instance.list_node()
+            cpu = 0
+            gpu = 0
+            for item in api_response.items:
+                allocate = item.status.allocatable
+                cpu += int(allocate.get('cpu', 0))
+                gpu += int(allocate.get('gpu', 0))
+            self.cpu_allocatable = cpu
+            self.gpu_allocatable = gpu
+        except ApiException as e:
+            print("Exception when calling CoreV1Api->list_node: %s\n" % e)
+
+    def _real_cpu(self, cpu):
+        # Convert a Kubernetes CPU quantity ("8" or "500m") to cores.
+        if cpu:
+            if cpu.endswith('m'):
+                return 0.001 * int(cpu[:-1])
+            else:
+                return int(cpu)
+        return 0
+
+
+    def run_once(self):
+        # Refresh the cached pod list and return the current timestamp.
+        api_instance = client.CoreV1Api()
+        try:
+            api_response = api_instance.list_pod_for_all_namespaces(pretty=True)
+            self._pods = api_response.items
+        except ApiException as e:
+            print("Exception when calling CoreV1Api->list_pod_for_all_namespaces: %s\n" % e)
+        return int(time.time())
+
+    def cpu_utils(self):
+        # CPU requested by Running pods as a percentage of allocatable CPU.
+        cpu = 0
+        for item in self._pods:
+            if item.status.phase != 'Running':
+                continue
+            for container in item.spec.containers:
+                requests = container.resources.requests
+                if requests:
+                    cpu += self._real_cpu(requests.get('cpu', None))
+        if not self.cpu_allocatable:
+            return '0'
+        return '%0.2f%%' % ((100.0 * cpu) / self.cpu_allocatable)
+
+    def gpu_utils(self):
+        # GPUs used by Running pods as a percentage of allocatable GPUs.
+        gpu = 0
+        for item in self._pods:
+            if item.status.phase != 'Running':
+                continue
+            for container in item.spec.containers:
+                limits = container.resources.limits
+                if limits:
+                    gpu += int(limits.get('alpha.kubernetes.io/nvidia-gpu', 0))
+        if not self.gpu_allocatable:
+            return '0'
+        return '%0.2f%%' % ((100.0 * gpu) / self.gpu_allocatable)
+
+    def update_job(self, job, times):
+        phases = set()
+        parallelism = 0
+        for item in self._pods:
+            if item.metadata.labels:
+                for k, v in item.metadata.labels.items():
+                    # PaddleCloud jobs label their pods with the key
+                    # 'paddle-job' and the job name as value.
+                    if k == 'paddle-job' and v == job.name:
+                        parallelism += 1
+                        if not job.submit_time:
+                            job.submit_time = times
+                        phases.add(item.status.phase)
+
+        job.parallelism = parallelism
+        if phases and not job.submit_time:
+            job.submit_time = times
+
+        if len(phases) == 0:
+            # The job has not been submitted yet.
+            return
+        elif len(phases) == 1 and 'Running' in phases:
+            # If all pods are in the Running phase, the job is running.
+            # TODO(Yancey1989): for a fault-tolerant job, a single
+            # Running pod is enough to count the job as running.
+            if not job.start_time:
+                job.start_time = times
+            job.status = JOB_STATUS_RUNNING
+        elif ('Failed' in phases or \
+                (len(phases) == 1 and 'Succeeded' in phases)) and \
+                job.end_time == 0:
+            job.end_time = times
+            job.status = JOB_STATUS_FINISHED
+        elif 'Pending' in phases:
+            job.status = JOB_STATUS_PENDING
\ No newline at end of file
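Before the driver script, a short usage sketch of the `Collector` and `JobInfo` classes above: one collection tick, outside the polling loop that `main.py` (next file) runs. It assumes a cluster reachable through your kubectl config and that a job named `mnist1`, as in the steps above, has been submitted.

```python
import collector

c = collector.Collector()       # reads allocatable CPU/GPU from the nodes once
job = collector.JobInfo('mnist1')

timestamp = c.run_once()        # refresh the pod list; returns the current time
c.update_job(job, timestamp)    # derive the job status from its pods' phases

print('%s is %s with %d trainer pods, cluster CPU %s, GPU %s' %
      (job.name, job.status_str(), job.parallelism, c.cpu_utils(), c.gpu_utils()))
```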
diff --git a/doc/autoscale/experiment/python/main.py b/doc/autoscale/experiment/python/main.py
new file mode 100644
index 00000000..4e63a554
--- /dev/null
+++ b/doc/autoscale/experiment/python/main.py
@@ -0,0 +1,39 @@
+import time
+import settings
+import collector
+import sys
+import utils
+
+def case1(c, jobs):
+    print 'Times\tName\tStatus\tCPU\tGPU\tPARALLELISM'
+    times = 0
+    while True:
+        c.run_once()
+        for job in jobs:
+            c.update_job(job, times)
+
+            print '%d\t%s\t%s\t%s\t%s\t%d' % (times,\
+                job.name, job.status_str(), c.cpu_utils(), c.gpu_utils(), job.parallelism)
+
+        if utils.is_all_jobs_finished(jobs):
+            print 'Average running time:', utils.avg_running_time(jobs)
+            print 'Average pending time:', utils.avg_pending_time(jobs)
+            print 'Cluster wide CPU:', c.cpu_allocatable
+            print 'Cluster wide GPU:', c.gpu_allocatable
+            for job in jobs:
+                print '%s running time:' % job.name, (job.end_time - job.start_time)
+            sys.exit(0)
+
+        # TODO(Yancey1989): draw the figure with Ploter
+
+        time.sleep(settings.COLLECTION_INTERVAL)
+        times += settings.COLLECTION_INTERVAL
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        print 'Usage: python main.py [case1|case2] [jobname1,jobname2]'
+        exit(0)
+
+    c = collector.Collector()
+    if sys.argv[1] == 'case1':
+        case1(c, [collector.JobInfo(name) for name in sys.argv[2].split(',')])
\ No newline at end of file
diff --git a/doc/autoscale/experiment/python/ploter.py b/doc/autoscale/experiment/python/ploter.py
new file mode 100644
index 00000000..87a32cb9
--- /dev/null
+++ b/doc/autoscale/experiment/python/ploter.py
@@ -0,0 +1,15 @@
+import matplotlib.pyplot as plt
+
+class Ploter(object):
+    '''
+    Plot graphs from the collected time series data.
+    '''
+    def __init__(self):
+        self.plt = plt
+
+    def plot(self):
+        pass
+
+if __name__ == '__main__':
+    p = Ploter()
+    p.plot()
\ No newline at end of file
diff --git a/doc/autoscale/experiment/python/settings.py b/doc/autoscale/experiment/python/settings.py
new file mode 100644
index 00000000..1dcd18d5
--- /dev/null
+++ b/doc/autoscale/experiment/python/settings.py
@@ -0,0 +1,4 @@
+# Kubernetes API server address (insecure port)
+MASTER_ADDR = 'http://localhost:8080'
+# Collection time interval, in seconds
+COLLECTION_INTERVAL = 5
\ No newline at end of file
diff --git a/doc/autoscale/experiment/python/utils.py b/doc/autoscale/experiment/python/utils.py
new file mode 100644
index 00000000..07a18698
--- /dev/null
+++ b/doc/autoscale/experiment/python/utils.py
@@ -0,0 +1,19 @@
+import collector
+
+def is_all_jobs_finished(jobs):
+    for job in jobs:
+        if job.status != collector.JOB_STATUS_FINISHED:
+            return False
+    return True
+
+def avg_running_time(jobs):
+    total = 0
+    for job in jobs:
+        total += job.end_time - job.start_time
+    return total * 1.0 / len(jobs)
+
+def avg_pending_time(jobs):
+    total = 0
+    for job in jobs:
+        total += job.start_time - job.submit_time
+    return total * 1.0 / len(jobs)
\ No newline at end of file
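Finally, to make the averages in `utils.py` concrete, a small worked example with hypothetical timestamps; the numbers are invented, and no cluster is contacted (only the `kubernetes` package must be importable for `import collector` to succeed).

```python
import collector
import utils

job1 = collector.JobInfo('mnist1')
job1.submit_time, job1.start_time, job1.end_time = 0, 60, 360    # pending 60s, running 300s
job1.status = collector.JOB_STATUS_FINISHED

job2 = collector.JobInfo('mnist2')
job2.submit_time, job2.start_time, job2.end_time = 0, 120, 520   # pending 120s, running 400s
job2.status = collector.JOB_STATUS_FINISHED

assert utils.is_all_jobs_finished([job1, job2])
print(utils.avg_running_time([job1, job2]))   # (300 + 400) / 2 = 350.0
print(utils.avg_pending_time([job1, job2]))   # (60 + 120) / 2 = 90.0
```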