diff --git a/capa/ghidra/capa_ghidra.py b/capa/ghidra/capa_ghidra.py index 70b98df56..2594edb71 100644 --- a/capa/ghidra/capa_ghidra.py +++ b/capa/ghidra/capa_ghidra.py @@ -69,7 +69,7 @@ def run_headless(): rules_path = pathlib.Path(args.rules) logger.debug("rule path: %s", rules_path) - rules = capa.main.get_rules([rules_path]) + rules = capa.rules.get_rules([rules_path]) meta = capa.ghidra.helpers.collect_metadata([rules_path]) extractor = capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor() @@ -119,7 +119,7 @@ def run_ui(): rules_path: pathlib.Path = pathlib.Path(rules_dir) logger.info("running capa using rules from %s", str(rules_path)) - rules = capa.main.get_rules([rules_path]) + rules = capa.rules.get_rules([rules_path]) meta = capa.ghidra.helpers.collect_metadata([rules_path]) extractor = capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor() diff --git a/capa/ida/plugin/form.py b/capa/ida/plugin/form.py index 4e1bd572a..e9249a77f 100644 --- a/capa/ida/plugin/form.py +++ b/capa/ida/plugin/form.py @@ -636,7 +636,7 @@ def on_load_rule(_, i, total): if ida_kernwin.user_cancelled(): raise UserCancelledError("user cancelled") - return capa.main.get_rules([rule_path], on_load_rule=on_load_rule) + return capa.rules.get_rules([rule_path], on_load_rule=on_load_rule) except UserCancelledError: logger.info("User cancelled analysis.") return None diff --git a/capa/main.py b/capa/main.py index ec34e581f..f5dd205d7 100644 --- a/capa/main.py +++ b/capa/main.py @@ -19,7 +19,7 @@ import textwrap import contextlib from types import TracebackType -from typing import Any, Set, Dict, List, Callable, Optional +from typing import Any, Set, Dict, List, Optional from pathlib import Path import halo @@ -49,7 +49,7 @@ import capa.features.extractors.dotnetfile import capa.features.extractors.base_extractor import capa.features.extractors.cape.extractor -from capa.rules import Rule, RuleSet +from capa.rules import RuleSet from capa.engine import MatchResults from capa.helpers import ( get_file_taste, @@ -398,108 +398,6 @@ def get_file_extractors(input_file: Path, input_format: str) -> List[FeatureExtr return file_extractors -def is_nursery_rule_path(path: Path) -> bool: - """ - The nursery is a spot for rules that have not yet been fully polished. - For example, they may not have references to public example of a technique. - Yet, we still want to capture and report on their matches. - The nursery is currently a subdirectory of the rules directory with that name. - - When nursery rules are loaded, their metadata section should be updated with: - `nursery=True`. - """ - return "nursery" in path.parts - - -def collect_rule_file_paths(rule_paths: List[Path]) -> List[Path]: - """ - collect all rule file paths, including those in subdirectories. - """ - rule_file_paths = [] - for rule_path in rule_paths: - if not rule_path.exists(): - raise IOError(f"rule path {rule_path} does not exist or cannot be accessed") - - if rule_path.is_file(): - rule_file_paths.append(rule_path) - elif rule_path.is_dir(): - logger.debug("reading rules from directory %s", rule_path) - for root, _, files in os.walk(rule_path): - if ".git" in root: - # the .github directory contains CI config in capa-rules - # this includes some .yml files - # these are not rules - # additionally, .git has files that are not .yml and generate the warning - # skip those too - continue - for file in files: - if not file.endswith(".yml"): - if not (file.startswith(".git") or file.endswith((".git", ".md", ".txt"))): - # expect to see .git* files, readme.md, format.md, and maybe a .git directory - # other things maybe are rules, but are mis-named. - logger.warning("skipping non-.yml file: %s", file) - continue - rule_file_paths.append(Path(root) / file) - return rule_file_paths - - -# TypeAlias. note: using `foo: TypeAlias = bar` is Python 3.10+ -RulePath = Path - - -def on_load_rule_default(_path: RulePath, i: int, _total: int) -> None: - return - - -def get_rules( - rule_paths: List[RulePath], - cache_dir=None, - on_load_rule: Callable[[RulePath, int, int], None] = on_load_rule_default, -) -> RuleSet: - """ - args: - rule_paths: list of paths to rules files or directories containing rules files - cache_dir: directory to use for caching rules, or will use the default detected cache directory if None - on_load_rule: callback to invoke before a rule is loaded, use for progress or cancellation - """ - if cache_dir is None: - cache_dir = capa.rules.cache.get_default_cache_directory() - # rule_paths may contain directory paths, - # so search for file paths recursively. - rule_file_paths = collect_rule_file_paths(rule_paths) - - # this list is parallel to `rule_file_paths`: - # rule_file_paths[i] corresponds to rule_contents[i]. - rule_contents = [file_path.read_bytes() for file_path in rule_file_paths] - - ruleset = capa.rules.cache.load_cached_ruleset(cache_dir, rule_contents) - if ruleset is not None: - return ruleset - - rules: List[Rule] = [] - - total_rule_count = len(rule_file_paths) - for i, (path, content) in enumerate(zip(rule_file_paths, rule_contents)): - on_load_rule(path, i, total_rule_count) - - try: - rule = capa.rules.Rule.from_yaml(content.decode("utf-8")) - except capa.rules.InvalidRule: - raise - else: - rule.meta["capa/path"] = path.as_posix() - rule.meta["capa/nursery"] = is_nursery_rule_path(path) - - rules.append(rule) - logger.debug("loaded rule: '%s' with scope: %s", rule.name, rule.scopes) - - ruleset = capa.rules.RuleSet(rules) - - capa.rules.cache.cache_ruleset(cache_dir, ruleset) - - return ruleset - - def get_signatures(sigs_path: Path) -> List[Path]: if not sigs_path.exists(): raise IOError(f"signatures path {sigs_path} does not exist or cannot be accessed") @@ -1176,7 +1074,7 @@ def get_rules_from_args(args) -> RuleSet: else: cache_dir = capa.rules.cache.get_default_cache_directory() - rules = get_rules(args.rules, cache_dir=cache_dir) + rules = capa.rules.get_rules(args.rules, cache_dir=cache_dir) except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e: logger.error("%s", str(e)) logger.error( @@ -1474,7 +1372,7 @@ def ida_main(): rules_path = get_default_root() / "rules" logger.debug("rule path: %s", rules_path) - rules = get_rules([rules_path]) + rules = capa.rules.get_rules([rules_path]) meta = capa.ida.helpers.collect_metadata([rules_path]) @@ -1508,7 +1406,7 @@ def ghidra_main(): rules_path = get_default_root() / "rules" logger.debug("rule path: %s", rules_path) - rules = get_rules([rules_path]) + rules = capa.rules.get_rules([rules_path]) meta = capa.ghidra.helpers.collect_metadata([rules_path]) diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py index b5423ad92..d9e43dfc5 100644 --- a/capa/rules/__init__.py +++ b/capa/rules/__init__.py @@ -7,6 +7,7 @@ # See the License for the specific language governing permissions and limitations under the License. import io +import os import re import uuid import codecs @@ -25,7 +26,7 @@ # https://github.com/python/mypy/issues/1153 from backports.functools_lru_cache import lru_cache # type: ignore -from typing import Any, Set, Dict, List, Tuple, Union, Iterator, Optional +from typing import Any, Set, Dict, List, Tuple, Union, Callable, Iterator, Optional from dataclasses import asdict, dataclass import yaml @@ -1691,3 +1692,105 @@ def match(self, scope: Scope, features: FeatureSet, addr: Address) -> Tuple[Feat matches.update(hard_matches) return (features3, matches) + + +def is_nursery_rule_path(path: Path) -> bool: + """ + The nursery is a spot for rules that have not yet been fully polished. + For example, they may not have references to public example of a technique. + Yet, we still want to capture and report on their matches. + The nursery is currently a subdirectory of the rules directory with that name. + + When nursery rules are loaded, their metadata section should be updated with: + `nursery=True`. + """ + return "nursery" in path.parts + + +def collect_rule_file_paths(rule_paths: List[Path]) -> List[Path]: + """ + collect all rule file paths, including those in subdirectories. + """ + rule_file_paths = [] + for rule_path in rule_paths: + if not rule_path.exists(): + raise IOError(f"rule path {rule_path} does not exist or cannot be accessed") + + if rule_path.is_file(): + rule_file_paths.append(rule_path) + elif rule_path.is_dir(): + logger.debug("reading rules from directory %s", rule_path) + for root, _, files in os.walk(rule_path): + if ".git" in root: + # the .github directory contains CI config in capa-rules + # this includes some .yml files + # these are not rules + # additionally, .git has files that are not .yml and generate the warning + # skip those too + continue + for file in files: + if not file.endswith(".yml"): + if not (file.startswith(".git") or file.endswith((".git", ".md", ".txt"))): + # expect to see .git* files, readme.md, format.md, and maybe a .git directory + # other things maybe are rules, but are mis-named. + logger.warning("skipping non-.yml file: %s", file) + continue + rule_file_paths.append(Path(root) / file) + return rule_file_paths + + +# TypeAlias. note: using `foo: TypeAlias = bar` is Python 3.10+ +RulePath = Path + + +def on_load_rule_default(_path: RulePath, i: int, _total: int) -> None: + return + + +def get_rules( + rule_paths: List[RulePath], + cache_dir=None, + on_load_rule: Callable[[RulePath, int, int], None] = on_load_rule_default, +) -> RuleSet: + """ + args: + rule_paths: list of paths to rules files or directories containing rules files + cache_dir: directory to use for caching rules, or will use the default detected cache directory if None + on_load_rule: callback to invoke before a rule is loaded, use for progress or cancellation + """ + if cache_dir is None: + cache_dir = capa.rules.cache.get_default_cache_directory() + # rule_paths may contain directory paths, + # so search for file paths recursively. + rule_file_paths = collect_rule_file_paths(rule_paths) + + # this list is parallel to `rule_file_paths`: + # rule_file_paths[i] corresponds to rule_contents[i]. + rule_contents = [file_path.read_bytes() for file_path in rule_file_paths] + + ruleset = capa.rules.cache.load_cached_ruleset(cache_dir, rule_contents) + if ruleset is not None: + return ruleset + + rules: List[Rule] = [] + + total_rule_count = len(rule_file_paths) + for i, (path, content) in enumerate(zip(rule_file_paths, rule_contents)): + on_load_rule(path, i, total_rule_count) + + try: + rule = capa.rules.Rule.from_yaml(content.decode("utf-8")) + except capa.rules.InvalidRule: + raise + else: + rule.meta["capa/path"] = path.as_posix() + rule.meta["capa/nursery"] = is_nursery_rule_path(path) + + rules.append(rule) + logger.debug("loaded rule: '%s' with scope: %s", rule.name, rule.scopes) + + ruleset = capa.rules.RuleSet(rules) + + capa.rules.cache.cache_ruleset(cache_dir, ruleset) + + return ruleset diff --git a/scripts/bulk-process.py b/scripts/bulk-process.py index 8950b8936..0f6422c18 100644 --- a/scripts/bulk-process.py +++ b/scripts/bulk-process.py @@ -161,7 +161,7 @@ def main(argv=None): capa.main.handle_common_args(args) try: - rules = capa.main.get_rules(args.rules) + rules = capa.rules.get_rules(args.rules) logger.info("successfully loaded %s rules", len(rules)) except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e: logger.error("%s", str(e)) diff --git a/scripts/cache-ruleset.py b/scripts/cache-ruleset.py index 6630f2eea..89137650d 100644 --- a/scripts/cache-ruleset.py +++ b/scripts/cache-ruleset.py @@ -49,7 +49,7 @@ def main(argv=None): try: cache_dir = Path(args.cache) cache_dir.mkdir(parents=True, exist_ok=True) - rules = capa.main.get_rules(args.rules, cache_dir) + rules = capa.rules.get_rules(args.rules, cache_dir) logger.info("successfully loaded %s rules", len(rules)) except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e: logger.error("%s", str(e)) diff --git a/scripts/capa2yara.py b/scripts/capa2yara.py index 5fe5c0849..56fd0e8cb 100644 --- a/scripts/capa2yara.py +++ b/scripts/capa2yara.py @@ -741,7 +741,7 @@ def main(argv=None): logging.getLogger("capa2yara").setLevel(level) try: - rules = capa.main.get_rules([Path(args.rules)]) + rules = capa.rules.get_rules([Path(args.rules)]) namespaces = capa.rules.index_rules_by_namespace(list(rules.rules.values())) logger.info("successfully loaded %d rules (including subscope rules which will be ignored)", len(rules)) if args.tag: diff --git a/scripts/capa_as_library.py b/scripts/capa_as_library.py index 611576908..e6b8bf429 100644 --- a/scripts/capa_as_library.py +++ b/scripts/capa_as_library.py @@ -170,7 +170,7 @@ def render_dictionary(doc: rd.ResultDocument) -> Dict[str, Any]: # ==== render dictionary helpers def capa_details(rules_path: Path, file_path: Path, output_format="dictionary"): # load rules from disk - rules = capa.main.get_rules([rules_path]) + rules = capa.rules.get_rules([rules_path]) # extract features and find capabilities extractor = capa.main.get_extractor( diff --git a/scripts/detect_duplicate_features.py b/scripts/detect_duplicate_features.py index 6737d7fa9..9561339c2 100644 --- a/scripts/detect_duplicate_features.py +++ b/scripts/detect_duplicate_features.py @@ -48,7 +48,7 @@ def find_overlapping_rules(new_rule_path, rules_path): overlapping_rules = [] # capa.rules.RuleSet stores all rules in given paths - ruleset = capa.main.get_rules(rules_path) + ruleset = capa.rules.get_rules(rules_path) for rule_name, rule in ruleset.rules.items(): rule_features = rule.extract_all_features() diff --git a/scripts/lint.py b/scripts/lint.py index edcf9f563..b24aa1349 100644 --- a/scripts/lint.py +++ b/scripts/lint.py @@ -1002,7 +1002,7 @@ def main(argv=None): time0 = time.time() try: - rules = capa.main.get_rules(args.rules) + rules = capa.rules.get_rules(args.rules) logger.info("successfully loaded %s rules", rules.source_rule_count) if args.tag: rules = rules.filter_rules_by_meta(args.tag) diff --git a/scripts/profile-time.py b/scripts/profile-time.py index 86590a800..f9615cba6 100644 --- a/scripts/profile-time.py +++ b/scripts/profile-time.py @@ -91,7 +91,7 @@ def main(argv=None): try: with capa.main.timing("load rules"): - rules = capa.main.get_rules(args.rules) + rules = capa.rules.get_rules(args.rules) except IOError as e: logger.error("%s", str(e)) return -1 diff --git a/scripts/show-capabilities-by-function.py b/scripts/show-capabilities-by-function.py index 421c6c7e1..c09797ec2 100644 --- a/scripts/show-capabilities-by-function.py +++ b/scripts/show-capabilities-by-function.py @@ -153,7 +153,7 @@ def main(argv=None): return -1 try: - rules = capa.main.get_rules(args.rules) + rules = capa.rules.get_rules(args.rules) logger.info("successfully loaded %s rules", len(rules)) if args.tag: rules = rules.filter_rules_by_meta(args.tag) diff --git a/scripts/show-unused-features.py b/scripts/show-unused-features.py index ddd236614..b030995c3 100644 --- a/scripts/show-unused-features.py +++ b/scripts/show-unused-features.py @@ -43,7 +43,7 @@ def format_address(addr: capa.features.address.Address) -> str: def get_rules_feature_set(rules_path) -> Set[Feature]: - ruleset = capa.main.get_rules(rules_path) + ruleset = capa.rules.get_rules(rules_path) rules_feature_set: Set[Feature] = set() for _, rule in ruleset.rules.items(): rules_feature_set.update(rule.extract_all_features())