# injector/compiler/main.py
#
# 162 lines
# 4.6 KiB
# Python

from collections.abc import Iterable
from dataclasses import dataclass
from io import BytesIO
from pathlib import Path
from typing import Generator

from pefile import PE, Structure
@dataclass
class Import:
    """Procedures imported from one library.

    Each entry of ``procedures`` pairs a procedure name with an integer
    address for that import.
    """

    # Name of the library the procedures come from.
    library: bytes
    # (procedure name, address) pairs.
    procedures: list[tuple[bytes, int]]
@dataclass
class Binary:
    """Flattened executable image together with its entry point and imports."""

    # Raw bytes of the image.
    starting_state: bytes
    # Integer entry point of the image.
    entry_point: int
    # One record per imported library.
    imports: list[Import]
def _single_or_none[T](ts: Generator[T]) -> T | None:
items = [t for t in ts]
if len(items) == 0:
return None
if len(items) == 1:
return items[0]
raise ValueError(f"expected 1 or 0, got {len(items)}")
def _single[T](ts: Generator[T]) -> T:
items = [t for t in ts]
if len(items) == 1:
return items[0]
raise ValueError(f"expected 1, got {len(items)}")
def _create_binary(subject: PE) -> Binary:
    """Flatten the relevant parts of a parsed PE file into a `Binary`.

    The ``.text`` section (required) and the ``.data`` / ``.rdata`` sections
    (optional) are copied into one contiguous buffer that starts at the
    lowest of their virtual addresses.  The entry point and every import
    address are then expressed as offsets into that buffer.

    Args:
        subject: The parsed PE file.

    Returns:
        The flattened image with its entry point offset and import records.

    Raises:
        ValueError: If ``.text`` is missing, if any of the three section
            names occurs more than once, or if a procedure is imported
            without a name (the records store procedure names as bytes).
    """
    optional_header = subject.OPTIONAL_HEADER
    # Narrowing for type checkers only; not input validation.
    assert isinstance(optional_header, Structure)

    def _sections_named(name: bytes):
        return (s for s in subject.sections if s.Name == name)

    text_section: Structure = _single(_sections_named(b".text\0\0\0"))
    data_section: Structure | None = _single_or_none(_sections_named(b".data\0\0\0"))
    rdata_section: Structure | None = _single_or_none(_sections_named(b".rdata\0\0"))

    # `.text` is mandatory (`_single` raises without it), so this list always
    # holds at least one section and min()/max() below cannot see an empty
    # sequence.
    relevant_sections: list[Structure] = [
        section
        for section in (text_section, data_section, rdata_section)
        if section is not None
    ]

    min_address = min(getattr(s, "VirtualAddress") for s in relevant_sections)
    max_address = max(
        getattr(s, "VirtualAddress") + getattr(s, "SizeOfRawData")
        for s in relevant_sections
    )

    # Gaps between sections stay zero-filled.
    buffer = bytearray(max_address - min_address)
    for section in relevant_sections:
        data = getattr(section, "get_data")()
        start = getattr(section, "VirtualAddress") - min_address
        buffer[start:start + len(data)] = data
    starting_state = bytes(buffer)

    entry_point = getattr(optional_header, "AddressOfEntryPoint") - min_address

    image_base = getattr(optional_header, "ImageBase")
    imports: list[Import] = []
    # NOTE(review): pefile appears to leave DIRECTORY_ENTRY_IMPORT unset when
    # the file has no import directory; treat that as "no imports" instead of
    # failing with AttributeError.
    for entry in getattr(subject, "DIRECTORY_ENTRY_IMPORT", ()):
        library: bytes = getattr(entry, "dll")
        procedures: list[tuple[bytes, int]] = []
        for imp in getattr(entry, "imports"):
            name = getattr(imp, "name")
            if name is None:
                # Presumably an import by ordinal; there is no name to store.
                raise ValueError(
                    f"{library!r} imports a procedure without a name, "
                    "which is not supported"
                )
            # Subtracting the image base turns the address into an RVA;
            # subtracting min_address turns that into a buffer offset.
            import_address = getattr(imp, "address") - image_base - min_address
            procedures.append((name, import_address))
        imports.append(Import(library, procedures))

    return Binary(
        starting_state=starting_state,
        entry_point=entry_point,
        imports=imports,
    )
def _encode_binary(binary: Binary) -> bytes:
out = BytesIO()
def _write_u32(n: int):
out.write(n.to_bytes(4, "little", signed=False))
def _write_u8(n: int):
out.write(n.to_bytes(1, "little", signed=False))
def _write_zt(s: bytes):
out.write(s)
_write_u8(0)
_write_u32(binary.entry_point)
for i in binary.imports:
_write_zt(i.library)
for (procedure, address) in i.procedures:
_write_zt(procedure)
_write_u32(address)
_write_u8(0)
_write_u8(0)
_write_u32(len(binary.starting_state))
# == encode RLE ==
def _pull_repeats(start: int, data: bytes) -> int | None:
i = 0
first_byte = data[start + i]
while True:
if i >= 255:
break
if start + i >= len(data):
break
if data[start + i] != first_byte:
break
i += 1
if i >= 2:
_write_u8(i)
_write_u8(first_byte)
return start + i
return None
def _pull_non_repeats(start: int, data: bytes):
i = 0
while True:
if i >= 255:
break
if start + i >= len(data):
break
if i >= 1 and data[start + i] == data[start + i - 1]:
break
i += 1
_write_u8(0)
_write_u8(i)
for j in range(i):
_write_u8(data[start + j])
return start + i
i = 0
while i < len(binary.starting_state):
if new_i := _pull_repeats(i, binary.starting_state):
i = new_i
continue
i = _pull_non_repeats(i, binary.starting_state)
return out.getbuffer()
def main(
    subject_path: str | Path = Path("subjects", "main.exe"),
    output_path: str | Path = Path("binaries", "main.dat"),
) -> None:
    """Read a PE executable, flatten it and write the encoded result.

    Args:
        subject_path: PE file to read.  Defaults to ``main.exe`` inside the
            ``subjects`` directory, relative to the working directory.
        output_path: File to write.  Defaults to ``main.dat`` inside the
            ``binaries`` directory, relative to the working directory.
    """
    subject = PE(str(subject_path))
    try:
        binary = _create_binary(subject)
    finally:
        # Release the file handle/mapping held by the parsed PE object;
        # everything needed has been copied into `binary` by now.
        subject.close()

    code = _encode_binary(binary)
    with open(output_path, "wb") as f:
        f.write(code)
# Run the compiler only when this file is executed as a script.
if __name__ == "__main__":
    main()