From 79c217ec8191a785cf6ae7cb40290248d8387f38 Mon Sep 17 00:00:00 2001 From: Stefan van der Walt Date: Tue, 13 Mar 2018 17:44:32 -0700 Subject: [PATCH 01/26] WIP: replace build queue with ZeroMQ message bus --- build_papers.py | 16 ++++++++ procbuild/__init__.py | 2 +- procbuild/message_proxy.py | 21 ++++++++++ procbuild/server.py | 79 +++++++++++++++++++++++--------------- procbuild/test_listen.py | 21 ++++++++++ procbuild/test_submit.py | 17 ++++++++ runserver.py | 2 +- 7 files changed, 126 insertions(+), 32 deletions(-) create mode 100644 build_papers.py create mode 100644 procbuild/message_proxy.py create mode 100644 procbuild/test_listen.py create mode 100644 procbuild/test_submit.py diff --git a/build_papers.py b/build_papers.py new file mode 100644 index 0000000..f13cf88 --- /dev/null +++ b/build_papers.py @@ -0,0 +1,16 @@ +#!/usr/bin/env python + +# Schedule some or all papers for build + +from procbuild import get_papers, paper_queue +import sys + +if len(sys.argv) > 1: + to_build = sys.argv[1:] +else: + to_build = [int(nr) for nr, info in get_papers()] + +for p in to_build: + print("Placing %s in the build queue." % p) + + ### TODO: submit message to zmq bus diff --git a/procbuild/__init__.py b/procbuild/__init__.py index 7f81fe0..3f8870d 100644 --- a/procbuild/__init__.py +++ b/procbuild/__init__.py @@ -14,4 +14,4 @@ # --- Customize these variables --- -__all__ = ['app', 'log', 'MASTER_BRANCH', 'paper_queue'] +#__all__ = ['app', 'log', 'MASTER_BRANCH'] diff --git a/procbuild/message_proxy.py b/procbuild/message_proxy.py new file mode 100644 index 0000000..eadd626 --- /dev/null +++ b/procbuild/message_proxy.py @@ -0,0 +1,21 @@ +# http://zguide.zeromq.org/page:all#The-Dynamic-Discovery-Problem + +import zmq +from . import package_dir + + +IN = f'ipc://{package_dir}/queue.in' +OUT = f'ipc://{package_dir}/queue.out' + + +if __name__ == "__main__": + context = zmq.Context() + + feed_in = context.socket(zmq.PULL) + feed_in.bind(IN) + + feed_out = context.socket(zmq.PUB) + feed_out.bind(OUT) + + print('[message_proxy] Forwarding messages between {} and {}'.format(IN, OUT)) + zmq.proxy(feed_in, feed_out) diff --git a/procbuild/server.py b/procbuild/server.py index 79f0530..aa4c8a5 100644 --- a/procbuild/server.py +++ b/procbuild/server.py @@ -6,14 +6,20 @@ import os import io import time -import inspect +import inspect import codecs from os.path import join as joinp from glob import glob from flask import Flask -from multiprocessing import Process, Queue +import subprocess + +import zmq +import random +import json + +from .message_proxy import IN from . import ALLOW_MANUAL_BUILD_TRIGGER, MASTER_BRANCH @@ -22,6 +28,14 @@ from .utils import file_age, status_file, log +print("Connecting to message bus") +ctx = zmq.Context() +socket = ctx.socket(zmq.PUSH) +socket.connect(IN) + + + + def status_from_cache(nr): papers = get_papers() if nr == '*': @@ -58,11 +72,8 @@ def status_from_cache(nr): app = Flask(__name__) -print("Setting up build queue...") - -paper_queue_size = 0 -paper_queue = {0: Queue(), 1: paper_queue_size} - +print("Starting up build queue...") +subprocess.Popen(['python', '-m', 'procbuild.message_proxy']) @app.route('/') def index(): @@ -76,23 +87,24 @@ def index(): allow_manual_build_trigger=ALLOW_MANUAL_BUILD_TRIGGER) -def _process_queue(queue): - done = False - while not done: - nr = queue.get() - if nr is None: - log("Sentinel found in queue. Ending queue monitor.") - done = True - else: - log("Queue yielded paper #%d." % nr) - _build_worker(nr) +# TODO: MOVE THIS TO THE LISTENER +# +# def _process_queue(queue): +# done = False +# while not done: +# nr = queue.get() +# if nr is None: +# log("Sentinel found in queue. Ending queue monitor.") +# done = True +# else: +# log("Queue yielded paper #%d." % nr) +# _build_worker(nr) def monitor_queue(): - print("Launching queue monitoring process...") - p = Process(target=_process_queue, kwargs=dict(queue=paper_queue[0])) - p.start() - + print("Launching queue monitoring...") + ## TODO: Add logging to this subprocess + subprocess.Popen(['python', '-m', 'procbuild.test_listen']) def dummy_build(nr): return jsonify({'status': 'fail', 'message': 'Not authorized'}) @@ -106,12 +118,19 @@ def real_build(nr): return jsonify({'status': 'fail', 'message': 'Invalid paper specified'}) - if paper_queue[1] >= 50: - return jsonify({'status': 'fail', - 'message': 'Build queue is currently full.'}) +## TODO: Move check to the listener +# +# if paper_queue[1] >= 50: +# return jsonify({'status': 'fail', +# 'message': 'Build queue is currently full.'}) + + message = ['build_queue', json.dumps({'build_paper': nr})] - paper_queue[0].put(int(nr)) - paper_queue[1] += 1 + # TODO: remove after debugging + print('Submitting:', message) + + # TODO: Error checking around this send? + socket.send_multipart([m.encode('utf-8') for m in message]) return jsonify({'status': 'success', 'data': {'info': 'Build for paper %s scheduled. Note that ' @@ -173,10 +192,10 @@ def killer(process, timeout): p.join() k.terminate() -@app.route('/build_queue_size') -def print_build_queue(nr=None): - - return jsonify(paper_queue[1]) +#@app.route('/build_queue_size') +#def print_build_queue(nr=None): +# +# return jsonify(paper_queue[1]) @app.route('/status') @app.route('/status/') diff --git a/procbuild/test_listen.py b/procbuild/test_listen.py new file mode 100644 index 0000000..9e1f2cb --- /dev/null +++ b/procbuild/test_listen.py @@ -0,0 +1,21 @@ +import zmq +import json + + +from .message_proxy import OUT + +def handle_message(data): + print('Message received:', data) + +ctx = zmq.Context.instance() +socket = ctx.socket(zmq.SUB) +socket.connect(OUT) + +socket.setsockopt(zmq.SUBSCRIBE, 'build_queue'.encode('utf-8')) + +if __name__ == "__main__": + print('Listening for incoming messages...') + while True: + msg = socket.recv_multipart() + target, payload = msg + print('received', json.loads(payload.decode('utf-8'))) diff --git a/procbuild/test_submit.py b/procbuild/test_submit.py new file mode 100644 index 0000000..a83c55e --- /dev/null +++ b/procbuild/test_submit.py @@ -0,0 +1,17 @@ +import zmq +import random +import json + +from .message_proxy import IN + + +ctx = zmq.Context() +socket = ctx.socket(zmq.PUSH) +socket.connect(IN) + + +if __name__ == "__main__": + for i in range(10): + message = ['build_queue', json.dumps({'build_paper': 0})] + print('Submitting:', message) + socket.send_multipart([m.encode('utf-8') for m in message]) diff --git a/runserver.py b/runserver.py index 877cf57..aaaf2be 100755 --- a/runserver.py +++ b/runserver.py @@ -2,7 +2,7 @@ # imports import os -from procbuild import app, monitor_queue +from procbuild.server import app, monitor_queue from waitress import serve # -- SERVER CONFIGURATION -- (can be overridden from shell) From e122da825601064c96cd672cf6519e8ca26aaea1 Mon Sep 17 00:00:00 2001 From: M Pacer Date: Wed, 14 Mar 2018 17:34:05 -0700 Subject: [PATCH 02/26] move _build_worker out of the server module to the test_listen module --- procbuild/server.py | 44 ----------------------------- procbuild/test_listen.py | 61 ++++++++++++++++++++++++++++++++++++++-- 2 files changed, 58 insertions(+), 47 deletions(-) diff --git a/procbuild/server.py b/procbuild/server.py index aa4c8a5..bff5afc 100644 --- a/procbuild/server.py +++ b/procbuild/server.py @@ -147,50 +147,6 @@ def build(*args, **kwarg): return dummy_build(*args, **kwarg) -def _build_worker(nr): - pr_info = get_pr_info() - pr = pr_info[int(nr)] - age = file_age(status_file(nr)) - min_wait = 0.5 - if not (age is None or age > min_wait): - log("Did not build paper %d--recently built." % nr) - return - - status_log = status_file(nr) - with io.open(status_log, 'wb') as f: - build_record = {'status': 'fail', - 'data': {'build_status': 'Building...', - 'build_output': 'Initializing build...', - 'build_timestamp': ''}} - json.dump(build_record, codecs.getwriter('utf-8')(f), ensure_ascii=False) - - - def build_and_log(*args, **kwargs): - build_manager = BuildManager(*args, **kwargs) - status = build_manager.build_paper() - with io.open(status_log, 'wb') as f: - json.dump(status, codecs.getwriter('utf-8')(f), ensure_ascii=False) - - p = Process(target=build_and_log, - kwargs=dict(user=pr['user'], - branch=pr['branch'], - master_branch=MASTER_BRANCH, - target=nr, log=log)) - p.start() - - def killer(process, timeout): - time.sleep(timeout) - try: - process.terminate() - except OSError: - pass - - k = Process(target=killer, args=(p, 180)) - k.start() - - # Wait for process to complete or to be killed - p.join() - k.terminate() #@app.route('/build_queue_size') #def print_build_queue(nr=None): diff --git a/procbuild/test_listen.py b/procbuild/test_listen.py index 9e1f2cb..e501678 100644 --- a/procbuild/test_listen.py +++ b/procbuild/test_listen.py @@ -1,8 +1,15 @@ import zmq import json +import io +import codecs +import time +from multiprocessing import Process - +from . import MASTER_BRANCH from .message_proxy import OUT +from .utils import file_age, status_file +from .pr_list import get_pr_info, log +from .builder import BuildManager def handle_message(data): print('Message received:', data) @@ -13,9 +20,57 @@ def handle_message(data): socket.setsockopt(zmq.SUBSCRIBE, 'build_queue'.encode('utf-8')) +def _build_worker(nr): + pr_info = get_pr_info() + pr = pr_info[int(nr)] + age = file_age(status_file(nr)) + min_wait = 0.5 + if not (age is None or age > min_wait): + log("Did not build paper %d--recently built." % nr) + return + + status_log = status_file(nr) + with io.open(status_log, 'wb') as f: + build_record = {'status': 'fail', + 'data': {'build_status': 'Building...', + 'build_output': 'Initializing build...', + 'build_timestamp': ''}} + json.dump(build_record, codecs.getwriter('utf-8')(f), ensure_ascii=False) + + + def build_and_log(*args, **kwargs): + build_manager = BuildManager(*args, **kwargs) + status = build_manager.build_paper() + with io.open(status_log, 'wb') as f: + json.dump(status, codecs.getwriter('utf-8')(f), ensure_ascii=False) + + p = Process(target=build_and_log, + kwargs=dict(user=pr['user'], + branch=pr['branch'], + master_branch=MASTER_BRANCH, + target=nr, log=log)) + p.start() + + def killer(process, timeout): + time.sleep(timeout) + try: + process.terminate() + except OSError: + pass + + k = Process(target=killer, args=(p, 180)) + k.start() + + # Wait for process to complete or to be killed + p.join() + k.terminate() + if __name__ == "__main__": print('Listening for incoming messages...') while True: msg = socket.recv_multipart() - target, payload = msg - print('received', json.loads(payload.decode('utf-8'))) + target, raw_payload= msg + payload = json.loads(raw_payload.decode('utf-8')) + print('received', payload) + paper_to_build = payload.get('build_paper', None) + _build_worker(paper_to_build) From f406d333f87c8a5462e4ea9bc46997a59ba35ef9 Mon Sep 17 00:00:00 2001 From: M Pacer Date: Wed, 14 Mar 2018 17:34:39 -0700 Subject: [PATCH 03/26] reduce the number of times that test_submit submits a message --- procbuild/test_submit.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/procbuild/test_submit.py b/procbuild/test_submit.py index a83c55e..c8963c5 100644 --- a/procbuild/test_submit.py +++ b/procbuild/test_submit.py @@ -11,7 +11,7 @@ if __name__ == "__main__": - for i in range(10): - message = ['build_queue', json.dumps({'build_paper': 0})] - print('Submitting:', message) - socket.send_multipart([m.encode('utf-8') for m in message]) + # for i in range(10): + message = ['build_queue', json.dumps({'build_paper': 0})] + print('Submitting:', message) + socket.send_multipart([m.encode('utf-8') for m in message]) From 935290797aca362fe74fbc068aa796c820c472d6 Mon Sep 17 00:00:00 2001 From: M Pacer Date: Sun, 18 Mar 2018 00:01:49 -0700 Subject: [PATCH 04/26] Refactor, rename and generally clean-up codebase - rename package_dir --> package_path - remove unused imports from builder, utils, pr_list.__init__, - move cache from builder to pr_list - move status_file from utils to pr_list - move status_from_cache from server to pr_list - change outdated_pr_list --> update_pr_list --- procbuild/__init__.py | 13 +----- procbuild/builder.py | 16 ++------ procbuild/message_proxy.py | 6 +-- procbuild/pr_list/__init__.py | 53 ++++++++++++++++++++++++- procbuild/server.py | 75 ++--------------------------------- procbuild/test_listen.py | 4 +- procbuild/utils.py | 10 ++--- 7 files changed, 69 insertions(+), 108 deletions(-) diff --git a/procbuild/__init__.py b/procbuild/__init__.py index 3f8870d..228f6d0 100644 --- a/procbuild/__init__.py +++ b/procbuild/__init__.py @@ -2,16 +2,7 @@ import os -package_dir = os.path.abspath(os.path.dirname(__file__)) +package_path = os.path.abspath(os.path.dirname(__file__)) MASTER_BRANCH = os.environ.get('MASTER_BRANCH', '2017') -ALLOW_MANUAL_BUILD_TRIGGER = bool(int( - os.environ.get('ALLOW_MANUAL_BUILD_TRIGGER', 1)) - ) - -#from .server import (app, log, get_papers, monitor_queue, -# MASTER_BRANCH, ALLOW_MANUAL_BUILD_TRIGGER) - -# --- Customize these variables --- - -#__all__ = ['app', 'log', 'MASTER_BRANCH'] +ALLOW_MANUAL_BUILD_TRIGGER = bool(int(os.environ.get('ALLOW_MANUAL_BUILD_TRIGGER', 1))) diff --git a/procbuild/builder.py b/procbuild/builder.py index dd8c44f..c210ca9 100644 --- a/procbuild/builder.py +++ b/procbuild/builder.py @@ -11,19 +11,9 @@ from os.path import join as joinp from glob import iglob -excluded = ['vanderwalt', '00_vanderwalt', 'jane_doe', 'bibderwalt', '00_intro'] - -base_path = os.path.abspath(os.path.dirname(__file__)) - +from . import package_path -def cache(path='../cache'): - cache_path = joinp(base_path, path) - try: - os.mkdir(cache_path) - except OSError as e: - pass - - return cache_path +excluded = ['vanderwalt', '00_vanderwalt', 'jane_doe', 'bibderwalt', '00_intro'] def repo(user='scipy'): @@ -121,7 +111,7 @@ def __init__(self, data_filenames = ['IEEEtran.cls', 'draftwatermark.sty', 'everypage.sty'] - self.data_files = [joinp(base_path, 'data', f) for f in data_filenames] + self.data_files = [joinp(package_path, 'data', f) for f in data_filenames] def add_output(self, msg): self.build_output += msg diff --git a/procbuild/message_proxy.py b/procbuild/message_proxy.py index eadd626..ef6c8b2 100644 --- a/procbuild/message_proxy.py +++ b/procbuild/message_proxy.py @@ -1,11 +1,11 @@ # http://zguide.zeromq.org/page:all#The-Dynamic-Discovery-Problem import zmq -from . import package_dir +from . import package_path -IN = f'ipc://{package_dir}/queue.in' -OUT = f'ipc://{package_dir}/queue.out' +IN = f'ipc://{package_path}/queue.in' +OUT = f'ipc://{package_path}/queue.out' if __name__ == "__main__": diff --git a/procbuild/pr_list/__init__.py b/procbuild/pr_list/__init__.py index 6237f95..3033e12 100644 --- a/procbuild/pr_list/__init__.py +++ b/procbuild/pr_list/__init__.py @@ -8,14 +8,63 @@ from os.path import join as joinp -from ..builder import cache +from .. import package_path from ..utils import file_age, log __all__ = ['fetch_PRs', 'update_papers'] + + +def cache(path='../cache'): + cache_path = joinp(package_path, path) + try: + os.mkdir(cache_path) + except OSError as e: + pass + + return cache_path + pr_list_file = joinp(cache(), 'pr_info.json') -def outdated_pr_list(expiry=1): +def status_file(nr): + return joinp(cache(), str(nr) + '.status') + + +def status_from_cache(nr): + papers = get_papers() + if nr == '*': + status_files = [status_file(i) for i in range(len(papers))] + else: + status_files = [status_file(nr)] + + data = {} + + for fn in status_files: + n = fn.split('/')[-1].split('.')[0] + + try: + papers[int(n)] + except: + data[n] = {'status': 'fail', + 'data': {'build_output': 'Invalid paper'}} + else: + status = {'status': 'fail', + 'data': {'build_output': 'No build info'}} + + if os.path.exists(fn): + with io.open(fn, 'r') as f: + try: + data[n] = json.load(f) + except ValueError: + pass + + # Unpack status if only one record requested + if nr != '*': + return data[nr] + else: + return data + +def update_pr_list(expiry=1): if not os.path.isfile(pr_list_file): update_papers() elif file_age(pr_list_file) > expiry: diff --git a/procbuild/server.py b/procbuild/server.py index bff5afc..e8a0cae 100644 --- a/procbuild/server.py +++ b/procbuild/server.py @@ -3,29 +3,14 @@ from flask import (render_template, url_for, send_file, jsonify, request, Flask) import json -import os -import io -import time -import inspect -import codecs - -from os.path import join as joinp -from glob import glob -from flask import Flask - import subprocess import zmq -import random -import json +from . import ALLOW_MANUAL_BUILD_TRIGGER from .message_proxy import IN - - -from . import ALLOW_MANUAL_BUILD_TRIGGER, MASTER_BRANCH -from .builder import BuildManager, cache, base_path -from .pr_list import outdated_pr_list, get_papers, get_pr_info -from .utils import file_age, status_file, log +from .pr_list import update_pr_list, get_papers, get_pr_info, status_from_cache +from .utils import log print("Connecting to message bus") @@ -33,44 +18,6 @@ socket = ctx.socket(zmq.PUSH) socket.connect(IN) - - - -def status_from_cache(nr): - papers = get_papers() - if nr == '*': - status_files = [status_file(i) for i in range(len(papers))] - else: - status_files = [status_file(nr)] - - data = {} - - for fn in status_files: - n = fn.split('/')[-1].split('.')[0] - - try: - papers[int(n)] - except: - data[n] = {'status': 'fail', - 'data': {'build_output': 'Invalid paper'}} - else: - status = {'status': 'fail', - 'data': {'build_output': 'No build info'}} - - if os.path.exists(fn): - with io.open(fn, 'r') as f: - try: - data[n] = json.load(f) - except ValueError: - pass - - # Unpack status if only one record requested - if nr != '*': - return data[nr] - else: - return data - - app = Flask(__name__) print("Starting up build queue...") subprocess.Popen(['python', '-m', 'procbuild.message_proxy']) @@ -78,7 +25,7 @@ def status_from_cache(nr): @app.route('/') def index(): # if it's never been built or is over 1 minute old, update_papers - outdated_pr_list(expiry=5) + update_pr_list(expiry=5) papers = get_papers() return render_template('index.html', papers=papers, @@ -87,20 +34,6 @@ def index(): allow_manual_build_trigger=ALLOW_MANUAL_BUILD_TRIGGER) -# TODO: MOVE THIS TO THE LISTENER -# -# def _process_queue(queue): -# done = False -# while not done: -# nr = queue.get() -# if nr is None: -# log("Sentinel found in queue. Ending queue monitor.") -# done = True -# else: -# log("Queue yielded paper #%d." % nr) -# _build_worker(nr) - - def monitor_queue(): print("Launching queue monitoring...") ## TODO: Add logging to this subprocess diff --git a/procbuild/test_listen.py b/procbuild/test_listen.py index e501678..105ee7d 100644 --- a/procbuild/test_listen.py +++ b/procbuild/test_listen.py @@ -7,8 +7,8 @@ from . import MASTER_BRANCH from .message_proxy import OUT -from .utils import file_age, status_file -from .pr_list import get_pr_info, log +from .utils import file_age, log +from .pr_list import get_pr_info, status_file from .builder import BuildManager def handle_message(data): diff --git a/procbuild/utils.py b/procbuild/utils.py index 4ec0c36..aa84347 100644 --- a/procbuild/utils.py +++ b/procbuild/utils.py @@ -3,15 +3,15 @@ import io import time -from datetime import datetime, timedelta +from datetime import datetime from os.path import join as joinp -from . import package_dir -from .builder import cache +from . import package_path + def log(message): print(message) - with io.open(joinp(package_dir, '../flask.log'), 'a') as f: + with io.open(joinp(package_path, '../flask.log'), 'a') as f: time_of_message = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()) cf = inspect.currentframe().f_back where = f'{cf.f_code.co_filename}:{cf.f_lineno}' @@ -32,6 +32,4 @@ def file_age(fn): return delta.seconds / 60 -def status_file(nr): - return joinp(cache(), str(nr) + '.status') From 4002ba120708a87244614f9f1eac2cd5afc4741e Mon Sep 17 00:00:00 2001 From: M Pacer Date: Sun, 18 Mar 2018 00:16:24 -0700 Subject: [PATCH 05/26] change pr_list to be more resilient to removing the cache --- procbuild/pr_list/__init__.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/procbuild/pr_list/__init__.py b/procbuild/pr_list/__init__.py index 3033e12..c3d2a92 100644 --- a/procbuild/pr_list/__init__.py +++ b/procbuild/pr_list/__init__.py @@ -17,14 +17,11 @@ def cache(path='../cache'): cache_path = joinp(package_path, path) - try: - os.mkdir(cache_path) - except OSError as e: - pass - + os.makedirs(cache_path, exist_ok=True) return cache_path -pr_list_file = joinp(cache(), 'pr_info.json') +def get_pr_list_file(): + return joinp(cache(), 'pr_info.json') def status_file(nr): return joinp(cache(), str(nr) + '.status') @@ -64,18 +61,23 @@ def status_from_cache(nr): else: return data + def update_pr_list(expiry=1): - if not os.path.isfile(pr_list_file): + if not os.path.isfile(get_pr_list_file()): update_papers() - elif file_age(pr_list_file) > expiry: + elif file_age(get_pr_list_file()) > expiry: log("Updating papers...") update_papers() + def get_pr_info(): - with io.open(pr_list_file) as f: + if not os.path.exists(get_pr_list_file()): + update_papers() + with io.open(get_pr_list_file(), 'r') as f: pr_info = json.load(f) return pr_info + def get_papers(): return [(str(n), pr) for n, pr in enumerate(get_pr_info())] @@ -145,5 +147,5 @@ def update_papers(): pr_info.append({'user': p['head']['user']['login'], 'title': p['title'], 'branch': p['head']['ref'], 'url': p['html_url']}) - with io.open(pr_list_file, 'wb') as f: + with io.open(get_pr_list_file(), 'wb') as f: json.dump(pr_info, codecs.getwriter('utf-8')(f), ensure_ascii=False) From a062a93d7b8d5180666179cddaf90d89297e552a Mon Sep 17 00:00:00 2001 From: M Pacer Date: Sun, 18 Mar 2018 01:37:07 -0700 Subject: [PATCH 06/26] =?UTF-8?q?make=20build=5Fpapers=20work=20for=20one?= =?UTF-8?q?=20paper=E2=80=A6=20even=20if=20it=20doesn't=20work=20for=20man?= =?UTF-8?q?y?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build_papers.py | 11 ++++++----- procbuild/test_submit.py | 12 ++++++------ 2 files changed, 12 insertions(+), 11 deletions(-) mode change 100644 => 100755 build_papers.py diff --git a/build_papers.py b/build_papers.py old mode 100644 new mode 100755 index f13cf88..8d14274 --- a/build_papers.py +++ b/build_papers.py @@ -2,15 +2,16 @@ # Schedule some or all papers for build -from procbuild import get_papers, paper_queue +from procbuild.pr_list import get_papers +from procbuild.test_submit import submit_build_request import sys if len(sys.argv) > 1: - to_build = sys.argv[1:] + to_build = argv[1:] + import ipdb; ipdb.set_trace() else: - to_build = [int(nr) for nr, info in get_papers()] + to_build = [nr for nr, info in get_papers()] for p in to_build: print("Placing %s in the build queue." % p) - - ### TODO: submit message to zmq bus + submit_build_request(p) diff --git a/procbuild/test_submit.py b/procbuild/test_submit.py index c8963c5..f1f9275 100644 --- a/procbuild/test_submit.py +++ b/procbuild/test_submit.py @@ -1,17 +1,17 @@ import zmq -import random import json from .message_proxy import IN - ctx = zmq.Context() socket = ctx.socket(zmq.PUSH) socket.connect(IN) - -if __name__ == "__main__": - # for i in range(10): - message = ['build_queue', json.dumps({'build_paper': 0})] +def submit_build_request(nr): + message = ['build_queue', json.dumps({'build_paper': nr})] print('Submitting:', message) socket.send_multipart([m.encode('utf-8') for m in message]) + + +if __name__ == "__main__": + main(0) From 1dcc17d3e7a5baa35d97d5002513512193a6d193 Mon Sep 17 00:00:00 2001 From: M Pacer Date: Mon, 19 Mar 2018 15:32:39 -0700 Subject: [PATCH 07/26] use new BuildManager class instantiation signature --- procbuild/test_listen.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/procbuild/test_listen.py b/procbuild/test_listen.py index 105ee7d..a120f28 100644 --- a/procbuild/test_listen.py +++ b/procbuild/test_listen.py @@ -8,17 +8,22 @@ from . import MASTER_BRANCH from .message_proxy import OUT from .utils import file_age, log -from .pr_list import get_pr_info, status_file +from .pr_list import get_pr_info, status_file, cache from .builder import BuildManager + def handle_message(data): print('Message received:', data) -ctx = zmq.Context.instance() -socket = ctx.socket(zmq.SUB) -socket.connect(OUT) -socket.setsockopt(zmq.SUBSCRIBE, 'build_queue'.encode('utf-8')) +def create_listener_socket(): + ctx = zmq.Context.instance() + socket = ctx.socket(zmq.SUB) + socket.connect(OUT) + + socket.setsockopt(zmq.SUBSCRIBE, 'build_queue'.encode('utf-8')) + return socket + def _build_worker(nr): pr_info = get_pr_info() @@ -47,6 +52,7 @@ def build_and_log(*args, **kwargs): p = Process(target=build_and_log, kwargs=dict(user=pr['user'], branch=pr['branch'], + cache=cache(), master_branch=MASTER_BRANCH, target=nr, log=log)) p.start() @@ -68,8 +74,9 @@ def killer(process, timeout): if __name__ == "__main__": print('Listening for incoming messages...') while True: + socket = create_listener_socket() msg = socket.recv_multipart() - target, raw_payload= msg + target, raw_payload = msg payload = json.loads(raw_payload.decode('utf-8')) print('received', payload) paper_to_build = payload.get('build_paper', None) From 2c88eb54ddff082e45aced152b198b2bc64e5029 Mon Sep 17 00:00:00 2001 From: M Pacer Date: Mon, 19 Mar 2018 17:17:51 -0700 Subject: [PATCH 08/26] move request submission machinery into BuildRequestSubmitter class --- build_papers.py | 10 +++++----- procbuild/server.py | 17 +++-------------- procbuild/test_submit.py | 26 +++++++++++++++++--------- 3 files changed, 25 insertions(+), 28 deletions(-) diff --git a/build_papers.py b/build_papers.py index 8d14274..bc2ada4 100755 --- a/build_papers.py +++ b/build_papers.py @@ -2,16 +2,16 @@ # Schedule some or all papers for build -from procbuild.pr_list import get_papers -from procbuild.test_submit import submit_build_request +from procbuild.pr_list import get_papers +from procbuild.test_submit import BuildRequestSubmitter import sys if len(sys.argv) > 1: to_build = argv[1:] - import ipdb; ipdb.set_trace() else: to_build = [nr for nr, info in get_papers()] +submitter = BuildRequestSubmitter() for p in to_build: - print("Placing %s in the build queue." % p) - submit_build_request(p) + print(f"Submitting paper {p} to build queue.") + submitter.submit(p) diff --git a/procbuild/server.py b/procbuild/server.py index e8a0cae..163afe5 100644 --- a/procbuild/server.py +++ b/procbuild/server.py @@ -8,19 +8,15 @@ import zmq from . import ALLOW_MANUAL_BUILD_TRIGGER -from .message_proxy import IN +from .test_submit import BuildRequestSubmitter from .pr_list import update_pr_list, get_papers, get_pr_info, status_from_cache from .utils import log -print("Connecting to message bus") -ctx = zmq.Context() -socket = ctx.socket(zmq.PUSH) -socket.connect(IN) - app = Flask(__name__) print("Starting up build queue...") subprocess.Popen(['python', '-m', 'procbuild.message_proxy']) +submitter = BuildRequestSubmitter() @app.route('/') def index(): @@ -56,14 +52,7 @@ def real_build(nr): # if paper_queue[1] >= 50: # return jsonify({'status': 'fail', # 'message': 'Build queue is currently full.'}) - - message = ['build_queue', json.dumps({'build_paper': nr})] - - # TODO: remove after debugging - print('Submitting:', message) - - # TODO: Error checking around this send? - socket.send_multipart([m.encode('utf-8') for m in message]) + submitter.submit(nr) return jsonify({'status': 'success', 'data': {'info': 'Build for paper %s scheduled. Note that ' diff --git a/procbuild/test_submit.py b/procbuild/test_submit.py index f1f9275..b2a8ae8 100644 --- a/procbuild/test_submit.py +++ b/procbuild/test_submit.py @@ -3,15 +3,23 @@ from .message_proxy import IN -ctx = zmq.Context() -socket = ctx.socket(zmq.PUSH) -socket.connect(IN) - -def submit_build_request(nr): - message = ['build_queue', json.dumps({'build_paper': nr})] - print('Submitting:', message) - socket.send_multipart([m.encode('utf-8') for m in message]) +class BuildRequestSubmitter: + + def __init__(self): + ctx = zmq.Context() + self.socket = ctx.socket(zmq.PUSH) + self.socket.connect(IN) + + def construct_message(self, nr): + return ['build_queue', json.dumps({'build_paper': nr})] + + def submit(self, nr): + message = self.construct_message(nr) + # TODO: remove after debugging + print('Submitting:', message) + # TODO: Error checking around this send? + self.socket.send_multipart([m.encode('utf-8') for m in message]) if __name__ == "__main__": - main(0) + BuildRequestSubmitter().submit(0) From 9c9811b748e4416394ee723f71dfd6d0f842692e Mon Sep 17 00:00:00 2001 From: M Pacer Date: Mon, 19 Mar 2018 21:49:31 -0700 Subject: [PATCH 09/26] cleanup listener, remove monitor_queue, directly open listen subprocess --- procbuild/server.py | 6 ++--- procbuild/test_listen.py | 56 +++++++++++++++++++++------------------- runserver.py | 4 +-- 3 files changed, 33 insertions(+), 33 deletions(-) diff --git a/procbuild/server.py b/procbuild/server.py index 163afe5..677c31a 100644 --- a/procbuild/server.py +++ b/procbuild/server.py @@ -15,7 +15,9 @@ app = Flask(__name__) print("Starting up build queue...") +#TODO add logging to these processes subprocess.Popen(['python', '-m', 'procbuild.message_proxy']) +subprocess.Popen(['python', '-m', 'procbuild.test_listen']) submitter = BuildRequestSubmitter() @app.route('/') @@ -30,10 +32,6 @@ def index(): allow_manual_build_trigger=ALLOW_MANUAL_BUILD_TRIGGER) -def monitor_queue(): - print("Launching queue monitoring...") - ## TODO: Add logging to this subprocess - subprocess.Popen(['python', '-m', 'procbuild.test_listen']) def dummy_build(nr): return jsonify({'status': 'fail', 'message': 'Not authorized'}) diff --git a/procbuild/test_listen.py b/procbuild/test_listen.py index a120f28..60de74c 100644 --- a/procbuild/test_listen.py +++ b/procbuild/test_listen.py @@ -1,10 +1,11 @@ -import zmq import json import io import codecs import time from multiprocessing import Process +import zmq + from . import MASTER_BRANCH from .message_proxy import OUT from .utils import file_age, log @@ -12,26 +13,30 @@ from .builder import BuildManager -def handle_message(data): - print('Message received:', data) - - -def create_listener_socket(): - ctx = zmq.Context.instance() - socket = ctx.socket(zmq.SUB) - socket.connect(OUT) - - socket.setsockopt(zmq.SUBSCRIBE, 'build_queue'.encode('utf-8')) - return socket - - +class Listener: + def __init__(self, prefix='build_queue'): + self.ctx = zmq.Context() + self.socket = self.ctx.socket(zmq.SUB) + self.socket.connect(OUT) + self.socket.setsockopt(zmq.SUBSCRIBE, prefix.encode('utf-8')) + + def listen(self): + while True: + msg = self.socket.recv_multipart() + target, raw_payload = msg + payload = json.loads(raw_payload.decode('utf-8')) + print('received', payload) + paper_to_build = payload.get('build_paper', None) + _build_worker(paper_to_build) + + def _build_worker(nr): pr_info = get_pr_info() pr = pr_info[int(nr)] age = file_age(status_file(nr)) min_wait = 0.5 - if not (age is None or age > min_wait): - log("Did not build paper %d--recently built." % nr) + if age is None or age <= min_wait: + log(f"Did not build paper {nr}--recently built.") return status_log = status_file(nr) @@ -43,8 +48,13 @@ def _build_worker(nr): json.dump(build_record, codecs.getwriter('utf-8')(f), ensure_ascii=False) - def build_and_log(*args, **kwargs): - build_manager = BuildManager(*args, **kwargs) + def build_and_log(user, branch, cache, master_branch, target, log): + build_manager = BuildManager(user=user, + branch=branch, + cache=cache, + master_branch=master_branch, + target=target, + log=log) status = build_manager.build_paper() with io.open(status_log, 'wb') as f: json.dump(status, codecs.getwriter('utf-8')(f), ensure_ascii=False) @@ -73,11 +83,5 @@ def killer(process, timeout): if __name__ == "__main__": print('Listening for incoming messages...') - while True: - socket = create_listener_socket() - msg = socket.recv_multipart() - target, raw_payload = msg - payload = json.loads(raw_payload.decode('utf-8')) - print('received', payload) - paper_to_build = payload.get('build_paper', None) - _build_worker(paper_to_build) + listener = Listener() + listener.listen() diff --git a/runserver.py b/runserver.py index aaaf2be..1767625 100755 --- a/runserver.py +++ b/runserver.py @@ -2,7 +2,7 @@ # imports import os -from procbuild.server import app, monitor_queue +from procbuild.server import app from waitress import serve # -- SERVER CONFIGURATION -- (can be overridden from shell) @@ -19,9 +19,7 @@ print('Monitoring build queue...') # Iniitalize queue monitor -monitor_queue() serve(app, host='0.0.0.0', port=os.environ['PORT']) # Without waitress, this is the call: -# # app.run(debug=False, host='0.0.0.0', port=7001) From 68105dc1e5a5c5644922d35fec192dc809b344a9 Mon Sep 17 00:00:00 2001 From: Stefan van der Walt Date: Tue, 20 Mar 2018 14:40:05 -0700 Subject: [PATCH 10/26] Add async queue --- procbuild/test_listen.py | 35 +++++++++++++++++++++++++++-------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/procbuild/test_listen.py b/procbuild/test_listen.py index 60de74c..d170417 100644 --- a/procbuild/test_listen.py +++ b/procbuild/test_listen.py @@ -5,6 +5,8 @@ from multiprocessing import Process import zmq +from zmq.asyncio import Context +import asyncio from . import MASTER_BRANCH from .message_proxy import OUT @@ -15,21 +17,23 @@ class Listener: def __init__(self, prefix='build_queue'): - self.ctx = zmq.Context() + self.ctx = Context.instance() + self.prefix = prefix + self.socket = self.ctx.socket(zmq.SUB) self.socket.connect(OUT) - self.socket.setsockopt(zmq.SUBSCRIBE, prefix.encode('utf-8')) - - def listen(self): + self.socket.setsockopt(zmq.SUBSCRIBE, self.prefix.encode('utf-8')) + + async def listen(self): while True: - msg = self.socket.recv_multipart() + msg = await self.socket.recv_multipart() target, raw_payload = msg payload = json.loads(raw_payload.decode('utf-8')) print('received', payload) paper_to_build = payload.get('build_paper', None) _build_worker(paper_to_build) - + def _build_worker(nr): pr_info = get_pr_info() pr = pr_info[int(nr)] @@ -60,7 +64,7 @@ def build_and_log(user, branch, cache, master_branch, target, log): json.dump(status, codecs.getwriter('utf-8')(f), ensure_ascii=False) p = Process(target=build_and_log, - kwargs=dict(user=pr['user'], + kwargs=dict(user=pr['user'], branch=pr['branch'], cache=cache(), master_branch=MASTER_BRANCH, @@ -81,7 +85,22 @@ def killer(process, timeout): p.join() k.terminate() + +async def queue_builder(): + while True: + # await an item from the queue + pr = await queue.pop() + # launch subprocess to build item + + if __name__ == "__main__": print('Listening for incoming messages...') + listener = Listener() - listener.listen() + + loop = asyncio.get_event_loop() + loop.run_until_complete(asyncio.gather([ + listener.listen(), + queue_builder() + ] + ) From a955b28bbba5f051aa7a7e6786521f69a3ba049e Mon Sep 17 00:00:00 2001 From: M Pacer Date: Fri, 23 Mar 2018 19:00:49 -0700 Subject: [PATCH 11/26] Await run_in_executor! Make self.build_queue; Add loop.close(); Fix age --- procbuild/test_listen.py | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/procbuild/test_listen.py b/procbuild/test_listen.py index d170417..e36d9e0 100644 --- a/procbuild/test_listen.py +++ b/procbuild/test_listen.py @@ -7,6 +7,7 @@ import zmq from zmq.asyncio import Context import asyncio +from concurrent.futures import ThreadPoolExecutor from . import MASTER_BRANCH from .message_proxy import OUT @@ -23,6 +24,8 @@ def __init__(self, prefix='build_queue'): self.socket = self.ctx.socket(zmq.SUB) self.socket.connect(OUT) self.socket.setsockopt(zmq.SUBSCRIBE, self.prefix.encode('utf-8')) + self.queue = asyncio.Queue() + async def listen(self): while True: @@ -31,15 +34,22 @@ async def listen(self): payload = json.loads(raw_payload.decode('utf-8')) print('received', payload) paper_to_build = payload.get('build_paper', None) - _build_worker(paper_to_build) + await self.queue.put(paper_to_build) + async def queue_builder(self, loop=None): + while True: + # await an item from the queue + pr = await self.queue.get() + # launch subprocess to build item + with ThreadPoolExecutor(max_workers=1) as e: + await loop.run_in_executor(e, _build_worker, pr) def _build_worker(nr): pr_info = get_pr_info() pr = pr_info[int(nr)] age = file_age(status_file(nr)) min_wait = 0.5 - if age is None or age <= min_wait: + if age is not None and age <= min_wait: log(f"Did not build paper {nr}--recently built.") return @@ -86,12 +96,6 @@ def killer(process, timeout): k.terminate() -async def queue_builder(): - while True: - # await an item from the queue - pr = await queue.pop() - # launch subprocess to build item - if __name__ == "__main__": print('Listening for incoming messages...') @@ -99,8 +103,10 @@ async def queue_builder(): listener = Listener() loop = asyncio.get_event_loop() - loop.run_until_complete(asyncio.gather([ - listener.listen(), - queue_builder() - ] - ) + tasks = asyncio.gather(listener.listen(), + listener.queue_builder(loop)) + try: + loop.run_until_complete(tasks) + finally: + loop.run_until_complete(loop.shutdown_asyncgens()) + loop.close() From 01f5da1f0cff02563ff1abe808d67bdf64e3bf86 Mon Sep 17 00:00:00 2001 From: M Pacer Date: Fri, 23 Mar 2018 20:01:50 -0700 Subject: [PATCH 12/26] move "too soon" catch into the listener --- procbuild/test_listen.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/procbuild/test_listen.py b/procbuild/test_listen.py index e36d9e0..ff65a66 100644 --- a/procbuild/test_listen.py +++ b/procbuild/test_listen.py @@ -34,6 +34,12 @@ async def listen(self): payload = json.loads(raw_payload.decode('utf-8')) print('received', payload) paper_to_build = payload.get('build_paper', None) + + age = file_age(status_file(paper_to_build)) + min_wait = 0.5 + if age is not None and age <= min_wait: + log(f"Did not build paper {paper_to_build}--recently built.") + continue await self.queue.put(paper_to_build) async def queue_builder(self, loop=None): @@ -47,11 +53,6 @@ async def queue_builder(self, loop=None): def _build_worker(nr): pr_info = get_pr_info() pr = pr_info[int(nr)] - age = file_age(status_file(nr)) - min_wait = 0.5 - if age is not None and age <= min_wait: - log(f"Did not build paper {nr}--recently built.") - return status_log = status_file(nr) with io.open(status_log, 'wb') as f: From f05fded7cbbec183faf4b3e5363af1b91ab9b478 Mon Sep 17 00:00:00 2001 From: M Pacer Date: Fri, 23 Mar 2018 20:07:12 -0700 Subject: [PATCH 13/26] Avoid queueing multiples with dont_build set between awaits Use nr not pr to refer to numbers --- procbuild/test_listen.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/procbuild/test_listen.py b/procbuild/test_listen.py index ff65a66..a26e14f 100644 --- a/procbuild/test_listen.py +++ b/procbuild/test_listen.py @@ -24,8 +24,9 @@ def __init__(self, prefix='build_queue'): self.socket = self.ctx.socket(zmq.SUB) self.socket.connect(OUT) self.socket.setsockopt(zmq.SUBSCRIBE, self.prefix.encode('utf-8')) + self.queue = asyncio.Queue() - + self.dont_build = set() async def listen(self): while True: @@ -40,15 +41,20 @@ async def listen(self): if age is not None and age <= min_wait: log(f"Did not build paper {paper_to_build}--recently built.") continue + elif paper_to_build in self.dont_build: + log(f"Did not queue paper {paper_to_build}--already in queue.") + continue + self.dont_build.add(paper_to_build) await self.queue.put(paper_to_build) async def queue_builder(self, loop=None): while True: # await an item from the queue - pr = await self.queue.get() + nr = await self.queue.get() + self.dont_build.remove(nr) # launch subprocess to build item with ThreadPoolExecutor(max_workers=1) as e: - await loop.run_in_executor(e, _build_worker, pr) + await loop.run_in_executor(e, _build_worker, nr) def _build_worker(nr): pr_info = get_pr_info() From c928a96d0ab6502dadd320f09929bb7370a47923 Mon Sep 17 00:00:00 2001 From: M Pacer Date: Sun, 25 Mar 2018 13:19:14 -0700 Subject: [PATCH 14/26] update requirements.txt to include pyzmq and explicitly list version # --- requirements.txt | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/requirements.txt b/requirements.txt index 9e3e6d7..a439903 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ -Flask -waitress -urllib3 +Flask==0.12.2 +waitress==1.0.2 +urllib3==1.22 +pyzmq==17.0.0 From ae086d02fab13b92d0adebac0a053e126a46cb87 Mon Sep 17 00:00:00 2001 From: M Pacer Date: Sun, 25 Mar 2018 14:18:05 -0700 Subject: [PATCH 15/26] isolate age and queue checking and logging in self-contained methods --- procbuild/test_listen.py | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/procbuild/test_listen.py b/procbuild/test_listen.py index a26e14f..511fba3 100644 --- a/procbuild/test_listen.py +++ b/procbuild/test_listen.py @@ -36,16 +36,29 @@ async def listen(self): print('received', payload) paper_to_build = payload.get('build_paper', None) - age = file_age(status_file(paper_to_build)) - min_wait = 0.5 - if age is not None and age <= min_wait: - log(f"Did not build paper {paper_to_build}--recently built.") - continue - elif paper_to_build in self.dont_build: - log(f"Did not queue paper {paper_to_build}--already in queue.") + if self.check_age_and_queue(nr): continue self.dont_build.add(paper_to_build) await self.queue.put(paper_to_build) + + def check_age(self, nr): + age = file_age(status_file(paper_to_build)) + min_wait = 0.5 + too_young = False + if age is not None and age <= min_wait: + log(f"Did not build paper {paper_to_build}--recently built.") + too_young = True + return too_young + + def check_queue(self, nr): + in_queue = False + if paper_to_build in self.dont_build: + log(f"Did not queue paper {paper_to_build}--already in queue.") + in_queue = True + return in_queue + + def check_age_and_queue(self, nr): + return check_age(nr) or check_queue(nr) async def queue_builder(self, loop=None): while True: From ee52c6dd108db366f33b92e14e9abc71c9ad85d0 Mon Sep 17 00:00:00 2001 From: M Pacer Date: Sun, 25 Mar 2018 14:19:46 -0700 Subject: [PATCH 16/26] Create report_status method; after build print paper status to stdout --- procbuild/test_listen.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/procbuild/test_listen.py b/procbuild/test_listen.py index 511fba3..bbcc10a 100644 --- a/procbuild/test_listen.py +++ b/procbuild/test_listen.py @@ -59,6 +59,19 @@ def check_queue(self, nr): def check_age_and_queue(self, nr): return check_age(nr) or check_queue(nr) + + def report_status(self, nr): + """prints status notification from status_file for paper `nr` + + """ + with io.open(status_file(nr), 'r') as f: + status = json.load(f)['status'] + + if status['status'] == 'success': + print(f"Completed build for paper {nr}.") + else: + print(f"Paper for {nr} did not build successfully.") + async def queue_builder(self, loop=None): while True: @@ -68,6 +81,8 @@ async def queue_builder(self, loop=None): # launch subprocess to build item with ThreadPoolExecutor(max_workers=1) as e: await loop.run_in_executor(e, _build_worker, nr) + self.report_status(nr) + def _build_worker(nr): pr_info = get_pr_info() @@ -116,7 +131,6 @@ def killer(process, timeout): k.terminate() - if __name__ == "__main__": print('Listening for incoming messages...') From 259d105352504990b713b65e32db664409b42541 Mon Sep 17 00:00:00 2001 From: M Pacer Date: Sun, 25 Mar 2018 14:32:14 -0700 Subject: [PATCH 17/26] Improve docs for setting up the environment - describe how to get the latest version of the scipy_proc - describe where to go to install LaTeX - remove sudo from apt-get - explicitly point out that that will only work if you have access to `apt-get` --- README.md | 33 ++++++++++++++++++++++----------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 0d61d3c..23b0966 100644 --- a/README.md +++ b/README.md @@ -32,29 +32,40 @@ docker run -it -p 7001:7001 yourname/procbuild ## General notes - Customize `runserver.py` to update this year's branch. - For debugging, enable ``ALLOW_MANUAL_BUILD_TRIGGER``. + For debugging, enable `ALLOW_MANUAL_BUILD_TRIGGER`. - In the `scipy-conference/scipy_proceedings` repo: in the webhooks add a payload URL pointing to the webapp (such as `http://server.com:5000/webhook`). You must select only Pull Requests in the checkbox menu. -- Install dependencies: ``pip install -r requirements.txt`` +- Install dependencies: `pip install -r requirements.txt` - Fetch PRs by running `./update_prs` - Launch by running `runserver.py` Note: the server will run only on 3.6+ -You need all the same dependencies as for building the proceedings as well: +You need all the same dependencies as for building the proceedings as well. - - IEEETran (often packaged as ``texlive-publishers``, or download from - [CTAN](http://www.ctan.org/tex-archive/macros/latex/contrib/IEEEtran/) - LaTeX class +This includes some packages on pypi which we'll install with `pip`. The easiest +way to do this is by pulling down the latest version of the file with `curl`: + +``` +pip install -r <(curl https://raw.githubusercontent.com/scipy-conference/scipy_proceedings/2017/requirements.txt) +``` + +Additionally, you will need to install a version of LaTeX and some external +packages. We encourage you to visit https://www.tug.org/texlive/ to see how to +best install LaTeX for your system. + + - IEEETran LaTeX class + - (often packaged as `texlive-publishers`, or download from + [CTAN](http://www.ctan.org/tex-archive/macros/latex/contrib/IEEEtran/)) - AMSmath LaTeX classes (included in most LaTeX distributions) - - `docutils` 0.11 or later (``easy_install docutils``) - - `pygments` for code highlighting (``easy_install pygments``) + +If you can use `apt-get`, you are likely to install everything with: ``` -sudo apt-get install python-docutils texlive-latex-base texlive-publishers \ - texlive-latex-extra texlive-fonts-recommended \ - texlive-bibtex-extra +apt-get install python-docutils texlive-latex-base texlive-publishers \ + texlive-latex-extra texlive-fonts-recommended \ + texlive-bibtex-extra ``` From d2d803e4c7309c65071d1c3167e0c99307c37115 Mon Sep 17 00:00:00 2001 From: M Pacer Date: Tue, 27 Mar 2018 09:12:54 -0700 Subject: [PATCH 18/26] cleanup imports and variable names --- procbuild/test_listen.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/procbuild/test_listen.py b/procbuild/test_listen.py index bbcc10a..c81e184 100644 --- a/procbuild/test_listen.py +++ b/procbuild/test_listen.py @@ -2,12 +2,13 @@ import io import codecs import time +import asyncio + from multiprocessing import Process +from concurrent.futures import ThreadPoolExecutor import zmq from zmq.asyncio import Context -import asyncio -from concurrent.futures import ThreadPoolExecutor from . import MASTER_BRANCH from .message_proxy import OUT @@ -36,29 +37,29 @@ async def listen(self): print('received', payload) paper_to_build = payload.get('build_paper', None) - if self.check_age_and_queue(nr): + if self.check_age_and_queue(paper_to_build): continue self.dont_build.add(paper_to_build) await self.queue.put(paper_to_build) def check_age(self, nr): - age = file_age(status_file(paper_to_build)) + age = file_age(status_file(nr)) min_wait = 0.5 too_young = False if age is not None and age <= min_wait: - log(f"Did not build paper {paper_to_build}--recently built.") + log(f"Did not build paper {nr}--recently built.") too_young = True return too_young def check_queue(self, nr): in_queue = False - if paper_to_build in self.dont_build: - log(f"Did not queue paper {paper_to_build}--already in queue.") + if nr in self.dont_build: + log(f"Did not queue paper {nr}--already in queue.") in_queue = True return in_queue def check_age_and_queue(self, nr): - return check_age(nr) or check_queue(nr) + return self.check_age(nr) or self.check_queue(nr) def report_status(self, nr): """prints status notification from status_file for paper `nr` @@ -67,7 +68,7 @@ def report_status(self, nr): with io.open(status_file(nr), 'r') as f: status = json.load(f)['status'] - if status['status'] == 'success': + if status == 'success': print(f"Completed build for paper {nr}.") else: print(f"Paper for {nr} did not build successfully.") @@ -77,10 +78,10 @@ async def queue_builder(self, loop=None): while True: # await an item from the queue nr = await self.queue.get() - self.dont_build.remove(nr) # launch subprocess to build item with ThreadPoolExecutor(max_workers=1) as e: await loop.run_in_executor(e, _build_worker, nr) + self.dont_build.remove(nr) self.report_status(nr) From 962f796092acd4e2acef81e77b154e207c113431 Mon Sep 17 00:00:00 2001 From: M Pacer Date: Tue, 27 Mar 2018 09:14:49 -0700 Subject: [PATCH 19/26] Move build command inside class, Refactor paper logging inside class --- procbuild/test_listen.py | 59 ++++++++++++---------------------------- 1 file changed, 18 insertions(+), 41 deletions(-) diff --git a/procbuild/test_listen.py b/procbuild/test_listen.py index c81e184..abd6464 100644 --- a/procbuild/test_listen.py +++ b/procbuild/test_listen.py @@ -80,57 +80,34 @@ async def queue_builder(self, loop=None): nr = await self.queue.get() # launch subprocess to build item with ThreadPoolExecutor(max_workers=1) as e: - await loop.run_in_executor(e, _build_worker, nr) + await loop.run_in_executor(e, self.build_and_log, nr) self.dont_build.remove(nr) self.report_status(nr) + + def paper_log(self, nr, record): + status_log = status_file(nr) + with io.open(status_log, 'wb') as f: + json.dump(record, codecs.getwriter('utf-8')(f), ensure_ascii=False) + + def build_and_log(self, nr): + pr_info = get_pr_info() + pr = pr_info[int(nr)] - -def _build_worker(nr): - pr_info = get_pr_info() - pr = pr_info[int(nr)] - - status_log = status_file(nr) - with io.open(status_log, 'wb') as f: build_record = {'status': 'fail', 'data': {'build_status': 'Building...', 'build_output': 'Initializing build...', 'build_timestamp': ''}} - json.dump(build_record, codecs.getwriter('utf-8')(f), ensure_ascii=False) + self.paper_log(nr, build_record) - - def build_and_log(user, branch, cache, master_branch, target, log): - build_manager = BuildManager(user=user, - branch=branch, - cache=cache, - master_branch=master_branch, - target=target, + build_manager = BuildManager(user=pr['user'], + branch=pr['branch'], + cache=cache(), + master_branch=MASTER_BRANCH, + target=nr, log=log) - status = build_manager.build_paper() - with io.open(status_log, 'wb') as f: - json.dump(status, codecs.getwriter('utf-8')(f), ensure_ascii=False) - - p = Process(target=build_and_log, - kwargs=dict(user=pr['user'], - branch=pr['branch'], - cache=cache(), - master_branch=MASTER_BRANCH, - target=nr, log=log)) - p.start() - - def killer(process, timeout): - time.sleep(timeout) - try: - process.terminate() - except OSError: - pass - - k = Process(target=killer, args=(p, 180)) - k.start() - - # Wait for process to complete or to be killed - p.join() - k.terminate() + status = build_manager.build_paper() + self.paper_log(nr, status) if __name__ == "__main__": print('Listening for incoming messages...') From 99bc49b780dc852038d80a4fe73694dabe158761 Mon Sep 17 00:00:00 2001 From: M Pacer Date: Sun, 8 Apr 2018 01:00:37 -0700 Subject: [PATCH 20/26] update module/file names and imports to no longer reflect testing status --- build_papers.py | 2 +- procbuild/{test_listen.py => listener.py} | 0 procbuild/server.py | 9 ++++----- procbuild/{test_submit.py => submitter.py} | 0 4 files changed, 5 insertions(+), 6 deletions(-) rename procbuild/{test_listen.py => listener.py} (100%) rename procbuild/{test_submit.py => submitter.py} (100%) diff --git a/build_papers.py b/build_papers.py index bc2ada4..e9b9741 100755 --- a/build_papers.py +++ b/build_papers.py @@ -3,7 +3,7 @@ # Schedule some or all papers for build from procbuild.pr_list import get_papers -from procbuild.test_submit import BuildRequestSubmitter +from procbuild.submitter import BuildRequestSubmitter import sys if len(sys.argv) > 1: diff --git a/procbuild/test_listen.py b/procbuild/listener.py similarity index 100% rename from procbuild/test_listen.py rename to procbuild/listener.py diff --git a/procbuild/server.py b/procbuild/server.py index 677c31a..2b80801 100644 --- a/procbuild/server.py +++ b/procbuild/server.py @@ -5,21 +5,21 @@ import json import subprocess -import zmq from . import ALLOW_MANUAL_BUILD_TRIGGER -from .test_submit import BuildRequestSubmitter +from .submitter import BuildRequestSubmitter from .pr_list import update_pr_list, get_papers, get_pr_info, status_from_cache from .utils import log app = Flask(__name__) print("Starting up build queue...") -#TODO add logging to these processes +# TODO add logging to these processes subprocess.Popen(['python', '-m', 'procbuild.message_proxy']) -subprocess.Popen(['python', '-m', 'procbuild.test_listen']) +subprocess.Popen(['python', '-m', 'procbuild.listener']) submitter = BuildRequestSubmitter() + @app.route('/') def index(): # if it's never been built or is over 1 minute old, update_papers @@ -122,4 +122,3 @@ def webhook(): return jsonify({'status': 'fail', 'message': 'Hook called for building ' 'non-existing paper (%s)' % pr_url}) - diff --git a/procbuild/test_submit.py b/procbuild/submitter.py similarity index 100% rename from procbuild/test_submit.py rename to procbuild/submitter.py From 68f46ec1f442db510b6324e9caf9a379ead6392c Mon Sep 17 00:00:00 2001 From: M Pacer Date: Sun, 8 Apr 2018 01:24:42 -0700 Subject: [PATCH 21/26] add docstrings for Listener class and methods --- procbuild/listener.py | 62 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/procbuild/listener.py b/procbuild/listener.py index abd6464..8389a32 100644 --- a/procbuild/listener.py +++ b/procbuild/listener.py @@ -18,7 +18,29 @@ class Listener: + """ Listener class for defining zmq sockets and maintaining a build queue. + + Attributes: + ------------ + ctx: zmq.asyncio.Context, the main context for the listener class + + prefix: str, the prefix listened for by zmq sockets + + socket: zmq.socket, the socket for listening to + + queue: asyncio.Queue, the queue for holding the builds + + dont_build: set, unique collection of PRs currently in self.queue + Note: Only modify self.dont_build within synchronous blocks. + + """ + def __init__(self, prefix='build_queue'): + """ + Parameters: + ------------ + build_queue: str, the prefix that will be checked for by the zmq socket + """ self.ctx = Context.instance() self.prefix = prefix @@ -30,6 +52,8 @@ def __init__(self, prefix='build_queue'): self.dont_build = set() async def listen(self): + """Listener method, containing while loop for checking socket + """ while True: msg = await self.socket.recv_multipart() target, raw_payload = msg @@ -43,6 +67,12 @@ async def listen(self): await self.queue.put(paper_to_build) def check_age(self, nr): + """Check the age of a PR's status_file based on its number. + + Parameters: + ------------ + nr: int, the number of the PR in order of receipt + """ age = file_age(status_file(nr)) min_wait = 0.5 too_young = False @@ -52,6 +82,12 @@ def check_age(self, nr): return too_young def check_queue(self, nr): + """Check whether the queue currently contains a build request for a PR. + + Parameters: + ------------ + nr: int, the number of the PR to check + """ in_queue = False if nr in self.dont_build: log(f"Did not queue paper {nr}--already in queue.") @@ -59,11 +95,20 @@ def check_queue(self, nr): return in_queue def check_age_and_queue(self, nr): + """Check whether the PR is old enough or whether it is already in queue. + + Parameters: + ------------ + nr: int, the number of the PR to check + """ return self.check_age(nr) or self.check_queue(nr) def report_status(self, nr): """prints status notification from status_file for paper `nr` + Parameters: + ------------ + nr: int, the number of the PR to check """ with io.open(status_file(nr), 'r') as f: status = json.load(f)['status'] @@ -75,6 +120,10 @@ def report_status(self, nr): async def queue_builder(self, loop=None): + """Manage queue and trigger builds, report results. + + loop: asyncio.loop, the loop on which to be running these tasks + """ while True: # await an item from the queue nr = await self.queue.get() @@ -85,11 +134,24 @@ async def queue_builder(self, loop=None): self.report_status(nr) def paper_log(self, nr, record): + """Writes status to PR's log file + + Parameters: + ------------ + nr: int, the number of the PR to check + record: dict, the dictionary content to be written to the log + """ status_log = status_file(nr) with io.open(status_log, 'wb') as f: json.dump(record, codecs.getwriter('utf-8')(f), ensure_ascii=False) def build_and_log(self, nr): + """Builds paper for PR number and logs the resulting status + + Parameters: + ------------ + nr: int, the number of the PR to check + """ pr_info = get_pr_info() pr = pr_info[int(nr)] From fb10fab08cde4a2783750d0d5660c9caf3914608 Mon Sep 17 00:00:00 2001 From: M Pacer Date: Sun, 8 Apr 2018 01:43:34 -0700 Subject: [PATCH 22/26] add docstrings to submitter, remove or revise out-of-date TODOs It is nice to keep the message printing around for debugging when more than one kind of message can be sent through. --- build_papers.py | 2 +- procbuild/server.py | 7 +------ procbuild/submitter.py | 37 ++++++++++++++++++++++++++++++++++--- 3 files changed, 36 insertions(+), 10 deletions(-) diff --git a/build_papers.py b/build_papers.py index e9b9741..eca5594 100755 --- a/build_papers.py +++ b/build_papers.py @@ -11,7 +11,7 @@ else: to_build = [nr for nr, info in get_papers()] -submitter = BuildRequestSubmitter() +submitter = BuildRequestSubmitter(verbose=True) for p in to_build: print(f"Submitting paper {p} to build queue.") submitter.submit(p) diff --git a/procbuild/server.py b/procbuild/server.py index 2b80801..ac803eb 100644 --- a/procbuild/server.py +++ b/procbuild/server.py @@ -22,7 +22,7 @@ @app.route('/') def index(): - # if it's never been built or is over 1 minute old, update_papers + # if it's never been built or is over 5 minutes old, update_papers update_pr_list(expiry=5) papers = get_papers() @@ -45,11 +45,6 @@ def real_build(nr): return jsonify({'status': 'fail', 'message': 'Invalid paper specified'}) -## TODO: Move check to the listener -# -# if paper_queue[1] >= 50: -# return jsonify({'status': 'fail', -# 'message': 'Build queue is currently full.'}) submitter.submit(nr) return jsonify({'status': 'success', diff --git a/procbuild/submitter.py b/procbuild/submitter.py index b2a8ae8..dc64ebb 100644 --- a/procbuild/submitter.py +++ b/procbuild/submitter.py @@ -5,21 +5,52 @@ class BuildRequestSubmitter: + """Class for submitting build requests to zmq socket. - def __init__(self): + Attributes: + ------------ + + socket: zmq.socket, socket for pushing messages out + + verbose: bool, whether to print a message when submitting with builder + """ + + def __init__(self, verbose=False): + """ + + Parameters: + ------------ + verbose: bool, whether to print a message when submitting with builder + defaults to False + """ ctx = zmq.Context() self.socket = ctx.socket(zmq.PUSH) self.socket.connect(IN) + self.verbose = verbose def construct_message(self, nr): + """Creates message to be sent on zmq socket. + + Parameters: + ------------ + nr: int, the number of the PR in order of receipt + """ return ['build_queue', json.dumps({'build_paper': nr})] def submit(self, nr): + """Submits message to zmq socket + + Parameters: + ------------ + nr: int, the number of the PR in order of receipt + """ message = self.construct_message(nr) - # TODO: remove after debugging - print('Submitting:', message) + if self.verbose: + print('Submitting:', message) + # TODO: Error checking around this send? self.socket.send_multipart([m.encode('utf-8') for m in message]) + if __name__ == "__main__": BuildRequestSubmitter().submit(0) From 90c3960c6b9c29fdb1d4aaa8db7e0f0ddd7652dd Mon Sep 17 00:00:00 2001 From: M Pacer Date: Sun, 8 Apr 2018 01:52:55 -0700 Subject: [PATCH 23/26] allow building enumerated papers from build_papers.py --- build_papers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build_papers.py b/build_papers.py index eca5594..186752f 100755 --- a/build_papers.py +++ b/build_papers.py @@ -7,7 +7,7 @@ import sys if len(sys.argv) > 1: - to_build = argv[1:] + to_build = sys.argv[1:] else: to_build = [nr for nr, info in get_papers()] From c38889d38ecf52df0ef956fd7df98187f7c99ec5 Mon Sep 17 00:00:00 2001 From: M Pacer Date: Sun, 8 Apr 2018 14:18:24 -0700 Subject: [PATCH 24/26] remove from __future__ imports since we are requiring python 3.6 anyway --- procbuild/__init__.py | 2 -- procbuild/builder.py | 2 -- procbuild/pr_list/__init__.py | 7 +++---- procbuild/server.py | 2 -- 4 files changed, 3 insertions(+), 10 deletions(-) diff --git a/procbuild/__init__.py b/procbuild/__init__.py index 228f6d0..ab30b7a 100644 --- a/procbuild/__init__.py +++ b/procbuild/__init__.py @@ -1,5 +1,3 @@ -from __future__ import print_function, absolute_import - import os package_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/procbuild/builder.py b/procbuild/builder.py index c210ca9..3fdaf44 100644 --- a/procbuild/builder.py +++ b/procbuild/builder.py @@ -1,5 +1,3 @@ -from __future__ import print_function, absolute_import, division - import tempfile import subprocess import shlex diff --git a/procbuild/pr_list/__init__.py b/procbuild/pr_list/__init__.py index c3d2a92..26bd0ac 100644 --- a/procbuild/pr_list/__init__.py +++ b/procbuild/pr_list/__init__.py @@ -1,5 +1,3 @@ -from __future__ import print_function, absolute_import - import urllib3 import json import os @@ -14,15 +12,16 @@ __all__ = ['fetch_PRs', 'update_papers'] - def cache(path='../cache'): cache_path = joinp(package_path, path) os.makedirs(cache_path, exist_ok=True) return cache_path - + + def get_pr_list_file(): return joinp(cache(), 'pr_info.json') + def status_file(nr): return joinp(cache(), str(nr) + '.status') diff --git a/procbuild/server.py b/procbuild/server.py index ac803eb..98a50c3 100644 --- a/procbuild/server.py +++ b/procbuild/server.py @@ -1,5 +1,3 @@ -from __future__ import print_function, absolute_import, unicode_literals - from flask import (render_template, url_for, send_file, jsonify, request, Flask) import json From 932dd1e71be20f3d20d0be7e14354b60ef9e0da3 Mon Sep 17 00:00:00 2001 From: M Pacer Date: Sun, 8 Apr 2018 15:56:48 -0700 Subject: [PATCH 25/26] format docstrings, remove prefix, create target_set, remove print remove unused imports --- procbuild/builder.py | 18 ++++++--- procbuild/listener.py | 91 +++++++++++++++++++----------------------- procbuild/submitter.py | 36 ++++++++--------- 3 files changed, 70 insertions(+), 75 deletions(-) diff --git a/procbuild/builder.py b/procbuild/builder.py index 3fdaf44..dbdef61 100644 --- a/procbuild/builder.py +++ b/procbuild/builder.py @@ -78,12 +78,18 @@ class BuildManager: Parameters ---------- - user: str; name of GitHub user - branch: str; name of git branch on user's PR - target: str; string representation of integer for which paper to build - cache: str; cache directory in which the build tools and final paper live - master_branch: str; git branch for build tools - log: function; logging function + user : str + name of GitHub user + branch : str + name of git branch on user's PR + target : str + string representation of integer for which paper to build + cache : str + cache directory in which the build tools and final paper live + master_branch : str, optional + git branch for build tools, defaults to "master" + log : function + logging function """ def __init__(self, diff --git a/procbuild/listener.py b/procbuild/listener.py index 8389a32..db66582 100644 --- a/procbuild/listener.py +++ b/procbuild/listener.py @@ -1,10 +1,8 @@ import json import io import codecs -import time import asyncio -from multiprocessing import Process from concurrent.futures import ThreadPoolExecutor import zmq @@ -20,48 +18,42 @@ class Listener: """ Listener class for defining zmq sockets and maintaining a build queue. - Attributes: - ------------ - ctx: zmq.asyncio.Context, the main context for the listener class - - prefix: str, the prefix listened for by zmq sockets - - socket: zmq.socket, the socket for listening to - - queue: asyncio.Queue, the queue for holding the builds - - dont_build: set, unique collection of PRs currently in self.queue + Attributes + ---------- + ctx : zmq.asyncio.Context + main context for the listener class + socket : zmq.socket + the socket for listening to + queue : asyncio.Queue + the queue for holding the builds + dont_build : set + unique collection of PRs currently in self.queue Note: Only modify self.dont_build within synchronous blocks. - """ - def __init__(self, prefix='build_queue'): - """ - Parameters: - ------------ - build_queue: str, the prefix that will be checked for by the zmq socket - """ + def __init__(self): self.ctx = Context.instance() - self.prefix = prefix + target_set = {'build_queue'} self.socket = self.ctx.socket(zmq.SUB) self.socket.connect(OUT) - self.socket.setsockopt(zmq.SUBSCRIBE, self.prefix.encode('utf-8')) + for target in target_set: + self.socket.setsockopt(zmq.SUBSCRIBE, target.encode('utf-8')) self.queue = asyncio.Queue() self.dont_build = set() async def listen(self): """Listener method, containing while loop for checking socket + """ while True: msg = await self.socket.recv_multipart() target, raw_payload = msg payload = json.loads(raw_payload.decode('utf-8')) - print('received', payload) paper_to_build = payload.get('build_paper', None) - if self.check_age_and_queue(paper_to_build): + if self.check_age(paper_to_build) or self.check_queue(paper_to_build): continue self.dont_build.add(paper_to_build) await self.queue.put(paper_to_build) @@ -69,9 +61,10 @@ async def listen(self): def check_age(self, nr): """Check the age of a PR's status_file based on its number. - Parameters: - ------------ - nr: int, the number of the PR in order of receipt + Parameters + ---------- + nr : int + the number of the PR in order of receipt """ age = file_age(status_file(nr)) min_wait = 0.5 @@ -84,9 +77,10 @@ def check_age(self, nr): def check_queue(self, nr): """Check whether the queue currently contains a build request for a PR. - Parameters: - ------------ - nr: int, the number of the PR to check + Parameters + ---------- + nr : int + the number of the PR to check """ in_queue = False if nr in self.dont_build: @@ -94,21 +88,13 @@ def check_queue(self, nr): in_queue = True return in_queue - def check_age_and_queue(self, nr): - """Check whether the PR is old enough or whether it is already in queue. - - Parameters: - ------------ - nr: int, the number of the PR to check - """ - return self.check_age(nr) or self.check_queue(nr) - def report_status(self, nr): """prints status notification from status_file for paper `nr` - Parameters: - ------------ - nr: int, the number of the PR to check + Parameters + ---------- + nr : int + the number of the PR to check """ with io.open(status_file(nr), 'r') as f: status = json.load(f)['status'] @@ -122,7 +108,8 @@ def report_status(self, nr): async def queue_builder(self, loop=None): """Manage queue and trigger builds, report results. - loop: asyncio.loop, the loop on which to be running these tasks + loop : asyncio.loop + the loop on which to be running these tasks """ while True: # await an item from the queue @@ -136,10 +123,12 @@ async def queue_builder(self, loop=None): def paper_log(self, nr, record): """Writes status to PR's log file - Parameters: - ------------ - nr: int, the number of the PR to check - record: dict, the dictionary content to be written to the log + Parameters + ---------- + nr : int + the number of the PR to check + record : dict + the dictionary content to be written to the log """ status_log = status_file(nr) with io.open(status_log, 'wb') as f: @@ -148,9 +137,10 @@ def paper_log(self, nr, record): def build_and_log(self, nr): """Builds paper for PR number and logs the resulting status - Parameters: - ------------ - nr: int, the number of the PR to check + Parameters + ---------- + nr : int + the number of the PR to check """ pr_info = get_pr_info() pr = pr_info[int(nr)] @@ -171,6 +161,7 @@ def build_and_log(self, nr): status = build_manager.build_paper() self.paper_log(nr, status) + if __name__ == "__main__": print('Listening for incoming messages...') diff --git a/procbuild/submitter.py b/procbuild/submitter.py index dc64ebb..72a4615 100644 --- a/procbuild/submitter.py +++ b/procbuild/submitter.py @@ -7,22 +7,18 @@ class BuildRequestSubmitter: """Class for submitting build requests to zmq socket. - Attributes: - ------------ - - socket: zmq.socket, socket for pushing messages out - - verbose: bool, whether to print a message when submitting with builder + Parameters + ---------- + verbose : bool, optional + whether to print a message when submitting with builder (default is False) + + Attributes + ---------- + verbose + socket : zmq.socket, socket for pushing messages out """ def __init__(self, verbose=False): - """ - - Parameters: - ------------ - verbose: bool, whether to print a message when submitting with builder - defaults to False - """ ctx = zmq.Context() self.socket = ctx.socket(zmq.PUSH) self.socket.connect(IN) @@ -31,18 +27,20 @@ def __init__(self, verbose=False): def construct_message(self, nr): """Creates message to be sent on zmq socket. - Parameters: - ------------ - nr: int, the number of the PR in order of receipt + Parameters + ---------- + nr : int + the number of the PR in order of receipt """ return ['build_queue', json.dumps({'build_paper': nr})] def submit(self, nr): """Submits message to zmq socket - Parameters: - ------------ - nr: int, the number of the PR in order of receipt + Parameters + ---------- + nr : int + the number of the PR in order of receipt """ message = self.construct_message(nr) if self.verbose: From c08b4d00b566fc82b275e96bdfc2f201aeee8484 Mon Sep 17 00:00:00 2001 From: M Pacer Date: Sun, 8 Apr 2018 16:38:38 -0700 Subject: [PATCH 26/26] change check_age to paper_too_young and check_queue to paper_in_queue also make the code pattern nicer --- procbuild/listener.py | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/procbuild/listener.py b/procbuild/listener.py index db66582..97460ef 100644 --- a/procbuild/listener.py +++ b/procbuild/listener.py @@ -51,14 +51,14 @@ async def listen(self): msg = await self.socket.recv_multipart() target, raw_payload = msg payload = json.loads(raw_payload.decode('utf-8')) - paper_to_build = payload.get('build_paper', None) + paper_nr = payload.get('build_paper', None) - if self.check_age(paper_to_build) or self.check_queue(paper_to_build): + if self.paper_too_young(paper_nr) or self.paper_in_queue(paper_nr): continue - self.dont_build.add(paper_to_build) - await self.queue.put(paper_to_build) + self.dont_build.add(paper_nr) + await self.queue.put(paper_nr) - def check_age(self, nr): + def paper_too_young(self, nr): """Check the age of a PR's status_file based on its number. Parameters @@ -68,13 +68,12 @@ def check_age(self, nr): """ age = file_age(status_file(nr)) min_wait = 0.5 - too_young = False - if age is not None and age <= min_wait: + too_young = (age is not None) and (age <= min_wait) + if too_young: log(f"Did not build paper {nr}--recently built.") - too_young = True return too_young - def check_queue(self, nr): + def paper_in_queue(self, nr): """Check whether the queue currently contains a build request for a PR. Parameters @@ -82,10 +81,9 @@ def check_queue(self, nr): nr : int the number of the PR to check """ - in_queue = False - if nr in self.dont_build: + in_queue = nr in self.dont_build + if in_queue: log(f"Did not queue paper {nr}--already in queue.") - in_queue = True return in_queue def report_status(self, nr):