Skip to content

Commit

Permalink
Gevent-based websocket support (#23)
Browse files Browse the repository at this point in the history
* Added method to return current GET response

* Split out function to encode JSON with current Flask encoder settings

* Added basic property subscriptions via websocket

* Run gevent monkey patch on server import

* Added default eventlet support

* Updated to default install eventlet

* Moved monkey patches to server submodules

* Only let eventlet websockets handle connections requesting an upgrade

* Fixed websocket route URL

* Added eventlet server debug and log options

* Reverted to gevent default (OFM live stream breaks with eventlet)

* Moved monkey patch back to server top level

* Better handle requests without websocket upgrade

* Better handle NULL websocket messages

* Removed default websocket echo

* Removed eventlet dependency
  • Loading branch information
jtc42 authored Mar 13, 2020
1 parent 028e21c commit 3693c16
Show file tree
Hide file tree
Showing 15 changed files with 341 additions and 139 deletions.
6 changes: 5 additions & 1 deletion examples/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def cleanup():
# Create LabThings Flask app
app, labthing = create_app(
__name__,
prefix="/api",
title="My Lab Device API",
description="Test LabThing-based API",
version="0.1.0",
Expand Down Expand Up @@ -46,5 +47,8 @@ def cleanup():
if __name__ == "__main__":
from labthings.server.wsgi import Server

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

server = Server(app)
server.run(host="0.0.0.0", port=5000, debug=True)
server.run(host="0.0.0.0", port=5000, debug=False)
10 changes: 10 additions & 0 deletions labthings/server/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,11 @@
import logging

EXTENSION_NAME = "flask-labthings"

# Monkey patching is bad and should never be done
# import eventlet
# eventlet.monkey_patch()

from gevent import monkey

monkey.patch_all()
27 changes: 25 additions & 2 deletions labthings/server/decorators.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from webargs import flaskparser
from functools import wraps, update_wrapper
from flask import make_response, abort, request
from werkzeug.wrappers import Response as ResponseBase
from http import HTTPStatus
from marshmallow.exceptions import ValidationError
from collections import Mapping
Expand All @@ -9,6 +10,7 @@
from .schema import TaskSchema, Schema, FieldSchema
from .fields import Field
from .view import View
from .find import current_labthing

import logging

Expand Down Expand Up @@ -102,7 +104,7 @@ def ThingAction(viewcls: View):
Returns:
View: View class with Action spec tags
"""
# Pass params to call function attribute for external access
# Update Views API spec
update_spec(viewcls, {"tags": ["actions"]})
update_spec(viewcls, {"_groups": ["actions"]})
return viewcls
Expand All @@ -120,7 +122,28 @@ def ThingProperty(viewcls):
Returns:
View: View class with Property spec tags
"""
# Pass params to call function attribute for external access

def property_notify(func):
@wraps(func)
def wrapped(*args, **kwargs):
# Call the update function first to update property value
original_response = func(*args, **kwargs)

# Once updated, then notify all subscribers
subscribers = getattr(current_labthing(), "subscribers", [])
for sub in subscribers:
sub.property_notify(viewcls)
return original_response

return wrapped

if hasattr(viewcls, "post") and callable(viewcls.post):
viewcls.post = property_notify(viewcls.post)

if hasattr(viewcls, "put") and callable(viewcls.put):
viewcls.put = property_notify(viewcls.put)

# Update Views API spec
update_spec(viewcls, {"tags": ["properties"]})
update_spec(viewcls, {"_groups": ["properties"]})
return viewcls
Expand Down
24 changes: 19 additions & 5 deletions labthings/server/labthing.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from .spec.utilities import get_spec
from .spec.td import ThingDescription
from .decorators import tag
from .sockets import Sockets
from .sockets import Sockets, SocketSubscriber, socket_handler_loop

from .views.extensions import ExtensionList
from .views.tasks import TaskList, TaskView
Expand Down Expand Up @@ -38,6 +38,10 @@ def __init__(
self.extensions = {}

self.views = []
self._property_views = {}
self._action_views = {}

self.subscribers = set()

self.endpoints = set()

Expand Down Expand Up @@ -130,12 +134,20 @@ def _create_base_routes(self):
self.add_view(TaskView, "/tasks/<task_id>", endpoint=TASK_ENDPOINT)

def _create_base_sockets(self):
self.sockets.add_url_rule("/", self._socket_handler)
self.sockets.add_url_rule(f"{self.url_prefix}", self._socket_handler)

def _socket_handler(self, ws):
while not ws.closed:
message = ws.receive()
ws.send("Web sockets not yet implemented")
# Create a socket subscriber
wssub = SocketSubscriber(ws)
self.subscribers.add(wssub)
logging.info(f"Added subscriber {wssub}")
logging.debug(list(self.subscribers))
# Start the socket connection handler loop
socket_handler_loop(ws)
# Remove the subscriber once the loop returns
self.subscribers.remove(wssub)
logging.info(f"Removed subscriber {wssub}")
logging.debug(list(self.subscribers))

# Device stuff

Expand Down Expand Up @@ -277,8 +289,10 @@ def _register_view(self, app, view, *urls, endpoint=None, **kwargs):
view_groups = view_spec.get("_groups", {})
if "actions" in view_groups:
self.thing_description.action(flask_rules, view)
self._action_views[view.endpoint] = view
if "properties" in view_groups:
self.thing_description.property(flask_rules, view)
self._property_views[view.endpoint] = view

# Utilities

Expand Down
12 changes: 10 additions & 2 deletions labthings/server/representations.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
from ..core.utilities import PY3


def output_json(data, code, headers=None):
"""Makes a Flask response with a JSON encoded body"""
def encode_json(data):
"""Makes JSON encoded data using the current Flask apps JSON settings"""

settings = current_app.config.get("LABTHINGS_JSON", {})
encoder = current_app.json_encoder
Expand All @@ -21,6 +21,14 @@ def output_json(data, code, headers=None):
# see https://github.com/mitsuhiko/flask/pull/1262
dumped = dumps(data, cls=encoder, **settings) + "\n"

return dumped


def output_json(data, code, headers=None):
"""Makes a Flask response with a JSON encoded body"""

dumped = encode_json(data) + "\n"

resp = make_response(dumped, code)
resp.headers.extend(headers or {})
return resp
Expand Down
119 changes: 0 additions & 119 deletions labthings/server/sockets.py

This file was deleted.

2 changes: 2 additions & 0 deletions labthings/server/sockets/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .base import SocketSubscriber
from .gevent import Sockets, socket_handler_loop
93 changes: 93 additions & 0 deletions labthings/server/sockets/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# -*- coding: utf-8 -*-

from werkzeug.routing import Map, Rule
from werkzeug.exceptions import NotFound
from werkzeug.http import parse_cookie
from flask import request, current_app
import logging
from abc import ABC, abstractmethod

from ..representations import encode_json


class SocketSubscriber:
def __init__(self, ws):
self.ws = ws

def property_notify(self, viewcls):
if hasattr(viewcls, "get_value") and callable(viewcls.get_value):
property_value = viewcls().get_value()
else:
property_value = None

property_name = str(getattr(viewcls, "endpoint", "unknown"))

response = encode_json(
{"messageType": "propertyStatus", "data": {property_name: property_value},}
)

self.ws.send(response)


class BaseSockets(ABC):
def __init__(self, app=None):
#: Compatibility with 'Flask' application.
#: The :class:`~werkzeug.routing.Map` for this instance. You can use
#: this to change the routing converters after the class was created
#: but before any routes are connected.
self.url_map = Map()

#: Compatibility with 'Flask' application.
#: All the attached blueprints in a dictionary by name. Blueprints
#: can be attached multiple times so this dictionary does not tell
#: you how often they got attached.
self.blueprints = {}
self._blueprint_order = []

if app:
self.init_app(app)

@abstractmethod
def init_app(self, app):
pass

def route(self, rule, **options):
def decorator(view_func):
options.pop("endpoint", None)
self.add_url_rule(rule, view_func, **options)
return view_func

return decorator

def add_url_rule(self, rule, view_func, **options):
self.url_map.add(Rule(rule, endpoint=view_func))

def register_blueprint(self, blueprint, **options):
"""
Registers a blueprint for web sockets like for 'Flask' application.
Decorator :meth:`~flask.app.setupmethod` is not applied, because it
requires ``debug`` and ``_got_first_request`` attributes to be defined.
"""
first_registration = False

if blueprint.name in self.blueprints:
assert self.blueprints[blueprint.name] is blueprint, (
"A blueprint's name collision occurred between %r and "
'%r. Both share the same name "%s". Blueprints that '
"are created on the fly need unique names."
% (blueprint, self.blueprints[blueprint.name], blueprint.name)
)
else:
self.blueprints[blueprint.name] = blueprint
self._blueprint_order.append(blueprint)
first_registration = True

blueprint.register(self, options, first_registration)


def process_socket_message(message: str):
if message:
# return f"Recieved: {message}"
return None
else:
return None
Loading

0 comments on commit 3693c16

Please sign in to comment.