diff --git a/CHANGELOG.md b/CHANGELOG.md
index b17b00c5a..733542c60 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,10 +4,10 @@
 
 ### New Features
 - add protobuf format for result documents #1219 @williballenthin @mr-tz
-
 - extractor: add Binary Ninja feature extractor @xusheng6
 - new cli flag `--os` to override auto-detected operating system for a sample @captainGeech42
 - Change colour/highlight to "cyan" instead of "blue" for easy noticing.#1384 @ggold7046
+- add new format to parse output json back to capa #1396 @ooprathamm
 
 ### Breaking Changes
 
diff --git a/capa/features/common.py b/capa/features/common.py
index 062c27fa2..5060ebaa4 100644
--- a/capa/features/common.py
+++ b/capa/features/common.py
@@ -450,6 +450,7 @@ def evaluate(self, ctx, **kwargs):
 FORMAT_SC32 = "sc32"
 FORMAT_SC64 = "sc64"
 FORMAT_FREEZE = "freeze"
+FORMAT_RESULT = "result"
 FORMAT_UNKNOWN = "unknown"
 
 
diff --git a/capa/features/extractors/common.py b/capa/features/extractors/common.py
index 3c1145ce3..39411bde8 100644
--- a/capa/features/extractors/common.py
+++ b/capa/features/extractors/common.py
@@ -12,11 +12,14 @@
 import capa.features.extractors.strings
 from capa.features.common import (
     OS,
+    OS_ANY,
     OS_AUTO,
+    ARCH_ANY,
     FORMAT_PE,
     FORMAT_ELF,
     OS_WINDOWS,
     FORMAT_FREEZE,
+    FORMAT_RESULT,
     Arch,
     Format,
     String,
@@ -27,6 +30,11 @@
 
 logger = logging.getLogger(__name__)
 
+# leading bytes used to recognize each input format via `buf.startswith(...)`
+MATCH_PE = b"MZ"
+MATCH_ELF = b"\x7fELF"
+MATCH_RESULT = b'{"meta":'  # only matches a buffer that begins with exactly these bytes
+
 
 def extract_file_strings(buf, **kwargs) -> Iterator[Tuple[String, Address]]:
     """
@@ -40,12 +48,14 @@ def extract_file_strings(buf, **kwargs) -> Iterator[Tuple[String, Address]]:
 
 
 def extract_format(buf) -> Iterator[Tuple[Feature, Address]]:
-    if buf.startswith(b"MZ"):
+    if buf.startswith(MATCH_PE):
         yield Format(FORMAT_PE), NO_ADDRESS
-    elif buf.startswith(b"\x7fELF"):
+    elif buf.startswith(MATCH_ELF):
         yield Format(FORMAT_ELF), NO_ADDRESS
     elif is_freeze(buf):
         yield Format(FORMAT_FREEZE), NO_ADDRESS
+    elif buf.startswith(MATCH_RESULT):
+        yield Format(FORMAT_RESULT), NO_ADDRESS
     else:
         # we likely end up here:
         #  1. handling a file format (e.g. macho)
@@ -56,10 +66,13 @@ def extract_format(buf) -> Iterator[Tuple[Feature, Address]]:
 
 
 def extract_arch(buf) -> Iterator[Tuple[Feature, Address]]:
-    if buf.startswith(b"MZ"):
+    if buf.startswith(MATCH_PE):
         yield from capa.features.extractors.pefile.extract_file_arch(pe=pefile.PE(data=buf))
+
+    elif buf.startswith(MATCH_RESULT):
+        yield Arch(ARCH_ANY), NO_ADDRESS
 
-    elif buf.startswith(b"\x7fELF"):
+    elif buf.startswith(MATCH_ELF):
         with contextlib.closing(io.BytesIO(buf)) as f:
             arch = capa.features.extractors.elf.detect_elf_arch(f)
 
@@ -88,9 +101,11 @@ def extract_os(buf, os=OS_AUTO) -> Iterator[Tuple[Feature, Address]]:
     if os != OS_AUTO:
         yield OS(os), NO_ADDRESS
 
-    if buf.startswith(b"MZ"):
+    if buf.startswith(MATCH_PE):
         yield OS(OS_WINDOWS), NO_ADDRESS
-    elif buf.startswith(b"\x7fELF"):
+    elif buf.startswith(MATCH_RESULT):
+        yield OS(OS_ANY), NO_ADDRESS
+    elif buf.startswith(MATCH_ELF):
         with contextlib.closing(io.BytesIO(buf)) as f:
             os = capa.features.extractors.elf.detect_elf_os(f)
 
diff --git a/capa/main.py b/capa/main.py
index 70e086c25..c033a77f2 100644
--- a/capa/main.py
+++ b/capa/main.py
@@ -69,6 +69,7 @@
     FORMAT_SC64,
     FORMAT_DOTNET,
     FORMAT_FREEZE,
+    FORMAT_RESULT,
 )
 from capa.features.address import NO_ADDRESS, Address
 from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, FeatureExtractor
@@ -1180,8 +1181,10 @@ def main(argv=None):
             if not (args.verbose or args.vverbose or args.json):
                 logger.debug("file limitation short circuit, won't analyze fully.")
                 return E_FILE_LIMITATION
-
-    if format_ == FORMAT_FREEZE:
+    if format_ == FORMAT_RESULT:
+        result_doc = capa.render.result_document.ResultDocument.parse_file(args.sample)
+        meta, capabilities = result_doc.to_capa()
+    elif format_ == FORMAT_FREEZE:
         with open(args.sample, "rb") as f:
             extractor = capa.features.freeze.load(f.read())
     else:
@@ -1217,17 +1220,18 @@ def main(argv=None):
             log_unsupported_os_error()
             return E_INVALID_FILE_OS
 
-    meta = collect_metadata(argv, args.sample, args.format, args.os, args.rules, extractor)
+    if format_ != FORMAT_RESULT:
+        meta = collect_metadata(argv, args.sample, args.format, args.os, args.rules, extractor)
 
-    capabilities, counts = find_capabilities(rules, extractor, disable_progress=args.quiet)
-    meta["analysis"].update(counts)
-    meta["analysis"]["layout"] = compute_layout(rules, extractor, capabilities)
+        capabilities, counts = find_capabilities(rules, extractor, disable_progress=args.quiet)
+        meta["analysis"].update(counts)
+        meta["analysis"]["layout"] = compute_layout(rules, extractor, capabilities)
 
-    if has_file_limitation(rules, capabilities):
-        # bail if capa encountered file limitation e.g. a packed binary
-        # do show the output in verbose mode, though.
-        if not (args.verbose or args.vverbose or args.json):
-            return E_FILE_LIMITATION
+        if has_file_limitation(rules, capabilities):
+            # bail if capa encountered file limitation e.g. a packed binary
+            # do show the output in verbose mode, though.
+            if not (args.verbose or args.vverbose or args.json):
+                return E_FILE_LIMITATION
 
     if args.json:
         print(capa.render.json.render(meta, rules, capabilities))
diff --git a/capa/render/result_document.py b/capa/render/result_document.py
index 960635f0a..cba62258b 100644
--- a/capa/render/result_document.py
+++ b/capa/render/result_document.py
@@ -6,7 +6,7 @@
 #  is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and limitations under the License.
 import datetime
-from typing import Any, Dict, Tuple, Union, Optional
+from typing import Any, Dict, List, Tuple, Union, Optional
 
 from pydantic import Field, BaseModel
 
@@ -125,6 +125,47 @@ def from_capa(cls, meta: Any) -> "Metadata":
             ),
         )
 
+    def to_capa(self) -> Dict[str, Any]:
+        """
+        Convert this metadata into a plain nested dict (intended as the inverse of `from_capa`).
+
+        The timestamp is emitted as an ISO 8601 string, and every address is
+        converted with its `to_capa()` method before being used as a key or value.
+        """
+        capa_meta = {
+            "timestamp": self.timestamp.isoformat(),
+            "version": self.version,
+            "sample": {
+                "md5": self.sample.md5,
+                "sha1": self.sample.sha1,
+                "sha256": self.sample.sha256,
+                "path": self.sample.path,
+            },
+            "analysis": {
+                "format": self.analysis.format,
+                "arch": self.analysis.arch,
+                "os": self.analysis.os,
+                "extractor": self.analysis.extractor,
+                "rules": self.analysis.rules,
+                "base_address": self.analysis.base_address.to_capa(),
+                "layout": {
+                    "functions": {
+                        f.address.to_capa(): {
+                            "matched_basic_blocks": [bb.address.to_capa() for bb in f.matched_basic_blocks]
+                        }
+                        for f in self.analysis.layout.functions
+                    }
+                },
+                "feature_counts": {
+                    "file": self.analysis.feature_counts.file,
+                    "functions": {fc.address.to_capa(): fc.count for fc in self.analysis.feature_counts.functions},
+                },
+                "library_functions": {lf.address.to_capa(): lf.name for lf in self.analysis.library_functions},
+            },
+        }
+
+        return capa_meta
+
 
 class CompoundStatementType:
     AND = "and"
@@ -543,3 +584,50 @@ def from_capa(cls, meta, rules: RuleSet, capabilities: MatchResults) -> "ResultD
         )
 
         return ResultDocument(meta=Metadata.from_capa(meta), rules=rule_matches)
+
+    def to_capa(self) -> Tuple[Dict, Dict]:
+        """
+        Rebuild the `(meta, capabilities)` pair that capa's renderers consume.
+
+        Returns:
+          a tuple of the metadata dict from `Metadata.to_capa()` and a dict mapping
+          each rule name to a list of `(address, Result)` pairs.
+
+        Raises:
+          ValueError: if a match node is neither a StatementNode nor a FeatureNode.
+        """
+        meta = self.meta.to_capa()
+        capabilities: Dict[str, List[Tuple[Any, capa.features.common.Result]]] = {}
+
+        for rule_name, rule_match in self.rules.items():
+            # the rule's YAML source travels with the document; parse it back into a Rule
+            rule = capa.rules.Rule.from_yaml(rule_match.source)
+
+            for addr, match in rule_match.matches:
+                if isinstance(match.node, StatementNode):
+                    if isinstance(match.node.statement, CompoundStatement):
+                        statement = rule.statement
+                    else:
+                        # NOTE(review): statement_from_capa reads like the capa -> document
+                        # direction; confirm it accepts a document statement here.
+                        statement = statement_from_capa(match.node.statement)
+                elif isinstance(match.node, FeatureNode):
+                    statement = match.node.feature.to_capa()
+                    if isinstance(statement, (capa.features.common.String, capa.features.common.Regex)):
+                        statement.matches = match.captures
+                else:
+                    raise ValueError("Invalid node type")
+
+                result = capa.features.common.Result(
+                    statement=statement,
+                    success=match.success,
+                    locations=[loc.to_capa() for loc in match.locations],
+                    # NOTE(review): child results are not rebuilt, so nested matches are dropped
+                    children=[],
+                )
+
+                # document -> capa direction: convert with to_capa(), exactly like the
+                # locations above (from_capa() is presumably the opposite direction)
+                capabilities.setdefault(rule_name, []).append((addr.to_capa(), result))
+
+        return meta, capabilities
diff --git a/tests/fixtures.py b/tests/fixtures.py
index 51e1c0aff..f5ac7b301 100644
--- a/tests/fixtures.py
+++ b/tests/fixtures.py
@@ -241,6 +241,8 @@ def get_data_path_by_name(name):
         return os.path.join(CD, "data", "kernel32-64.dll_")
     elif name == "pma01-01":
         return os.path.join(CD, "data", "Practical Malware Analysis Lab 01-01.dll_")
+    elif name == "pma01-01-rd":
+        return os.path.join(CD, "data", "rd", "Practical Malware Analysis Lab 01-01.dll_.json")
     elif name == "pma12-04":
         return os.path.join(CD, "data", "Practical Malware Analysis Lab 12-04.exe_")
     elif name == "pma16-01":
diff --git a/tests/test_result_document.py b/tests/test_result_document.py
index 87e62911f..c329c2559 100644
--- a/tests/test_result_document.py
+++ b/tests/test_result_document.py
@@ -8,6 +8,7 @@
 import copy
 
 import pytest
+import fixtures
 from fixtures import *
 
 import capa
@@ -268,3 +269,15 @@ def assert_round_trip(rd: rdoc.ResultDocument):
 def test_round_trip(request, rd_file):
     rd: rdoc.ResultDocument = request.getfixturevalue(rd_file)
     assert_round_trip(rd)
+
+
+def test_json_to_rdoc():
+    path = fixtures.get_data_path_by_name("pma01-01-rd")
+    assert isinstance(rdoc.ResultDocument.parse_file(path), rdoc.ResultDocument)
+
+
+def test_rdoc_to_capa():
+    path = fixtures.get_data_path_by_name("pma01-01-rd")
+    converted = rdoc.ResultDocument.parse_file(path).to_capa()
+    assert isinstance(converted, tuple)
+    assert len(converted) == 2