"""
Beacon client that can actively connect to a Cobalt Strike Team Server.
.. danger::
The client actively connects to a Cobalt Strike Team Server, caution should be taken when using this.
A default client will perform check-ins and only log the tasks it receives unless implemented otherwise.
"""
# Python imports
import argparse
import datetime
import hashlib
import inspect
import ipaddress
import logging
import random
import reprlib
import string
import sys
import time
import urllib.parse
# Typing imports
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
# Third party imports
try:
import httpx
except ImportError:
raise ImportError("httpx is required for the HTTP beacon client, install it with `pip install httpx`")
try:
from flow.record import RecordWriter
except ImportError:
raise ImportError(
"flow.record is required for writing Beacon records, please install it with `pip install flow.record`"
)
# Local imports
from dissect.cobaltstrike.c2 import (
BeaconCallback,
BeaconCommand,
BeaconConfig,
BeaconMetadata,
C2Data,
C2Http,
CallbackPacket,
ClientC2Data,
HttpRequest,
HttpResponse,
TaskPacket,
c2packet_to_record,
encrypt_metadata,
encrypt_packet,
)
from dissect.cobaltstrike.utils import (
catch_sigpipe,
enable_reprlib_flow_record,
p32,
p32be,
)
[docs]
logger = logging.getLogger(__name__)
reprlib.aRepr.maxstring = 100
reprlib.aRepr.maxother = 100
# Source: https://github.com/fivethirtyeight/data/tree/master/most-common-name
# fmt: off
[docs]
FIRST_NAMES = ["Michael", "James", "John", "Robert", "David", "William", "Mary", "Christopher", "Joseph", "Richard", "Daniel", "Thomas", "Matthew", "Jennifer", "Charles", "Anthony", "Patricia", "Linda", "Mark", "Elizabeth", "Joshua", "Steven", "Andrew", "Kevin", "Brian", "Barbara", "Jessica", "Jason", "Susan", "Timothy", "Paul", "Kenneth", "Lisa", "Ryan", "Sarah", "Karen", "Jeffrey", "Donald", "Ashley", "Eric", "Jacob", "Nicholas", "Jonathan", "Ronald", "Michelle", "Kimberly", "Nancy", "Justin", "Sandra", "Amanda", "Brandon", "Stephanie", "Emily", "Melissa", "Gary", "Edward", "Stephen", "Scott", "George", "Donna", "Jose", "Rebecca", "Deborah", "Laura", "Cynthia", "Carol", "Amy", "Margaret", "Gregory", "Sharon", "Larry", "Angela", "Maria", "Alexander", "Benjamin", "Nicole", "Kathleen", "Patrick", "Samantha", "Tyler", "Samuel", "Betty", "Brenda", "Pamela", "Aaron", "Kelly", "Heather", "Rachel", "Adam", "Christine", "Zachary", "Debra", "Katherine", "Dennis", "Nathan", "Christina", "Julie", "Jordan", "Kyle", "Anna"] # noqa: E501
[docs]
LAST_NAMES = ["SMITH", "JOHNSON", "WILLIAMS", "BROWN", "JONES", "GARCIA", "RODRIGUEZ", "MILLER", "MARTINEZ", "DAVIS", "HERNANDEZ", "LOPEZ", "GONZALEZ", "WILSON", "ANDERSON", "THOMAS", "TAYLOR", "LEE", "MOORE", "JACKSON", "PEREZ", "MARTIN", "THOMPSON", "WHITE", "SANCHEZ", "HARRIS", "RAMIREZ", "CLARK", "LEWIS", "ROBINSON", "WALKER", "YOUNG", "HALL", "ALLEN", "TORRES", "NGUYEN", "WRIGHT", "FLORES", "KING", "SCOTT", "RIVERA", "GREEN", "HILL", "ADAMS", "BAKER", "NELSON", "MITCHELL", "CAMPBELL", "GOMEZ", "CARTER", "ROBERTS", "DIAZ", "PHILLIPS", "EVANS", "TURNER", "REYES", "CRUZ", "PARKER", "EDWARDS", "COLLINS", "STEWART", "MORRIS", "MORALES", "ORTIZ", "GUTIERREZ", "MURPHY", "ROGERS", "COOK", "KIM", "MORGAN", "COOPER", "RAMOS", "PETERSON", "GONZALES", "BELL", "REED", "BAILEY", "CHAVEZ", "KELLY", "HOWARD", "RICHARDSON", "WARD", "COX", "RUIZ", "BROOKS", "WATSON", "WOOD", "JAMES", "MENDOZA", "GRAY", "BENNETT", "ALVAREZ", "CASTILLO", "PRICE", "HUGHES", "VASQUEZ", "SANDERS", "JIMENEZ", "LONG", "FOSTER"] # noqa: E501
# Source: https://github.com/fox-it/cobaltstrike-beacon-data (top spawnto)
[docs]
PROCESS_NAMES = ["rundll32.exe", "dllhost.exe", "gpupdate.exe", "svchost.exe", "mstsc.exe", "WerFault.exe", "WUAUCLT.exe", "wusa.exe", "runonce.exe", "regsvr32.exe"] # noqa: E501
# fmt: on
[docs]
COMPUTERNAME_TEMPLATES = """
WIN-XXXXXXXXXXX
DESKTOP-XXXXXXX
WINDOWS-XXXXXXX
""".split()
[docs]
def random_computer_name(username: Optional[str] = None) -> str:
"""Returns a random Windows like computer name, if `username` is set it can also return ``<USERNAME>-PC``"""
if username:
template = random.choice(COMPUTERNAME_TEMPLATES + [username])
if template == username:
username, _, _ = username.partition(".")
username, _, _ = username.partition(" ")
return f"{username}-PC".upper()
else:
template = random.choice(COMPUTERNAME_TEMPLATES)
hostname = template.rstrip("X")
padding_len = len(template) - len(hostname)
chars = string.ascii_uppercase + string.digits
padding = "".join(random.choice(chars) for _ in range(padding_len))
hostname = hostname + padding
return hostname
[docs]
def random_username_name() -> str:
"""Returns a random username in the form of ``john.smith`` or ``John Smith``."""
first = random.choice(FIRST_NAMES)
last = random.choice(LAST_NAMES)
if random.getrandbits(1):
return "{0} {1}".format(first.capitalize(), last.capitalize())
return f"{first}.{last}".lower()
[docs]
def random_windows_ver() -> Tuple[int, int, int]:
"""Return a random Windows version in the form of the tuple (major, minor, build)."""
# Source: https://www.lifewire.com/windows-version-numbers-2625171
versions = [
"10.0.22000", # Windows 11
"10.0.19041", # Windows 10
"6.3.9600", # Windows 8.1
"6.2.9200", # Windows 8
"6.1.7601", # Windows 7
"5.1.2600", # Windows XP
"5.0.2195", # Windows 2000
]
version = random.choice(versions)
major, minor, build = list(map(int, version.split(".")))
return (major, minor, build)
[docs]
def random_process_name() -> str:
"""Return a random process name."""
return random.choice(PROCESS_NAMES)
[docs]
def random_internal_ip() -> ipaddress.IPv4Address:
"""Return a random internal RFC1918 IP address."""
network = random.choice(
[
ipaddress.IPv4Network("10.0.0.0/8"),
ipaddress.IPv4Network("172.16.0.0/12"),
ipaddress.IPv4Network("192.168.0.0/16"),
]
)
return ipaddress.IPv4Address(random.randrange(int(network.network_address) + 1, int(network.broadcast_address) - 1))
[docs]
def log_task(task):
logger.info("Received Task:")
task_dt = datetime.datetime.fromtimestamp(task.epoch, tz=datetime.timezone.utc)
data_r = reprlib.repr(task.data)
logger.info(f" - stamp: {task_dt} ({task.epoch:#04x})")
logger.info(f" - task: {task.command} ({task.command.value}, {task.command.value:#04x})")
logger.info(f" - size: {task.size}")
logger.info(f" - data: {data_r}")
# Some helper Callback Response functions
[docs]
def CallbackError(code: int, n1: int, n2: int, message: str) -> Tuple[int, bytes]:
return BeaconCallback.CALLBACK_ERROR, p32be(code) + p32be(n1) + p32be(n2) + message.encode() + p32be(0)
[docs]
def CallbackDebugMessage(message: str) -> Tuple[int, bytes]:
"""This will output ``'[-] DEBUG: <message>'`` to the Team Server console."""
return CallbackError(code=0, n1=0, n2=0, message=message)
[docs]
def CallbackOutputMessage(message: str) -> Tuple[int, bytes]:
"""This will output ``'[+] received output: <message>'`` to the Team Server console."""
return BeaconCallback.CALLBACK_OUTPUT_OEM, message.encode() + p32(0)
[docs]
class HttpBeaconClient:
"""A Beacon Client that can communicate with a Cobalt Strike Team Server over HTTP."""
def __init__(self):
self.task_map = {}
self.logger = logger
[docs]
def run(
self,
bconfig: BeaconConfig,
dry_run=False,
scheme=None,
domain=None,
port=None,
beacon_id=None,
pid=None,
computer=None,
user=None,
process=None,
internal_ip=None,
arch=None,
barch=None,
ansi_cp=58372,
oem_cp=46337,
high_integrity=False,
sleeptime=None,
jitter=None,
user_agent=None,
host_header=None,
verbose=None,
silent=None,
writer=None,
):
"""Run the Beacon Client."""
self.bconfig = bconfig
self.counter = int(time.time())
self.verbose = verbose
self.dry_run = dry_run
self.silent = silent
# Beacon doesn't verify TLS certificates so we disable it here too
self.verify = False
# uneven beacon_id's are considered SSH sessions so we ensure that it's even.
self.beacon_id = beacon_id if beacon_id is not None else (random.getrandbits(32) & 0x7FFFFFFF)
self.beacon_id = (self.beacon_id - self.beacon_id % 2) & 0xFFFFFFFF
if self.beacon_id > 0x7FFFFFFF:
raise ValueError("beacon_id must be less or equal than 2147483647")
# randomize pid
self.pid = pid or random.randrange(1000, 5000)
# The Beacon Session keys (AES and HMAC) are derived from `aes_rand` bytes.
# Beacon Session keys are persistent on the Team Server, so to make check-in and responses repeatable for the
# same `beacon_id` we use a deterministic `aes_rand`` here so we can re-use the same keys.
random.seed(self.beacon_id ^ 0xACCE55ED)
self.aes_rand = random.getrandbits(128).to_bytes(16, "big")
digest = hashlib.sha256(self.aes_rand).digest()
self.aes_key = digest[:16]
self.hmac_key = digest[16:]
if self.bconfig.protocol not in ("http", "https"):
raise ValueError("Not a HTTP or HTTPS beacon!")
if scheme and scheme not in ("http", "https"):
raise ValueError("Scheme must be either 'http' or 'https'")
self.user = random_username_name() if user is None else user
self.computer = random_computer_name(self.user) if computer is None else computer
self.process = random_process_name() if process is None else process
info = f"{self.computer}\t{self.user}\t{self.process}"
# info cannot be larger than 51 bytes, truncate it to be sure.
info = info[:51]
# ip is in little endian
self.internal_ip = ipaddress.IPv4Address(internal_ip or random_internal_ip())
internal_ip_int = int.from_bytes(self.internal_ip.packed, "little")
flag = 0
self.arch = random.choice(["x86", "x64"]) if arch is None else arch
self.barch = self.arch if barch is None else barch
self.high_integrity = high_integrity
if self.barch == "x64":
flag |= 0x2
if self.arch == "x64":
flag |= 0x4
if self.high_integrity:
flag |= 0x8
ver_major, ver_minor, ver_build = random_windows_ver()
self.metadata = BeaconMetadata(
magic=0xBEEF,
ansi_cp=ansi_cp,
oem_cp=oem_cp,
bid=self.beacon_id,
pid=self.pid,
flag=flag,
aes_rand=self.aes_rand,
ip=internal_ip_int,
ver_major=ver_major,
ver_minor=ver_minor,
ver_build=ver_build,
info=info.encode(),
)
self.c2http = C2Http(bconfig, aes_key=self.aes_key, hmac_key=self.hmac_key)
self.domain = domain or random.choice(self.bconfig.domains)
self.uri = random.choice(self.bconfig.uris)
self.scheme = self.bconfig.protocol if scheme is None else scheme
self.port = port or self.bconfig.port
self.base_url = f"{self.scheme}://{self.domain}:{self.port}"
self.get_verb: bytes = self.c2http.get_verb
self.get_uri: str = random.choice(self.bconfig.uris)
self.task_url: str = urllib.parse.urljoin(self.base_url, self.get_uri)
self.submit_verb: bytes = self.c2http.submit_verb
self.submit_uri: str = self.c2http.submit_uri.decode()
self.callback_url: str = urllib.parse.urljoin(self.base_url, self.submit_uri)
self.sleeptime: int = self.bconfig.settings["SETTING_SLEEPTIME"] if sleeptime is None else sleeptime
self.jitter: int = self.bconfig.settings["SETTING_JITTER"] if jitter is None else jitter
self.user_agent: str = self.bconfig.settings["SETTING_USERAGENT"] if user_agent is None else user_agent
self.host_header: str = self.bconfig.settings["SETTING_HOST_HEADER"] if host_header is None else host_header
self.writer = RecordWriter() if writer is not None else writer
_, _, fqdn = self.host_header.encode().partition(b": ")
self.host_header = fqdn.decode().strip() if fqdn else self.domain
self.print_settings()
if dry_run:
logger.info("Dry run enabled, not continuing.")
return 0
if self.writer:
enable_reprlib_flow_record()
logger.info("Writing records to %s", writer)
# start the beacon loop
logger.info("Starting beacon loop...")
if self.silent:
logger.info("Silent mode enabled, empty tasks and check-ins will not be printed.")
try:
self._beacon_loop()
except KeyboardInterrupt:
return
finally:
logger.info("Stopping beacon loop...")
if self.writer:
self.writer.close()
[docs]
def _initial_get_request(self) -> HttpRequest:
"""Return the initial HttpRequest object for retrieving tasks from the Team Server."""
return HttpRequest(
method=self.get_verb,
uri=self.get_uri.encode(),
headers={b"User-Agent": self.user_agent.encode(), b"Host": self.host_header.encode()},
params={},
body=b"",
)
[docs]
def _initial_post_request(self) -> HttpRequest:
"""Return the initial HttpRequest object for sending callback data to the Team Server."""
return HttpRequest(
method=self.submit_verb,
uri=self.submit_uri.encode(),
headers={b"User-Agent": self.user_agent.encode(), b"Host": self.host_header.encode()},
params={},
body=b"",
)
[docs]
def get_sleep_time(self) -> float:
"""Return the sleep time with jitter for the beacon loop."""
return self.sleeptime - random.uniform(0, self.sleeptime * self.jitter / 100)
[docs]
def register_task(self, command_id: Union[None, int], func):
"""Register a task handler for a given command ID.
Args:
command_id: The command ID to register the handler for.
``None`` is handler for empty tasks. ``-1`` is a catch-all handler.
func: The function to call when a task with the given command ID is received.
"""
if command_id not in self.task_map:
self.task_map[command_id] = []
self.task_map[command_id].append(func)
[docs]
def get_task(self) -> Optional[TaskPacket]:
"""Get a task from the Team Server."""
# Encrypt and transform metadata into a HTTP request
req = self.c2http.transform_get.transform(
C2Data(metadata=encrypt_metadata(self.metadata, public_key=self.c2http.pub)),
request=self._initial_get_request(),
)
url = urllib.parse.urljoin(self.base_url, req.uri.decode())
params = {k.decode(): v.decode() for k, v in req.params.items()}
try:
self.logger.debug("requesting : %r", req)
response = httpx.request(
req.method, url, headers=req.headers, params=params, content=req.body, verify=self.verify
)
response.raise_for_status()
except httpx.RequestError as exc:
self.logger.error("An error occurred while requesting %r : %r", exc.request.url, exc)
except httpx.HTTPStatusError as exc:
self.logger.error(
"HttpStatusError, response %s while requesting %r.", exc.response.status_code, exc.request.url
)
else:
req = HttpResponse(
body=response.content,
headers=response.headers,
status=response.status_code,
reason=response.reason_phrase,
)
for packet in self.c2http.iter_recover_http(req):
if packet.command == BeaconCommand.COMMAND_NOOP:
logger.debug("Received NOOP packet: %s", packet)
continue
return packet
return None
[docs]
def send_callback(self, callback_id: int, data: bytes):
"""Send callback data to the Team Server."""
self.counter += 1
# Encrypt Callback data and transform into a request
packet = CallbackPacket(counter=self.counter, size=len(data), callback=callback_id, data=data)
if self.writer:
self.writer.write(c2packet_to_record(packet))
self.writer.flush()
enc_packet = encrypt_packet(packet.dumps(), **self.c2http.beacon_keys._asdict())
# Transform data into a HTTP request
req = self.c2http.transform_submit.transform(
ClientC2Data(
id=str(self.beacon_id).encode(),
output=enc_packet.dumps(),
),
request=self._initial_post_request(),
)
# Construct url for callback
url = urllib.parse.urljoin(self.base_url, req.uri.decode())
params = {k.decode(): v.decode() for k, v in req.params.items()}
try:
response = httpx.request(
req.method, url, headers=req.headers, params=params, content=req.body, verify=self.verify
)
response.raise_for_status()
except httpx.RequestError as exc:
self.logger.error("An error occurred while requesting %r : %r", exc.request.url, exc)
except httpx.HTTPStatusError as exc:
self.logger.error(
"HttpStatusError, response %s while requesting %r.", exc.response.status_code, exc.request.url
)
[docs]
def handle(self, command: Union[None, int, BeaconCommand]):
"""decorator to register a handler for `command`, if ``None`` it registers a handler for empty tasks"""
def decorator(func):
logger.debug("register_task %s -> %s", command, func)
value = command
if command and not isinstance(command, int):
value = command.value
self.register_task(value, func)
return func
return decorator
[docs]
def catch_all(self):
"""decorator to handle all `unhandled` commands."""
def decorator(func):
self.register_task(-1, func)
return func
return decorator
[docs]
def print_settings(self):
if self.dry_run and logger.getEffectiveLevel() >= logging.INFO:
logger.setLevel(logging.INFO)
logger.info("Logging level set to INFO for dry run.")
logger.info("Using %r", self.metadata)
logger.info(" - barch: %r", self.barch)
logger.info(" - arch: %r", self.arch)
logger.info(" - high_integrity: %s", self.high_integrity)
logger.info(" - internal_ip: %r", str(self.internal_ip))
logger.info(" - beacon_id: %r", self.beacon_id)
logger.info(" - information: %r", self.metadata.info)
logger.info(" + computer: %r", self.computer)
logger.info(" + user: %r", self.user)
logger.info(" + process: %r", self.process)
logger.info("Using %r", self.bconfig)
logger.info(" - domains: %r -> %r", self.bconfig.domains, self.domain)
logger.info(" - uris: %r -> %r", self.bconfig.uris, self.uri)
logger.info(" - port: %u -> %u", self.bconfig.port, self.port)
logger.info(" - protocol: %r -> %r", self.bconfig.protocol, self.scheme)
logger.info(" - get_verb: %r", self.get_verb)
logger.info(" - submit_verb: %r", self.submit_verb)
logger.info(" - sleeptime (ms): %u -> %u", self.bconfig.sleeptime, self.sleeptime)
logger.info(" - jitter (%%): %u -> %u", self.bconfig.jitter, self.jitter)
[docs]
def get_handlers(self, command_id: Union[int, None]) -> List[Callable]:
"""Get a list of handlers for a given command ID."""
if command_id is not None:
task = BeaconCommand(command_id)
command_name = task.name.replace("COMMAND_", "").lower() if task else "empty_task"
else:
command_name = "empty_task"
on_handler = getattr(self, f"on_{command_name}", None)
handlers = self.task_map.get(command_id, [])
# if there is a "on_command" handler, add it to the list
if on_handler:
handlers.append(on_handler)
# if there is no handler, check if there is a catch all handler
if not handlers:
handlers = self.task_map.get(-1, [])
on_catch_all = getattr(self, "on_catch_all", None)
if on_catch_all:
handlers.append(on_catch_all)
return handlers
[docs]
def _beacon_loop(self):
while True:
task = self.get_task()
if task:
log_task(task)
if self.writer:
self.writer.write(c2packet_to_record(task))
elif not self.silent:
sleeptime = self.get_sleep_time()
logger.info("Empty Task, sleeping for %.2f seconds", sleeptime / 1000)
time.sleep(sleeptime / 1000)
continue
command_id = task.command.value if task else None
handlers = self.get_handlers(command_id)
for handler in handlers:
if callable(handler):
try:
response = handler(task)
if response:
self.send_callback(*response)
except Exception as e:
logger.exception(e)
sleeptime = self.get_sleep_time()
if not self.silent:
logger.info("Sleeping for %.2f seconds", sleeptime / 1000)
time.sleep(sleeptime / 1000)
[docs]
def build_parser() -> argparse.ArgumentParser:
"""Return the default ArgumentParser for the beacon client."""
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("beacon", metavar="BEACON", help="beacon to use as configuration")
comms = parser.add_argument_group("beacon communication")
comms.add_argument(
"-d",
"--domain",
help="override the domain configured in the beacon",
)
comms.add_argument(
"-p",
"--port",
type=int,
help="override the port configured in the beacon",
)
timing = parser.add_argument_group("beacon sleep options")
timing.add_argument(
"--sleeptime",
type=int,
help="override sleeptime settings (in milliseconds)",
)
timing.add_argument(
"--jitter",
type=int,
help="override jitter settings (in percentage)",
)
metadata = parser.add_argument_group("beacon metadata")
metadata.add_argument(
"-c",
"--computer",
default=None,
help="computer name (None = random)",
)
metadata.add_argument(
"-u",
"--user",
default=None,
help="user name (None = random)",
)
metadata.add_argument(
"-P",
"--process",
default=None,
help="process name (None = random)",
)
metadata.add_argument(
"-i",
"--beacon-id",
required=False,
type=int,
help="beacon id (None = random)",
)
metadata.add_argument(
"-I",
"--internal-ip",
help="internal ip (None = random)",
)
flags = parser.add_argument_group("beacon metadata flags")
flags.add_argument(
"--arch",
choices=["x86", "x64"],
default=None,
help="system architecture (None = random)",
)
flags.add_argument(
"--barch",
choices=["x86", "x64"],
default=None,
help="beacon architecture (None = random)",
)
flags.add_argument("--high-integrity", action="store_true", default=False, help="set high integrity flag")
parser.add_argument(
"-n",
"--dry-run",
action="store_true",
default=False,
help="show settings and exit",
)
writer = parser.add_argument_group("output options")
writer.add_argument("-w", "--writer", help="record writer")
writer.add_argument(
"-v",
"--verbose",
action="count",
default=0,
help="verbosity level (-v for INFO, -vv for DEBUG)",
)
writer.add_argument(
"-s",
"--silent",
action="store_true",
default=False,
help="suppress empty task messages",
)
return parser
[docs]
def parse_commandline_options(parser=None, defaults=None) -> Tuple[argparse.Namespace, Dict[str, Any]]:
"""Helper function to parse commandline options and return a tuple of (args, options).
This method is useful for creating default commandline options for a Beacon client.
The returned options can be passed to :meth:`HttpBeaconClient.run()` as follows:
.. code-block:: python
from dissect.cobaltstrike.client import HttpBeaconClient, parse_commandline_options
beacon = HttpBeaconClient()
args, options = parse_commandline_options(defaults={
"beacon_id": 1234,
"computer": "dissect",
"user": "cobaltstrike",
"process": "calc.exe",
})
beacon.run(**options)
If `parser` is not defined it will use the default argparse parser created by :meth:`build_parser`.
The `defaults` dictionary can be used to override the default argparse settings.
Args:
parser: an instance of :class:`argparse.ArgumentParser`, if `None` it will use the parser created by
:meth:`client.build_parser`.
defaults: A dictionary to override the default settings for the argument parser. Unknown keys will be ignored.
Returns:
Tuple of (args, options) where `args` is the parsed arguments from the commandline and `options` is a
dictionary of options that can be passed to :meth:`HttpBeaconClient.run()`.
"""
parser = parser or build_parser()
defaults = defaults or {}
parser.set_defaults(**defaults)
args = parser.parse_args()
levels = [logging.WARNING, logging.INFO, logging.DEBUG]
level = levels[min(len(levels) - 1, args.verbose)]
# Use rich logging if available
try:
from rich.console import Console
from rich.logging import RichHandler
logging.basicConfig(
level=level,
handlers=[RichHandler(console=Console(stderr=True))],
format="%(message)s",
)
except ImportError:
logging.basicConfig(
level=level,
format="%(asctime)s | %(levelname)-7s | %(message)s",
)
if level == logging.INFO:
logger.info("INFO logging enabled")
elif level == logging.DEBUG:
logger.debug("DEBUG logging enabled")
# create a dictionary with only arguments that are valid kwargs on .run()
sig = inspect.signature(HttpBeaconClient.run)
run_options = {k: v for k, v in vars(args).items() if k in sig.parameters}
run_options["bconfig"] = BeaconConfig.from_path(args.beacon)
return args, run_options
@catch_sigpipe
[docs]
def main():
parser = build_parser()
parser.add_argument(
"--no-warning",
action="store_true",
default=False,
help="disable connect warning",
)
args, options = parse_commandline_options(parser)
if not args.no_warning and not args.dry_run:
logger.warning(options["bconfig"])
logger.warning("Connecting to server in 5 seconds... (disable this warning with --no-warning)")
logger.warning("Press CTRL+C to exit.")
time.sleep(5)
client = HttpBeaconClient()
client.run(**options)
if __name__ == "__main__":
sys.exit(main())