injector/analyzer/main.py
2025-04-25 21:30:11 -07:00

128 lines
3.7 KiB
Python

import base64
from dataclasses import dataclass
from io import BytesIO
import json
from typing import Generator
import pefile
@dataclass
class Import:
library: bytes
procedures: list[tuple[bytes, int]]
@dataclass
class Binary(object):
starting_state: bytes
entry_point: int
imports: list[Import]
def _single_or_none[T](ts: Generator[T]) -> T | None:
items = [t for t in ts]
if len(items) == 0:
return None
if len(items) == 1:
return items[0]
raise ValueError(f"expected 1 or 0, got {len(items)}")
def _single[T](ts: Generator[T]) -> T:
items = [t for t in ts]
if len(items) == 1:
return items[0]
raise ValueError(f"expected 1, got {len(items)}")
def _create_binary(subject: pefile.PE) -> Binary:
optional_header = subject.OPTIONAL_HEADER
assert isinstance(optional_header, pefile.Structure)
text_section = _single(i for i in subject.sections if i.Name == b".text\0\0\0")
data_section = _single_or_none(i for i in subject.sections if i.Name == b".data\0\0\0")
rdata_section = _single_or_none(i for i in subject.sections if i.Name == b".rdata\0\0")
relevant_sections = [section for section in (text_section, data_section, rdata_section) if section is not None]
if len(relevant_sections) == 0:
raise ValueError("no sections to plot")
print([(i.VirtualAddress, i) for i in relevant_sections])
min_address = min(i.VirtualAddress for i in relevant_sections)
max_address = max(_round_up_to_page(i.VirtualAddress + i.SizeOfRawData) for i in relevant_sections)
buffer = bytearray(max_address - min_address)
for section in relevant_sections:
data = section.get_data() # TODO: De-pad the text section from 0xccs
start = section.VirtualAddress - min_address
buffer[start:start+len(data)] = data
starting_state = bytes(buffer)
entry_point_rva = getattr(optional_header, "AddressOfEntryPoint")
print(entry_point_rva)
entry_point = (entry_point_rva - min_address)
imports: list[Import] = []
for entry in getattr(subject, "DIRECTORY_ENTRY_IMPORT"):
library: bytes = entry.dll
procedures: list[tuple[bytes, int]] = []
for imp in entry.imports:
# print(dir(imp))
import_address_rva = imp.address - getattr(optional_header, "ImageBase")
import_address = import_address_rva - min_address
procedures.append((imp.name, import_address))
imports.append(Import(library, procedures))
return Binary(
starting_state=starting_state,
entry_point=entry_point,
imports=imports,
)
def _encode_binary(binary: Binary) -> bytes:
out = BytesIO()
def _write_u32(n: int):
out.write(n.to_bytes(4, "little", signed=False))
def _write_u8(n: int):
out.write(n.to_bytes(1, "little", signed=False))
def _write_zt(s: bytes):
out.write(s)
_write_u8(0)
_write_u32(binary.entry_point)
for i in binary.imports:
print(i.library)
_write_zt(i.library)
print(i.procedures)
for (procedure, address) in i.procedures:
_write_zt(procedure)
_write_u32(address)
_write_u8(0)
_write_u8(0)
_write_u32(len(binary.starting_state))
# TODO: No RLE for now
for b in binary.starting_state:
_write_u8(b)
return out.getbuffer()
def main():
subject = pefile.PE("subjects\\main.exe")
binary = _create_binary(subject)
code = _encode_binary(binary)
with open("binaries\\main.dat", "wb") as f:
f.write(code)
def _round_up_to_page(x: int):
# TODO: Is this the page size on x64? I think it is
return ((x + 0x1000 - 1) // 0x1000) * 0x1000
if __name__ == "__main__":
main()