diff --git a/README.md b/README.md index 227278e..1a0224a 100644 --- a/README.md +++ b/README.md @@ -61,3 +61,4 @@ Endpoints are avaliable on [restapi-endpoints.md](docs/restapi-endpoints.md) doc ## Avaliable plugins * [Kube Jobs](docs/plugins/kubejobs.md) +* [Kube Jobs Cost](docs/plugins/kubejobs_cost.md) diff --git a/docs/plugins/kubejobs_cost.md b/docs/plugins/kubejobs_cost.md new file mode 100644 index 0000000..ea9bb96 --- /dev/null +++ b/docs/plugins/kubejobs_cost.md @@ -0,0 +1,125 @@ +# KubeJobs Cost Plugin + +The Kubejobs Cost Plugin provides information on how a running application is performing regarding compliance to +a cost reference given by the user. The cost is calculated based on resources usage and resources price. The latter is +acquired from a price information service, required for this plugin to run properly. Compliance to the cost reference is expressed as +a cost error, which is published on a Redis queue. This information is, then, consumed by other components and services. + +## How does it work? + +The following steps describe the basic flow of an execution using KubeJobs Cost: + +* Step 1: The client sends a POST request to the Asperathos Manager with a JSON body describing the execution. +* Step 2: The Manager creates a Redis service in the cluster, enqueues the items described in the input file in Redis through the queue auxiliary service. +* Step 3: The application execution is triggered on the cluster. +* Step 4: The application running on Kubernetes cluster starts to consume items from the Redis storage. +* Step 5: The Monitor is triggered by the Manager when application starts to run. +* Step 6: The Monitor periodically gets CPU and memory prices from a price information service. +* Step 7: As soon as metrics are being published, the Manager starts the Controller, which consumes metrics from Redis to take decisions about scaling based on the predefined control logic. 
+ +## Configuration + +In order to correctly configure the Monitor to execute the KubeJobs Cost plugin, modifications in the *monitor.cfg* file are necessary. Following you can check an example of a configuration file that can be used to execute the KubeJobs Cost plugin. + +### Configuration file example: + +``` +[general] +host = 0.0.0.0 +port = 5001 +plugins = kubejobs +debug = True +retries = 5 + +[kubejobs] +# No variables are needed +``` + +## Execute plugin + +In order to execute the plugin, a JSON needs to be correctly configured with all necessary variables that will be used by Asperathos components. Following you can check an example of this JSON file that will be used to send a POST request to the Asperathos Manager. + +### JSON file example: + +```javascript +{ + "plugin":"kubejobs", + "plugin_info":{ + "username":"usr", + "password":"psswrd", + "cmd":[ + [...] + ], + "img":"img", + "init_size":1, + "redis_workload":"workload", + "config_id":"id", + "control_plugin":"kubejobs", + "control_parameters":{ + [...] + }, + "monitor_plugin":"kubejobs_cost", + "monitor_info":{ + "cluster_info_url": "http://0.0.0.0:20000/price", + "desired_cost": 1 + }, + "enable_visualizer":true, + "visualizer_plugin":"k8s-grafana", + "visualizer_info":{ + [...] + }, + "env_vars":{ + [...] + } + } +} +``` + +## Request example +`POST /monitoring/:id` + +Request body: +```javascript +{ + "plugin": "kubejobs_cost", + "plugin_info": { + "cluster_info_url": "http://0.0.0.0:20000/price", + "desired_cost": 1 + } +} +``` + +### Request parameters + +#### cluster_info_url +URL accessed by the monitor to retrieve information on CPU and memory prices. The plugin expects a +response with the format +```javascript +{ + 'cpu_price':, + 'memory_price': +}. +``` + +#### desired_cost +Reference cost used by the monitor to calculate the cost error. + +## Requirements + +### Cost service +The Kubejobs cost plugin requires access to a service which provides information on CPU and memory prices. 
An example of +implementation of such a service is presented as follows. + +```python +from flask import Flask +app = Flask(__name__) + +@app.route('/price', methods=['GET']) +def price(): +    return {'cpu_price':1, 'memory_price':3} +``` + +### Metrics server +Installation of Metrics server (https://github.com/kubernetes-sigs/metrics-server) in the Kubernetes cluster in use is +required before using this plugin. Metrics server is used to retrieve application resources usage data. diff --git a/kubejobs_cost/__init__.py b/kubejobs_cost/__init__.py new file mode 100644 index 0000000..7379b68 --- /dev/null +++ b/kubejobs_cost/__init__.py @@ -0,0 +1,171 @@ +# Copyright (c) 2020 UFCG-LSD. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from kubejobs import KubeJobProgress +from monitor.utils.plugin import k8s +from monitor.utils.logger import Log +from monitor.utils.job_report.job_report import JobReport +from datetime import datetime + +import requests +import json +import time + +LOG_FILE = "progress.log" +LOG_NAME = "kubejobs-progress" + + +class KubeJobCost(KubeJobProgress): + + def __init__(self, app_id, info_plugin): + + KubeJobProgress.__init__(self, app_id, info_plugin, + collect_period=2, retries=20) + self.cluster_info_url = info_plugin.get('cluster_info_url') + self.desired_cost = info_plugin.get('desired_cost') + self.last_error = None + self.last_rep = None + self.last_cost = None + self.LOG = Log(LOG_NAME, LOG_FILE) + self.job_report = JobReport(info_plugin) + + def monitoring_application(self): + try: + if self.report_flag: + + self.calculate_error() + self.LOG.log("Calculated error") + timestamp = time.time() * 1000 + err_manifest = \ + self.get_application_cost_error_manifest(self.last_error, + timestamp) + self.LOG.log(err_manifest) + self.LOG.log("Publishing error") + self.rds.rpush(self.metric_queue, + str(err_manifest)) + + self.LOG.log("Getting replicas") + replicas_manifest = \ + self.get_parallelism_manifest(self.last_replicas, + timestamp) + self.LOG.log(replicas_manifest) + + reference_manifest = self.get_reference_manifest(timestamp) + + self.LOG.log("Getting cost") + current_cost_manifest = \ + self.get_current_cost_manifest(timestamp) + self.LOG.log(current_cost_manifest) + + self.publish_persistent_measurement(err_manifest, + reference_manifest, + current_cost_manifest, + replicas_manifest) + + self.report_job(timestamp) + + except Exception as ex: + self.LOG.log(ex) + + def report_job(self, timestamp): + if self.report_flag: + self.LOG.log("report_flag-cost") + self.job_report.set_start_timestamp(timestamp) + current_time = datetime.fromtimestamp(timestamp/1000)\ + .strftime('%Y-%m-%dT%H:%M:%SZ') + if self.last_progress == 1: + 
self.job_report.calculate_execution_time(timestamp) + self.job_report.\ + verify_and_set_max_error(self.last_error, current_time) + self.job_report.\ + verify_and_set_min_error(self.last_error, current_time) + + if self.job_is_completed(): + self.report_flag = False + self.job_report.calculate_execution_time(timestamp) + self.generate_report(current_time) + + # TODO: We need to think in a better design solution + # for this + def get_detailed_report(self): + if not self.report_flag: + return self.datasource.get_cost_measurements() + return {'message': 'Job is still running...'} + + def get_reference_manifest(self, timestamp): + reference_manifest = {'name': 'desired_cost', + 'value': self.desired_cost, + 'timestamp': timestamp, + 'dimensions': self.dimensions + } + return reference_manifest + + def get_current_cost_manifest(self, timestamp): + current_cost_manifest = {'name': 'current_spent', + 'value': self.last_cost, + 'timestamp': timestamp, + 'dimensions': self.dimensions + } + return current_cost_manifest + + def calculate_error(self): + rep = self._get_num_replicas() + cpu_cost, memory_cost = self.get_current_cost() + cpu_usage, memory_usage = \ + k8s.get_current_job_resources_usage(self.app_id) + job_cpu_cost = cpu_cost * cpu_usage + job_memory_cost = memory_cost * memory_usage + job_total_cost = job_cpu_cost + job_memory_cost + + err = job_total_cost - self.desired_cost + + self.pretty_print(cpu_cost, memory_cost, cpu_usage, + memory_usage, job_total_cost, err) + self.last_error = err + self.last_cost = job_total_cost + self.last_rep = rep + return err + + def pretty_print(self, cpu_cost, memory_cost, cpu_usage, + memory_usage, job_total_cost, err): + + self.LOG.log('Cpu usage: {}\nCpu cost: {}\nMemory usage:' + ' {}\nMemory cost: {}\nJob cost: {}\nError: {}'. 
+ format(cpu_usage, cpu_cost, memory_usage, + memory_cost, job_total_cost, err)) + + def get_application_cost_error_manifest(self, error, timestamp): + application_progress_error = {'name': 'application_cost_error', + 'value': error, + 'timestamp': timestamp, + 'dimensions': self.dimensions + } + return application_progress_error + + def get_current_cost(self): + + cost = json.loads(requests.get(self.cluster_info_url.strip()).text) + total_cost = float(cost.get('cpu_price')),\ + float(cost.get('memory_price')) + + return total_cost + + def get_dimensions(self): + return {'application_id': self.app_id, + 'service': 'kubejobs_cost'} + + +PLUGIN = KubeJobCost diff --git a/monitor/cli/main.py b/monitor/cli/main.py index 49bf32e..07d26b0 100644 --- a/monitor/cli/main.py +++ b/monitor/cli/main.py @@ -23,4 +23,5 @@ def main(): app = Flask(__name__) app.register_blueprint(rest) logger.configure_logging() - app.run(host='0.0.0.0', port=api.port, debug=True) + logger.enable() + app.run(host='0.0.0.0', port=api.port) diff --git a/monitor/plugins/base.py b/monitor/plugins/base.py index 50a5a9a..6de7c8c 100644 --- a/monitor/plugins/base.py +++ b/monitor/plugins/base.py @@ -73,3 +73,4 @@ def run(self): except Exception as ex: self.attempts -= 1 self.LOG.log(ex) + time.sleep(self.collect_period) diff --git a/monitor/plugins/builder.py b/monitor/plugins/builder.py index e875eb0..2dc459a 100644 --- a/monitor/plugins/builder.py +++ b/monitor/plugins/builder.py @@ -15,7 +15,8 @@ from monitor import exceptions as ex from monitor.service import api -from monitor.plugins.kubejobs.plugin import KubeJobProgress +import kubejobs_cost +import kubejobs class MonitorBuilder: @@ -26,9 +27,13 @@ def get_monitor(self, plugin, app_id, plugin_info): executor = None if plugin == "kubejobs": - executor = KubeJobProgress( + executor = kubejobs.PLUGIN( app_id, plugin_info, retries=api.retries) + elif plugin == "kubejobs_cost": + executor = kubejobs_cost.PLUGIN( + app_id, plugin_info) + else: raise 
ex.BadRequestException() diff --git a/monitor/utils/influxdb/connector.py b/monitor/utils/influxdb/connector.py index 8c72183..013b934 100644 --- a/monitor/utils/influxdb/connector.py +++ b/monitor/utils/influxdb/connector.py @@ -31,6 +31,7 @@ def __init__(self, database_url, database_port, database_name, def get_measurements(self): out = {} + for i in self.get_job_progress(): out[i['time']] = {'job_progress': i['value']} @@ -45,6 +46,41 @@ def get_measurements(self): return out + # TODO: We need to think in a better design solution + # for this + def get_cost_measurements(self): + + out = {} + + for i in self.get_current_spent(): + out[i['time']] = {'current_spent': i['value']} + + for i in self.get_desired_cost(): + out[i['time']].update({'desired_cost': i['value']}) + + for i in self.get_replicas(): + out[i['time']].update({'replicas': i['value']}) + + for i in self.get_application_cost_error(): + out[i['time']].update({'application_cost_error': i['value']}) + + return out + + def get_current_spent(self): + result = self._get_influx_client().\ + query('select value from current_spent;') + return list(result.get_points(measurement='current_spent')) + + def get_desired_cost(self): + result = self._get_influx_client().\ + query('select value from desired_cost;') + return list(result.get_points(measurement='desired_cost')) + + def get_application_cost_error(self): + result = self._get_influx_client().\ + query('select value from application_cost_error;') + return list(result.get_points(measurement='application_cost_error')) + def get_job_progress(self): result = self._get_influx_client().\ query('select value from job_progress;') diff --git a/monitor/utils/logger.py b/monitor/utils/logger.py index 6288ed6..b395a3f 100644 --- a/monitor/utils/logger.py +++ b/monitor/utils/logger.py @@ -1,4 +1,4 @@ -# Copyright (c) 2017 UFCG-LSD. +# Copyright (c) 2017 LSD - UFCG. 
# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,22 +14,54 @@ # limitations under the License. import logging +from datetime import datetime -class Log: +global_enabled = False + +class Log: def __init__(self, name, output_file_path): self.logger = logging.getLogger(name) - handler = logging.StreamHandler() - handler.setLevel(logging.DEBUG) - self.logger.addHandler(handler) - handler = logging.FileHandler(output_file_path) - self.logger.addHandler(handler) - self.logger.propagate = False + if not len(self.logger.handlers): + handler = logging.StreamHandler() + handler.setLevel(logging.INFO) + self.logger.addHandler(handler) + handler = logging.FileHandler(output_file_path) + self.logger.addHandler(handler) + self.logger.propagate = False + + def log(self, text): + if global_enabled: + self.logger.info(text) + + +class ScalingLog: + def __init__(self, name, output_file_path, application_id=""): + self.application_id = application_id + self.logger = Log(name, output_file_path) def log(self, text): - self.logger.info(text) + timestamp = datetime.now().strftime("%Y-%m-%d_%H:%M:%S") + log_string = "%s|%s|%s" % (self.application_id, timestamp, text) + self.logger.log(log_string) + + +def enable(): + global global_enabled + global_enabled = True + + +def disable(): + global global_enabled + global_enabled = False + +def configure_logging(logging_level="INFO"): + levels = {"CRITICAL": logging.CRITICAL, "DEBUG": logging.DEBUG, + "ERROR": logging.ERROR, "FATAL": logging.FATAL, + "INFO": logging.INFO, "NOTSET": logging.NOTSET, + "WARN": logging.WARN, "WARNING": logging.WARNING + } -def configure_logging(): - logging.basicConfig(level=logging.DEBUG) + logging.basicConfig(level=levels[logging_level]) diff --git a/monitor/utils/plugin/__init__.py b/monitor/utils/plugin/__init__.py new file mode 100644 index 0000000..e755060 --- /dev/null +++ b/monitor/utils/plugin/__init__.py @@ -0,0 
+1,14 @@ +# Copyright (c) 2017 UFCG-LSD. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/monitor/utils/plugin/k8s.py b/monitor/utils/plugin/k8s.py new file mode 100644 index 0000000..d473ec6 --- /dev/null +++ b/monitor/utils/plugin/k8s.py @@ -0,0 +1,75 @@ +# Copyright (c) 2020 UFCG-LSD. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from monitor.service import api + +import kubernetes as kube + + +kube.config.load_kube_config(api.k8s_manifest) + + +def get_running_pods_from_a_job(job_id, namespace='default'): + api_instance = kube.client.CoreV1Api() + + pod_names = [] + for pod in api_instance.list_namespaced_pod(namespace).items: + if pod.metadata.labels.get('job-name') == job_id.strip() \ + and pod.status.phase == 'Running': + pod_names.append(pod.metadata.name) + return pod_names + + +def convert_cpu_memory_unity(resources): + ''' + Convert cpu unity from Nanocpu to Milicpu + and memory unity from Kibibyte to Gigabyte + ''' + + cpu = resources.get('cpu') + memory = resources.get('memory') + + vcpu = cpu / (10.0 ** 6) / 1000 + gb_memory = (memory * (2 ** 10)) / 10 ** 9 + + return { + 'cpu': vcpu, + 'memory': gb_memory + } + + +def get_current_job_resources_usage(job_id): + list_pods = get_running_pods_from_a_job(job_id) + + total_cpu = 0 + total_memory = 0 + cust = kube.client.CustomObjectsApi() + for pod in cust.list_cluster_custom_object('metrics.k8s.io', + 'v1beta1', 'pods').\ + get('items'): + if pod.get('metadata').get('name') in list_pods: + for container in pod.get('containers'): + cpu = container.get('usage').get('cpu')[:-1] + memory = container.get('usage').get('memory')[:-2] + + total_cpu += int(cpu) + total_memory += int(memory) + + converted_usage = convert_cpu_memory_unity({ + 'cpu': total_cpu, + 'memory': total_memory + }) + + return converted_usage.get('cpu'), converted_usage.get('memory')