Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Develop script to highlight features unused during matching #1683

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### New Features
- ELF: implement file import and export name extractor #1607 @Aayush-Goel-04
- develop script to highlight the features that are not used during matching #331 @Aayush-Goel-04

### Breaking Changes

Expand Down
16 changes: 16 additions & 0 deletions capa/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,22 @@ def replace_child(self, existing, new):
if child is existing:
children[i] = new

def get_all_features(self) -> Set[Feature]:
"""
recursively extracts all feature statements from a given rule statement.

returns:
set: A set of all feature statements contained within the given feature statement.
"""
feature_set: set = set()
Aayush-Goel-04 marked this conversation as resolved.
Show resolved Hide resolved

for child in self.get_children():
if isinstance(child, Statement):
feature_set.update(child.get_all_features())
else:
feature_set.add(child)
return feature_set


class And(Statement):
"""
Expand Down
42 changes: 13 additions & 29 deletions scripts/detect_duplicate_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,56 +8,37 @@
import sys
import logging
import argparse
from typing import Set
from pathlib import Path

import capa.main
import capa.rules
import capa.engine as ceng
from capa.features.common import Feature

logger = logging.getLogger("detect_duplicate_features")


def get_child_features(feature: ceng.Statement) -> list:
"""
Recursively extracts all feature statements from a given rule statement.

Args:
feature (capa.engine.Statement): The feature statement to extract features from.

Returns:
list: A list of all feature statements contained within the given feature statement.
"""
children = []

if isinstance(feature, (ceng.And, ceng.Or, ceng.Some)):
for child in feature.children:
children.extend(get_child_features(child))
elif isinstance(feature, (ceng.Subscope, ceng.Range, ceng.Not)):
children.extend(get_child_features(feature.child))
else:
children.append(feature)
return children


def get_features(rule_path: str) -> list:
def get_features(rule_path: str) -> Set[Feature]:
"""
Extracts all features from a given rule file.

Args:
rule_path (str): The path to the rule file to extract features from.

Returns:
list: A list of all feature statements contained within the rule file.
set: A set of all feature statements contained within the rule file.
"""
feature_list = []
with Path(rule_path).open("r", encoding="utf-8") as f:
try:
new_rule = capa.rules.Rule.from_yaml(f.read())
feature_list = get_child_features(new_rule.statement)
if isinstance(new_rule.statement, ceng.Statement):
return new_rule.statement.get_all_features()
else:
return (new_rule.statement,)
Aayush-Goel-04 marked this conversation as resolved.
Show resolved Hide resolved
except Exception as e:
logger.error("Error: New rule %s %s %s", rule_path, str(type(e)), str(e))
sys.exit(-1)
return feature_list


def find_overlapping_rules(new_rule_path, rules_path):
Expand All @@ -67,15 +48,18 @@ def find_overlapping_rules(new_rule_path, rules_path):

# Loads features of new rule in a list.
new_rule_features = get_features(new_rule_path)

count = 0
overlapping_rules = []

# capa.rules.RuleSet stores all rules in given paths
ruleset = capa.main.get_rules(rules_path)

for rule_name, rule in ruleset.rules.items():
rule_features = get_child_features(rule.statement)
rule_features = set()
if isinstance(rule.statement, ceng.Statement):
rule_features = rule.statement.get_all_features()
else:
rule_features.add(rule.statement)
Aayush-Goel-04 marked this conversation as resolved.
Show resolved Hide resolved

if not len(rule_features):
continue
Expand Down
223 changes: 223 additions & 0 deletions scripts/show-unused-features.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
#!/usr/bin/env python2
Aayush-Goel-04 marked this conversation as resolved.
Show resolved Hide resolved
"""
Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at: [package root]/LICENSE.txt
Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
"""
import os
import sys
import logging
import argparse
from typing import Set, Tuple
from pathlib import Path

import tabulate
from termcolor import colored

import capa.main
import capa.rules
import capa.engine as ceng
import capa.helpers
import capa.features
import capa.exceptions
import capa.render.verbose as v
import capa.features.freeze
import capa.features.address
import capa.features.extractors.pefile
import capa.features.extractors.base_extractor
from capa.helpers import log_unsupported_runtime_error
from capa.features.common import Feature
from capa.features.extractors.base_extractor import FunctionHandle

logger = logging.getLogger("capa.show-features")
Aayush-Goel-04 marked this conversation as resolved.
Show resolved Hide resolved


def format_address(addr: capa.features.address.Address) -> str:
return v.format_address(capa.features.freeze.Address.from_capa((addr)))


def get_rules_feature_set(rules_path) -> Set[Feature]:
ruleset = capa.main.get_rules(rules_path)
rules_feature_set = set()
for _, rule in ruleset.rules.items():
rule_features: set = set()
if isinstance(rule.statement, ceng.Statement):
rule_features.update(rule.statement.get_all_features())
else:
rule_features.add(rule.statement)

rules_feature_set.update(rule_features)
return rules_feature_set


def get_file_features(functions, extractor: capa.features.extractors.base_extractor.FeatureExtractor):
feature_map: dict = {}

for f in functions:
if extractor.is_library_function(f.address):
function_name = extractor.get_function_name(f.address)
logger.debug("skipping library function %s (%s)", format_address(f.address), function_name)
continue

for feature, _ in extractor.extract_function_features(f):
if capa.features.common.is_global_feature(feature):
continue
feature_map[feature] = feature_map.get(feature, 0) + 1

for bb in extractor.get_basic_blocks(f):
for feature, _ in extractor.extract_basic_block_features(f, bb):
if capa.features.common.is_global_feature(feature):
continue
feature_map[feature] = feature_map.get(feature, 0) + 1

for insn in extractor.get_instructions(f, bb):
for feature, _ in extractor.extract_insn_features(f, bb, insn):
if capa.features.common.is_global_feature(feature):
continue
feature_map[feature] = feature_map.get(feature, 0) + 1
return feature_map


def main(argv=None):
if argv is None:
argv = sys.argv[1:]

parser = argparse.ArgumentParser(description="Show the features that capa extracts from the given sample")
Aayush-Goel-04 marked this conversation as resolved.
Show resolved Hide resolved
capa.main.install_common_args(parser, wanted={"format", "os", "sample", "signatures", "backend", "rules"})

parser.add_argument("-F", "--function", type=str, help="Show features for specific function")
args = parser.parse_args(args=argv)
capa.main.handle_common_args(args)

if args.function and args.backend == "pefile":
print("pefile backend does not support extracting function features")
return -1

try:
taste = capa.helpers.get_file_taste(Path(args.sample))
except IOError as e:
logger.error("%s", str(e))
return -1

try:
sig_paths = capa.main.get_signatures(args.signatures)
except IOError as e:
logger.error("%s", str(e))
return -1

if (args.format == "freeze") or (
args.format == capa.features.common.FORMAT_AUTO and capa.features.freeze.is_freeze(taste)
):
extractor = capa.features.freeze.load(Path(args.sample).read_bytes())
else:
should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)
try:
extractor = capa.main.get_extractor(
args.sample, args.format, args.os, args.backend, sig_paths, should_save_workspace
)
except capa.exceptions.UnsupportedFormatError:
capa.helpers.log_unsupported_format_error()
return -1
except capa.exceptions.UnsupportedRuntimeError:
log_unsupported_runtime_error()
return -1

feature_map: dict = {}
Aayush-Goel-04 marked this conversation as resolved.
Show resolved Hide resolved

for feature, _ in extractor.extract_global_features():
feature_map[feature] = feature_map.get(feature, 0) + 1

function_handles: Tuple[FunctionHandle, ...]
if isinstance(extractor, capa.features.extractors.pefile.PefileFeatureExtractor):
# pefile extractor doesn't extract function features
function_handles = ()
else:
function_handles = tuple(extractor.get_functions())

if args.function:
if args.format == "freeze":
function_handles = tuple(filter(lambda fh: fh.address == args.function, function_handles))
else:
function_handles = tuple(filter(lambda fh: format_address(fh.address) == args.function, function_handles))

if args.function not in [format_address(fh.address) for fh in function_handles]:
print(f"{args.function} not a function")
return -1

if len(function_handles) == 0:
print(f"{args.function} not a function")
return -1

Aayush-Goel-04 marked this conversation as resolved.
Show resolved Hide resolved
feature_map.update(get_file_features(function_handles, extractor))

rules_feature_set = get_rules_feature_set(args.rules)

highlight_unused_features(feature_map, rules_feature_set)
Aayush-Goel-04 marked this conversation as resolved.
Show resolved Hide resolved
return 0


def ida_main():
import idc

import capa.main
import capa.features.extractors.ida.extractor

function = idc.get_func_attr(idc.here(), idc.FUNCATTR_START)
print(f"getting features for current function {hex(function)}")

extractor = capa.features.extractors.ida.extractor.IdaFeatureExtractor()
feature_map: dict = {}

if not function:
for feature, _ in extractor.extract_file_features():
feature_map[feature] = feature_map.get(feature, 0) + 1
return

function_handles = tuple(extractor.get_functions())

if function:
function_handles = tuple(filter(lambda fh: fh.inner.start_ea == function, function_handles))

if len(function_handles) == 0:
print(f"{hex(function)} not a function")
return -1

feature_map.update(get_file_features(function_handles, extractor))

rules_path = capa.main.get_default_root() / "rules"
rules_feature_set = get_rules_feature_set([rules_path])

highlight_unused_features(feature_map, rules_feature_set)

return 0


def highlight_unused_features(feature_map: dict, rules_feature_set: set):
Aayush-Goel-04 marked this conversation as resolved.
Show resolved Hide resolved
unused_features = []
for feature, count in feature_map.items():
if feature in rules_feature_set:
continue
unused_features.append((get_colored(str(count)), get_colored(feature.__str__())))
Aayush-Goel-04 marked this conversation as resolved.
Show resolved Hide resolved
print("\n")
print(tabulate.tabulate(unused_features, headers=["Count", "Feature"], tablefmt="plain"))
print("\n")


def get_colored(s: str):
if "(" in s:
s_split = s.split("(")
s_split[0] = colored(s_split[0], "cyan")
return "(".join(s_split)
else:
return colored(s, "cyan")
Aayush-Goel-04 marked this conversation as resolved.
Show resolved Hide resolved


if __name__ == "__main__":
if capa.helpers.is_runtime_ida():
ida_main()
else:
sys.exit(main())