app.utility namespace

Submodules

app.utility.base_knowledge_svc module

class app.utility.base_knowledge_svc.BaseKnowledgeService

Bases: BaseService

app.utility.base_obfuscator module

class app.utility.base_obfuscator.BaseObfuscator(agent)

Bases: BaseWorld

run(link, **kwargs)

app.utility.base_object module

class app.utility.base_object.AppConfigGlobalVariableIdentifier

Bases: object

classmethod is_global_variable(variable)
class app.utility.base_object.BaseObject

Bases: BaseWorld

property access
static clean(d)
property created
property display
display_schema = None
static hash(s)
classmethod load(dict_obj)
load_schema = None
match(criteria)
replace_app_props(encoded_string)
static retrieve(collection, unique)
schema = None
search_tags(value)
update(field, value)

Updates the given field to the given value as long as the value is not None and the new value is different from the current value. Ignoring None prevents current property values from being overwritten to None if the given property is not intentionally passed back to be updated (example: Agent heartbeat)

Parameters:
  • field – object property to update

  • value – value to update to

app.utility.base_parser module

class app.utility.base_parser.BaseParser(parser_info)

Bases: object

static broadcastip(blob)
static email(blob)

Parse out email addresses :param blob: :return:

static filename(blob)

Parse out filenames :param blob: :return:

static ip(blob)
static line(blob)

Split a blob by line :param blob: :return:

static load_json(blob)
static set_value(search, match, used_facts)

Determine the value of a source/target for a Relationship :param search: a fact property to look for; either a source or target fact :param match: a parsing match :param used_facts: a list of facts that were used in a command :return: either None, the value of a matched used_fact, or the parsing match

app.utility.base_planning_svc module

class app.utility.base_planning_svc.BasePlanningService(global_variable_owners=None)

Bases: BaseService

add_global_variable_owner(global_variable_owner)

Adds a global variable owner to the internal registry.

These will be used for identification of global variables when performing variable-fact substitution.

Args:
global_variable_owner: An object that exposes an is_global_variable(…) method and accepts a string

containing a bare/unwrapped variable.

async add_test_variants(links, agent, facts=(), rules=(), operation=None, trim_unset_variables=False, trim_missing_requirements=False)

Create a list of all possible links for a given set of templates

Parameters:
  • links

  • agent

  • facts

  • rules

  • operation

  • trim_unset_variables

  • trim_missing_requirements

Returns:

updated list of links

is_global_variable(variable)
async obfuscate_commands(agent, obfuscator, links)
re_index = re.compile('(?<=\\[filters\\().+?(?=\\)\\])')
re_limited = re.compile('#{.*\\[*\\]}')
re_trait = re.compile('(?<=\\{).+?(?=\\[)')
re_variable = re.compile('#{(.*?)}', re.DOTALL)

Remove any links that have already been completed by the operation for the agent

Parameters:
  • operation

  • links

  • agent

Returns:

updated list of links

Remove any links that contain variables that have not been filled in.

Parameters:

links

Returns:

updated list of links

Trim links in supplied list. Where ‘trim’ entails:
  • adding all possible test variants

  • removing completed links (i.e. agent has already completed)

  • removing links that did not have template fact variables replaced by fact values

Parameters:
  • operation

  • links

  • agent

Returns:

trimmed list of links

app.utility.base_service module

class app.utility.base_service.BaseService

Bases: BaseWorld

add_service(name, svc)
classmethod get_service(name)
classmethod get_services()
classmethod remove_service(name)

app.utility.base_world module

class app.utility.base_world.AccessSchema(*, only: Sequence[str] | AbstractSet[str] | None = None, exclude: Sequence[str] | AbstractSet[str] = (), many: bool = False, context: dict | None = None, load_only: Sequence[str] | AbstractSet[str] = (), dump_only: Sequence[str] | AbstractSet[str] = (), partial: bool | Sequence[str] | AbstractSet[str] | None = None, unknown: str | None = None)

Bases: Schema

opts: SchemaOpts = <marshmallow.schema.SchemaOpts object>
class app.utility.base_world.BaseWorld

Bases: object

A collection of base static functions for service & object module usage

class Access(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: Enum

APP = 0
BLUE = 2
HIDDEN = 3
RED = 1
class Privileges(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: Enum

Elevated = 1
User = 0
TIME_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
static apply_config(name, config)
static check_requirement(params)
static clear_config()
static create_logger(name)
static decode_bytes(s, strip_newlines=True)
static encode_string(s)
static generate_name(size=16)
static generate_number(size=6)
static get_config(prop=None, name=None)
static get_current_timestamp(date_format='%Y-%m-%dT%H:%M:%SZ')
static get_timestamp_from_string(datetime_str, date_format='%Y-%m-%dT%H:%M:%SZ')
static is_base64(s)
static is_uuid4(s)
static jitter(fraction)
async static load_module(module_type, module_info)
static prepend_to_file(filename, line)
re_base64 = re.compile('[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}', re.DOTALL)
static set_config(name, prop, value)
static strip_yml(path)
class app.utility.base_world.PrivilegesSchema(*, only: Sequence[str] | AbstractSet[str] | None = None, exclude: Sequence[str] | AbstractSet[str] = (), many: bool = False, context: dict | None = None, load_only: Sequence[str] | AbstractSet[str] = (), dump_only: Sequence[str] | AbstractSet[str] = (), partial: bool | Sequence[str] | AbstractSet[str] | None = None, unknown: str | None = None)

Bases: Schema

opts: SchemaOpts = <marshmallow.schema.SchemaOpts object>

app.utility.config_generator module

app.utility.config_generator.ensure_local_config()

Checks if a local.yml config file exists. If not, generates a new local.yml file using secure random values.

app.utility.config_generator.log_config_message(config_path)
app.utility.config_generator.make_secure_config()

app.utility.file_decryptor module

app.utility.file_decryptor.decrypt(filename, configuration, output_file=None, b64decode=False)
app.utility.file_decryptor.get_encryptor(salt, key)
app.utility.file_decryptor.read(filename, encryptor)

app.utility.payload_encoder module

This module contains helper functions for encoding and decoding payload files.

If AV is running on the server host, then it may sometimes flag, quarantine, or delete Caldera payloads. To help prevent this, encoded payloads can be used to prevent AV from breaking the server. The convention expected by the server is that encoded payloads will be XOR’ed with the DEFAULT_KEY contained in the payload_encoder.py module.

Additionally, payload_encoder.py can be used from the command-line to add a new encoded payload.

` python /path/to/payload_encoder.py input_file output_file `

NOTE: In order for the server to detect the availability of an encoded payload, the payload file’s name must end in the .xored extension.

app.utility.payload_encoder.xor_bytes(in_bytes, key=None)
app.utility.payload_encoder.xor_file(input_file, output_file=None, key=None)

app.utility.rule_set module

class app.utility.rule_set.RuleAction(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: Enum

ALLOW = 1
DENY = 0
class app.utility.rule_set.RuleSet(rules)

Bases: object

async apply_rules(facts)
async is_fact_allowed(fact)