# Source code for dissect.cobaltstrike.pe

"""
This module contains helper functions for parsing PE files, mainly for extracting Beacon specific PE artifacts.
"""

from __future__ import annotations

import io
import logging
from typing import BinaryIO, Optional, Tuple

from dissect import cstruct

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# C-style definitions of the PE structures used by the helpers in this module.
#
# NOTE: every ``#define`` and every ``//`` comment must stay on its own line.
# In C syntax both extend to the end of the line, so collapsing this text onto
# a single line would swallow the declarations that follow them.
PE_DEF = """
#define IMAGE_FILE_MACHINE_AMD64            0x8664
#define IMAGE_FILE_MACHINE_I386             0x014c
#define IMAGE_FILE_MACHINE_IA64             0x0200

#define IMAGE_NUMBEROF_DIRECTORY_ENTRIES    16
#define IMAGE_SIZEOF_SHORT_NAME             8

#define IMAGE_DIRECTORY_ENTRY_EXPORT        0
#define IMAGE_DIRECTORY_ENTRY_IMPORT        1
#define IMAGE_DIRECTORY_ENTRY_RESOURCE      2

typedef struct _IMAGE_DOS_HEADER {
    WORD e_magic;
    WORD e_cblp;
    WORD e_cp;
    WORD e_crlc;
    WORD e_cparhdr;
    WORD e_minalloc;
    WORD e_maxalloc;
    WORD e_ss;
    WORD e_sp;
    WORD e_csum;
    WORD e_ip;
    WORD e_cs;
    WORD e_lfarlc;
    WORD e_ovno;
    WORD e_res[4];
    WORD e_oemid;
    WORD e_oeminfo;
    WORD e_res2[10];
    LONG e_lfanew;
} IMAGE_DOS_HEADER;

typedef struct _IMAGE_FILE_HEADER {
    WORD  Machine;
    WORD  NumberOfSections;
    DWORD TimeDateStamp;
    DWORD PointerToSymbolTable;
    DWORD NumberOfSymbols;
    WORD  SizeOfOptionalHeader;
    WORD  Characteristics;
} IMAGE_FILE_HEADER;

typedef struct _IMAGE_DATA_DIRECTORY {
    ULONG VirtualAddress;
    ULONG Size;
} IMAGE_DATA_DIRECTORY;

typedef struct _IMAGE_OPTIONAL_HEADER {
    WORD  Magic;
    BYTE  MajorLinkerVersion;
    BYTE  MinorLinkerVersion;
    DWORD SizeOfCode;
    DWORD SizeOfInitializedData;
    DWORD SizeOfUninitializedData;
    DWORD AddressOfEntryPoint;
    DWORD BaseOfCode;
    DWORD BaseOfData;
    DWORD ImageBase;
    DWORD SectionAlignment;
    DWORD FileAlignment;
    WORD  MajorOperatingSystemVersion;
    WORD  MinorOperatingSystemVersion;
    WORD  MajorImageVersion;
    WORD  MinorImageVersion;
    WORD  MajorSubsystemVersion;
    WORD  MinorSubsystemVersion;
    DWORD Win32VersionValue;
    DWORD SizeOfImage;
    DWORD SizeOfHeaders;
    DWORD CheckSum;
    WORD  Subsystem;
    WORD  DllCharacteristics;
    DWORD SizeOfStackReserve;
    DWORD SizeOfStackCommit;
    DWORD SizeOfHeapReserve;
    DWORD SizeOfHeapCommit;
    DWORD LoaderFlags;
    DWORD NumberOfRvaAndSizes;
    IMAGE_DATA_DIRECTORY DataDirectory[IMAGE_NUMBEROF_DIRECTORY_ENTRIES];
} IMAGE_OPTIONAL_HEADER;

typedef struct _IMAGE_OPTIONAL_HEADER64 {
    WORD      Magic;
    BYTE      MajorLinkerVersion;
    BYTE      MinorLinkerVersion;
    DWORD     SizeOfCode;
    DWORD     SizeOfInitializedData;
    DWORD     SizeOfUninitializedData;
    DWORD     AddressOfEntryPoint;
    DWORD     BaseOfCode;
    ULONGLONG ImageBase;
    DWORD     SectionAlignment;
    DWORD     FileAlignment;
    WORD      MajorOperatingSystemVersion;
    WORD      MinorOperatingSystemVersion;
    WORD      MajorImageVersion;
    WORD      MinorImageVersion;
    WORD      MajorSubsystemVersion;
    WORD      MinorSubsystemVersion;
    DWORD     Win32VersionValue;
    DWORD     SizeOfImage;
    DWORD     SizeOfHeaders;
    DWORD     CheckSum;
    WORD      Subsystem;
    WORD      DllCharacteristics;
    ULONGLONG SizeOfStackReserve;
    ULONGLONG SizeOfStackCommit;
    ULONGLONG SizeOfHeapReserve;
    ULONGLONG SizeOfHeapCommit;
    DWORD     LoaderFlags;
    DWORD     NumberOfRvaAndSizes;
    IMAGE_DATA_DIRECTORY DataDirectory[IMAGE_NUMBEROF_DIRECTORY_ENTRIES];
} IMAGE_OPTIONAL_HEADER64;

typedef struct _IMAGE_SECTION_HEADER {
    char   Name[IMAGE_SIZEOF_SHORT_NAME];
    ULONG  VirtualSize;
    ULONG  VirtualAddress;
    ULONG  SizeOfRawData;
    ULONG  PointerToRawData;
    ULONG  PointerToRelocations;
    ULONG  PointerToLinenumbers;
    USHORT NumberOfRelocations;
    USHORT NumberOfLinenumbers;
    ULONG  Characteristics;
} IMAGE_SECTION_HEADER;

typedef struct _IMAGE_IMPORT_DESCRIPTOR {
    union {
        ULONG Characteristics;
        ULONG OriginalFirstThunk;
    } u;
    ULONG TimeDateStamp;
    ULONG ForwarderChain;
    ULONG Name;
    ULONG FirstThunk;
} IMAGE_IMPORT_DESCRIPTOR;

typedef struct _IMAGE_EXPORT_DIRECTORY {
    ULONG  Characteristics;
    ULONG  TimeDateStamp;
    USHORT MajorVersion;
    USHORT MinorVersion;
    ULONG  Name;
    ULONG  Base;
    ULONG  NumberOfFunctions;
    ULONG  NumberOfNames;
    ULONG  AddressOfFunctions;      // RVA from base of image
    ULONG  AddressOfNames;          // RVA from base of image
    ULONG  AddressOfNameOrdinals;   // RVA from base of image
} IMAGE_EXPORT_DIRECTORY;
"""

# Parser instance holding the structures and constants declared in PE_DEF.
pestruct = cstruct.cstruct()
pestruct.load(PE_DEF)

# Byte patterns that ``find_magic_mz`` searches for within the first 256 bytes
# of the image; everything in front of the first match is returned as the
# magic MZ bytes.
# NOTE(review): these look like the opening instructions of the loader stub
# (x64: ``push rbp; mov rbp, rsp; sub rsp, ...``, x86: ``call $+5; pop ebx``)
# -- confirm against a reference beacon.
DOSHEADER_X64 = bytes.fromhex("554889e54881")
DOSHEADER_X86 = bytes.fromhex("e8000000005b")
def find_mz_offset(fh: BinaryIO, start_offset: Optional[int] = 0, maxrange: int = 1024) -> Optional[int]:
    """Find and return the start offset of a valid IMAGE_DOS_HEADER or ``None`` if it cannot be found.

    Every offset in ``[start_offset, start_offset + maxrange)`` is tried as the start of an
    ``IMAGE_DOS_HEADER``. A candidate is accepted when `IMAGE_DOS_HEADER.e_lfanew` lies strictly between
    ``0`` and ``maxrange`` and the `IMAGE_FILE_HEADER.Machine` found behind the 4 byte PE signature is
    either ``IMAGE_FILE_MACHINE_AMD64`` or ``IMAGE_FILE_MACHINE_I386``.

    Side effects: file handle position due to seeking

    Args:
        fh: file like object
        start_offset: offset to start searching from, None indicates from current file position
        maxrange: how far to search for into the file object

    Returns:
        offset of the start of IMAGE_DOS_HEADER in the file object or ``None`` if it's not found
    """
    base = fh.tell() if start_offset is None else start_offset

    for candidate in range(base, base + maxrange):
        fh.seek(candidate, io.SEEK_SET)
        try:
            mz = pestruct.IMAGE_DOS_HEADER(fh)
        except EOFError:
            # Not enough bytes left for a DOS header at this offset, so none of the
            # following (higher) offsets can hold one either: stop scanning.
            return None

        if not 0 < mz.e_lfanew < maxrange:
            continue

        # e_lfanew points at the PE signature; the file header follows those 4 bytes.
        fh.seek(candidate + 4 + mz.e_lfanew, io.SEEK_SET)
        try:
            image = pestruct.IMAGE_FILE_HEADER(fh)
        except EOFError:
            # e_lfanew of this candidate points past the end of the data; a later
            # candidate may still carry a smaller e_lfanew, so keep scanning.
            continue

        if image.Machine in (
            pestruct.IMAGE_FILE_MACHINE_AMD64,
            pestruct.IMAGE_FILE_MACHINE_I386,
        ):
            return candidate

    return None
def _read_export_stamp(fh: BinaryIO, mz_offset: int, machine: int, number_of_sections: int) -> Optional[int]:
    """Return `IMAGE_EXPORT_DIRECTORY.TimeDateStamp`, or ``None`` if no section contains the export directory.

    Expects ``fh`` to be positioned directly after the ``IMAGE_FILE_HEADER``. Raises ``EOFError`` when a
    structure cannot be read completely.
    """
    if machine == pestruct.IMAGE_FILE_MACHINE_AMD64:
        optional_header = pestruct.IMAGE_OPTIONAL_HEADER64(fh)
    else:
        optional_header = pestruct.IMAGE_OPTIONAL_HEADER(fh)

    export_dd = optional_header.DataDirectory[pestruct.IMAGE_DIRECTORY_ENTRY_EXPORT]
    sections = [pestruct.IMAGE_SECTION_HEADER(fh) for _ in range(number_of_sections)]

    # First section whose virtual address range contains the export directory RVA.
    ds = next(
        (
            section
            for section in sections
            if section.VirtualAddress <= export_dd.VirtualAddress < (section.VirtualAddress + section.VirtualSize)
        ),
        None,
    )
    if ds is None:
        return None

    # Translate the RVA into a file offset relative to the start of the image.
    offset = export_dd.VirtualAddress - ds.VirtualAddress + ds.PointerToRawData + mz_offset
    fh.seek(offset)
    export_dir = pestruct.IMAGE_EXPORT_DIRECTORY(fh)
    return export_dir.TimeDateStamp


def find_compile_stamps(
    fh: BinaryIO, start_offset: Optional[int] = 0, maxrange: int = 1024
) -> Tuple[Optional[int], Optional[int]]:
    """Find and return a tuple with the `PE compile` and `PE export` timestamps.

    If one or more `TimeDateStamps` are not found it will be returned as ``None`` in the tuple. This
    includes the case where the image is truncated and the optional header, section table or export
    directory cannot be read completely.

    Side effects: file handle position due to seeking

    Args:
        fh: file like object
        start_offset: offset to start searching from, ``None`` indicates from current file position
        maxrange: how far to search for into the file object

    Returns:
        Tuple with ``(IMAGE_FILE_HEADER.TimeDateStamp, IMAGE_EXPORT_DIRECTORY.TimeDateStamp)``.
        Either tuple values can be ``None`` if it's not found.
    """
    mz_offset = find_mz_offset(fh, start_offset=start_offset, maxrange=maxrange)
    if mz_offset is None:
        return (None, None)

    fh.seek(mz_offset)
    mz = pestruct.IMAGE_DOS_HEADER(fh)

    fh.seek(mz.e_lfanew + mz_offset)
    signature = pestruct.uint32(fh).to_bytes(4, "little")
    logger.debug("PE signature: %r", signature)

    image = pestruct.IMAGE_FILE_HEADER(fh)
    compile_stamp = image.TimeDateStamp

    try:
        export_stamp = _read_export_stamp(fh, mz_offset, image.Machine, image.NumberOfSections)
    except EOFError:
        # Truncated image: keep the compile stamp that was already recovered and
        # report the export stamp as "not found" instead of propagating the error.
        logger.debug("PE image is truncated, export timestamp not available")
        export_stamp = None

    return (compile_stamp, export_stamp)
def find_magic_mz(fh: BinaryIO, start_offset: int = 0, maxrange: int = 1024) -> Optional[bytes]:
    """Find and returns the MZ header bytes or ``None`` if cannot be found

    Cobalt Strike allows changing the MZ magic header using `magic_mz_x86` or `magic_mz_x64` in the c2 profile.
    This function recovers these bytes: the first 256 bytes of the image are searched for ``DOSHEADER_X86``
    and, only if that is absent, for ``DOSHEADER_X64``. Everything in front of the match is returned.

    Side effects: file handle position due to seeking

    Args:
        fh: file like object
        start_offset: offset to start searching from, None indicates from current file position
        maxrange: how far to search for into the file object

    Returns:
        MZ header bytes or ``None`` if not found.
    """
    header_start = find_mz_offset(fh, start_offset=start_offset, maxrange=maxrange)
    if header_start is None:
        return None

    fh.seek(header_start)
    window = fh.read(256)

    # The order matters: a match on the x86 pattern wins over the x64 pattern.
    for stub in (DOSHEADER_X86, DOSHEADER_X64):
        idx = window.find(stub)
        if idx != -1:
            return window[:idx]

    return None
def find_magic_pe(fh: BinaryIO, start_offset: int = 0, maxrange: int = 1024) -> Optional[bytes]:
    """Find and returns the PE header (``magic_pe``) bytes or ``None`` if cannot be found

    Cobalt Strike allows changing the PE magic header using the ``magic_pe`` in the malleable c2 profile.
    This function tries to recover these bytes by reading the 4 bytes that `IMAGE_DOS_HEADER.e_lfanew`
    points at and stripping trailing NUL bytes from them.

    Side effects: file handle position due to seeking

    Args:
        fh: file like object
        start_offset: offset to start searching from, None indicates from current file position
        maxrange: how far to search for into the file object

    Returns:
        PE header bytes or ``None`` if not found.
    """
    dos_start = find_mz_offset(fh, start_offset=start_offset, maxrange=maxrange)
    if dos_start is None:
        return None

    fh.seek(dos_start)
    dos_header = pestruct.IMAGE_DOS_HEADER(fh)

    # e_lfanew is relative to the start of the DOS header.
    fh.seek(dos_header.e_lfanew + dos_start)
    signature = fh.read(4)
    return signature.rstrip(b"\x00")
def find_stage_prepend_append(
    fh: BinaryIO, start_offset: Optional[int] = 0, maxrange: int = 1024
) -> Tuple[Optional[bytes], Optional[bytes]]:
    """Find and return the stage prepend and append bytes as a tuple.

    Cobalt Strike allows prepending and appending extra bytes to the beacon using malleable c2 profile settings.
    This function tries to recover these bytes.

    The prepend bytes are the bytes between ``start_offset`` and the start of the ``IMAGE_DOS_HEADER``.
    The append bytes are read (at most 1024 bytes, trailing NUL bytes stripped) from the position
    ``SizeOfHeaders`` plus the sum of all section ``SizeOfRawData`` values past the start of the image.

    Side effects: file handle position due to seeking

    Args:
        fh: file like object
        start_offset: offset to start searching from, None indicates from current file position
        maxrange: how far to search for into the file object

    Returns:
        Tuple containing ``(prepend_bytes, append_bytes)``.
        Either tuple values can be ``None`` if it's not found.
    """
    # Resolve "current position" up front: the search below moves the file position.
    if start_offset is None:
        start_offset = fh.tell()

    mz_offset = find_mz_offset(fh, start_offset=start_offset, maxrange=maxrange)
    if mz_offset is None:
        return (None, None)

    prepend = None
    append = None

    # Only bytes from the search start up to the DOS header count as prepended data;
    # anything in front of ``start_offset`` is outside of the searched region.
    if mz_offset > start_offset:
        fh.seek(start_offset)
        prepend = fh.read(mz_offset - start_offset)
    logger.debug("stage prepend: %r", prepend)

    fh.seek(mz_offset)
    mz = pestruct.IMAGE_DOS_HEADER(fh)

    # Skip the 4 byte PE signature that e_lfanew points at.
    fh.seek(mz.e_lfanew + mz_offset + 4)
    image = pestruct.IMAGE_FILE_HEADER(fh)

    if image.Machine == pestruct.IMAGE_FILE_MACHINE_AMD64:
        optional_header_type = pestruct.IMAGE_OPTIONAL_HEADER64
    elif image.Machine == pestruct.IMAGE_FILE_MACHINE_I386:
        optional_header_type = pestruct.IMAGE_OPTIONAL_HEADER
    else:
        return (prepend, None)

    try:
        optional_header = optional_header_type(fh)
        sections = [pestruct.IMAGE_SECTION_HEADER(fh) for _ in range(image.NumberOfSections)]
    except EOFError:
        # Truncated image: the end of the PE cannot be determined, so there is
        # nothing to report as appended data.
        logger.debug("PE image is truncated, cannot locate stage append bytes")
        return (prepend, None)

    size = optional_header.SizeOfHeaders + sum(section.SizeOfRawData for section in sections)
    logger.debug("Total PE size: %u", size)

    fh.seek(mz_offset + size)
    # we limit the append size to 1024, just in case.
    append = fh.read(1024) or None
    logger.debug("stage append: %r", append)

    # remove padding
    if append is not None:
        append = append.rstrip(b"\x00")

    return (prepend, append)
def find_architecture(fh: BinaryIO, start_offset: Optional[int] = 0, maxrange: int = 1024) -> Optional[str]:
    """Find and return the PE image architecture, either ``"x86"`` or ``"x64"`` or ``None`` if not found.

    The image is located with ``find_mz_offset``, which uses `IMAGE_DOS_HEADER.e_lfanew` and
    `IMAGE_FILE_HEADER.Machine` as a constraint. Only `x86` and `x64` are considered, other machine
    architectures are ignored.

    Side effects: file handle position due to seeking

    Args:
        fh: file like object
        start_offset: offset to start searching from, None indicates from current file position
        maxrange: how far to search for into the file object

    Returns:
        ``"x86"`` or ``"x64"``, ``None`` if not found.
    """
    # Reuse the shared scanner so both functions always agree on what a valid image is.
    mz_offset = find_mz_offset(fh, start_offset=start_offset, maxrange=maxrange)
    if mz_offset is None:
        return None

    fh.seek(mz_offset, io.SEEK_SET)
    mz = pestruct.IMAGE_DOS_HEADER(fh)

    # The file header follows the 4 byte PE signature that e_lfanew points at.
    fh.seek(mz_offset + 4 + mz.e_lfanew, io.SEEK_SET)
    image = pestruct.IMAGE_FILE_HEADER(fh)

    if image.Machine == pestruct.IMAGE_FILE_MACHINE_AMD64:
        return "x64"
    if image.Machine == pestruct.IMAGE_FILE_MACHINE_I386:
        return "x86"
    return None