from dataclasses import dataclass from io import BytesIO from typing import Generator from pefile import PE, Structure @dataclass class Import: library: bytes procedures: list[tuple[bytes, int]] @dataclass class Binary(object): starting_state: bytes entry_point: int imports: list[Import] def _single_or_none[T](ts: Generator[T]) -> T | None: items = [t for t in ts] if len(items) == 0: return None if len(items) == 1: return items[0] raise ValueError(f"expected 1 or 0, got {len(items)}") def _single[T](ts: Generator[T]) -> T: items = [t for t in ts] if len(items) == 1: return items[0] raise ValueError(f"expected 1, got {len(items)}") def _create_binary(subject: PE) -> Binary: optional_header = subject.OPTIONAL_HEADER assert isinstance(optional_header, Structure) text_section: Structure = _single(i for i in subject.sections if i.Name == b".text\0\0\0") data_section: Structure | None = _single_or_none(i for i in subject.sections if i.Name == b".data\0\0\0") rdata_section: Structure | None = _single_or_none(i for i in subject.sections if i.Name == b".rdata\0\0") relevant_sections: list[Structure] = [section for section in (text_section, data_section, rdata_section) if section is not None] if len(relevant_sections) == 0: raise ValueError("no sections to plot") min_address = min(getattr(i, "VirtualAddress") for i in relevant_sections) max_address = max(getattr(i, "VirtualAddress") + getattr(i, "SizeOfRawData") for i in relevant_sections) buffer = bytearray(max_address - min_address) for section in relevant_sections: data = getattr(section, "get_data")() start = getattr(section, "VirtualAddress") - min_address buffer[start:start+len(data)] = data starting_state = bytes(buffer) entry_point_rva = getattr(optional_header, "AddressOfEntryPoint") entry_point = entry_point_rva - min_address imports: list[Import] = [] for entry in getattr(subject, "DIRECTORY_ENTRY_IMPORT"): entry: Structure library: bytes = getattr(entry, "dll") procedures: list[tuple[bytes, int]] = [] for imp in getattr(entry, "imports"): imp: Structure import_address_rva = getattr(imp, "address") - getattr(optional_header, "ImageBase") import_address = import_address_rva - min_address procedures.append((getattr(imp, "name"), import_address)) imports.append(Import(library, procedures)) return Binary( starting_state=starting_state, entry_point=entry_point, imports=imports, ) def _encode_binary(binary: Binary) -> bytes: out = BytesIO() def _write_u32(n: int): out.write(n.to_bytes(4, "little", signed=False)) def _write_u8(n: int): out.write(n.to_bytes(1, "little", signed=False)) def _write_zt(s: bytes): out.write(s) _write_u8(0) _write_u32(binary.entry_point) for i in binary.imports: _write_zt(i.library) for (procedure, address) in i.procedures: _write_zt(procedure) _write_u32(address) _write_u8(0) _write_u8(0) _write_u32(len(binary.starting_state)) # == encode RLE == def _pull_repeats(start: int, data: bytes) -> int | None: i = 0 first_byte = data[start + i] while True: if i >= 255: break if start + i >= len(data): break if data[start + i] != first_byte: break i += 1 if i >= 2: _write_u8(i) _write_u8(first_byte) return start + i return None def _pull_non_repeats(start: int, data: bytes): i = 0 while True: if i >= 255: break if start + i >= len(data): break if i >= 1 and data[start + i] == data[start + i - 1]: break i += 1 _write_u8(0) _write_u8(i) for j in range(i): _write_u8(data[start + j]) return start + i i = 0 while i < len(binary.starting_state): if new_i := _pull_repeats(i, binary.starting_state): i = new_i continue i = _pull_non_repeats(i, binary.starting_state) return out.getbuffer() def main(): subject = PE("subjects\\main.exe") binary = _create_binary(subject) code = _encode_binary(binary) with open("binaries\\main.dat", "wb") as f: f.write(code) if __name__ == "__main__": main()