
Commit

Merge pull request #51 from ufcg-lsd/cost_plugin_impl
Implements cost plugin
armstrongmsg authored Apr 30, 2020
2 parents 11a9376 + efd7026 commit acea99a
Showing 10 changed files with 475 additions and 14 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -61,3 +61,4 @@ Endpoints are available on [restapi-endpoints.md](docs/restapi-endpoints.md) doc

## Available plugins
* [Kube Jobs](docs/plugins/kubejobs.md)
* [Kube Jobs_Cost](docs/plugins/kubejobs_cost.md)
125 changes: 125 additions & 0 deletions docs/plugins/kubejobs_cost.md
@@ -0,0 +1,125 @@
# KubeJobs Cost Plugin

The KubeJobs Cost Plugin provides information on how well a running application is complying with a cost reference
given by the user. The cost is calculated from resource usage and resource prices; the latter are acquired from a
price information service, which is required for this plugin to run properly. Compliance with the cost reference is
expressed as a cost error, which is published on a Redis queue. This information is then consumed by other components and services.
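
In short, the monitor derives the job's current cost from resource usage and prices and compares it with the user-provided reference. A minimal sketch of that calculation (the values and variable names below are illustrative, not the plugin's API):

```python
# Illustrative only: how the published cost error is derived.
cpu_price, memory_price = 1.0, 3.0    # from the price information service
cpu_usage, memory_usage = 0.5, 0.2    # current resource usage of the job
desired_cost = 1                      # reference cost given by the user

current_cost = cpu_price * cpu_usage + memory_price * memory_usage
cost_error = current_cost - desired_cost  # value published on the Redis queue
print(round(cost_error, 2))  # 0.1
```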

## How does it work?

The following steps describe the basic flow of an execution using KubeJobs Cost:

* Step 1: The client sends a POST request to the Asperathos Manager with a JSON body describing the execution.
* Step 2: The Manager creates a Redis service in the cluster and enqueues the items described in the input file in Redis through the queue auxiliary service.
* Step 3: The application execution is triggered on the cluster.
* Step 4: The application running on the Kubernetes cluster starts to consume items from the Redis storage.
* Step 5: The Monitor is triggered by the Manager when the application starts to run.
* Step 6: The Monitor periodically gets CPU and memory prices from the price information service, computes the cost error and publishes it on a Redis queue (see the sketch below).
* Step 7: As soon as metrics are being published, the Manager starts the Controller, which consumes the metrics from Redis to make scaling decisions based on the predefined control logic.
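
As mentioned in Step 6, each cost error is pushed onto a Redis queue as a metric manifest. A minimal sketch of a consumer reading it follows; the Redis address and the queue name `metrics` are assumptions for illustration, since the actual queue used is the plugin's metric queue for the job:

```python
import ast

import redis

# Assumed connection details; in a real deployment these come from the job's
# Asperathos configuration.
rds = redis.StrictRedis(host='redis-service', port=6379)

# Block until the monitor publishes the next cost error manifest.
_, raw = rds.blpop('metrics')

# The monitor pushes str(dict) rather than JSON, so parse it as a Python literal.
manifest = ast.literal_eval(raw.decode())
print(manifest['name'], manifest['value'])  # application_cost_error <error>
```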


## Configuration

In order to correctly configure the Monitor to execute the KubeJobs Cost plugin, modifications to the *monitor.cfg* file are necessary. Below you can check an example of a configuration file that can be used to execute the KubeJobs Cost plugin.

### Configuration file example:

```
[general]
host = 0.0.0.0
port = 5001
plugins = kubejobs
debug = True
retries = 5
[kubejobs]
# No variables are needed
```

## Execute plugin

In order to execute the plugin, a JSON file needs to be correctly configured with all the necessary variables that will be used by the Asperathos components. Below you can check an example of this JSON file, which will be used to send a POST request to the Asperathos Manager; a submission sketch follows the example.

### JSON file example:

```javascript
{
   "plugin":"kubejobs",
   "plugin_info":{
      "username":"usr",
      "password":"psswrd",
      "cmd":[
         [...]
      ],
      "img":"img",
      "init_size":1,
      "redis_workload":"workload",
      "config_id":"id",
      "control_plugin":"kubejobs",
      "control_parameters":{
         [...]
      },
      "monitor_plugin":"kubejobs_cost",
      "monitor_info":{
         "cluster_info_url": "http://0.0.0.0:20000/price",
         "desired_cost": 1
      },
      "enable_visualizer":true,
      "visualizer_plugin":"k8s-grafana",
      "visualizer_info":{
         [...]
      },
      "env_vars":{
         [...]
      }
   }
}
```
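
With the JSON above saved to a file, it can be submitted to the Manager. The sketch below uses Python's `requests`; the Manager address and the `/submissions` route are assumptions for illustration; check the Asperathos Manager's restapi-endpoints.md for the actual host, port and route of your deployment.

```python
import json

import requests

# Hypothetical Manager address and submission route; adjust to your deployment.
MANAGER_URL = 'http://0.0.0.0:1500/submissions'

with open('kubejobs_cost.json') as f:
    body = json.load(f)

response = requests.post(MANAGER_URL, json=body)
print(response.status_code, response.text)
```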

## Request example
`POST /monitoring/:id`

Request body:
```javascript
{
   "plugin": "kubejobs_cost",
   "plugin_info": {
      "cluster_info_url": "http://0.0.0.0:20000/price",
      "desired_cost": 1
   }
}
```
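
Using the host and port from the configuration example above (0.0.0.0:5001), starting the monitoring for a job could look like the sketch below; the application id `kj-example` is a placeholder.

```python
import requests

# Placeholder application id; use the id of your actual submission.
app_id = 'kj-example'

body = {
    "plugin": "kubejobs_cost",
    "plugin_info": {
        "cluster_info_url": "http://0.0.0.0:20000/price",
        "desired_cost": 1
    }
}

response = requests.post('http://0.0.0.0:5001/monitoring/%s' % app_id, json=body)
print(response.status_code)
```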

### Request parameters

#### cluster_info_url
URL accessed by the monitor to retrieve information on CPU and memory prices. The plugin expects a
response in the following format:
```javascript
{
   "cpu_price": <cpu_price>,
   "memory_price": <memory_price>
}
```
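
On the plugin side this response is fetched and parsed in a straightforward way; a minimal sketch equivalent to what the plugin's `get_current_cost` does:

```python
import json

import requests

# cluster_info_url as configured in plugin_info
prices = json.loads(requests.get('http://0.0.0.0:20000/price').text)
cpu_price = float(prices.get('cpu_price'))
memory_price = float(prices.get('memory_price'))
```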

#### desired_cost
Reference cost used by the monitor to calculate the cost error. For example, if the job's current cost is 1.4 and `desired_cost` is 1, the published cost error is 0.4.

## Requirements

### Cost service
The KubeJobs Cost plugin requires access to a service which provides information on CPU and memory prices. An example
implementation of such a service is presented below.

```python
from flask import Flask

app = Flask(__name__)

@app.route('/price', methods=['GET'])
def price():
    # Static example prices; a real service would compute or look them up.
    return {'cpu_price': 1, 'memory_price': 3}

if __name__ == '__main__':
    # Port 20000 matches the cluster_info_url used in the examples; adjust as needed.
    app.run(host='0.0.0.0', port=20000)
```

### Metrics server
Installation of the Metrics Server (https://github.com/kubernetes-sigs/metrics-server) in the Kubernetes cluster is
required before using this plugin; the plugin relies on it to retrieve the application's resource usage data. The
Metrics Server can typically be installed by applying the components.yaml manifest published on its releases page.
171 changes: 171 additions & 0 deletions kubejobs_cost/__init__.py
@@ -0,0 +1,171 @@
# Copyright (c) 2020 UFCG-LSD.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from kubejobs import KubeJobProgress
from monitor.utils.plugin import k8s
from monitor.utils.logger import Log
from monitor.utils.job_report.job_report import JobReport
from datetime import datetime

import requests
import json
import time

LOG_FILE = "progress.log"
LOG_NAME = "kubejobs-progress"


class KubeJobCost(KubeJobProgress):

    def __init__(self, app_id, info_plugin):

        KubeJobProgress.__init__(self, app_id, info_plugin,
                                 collect_period=2, retries=20)
        self.cluster_info_url = info_plugin.get('cluster_info_url')
        self.desired_cost = info_plugin.get('desired_cost')
        self.last_error = None
        self.last_rep = None
        self.last_cost = None
        self.LOG = Log(LOG_NAME, LOG_FILE)
        self.job_report = JobReport(info_plugin)

    def monitoring_application(self):
        try:
            if self.report_flag:

                self.calculate_error()
                self.LOG.log("Calculated error")
                timestamp = time.time() * 1000
                err_manifest = \
                    self.get_application_cost_error_manifest(self.last_error,
                                                             timestamp)
                self.LOG.log(err_manifest)
                self.LOG.log("Publishing error")
                self.rds.rpush(self.metric_queue,
                               str(err_manifest))

                self.LOG.log("Getting replicas")
                replicas_manifest = \
                    self.get_parallelism_manifest(self.last_replicas,
                                                  timestamp)
                self.LOG.log(replicas_manifest)

                reference_manifest = self.get_reference_manifest(timestamp)

                self.LOG.log("Getting cost")
                current_cost_manifest = \
                    self.get_current_cost_manifest(timestamp)
                self.LOG.log(current_cost_manifest)

                self.publish_persistent_measurement(err_manifest,
                                                    reference_manifest,
                                                    current_cost_manifest,
                                                    replicas_manifest)

                self.report_job(timestamp)

        except Exception as ex:
            self.LOG.log(ex)

    def report_job(self, timestamp):
        if self.report_flag:
            self.LOG.log("report_flag-cost")
            self.job_report.set_start_timestamp(timestamp)
            current_time = datetime.fromtimestamp(timestamp/1000)\
                .strftime('%Y-%m-%dT%H:%M:%SZ')
            if self.last_progress == 1:
                self.job_report.calculate_execution_time(timestamp)
            self.job_report.\
                verify_and_set_max_error(self.last_error, current_time)
            self.job_report.\
                verify_and_set_min_error(self.last_error, current_time)

            if self.job_is_completed():
                self.report_flag = False
                self.job_report.calculate_execution_time(timestamp)
                self.generate_report(current_time)

    # TODO: We need to think in a better design solution
    # for this
    def get_detailed_report(self):
        if not self.report_flag:
            return self.datasource.get_cost_measurements()
        return {'message': 'Job is still running...'}

    def get_reference_manifest(self, timestamp):
        reference_manifest = {'name': 'desired_cost',
                              'value': self.desired_cost,
                              'timestamp': timestamp,
                              'dimensions': self.dimensions
                              }
        return reference_manifest

    def get_current_cost_manifest(self, timestamp):
        current_cost_manifest = {'name': 'current_spent',
                                 'value': self.last_cost,
                                 'timestamp': timestamp,
                                 'dimensions': self.dimensions
                                 }
        return current_cost_manifest

    def calculate_error(self):
        rep = self._get_num_replicas()
        cpu_cost, memory_cost = self.get_current_cost()
        cpu_usage, memory_usage = \
            k8s.get_current_job_resources_usage(self.app_id)
        job_cpu_cost = cpu_cost * cpu_usage
        job_memory_cost = memory_cost * memory_usage
        job_total_cost = job_cpu_cost + job_memory_cost

        err = job_total_cost - self.desired_cost

        self.pretty_print(cpu_cost, memory_cost, cpu_usage,
                          memory_usage, job_total_cost, err)
        self.last_error = err
        self.last_cost = job_total_cost
        self.last_rep = rep
        return err

    def pretty_print(self, cpu_cost, memory_cost, cpu_usage,
                     memory_usage, job_total_cost, err):

        self.LOG.log('Cpu usage: {}\nCpu cost: {}\nMemory usage:'
                     ' {}\nMemory cost: {}\nJob cost: {}\nError: {}'.
                     format(cpu_usage, cpu_cost, memory_usage,
                            memory_cost, job_total_cost, err))

    def get_application_cost_error_manifest(self, error, timestamp):
        application_progress_error = {'name': 'application_cost_error',
                                      'value': error,
                                      'timestamp': timestamp,
                                      'dimensions': self.dimensions
                                      }
        return application_progress_error

    def get_current_cost(self):

        cost = json.loads(requests.get(self.cluster_info_url.strip()).text)
        total_cost = float(cost.get('cpu_price')),\
            float(cost.get('memory_price'))

        return total_cost

    def get_dimensions(self):
        return {'application_id': self.app_id,
                'service': 'kubejobs_cost'}


PLUGIN = KubeJobCost
3 changes: 2 additions & 1 deletion monitor/cli/main.py
@@ -23,4 +23,5 @@ def main():
app = Flask(__name__)
app.register_blueprint(rest)
logger.configure_logging()
app.run(host='0.0.0.0', port=api.port, debug=True)
logger.enable()
app.run(host='0.0.0.0', port=api.port)
1 change: 1 addition & 0 deletions monitor/plugins/base.py
@@ -73,3 +73,4 @@ def run(self):
except Exception as ex:
self.attempts -= 1
self.LOG.log(ex)
time.sleep(self.collect_period)
9 changes: 7 additions & 2 deletions monitor/plugins/builder.py
@@ -15,7 +15,8 @@

from monitor import exceptions as ex
from monitor.service import api
from monitor.plugins.kubejobs.plugin import KubeJobProgress
import kubejobs_cost
import kubejobs


class MonitorBuilder:
@@ -26,9 +26,13 @@ def get_monitor(self, plugin, app_id, plugin_info):
executor = None

if plugin == "kubejobs":
executor = KubeJobProgress(
executor = kubejobs.PLUGIN(
app_id, plugin_info, retries=api.retries)

elif plugin == "kubejobs_cost":
executor = kubejobs_cost.PLUGIN(
app_id, plugin_info)

else:
raise ex.BadRequestException()

36 changes: 36 additions & 0 deletions monitor/utils/influxdb/connector.py
@@ -31,6 +31,7 @@ def __init__(self, database_url, database_port, database_name,
def get_measurements(self):

out = {}

for i in self.get_job_progress():
out[i['time']] = {'job_progress': i['value']}

@@ -45,6 +46,41 @@ def get_measurements(self):

return out

# TODO: We need to think in a better design solution
# for this
def get_cost_measurements(self):

out = {}

for i in self.get_current_spent():
out[i['time']] = {'current_spent': i['value']}

for i in self.get_desired_cost():
out[i['time']].update({'desired_cost': i['value']})

for i in self.get_replicas():
out[i['time']].update({'replicas': i['value']})

for i in self.get_application_cost_error():
out[i['time']].update({'application_cost_error': i['value']})

return out

def get_current_spent(self):
result = self._get_influx_client().\
query('select value from current_spent;')
return list(result.get_points(measurement='current_spent'))

def get_desired_cost(self):
result = self._get_influx_client().\
query('select value from desired_cost;')
return list(result.get_points(measurement='desired_cost'))

def get_application_cost_error(self):
result = self._get_influx_client().\
query('select value from application_cost_error;')
return list(result.get_points(measurement='application_cost_error'))

def get_job_progress(self):
result = self._get_influx_client().\
query('select value from job_progress;')
