Skip to content

Commit

Permalink
add processing server…
Browse files Browse the repository at this point in the history
- add `--server` option to CLI decorator
- implement via new `ocrd.server.ProcessingServer`:
  - based on gunicorn (for preforking directly from
    configured CLI in Python, but instantiating the
    processor after forking to avoid any shared GPU
    context)
  - using multiprocessing.Lock and Manager to lock
    (synchronize) workspaces among workers
  - using signal.alarm for worker timeout mechanics
  - using pre- and post-fork hooks for GPU- vs CPU-
    worker mechanics
  - doing Workspace validation within the request
  • Loading branch information
bertsky committed Jun 15, 2021
1 parent cac80d6 commit 6263bb1
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 12 deletions.
43 changes: 31 additions & 12 deletions ocrd/ocrd/decorators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,27 @@

from ..resolver import Resolver
from ..processor.base import run_processor
from ..server import ProcessingServer

from .loglevel_option import ocrd_loglevel
from .parameter_option import parameter_option, parameter_override_option
from .ocrd_cli_options import ocrd_cli_options
from .mets_find_options import mets_find_options

def ocrd_cli_wrap_processor(
processorClass,
ocrd_tool=None,
mets=None,
working_dir=None,
dump_json=False,
help=False, # pylint: disable=redefined-builtin
version=False,
overwrite=False,
show_resource=None,
list_resources=False,
**kwargs
):
processorClass,
ocrd_tool=None,
mets=None,
working_dir=None,
server=None,
log_level=None,
dump_json=False,
help=False, # pylint: disable=redefined-builtin
version=False,
overwrite=False,
show_resource=None,
list_resources=False,
**kwargs):
if not sys.argv[1:]:
processorClass(workspace=None, show_help=True)
sys.exit(1)
Expand All @@ -46,6 +48,23 @@ def ocrd_cli_wrap_processor(
list_resources=list_resources
)
sys.exit()
elif server:
initLogging()
LOG = getLogger('ocrd_cli_wrap_processor')
# Merge parameter overrides and parameters
if 'parameter_override' in kwargs:
set_json_key_value_overrides(kwargs['parameter'], *kwargs['parameter_override'])
# instantiate processor without workspace
processorArgs = dict()
for param in kwargs:
if param in ['parameter', 'input_file_grp', 'output_file_grp', 'page_timeout']:
processorArgs[param] = kwargs[param]
host, port, workers = server
options = {'bind': '%s:%s' % (host, port),
'workers': workers,
'loglevel': log_level}
server = ProcessingServer(processorClass, processorArgs, options)
server.run()
else:
initLogging()
LOG = getLogger('ocrd_cli_wrap_processor')
Expand Down
1 change: 1 addition & 0 deletions ocrd/ocrd/decorators/ocrd_cli_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def cli(mets_url):
option('-O', '--output-file-grp', help='File group(s) used as output.', default='OUTPUT'),
option('-g', '--page-id', help="ID(s) of the pages to process"),
option('--overwrite', help="Overwrite the output file group or a page range (--page-id)", is_flag=True, default=False),
option('-s', '--server', help='Run web server instead of one-shot processing (shifts mets/working-dir/page-id options to HTTP request arguments); pass network interface to bind to, TCP port, number of worker processes', nargs=3),
option('-C', '--show-resource', help='Dump the content of processor resource RESNAME', metavar='RESNAME'),
option('-L', '--list-resources', is_flag=True, default=False, help='List names of processor resources'),
parameter_option,
Expand Down
1 change: 1 addition & 0 deletions ocrd/ocrd/processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Processor,
)
from .helpers import (
run_api,
run_cli,
run_processor,
generate_processor_help
Expand Down
1 change: 1 addition & 0 deletions ocrd/ocrd/processor/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def __init__(
input_file_grp=None,
output_file_grp=None,
page_id=None,
server=None,
show_resource=None,
list_resources=False,
show_help=False,
Expand Down
4 changes: 4 additions & 0 deletions ocrd/ocrd/processor/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,10 @@ def wrap(s):
or JSON file path
-P, --param-override KEY VAL Override a single JSON object key-value pair,
taking precedence over --parameter
-s, --server HOST PORT WORKERS Run web server instead of one-shot processing
(shifts mets/working-dir/page-id options to
HTTP request arguments); pass network interface
to bind to, TCP port, number of worker processes
-m, --mets URL-PATH URL or file path of METS to process
-w, --working-dir PATH Working directory of local workspace
-l, --log-level [OFF|ERROR|WARN|INFO|DEBUG|TRACE]
Expand Down
186 changes: 186 additions & 0 deletions ocrd/ocrd/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
"""
Flask application and gunicorn processing server for Processor
"""
import os
import signal
import multiprocessing as mp
import atexit
import json
import flask
import gunicorn.app.base

from ocrd_validators import WorkspaceValidator
from ocrd_utils import getLogger
from ocrd.task_sequence import ProcessorTask
from .processor import run_api
from . import Resolver

class ProcessingServer(gunicorn.app.base.BaseApplication):
    """Gunicorn application serving one OCR-D processor over HTTP.

    Preforks the configured number of gunicorn workers on the configured
    bind address; each worker builds its own Flask app and instantiates
    the processor only *after* forking (``preload_app=False``), so no GPU
    context is ever shared between worker processes. Concurrent requests
    on the same METS/workspace are serialized via a Manager-backed lock
    and dict shared by all workers.
    """

    def __init__(self, processorClass, processorArgs, options=None):
        """Set up gunicorn options and the shared (pre-fork) state.

        Args:
            processorClass: Processor subclass to instantiate per worker.
            processorArgs (dict): keyword args for ``processorClass``
                (e.g. parameter, input/output file groups, page_timeout).
            options (dict, optional): gunicorn settings; defaults to
                binding 127.0.0.1:5000 with a single worker.
        """
        # happens in pre-fork context
        self.options = options or {'bind': '127.0.0.1:5000', 'workers': 1}
        # TODOs:
        # - add 'CUDA_VISIBLE_DEVICES' to 'raw_env' to options (server level instead of worker level)
        # - customize 'errorlog' (over stdout) in options
        # - customize 'accesslog' (over None) in options
        self.options['accesslog'] = '-'
        self.options['access_log_format'] = '%(t)s "%(r)s" %(s)s %(b)s "%(T)s"'
        # - customize 'logger_class' in options
        # - customize 'logconfig' or 'logconfig_dict' in options
        # - customize 'access_log_format' in options
        self.options['timeout'] = 0 # disable (timeout managed by workers on request level)
        self.options['preload_app'] = False # instantiate workers independently
        self.options['pre_fork'] = pre_fork # see below
        self.options['post_fork'] = post_fork # see below
        self.options['pre_request'] = pre_request # see below
        self.options['post_request'] = post_request # see below
        self.options['worker_abort'] = worker_abort # see below
        self.processor_cls = processorClass
        self.processor_opt = processorArgs
        # master PID kept so a worker can signal shutdown (see shutdown())
        self.master_pid = os.getpid()
        manager = mp.Manager()
        # lock + dict shared across all forked workers for METS locking
        self.master_lock = manager.Lock()
        self.master_cache = manager.dict()
        # (Manager creates an additional mp.Process on __enter__,
        # and registers an atexit handler joining that in __exit__,
        # but our forked workers inherit this. To prevent attempting
        # to join a non-child, we need to remove that in post_fork.)
        super().__init__()

    def load_config(self):
        """Gunicorn callback: copy recognized, non-None options into cfg."""
        config = {key: value for key, value in self.options.items()
                  if key in self.cfg.settings and value is not None}
        for key, value in config.items():
            self.cfg.set(key.lower(), value)

    def load(self):
        """Gunicorn callback: build the per-worker WSGI app.

        Runs in the forked worker context (because ``preload_app=False``);
        only here is the processor instantiated (without a workspace).

        Returns:
            flask.Flask: app exposing /process, /list-tasks and /shutdown.
        """
        # happens in (forked) worker context (because preload_app=False)
        # instantiate
        self.obj = self.processor_cls(None, **self.processor_opt)
        self.exe = self.obj.ocrd_tool['executable']
        self.res = Resolver()
        self.log = getLogger('ocrd.processor.server')
        self.app = flask.Flask(self.exe)
        # add routes
        self.app.add_url_rule('/process', None, self.process)
        self.app.add_url_rule('/list-tasks', None, self.list_tasks)
        self.app.add_url_rule('/shutdown', None, self.shutdown)
        return self.app

    def process(self):
        """Handle /process: validate and run the processor on a workspace.

        Query args:
            mets (required): URL or file path of the METS to process.
            page_id (optional): comma-separated page ID(s) to process.
            overwrite (optional): 'True'/'true'/'1' enables overwrite mode.

        Returns:
            'Finished' on success, otherwise an error message with HTTP
            status 400 (missing METS), 423 (METS locked by a concurrent
            request) or 500 (validation or processing failure).
        """
        self.log.debug("Processing request: %s", str(flask.request))
        if flask.request.args.get("mets"):
            mets = flask.request.args["mets"]
        else:
            return 'Error: No METS', 400
        # prevent multiple concurrent requests to the same workspace/METS
        if not self.lock(mets):
            return 'Error: Locked METS', 423
        if flask.request.args.get('page_id'):
            page_id = flask.request.args["page_id"]
        else:
            page_id = ''
        # if flask.request.args.get('log_level'):
        #     log_level = flask.request.args["log_level"]
        # else:
        #     log_level = None
        if flask.request.args.get('overwrite'):
            overwrite = flask.request.args["overwrite"] in ["True", "true", "1"]
        else:
            overwrite = False
        try:
            workspace = self.res.workspace_from_url(mets)
            workspace.overwrite_mode = overwrite
            # validate input (and, unless overwriting, output) file groups
            report = WorkspaceValidator.check_file_grp(
                workspace,
                self.obj.input_file_grp,
                '' if overwrite else self.obj.output_file_grp,
                page_id)
            if not report.is_valid:
                raise Exception("Invalid input/output file grps:\n\t%s" % '\n\t'.join(report.errors))
            if page_id:
                npages = len(page_id.split(','))
            else:
                npages = len(workspace.mets.physical_pages)
            # allow no more than page_timeout before restarting worker:
            timeout = getattr(self.obj, 'page_timeout', 0)
            timeout *= npages
            self.log.info("Processing %s on %d pages of '%s' (timeout=%ds)", self.exe, npages, mets, timeout)
            # SIGALRM-based per-request timeout (0 disables the alarm)
            with Timeout(timeout, "processing %s on %s cancelled after %d seconds on %d pages" % (
                    self.exe, mets, timeout, npages)):
                # run the workflow
                error = run_api(self.obj, workspace, page_id)
                if error:
                    raise error
                workspace.save_mets()
        except Exception as e:
            self.log.exception("Request '%s' failed", str(flask.request.args))
            self.unlock(mets)
            return 'Failed: %s' % str(e), 500
        self.unlock(mets)
        return 'Finished'

    def list_tasks(self):
        """Handle /list-tasks: return this processor's task specification."""
        task = ProcessorTask(self.exe, [self.obj.input_file_grp], [self.obj.output_file_grp], self.obj.parameter)
        return str(task) + '\n'

    def shutdown(self):
        """Handle /shutdown: ask the gunicorn master to terminate."""
        self.log.debug("Shutting down")
        # SIGTERM to the master process brings down all workers
        os.kill(self.master_pid, signal.SIGTERM)
        return 'Stopped'

    def lock(self, resource):
        """Try to acquire the server-wide lock on a METS resource.

        Returns:
            bool: True if newly locked, False if already held.
        """
        with self.master_lock:
            if resource in self.master_cache:
                return False
            self.master_cache[resource] = True
            return True

    def unlock(self, resource):
        """Release the server-wide lock on a METS resource."""
        with self.master_lock:
            del self.master_cache[resource]

class Timeout:
    """Context manager cancelling the enclosed block via SIGALRM.

    On entry, installs a SIGALRM handler and arms ``signal.alarm(seconds)``;
    if the alarm fires before the block completes, ``TimeoutError(message)``
    is raised inside the block. ``seconds == 0`` disables the alarm (no
    timeout). On exit, the alarm is disarmed and the previously installed
    SIGALRM handler is restored (the original implementation leaked its
    handler into the rest of the process).

    NOTE: Unix-only, and must be used from the main thread (a constraint
    of ``signal.signal``).
    """
    def __init__(self, seconds, message):
        self.seconds = seconds  # alarm delay in whole seconds (0 = off)
        self.message = message  # text for the raised TimeoutError
        self._previous = None   # SIGALRM handler active before __enter__
    def _handler(self, signum, stack):
        # the alarm fired before the block finished
        raise TimeoutError(self.message)
    def __enter__(self):
        # remember the old handler so outer users are not clobbered
        self._previous = signal.signal(signal.SIGALRM, self._handler)
        signal.alarm(self.seconds)
        return self
    def __exit__(self, *args):
        signal.alarm(0)  # disarm any pending alarm
        if self._previous is not None:
            signal.signal(signal.SIGALRM, self._previous)

def pre_fork(server, worker):
    """Gunicorn pre_fork hook: runs in the master just before forking.

    Records on the worker object the configured worker count and a
    1-based ordinal among the already-registered workers, so that
    post_fork can decide which workers keep GPU access.
    """
    # nominal number of workers, as configured on the server
    worker.num_workers = server.num_workers
    # actual ordinal of this worker: one past the existing ones
    worker.worker_id = 1 + len(server.WORKERS)

def post_fork(server, worker):
    """Gunicorn post_fork hook: runs in the freshly forked worker.

    Drops the inherited Manager atexit handler, then decides whether
    this worker may use the GPU(s): only the first CUDA_WORKERS workers
    (all of them if that envvar is unset) keep GPU visibility.
    """
    # The worker inherited the master's atexit handler joining the
    # multiprocessing.Manager process; joining a non-child would fail,
    # so remove it here.
    atexit.unregister(mp.util._exit_function)
    # CUDA_WORKERS (if set) caps how many workers may see the GPU(s)
    cuda_workers = worker.num_workers
    if "CUDA_WORKERS" in os.environ:
        cuda_workers = int(os.environ["CUDA_WORKERS"])
        assert cuda_workers <= worker.num_workers, \
            "CUDA_WORKERS[%d] <= workers[%d] violated" % (cuda_workers, worker.num_workers)
    if worker.worker_id <= cuda_workers:
        worker.log.debug("Setup for worker %d (normal)", worker.worker_id)
    else:
        worker.log.debug("Setup for worker %d (non-CUDA)", worker.worker_id)
        os.environ["CUDA_VISIBLE_DEVICES"] = "" # avoid GPU

def pre_request(worker, req):
    """Gunicorn pre_request hook: log the incoming request.

    Uses lazy %-style logger arguments (formatting happens only if the
    DEBUG level is enabled), consistent with worker_abort, instead of
    eagerly interpolating the string on every request.
    """
    worker.log.debug("%s %s at worker %d", req.method, req.path, worker.worker_id)

def post_request(worker, req, env, res):
    """Gunicorn post_request hook: log the finished request and its response.

    Uses lazy %-style logger arguments (formatting happens only if the
    DEBUG level is enabled), consistent with worker_abort, instead of
    eagerly interpolating the string on every request.
    """
    worker.log.debug("%s %s at worker %d: %s", req.method, req.path, worker.worker_id, res)

def worker_abort(worker):
    # Gunicorn worker_abort hook: runs in a worker that is being
    # forcibly terminated (e.g. after the per-request alarm fired).
    worker.log.debug("aborting worker %s", worker)
    # FIXME: skip/fallback remaining pages, save_mets, signalling ...
    # worker.app.obj.clean_up()
1 change: 1 addition & 0 deletions ocrd/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ requests
lxml
opencv-python-headless
Flask
gunicorn
uwsgi
jsonschema
pyyaml
Expand Down

0 comments on commit 6263bb1

Please sign in to comment.