Source code for dissect.cobaltstrike.utils

"""
This module contains generic helper functions used by ``dissect.cobaltstrike``.
"""

import errno
import io
import itertools
import os
import random
import re
import reprlib
import string
import sys
from collections import OrderedDict
from contextlib import contextmanager
from functools import partial, wraps
from typing import BinaryIO, Iterator, NamedTuple


def xor(data: bytes, key: bytes) -> bytes:
    """XOR `data` with a repeating `key` using big-integer arithmetic.

    Instead of XOR-ing byte by byte, both buffers are converted to a single
    large integer each, XOR-ed in one operation and converted back to bytes.

    Args:
        data: bytes to XOR
        key: key bytes; repeated or truncated to the length of `data`

    Returns:
        the XOR-ed bytes, or `data` itself when the key bytes sum to zero
        (an all-zero or empty key)
    """
    # An all-zero (or empty) key leaves the data untouched, so skip the work.
    if not sum(key):
        return data

    length = len(data)
    if length > len(key):
        # Repeat the key often enough to cover every byte of `data`.
        repeats = length // len(key) + 1
        key = key * repeats
    keystream = key[:length]

    data_int = int.from_bytes(data, "little")
    key_int = int.from_bytes(keystream, "little")
    return (data_int ^ key_int).to_bytes(length, "little")
def netbios_encode(data: bytes, offset: int = 0x41) -> bytes:
    """Encode `data` using NetBIOS encoding and return the encoded bytes.

    Every input byte is split into its high and low nibble; each nibble is
    emitted as one output byte with `offset` added to it, so the output is
    twice as long as the input.

    Args:
        data: bytes to be NetBIOS encoded
        offset: value added to each nibble, defaults to char ``A`` (``0x41``)

    Returns:
        NetBIOS encoded bytes
    """
    return bytes(
        nibble + offset
        for byte in bytearray(data)
        # High nibble first, then low nibble.
        for nibble in (byte >> 4, byte & 0x0F)
    )
def netbios_decode(data: bytes, offset: int = 0x41) -> bytes:
    """Decode the NetBIOS encoded `data` and return the decoded bytes.

    The input is consumed two bytes at a time: the first byte of each pair
    carries the high nibble and the second the low nibble, both shifted up
    by `offset`.

    Args:
        data: bytes to be NetBIOS decoded
        offset: value subtracted from each encoded byte, defaults to char
            ``A`` (``0x41``)

    Returns:
        NetBIOS decoded bytes
    """
    decoded = [
        ((data[idx] - offset) << 4) + (data[idx + 1] - offset)
        for idx in range(0, len(data), 2)
    ]
    return bytes(decoded)
@contextmanager
def retain_file_offset(fobj, offset=None, whence=io.SEEK_SET):
    """Return a context manager that changes the position of the file-like object `fobj`
    to the given byte `offset`. After completion of the block it restores the original
    position of the file.

    Args:
        fobj: file-like object
        offset: offset to seek to relative to position indicated by `whence`.
            If ``None`` no seek will be done.
        whence: default is ``SEEK_SET``, values for `whence` are:

            - ``SEEK_SET`` or ``0`` – start of the stream (the default); offset should be zero or positive
            - ``SEEK_CUR`` or ``1`` – current stream position; offset may be negative
            - ``SEEK_END`` or ``2`` – end of the stream; offset is usually negative

    Returns:
        context manager yielding `fobj`

    Raises:
        Whatever ``fobj.tell()`` or ``fobj.seek()`` raise; such errors are propagated unchanged.
    """
    # Record the position *before* entering the try block: if ``tell()`` fails
    # there is nothing to restore, and the ``finally`` clause must not run with
    # an unbound ``pos`` (which would mask the real error).
    pos = fobj.tell()
    try:
        if offset is not None:
            fobj.seek(offset, whence)
        yield fobj
    finally:
        # Always restore the original position, even when the body raised.
        fobj.seek(pos)
def catch_sigpipe(func):
    """Decorator for catching KeyboardInterrupt and BrokenPipeError (OSError 22 on Windows).

    The wrapped function returns ``1`` instead of raising when it is
    interrupted with ``KeyboardInterrupt`` or when it fails with an ``OSError``
    whose errno is ``EPIPE`` or ``EINVAL``. Any other ``OSError`` is re-raised.
    """
    handled_errnos = (errno.EPIPE, errno.EINVAL)

    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            result = func(*args, **kwargs)
        except KeyboardInterrupt:
            print("Aborted!", file=sys.stderr)
            return 1
        except OSError as exc:
            # Only handle:
            #  - BrokenPipeError: [Errno 32] Broken pipe
            #  - OSError: [Errno 22] Invalid argument
            # Everything else is propagated to the caller.
            if exc.errno not in handled_errnos:
                raise
            # Point stdout's file descriptor at the null device so later
            # writes or flushes to it no longer hit the broken pipe.
            null_fd = os.open(os.devnull, os.O_WRONLY)
            os.dup2(null_fd, sys.stdout.fileno())
            return 1
        return result

    return wrapper
def unpack(data: bytes, size: int = None, byteorder="little", signed=False) -> int:
    """Convert the first `size` bytes of `data` to an integer.

    Args:
        data: bytes to convert
        size: number of leading bytes to use; ``None`` uses all of `data`
        byteorder: ``"little"`` (default) or ``"big"``
        signed: interpret the bytes as a two's complement signed integer

    Returns:
        the unpacked integer
    """
    # Slicing with ``None`` keeps the whole buffer.
    raw = data[:size]
    return int.from_bytes(raw, byteorder=byteorder, signed=signed)
def pack(n: int, size: int = None, byteorder="little", signed=False) -> bytes:
    """Convert integer `n` to bytes.

    Args:
        n: integer to convert
        size: number of bytes to produce; if ``None`` the smallest size that
            can hold `n` is used (zero bytes for ``n == 0``)
        byteorder: ``"little"`` (default) or ``"big"``
        signed: encode `n` as a two's complement signed integer

    Returns:
        the packed bytes

    Raises:
        OverflowError: if `n` does not fit in `size` bytes, or if `n` is
            negative while `signed` is ``False``.
    """
    if size is None:
        if signed and n:
            # A signed encoding needs one extra bit for the sign. For negative
            # values ``~n`` (== -n - 1) is the largest magnitude that shares
            # the same bit width, e.g. -128 still fits in 8 bits but -129
            # does not.
            magnitude = n if n > 0 else ~n
            bits = magnitude.bit_length() + 1
        else:
            bits = n.bit_length()
        size = (bits + 7) // 8
    return n.to_bytes(size, byteorder=byteorder, signed=signed)
# Pre-configured variants of ``unpack`` (bytes -> int) and ``pack`` (int -> bytes).
# Naming: ``u``/``p`` + bit width, with a ``be`` suffix for big-endian; variants
# without the suffix use the little-endian default of ``unpack``/``pack``.

# Unpackers.
unpack_be = partial(unpack, byteorder="big")
u8 = partial(unpack, size=1)
u16 = partial(unpack, size=2)
u16be = partial(unpack, size=2, byteorder="big")
u32 = partial(unpack, size=4)
u32be = partial(unpack, size=4, byteorder="big")
u64 = partial(unpack, size=8)
u64be = partial(unpack, size=8, byteorder="big")

# Packers.
pack_be = partial(pack, byteorder="big")
p8 = partial(pack, size=1)
p16 = partial(pack, size=2)
p16be = partial(pack, size=2, byteorder="big")
p32 = partial(pack, size=4)
p32be = partial(pack, size=4, byteorder="big")
p64 = partial(pack, size=8)
p64be = partial(pack, size=8, byteorder="big")
def iter_find_needle(
    fp: BinaryIO,
    needle: bytes,
    start_offset: int = None,
    max_offset: int = 0,
) -> Iterator[int]:
    """Return an iterator yielding `offset` for found `needle` bytes in file `fp`.

    The file is read in blocks of ``io.DEFAULT_BUFFER_SIZE`` bytes; the last
    ``len(needle) - 1`` bytes of every block are carried over to the next one
    so that matches spanning a block boundary are found as well.

    Side effects: file handle position due to seeking.

    Args:
        fp: file like object
        needle: needle to search for, an empty needle yields nothing
        start_offset: offset in file object to start searching from, if None it will search from current position
        max_offset: how far we search for into the file, 0 for no limit. A match is only
            reported when its last byte lies at or before this absolute file offset.

    Yields:
        offset where `needle` was found in file `fp`
    """
    if not needle:
        # An empty needle "matches" everywhere; there is nothing useful to report.
        return

    overlap_len = len(needle) - 1
    if start_offset is not None:
        fp.seek(start_offset)

    # Bytes carried over from the previous block. Starts out empty so that no
    # artificial padding can ever be mistaken for file content.
    tail = b""
    while True:
        pos = fp.tell()
        if max_offset and pos > max_offset:
            break
        block = fp.read(io.DEFAULT_BUFFER_SIZE)
        if not block:
            break

        window = tail + block
        # Absolute file offset of window[0].
        window_start = pos - len(tail)

        idx = window.find(needle)
        while idx != -1:
            offset = window_start + idx
            # Compare absolute file offsets (not buffer indices) against the
            # limit. Matches are found in increasing order, so we can stop at
            # the first one that crosses it.
            if max_offset and offset + overlap_len > max_offset:
                return
            yield offset
            idx = window.find(needle, idx + 1)

        # ``window[-0:]`` would be the whole window, so handle 1-byte needles
        # (no overlap needed) explicitly.
        tail = window[-overlap_len:] if overlap_len else b""
def checksum8(text: str) -> int:
    """Compute the *checksum8* value of text.

    The checksum is the sum of the code points of all characters except
    ``/``, modulo 256. Texts shorter than 4 characters always give ``0``.
    """
    if len(text) < 4:
        return 0
    total = sum(ord(char) for char in text if char != "/")
    return total % 256
def is_stager_x86(uri: str) -> bool:
    """Return ``True`` if URI is a x86 stager URI, otherwise ``False``.

    A URI counts as a x86 stager URI when its *checksum8* value equals 92.
    """
    checksum = checksum8(uri)
    return checksum == 92
def is_stager_x64(uri: str) -> bool:
    """Return ``True`` if URI is a x64 stager URI, otherwise ``False``.

    A URI counts as a x64 stager URI when its *checksum8* value equals 93 and
    it consists of exactly a ``/`` followed by four ASCII letters or digits.
    """
    if checksum8(uri) != 93:
        return False
    # ``fullmatch`` instead of ``match`` with ``$``: the latter would also
    # accept a URI with a trailing newline such as "/abcd\n".
    return re.fullmatch(r"/[A-Za-z0-9]{4}", uri) is not None
def random_stager_uri(*, x64: bool = False, length: int = 4) -> str:
    """Generate a random (valid *checksum8*) stager URI.

    Random candidates are drawn until one passes the stager check for the
    requested architecture. Defaults to x86 URIs unless `x64` is ``True``.

    Args:
        x64: generate a x64 stager URI if ``True``, ``False`` for a x86 stager URI. (default: ``False``)
        length: length of URI to generate, excluding the "/" prefix. (default: 4)

    Returns:
        random stager URI

    Raises:
        ValueError: if `length` is not 4 for a x64 URI, or is smaller than 3.
    """
    if x64 and length != 4:
        raise ValueError("length must be exactly 4 for x64 stager uris")
    if length < 3:
        raise ValueError("length must be at least 3 chars")

    accepts = is_stager_x64 if x64 else is_stager_x86
    alphabet = string.ascii_letters + string.digits

    # Keep drawing random candidates until one has the right checksum.
    while True:
        candidate = "/" + "".join(random.choice(alphabet) for _ in range(length))
        if accepts(candidate):
            return candidate
def namedtuple_reprlib_repr(nt: NamedTuple) -> str:
    """Return a `reprlib` version of __repr__ for namedtuple `nt`.

    The result looks like the regular namedtuple repr, but every field value
    is rendered with ``reprlib.repr`` so large values are abbreviated.
    """
    rendered_fields = []
    for field in nt._fields:
        value = getattr(nt, field)
        rendered_fields.append(f"{field}={reprlib.repr(value)}")
    return f"{nt.__class__.__name__}({', '.join(rendered_fields)})"
def enable_reprlib_cstruct():
    """Enable `reprlib` style __repr__ for `dissect.cstruct` instances.

    Replaces ``Instance.__repr__`` with a version that renders integer values
    as hex and every other value through ``reprlib.repr``.
    """
    from dissect.cstruct.types.instance import Instance

    def _render(value) -> str:
        # Integers are shown as hex, everything else is abbreviated by reprlib.
        if isinstance(value, int):
            return hex(value)
        return reprlib.repr(value)

    def reprlib_repr(self) -> str:
        pairs = [f"{name}={_render(value)}" for name, value in self.__dict__.items()]
        values = ", ".join(pairs)
        return f"<{self._type.name} {values}>"

    Instance.__repr__ = reprlib_repr
def enable_reprlib_flow_record():
    """Enable `reprlib` style __repr__ for `flow.record` instances.

    Replaces ``Record.__repr__`` with a version that renders every field of
    the record descriptor through ``reprlib.repr``.
    """
    from flow.record import Record

    def reprlib_repr(self) -> str:
        rendered_fields = []
        for name in self._desc.fields:
            rendered_fields.append(f"{name}={reprlib.repr(getattr(self, name))}")
        return f"<{self._desc.name} {' '.join(rendered_fields)}>"

    Record.__repr__ = reprlib_repr
class LRUDict(OrderedDict):
    """Limit size, evicting the least recently looked-up key when full.

    The ordering of the underlying ``OrderedDict`` is used as recency order:
    the first key is the least recently used one, the last key the most
    recently used one.
    """

    def __init__(self, maxsize=128, *args, **kwds):
        # ``maxsize`` must be set before the parent constructor runs, because
        # ``__setitem__`` reads it.
        self.maxsize = maxsize
        super().__init__(*args, **kwds)

    def __getitem__(self, key):
        """Return the value for `key` and mark the key as most recently used."""
        found = super().__getitem__(key)
        self.move_to_end(key)
        return found

    def __setitem__(self, key, value):
        """Store `value` under `key`, evicting the oldest entry when over capacity."""
        # Overwriting an existing key also counts as a use of that key.
        if key in self:
            self.move_to_end(key)
        super().__setitem__(key, value)

        if len(self) <= self.maxsize:
            return
        # Over capacity: drop the first (least recently used) key.
        least_recent = next(iter(self))
        del self[least_recent]
def grouper(iterable, n, fillvalue=None):
    """Collect data into fixed-length chunks or blocks.

    The last chunk is padded with `fillvalue` when the input runs out, e.g.
    ``grouper('ABCDEFG', 3, 'x')`` gives ``ABC DEF Gxx``.
    """
    # The *same* iterator is passed `n` times, so each output tuple pulls `n`
    # consecutive items from the input.
    shared = iter(iterable)
    columns = (shared,) * n
    return itertools.zip_longest(*columns, fillvalue=fillvalue)