diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ae7d02a3..2caecaebe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ ### Breaking Changes +- main: introduce wrapping routines within main for working with CLI args #1813 @williballenthin +- move functions from `capa.main` to new `capa.loader` namespace #1821 @williballenthin + ### New Rules (0) - diff --git a/capa/features/common.py b/capa/features/common.py index 0cb1396de..b6527625f 100644 --- a/capa/features/common.py +++ b/capa/features/common.py @@ -458,18 +458,22 @@ def evaluate(self, ctx, **kwargs): FORMAT_SC32 = "sc32" FORMAT_SC64 = "sc64" FORMAT_CAPE = "cape" +FORMAT_FREEZE = "freeze" +FORMAT_RESULT = "result" STATIC_FORMATS = { FORMAT_SC32, FORMAT_SC64, FORMAT_PE, FORMAT_ELF, FORMAT_DOTNET, + FORMAT_FREEZE, + FORMAT_RESULT, } DYNAMIC_FORMATS = { FORMAT_CAPE, + FORMAT_FREEZE, + FORMAT_RESULT, } -FORMAT_FREEZE = "freeze" -FORMAT_RESULT = "result" FORMAT_UNKNOWN = "unknown" diff --git a/capa/features/extractors/common.py b/capa/features/extractors/common.py index b7bb3c399..bf5a3e7b4 100644 --- a/capa/features/extractors/common.py +++ b/capa/features/extractors/common.py @@ -45,7 +45,7 @@ MATCH_JSON_OBJECT = b'{"' -def extract_file_strings(buf, **kwargs) -> Iterator[Tuple[String, Address]]: +def extract_file_strings(buf: bytes, **kwargs) -> Iterator[Tuple[String, Address]]: """ extract ASCII and UTF-16 LE strings from file """ @@ -56,7 +56,7 @@ def extract_file_strings(buf, **kwargs) -> Iterator[Tuple[String, Address]]: yield String(s.s), FileOffsetAddress(s.offset) -def extract_format(buf) -> Iterator[Tuple[Feature, Address]]: +def extract_format(buf: bytes) -> Iterator[Tuple[Feature, Address]]: if buf.startswith(MATCH_PE): yield Format(FORMAT_PE), NO_ADDRESS elif buf.startswith(MATCH_ELF): diff --git a/capa/features/freeze/__init__.py b/capa/features/freeze/__init__.py index 9e3f73310..2dac7f48e 100644 --- a/capa/features/freeze/__init__.py +++ b/capa/features/freeze/__init__.py @@ -21,6 +21,7 @@ # https://github.com/mandiant/capa/issues/1699 from typing_extensions import TypeAlias +import capa.loader import capa.helpers import capa.version import capa.features.file @@ -681,14 +682,18 @@ def main(argv=None): argv = sys.argv[1:] parser = argparse.ArgumentParser(description="save capa features to a file") - capa.main.install_common_args(parser, {"sample", "format", "backend", "os", "signatures"}) + capa.main.install_common_args(parser, {"input_file", "format", "backend", "os", "signatures"}) parser.add_argument("output", type=str, help="Path to output file") args = parser.parse_args(args=argv) - capa.main.handle_common_args(args) - sigpaths = capa.main.get_signatures(args.signatures) - - extractor = capa.main.get_extractor(args.sample, args.format, args.os, args.backend, sigpaths, False) + try: + capa.main.handle_common_args(args) + capa.main.ensure_input_exists_from_cli(args) + input_format = capa.main.get_input_format_from_cli(args) + backend = capa.main.get_backend_from_cli(args, input_format) + extractor = capa.main.get_extractor_from_cli(args, input_format, backend) + except capa.main.ShouldExitError as e: + return e.status_code Path(args.output).write_bytes(dump(extractor)) diff --git a/capa/ghidra/capa_ghidra.py b/capa/ghidra/capa_ghidra.py index 70b98df56..b3ec0183b 100644 --- a/capa/ghidra/capa_ghidra.py +++ b/capa/ghidra/capa_ghidra.py @@ -69,7 +69,7 @@ def run_headless(): rules_path = pathlib.Path(args.rules) logger.debug("rule path: %s", rules_path) - rules = capa.main.get_rules([rules_path]) + rules = 
capa.rules.get_rules([rules_path]) meta = capa.ghidra.helpers.collect_metadata([rules_path]) extractor = capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor() @@ -78,7 +78,7 @@ def run_headless(): meta.analysis.feature_counts = counts["feature_counts"] meta.analysis.library_functions = counts["library_functions"] - meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities) + meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities) if capa.capabilities.common.has_file_limitation(rules, capabilities, is_standalone=True): logger.info("capa encountered warnings during analysis") @@ -119,7 +119,7 @@ def run_ui(): rules_path: pathlib.Path = pathlib.Path(rules_dir) logger.info("running capa using rules from %s", str(rules_path)) - rules = capa.main.get_rules([rules_path]) + rules = capa.rules.get_rules([rules_path]) meta = capa.ghidra.helpers.collect_metadata([rules_path]) extractor = capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor() @@ -128,7 +128,7 @@ def run_ui(): meta.analysis.feature_counts = counts["feature_counts"] meta.analysis.library_functions = counts["library_functions"] - meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities) + meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities) if capa.capabilities.common.has_file_limitation(rules, capabilities, is_standalone=False): logger.info("capa encountered warnings during analysis") diff --git a/capa/helpers.py b/capa/helpers.py index 89dad8b91..ad27f3903 100644 --- a/capa/helpers.py +++ b/capa/helpers.py @@ -5,6 +5,7 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. +import sys import json import inspect import logging @@ -16,12 +17,22 @@ import tqdm from capa.exceptions import UnsupportedFormatError -from capa.features.common import FORMAT_PE, FORMAT_CAPE, FORMAT_SC32, FORMAT_SC64, FORMAT_DOTNET, FORMAT_UNKNOWN, Format +from capa.features.common import ( + FORMAT_PE, + FORMAT_CAPE, + FORMAT_SC32, + FORMAT_SC64, + FORMAT_DOTNET, + FORMAT_FREEZE, + FORMAT_UNKNOWN, + Format, +) EXTENSIONS_SHELLCODE_32 = ("sc32", "raw32") EXTENSIONS_SHELLCODE_64 = ("sc64", "raw64") EXTENSIONS_DYNAMIC = ("json", "json_") EXTENSIONS_ELF = "elf_" +EXTENSIONS_FREEZE = "frz" logger = logging.getLogger("capa") @@ -81,6 +92,8 @@ def get_format_from_extension(sample: Path) -> str: format_ = FORMAT_SC64 elif sample.name.endswith(EXTENSIONS_DYNAMIC): format_ = get_format_from_report(sample) + elif sample.name.endswith(EXTENSIONS_FREEZE): + format_ = FORMAT_FREEZE return format_ @@ -201,3 +214,16 @@ def log_unsupported_runtime_error(): " If you're seeing this message on the command line, please ensure you're running a supported Python version." ) logger.error("-" * 80) + + +def is_running_standalone() -> bool: + """ + are we running from a PyInstaller'd executable? + if so, then we'll be able to access `sys._MEIPASS` for the packaged resources. + """ + # typically we only expect capa.main to be packaged via PyInstaller. + # therefore, this *should* be in capa.main; however, + # the Binary Ninja extractor uses this to resolve the BN API code, + # so we keep this in a common area. + # generally, other library code should not use this function. 
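+ # a sketch of typical use when frozen (illustrative only): packaged resources + # unpack beneath sys._MEIPASS, fetched via getattr() since the attribute is + # injected by PyInstaller and unknown to type checkers, as `get_default_root` + # in capa.main does: + # + # resources = Path(getattr(sys, "_MEIPASS"))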
+ return hasattr(sys, "frozen") and hasattr(sys, "_MEIPASS") diff --git a/capa/ida/plugin/form.py b/capa/ida/plugin/form.py index 4e1bd572a..ddd4c4e0d 100644 --- a/capa/ida/plugin/form.py +++ b/capa/ida/plugin/form.py @@ -636,7 +636,7 @@ def on_load_rule(_, i, total): if ida_kernwin.user_cancelled(): raise UserCancelledError("user cancelled") - return capa.main.get_rules([rule_path], on_load_rule=on_load_rule) + return capa.rules.get_rules([rule_path], on_load_rule=on_load_rule) except UserCancelledError: logger.info("User cancelled analysis.") return None @@ -775,7 +775,7 @@ def slot_progress_feature_extraction(text): meta.analysis.feature_counts = counts["feature_counts"] meta.analysis.library_functions = counts["library_functions"] - meta.analysis.layout = capa.main.compute_layout(ruleset, self.feature_extractor, capabilities) + meta.analysis.layout = capa.loader.compute_layout(ruleset, self.feature_extractor, capabilities) except UserCancelledError: logger.info("User cancelled analysis.") return False diff --git a/capa/loader.py b/capa/loader.py new file mode 100644 index 000000000..4c0f3d4f0 --- /dev/null +++ b/capa/loader.py @@ -0,0 +1,544 @@ +# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. +import sys +import json +import logging +import datetime +from typing import Set, Dict, List, Optional +from pathlib import Path + +import halo +from typing_extensions import assert_never + +import capa.perf +import capa.rules +import capa.engine +import capa.helpers +import capa.version +import capa.render.json +import capa.rules.cache +import capa.render.default +import capa.render.verbose +import capa.features.common +import capa.features.freeze as frz +import capa.render.vverbose +import capa.features.extractors +import capa.render.result_document +import capa.render.result_document as rdoc +import capa.features.extractors.common +import capa.features.extractors.pefile +import capa.features.extractors.elffile +import capa.features.extractors.dotnetfile +import capa.features.extractors.base_extractor +import capa.features.extractors.cape.extractor +from capa.rules import RuleSet +from capa.engine import MatchResults +from capa.exceptions import UnsupportedOSError, UnsupportedArchError, UnsupportedFormatError +from capa.features.common import ( + OS_AUTO, + FORMAT_PE, + FORMAT_ELF, + FORMAT_AUTO, + FORMAT_CAPE, + FORMAT_SC32, + FORMAT_SC64, + FORMAT_DOTNET, +) +from capa.features.address import Address +from capa.features.extractors.base_extractor import ( + SampleHashes, + FeatureExtractor, + StaticFeatureExtractor, + DynamicFeatureExtractor, +) + +logger = logging.getLogger(__name__) + +BACKEND_VIV = "vivisect" +BACKEND_DOTNET = "dotnet" +BACKEND_BINJA = "binja" +BACKEND_PEFILE = "pefile" +BACKEND_CAPE = "cape" +BACKEND_FREEZE = "freeze" + + +def is_supported_format(sample: Path) -> bool: + """ + Return if this is a supported file based on magic header values + """ + taste = sample.open("rb").read(0x100) + + return 
len(list(capa.features.extractors.common.extract_format(taste))) == 1 + + +def is_supported_arch(sample: Path) -> bool: + buf = sample.read_bytes() + + return len(list(capa.features.extractors.common.extract_arch(buf))) == 1 + + +def get_arch(sample: Path) -> str: + buf = sample.read_bytes() + + for feature, _ in capa.features.extractors.common.extract_arch(buf): + assert isinstance(feature.value, str) + return feature.value + + return "unknown" + + +def is_supported_os(sample: Path) -> bool: + buf = sample.read_bytes() + + return len(list(capa.features.extractors.common.extract_os(buf))) == 1 + + +def get_os(sample: Path) -> str: + buf = sample.read_bytes() + + for feature, _ in capa.features.extractors.common.extract_os(buf): + assert isinstance(feature.value, str) + return feature.value + + return "unknown" + + +def get_meta_str(vw): + """ + Return workspace meta information string + """ + meta = [] + for k in ["Format", "Platform", "Architecture"]: + if k in vw.metadata: + meta.append(f"{k.lower()}: {vw.metadata[k]}") + return f"{', '.join(meta)}, number of functions: {len(vw.getFunctions())}" + + +def get_workspace(path: Path, input_format: str, sigpaths: List[Path]): + """ + load the program at the given path into a vivisect workspace using the given format. + also apply the given FLIRT signatures. + + supported formats: + - pe + - elf + - shellcode 32-bit + - shellcode 64-bit + - auto + + this creates and analyzes the workspace; however, it does *not* save the workspace. + this is the responsibility of the caller. + """ + + # lazy import enables us to not require viv if user wants another backend. + import viv_utils + import viv_utils.flirt + + logger.debug("generating vivisect workspace for: %s", path) + if input_format == FORMAT_AUTO: + if not is_supported_format(path): + raise UnsupportedFormatError() + + # don't analyze, so that we can add our Flirt function analyzer first. + vw = viv_utils.getWorkspace(str(path), analyze=False, should_save=False) + elif input_format in {FORMAT_PE, FORMAT_ELF}: + vw = viv_utils.getWorkspace(str(path), analyze=False, should_save=False) + elif input_format == FORMAT_SC32: + # these are not analyzed nor saved. 
+ vw = viv_utils.getShellcodeWorkspaceFromFile(str(path), arch="i386", analyze=False) + elif input_format == FORMAT_SC64: + vw = viv_utils.getShellcodeWorkspaceFromFile(str(path), arch="amd64", analyze=False) + else: + raise ValueError("unexpected format: " + input_format) + + viv_utils.flirt.register_flirt_signature_analyzers(vw, [str(s) for s in sigpaths]) + + vw.analyze() + + logger.debug("%s", get_meta_str(vw)) + return vw + + +def get_extractor( + input_path: Path, + input_format: str, + os_: str, + backend: str, + sigpaths: List[Path], + should_save_workspace=False, + disable_progress=False, + sample_path: Optional[Path] = None, +) -> FeatureExtractor: + """ + raises: + UnsupportedFormatError + UnsupportedArchError + UnsupportedOSError + """ + if backend == BACKEND_CAPE: + import capa.features.extractors.cape.extractor + + report = json.loads(input_path.read_text(encoding="utf-8")) + return capa.features.extractors.cape.extractor.CapeExtractor.from_report(report) + + elif backend == BACKEND_DOTNET: + import capa.features.extractors.dnfile.extractor + + if input_format not in (FORMAT_PE, FORMAT_DOTNET): + raise UnsupportedFormatError() + + return capa.features.extractors.dnfile.extractor.DnfileFeatureExtractor(input_path) + + elif backend == BACKEND_BINJA: + import capa.helpers + from capa.features.extractors.binja.find_binja_api import find_binja_path + + # When we are running as a standalone executable, we cannot directly import binaryninja. + # We need to first find the binja API installation path and add it into sys.path + if capa.helpers.is_running_standalone(): + bn_api = find_binja_path() + if bn_api.exists(): + sys.path.append(str(bn_api)) + + try: + import binaryninja + from binaryninja import BinaryView + except ImportError: + raise RuntimeError( + "Cannot import binaryninja module. Please install the Binary Ninja Python API first: " + + "https://docs.binary.ninja/dev/batch.html#install-the-api."
+ ) + + import capa.features.extractors.binja.extractor + + if input_format not in (FORMAT_SC32, FORMAT_SC64): + if not is_supported_format(input_path): + raise UnsupportedFormatError() + + if not is_supported_arch(input_path): + raise UnsupportedArchError() + + if os_ == OS_AUTO and not is_supported_os(input_path): + raise UnsupportedOSError() + + with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress): + bv: BinaryView = binaryninja.load(str(input_path)) + if bv is None: + raise RuntimeError(f"Binary Ninja cannot open file {input_path}") + + return capa.features.extractors.binja.extractor.BinjaFeatureExtractor(bv) + + elif backend == BACKEND_PEFILE: + import capa.features.extractors.pefile + + return capa.features.extractors.pefile.PefileFeatureExtractor(input_path) + + elif backend == BACKEND_VIV: + import capa.features.extractors.viv.extractor + + if input_format not in (FORMAT_SC32, FORMAT_SC64): + if not is_supported_format(input_path): + raise UnsupportedFormatError() + + if not is_supported_arch(input_path): + raise UnsupportedArchError() + + if os_ == OS_AUTO and not is_supported_os(input_path): + raise UnsupportedOSError() + + with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress): + vw = get_workspace(input_path, input_format, sigpaths) + + if should_save_workspace: + logger.debug("saving workspace") + try: + vw.saveWorkspace() + except IOError: + # see #168 for discussion around how to handle non-writable directories + logger.info("source directory is not writable, won't save intermediate workspace") + else: + logger.debug("CAPA_SAVE_WORKSPACE unset, not saving workspace") + + return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, input_path, os_) + + elif backend == BACKEND_FREEZE: + return frz.load(input_path.read_bytes()) + + else: + raise ValueError("unexpected backend: " + backend) + + +def get_file_extractors(input_file: Path, input_format: str) -> List[FeatureExtractor]: + file_extractors: List[FeatureExtractor] = [] + + if input_format == FORMAT_PE: + file_extractors.append(capa.features.extractors.pefile.PefileFeatureExtractor(input_file)) + + elif input_format == FORMAT_DOTNET: + file_extractors.append(capa.features.extractors.pefile.PefileFeatureExtractor(input_file)) + file_extractors.append(capa.features.extractors.dotnetfile.DotnetFileFeatureExtractor(input_file)) + + elif input_format == FORMAT_ELF: + file_extractors.append(capa.features.extractors.elffile.ElfFeatureExtractor(input_file)) + + elif input_format == FORMAT_CAPE: + report = json.loads(input_file.read_text(encoding="utf-8")) + file_extractors.append(capa.features.extractors.cape.extractor.CapeExtractor.from_report(report)) + + return file_extractors + + +def get_signatures(sigs_path: Path) -> List[Path]: + if not sigs_path.exists(): + raise IOError(f"signatures path {sigs_path} does not exist or cannot be accessed") + + paths: List[Path] = [] + if sigs_path.is_file(): + paths.append(sigs_path) + elif sigs_path.is_dir(): + logger.debug("reading signatures from directory %s", sigs_path.resolve()) + for file in sigs_path.rglob("*"): + if file.is_file() and file.suffix.lower() in (".pat", ".pat.gz", ".sig"): + paths.append(file) + + # Convert paths to their absolute and normalized forms + paths = [path.resolve().absolute() for path in paths] + + # load signatures in deterministic order: the alphabetic sorting of filename. 
+ # this means that `0_sigs.pat` loads before `1_sigs.pat`. + paths = sorted(paths, key=lambda path: path.name) + + for path in paths: + logger.debug("found signature file: %s", path) + + return paths + + +def get_sample_analysis(format_, arch, os_, extractor, rules_path, counts): + if isinstance(extractor, StaticFeatureExtractor): + return rdoc.StaticAnalysis( + format=format_, + arch=arch, + os=os_, + extractor=extractor.__class__.__name__, + rules=tuple(rules_path), + base_address=frz.Address.from_capa(extractor.get_base_address()), + layout=rdoc.StaticLayout( + functions=(), + # this is updated after capabilities have been collected. + # will look like: + # + # "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... } + ), + feature_counts=counts["feature_counts"], + library_functions=counts["library_functions"], + ) + elif isinstance(extractor, DynamicFeatureExtractor): + return rdoc.DynamicAnalysis( + format=format_, + arch=arch, + os=os_, + extractor=extractor.__class__.__name__, + rules=tuple(rules_path), + layout=rdoc.DynamicLayout( + processes=(), + ), + feature_counts=counts["feature_counts"], + ) + else: + raise ValueError("invalid extractor type") + + +def collect_metadata( + argv: List[str], + input_path: Path, + input_format: str, + os_: str, + rules_path: List[Path], + extractor: FeatureExtractor, + counts: dict, +) -> rdoc.Metadata: + # if it's a binary sample we hash it, if it's a report + # we fetch the hashes from the report + sample_hashes: SampleHashes = extractor.get_sample_hashes() + md5, sha1, sha256 = sample_hashes.md5, sample_hashes.sha1, sample_hashes.sha256 + + global_feats = list(extractor.extract_global_features()) + extractor_format = [f.value for (f, _) in global_feats if isinstance(f, capa.features.common.Format)] + extractor_arch = [f.value for (f, _) in global_feats if isinstance(f, capa.features.common.Arch)] + extractor_os = [f.value for (f, _) in global_feats if isinstance(f, capa.features.common.OS)] + + input_format = ( + str(extractor_format[0]) if extractor_format else "unknown" if input_format == FORMAT_AUTO else input_format + ) + arch = str(extractor_arch[0]) if extractor_arch else "unknown" + os_ = str(extractor_os[0]) if extractor_os else "unknown" if os_ == OS_AUTO else os_ + + if isinstance(extractor, StaticFeatureExtractor): + meta_class: type = rdoc.StaticMetadata + elif isinstance(extractor, DynamicFeatureExtractor): + meta_class = rdoc.DynamicMetadata + else: + assert_never(extractor) + + rules = tuple(r.resolve().absolute().as_posix() for r in rules_path) + + return meta_class( + timestamp=datetime.datetime.now(), + version=capa.version.__version__, + argv=tuple(argv) if argv else None, + sample=rdoc.Sample( + md5=md5, + sha1=sha1, + sha256=sha256, + path=input_path.resolve().as_posix(), + ), + analysis=get_sample_analysis( + input_format, + arch, + os_, + extractor, + rules, + counts, + ), + ) + + +def compute_dynamic_layout( + rules: RuleSet, extractor: DynamicFeatureExtractor, capabilities: MatchResults +) -> rdoc.DynamicLayout: + """ + compute a metadata structure that links threads + to the processes in which they're found. + + only collect the threads at which some rule matched. + otherwise, we may pollute the json document with + a large amount of un-referenced data. 
+ """ + assert isinstance(extractor, DynamicFeatureExtractor) + + matched_calls: Set[Address] = set() + + def result_rec(result: capa.features.common.Result): + for loc in result.locations: + if isinstance(loc, capa.features.address.DynamicCallAddress): + matched_calls.add(loc) + for child in result.children: + result_rec(child) + + for matches in capabilities.values(): + for _, result in matches: + result_rec(result) + + names_by_process: Dict[Address, str] = {} + names_by_call: Dict[Address, str] = {} + + matched_processes: Set[Address] = set() + matched_threads: Set[Address] = set() + + threads_by_process: Dict[Address, List[Address]] = {} + calls_by_thread: Dict[Address, List[Address]] = {} + + for p in extractor.get_processes(): + threads_by_process[p.address] = [] + + for t in extractor.get_threads(p): + calls_by_thread[t.address] = [] + + for c in extractor.get_calls(p, t): + if c.address in matched_calls: + names_by_call[c.address] = extractor.get_call_name(p, t, c) + calls_by_thread[t.address].append(c.address) + + if calls_by_thread[t.address]: + matched_threads.add(t.address) + threads_by_process[p.address].append(t.address) + + if threads_by_process[p.address]: + matched_processes.add(p.address) + names_by_process[p.address] = extractor.get_process_name(p) + + layout = rdoc.DynamicLayout( + processes=tuple( + rdoc.ProcessLayout( + address=frz.Address.from_capa(p), + name=names_by_process[p], + matched_threads=tuple( + rdoc.ThreadLayout( + address=frz.Address.from_capa(t), + matched_calls=tuple( + rdoc.CallLayout( + address=frz.Address.from_capa(c), + name=names_by_call[c], + ) + for c in calls_by_thread[t] + if c in matched_calls + ), + ) + for t in threads + if t in matched_threads + ) # this object is open to extension in the future, + # such as with the function name, etc. + ) + for p, threads in threads_by_process.items() + if p in matched_processes + ) + ) + + return layout + + +def compute_static_layout(rules: RuleSet, extractor: StaticFeatureExtractor, capabilities) -> rdoc.StaticLayout: + """ + compute a metadata structure that links basic blocks + to the functions in which they're found. + + only collect the basic blocks at which some rule matched. + otherwise, we may pollute the json document with + a large amount of un-referenced data. + """ + functions_by_bb: Dict[Address, Address] = {} + bbs_by_function: Dict[Address, List[Address]] = {} + for f in extractor.get_functions(): + bbs_by_function[f.address] = [] + for bb in extractor.get_basic_blocks(f): + functions_by_bb[bb.address] = f.address + bbs_by_function[f.address].append(bb.address) + + matched_bbs = set() + for rule_name, matches in capabilities.items(): + rule = rules[rule_name] + if capa.rules.Scope.BASIC_BLOCK in rule.scopes: + for addr, _ in matches: + assert addr in functions_by_bb + matched_bbs.add(addr) + + layout = rdoc.StaticLayout( + functions=tuple( + rdoc.FunctionLayout( + address=frz.Address.from_capa(f), + matched_basic_blocks=tuple( + rdoc.BasicBlockLayout(address=frz.Address.from_capa(bb)) for bb in bbs if bb in matched_bbs + ) # this object is open to extension in the future, + # such as with the function name, etc. 
+ ) + for f, bbs in bbs_by_function.items() + if len([bb for bb in bbs if bb in matched_bbs]) > 0 + ) + ) + + return layout + + +def compute_layout(rules: RuleSet, extractor, capabilities) -> rdoc.Layout: + if isinstance(extractor, StaticFeatureExtractor): + return compute_static_layout(rules, extractor, capabilities) + elif isinstance(extractor, DynamicFeatureExtractor): + return compute_dynamic_layout(rules, extractor, capabilities) + else: + raise ValueError("extractor must be either a static or dynamic extractor") diff --git a/capa/main.py b/capa/main.py index e5ee92a2a..2d85b8684 100644 --- a/capa/main.py +++ b/capa/main.py @@ -11,26 +11,23 @@ import io import os import sys -import json import time import logging import argparse -import datetime import textwrap import contextlib from types import TracebackType -from typing import Any, Set, Dict, List, Callable, Optional +from typing import Any, Dict, List, Optional from pathlib import Path -import halo import colorama from pefile import PEFormatError -from typing_extensions import assert_never from elftools.common.exceptions import ELFError import capa.perf import capa.rules import capa.engine +import capa.loader import capa.helpers import capa.version import capa.render.json @@ -38,7 +35,6 @@ import capa.render.default import capa.render.verbose import capa.features.common -import capa.features.freeze as frz import capa.render.vverbose import capa.features.extractors import capa.render.result_document @@ -49,8 +45,9 @@ import capa.features.extractors.dotnetfile import capa.features.extractors.base_extractor import capa.features.extractors.cape.extractor -from capa.rules import Rule, RuleSet +from capa.rules import RuleSet from capa.engine import MatchResults +from capa.loader import BACKEND_VIV, BACKEND_CAPE, BACKEND_BINJA, BACKEND_DOTNET, BACKEND_FREEZE, BACKEND_PEFILE from capa.helpers import ( get_file_taste, get_auto_format, @@ -82,21 +79,12 @@ FORMAT_FREEZE, FORMAT_RESULT, ) -from capa.features.address import Address from capa.capabilities.common import find_capabilities, has_file_limitation, find_file_capabilities -from capa.features.extractors.base_extractor import ( - SampleHashes, - FeatureExtractor, - StaticFeatureExtractor, - DynamicFeatureExtractor, -) +from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor RULES_PATH_DEFAULT_STRING = "(embedded rules)" SIGNATURES_PATH_DEFAULT_STRING = "(embedded signatures)" -BACKEND_VIV = "vivisect" -BACKEND_DOTNET = "dotnet" -BACKEND_BINJA = "binja" -BACKEND_PEFILE = "pefile" +BACKEND_AUTO = "auto" E_MISSING_RULES = 10 E_MISSING_FILE = 11 @@ -134,73 +122,16 @@ def set_vivisect_log_level(level): logging.getLogger("Elf").setLevel(level) -def is_supported_format(sample: Path) -> bool: - """ - Return if this is a supported file based on magic header values - """ - taste = sample.open("rb").read(0x100) - - return len(list(capa.features.extractors.common.extract_format(taste))) == 1 - - -def is_supported_arch(sample: Path) -> bool: - buf = sample.read_bytes() - - return len(list(capa.features.extractors.common.extract_arch(buf))) == 1 - - -def get_arch(sample: Path) -> str: - buf = sample.read_bytes() - - for feature, _ in capa.features.extractors.common.extract_arch(buf): - assert isinstance(feature.value, str) - return feature.value - - return "unknown" - - -def is_supported_os(sample: Path) -> bool: - buf = sample.read_bytes() - - return len(list(capa.features.extractors.common.extract_os(buf))) == 1 - - -def get_os(sample:
Path) -> str: - buf = sample.read_bytes() - - for feature, _ in capa.features.extractors.common.extract_os(buf): - assert isinstance(feature.value, str) - return feature.value - - return "unknown" - - -def get_meta_str(vw): - """ - Return workspace meta information string - """ - meta = [] - for k in ["Format", "Platform", "Architecture"]: - if k in vw.metadata: - meta.append(f"{k.lower()}: {vw.metadata[k]}") - return f"{', '.join(meta)}, number of functions: {len(vw.getFunctions())}" - - -def is_running_standalone() -> bool: - """ - are we running from a PyInstaller'd executable? - if so, then we'll be able to access `sys._MEIPASS` for the packaged resources. - """ - return hasattr(sys, "frozen") and hasattr(sys, "_MEIPASS") - - def get_default_root() -> Path: """ get the file system path to the default resources directory. under PyInstaller, this comes from _MEIPASS. under source, this is the root directory of the project. """ - if is_running_standalone(): + # we only expect capa.main to be packaged within PyInstaller, + # so we don't put this in a more common place, like capa.helpers. + + if capa.helpers.is_running_standalone(): # pylance/mypy don't like `sys._MEIPASS` because this isn't standard. # its injected by pyinstaller. # so we'll fetch this attribute dynamically. @@ -225,517 +156,32 @@ def get_default_signatures() -> List[Path]: return ret -def get_workspace(path: Path, format_: str, sigpaths: List[Path]): - """ - load the program at the given path into a vivisect workspace using the given format. - also apply the given FLIRT signatures. - - supported formats: - - pe - - elf - - shellcode 32-bit - - shellcode 64-bit - - auto - - this creates and analyzes the workspace; however, it does *not* save the workspace. - this is the responsibility of the caller. - """ - - # lazy import enables us to not require viv if user wants SMDA, for example. - import viv_utils - import viv_utils.flirt - - logger.debug("generating vivisect workspace for: %s", path) - if format_ == FORMAT_AUTO: - if not is_supported_format(path): - raise UnsupportedFormatError() - - # don't analyze, so that we can add our Flirt function analyzer first. - vw = viv_utils.getWorkspace(str(path), analyze=False, should_save=False) - elif format_ in {FORMAT_PE, FORMAT_ELF}: - vw = viv_utils.getWorkspace(str(path), analyze=False, should_save=False) - elif format_ == FORMAT_SC32: - # these are not analyzed nor saved. 
- vw = viv_utils.getShellcodeWorkspaceFromFile(str(path), arch="i386", analyze=False) - elif format_ == FORMAT_SC64: - vw = viv_utils.getShellcodeWorkspaceFromFile(str(path), arch="amd64", analyze=False) - else: - raise ValueError("unexpected format: " + format_) - - viv_utils.flirt.register_flirt_signature_analyzers(vw, [str(s) for s in sigpaths]) - - vw.analyze() - - logger.debug("%s", get_meta_str(vw)) - return vw - - -def get_extractor( - path: Path, - format_: str, - os_: str, - backend: str, - sigpaths: List[Path], - should_save_workspace=False, - disable_progress=False, -) -> FeatureExtractor: - """ - raises: - UnsupportedFormatError - UnsupportedArchError - UnsupportedOSError - """ - - if format_ not in (FORMAT_SC32, FORMAT_SC64, FORMAT_CAPE): - if not is_supported_format(path): - raise UnsupportedFormatError() - - if not is_supported_arch(path): - raise UnsupportedArchError() - - if os_ == OS_AUTO and not is_supported_os(path): - raise UnsupportedOSError() - - if format_ == FORMAT_CAPE: - import capa.features.extractors.cape.extractor - - report = json.load(Path(path).open(encoding="utf-8")) - return capa.features.extractors.cape.extractor.CapeExtractor.from_report(report) - - elif format_ == FORMAT_DOTNET: - import capa.features.extractors.dnfile.extractor - - return capa.features.extractors.dnfile.extractor.DnfileFeatureExtractor(path) - - elif backend == BACKEND_BINJA: - from capa.features.extractors.binja.find_binja_api import find_binja_path - - # When we are running as a standalone executable, we cannot directly import binaryninja - # We need to fist find the binja API installation path and add it into sys.path - if is_running_standalone(): - bn_api = find_binja_path() - if bn_api.exists(): - sys.path.append(str(bn_api)) - - try: - import binaryninja - from binaryninja import BinaryView - except ImportError: - raise RuntimeError( - "Cannot import binaryninja module. Please install the Binary Ninja Python API first: " - + "https://docs.binary.ninja/dev/batch.html#install-the-api)." 
- ) - - import capa.features.extractors.binja.extractor - - with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress): - bv: BinaryView = binaryninja.load(str(path)) - if bv is None: - raise RuntimeError(f"Binary Ninja cannot open file {path}") - - return capa.features.extractors.binja.extractor.BinjaFeatureExtractor(bv) - - elif backend == BACKEND_PEFILE: - import capa.features.extractors.pefile - - return capa.features.extractors.pefile.PefileFeatureExtractor(path) - - elif backend == BACKEND_VIV: - import capa.features.extractors.viv.extractor - - with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress): - vw = get_workspace(path, format_, sigpaths) - - if should_save_workspace: - logger.debug("saving workspace") - try: - vw.saveWorkspace() - except IOError: - # see #168 for discussion around how to handle non-writable directories - logger.info("source directory is not writable, won't save intermediate workspace") - else: - logger.debug("CAPA_SAVE_WORKSPACE unset, not saving workspace") - - return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path, os_) - - else: - raise ValueError("unexpected backend: " + backend) - - -def get_file_extractors(sample: Path, format_: str) -> List[FeatureExtractor]: - file_extractors: List[FeatureExtractor] = [] - - if format_ == FORMAT_PE: - file_extractors.append(capa.features.extractors.pefile.PefileFeatureExtractor(sample)) - - elif format_ == FORMAT_DOTNET: - file_extractors.append(capa.features.extractors.pefile.PefileFeatureExtractor(sample)) - file_extractors.append(capa.features.extractors.dotnetfile.DotnetFileFeatureExtractor(sample)) - - elif format_ == capa.features.common.FORMAT_ELF: - file_extractors.append(capa.features.extractors.elffile.ElfFeatureExtractor(sample)) - - elif format_ == FORMAT_CAPE: - report = json.load(Path(sample).open(encoding="utf-8")) - file_extractors.append(capa.features.extractors.cape.extractor.CapeExtractor.from_report(report)) - - return file_extractors - - -def is_nursery_rule_path(path: Path) -> bool: - """ - The nursery is a spot for rules that have not yet been fully polished. - For example, they may not have references to public example of a technique. - Yet, we still want to capture and report on their matches. - The nursery is currently a subdirectory of the rules directory with that name. - - When nursery rules are loaded, their metadata section should be updated with: - `nursery=True`. +def simple_message_exception_handler(exctype, value: BaseException, traceback: TracebackType): """ - return "nursery" in path.parts - + prints friendly message on unexpected exceptions to regular users (debug mode shows regular stack trace) -def collect_rule_file_paths(rule_paths: List[Path]) -> List[Path]: - """ - collect all rule file paths, including those in subdirectories. 
- """ - rule_file_paths = [] - for rule_path in rule_paths: - if not rule_path.exists(): - raise IOError(f"rule path {rule_path} does not exist or cannot be accessed") - - if rule_path.is_file(): - rule_file_paths.append(rule_path) - elif rule_path.is_dir(): - logger.debug("reading rules from directory %s", rule_path) - for root, _, files in os.walk(rule_path): - if ".git" in root: - # the .github directory contains CI config in capa-rules - # this includes some .yml files - # these are not rules - # additionally, .git has files that are not .yml and generate the warning - # skip those too - continue - for file in files: - if not file.endswith(".yml"): - if not (file.startswith(".git") or file.endswith((".git", ".md", ".txt"))): - # expect to see .git* files, readme.md, format.md, and maybe a .git directory - # other things maybe are rules, but are mis-named. - logger.warning("skipping non-.yml file: %s", file) - continue - rule_file_paths.append(Path(root) / file) - return rule_file_paths - - -# TypeAlias. note: using `foo: TypeAlias = bar` is Python 3.10+ -RulePath = Path - - -def on_load_rule_default(_path: RulePath, i: int, _total: int) -> None: - return - - -def get_rules( - rule_paths: List[RulePath], - cache_dir=None, - on_load_rule: Callable[[RulePath, int, int], None] = on_load_rule_default, -) -> RuleSet: - """ args: - rule_paths: list of paths to rules files or directories containing rules files - cache_dir: directory to use for caching rules, or will use the default detected cache directory if None - on_load_rule: callback to invoke before a rule is loaded, use for progress or cancellation + # TODO(aaronatp): Once capa drops support for Python 3.8, move the exctype type annotation to + # the function parameters and remove the "# type: ignore[assignment]" from the relevant place + # in the main function, see (https://github.com/mandiant/capa/issues/1896) + exctype (type[BaseException]): exception class """ - if cache_dir is None: - cache_dir = capa.rules.cache.get_default_cache_directory() - # rule_paths may contain directory paths, - # so search for file paths recursively. - rule_file_paths = collect_rule_file_paths(rule_paths) - - # this list is parallel to `rule_file_paths`: - # rule_file_paths[i] corresponds to rule_contents[i]. 
- rule_contents = [file_path.read_bytes() for file_path in rule_file_paths] - - ruleset = capa.rules.cache.load_cached_ruleset(cache_dir, rule_contents) - if ruleset is not None: - return ruleset - - rules: List[Rule] = [] - - total_rule_count = len(rule_file_paths) - for i, (path, content) in enumerate(zip(rule_file_paths, rule_contents)): - on_load_rule(path, i, total_rule_count) - try: - rule = capa.rules.Rule.from_yaml(content.decode("utf-8")) - except capa.rules.InvalidRule: - raise - else: - rule.meta["capa/path"] = path.as_posix() - rule.meta["capa/nursery"] = is_nursery_rule_path(path) - - rules.append(rule) - logger.debug("loaded rule: '%s' with scope: %s", rule.name, rule.scopes) - - ruleset = capa.rules.RuleSet(rules) - - capa.rules.cache.cache_ruleset(cache_dir, ruleset) - - return ruleset - - -def get_signatures(sigs_path: Path) -> List[Path]: - if not sigs_path.exists(): - raise IOError(f"signatures path {sigs_path} does not exist or cannot be accessed") - - paths: List[Path] = [] - if sigs_path.is_file(): - paths.append(sigs_path) - elif sigs_path.is_dir(): - logger.debug("reading signatures from directory %s", sigs_path.resolve()) - for file in sigs_path.rglob("*"): - if file.is_file() and file.suffix.lower() in (".pat", ".pat.gz", ".sig"): - paths.append(file) - - # Convert paths to their absolute and normalized forms - paths = [path.resolve().absolute() for path in paths] - - # load signatures in deterministic order: the alphabetic sorting of filename. - # this means that `0_sigs.pat` loads before `1_sigs.pat`. - paths = sorted(paths, key=lambda path: path.name) - - for path in paths: - logger.debug("found signature file: %s", path) - - return paths - - -def get_sample_analysis(format_, arch, os_, extractor, rules_path, counts): - if isinstance(extractor, StaticFeatureExtractor): - return rdoc.StaticAnalysis( - format=format_, - arch=arch, - os=os_, - extractor=extractor.__class__.__name__, - rules=tuple(rules_path), - base_address=frz.Address.from_capa(extractor.get_base_address()), - layout=rdoc.StaticLayout( - functions=(), - # this is updated after capabilities have been collected. - # will look like: - # - # "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... 
} - ), - feature_counts=counts["feature_counts"], - library_functions=counts["library_functions"], - ) - elif isinstance(extractor, DynamicFeatureExtractor): - return rdoc.DynamicAnalysis( - format=format_, - arch=arch, - os=os_, - extractor=extractor.__class__.__name__, - rules=tuple(rules_path), - layout=rdoc.DynamicLayout( - processes=(), - ), - feature_counts=counts["feature_counts"], - ) - else: - raise ValueError("invalid extractor type") - - -def collect_metadata( - argv: List[str], - sample_path: Path, - format_: str, - os_: str, - rules_path: List[Path], - extractor: FeatureExtractor, - counts: dict, -) -> rdoc.Metadata: - # if it's a binary sample we hash it, if it's a report - # we fetch the hashes from the report - sample_hashes: SampleHashes = extractor.get_sample_hashes() - md5, sha1, sha256 = sample_hashes.md5, sample_hashes.sha1, sample_hashes.sha256 - - global_feats = list(extractor.extract_global_features()) - extractor_format = [f.value for (f, _) in global_feats if isinstance(f, capa.features.common.Format)] - extractor_arch = [f.value for (f, _) in global_feats if isinstance(f, capa.features.common.Arch)] - extractor_os = [f.value for (f, _) in global_feats if isinstance(f, capa.features.common.OS)] - - format_ = str(extractor_format[0]) if extractor_format else "unknown" if format_ == FORMAT_AUTO else format_ - arch = str(extractor_arch[0]) if extractor_arch else "unknown" - os_ = str(extractor_os[0]) if extractor_os else "unknown" if os_ == OS_AUTO else os_ - - if isinstance(extractor, StaticFeatureExtractor): - meta_class: type = rdoc.StaticMetadata - elif isinstance(extractor, DynamicFeatureExtractor): - meta_class = rdoc.DynamicMetadata + if exctype is KeyboardInterrupt: + print("KeyboardInterrupt detected, program terminated") else: - assert_never(extractor) - - rules = tuple(r.resolve().absolute().as_posix() for r in rules_path) - - return meta_class( - timestamp=datetime.datetime.now(), - version=capa.version.__version__, - argv=tuple(argv) if argv else None, - sample=rdoc.Sample( - md5=md5, - sha1=sha1, - sha256=sha256, - path=Path(sample_path).resolve().as_posix(), - ), - analysis=get_sample_analysis( - format_, - arch, - os_, - extractor, - rules, - counts, - ), - ) - - -def compute_dynamic_layout(rules, extractor: DynamicFeatureExtractor, capabilities: MatchResults) -> rdoc.DynamicLayout: - """ - compute a metadata structure that links threads - to the processes in which they're found. - - only collect the threads at which some rule matched. - otherwise, we may pollute the json document with - a large amount of un-referenced data. 
- """ - assert isinstance(extractor, DynamicFeatureExtractor) - - matched_calls: Set[Address] = set() - - def result_rec(result: capa.features.common.Result): - for loc in result.locations: - if isinstance(loc, capa.features.address.DynamicCallAddress): - matched_calls.add(loc) - for child in result.children: - result_rec(child) - - for matches in capabilities.values(): - for _, result in matches: - result_rec(result) - - names_by_process: Dict[Address, str] = {} - names_by_call: Dict[Address, str] = {} - - matched_processes: Set[Address] = set() - matched_threads: Set[Address] = set() - - threads_by_process: Dict[Address, List[Address]] = {} - calls_by_thread: Dict[Address, List[Address]] = {} - - for p in extractor.get_processes(): - threads_by_process[p.address] = [] - - for t in extractor.get_threads(p): - calls_by_thread[t.address] = [] - - for c in extractor.get_calls(p, t): - if c.address in matched_calls: - names_by_call[c.address] = extractor.get_call_name(p, t, c) - calls_by_thread[t.address].append(c.address) - - if calls_by_thread[t.address]: - matched_threads.add(t.address) - threads_by_process[p.address].append(t.address) - - if threads_by_process[p.address]: - matched_processes.add(p.address) - names_by_process[p.address] = extractor.get_process_name(p) - - layout = rdoc.DynamicLayout( - processes=tuple( - rdoc.ProcessLayout( - address=frz.Address.from_capa(p), - name=names_by_process[p], - matched_threads=tuple( - rdoc.ThreadLayout( - address=frz.Address.from_capa(t), - matched_calls=tuple( - rdoc.CallLayout( - address=frz.Address.from_capa(c), - name=names_by_call[c], - ) - for c in calls_by_thread[t] - if c in matched_calls - ), - ) - for t in threads - if t in matched_threads - ) # this object is open to extension in the future, - # such as with the function name, etc. - ) - for p, threads in threads_by_process.items() - if p in matched_processes - ) - ) - - return layout - - -def compute_static_layout(rules, extractor: StaticFeatureExtractor, capabilities) -> rdoc.StaticLayout: - """ - compute a metadata structure that links basic blocks - to the functions in which they're found. - - only collect the basic blocks at which some rule matched. - otherwise, we may pollute the json document with - a large amount of un-referenced data. - """ - functions_by_bb: Dict[Address, Address] = {} - bbs_by_function: Dict[Address, List[Address]] = {} - for f in extractor.get_functions(): - bbs_by_function[f.address] = [] - for bb in extractor.get_basic_blocks(f): - functions_by_bb[bb.address] = f.address - bbs_by_function[f.address].append(bb.address) - - matched_bbs = set() - for rule_name, matches in capabilities.items(): - rule = rules[rule_name] - if capa.rules.Scope.BASIC_BLOCK in rule.scopes: - for addr, _ in matches: - assert addr in functions_by_bb - matched_bbs.add(addr) - - layout = rdoc.StaticLayout( - functions=tuple( - rdoc.FunctionLayout( - address=frz.Address.from_capa(f), - matched_basic_blocks=tuple( - rdoc.BasicBlockLayout(address=frz.Address.from_capa(bb)) for bb in bbs if bb in matched_bbs - ) # this object is open to extension in the future, - # such as with the function name, etc. - ) - for f, bbs in bbs_by_function.items() - if len([bb for bb in bbs if bb in matched_bbs]) > 0 + print( + f"Unexpected exception raised: {exctype}. Please run capa in debug mode (-d/--debug) " + + "to see the stack trace. Please also report your issue on the capa GitHub page so we " + + "can improve the code! 
(https://github.com/mandiant/capa/issues)" + ) - ) - ) - - return layout - - -def compute_layout(rules, extractor, capabilities) -> rdoc.Layout: - if isinstance(extractor, StaticFeatureExtractor): - return compute_static_layout(rules, extractor, capabilities) - elif isinstance(extractor, DynamicFeatureExtractor): - return compute_dynamic_layout(rules, extractor, capabilities) - else: - raise ValueError("extractor must be either a static or dynamic extracotr") def install_common_args(parser, wanted=None): """ register a common set of command line arguments for re-use by main & scripts. these are things like logging/coloring/etc. - also enable callers to opt-in to common arguments, like specifying the input sample. + also enable callers to opt-in to common arguments, like specifying the input file. this routine lets many script use the same language for cli arguments. see `handle_common_args` to do common configuration. @@ -743,7 +189,7 @@ args: parser (argparse.ArgumentParser): a parser to update in place, adding common arguments. wanted (Set[str]): collection of arguments to opt-into, including: - - "sample": required positional argument to input file. + - "input_file": required positional argument to input file. - "format": flag to override file format. - "os": flag to override file operating system. - "backend": flag to override analysis backend. @@ -774,21 +220,11 @@ help="enable ANSI color codes in results, default: only during interactive session", ) - # - # arguments that may be opted into: - # - # - sample - # - format - # - os - # - rules - # - tag - # - - if "sample" in wanted: + if "input_file" in wanted: parser.add_argument( - "sample", + "input_file", type=str, - help="path to sample to analyze", + help="path to file to analyze", ) if "format" in wanted: @@ -803,22 +239,33 @@ (FORMAT_FREEZE, "features previously frozen by capa"), ] format_help = ", ".join([f"{f[0]}: {f[1]}" for f in formats]) + parser.add_argument( "-f", "--format", choices=[f[0] for f in formats], default=FORMAT_AUTO, - help=f"select sample format, {format_help}", + help=f"select input format, {format_help}", ) if "backend" in wanted: + backends = [ + (BACKEND_AUTO, "(default) detect appropriate backend automatically"), + (BACKEND_VIV, "vivisect"), + (BACKEND_PEFILE, "pefile (file features only)"), + (BACKEND_BINJA, "Binary Ninja"), + (BACKEND_DOTNET, ".NET"), + (BACKEND_FREEZE, "capa freeze"), + (BACKEND_CAPE, "CAPE"), + ] + backend_help = ", ".join([f"{f[0]}: {f[1]}" for f in backends]) parser.add_argument( "-b", "--backend", type=str, - help="select the backend to use", - choices=(BACKEND_VIV, BACKEND_BINJA, BACKEND_PEFILE), - default=BACKEND_VIV, + choices=[f[0] for f in backends], + default=BACKEND_AUTO, + help=f"select backend, {backend_help}", ) if "os" in wanted: @@ -859,6 +306,34 @@ parser.add_argument("-t", "--tag", type=str, help="filter on rule meta field values") +############################################################################### +# +# "main routines" +# +# All of the following routines are considered "main routines". +# That is, they rely upon the given CLI arguments and write to output streams. +# We prefer to keep as much logic away from input/output as possible; +# however, capa does handle many combinations of flags/switches/overrides, +# so these routines deal with that logic.
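+ #
+ # For example, a script might wire these together the way
+ # capa/features/freeze/__init__.py above does (a sketch, not part of this module):
+ #
+ #     try:
+ #         capa.main.handle_common_args(args)
+ #         capa.main.ensure_input_exists_from_cli(args)
+ #         input_format = capa.main.get_input_format_from_cli(args)
+ #         backend = capa.main.get_backend_from_cli(args, input_format)
+ #         extractor = capa.main.get_extractor_from_cli(args, input_format, backend)
+ #     except capa.main.ShouldExitError as e:
+ #         return e.status_code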
+ # + # Other scripts may use these routines, but should also prefer to invoke them + # directly within `main()`, not within library code. + # Library code should *not* call these functions. + # + # These main routines may raise `ShouldExitError` to indicate the program + # should exit. It's a tiny step away from doing `sys.exit()` directly. + # I'm not sure if we should just do that. In the meantime, programs should + # handle `ShouldExitError` and pass the status code to `sys.exit()`. + # + + +class ShouldExitError(Exception): + """raised when a main-related routine indicates the program should exit.""" + + def __init__(self, status_code: int): + self.status_code = status_code + + def handle_common_args(args): """ handle the global config specified by `install_common_args`, @@ -871,7 +346,10 @@ - is_default_rules: if the default rules were used. args: - args (argparse.Namespace): parsed arguments that included at least `install_common_args` args. + args: The parsed command line arguments from `install_common_args`. + + raises: + ShouldExitError: if the program is invoked incorrectly and should exit. """ if args.quiet: logging.basicConfig(level=logging.WARNING) @@ -914,8 +392,11 @@ else: raise RuntimeError("unexpected --color value: " + args.color) - if hasattr(args, "sample"): - args.sample = Path(args.sample) + if not args.debug: + sys.excepthook = simple_message_exception_handler # type: ignore[assignment] + + if hasattr(args, "input_file"): + args.input_file = Path(args.input_file) if hasattr(args, "rules"): rules_paths: List[Path] = [] @@ -937,7 +418,7 @@ # so in this case, we require the user to use -r to specify the rule directory. logger.error("default embedded rules not found! (maybe you installed capa as a library?)") logger.error("provide your own rule set via the `-r` option.") - return E_MISSING_RULES + raise ShouldExitError(E_MISSING_RULES) rules_paths.append(default_rule_path) args.is_default_rules = True @@ -978,115 +459,136 @@ args.signatures = sigs_path -def simple_message_exception_handler(exctype, value: BaseException, traceback: TracebackType): +def ensure_input_exists_from_cli(args): """ - prints friendly message on unexpected exceptions to regular users (debug mode shows regular stack trace) + args: + args: The parsed command line arguments from `install_common_args`. + + raises: + ShouldExitError: if the program is invoked incorrectly and should exit. + """ + try: + _ = get_file_taste(args.input_file) + except IOError as e: + # per our research there's not a programmatic way to render the IOError with non-ASCII filename unless we + # handle the IOError separately and reach into the args + logger.error("%s", e.args[0]) + raise ShouldExitError(E_MISSING_FILE) from e + + +def get_input_format_from_cli(args) -> str: + """ + Determine the format of the input file. + + Note: this may not be the same as the format of the sample. + Cape, Freeze, etc. formats describe a sample without being the sample itself. args: - # TODO(aaronatp): Once capa drops support for Python 3.8, move the exctype type annotation to - # the function parameters and remove the "# type: ignore[assignment]" from the relevant place - # in the main function, see (https://github.com/mandiant/capa/issues/1896) - exctype (type[BaseException]): exception class + args: The parsed command line arguments from `install_common_args`.
+ + raises: + ShouldExitError: if the program is invoked incorrectly and should exit. """ + format_ = args.format - if exctype is KeyboardInterrupt: - print("KeyboardInterrupt detected, program terminated") - else: - print( - f"Unexpected exception raised: {exctype}. Please run capa in debug mode (-d/--debug) " - + "to see the stack trace. Please also report your issue on the capa GitHub page so we " - + "can improve the code! (https://github.com/mandiant/capa/issues)" - ) + if format_ != FORMAT_AUTO: + return format_ + try: + return get_auto_format(args.input_file) + except PEFormatError as e: + logger.error("Input file '%s' is not a valid PE file: %s", args.input_file, str(e)) + raise ShouldExitError(E_CORRUPT_FILE) from e + except UnsupportedFormatError as e: + log_unsupported_format_error() + raise ShouldExitError(E_INVALID_FILE_TYPE) from e -def main(argv: Optional[List[str]] = None): - if sys.version_info < (3, 8): - raise UnsupportedRuntimeError("This version of capa can only be used with Python 3.8+") - if argv is None: - argv = sys.argv[1:] +def get_backend_from_cli(args, input_format: str) -> str: + """ + Determine the backend that should be used for the given input file. + Respects an override provided by the user, otherwise, use a good default. - desc = "The FLARE team's open-source tool to identify capabilities in executable files." - epilog = textwrap.dedent( - """ - By default, capa uses a default set of embedded rules. - You can see the rule set here: - https://github.com/mandiant/capa-rules + args: + args: The parsed command line arguments from `install_common_args`. + input_format: The file format of the input file. - To provide your own rule set, use the `-r` flag: - capa --rules /path/to/rules suspicious.exe - capa -r /path/to/rules suspicious.exe + raises: + ShouldExitError: if the program is invoked incorrectly and should exit. + """ + if args.backend != BACKEND_AUTO: + return args.backend - examples: - identify capabilities in a binary - capa suspicious.exe + if input_format == FORMAT_CAPE: + return BACKEND_CAPE - identify capabilities in 32-bit shellcode, see `-f` for all supported formats - capa -f sc32 shellcode.bin + elif input_format == FORMAT_DOTNET: + return BACKEND_DOTNET - report match locations - capa -v suspicious.exe + elif input_format == FORMAT_FREEZE: + return BACKEND_FREEZE - report all feature match details - capa -vv suspicious.exe + else: + return BACKEND_VIV - filter rules by meta fields, e.g. rule name or namespace - capa -t "create TCP socket" suspicious.exe - """ - ) - parser = argparse.ArgumentParser( - description=desc, epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter - ) - install_common_args(parser, {"sample", "format", "backend", "os", "signatures", "rules", "tag"}) - parser.add_argument("-j", "--json", action="store_true", help="emit JSON instead of text") - args = parser.parse_args(args=argv) - if not args.debug: - sys.excepthook = simple_message_exception_handler # type: ignore[assignment] - ret = handle_common_args(args) - if ret is not None and ret != 0: - return ret +def get_sample_path_from_cli(args, backend: str) -> Optional[Path]: + """ + Determine the path to the underlying sample, if it exists. 
- try: - _ = get_file_taste(args.sample) - except IOError as e: - # per our research there's not a programmatic way to render the IOError with non-ASCII filename unless we - # handle the IOError separately and reach into the args - logger.error("%s", e.args[0]) - return E_MISSING_FILE + Note: this may not be the same as the input file. + Cape, Freeze, etc. formats describe a sample without being the sample itself. - format_ = args.format - if format_ == FORMAT_AUTO: - try: - format_ = get_auto_format(args.sample) - except PEFormatError as e: - logger.error("Input file '%s' is not a valid PE file: %s", args.sample, str(e)) - return E_CORRUPT_FILE - except UnsupportedFormatError: - log_unsupported_format_error() - return E_INVALID_FILE_TYPE + args: + args: The parsed command line arguments from `install_common_args`. + backend: The backend that will handle the input file. + + raises: + ShouldExitError: if the program is invoked incorrectly and should exit. + """ + if backend == BACKEND_CAPE: + return None + else: + return args.input_file + + +def get_os_from_cli(args, backend) -> str: + """ + Determine the OS for the given sample. + Respects an override provided by the user; otherwise, uses heuristics and + algorithms to detect the OS. + + args: + args: The parsed command line arguments from `install_common_args`. + backend: The backend that will handle the input file. + raises: + ShouldExitError: if the program is invoked incorrectly and should exit. + """ + if args.os: + return args.os + + sample_path = get_sample_path_from_cli(args, backend) + if sample_path is None: + return "unknown" + return capa.loader.get_os(sample_path) + + +def get_rules_from_cli(args) -> RuleSet: + """ + args: + args: The parsed command line arguments from `install_common_args`. + + raises: + ShouldExitError: if the program is invoked incorrectly and should exit. + """ try: - if is_running_standalone() and args.is_default_rules: + if capa.helpers.is_running_standalone() and args.is_default_rules: cache_dir = get_default_root() / "cache" else: cache_dir = capa.rules.cache.get_default_cache_directory() - rules = get_rules(args.rules, cache_dir=cache_dir) - - logger.debug( - "successfully loaded %s rules", - # during the load of the RuleSet, we extract subscope statements into their own rules - # that are subsequently `match`ed upon. this inflates the total rule count. - # so, filter out the subscope rules when reporting total number of loaded rules. - len(list(filter(lambda r: not (r.is_subscope_rule()), rules.rules.values()))), - ) - if args.tag: - rules = rules.filter_rules_by_meta(args.tag) - logger.debug("selected %d rules", len(rules)) - for i, r in enumerate(rules.rules, 1): - logger.debug(" %d. %s", i, r) - + rules = capa.rules.get_rules(args.rules, cache_dir=cache_dir) except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e: logger.error("%s", str(e)) logger.error( @@ -1101,8 +603,34 @@ "Or, for more details, see the rule set documentation here: %s", "https://github.com/mandiant/capa/blob/master/doc/rules.md", ) - return E_INVALID_RULE + raise ShouldExitError(E_INVALID_RULE) from e + + logger.debug( + "successfully loaded %s rules", + # during the load of the RuleSet, we extract subscope statements into their own rules + # that are subsequently `match`ed upon. this inflates the total rule count. + # so, filter out the subscope rules when reporting total number of loaded rules.
+ len(list(filter(lambda r: not (r.is_subscope_rule()), rules.rules.values()))), + ) + + if hasattr(args, "tag") and args.tag: + rules = rules.filter_rules_by_meta(args.tag) + logger.debug("selected %d rules", len(rules)) + for i, r in enumerate(rules.rules, 1): + logger.debug(" %d. %s", i, r) + + return rules + +def get_file_extractors_from_cli(args, input_format: str) -> List[FeatureExtractor]: + """ + args: + args: The parsed command line arguments from `install_common_args`. + input_format: The file format of the input file. + + raises: + ShouldExitError: if the program is invoked incorrectly and should exit. + """ # file feature extractors are pretty lightweight: they don't do any code analysis. # so we can fairly quickly determine if the given file has "pure" file-scope rules # that indicate a limitation (like "file is packed based on section names") @@ -1111,27 +639,36 @@ def main(argv: Optional[List[str]] = None): # this pass can inspect multiple file extractors, e.g., dotnet and pe to identify # various limitations try: - file_extractors = get_file_extractors(args.sample, format_) + return capa.loader.get_file_extractors(args.input_file, input_format) except PEFormatError as e: - logger.error("Input file '%s' is not a valid PE file: %s", args.sample, str(e)) - return E_CORRUPT_FILE + logger.error("Input file '%s' is not a valid PE file: %s", args.input_file, str(e)) + raise ShouldExitError(E_CORRUPT_FILE) from e except (ELFError, OverflowError) as e: - logger.error("Input file '%s' is not a valid ELF file: %s", args.sample, str(e)) - return E_CORRUPT_FILE + logger.error("Input file '%s' is not a valid ELF file: %s", args.input_file, str(e)) + raise ShouldExitError(E_CORRUPT_FILE) from e except UnsupportedFormatError as e: - if format_ == FORMAT_CAPE: + if input_format == FORMAT_CAPE: log_unsupported_cape_report_error(str(e)) else: log_unsupported_format_error() - return E_INVALID_FILE_TYPE + raise ShouldExitError(E_INVALID_FILE_TYPE) from e except EmptyReportError as e: - if format_ == FORMAT_CAPE: + if input_format == FORMAT_CAPE: log_empty_cape_report_error(str(e)) - return E_EMPTY_REPORT + raise ShouldExitError(E_EMPTY_REPORT) from e else: log_unsupported_format_error() - return E_INVALID_FILE_TYPE + raise ShouldExitError(E_INVALID_FILE_TYPE) from e + + +def find_file_limitations_from_cli(args, rules: RuleSet, file_extractors: List[FeatureExtractor]) -> bool: + """ + args: + args: The parsed command line arguments from `install_common_args`. + raises: + ShouldExitError: if the program is invoked incorrectly and should exit. + """ found_file_limitation = False for file_extractor in file_extractors: if isinstance(file_extractor, DynamicFeatureExtractor): @@ -1141,11 +678,11 @@ def main(argv: Optional[List[str]] = None): try: pure_file_capabilities, _ = find_file_capabilities(rules, file_extractor, {}) except PEFormatError as e: - logger.error("Input file '%s' is not a valid PE file: %s", args.sample, str(e)) - return E_CORRUPT_FILE + logger.error("Input file '%s' is not a valid PE file: %s", args.input_file, str(e)) + raise ShouldExitError(E_CORRUPT_FILE) from e except (ELFError, OverflowError) as e: - logger.error("Input file '%s' is not a valid ELF file: %s", args.sample, str(e)) - return E_CORRUPT_FILE + logger.error("Input file '%s' is not a valid ELF file: %s", args.input_file, str(e)) + raise ShouldExitError(E_CORRUPT_FILE) from e # file limitations that rely on non-file scope won't be detected here. # nor on FunctionName features, because pefile doesn't support this. 
@@ -1155,77 +692,156 @@ def main(argv: Optional[List[str]] = None): # do show the output in verbose mode, though. if not (args.verbose or args.vverbose or args.json): logger.debug("file limitation short circuit, won't analyze fully.") - return E_FILE_LIMITATION + raise ShouldExitError(E_FILE_LIMITATION) + return found_file_limitation + + +def get_signatures_from_cli(args, input_format: str, backend: str) -> List[Path]: + if backend != BACKEND_VIV: + logger.debug("skipping library code matching: only supported by the vivisect backend") + return [] + + if input_format != FORMAT_PE: + logger.debug("skipping library code matching: signatures only supports PE files") + return [] + + try: + return capa.loader.get_signatures(args.signatures) + except IOError as e: + logger.error("%s", str(e)) + raise ShouldExitError(E_INVALID_SIG) from e + + +def get_extractor_from_cli(args, input_format: str, backend: str) -> FeatureExtractor: + """ + args: + args: The parsed command line arguments from `install_common_args`. + input_format: The file format of the input file. + backend: The backend that will handle the input file. + + raises: + ShouldExitError: if the program is invoked incorrectly and should exit. + """ + sig_paths = get_signatures_from_cli(args, input_format, backend) + + should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None) + + os_ = get_os_from_cli(args, backend) + sample_path = get_sample_path_from_cli(args, backend) + + try: + return capa.loader.get_extractor( + args.input_file, + input_format, + os_, + backend, + sig_paths, + should_save_workspace=should_save_workspace, + disable_progress=args.quiet or args.debug, + sample_path=sample_path, + ) + except UnsupportedFormatError as e: + if input_format == FORMAT_CAPE: + log_unsupported_cape_report_error(str(e)) + else: + log_unsupported_format_error() + raise ShouldExitError(E_INVALID_FILE_TYPE) from e + except UnsupportedArchError as e: + log_unsupported_arch_error() + raise ShouldExitError(E_INVALID_FILE_ARCH) from e + except UnsupportedOSError as e: + log_unsupported_os_error() + raise ShouldExitError(E_INVALID_FILE_OS) from e + + +def main(argv: Optional[List[str]] = None): + if sys.version_info < (3, 8): + raise UnsupportedRuntimeError("This version of capa can only be used with Python 3.8+") + + if argv is None: + argv = sys.argv[1:] + + desc = "The FLARE team's open-source tool to identify capabilities in executable files." + epilog = textwrap.dedent( + """ + By default, capa uses a default set of embedded rules. + You can see the rule set here: + https://github.com/mandiant/capa-rules + + To provide your own rule set, use the `-r` flag: + capa --rules /path/to/rules suspicious.exe + capa -r /path/to/rules suspicious.exe + + examples: + identify capabilities in a binary + capa suspicious.exe + + identify capabilities in 32-bit shellcode, see `-f` for all supported formats + capa -f sc32 shellcode.bin + + report match locations + capa -v suspicious.exe + + report all feature match details + capa -vv suspicious.exe + + filter rules by meta fields, e.g. 
rule name or namespace + capa -t "create TCP socket" suspicious.exe + """ + ) + + parser = argparse.ArgumentParser( + description=desc, epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter + ) + install_common_args(parser, {"input_file", "format", "backend", "os", "signatures", "rules", "tag"}) + parser.add_argument("-j", "--json", action="store_true", help="emit JSON instead of text") + args = parser.parse_args(args=argv) + + try: + handle_common_args(args) + ensure_input_exists_from_cli(args) + input_format = get_input_format_from_cli(args) + rules = get_rules_from_cli(args) + file_extractors = get_file_extractors_from_cli(args, input_format) + found_file_limitation = find_file_limitations_from_cli(args, rules, file_extractors) + except ShouldExitError as e: + return e.status_code meta: rdoc.Metadata capabilities: MatchResults counts: Dict[str, Any] - if format_ == FORMAT_RESULT: + if input_format == FORMAT_RESULT: # result document directly parses into meta, capabilities - result_doc = capa.render.result_document.ResultDocument.from_file(Path(args.sample)) + result_doc = capa.render.result_document.ResultDocument.from_file(args.input_file) meta, capabilities = result_doc.to_capa() else: # all other formats we must create an extractor # and use that to extract meta and capabilities - if format_ == FORMAT_FREEZE: - # freeze format deserializes directly into an extractor - extractor: FeatureExtractor = frz.load(Path(args.sample).read_bytes()) - else: - # all other formats we must create an extractor, - # such as viv, binary ninja, etc. workspaces - # and use those for extracting. - - try: - if format_ == FORMAT_PE: - sig_paths = get_signatures(args.signatures) - else: - sig_paths = [] - logger.debug("skipping library code matching: only have native PE signatures") - except IOError as e: - logger.error("%s", str(e)) - return E_INVALID_SIG - - should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None) - - # TODO(mr-tz): this should be wrapped and refactored as it's tedious to update everywhere - # see same code and show-features above examples - # https://github.com/mandiant/capa/issues/1813 - try: - extractor = get_extractor( - args.sample, - format_, - args.os, - args.backend, - sig_paths, - should_save_workspace, - disable_progress=args.quiet or args.debug, - ) - except UnsupportedFormatError as e: - if format_ == FORMAT_CAPE: - log_unsupported_cape_report_error(str(e)) - else: - log_unsupported_format_error() - return E_INVALID_FILE_TYPE - except UnsupportedArchError: - log_unsupported_arch_error() - return E_INVALID_FILE_ARCH - except UnsupportedOSError: - log_unsupported_os_error() - return E_INVALID_FILE_OS + try: + backend = get_backend_from_cli(args, input_format) + sample_path = get_sample_path_from_cli(args, backend) + if sample_path is None: + os_ = "unknown" + else: + os_ = capa.loader.get_os(sample_path) + extractor = get_extractor_from_cli(args, input_format, backend) + except ShouldExitError as e: + return e.status_code capabilities, counts = find_capabilities(rules, extractor, disable_progress=args.quiet) - meta = collect_metadata(argv, args.sample, args.format, args.os, args.rules, extractor, counts) - meta.analysis.layout = compute_layout(rules, extractor, capabilities) + meta = capa.loader.collect_metadata(argv, args.input_file, input_format, os_, args.rules, extractor, counts) + meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities) if isinstance(extractor, StaticFeatureExtractor) and 
found_file_limitation: # bail if capa's static feature extractor encountered file limitation e.g. a packed binary # do show the output in verbose mode, though. if not (args.verbose or args.vverbose or args.json): return E_FILE_LIMITATION + if args.json: print(capa.render.json.render(meta, rules, capabilities)) elif args.vverbose: @@ -1265,7 +881,7 @@ def ida_main(): rules_path = get_default_root() / "rules" logger.debug("rule path: %s", rules_path) - rules = get_rules([rules_path]) + rules = capa.rules.get_rules([rules_path]) meta = capa.ida.helpers.collect_metadata([rules_path]) @@ -1299,7 +915,7 @@ def ghidra_main(): rules_path = get_default_root() / "rules" logger.debug("rule path: %s", rules_path) - rules = get_rules([rules_path]) + rules = capa.rules.get_rules([rules_path]) meta = capa.ghidra.helpers.collect_metadata([rules_path]) diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py index b5423ad92..d9e43dfc5 100644 --- a/capa/rules/__init__.py +++ b/capa/rules/__init__.py @@ -7,6 +7,7 @@ # See the License for the specific language governing permissions and limitations under the License. import io +import os import re import uuid import codecs @@ -25,7 +26,7 @@ # https://github.com/python/mypy/issues/1153 from backports.functools_lru_cache import lru_cache # type: ignore -from typing import Any, Set, Dict, List, Tuple, Union, Iterator, Optional +from typing import Any, Set, Dict, List, Tuple, Union, Callable, Iterator, Optional from dataclasses import asdict, dataclass import yaml @@ -1691,3 +1692,105 @@ def match(self, scope: Scope, features: FeatureSet, addr: Address) -> Tuple[Feat matches.update(hard_matches) return (features3, matches) + + +def is_nursery_rule_path(path: Path) -> bool: + """ + The nursery is a spot for rules that have not yet been fully polished. + For example, they may not have references to public example of a technique. + Yet, we still want to capture and report on their matches. + The nursery is currently a subdirectory of the rules directory with that name. + + When nursery rules are loaded, their metadata section should be updated with: + `nursery=True`. + """ + return "nursery" in path.parts + + +def collect_rule_file_paths(rule_paths: List[Path]) -> List[Path]: + """ + collect all rule file paths, including those in subdirectories. + """ + rule_file_paths = [] + for rule_path in rule_paths: + if not rule_path.exists(): + raise IOError(f"rule path {rule_path} does not exist or cannot be accessed") + + if rule_path.is_file(): + rule_file_paths.append(rule_path) + elif rule_path.is_dir(): + logger.debug("reading rules from directory %s", rule_path) + for root, _, files in os.walk(rule_path): + if ".git" in root: + # the .github directory contains CI config in capa-rules + # this includes some .yml files + # these are not rules + # additionally, .git has files that are not .yml and generate the warning + # skip those too + continue + for file in files: + if not file.endswith(".yml"): + if not (file.startswith(".git") or file.endswith((".git", ".md", ".txt"))): + # expect to see .git* files, readme.md, format.md, and maybe a .git directory + # other things maybe are rules, but are mis-named. + logger.warning("skipping non-.yml file: %s", file) + continue + rule_file_paths.append(Path(root) / file) + return rule_file_paths + + +# TypeAlias. 
note: using `foo: TypeAlias = bar` is Python 3.10+ +RulePath = Path + + +def on_load_rule_default(_path: RulePath, i: int, _total: int) -> None: + return + + +def get_rules( + rule_paths: List[RulePath], + cache_dir=None, + on_load_rule: Callable[[RulePath, int, int], None] = on_load_rule_default, +) -> RuleSet: + """ + args: + rule_paths: list of paths to rules files or directories containing rules files + cache_dir: directory to use for caching rules, or will use the default detected cache directory if None + on_load_rule: callback to invoke before a rule is loaded, use for progress or cancellation + """ + if cache_dir is None: + cache_dir = capa.rules.cache.get_default_cache_directory() + # rule_paths may contain directory paths, + # so search for file paths recursively. + rule_file_paths = collect_rule_file_paths(rule_paths) + + # this list is parallel to `rule_file_paths`: + # rule_file_paths[i] corresponds to rule_contents[i]. + rule_contents = [file_path.read_bytes() for file_path in rule_file_paths] + + ruleset = capa.rules.cache.load_cached_ruleset(cache_dir, rule_contents) + if ruleset is not None: + return ruleset + + rules: List[Rule] = [] + + total_rule_count = len(rule_file_paths) + for i, (path, content) in enumerate(zip(rule_file_paths, rule_contents)): + on_load_rule(path, i, total_rule_count) + + try: + rule = capa.rules.Rule.from_yaml(content.decode("utf-8")) + except capa.rules.InvalidRule: + raise + else: + rule.meta["capa/path"] = path.as_posix() + rule.meta["capa/nursery"] = is_nursery_rule_path(path) + + rules.append(rule) + logger.debug("loaded rule: '%s' with scope: %s", rule.name, rule.scopes) + + ruleset = capa.rules.RuleSet(rules) + + capa.rules.cache.cache_ruleset(cache_dir, ruleset) + + return ruleset diff --git a/scripts/bulk-process.py b/scripts/bulk-process.py index 8950b8936..0cb315035 100644 --- a/scripts/bulk-process.py +++ b/scripts/bulk-process.py @@ -36,7 +36,7 @@ usage: usage: bulk-process.py [-h] [-r RULES] [-d] [-q] [-n PARALLELISM] [--no-mp] - input + input_directory detect capabilities in programs. @@ -62,7 +62,6 @@ is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ -import os import sys import json import logging @@ -74,10 +73,10 @@ import capa import capa.main import capa.rules +import capa.loader import capa.render.json import capa.capabilities.common import capa.render.result_document as rd -from capa.features.common import OS_AUTO logger = logging.getLogger("capa") @@ -87,11 +86,8 @@ def get_capa_results(args): run capa against the file at the given path, using the given rules. args is a tuple, containing: - rules (capa.rules.RuleSet): the rules to match - signatures (List[str]): list of file system paths to signature files - format (str): the name of the sample file format - os (str): the name of the operating system - path (str): the file system path to the sample to process + rules, signatures, format, backend, os, input_file + as provided via the CLI arguments. args is a tuple because i'm not quite sure how to unpack multiple arguments using `map`. 
@@ -106,44 +102,58 @@ def get_capa_results(args): meta (dict): the meta analysis results capabilities (dict): the matched capabilities and their result objects """ - rules, sigpaths, format, os_, path = args - should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None) - logger.info("computing capa results for: %s", path) + rules, signatures, format_, backend, os_, input_file = args + + parser = argparse.ArgumentParser(description="detect capabilities in programs.") + capa.main.install_common_args(parser, wanted={"rules", "signatures", "format", "os", "backend", "input_file"}) + argv = [ + "--signatures", + signatures, + "--format", + format_, + "--backend", + backend, + "--os", + os_, + input_file, + ] + if rules: + argv += ["--rules", rules] + args = parser.parse_args(args=argv) + try: - extractor = capa.main.get_extractor( - path, format, os_, capa.main.BACKEND_VIV, sigpaths, should_save_workspace, disable_progress=True - ) - except capa.exceptions.UnsupportedFormatError: - # i'm 100% sure if multiprocessing will reliably raise exceptions across process boundaries. + capa.main.handle_common_args(args) + capa.main.ensure_input_exists_from_cli(args) + input_format = capa.main.get_input_format_from_cli(args) + rules = capa.main.get_rules_from_cli(args) + backend = capa.main.get_backend_from_cli(args, input_format) + sample_path = capa.main.get_sample_path_from_cli(args, backend) + if sample_path is None: + os_ = "unknown" + else: + os_ = capa.loader.get_os(sample_path) + extractor = capa.main.get_extractor_from_cli(args, input_format, backend) + except capa.main.ShouldExitError as e: + # i'm not 100% sure if multiprocessing will reliably raise exceptions across process boundaries. # so instead, return an object with explicit success/failure status. 
# # if success, then status=ok, and results found in property "ok" # if error, then status=error, and human readable message in property "error" - return { - "path": path, - "status": "error", - "error": f"input file does not appear to be a PE file: {path}", - } - except capa.exceptions.UnsupportedRuntimeError: - return { - "path": path, - "status": "error", - "error": "unsupported runtime or Python interpreter", - } + return {"path": input_file, "status": "error", "error": str(e), "status_code": e.status_code} except Exception as e: return { - "path": path, + "path": input_file, "status": "error", "error": f"unexpected error: {e}", } capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, disable_progress=True) - meta = capa.main.collect_metadata([], path, format, os_, [], extractor, counts) - meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities) + meta = capa.loader.collect_metadata(argv, args.input_file, format_, os_, [], extractor, counts) + meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities) doc = rd.ResultDocument.from_capa(meta, rules, capabilities) - return {"path": path, "status": "ok", "ok": doc.model_dump()} + return {"path": input_file, "status": "ok", "ok": doc.model_dump()} def main(argv=None): @@ -151,30 +161,16 @@ def main(argv=None): argv = sys.argv[1:] parser = argparse.ArgumentParser(description="detect capabilities in programs.") - capa.main.install_common_args(parser, wanted={"rules", "signatures", "format", "os"}) - parser.add_argument("input", type=str, help="Path to directory of files to recursively analyze") + capa.main.install_common_args(parser, wanted={"rules", "signatures", "format", "os", "backend"}) + parser.add_argument("input_directory", type=str, help="Path to directory of files to recursively analyze") parser.add_argument( "-n", "--parallelism", type=int, default=multiprocessing.cpu_count(), help="parallelism factor" ) parser.add_argument("--no-mp", action="store_true", help="disable subprocesses") args = parser.parse_args(args=argv) - capa.main.handle_common_args(args) - - try: - rules = capa.main.get_rules(args.rules) - logger.info("successfully loaded %s rules", len(rules)) - except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e: - logger.error("%s", str(e)) - return -1 - - try: - sig_paths = capa.main.get_signatures(args.signatures) - except IOError as e: - logger.error("%s", str(e)) - return -1 samples = [] - for file in Path(args.input).rglob("*"): + for file in Path(args.input_directory).rglob("*"): samples.append(file) cpu_count = multiprocessing.cpu_count() @@ -203,18 +199,22 @@ def map(f, args, parallelism=None): logger.debug("using process mapper") mapper = pmap + rules = args.rules + if rules == [capa.main.RULES_PATH_DEFAULT_STRING]: + rules = None + results = {} for result in mapper( get_capa_results, - [(rules, sig_paths, "pe", OS_AUTO, sample) for sample in samples], + [(rules, args.signatures, args.format, args.backend, args.os, str(sample)) for sample in samples], parallelism=args.parallelism, ): if result["status"] == "error": logger.warning(result["error"]) elif result["status"] == "ok": - results[result["path"].as_posix()] = rd.ResultDocument.model_validate(result["ok"]).model_dump_json( - exclude_none=True - ) + doc = rd.ResultDocument.model_validate(result["ok"]).model_dump_json(exclude_none=True) + results[result["path"]] = json.loads(doc) + else: raise ValueError(f"unexpected status: {result['status']}") diff --git 
a/scripts/cache-ruleset.py b/scripts/cache-ruleset.py index 6630f2eea..0e364622b 100644 --- a/scripts/cache-ruleset.py +++ b/scripts/cache-ruleset.py @@ -36,20 +36,27 @@ def main(argv=None): parser = argparse.ArgumentParser(description="Cache ruleset.") capa.main.install_common_args(parser) - parser.add_argument("rules", type=str, action="append", help="Path to rules") + parser.add_argument("rules", type=str, help="Path to rules directory") parser.add_argument("cache", type=str, help="Path to cache directory") args = parser.parse_args(args=argv) - capa.main.handle_common_args(args) - if args.debug: - logging.getLogger("capa").setLevel(logging.DEBUG) + # don't use capa.main.handle_common_args + # because it expects a different format for the --rules argument + + if args.quiet: + logging.basicConfig(level=logging.WARNING) + logging.getLogger().setLevel(logging.WARNING) + elif args.debug: + logging.basicConfig(level=logging.DEBUG) + logging.getLogger().setLevel(logging.DEBUG) else: - logging.getLogger("capa").setLevel(logging.ERROR) + logging.basicConfig(level=logging.INFO) + logging.getLogger().setLevel(logging.INFO) try: cache_dir = Path(args.cache) cache_dir.mkdir(parents=True, exist_ok=True) - rules = capa.main.get_rules(args.rules, cache_dir) + rules = capa.rules.get_rules([Path(args.rules)], cache_dir) logger.info("successfully loaded %s rules", len(rules)) except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e: logger.error("%s", str(e)) diff --git a/scripts/capa2yara.py b/scripts/capa2yara.py index 5fe5c0849..b1adb3625 100644 --- a/scripts/capa2yara.py +++ b/scripts/capa2yara.py @@ -723,36 +723,33 @@ def main(argv=None): argv = sys.argv[1:] parser = argparse.ArgumentParser(description="Capa to YARA rule converter") - parser.add_argument("rules", type=str, help="Path to rules") - parser.add_argument("--private", "-p", action="store_true", help="Create private rules", default=False) capa.main.install_common_args(parser, wanted={"tag"}) - + parser.add_argument("--private", "-p", action="store_true", help="Create private rules", default=False) + parser.add_argument("rules", type=str, help="Path to rules directory") args = parser.parse_args(args=argv) - make_priv = args.private - if args.verbose: - level = logging.DEBUG - elif args.quiet: - level = logging.ERROR - else: - level = logging.INFO + # don't use capa.main.handle_common_args + # because it expects a different format for the --rules argument - logging.basicConfig(level=level) - logging.getLogger("capa2yara").setLevel(level) + if args.quiet: + logging.basicConfig(level=logging.WARNING) + logging.getLogger().setLevel(logging.WARNING) + elif args.debug: + logging.basicConfig(level=logging.DEBUG) + logging.getLogger().setLevel(logging.DEBUG) + else: + logging.basicConfig(level=logging.INFO) + logging.getLogger().setLevel(logging.INFO) try: - rules = capa.main.get_rules([Path(args.rules)]) - namespaces = capa.rules.index_rules_by_namespace(list(rules.rules.values())) - logger.info("successfully loaded %d rules (including subscope rules which will be ignored)", len(rules)) - if args.tag: - rules = rules.filter_rules_by_meta(args.tag) - logger.debug("selected %d rules", len(rules)) - for i, r in enumerate(rules.rules, 1): - logger.debug(" %d. 
%s", i, r) + rules = capa.rules.get_rules([Path(args.rules)]) + logger.info("successfully loaded %s rules", len(rules)) except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e: logger.error("%s", str(e)) return -1 + namespaces = capa.rules.index_rules_by_namespace(list(rules.rules.values())) + output_yar( "// Rules from Mandiant's https://github.com/mandiant/capa-rules converted to YARA using https://github.com/mandiant/capa/blob/master/scripts/capa2yara.py by Arnim Rupp" ) @@ -780,10 +777,10 @@ def main(argv=None): cround += 1 logger.info("doing convert_rules(), round: %d", cround) num_rules = len(converted_rules) - count_incomplete += convert_rules(rules, namespaces, cround, make_priv) + count_incomplete += convert_rules(rules, namespaces, cround, args.private) # one last round to collect all unconverted rules - count_incomplete += convert_rules(rules, namespaces, 9000, make_priv) + count_incomplete += convert_rules(rules, namespaces, 9000, args.private) stats = "\n// converted rules : " + str(len(converted_rules)) stats += "\n// among those are incomplete : " + str(count_incomplete) diff --git a/scripts/capa_as_library.py b/scripts/capa_as_library.py index 611576908..cc3228f9f 100644 --- a/scripts/capa_as_library.py +++ b/scripts/capa_as_library.py @@ -15,6 +15,7 @@ import capa.main import capa.rules import capa.engine +import capa.loader import capa.features import capa.render.json import capa.render.utils as rutils @@ -168,19 +169,19 @@ def render_dictionary(doc: rd.ResultDocument) -> Dict[str, Any]: # ==== render dictionary helpers -def capa_details(rules_path: Path, file_path: Path, output_format="dictionary"): +def capa_details(rules_path: Path, input_file: Path, output_format="dictionary"): # load rules from disk - rules = capa.main.get_rules([rules_path]) + rules = capa.rules.get_rules([rules_path]) # extract features and find capabilities - extractor = capa.main.get_extractor( - file_path, FORMAT_AUTO, OS_AUTO, capa.main.BACKEND_VIV, [], False, disable_progress=True + extractor = capa.loader.get_extractor( + input_file, FORMAT_AUTO, OS_AUTO, capa.main.BACKEND_VIV, [], should_save_workspace=False, disable_progress=True ) capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, disable_progress=True) # collect metadata (used only to make rendering more complete) - meta = capa.main.collect_metadata([], file_path, FORMAT_AUTO, OS_AUTO, [rules_path], extractor, counts) - meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities) + meta = capa.loader.collect_metadata([], input_file, FORMAT_AUTO, OS_AUTO, [rules_path], extractor, counts) + meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities) capa_output: Any = False @@ -206,7 +207,7 @@ def capa_details(rules_path: Path, file_path: Path, output_format="dictionary"): RULES_PATH = capa.main.get_default_root() / "rules" parser = argparse.ArgumentParser(description="Extract capabilities from a file") - parser.add_argument("file", help="file to extract capabilities from") + parser.add_argument("input_file", help="file to extract capabilities from") parser.add_argument("--rules", help="path to rules directory", default=RULES_PATH) parser.add_argument( "--output", help="output format", choices=["dictionary", "json", "texttable"], default="dictionary" @@ -214,5 +215,5 @@ def capa_details(rules_path: Path, file_path: Path, output_format="dictionary"): args = parser.parse_args() if args.rules != RULES_PATH: args.rules = Path(args.rules) - 
print(capa_details(args.rules, Path(args.file), args.output)) + print(capa_details(args.rules, Path(args.input_file), args.output)) sys.exit(0) diff --git a/scripts/capafmt.py b/scripts/capafmt.py index fa8298007..de4171ea8 100644 --- a/scripts/capafmt.py +++ b/scripts/capafmt.py @@ -19,6 +19,7 @@ import argparse from pathlib import Path +import capa.main import capa.rules logger = logging.getLogger("capafmt") @@ -29,6 +30,7 @@ def main(argv=None): argv = sys.argv[1:] parser = argparse.ArgumentParser(description="Capa rule formatter.") + capa.main.install_common_args(parser) parser.add_argument("path", type=str, help="Path to rule to format") parser.add_argument( "-i", @@ -37,8 +39,6 @@ def main(argv=None): dest="in_place", help="Format the rule in place, otherwise, write formatted rule to STDOUT", ) - parser.add_argument("-v", "--verbose", action="store_true", help="Enable debug logging") - parser.add_argument("-q", "--quiet", action="store_true", help="Disable all output but errors") parser.add_argument( "-c", "--check", @@ -47,15 +47,10 @@ def main(argv=None): ) args = parser.parse_args(args=argv) - if args.verbose: - level = logging.DEBUG - elif args.quiet: - level = logging.ERROR - else: - level = logging.INFO - - logging.basicConfig(level=level) - logging.getLogger("capafmt").setLevel(level) + try: + capa.main.handle_common_args(args) + except capa.main.ShouldExitError as e: + return e.status_code rule = capa.rules.Rule.from_yaml_file(args.path, use_ruamel=True) reformatted_rule = rule.to_yaml() diff --git a/scripts/detect-elf-os.py b/scripts/detect-elf-os.py index 5adf85de7..2dfd86b76 100644 --- a/scripts/detect-elf-os.py +++ b/scripts/detect-elf-os.py @@ -17,8 +17,8 @@ import argparse import contextlib from typing import BinaryIO -from pathlib import Path +import capa.main import capa.helpers import capa.features.extractors.elf @@ -36,28 +36,16 @@ def main(argv=None): argv = sys.argv[1:] parser = argparse.ArgumentParser(description="Detect the underlying OS for the given ELF file") - parser.add_argument("sample", type=str, help="path to ELF file") - - logging_group = parser.add_argument_group("logging arguments") - - logging_group.add_argument("-d", "--debug", action="store_true", help="enable debugging output on STDERR") - logging_group.add_argument( - "-q", "--quiet", action="store_true", help="disable all status output except fatal errors" - ) - + capa.main.install_common_args(parser, wanted={"input_file"}) args = parser.parse_args(args=argv) - if args.quiet: - logging.basicConfig(level=logging.WARNING) - logging.getLogger().setLevel(logging.WARNING) - elif args.debug: - logging.basicConfig(level=logging.DEBUG) - logging.getLogger().setLevel(logging.DEBUG) - else: - logging.basicConfig(level=logging.INFO) - logging.getLogger().setLevel(logging.INFO) + try: + capa.main.handle_common_args(args) + capa.main.ensure_input_exists_from_cli(args) + except capa.main.ShouldExitError as e: + return e.status_code - f = Path(args.sample).open("rb") + f = args.input_file.open("rb") with contextlib.closing(f): try: diff --git a/scripts/detect_duplicate_features.py b/scripts/detect_duplicate_features.py index 6737d7fa9..9561339c2 100644 --- a/scripts/detect_duplicate_features.py +++ b/scripts/detect_duplicate_features.py @@ -48,7 +48,7 @@ def find_overlapping_rules(new_rule_path, rules_path): overlapping_rules = [] # capa.rules.RuleSet stores all rules in given paths - ruleset = capa.main.get_rules(rules_path) + ruleset = capa.rules.get_rules(rules_path) for rule_name, rule in 
ruleset.rules.items(): rule_features = rule.extract_all_features() diff --git a/scripts/lint.py b/scripts/lint.py index edcf9f563..93440395d 100644 --- a/scripts/lint.py +++ b/scripts/lint.py @@ -39,6 +39,7 @@ import capa.main import capa.rules import capa.engine +import capa.loader import capa.helpers import capa.features.insn import capa.capabilities.common @@ -363,8 +364,14 @@ def get_sample_capabilities(ctx: Context, path: Path) -> Set[str]: format_ = capa.helpers.get_auto_format(nice_path) logger.debug("analyzing sample: %s", nice_path) - extractor = capa.main.get_extractor( - nice_path, format_, OS_AUTO, capa.main.BACKEND_VIV, DEFAULT_SIGNATURES, False, disable_progress=True + extractor = capa.loader.get_extractor( + nice_path, + format_, + OS_AUTO, + capa.main.BACKEND_VIV, + DEFAULT_SIGNATURES, + should_save_workspace=False, + disable_progress=True, ) capabilities, _ = capa.capabilities.common.find_capabilities(ctx.rules, extractor, disable_progress=True) @@ -990,7 +997,11 @@ def main(argv=None): help="Enable thorough linting - takes more time, but does a better job", ) args = parser.parse_args(args=argv) - capa.main.handle_common_args(args) + + try: + capa.main.handle_common_args(args) + except capa.main.ShouldExitError as e: + return e.status_code if args.debug: logging.getLogger("capa").setLevel(logging.DEBUG) @@ -1002,16 +1013,9 @@ def main(argv=None): time0 = time.time() try: - rules = capa.main.get_rules(args.rules) - logger.info("successfully loaded %s rules", rules.source_rule_count) - if args.tag: - rules = rules.filter_rules_by_meta(args.tag) - logger.debug("selected %s rules", len(rules)) - for i, r in enumerate(rules.rules, 1): - logger.debug(" %d. %s", i, r) - except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e: - logger.error("%s", str(e)) - return -1 + rules = capa.main.get_rules_from_cli(args) + except capa.main.ShouldExitError as e: + return e.status_code logger.info("collecting potentially referenced samples") samples_path = Path(args.samples) diff --git a/scripts/match-function-id.py b/scripts/match-function-id.py index 7fe51e238..7896e24b2 100644 --- a/scripts/match-function-id.py +++ b/scripts/match-function-id.py @@ -62,6 +62,7 @@ import capa.helpers import capa.features import capa.features.freeze +from capa.loader import BACKEND_VIV logger = logging.getLogger("capa.match-function-id") @@ -71,61 +72,53 @@ def main(argv=None): argv = sys.argv[1:] parser = argparse.ArgumentParser(description="FLIRT match each function") - parser.add_argument("sample", type=str, help="Path to sample to analyze") + capa.main.install_common_args(parser, wanted={"input_file", "signatures", "format"}) parser.add_argument( "-F", "--function", type=lambda x: int(x, 0x10), help="match a specific function by VA, rather than add functions", ) - parser.add_argument( - "--signature", - action="append", - dest="signatures", - type=str, - default=[], - help="use the given signatures to identify library functions, file system paths to .sig/.pat files.", - ) - parser.add_argument("-d", "--debug", action="store_true", help="Enable debugging output on STDERR") - parser.add_argument("-q", "--quiet", action="store_true", help="Disable all output but errors") args = parser.parse_args(args=argv) - if args.quiet: - logging.basicConfig(level=logging.ERROR) - logging.getLogger().setLevel(logging.ERROR) - elif args.debug: - logging.basicConfig(level=logging.DEBUG) - logging.getLogger().setLevel(logging.DEBUG) - else: - logging.basicConfig(level=logging.INFO) - 
logging.getLogger().setLevel(logging.INFO) - - # disable vivisect-related logging, it's verbose and not relevant for capa users - capa.main.set_vivisect_log_level(logging.CRITICAL) + try: + capa.main.handle_common_args(args) + capa.main.ensure_input_exists_from_cli(args) + input_format = capa.main.get_input_format_from_cli(args) + sig_paths = capa.main.get_signatures_from_cli(args, input_format, BACKEND_VIV) + except capa.main.ShouldExitError as e: + return e.status_code analyzers = [] - for sigpath in args.signatures: - sigs = viv_utils.flirt.load_flirt_signature(sigpath) + for sigpath in sig_paths: + sigs = viv_utils.flirt.load_flirt_signature(str(sigpath)) with capa.main.timing("flirt: compiling sigs"): matcher = flirt.compile(sigs) - analyzer = viv_utils.flirt.FlirtFunctionAnalyzer(matcher, sigpath) + analyzer = viv_utils.flirt.FlirtFunctionAnalyzer(matcher, str(sigpath)) logger.debug("registering viv function analyzer: %s", repr(analyzer)) analyzers.append(analyzer) - vw = viv_utils.getWorkspace(args.sample, analyze=True, should_save=False) + vw = viv_utils.getWorkspace(str(args.input_file), analyze=True, should_save=False) functions = vw.getFunctions() if args.function: functions = [args.function] + seen = set() for function in functions: logger.debug("matching function: 0x%04x", function) for analyzer in analyzers: - name = viv_utils.flirt.match_function_flirt_signatures(analyzer.matcher, vw, function) + viv_utils.flirt.match_function_flirt_signatures(analyzer.matcher, vw, function) + name = viv_utils.get_function_name(vw, function) if name: - print(f"0x{function:04x}: {name}") + key = (function, name) + if key in seen: + continue + else: + print(f"0x{function:04x}: {name}") + seen.add(key) return 0 diff --git a/scripts/profile-time.py b/scripts/profile-time.py index 86590a800..dd0107c10 100644 --- a/scripts/profile-time.py +++ b/scripts/profile-time.py @@ -41,7 +41,6 @@ import logging import argparse import subprocess -from pathlib import Path import tqdm import tabulate @@ -50,6 +49,7 @@ import capa.perf import capa.rules import capa.engine +import capa.loader import capa.helpers import capa.features import capa.features.common @@ -74,42 +74,22 @@ def main(argv=None): label += " (dirty)" parser = argparse.ArgumentParser(description="Profile capa performance") - capa.main.install_common_args(parser, wanted={"format", "os", "sample", "signatures", "rules"}) - + capa.main.install_common_args(parser, wanted={"format", "os", "input_file", "signatures", "rules"}) parser.add_argument("--number", type=int, default=3, help="batch size of profile collection") parser.add_argument("--repeat", type=int, default=30, help="batch count of profile collection") parser.add_argument("--label", type=str, default=label, help="description of the profile collection") - args = parser.parse_args(args=argv) - capa.main.handle_common_args(args) - - try: - taste = capa.helpers.get_file_taste(Path(args.sample)) - except IOError as e: - logger.error("%s", str(e)) - return -1 try: + capa.main.handle_common_args(args) + capa.main.ensure_input_exists_from_cli(args) + input_format = capa.main.get_input_format_from_cli(args) + backend = capa.main.get_backend_from_cli(args, input_format) with capa.main.timing("load rules"): - rules = capa.main.get_rules(args.rules) - except IOError as e: - logger.error("%s", str(e)) - return -1 - - try: - sig_paths = capa.main.get_signatures(args.signatures) - except IOError as e: - logger.error("%s", str(e)) - return -1 - - if (args.format == "freeze") or ( - args.format == 
capa.features.common.FORMAT_AUTO and capa.features.freeze.is_freeze(taste) - ): - extractor = capa.features.freeze.load(Path(args.sample).read_bytes()) - else: - extractor = capa.main.get_extractor( - args.sample, args.format, args.os, capa.main.BACKEND_VIV, sig_paths, should_save_workspace=False - ) + rules = capa.main.get_rules_from_cli(args) + extractor = capa.main.get_extractor_from_cli(args, input_format, backend) + except capa.main.ShouldExitError as e: + return e.status_code with tqdm.tqdm(total=args.number * args.repeat, leave=False) as pbar: diff --git a/scripts/proto-from-results.py b/scripts/proto-from-results.py index 61df56b6e..68b0eefec 100644 --- a/scripts/proto-from-results.py +++ b/scripts/proto-from-results.py @@ -33,6 +33,7 @@ import argparse from pathlib import Path +import capa.main import capa.render.proto import capa.render.result_document @@ -44,26 +45,14 @@ def main(argv=None): argv = sys.argv[1:] parser = argparse.ArgumentParser(description="Convert a capa JSON result document into the protobuf format") + capa.main.install_common_args(parser) parser.add_argument("json", type=str, help="path to JSON result document file, produced by `capa --json`") - - logging_group = parser.add_argument_group("logging arguments") - - logging_group.add_argument("-d", "--debug", action="store_true", help="enable debugging output on STDERR") - logging_group.add_argument( - "-q", "--quiet", action="store_true", help="disable all status output except fatal errors" - ) - args = parser.parse_args(args=argv) - if args.quiet: - logging.basicConfig(level=logging.WARNING) - logging.getLogger().setLevel(logging.WARNING) - elif args.debug: - logging.basicConfig(level=logging.DEBUG) - logging.getLogger().setLevel(logging.DEBUG) - else: - logging.basicConfig(level=logging.INFO) - logging.getLogger().setLevel(logging.INFO) + try: + capa.main.handle_common_args(args) + except capa.main.ShouldExitError as e: + return e.status_code rd = capa.render.result_document.ResultDocument.from_file(Path(args.json)) pb = capa.render.proto.doc_to_pb2(rd) diff --git a/scripts/proto-to-results.py b/scripts/proto-to-results.py index 3bb165704..b413cd9dc 100644 --- a/scripts/proto-to-results.py +++ b/scripts/proto-to-results.py @@ -36,6 +36,7 @@ import argparse from pathlib import Path +import capa.main import capa.render.json import capa.render.proto import capa.render.proto.capa_pb2 @@ -49,28 +50,16 @@ def main(argv=None): argv = sys.argv[1:] parser = argparse.ArgumentParser(description="Convert a capa protobuf result document into the JSON format") + capa.main.install_common_args(parser) parser.add_argument( "pb", type=str, help="path to protobuf result document file, produced by `proto-from-results.py`" ) - - logging_group = parser.add_argument_group("logging arguments") - - logging_group.add_argument("-d", "--debug", action="store_true", help="enable debugging output on STDERR") - logging_group.add_argument( - "-q", "--quiet", action="store_true", help="disable all status output except fatal errors" - ) - args = parser.parse_args(args=argv) - if args.quiet: - logging.basicConfig(level=logging.WARNING) - logging.getLogger().setLevel(logging.WARNING) - elif args.debug: - logging.basicConfig(level=logging.DEBUG) - logging.getLogger().setLevel(logging.DEBUG) - else: - logging.basicConfig(level=logging.INFO) - logging.getLogger().setLevel(logging.INFO) + try: + capa.main.handle_common_args(args) + except capa.main.ShouldExitError as e: + return e.status_code pb = Path(args.pb).read_bytes() diff --git 
a/scripts/show-capabilities-by-function.py b/scripts/show-capabilities-by-function.py index 421c6c7e1..5a1c0ea1c 100644 --- a/scripts/show-capabilities-by-function.py +++ b/scripts/show-capabilities-by-function.py @@ -55,13 +55,11 @@ is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ -import os import sys import logging import argparse import collections from typing import Dict -from pathlib import Path import colorama @@ -76,10 +74,7 @@ import capa.features.freeze import capa.capabilities.common import capa.render.result_document as rd -from capa.helpers import get_file_taste -from capa.features.common import FORMAT_AUTO from capa.features.freeze import Address -from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor logger = logging.getLogger("capa.show-capabilities-by-function") @@ -142,67 +137,37 @@ def main(argv=None): argv = sys.argv[1:] parser = argparse.ArgumentParser(description="detect capabilities in programs.") - capa.main.install_common_args(parser, wanted={"format", "os", "backend", "sample", "signatures", "rules", "tag"}) + capa.main.install_common_args( + parser, wanted={"format", "os", "backend", "input_file", "signatures", "rules", "tag"} + ) args = parser.parse_args(args=argv) - capa.main.handle_common_args(args) try: - taste = get_file_taste(Path(args.sample)) - except IOError as e: - logger.error("%s", str(e)) - return -1 - - try: - rules = capa.main.get_rules(args.rules) - logger.info("successfully loaded %s rules", len(rules)) - if args.tag: - rules = rules.filter_rules_by_meta(args.tag) - logger.info("selected %s rules", len(rules)) - except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e: - logger.error("%s", str(e)) - return -1 - - try: - sig_paths = capa.main.get_signatures(args.signatures) - except IOError as e: - logger.error("%s", str(e)) - return -1 - - if (args.format == "freeze") or (args.format == FORMAT_AUTO and capa.features.freeze.is_freeze(taste)): - format_ = "freeze" - extractor: FeatureExtractor = capa.features.freeze.load(Path(args.sample).read_bytes()) - else: - format_ = args.format - should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None) - - try: - extractor = capa.main.get_extractor( - args.sample, args.format, args.os, args.backend, sig_paths, should_save_workspace - ) - assert isinstance(extractor, StaticFeatureExtractor) - except capa.exceptions.UnsupportedFormatError: - capa.helpers.log_unsupported_format_error() - return -1 - except capa.exceptions.UnsupportedRuntimeError: - capa.helpers.log_unsupported_runtime_error() - return -1 + capa.main.handle_common_args(args) + capa.main.ensure_input_exists_from_cli(args) + input_format = capa.main.get_input_format_from_cli(args) + rules = capa.main.get_rules_from_cli(args) + backend = capa.main.get_backend_from_cli(args, input_format) + sample_path = capa.main.get_sample_path_from_cli(args, backend) + if sample_path is None: + os_ = "unknown" + else: + os_ = capa.loader.get_os(sample_path) + extractor = capa.main.get_extractor_from_cli(args, input_format, backend) + except capa.main.ShouldExitError as e: + return e.status_code capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor) - meta = capa.main.collect_metadata(argv, args.sample, format_, args.os, args.rules, extractor, counts) - meta.analysis.layout = 
capa.main.compute_layout(rules, extractor, capabilities) + meta = capa.loader.collect_metadata(argv, args.input_file, input_format, os_, args.rules, extractor, counts) + meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities) if capa.capabilities.common.has_file_limitation(rules, capabilities): # bail if capa encountered file limitation e.g. a packed binary # do show the output in verbose mode, though. if not (args.verbose or args.vverbose or args.json): - return -1 + return capa.main.E_FILE_LIMITATION - # colorama will detect: - # - when on Windows console, and fixup coloring, and - # - when not an interactive session, and disable coloring - # renderers should use coloring and assume it will be stripped out if necessary. - colorama.init() doc = rd.ResultDocument.from_capa(meta, rules, capabilities) print(render_matches_by_function(doc)) colorama.deinit() diff --git a/scripts/show-features.py b/scripts/show-features.py index 2d5a34808..d70c6815b 100644 --- a/scripts/show-features.py +++ b/scripts/show-features.py @@ -64,16 +64,15 @@ insn: 0x10001027: mnemonic(shl) ... """ -import os import sys import logging import argparse from typing import Tuple -from pathlib import Path import capa.main import capa.rules import capa.engine +import capa.loader import capa.helpers import capa.features import capa.exceptions @@ -81,17 +80,9 @@ import capa.features.freeze import capa.features.address import capa.features.extractors.pefile -from capa.helpers import get_auto_format, log_unsupported_runtime_error +from capa.helpers import assert_never from capa.features.insn import API, Number -from capa.features.common import ( - FORMAT_AUTO, - FORMAT_CAPE, - FORMAT_FREEZE, - DYNAMIC_FORMATS, - String, - Feature, - is_global_feature, -) +from capa.features.common import String, Feature, is_global_feature from capa.features.extractors.base_extractor import FunctionHandle, StaticFeatureExtractor, DynamicFeatureExtractor logger = logging.getLogger("capa.show-features") @@ -106,56 +97,33 @@ def main(argv=None): argv = sys.argv[1:] parser = argparse.ArgumentParser(description="Show the features that capa extracts from the given sample") - capa.main.install_common_args(parser, wanted={"format", "os", "sample", "signatures", "backend"}) + capa.main.install_common_args(parser, wanted={"input_file", "format", "os", "signatures", "backend"}) parser.add_argument("-F", "--function", type=str, help="Show features for specific function") parser.add_argument("-P", "--process", type=str, help="Show features for specific process name") args = parser.parse_args(args=argv) - capa.main.handle_common_args(args) - - if args.function and args.backend == "pefile": - print("pefile backend does not support extracting function features") - return -1 try: - _ = capa.helpers.get_file_taste(Path(args.sample)) - except IOError as e: - logger.error("%s", str(e)) - return -1 + capa.main.handle_common_args(args) + capa.main.ensure_input_exists_from_cli(args) - try: - sig_paths = capa.main.get_signatures(args.signatures) - except IOError as e: - logger.error("%s", str(e)) - return -1 - - format_ = args.format if args.format != FORMAT_AUTO else get_auto_format(args.sample) - if format_ == FORMAT_FREEZE: - # this should be moved above the previous if clause after implementing - # feature freeze for the dynamic analysis flavor - extractor = capa.features.freeze.load(Path(args.sample).read_bytes()) - else: - should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None) - try: - 
extractor = capa.main.get_extractor( - args.sample, format_, args.os, args.backend, sig_paths, should_save_workspace - ) - except capa.exceptions.UnsupportedFormatError as e: - if format_ == FORMAT_CAPE: - capa.helpers.log_unsupported_cape_report_error(str(e)) - else: - capa.helpers.log_unsupported_format_error() - return -1 - except capa.exceptions.UnsupportedRuntimeError: - log_unsupported_runtime_error() + if args.function and args.backend == "pefile": + print("pefile backend does not support extracting function features") return -1 - if format_ in DYNAMIC_FORMATS: - assert isinstance(extractor, DynamicFeatureExtractor) + input_format = capa.main.get_input_format_from_cli(args) + + backend = capa.main.get_backend_from_cli(args, input_format) + extractor = capa.main.get_extractor_from_cli(args, input_format, backend) + except capa.main.ShouldExitError as e: + return e.status_code + + if isinstance(extractor, DynamicFeatureExtractor): print_dynamic_analysis(extractor, args) - else: - assert isinstance(extractor, StaticFeatureExtractor) + elif isinstance(extractor, StaticFeatureExtractor): print_static_analysis(extractor, args) + else: + assert_never(extractor) return 0 diff --git a/scripts/show-unused-features.py b/scripts/show-unused-features.py index ddd236614..0390cd640 100644 --- a/scripts/show-unused-features.py +++ b/scripts/show-unused-features.py @@ -8,13 +8,11 @@ is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ -import os import sys import typing import logging import argparse from typing import Set, Tuple -from pathlib import Path from collections import Counter import tabulate @@ -31,8 +29,7 @@ import capa.features.address import capa.features.extractors.pefile import capa.features.extractors.base_extractor -from capa.helpers import log_unsupported_runtime_error -from capa.features.common import Feature +from capa.features.common import FORMAT_FREEZE, Feature from capa.features.extractors.base_extractor import FunctionHandle, StaticFeatureExtractor logger = logging.getLogger("show-unused-features") @@ -42,10 +39,9 @@ def format_address(addr: capa.features.address.Address) -> str: return v.format_address(capa.features.freeze.Address.from_capa((addr))) -def get_rules_feature_set(rules_path) -> Set[Feature]: - ruleset = capa.main.get_rules(rules_path) +def get_rules_feature_set(rules: capa.rules.RuleSet) -> Set[Feature]: rules_feature_set: Set[Feature] = set() - for _, rule in ruleset.rules.items(): + for _, rule in rules.rules.items(): rules_feature_set.update(rule.extract_all_features()) return rules_feature_set @@ -106,44 +102,23 @@ def main(argv=None): argv = sys.argv[1:] parser = argparse.ArgumentParser(description="Show the features that capa doesn't have rules for yet") - capa.main.install_common_args(parser, wanted={"format", "os", "sample", "signatures", "backend", "rules"}) - + capa.main.install_common_args(parser, wanted={"format", "os", "input_file", "signatures", "backend", "rules"}) parser.add_argument("-F", "--function", type=str, help="Show features for specific function") args = parser.parse_args(args=argv) - capa.main.handle_common_args(args) if args.function and args.backend == "pefile": print("pefile backend does not support extracting function features") return -1 try: - taste = capa.helpers.get_file_taste(Path(args.sample)) - except IOError as e: - logger.error("%s", str(e)) - return -1 - - try: - 
sig_paths = capa.main.get_signatures(args.signatures) - except IOError as e: - logger.error("%s", str(e)) - return -1 - - if (args.format == "freeze") or ( - args.format == capa.features.common.FORMAT_AUTO and capa.features.freeze.is_freeze(taste) - ): - extractor = capa.features.freeze.load(Path(args.sample).read_bytes()) - else: - should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None) - try: - extractor = capa.main.get_extractor( - args.sample, args.format, args.os, args.backend, sig_paths, should_save_workspace - ) - except capa.exceptions.UnsupportedFormatError: - capa.helpers.log_unsupported_format_error() - return -1 - except capa.exceptions.UnsupportedRuntimeError: - log_unsupported_runtime_error() - return -1 + capa.main.handle_common_args(args) + capa.main.ensure_input_exists_from_cli(args) + rules = capa.main.get_rules_from_cli(args) + input_format = capa.main.get_input_format_from_cli(args) + backend = capa.main.get_backend_from_cli(args, input_format) + extractor = capa.main.get_extractor_from_cli(args, input_format, backend) + except capa.main.ShouldExitError as e: + return e.status_code assert isinstance(extractor, StaticFeatureExtractor), "only static analysis supported today" @@ -159,7 +134,7 @@ def main(argv=None): function_handles = tuple(extractor.get_functions()) if args.function: - if args.format == "freeze": + if input_format == FORMAT_FREEZE: function_handles = tuple(filter(lambda fh: fh.address == args.function, function_handles)) else: function_handles = tuple(filter(lambda fh: format_address(fh.address) == args.function, function_handles)) @@ -174,7 +149,7 @@ def main(argv=None): feature_map.update(get_file_features(function_handles, extractor)) - rules_feature_set = get_rules_feature_set(args.rules) + rules_feature_set = get_rules_feature_set(rules) print_unused_features(feature_map, rules_feature_set) return 0 @@ -206,7 +181,8 @@ def ida_main(): feature_map.update(get_file_features(function_handles, extractor)) rules_path = capa.main.get_default_root() / "rules" - rules_feature_set = get_rules_feature_set([rules_path]) + rules = capa.rules.get_rules([rules_path]) + rules_feature_set = get_rules_feature_set(rules) print_unused_features(feature_map, rules_feature_set) diff --git a/tests/fixtures.py b/tests/fixtures.py index a06308a1c..ebfe557a5 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -106,11 +106,11 @@ def get_viv_extractor(path: Path): ] if "raw32" in path.name: - vw = capa.main.get_workspace(path, "sc32", sigpaths=sigpaths) + vw = capa.loader.get_workspace(path, "sc32", sigpaths=sigpaths) elif "raw64" in path.name: - vw = capa.main.get_workspace(path, "sc64", sigpaths=sigpaths) + vw = capa.loader.get_workspace(path, "sc64", sigpaths=sigpaths) else: - vw = capa.main.get_workspace(path, FORMAT_AUTO, sigpaths=sigpaths) + vw = capa.loader.get_workspace(path, FORMAT_AUTO, sigpaths=sigpaths) vw.saveWorkspace() extractor = capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path, OS_AUTO) fixup_viv(path, extractor)