171 lines
4.8 KiB
Python
171 lines
4.8 KiB
Python
import base64
|
|
from dataclasses import dataclass
|
|
from io import BytesIO
|
|
import json
|
|
from typing import Generator
|
|
import pefile
|
|
|
|
@dataclass
|
|
class Import:
|
|
library: bytes
|
|
procedures: list[tuple[bytes, int]]
|
|
|
|
|
|
@dataclass
|
|
class Binary(object):
|
|
starting_state: bytes
|
|
entry_point: int
|
|
imports: list[Import]
|
|
|
|
|
|
def _single_or_none[T](ts: Generator[T]) -> T | None:
|
|
items = [t for t in ts]
|
|
if len(items) == 0:
|
|
return None
|
|
|
|
if len(items) == 1:
|
|
return items[0]
|
|
|
|
raise ValueError(f"expected 1 or 0, got {len(items)}")
|
|
|
|
def _single[T](ts: Generator[T]) -> T:
|
|
items = [t for t in ts]
|
|
if len(items) == 1:
|
|
return items[0]
|
|
|
|
raise ValueError(f"expected 1, got {len(items)}")
|
|
|
|
|
|
def _create_binary(subject: pefile.PE) -> Binary:
|
|
optional_header = subject.OPTIONAL_HEADER
|
|
assert isinstance(optional_header, pefile.Structure)
|
|
text_section = _single(i for i in subject.sections if i.Name == b".text\0\0\0")
|
|
data_section = _single_or_none(i for i in subject.sections if i.Name == b".data\0\0\0")
|
|
rdata_section = _single_or_none(i for i in subject.sections if i.Name == b".rdata\0\0")
|
|
|
|
relevant_sections = [section for section in (text_section, data_section, rdata_section) if section is not None]
|
|
if len(relevant_sections) == 0:
|
|
raise ValueError("no sections to plot")
|
|
print([(i.VirtualAddress, i) for i in relevant_sections])
|
|
min_address = min(i.VirtualAddress for i in relevant_sections)
|
|
max_address = max(_round_up_to_page(i.VirtualAddress + i.SizeOfRawData) for i in relevant_sections)
|
|
|
|
buffer = bytearray(max_address - min_address)
|
|
for section in relevant_sections:
|
|
data = section.get_data() # TODO: De-pad the text section from 0xccs
|
|
start = section.VirtualAddress - min_address
|
|
buffer[start:start+len(data)] = data
|
|
|
|
starting_state = bytes(buffer)
|
|
|
|
entry_point_rva = getattr(optional_header, "AddressOfEntryPoint")
|
|
print(entry_point_rva)
|
|
entry_point = (entry_point_rva - min_address)
|
|
|
|
imports: list[Import] = []
|
|
for entry in getattr(subject, "DIRECTORY_ENTRY_IMPORT"):
|
|
library: bytes = entry.dll
|
|
procedures: list[tuple[bytes, int]] = []
|
|
for imp in entry.imports:
|
|
# print(dir(imp))
|
|
import_address_rva = imp.address - getattr(optional_header, "ImageBase")
|
|
import_address = import_address_rva - min_address
|
|
procedures.append((imp.name, import_address))
|
|
|
|
imports.append(Import(library, procedures))
|
|
|
|
return Binary(
|
|
starting_state=starting_state,
|
|
entry_point=entry_point,
|
|
imports=imports,
|
|
)
|
|
|
|
|
|
def _encode_binary(binary: Binary) -> bytes:
|
|
out = BytesIO()
|
|
|
|
def _write_u32(n: int):
|
|
out.write(n.to_bytes(4, "little", signed=False))
|
|
|
|
def _write_u8(n: int):
|
|
out.write(n.to_bytes(1, "little", signed=False))
|
|
|
|
def _write_zt(s: bytes):
|
|
out.write(s)
|
|
_write_u8(0)
|
|
|
|
_write_u32(binary.entry_point)
|
|
for i in binary.imports:
|
|
print(i.library)
|
|
_write_zt(i.library)
|
|
print(i.procedures)
|
|
for (procedure, address) in i.procedures:
|
|
_write_zt(procedure)
|
|
_write_u32(address)
|
|
_write_u8(0)
|
|
_write_u8(0)
|
|
|
|
_write_u32(len(binary.starting_state))
|
|
|
|
# == encode RLE ==
|
|
def _pull_repeats(start: int, data: bytes) -> int | None:
|
|
i = 0
|
|
first_byte = data[start + i]
|
|
while True:
|
|
if i >= 255:
|
|
break
|
|
if start + i >= len(data):
|
|
break
|
|
if data[start + i] != first_byte:
|
|
break
|
|
|
|
i += 1
|
|
|
|
if i >= 2:
|
|
_write_u8(i)
|
|
_write_u8(first_byte)
|
|
return start + i
|
|
|
|
return None
|
|
|
|
def _pull_non_repeats(start: int, data: bytes):
|
|
i = 0
|
|
while True:
|
|
if i >= 255:
|
|
break
|
|
if start + i >= len(data):
|
|
break
|
|
if i >= 1 and data[start + i] == data[start + i - 1]:
|
|
break
|
|
|
|
i += 1
|
|
|
|
_write_u8(0)
|
|
_write_u8(i)
|
|
for j in range(i):
|
|
_write_u8(data[start + j])
|
|
return start + i
|
|
|
|
i = 0
|
|
while i < len(binary.starting_state):
|
|
if new_i := _pull_repeats(i, binary.starting_state):
|
|
i = new_i
|
|
continue
|
|
i = _pull_non_repeats(i, binary.starting_state)
|
|
|
|
return out.getbuffer()
|
|
|
|
def main():
|
|
subject = pefile.PE("subjects\\main.exe")
|
|
|
|
binary = _create_binary(subject)
|
|
code = _encode_binary(binary)
|
|
with open("binaries\\main.dat", "wb") as f:
|
|
f.write(code)
|
|
|
|
def _round_up_to_page(x: int):
|
|
# TODO: Is this the page size on x64? I think it is
|
|
return ((x + 0x1000 - 1) // 0x1000) * 0x1000
|
|
|
|
if __name__ == "__main__":
|
|
main() |