From 21018572e64587fc4e08cbac392be2a0eb10e384 Mon Sep 17 00:00:00 2001 From: Martin Pitt Date: Thu, 15 Sep 2022 14:30:40 +0200 Subject: [PATCH] appservice: Replace nginx with aiohttp reverse proxy nginx does not get along well with unavailable proxy_pass targets -- as soon as one session pod unexpectedly goes away (crashes, idle timeouts, networking flakes), it errors out and no session routes can be resolved any more. There are workarounds [1], but I can't get them to work properly. Also, nginx does not allow us to hook into (dis)connection events to implement the session status API (see issue #28). So it's finally time to grow to something more flexible. Rewrite multiplexer.py using aiohttp [2], which now handles both our own session control API as well as the dynamic reverse proxying to the session pods. This should still be considered as a PoC, but at least there are fewer moving parts and an easier-to-understand architecture now. Check behaviour with broken session pods in the tests: Crash a session pod, and validate that the other sessions still behave correctly. Fixes #36 [1] https://sandro-keil.de/blog/let-nginx-start-if-upstream-host-is-unavailable-or-down/ [2] https://docs.aiohttp.org/en/stable/index.html --- appservice/Containerfile | 5 +- appservice/multiplexer.py | 372 ++++++++++++++++++-------------------- test/test_basic.py | 16 ++ 3 files changed, 191 insertions(+), 202 deletions(-) diff --git a/appservice/Containerfile b/appservice/Containerfile index 14b5ced..4197299 100644 --- a/appservice/Containerfile +++ b/appservice/Containerfile @@ -1,13 +1,10 @@ FROM debian:bookworm RUN apt-get update && \ - apt-get install -y python3 python3-redis nginx curl inetutils-ping procps && \ + apt-get install -y python3 python3-redis python3-aiohttp curl inetutils-ping procps && \ apt-get clean && \ rm /var/lib/apt/lists/*dists* -# allow unprivileged container user to run nginx and change configuration -RUN chmod -R a+rw /etc/nginx/ /var/lib/nginx/ /var/log/nginx/ /run - COPY *.py /usr/local/bin/ CMD python3 /usr/local/bin/multiplexer.py diff --git a/appservice/multiplexer.py b/appservice/multiplexer.py index f091d5c..cd0e4c0 100644 --- a/appservice/multiplexer.py +++ b/appservice/multiplexer.py @@ -1,139 +1,55 @@ -import os -import socket -import signal -import subprocess -import uuid -import http -import http.client +import async_timeout +import asyncio import json -import time import logging +import os +import uuid -from multiprocessing import Process -from http.server import BaseHTTPRequestHandler, HTTPServer - -import redis +import aiohttp +from aiohttp import web +import redis.asyncio as redis import config -logger = logging.getLogger("multiplexer") +logger = logging.getLogger('multiplexer') API_URL = os.environ['API_URL'] SESSION_INSTANCE_DOMAIN = os.getenv('SESSION_INSTANCE_DOMAIN', '') -NGINX_TEMPLATE = """ -daemon off; -worker_processes auto; - -events {{ - worker_connections 1024; -}} - -http {{ - include /etc/nginx/mime.types; - default_type application/octet-stream; - - log_format main '$remote_addr - $remote_user [$time_local] "$request" ' - '$status $body_bytes_sent "$http_referer" ' - '"$http_user_agent" "$http_x_forwarded_for"'; - - access_log /dev/stderr main; - error_log stderr; - - sendfile on; - - keepalive_timeout 65; - - server {{ - listen 8080 default_server; - listen [::]:8080 default_server; - - server_name localhost; - - {routes} - - location {route_control}/ping {{ - proxy_pass http://127.0.0.1:8081; - }} - location {route_control}/sessions/new {{ - proxy_pass http://127.0.0.1:8081; - }} - - location / {{ - return 404 'no route found in multiplexer\r\n'; - }} - }} -}} -""" - PODMAN_SOCKET = '/run/podman/podman.sock' -NGINX_PROC = None -REDIS = redis.Redis(host=os.environ["REDIS_SERVICE_HOST"], port=int(os.environ.get("REDIS_SERVICE_PORT", "6379"))) - - -def get_sessions(): - # Add new entry to our sessions - sessions = REDIS.get('sessions') - if sessions is None: - sessions = {} - else: - try: - sessions = json.loads(sessions) - except json.decoder.JSONDecodeError: - sessions = {} - - return sessions - - -def write_routes(sessions): - routes = "" - for sessionid in sessions: - routes += f""" -location {config.ROUTE_WSS}/sessions/{sessionid}/web {{ - proxy_pass http://session-{sessionid}{SESSION_INSTANCE_DOMAIN}:9090; - - # Required to proxy the connection to Cockpit - proxy_set_header Host $host; - proxy_set_header X-Forwarded-Proto $scheme; - - # Required for web sockets to function - proxy_http_version 1.1; - proxy_buffering off; - proxy_set_header Upgrade $http_upgrade; - proxy_set_header Connection "upgrade"; - - # Pass ETag header from Cockpit to clients. - # See: https://github.com/cockpit-project/cockpit/issues/5239 - gzip off; -}} -location {config.ROUTE_WSS}/sessions/{sessionid}/ws {{ - proxy_pass http://session-{sessionid}{SESSION_INSTANCE_DOMAIN}:8080; +SESSIONS = {} +REDIS = redis.Redis(host=os.environ['REDIS_SERVICE_HOST'], port=int(os.environ.get('REDIS_SERVICE_PORT', '6379'))) + + +async def _wsforward(ws_from, ws_to): + async for msg in ws_from: + if msg.type == aiohttp.WSMsgType.TEXT: + await ws_to.send_str(msg.data) + elif msg.type == aiohttp.WSMsgType.BINARY: + await ws_to.send_bytes(msg.data) + elif ws_to.closed: + await ws_to.close(code=ws_to.close_code, message=msg.extra) + else: + raise ValueError(f'unexpected ws message type: {msg.type}') - # Required to proxy the connection to Cockpit - proxy_set_header Host $host; - proxy_set_header X-Forwarded-Proto $scheme; - # Required for web sockets to function - proxy_http_version 1.1; - proxy_buffering off; - proxy_set_header Upgrade $http_upgrade; - proxy_set_header Connection "upgrade"; +class Handlers: + def __init__(self): + self.ws_client_sessions = {} - # Pass ETag header from Cockpit to clients. - # See: https://github.com/cockpit-project/cockpit/issues/5239 - gzip off; -}}""" + async def __aenter__(self): + return self - with open('/etc/nginx/nginx.conf', 'w') as f: - f.write(NGINX_TEMPLATE.format(routes=routes, route_control=config.ROUTE_API)) + async def __aexit__(self, exc_type, exc_val, exc_tb): + for s in self.ws_client_sessions.values(): + if not s.closed: + await s.close() + def handle_ping(self, request): + return web.Response(text='pong') -class ProxyHTTPRequestHandler(BaseHTTPRequestHandler): - def new_session_podman(self, sessionid): + async def new_session_podman(self, sessionid): name = f'session-{sessionid}' - connection = http.client.HTTPConnection('localhost') - connection.sock = socket.socket(socket.AF_UNIX) - connection.sock.connect(PODMAN_SOCKET) body = { 'image': 'quay.io/rhn_engineering_mpitt/ws', 'name': name, @@ -152,104 +68,164 @@ def new_session_podman(self, sessionid): 'cni_networks': ['consoledot'], 'user': 'cockpit-wsinstance', } - connection.request('POST', '/v1.12/libpod/containers/create', body=json.dumps(body)) - response = connection.getresponse() - content = response.read() - if response.status >= 200 and response.status < 300: - logger.debug("/new: creating container result: %i %s", response.status, content.decode()) - connection.request('POST', f'/v1.12/libpod/containers/{name}/start') - response = connection.getresponse() - content = response.read() + async with aiohttp.ClientSession(connector=aiohttp.UnixConnector(PODMAN_SOCKET)) as podman: + async with podman.post('http://none/v1.12/libpod/containers/create', + data=json.dumps(body).encode()) as response: + status = response.status + content = await response.text() + + if status >= 200 and status < 300: + logger.debug('/new: creating container succeeded with %i: %s; starting container', status, content) + async with podman.post(f'http://none/v1.12/libpod/containers/{name}/start') as response: + status = response.status + content = await response.text() - return response, content + return status, content - def new_session(self): + async def handle_session_new(self, request): + global SESSIONS sessionid = str(uuid.uuid4()) + assert sessionid not in SESSIONS if os.path.exists(PODMAN_SOCKET): - response, content = self.new_session_podman(sessionid) + pod_status, content = await self.new_session_podman(sessionid) else: # TODO: support k8s API - raise NotImplementedError("cannot create sessions other than podman") - - if response.status >= 200 and response.status < 300: - self.send_response(200) - self.end_headers() - self.wfile.write(json.dumps({"id": sessionid}).encode()) + raise NotImplementedError('cannot create sessions other than podman') + + if pod_status >= 200 and pod_status < 300: + response = web.json_response({'id': sessionid}) + SESSIONS[sessionid] = True + dumped_sessions = json.dumps(SESSIONS) + await REDIS.set('sessions', dumped_sessions) + await REDIS.publish('sessions', dumped_sessions) else: - self.send_response(response.status) - self.end_headers() - self.wfile.write("creating session container failed: ".encode()) - self.wfile.write(content) - return - - sessions = get_sessions() - sessions[sessionid] = True - - dumped_sessions = json.dumps(sessions) - REDIS.set('sessions', dumped_sessions) - REDIS.publish('sessions', dumped_sessions) - - def ping(self): - self.send_response(200) - self.end_headers() - self.wfile.write(b'pong') - - def do_GET(self): - logger.debug("GET %s", self.path) - if self.path == f"{config.ROUTE_API}/sessions/new": - self.new_session() - elif self.path == f"{config.ROUTE_API}/ping": - self.ping() + response = web.Response(status=pod_status, text=f'creating session container failed: {content}') + + return response + + async def handle_session_id(self, upstream_req): + sessionid = upstream_req.match_info['sessionid'] + if sessionid not in SESSIONS: + return web.HTTPNotFound(text='unknown session ID') + + path = upstream_req.match_info['path'] + if path.startswith('web/'): + target_url = f'http://session-{sessionid}{SESSION_INSTANCE_DOMAIN}:9090{upstream_req.path_qs}' + elif path == 'ws': + target_url = f'http://session-{sessionid}{SESSION_INSTANCE_DOMAIN}:8080{upstream_req.path_qs}' else: - self.send_response(404, 'Not found') + return web.HTTPNotFound(text=f'invalid session path prefix: {path}') + # reverse-proxy the request to session pod -def watch_redis(): - redis = REDIS.pubsub() - redis.subscribe("sessions") - logger = logging.getLogger("multiplexer/redis") + # lazily initialize per-session HTTP client + if sessionid not in self.ws_client_sessions or self.ws_client_sessions[sessionid].closed: + cs = aiohttp.ClientSession(auto_decompress=False, cookie_jar=aiohttp.DummyCookieJar()) + self.ws_client_sessions[sessionid] = cs + else: + cs = self.ws_client_sessions[sessionid] + + if (upstream_req.method == 'GET' + and 'upgrade' in upstream_req.headers.get('Connection', '').lower() + and upstream_req.headers.get('Upgrade') == 'websocket'): + # it's a websocket upgrade request + async with cs.ws_connect(target_url, headers=dict(upstream_req.headers)) as downstream_ws_client: + upstream_ws_response = web.WebSocketResponse() + upstream_ws_response._headers = downstream_ws_client._response._headers.copy() + await upstream_ws_response.prepare(upstream_req) + await asyncio.gather( + _wsforward(downstream_ws_client, upstream_ws_response), + _wsforward(upstream_ws_response, downstream_ws_client)) + return upstream_ws_response - while True: - message = redis.get_message() - if message: - logger.debug("got message: %s", message) - sessions = get_sessions() - write_routes(sessions) - os.kill(NGINX_PROC.pid, signal.SIGHUP) - time.sleep(0.01) + else: + # it's an plain HTTP request + async with cs.request( + upstream_req.method, + target_url, + headers=upstream_req.headers, + data=upstream_req.content, + allow_redirects=False, + ) as downstream_response: + h = downstream_response.headers.copy() + + if h.get('Transfer-Encoding') == 'chunked': + upstream_resp = web.StreamResponse( + status=downstream_response.status, + reason=downstream_response.reason, + headers=h, + ) + upstream_resp.enable_chunked_encoding() + await upstream_resp.prepare(upstream_req) + async for data, _ in downstream_response.content.iter_chunks(): + await upstream_resp.write(data) + await upstream_resp.write_eof() + return upstream_resp + else: + upstream_resp = web.Response( + status=downstream_response.status, + reason=downstream_response.reason, + headers=h, + body=await downstream_response.content.read(), + ) + + return upstream_resp + + +async def init_sessions(): + global SESSIONS + sessions = await REDIS.get('sessions') + if sessions is None: + SESSIONS = {} + else: + try: + SESSIONS = json.loads(sessions) + except json.decoder.JSONDecodeError: + SESSIONS = {} + logger.debug('initial sessions: %s', SESSIONS) -def start_nginx(): - proc = subprocess.Popen(['nginx']) - # wait for nginx to start up - connection = http.client.HTTPConnection('localhost:8080') - for _ in range(10): +async def watch_redis(channel): + global SESSIONS + while True: try: - connection.connect() - break - except OSError: - time.sleep(0.2) - else: - raise TimeoutError('timed out waiting for nginx to start up') - return proc + async with async_timeout.timeout(1): + message = await channel.get_message(ignore_subscribe_messages=True) + if message is not None and message['channel'] == b'sessions': + logger.debug("got redis sessions update: %s", message['data']) + try: + SESSIONS = json.loads(message['data'].decode()) + except json.decoder.JSONDecodeError as e: + logger.warning("invalid JSON, starting without sessions: %s", e) + SESSIONS = {} + + await asyncio.sleep(0.01) + except asyncio.TimeoutError: + pass + + +async def main(): + async with Handlers() as handlers: + pubsub = REDIS.pubsub() + await pubsub.subscribe('sessions') + asyncio.create_task(watch_redis(pubsub)) + await init_sessions() + + app = web.Application() + app.router.add_route('GET', f'{config.ROUTE_API}/ping', handlers.handle_ping) + app.router.add_route('GET', f'{config.ROUTE_API}/sessions/new', handlers.handle_session_new) + app.router.add_route('*', f'{config.ROUTE_WSS}/sessions/{{sessionid}}/{{path:.*}}', handlers.handle_session_id) + + runner = web.AppRunner(app, auto_decompress=False) + await runner.setup() + await web.TCPSite(runner, port=8080).start() + while True: + await asyncio.sleep(60) if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) - write_routes(get_sessions()) - NGINX_PROC = start_nginx() - - # start redis watcher - redis = Process(target=watch_redis) - redis.start() - - server_address = ('0.0.0.0', 8081) - httpd = HTTPServer(server_address, ProxyHTTPRequestHandler) - try: - httpd.serve_forever() - except KeyboardInterrupt: - NGINX_PROC.kill() - NGINX_PROC.wait() + asyncio.run(main()) diff --git a/test/test_basic.py b/test/test_basic.py index db88408..afa37a4 100755 --- a/test/test_basic.py +++ b/test/test_basic.py @@ -99,6 +99,7 @@ def request(self, url, retries=0): def newSession(self): response = self.request(f'{self.api_url}{config.ROUTE_API}/sessions/new') self.assertEqual(response.status, 200) + self.assertEqual(response.getheader('Content-Type'), 'application/json; charset=utf-8') sessionid = json.load(response)['id'] self.assertIsInstance(sessionid, str) @@ -145,6 +146,21 @@ def testSessions(self): # first session still works self.checkSession(s1) + # crash container for s2; use --time 0 once we have podman 4.0 everywhere + subprocess.check_call(['podman', 'rm', '--force', f'session-{s2}']) + # first session still works + self.checkSession(s1) + # second session is broken + request = self.get_auth_request(f'{self.api_url}{config.ROUTE_WSS}/sessions/{s2}/web/') + with self.assertRaises(OSError): + urllib.request.urlopen(request, context=self.ssl_3scale, timeout=1) + + # can create a new session + s3 = self.newSession() + self.checkSession(s3) + # first session still works + self.checkSession(s1) + def test3scaleErrors(self): # unauthenticated with self.assertRaises(urllib.error.HTTPError) as cm: