Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gevent-based websocket support #23

Merged
merged 16 commits into from
Mar 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion examples/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def cleanup():
# Create LabThings Flask app
app, labthing = create_app(
__name__,
prefix="/api",
title="My Lab Device API",
description="Test LabThing-based API",
version="0.1.0",
Expand Down Expand Up @@ -46,5 +47,8 @@ def cleanup():
if __name__ == "__main__":
from labthings.server.wsgi import Server

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

server = Server(app)
server.run(host="0.0.0.0", port=5000, debug=True)
server.run(host="0.0.0.0", port=5000, debug=False)
10 changes: 10 additions & 0 deletions labthings/server/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,11 @@
import logging

EXTENSION_NAME = "flask-labthings"

# Monkey patching is bad and should never be done
# import eventlet
# eventlet.monkey_patch()

from gevent import monkey

monkey.patch_all()
27 changes: 25 additions & 2 deletions labthings/server/decorators.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from webargs import flaskparser
from functools import wraps, update_wrapper
from flask import make_response, abort, request
from werkzeug.wrappers import Response as ResponseBase
from http import HTTPStatus
from marshmallow.exceptions import ValidationError
from collections import Mapping
Expand All @@ -9,6 +10,7 @@
from .schema import TaskSchema, Schema, FieldSchema
from .fields import Field
from .view import View
from .find import current_labthing

import logging

Expand Down Expand Up @@ -102,7 +104,7 @@ def ThingAction(viewcls: View):
Returns:
View: View class with Action spec tags
"""
# Pass params to call function attribute for external access
# Update Views API spec
update_spec(viewcls, {"tags": ["actions"]})
update_spec(viewcls, {"_groups": ["actions"]})
return viewcls
Expand All @@ -120,7 +122,28 @@ def ThingProperty(viewcls):
Returns:
View: View class with Property spec tags
"""
# Pass params to call function attribute for external access

def property_notify(func):
@wraps(func)
def wrapped(*args, **kwargs):
# Call the update function first to update property value
original_response = func(*args, **kwargs)

# Once updated, then notify all subscribers
subscribers = getattr(current_labthing(), "subscribers", [])
for sub in subscribers:
sub.property_notify(viewcls)
return original_response

return wrapped

if hasattr(viewcls, "post") and callable(viewcls.post):
viewcls.post = property_notify(viewcls.post)

if hasattr(viewcls, "put") and callable(viewcls.put):
viewcls.put = property_notify(viewcls.put)

# Update Views API spec
update_spec(viewcls, {"tags": ["properties"]})
update_spec(viewcls, {"_groups": ["properties"]})
return viewcls
Expand Down
24 changes: 19 additions & 5 deletions labthings/server/labthing.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from .spec.utilities import get_spec
from .spec.td import ThingDescription
from .decorators import tag
from .sockets import Sockets
from .sockets import Sockets, SocketSubscriber, socket_handler_loop

from .views.extensions import ExtensionList
from .views.tasks import TaskList, TaskView
Expand Down Expand Up @@ -38,6 +38,10 @@ def __init__(
self.extensions = {}

self.views = []
self._property_views = {}
self._action_views = {}

self.subscribers = set()

self.endpoints = set()

Expand Down Expand Up @@ -130,12 +134,20 @@ def _create_base_routes(self):
self.add_view(TaskView, "/tasks/<task_id>", endpoint=TASK_ENDPOINT)

def _create_base_sockets(self):
self.sockets.add_url_rule("/", self._socket_handler)
self.sockets.add_url_rule(f"{self.url_prefix}", self._socket_handler)

def _socket_handler(self, ws):
while not ws.closed:
message = ws.receive()
ws.send("Web sockets not yet implemented")
# Create a socket subscriber
wssub = SocketSubscriber(ws)
self.subscribers.add(wssub)
logging.info(f"Added subscriber {wssub}")
logging.debug(list(self.subscribers))
# Start the socket connection handler loop
socket_handler_loop(ws)
# Remove the subscriber once the loop returns
self.subscribers.remove(wssub)
logging.info(f"Removed subscriber {wssub}")
logging.debug(list(self.subscribers))

# Device stuff

Expand Down Expand Up @@ -277,8 +289,10 @@ def _register_view(self, app, view, *urls, endpoint=None, **kwargs):
view_groups = view_spec.get("_groups", {})
if "actions" in view_groups:
self.thing_description.action(flask_rules, view)
self._action_views[view.endpoint] = view
if "properties" in view_groups:
self.thing_description.property(flask_rules, view)
self._property_views[view.endpoint] = view

# Utilities

Expand Down
12 changes: 10 additions & 2 deletions labthings/server/representations.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
from ..core.utilities import PY3


def output_json(data, code, headers=None):
"""Makes a Flask response with a JSON encoded body"""
def encode_json(data):
"""Makes JSON encoded data using the current Flask apps JSON settings"""

settings = current_app.config.get("LABTHINGS_JSON", {})
encoder = current_app.json_encoder
Expand All @@ -21,6 +21,14 @@ def output_json(data, code, headers=None):
# see https://github.com/mitsuhiko/flask/pull/1262
dumped = dumps(data, cls=encoder, **settings) + "\n"

return dumped


def output_json(data, code, headers=None):
"""Makes a Flask response with a JSON encoded body"""

dumped = encode_json(data) + "\n"

resp = make_response(dumped, code)
resp.headers.extend(headers or {})
return resp
Expand Down
119 changes: 0 additions & 119 deletions labthings/server/sockets.py

This file was deleted.

2 changes: 2 additions & 0 deletions labthings/server/sockets/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .base import SocketSubscriber
from .gevent import Sockets, socket_handler_loop
93 changes: 93 additions & 0 deletions labthings/server/sockets/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# -*- coding: utf-8 -*-

from werkzeug.routing import Map, Rule
from werkzeug.exceptions import NotFound
from werkzeug.http import parse_cookie
from flask import request, current_app
import logging
from abc import ABC, abstractmethod

from ..representations import encode_json


class SocketSubscriber:
def __init__(self, ws):
self.ws = ws

def property_notify(self, viewcls):
if hasattr(viewcls, "get_value") and callable(viewcls.get_value):
property_value = viewcls().get_value()
else:
property_value = None

property_name = str(getattr(viewcls, "endpoint", "unknown"))

response = encode_json(
{"messageType": "propertyStatus", "data": {property_name: property_value},}
)

self.ws.send(response)


class BaseSockets(ABC):
def __init__(self, app=None):
#: Compatibility with 'Flask' application.
#: The :class:`~werkzeug.routing.Map` for this instance. You can use
#: this to change the routing converters after the class was created
#: but before any routes are connected.
self.url_map = Map()

#: Compatibility with 'Flask' application.
#: All the attached blueprints in a dictionary by name. Blueprints
#: can be attached multiple times so this dictionary does not tell
#: you how often they got attached.
self.blueprints = {}
self._blueprint_order = []

if app:
self.init_app(app)

@abstractmethod
def init_app(self, app):
pass

def route(self, rule, **options):
def decorator(view_func):
options.pop("endpoint", None)
self.add_url_rule(rule, view_func, **options)
return view_func

return decorator

def add_url_rule(self, rule, view_func, **options):
self.url_map.add(Rule(rule, endpoint=view_func))

def register_blueprint(self, blueprint, **options):
"""
Registers a blueprint for web sockets like for 'Flask' application.
Decorator :meth:`~flask.app.setupmethod` is not applied, because it
requires ``debug`` and ``_got_first_request`` attributes to be defined.
"""
first_registration = False

if blueprint.name in self.blueprints:
assert self.blueprints[blueprint.name] is blueprint, (
"A blueprint's name collision occurred between %r and "
'%r. Both share the same name "%s". Blueprints that '
"are created on the fly need unique names."
% (blueprint, self.blueprints[blueprint.name], blueprint.name)
)
else:
self.blueprints[blueprint.name] = blueprint
self._blueprint_order.append(blueprint)
first_registration = True

blueprint.register(self, options, first_registration)


def process_socket_message(message: str):
if message:
# return f"Recieved: {message}"
return None
else:
return None
Loading