Skip to content

Commit

Permalink
Native threaded concurrency (#120)
Browse files Browse the repository at this point in the history
* Converted to native threads for concurrency

* Added elegant action stopping

* Added timeout to Action DELETE requests

* Switch to thread concurrency

* Updated tests and fixed bugs

* Removed assert outside of tests

* Autofix issues in 1 files

Resolved issues in the following files via DeepSource Autofix:
1. src/labthings/wsgi.py

* Fixed legacy import

Co-authored-by: deepsource-autofix[bot] <62050782+deepsource-autofix[bot]@users.noreply.github.com>
  • Loading branch information
jtc42 and deepsource-autofix[bot] authored Jul 15, 2020
1 parent 5f2da89 commit 506a026
Show file tree
Hide file tree
Showing 22 changed files with 385 additions and 791 deletions.
2 changes: 1 addition & 1 deletion .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
branch = True
source = ./src/labthings
omit = .venv/*, ./src/labthings/wsgi.py, ./src/labthings/monkey.py, ./src/labthings/server/*, ./src/labthings/core/*
concurrency = greenlet
concurrency = thread

[report]
# Regexes for lines to exclude from consideration
Expand Down
4 changes: 4 additions & 0 deletions examples/components/pdf_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import math
import time

from labthings.tasks import current_task

"""
Class for our lab component functionality. This could include serial communication,
equipment API calls, network requests, or a "virtual" device as seen here.
Expand Down Expand Up @@ -44,6 +46,8 @@ def average_data(self, n: int):
summed_data = self.data

for _ in range(n):
if current_task() and current_task().stopped:
return summed_data
summed_data = [summed_data[i] + el for i, el in enumerate(self.data)]
time.sleep(0.25)

Expand Down
236 changes: 15 additions & 221 deletions poetry.lock

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ marshmallow = "^3.4.0"
webargs = "^6.0.0"
apispec = "^3.2.0"
flask-cors = "^3.0.8"
gevent = ">=1.4,<21.0"
gevent-websocket = "^0.10.1"
zeroconf = ">=0.24.5,<0.29.0"
flask-threaded-sockets = "^0.1.0"

[tool.poetry.dev-dependencies]
pytest = "^5.4"
Expand Down
11 changes: 8 additions & 3 deletions src/labthings/default_views/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

from ..view import View
from ..view.marshalling import marshal_with
from ..view.args import use_args
from ..schema import ActionSchema
from ..find import current_thing
from .. import fields


class ActionQueue(View):
Expand All @@ -12,7 +14,7 @@ class ActionQueue(View):
"""

def get(self):
return ActionSchema(many=True).dump(current_thing.actions.greenlets)
return ActionSchema(many=True).dump(current_thing.actions.threads)


class ActionView(View):
Expand All @@ -38,19 +40,22 @@ def get(self, task_id):

return ActionSchema().dump(task)

def delete(self, task_id):
@use_args({"timeout": fields.Int(missing=5)})
def delete(self, args, task_id):
"""
Terminate a running task.
If the task is finished, deletes its entry.
"""
timeout = args.get("timeout", 5)
task_dict = current_thing.actions.to_dict()

if task_id not in task_dict:
return abort(404) # 404 Not Found

task = task_dict.get(task_id)

task.kill(block=True, timeout=3)
# TODO: Make non-blocking?
task.stop(timeout=timeout)

return ActionSchema().dump(task)
5 changes: 2 additions & 3 deletions src/labthings/default_views/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def get(self):
logging.warning(
"TaskList is deprecated and will be removed in a future version. Use the Actions list instead."
)
return TaskSchema(many=True).dump(current_thing.actions.greenlets)
return TaskSchema(many=True).dump(current_thing.actions.threads)


class TaskView(View):
Expand Down Expand Up @@ -64,7 +64,6 @@ def delete(self, task_id):
return abort(404) # 404 Not Found

task = task_dict.get(task_id)

task.kill(block=True, timeout=3)
task.stop(timeout=5)

return TaskSchema().dump(task)
4 changes: 2 additions & 2 deletions src/labthings/labthing.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from flask import url_for
from flask_threaded_sockets.flask import Sockets
from apispec import APISpec

# from apispec.ext.marshmallow import MarshmallowPlugin
Expand All @@ -19,7 +20,6 @@
from .representations import DEFAULT_REPRESENTATIONS
from .apispec import MarshmallowPlugin, rule_to_apispec_path
from .td import ThingDescription
from .sockets import Sockets
from .event import Event

from .tasks import Pool
Expand Down Expand Up @@ -61,7 +61,7 @@ def __init__(

self.extensions = {}

self.actions = Pool() # Pool of greenlets for Actions
self.actions = Pool() # Pool of threads for Actions

self.events = {}

Expand Down
3 changes: 2 additions & 1 deletion src/labthings/server/sockets.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from ..sockets import Sockets, SocketSubscriber
from ..sockets import SocketSubscriber
from flask_threaded_sockets.flask import Sockets
171 changes: 0 additions & 171 deletions src/labthings/sockets.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,3 @@
"""
Once upon a time, based on flask-websocket; Copyright (C) 2013 Kenneth Reitz
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""

# -*- coding: utf-8 -*-

from werkzeug.exceptions import NotFound
from werkzeug.http import parse_cookie
from flask import request

from flask.helpers import _endpoint_from_view_func
from werkzeug.routing import Map, Rule, BuildError

from .representations import encode_json


Expand All @@ -25,158 +9,3 @@ def emit(self, event: dict):
response = encode_json(event)
# TODO: Logic surrounding if this subscriber is subscribed to the requested event type
self.ws.send(response)


class WsUrlAdapterWrapper(object):
def __init__(self, app_adapter, sockets_adapter):
self.__app_adapter = app_adapter
self.__sockets_adapter = sockets_adapter

def build(
self,
endpoint,
values=None,
method=None,
force_external=False,
append_unknown=True,
):
try:
return (
"ws"
+ self.__sockets_adapter.build(
endpoint=endpoint,
values=values,
method=None,
force_external=True,
append_unknown=append_unknown,
)[4:]
)
except BuildError:
return self.__app_adapter.build(
endpoint=endpoint,
values=values,
method=method,
force_external=force_external,
append_unknown=append_unknown,
)

def __getattr__(self, attr):
fun = getattr(self.__app_adapter, attr)
setattr(self, attr, fun)
return fun


class Sockets:
def __init__(self, app=None):
#: Compatibility with 'Flask' application.
#: The :class:`~werkzeug.routing.Map` for this instance. You can use
#: this to change the routing converters after the class was created
#: but before any routes are connected.
self.url_map = Map()

#: Compatibility with 'Flask' application.
#: All the attached blueprints in a dictionary by name. Blueprints
#: can be attached multiple times so this dictionary does not tell
#: you how often they got attached.
self.blueprints = {}
self._blueprint_order = []

self.view_functions = {}

if app:
self.init_app(app)

def __create_url_adapter(self, url_map, request):
if request is not None:
return url_map.bind_to_environ(
request.environ, server_name=self.app.config["SERVER_NAME"]
)
elif self.app.config["SERVER_NAME"] is not None:
return url_map.bind(
self.app.config["SERVER_NAME"],
script_name=self.app.config["APPLICATION_ROOT"] or "/",
url_scheme=self.app.config["PREFERRED_URL_SCHEME"],
)

def create_url_adapter(self, request):
adapter_for_app = self.__create_url_adapter(self.app.url_map, request)
adapter_for_sockets = self.__create_url_adapter(self.url_map, request)
return WsUrlAdapterWrapper(adapter_for_app, adapter_for_sockets)

def init_app(self, app):
self.app = app
self.app_wsgi_app = app.wsgi_app

app.wsgi_app = self.wsgi_app
app.create_url_adapter = self.create_url_adapter

def route(self, rule, **options):
def decorator(f):
endpoint = options.pop("endpoint", None)
self.add_url_rule(rule, endpoint, f, **options)
return f

return decorator

def add_url_rule(self, rule, endpoint, f, **options):
if endpoint is None:
endpoint = _endpoint_from_view_func(f)

methods = options.pop("methods", None)

setattr(f, "endpoint", endpoint)

self.url_map.add(Rule(rule, endpoint=endpoint, **options))
self.view_functions[endpoint] = f

if methods is None:
methods = []
self.app.add_url_rule(rule, endpoint, f, methods=methods, **options)

def add_view(self, url, f, endpoint=None, **options):
return self.add_url_rule(url, endpoint, f, **options)

def register_blueprint(self, blueprint, **options):
"""
Registers a blueprint for web sockets like for 'Flask' application.
Decorator :meth:`~flask.app.setupmethod` is not applied, because it
requires ``debug`` and ``_got_first_request`` attributes to be defined.
"""
first_registration = False

if blueprint.name in self.blueprints:
assert self.blueprints[blueprint.name] is blueprint, (
"A blueprint's name collision occurred between %r and "
'%r. Both share the same name "%s". Blueprints that '
"are created on the fly need unique names."
% (blueprint, self.blueprints[blueprint.name], blueprint.name)
)
else:
self.blueprints[blueprint.name] = blueprint
self._blueprint_order.append(blueprint)
first_registration = True

blueprint.register(self, options, first_registration)

def wsgi_app(self, environ, start_response):
adapter = self.url_map.bind_to_environ(environ)
try:
# Find handler view function
endpoint, values = adapter.match()
handler = self.view_functions[endpoint]

# Handle environment
environment = environ["wsgi.websocket"]
cookie = None
if "HTTP_COOKIE" in environ:
cookie = parse_cookie(environ["HTTP_COOKIE"])

with self.app.app_context():
with self.app.request_context(environ):
# add cookie to the request to have correct session handling
request.cookie = cookie
# Run WebSocket handler
handler(environment, **values)
return []
except (NotFound, KeyError):
return self.app_wsgi_app(environ, start_response)
25 changes: 8 additions & 17 deletions src/labthings/sync/event.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
from gevent.hub import getcurrent
import gevent
import time
import logging
from gevent.lock import BoundedSemaphore

from gevent.event import Event
import threading
from _thread import get_ident


class ClientEvent(object):
Expand All @@ -17,25 +15,18 @@ class ClientEvent(object):

def __init__(self):
self.events = {}
self._setting_lock = BoundedSemaphore()
self._setting_lock = threading.Lock()

def wait(self, timeout: int = 5):
"""Wait for the next data frame (invoked from each client's thread)."""
ident = id(getcurrent())
ident = get_ident()
if ident not in self.events:
# this is a new client
# add an entry for it in the self.events dict
# each entry has two elements, a threading.Event() and a timestamp
self.events[ident] = [Event(), time.time()]
self.events[ident] = [threading.Event(), time.time()]

# We have to reimplement event waiting here as we need native thread events to allow gevent context switching
wait_start = time.time()
while not self.events[ident][0].is_set():
now = time.time()
if now - wait_start > timeout:
return False
gevent.sleep(0)
return True
return self.events[ident][0].wait(timeout=timeout)

def set(self, timeout=5):
"""Signal that a new frame is available."""
Expand All @@ -61,10 +52,10 @@ def set(self, timeout=5):

def clear(self):
"""Clear frame event, once processed."""
ident = id(getcurrent())
ident = get_ident()
if ident not in self.events:
logging.error(f"Mismatched ident. Current: {ident}, available:")
logging.error(self.events.keys())
return False
self.events[id(getcurrent())][0].clear()
self.events[get_ident()][0].clear()
return True
Loading

0 comments on commit 506a026

Please sign in to comment.