Collect time series data #428

Merged · 5 commits · Oct 27, 2017
Changes from all commits
1 change: 1 addition & 0 deletions .gitignore
@@ -3,3 +3,4 @@
.cache
vendor
*~
*.pyc
39 changes: 36 additions & 3 deletions doc/autoscale/experiment/autoscale_experiment.md
@@ -40,8 +40,8 @@

metrics | auto-scaling training job | general training job
-- | -- | --
average running time | 6h | 8h
average pending time | 0 | 2h
CPU utils | 100% | 60%
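
For reference, the CPU utilization above is measured the way `collector.py` in this PR computes it: total CPU requested by running pods divided by the cluster's allocatable CPU. A minimal sketch of the formula, with hypothetical job sizes:

```python
# Sketch of the formula behind Collector.cpu_utils(); values are hypothetical.
def cpu_utilization(running_pod_cpu_requests, cpu_allocatable):
    return '%0.2f%%' % (100.0 * sum(running_pod_cpu_requests) / cpu_allocatable)

print cpu_utilization([8] * 30, 240)  # -> '100.00%', auto-scaled jobs fill the cluster
print cpu_utilization([8] * 18, 240)  # -> '60.00%', fixed-size jobs leave CPU idle
```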

### Hybrid Deployment with Online Serving and Offline Training Job
@@ -60,4 +60,37 @@
metrics | QPS(10k) | QPS(100k) | QPS(500k)
-- | -- | -- | --
Trainer Pods | 100 | 80 | 50
Nginx Pods | 80 | 100 | 150
CPU utils | 100% | 100% | 100%

## Reproduce the experiment

- Configure kubectl on your host
- Submit the TrainingJob controller with the YAML files; an optional connectivity check follows the commands below.
```bash
> git clone https://github.com/PaddlePaddle/cloud.git && cd cloud
> kubectl create -f k8s/controller/trainingjob_resource.yaml
> kubectl create -f k8s/controller/controller.yaml
```
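
Before moving on, you can check that the Python Kubernetes client can reach your cluster. This is a minimal, hypothetical helper (not part of this PR); it uses the same `kubernetes` package and `load_kube_config()` call as `collector.py` below:

```python
# check_cluster.py -- hypothetical connectivity check, not part of this PR
from kubernetes import client, config

config.load_kube_config()  # reads ~/.kube/config, as collector.py does
for node in client.CoreV1Api().list_node().items:
    print node.metadata.name, node.status.allocatable.get('cpu')
```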
- Test Case 1
1. Run the data-collecting Python program. The first argument selects the test case and the second is a comma-separated list of job names (see `main.py`):
```bash
> cd cloud/doc/autoscale/experiment/python
> python main.py case1 mnist1,mnist2
```
1. Submit two general jobs named mnist1 and mnist2 as follows; you may need to adjust the resource configuration to match your cluster.
```bash
> cd cloud/demo
> paddlecloud submit -jobname mnist1 \
-cpu 8 \
-gpu 0 \
-memory 8Gi \
-parallelism 40 \
-pscpu 4 \
-pservers 8 \
-psmemory 1Gi \
-entry "python ./train.py train" \
./recognize_digits
```
1. You will see the time series data in the terminal; a sample of the output format follows.
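
For reference, the collector prints one tab-separated row per job per collection interval. The column names and status strings come from `main.py` and `collector.py` in this PR; the numbers below are illustrative only, not measured results:

```
Times   Name    Status   CPU      GPU  PARALLELISM
0       mnist1  WAITING  10.00%   0    0
5       mnist1  PENDING  10.00%   0    8
10      mnist1  RUNNING  90.00%   0    40
```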
143 changes: 143 additions & 0 deletions doc/autoscale/experiment/python/collector.py
@@ -0,0 +1,143 @@
from kubernetes import client, config
from kubernetes.client.rest import ApiException
from pprint import pprint
import time

JOB_STATUS_NOT_EXISTS = 0
JOB_STATUS_PENDING = 1
JOB_STATUS_RUNNING = 2
JOB_STATUS_FINISHED = 3

class JobInfo(object):
    def __init__(self, name):
        self.name = name
        self.started = False
        self.status = JOB_STATUS_NOT_EXISTS
        self.submit_time = 0
        self.start_time = 0
        self.end_time = 0
        self.parallelism = 0

    def status_str(self):
        if self.status == JOB_STATUS_FINISHED:
            return 'FINISH'
        elif self.status == JOB_STATUS_PENDING:
            return 'PENDING'
        elif self.status == JOB_STATUS_NOT_EXISTS:
            return 'WAITING'
        elif self.status == JOB_STATUS_RUNNING:
            return 'RUNNING'


class Collector(object):
    '''
    Collector monitors data from the Kubernetes API.
    '''
    def __init__(self):
        # TODO(Yancey1989):
        # init kubernetes configuration
        # from settings.py
        config.load_kube_config()
        self.cpu_allocatable = 0
        self.gpu_allocatable = 0
        self.cpu_requests = 0
        self.gpu_requests = 0

        # Collect cluster-wide resources
        self._init_allocatable()

        self._pods = []

    def _init_allocatable(self):
        api_instance = client.CoreV1Api()
        try:
            api_response = api_instance.list_node()
            cpu = 0
            gpu = 0
            for item in api_response.items:
                allocate = item.status.allocatable
                cpu += int(allocate.get('cpu', 0))
                gpu += int(allocate.get('gpu', 0))
            self.cpu_allocatable = cpu
            self.gpu_allocatable = gpu
        except ApiException as e:
            print("Exception when calling CoreV1Api->list_node: %s\n" % e)

    def _real_cpu(self, cpu):
        # Convert a Kubernetes CPU quantity such as '500m' into a
        # float number of cores.
        if cpu:
            if cpu.endswith('m'):
                return 0.001 * int(cpu[:-1])
            else:
                return int(cpu)
        return 0

    def run_once(self):
        api_instance = client.CoreV1Api()
        try:
            api_response = api_instance.list_pod_for_all_namespaces(pretty=True)
            self._pods = api_response.items
        except ApiException as e:
            print("Exception when calling CoreV1Api->list_pod_for_all_namespaces: %s\n" % e)
        return int(time.time())

    def cpu_utils(self):
        cpu = 0
        for item in self._pods:
            if item.status.phase != 'Running':
                continue
            for container in item.spec.containers:
                requests = container.resources.requests
                if requests:
                    cpu += self._real_cpu(requests.get('cpu', None))

        return '%0.2f%%' % ((100.0 * cpu) / self.cpu_allocatable)

    def gpu_utils(self):
        gpu = 0
        for item in self._pods:
            if item.status.phase != 'Running':
                continue
            for container in item.spec.containers:
                limits = container.resources.limits
                if limits:
                    gpu += int(limits.get('alpha.kubernetes.io/nvidia-gpu', 0))
        if not self.gpu_allocatable:
            return '0'
        return '%0.2f%%' % ((100.0 * gpu) / self.gpu_allocatable)

    def update_job(self, job, times):
        phases = set()
        parallelism = 0
        for item in self._pods:
            if item.metadata.labels:
                for k, v in item.metadata.labels.items():
                    # All PaddleCloud jobs have the label key: paddle-job-*
                    if k == 'paddle-job' and v == job.name:
                        parallelism += 1
                        if not job.submit_time:
                            job.submit_time = times
                        phases.add(item.status.phase)

        job.parallelism = parallelism
        if phases and not job.submit_time:
            job.submit_time = times

        if len(phases) == 0:
            # The job has not been submitted yet.
            return
        elif len(phases) == 1 and 'Running' in phases:
            # If all pods are in the Running phase, the job is running.
            # TODO(Yancey1989): for a fault-tolerant job, only one of
            # the pods needs to be running.
            if not job.start_time:
                job.start_time = times
            job.status = JOB_STATUS_RUNNING
        elif ('Failed' in phases or
              (len(phases) == 1 and 'Succeeded' in phases)) and \
                job.end_time == 0:
            job.end_time = times
            job.status = JOB_STATUS_FINISHED
        elif 'Pending' in phases:
            job.status = JOB_STATUS_PENDING
39 changes: 39 additions & 0 deletions doc/autoscale/experiment/python/main.py
@@ -0,0 +1,39 @@
import time
import settings
import collector
import sys
import utils

def case1(c, jobs):
    print 'Times\tName\tStatus\tCPU\tGPU\tPARALLELISM'
    times = 0
    while True:
        c.run_once()
        for job in jobs:
            c.update_job(job, times)

            print '%d\t%s\t%s\t%s\t%s\t%d' % (times,
                job.name, job.status_str(), c.cpu_utils(), c.gpu_utils(), job.parallelism)

        if utils.is_all_jobs_finished(jobs):
            print 'Average running time:', utils.avg_running_time(jobs)
            print 'Average pending time:', utils.avg_pending_time(jobs)
            print 'Cluster wide CPU:', c.cpu_allocatable
            print 'Cluster wide GPU:', c.gpu_allocatable
            for job in jobs:
                print '%s running time:' % job.name, (job.end_time - job.start_time)
            sys.exit(0)

        # TODO(Yancey1989): draw the figure with Ploter

        time.sleep(settings.COLLECTION_INTERVAL)
        times += settings.COLLECTION_INTERVAL


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print 'Usage: python main.py [case1|case2] [jobname1,jobname2]'
        exit(0)

    c = collector.Collector()
    if sys.argv[1] == 'case1':
        case1(c, [collector.JobInfo(name) for name in sys.argv[2].split(',')])
15 changes: 15 additions & 0 deletions doc/autoscale/experiment/python/ploter.py
@@ -0,0 +1,15 @@
import matplotlib.pyplot as plt

class Ploter(object):
    '''
    Plot graph
    '''
    def __init__(self):
        self.plt = plt

    def plot(self):
        pass


if __name__ == '__main__':
    p = Ploter()
    p.plot()
4 changes: 4 additions & 0 deletions doc/autoscale/experiment/python/settings.py
@@ -0,0 +1,4 @@
# Kubernetes API server address (insecure)
MASTER_ADDR = 'http://localhost:8080'
# Collection time interval, in seconds
COLLECTION_INTERVAL = 5
19 changes: 19 additions & 0 deletions doc/autoscale/experiment/python/utils.py
@@ -0,0 +1,19 @@
import collector


def is_all_jobs_finished(jobs):
    for job in jobs:
        if job.status != collector.JOB_STATUS_FINISHED:
            return False
    return True


def avg_running_time(jobs):
    total = 0
    for job in jobs:
        total += job.end_time - job.start_time
    # Multiply by 1.0 so Python 2 integer division does not truncate.
    return total * 1.0 / len(jobs)


def avg_pending_time(jobs):
    total = 0
    for job in jobs:
        total += job.start_time - job.submit_time
    return total * 1.0 / len(jobs)
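
A quick usage sketch of these helpers; the job and its timestamps are hypothetical, with times in seconds since collection began:

```python
import collector
import utils

job = collector.JobInfo('mnist1')
job.status = collector.JOB_STATUS_FINISHED
job.submit_time, job.start_time, job.end_time = 0, 120, 720  # hypothetical

print utils.is_all_jobs_finished([job])  # True
print utils.avg_running_time([job])      # 600.0 (end_time - start_time)
print utils.avg_pending_time([job])      # 120.0 (start_time - submit_time)
```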