Skip to content

Commit

Permalink
main: move rule-related routines to capa.rules
Browse files Browse the repository at this point in the history
ref #1821
  • Loading branch information
williballenthin committed Jan 25, 2024
1 parent 8c2c486 commit a9e1fd9
Show file tree
Hide file tree
Showing 13 changed files with 121 additions and 120 deletions.
4 changes: 2 additions & 2 deletions capa/ghidra/capa_ghidra.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def run_headless():
rules_path = pathlib.Path(args.rules)

logger.debug("rule path: %s", rules_path)
rules = capa.main.get_rules([rules_path])
rules = capa.rules.get_rules([rules_path])

meta = capa.ghidra.helpers.collect_metadata([rules_path])
extractor = capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor()
Expand Down Expand Up @@ -119,7 +119,7 @@ def run_ui():
rules_path: pathlib.Path = pathlib.Path(rules_dir)
logger.info("running capa using rules from %s", str(rules_path))

rules = capa.main.get_rules([rules_path])
rules = capa.rules.get_rules([rules_path])

meta = capa.ghidra.helpers.collect_metadata([rules_path])
extractor = capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor()
Expand Down
2 changes: 1 addition & 1 deletion capa/ida/plugin/form.py
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ def on_load_rule(_, i, total):
if ida_kernwin.user_cancelled():
raise UserCancelledError("user cancelled")

return capa.main.get_rules([rule_path], on_load_rule=on_load_rule)
return capa.rules.get_rules([rule_path], on_load_rule=on_load_rule)
except UserCancelledError:
logger.info("User cancelled analysis.")
return None
Expand Down
112 changes: 5 additions & 107 deletions capa/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import textwrap
import contextlib
from types import TracebackType
from typing import Any, Set, Dict, List, Callable, Optional
from typing import Any, Set, Dict, List, Optional
from pathlib import Path

import halo
Expand Down Expand Up @@ -49,7 +49,7 @@
import capa.features.extractors.dotnetfile
import capa.features.extractors.base_extractor
import capa.features.extractors.cape.extractor
from capa.rules import Rule, RuleSet
from capa.rules import RuleSet
from capa.engine import MatchResults
from capa.helpers import (
get_file_taste,
Expand Down Expand Up @@ -398,108 +398,6 @@ def get_file_extractors(input_file: Path, input_format: str) -> List[FeatureExtr
return file_extractors


def is_nursery_rule_path(path: Path) -> bool:
    """
    Report whether a rule file lives under a directory named `nursery`.

    The nursery holds rules that are not yet fully polished
    (for example, rules lacking a reference to a public example of the technique),
    but whose matches we still want to capture and report.
    Membership is decided purely by the path: any component that is exactly
    `nursery` qualifies.
    """
    return any(component == "nursery" for component in path.parts)


def collect_rule_file_paths(rule_paths: List[Path]) -> List[Path]:
    """
    collect all rule file paths, including those in subdirectories.

    args:
      rule_paths: paths to rule files and/or directories that contain rule files.

    returns:
      the given file paths as-is, plus every `.yml` file found by recursively
      walking the given directories.

    raises:
      IOError: when one of the given paths does not exist.
    """
    rule_file_paths = []
    for rule_path in rule_paths:
        if not rule_path.exists():
            raise IOError(f"rule path {rule_path} does not exist or cannot be accessed")

        if rule_path.is_file():
            rule_file_paths.append(rule_path)
        elif rule_path.is_dir():
            logger.debug("reading rules from directory %s", rule_path)
            for root, dirs, files in os.walk(rule_path):
                # the .github directory contains CI config in capa-rules,
                # which includes some .yml files that are not rules.
                # additionally, .git has files that are not .yml and would generate the warning.
                # prune those subdirectories in place so that os.walk never descends into them.
                #
                # only names *below* the requested directory are tested:
                # matching against the full path would silently drop every rule
                # when an ancestor directory happens to contain ".git" (e.g. "site.github.io/rules").
                dirs[:] = [name for name in dirs if ".git" not in name]

                for file in files:
                    if not file.endswith(".yml"):
                        if not (file.startswith(".git") or file.endswith((".git", ".md", ".txt"))):
                            # expect to see .git* files, readme.md, format.md, and maybe a .git directory
                            # other things maybe are rules, but are mis-named.
                            logger.warning("skipping non-.yml file: %s", file)
                        continue
                    rule_file_paths.append(Path(root) / file)
    return rule_file_paths


# type alias: names a filesystem path that points at a rule file or a directory of rule files.
# note: the explicit `foo: TypeAlias = bar` spelling is Python 3.10+, so use a plain assignment.
RulePath = Path


def on_load_rule_default(_path: RulePath, i: int, _total: int) -> None:
    """
    default `on_load_rule` callback: accept the arguments and do nothing.
    """
    return None


def get_rules(
    rule_paths: List[RulePath],
    cache_dir=None,
    on_load_rule: Callable[[RulePath, int, int], None] = on_load_rule_default,
) -> RuleSet:
    """
    load a ruleset from the given paths, consulting the on-disk rule cache first.

    args:
      rule_paths: list of paths to rules files or directories containing rules files
      cache_dir: directory to use for caching rules, or will use the default detected cache directory if None
      on_load_rule: callback to invoke before a rule is loaded, use for progress or cancellation
    """
    if cache_dir is None:
        cache_dir = capa.rules.cache.get_default_cache_directory()

    # directories among `rule_paths` are expanded recursively into file paths.
    paths = collect_rule_file_paths(rule_paths)

    # parallel to `paths`: contents[n] holds the raw bytes of paths[n].
    contents = [p.read_bytes() for p in paths]

    cached = capa.rules.cache.load_cached_ruleset(cache_dir, contents)
    if cached is not None:
        return cached

    # nothing usable in the cache: parse every rule file.
    loaded: List[Rule] = []
    count = len(paths)
    for index, path in enumerate(paths):
        on_load_rule(path, index, count)

        # a capa.rules.InvalidRule raised here propagates to the caller unchanged.
        rule = capa.rules.Rule.from_yaml(contents[index].decode("utf-8"))
        rule.meta["capa/path"] = path.as_posix()
        rule.meta["capa/nursery"] = is_nursery_rule_path(path)

        loaded.append(rule)
        logger.debug("loaded rule: '%s' with scope: %s", rule.name, rule.scopes)

    fresh = capa.rules.RuleSet(loaded)

    # store the parsed ruleset so the next call with the same rule contents can skip parsing.
    capa.rules.cache.cache_ruleset(cache_dir, fresh)

    return fresh


def get_signatures(sigs_path: Path) -> List[Path]:
if not sigs_path.exists():
raise IOError(f"signatures path {sigs_path} does not exist or cannot be accessed")
Expand Down Expand Up @@ -1176,7 +1074,7 @@ def get_rules_from_args(args) -> RuleSet:
else:
cache_dir = capa.rules.cache.get_default_cache_directory()

rules = get_rules(args.rules, cache_dir=cache_dir)
rules = capa.rules.get_rules(args.rules, cache_dir=cache_dir)
except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e:
logger.error("%s", str(e))
logger.error(
Expand Down Expand Up @@ -1474,7 +1372,7 @@ def ida_main():

rules_path = get_default_root() / "rules"
logger.debug("rule path: %s", rules_path)
rules = get_rules([rules_path])
rules = capa.rules.get_rules([rules_path])

meta = capa.ida.helpers.collect_metadata([rules_path])

Expand Down Expand Up @@ -1508,7 +1406,7 @@ def ghidra_main():

rules_path = get_default_root() / "rules"
logger.debug("rule path: %s", rules_path)
rules = get_rules([rules_path])
rules = capa.rules.get_rules([rules_path])

meta = capa.ghidra.helpers.collect_metadata([rules_path])

Expand Down
105 changes: 104 additions & 1 deletion capa/rules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# See the License for the specific language governing permissions and limitations under the License.

import io
import os
import re
import uuid
import codecs
Expand All @@ -25,7 +26,7 @@
# https://github.com/python/mypy/issues/1153
from backports.functools_lru_cache import lru_cache # type: ignore

from typing import Any, Set, Dict, List, Tuple, Union, Iterator, Optional
from typing import Any, Set, Dict, List, Tuple, Union, Callable, Iterator, Optional
from dataclasses import asdict, dataclass

import yaml
Expand Down Expand Up @@ -1691,3 +1692,105 @@ def match(self, scope: Scope, features: FeatureSet, addr: Address) -> Tuple[Feat
matches.update(hard_matches)

return (features3, matches)


def is_nursery_rule_path(path: Path) -> bool:
    """
    Report whether a rule file lives under a directory named `nursery`.

    The nursery holds rules that are not yet fully polished
    (for example, rules lacking a reference to a public example of the technique),
    but whose matches we still want to capture and report.
    Membership is decided purely by the path: any component that is exactly
    `nursery` qualifies.
    """
    return any(component == "nursery" for component in path.parts)


def collect_rule_file_paths(rule_paths: List[Path]) -> List[Path]:
    """
    collect all rule file paths, including those in subdirectories.

    args:
      rule_paths: paths to rule files and/or directories that contain rule files.

    returns:
      the given file paths as-is, plus every `.yml` file found by recursively
      walking the given directories.

    raises:
      IOError: when one of the given paths does not exist.
    """
    rule_file_paths = []
    for rule_path in rule_paths:
        if not rule_path.exists():
            raise IOError(f"rule path {rule_path} does not exist or cannot be accessed")

        if rule_path.is_file():
            rule_file_paths.append(rule_path)
        elif rule_path.is_dir():
            logger.debug("reading rules from directory %s", rule_path)
            for root, dirs, files in os.walk(rule_path):
                # the .github directory contains CI config in capa-rules,
                # which includes some .yml files that are not rules.
                # additionally, .git has files that are not .yml and would generate the warning.
                # prune those subdirectories in place so that os.walk never descends into them.
                #
                # only names *below* the requested directory are tested:
                # matching against the full path would silently drop every rule
                # when an ancestor directory happens to contain ".git" (e.g. "site.github.io/rules").
                dirs[:] = [name for name in dirs if ".git" not in name]

                for file in files:
                    if not file.endswith(".yml"):
                        if not (file.startswith(".git") or file.endswith((".git", ".md", ".txt"))):
                            # expect to see .git* files, readme.md, format.md, and maybe a .git directory
                            # other things maybe are rules, but are mis-named.
                            logger.warning("skipping non-.yml file: %s", file)
                        continue
                    rule_file_paths.append(Path(root) / file)
    return rule_file_paths


# type alias: names a filesystem path that points at a rule file or a directory of rule files.
# note: the explicit `foo: TypeAlias = bar` spelling is Python 3.10+, so use a plain assignment.
RulePath = Path


def on_load_rule_default(_path: RulePath, i: int, _total: int) -> None:
    """
    default `on_load_rule` callback: accept the arguments and do nothing.
    """
    return None


def get_rules(
    rule_paths: List[RulePath],
    cache_dir=None,
    on_load_rule: Callable[[RulePath, int, int], None] = on_load_rule_default,
) -> RuleSet:
    """
    load a ruleset from the given paths, consulting the on-disk rule cache first.

    args:
      rule_paths: list of paths to rules files or directories containing rules files
      cache_dir: directory to use for caching rules, or will use the default detected cache directory if None
      on_load_rule: callback to invoke before a rule is loaded, use for progress or cancellation

    returns:
      the cached ruleset when the cache lookup yields one,
      otherwise a ruleset freshly parsed from the rule files (which is then written to the cache).

    raises:
      IOError: when a rule path does not exist.
      InvalidRule: when a rule file fails to parse; the exception propagates unchanged.
    """
    # imported here rather than at module level:
    # this function lives in `capa.rules` itself, and it must not rely on some
    # other module having imported the `capa.rules.cache` submodule beforehand.
    # NOTE(review): presumably `capa.rules.cache` imports `capa.rules`, which is why
    # a top-level import is avoided - confirm.
    import capa.rules.cache

    if cache_dir is None:
        cache_dir = capa.rules.cache.get_default_cache_directory()

    # rule_paths may contain directory paths,
    # so search for file paths recursively.
    rule_file_paths = collect_rule_file_paths(rule_paths)

    # this list is parallel to `rule_file_paths`:
    # rule_file_paths[i] corresponds to rule_contents[i].
    rule_contents = [file_path.read_bytes() for file_path in rule_file_paths]

    ruleset = capa.rules.cache.load_cached_ruleset(cache_dir, rule_contents)
    if ruleset is not None:
        return ruleset

    rules: List[Rule] = []

    total_rule_count = len(rule_file_paths)
    for i, (path, content) in enumerate(zip(rule_file_paths, rule_contents)):
        on_load_rule(path, i, total_rule_count)

        # no try/except here: an InvalidRule simply propagates to the caller.
        rule = Rule.from_yaml(content.decode("utf-8"))
        rule.meta["capa/path"] = path.as_posix()
        rule.meta["capa/nursery"] = is_nursery_rule_path(path)

        rules.append(rule)
        logger.debug("loaded rule: '%s' with scope: %s", rule.name, rule.scopes)

    ruleset = RuleSet(rules)

    # store the parsed ruleset so the next call with the same rule contents can skip parsing.
    capa.rules.cache.cache_ruleset(cache_dir, ruleset)

    return ruleset
2 changes: 1 addition & 1 deletion scripts/bulk-process.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ def main(argv=None):
capa.main.handle_common_args(args)

try:
rules = capa.main.get_rules(args.rules)
rules = capa.rules.get_rules(args.rules)
logger.info("successfully loaded %s rules", len(rules))
except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e:
logger.error("%s", str(e))
Expand Down
2 changes: 1 addition & 1 deletion scripts/cache-ruleset.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def main(argv=None):
try:
cache_dir = Path(args.cache)
cache_dir.mkdir(parents=True, exist_ok=True)
rules = capa.main.get_rules(args.rules, cache_dir)
rules = capa.rules.get_rules(args.rules, cache_dir)
logger.info("successfully loaded %s rules", len(rules))
except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e:
logger.error("%s", str(e))
Expand Down
2 changes: 1 addition & 1 deletion scripts/capa2yara.py
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,7 @@ def main(argv=None):
logging.getLogger("capa2yara").setLevel(level)

try:
rules = capa.main.get_rules([Path(args.rules)])
rules = capa.rules.get_rules([Path(args.rules)])
namespaces = capa.rules.index_rules_by_namespace(list(rules.rules.values()))
logger.info("successfully loaded %d rules (including subscope rules which will be ignored)", len(rules))
if args.tag:
Expand Down
2 changes: 1 addition & 1 deletion scripts/capa_as_library.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def render_dictionary(doc: rd.ResultDocument) -> Dict[str, Any]:
# ==== render dictionary helpers
def capa_details(rules_path: Path, file_path: Path, output_format="dictionary"):
# load rules from disk
rules = capa.main.get_rules([rules_path])
rules = capa.rules.get_rules([rules_path])

# extract features and find capabilities
extractor = capa.main.get_extractor(
Expand Down
2 changes: 1 addition & 1 deletion scripts/detect_duplicate_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def find_overlapping_rules(new_rule_path, rules_path):
overlapping_rules = []

# capa.rules.RuleSet stores all rules in given paths
ruleset = capa.main.get_rules(rules_path)
ruleset = capa.rules.get_rules(rules_path)

for rule_name, rule in ruleset.rules.items():
rule_features = rule.extract_all_features()
Expand Down
2 changes: 1 addition & 1 deletion scripts/lint.py
Original file line number Diff line number Diff line change
Expand Up @@ -1002,7 +1002,7 @@ def main(argv=None):
time0 = time.time()

try:
rules = capa.main.get_rules(args.rules)
rules = capa.rules.get_rules(args.rules)
logger.info("successfully loaded %s rules", rules.source_rule_count)
if args.tag:
rules = rules.filter_rules_by_meta(args.tag)
Expand Down
2 changes: 1 addition & 1 deletion scripts/profile-time.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def main(argv=None):

try:
with capa.main.timing("load rules"):
rules = capa.main.get_rules(args.rules)
rules = capa.rules.get_rules(args.rules)
except IOError as e:
logger.error("%s", str(e))
return -1
Expand Down
2 changes: 1 addition & 1 deletion scripts/show-capabilities-by-function.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def main(argv=None):
return -1

try:
rules = capa.main.get_rules(args.rules)
rules = capa.rules.get_rules(args.rules)
logger.info("successfully loaded %s rules", len(rules))
if args.tag:
rules = rules.filter_rules_by_meta(args.tag)
Expand Down
2 changes: 1 addition & 1 deletion scripts/show-unused-features.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def format_address(addr: capa.features.address.Address) -> str:


def get_rules_feature_set(rules_path) -> Set[Feature]:
ruleset = capa.main.get_rules(rules_path)
ruleset = capa.rules.get_rules(rules_path)
rules_feature_set: Set[Feature] = set()
for _, rule in ruleset.rules.items():
rules_feature_set.update(rule.extract_all_features())
Expand Down

0 comments on commit a9e1fd9

Please sign in to comment.