Use ZeroMQ message bus in lieu of build queue using new BuildManager #29

Merged · 26 commits · Apr 8, 2018

Commits
79c217e
WIP: replace build queue with ZeroMQ message bus
stefanv Mar 14, 2018
e122da8
move _build_worker out of the server module to the test_listen module
mpacer Mar 15, 2018
f406d33
reduce the number of times that test_submit submits a message
mpacer Mar 15, 2018
9352907
Refactor, rename and generally clean-up codebase
mpacer Mar 18, 2018
4002ba1
change pr_list to be more resilient to removing the cache
mpacer Mar 18, 2018
a062a93
make build_papers work for one paper… even if it doesn't work for many
mpacer Mar 18, 2018
1dcc17d
use new BuildManager class instantiation signature
mpacer Mar 19, 2018
2c88eb5
move request submission machinery into BuildRequestSubmitter class
mpacer Mar 20, 2018
9c9811b
cleanup listener, remove monitor_queue, directly open listen subprocess
mpacer Mar 20, 2018
68105dc
Add async queue
stefanv Mar 20, 2018
a955b28
Await run_in_executor! Make self.build_queue; Add loop.close(); Fix age
mpacer Mar 24, 2018
01f5da1
move "too soon" catch into the listener
mpacer Mar 24, 2018
f05fded
Avoid queueing multiples with dont_build set between awaits
mpacer Mar 24, 2018
c928a96
update requirements.txt to include pyzmq and explicitly list version #
mpacer Mar 25, 2018
ae086d0
isolate age and queue checking and logging in self-contained methods
mpacer Mar 25, 2018
ee52c6d
Create report_status method; after build print paper status to stdout
mpacer Mar 25, 2018
259d105
Improve docs for setting up the environment
mpacer Mar 25, 2018
d2d803e
cleanup imports and variable names
mpacer Mar 27, 2018
962f796
Move build command inside class, Refactor paper logging inside class
mpacer Mar 27, 2018
99bc49b
update module/file names and imports to no longer reflect testing status
mpacer Apr 8, 2018
68f46ec
add docstrings for Listener class and methods
mpacer Apr 8, 2018
fb10fab
add docstrings to submitter, remove or revise out-of-date TODOs
mpacer Apr 8, 2018
90c3960
allow building enumerated papers from build_papers.py
mpacer Apr 8, 2018
c38889d
remove from __future__ imports since we are requiring python 3.6 anyway
mpacer Apr 8, 2018
932dd1e
format docstrings, remove prefix, create target_set, remove print
mpacer Apr 8, 2018
c08b4d0
change check_age to paper_too_young and check_queue to paper_in_queue
mpacer Apr 8, 2018
33 changes: 22 additions & 11 deletions README.md
@@ -32,29 +32,40 @@ docker run -it -p 7001:7001 yourname/procbuild
## General notes

- Customize `runserver.py` to update this year's branch.
For debugging, enable ``ALLOW_MANUAL_BUILD_TRIGGER``.
For debugging, enable `ALLOW_MANUAL_BUILD_TRIGGER`.
- In the `scipy-conference/scipy_proceedings` repo: in the webhooks add a payload
URL pointing to the webapp (such as `http://server.com:5000/webhook`). You must
select only Pull Requests in the checkbox menu.
- Install dependencies: ``pip install -r requirements.txt``
- Install dependencies: `pip install -r requirements.txt`
- Fetch PRs by running `./update_prs`
- Launch by running `runserver.py`

Note: the server will run only on 3.6+

You need all the same dependencies as for building the proceedings as well:
You need all the same dependencies as for building the proceedings as well.

- IEEETran (often packaged as ``texlive-publishers``, or download from
[CTAN](http://www.ctan.org/tex-archive/macros/latex/contrib/IEEEtran/)
LaTeX class
This includes some packages on pypi which we'll install with `pip`. The easiest
way to do this is by pulling down the latest version of the file with `curl`:

```
pip install -r <(curl https://raw.githubusercontent.com/scipy-conference/scipy_proceedings/2017/requirements.txt)
```

Additionally, you will need to install a version of LaTeX and some external
packages. We encourage you to visit https://www.tug.org/texlive/ to see how to
best install LaTeX for your system.

- IEEETran LaTeX class
- (often packaged as `texlive-publishers`, or download from
[CTAN](http://www.ctan.org/tex-archive/macros/latex/contrib/IEEEtran/))
- AMSmath LaTeX classes (included in most LaTeX distributions)
- `docutils` 0.11 or later (``easy_install docutils``)
- `pygments` for code highlighting (``easy_install pygments``)

If you can use `apt-get`, you are likely to install everything with:

```
sudo apt-get install python-docutils texlive-latex-base texlive-publishers \
texlive-latex-extra texlive-fonts-recommended \
texlive-bibtex-extra
apt-get install python-docutils texlive-latex-base texlive-publishers \
texlive-latex-extra texlive-fonts-recommended \
texlive-bibtex-extra
```


17 changes: 17 additions & 0 deletions build_papers.py
@@ -0,0 +1,17 @@
#!/usr/bin/env python

# Schedule some or all papers for build

from procbuild.pr_list import get_papers
from procbuild.submitter import BuildRequestSubmitter
import sys

if len(sys.argv) > 1:
    to_build = sys.argv[1:]
else:
    to_build = [nr for nr, info in get_papers()]

submitter = BuildRequestSubmitter(verbose=True)
for p in to_build:
    print(f"Submitting paper {p} to build queue.")
    submitter.submit(p)
15 changes: 2 additions & 13 deletions procbuild/__init__.py
@@ -1,17 +1,6 @@
from __future__ import print_function, absolute_import

import os

package_dir = os.path.abspath(os.path.dirname(__file__))
package_path = os.path.abspath(os.path.dirname(__file__))

MASTER_BRANCH = os.environ.get('MASTER_BRANCH', '2017')
ALLOW_MANUAL_BUILD_TRIGGER = bool(int(
    os.environ.get('ALLOW_MANUAL_BUILD_TRIGGER', 1))
)

#from .server import (app, log, get_papers, monitor_queue,
# MASTER_BRANCH, ALLOW_MANUAL_BUILD_TRIGGER)

# --- Customize these variables ---

__all__ = ['app', 'log', 'MASTER_BRANCH', 'paper_queue']
ALLOW_MANUAL_BUILD_TRIGGER = bool(int(os.environ.get('ALLOW_MANUAL_BUILD_TRIGGER', 1)))
18 changes: 3 additions & 15 deletions procbuild/builder.py
@@ -1,5 +1,3 @@
from __future__ import print_function, absolute_import, division

import tempfile
import subprocess
import shlex
@@ -11,19 +9,9 @@
from os.path import join as joinp
from glob import iglob

excluded = ['vanderwalt', '00_vanderwalt', 'jane_doe', 'bibderwalt', '00_intro']

base_path = os.path.abspath(os.path.dirname(__file__))
from . import package_path


def cache(path='../cache'):
    cache_path = joinp(base_path, path)
    try:
        os.mkdir(cache_path)
    except OSError as e:
        pass

    return cache_path
excluded = ['vanderwalt', '00_vanderwalt', 'jane_doe', 'bibderwalt', '00_intro']


def repo(user='scipy'):
@@ -121,7 +109,7 @@ def __init__(self,
        data_filenames = ['IEEEtran.cls',
                          'draftwatermark.sty',
                          'everypage.sty']
        self.data_files = [joinp(base_path, 'data', f) for f in data_filenames]
        self.data_files = [joinp(package_path, 'data', f) for f in data_filenames]

    def add_output(self, msg):
        self.build_output += msg
186 changes: 186 additions & 0 deletions procbuild/listener.py
@@ -0,0 +1,186 @@
import json
import io
import codecs
import time
import asyncio

from multiprocessing import Process
from concurrent.futures import ThreadPoolExecutor

import zmq
from zmq.asyncio import Context

from . import MASTER_BRANCH
from .message_proxy import OUT
from .utils import file_age, log
from .pr_list import get_pr_info, status_file, cache
from .builder import BuildManager


class Listener:
""" Listener class for defining zmq sockets and maintaining a build queue.

Attributes:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use the NumPy documentation format, also below.

------------
ctx: zmq.asyncio.Context, the main context for the listener class

prefix: str, the prefix listened for by zmq sockets

socket: zmq.socket, the socket for listening to

queue: asyncio.Queue, the queue for holding the builds

dont_build: set, unique collection of PRs currently in self.queue
Note: Only modify self.dont_build within synchronous blocks.

"""

    def __init__(self, prefix='build_queue'):
        """
        Parameters:
        ------------
        build_queue: str, the prefix that will be checked for by the zmq socket
        """
        self.ctx = Context.instance()
        self.prefix = prefix

        self.socket = self.ctx.socket(zmq.SUB)
        self.socket.connect(OUT)
        self.socket.setsockopt(zmq.SUBSCRIBE, self.prefix.encode('utf-8'))

        self.queue = asyncio.Queue()
        self.dont_build = set()

    async def listen(self):
        """Listener method, containing while loop for checking socket
        """
        while True:
            msg = await self.socket.recv_multipart()
            target, raw_payload = msg

Review comment (Member): Use consistent naming for prefix or target or channel (or whatever you decide is best).

            payload = json.loads(raw_payload.decode('utf-8'))
            print('received', payload)

Review comment (Member): Debug statement?

Reply (Collaborator, Author): OK, I'll remove it. I found it useful while this was running, to know that it was receiving things appropriately, and it would show up in the Heroku logs (I think). But I'm OK abandoning this. (A sketch of a log-based alternative follows this method.)

            paper_to_build = payload.get('build_paper', None)

            if self.check_age_and_queue(paper_to_build):
                continue
            self.dont_build.add(paper_to_build)
            await self.queue.put(paper_to_build)
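If some of that visibility is worth keeping after the print goes away, a sketch of one alternative inside listen(), reusing the log helper already imported in this module (assumed to accept any formatted string, as it does elsewhere here):

```
# hypothetical replacement for the debug print inside listen()
log(f"received build request on {target.decode('utf-8')}: {payload}")
```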

    def check_age(self, nr):
        """Check the age of a PR's status_file based on its number.

        Parameters:
        ------------
        nr: int, the number of the PR in order of receipt
        """
        age = file_age(status_file(nr))
        min_wait = 0.5
        too_young = False

Review comment (Member): How about:

    too_young = (age is not None) and (age <= min_wait)
    if too_young:
        log(f"...")

Reply (Collaborator, Author): Nice!

        if age is not None and age <= min_wait:
            log(f"Did not build paper {nr}--recently built.")
            too_young = True
        return too_young

    def check_queue(self, nr):
        """Check whether the queue currently contains a build request for a PR.

        Parameters:
        ------------
        nr: int, the number of the PR to check
        """
        in_queue = False
        if nr in self.dont_build:
            log(f"Did not queue paper {nr}--already in queue.")
            in_queue = True
        return in_queue

    def check_age_and_queue(self, nr):

Review comment (Member): I'd recommend removing this method and using these two functions inline above. It doesn't add enough to warrant its own function. (A sketch of the inlined call follows this method.)

        """Check whether the PR is old enough or whether it is already in queue.

        Parameters:
        ------------
        nr: int, the number of the PR to check
        """
        return self.check_age(nr) or self.check_queue(nr)
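A sketch of the inlining suggested above, calling the two existing checks directly from listen(); the names come from the code above and this is illustrative only:

```
# inside listen(), replacing the check_age_and_queue() call
if self.check_age(paper_to_build) or self.check_queue(paper_to_build):
    continue
self.dont_build.add(paper_to_build)
await self.queue.put(paper_to_build)
```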

    def report_status(self, nr):
        """prints status notification from status_file for paper `nr`

        Parameters:
        ------------
        nr: int, the number of the PR to check
        """
        with io.open(status_file(nr), 'r') as f:
            status = json.load(f)['status']

        if status == 'success':
            print(f"Completed build for paper {nr}.")
        else:
            print(f"Paper for {nr} did not build successfully.")


    async def queue_builder(self, loop=None):
        """Manage queue and trigger builds, report results.

        loop: asyncio.loop, the loop on which to be running these tasks
        """
        while True:
            # await an item from the queue
            nr = await self.queue.get()
            # launch subprocess to build item
            with ThreadPoolExecutor(max_workers=1) as e:

Review comment (Member): How many CPUs do we have on Heroku? We could ramp this up if multiple are available.

Reply (Collaborator, Author): When I just ran `heroku run grep -c processor /proc/cpuinfo` I got 8 back… but I don't know whether that will use up our dyno hours more quickly. (A sketch of a configurable worker count follows this method.)

                await loop.run_in_executor(e, self.build_and_log, nr)
            self.dont_build.remove(nr)
            self.report_status(nr)
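A sketch of how the worker count could be made configurable instead of hard-coded to 1; the BUILD_WORKERS variable name is hypothetical, and whether extra workers are worth the dyno hours is exactly the open question above:

```
import os
from concurrent.futures import ThreadPoolExecutor

# hypothetical: size the executor from the environment, defaulting to a single worker
n_workers = int(os.environ.get('BUILD_WORKERS', 1))
executor = ThreadPoolExecutor(max_workers=n_workers)

# inside queue_builder() the call would then become:
#     await loop.run_in_executor(executor, self.build_and_log, nr)
```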

    def paper_log(self, nr, record):
        """Writes status to PR's log file

        Parameters:
        ------------
        nr: int, the number of the PR to check
        record: dict, the dictionary content to be written to the log
        """
        status_log = status_file(nr)
        with io.open(status_log, 'wb') as f:
            json.dump(record, codecs.getwriter('utf-8')(f), ensure_ascii=False)

    def build_and_log(self, nr):
        """Builds paper for PR number and logs the resulting status

        Parameters:
        ------------
        nr: int, the number of the PR to check
        """
        pr_info = get_pr_info()
        pr = pr_info[int(nr)]

        build_record = {'status': 'fail',
                        'data': {'build_status': 'Building...',
                                 'build_output': 'Initializing build...',
                                 'build_timestamp': ''}}
        self.paper_log(nr, build_record)

        build_manager = BuildManager(user=pr['user'],
                                     branch=pr['branch'],
                                     cache=cache(),
                                     master_branch=MASTER_BRANCH,
                                     target=nr,
                                     log=log)

        status = build_manager.build_paper()
        self.paper_log(nr, status)

if __name__ == "__main__":
    print('Listening for incoming messages...')

    listener = Listener()

    loop = asyncio.get_event_loop()
    tasks = asyncio.gather(listener.listen(),
                           listener.queue_builder(loop))
    try:
        loop.run_until_complete(tasks)
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()
21 changes: 21 additions & 0 deletions procbuild/message_proxy.py
@@ -0,0 +1,21 @@
# http://zguide.zeromq.org/page:all#The-Dynamic-Discovery-Problem

import zmq
from . import package_path


IN = f'ipc://{package_path}/queue.in'
OUT = f'ipc://{package_path}/queue.out'


if __name__ == "__main__":
    context = zmq.Context()

    feed_in = context.socket(zmq.PULL)
    feed_in.bind(IN)

    feed_out = context.socket(zmq.PUB)
    feed_out.bind(OUT)

    print('[message_proxy] Forwarding messages between {} and {}'.format(IN, OUT))
    zmq.proxy(feed_in, feed_out)
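For orientation, a minimal sketch of the sending side of this proxy (not necessarily how BuildRequestSubmitter is implemented; the paper number is hypothetical, but the topic prefix and payload shape match what listener.py expects):

```
import json

import zmq

from procbuild.message_proxy import IN

# PUSH into the proxy's PULL socket; the proxy republishes on its PUB socket,
# where the Listener's SUB socket filters on the 'build_queue' prefix.
ctx = zmq.Context()
socket = ctx.socket(zmq.PUSH)
socket.connect(IN)

paper_nr = '42'  # hypothetical PR number
socket.send_multipart([b'build_queue',
                       json.dumps({'build_paper': paper_nr}).encode('utf-8')])
```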