diff --git a/ci/webhook-receiver.py b/ci/webhook-receiver.py index 1549a9d7a..f97a00a8b 100644 --- a/ci/webhook-receiver.py +++ b/ci/webhook-receiver.py @@ -13,6 +13,8 @@ import os import subprocess import logging +import time +import threading # Global vars EVENT_URL = "/github-webhook" @@ -23,6 +25,10 @@ secret_file_name = None private_key_file = None secret = None +queue_lock = None +request_event = None +ci_request_list = None +manager_running = False app = Flask(__name__) @@ -158,6 +164,33 @@ def filter_to_prs_and_pr_comments(json): return None +# run the manager, and rest of CI process +def run_manager(request_ctx): + # requests are tuples of (context, run mode) + log_access_granted(request_ctx, "Running CI") + os.system("./manager.sh config {} \"{}\" \"{}\" {}".format(request_ctx['id'], request_ctx['repo'], request_ctx['body'], request_ctx['mode'])) + +def request_handler(): + global manager_running + while True: + if ci_request_list: + # we have a request + manager_running = True + run_manager(ci_request_list.pop(0)) + # remove flag for other threads + manager_running = False + if not ci_request_list: + # list empty, go to sleep until signaled + request_event.wait() + # remove the flag, so we don't get stuck in loop + request_event.clear() + +def add_request(request_ctx): + ci_request_list.append(request_ctx) + if not request_event.isSet(): + # wake up event listener, if it was asleep + request_event.set() + @app.route(EVENT_URL, methods=['POST']) def init_ci_pipeline(): run_mode = 0 @@ -187,21 +220,31 @@ def init_ci_pipeline(): print("Request matches filter, we should RUN CI. {}".format(get_request_info(request_ctx))) - # Check if there is another CI run in progress - proc1 = subprocess.Popen(['ps', 'cax'], stdout=subprocess.PIPE) - proc2 = subprocess.Popen(['grep', 'manager.sh'], stdin=proc1.stdout, - stdout=subprocess.PIPE, stderr=subprocess.PIPE) - proc1.stdout.close() - out, err = proc2.communicate() - - if (out): - print("Can't run CI, another CI run in progress") - log_access_granted(request_ctx, "CI busy, posting busy msg") - os.system("./ci_busy.sh config {} \"{}\" \"{}\" \"Another CI run in progress, please try again in 15 minutes\"" - .format(request_ctx['id'], request_ctx['repo'], request_ctx['body'])) - else: - log_access_granted(request_ctx, "Running CI") - os.system("./manager.sh config {} \"{}\" \"{}\" {}".format(request_ctx['id'], request_ctx['repo'], request_ctx['body'], run_mode)) + request_ctx['mode'] = run_mode + + duplicate_req = False + + if (manager_running or ci_request_list): + busy_msg = "Another CI run in progress, adding request to the end of the list" + log_access_granted(request_ctx, "CI busy, placing request in queue") + if ci_request_list: + # only check for duplicates if list isn't empty + with queue_lock: + for req in ci_request_list: + # make sure this is the same PR + if req['id'] == request_ctx['id'] and req['repo'] == request_ctx['repo']: + duplicate_req = True + break + # ending frees the lock + if duplicate_req: + # let user know we're not running this request, it's a duplicate + busy_msg = "Duplicate request already waiting, ignoring message" + os.system("./ci_busy.sh config {} \"{}\" \"{}\" \"{}\"" + .format(request_ctx['id'], request_ctx['repo'], request_ctx['body'], busy_msg)) + + if not duplicate_req: + # we are not a duplicate, add to list + add_request(request_ctx) return jsonify({"status": "ONLINE"}) @@ -250,8 +293,17 @@ def parse_config(cfg_name): cfg_name = sys.argv[4] parse_config(cfg_name) - secret = decrypt_secret() + queue_lock = threading.Lock() + ci_request_list = [] + + # create an event handler for requests + request_event = threading.Event() + # dedicate thread to check for and run new requests + poll_request = threading.Thread(target=request_handler, args=[]) + # run as daemon to catch ^C termination + poll_request.daemon = True + poll_request.start() logging.info("Starting the CI service") app.run(host=host, port=port)