Skip to content

Commit 8b6fa3f

Browse files
authored
chore: modify check execution from multi-threaded to single-threaded (#650)
Signed-off-by: Nathan Nguyen <nathan.nguyen@oracle.com>
1 parent 621fad7 commit 8b6fa3f

File tree

6 files changed: 63 additions and 330 deletions.

docs/source/pages/developers_guide/apidoc/macaron.slsa_analyzer.rst

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,14 +72,6 @@ macaron.slsa\_analyzer.registry module
7272
:undoc-members:
7373
:show-inheritance:
7474

75-
macaron.slsa\_analyzer.runner module
76-
------------------------------------
77-
78-
.. automodule:: macaron.slsa_analyzer.runner
79-
:members:
80-
:undoc-members:
81-
:show-inheritance:
82-
8375
macaron.slsa\_analyzer.slsa\_req module
8476
---------------------------------------
8577

src/macaron/config/defaults.ini

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,6 @@
11
# Copyright (c) 2022 - 2024, Oracle and/or its affiliates. All rights reserved.
22
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.
33

4-
[runner]
5-
# The number of runners. Macaron only supports one runner at the moment.
6-
runner_num = 1
7-
# The duration (in seconds) for the main thread to
8-
# wait for a runner to complete its check. After this time period,
9-
# that runner will be put back into the queue to keep running if it hasn't finished.
10-
timeout = 5
11-
124
[requests]
135
# The default timeout in seconds for 'requests' API calls.
146
timeout = 10

src/macaron/slsa_analyzer/registry.py

Lines changed: 56 additions & 211 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,13 @@
33

44
"""This module contains the Registry class for loading checks."""
55

6-
import concurrent.futures
76
import fnmatch
87
import inspect
98
import logging
10-
import queue
119
import re
1210
import sys
11+
import traceback
1312
from collections.abc import Callable, Iterable
14-
from copy import deepcopy
1513
from graphlib import CycleError, TopologicalSorter
1614
from typing import Any, TypeVar
1715

@@ -26,7 +24,6 @@
2624
CheckResultType,
2725
SkippedInfo,
2826
)
29-
from macaron.slsa_analyzer.runner import Runner
3027
from macaron.slsa_analyzer.slsa_req import ReqName
3128

3229
logger: logging.Logger = logging.getLogger(__name__)
@@ -47,31 +44,13 @@ class Registry:
4744
# The format for check id
4845
_id_format = re.compile(r"^mcn_([a-z]+_)+([0-9]+)$")
4946

50-
# The directed graph that represents the check relationships.
51-
# This graph is used to get the order in which checks are
52-
# executed. Each node of this graph is the id of a check.
53-
_graph: TopologicalSorter = TopologicalSorter()
54-
55-
# True if we have already called Registry._graph.prepare
56-
# (which will then call TopologicalSorter.prepare).
57-
# This is because TopologicalSorter will raise an Exception
58-
# if we call TopologicalSorter.prepare multiple times.
59-
# Reference: https://docs.python.org/3/library/graphlib.html
60-
_is_graph_ready = False
61-
6247
def __init__(self) -> None:
6348
"""Initiate the Registry instance."""
64-
self.runners: list[Runner] = []
65-
self.runner_queue: queue.Queue = queue.Queue()
66-
67-
# Use default values.
68-
self.runner_num = 1
69-
self.runner_timeout = 5
70-
7149
self.checks_to_run: list[str] = []
7250
self.no_parent_checks: list[str] = []
7351

7452
self.check_tree: CheckTree = {}
53+
self.execution_order: list[str] = []
7554

7655
def register(self, check: BaseCheck) -> None:
7756
"""Register the check.
@@ -100,10 +79,6 @@ def register(self, check: BaseCheck) -> None:
10079
logger.error("Cannot load relationships of check %s.", check.check_info.check_id)
10180
sys.exit(1)
10281

103-
if not self._add_node(check):
104-
logger.critical("Cannot add check %s to the directed graph.", check.check_info.check_id)
105-
sys.exit(1)
106-
10782
self._all_checks_mapping[check.check_info.check_id] = check
10883

10984
def _add_relationship_entry(self, check_id: str, relationship: tuple[str, CheckResultType]) -> bool:
@@ -227,42 +202,6 @@ def _validate_check(check: Any) -> bool:
227202

228203
return True
229204

230-
def _add_node(self, check: BaseCheck) -> bool:
231-
"""Add this check to the directed graph along with its predecessors.
232-
233-
This method only fails if Registry._graph.prepare() has already been called.
234-
235-
References
236-
----------
237-
- https://docs.python.org/3/library/graphlib.html#graphlib.TopologicalSorter.add
238-
239-
Parameters
240-
----------
241-
check : BaseCheck
242-
The check to be added.
243-
244-
Returns
245-
-------
246-
bool
247-
True if added successfully, else False.
248-
"""
249-
try:
250-
parent_ids = (parent[0] for parent in check.depends_on)
251-
252-
# Add this check to the graph first.
253-
# The graphlib library supports adding duplicated nodes
254-
# or predecessors without breaking the graph.
255-
self._graph.add(check.check_info.check_id)
256-
257-
# Add predecessors.
258-
for parent in parent_ids:
259-
self._graph.add(check.check_info.check_id, parent)
260-
261-
return True
262-
except ValueError as err:
263-
logger.error(str(err))
264-
return False
265-
266205
@staticmethod
267206
def _validate_eval_reqs(eval_reqs: list[Any]) -> bool:
268207
"""Return True if all the evaluated requirements are valid.
@@ -488,6 +427,24 @@ def get_final_checks(self, ex_pats: list[str], in_pats: list[str]) -> list[str]:
488427
final = include.difference(exclude)
489428
return list(final)
490429

430+
def get_check_execution_order(self) -> list[str]:
431+
"""Get the execution order of checks.
432+
433+
This follows the topological order on the check graph.
434+
435+
Returns
436+
-------
437+
list[str]
438+
A list of check ids representing the order of checks to run.
439+
"""
440+
graph: TopologicalSorter = TopologicalSorter()
441+
for node in self._all_checks_mapping:
442+
graph.add(node)
443+
for node, children_entries in self._check_relationships_mapping.items():
444+
for child in children_entries:
445+
graph.add(child, node)
446+
return list(graph.static_order())
447+
491448
def scan(self, target: AnalyzeContext) -> dict[str, CheckResult]:
492449
"""Run all checks on a target repo.
493450
@@ -507,149 +464,46 @@ def scan(self, target: AnalyzeContext) -> dict[str, CheckResult]:
507464
results: dict[str, CheckResult] = {}
508465
skipped_checks: list[SkippedInfo] = []
509466

510-
with concurrent.futures.ThreadPoolExecutor(max_workers=self.runner_num) as executor:
511-
# To allow the graph to be traversed again after this run.
512-
graph = deepcopy(self._graph)
513-
514-
# This queue contains the futures instances returned from
515-
# submitting tasks to the ThreadPoolExecutor.
516-
futures_queue: queue.Queue = queue.Queue()
517-
518-
# This queue contains the currently available Check instances
519-
# for the Runners to pickup and execute.
520-
check_queue: queue.Queue = queue.Queue()
521-
522-
while graph.is_active():
523-
# Enqueue all checks that are available to be processed.
524-
# After a runner has completed a check, we call graph.done(check_id)
525-
# to signal the TopologicalSorter to proceed with more nodes.
526-
# These newly proceeded nodes are returned to us in the next call
527-
# to graph.get_ready().
528-
for check_id in graph.get_ready():
529-
logger.debug("Check to run %s", check_id)
530-
check = all_checks.get(check_id)
531-
532-
if not check:
533-
logger.error(
534-
"Check %s is not defined yet. Please add the implementation for %s.", check_id, check_id
535-
)
536-
results[check_id] = CheckResult(
537-
check=CheckInfo(
538-
check_id=check_id,
539-
check_description="",
540-
eval_reqs=[],
541-
),
542-
result=CheckResultData(
543-
result_type=CheckResultType.UNKNOWN,
544-
result_tables=[],
545-
),
546-
)
547-
graph.done(check_id)
548-
else:
549-
check_queue.put(check)
550-
551-
# If the runner_queue is empty, wait for a runner to become available.
552-
if self.runner_queue.empty():
553-
while not futures_queue.empty():
554-
current_runner, current_check_id, current_future = futures_queue.get()
555-
556-
try:
557-
# Explicitly check and exit if the check has raised any exception.
558-
if current_future.exception(timeout=self.runner_timeout):
559-
logger.error("Exception in check %s: %s.", current_check_id, current_future.exception())
560-
logger.info("Check %s has failed.", current_check_id)
561-
current_future.cancel()
562-
self.runner_queue.put(current_runner)
563-
return results
564-
except (
565-
concurrent.futures.TimeoutError,
566-
concurrent.futures.CancelledError,
567-
concurrent.futures.InvalidStateError,
568-
concurrent.futures.BrokenExecutor,
569-
):
570-
# The check is still running, put the future back into the queue.
571-
futures_queue.put((current_runner, current_check_id, current_future))
572-
573-
# Break out if a runner is available.
574-
if not self.runner_queue.empty():
575-
break
576-
577-
if current_future.done():
578-
result = current_future.result()
579-
results[current_check_id] = result
580-
graph.done(current_check_id)
581-
582-
# Run the check with the next available runner.
583-
if self.runner_queue.empty():
584-
# We should not reach here.
585-
logger.critical("Could not find any available runners. Stop the analysis...")
586-
return results
587-
runner: Runner = self.runner_queue.get()
588-
589-
if check_queue.empty():
590-
# We should not reach here.
591-
logger.error("Could not find any checks to run.")
592-
return results
593-
next_check: BaseCheck = check_queue.get()
594-
595-
# Don't run excluded checks
596-
if next_check.check_info.check_id not in self.checks_to_run:
597-
logger.debug("Check %s is disabled by user configuration.", next_check.check_info.check_id)
598-
graph.done(next_check.check_info.check_id)
599-
self.runner_queue.put(runner)
600-
continue
601-
602-
# Look up check results to see if this check should be run based on its parent status
603-
skipped_info = self._should_skip_check(next_check, results)
604-
if skipped_info:
605-
skipped_checks.append(skipped_info)
606-
607-
# Submit check to run with specified runner.
608-
submitted_future = executor.submit(runner.run, target, next_check, skipped_checks)
609-
futures_queue.put(
610-
(
611-
runner,
612-
next_check.check_info.check_id,
613-
submitted_future,
614-
)
467+
for check_id in self.execution_order:
468+
check = all_checks.get(check_id)
469+
470+
if not check:
471+
logger.error("Check %s is not defined yet. Please add the implementation for %s.", check_id, check_id)
472+
results[check_id] = CheckResult(
473+
check=CheckInfo(
474+
check_id=check_id,
475+
check_description="",
476+
eval_reqs=[],
477+
),
478+
result=CheckResultData(
479+
result_type=CheckResultType.UNKNOWN,
480+
result_tables=[],
481+
),
615482
)
483+
continue
616484

617-
# If the check queue is empty, wait for current check to complete.
618-
if check_queue.empty():
619-
while not futures_queue.empty():
620-
current_runner, current_check_id, current_future = futures_queue.get()
621-
622-
try:
623-
# Explicitly check and exit if the check has raised any exception.
624-
if current_future.exception(timeout=self.runner_timeout):
625-
logger.error("Exception in check %s: %s.", current_check_id, current_future.exception())
626-
logger.info("Check %s has failed.", current_check_id)
627-
current_future.cancel()
628-
return results
629-
except concurrent.futures.TimeoutError:
630-
# The check is still running, put the future back into the queue.
631-
futures_queue.put((current_runner, current_check_id, current_future))
632-
633-
# Break out if more checks can be processed.
634-
if not self.runner_queue.empty():
635-
break
636-
637-
if current_future.done():
638-
result = current_future.result()
639-
results[current_check_id] = result
640-
graph.done(current_check_id)
485+
# Don't run excluded checks
486+
if check_id not in self.checks_to_run:
487+
logger.debug(
488+
"Check %s is disabled by user configuration.",
489+
check.check_info.check_id,
490+
)
491+
continue
641492

642-
return results
493+
# Look up check results to see if this check should be run based on its parent status
494+
skipped_info = self._should_skip_check(check, results)
495+
if skipped_info:
496+
skipped_checks.append(skipped_info)
643497

644-
def _init_runners(self) -> None:
645-
"""Initiate runners from values in defaults.ini."""
646-
self.runner_num = defaults.getint("runner", "runner_num", fallback=1)
647-
self.runner_timeout = defaults.getint("runner", "timeout", fallback=5)
648-
if not self.runners:
649-
self.runners.extend([Runner(self, i) for i in range(self.runner_num)])
498+
try:
499+
results[check_id] = check.run(target, skipped_info)
500+
except Exception as exc: # pylint: disable=broad-exception-caught
501+
logger.error("Exception in check %s: %s. Run in verbose mode to get more information.", check_id, exc)
502+
logger.debug(traceback.format_exc())
503+
logger.info("Check %s has failed.", check_id)
504+
return results
650505

651-
for runner in self.runners:
652-
self.runner_queue.put(runner)
506+
return results
653507

654508
def prepare(self) -> bool:
655509
"""Prepare for the analysis.
@@ -661,21 +515,12 @@ def prepare(self) -> bool:
661515
bool
662516
True if there are no errors, else False.
663517
"""
664-
self._init_runners()
665-
666-
# Only support 1 runner at the moment.
667-
if not self.runners or len(self.runners) != 1:
668-
logger.critical("Invalid number of runners.")
669-
return False
670-
671518
if not self._all_checks_mapping:
672519
logger.error("Cannot run because there is no check registered.")
673520
return False
674521

675522
try:
676-
if not self._is_graph_ready:
677-
self._graph.prepare()
678-
self._is_graph_ready = True
523+
self.execution_order = self.get_check_execution_order()
679524
except CycleError as error:
680525
logger.error("Found circular dependencies in registered checks: %s", str(error))
681526
return False

0 commit comments

Comments
 (0)