From 5c48f3820851dd49ce18c2d96e0c059cff45e28c Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Thu, 19 Oct 2023 10:39:14 +0200 Subject: [PATCH 01/14] capa/main.py: add a `capabilities` module and move all of the capability extraction there --- capa/features/capabilities/__init__.py | 0 capa/features/capabilities/common.py | 49 +++ capa/features/capabilities/dynamic.py | 190 +++++++++++ capa/features/capabilities/static.py | 225 +++++++++++++ capa/main.py | 428 +------------------------ tests/test_main.py | 15 +- 6 files changed, 475 insertions(+), 432 deletions(-) create mode 100644 capa/features/capabilities/__init__.py create mode 100644 capa/features/capabilities/common.py create mode 100644 capa/features/capabilities/dynamic.py create mode 100644 capa/features/capabilities/static.py diff --git a/capa/features/capabilities/__init__.py b/capa/features/capabilities/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/capa/features/capabilities/common.py b/capa/features/capabilities/common.py new file mode 100644 index 000000000..ce7ddfb4d --- /dev/null +++ b/capa/features/capabilities/common.py @@ -0,0 +1,49 @@ +import logging +import itertools +import collections +from typing import Any, Tuple + +from capa.rules import Scope, RuleSet +from capa.engine import FeatureSet, MatchResults +from capa.features.address import NO_ADDRESS +from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor + +logger = logging.getLogger("capa") + + +def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, function_features: FeatureSet): + file_features: FeatureSet = collections.defaultdict(set) + + for feature, va in itertools.chain(extractor.extract_file_features(), extractor.extract_global_features()): + # not all file features may have virtual addresses. + # if not, then at least ensure the feature shows up in the index. + # the set of addresses will still be empty. + if va: + file_features[feature].add(va) + else: + if feature not in file_features: + file_features[feature] = set() + + logger.debug("analyzed file and extracted %d features", len(file_features)) + + file_features.update(function_features) + + _, matches = ruleset.match(Scope.FILE, file_features, NO_ADDRESS) + return matches, len(file_features) + + +def find_capabilities( + ruleset: RuleSet, extractor: FeatureExtractor, disable_progress=None, **kwargs +) -> Tuple[MatchResults, Any]: + from capa.features.capabilities.static import find_static_capabilities + from capa.features.capabilities.dynamic import find_dynamic_capabilities + + if isinstance(extractor, StaticFeatureExtractor): + # for the time being, extractors are either static or dynamic. + # Remove this assertion once that has changed + assert not isinstance(extractor, DynamicFeatureExtractor) + return find_static_capabilities(ruleset, extractor, disable_progress=disable_progress, **kwargs) + if isinstance(extractor, DynamicFeatureExtractor): + return find_dynamic_capabilities(ruleset, extractor, disable_progress=disable_progress, **kwargs) + else: + raise ValueError(f"unexpected extractor type: {extractor.__class__.__name__}") diff --git a/capa/features/capabilities/dynamic.py b/capa/features/capabilities/dynamic.py new file mode 100644 index 000000000..4ac7a3599 --- /dev/null +++ b/capa/features/capabilities/dynamic.py @@ -0,0 +1,190 @@ +import logging +import itertools +import collections +from typing import Any, Tuple + +import tqdm + +import capa.perf +import capa.features.freeze as frz +import capa.render.result_document as rdoc +from capa.rules import Scope, RuleSet +from capa.engine import FeatureSet, MatchResults +from capa.helpers import redirecting_print_to_tqdm +from capa.features.capabilities.common import find_file_capabilities +from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle, DynamicFeatureExtractor + +logger = logging.getLogger("capa") + + +def find_call_capabilities( + ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle +) -> Tuple[FeatureSet, MatchResults]: + """ + find matches for the given rules for the given call. + + returns: tuple containing (features for call, match results for call) + """ + # all features found for the call. + features: FeatureSet = collections.defaultdict(set) + + for feature, addr in itertools.chain( + extractor.extract_call_features(ph, th, ch), extractor.extract_global_features() + ): + features[feature].add(addr) + + # matches found at this thread. + _, matches = ruleset.match(Scope.CALL, features, ch.address) + + for rule_name, res in matches.items(): + rule = ruleset[rule_name] + for addr, _ in res: + capa.engine.index_rule_matches(features, rule, [addr]) + + return features, matches + + +def find_thread_capabilities( + ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle +) -> Tuple[FeatureSet, MatchResults, MatchResults]: + """ + find matches for the given rules within the given thread. + + returns: tuple containing (features for thread, match results for thread, match results for calls) + """ + # all features found within this thread, + # includes features found within calls. + features: FeatureSet = collections.defaultdict(set) + + # matches found at the call scope. + # might be found at different calls, thats ok. + call_matches: MatchResults = collections.defaultdict(list) + + for ch in extractor.get_calls(ph, th): + ifeatures, imatches = find_call_capabilities(ruleset, extractor, ph, th, ch) + for feature, vas in ifeatures.items(): + features[feature].update(vas) + + for rule_name, res in imatches.items(): + call_matches[rule_name].extend(res) + + for feature, va in itertools.chain(extractor.extract_thread_features(ph, th), extractor.extract_global_features()): + features[feature].add(va) + + # matches found within this thread. + _, matches = ruleset.match(Scope.THREAD, features, th.address) + + for rule_name, res in matches.items(): + rule = ruleset[rule_name] + for va, _ in res: + capa.engine.index_rule_matches(features, rule, [va]) + + return features, matches, call_matches + + +def find_process_capabilities( + ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle +) -> Tuple[MatchResults, MatchResults, MatchResults, int]: + """ + find matches for the given rules within the given process. + + returns: tuple containing (match results for process, match results for threads, match results for calls, number of features) + """ + # all features found within this process, + # includes features found within threads (and calls). + process_features: FeatureSet = collections.defaultdict(set) + + # matches found at the basic threads. + # might be found at different threads, thats ok. + thread_matches: MatchResults = collections.defaultdict(list) + + # matches found at the call scope. + # might be found at different calls, thats ok. + call_matches: MatchResults = collections.defaultdict(list) + + for th in extractor.get_threads(ph): + features, tmatches, cmatches = find_thread_capabilities(ruleset, extractor, ph, th) + for feature, vas in features.items(): + process_features[feature].update(vas) + + for rule_name, res in tmatches.items(): + thread_matches[rule_name].extend(res) + + for rule_name, res in cmatches.items(): + call_matches[rule_name].extend(res) + + for feature, va in itertools.chain(extractor.extract_process_features(ph), extractor.extract_global_features()): + process_features[feature].add(va) + + _, process_matches = ruleset.match(Scope.PROCESS, process_features, ph.address) + return process_matches, thread_matches, call_matches, len(process_features) + + +def find_dynamic_capabilities( + ruleset: RuleSet, extractor: DynamicFeatureExtractor, disable_progress=None +) -> Tuple[MatchResults, Any]: + all_process_matches: MatchResults = collections.defaultdict(list) + all_thread_matches: MatchResults = collections.defaultdict(list) + all_call_matches: MatchResults = collections.defaultdict(list) + + feature_counts = rdoc.DynamicFeatureCounts(file=0, processes=()) + + assert isinstance(extractor, DynamicFeatureExtractor) + with redirecting_print_to_tqdm(disable_progress): + with tqdm.contrib.logging.logging_redirect_tqdm(): + pbar = tqdm.tqdm + if disable_progress: + # do not use tqdm to avoid unnecessary side effects when caller intends + # to disable progress completely + def pbar(s, *args, **kwargs): + return s + + processes = list(extractor.get_processes()) + + pb = pbar(processes, desc="matching", unit=" processes", leave=False) + for p in pb: + process_matches, thread_matches, call_matches, feature_count = find_process_capabilities( + ruleset, extractor, p + ) + feature_counts.processes += ( + rdoc.ProcessFeatureCount(address=frz.Address.from_capa(p.address), count=feature_count), + ) + logger.debug("analyzed %s and extracted %d features", p.address, feature_count) + + for rule_name, res in process_matches.items(): + all_process_matches[rule_name].extend(res) + for rule_name, res in thread_matches.items(): + all_thread_matches[rule_name].extend(res) + for rule_name, res in call_matches.items(): + all_call_matches[rule_name].extend(res) + + # collection of features that captures the rule matches within process and thread scopes. + # mapping from feature (matched rule) to set of addresses at which it matched. + process_and_lower_features: FeatureSet = collections.defaultdict(set) + for rule_name, results in itertools.chain( + all_process_matches.items(), all_thread_matches.items(), all_call_matches.items() + ): + locations = {p[0] for p in results} + rule = ruleset[rule_name] + capa.engine.index_rule_matches(process_and_lower_features, rule, locations) + + all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, process_and_lower_features) + feature_counts.file = feature_count + + matches = dict( + itertools.chain( + # each rule exists in exactly one scope, + # so there won't be any overlap among these following MatchResults, + # and we can merge the dictionaries naively. + all_thread_matches.items(), + all_process_matches.items(), + all_call_matches.items(), + all_file_matches.items(), + ) + ) + + meta = { + "feature_counts": feature_counts, + } + + return matches, meta diff --git a/capa/features/capabilities/static.py b/capa/features/capabilities/static.py new file mode 100644 index 000000000..12e1b5196 --- /dev/null +++ b/capa/features/capabilities/static.py @@ -0,0 +1,225 @@ +import time +import logging +import itertools +import collections +from typing import Any, Tuple + +import tqdm.contrib.logging + +import capa.perf +import capa.features.freeze as frz +import capa.render.result_document as rdoc +from capa.rules import Scope, RuleSet +from capa.engine import FeatureSet, MatchResults +from capa.helpers import redirecting_print_to_tqdm +from capa.features.capabilities.common import find_file_capabilities +from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, StaticFeatureExtractor + +logger = logging.getLogger("capa") + + +def find_instruction_capabilities( + ruleset: RuleSet, extractor: StaticFeatureExtractor, f: FunctionHandle, bb: BBHandle, insn: InsnHandle +) -> Tuple[FeatureSet, MatchResults]: + """ + find matches for the given rules for the given instruction. + + returns: tuple containing (features for instruction, match results for instruction) + """ + # all features found for the instruction. + features: FeatureSet = collections.defaultdict(set) + + for feature, addr in itertools.chain( + extractor.extract_insn_features(f, bb, insn), extractor.extract_global_features() + ): + features[feature].add(addr) + + # matches found at this instruction. + _, matches = ruleset.match(Scope.INSTRUCTION, features, insn.address) + + for rule_name, res in matches.items(): + rule = ruleset[rule_name] + for addr, _ in res: + capa.engine.index_rule_matches(features, rule, [addr]) + + return features, matches + + +def find_basic_block_capabilities( + ruleset: RuleSet, extractor: StaticFeatureExtractor, f: FunctionHandle, bb: BBHandle +) -> Tuple[FeatureSet, MatchResults, MatchResults]: + """ + find matches for the given rules within the given basic block. + + returns: tuple containing (features for basic block, match results for basic block, match results for instructions) + """ + # all features found within this basic block, + # includes features found within instructions. + features: FeatureSet = collections.defaultdict(set) + + # matches found at the instruction scope. + # might be found at different instructions, thats ok. + insn_matches: MatchResults = collections.defaultdict(list) + + for insn in extractor.get_instructions(f, bb): + ifeatures, imatches = find_instruction_capabilities(ruleset, extractor, f, bb, insn) + for feature, vas in ifeatures.items(): + features[feature].update(vas) + + for rule_name, res in imatches.items(): + insn_matches[rule_name].extend(res) + + for feature, va in itertools.chain( + extractor.extract_basic_block_features(f, bb), extractor.extract_global_features() + ): + features[feature].add(va) + + # matches found within this basic block. + _, matches = ruleset.match(Scope.BASIC_BLOCK, features, bb.address) + + for rule_name, res in matches.items(): + rule = ruleset[rule_name] + for va, _ in res: + capa.engine.index_rule_matches(features, rule, [va]) + + return features, matches, insn_matches + + +def find_code_capabilities( + ruleset: RuleSet, extractor: StaticFeatureExtractor, fh: FunctionHandle +) -> Tuple[MatchResults, MatchResults, MatchResults, int]: + """ + find matches for the given rules within the given function. + + returns: tuple containing (match results for function, match results for basic blocks, match results for instructions, number of features) + """ + # all features found within this function, + # includes features found within basic blocks (and instructions). + function_features: FeatureSet = collections.defaultdict(set) + + # matches found at the basic block scope. + # might be found at different basic blocks, thats ok. + bb_matches: MatchResults = collections.defaultdict(list) + + # matches found at the instruction scope. + # might be found at different instructions, thats ok. + insn_matches: MatchResults = collections.defaultdict(list) + + for bb in extractor.get_basic_blocks(fh): + features, bmatches, imatches = find_basic_block_capabilities(ruleset, extractor, fh, bb) + for feature, vas in features.items(): + function_features[feature].update(vas) + + for rule_name, res in bmatches.items(): + bb_matches[rule_name].extend(res) + + for rule_name, res in imatches.items(): + insn_matches[rule_name].extend(res) + + for feature, va in itertools.chain(extractor.extract_function_features(fh), extractor.extract_global_features()): + function_features[feature].add(va) + + _, function_matches = ruleset.match(Scope.FUNCTION, function_features, fh.address) + return function_matches, bb_matches, insn_matches, len(function_features) + + +def find_static_capabilities( + ruleset: RuleSet, extractor: StaticFeatureExtractor, disable_progress=None +) -> Tuple[MatchResults, Any]: + all_function_matches: MatchResults = collections.defaultdict(list) + all_bb_matches: MatchResults = collections.defaultdict(list) + all_insn_matches: MatchResults = collections.defaultdict(list) + + feature_counts = rdoc.StaticFeatureCounts(file=0, functions=()) + library_functions: Tuple[rdoc.LibraryFunction, ...] = () + + assert isinstance(extractor, StaticFeatureExtractor) + with redirecting_print_to_tqdm(disable_progress): + with tqdm.contrib.logging.logging_redirect_tqdm(): + pbar = tqdm.tqdm + if capa.helpers.is_runtime_ghidra(): + # Ghidrathon interpreter cannot properly handle + # the TMonitor thread that is created via a monitor_interval + # > 0 + pbar.monitor_interval = 0 + if disable_progress: + # do not use tqdm to avoid unnecessary side effects when caller intends + # to disable progress completely + def pbar(s, *args, **kwargs): + return s + + functions = list(extractor.get_functions()) + n_funcs = len(functions) + + pb = pbar(functions, desc="matching", unit=" functions", postfix="skipped 0 library functions", leave=False) + for f in pb: + t0 = time.time() + if extractor.is_library_function(f.address): + function_name = extractor.get_function_name(f.address) + logger.debug("skipping library function 0x%x (%s)", f.address, function_name) + library_functions += ( + rdoc.LibraryFunction(address=frz.Address.from_capa(f.address), name=function_name), + ) + n_libs = len(library_functions) + percentage = round(100 * (n_libs / n_funcs)) + if isinstance(pb, tqdm.tqdm): + pb.set_postfix_str(f"skipped {n_libs} library functions ({percentage}%)") + continue + + function_matches, bb_matches, insn_matches, feature_count = find_code_capabilities( + ruleset, extractor, f + ) + feature_counts.functions += ( + rdoc.FunctionFeatureCount(address=frz.Address.from_capa(f.address), count=feature_count), + ) + t1 = time.time() + + match_count = sum(len(res) for res in function_matches.values()) + match_count += sum(len(res) for res in bb_matches.values()) + match_count += sum(len(res) for res in insn_matches.values()) + logger.debug( + "analyzed function 0x%x and extracted %d features, %d matches in %0.02fs", + f.address, + feature_count, + match_count, + t1 - t0, + ) + + for rule_name, res in function_matches.items(): + all_function_matches[rule_name].extend(res) + for rule_name, res in bb_matches.items(): + all_bb_matches[rule_name].extend(res) + for rule_name, res in insn_matches.items(): + all_insn_matches[rule_name].extend(res) + + # collection of features that captures the rule matches within function, BB, and instruction scopes. + # mapping from feature (matched rule) to set of addresses at which it matched. + function_and_lower_features: FeatureSet = collections.defaultdict(set) + for rule_name, results in itertools.chain( + all_function_matches.items(), all_bb_matches.items(), all_insn_matches.items() + ): + locations = {p[0] for p in results} + rule = ruleset[rule_name] + capa.engine.index_rule_matches(function_and_lower_features, rule, locations) + + all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, function_and_lower_features) + feature_counts.file = feature_count + + matches = dict( + itertools.chain( + # each rule exists in exactly one scope, + # so there won't be any overlap among these following MatchResults, + # and we can merge the dictionaries naively. + all_insn_matches.items(), + all_bb_matches.items(), + all_function_matches.items(), + all_file_matches.items(), + ) + ) + + meta = { + "feature_counts": feature_counts, + "library_functions": library_functions, + } + + return matches, meta diff --git a/capa/main.py b/capa/main.py index 642778877..fdfeca813 100644 --- a/capa/main.py +++ b/capa/main.py @@ -17,16 +17,12 @@ import argparse import datetime import textwrap -import itertools import contextlib -import collections -from typing import Any, Dict, List, Tuple, Callable, Optional +from typing import Any, Dict, List, Callable, Optional from pathlib import Path import halo -import tqdm import colorama -import tqdm.contrib.logging from pefile import PEFormatError from typing_extensions import assert_never from elftools.common.exceptions import ELFError @@ -53,14 +49,13 @@ import capa.features.extractors.dotnetfile import capa.features.extractors.base_extractor import capa.features.extractors.cape.extractor -from capa.rules import Rule, Scope, RuleSet +from capa.rules import Rule, RuleSet from capa.engine import FeatureSet, MatchResults from capa.helpers import ( get_format, get_file_taste, get_auto_format, log_unsupported_os_error, - redirecting_print_to_tqdm, log_unsupported_arch_error, log_empty_cape_report_error, log_unsupported_format_error, @@ -89,14 +84,9 @@ FORMAT_RESULT, ) from capa.features.address import NO_ADDRESS, Address +from capa.features.capabilities.common import find_capabilities, find_file_capabilities from capa.features.extractors.base_extractor import ( - BBHandle, - CallHandle, - InsnHandle, SampleHashes, - ThreadHandle, - ProcessHandle, - FunctionHandle, FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor, @@ -144,418 +134,6 @@ def set_vivisect_log_level(level): logging.getLogger("Elf").setLevel(level) -def find_instruction_capabilities( - ruleset: RuleSet, extractor: StaticFeatureExtractor, f: FunctionHandle, bb: BBHandle, insn: InsnHandle -) -> Tuple[FeatureSet, MatchResults]: - """ - find matches for the given rules for the given instruction. - - returns: tuple containing (features for instruction, match results for instruction) - """ - # all features found for the instruction. - features: FeatureSet = collections.defaultdict(set) - - for feature, addr in itertools.chain( - extractor.extract_insn_features(f, bb, insn), extractor.extract_global_features() - ): - features[feature].add(addr) - - # matches found at this instruction. - _, matches = ruleset.match(Scope.INSTRUCTION, features, insn.address) - - for rule_name, res in matches.items(): - rule = ruleset[rule_name] - for addr, _ in res: - capa.engine.index_rule_matches(features, rule, [addr]) - - return features, matches - - -def find_basic_block_capabilities( - ruleset: RuleSet, extractor: StaticFeatureExtractor, f: FunctionHandle, bb: BBHandle -) -> Tuple[FeatureSet, MatchResults, MatchResults]: - """ - find matches for the given rules within the given basic block. - - returns: tuple containing (features for basic block, match results for basic block, match results for instructions) - """ - # all features found within this basic block, - # includes features found within instructions. - features: FeatureSet = collections.defaultdict(set) - - # matches found at the instruction scope. - # might be found at different instructions, thats ok. - insn_matches: MatchResults = collections.defaultdict(list) - - for insn in extractor.get_instructions(f, bb): - ifeatures, imatches = find_instruction_capabilities(ruleset, extractor, f, bb, insn) - for feature, vas in ifeatures.items(): - features[feature].update(vas) - - for rule_name, res in imatches.items(): - insn_matches[rule_name].extend(res) - - for feature, va in itertools.chain( - extractor.extract_basic_block_features(f, bb), extractor.extract_global_features() - ): - features[feature].add(va) - - # matches found within this basic block. - _, matches = ruleset.match(Scope.BASIC_BLOCK, features, bb.address) - - for rule_name, res in matches.items(): - rule = ruleset[rule_name] - for va, _ in res: - capa.engine.index_rule_matches(features, rule, [va]) - - return features, matches, insn_matches - - -def find_code_capabilities( - ruleset: RuleSet, extractor: StaticFeatureExtractor, fh: FunctionHandle -) -> Tuple[MatchResults, MatchResults, MatchResults, int]: - """ - find matches for the given rules within the given function. - - returns: tuple containing (match results for function, match results for basic blocks, match results for instructions, number of features) - """ - # all features found within this function, - # includes features found within basic blocks (and instructions). - function_features: FeatureSet = collections.defaultdict(set) - - # matches found at the basic block scope. - # might be found at different basic blocks, thats ok. - bb_matches: MatchResults = collections.defaultdict(list) - - # matches found at the instruction scope. - # might be found at different instructions, thats ok. - insn_matches: MatchResults = collections.defaultdict(list) - - for bb in extractor.get_basic_blocks(fh): - features, bmatches, imatches = find_basic_block_capabilities(ruleset, extractor, fh, bb) - for feature, vas in features.items(): - function_features[feature].update(vas) - - for rule_name, res in bmatches.items(): - bb_matches[rule_name].extend(res) - - for rule_name, res in imatches.items(): - insn_matches[rule_name].extend(res) - - for feature, va in itertools.chain(extractor.extract_function_features(fh), extractor.extract_global_features()): - function_features[feature].add(va) - - _, function_matches = ruleset.match(Scope.FUNCTION, function_features, fh.address) - return function_matches, bb_matches, insn_matches, len(function_features) - - -def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, function_features: FeatureSet): - file_features: FeatureSet = collections.defaultdict(set) - - for feature, va in itertools.chain(extractor.extract_file_features(), extractor.extract_global_features()): - # not all file features may have virtual addresses. - # if not, then at least ensure the feature shows up in the index. - # the set of addresses will still be empty. - if va: - file_features[feature].add(va) - else: - if feature not in file_features: - file_features[feature] = set() - - logger.debug("analyzed file and extracted %d features", len(file_features)) - - file_features.update(function_features) - - _, matches = ruleset.match(Scope.FILE, file_features, NO_ADDRESS) - return matches, len(file_features) - - -def find_static_capabilities( - ruleset: RuleSet, extractor: StaticFeatureExtractor, disable_progress=None -) -> Tuple[MatchResults, Any]: - all_function_matches: MatchResults = collections.defaultdict(list) - all_bb_matches: MatchResults = collections.defaultdict(list) - all_insn_matches: MatchResults = collections.defaultdict(list) - - feature_counts = rdoc.StaticFeatureCounts(file=0, functions=()) - library_functions: Tuple[rdoc.LibraryFunction, ...] = () - - assert isinstance(extractor, StaticFeatureExtractor) - with redirecting_print_to_tqdm(disable_progress): - with tqdm.contrib.logging.logging_redirect_tqdm(): - pbar = tqdm.tqdm - if capa.helpers.is_runtime_ghidra(): - # Ghidrathon interpreter cannot properly handle - # the TMonitor thread that is created via a monitor_interval - # > 0 - pbar.monitor_interval = 0 - if disable_progress: - # do not use tqdm to avoid unnecessary side effects when caller intends - # to disable progress completely - def pbar(s, *args, **kwargs): - return s - - functions = list(extractor.get_functions()) - n_funcs = len(functions) - - pb = pbar(functions, desc="matching", unit=" functions", postfix="skipped 0 library functions", leave=False) - for f in pb: - t0 = time.time() - if extractor.is_library_function(f.address): - function_name = extractor.get_function_name(f.address) - logger.debug("skipping library function 0x%x (%s)", f.address, function_name) - library_functions += ( - rdoc.LibraryFunction(address=frz.Address.from_capa(f.address), name=function_name), - ) - n_libs = len(library_functions) - percentage = round(100 * (n_libs / n_funcs)) - if isinstance(pb, tqdm.tqdm): - pb.set_postfix_str(f"skipped {n_libs} library functions ({percentage}%)") - continue - - function_matches, bb_matches, insn_matches, feature_count = find_code_capabilities( - ruleset, extractor, f - ) - feature_counts.functions += ( - rdoc.FunctionFeatureCount(address=frz.Address.from_capa(f.address), count=feature_count), - ) - t1 = time.time() - - match_count = sum(len(res) for res in function_matches.values()) - match_count += sum(len(res) for res in bb_matches.values()) - match_count += sum(len(res) for res in insn_matches.values()) - logger.debug( - "analyzed function 0x%x and extracted %d features, %d matches in %0.02fs", - f.address, - feature_count, - match_count, - t1 - t0, - ) - - for rule_name, res in function_matches.items(): - all_function_matches[rule_name].extend(res) - for rule_name, res in bb_matches.items(): - all_bb_matches[rule_name].extend(res) - for rule_name, res in insn_matches.items(): - all_insn_matches[rule_name].extend(res) - - # collection of features that captures the rule matches within function, BB, and instruction scopes. - # mapping from feature (matched rule) to set of addresses at which it matched. - function_and_lower_features: FeatureSet = collections.defaultdict(set) - for rule_name, results in itertools.chain( - all_function_matches.items(), all_bb_matches.items(), all_insn_matches.items() - ): - locations = {p[0] for p in results} - rule = ruleset[rule_name] - capa.engine.index_rule_matches(function_and_lower_features, rule, locations) - - all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, function_and_lower_features) - feature_counts.file = feature_count - - matches = dict( - itertools.chain( - # each rule exists in exactly one scope, - # so there won't be any overlap among these following MatchResults, - # and we can merge the dictionaries naively. - all_insn_matches.items(), - all_bb_matches.items(), - all_function_matches.items(), - all_file_matches.items(), - ) - ) - - meta = { - "feature_counts": feature_counts, - "library_functions": library_functions, - } - - return matches, meta - - -def find_call_capabilities( - ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle -) -> Tuple[FeatureSet, MatchResults]: - """ - find matches for the given rules for the given call. - - returns: tuple containing (features for call, match results for call) - """ - # all features found for the call. - features: FeatureSet = collections.defaultdict(set) - - for feature, addr in itertools.chain( - extractor.extract_call_features(ph, th, ch), extractor.extract_global_features() - ): - features[feature].add(addr) - - # matches found at this thread. - _, matches = ruleset.match(Scope.CALL, features, ch.address) - - for rule_name, res in matches.items(): - rule = ruleset[rule_name] - for addr, _ in res: - capa.engine.index_rule_matches(features, rule, [addr]) - - return features, matches - - -def find_thread_capabilities( - ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle -) -> Tuple[FeatureSet, MatchResults, MatchResults]: - """ - find matches for the given rules within the given thread. - - returns: tuple containing (features for thread, match results for thread, match results for calls) - """ - # all features found within this thread, - # includes features found within calls. - features: FeatureSet = collections.defaultdict(set) - - # matches found at the call scope. - # might be found at different calls, thats ok. - call_matches: MatchResults = collections.defaultdict(list) - - for ch in extractor.get_calls(ph, th): - ifeatures, imatches = find_call_capabilities(ruleset, extractor, ph, th, ch) - for feature, vas in ifeatures.items(): - features[feature].update(vas) - - for rule_name, res in imatches.items(): - call_matches[rule_name].extend(res) - - for feature, va in itertools.chain(extractor.extract_thread_features(ph, th), extractor.extract_global_features()): - features[feature].add(va) - - # matches found within this thread. - _, matches = ruleset.match(Scope.THREAD, features, th.address) - - for rule_name, res in matches.items(): - rule = ruleset[rule_name] - for va, _ in res: - capa.engine.index_rule_matches(features, rule, [va]) - - return features, matches, call_matches - - -def find_process_capabilities( - ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle -) -> Tuple[MatchResults, MatchResults, MatchResults, int]: - """ - find matches for the given rules within the given process. - - returns: tuple containing (match results for process, match results for threads, match results for calls, number of features) - """ - # all features found within this process, - # includes features found within threads (and calls). - process_features: FeatureSet = collections.defaultdict(set) - - # matches found at the basic threads. - # might be found at different threads, thats ok. - thread_matches: MatchResults = collections.defaultdict(list) - - # matches found at the call scope. - # might be found at different calls, thats ok. - call_matches: MatchResults = collections.defaultdict(list) - - for th in extractor.get_threads(ph): - features, tmatches, cmatches = find_thread_capabilities(ruleset, extractor, ph, th) - for feature, vas in features.items(): - process_features[feature].update(vas) - - for rule_name, res in tmatches.items(): - thread_matches[rule_name].extend(res) - - for rule_name, res in cmatches.items(): - call_matches[rule_name].extend(res) - - for feature, va in itertools.chain(extractor.extract_process_features(ph), extractor.extract_global_features()): - process_features[feature].add(va) - - _, process_matches = ruleset.match(Scope.PROCESS, process_features, ph.address) - return process_matches, thread_matches, call_matches, len(process_features) - - -def find_dynamic_capabilities( - ruleset: RuleSet, extractor: DynamicFeatureExtractor, disable_progress=None -) -> Tuple[MatchResults, Any]: - all_process_matches: MatchResults = collections.defaultdict(list) - all_thread_matches: MatchResults = collections.defaultdict(list) - all_call_matches: MatchResults = collections.defaultdict(list) - - feature_counts = rdoc.DynamicFeatureCounts(file=0, processes=()) - - assert isinstance(extractor, DynamicFeatureExtractor) - with redirecting_print_to_tqdm(disable_progress): - with tqdm.contrib.logging.logging_redirect_tqdm(): - pbar = tqdm.tqdm - if disable_progress: - # do not use tqdm to avoid unnecessary side effects when caller intends - # to disable progress completely - def pbar(s, *args, **kwargs): - return s - - processes = list(extractor.get_processes()) - - pb = pbar(processes, desc="matching", unit=" processes", leave=False) - for p in pb: - process_matches, thread_matches, call_matches, feature_count = find_process_capabilities( - ruleset, extractor, p - ) - feature_counts.processes += ( - rdoc.ProcessFeatureCount(address=frz.Address.from_capa(p.address), count=feature_count), - ) - logger.debug("analyzed %s and extracted %d features", p.address, feature_count) - - for rule_name, res in process_matches.items(): - all_process_matches[rule_name].extend(res) - for rule_name, res in thread_matches.items(): - all_thread_matches[rule_name].extend(res) - for rule_name, res in call_matches.items(): - all_call_matches[rule_name].extend(res) - - # collection of features that captures the rule matches within process and thread scopes. - # mapping from feature (matched rule) to set of addresses at which it matched. - process_and_lower_features: FeatureSet = collections.defaultdict(set) - for rule_name, results in itertools.chain( - all_process_matches.items(), all_thread_matches.items(), all_call_matches.items() - ): - locations = {p[0] for p in results} - rule = ruleset[rule_name] - capa.engine.index_rule_matches(process_and_lower_features, rule, locations) - - all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, process_and_lower_features) - feature_counts.file = feature_count - - matches = dict( - itertools.chain( - # each rule exists in exactly one scope, - # so there won't be any overlap among these following MatchResults, - # and we can merge the dictionaries naively. - all_thread_matches.items(), - all_process_matches.items(), - all_call_matches.items(), - all_file_matches.items(), - ) - ) - - meta = { - "feature_counts": feature_counts, - } - - return matches, meta - - -def find_capabilities( - ruleset: RuleSet, extractor: FeatureExtractor, disable_progress=None, **kwargs -) -> Tuple[MatchResults, Any]: - if isinstance(extractor, StaticFeatureExtractor): - return find_static_capabilities(ruleset, extractor, disable_progress=disable_progress, **kwargs) - elif isinstance(extractor, DynamicFeatureExtractor): - return find_dynamic_capabilities(ruleset, extractor, disable_progress=disable_progress, **kwargs) - else: - raise ValueError(f"unexpected extractor type: {extractor.__class__.__name__}") - - def has_rule_with_namespace(rules: RuleSet, capabilities: MatchResults, namespace: str) -> bool: return any( rules.rules[rule_name].meta.get("namespace", "").startswith(namespace) for rule_name in capabilities.keys() diff --git a/tests/test_main.py b/tests/test_main.py index 8caae9322..284988fdc 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -17,6 +17,7 @@ import capa.rules import capa.engine import capa.features +import capa.features.capabilities.common def test_main(z9324d_extractor): @@ -277,7 +278,7 @@ def test_match_across_scopes_file_function(z9324d_extractor): ), ] ) - capabilities, meta = capa.main.find_capabilities(rules, z9324d_extractor) + capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) assert "install service" in capabilities assert ".text section" in capabilities assert ".text section and install service" in capabilities @@ -345,7 +346,7 @@ def test_match_across_scopes(z9324d_extractor): ), ] ) - capabilities, meta = capa.main.find_capabilities(rules, z9324d_extractor) + capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) assert "tight loop" in capabilities assert "kill thread loop" in capabilities assert "kill thread program" in capabilities @@ -373,7 +374,7 @@ def test_subscope_bb_rules(z9324d_extractor): ] ) # tight loop at 0x403685 - capabilities, meta = capa.main.find_capabilities(rules, z9324d_extractor) + capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) assert "test rule" in capabilities @@ -397,7 +398,7 @@ def test_byte_matching(z9324d_extractor): ) ] ) - capabilities, meta = capa.main.find_capabilities(rules, z9324d_extractor) + capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) assert "byte match test" in capabilities @@ -422,7 +423,7 @@ def test_count_bb(z9324d_extractor): ) ] ) - capabilities, meta = capa.main.find_capabilities(rules, z9324d_extractor) + capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) assert "count bb" in capabilities @@ -449,7 +450,7 @@ def test_instruction_scope(z9324d_extractor): ) ] ) - capabilities, meta = capa.main.find_capabilities(rules, z9324d_extractor) + capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) assert "push 1000" in capabilities assert 0x4071A4 in {result[0] for result in capabilities["push 1000"]} @@ -481,7 +482,7 @@ def test_instruction_subscope(z9324d_extractor): ) ] ) - capabilities, meta = capa.main.find_capabilities(rules, z9324d_extractor) + capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) assert "push 1000 on i386" in capabilities assert 0x406F60 in {result[0] for result in capabilities["push 1000 on i386"]} From 37caeb2736910130c8770adafc9e1a6ef7b41520 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Thu, 19 Oct 2023 10:54:53 +0200 Subject: [PATCH 02/14] capabilities: add a test file for the new capabilities module, and move the corresponding tests from main to there --- capa/features/capabilities/common.py | 12 +- capa/features/capabilities/dynamic.py | 8 + capa/features/capabilities/static.py | 8 + tests/test_capabilities.py | 283 ++++++++++++++++++++++++++ tests/test_main.py | 273 ------------------------- 5 files changed, 309 insertions(+), 275 deletions(-) create mode 100644 tests/test_capabilities.py diff --git a/capa/features/capabilities/common.py b/capa/features/capabilities/common.py index ce7ddfb4d..b9252c9fe 100644 --- a/capa/features/capabilities/common.py +++ b/capa/features/capabilities/common.py @@ -1,3 +1,11 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. import logging import itertools import collections @@ -45,5 +53,5 @@ def find_capabilities( return find_static_capabilities(ruleset, extractor, disable_progress=disable_progress, **kwargs) if isinstance(extractor, DynamicFeatureExtractor): return find_dynamic_capabilities(ruleset, extractor, disable_progress=disable_progress, **kwargs) - else: - raise ValueError(f"unexpected extractor type: {extractor.__class__.__name__}") + + raise ValueError(f"unexpected extractor type: {extractor.__class__.__name__}") diff --git a/capa/features/capabilities/dynamic.py b/capa/features/capabilities/dynamic.py index 4ac7a3599..acf505466 100644 --- a/capa/features/capabilities/dynamic.py +++ b/capa/features/capabilities/dynamic.py @@ -1,3 +1,11 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. import logging import itertools import collections diff --git a/capa/features/capabilities/static.py b/capa/features/capabilities/static.py index 12e1b5196..785917c0e 100644 --- a/capa/features/capabilities/static.py +++ b/capa/features/capabilities/static.py @@ -1,3 +1,11 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. import time import logging import itertools diff --git a/tests/test_capabilities.py b/tests/test_capabilities.py new file mode 100644 index 000000000..ef86d102d --- /dev/null +++ b/tests/test_capabilities.py @@ -0,0 +1,283 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. +import textwrap + +import capa.features.capabilities.common + + +def test_match_across_scopes_file_function(z9324d_extractor): + rules = capa.rules.RuleSet( + [ + # this rule should match on a function (0x4073F0) + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: install service + scopes: + static: function + dynamic: process + examples: + - 9324d1a8ae37a36ae560c37448c9705a:0x4073F0 + features: + - and: + - api: advapi32.OpenSCManagerA + - api: advapi32.CreateServiceA + - api: advapi32.StartServiceA + """ + ) + ), + # this rule should match on a file feature + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: .text section + scopes: + static: file + dynamic: process + examples: + - 9324d1a8ae37a36ae560c37448c9705a + features: + - section: .text + """ + ) + ), + # this rule should match on earlier rule matches: + # - install service, with function scope + # - .text section, with file scope + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: .text section and install service + scopes: + static: file + dynamic: process + examples: + - 9324d1a8ae37a36ae560c37448c9705a + features: + - and: + - match: install service + - match: .text section + """ + ) + ), + ] + ) + capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "install service" in capabilities + assert ".text section" in capabilities + assert ".text section and install service" in capabilities + + +def test_match_across_scopes(z9324d_extractor): + rules = capa.rules.RuleSet( + [ + # this rule should match on a basic block (including at least 0x403685) + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: tight loop + scopes: + static: basic block + dynamic: process + examples: + - 9324d1a8ae37a36ae560c37448c9705a:0x403685 + features: + - characteristic: tight loop + """ + ) + ), + # this rule should match on a function (0x403660) + # based on API, as well as prior basic block rule match + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: kill thread loop + scopes: + static: function + dynamic: process + examples: + - 9324d1a8ae37a36ae560c37448c9705a:0x403660 + features: + - and: + - api: kernel32.TerminateThread + - api: kernel32.CloseHandle + - match: tight loop + """ + ) + ), + # this rule should match on a file feature and a prior function rule match + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: kill thread program + scopes: + static: file + dynamic: process + examples: + - 9324d1a8ae37a36ae560c37448c9705a + features: + - and: + - section: .text + - match: kill thread loop + """ + ) + ), + ] + ) + capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "tight loop" in capabilities + assert "kill thread loop" in capabilities + assert "kill thread program" in capabilities + + +def test_subscope_bb_rules(z9324d_extractor): + rules = capa.rules.RuleSet( + [ + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: test rule + scopes: + static: function + dynamic: process + features: + - and: + - basic block: + - characteristic: tight loop + """ + ) + ) + ] + ) + # tight loop at 0x403685 + capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "test rule" in capabilities + + +def test_byte_matching(z9324d_extractor): + rules = capa.rules.RuleSet( + [ + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: byte match test + scopes: + static: function + dynamic: process + features: + - and: + - bytes: ED 24 9E F4 52 A9 07 47 55 8E E1 AB 30 8E 23 61 + """ + ) + ) + ] + ) + capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "byte match test" in capabilities + + +def test_count_bb(z9324d_extractor): + rules = capa.rules.RuleSet( + [ + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: count bb + namespace: test + scopes: + static: function + dynamic: process + features: + - and: + - count(basic blocks): 1 or more + """ + ) + ) + ] + ) + capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "count bb" in capabilities + + +def test_instruction_scope(z9324d_extractor): + # .text:004071A4 68 E8 03 00 00 push 3E8h + rules = capa.rules.RuleSet( + [ + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: push 1000 + namespace: test + scopes: + static: instruction + dynamic: process + features: + - and: + - mnemonic: push + - number: 1000 + """ + ) + ) + ] + ) + capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "push 1000" in capabilities + assert 0x4071A4 in {result[0] for result in capabilities["push 1000"]} + + +def test_instruction_subscope(z9324d_extractor): + # .text:00406F60 sub_406F60 proc near + # [...] + # .text:004071A4 68 E8 03 00 00 push 3E8h + rules = capa.rules.RuleSet( + [ + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: push 1000 on i386 + namespace: test + scopes: + static: function + dynamic: process + features: + - and: + - arch: i386 + - instruction: + - mnemonic: push + - number: 1000 + """ + ) + ) + ] + ) + capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "push 1000 on i386" in capabilities + assert 0x406F60 in {result[0] for result in capabilities["push 1000 on i386"]} diff --git a/tests/test_main.py b/tests/test_main.py index 284988fdc..6d588dda1 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -17,7 +17,6 @@ import capa.rules import capa.engine import capa.features -import capa.features.capabilities.common def test_main(z9324d_extractor): @@ -215,278 +214,6 @@ def test_ruleset(): assert len(rules.call_rules) == 2 -def test_match_across_scopes_file_function(z9324d_extractor): - rules = capa.rules.RuleSet( - [ - # this rule should match on a function (0x4073F0) - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: install service - scopes: - static: function - dynamic: process - examples: - - 9324d1a8ae37a36ae560c37448c9705a:0x4073F0 - features: - - and: - - api: advapi32.OpenSCManagerA - - api: advapi32.CreateServiceA - - api: advapi32.StartServiceA - """ - ) - ), - # this rule should match on a file feature - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: .text section - scopes: - static: file - dynamic: process - examples: - - 9324d1a8ae37a36ae560c37448c9705a - features: - - section: .text - """ - ) - ), - # this rule should match on earlier rule matches: - # - install service, with function scope - # - .text section, with file scope - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: .text section and install service - scopes: - static: file - dynamic: process - examples: - - 9324d1a8ae37a36ae560c37448c9705a - features: - - and: - - match: install service - - match: .text section - """ - ) - ), - ] - ) - capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) - assert "install service" in capabilities - assert ".text section" in capabilities - assert ".text section and install service" in capabilities - - -def test_match_across_scopes(z9324d_extractor): - rules = capa.rules.RuleSet( - [ - # this rule should match on a basic block (including at least 0x403685) - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: tight loop - scopes: - static: basic block - dynamic: process - examples: - - 9324d1a8ae37a36ae560c37448c9705a:0x403685 - features: - - characteristic: tight loop - """ - ) - ), - # this rule should match on a function (0x403660) - # based on API, as well as prior basic block rule match - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: kill thread loop - scopes: - static: function - dynamic: process - examples: - - 9324d1a8ae37a36ae560c37448c9705a:0x403660 - features: - - and: - - api: kernel32.TerminateThread - - api: kernel32.CloseHandle - - match: tight loop - """ - ) - ), - # this rule should match on a file feature and a prior function rule match - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: kill thread program - scopes: - static: file - dynamic: process - examples: - - 9324d1a8ae37a36ae560c37448c9705a - features: - - and: - - section: .text - - match: kill thread loop - """ - ) - ), - ] - ) - capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) - assert "tight loop" in capabilities - assert "kill thread loop" in capabilities - assert "kill thread program" in capabilities - - -def test_subscope_bb_rules(z9324d_extractor): - rules = capa.rules.RuleSet( - [ - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: test rule - scopes: - static: function - dynamic: process - features: - - and: - - basic block: - - characteristic: tight loop - """ - ) - ) - ] - ) - # tight loop at 0x403685 - capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) - assert "test rule" in capabilities - - -def test_byte_matching(z9324d_extractor): - rules = capa.rules.RuleSet( - [ - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: byte match test - scopes: - static: function - dynamic: process - features: - - and: - - bytes: ED 24 9E F4 52 A9 07 47 55 8E E1 AB 30 8E 23 61 - """ - ) - ) - ] - ) - capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) - assert "byte match test" in capabilities - - -def test_count_bb(z9324d_extractor): - rules = capa.rules.RuleSet( - [ - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: count bb - namespace: test - scopes: - static: function - dynamic: process - features: - - and: - - count(basic blocks): 1 or more - """ - ) - ) - ] - ) - capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) - assert "count bb" in capabilities - - -def test_instruction_scope(z9324d_extractor): - # .text:004071A4 68 E8 03 00 00 push 3E8h - rules = capa.rules.RuleSet( - [ - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: push 1000 - namespace: test - scopes: - static: instruction - dynamic: process - features: - - and: - - mnemonic: push - - number: 1000 - """ - ) - ) - ] - ) - capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) - assert "push 1000" in capabilities - assert 0x4071A4 in {result[0] for result in capabilities["push 1000"]} - - -def test_instruction_subscope(z9324d_extractor): - # .text:00406F60 sub_406F60 proc near - # [...] - # .text:004071A4 68 E8 03 00 00 push 3E8h - rules = capa.rules.RuleSet( - [ - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: push 1000 on i386 - namespace: test - scopes: - static: function - dynamic: process - features: - - and: - - arch: i386 - - instruction: - - mnemonic: push - - number: 1000 - """ - ) - ) - ] - ) - capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) - assert "push 1000 on i386" in capabilities - assert 0x406F60 in {result[0] for result in capabilities["push 1000 on i386"]} - - def test_fix262(pma16_01_extractor, capsys): path = pma16_01_extractor.path assert capa.main.main([path, "-vv", "-t", "send HTTP request", "-q"]) == 0 From f2011c162c301c6a5e88d22423cf68f0fef2814c Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Thu, 19 Oct 2023 10:58:30 +0200 Subject: [PATCH 03/14] fix styling issues --- capa/main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/capa/main.py b/capa/main.py index fdfeca813..262b63332 100644 --- a/capa/main.py +++ b/capa/main.py @@ -50,7 +50,7 @@ import capa.features.extractors.base_extractor import capa.features.extractors.cape.extractor from capa.rules import Rule, RuleSet -from capa.engine import FeatureSet, MatchResults +from capa.engine import MatchResults from capa.helpers import ( get_format, get_file_taste, @@ -83,7 +83,7 @@ FORMAT_FREEZE, FORMAT_RESULT, ) -from capa.features.address import NO_ADDRESS, Address +from capa.features.address import Address from capa.features.capabilities.common import find_capabilities, find_file_capabilities from capa.features.extractors.base_extractor import ( SampleHashes, From 85610a82c57393c31952bc3788c390fc150a75f8 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Thu, 19 Oct 2023 10:59:45 +0200 Subject: [PATCH 04/14] changelog fix --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 33d141f5d..39e0602f9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ - implement dynamic analysis via CAPE sandbox #48 #1535 @yelhamer - add call scope #771 @yelhamer - add process scope for the dynamic analysis flavor #1517 @yelhamer -- Add thread scope for the dynamic analysis flavor #1517 @yelhamer +- add thread scope for the dynamic analysis flavor #1517 @yelhamer - ghidra: add Ghidra feature extractor and supporting code #1770 @colton-gabertan - ghidra: add entry script helping users run capa against a loaded Ghidra database #1767 @mike-hunhoff - binja: add support for forwarded exports #1646 @xusheng6 From f9b87417e672f1dc90297cf2d41a60f25daebb07 Mon Sep 17 00:00:00 2001 From: Yacine <16624109+yelhamer@users.noreply.github.com> Date: Fri, 20 Oct 2023 09:27:58 +0200 Subject: [PATCH 05/14] Update capa/capabilities/common.py Co-authored-by: Willi Ballenthin --- capa/capabilities/common.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/capa/capabilities/common.py b/capa/capabilities/common.py index f20e26152..a73f40afe 100644 --- a/capa/capabilities/common.py +++ b/capa/capabilities/common.py @@ -16,7 +16,7 @@ from capa.features.address import NO_ADDRESS from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor -logger = logging.getLogger("capa") +logger = logging.getLogger(__name__) def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, function_features: FeatureSet): From 423d942bd099dbe02025a82e837f4bae3e617990 Mon Sep 17 00:00:00 2001 From: Yacine <16624109+yelhamer@users.noreply.github.com> Date: Fri, 20 Oct 2023 09:28:05 +0200 Subject: [PATCH 06/14] Update capa/capabilities/dynamic.py Co-authored-by: Willi Ballenthin --- capa/capabilities/dynamic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/capa/capabilities/dynamic.py b/capa/capabilities/dynamic.py index 8c503cde9..23bfde4ac 100644 --- a/capa/capabilities/dynamic.py +++ b/capa/capabilities/dynamic.py @@ -22,7 +22,7 @@ from capa.capabilities.common import find_file_capabilities from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle, DynamicFeatureExtractor -logger = logging.getLogger("capa") +logger = logging.getLogger(__name__) def find_call_capabilities( From 20604c4b41abcd3f3ad7a69273fcd4ef8176f488 Mon Sep 17 00:00:00 2001 From: Yacine <16624109+yelhamer@users.noreply.github.com> Date: Fri, 20 Oct 2023 09:28:13 +0200 Subject: [PATCH 07/14] Update capa/capabilities/static.py Co-authored-by: Willi Ballenthin --- capa/capabilities/static.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/capa/capabilities/static.py b/capa/capabilities/static.py index f072ed208..a522a29da 100644 --- a/capa/capabilities/static.py +++ b/capa/capabilities/static.py @@ -23,7 +23,7 @@ from capa.capabilities.common import find_file_capabilities from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, StaticFeatureExtractor -logger = logging.getLogger("capa") +logger = logging.getLogger(__name__) def find_instruction_capabilities( From 96fb204d9d388e09a8a6aa354e9067598ab5e021 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Fri, 20 Oct 2023 09:54:24 +0200 Subject: [PATCH 08/14] move capa.features.capabilities to capa.capabilities, and update scripts --- capa/{features => }/capabilities/__init__.py | 0 capa/{features => }/capabilities/common.py | 4 ++-- capa/{features => }/capabilities/dynamic.py | 2 +- capa/{features => }/capabilities/static.py | 2 +- capa/ghidra/capa_ghidra.py | 5 +++-- capa/ida/plugin/form.py | 3 ++- capa/main.py | 2 +- scripts/bulk-process.py | 3 ++- scripts/capa_as_library.py | 3 ++- scripts/lint.py | 3 ++- scripts/profile-time.py | 3 ++- scripts/show-capabilities-by-function.py | 3 ++- tests/test_capabilities.py | 16 ++++++++-------- 13 files changed, 28 insertions(+), 21 deletions(-) rename capa/{features => }/capabilities/__init__.py (100%) rename capa/{features => }/capabilities/common.py (94%) rename capa/{features => }/capabilities/dynamic.py (99%) rename capa/{features => }/capabilities/static.py (99%) diff --git a/capa/features/capabilities/__init__.py b/capa/capabilities/__init__.py similarity index 100% rename from capa/features/capabilities/__init__.py rename to capa/capabilities/__init__.py diff --git a/capa/features/capabilities/common.py b/capa/capabilities/common.py similarity index 94% rename from capa/features/capabilities/common.py rename to capa/capabilities/common.py index b9252c9fe..6098f789b 100644 --- a/capa/features/capabilities/common.py +++ b/capa/capabilities/common.py @@ -43,8 +43,8 @@ def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, functi def find_capabilities( ruleset: RuleSet, extractor: FeatureExtractor, disable_progress=None, **kwargs ) -> Tuple[MatchResults, Any]: - from capa.features.capabilities.static import find_static_capabilities - from capa.features.capabilities.dynamic import find_dynamic_capabilities + from capa.capabilities.static import find_static_capabilities + from capa.capabilities.dynamic import find_dynamic_capabilities if isinstance(extractor, StaticFeatureExtractor): # for the time being, extractors are either static or dynamic. diff --git a/capa/features/capabilities/dynamic.py b/capa/capabilities/dynamic.py similarity index 99% rename from capa/features/capabilities/dynamic.py rename to capa/capabilities/dynamic.py index acf505466..8c503cde9 100644 --- a/capa/features/capabilities/dynamic.py +++ b/capa/capabilities/dynamic.py @@ -19,7 +19,7 @@ from capa.rules import Scope, RuleSet from capa.engine import FeatureSet, MatchResults from capa.helpers import redirecting_print_to_tqdm -from capa.features.capabilities.common import find_file_capabilities +from capa.capabilities.common import find_file_capabilities from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle, DynamicFeatureExtractor logger = logging.getLogger("capa") diff --git a/capa/features/capabilities/static.py b/capa/capabilities/static.py similarity index 99% rename from capa/features/capabilities/static.py rename to capa/capabilities/static.py index 785917c0e..f072ed208 100644 --- a/capa/features/capabilities/static.py +++ b/capa/capabilities/static.py @@ -20,7 +20,7 @@ from capa.rules import Scope, RuleSet from capa.engine import FeatureSet, MatchResults from capa.helpers import redirecting_print_to_tqdm -from capa.features.capabilities.common import find_file_capabilities +from capa.capabilities.common import find_file_capabilities from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, StaticFeatureExtractor logger = logging.getLogger("capa") diff --git a/capa/ghidra/capa_ghidra.py b/capa/ghidra/capa_ghidra.py index 99beaffc4..72eae7cf3 100644 --- a/capa/ghidra/capa_ghidra.py +++ b/capa/ghidra/capa_ghidra.py @@ -19,6 +19,7 @@ import capa.rules import capa.ghidra.helpers import capa.render.default +import capa.capabilities.common import capa.features.extractors.ghidra.extractor logger = logging.getLogger("capa_ghidra") @@ -73,7 +74,7 @@ def run_headless(): meta = capa.ghidra.helpers.collect_metadata([rules_path]) extractor = capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor() - capabilities, counts = capa.main.find_capabilities(rules, extractor, False) + capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, False) meta.analysis.feature_counts = counts["feature_counts"] meta.analysis.library_functions = counts["library_functions"] @@ -123,7 +124,7 @@ def run_ui(): meta = capa.ghidra.helpers.collect_metadata([rules_path]) extractor = capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor() - capabilities, counts = capa.main.find_capabilities(rules, extractor, True) + capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, True) meta.analysis.feature_counts = counts["feature_counts"] meta.analysis.library_functions = counts["library_functions"] diff --git a/capa/ida/plugin/form.py b/capa/ida/plugin/form.py index bc78045e9..f0a4e13e9 100644 --- a/capa/ida/plugin/form.py +++ b/capa/ida/plugin/form.py @@ -25,6 +25,7 @@ import capa.ida.helpers import capa.render.json import capa.features.common +import capa.capabilities.common import capa.render.result_document import capa.features.extractors.ida.extractor from capa.rules import Rule @@ -768,7 +769,7 @@ def slot_progress_feature_extraction(text): try: meta = capa.ida.helpers.collect_metadata([Path(settings.user[CAPA_SETTINGS_RULE_PATH])]) - capabilities, counts = capa.main.find_capabilities( + capabilities, counts = capa.capabilities.common.find_capabilities( ruleset, self.feature_extractor, disable_progress=True ) diff --git a/capa/main.py b/capa/main.py index 262b63332..8a6a398a3 100644 --- a/capa/main.py +++ b/capa/main.py @@ -84,7 +84,7 @@ FORMAT_RESULT, ) from capa.features.address import Address -from capa.features.capabilities.common import find_capabilities, find_file_capabilities +from capa.capabilities.common import find_capabilities, find_file_capabilities from capa.features.extractors.base_extractor import ( SampleHashes, FeatureExtractor, diff --git a/scripts/bulk-process.py b/scripts/bulk-process.py index 3e3cdfb2f..8950b8936 100644 --- a/scripts/bulk-process.py +++ b/scripts/bulk-process.py @@ -75,6 +75,7 @@ import capa.main import capa.rules import capa.render.json +import capa.capabilities.common import capa.render.result_document as rd from capa.features.common import OS_AUTO @@ -136,7 +137,7 @@ def get_capa_results(args): "error": f"unexpected error: {e}", } - capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True) + capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, disable_progress=True) meta = capa.main.collect_metadata([], path, format, os_, [], extractor, counts) meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities) diff --git a/scripts/capa_as_library.py b/scripts/capa_as_library.py index 7311107a9..611576908 100644 --- a/scripts/capa_as_library.py +++ b/scripts/capa_as_library.py @@ -19,6 +19,7 @@ import capa.render.json import capa.render.utils as rutils import capa.render.default +import capa.capabilities.common import capa.render.result_document as rd import capa.features.freeze.features as frzf from capa.features.common import OS_AUTO, FORMAT_AUTO @@ -175,7 +176,7 @@ def capa_details(rules_path: Path, file_path: Path, output_format="dictionary"): extractor = capa.main.get_extractor( file_path, FORMAT_AUTO, OS_AUTO, capa.main.BACKEND_VIV, [], False, disable_progress=True ) - capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True) + capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, disable_progress=True) # collect metadata (used only to make rendering more complete) meta = capa.main.collect_metadata([], file_path, FORMAT_AUTO, OS_AUTO, [rules_path], extractor, counts) diff --git a/scripts/lint.py b/scripts/lint.py index 065e694bb..edcf9f563 100644 --- a/scripts/lint.py +++ b/scripts/lint.py @@ -41,6 +41,7 @@ import capa.engine import capa.helpers import capa.features.insn +import capa.capabilities.common from capa.rules import Rule, RuleSet from capa.features.common import OS_AUTO, String, Feature, Substring from capa.render.result_document import RuleMetadata @@ -366,7 +367,7 @@ def get_sample_capabilities(ctx: Context, path: Path) -> Set[str]: nice_path, format_, OS_AUTO, capa.main.BACKEND_VIV, DEFAULT_SIGNATURES, False, disable_progress=True ) - capabilities, _ = capa.main.find_capabilities(ctx.rules, extractor, disable_progress=True) + capabilities, _ = capa.capabilities.common.find_capabilities(ctx.rules, extractor, disable_progress=True) # mypy doesn't seem to be happy with the MatchResults type alias & set(...keys())? # so we ignore a few types here. capabilities = set(capabilities.keys()) # type: ignore diff --git a/scripts/profile-time.py b/scripts/profile-time.py index 9acd60ff4..86590a800 100644 --- a/scripts/profile-time.py +++ b/scripts/profile-time.py @@ -54,6 +54,7 @@ import capa.features import capa.features.common import capa.features.freeze +import capa.capabilities.common logger = logging.getLogger("capa.profile") @@ -114,7 +115,7 @@ def main(argv=None): def do_iteration(): capa.perf.reset() - capa.main.find_capabilities(rules, extractor, disable_progress=True) + capa.capabilities.common.find_capabilities(rules, extractor, disable_progress=True) pbar.update(1) samples = timeit.repeat(do_iteration, number=args.number, repeat=args.repeat) diff --git a/scripts/show-capabilities-by-function.py b/scripts/show-capabilities-by-function.py index 509c3a847..e987b6801 100644 --- a/scripts/show-capabilities-by-function.py +++ b/scripts/show-capabilities-by-function.py @@ -74,6 +74,7 @@ import capa.render.utils as rutils import capa.render.verbose import capa.features.freeze +import capa.capabilities.common import capa.render.result_document as rd from capa.helpers import get_file_taste from capa.features.common import FORMAT_AUTO @@ -186,7 +187,7 @@ def main(argv=None): capa.helpers.log_unsupported_runtime_error() return -1 - capabilities, counts = capa.main.find_capabilities(rules, extractor) + capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor) meta = capa.main.collect_metadata(argv, args.sample, format_, args.os, args.rules, extractor, counts) meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities) diff --git a/tests/test_capabilities.py b/tests/test_capabilities.py index ef86d102d..fe02985c3 100644 --- a/tests/test_capabilities.py +++ b/tests/test_capabilities.py @@ -8,7 +8,7 @@ # See the License for the specific language governing permissions and limitations under the License. import textwrap -import capa.features.capabilities.common +import capa.capabilities.common def test_match_across_scopes_file_function(z9324d_extractor): @@ -74,7 +74,7 @@ def test_match_across_scopes_file_function(z9324d_extractor): ), ] ) - capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) + capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) assert "install service" in capabilities assert ".text section" in capabilities assert ".text section and install service" in capabilities @@ -142,7 +142,7 @@ def test_match_across_scopes(z9324d_extractor): ), ] ) - capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) + capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) assert "tight loop" in capabilities assert "kill thread loop" in capabilities assert "kill thread program" in capabilities @@ -170,7 +170,7 @@ def test_subscope_bb_rules(z9324d_extractor): ] ) # tight loop at 0x403685 - capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) + capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) assert "test rule" in capabilities @@ -194,7 +194,7 @@ def test_byte_matching(z9324d_extractor): ) ] ) - capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) + capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) assert "byte match test" in capabilities @@ -219,7 +219,7 @@ def test_count_bb(z9324d_extractor): ) ] ) - capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) + capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) assert "count bb" in capabilities @@ -246,7 +246,7 @@ def test_instruction_scope(z9324d_extractor): ) ] ) - capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) + capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) assert "push 1000" in capabilities assert 0x4071A4 in {result[0] for result in capabilities["push 1000"]} @@ -278,6 +278,6 @@ def test_instruction_subscope(z9324d_extractor): ) ] ) - capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) + capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) assert "push 1000 on i386" in capabilities assert 0x406F60 in {result[0] for result in capabilities["push 1000 on i386"]} From d5ae2ffd9148c41be71b9c4246e387a4c369d593 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Fri, 20 Oct 2023 10:15:20 +0200 Subject: [PATCH 09/14] capa.capabilities: move `has_file_limitations()` from capa.main to the capabilities module --- capa/capabilities/common.py | 28 +++++++++++++++++++++++- capa/ghidra/capa_ghidra.py | 4 ++-- capa/ida/plugin/form.py | 2 +- capa/main.py | 28 +----------------------- scripts/show-capabilities-by-function.py | 2 +- 5 files changed, 32 insertions(+), 32 deletions(-) diff --git a/capa/capabilities/common.py b/capa/capabilities/common.py index 6098f789b..0563b5389 100644 --- a/capa/capabilities/common.py +++ b/capa/capabilities/common.py @@ -11,7 +11,7 @@ import collections from typing import Any, Tuple -from capa.rules import Scope, RuleSet +from capa.rules import Rule, Scope, RuleSet from capa.engine import FeatureSet, MatchResults from capa.features.address import NO_ADDRESS from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor @@ -40,6 +40,32 @@ def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, functi return matches, len(file_features) +def is_file_limitation_rule(rule: Rule) -> bool: + return rule.meta.get("namespace", "") == "internal/limitation/file" + + +def has_file_limitation(rules: RuleSet, capabilities: MatchResults, is_standalone=True) -> bool: + file_limitation_rules = list(filter(is_file_limitation_rule, rules.rules.values())) + + for file_limitation_rule in file_limitation_rules: + if file_limitation_rule.name not in capabilities: + continue + + logger.warning("-" * 80) + for line in file_limitation_rule.meta.get("description", "").split("\n"): + logger.warning(" %s", line) + logger.warning(" Identified via rule: %s", file_limitation_rule.name) + if is_standalone: + logger.warning(" ") + logger.warning(" Use -v or -vv if you really want to see the capabilities identified by capa.") + logger.warning("-" * 80) + + # bail on first file limitation + return True + + return False + + def find_capabilities( ruleset: RuleSet, extractor: FeatureExtractor, disable_progress=None, **kwargs ) -> Tuple[MatchResults, Any]: diff --git a/capa/ghidra/capa_ghidra.py b/capa/ghidra/capa_ghidra.py index 72eae7cf3..70b98df56 100644 --- a/capa/ghidra/capa_ghidra.py +++ b/capa/ghidra/capa_ghidra.py @@ -80,7 +80,7 @@ def run_headless(): meta.analysis.library_functions = counts["library_functions"] meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities) - if capa.main.has_file_limitation(rules, capabilities, is_standalone=True): + if capa.capabilities.common.has_file_limitation(rules, capabilities, is_standalone=True): logger.info("capa encountered warnings during analysis") if args.json: @@ -130,7 +130,7 @@ def run_ui(): meta.analysis.library_functions = counts["library_functions"] meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities) - if capa.main.has_file_limitation(rules, capabilities, is_standalone=False): + if capa.capabilities.common.has_file_limitation(rules, capabilities, is_standalone=False): logger.info("capa encountered warnings during analysis") if verbose == "vverbose": diff --git a/capa/ida/plugin/form.py b/capa/ida/plugin/form.py index f0a4e13e9..4e1bd572a 100644 --- a/capa/ida/plugin/form.py +++ b/capa/ida/plugin/form.py @@ -811,7 +811,7 @@ def slot_progress_feature_extraction(text): capa.ida.helpers.inform_user_ida_ui("capa encountered file type warnings during analysis") - if capa.main.has_file_limitation(ruleset, capabilities, is_standalone=False): + if capa.capabilities.common.has_file_limitation(ruleset, capabilities, is_standalone=False): capa.ida.helpers.inform_user_ida_ui("capa encountered file limitation warnings during analysis") except Exception as e: logger.exception("Failed to check for file limitations (error: %s)", e) diff --git a/capa/main.py b/capa/main.py index 8a6a398a3..540524334 100644 --- a/capa/main.py +++ b/capa/main.py @@ -84,7 +84,7 @@ FORMAT_RESULT, ) from capa.features.address import Address -from capa.capabilities.common import find_capabilities, find_file_capabilities +from capa.capabilities.common import find_capabilities, has_file_limitation, find_file_capabilities from capa.features.extractors.base_extractor import ( SampleHashes, FeatureExtractor, @@ -144,32 +144,6 @@ def is_internal_rule(rule: Rule) -> bool: return rule.meta.get("namespace", "").startswith("internal/") -def is_file_limitation_rule(rule: Rule) -> bool: - return rule.meta.get("namespace", "") == "internal/limitation/file" - - -def has_file_limitation(rules: RuleSet, capabilities: MatchResults, is_standalone=True) -> bool: - file_limitation_rules = list(filter(is_file_limitation_rule, rules.rules.values())) - - for file_limitation_rule in file_limitation_rules: - if file_limitation_rule.name not in capabilities: - continue - - logger.warning("-" * 80) - for line in file_limitation_rule.meta.get("description", "").split("\n"): - logger.warning(" %s", line) - logger.warning(" Identified via rule: %s", file_limitation_rule.name) - if is_standalone: - logger.warning(" ") - logger.warning(" Use -v or -vv if you really want to see the capabilities identified by capa.") - logger.warning("-" * 80) - - # bail on first file limitation - return True - - return False - - def is_supported_format(sample: Path) -> bool: """ Return if this is a supported file based on magic header values diff --git a/scripts/show-capabilities-by-function.py b/scripts/show-capabilities-by-function.py index e987b6801..421c6c7e1 100644 --- a/scripts/show-capabilities-by-function.py +++ b/scripts/show-capabilities-by-function.py @@ -192,7 +192,7 @@ def main(argv=None): meta = capa.main.collect_metadata(argv, args.sample, format_, args.os, args.rules, extractor, counts) meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities) - if capa.main.has_file_limitation(rules, capabilities): + if capa.capabilities.common.has_file_limitation(rules, capabilities): # bail if capa encountered file limitation e.g. a packed binary # do show the output in verbose mode, though. if not (args.verbose or args.vverbose or args.json): From d6c5d98b0d99e0afff08b905df4abeb39dfeb2b6 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Fri, 20 Oct 2023 10:16:09 +0200 Subject: [PATCH 10/14] move `is_file_limitation_rule()` to the rules module (Rule class) --- capa/capabilities/common.py | 8 ++------ capa/main.py | 4 ---- capa/rules/__init__.py | 6 ++++++ 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/capa/capabilities/common.py b/capa/capabilities/common.py index 0563b5389..f20e26152 100644 --- a/capa/capabilities/common.py +++ b/capa/capabilities/common.py @@ -11,7 +11,7 @@ import collections from typing import Any, Tuple -from capa.rules import Rule, Scope, RuleSet +from capa.rules import Scope, RuleSet from capa.engine import FeatureSet, MatchResults from capa.features.address import NO_ADDRESS from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor @@ -40,12 +40,8 @@ def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, functi return matches, len(file_features) -def is_file_limitation_rule(rule: Rule) -> bool: - return rule.meta.get("namespace", "") == "internal/limitation/file" - - def has_file_limitation(rules: RuleSet, capabilities: MatchResults, is_standalone=True) -> bool: - file_limitation_rules = list(filter(is_file_limitation_rule, rules.rules.values())) + file_limitation_rules = list(filter(lambda r: r.is_file_limitation_rule(), rules.rules.values())) for file_limitation_rule in file_limitation_rules: if file_limitation_rule.name not in capabilities: diff --git a/capa/main.py b/capa/main.py index 540524334..1756513a6 100644 --- a/capa/main.py +++ b/capa/main.py @@ -140,10 +140,6 @@ def has_rule_with_namespace(rules: RuleSet, capabilities: MatchResults, namespac ) -def is_internal_rule(rule: Rule) -> bool: - return rule.meta.get("namespace", "").startswith("internal/") - - def is_supported_format(sample: Path) -> bool: """ Return if this is a supported file based on magic header values diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py index 9b8af10b8..13dda29ec 100644 --- a/capa/rules/__init__.py +++ b/capa/rules/__init__.py @@ -869,6 +869,12 @@ def _extract_subscope_rules_rec(self, statement): for child in statement.get_children(): yield from self._extract_subscope_rules_rec(child) + def is_internal_rule(self) -> bool: + return self.meta.get("namespace", "").startswith("internal/") + + def is_file_limitation_rule(self) -> bool: + return self.meta.get("namespace", "") == "internal/limitation/file" + def is_subscope_rule(self): return bool(self.meta.get("capa/subscope-rule", False)) From ab06c94d80195a264c468455eacd096ad719cb2a Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Fri, 20 Oct 2023 20:10:29 +0200 Subject: [PATCH 11/14] capa/main.py: move `has_rule_with_namespace()` to `capa.rules.RuleSet` --- capa/main.py | 6 ------ capa/rules/__init__.py | 7 ++++++- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/capa/main.py b/capa/main.py index 1756513a6..47a95a577 100644 --- a/capa/main.py +++ b/capa/main.py @@ -134,12 +134,6 @@ def set_vivisect_log_level(level): logging.getLogger("Elf").setLevel(level) -def has_rule_with_namespace(rules: RuleSet, capabilities: MatchResults, namespace: str) -> bool: - return any( - rules.rules[rule_name].meta.get("namespace", "").startswith(namespace) for rule_name in capabilities.keys() - ) - - def is_supported_format(sample: Path) -> bool: """ Return if this is a supported file based on magic header values diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py index c1f3696c2..6d60d4874 100644 --- a/capa/rules/__init__.py +++ b/capa/rules/__init__.py @@ -43,7 +43,7 @@ import capa.features.insn import capa.features.common import capa.features.basicblock -from capa.engine import Statement, FeatureSet +from capa.engine import Statement, FeatureSet, MatchResults from capa.features.common import MAX_BYTES_FEATURE_SIZE, Feature from capa.features.address import Address @@ -1622,6 +1622,11 @@ def filter_rules_by_meta(self, tag: str) -> "RuleSet": break return RuleSet(list(rules_filtered)) + def has_rule_with_namespace(self, capabilities: MatchResults, namespace: str) -> bool: + return any( + self.rules[rule_name].meta.get("namespace", "").startswith(namespace) for rule_name in capabilities.keys() + ) + def match(self, scope: Scope, features: FeatureSet, addr: Address) -> Tuple[FeatureSet, ceng.MatchResults]: """ match rules from this ruleset at the given scope against the given features. From 3572b512d92a181f716e31f43005ea08f2d851f4 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Fri, 20 Oct 2023 20:11:08 +0200 Subject: [PATCH 12/14] test_capabilities.py: add missing `test_com_feature_matching()` test --- tests/test_capabilities.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/tests/test_capabilities.py b/tests/test_capabilities.py index fe02985c3..ddc7f6c3f 100644 --- a/tests/test_capabilities.py +++ b/tests/test_capabilities.py @@ -198,6 +198,32 @@ def test_byte_matching(z9324d_extractor): assert "byte match test" in capabilities +def test_com_feature_matching(z395eb_extractor): + rules = capa.rules.RuleSet( + [ + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: initialize IWebBrowser2 + scopes: + static: basic block + dynamic: unsupported + features: + - and: + - api: ole32.CoCreateInstance + - com/class: InternetExplorer #bytes: 01 DF 02 00 00 00 00 00 C0 00 00 00 00 00 00 46 = CLSID_InternetExplorer + - com/interface: IWebBrowser2 #bytes: 61 16 0C D3 AF CD D0 11 8A 3E 00 C0 4F C9 E2 6E = IID_IWebBrowser2 + """ + ) + ) + ] + ) + capabilities, meta = capa.main.find_capabilities(rules, z395eb_extractor) + assert "initialize IWebBrowser2" in capabilities + + def test_count_bb(z9324d_extractor): rules = capa.rules.RuleSet( [ From a0cec3f07d266ba98daaa70aa5bbdb927be2718a Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Thu, 26 Oct 2023 19:41:09 +0200 Subject: [PATCH 13/14] capa.rules: remove redundant `is_internal_rule()` and `has_file_limitations()` from capa source code --- capa/rules/__init__.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py index 6d60d4874..52b205963 100644 --- a/capa/rules/__init__.py +++ b/capa/rules/__init__.py @@ -940,9 +940,6 @@ def _extract_subscope_rules_rec(self, statement): for child in statement.get_children(): yield from self._extract_subscope_rules_rec(child) - def is_internal_rule(self) -> bool: - return self.meta.get("namespace", "").startswith("internal/") - def is_file_limitation_rule(self) -> bool: return self.meta.get("namespace", "") == "internal/limitation/file" @@ -1622,11 +1619,6 @@ def filter_rules_by_meta(self, tag: str) -> "RuleSet": break return RuleSet(list(rules_filtered)) - def has_rule_with_namespace(self, capabilities: MatchResults, namespace: str) -> bool: - return any( - self.rules[rule_name].meta.get("namespace", "").startswith(namespace) for rule_name in capabilities.keys() - ) - def match(self, scope: Scope, features: FeatureSet, addr: Address) -> Tuple[FeatureSet, ceng.MatchResults]: """ match rules from this ruleset at the given scope against the given features. From e559cc27d55ec940c3ed40f422e673f2670d1919 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Thu, 26 Oct 2023 19:43:26 +0200 Subject: [PATCH 14/14] capa.rules: remove redundant `ceng.MatchResults` import --- capa/rules/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py index 52b205963..bb6ab5a18 100644 --- a/capa/rules/__init__.py +++ b/capa/rules/__init__.py @@ -43,7 +43,7 @@ import capa.features.insn import capa.features.common import capa.features.basicblock -from capa.engine import Statement, FeatureSet, MatchResults +from capa.engine import Statement, FeatureSet from capa.features.common import MAX_BYTES_FEATURE_SIZE, Feature from capa.features.address import Address