Skip to content

Commit

Permalink
Add CI Request Queue (#143)
Browse files Browse the repository at this point in the history
Previously CI didn't have an execution queue, thus, every request sent
when CI was running was disregarded. This pr implements a queue of
requests for CI to resolve this issue.

Commit log:

* Save performance in ci

* Add flask to install

* Allowed for public linting of unauthorized users

* Starting the queue

* Start to fixing the queue problem

* Added run mode for future advancements

* Major fixes to ci list system

* Remove debug lines

* Clean up definitions

* Remove exit statement

* Add error handling in run_ci and fix docs

* Create polling thread for requests

* Delete new line

* Account for run mode from #140

* Errors are much more difficult in threads :(

* Smarter than using tuples

* Added event-based handling instead of polling

* Let request handler clears event
  • Loading branch information
kevindweb authored and koolzz committed Jun 28, 2019
1 parent 247d1c0 commit 5a40435
Showing 1 changed file with 68 additions and 16 deletions.
84 changes: 68 additions & 16 deletions ci/webhook-receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import os
import subprocess
import logging
import time
import threading

# Global vars
EVENT_URL = "/github-webhook"
Expand All @@ -23,6 +25,10 @@
secret_file_name = None
private_key_file = None
secret = None
queue_lock = None
request_event = None
ci_request_list = None
manager_running = False

app = Flask(__name__)

Expand Down Expand Up @@ -158,6 +164,33 @@ def filter_to_prs_and_pr_comments(json):

return None

# run the manager, and rest of CI process
def run_manager(request_ctx):
# requests are tuples of (context, run mode)
log_access_granted(request_ctx, "Running CI")
os.system("./manager.sh config {} \"{}\" \"{}\" {}".format(request_ctx['id'], request_ctx['repo'], request_ctx['body'], request_ctx['mode']))

def request_handler():
global manager_running
while True:
if ci_request_list:
# we have a request
manager_running = True
run_manager(ci_request_list.pop(0))
# remove flag for other threads
manager_running = False
if not ci_request_list:
# list empty, go to sleep until signaled
request_event.wait()
# remove the flag, so we don't get stuck in loop
request_event.clear()

def add_request(request_ctx):
ci_request_list.append(request_ctx)
if not request_event.isSet():
# wake up event listener, if it was asleep
request_event.set()

@app.route(EVENT_URL, methods=['POST'])
def init_ci_pipeline():
run_mode = 0
Expand Down Expand Up @@ -187,21 +220,31 @@ def init_ci_pipeline():

print("Request matches filter, we should RUN CI. {}".format(get_request_info(request_ctx)))

# Check if there is another CI run in progress
proc1 = subprocess.Popen(['ps', 'cax'], stdout=subprocess.PIPE)
proc2 = subprocess.Popen(['grep', 'manager.sh'], stdin=proc1.stdout,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
proc1.stdout.close()
out, err = proc2.communicate()

if (out):
print("Can't run CI, another CI run in progress")
log_access_granted(request_ctx, "CI busy, posting busy msg")
os.system("./ci_busy.sh config {} \"{}\" \"{}\" \"Another CI run in progress, please try again in 15 minutes\""
.format(request_ctx['id'], request_ctx['repo'], request_ctx['body']))
else:
log_access_granted(request_ctx, "Running CI")
os.system("./manager.sh config {} \"{}\" \"{}\" {}".format(request_ctx['id'], request_ctx['repo'], request_ctx['body'], run_mode))
request_ctx['mode'] = run_mode

duplicate_req = False

if (manager_running or ci_request_list):
busy_msg = "Another CI run in progress, adding request to the end of the list"
log_access_granted(request_ctx, "CI busy, placing request in queue")
if ci_request_list:
# only check for duplicates if list isn't empty
with queue_lock:
for req in ci_request_list:
# make sure this is the same PR
if req['id'] == request_ctx['id'] and req['repo'] == request_ctx['repo']:
duplicate_req = True
break
# ending frees the lock
if duplicate_req:
# let user know we're not running this request, it's a duplicate
busy_msg = "Duplicate request already waiting, ignoring message"
os.system("./ci_busy.sh config {} \"{}\" \"{}\" \"{}\""
.format(request_ctx['id'], request_ctx['repo'], request_ctx['body'], busy_msg))

if not duplicate_req:
# we are not a duplicate, add to list
add_request(request_ctx)

return jsonify({"status": "ONLINE"})

Expand Down Expand Up @@ -250,8 +293,17 @@ def parse_config(cfg_name):
cfg_name = sys.argv[4]

parse_config(cfg_name)

secret = decrypt_secret()
queue_lock = threading.Lock()
ci_request_list = []

# create an event handler for requests
request_event = threading.Event()
# dedicate thread to check for and run new requests
poll_request = threading.Thread(target=request_handler, args=[])
# run as daemon to catch ^C termination
poll_request.daemon = True
poll_request.start()

logging.info("Starting the CI service")
app.run(host=host, port=port)

0 comments on commit 5a40435

Please sign in to comment.