From cbe83ddad4ff2887830802a3a9d99b30171bce3f Mon Sep 17 00:00:00 2001 From: Mike Hunhoff Date: Fri, 31 May 2024 14:49:20 -0400 Subject: [PATCH] binexport: add typing where applicable (#2106) --- .../extractors/binexport2/__init__.py | 94 +++++++-------- .../extractors/binexport2/basicblock.py | 9 +- .../extractors/binexport2/extractor.py | 41 +++---- capa/features/extractors/binexport2/file.py | 12 +- .../extractors/binexport2/function.py | 42 +++---- .../features/extractors/binexport2/helpers.py | 3 +- capa/features/extractors/binexport2/insn.py | 112 ++++++++++-------- 7 files changed, 163 insertions(+), 150 deletions(-) diff --git a/capa/features/extractors/binexport2/__init__.py b/capa/features/extractors/binexport2/__init__.py index e3c9add5e..d6fe87ae9 100644 --- a/capa/features/extractors/binexport2/__init__.py +++ b/capa/features/extractors/binexport2/__init__.py @@ -26,12 +26,13 @@ import capa.features.extractors.common import capa.features.extractors.binexport2.helpers from capa.features.extractors.binexport2.binexport2_pb2 import BinExport2 +from capa.features.extractors.binexport2.binexport2_pb2.BinExport2 import CallGraph, FlowGraph logger = logging.getLogger(__name__) def get_binexport2(sample: Path) -> BinExport2: - be2 = BinExport2() + be2: BinExport2 = BinExport2() be2.ParseFromString(sample.read_bytes()) return be2 @@ -54,15 +55,15 @@ def get_sample_from_binexport2(input_file: Path, be2: BinExport2, search_paths: searches in the same directory as the BinExport2 file, and then in search_paths. """ - def filename_similarity_key(p: Path): + def filename_similarity_key(p: Path) -> Tuple[int, str]: # note closure over input_file. # sort first by length of common prefix, then by name (for stability) return (compute_common_prefix_length(p.name, input_file.name), p.name) - wanted_sha256 = be2.meta_information.executable_id.lower() + wanted_sha256: str = be2.meta_information.executable_id.lower() - input_directory = input_file.parent - siblings = [p for p in input_directory.iterdir() if p.is_file()] + input_directory: Path = input_file.parent + siblings: List[Path] = [p for p in input_directory.iterdir() if p.is_file()] siblings.sort(key=filename_similarity_key, reverse=True) for sibling in siblings: # e.g. with open IDA files in the same directory on Windows @@ -71,7 +72,7 @@ def filename_similarity_key(p: Path): return sibling for search_path in search_paths: - candidates = [p for p in search_path.iterdir() if p.is_file()] + candidates: List[Path] = [p for p in search_path.iterdir() if p.is_file()] candidates.sort(key=filename_similarity_key, reverse=True) for candidate in candidates: with contextlib.suppress(PermissionError): @@ -83,7 +84,7 @@ def filename_similarity_key(p: Path): class BinExport2Index: def __init__(self, be2: BinExport2): - self.be2 = be2 + self.be2: BinExport2 = be2 self.callers_by_vertex_index: Dict[int, List[int]] = defaultdict(list) self.callees_by_vertex_index: Dict[int, List[int]] = defaultdict(list) @@ -93,9 +94,9 @@ def __init__(self, be2: BinExport2): self.flow_graph_address_by_index: Dict[int, int] = {} # edges that come from the given basic block - self.source_edges_by_basic_block_index: Dict[int, List[BinExport2.FlowGraph.Edge]] = defaultdict(list) + self.source_edges_by_basic_block_index: Dict[int, List[FlowGraph.Edge]] = defaultdict(list) # edges that end up at the given basic block - self.target_edges_by_basic_block_index: Dict[int, List[BinExport2.FlowGraph.Edge]] = defaultdict(list) + self.target_edges_by_basic_block_index: Dict[int, List[FlowGraph.Edge]] = defaultdict(list) self.vertex_index_by_address: Dict[int, int] = {} @@ -119,9 +120,8 @@ def get_insn_address(self, insn_index: int) -> int: return self.insn_address_by_index[insn_index] def get_basic_block_address(self, basic_block_index: int) -> int: - basic_block = self.be2.basic_block[basic_block_index] - first_instruction_index = next(self.instruction_indices(basic_block)) - + basic_block: BinExport2.BasicBlock = self.be2.basic_block[basic_block_index] + first_instruction_index: int = next(self.instruction_indices(basic_block)) return self.get_insn_address(first_instruction_index) def _index_vertex_edges(self): @@ -136,7 +136,7 @@ def _index_vertex_edges(self): def _index_flow_graph_nodes(self): for flow_graph_index, flow_graph in enumerate(self.be2.flow_graph): - function_address = self.get_basic_block_address(flow_graph.entry_basic_block_index) + function_address: int = self.get_basic_block_address(flow_graph.entry_basic_block_index) self.flow_graph_index_by_address[function_address] = flow_graph_index self.flow_graph_address_by_index[flow_graph_index] = function_address @@ -154,7 +154,7 @@ def _index_call_graph_vertices(self): if not vertex.HasField("address"): continue - vertex_address = vertex.address + vertex_address: int = vertex.address self.vertex_index_by_address[vertex_address] = vertex_index def _index_data_references(self): @@ -177,8 +177,8 @@ def _index_insn_addresses(self): assert self.be2.instruction[0].HasField("address"), "first insn must have explicit address" - addr = 0 - next_addr = 0 + addr: int = 0 + next_addr: int = 0 for idx, insn in enumerate(self.be2.instruction): if insn.HasField("address"): addr = insn.address @@ -208,14 +208,14 @@ def basic_block_instructions( the instruction instances, and their addresses. """ for instruction_index in self.instruction_indices(basic_block): - instruction = self.be2.instruction[instruction_index] - instruction_address = self.get_insn_address(instruction_index) + instruction: BinExport2.Instruction = self.be2.instruction[instruction_index] + instruction_address: int = self.get_insn_address(instruction_index) yield instruction_index, instruction, instruction_address def get_function_name_by_vertex(self, vertex_index: int) -> str: - vertex = self.be2.call_graph.vertex[vertex_index] - name = f"sub_{vertex.address:x}" + vertex: CallGraph.Vertex = self.be2.call_graph.vertex[vertex_index] + name: str = f"sub_{vertex.address:x}" if vertex.HasField("mangled_name"): name = vertex.mangled_name @@ -223,7 +223,7 @@ def get_function_name_by_vertex(self, vertex_index: int) -> str: name = vertex.demangled_name if vertex.HasField("library_index"): - library = self.be2.library[vertex.library_index] + library: BinExport2.Library = self.be2.library[vertex.library_index] if library.HasField("name"): name = f"{library.name}!{name}" @@ -233,15 +233,15 @@ def get_function_name_by_address(self, address: int) -> str: if address not in self.vertex_index_by_address: return "" - vertex_index = self.vertex_index_by_address[address] + vertex_index: int = self.vertex_index_by_address[address] return self.get_function_name_by_vertex(vertex_index) class BinExport2Analysis: def __init__(self, be2: BinExport2, idx: BinExport2Index, buf: bytes): - self.be2 = be2 - self.idx = idx - self.buf = buf + self.be2: BinExport2 = be2 + self.idx: BinExport2Index = idx + self.buf: bytes = buf self.base_address: int = 0 self.thunks: Dict[int, int] = {} @@ -249,7 +249,9 @@ def __init__(self, be2: BinExport2, idx: BinExport2Index, buf: bytes): self._compute_thunks() def _find_base_address(self): - sections_with_perms = filter(lambda s: s.flag_r or s.flag_w or s.flag_x, self.be2.section) + sections_with_perms: Iterator[BinExport2.Section] = filter( + lambda s: s.flag_r or s.flag_w or s.flag_x, self.be2.section + ) # assume the lowest address is the base address. # this works as long as BinExport doesn't record other # libraries mapped into memory. @@ -259,15 +261,13 @@ def _find_base_address(self): def _compute_thunks(self): for addr, idx in self.idx.vertex_index_by_address.items(): - vertex = self.be2.call_graph.vertex[idx] - if not capa.features.extractors.binexport2.helpers.is_vertex_type( - vertex, BinExport2.CallGraph.Vertex.Type.THUNK - ): + vertex: CallGraph.Vertex = self.be2.call_graph.vertex[idx] + if not capa.features.extractors.binexport2.helpers.is_vertex_type(vertex, CallGraph.Vertex.Type.THUNK): continue - curr_idx = idx + curr_idx: int = idx for _ in range(capa.features.common.THUNK_CHAIN_DEPTH_DELTA): - thunk_callees = self.idx.callees_by_vertex_index[curr_idx] + thunk_callees: List[int] = self.idx.callees_by_vertex_index[curr_idx] # if this doesn't hold, then it doesn't seem like this is a thunk, # because either, len is: # 0 and the thunk doesn't point to anything, or @@ -280,11 +280,11 @@ def _compute_thunks(self): assert len(thunk_callees) == 1 - thunked_idx = thunk_callees[0] - thunked_vertex = self.be2.call_graph.vertex[thunked_idx] + thunked_idx: int = thunk_callees[0] + thunked_vertex: CallGraph.Vertex = self.be2.call_graph.vertex[thunked_idx] if not capa.features.extractors.binexport2.helpers.is_vertex_type( - thunked_vertex, BinExport2.CallGraph.Vertex.Type.THUNK + thunked_vertex, CallGraph.Vertex.Type.THUNK ): assert thunked_vertex.HasField("address") @@ -321,21 +321,21 @@ class AddressSpace: memory_regions: Tuple[MemoryRegion, ...] def read_memory(self, address: int, length: int) -> bytes: - rva = address - self.base_address + rva: int = address - self.base_address for region in self.memory_regions: if region.contains(rva): - offset = rva - region.address + offset: int = rva - region.address return region.buf[offset : offset + length] raise AddressNotMappedError(address) @classmethod def from_pe(cls, pe: PE, base_address: int): - regions = [] + regions: List[MemoryRegion] = [] for section in pe.sections: - address = section.VirtualAddress - size = section.Misc_VirtualSize - buf = section.get_data() + address: int = section.VirtualAddress + size: int = section.Misc_VirtualSize + buf: bytes = section.get_data() if len(buf) != size: # pad the section with NULLs @@ -349,16 +349,16 @@ def from_pe(cls, pe: PE, base_address: int): @classmethod def from_elf(cls, elf: ELFFile, base_address: int): - regions = [] + regions: List[MemoryRegion] = [] # ELF segments are for runtime data, # ELF sections are for link-time data. for segment in elf.iter_segments(): # assume p_align is consistent with addresses here. # otherwise, should harden this loader. - segment_rva = segment.header.p_vaddr - segment_size = segment.header.p_memsz - segment_data = segment.data() + segment_rva: int = segment.header.p_vaddr + segment_size: int = segment.header.p_memsz + segment_data: bytes = segment.data() if len(segment_data) < segment_size: # pad the section with NULLs @@ -373,10 +373,10 @@ def from_elf(cls, elf: ELFFile, base_address: int): @classmethod def from_buf(cls, buf: bytes, base_address: int): if buf.startswith(capa.features.extractors.common.MATCH_PE): - pe = PE(data=buf) + pe: PE = PE(data=buf) return cls.from_pe(pe, base_address) elif buf.startswith(capa.features.extractors.common.MATCH_ELF): - elf = ELFFile(io.BytesIO(buf)) + elf: ELFFile = ELFFile(io.BytesIO(buf)) return cls.from_elf(elf, base_address) else: raise NotImplementedError("file format address space") diff --git a/capa/features/extractors/binexport2/basicblock.py b/capa/features/extractors/binexport2/basicblock.py index 4674791f7..f44754f58 100644 --- a/capa/features/extractors/binexport2/basicblock.py +++ b/capa/features/extractors/binexport2/basicblock.py @@ -6,13 +6,14 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. -from typing import Tuple, Iterator +from typing import List, Tuple, Iterator from capa.features.common import Feature, Characteristic from capa.features.address import Address, AbsoluteVirtualAddress from capa.features.basicblock import BasicBlock from capa.features.extractors.binexport2 import FunctionContext, BasicBlockContext from capa.features.extractors.base_extractor import BBHandle, FunctionHandle +from capa.features.extractors.binexport2.binexport2_pb2.BinExport2 import FlowGraph def extract_bb_tight_loop(fh: FunctionHandle, bbh: BBHandle) -> Iterator[Tuple[Feature, Address]]: @@ -21,10 +22,10 @@ def extract_bb_tight_loop(fh: FunctionHandle, bbh: BBHandle) -> Iterator[Tuple[F idx = fhi.ctx.idx - basic_block_index = bbi.basic_block_index - target_edges = idx.target_edges_by_basic_block_index[basic_block_index] + basic_block_index: int = bbi.basic_block_index + target_edges: List[FlowGraph.Edge] = idx.target_edges_by_basic_block_index[basic_block_index] if basic_block_index in (e.source_basic_block_index for e in target_edges): - basic_block_address = idx.get_basic_block_address(basic_block_index) + basic_block_address: int = idx.get_basic_block_address(basic_block_index) yield Characteristic("tight loop"), AbsoluteVirtualAddress(basic_block_address) diff --git a/capa/features/extractors/binexport2/extractor.py b/capa/features/extractors/binexport2/extractor.py index fad1d6927..665d0bda1 100644 --- a/capa/features/extractors/binexport2/extractor.py +++ b/capa/features/extractors/binexport2/extractor.py @@ -34,6 +34,7 @@ StaticFeatureExtractor, ) from capa.features.extractors.binexport2.binexport2_pb2 import BinExport2 +from capa.features.extractors.binexport2.binexport2_pb2.BinExport2 import CallGraph logger = logging.getLogger(__name__) @@ -41,12 +42,12 @@ class BinExport2FeatureExtractor(StaticFeatureExtractor): def __init__(self, be2: BinExport2, buf: bytes): super().__init__(hashes=SampleHashes.from_bytes(buf)) - self.be2 = be2 - self.buf = buf - self.idx = BinExport2Index(self.be2) - self.analysis = BinExport2Analysis(self.be2, self.idx, self.buf) - address_space = AddressSpace.from_buf(buf, self.analysis.base_address) - self.ctx = AnalysisContext(self.buf, self.be2, self.idx, self.analysis, address_space) + self.be2: BinExport2 = be2 + self.buf: bytes = buf + self.idx: BinExport2Index = BinExport2Index(self.be2) + self.analysis: BinExport2Analysis = BinExport2Analysis(self.be2, self.idx, self.buf) + address_space: AddressSpace = AddressSpace.from_buf(buf, self.analysis.base_address) + self.ctx: AnalysisContext = AnalysisContext(self.buf, self.be2, self.idx, self.analysis, address_space) self.global_features: List[Tuple[Feature, Address]] = [] self.global_features.extend(list(capa.features.extractors.common.extract_format(self.buf))) @@ -57,27 +58,25 @@ def __init__(self, be2: BinExport2, buf: bytes): # and gradually relax restrictions as they're tested. # https://github.com/mandiant/capa/issues/1755 - def get_base_address(self): + def get_base_address(self) -> AbsoluteVirtualAddress: return AbsoluteVirtualAddress(self.analysis.base_address) - def extract_global_features(self): + def extract_global_features(self) -> Iterator[Tuple[Feature, Address]]: yield from self.global_features - def extract_file_features(self): + def extract_file_features(self) -> Iterator[Tuple[Feature, Address]]: yield from capa.features.extractors.binexport2.file.extract_features(self.be2, self.buf) def get_functions(self) -> Iterator[FunctionHandle]: for flow_graph_index, flow_graph in enumerate(self.be2.flow_graph): - entry_basic_block_index = flow_graph.entry_basic_block_index - flow_graph_address = self.idx.get_basic_block_address(entry_basic_block_index) + entry_basic_block_index: int = flow_graph.entry_basic_block_index + flow_graph_address: int = self.idx.get_basic_block_address(entry_basic_block_index) - vertex_idx = self.idx.vertex_index_by_address[flow_graph_address] - be2_vertex = self.be2.call_graph.vertex[vertex_idx] + vertex_idx: int = self.idx.vertex_index_by_address[flow_graph_address] + be2_vertex: CallGraph.Vertex = self.be2.call_graph.vertex[vertex_idx] # skip thunks - if capa.features.extractors.binexport2.helpers.is_vertex_type( - be2_vertex, BinExport2.CallGraph.Vertex.Type.THUNK - ): + if capa.features.extractors.binexport2.helpers.is_vertex_type(be2_vertex, CallGraph.Vertex.Type.THUNK): continue yield FunctionHandle( @@ -90,11 +89,11 @@ def extract_function_features(self, fh: FunctionHandle) -> Iterator[Tuple[Featur def get_basic_blocks(self, fh: FunctionHandle) -> Iterator[BBHandle]: fhi: FunctionContext = fh.inner - flow_graph_index = fhi.flow_graph_index - flow_graph = self.be2.flow_graph[flow_graph_index] + flow_graph_index: int = fhi.flow_graph_index + flow_graph: BinExport2.FlowGraph = self.be2.flow_graph[flow_graph_index] for basic_block_index in flow_graph.basic_block_index: - basic_block_address = self.idx.get_basic_block_address(basic_block_index) + basic_block_address: int = self.idx.get_basic_block_address(basic_block_index) yield BBHandle( address=AbsoluteVirtualAddress(basic_block_address), inner=BasicBlockContext(basic_block_index), @@ -112,5 +111,7 @@ def get_instructions(self, fh: FunctionHandle, bbh: BBHandle) -> Iterator[InsnHa inner=InstructionContext(instruction_index), ) - def extract_insn_features(self, fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle): + def extract_insn_features( + self, fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle + ) -> Iterator[Tuple[Feature, Address]]: yield from capa.features.extractors.binexport2.insn.extract_features(fh, bbh, ih) diff --git a/capa/features/extractors/binexport2/file.py b/capa/features/extractors/binexport2/file.py index a6ee7ce93..9d9872bc2 100644 --- a/capa/features/extractors/binexport2/file.py +++ b/capa/features/extractors/binexport2/file.py @@ -25,10 +25,10 @@ def extract_file_export_names(_be2: BinExport2, buf: bytes) -> Iterator[Tuple[Feature, Address]]: if buf.startswith(capa.features.extractors.common.MATCH_PE): - pe = pefile.PE(data=buf) + pe: pefile.PE = pefile.PE(data=buf) yield from capa.features.extractors.pefile.extract_file_export_names(pe) elif buf.startswith(capa.features.extractors.common.MATCH_ELF): - elf = ELFFile(io.BytesIO(buf)) + elf: ELFFile = ELFFile(io.BytesIO(buf)) yield from capa.features.extractors.elffile.extract_file_export_names(elf) else: logger.warning("unsupported format") @@ -36,10 +36,10 @@ def extract_file_export_names(_be2: BinExport2, buf: bytes) -> Iterator[Tuple[Fe def extract_file_import_names(_be2: BinExport2, buf: bytes) -> Iterator[Tuple[Feature, Address]]: if buf.startswith(capa.features.extractors.common.MATCH_PE): - pe = pefile.PE(data=buf) + pe: pefile.PE = pefile.PE(data=buf) yield from capa.features.extractors.pefile.extract_file_import_names(pe) elif buf.startswith(capa.features.extractors.common.MATCH_ELF): - elf = ELFFile(io.BytesIO(buf)) + elf: ELFFile = ELFFile(io.BytesIO(buf)) yield from capa.features.extractors.elffile.extract_file_import_names(elf) else: logger.warning("unsupported format") @@ -47,10 +47,10 @@ def extract_file_import_names(_be2: BinExport2, buf: bytes) -> Iterator[Tuple[Fe def extract_file_section_names(_be2: BinExport2, buf: bytes) -> Iterator[Tuple[Feature, Address]]: if buf.startswith(capa.features.extractors.common.MATCH_PE): - pe = pefile.PE(data=buf) + pe: pefile.PE = pefile.PE(data=buf) yield from capa.features.extractors.pefile.extract_file_section_names(pe) elif buf.startswith(capa.features.extractors.common.MATCH_ELF): - elf = ELFFile(io.BytesIO(buf)) + elf: ELFFile = ELFFile(io.BytesIO(buf)) yield from capa.features.extractors.elffile.extract_file_section_names(elf) else: logger.warning("unsupported format") diff --git a/capa/features/extractors/binexport2/function.py b/capa/features/extractors/binexport2/function.py index e437c0dc7..685b3ab08 100644 --- a/capa/features/extractors/binexport2/function.py +++ b/capa/features/extractors/binexport2/function.py @@ -10,33 +10,35 @@ from capa.features.file import FunctionName from capa.features.common import Feature, Characteristic from capa.features.address import Address, AbsoluteVirtualAddress -from capa.features.extractors.binexport2 import FunctionContext +from capa.features.extractors.binexport2 import BinExport2Index, FunctionContext from capa.features.extractors.base_extractor import FunctionHandle +from capa.features.extractors.binexport2.binexport2_pb2 import BinExport2 +from capa.features.extractors.binexport2.binexport2_pb2.BinExport2 import CallGraph -def extract_function_calls_to(fh: FunctionHandle): +def extract_function_calls_to(fh: FunctionHandle) -> Iterator[Tuple[Feature, Address]]: fhi: FunctionContext = fh.inner - be2 = fhi.ctx.be2 - idx = fhi.ctx.idx + be2: BinExport2 = fhi.ctx.be2 + idx: BinExport2Index = fhi.ctx.idx - flow_graph_index = fhi.flow_graph_index - flow_graph_address = idx.flow_graph_address_by_index[flow_graph_index] - vertex_index = idx.vertex_index_by_address[flow_graph_address] + flow_graph_index: int = fhi.flow_graph_index + flow_graph_address: int = idx.flow_graph_address_by_index[flow_graph_index] + vertex_index: int = idx.vertex_index_by_address[flow_graph_address] for caller_index in idx.callers_by_vertex_index[vertex_index]: - caller = be2.call_graph.vertex[caller_index] - caller_address = caller.address + caller: CallGraph.Vertex = be2.call_graph.vertex[caller_index] + caller_address: int = caller.address yield Characteristic("calls to"), AbsoluteVirtualAddress(caller_address) -def extract_function_loop(fh: FunctionHandle): +def extract_function_loop(fh: FunctionHandle) -> Iterator[Tuple[Feature, Address]]: fhi: FunctionContext = fh.inner - be2 = fhi.ctx.be2 + be2: BinExport2 = fhi.ctx.be2 - flow_graph_index = fhi.flow_graph_index - flow_graph = be2.flow_graph[flow_graph_index] + flow_graph_index: int = fhi.flow_graph_index + flow_graph: BinExport2.FlowGraph = be2.flow_graph[flow_graph_index] for edge in flow_graph.edge: if edge.is_back_edge: @@ -44,16 +46,16 @@ def extract_function_loop(fh: FunctionHandle): break -def extract_function_name(fh: FunctionHandle): +def extract_function_name(fh: FunctionHandle) -> Iterator[Tuple[Feature, Address]]: fhi: FunctionContext = fh.inner - be2 = fhi.ctx.be2 - idx = fhi.ctx.idx - flow_graph_index = fhi.flow_graph_index + be2: BinExport2 = fhi.ctx.be2 + idx: BinExport2Index = fhi.ctx.idx + flow_graph_index: int = fhi.flow_graph_index - flow_graph_address = idx.flow_graph_address_by_index[flow_graph_index] - vertex_index = idx.vertex_index_by_address[flow_graph_address] - vertex = be2.call_graph.vertex[vertex_index] + flow_graph_address: int = idx.flow_graph_address_by_index[flow_graph_index] + vertex_index: int = idx.vertex_index_by_address[flow_graph_address] + vertex: CallGraph.Vertex = be2.call_graph.vertex[vertex_index] if vertex.HasField("mangled_name"): yield FunctionName(vertex.mangled_name), fh.address diff --git a/capa/features/extractors/binexport2/helpers.py b/capa/features/extractors/binexport2/helpers.py index e4698da6b..6c1f17de8 100644 --- a/capa/features/extractors/binexport2/helpers.py +++ b/capa/features/extractors/binexport2/helpers.py @@ -5,7 +5,8 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. +from capa.features.extractors.binexport2.binexport2_pb2.BinExport2 import CallGraph -def is_vertex_type(vertex, type_): +def is_vertex_type(vertex: CallGraph.Vertex, type_: CallGraph.Vertex.Type.ValueType) -> bool: return vertex.HasField("type") and vertex.type == type_ diff --git a/capa/features/extractors/binexport2/insn.py b/capa/features/extractors/binexport2/insn.py index 2f4145d8f..cee4f053a 100644 --- a/capa/features/extractors/binexport2/insn.py +++ b/capa/features/extractors/binexport2/insn.py @@ -14,25 +14,35 @@ from capa.features.insn import API, Number, Mnemonic, OperandNumber from capa.features.common import Bytes, String, Feature, Characteristic from capa.features.address import Address, AbsoluteVirtualAddress -from capa.features.extractors.binexport2 import FunctionContext, ReadMemoryError, BasicBlockContext, InstructionContext +from capa.features.extractors.binexport2 import ( + AddressSpace, + AnalysisContext, + BinExport2Index, + FunctionContext, + ReadMemoryError, + BasicBlockContext, + BinExport2Analysis, + InstructionContext, +) from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle from capa.features.extractors.binexport2.binexport2_pb2 import BinExport2 +from capa.features.extractors.binexport2.binexport2_pb2.BinExport2 import CallGraph logger = logging.getLogger(__name__) # security cookie checks may perform non-zeroing XORs, these are expected within a certain # byte range within the first and returning basic blocks, this helps to reduce FP features -SECURITY_COOKIE_BYTES_DELTA = 0x40 +SECURITY_COOKIE_BYTES_DELTA: int = 0x40 def extract_insn_api_features(fh: FunctionHandle, _bbh: BBHandle, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]: fhi: FunctionContext = fh.inner ii: InstructionContext = ih.inner - be2 = fhi.ctx.be2 - be2_index = fhi.ctx.idx - be2_analysis = fhi.ctx.analysis - insn = be2.instruction[ii.instruction_index] + be2: BinExport2 = fhi.ctx.be2 + be2_index: BinExport2Index = fhi.ctx.idx + be2_analysis: BinExport2Analysis = fhi.ctx.analysis + insn: BinExport2.Instruction = be2.instruction[ii.instruction_index] for addr in insn.call_target: addr = be2_analysis.thunks.get(addr, addr) @@ -42,19 +52,17 @@ def extract_insn_api_features(fh: FunctionHandle, _bbh: BBHandle, ih: InsnHandle logger.debug("0x%x is not a vertex", addr) continue - vertex_idx = be2_index.vertex_index_by_address[addr] - vertex = be2.call_graph.vertex[vertex_idx] + vertex_idx: int = be2_index.vertex_index_by_address[addr] + vertex: CallGraph.Vertex = be2.call_graph.vertex[vertex_idx] - if not capa.features.extractors.binexport2.helpers.is_vertex_type( - vertex, BinExport2.CallGraph.Vertex.Type.IMPORTED - ): + if not capa.features.extractors.binexport2.helpers.is_vertex_type(vertex, CallGraph.Vertex.Type.IMPORTED): continue if not vertex.HasField("mangled_name"): logger.debug("vertex %d does not have mangled_name", vertex_idx) continue - api_name = vertex.mangled_name + api_name: str = vertex.mangled_name yield API(api_name), ih.address """ @@ -74,7 +82,7 @@ def extract_insn_api_features(fh: FunctionHandle, _bbh: BBHandle, ih: InsnHandle def is_address_mapped(be2: BinExport2, address: int) -> bool: """return True if the given address is mapped""" - sections_with_perms = filter(lambda s: s.flag_r or s.flag_w or s.flag_x, be2.section) + sections_with_perms: Iterator[BinExport2.Section] = filter(lambda s: s.flag_r or s.flag_w or s.flag_x, be2.section) return any(section.address <= address < section.address + section.size for section in sections_with_perms) @@ -126,11 +134,11 @@ def extract_insn_number_features( fhi: FunctionContext = fh.inner ii: InstructionContext = ih.inner - be2 = fhi.ctx.be2 - analysis = fhi.ctx.analysis + be2: BinExport2 = fhi.ctx.be2 + analysis: BinExport2Analysis = fhi.ctx.analysis - instruction_index = ii.instruction_index - instruction = be2.instruction[instruction_index] + instruction_index: int = ii.instruction_index + instruction: BinExport2.Instruction = be2.instruction[instruction_index] # x86 / amd64 mnemonic = be2.mnemonic[instruction.mnemonic_index] @@ -239,12 +247,12 @@ def extract_insn_bytes_features(fh: FunctionHandle, bbh: BBHandle, ih: InsnHandl fhi: FunctionContext = fh.inner ii: InstructionContext = ih.inner - ctx = fhi.ctx - be2 = ctx.be2 - idx = ctx.idx - address_space = ctx.address_space + ctx: AnalysisContext = fhi.ctx + be2: BinExport2 = ctx.be2 + idx: BinExport2Index = ctx.idx + address_space: AddressSpace = ctx.address_space - instruction_index = ii.instruction_index + instruction_index: int = ii.instruction_index if instruction_index in idx.string_reference_index_by_source_instruction_index: # disassembler already identified string reference from instruction @@ -254,15 +262,15 @@ def extract_insn_bytes_features(fh: FunctionHandle, bbh: BBHandle, ih: InsnHandl if instruction_index in idx.data_reference_index_by_source_instruction_index: for data_reference_index in idx.data_reference_index_by_source_instruction_index[instruction_index]: - data_reference = be2.data_reference[data_reference_index] - data_reference_address = data_reference.address + data_reference: BinExport2.DataReference = be2.data_reference[data_reference_index] + data_reference_address: int = data_reference.address reference_addresses.append(data_reference_address) for reference_address in reference_addresses: try: # if at end of segment then there might be an overrun here. - buf = address_space.read_memory(reference_address, 0x100) + buf: bytes = address_space.read_memory(reference_address, 0x100) except ReadMemoryError: logger.debug("failed to read memory: 0x%x", reference_address) continue @@ -270,7 +278,7 @@ def extract_insn_bytes_features(fh: FunctionHandle, bbh: BBHandle, ih: InsnHandl if capa.features.extractors.helpers.all_zeros(buf): continue - is_string = False + is_string: bool = False # note: we *always* break after the first iteration for s in capa.features.extractors.strings.extract_ascii_strings(buf): @@ -300,16 +308,16 @@ def extract_insn_string_features( fhi: FunctionContext = fh.inner ii: InstructionContext = ih.inner - be2 = fhi.ctx.be2 - idx = fhi.ctx.idx + be2: BinExport2 = fhi.ctx.be2 + idx: BinExport2Index = fhi.ctx.idx - instruction_index = ii.instruction_index + instruction_index: int = ii.instruction_index if instruction_index in idx.string_reference_index_by_source_instruction_index: for string_reference_index in idx.string_reference_index_by_source_instruction_index[instruction_index]: - string_reference = be2.string_reference[string_reference_index] - string_index = string_reference.string_table_index - string = be2.string_table[string_index] + string_reference: BinExport2.Reference = be2.string_reference[string_reference_index] + string_index: int = string_reference.string_table_index + string: str = be2.string_table[string_index] yield String(string), ih.address @@ -329,26 +337,26 @@ def is_security_cookie( """ check if an instruction is related to security cookie checks. """ - be2 = fhi.ctx.be2 + be2: BinExport2 = fhi.ctx.be2 # security cookie check should use SP or BP - op1 = be2.operand[instruction.operand_index[1]] - op1_exprs = [be2.expression[expr_i] for expr_i in op1.expression_index] + op1: BinExport2.Operand = be2.operand[instruction.operand_index[1]] + op1_exprs: List[BinExport2.Expression] = [be2.expression[expr_i] for expr_i in op1.expression_index] if all(expr.symbol.lower() not in ("bp", "esp", "ebp", "rbp", "rsp") for expr in op1_exprs): return False # check_nzxor_security_cookie_delta # if insn falls at the start of first entry block of the parent function. - flow_graph = be2.flow_graph[fhi.flow_graph_index] - basic_block_index = bbi.basic_block_index - bb = be2.basic_block[basic_block_index] + flow_graph: BinExport2.FlowGraph = be2.flow_graph[fhi.flow_graph_index] + basic_block_index: int = bbi.basic_block_index + bb: BinExport2.BasicBlock = be2.basic_block[basic_block_index] if flow_graph.entry_basic_block_index == basic_block_index: - first_addr = min((be2.instruction[ir.begin_index].address for ir in bb.instruction_index)) + first_addr: int = min((be2.instruction[ir.begin_index].address for ir in bb.instruction_index)) if instruction.address < first_addr + SECURITY_COOKIE_BYTES_DELTA: return True # or insn falls at the end before return in a terminal basic block. if basic_block_index not in (e.source_basic_block_index for e in flow_graph.edge): - last_addr = max((be2.instruction[ir.end_index - 1].address for ir in bb.instruction_index)) + last_addr: int = max((be2.instruction[ir.end_index - 1].address for ir in bb.instruction_index)) if instruction.address > last_addr - SECURITY_COOKIE_BYTES_DELTA: return True return False @@ -364,11 +372,11 @@ def extract_insn_nzxor_characteristic_features( fhi: FunctionContext = fh.inner ii: InstructionContext = ih.inner - be2 = fhi.ctx.be2 + be2: BinExport2 = fhi.ctx.be2 - instruction = be2.instruction[ii.instruction_index] - mnemonic = be2.mnemonic[instruction.mnemonic_index] - mnemonic_name = mnemonic.name.lower() + instruction: BinExport2.Instruction = be2.instruction[ii.instruction_index] + mnemonic: BinExport2.Mnemonic = be2.mnemonic[instruction.mnemonic_index] + mnemonic_name: str = mnemonic.name.lower() if mnemonic_name not in ( "xor", "xorpd", @@ -378,7 +386,7 @@ def extract_insn_nzxor_characteristic_features( ): return - operands = [be2.operand[operand_index] for operand_index in instruction.operand_index] + operands: List[BinExport2.Operand] = [be2.operand[operand_index] for operand_index in instruction.operand_index] # check whether operands are same for x86 / amd64 if mnemonic_name in ("xor", "xorpd", "xorps", "pxor"): @@ -402,11 +410,11 @@ def extract_insn_mnemonic_features( fhi: FunctionContext = fh.inner ii: InstructionContext = ih.inner - be2 = fhi.ctx.be2 + be2: BinExport2 = fhi.ctx.be2 - instruction = be2.instruction[ii.instruction_index] - mnemonic = be2.mnemonic[instruction.mnemonic_index] - mnemonic_name = mnemonic.name.lower() + instruction: BinExport2.Instruction = be2.instruction[ii.instruction_index] + mnemonic: BinExport2.Mnemonic = be2.mnemonic[instruction.mnemonic_index] + mnemonic_name: str = mnemonic.name.lower() yield Mnemonic(mnemonic_name), ih.address @@ -419,11 +427,11 @@ def extract_function_calls_from(fh: FunctionHandle, bbh: BBHandle, ih: InsnHandl fhi: FunctionContext = fh.inner ii: InstructionContext = ih.inner - be2 = fhi.ctx.be2 + be2: BinExport2 = fhi.ctx.be2 - instruction = be2.instruction[ii.instruction_index] + instruction: BinExport2.Instruction = be2.instruction[ii.instruction_index] for call_target_address in instruction.call_target: - addr = AbsoluteVirtualAddress(call_target_address) + addr: AbsoluteVirtualAddress = AbsoluteVirtualAddress(call_target_address) yield Characteristic("calls from"), addr if fh.address == addr: