"""
This module is responsible for parsing and generating Cobalt Strike Malleable C2 profiles.
It uses the ``lark-parser`` library to parse the profile syntax defined in the ``c2profile.lark`` grammar file.
"""
from __future__ import annotations
import collections
import logging
import sys
from typing import TYPE_CHECKING
from os import PathLike
from typing import Any, List, Tuple, Union
from lark import Lark, Token, Tree
from lark.reconstruct import Reconstructor
from dissect.cobaltstrike.beacon import BeaconConfig, BeaconSetting
from dissect.cobaltstrike.utils import catch_sigpipe
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Shared LALR parser, built once at import time from the ``c2profile.lark`` grammar
# that is looked up relative to this file (``rel_to=__file__``).
c2profile_parser = Lark.open("c2profile.lark", parser="lalr", rel_to=__file__, maybe_placeholders=False)
def value_to_string(value: Union[str, bytes]) -> str:
    """Render *value* as the text of a double-quoted STRING token.

    ``bytes`` are first turned into their escaped ``repr`` form. Text then gets
    its double quotes escaped, while escaped single quotes are un-escaped again
    because the result is always wrapped in double quotes. Any other type is
    formatted as-is between the quotes.
    """
    text = value
    if isinstance(text, bytes):
        # A leading double quote forces repr() to pick single quotes as the
        # delimiter; slicing drops the ``b'"`` prefix and the closing quote.
        text = repr(b'"' + text)[3:-1]
    if isinstance(text, str):
        text = text.replace('"', '\\"').replace("\\'", "'")
    return f'"{text}"'
def string_token_to_bytes(token: Token) -> Union[Token, bytes]:
    """Convert a STRING Token value to its native Python bytes value.

    If the input is not a Token of type STRING, the original object is returned
    unchanged.

    Recognised escapes are backslash followed by ``uXXXX`` (only the last two
    hex digits, i.e. the low byte, are kept), ``xXX``, ``n``, ``r``, ``t``,
    a backslash, a double quote or a single quote. Any other backslash sequence
    is copied through verbatim, as is a lone trailing backslash.

    Raises:
        ValueError: if a ``u``/``x`` escape is truncated or is not valid hex.
    """
    if not (isinstance(token, Token) and token.type == "STRING"):
        return token

    simple_escapes = {
        "n": "\n",
        "r": "\r",
        "t": "\t",
        "\\": "\\",
        '"': '"',
        "'": "'",
    }

    # strip the surrounding double quotes
    bstring = token.value[1:-1]
    buffer = []
    it = StringIterator(bstring)
    for c in it:
        if c == "\\" and it.has_next():
            next2 = next(it)
            if next2 == "u":
                if not it.has_next(4):
                    raise ValueError("not enough remaining chars for \\uXXXX")
                # The result is a byte string: skip the high byte, keep the low one.
                _ = it.next(2)
                hexstr = "".join(it.next(2))
                buffer.append(int(hexstr, 16))
            elif next2 == "x":
                if not it.has_next(2):
                    raise ValueError("not enough remaining chars for \\xXX")
                hexstr = "".join(it.next(2))
                buffer.append(int(hexstr, 16))
            elif next2 in simple_escapes:
                buffer.append(ord(simple_escapes[next2]))
            else:
                # Unknown escape: keep both characters so no input is lost.
                buffer.append(ord(c))
                buffer.append(ord(next2))
        else:
            # Ordinary character (StringIterator already masked it to one byte).
            buffer.append(ord(c))
    return bytes(buffer)
class StringIterator:
    """Helper class for iterating over characters in a string.

    Every character is reduced to its low 8 bits on construction, so each item
    yielded has a code point in ``0..255``. Besides plain iteration the class
    offers look-ahead (:meth:`has_next`) and multi-character reads (:meth:`next`).
    """

    def __init__(self, string: str) -> None:
        self.buffer: List[str] = [chr(ord(c) & 0xFF) for c in string]
        # Start at the beginning so has_next()/next() are usable even before
        # iter() has been called on the instance.
        self.index = 0

    def has_next(self, count: int = 1) -> bool:
        """Return True when at least *count* characters are still unread."""
        return self.index + count <= len(self.buffer)

    def next(self, count: int) -> List[str]:
        """Consume and return up to *count* characters as a list."""
        c = self.buffer[self.index : self.index + count]
        self.index += count
        return c

    def __iter__(self):
        # Iterating always restarts from the first character.
        self.index = 0
        return self

    def __next__(self):
        if self.index < len(self.buffer):
            c = self.buffer[self.index]
            self.index += 1
            return c
        raise StopIteration
class ConfigBlock:
    """Base class for configuration blocks.

    A block owns a lark ``Tree`` (``self.tree``) whose children are the options
    and nested blocks added through the helpers below. Keyword arguments passed
    to the constructor are applied through :meth:`init_kwargs`.
    """

    __name__ = "ConfigBlock"

    def __init__(self, **kwargs):
        #: The AST tree
        self.tree = Tree(self.__name__, [])
        self.init_kwargs(**kwargs)

    def init_kwargs(self, **kwargs):
        """Apply every keyword argument as an option of this block.

        Dispatch order per ``option=value`` pair:

        1. a callable attribute named like the option handles it itself,
        2. a :class:`ConfigBlock` value is added as a nested block,
        3. anything else becomes a plain string option.
        """
        for option, value in kwargs.items():
            func = getattr(self, option, None)
            if callable(func):
                func(option, value)
            elif isinstance(value, ConfigBlock):
                self.set_config_block(option, value)
            else:
                self.set_option(option, value)

    def set_config_block(self, option, config_block):
        """Append the children of *config_block* as a nested block named *option*."""
        self.tree.children.append(Tree(option, config_block.tree.children))

    def set_non_empty_config_block(self, option, config_block):
        """Like :meth:`set_config_block`, but skip blocks without children."""
        if config_block.tree.children:
            self.set_config_block(option, config_block)

    def set_option(self, option, value):
        """Append ``option "value"`` with a single string argument."""
        self.tree.children.append(Tree(option, [self._string_node(value)]))

    def _pair(self, option, value):
        """Append one ``option "a" "b"`` entry for every ``(a, b)`` in *value*."""
        for a, b in value:
            self.tree.children.append(
                Tree(option, [self._string_node(a), self._string_node(b)])
            )

    def _enable(self, option, value):
        """Append *option* as a bare flag; *value* is ignored."""
        self.tree.children.append(Tree(option, []))

    def _parameter(self, option, value):
        """Append one ``option "param" "val"`` entry for every pair in *value*."""
        for param, val in value:
            self.tree.children.append(
                Tree(option, [self._string_node(param), self._string_node(val)])
            )

    @staticmethod
    def _string_node(value):
        """Wrap *value* in a ``string`` tree holding one quoted STRING token."""
        return Tree("string", [Token("STRING", value_to_string(value))])
class HttpOptionsBlock(ConfigBlock):
    """`.http-{stager,get,post}.{client,server}` block"""

    __name__ = "http_options"

    # Both options take a list of 2-tuples; each tuple becomes one
    # ``option "name" "value"`` entry. Registering them here lets them be
    # passed as constructor keyword arguments as well.
    header = ConfigBlock._pair
    parameter = ConfigBlock._pair
class HttpStagerBlock(ConfigBlock):
    """Configuration block for the `.http-stager` section."""

    __name__ = "http_stager"
class HttpConfigBlock(ConfigBlock):
    """Configuration block for the `.http-config` section."""

    __name__ = "http_config"
class StageBlock(ConfigBlock):
    """`.stage` block"""

    # Name the tree root after the section, like the sibling blocks do.
    __name__ = "stage"
class ProcessInjectBlock(ConfigBlock):
    """Configuration block for the `.process-inject` section."""

    __name__ = "process_inject"
class HttpGetBlock(ConfigBlock):
    """`.http-get` block"""

    # Name the tree root after the section, mirroring HttpPostBlock.
    __name__ = "http_get"
class HttpPostBlock(ConfigBlock):
    """Configuration block for the `.http-post` section."""

    __name__ = "http_post"
class PostExBlock(ConfigBlock):
    """Configuration block for the `.post-ex` section."""

    __name__ = "post_ex"
class DnsBeaconBlock(ConfigBlock):
    """Configuration block for the `.dns-beacon` section."""

    __name__ = "dns_beacon"
class HttpBeaconBlock(ConfigBlock):
    """Configuration block for the `.http-beacon` section."""

    __name__ = "http_beacon"
class ExecuteOptionsBlock(ConfigBlock):
    """`.process-inject.execute` block"""

    __name__ = "ExecuteOptionsBlock"

    # Options that carry a string argument.
    createthread_special = ConfigBlock.set_option
    createremotethread_special = ConfigBlock.set_option
    # Bare options that are simply switched on.
    createthread = ConfigBlock._enable
    createremotethread = ConfigBlock._enable
    ntqueueapcthread = ConfigBlock._enable
    ntqueueapcthread_s = ConfigBlock._enable
    rtlcreateuserthread = ConfigBlock._enable
    setthreadcontext = ConfigBlock._enable

    @classmethod
    def from_execute_list(cls, execute_list=None):
        """Build a block from a list of execute options.

        Each item is either a bare option name (e.g. ``"NtQueueApcThread-s"``)
        or an ``(option, value)`` pair, which is only accepted for
        ``CreateThread`` and ``CreateRemoteThread``. ``None`` or an empty list
        yields an empty block.

        Raises:
            ValueError: for any option name that is not recognised.
        """
        block = cls()
        for option in execute_list or ():
            if isinstance(option, (list, tuple)):
                option, value = option
                if option == "CreateThread":
                    block.set_option("createthread_special", value)
                elif option == "CreateRemoteThread":
                    block.set_option("createremotethread_special", value)
                else:
                    raise ValueError(f"Unknown option: {option}")
                # The pair is fully handled; do not also enable the bare flag.
                continue
            # Bare names; lowercased with "-" -> "_" they match the attributes above.
            if option in (
                "CreateThread",
                "CreateRemoteThread",
                "NtQueueApcThread",
                "NtQueueApcThread-s",
                "RtlCreateUserThread",
                "SetThreadContext",
            ):
                block._enable(option.lower().replace("-", "_"), True)
            else:
                raise ValueError(f"Unknown option: {option}")
        return block
class BeaconGateBlock(ConfigBlock):
    """`.stage.beacon_gate` block"""

    __name__ = "BeaconGateBlock"

    # Every name below is a bare flag: passing it as a constructor keyword
    # appends an empty option node with that name.
    none = ConfigBlock._enable
    comms = ConfigBlock._enable
    core = ConfigBlock._enable
    cleanup = ConfigBlock._enable
    all = ConfigBlock._enable
    internetopena = ConfigBlock._enable
    internetconnecta = ConfigBlock._enable
    virtualalloc = ConfigBlock._enable
    virtualallocex = ConfigBlock._enable
    virtualprotect = ConfigBlock._enable
    virtualprotectex = ConfigBlock._enable
    # Misspelled historical name, kept so existing callers keep working.
    virtualprotextex = ConfigBlock._enable
    virtualfree = ConfigBlock._enable
    getthreadcontext = ConfigBlock._enable
    setthreadcontext = ConfigBlock._enable
    resumethread = ConfigBlock._enable
    createthread = ConfigBlock._enable
    createremotethread = ConfigBlock._enable
    openprocess = ConfigBlock._enable
    openthread = ConfigBlock._enable
    closehandle = ConfigBlock._enable
    createfilemappinga = ConfigBlock._enable
    mapviewoffile = ConfigBlock._enable
    unmapviewoffile = ConfigBlock._enable
    virtualquery = ConfigBlock._enable
    duplicatehandle = ConfigBlock._enable
    readprocessmemory = ConfigBlock._enable
    writeprocessmemory = ConfigBlock._enable
    exitthread = ConfigBlock._enable

    @classmethod
    def from_beacon_gate_option_strings(cls, options: list[str]):
        """Build a block that enables every name in *options* (lowercased).

        Names are not validated against the attributes above.
        """
        block = cls()
        for option in options:
            block._enable(option.lower(), True)
        return block
class C2Profile(ConfigBlock):
"""A :class:`C2Profile` object represents a parsed Malleable C2 Profile
Besides loading C2 Profiles, it also provides methods for building a C2 Profile from scratch.
# NOTE(review): the closing docstring quotes and the body of __init__ are not visible in this view.
def __init__(self, **kwargs):
# Overrides ConfigBlock.set_option: global options carry an OPTION token with the option
# name in front of the quoted string value.
# NOTE(review): the enclosing Tree(...) / append call is not visible here.
def set_option(self, option, value):
"""Sets a global option in the AST tree. E.g: ``set_option("jitter", "6000")``"""
value = value_to_string(value)
Token("OPTION", option),
Tree("string", [Token("STRING", value)]),
# Alternate constructors. They take ``cls`` and are called on the class itself in main();
# NOTE(review): presumably decorated with @classmethod -- decorators are not visible here.
def from_path(cls, path: Union[str, PathLike]) -> "C2Profile":
"""Construct a :class:`C2Profile` from given path (path to a malleable C2 profile)"""
# The file is read in text mode and handed to from_text().
with open(path, "r") as f:
return cls.from_text(f.read())
def from_text(cls, source: str) -> "C2Profile":
"""Construct a :class:`C2Profile` from text (malleable C2 profile syntax)"""
profile = cls()
# The freshly created (empty) tree is replaced by the parse result of the module-level parser.
profile.tree = c2profile_parser.parse(source)
return profile
def from_beacon_config(cls, config: BeaconConfig) -> "C2Profile":
"""Construct a :class:`C2Profile` from a :class:`~dissect.cobaltstrike.beacon.BeaconConfig`"""
profile = cls()
# One builder per profile section. They are filled while walking the settings and are
# attached to the profile at the very end, each only when it has children.
http_get = HttpGetBlock()
http_post = HttpPostBlock()
stage = StageBlock()
c2_recover: List[Union[Tuple[str, Any], str]] = []
http_get_client = HttpOptionsBlock()
http_post_client = HttpOptionsBlock()
proc_inj = ProcessInjectBlock()
dns_beacon = DnsBeaconBlock()
http_beacon = HttpBeaconBlock()
# http_get_server = HttpOptionsBlock()
# Translate each beacon setting into the matching profile option.
# NOTE(review): several branches below show no statement in this view (their bodies and some
# ``else:`` lines appear to be missing); they are left exactly as found.
for setting, value in config.settings_by_index.items():
logger.debug(f"{setting} -> {value}")
if setting == BeaconSetting.SETTING_SLEEPTIME:
profile.set_option("sleeptime", value)
elif setting == BeaconSetting.SETTING_MAXGET:
# determined by .http-get.server.output", 1048576
elif setting == BeaconSetting.SETTING_JITTER:
profile.set_option("jitter", value)
elif setting == BeaconSetting.SETTING_DOMAINS:
# The value of this setting is not used directly: all of config.uris are joined
# into one comma-separated ``uri`` option of the http-get block.
uris = ", ".join(config.uris)
http_get.set_option("uri", uris)
elif setting == BeaconSetting.SETTING_SPAWNTO:
# profile.set_option("spawnto", value)
# deprecated
elif setting == BeaconSetting.SETTING_SPAWNTO_X86:
profile.set_option("spawnto_x86", value)
elif setting == BeaconSetting.SETTING_SPAWNTO_X64:
profile.set_option("spawnto_x64", value)
elif setting == BeaconSetting.SETTING_C2_VERB_GET:
http_get.set_option("verb", value)
elif setting == BeaconSetting.SETTING_C2_VERB_POST:
http_post.set_option("verb", value)
elif setting == BeaconSetting.SETTING_C2_CHUNK_POST:
# public boolean shouldChunkPosts()
# return !this.posts(".http-post.client.output");
elif setting == BeaconSetting.SETTING_CLEANUP:
stage.set_option("cleanup", value)
elif setting == BeaconSetting.SETTING_CFG_CAUTION:
elif setting == BeaconSetting.SETTING_USERAGENT:
profile.set_option("useragent", value)
elif setting == BeaconSetting.SETTING_SUBMITURI:
http_post.set_option("uri", value)
elif setting == BeaconSetting.SETTING_C2_RECOVER:
# Collect the steps that later become the ``output`` of the http-get ``server`` block.
# Integer arguments are rendered as a run of "X" characters of that length.
# NOTE(review): the ``v is True`` body and the ``else:`` before the last append are not visible.
c2_recover = []
for k, v in value:
if v is True:
elif isinstance(v, int):
c2_recover.append((k, "X" * v))
c2_recover.append((k, v))
elif setting == BeaconSetting.SETTING_C2_REQUEST:
# .http-get.client
# Split the (key, value) steps into headers, URL parameters and transform steps.
# Transform steps are grouped under the most recent "BUILD" value seen.
_build = None
headers = []
params = []
block_steps = collections.defaultdict(list)
for k, v in value:
if k in ("_HEADER", "_HOSTHEADER"):
# Header bytes are decoded as latin-1 and split on the first ": ".
v = v.decode("latin-1")
header, _, header_val = v.partition(": ")
headers.append((header, header_val))
elif k == "_PARAMETER":
# Parameter bytes are decoded as latin-1 and split on the first "=".
v = v.decode("latin-1")
param, _, param_val = v.partition("=")
params.append((param, param_val))
elif k == "BUILD":
_build = v
elif v is True:
block_steps[_build].append((k.lower(), v.decode("latin-1")))
logger.debug(f"block_steps: {block_steps}")
if headers:
http_get_client._pair("header", headers)
if params:
http_get_client._pair("parameter", params)
# NOTE(review): DataTransformBlock is not defined in the visible part of this file.
for block, steps in block_steps.items():
http_get_client.set_config_block(block, DataTransformBlock(steps=steps))
elif setting == BeaconSetting.SETTING_C2_POSTREQ:
# .http-post.client
# Same splitting as for the http-get client above, except that transform step
# arguments are rendered via repr() (minus the b'' wrapper) instead of latin-1 decoding.
_build = None
headers = []
params = []
block_steps = collections.defaultdict(list)
for k, v in value:
if k in ("_HEADER", "_HOSTHEADER"):
v = v.decode("latin-1")
header, _, header_val = v.partition(": ")
headers.append((header, header_val))
elif k == "_PARAMETER":
v = v.decode("latin-1")
param, _, param_val = v.partition("=")
params.append((param, param_val))
elif k == "BUILD":
_build = v
elif v is True:
# log.debug(f"{k} -> {v}")
v = repr(v)[2:-1]
block_steps[_build].append((k.lower(), v))
logger.debug(f"block_steps: {block_steps}")
if headers:
http_post_client._pair("header", headers)
if params:
http_post_client._pair("parameter", params)
for block, steps in block_steps.items():
http_post_client.set_config_block(block, DataTransformBlock(steps=steps))
elif setting == BeaconSetting.SETTING_HOST_HEADER:
elif setting == BeaconSetting.SETTING_HTTP_NO_COOKIES:
elif setting == BeaconSetting.SETTING_PROXY_BEHAVIOR:
# Frame headers are only emitted when non-empty; repr()[2:-1] yields the escaped
# text of the value without the surrounding b'' (assumes value is bytes -- TODO confirm).
elif setting == BeaconSetting.SETTING_TCP_FRAME_HEADER and value:
profile.set_option("tcp_frame_header", repr(value)[2:-1])
elif setting == BeaconSetting.SETTING_SMB_FRAME_HEADER and value:
profile.set_option("smb_frame_header", repr(value)[2:-1])
elif setting == BeaconSetting.SETTING_EXIT_FUNK:
elif setting == BeaconSetting.SETTING_KILLDATE:
elif setting == BeaconSetting.SETTING_GARGLE_NOOK and value:
stage.set_option("sleep_mask", value)
# Permission values are mapped to booleans: 64 -> "true", 4 (initial) / 32 (final) -> "false";
# any other value emits nothing.
elif setting == BeaconSetting.SETTING_PROCINJ_PERMS_I:
if value == 64:
proc_inj.set_option("startrwx", "true")
elif value == 4:
proc_inj.set_option("startrwx", "false")
elif setting == BeaconSetting.SETTING_PROCINJ_PERMS:
if value == 64:
proc_inj.set_option("userwx", "true")
elif value == 32:
proc_inj.set_option("userwx", "false")
elif setting == BeaconSetting.SETTING_PROCINJ_MINALLOC and value:
proc_inj.set_option("min_alloc", value)
elif setting == BeaconSetting.SETTING_PROCINJ_TRANSFORM_X86:
# Only the last "prepend" and the last "append" step are kept; the transform block is
# attached only when at least one of them is non-empty.
# NOTE(review): ``steps`` is assigned but not used in the visible lines;
# StageTransformBlock is not defined in the visible part of this file.
steps = []
prepend = ""
append = ""
for k, v in value:
# v = v.decode()
v = repr(v)[2:-1]
if k == "prepend":
prepend = v
elif k == "append":
append = v
transform_block = StageTransformBlock()
if prepend:
transform_block.set_option("prepend", prepend)
if append:
transform_block.set_option("append", append)
if prepend or append:
proc_inj.set_config_block("transform_x86", transform_block)
elif setting == BeaconSetting.SETTING_PROCINJ_TRANSFORM_X64:
# Mirrors the x86 branch above, writing to "transform_x64".
steps = []
# proc_inj.set_config_block("transform_x64", DataTransformBlock(steps=steps))
prepend = ""
append = ""
for k, v in value:
v = repr(v)[2:-1]
if k == "prepend":
prepend = v
elif k == "append":
append = v
transform_block = StageTransformBlock()
if prepend:
transform_block.set_option("prepend", prepend)
if append:
transform_block.set_option("append", append)
if prepend or append:
proc_inj.set_config_block("transform_x64", transform_block)
elif setting == BeaconSetting.SETTING_PROCINJ_STUB:
elif setting == BeaconSetting.SETTING_PROCINJ_EXECUTE:
# Items containing a space are "<Option> <quoted value>": the value loses its first and
# last character and is stored under the matching *_special option.
# NOTE(review): the list literal of ``if item in [`` is not visible here.
exec_options = ExecuteOptionsBlock()
for item in value:
if " " in item:
option, _, val = item.partition(" ")
val = val[1:-1]
if option == "CreateThread":
exec_options.set_option("createthread_special", val)
elif option == "CreateRemoteThread":
exec_options.set_option("createremotethread_special", val)
if item in [
exec_options._enable(item.lower().replace("-", "_"), True)
if value:
proc_inj.set_config_block("execute", exec_options)
elif setting == BeaconSetting.SETTING_PROCINJ_ALLOCATOR:
# Truthy value selects NtMapViewOfSection, falsy selects VirtualAllocEx.
proc_inj.set_option("allocator", "NtMapViewOfSection" if value else "VirtualAllocEx")
elif setting == BeaconSetting.SETTING_DNS_BEACON_BEACON:
dns_beacon.set_option("beacon", value)
elif setting == BeaconSetting.SETTING_DNS_BEACON_GET_A:
dns_beacon.set_option("get_a", value)
elif setting == BeaconSetting.SETTING_DNS_BEACON_GET_AAAA:
dns_beacon.set_option("get_aaaa", value)
elif setting == BeaconSetting.SETTING_DNS_BEACON_GET_TXT:
dns_beacon.set_option("get_txt", value)
elif setting == BeaconSetting.SETTING_DNS_BEACON_PUT_METADATA:
dns_beacon.set_option("put_metadata", value)
elif setting == BeaconSetting.SETTING_DNS_BEACON_PUT_OUTPUT:
dns_beacon.set_option("put_output", value)
elif setting == BeaconSetting.SETTING_DNSRESOLVER and value:
# this is not a c2profile setting, but a DNS Listener configuration option
dns_beacon.set_option("comment_dns_resolver", value)
elif setting == BeaconSetting.SETTING_DNS_IDLE:
dns_beacon.set_option("dns_idle", value)
elif setting == BeaconSetting.SETTING_DNS_SLEEP:
dns_beacon.set_option("dns_sleep", value)
elif setting == BeaconSetting.SETTING_MAXDNS:
dns_beacon.set_option("maxdns", value)
elif setting == BeaconSetting.SETTING_PROCINJ_BOF_REUSE_MEM and value:
proc_inj.set_option("bof_reuse_memory", "true")
elif setting == BeaconSetting.SETTING_BOF_ALLOCATOR:
proc_inj.set_option("bof_allocator", value)
elif setting == BeaconSetting.SETTING_DATA_STORE_SIZE:
stage.set_option("data_store_size", value)
elif setting == BeaconSetting.SETTING_HTTP_DATA_REQUIRED and value:
http_beacon.set_option("data_required", "true")
elif setting == BeaconSetting.SETTING_BEACON_GATE and value:
block = BeaconGateBlock.from_beacon_gate_option_strings(value)
stage.set_config_block("beacon_gate", block)
# Assemble the profile: nested blocks first, then the sections themselves.
# Empty blocks are skipped by set_non_empty_config_block().
if c2_recover:
http_get.set_non_empty_config_block("server", HttpOptionsBlock(output=DataTransformBlock(steps=c2_recover)))
http_get.set_non_empty_config_block("client", http_get_client)
profile.set_non_empty_config_block("http_get", http_get)
http_post.set_non_empty_config_block("client", http_post_client)
profile.set_non_empty_config_block("http_post", http_post)
profile.set_non_empty_config_block("stage", stage)
profile.set_non_empty_config_block("process_inject", proc_inj)
profile.set_non_empty_config_block("dns_beacon", dns_beacon)
profile.set_non_empty_config_block("http_beacon", http_beacon)
return profile
# str(profile) is the profile text produced by as_text().
def __str__(self) -> str:
return self.as_text()
def as_text(self) -> str:
"""Return the C2 Profile settings as text (malleable C2 profile syntax)."""
# postproc is passed to lark's Reconstructor as the post-processing hook. It buffers
# tokens in ``line`` and flushes the buffer on "{", "}" or ";": indentation is four spaces
# per level, tokens are separated by one space except directly before ";".
# NOTE(review): the statement that adds ``item`` to ``line`` is not visible here.
def postproc(items):
line = []
indent = 0
for item in items:
if item in "{};":
# A closing brace dedents before the line is written.
if "}" in line:
indent -= 1
# A blank line precedes every line that opens a block.
if "{" in line:
yield "\n"
yield " " * 4 * indent
for i, x in enumerate(line):
yield x
if len(line) > i + 1 and line[i + 1] != ";":
yield " "
yield "\n"
# An opening brace indents the lines that follow.
if "{" in line:
indent += 1
line = []
return Reconstructor(c2profile_parser).reconstruct(self.tree, postproc)
def as_dict(self) -> dict:
"""Return the C2 Profile settings as a dictionary"""
# The result is cached and reused for as long as hash(self.tree) is unchanged.
# NOTE(review): where _dict_hash/_dict_cache are first initialised is not visible here.
if self._dict_hash == hash(self.tree):
return self._dict_cache
line = []
stack = []
# NOTE(review): the contents of ``list_props`` are not visible in this view.
list_props = [
# Walk the reconstructed token stream; ``stack`` tracks the enclosing block names and keys
# are the dot-joined path. NOTE(review): many statements of this loop (pushes onto
# ``stack``/``line`` and the writes into ``properties``) are missing from this view, so
# the comments below only describe what the remaining lines show.
items = Reconstructor(c2profile_parser)._reconstruct(self.tree)
properties = collections.defaultdict(list)
for item in items:
if item == "set":
if item in "{};":
if item == "{":
line.pop() # pop '{'
x = line[-1]
if isinstance(x, Token) and x == '"default"':
# handle "default" variant as default.
# stack.append(line[-2])
elif item == "}":
x = stack.pop()
if isinstance(x, Token):
# pop variant token
elif item == ";":
line.pop() # pop ;
key = ".".join(stack)
if key in list_props:
# STRING tokens are converted to bytes; a single value is unwrapped from its tuple.
value = tuple(string_token_to_bytes(x) for x in line)
if len(value) == 1:
value = value[0]
line = []
elif len(line) > 2:
# The last two tokens of the statement form the value tuple.
value = []
for x in line[-2:]:
if x.type == "STRING":
value = tuple(value)
line = line[:-2]
value = line.pop()
key = ".".join(stack + line)
if isinstance(value, Token):
if value.type == "STRING":
# strip quotes
value = str(value)[1:-1]
line = []
self._dict_hash = hash(self.tree)
self._dict_cache = dict(properties)
return self._dict_cache
# NOTE(review): main() reads ``profile.properties.items()`` without calling it, so this is
# presumably a @property -- the decorator is not visible here.
def properties(self):
"""C2 Profile settings as dictionary, alias for :func:`~dissect.cobaltstrike.c2profile.C2Profile.as_dict`"""
return self.as_dict()
# Build and return the argparse parser for the command line tool.
# NOTE(review): only the positional ``input`` argument is fully visible. The add_argument(...)
# calls that own the remaining keyword lines are missing from this view; main() reads
# args.beacon, args.all, args.type and args.verbose, so those are presumably the options.
def build_parser():
# argparse is imported locally, so it is only loaded when the CLI is used.
import argparse
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("input", metavar="FILE", help="c2 profile or beacon to dump")
help="input is a beacon instead of a .profile file",
help="when using --beacon, try all xor keys when default ones fail",
choices=["pretty", "ast", "c2profile", "properties"],
help="output format",
help="verbosity level (-v for INFO, -vv for DEBUG)",
return parser
def main():
"""Entrypoint for c2profile-dump."""
import logging
from dissect.cobaltstrike.beacon import BeaconConfig
parser = build_parser()
args = parser.parse_args()
# Map the verbosity count to a log level; counts beyond the list are clamped to DEBUG.
levels = [logging.WARNING, logging.INFO, logging.DEBUG]
level = levels[min(len(levels) - 1, args.verbose)]
# NOTE(review): the logging setup call that owns this ``format=`` keyword is not visible here.
format="%(asctime)s %(name)s %(message)s",
path = args.input
# Load the profile either from a beacon's embedded config or from a profile text file.
# NOTE(review): the ``try:`` and ``else:`` lines of this section are not visible here.
if args.beacon:
config = BeaconConfig.from_path(path, all_xor_keys=args.all)
if not config:
# A string return value is used as the error message for the caller.
return f"BeaconConfig not found for {path!r}"
profile = C2Profile.from_beacon_config(config)
with open(args.input) as f:
profile = C2Profile.from_text(f.read())
except Exception as e:
# exc_info=False logs the message only, without a traceback.
logging.exception(f"Failed to parse {path}: {e}", exc_info=False)
return 1
# Emit the profile in the requested output format.
# NOTE(review): the bodies of the first three branches are not visible here.
if args.type == "pretty":
elif args.type == "ast":
elif args.type == "c2profile":
elif args.type == "properties":
for key, value in profile.properties.items():
print(key, value)
if __name__ == "__main__":
    # main() returns None on success, an int status, or an error message string;
    # sys.exit() turns each of those into the matching process exit status.
    sys.exit(main())