diff --git a/capa/exceptions.py b/capa/exceptions.py index e080791ae..58af3bef3 100644 --- a/capa/exceptions.py +++ b/capa/exceptions.py @@ -19,3 +19,7 @@ class UnsupportedArchError(ValueError): class UnsupportedOSError(ValueError): pass + + +class EmptyReportError(ValueError): + pass diff --git a/capa/features/extractors/cape/call.py b/capa/features/extractors/cape/call.py index 5d274c5e7..88680b3fa 100644 --- a/capa/features/extractors/cape/call.py +++ b/capa/features/extractors/cape/call.py @@ -21,7 +21,7 @@ def extract_call_features(ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -> Iterator[Tuple[Feature, Address]]: """ - this method extrcts the given call's features (such as API name and arguments), + this method extracts the given call's features (such as API name and arguments), and returns them as API, Number, and String features. args: diff --git a/capa/features/extractors/cape/extractor.py b/capa/features/extractors/cape/extractor.py index 2a070c91b..1c8cfd2a0 100644 --- a/capa/features/extractors/cape/extractor.py +++ b/capa/features/extractors/cape/extractor.py @@ -14,10 +14,10 @@ import capa.features.extractors.cape.thread import capa.features.extractors.cape.global_ import capa.features.extractors.cape.process -from capa.exceptions import UnsupportedFormatError +from capa.exceptions import EmptyReportError, UnsupportedFormatError from capa.features.common import Feature, Characteristic from capa.features.address import NO_ADDRESS, Address, AbsoluteVirtualAddress, _NoAddress -from capa.features.extractors.cape.models import CapeReport +from capa.features.extractors.cape.models import Static, CapeReport from capa.features.extractors.base_extractor import ( CallHandle, SampleHashes, @@ -85,10 +85,18 @@ def from_report(cls, report: Dict) -> "CapeExtractor": if cr.info.version not in TESTED_VERSIONS: logger.warning("CAPE version '%s' not tested/supported yet", cr.info.version) + # observed in 2.4-CAPE reports from capesandbox.com + if cr.static is None and cr.target.file.pe is not None: + cr.static = Static() + cr.static.pe = cr.target.file.pe + if cr.static is None: raise UnsupportedFormatError("CAPE report missing static analysis") if cr.static.pe is None: raise UnsupportedFormatError("CAPE report missing PE analysis") + if len(cr.behavior.processes) == 0: + raise EmptyReportError("CAPE did not capture any processes") + return cls(cr) diff --git a/capa/features/extractors/cape/models.py b/capa/features/extractors/cape/models.py index ab479c8d4..870afa820 100644 --- a/capa/features/extractors/cape/models.py +++ b/capa/features/extractors/cape/models.py @@ -132,13 +132,21 @@ class DigitalSigner(FlexibleModel): extensions_subjectKeyIdentifier: Optional[str] = None +class AuxSigner(ExactModel): + name: str + issued_to: str = Field(alias="Issued to") + issued_by: str = Field(alias="Issued by") + expires: str = Field(alias="Expires") + sha1_hash: str = Field(alias="SHA1 hash") + + class Signer(ExactModel): - aux_sha1: Optional[TODO] = None - aux_timestamp: Optional[None] = None + aux_sha1: Optional[str] = None + aux_timestamp: Optional[str] = None aux_valid: Optional[bool] = None aux_error: Optional[bool] = None aux_error_desc: Optional[str] = None - aux_signers: Optional[ListTODO] = None + aux_signers: Optional[List[AuxSigner]] = None class Overlay(ExactModel): @@ -197,7 +205,10 @@ class PE(ExactModel): guest_signers: Signer -class File(ExactModel): +# TODO(mr-tz): target.file.dotnet, target.file.extracted_files, target.file.extracted_files_tool, +# target.file.extracted_files_time +# https://github.com/mandiant/capa/issues/1814 +class File(FlexibleModel): type: str cape_type_code: Optional[int] = None cape_type: Optional[str] = None @@ -350,6 +361,7 @@ class Behavior(ExactModel): class Target(ExactModel): category: str file: File + pe: Optional[PE] = None class Static(ExactModel): @@ -385,7 +397,7 @@ class CapeReport(FlexibleModel): # post-processed results: payloads and extracted configs CAPE: Optional[Cape] = None dropped: Optional[List[File]] = None - procdump: List[ProcessFile] + procdump: Optional[List[ProcessFile]] = None procmemory: ListTODO # ========================================================================= diff --git a/capa/features/extractors/cape/thread.py b/capa/features/extractors/cape/thread.py index 24c2d3b29..cfdb081cf 100644 --- a/capa/features/extractors/cape/thread.py +++ b/capa/features/extractors/cape/thread.py @@ -10,6 +10,7 @@ from typing import Iterator from capa.features.address import DynamicCallAddress +from capa.features.extractors.helpers import is_aw_function from capa.features.extractors.cape.models import Process from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle @@ -24,5 +25,22 @@ def get_calls(ph: ProcessHandle, th: ThreadHandle) -> Iterator[CallHandle]: if call.thread_id != tid: continue - addr = DynamicCallAddress(thread=th.address, id=call_index) - yield CallHandle(address=addr, inner=call) + for symbol in generate_symbols(call.api): + call.api = symbol + + addr = DynamicCallAddress(thread=th.address, id=call_index) + yield CallHandle(address=addr, inner=call) + + +def generate_symbols(symbol: str) -> Iterator[str]: + """ + for a given symbol name, generate variants. + we over-generate features to make matching easier. + """ + + # CreateFileA + yield symbol + + if is_aw_function(symbol): + # CreateFile + yield symbol[:-1] diff --git a/capa/helpers.py b/capa/helpers.py index abe839af9..ee7bbca37 100644 --- a/capa/helpers.py +++ b/capa/helpers.py @@ -148,9 +148,9 @@ def log_unsupported_format_error(): logger.error("-" * 80) -def log_unsupported_cape_report_error(): +def log_unsupported_cape_report_error(error: str): logger.error("-" * 80) - logger.error(" Input file is not a valid CAPE report.") + logger.error("Input file is not a valid CAPE report: %s", error) logger.error(" ") logger.error(" capa currently only supports analyzing standard CAPE json reports.") logger.error( @@ -159,6 +159,14 @@ def log_unsupported_cape_report_error(): logger.error("-" * 80) +def log_empty_cape_report_error(error: str): + logger.error("-" * 80) + logger.error(" CAPE report is empty or only contains little useful data: %s", error) + logger.error(" ") + logger.error(" Please make sure the sandbox run captures useful behaviour of your sample.") + logger.error("-" * 80) + + def log_unsupported_os_error(): logger.error("-" * 80) logger.error(" Input file does not appear to target a supported OS.") diff --git a/capa/main.py b/capa/main.py index 4041cbe81..8978f7111 100644 --- a/capa/main.py +++ b/capa/main.py @@ -62,10 +62,17 @@ log_unsupported_os_error, redirecting_print_to_tqdm, log_unsupported_arch_error, + log_empty_cape_report_error, log_unsupported_format_error, log_unsupported_cape_report_error, ) -from capa.exceptions import UnsupportedOSError, UnsupportedArchError, UnsupportedFormatError, UnsupportedRuntimeError +from capa.exceptions import ( + EmptyReportError, + UnsupportedOSError, + UnsupportedArchError, + UnsupportedFormatError, + UnsupportedRuntimeError, +) from capa.features.common import ( OS_AUTO, OS_LINUX, @@ -1495,12 +1502,17 @@ def main(argv: Optional[List[str]] = None): except (ELFError, OverflowError) as e: logger.error("Input file '%s' is not a valid ELF file: %s", args.sample, str(e)) return E_CORRUPT_FILE - except UnsupportedFormatError: + except UnsupportedFormatError as e: if format_ == FORMAT_CAPE: - log_unsupported_cape_report_error() + log_unsupported_cape_report_error(str(e)) else: log_unsupported_format_error() return E_INVALID_FILE_TYPE + except EmptyReportError as e: + if format_ == FORMAT_CAPE: + log_empty_cape_report_error(str(e)) + else: + log_unsupported_format_error() for file_extractor in file_extractors: if isinstance(file_extractor, DynamicFeatureExtractor): @@ -1561,6 +1573,9 @@ def main(argv: Optional[List[str]] = None): should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None) + # TODO(mr-tz): this should be wrapped and refactored as it's tedious to update everywhere + # see same code and show-features above examples + # https://github.com/mandiant/capa/issues/1813 try: extractor = get_extractor( args.sample, @@ -1571,9 +1586,9 @@ def main(argv: Optional[List[str]] = None): should_save_workspace, disable_progress=args.quiet or args.debug, ) - except UnsupportedFormatError: + except UnsupportedFormatError as e: if format_ == FORMAT_CAPE: - log_unsupported_cape_report_error() + log_unsupported_cape_report_error(str(e)) else: log_unsupported_format_error() return E_INVALID_FILE_TYPE diff --git a/scripts/lint.py b/scripts/lint.py index 9fcebdd0d..09f27fe57 100644 --- a/scripts/lint.py +++ b/scripts/lint.py @@ -216,8 +216,8 @@ class InvalidScopes(Lint): recommendation = "At least one scope (static or dynamic) must be specified" def check_rule(self, ctx: Context, rule: Rule): - return (rule.meta.get("scope").get("static") in ("unspecified", "unsupported")) and ( - rule.meta.get("scope").get("dynamic") in ("unspecified", "unsupported") + return (rule.meta.get("scopes").get("static") in ("unspecified", "unsupported")) and ( + rule.meta.get("scopes").get("dynamic") in ("unspecified", "unsupported") ) @@ -979,10 +979,6 @@ def main(argv=None): default_samples_path = str(Path(__file__).resolve().parent.parent / "tests" / "data") - # TODO(yelhamer): remove once support for the legacy scope field has been added - # https://github.com/mandiant/capa/pull/1580 - return 0 - parser = argparse.ArgumentParser(description="Lint capa rules.") capa.main.install_common_args(parser, wanted={"tag"}) parser.add_argument("rules", type=str, action="append", help="Path to rules") diff --git a/scripts/show-features.py b/scripts/show-features.py index d909d95b7..5bb9fd5b4 100644 --- a/scripts/show-features.py +++ b/scripts/show-features.py @@ -83,7 +83,15 @@ import capa.features.extractors.pefile from capa.helpers import get_auto_format, log_unsupported_runtime_error from capa.features.insn import API, Number -from capa.features.common import FORMAT_AUTO, FORMAT_FREEZE, DYNAMIC_FORMATS, String, Feature, is_global_feature +from capa.features.common import ( + FORMAT_AUTO, + FORMAT_CAPE, + FORMAT_FREEZE, + DYNAMIC_FORMATS, + String, + Feature, + is_global_feature, +) from capa.features.extractors.base_extractor import FunctionHandle, StaticFeatureExtractor, DynamicFeatureExtractor logger = logging.getLogger("capa.show-features") @@ -132,8 +140,11 @@ def main(argv=None): extractor = capa.main.get_extractor( args.sample, format_, args.os, args.backend, sig_paths, should_save_workspace ) - except capa.exceptions.UnsupportedFormatError: - capa.helpers.log_unsupported_format_error() + except capa.exceptions.UnsupportedFormatError as e: + if format_ == FORMAT_CAPE: + capa.helpers.log_unsupported_cape_report_error(str(e)) + else: + capa.helpers.log_unsupported_format_error() return -1 except capa.exceptions.UnsupportedRuntimeError: log_unsupported_runtime_error() @@ -248,13 +259,13 @@ def print_static_features(functions, extractor: StaticFeatureExtractor): def print_dynamic_features(processes, extractor: DynamicFeatureExtractor): for p in processes: - print(f"proc: {p.inner['name']} (ppid={p.address.ppid}, pid={p.address.pid})") + print(f"proc: {p.inner.process_name} (ppid={p.address.ppid}, pid={p.address.pid})") for feature, addr in extractor.extract_process_features(p): if is_global_feature(feature): continue - print(f" proc: {p.inner['name']}: {feature}") + print(f" proc: {p.inner.process_name}: {feature}") for t in extractor.get_threads(p): print(f" thread: {t.address.tid}") @@ -283,7 +294,7 @@ def print_dynamic_features(processes, extractor: DynamicFeatureExtractor): print(f" arguments=[{', '.join(arguments)}]") for cid, api in apis: - print(f"call {cid}: {api}({', '.join(arguments)})") + print(f" call {cid}: {api}({', '.join(arguments)})") def ida_main():