"""
This module is responsible for extracting and parsing configuration from Cobalt Strike beacon payloads.
"""
from __future__ import annotations
import collections
import functools
import hashlib
import io
import ipaddress
import logging
import sys
import time
from collections import OrderedDict
from types import MappingProxyType
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from os import PathLike
from typing import (
Any,
BinaryIO,
Callable,
Dict,
Iterator,
List,
Mapping,
Optional,
Tuple,
Union,
cast,
)
from dissect import cstruct
from dissect.cobaltstrike import pe
from dissect.cobaltstrike.guardrails import GuardrailMetadata, iter_guardrail_configs_with_beacon
from dissect.cobaltstrike.utils import (
catch_sigpipe,
grouper,
iter_find_needle,
p8,
u16be,
u32,
u32be,
xor,
)
from dissect.cobaltstrike.version import BeaconVersion
from dissect.cobaltstrike.xordecode import XorEncodedFile
[docs]
logger = logging.getLogger(__name__)
[docs]
CS_DEF = """
enum BeaconSetting: uint16 {
SETTING_PROTOCOL = 1,
SETTING_PORT = 2,
SETTING_SLEEPTIME = 3,
SETTING_MAXGET = 4,
SETTING_JITTER = 5,
SETTING_MAXDNS = 6,
SETTING_PUBKEY = 7,
SETTING_DOMAINS = 8,
SETTING_USERAGENT = 9,
SETTING_SUBMITURI = 10,
SETTING_C2_RECOVER = 11,
SETTING_C2_REQUEST = 12,
SETTING_C2_POSTREQ = 13,
SETTING_SPAWNTO = 14, // releasenotes.txt
// CobaltStrike version >= 3.4 (27 Jul, 2016)
SETTING_PIPENAME = 15,
SETTING_KILLDATE_YEAR = 16, // Deprecated since Cobalt Strike 4.7
SETTING_BOF_ALLOCATOR = 16, // Introduced in Cobalt Strike 4.7
SETTING_KILLDATE_MONTH = 17, // Deprecated since Cobalt Strike 4.8
SETTING_SYSCALL_METHOD = 17, // Introduced in Cobalt Strike 4.8
SETTING_KILLDATE_DAY = 18,
SETTING_DNS_IDLE = 19,
SETTING_DNS_SLEEP = 20,
// CobaltStrike version >= 3.5 (22 Sept, 2016)
SETTING_SSH_HOST = 21,
SETTING_SSH_PORT = 22,
SETTING_SSH_USERNAME = 23,
SETTING_SSH_PASSWORD = 24,
SETTING_SSH_KEY = 25,
SETTING_C2_VERB_GET = 26,
SETTING_C2_VERB_POST = 27,
SETTING_C2_CHUNK_POST = 28,
SETTING_SPAWNTO_X86 = 29,
SETTING_SPAWNTO_X64 = 30,
// CobaltStrike version >= 3.6 (8 Dec, 2016)
SETTING_CRYPTO_SCHEME = 31,
// CobaltStrike version >= 3.7 (15 Mar, 2016)
SETTING_PROXY_CONFIG = 32,
SETTING_PROXY_USER = 33,
SETTING_PROXY_PASSWORD = 34,
SETTING_PROXY_BEHAVIOR = 35,
// CobaltStrike version >= 3.8 (23 May 2017)
// DEPRECATED_SETTING_INJECT_OPTIONS = 36,
// Renamed from DEPRECATED_SETTING_INJECT_OPTIONS in CobaltStrike 4.5
SETTING_WATERMARKHASH = 36,
// CobaltStrike version >= 3.9 (Sept 26, 2017)
SETTING_WATERMARK = 37,
// CobaltStrike version >= 3.11 (April 9, 2018)
SETTING_CLEANUP = 38,
// CobaltStrike version >= 3.11 (May 24, 2018)
SETTING_CFG_CAUTION = 39,
// CobaltStrike version >= 3.12 (Sept 6, 2018)
SETTING_KILLDATE = 40,
SETTING_GARGLE_NOOK = 41, // https://www.youtube.com/watch?v=nLTgWdXrx3U
SETTING_GARGLE_SECTIONS = 42,
SETTING_PROCINJ_PERMS_I = 43,
SETTING_PROCINJ_PERMS = 44,
SETTING_PROCINJ_MINALLOC = 45,
SETTING_PROCINJ_TRANSFORM_X86 = 46,
SETTING_PROCINJ_TRANSFORM_X64 = 47,
SETTING_PROCINJ_ALLOWED = 48, // Deprecated since Cobalt Strike 4.7
SETTING_PROCINJ_BOF_REUSE_MEM = 48, // Introduced in Cobalt Strike 4.7
// CobaltStrike version >= 3.13 (Jan 2, 2019)
SETTING_BINDHOST = 49,
// CobaltStrike version >= 3.14 (May 4, 2019)
SETTING_HTTP_NO_COOKIES = 50,
SETTING_PROCINJ_EXECUTE = 51,
SETTING_PROCINJ_ALLOCATOR = 52,
SETTING_PROCINJ_STUB = 53, // .self = MD5(cobaltstrike.jar)
// CobaltStrike version >= 4.0 (Dec 5, 2019)
SETTING_HOST_HEADER = 54,
SETTING_EXIT_FUNK = 55,
// CobaltStrike version >= 4.1 (June 25, 2020)
SETTING_SSH_BANNER = 56,
SETTING_SMB_FRAME_HEADER = 57,
SETTING_TCP_FRAME_HEADER = 58,
// CobaltStrike version >= 4.2 (Nov 6, 2020)
SETTING_HEADERS_REMOVE = 59,
// CobaltStrike version >= 4.3 (Mar 3, 2021)
SETTING_DNS_BEACON_BEACON = 60,
SETTING_DNS_BEACON_GET_A = 61,
SETTING_DNS_BEACON_GET_AAAA = 62,
SETTING_DNS_BEACON_GET_TXT = 63,
SETTING_DNS_BEACON_PUT_METADATA = 64,
SETTING_DNS_BEACON_PUT_OUTPUT = 65,
SETTING_DNSRESOLVER = 66,
SETTING_DOMAIN_STRATEGY = 67,
SETTING_DOMAIN_STRATEGY_SECONDS = 68,
SETTING_DOMAIN_STRATEGY_FAIL_X = 69,
SETTING_DOMAIN_STRATEGY_FAIL_SECONDS = 70,
// CobaltStrike version >= 4.5 (Dec 14, 2021)
SETTING_MAX_RETRY_STRATEGY_ATTEMPTS = 71,
SETTING_MAX_RETRY_STRATEGY_INCREASE = 72,
SETTING_MAX_RETRY_STRATEGY_DURATION = 73,
// CobaltStrike version >= 4.7 (Aug 17, 2022)
SETTING_MASKED_WATERMARK = 74,
// CobaltStrike version >= 4.9 (Sep 19, 2023)
SETTING_DATA_STORE_SIZE = 76,
// CobaltStrike version >= 4.10 (Jul 16, 2024)
SETTING_HTTP_DATA_REQUIRED = 77,
SETTING_BEACON_GATE = 78,
};
enum DeprecatedBeaconSetting: uint16 {
SETTING_KILLDATE_YEAR = 16,
SETTING_INJECT_OPTIONS = 36,
};
enum TransformStep: uint32 {
APPEND = 1,
PREPEND = 2,
BASE64 = 3,
PRINT = 4,
PARAMETER = 5,
HEADER = 6,
BUILD = 7,
NETBIOS = 8,
_PARAMETER = 9,
_HEADER = 10,
NETBIOSU = 11,
URI_APPEND = 12,
BASE64URL = 13,
STRREP = 14,
MASK = 15,
// CobaltStrike version >= 4.0 (Dec 5, 2019)
_HOSTHEADER = 16,
};
enum SettingsType: uint16 {
TYPE_NONE = 0,
TYPE_SHORT = 1,
TYPE_INT = 2,
TYPE_PTR = 3,
};
struct Setting {
BeaconSetting index; // uint16
SettingsType type; // uint16
uint16 length; // uint16
char value[length];
};
flag BeaconProtocol {
http = 0,
dns = 1,
smb = 2,
tcp = 4,
https = 8,
bind = 16
};
flag ProxyServer {
MANUAL = 0,
DIRECT = 1,
PRECONFIG = 2,
MANUAL_CREDS = 4
};
enum CryptoScheme: uint16 {
CRYPTO_LICENSED_PRODUCT = 0,
CRYPTO_TRIAL_PRODUCT = 1
};
enum InjectAllocator: uint8 {
VirtualAllocEx = 0,
NtMapViewOfSection = 1,
};
enum InjectExecutor: uint8 {
CreateThread = 1,
SetThreadContext = 2,
CreateRemoteThread = 3,
RtlCreateUserThread = 4,
NtQueueApcThread = 5,
CreateThread_ = 6,
CreateRemoteThread_ = 7,
NtQueueApcThread_s = 8
};
enum BofAllocator: uint16 {
VirtualAlloc = 0,
MapViewOfFile = 1,
HeapAlloc = 2,
};
// https://hstechdocs.helpsystems.com/manuals/cobaltstrike/current/userguide/content/topics/beacon-gate.htm
struct BeaconGateOptions {
uint8 InternetOpenA; // commms
uint8 InternetConnectA;
uint8 VirtualAlloc; // core
uint8 VirtualAllocEx;
uint8 VirtualProtect;
uint8 VirtualProtectEx;
uint8 VirtualFree;
uint8 GetThreadContext;
uint8 SetThreadContext;
uint8 ResumeThread;
uint8 CreateThread;
uint8 CreateRemoteThread;
uint8 OpenProcess;
uint8 OpenThread;
uint8 CloseHandle;
uint8 CreateFileMappingA;
uint8 MapViewOfFile;
uint8 UnmapViewOfFile;
uint8 VirtualQuery;
uint8 DuplicateHandle;
uint8 ReadProcessMemory;
uint8 WriteProcessMemory;
uint8 ExitThread; // cleanup
};
"""
[docs]
cs_struct = cstruct.cstruct(endian=">")
cs_struct.load(CS_DEF)
[docs]
BeaconSetting = cs_struct.BeaconSetting
[docs]
DeprecatedBeaconSetting = cs_struct.DeprecatedBeaconSetting
[docs]
SettingsType = cs_struct.SettingsType
[docs]
Setting = cs_struct.Setting
[docs]
BeaconProtocol = cs_struct.BeaconProtocol
[docs]
CryptoScheme = cs_struct.CryptoScheme
[docs]
ProxyServer = cs_struct.ProxyServer
[docs]
InjectAllocator = cs_struct.InjectAllocator
[docs]
InjectExecutor = cs_struct.InjectExecutor
[docs]
BofAllocator = cs_struct.BofAllocator
[docs]
BeaconGateOptions = cs_struct.BeaconGateOptions
[docs]
DEFAULT_XOR_KEYS: List[bytes] = [b"\x69", b"\x2e", b"\x00"]
""" Default XOR keys used by Cobalt Strike for obfuscating Beacon config bytes """
[docs]
def find_beacon_config_bytes(fh: BinaryIO, xorkey: bytes) -> Iterator[bytes]:
r"""Find and yield (possible) Cobalt Strike configuration bytes from file `fh` using `xorkey` (eg: b"\x69").
This is done by scraping the file `fh` for XOR encoded configuration blocks.
A beacon configuration block always (unless modified) starts with::
Setting(index=SETTING_PROTOCOL, type=TYPE_SHORT, length=0x2)
# which translates to the following bytes
b"\x00\x01\x00\x01\x00\x02\x00"
These bytes are used in conjunction with the XOR key for finding the (potential) start of a configuration block.
Args:
fh: file object
xorkey: XOR key (as bytes)
Yields:
Beacon configuration bytes (4096 bytes), in deobfuscated (un-XOR'd) form.
"""
# This is the maximum size for the beacon config and is also padded as such
PATCH_SIZE = 4096
# This is the default Beacon config starting bytes (unless it's modified)
CONFIG_HEADER = b"\x00\x01\x00\x01\x00\x02\x00"
xorred_config_block = xor(CONFIG_HEADER, xorkey)
for pos in iter_find_needle(fh, xorred_config_block, start_offset=0):
fh.seek(pos)
data = fh.read(PATCH_SIZE)
logger.debug(f"Found CONFIG_HEADER using xorkey: 0x{xorkey.hex()}")
yield xor(data, xorkey)
[docs]
def iter_beacon_config_blocks(
fobj: BinaryIO, xor_keys=None, xordecode=True, all_xor_keys=False
) -> Iterator[Tuple[bytes, dict]]:
"""Yield tuple with found Beacon `config_block_bytes` from file `fobj` and `extra_info` dict
It always start seeking from the beginning of `fobj`. Side effects: file handle position due to seeking
The `extra_info` dictionary holds some metadata such as if the `fobj` was xorencoded and which xorkey was used.
Args:
xor_keys: list XOR keys (as bytes), defaults to: :attr:`DEFAULT_XOR_KEYS` if not specified.
xordecode: If ``True`` it will also try to `XorDecode` the file object.
all_xor_keys: Try ALL single-byte XOR keys if no beacon config is found using the default keys.
Yields:
Tuple as ``(config_block_bytes, extra_info_dict)``
-- `extra_info` dict contains: ``{"xorkey": bytes, "xorencoded": bool}``
"""
found = False
xor_keys = xor_keys or DEFAULT_XOR_KEYS
logger.debug(f"xor_keys: {xor_keys!r}")
# Try XorEncoded files first as they are more common
if not found and xordecode:
try:
fxor = cast(BinaryIO, XorEncodedFile.from_file(fobj))
for xorkey in xor_keys:
for config_block in find_beacon_config_bytes(fxor, xorkey):
found = True
yield config_block, {"xorkey": xorkey, "xorencoded": True}
except ValueError:
pass
# Try finding config block without XorEncoding
if not found:
for xorkey in xor_keys:
for config_block in find_beacon_config_bytes(fobj, xorkey):
found = True
yield config_block, {"xorkey": xorkey, "xorencoded": False}
# Retry with left over xor keys if specified
if not found and all_xor_keys:
logger.debug("config_block not found, trying all xor keys...")
if xordecode:
try:
fxor = XorEncodedFile.from_file(fobj)
except ValueError:
fxor = fobj
# Determine left over xor keys
left_xor_keys = make_byte_list(exclude=xor_keys)
# Determine most common bytes in the (xordecoded) file
bytes_counter = collections.Counter()
for chunk in iter(functools.partial(fxor.read, io.DEFAULT_BUFFER_SIZE), b""):
fourgrams = grouper(chunk, n=4, fillvalue=0)
bytes_counter.update(gram[0] for gram in fourgrams if gram[0] == gram[1] == gram[2] == gram[3])
most_common_bytes = [p8(x[0]) for x in bytes_counter.most_common()]
# Sort left xor keys by most common bytes first
left_xor_keys.sort(key=lambda x: most_common_bytes.index(x) if x in most_common_bytes else 256)
logger.debug(f"left xor keys to try: {left_xor_keys}")
yield from iter_beacon_config_blocks(fobj, left_xor_keys, xordecode=xordecode, all_xor_keys=False)
[docs]
def make_byte_list(exclude: List[bytes] = None) -> List[bytes]:
"""Return all single-byte bytes as an ordered list, excluding `exclude` bytes."""
return sorted({p8(x) for x in range(256)} - set(exclude or []))
[docs]
def iter_settings(fobj: Union[bytes, BinaryIO]) -> Iterator["Setting"]:
"""Returns an iterator yielding :class:`Setting` objects by reading data from `fobj`
The file position will be at the end of the Beacon config after parsing is done.
This can be used to determine the exact size of the Beacon configuration block.
Some edge cases are also handled:
- User-Agent string that exceeds the Setting length.
- Deprecated setting SETTING_INJECT_OPTIONS
Args:
fobj: bytes or file-like object with Beacon configuration data
Yields:
:class:`Setting` objects
"""
if isinstance(fobj, bytes):
fobj = io.BytesIO(fobj)
while True:
peek = fobj.read(2)[:2]
if peek == b"\x00\x00":
# end of beacon config
break
try:
fobj.seek(-2, io.SEEK_CUR)
setting = Setting(fobj)
except EOFError:
break
if setting.index == BeaconSetting.SETTING_USERAGENT:
# Handle cases where User-Agent is too long in some configs
# eg: fcece52fd030ca66043ae29af2116a79
if setting.length == 0x80:
if len(setting.value.rstrip(b"\x00")) >= 0x80:
while True:
x = fobj.read(1)
if x == b"\x00":
fobj.seek(-1, io.SEEK_CUR)
break
setting.value += x
elif setting.index == BeaconSetting.SETTING_WATERMARKHASH:
# Handle deprecated setting INJECT_OPTIONS -> WATERMARKHASH
# We can identify the difference using TYPE_SHORT vs TYPE_PTR.
if setting.type == SettingsType.TYPE_SHORT:
setting.index = DeprecatedBeaconSetting.SETTING_INJECT_OPTIONS
yield setting
[docs]
def parse_recover_binary(program: bytes) -> List[Tuple[str, Union[int, bool]]]:
"""Parse ``SETTING_C2_RECOVER`` (`.http-get.server.output`) data"""
rsteps: List[Tuple[str, Union[int, bool]]] = []
p = io.BytesIO(program)
while True:
d = p.read(4)
if not d:
break
step = u32be(d)
if step == TransformStep.APPEND:
length = u32be(p.read(4))
rsteps.append(("append", length))
elif step == TransformStep.PREPEND:
length = u32be(p.read(4))
rsteps.append(("prepend", length))
elif step == TransformStep.BASE64:
rsteps.append(("base64", True))
elif step == TransformStep.PRINT:
rsteps.append(("print", True))
elif step == TransformStep.NETBIOS:
rsteps.append(("netbios", True))
elif step == TransformStep.NETBIOSU:
rsteps.append(("netbiosu", True))
elif step == TransformStep.BASE64URL:
rsteps.append(("base64url", True))
elif step == TransformStep.MASK:
rsteps.append(("mask", True))
elif step == 0:
break
else:
logger.error("Unknown recover step {}".format(step))
return rsteps
[docs]
def parse_execute_list(data: bytes) -> List[str]:
"""Parse ``SETTING_PROCINJ_EXECUTE`` (`.process-inject.execute`) data"""
ret: List[str] = []
p = io.BytesIO(data)
while True:
d = p.read(1)
if not d or d == b"\x00":
break
inject = InjectExecutor(d)
if inject in (InjectExecutor.CreateThread_, InjectExecutor.CreateRemoteThread_):
s4 = u16be(p.read(2))
length = u32be(p.read(4))
s2 = p.read(length).rstrip(b"\x00")
length = u32be(p.read(4))
s3 = p.read(length).rstrip(b"\x00")
s = "{}!{}".format(s2.decode(), s3.decode())
if s4:
s += "+0x{:x}".format(s4)
ret.append('{} "{}"'.format(inject.name.rstrip("_"), s))
else:
ret.append(inject.name)
return ret
[docs]
def parse_gargle(data: bytes) -> list:
"""Parse ``SETTING_GARGLE_SECTIONS`` (`.stage.{sleep_mask,obfuscate,userwx}`) data"""
addresses = []
p = io.BytesIO(data)
while True:
d = p.read(4)
if not d:
break
start = u32(d)
end = u32(p.read(4))
# addresses.append((x1, x2))
# addresses.append((hex(x1), hex(x2)))
# value = f"sectionAddress={x1:x}, sectionEnd={x2:x}"
if (start, end) != (0, 0):
value = f"0x{start:x}-0x{end:x}"
addresses.append(value)
return addresses
[docs]
def parse_pivot_frame(data: bytes) -> bytes:
"""Parse ``SETTING_{TCP,SMB}_FRAME_HEADER`` (`.{tcp,smb}_frame_header`) data"""
p = io.BytesIO(data)
length = u16be(p.read(2))
return p.read(length - 4)
[docs]
def parse_beacon_gate(data: bytes) -> BeaconGateOptions:
"""Parse ``SETTING_BEACON_GATE`` (`.stage.beacon_gate`) data"""
return BeaconGateOptions(data)
[docs]
def beacon_gate_options_string(bgo: BeaconGateOptions) -> list[str]:
"""Return the enabled BeaconGate WinAPI's as a list of strings"""
options = {k for k, v in bgo.__values__.items() if v}
comms = {"InternetOpenA", "InternetConnectA"}
core = {
"VirtualAlloc",
"VirtualAllocEx",
"VirtualProtect",
"VirtualProtectEx",
"VirtualFree",
"GetThreadContext",
"SetThreadContext",
"ResumeThread",
"CreateThread",
"CreateRemoteThread",
"OpenProcess",
"OpenThread",
"CloseHandle",
"CreateFileMappingA",
"MapViewOfFile",
"UnmapViewOfFile",
"VirtualQuery",
"DuplicateHandle",
"ReadProcessMemory",
"WriteProcessMemory",
}
cleanup = {"ExitThread"}
ret = []
if options.issuperset(comms | core | cleanup):
ret.append("All")
options -= comms | core | cleanup
if options.issuperset(comms):
ret.append("Comms")
options -= comms
if options.issuperset(core):
ret.append("Core")
options -= core
if options.issuperset(cleanup):
ret.append("Cleanup")
options -= cleanup
ret.extend(options)
return ret
[docs]
def sha256sum_pubkey(der_data: bytes) -> str:
"""Return the SHA-256 digest of `der_data`"""
return hashlib.sha256(der_data.rstrip(b"\x00")).hexdigest()
[docs]
def null_terminated_bytes(data: bytes) -> bytes:
r"""Return null terminated `data` as bytes.
>>> null_terminated_bytes(b"Hello World\x00\x00Foobar\x00\x00")
b'Hello World'
>>> null_terminated_bytes(b"foo\xffbar\x00\x00\x00baz\x00")
b'foo\xffbar'
"""
a, _, _ = data.partition(b"\x00")
return a
[docs]
def null_terminated_str(data: bytes) -> str:
r"""Return null terminated `data` as string. Non ascii characters are ignored.
>>> null_terminated_str(b"Hello World\x00\x00foo bar\x00\x00")
'Hello World'
>>> null_terminated_str(b"Goodbye\xffPlanet\x00\x00")
'GoodbyePlanet'
"""
return null_terminated_bytes(data).decode("latin-1", "ignore")
[docs]
SETTING_TO_PRETTYFUNC: Dict[BeaconSetting, Callable] = {
BeaconSetting.SETTING_PROCINJ_STUB: lambda x: x.hex(),
BeaconSetting.SETTING_SPAWNTO: lambda x: x.hex(),
BeaconSetting.SETTING_C2_RECOVER: parse_recover_binary,
BeaconSetting.SETTING_C2_REQUEST: parse_transform_binary,
BeaconSetting.SETTING_C2_POSTREQ: functools.partial(parse_transform_binary, build="id"),
BeaconSetting.SETTING_PROCINJ_EXECUTE: parse_execute_list,
BeaconSetting.SETTING_PROCINJ_TRANSFORM_X86: parse_process_injection_transform_steps,
BeaconSetting.SETTING_PROCINJ_TRANSFORM_X64: parse_process_injection_transform_steps,
BeaconSetting.SETTING_GARGLE_SECTIONS: parse_gargle,
BeaconSetting.SETTING_TCP_FRAME_HEADER: parse_pivot_frame,
BeaconSetting.SETTING_SMB_FRAME_HEADER: parse_pivot_frame,
BeaconSetting.SETTING_DOMAINS: null_terminated_str,
BeaconSetting.SETTING_HOST_HEADER: null_terminated_str,
BeaconSetting.SETTING_C2_VERB_GET: null_terminated_str,
BeaconSetting.SETTING_C2_VERB_POST: null_terminated_str,
BeaconSetting.SETTING_PIPENAME: null_terminated_str,
BeaconSetting.SETTING_SPAWNTO_X86: null_terminated_str,
BeaconSetting.SETTING_SPAWNTO_X64: null_terminated_str,
BeaconSetting.SETTING_USERAGENT: null_terminated_str,
BeaconSetting.SETTING_SUBMITURI: null_terminated_str,
# BeaconSetting.SETTING_PUBKEY: lambda x: x.rstrip(b"\x00"),
BeaconSetting.SETTING_PUBKEY: sha256sum_pubkey,
BeaconSetting.SETTING_DNS_BEACON_BEACON: null_terminated_str,
BeaconSetting.SETTING_DNS_BEACON_GET_A: null_terminated_str,
BeaconSetting.SETTING_DNS_BEACON_GET_AAAA: null_terminated_str,
BeaconSetting.SETTING_DNS_BEACON_GET_TXT: null_terminated_str,
BeaconSetting.SETTING_DNS_BEACON_PUT_METADATA: null_terminated_str,
BeaconSetting.SETTING_DNS_BEACON_PUT_OUTPUT: null_terminated_str,
BeaconSetting.SETTING_DNSRESOLVER: null_terminated_str,
BeaconSetting.SETTING_DNS_IDLE: lambda x: str(ipaddress.IPv4Address(x)),
BeaconSetting.SETTING_WATERMARKHASH: lambda x: null_terminated_bytes(x) if isinstance(x, bytes) else x,
BeaconSetting.SETTING_MASKED_WATERMARK: lambda x: x.hex(),
BeaconSetting.SETTING_BOF_ALLOCATOR: lambda x: BofAllocator(x).name,
BeaconSetting.SETTING_BEACON_GATE: lambda x: beacon_gate_options_string(parse_beacon_gate(x)),
# BeaconSetting.SETTING_PROTOCOL: lambda x: BeaconProtocol(x).name,
# BeaconSetting.SETTING_CRYPTO_SCHEME: lambda x: CryptoScheme(x).name,
# BeaconSetting.SETTING_PROXY_BEHAVIOR: lambda x: ProxyServer(x).name,
}
"""BeaconSetting enum to pretty function mapping"""
[docs]
class BeaconConfig:
"""A :class:`BeaconConfig` object represents a single Beacon configuration
It holds configuration data, parsed settings and other metadata of a Cobalt Strike Beacon and provides useful
methods and properties for accessing the Beacon settings. It does *not* contain the Beacon payload data itself.
It can be directly instantiated using configuration data. Otherwise, use the following constructors:
- :meth:`BeaconConfig.from_file`
- :meth:`BeaconConfig.from_path`
- :meth:`BeaconConfig.from_bytes`
The **from_** constructors automatically tries to extract the configuration data (first candidate only) and also
handles `xorencoded` payloads and `XOR` decoding of obfuscated configuration blocks that is common
with Cobalt Strike.
"""
def __init__(self, config_block: bytes) -> None:
[docs]
self.config_block: bytes = config_block
""" Raw beacon configuration block bytes """
[docs]
self.settings_tuple = tuple(iter_settings(config_block))
""" Tuple containing the `Setting` objects parsed from `config_block` """
[docs]
self.xorkey: Optional[bytes] = None
""" XOR key that was used to obfuscate the configuration block, ``None`` if unknown. """
[docs]
self.xorencoded: bool = False
""" ``True`` if the beacon was xorencoded, otherwise ``False`` """
[docs]
self.pe_export_stamp: Optional[int] = None
""" PE export timestamp, ``None`` if unknown. """
[docs]
self.pe_compile_stamp: Optional[int] = None
""" PE compile timestamp, ``None`` if unknown. """
[docs]
self.architecture: Optional[str] = None
""" PE architecture, ``"x86"`` or ``"x64"`` and ``None`` if unknown. """
[docs]
self.guardrails: Optional[GuardrailMetadata] = None
""" Guardrails metadata, ``None`` if not available. """
# Used for caching
[docs]
self._settings: Optional[Mapping[str, Any]] = None
[docs]
self._settings_by_index: Optional[Mapping[int, Any]] = None
[docs]
self._raw_settings: Optional[Mapping[str, Any]] = None
[docs]
self._raw_settings_by_index: Optional[Mapping[int, Any]] = None
@classmethod
[docs]
def from_file(cls, fobj: BinaryIO, xor_keys: List[bytes] = None, all_xor_keys: bool = False) -> "BeaconConfig":
"""Create a :class:`BeaconConfig` from file object, or raises ValueError if no beacon config is found.
Args:
fobj: file-like object
xor_keys: override the default `XOR` keys (as bytes) when specified. Default ``None``.
all_xor_keys: if ``True``, it will try ALL single-byte `XOR` keys if the defaults don't work
Returns:
:class:`BeaconConfig`
Raises:
ValueError: If no valid beacon configuration was found
"""
for config_block, extra_info in iter_beacon_config_blocks(fobj, xor_keys=xor_keys, all_xor_keys=all_xor_keys):
bconfig = cls(config_block)
# Set extra metadata
bconfig.xorkey = extra_info["xorkey"]
bconfig.xorencoded = extra_info["xorencoded"]
# Try to extract some PE artifacts
try:
fh = XorEncodedFile.from_file(fobj) if bconfig.xorencoded else fobj
except ValueError:
fh = fobj
bconfig.pe_compile_stamp, bconfig.pe_export_stamp = pe.find_compile_stamps(fh)
bconfig.architecture = pe.find_architecture(fh)
# Return the first found beacon config.
return bconfig
# Try finding Beacon config protected with Guardrails
try:
fxor = XorEncodedFile.from_file(fobj)
except ValueError:
fxor = fobj
for grconfig in iter_guardrail_configs_with_beacon(fxor):
if not grconfig.unmasked_beacon_config:
continue
bconfig = cls(grconfig.unmasked_beacon_config)
bconfig.guardrails = grconfig
bconfig.xorkey = grconfig.beacon_xor_key
bconfig.pe_compile_stamp, bconfig.pe_export_stamp = pe.find_compile_stamps(fxor)
bconfig.architecture = pe.find_architecture(fxor)
return bconfig
raise ValueError("No valid Beacon configuration found")
@classmethod
[docs]
def from_path(
cls,
path: Union[str, PathLike],
xor_keys: List[bytes] = None,
all_xor_keys: bool = False,
) -> "BeaconConfig":
"""Create a :class:`BeaconConfig` from path, or raises ValueError if no beacon config is found.
Args:
path: path to file on disk
xor_keys: override the default `XOR` keys (as bytes) when specified. Default ``None``.
all_xor_keys: if ``True`` it will try ALL single-byte `XOR` keys if the defaults don't work
Returns:
:class:`BeaconConfig`
Raises:
ValueError: If no valid beacon configuration was found
"""
with open(path, "rb") as fobj:
return cls.from_file(fobj, xor_keys=xor_keys, all_xor_keys=all_xor_keys)
@classmethod
[docs]
def from_bytes(
cls,
data: bytes,
xor_keys: List[bytes] = None,
all_xor_keys: bool = False,
) -> "BeaconConfig":
"""Create a :class:`BeaconConfig` from bytes, or raises ValueError if no beacon config is found.
Args:
data: configuration bytes
xor_keys: override the default `XOR` keys when specified. Default ``None``.
all_xor_keys: if ``True`` it will try ALL single-byte `XOR` keys if the defaults don't work
Returns:
:class:`BeaconConfig`
Raises:
ValueError: If no valid beacon configuration was found
"""
return cls.from_file(io.BytesIO(data), xor_keys=xor_keys, all_xor_keys=all_xor_keys)
[docs]
def __repr__(self) -> str:
return f"<BeaconConfig {self.domains}>"
@property
[docs]
def setting_enums(self) -> list:
"""List of BeaconSetting `enum` values in the order of appearance within the Beacon configuration.
Example value::
[1, 2, 3, 4, 5, 7, ..., 45, 46, 47, 53, 51, 52]
"""
return [s.index.value for s in self.settings_tuple]
@property
[docs]
def max_setting_enum(self) -> int:
"""The maximum BeaconSetting `enum` value present in the Beacon configuration."""
return max(self.setting_enums)
[docs]
def settings_map(self, index_type="enum", pretty=False, parse=True) -> MappingProxyType:
"""Return a read-only settings mapping indexed by given `index_type`.
Args:
index_type: index type of the dictionary, can be one of:
- ``name``: indexed by `BeaconSetting` name (str)
- ``const``: indexed by `BeaconSetting` constant (int)
- ``enum``: indexed by `BeaconSetting` enum (enum object).
pretty: if `True`, apply pretty functions on the values.
parse: if `True`, the raw bytes of `TYPE_SHORT` and `TYPE_INT` values are converted to int.
Returns:
OrderedDict
"""
settings = OrderedDict()
for setting in self.settings_tuple:
val = setting.value
if index_type == "name":
key = setting.index.name or str(setting.index).replace(".", "_")
elif index_type == "const":
key = setting.index.value
else:
key = setting.index
if parse or pretty:
if setting.type == SettingsType.TYPE_SHORT:
val = u16be(val)
elif setting.type == SettingsType.TYPE_INT:
val = u32be(val)
if pretty:
pretty_func = SETTING_TO_PRETTYFUNC.get(setting.index)
if pretty_func:
val = pretty_func(val)
settings[key] = val
return MappingProxyType(settings)
@property
[docs]
def raw_settings(self) -> Mapping[str, Any]:
r"""Read-only Beacon settings mapping with raw values, indexed by `BeaconSetting` name.
The raw bytes of `TYPE_SHORT` and `TYPE_INT` values are converted to int.
Example value::
mappingproxy({
'SETTING_PROTOCOL': 8,
'SETTING_PORT': 443,
'SETTING_SLEEPTIME': 60000,
...
'SETTING_C2_VERB_POST': b'POST\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
'SETTING_PROCINJ_STUB': b'\x0c\xe2\xf5TD\xe4y5\x16\xb5\xaf\xe9g\xbe\x92U',
})
"""
if self._raw_settings is None:
self._raw_settings = self.settings_map(index_type="name")
return self._raw_settings
@property
[docs]
def raw_settings_by_index(self) -> Mapping[int, Any]:
r"""Read-only Beacon settings mapping with raw values, indexed by `BeaconSetting` constant.
The raw bytes of `TYPE_SHORT` and `TYPE_INT` values are converted to int.
Example value::
mappingproxy({
1: 8,
2: 443,
3: 60000,
...
27: b'POST\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
53: b'\x0c\xe2\xf5TD\xe4y5\x16\xb5\xaf\xe9g\xbe\x92U',
})
"""
if self._raw_settings_by_index is None:
self._raw_settings_by_index = self.settings_map(index_type="const")
return self._raw_settings_by_index
@property
[docs]
def settings(self) -> Mapping[str, Any]:
r"""Read-only Beacon settings mapping with human readable values, indexed by `BeaconSetting` name.
Example value::
mappingproxy({
'SETTING_PROTOCOL': 8,
'SETTING_PORT': 443,
'SETTING_SLEEPTIME': 60000,
...
'SETTING_C2_VERB_POST': 'POST',
'SETTING_PROCINJ_STUB': '0ce2f55444e4793516b5afe967be9255',
})
"""
if self._settings is None:
self._settings = self.settings_map(index_type="name", pretty=True)
return self._settings
@property
[docs]
def settings_by_index(self) -> Mapping[int, Any]:
r"""Read-only Beacon settings mapping with human readable values, indexed by `BeaconSetting` constant.
Example value::
mappingproxy({
1: 8,
2: 443,
3: 60000,
...
27: 'POST',
53: '0ce2f55444e4793516b5afe967be9255',
})
"""
if self._settings_by_index is None:
self._settings_by_index = self.settings_map(index_type="const", pretty=True)
return self._settings_by_index
@property
[docs]
def domain_uri_pairs(self) -> List[Tuple[str, str]]:
"""List of configured `(domain, uri)` pairs in the Beacon.
Example value::
[
('c1.example.com', '/__utm.gif'),
('c2.example.com', '/en_US/all.js'),
]
"""
domains = self.raw_settings.get("SETTING_DOMAINS")
if not isinstance(domains, bytes):
return []
return list(grouper(null_terminated_str(domains).split(","), 2))
@property
[docs]
def uris(self) -> List[str]:
"""List of configured Beacon URIs.
Example value::
['/__utm.gif', '/en_US/all.js']
"""
return list(dict.fromkeys(uri for (_domain, uri) in self.domain_uri_pairs))
@property
[docs]
def domains(self) -> List[str]:
"""List of configured Beacon domains.
Example value::
['c1.example.com', 'c2.example.com']
"""
return list(dict.fromkeys(domain for (domain, _uri) in self.domain_uri_pairs))
@property
[docs]
def submit_uri(self) -> Optional[str]:
"""The submit URI that the beacon uses for sending callback data.
Example value::
'/submit.php'
"""
return self.settings.get("SETTING_SUBMITURI", None)
@property
[docs]
def killdate(self) -> Optional[str]:
"""Normalized kill date as YYYY-mm-dd string or ``None`` if not defined in Beacon.
.. note::
The reason why the return type is a :class:`str` instead of a :class:`datetime.date` object is that
the configured `killdate` in the Beacon can be arbitrary. e.g. 9999-99-99
"""
s = self.settings
killdate = s.get("SETTING_KILLDATE", 0)
if killdate:
date_str = str(killdate)
year = int(date_str[:4])
month = int(date_str[4:6])
day = int(date_str[6:8])
killdate = f"{year:02d}-{month:02d}-{day:02d}"
else:
killdate = None
year = s.get("SETTING_KILLDATE_YEAR", 0)
month = s.get("SETTING_KILLDATE_MONTH", 0)
day = s.get("SETTING_KILLDATE_DAY", 0)
if year and month and day:
killdate = f"{year:02d}-{month:02d}-{day:02d}"
return killdate
@property
[docs]
def protocol(self) -> Optional[str]:
"""The protocol the Beacon uses for communication, e.g. ``"http"``, ``"dns"``. ``None`` if unknown."""
protocol = self.raw_settings.get("SETTING_PROTOCOL", None)
if protocol is None:
return None
return BeaconProtocol(protocol).name
@property
[docs]
def port(self) -> Optional[int]:
"""The port the Beacon uses for communication, e.g. ``80``, ``443``. ``None`` if not defined in config."""
return self.raw_settings.get("SETTING_PORT", None)
@property
[docs]
def watermark(self) -> Optional[int]:
"""Beacon watermark (also known as customer or authorization id)."""
return self.raw_settings.get("SETTING_WATERMARK", None)
@property
[docs]
def is_trial(self) -> bool:
"""True if Beacon is a trial version (CRYPTO_TRIAL_PRODUCT). Otherwise, False."""
return self.raw_settings.get("SETTING_CRYPTO_SCHEME") == CryptoScheme.CRYPTO_TRIAL_PRODUCT
@property
[docs]
def version(self) -> BeaconVersion:
"""Deduced version of Cobalt Strike as :class:`~dissect.cobaltstrike.version.BeaconVersion` object.
The version is deduced from the Beacon's :attr:`pe_export_stamp` when available,
otherwise from :attr:`max_setting_enum`.
"""
if self.pe_export_stamp:
return BeaconVersion.from_pe_export_stamp(self.pe_export_stamp)
return BeaconVersion.from_max_setting_enum(self.max_setting_enum)
@property
[docs]
def public_key(self) -> bytes:
"""The RSA public key used by the Beacon in DER format."""
return self.raw_settings.get("SETTING_PUBKEY", b"").rstrip(b"\x00")
@property
[docs]
def sleeptime(self) -> Optional[int]:
"""The sleep time in milliseconds the Beacon uses between communication attempts."""
return self.raw_settings.get("SETTING_SLEEPTIME", None)
@property
[docs]
def jitter(self) -> Optional[int]:
"""The jitter in milliseconds the Beacon uses between communication attempts."""
return self.raw_settings.get("SETTING_JITTER", None)
[docs]
def build_parser():
import argparse
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("input", metavar="FILE", nargs="+", help="Beacon to dump")
parser.add_argument(
"-x",
"--xorkey",
action="append",
help="override default xor key(s) (default: -x 0x69 -x 0x2e -x 0x00)",
)
parser.add_argument(
"--default-xor-keys-only",
action="store_true",
help="only try the default xor keys instead of all possible ones",
)
parser.add_argument(
"-t",
"--type",
choices=["normal", "raw", "dumpstruct", "c2profile"],
default="normal",
help="output format",
)
parser.add_argument(
"-v",
"--verbose",
action="count",
default=0,
help="verbosity level (-v for INFO, -vv for DEBUG)",
)
return parser
@catch_sigpipe
[docs]
def main():
"""Entrypoint for beacon-dump."""
from . import c2profile, utils
parser = build_parser()
args = parser.parse_args()
levels = [logging.WARNING, logging.INFO, logging.DEBUG]
level = levels[min(len(levels) - 1, args.verbose)]
logging.basicConfig(
level=level,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
xor_keys = None
if args.xorkey:
xor_keys = tuple(utils.pack_be(int(x, 0)) for x in args.xorkey)
dumped = False
for fname in args.input:
logging.info("Processing: %r", fname)
try:
if fname in ("-", "/dev/stdin"):
with io.BytesIO(sys.stdin.buffer.read()) as fin:
config = BeaconConfig.from_file(fin, xor_keys=xor_keys, all_xor_keys=not args.default_xor_keys_only)
else:
config = BeaconConfig.from_path(fname, xor_keys=xor_keys, all_xor_keys=not args.default_xor_keys_only)
except ValueError:
print(f"{fname}: No beacon configuration found.", file=sys.stderr)
continue
dumped = True
if args.type == "raw":
for setting in config.settings_tuple:
print(setting)
elif args.type == "dumpstruct":
cstruct.hexdump(config.config_block)
print("-----")
for setting in config.settings_tuple:
cstruct.dumpstruct(setting)
print("-" * 10)
elif args.type == "normal":
settings = config.settings
for setting, value in settings.items():
print(f"{setting} = {value!r}")
if args.verbose >= 1:
print("-" * 50)
print(
"pe_export_stamp = {}, {}, {} - {}".format(
config.pe_export_stamp,
hex(config.pe_export_stamp),
time.ctime(config.pe_export_stamp),
config.version,
)
)
print(
"pe_compile_stamp = {}, {}, {}".format(
config.pe_compile_stamp,
hex(config.pe_compile_stamp),
time.ctime(config.pe_compile_stamp),
)
)
print(
"max_setting_enum = {} - {}".format(
config.max_setting_enum,
BeaconSetting(config.max_setting_enum),
)
)
print("beacon_version =", config.version)
if config.guardrails:
print("guardrail payload xor key =", config.guardrails.payload_xor_key)
print("guardrail options =", [s.option for s in config.guardrails.settings])
elif args.type == "c2profile":
profile = c2profile.C2Profile.from_beacon_config(config)
print(profile.as_text())
return 0 if dumped else 1
if __name__ == "__main__":
sys.exit(main())