"""
This module is responsible for working with Cobalt Strike C2 traffic.
"""
# Python imports
import base64
import hashlib
import hmac
import io
import logging
import random
# Typing imports
from typing import Dict, Iterator, List, NamedTuple, Optional, Tuple, Union, overload
from urllib.parse import parse_qsl, urlparse
# Pycryptodome imports
try:
from Crypto.Cipher import AES, PKCS1_v1_5
from Crypto.PublicKey import RSA
except ImportError:
raise ImportError(
"pycryptodome is required for Cobalt Strike C2 traffic parsing, install with `pip install pycryptodome`"
)
# flow.record imports
try:
from flow.record import Record, RecordDescriptor
except ImportError:
raise ImportError("flow.record is required for logging C2 packet records, install with `pip install flow.record`")
# Local imports
from dissect.cobaltstrike.beacon import BeaconConfig
from dissect.cobaltstrike.c_c2 import ( # noqa: F401
BeaconCallback,
BeaconCommand,
BeaconMetadata,
CallbackPacket,
TaskPacket,
c2struct,
)
from dissect.cobaltstrike.utils import (
namedtuple_reprlib_repr,
netbios_decode,
netbios_encode,
p32be,
xor,
)
"""Type TransformStep."""
[docs]
# Alias used in this module's annotations (e.g. `c2packet_to_record` and `C2Http.iter_recover_http`).
C2Packet = Union[BeaconMetadata, TaskPacket, CallbackPacket]
"""Type that is either a :class:`BeaconMetadata`, a :class:`TaskPacket` or a :class:`CallbackPacket`."""
[docs]
# Module-level logger, named after this module via `__name__`.
logger = logging.getLogger(__name__)
[docs]
class EncryptedPacket(NamedTuple):
    """Container to hold ciphertext and HMAC signature."""

    # Field order follows the positional ``EncryptedPacket(ciphertext, signature)`` calls
    # made elsewhere in this module.
    ciphertext: bytes  # the encrypted payload
    signature: bytes  # the (truncated) HMAC over `ciphertext`

    def dumps(self) -> bytes:
        """Return the EncryptedPacket as a bytes object with a size frame header.

        ``| size | ciphertext | signature |``

        Returns:
            ``p32be(len(payload))`` followed by ``payload``, where ``payload`` is the ciphertext
            concatenated with the signature.
        """
        payload = self.ciphertext + self.signature
        return p32be(len(payload)) + payload

    def raise_for_signature(self, hmac_key: bytes) -> None:
        """Verify the signature of the ciphertext.

        The expected signature is the first 16 bytes of the HMAC-SHA256 of the ciphertext.

        Args:
            hmac_key: HMAC key to use for signature verification

        Raises:
            ValueError: if signature of the ciphertext is incorrect.
        """
        expected = hmac.new(hmac_key, self.ciphertext, "sha256").digest()[:16]
        # Constant-time comparison, so the check does not leak how many leading bytes matched.
        if not hmac.compare_digest(expected, self.signature):
            raise ValueError(f"Invalid HMAC signature, expected {expected.hex()} got {self.signature.hex()}")
[docs]
class C2Data(NamedTuple):
    """Container for holding C2 data that is used for transform and recover steps."""

    # NOTE(review): `C2Http.iter_recover_http` also reads `c2data.metadata`, which is not among
    # the fields visible here -- confirm the complete field list (and its order) against upstream.

    # Raw bytes that the subclasses' `iter_encrypted_packets` split into encrypted packets.
    output: Optional[bytes] = None
    # Optional id bytes; not read anywhere in this part of the module.
    id: Optional[bytes] = None
[docs]
class ServerC2Data(C2Data):
    """Container for holding recovered server-side C2Data."""

    def iter_encrypted_packets(self) -> Iterator[EncryptedPacket]:
        """Iterate over ``EncryptedPacket``, parsed from server-side `c2data.output` data.

        For server-side data this is always one packet: the trailing 16 bytes are the signature
        and everything before it is the ciphertext. Nothing is yielded when `output` is empty
        or ``None``.

        Yields:
            A single :class:`EncryptedPacket`.
        """
        data = self.output
        if not data:
            return
        fobj = io.BytesIO(data)
        # NOTE: for `output` shorter than 16 bytes the size below is negative, which makes
        # `BytesIO.read` return everything; the signature is then empty.
        ciphertext = fobj.read(len(data) - 16)
        signature = fobj.read(16)
        yield EncryptedPacket(ciphertext, signature)
[docs]
class ClientC2Data(C2Data):
    """Container for holding recovered client-side C2Data."""

    def iter_encrypted_packets(self) -> Iterator[EncryptedPacket]:
        """Iterate over ``EncryptedPacket``, parsed from client-side `c2data.output` data.

        For client-side data this could be one or more packets. Each packet is framed as
        ``| size | ciphertext | signature |`` where `size` covers the ciphertext plus the
        16 byte signature. Nothing is yielded when `output` is empty or ``None``.

        Yields:
            One :class:`EncryptedPacket` per frame found in `output`.
        """
        data = self.output
        if not data:
            return
        # A single stream is walked front to back, so the unread tail is never copied
        # into a new buffer between packets.
        fobj = io.BytesIO(data)
        end = len(data)
        while fobj.tell() < end:
            size = c2struct.uint32(fobj)
            ciphertext = fobj.read(size - 16)
            signature = fobj.read(16)
            yield EncryptedPacket(ciphertext, signature)
[docs]
class HttpRequest(NamedTuple):
    """HTTP Request container."""

    # NOTE(review): `parse_raw_http` constructs this with `method`, `uri`, `headers` and `body`
    # keywords in addition to `params`; only `params` is visible here -- confirm the complete
    # field list (and its order) against upstream.

    # Query-string parameters of the request URI, as produced by `parse_qsl` in `parse_raw_http`.
    params: Dict[bytes, bytes]
[docs]
class HttpResponse(NamedTuple):
    """HTTP Response container."""

    # NOTE(review): `parse_raw_http` constructs this with `status`, `reason`, `headers` and `body`
    # keywords; only `request` is visible here -- confirm the complete field list (and its order)
    # against upstream.

    # Optional request associated with this response; `parse_raw_http` leaves it unset.
    request: Optional[HttpRequest] = None
[docs]
class BeaconKeys(NamedTuple):
    """Helper container to hold beacon session keys (AES + HMAC)."""

    # Deliberately left un-annotated: an annotated name in a NamedTuple body becomes a tuple
    # field, whereas this is a class-level constant (read as `BeaconKeys.DEFAULT_AES_IV`).
    DEFAULT_AES_IV = b"abcdefghijklmnop"

    # AES key used for packet encryption and decryption.
    aes_key: Optional[bytes]
    # HMAC key used to sign and verify ciphertext; may be left unset.
    hmac_key: Optional[bytes] = None
    # AES initialization vector.
    iv: bytes = DEFAULT_AES_IV

    @classmethod
    def from_aes_rand(cls, aes_rand: bytes, iv: bytes = DEFAULT_AES_IV) -> "BeaconKeys":
        """Create a :class:`BeaconKeys` instance from AES random bytes.

        Args:
            aes_rand: the random bytes that both keys are derived from
            iv: the AES IV to store alongside the derived keys

        Returns:
            A :class:`BeaconKeys` holding the keys returned by :func:`derive_aes_hmac_keys`.
        """
        aes_key, hmac_key = derive_aes_hmac_keys(aes_rand)
        return cls(aes_key=aes_key, hmac_key=hmac_key, iv=iv)

    # NOTE(review): a second `@classmethod` decorator followed here without a function body;
    # a method appears to be missing from this copy of the file -- restore it from upstream.
[docs]
def enable_reprlib_c2():
    """Switch the namedtuple containers of this module over to the reprlib based ``__repr__``.

    Replaces ``__repr__`` on :class:`HttpRequest`, :class:`HttpResponse` and :class:`C2Data`
    with ``namedtuple_reprlib_repr``.
    """
    for container in (HttpRequest, HttpResponse, C2Data):
        container.__repr__ = namedtuple_reprlib_repr
[docs]
def c2packet_to_record(c2packet: C2Packet) -> Record:
    """Build a flow.record from `c2packet`.

    The record descriptor is named ``Beacon/<structure name>`` and always starts with a
    ``raw_http`` bytes field, followed by one field per structure member.

    Args:
        c2packet: the parsed packet to convert.

    Returns:
        A record populated with the values of `c2packet`.
    """
    enum_type_names = ("BeaconCommand", "BeaconCallback", "BeaconMetadata")
    values = dict(c2packet._values)
    record_fields = [("bytes", "raw_http")]

    for member in c2packet._type.fields:
        name = member.name
        record_type = str(member.type)
        # The structure type is checked first; the name based mappings only apply when
        # none of the type rules matched.
        if record_type.startswith("char"):
            record_type = "bytes"
        elif record_type == "uint8":
            record_type = "varint"
        elif record_type in enum_type_names:
            record_type = "string"
            # store the symbolic name instead of the enum member itself
            values[name] = values[name].name
        elif name == "epoch":
            record_type = "datetime"
        elif name == "ip":
            record_type = "net.ipaddress"
        record_fields.append((record_type, name))

    descriptor = RecordDescriptor(f"Beacon/{c2packet._type.name}", record_fields)
    return descriptor(**values)
[docs]
def parse_raw_http(data: bytes) -> Union[HttpRequest, HttpResponse]:
    """Parse a raw HTTP request/response bytes and returns a :class:`HttpRequest` or :class:`HttpResponse` accordingly.

    The first line decides the result type: a line starting with ``HTTP/`` (case-insensitive) is
    treated as a response status line, anything else as a request line. Header lines are split
    on the first ``": "``; when a header name occurs more than once the last value wins.

    Args:
        data: raw HTTP request or response data bytes.

    Returns:
        Either a :class:`HttpRequest` or :class:`HttpResponse` object based on the data.

    Raises:
        ValueError: if it cannot be parsed as :class:`HttpRequest` or :class:`HttpResponse`.
    """
    head, _, body = data.partition(b"\r\n\r\n")
    first_line, _, header_block = head.partition(b"\r\n")

    headers: Dict[bytes, bytes] = {}
    for line in header_block.split(b"\r\n"):
        # Splitting an empty header block yields one empty line; do not record it as a header.
        if not line:
            continue
        key, _, value = line.partition(b": ")
        headers[key] = value

    # HTTP/1.1 200 OK
    if first_line.upper().startswith(b"HTTP/"):
        # Split at most twice: the reason phrase may itself contain spaces ("404 Not Found")
        # or be absent altogether ("HTTP/1.1 200").
        parts = first_line.rstrip().split(None, 2)
        if len(parts) < 2:
            raise ValueError(f"Error in parsing response status line: {first_line!r}")
        status = parts[1]
        reason = parts[2] if len(parts) == 3 else b""
        # A non-numeric status raises ValueError from `int()`, matching the documented contract.
        status_code = int(status.decode())
        return HttpResponse(body=body, headers=headers, status=status_code, reason=reason)

    # GET /uri HTTP/1.1
    parts = first_line.rstrip().split()
    if len(parts) != 3:
        raise ValueError(f"Error in parsing request status line: {first_line!r}")
    method, uri, _version = parts

    # sanitize uri bytes for `urlparse()` to avoid possible decode errors
    uri = uri.decode("ascii", errors="ignore").encode()
    result = urlparse(uri)
    params = dict(parse_qsl(result.query))
    return HttpRequest(method=method, body=body, headers=headers, uri=result.path, params=params)
[docs]
class C2Http:
    """Class for decrypting and encrypting Cobalt Strike HTTP C2 traffic.

    It requires to be initialized with a :class:`BeaconConfig` and one of the following *key* material:

    * `aes_key` and optionally `hmac_key`
    * `aes_rand`
    * `rsa_private_key` (most preferred when available)
    """

    def __init__(
        self,
        bconfig: BeaconConfig,
        aes_key: Optional[bytes] = None,
        hmac_key: Optional[bytes] = None,
        aes_rand: Optional[bytes] = None,
        rsa_private_key: Optional[RSA.RsaKey] = None,
        verify_hmac: bool = True,
    ) -> None:
        """Validate the key material and load URIs, verbs and transforms from `bconfig`.

        Args:
            bconfig: the beacon configuration describing the C2 profile.
            aes_key: 16 byte AES key; mutually exclusive with `aes_rand`.
            hmac_key: 16 byte HMAC key.
            aes_rand: random bytes from which both the AES and HMAC key are derived.
            rsa_private_key: RSA private key matching the public key in `bconfig`.
            verify_hmac: whether packet signatures are verified while decrypting.

        Raises:
            ValueError: if the key arguments are missing, conflicting or of the wrong size, if
                `rsa_private_key` does not match the public key of `bconfig`, or if `bconfig`
                is a trial beacon.
        """
        self.bconfig = bconfig

        if aes_rand and aes_key:
            raise ValueError("Cannot specify both aes_rand and aes_key.")

        if not any((aes_key, aes_rand, rsa_private_key)):
            raise ValueError("One of the following arguments is required: aes_key, aes_rand, rsa_private_key")

        self.aes_key = aes_key
        self.hmac_key = hmac_key
        self.verify_hmac = verify_hmac

        if aes_rand:
            self.aes_key, self.hmac_key = derive_aes_hmac_keys(aes_rand)

        if self.aes_key is not None and len(self.aes_key) != 16:
            raise ValueError(f"AES key must be 16 bytes, got: {self.aes_key!r}")

        if self.hmac_key is not None and len(self.hmac_key) != 16:
            raise ValueError(f"HMAC key must be 16 bytes, got: {self.hmac_key!r}")

        self.pub = RSA.import_key(bconfig.public_key)
        self.priv = rsa_private_key
        if self.priv:
            logger.debug("RSA Private Key: %r", self.priv)
            logger.debug("RSA Public Key: %r", self.pub)
            # Raise explicitly: an `assert` is stripped under `python -O` and would surface
            # as AssertionError rather than the intended ValueError.
            if self.priv.n != self.pub.n:
                raise ValueError(
                    f"RSA PrivateKey does not match PublicKey pair, {self.priv.n:#x} != {self.pub.n:#x}"
                )

        if self.bconfig.is_trial:
            raise ValueError("Trial beacons are not yet supported, please submit an issue")

        # Get the different URIs used by the beacon for matching HTTP requests (note everything is in bytes)
        self.submit_uri: bytes = bconfig.settings["SETTING_SUBMITURI"].encode()
        self.submit_verb: bytes = bconfig.settings["SETTING_C2_VERB_POST"].encode()
        self.get_uris: Tuple[bytes, ...] = tuple(uri.encode() for uri in bconfig.uris)
        self.get_verb: bytes = bconfig.settings["SETTING_C2_VERB_GET"].encode()

        # Load transform/recover steps from beacon config
        self.transform_submit = HttpDataTransform(steps=bconfig.settings["SETTING_C2_POSTREQ"])
        self.transform_get = HttpDataTransform(steps=bconfig.settings["SETTING_C2_REQUEST"])
        self.transform_response = HttpDataTransform(
            steps=bconfig.settings["SETTING_C2_RECOVER"], reverse=True, build="output"
        )

        # Used to map unencrypted metadata to decrypted BeaconMetadata
        self.metadata_cache: Dict[bytes, BeaconMetadata] = {}

        # Default decryption keys
        self.beacon_keys = BeaconKeys(aes_key=self.aes_key, hmac_key=self.hmac_key)

    def iter_recover_http(
        self, http: Union[bytes, HttpRequest, HttpResponse], keys: Optional[BeaconKeys] = None
    ) -> Iterator[C2Packet]:
        """Yield decrypted :class:`C2Packet` objects from given `http` object.

        You can pass your own set of :class:`BeaconKeys` `keys` to use for decryption instead of the default
        initialized ones. This can be useful if you are processing multiple Beacon sessions and do some sort of session
        tracking outside this class.

        When the message carries metadata and an RSA private key is available, the metadata is
        decrypted (and cached) and yielded first. If the default keys are still incomplete at that
        point they are derived from the metadata and stored as the new defaults.

        Args:
            http: A :class:`HttpRequest` or :class:`HttpResponse` object, or raw HTTP request or response bytes.
            keys: Optional :class:`BeaconKeys` to use for decryption instead of current default keys.

        Yields:
            C2Packet: A :class:`C2Packet` object for each decrypted packet found in the HTTP request or response.
        """
        http = parse_raw_http(http) if isinstance(http, bytes) else http
        transform = self.get_transform_for_http(http)
        c2data = transform.recover(http)

        # decrypt c2data.metadata, if available and we have a private key
        if c2data.metadata and self.priv:
            metadata = self.metadata_cache.get(c2data.metadata)
            if metadata is None:
                metadata = decrypt_metadata(c2data.metadata, self.priv)
                self.metadata_cache[c2data.metadata] = metadata

            # if we do not have an AES key or HMAC key yet, we derive it.
            if not all((self.beacon_keys.aes_key, self.beacon_keys.hmac_key)):
                aes_key, hmac_key = derive_aes_hmac_keys(metadata.aes_rand)
                self.beacon_keys = BeaconKeys(aes_key, hmac_key)
                logger.info("Derived AES + HMAC keys from %r", metadata)

            yield metadata

        # Resolve the keys only now: the metadata branch above may just have replaced the
        # default keys, and output in the same message must be decrypted with those.
        active_keys = keys or self.beacon_keys

        # decrypt c2data.output, if any
        for enc_packet in c2data.iter_encrypted_packets():
            plaintext = decrypt_packet(enc_packet, verify=self.verify_hmac, **active_keys._asdict())
            if isinstance(c2data, ClientC2Data):
                yield CallbackPacket(plaintext)
            elif isinstance(c2data, ServerC2Data):
                yield TaskPacket(plaintext)
# ------------------
# Crypto functions
# ------------------
[docs]
def derive_aes_hmac_keys(aes_random: bytes) -> Tuple[bytes, bytes]:
    """Split the SHA-256 digest of `aes_random` into an AES key and an HMAC key.

    Args:
        aes_random: the bytes to derive the keys from

    Returns:
        Tuple of (aes_key, hmac_key): the first and the last 16 bytes of the digest.
    """
    sha256_sum = hashlib.sha256(aes_random).digest()
    aes_key = sha256_sum[:16]
    hmac_key = sha256_sum[16:]
    return aes_key, hmac_key
[docs]
def pad(data: bytes, block_size: int = AES.block_size) -> bytes:
    """Pad `data` with ``b"A"`` up to the next multiple of `block_size`, mimicking Cobalt Strike.

    Data whose length is already a multiple of `block_size` still receives one full block
    of padding.

    Args:
        data: the data to pad
        block_size: the block size to use for padding

    Returns:
        The padded data
    """
    remainder = len(data) % block_size
    fill_length = block_size - remainder
    return data + fill_length * b"A"
[docs]
def encrypt_data(data: bytes, aes_key: bytes, iv: bytes) -> bytes:
    """Pad `data` with :func:`pad` and AES-CBC encrypt it using `aes_key` and `iv`.

    Args:
        data: the data to encrypt
        aes_key: the AES key to use
        iv: the initialization vector to use

    Returns:
        The encrypted data as bytes

    Raises:
        ValueError: if `aes_key` is ``None``.
    """
    if aes_key is None:
        raise ValueError("Cannot encrypt without AES key")
    cbc = AES.new(aes_key, AES.MODE_CBC, iv=iv)
    padded = pad(data)
    return cbc.encrypt(padded)
[docs]
def decrypt_data(data: bytes, aes_key: bytes, iv: bytes) -> bytes:
    """AES-CBC decrypt `data` using `aes_key` and `iv`.

    The result is returned as-is: no padding is removed.

    Args:
        data: the encrypted data
        aes_key: the AES key to use for decryption
        iv: the AES IV to use for decryption

    Returns:
        The decrypted data as bytes

    Raises:
        ValueError: if `aes_key` is ``None``.
    """
    if aes_key is None:
        raise ValueError("Cannot decrypt without AES key")
    # Beacon and Team Server does not unpad data
    return AES.new(aes_key, AES.MODE_CBC, iv=iv).decrypt(data)
[docs]
def decrypt_packet(
    packet: EncryptedPacket,
    aes_key: bytes,
    hmac_key: Optional[bytes] = None,
    iv: bytes = BeaconKeys.DEFAULT_AES_IV,
    verify: bool = True,
) -> bytes:
    """Decrypt :class:`EncryptedPacket` `packet` and return the decrypted plaintext bytes.

    When `verify` is true the signature of the ciphertext is checked before decrypting, which
    requires `hmac_key`. When `verify` is false `hmac_key` is not used.

    Args:
        packet: the :class:`EncryptedPacket` to decrypt
        aes_key: the AES key to use for decryption
        hmac_key: the HMAC key to use for signature verification
        iv: the AES IV to use for decryption
        verify: whether to verify the HMAC signature of the ciphertext

    Returns:
        The decrypted plaintext bytes

    Raises:
        ValueError: if `verify` is true and `hmac_key` is missing or the signature does not match.
    """
    if verify and not hmac_key:
        raise ValueError("Cannot verify signature without hmac_key.")
    if verify:
        packet.raise_for_signature(hmac_key)
    return decrypt_data(packet.ciphertext, aes_key, iv)
[docs]
def encrypt_packet(
    plaintext: bytes, aes_key: bytes, hmac_key: bytes, iv: bytes = BeaconKeys.DEFAULT_AES_IV
) -> EncryptedPacket:
    """Encrypt `plaintext` and sign the resulting ciphertext.

    The signature is the first 16 bytes of the HMAC-SHA256 of the ciphertext.

    Args:
        plaintext: the plaintext bytes to encrypt
        aes_key: the AES key to use for encryption
        hmac_key: the HMAC key to use for signature generation
        iv: the AES IV to use for encryption

    Returns:
        The :class:`EncryptedPacket` containing the ciphertext and HMAC signature
    """
    encrypted = encrypt_data(plaintext, aes_key=aes_key, iv=iv)
    mac = hmac.new(hmac_key, encrypted, "sha256")
    return EncryptedPacket(encrypted, mac.digest()[:16])