Skip to content

Commit

Permalink
Merge pull request #66 from ImperialCollegeLondon/messages
Browse files Browse the repository at this point in the history
Basic support for broadcast messages
  • Loading branch information
cc-a authored Sep 24, 2024
2 parents 169648f + c60ddab commit 235f980
Show file tree
Hide file tree
Showing 10 changed files with 122 additions and 3 deletions.
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,25 @@ docker compose exec app python scripts/talk_to_process_manager.py
```

Take the servers down with `docker compose down`

### Working with Kafka

Due to the complexities of containerising Kafka it is not possible to use the standard
Docker Compose setup. Instead when working with functionality that requires Kafka it is
necessary to run the individual components manually.

1. Start Kafka - See [Running drunc with pocket kafka].

1. Start the drunc shell:
`poetry run drunc-unified-shell --log-level debug ./data/process-manager-pocket-kafka.json`

1. Start the application server:
`poetry run python manage.py runserver`

1. Start the Kafka consumer:
`poetry run python scripts/kafka_consumer.py`

From here you should be able to see broadcast messages displayed at the top of the index
page on every refresh.

[Running drunc with pocket kafka]: https://github.com/DUNE-DAQ/drunc/wiki/Running-drunc-with-pocket-kafka
5 changes: 5 additions & 0 deletions data/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Data files

## process-manager-pocket-kafka.json

Process manager configuration file for use in local development with Kafka.
13 changes: 13 additions & 0 deletions data/process-manager-pocket-kafka.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"type": "ssh",
"name": "SSHProcessManager",
"command_address": "0.0.0.0:10054",
"authoriser": {
"type": "dummy"
},
"broadcaster": {
"type": "kafka",
"kafka_address": "127.0.0.1:30092",
"publish_timeout": 2
}
}
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ services:
volumes:
- .:/usr/src/app
- db:/usr/src/app/db
environment:
- PROCESS_MANAGER_URL=drunc:10054
drunc:
build: ./drunc_docker_service/
command:
Expand Down
3 changes: 3 additions & 0 deletions dune_processes/settings/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
https://docs.djangoproject.com/en/5.1/ref/settings/
"""

import os
from pathlib import Path

# Build paths inside the project like this: BASE_DIR / 'subdir'.
Expand Down Expand Up @@ -137,6 +138,8 @@
INSTALLED_APPS += ["django_bootstrap5"]
DJANGO_TABLES2_TEMPLATE = "django_tables2/bootstrap5.html"

PROCESS_MANAGER_URL = os.getenv("PROCESS_MANAGER_URL", "localhost:10054")

INSTALLED_APPS += ["crispy_forms", "crispy_bootstrap5"]
CRISPY_ALLOWED_TEMPLATE_PACKS = "bootstrap5"
CRISPY_TEMPLATE_PACK = "bootstrap5"
12 changes: 12 additions & 0 deletions main/templates/main/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,21 @@
{% block title %}Home{% endblock title %}

{% block content %}

<div class="col">
{% if messages %}
<h2>Messages</h2>
<div style="white-space: pre-wrap;">
{% for message in messages %}
{{ message }}
{% endfor %}
</div>
<hr class="solid">
{% endif %}

<a href="{% url 'main:boot_process' %}" class="btn btn-primary">Boot</a>
{% render_table table %}
</div>


{% endblock content %}
1 change: 1 addition & 0 deletions main/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@
path("flush/<uuid:uuid>", views.flush_process, name="flush"),
path("logs/<uuid:uuid>", views.logs, name="logs"),
path("boot_process/", views.BootProcessView.as_view(), name="boot_process"),
path("message/", views.deposit_message, name="message"),
]
32 changes: 30 additions & 2 deletions main/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@
import asyncio
import uuid
from enum import Enum
from http import HTTPStatus

import django_tables2
from django.conf import settings
from django.contrib.auth.decorators import login_required
from django.contrib.auth.mixins import LoginRequiredMixin
from django.http import HttpRequest, HttpResponse, HttpResponseRedirect
from django.shortcuts import render
from django.urls import reverse, reverse_lazy
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_POST
from django.views.generic.edit import FormView
from drunc.process_manager.process_manager_driver import ProcessManagerDriver
from drunc.utils.shell_utils import DecodedResponse, create_dummy_token_from_uname
Expand All @@ -24,11 +28,18 @@
from .forms import BootProcessForm
from .tables import ProcessTable

# extreme hackiness suitable only for demonstration purposes
# TODO: replace this with per-user session storage - once we've added auth
MESSAGES: list[str] = []
"""Broadcast messages to display to the user."""


def get_process_manager_driver() -> ProcessManagerDriver:
"""Get a ProcessManagerDriver instance."""
token = create_dummy_token_from_uname()
return ProcessManagerDriver("drunc:10054", token=token, aio_channel=True)
return ProcessManagerDriver(
settings.PROCESS_MANAGER_URL, token=token, aio_channel=True
)


async def get_session_info() -> ProcessInstanceList:
Expand Down Expand Up @@ -66,8 +77,10 @@ def index(request: HttpRequest) -> HttpResponse:
table_configurator = django_tables2.RequestConfig(request)
table_configurator.configure(table)

context = {"table": table}
global MESSAGES
MESSAGES, messages = [], MESSAGES

context = {"table": table, "messages": messages}
return render(request=request, context=context, template_name="main/index.html")


Expand Down Expand Up @@ -206,3 +219,18 @@ def form_valid(self, form: BootProcessForm) -> HttpResponse:
"""
asyncio.run(_boot_process("root", form.cleaned_data))
return super().form_valid(form)


@require_POST
@csrf_exempt
def deposit_message(request: HttpRequest) -> HttpResponse:
"""Upload point for broadcast messages for display to end user.
Args:
request: the triggering request.
Returns:
A NO_CONTENT response.
"""
MESSAGES.append(request.POST["message"])
return HttpResponse(status=HTTPStatus.NO_CONTENT)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ module = "tests.*"
disallow_untyped_defs = false

[[tool.mypy.overrides]]
module = ["druncschema.*", "drunc.*", "django_tables2.*"]
module = ["druncschema.*", "drunc.*", "django_tables2.*", "kafka.*"]
ignore_missing_imports = true

[tool.django-stubs]
Expand Down
33 changes: 33 additions & 0 deletions scripts/kafka_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""Example client that consumes messages from Kafka and sends them to the web app."""

import os
from urllib.parse import urlencode
from urllib.request import Request, urlopen

from druncschema.broadcast_pb2 import BroadcastMessage
from kafka import KafkaConsumer

KAFKA_URL = os.getenv("KAFKA_URL", "127.0.0.1:30092")
SERVER_URL = os.getenv("SERVER_URL", "http://localhost:8000")


def main() -> None:
"""Listen for Kafka messages and process them indefinitely."""
consumer = KafkaConsumer(bootstrap_servers=[KAFKA_URL])
consumer.subscribe(pattern="control.*.process_manager")

print("Listening for messages from Kafka.")
while True:
for messages in consumer.poll(timeout_ms=500).values():
for message in messages:
print(f"Message received: {message}")
bm = BroadcastMessage()
bm.ParseFromString(message.value)

data = urlencode(dict(message=bm.data.value))
request = Request(f"{SERVER_URL}/message/", data=data.encode())
urlopen(request)


if __name__ == "__main__":
main()

0 comments on commit 235f980

Please sign in to comment.