# optional
try:
    import psutil
except ImportError:
    psutil = None


def get_process_mb(pid):
    """Return the resident memory (RSS) of process ``pid`` in megabytes.

    :param pid: process id to inspect; ``None`` means "no such process".
    :return: RSS in MB rounded to two decimals, or a marker string when the
        value cannot be read: ``'NO_PROCESS_FOUND'`` if ``pid`` is ``None`` or
        the process no longer exists, ``'ACCESS_DENIED'`` if the operating
        system refuses access to the process' memory information.
    :raises ImportError: if the optional ``psutil`` package is not installed.
    """
    # psutil.Process(None) silently means "the current process", which would
    # report this command's own memory for a pid that was never recorded.
    if pid is None:
        return 'NO_PROCESS_FOUND'
    if psutil is None:
        raise ImportError('The "psutil" library is required to read process memory usage.')
    try:
        rss = psutil.Process(pid).memory_info().rss
    except psutil.NoSuchProcess:
        return 'NO_PROCESS_FOUND'
    except psutil.AccessDenied:
        # Keep the monitor loop alive for processes owned by another user;
        # callers already treat any string result as "no measurement".
        return 'ACCESS_DENIED'
    return round(rss / 1024 ** 2, 2)
def _print_cells(term, row, col_width, cells, style=None):
    """Print ``cells`` left to right on ``row``, one per ``col_width`` column.

    Every cell is converted to text and centred in ``col_width - 1``
    characters (leaving a one character gutter). When ``style`` is given
    (e.g. ``term.black_on_green``) it is applied to the centred text.
    """
    for index, cell in enumerate(cells):
        text = term.center(str(cell), width=col_width - 1)
        if style is not None:
            text = style(text)
        print(term.move(row, index * col_width) + text)


def memory(run_once=False, workers=False, broker=None):
    """Show a live, full-screen table of memory usage for the known clusters.

    One row is printed per ``Stat`` returned by ``Stat.get_all``: host,
    cluster id, the available/total memory of the machine running this
    command, and the RSS of the sentinel, monitor and (summed) worker
    processes. The screen refreshes about once a second until ``q``/``Q`` is
    pressed.

    :param run_once: draw a single frame and return ``Stat.get_all(broker=broker)``
        instead of looping (used by the tests).
    :param workers: also print a second table with one column per worker.
    :param broker: broker to query; defaults to ``get_broker()``.
    :return: the list of stats when ``run_once`` is set, otherwise ``None``.
    """
    if not broker:
        broker = get_broker()
    term = Terminal()
    broker.ping()
    if not psutil:
        print(term.clear_eos())
        print(term.white_on_red('Cannot start "qmemory" command. Missing "psutil" library.'))
        return

    def process_mb(pid):
        # A missing pid must not reach psutil: Process(None) would report the
        # memory of this command itself.
        return get_process_mb(pid) if pid else 'NO_PROCESS_FOUND'

    bytes_per_mb = 1024 ** 2
    cols = 8
    with term.fullscreen(), term.hidden_cursor(), term.cbreak():
        lowest_percentage = 100.0
        lowest_percentage_at = timezone.now()
        val = None
        start_width = int(term.width / cols)
        while val not in ["q", "Q"]:
            col_width = int(term.width / cols)
            # In case of resize
            if col_width != start_width:
                print(term.clear())
                start_width = col_width
            # sentinel, monitor and workers memory usage
            _print_cells(
                term,
                0,
                col_width,
                [
                    _("Host"),
                    _("Id"),
                    _("Available (%)"),
                    _("Available (MB)"),
                    _("Total (MB)"),
                    _("Sentinel (MB)"),
                    _("Monitor (MB)"),
                    _("Workers (MB)"),
                ],
                style=term.black_on_green,
            )
            row = 2
            stats = Stat.get_all(broker=broker)
            print(term.clear_eos())
            # One snapshot per refresh so the percentage, available and total
            # figures shown on a frame all describe the same instant.
            host_memory = psutil.virtual_memory()
            available_percentage = round(
                host_memory.available * 100 / host_memory.total, 2
            )
            available_mb = round(host_memory.available / bytes_per_mb, 2)
            total_mb = round(host_memory.total / bytes_per_mb, 2)
            if available_percentage < lowest_percentage:
                lowest_percentage = available_percentage
                lowest_percentage_at = timezone.now()
            for stat in stats:
                # String results mark processes that could not be measured;
                # they count as zero in the total.
                measured = [
                    mb
                    for mb in (get_process_mb(pid) for pid in stat.workers)
                    if not isinstance(mb, str)
                ]
                # Round the sum so float noise is not displayed.
                workers_mb = round(sum(measured), 2)
                _print_cells(
                    term,
                    row,
                    col_width,
                    [
                        stat.host[: col_width - 1],
                        str(stat.cluster_id)[-8:],
                        available_percentage,
                        available_mb,
                        total_mb,
                        process_mb(stat.sentinel),
                        process_mb(getattr(stat, 'monitor', None)),
                        workers_mb or 'NO_PROCESSES_FOUND',
                    ],
                )
                row += 1
            # each worker's memory usage
            if workers:
                row += 2
                # Size the table for the largest pool actually reported, so a
                # cluster with more workers than the local setting still gets
                # a header for every column.
                worker_cols = max(
                    [Conf.WORKERS] + [len(stat.workers) for stat in stats]
                )
                col_width = int(term.width / (1 + worker_cols))
                _print_cells(
                    term,
                    row,
                    col_width,
                    [_("Id")]
                    + [
                        "Worker #{} (MB)".format(number + 1)
                        for number in range(worker_cols)
                    ],
                    style=term.black_on_cyan,
                )
                row += 2
                for stat in stats:
                    _print_cells(
                        term,
                        row,
                        col_width,
                        [str(stat.cluster_id)[-8:]]
                        + [get_process_mb(pid) for pid in stat.workers],
                    )
                    row += 1
            row += 1
            print(
                term.move(row, 0)
                + _("Available lowest (%): {} ({})").format(
                    str(lowest_percentage),
                    # isoformat() prints the real UTC offset for aware
                    # datetimes and none for naive ones, instead of always
                    # claiming "+00:00".
                    lowest_percentage_at.replace(microsecond=0).isoformat(" "),
                )
            )
            # for testing
            if run_once:
                return Stat.get_all(broker=broker)
            print(term.move(row + 2, 0) + term.center("[Press q to quit]"))
            val = term.inkey(timeout=1)